Posted to commits@qpid.apache.org by ac...@apache.org on 2017/02/23 22:51:16 UTC

[09/38] qpid-proton git commit: c proactor: improved robustness and testing

c proactor: improved robustness and testing

Added assert self-tests to the libuv.c proactor. The assertions are kept even
in release builds to fail fast and aid debugging.
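
A minimal sketch of the pattern this commit relies on (not part of the commit;
pop_count is a hypothetical helper): undefining NDEBUG immediately before
including <assert.h> keeps assert() active even when the build system defines
NDEBUG for release builds (e.g. CMake's Release configuration adds -DNDEBUG).

    /* Force assert() on regardless of build type; must precede <assert.h> */
    #undef NDEBUG
    #include <assert.h>

    #include <stdio.h>

    /* Hypothetical helper with a cheap invariant check that survives release builds */
    static int pop_count(int *count) {
      assert(*count > 0);
      return --*count;
    }

    int main(void) {
      int n = 1;
      pop_count(&n);
      printf("remaining: %d\n", n);  /* prints "remaining: 0" */
      return 0;
    }

Note that NDEBUG only affects assert() at the point where <assert.h> is
included, so the #undef must appear before that include in each file that
wants release-mode assertions.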


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/ec70d73d
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/ec70d73d
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/ec70d73d

Branch: refs/heads/go1
Commit: ec70d73dd5e5b58eaf64f5e137104fd9d4042e70
Parents: afacb16
Author: Alan Conway <ac...@redhat.com>
Authored: Fri Feb 10 21:44:25 2017 -0500
Committer: Alan Conway <ac...@redhat.com>
Committed: Fri Feb 10 21:49:59 2017 -0500

----------------------------------------------------------------------
 proton-c/src/proactor/libuv.c | 557 ++++++++++++++++++++-----------------
 1 file changed, 306 insertions(+), 251 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ec70d73d/proton-c/src/proactor/libuv.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/libuv.c b/proton-c/src/proactor/libuv.c
index 42bbfab..6064bd6 100644
--- a/proton-c/src/proactor/libuv.c
+++ b/proton-c/src/proactor/libuv.c
@@ -30,7 +30,10 @@
 #include <proton/transport.h>
 #include <proton/url.h>
 
+/* All asserts are cheap and should remain in a release build for debuggability */
+#undef NDEBUG
 #include <assert.h>
+
 #include <stddef.h>
 #include <stdio.h>
 #include <stdlib.h>
@@ -58,12 +61,10 @@
   wake-up to be processed in a single thread with no context switches.
 
   Function naming:
-  - on_ - called in leader thread via uv_run().
-  - leader_ - called in leader thread, while processing the leader_q.
-  - owner_ - called in owning thread, leader or worker but not concurrently.
-
-  Note on_ and leader_ functions can call each other, the prefix indicates the
-  path they are most often called on.
+  - on_* - called in leader thread by uv_run().
+  - leader_* - called in leader thread (either leader_q processing or from an on_ function)
+  - worker_* - called in worker thread
+  - *_lh - called with the relevant lock held
 */
 
 const char *COND_NAME = "proactor";
@@ -80,6 +81,13 @@ PN_HANDLE(PN_PROACTOR)
 PN_STRUCT_CLASSDEF(pn_proactor, CID_pn_proactor)
 PN_STRUCT_CLASSDEF(pn_listener, CID_pn_listener)
 
+/* A psocket (connection or listener) has the following *mutually exclusive* states. */
+typedef enum {
+  ON_WORKER,               /* On worker_q or in use by user code in worker thread  */
+  ON_LEADER,               /* On leader_q or in use by the leader loop  */
+  ON_UV                    /* Scheduled for a UV event, or in use by leader thread in an on_* handler */
+} psocket_state_t;
+
 /* common to connection and listener */
 typedef struct psocket_t {
   /* Immutable */
@@ -87,14 +95,16 @@ typedef struct psocket_t {
 
   /* Protected by proactor.lock */
   struct psocket_t* next;
-  void (*wakeup)(struct psocket_t*); /* interrupting action for leader */
+  psocket_state_t state;
+  void (*action)(struct psocket_t*); /* deferred action for leader */
+  void (*wakeup)(struct psocket_t*); /* wakeup action for leader */
 
-  /* Only used by leader */
+  /* Only used by leader when it owns the psocket */
   uv_tcp_t tcp;
-  void (*action)(struct psocket_t*); /* deferred action for leader */
-  bool is_conn:1;
   char host[NI_MAXHOST];
   char port[NI_MAXSERV];
+  bool is_conn;
+
 } psocket_t;
 
 /* Special value for psocket.next pointer when socket is not on any any list. */
@@ -105,11 +115,12 @@ static void psocket_init(psocket_t* ps, pn_proactor_t* p, bool is_conn, const ch
   ps->next = &UNLISTED;
   ps->is_conn = is_conn;
   ps->tcp.data = ps;
+  ps->state = ON_WORKER;
 
   /* For platforms that don't know about "amqp" and "amqps" service names. */
-  if (strcmp(port, AMQP_PORT_NAME) == 0)
+  if (port && strcmp(port, AMQP_PORT_NAME) == 0)
     port = AMQP_PORT;
-  else if (strcmp(port, AMQPS_PORT_NAME) == 0)
+  else if (port && strcmp(port, AMQPS_PORT_NAME) == 0)
     port = AMQPS_PORT;
   /* Set to "\001" to indicate a NULL as opposed to an empty string "" */
   strncpy(ps->host, host ? host : "\001", sizeof(ps->host));
@@ -132,24 +143,27 @@ typedef struct pconnection_t {
   uv_timer_t timer;
   uv_write_t write;
   uv_shutdown_t shutdown;
-  size_t writing;
-  bool reading:1;
-  bool server:1;                /* accept, not connect */
+  size_t writing;               /* size of pending write request, 0 if none pending */
+  bool reading;                 /* true if a read request is pending */
+  bool server;                  /* accept, not connect */
 } pconnection_t;
 
 struct pn_listener_t {
   psocket_t psocket;
 
   /* Only used by owner thread */
-  pconnection_t *accepting;     /* accept in progress */
+  pconnection_t *accepting;     /* set in worker, used in UV loop for accept */
   pn_condition_t *condition;
   pn_collector_t *collector;
   pn_event_batch_t batch;
   pn_record_t *attachments;
   void *context;
   size_t backlog;
-};
+  bool closing;                 /* close requested or closed by error */
 
+  /* Only used in leader thread */
+  size_t connections;           /* number of connections waiting to be accepted  */
+};
 
 typedef struct queue { psocket_t *front, *back; } queue;
 
@@ -166,17 +180,16 @@ struct pn_proactor_t {
 
   /* Protected by lock */
   uv_mutex_t lock;
-  queue start_q;
-  queue worker_q;
-  queue leader_q;
+  queue worker_q;               /* psockets ready for work, to be returned via pn_proactor_wait()  */
+  queue leader_q;               /* psockets waiting for attention by the leader thread */
   size_t interrupt;             /* pending interrupts */
   pn_millis_t timeout;
   size_t count;                 /* psocket count */
-  bool inactive:1;
-  bool timeout_request:1;
-  bool timeout_elapsed:1;
-  bool has_leader:1;
-  bool batch_working:1;          /* batch belongs to a worker.  */
+  bool inactive;
+  bool timeout_request;
+  bool timeout_elapsed;
+  bool has_leader;
+  bool batch_working;          /* batch is being processed in a worker thread */
 };
 
 static bool push_lh(queue *q, psocket_t *ps) {
@@ -201,90 +214,46 @@ static psocket_t* pop_lh(queue *q) {
   return ps;
 }
 
-static inline pconnection_t *as_pconnection_t(psocket_t* ps) {
-  return ps->is_conn ? (pconnection_t*)ps : NULL;
-}
-
-static inline pn_listener_t *as_listener(psocket_t* ps) {
-  return ps->is_conn ? NULL: (pn_listener_t*)ps;
-}
-
-/* Put ps on the leader queue for processing. Thread safe. */
-static void to_leader_lh(psocket_t *ps) {
-  push_lh(&ps->proactor->leader_q, ps);
-  uv_async_send(&ps->proactor->async); /* Wake leader */
-}
-
-static void to_leader(psocket_t *ps) {
-  uv_mutex_lock(&ps->proactor->lock);
-  to_leader_lh(ps);
-  uv_mutex_unlock(&ps->proactor->lock);
-}
-
-/* Detach from IO and put ps on the worker queue */
-static void leader_to_worker(psocket_t *ps) {
-  if (ps->is_conn) {
-    pconnection_t *pc = as_pconnection_t(ps);
-    /* Don't detach if there are no events yet. */
-    if (pn_connection_driver_has_event(&pc->driver)) {
-      if (pc->writing) {
-        pc->writing  = 0;
-        uv_cancel((uv_req_t*)&pc->write);
-      }
-      if (pc->reading) {
-        pc->reading = false;
-        uv_read_stop((uv_stream_t*)&pc->psocket.tcp);
-      }
-      if (pc->timer.data && !uv_is_closing((uv_handle_t*)&pc->timer)) {
-        uv_timer_stop(&pc->timer);
-      }
-    }
-  } else {
-    pn_listener_t *l = as_listener(ps);
-    uv_read_stop((uv_stream_t*)&l->psocket.tcp);
-  }
-  uv_mutex_lock(&ps->proactor->lock);
-  push_lh(&ps->proactor->worker_q, ps);
-  uv_mutex_unlock(&ps->proactor->lock);
-}
-
-/* Set a deferred action for leader, if not already set. */
-static void owner_to_leader(psocket_t *ps, void (*action)(psocket_t*)) {
-  uv_mutex_lock(&ps->proactor->lock);
-  if (!ps->action) {
+/* Set state and action and push to relevant queue */
+static inline void set_state_lh(psocket_t *ps, psocket_state_t state, void (*action)(psocket_t*)) {
+  /* Illegal if ps is already listed under a different state */
+  assert(ps->next == &UNLISTED || ps->state == state);
+  ps->state = state;
+  if (action && !ps->action) {
     ps->action = action;
   }
-  to_leader_lh(ps);
-  uv_mutex_unlock(&ps->proactor->lock);
+  switch(state) {
+   case ON_LEADER: push_lh(&ps->proactor->leader_q, ps); break;
+   case ON_WORKER: push_lh(&ps->proactor->worker_q, ps); break;
+   case ON_UV:
+    assert(ps->next == &UNLISTED);
+    break;           /* No queue for UV loop */
+  }
 }
 
-/* Owner thread send to worker thread. Set deferred action if not already set. */
-static void owner_to_worker(psocket_t *ps, void (*action)(psocket_t*)) {
+/* Set state and action, push to queue and notify leader. Thread safe. */
+static void set_state(psocket_t *ps, psocket_state_t state, void (*action)(psocket_t*)) {
   uv_mutex_lock(&ps->proactor->lock);
-  if (!ps->action) {
-    ps->action = action;
-  }
-  push_lh(&ps->proactor->worker_q, ps);
-  uv_async_send(&ps->proactor->async); /* Wake leader */
+  set_state_lh(ps, state, action);
+  uv_async_send(&ps->proactor->async);
   uv_mutex_unlock(&ps->proactor->lock);
 }
 
+static inline pconnection_t *as_pconnection(psocket_t* ps) {
+  return ps->is_conn ? (pconnection_t*)ps : NULL;
+}
 
-/* Re-queue for further work */
-static void worker_requeue(psocket_t* ps) {
-  uv_mutex_lock(&ps->proactor->lock);
-  push_lh(&ps->proactor->worker_q, ps);
-  uv_async_send(&ps->proactor->async); /* Wake leader */
-  uv_mutex_unlock(&ps->proactor->lock);
+static inline pn_listener_t *as_listener(psocket_t* ps) {
+  return ps->is_conn ? NULL: (pn_listener_t*)ps;
 }
 
-static pconnection_t *new_pconnection_t(pn_proactor_t *p, pn_connection_t *c, bool server, const char *host, const char *port) {
+static pconnection_t *new_pconnection(pn_proactor_t *p, pn_connection_t *c, bool server, const char *host, const char *port) {
   pconnection_t *pc = (pconnection_t*)calloc(1, sizeof(*pc));
-  if (!pc) return NULL;
-  if (pn_connection_driver_init(&pc->driver, c, NULL) != 0) {
+  if (!pc || pn_connection_driver_init(&pc->driver, c, NULL) != 0) {
     return NULL;
   }
   psocket_init(&pc->psocket, p,  true, host, port);
+  pc->write.data = &pc->psocket;
   if (server) {
     pn_transport_set_server(pc->driver.transport);
   }
@@ -319,74 +288,49 @@ static void leader_count(pn_proactor_t *p, int change) {
   uv_mutex_unlock(&p->lock);
 }
 
-/* Free if there are no uv callbacks pending and no events */
-static void leader_pconnection_t_maybe_free(pconnection_t *pc) {
-    if (pn_connection_driver_has_event(&pc->driver)) {
-      leader_to_worker(&pc->psocket);         /* Return to worker */
-    } else if (!(pc->psocket.tcp.data || pc->write.data || pc->shutdown.data || pc->timer.data)) {
-      /* All UV requests are finished */
-      pn_connection_driver_destroy(&pc->driver);
-      leader_count(pc->psocket.proactor, -1);
-      free(pc);
-    }
+/* Final close event for a pconnection_t */
+static void on_close_pconnection_final(uv_handle_t *h) {
+  pconnection_t *pc = (pconnection_t*)h->data;
+  free(pc);
 }
 
-/* Free if there are no uv callbacks pending and no events */
-static void leader_listener_maybe_free(pn_listener_t *l) {
-    if (pn_collector_peek(l->collector)) {
-      leader_to_worker(&l->psocket);         /* Return to worker */
-    } else if (!l->psocket.tcp.data) {
-      pn_condition_free(l->condition);
-      leader_count(l->psocket.proactor, -1);
-      free(l);
-    }
-}
-
-/* Free if there are no uv callbacks pending and no events */
-static void leader_maybe_free(psocket_t *ps) {
-  if (ps->is_conn) {
-    leader_pconnection_t_maybe_free(as_pconnection_t(ps));
-  } else {
-    leader_listener_maybe_free(as_listener(ps));
-  }
+/* Close event for uv_tcp_t of a pconnection_t */
+static void on_close_pconnection(uv_handle_t *h) {
+  pconnection_t *pc = (pconnection_t*)h->data;
+  assert(pc->psocket.state == ON_UV);
+  leader_count(pc->psocket.proactor, -1);
+  pn_connection_driver_destroy(&pc->driver);
+  uv_timer_stop(&pc->timer);
+  /* Close the timer with the final event to free the pconnection_t */
+  uv_close((uv_handle_t*)&pc->timer, on_close_pconnection_final);
 }
 
-static void on_close(uv_handle_t *h) {
-  psocket_t *ps = (psocket_t*)h->data;
-  h->data = NULL;               /* Mark closed */
-  leader_maybe_free(ps);
+/* Close event for uv_tcp_t of a pn_listener_t */
+static void on_close_listener(uv_handle_t *h) {
+  pn_listener_t *l = (pn_listener_t*)h->data;
+  pn_condition_free(l->condition);
+  free(l);
 }
 
-static void on_shutdown(uv_shutdown_t *shutdown, int err) {
-  psocket_t *ps = (psocket_t*)shutdown->data;
-  shutdown->data = NULL;        /* Mark closed */
-  leader_maybe_free(ps);
+static inline void leader_finished(psocket_t *ps) {
+  set_state(ps, ON_UV, NULL);
+  uv_close((uv_handle_t*)&ps->tcp, ps->is_conn ? on_close_pconnection : on_close_listener);
 }
 
-static inline void leader_close(psocket_t *ps) {
-  if (ps->tcp.data && !uv_is_closing((uv_handle_t*)&ps->tcp)) {
-    uv_close((uv_handle_t*)&ps->tcp, on_close);
-  }
-  pconnection_t *pc = as_pconnection_t(ps);
-  if (pc) {
-    pn_connection_driver_close(&pc->driver);
-    if (pc->timer.data && !uv_is_closing((uv_handle_t*)&pc->timer)) {
-      uv_timer_stop(&pc->timer);
-      uv_close((uv_handle_t*)&pc->timer, on_close);
-    }
+static pconnection_t *get_pconnection(pn_connection_t* c) {
+  if (!c) {
+    return NULL;
   }
-  leader_maybe_free(ps);
-}
-
-static pconnection_t *get_pconnection_t(pn_connection_t* c) {
-  if (!c) return NULL;
   pn_record_t *r = pn_connection_attachments(c);
   return (pconnection_t*) pn_record_get(r, PN_PROACTOR);
 }
 
+static void leader_unwatch(psocket_t *ps);
+
 static void leader_error(psocket_t *ps, int err, const char* what) {
+  assert(ps->state != ON_WORKER);
   if (ps->is_conn) {
-    pn_connection_driver_t *driver = &as_pconnection_t(ps)->driver;
+    pn_connection_driver_t *driver = &as_pconnection(ps)->driver;
     pn_connection_driver_bind(driver); /* Bind so errors will be reported */
     pn_connection_driver_errorf(driver, COND_NAME, "%s %s:%s: %s",
                                 what, fixstr(ps->host), fixstr(ps->port),
@@ -398,16 +342,18 @@ static void leader_error(psocket_t *ps, int err, const char* what) {
                         what, fixstr(ps->host), fixstr(ps->port),
                         uv_strerror(err));
     pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_CLOSE);
+    l->closing = true;
   }
-  leader_to_worker(ps);               /* Worker to handle the error */
+  leader_unwatch(ps);               /* Worker to handle the error */
 }
 
 /* uv-initialization */
 static int leader_init(psocket_t *ps) {
+  ps->state = ON_LEADER;
   leader_count(ps->proactor, +1);
   int err = uv_tcp_init(&ps->proactor->loop, &ps->tcp);
   if (!err) {
-    pconnection_t *pc = as_pconnection_t(ps);
+    pconnection_t *pc = as_pconnection(ps);
     if (pc) {
       pc->connect.data = ps;
       int err = uv_timer_init(&ps->proactor->loop, &pc->timer);
@@ -422,40 +368,53 @@ static int leader_init(psocket_t *ps) {
   return err;
 }
 
-/* Common logic for on_connect and on_accept */
-static void leader_connect_accept(pconnection_t *pc, int err, const char *what) {
+/* Outgoing connection */
+static void on_connect(uv_connect_t *connect, int err) {
+  pconnection_t *pc = (pconnection_t*)connect->data;
+  assert(pc->psocket.state == ON_UV);
   if (!err) {
-    leader_to_worker(&pc->psocket);
+    leader_unwatch(&pc->psocket);
   } else {
-    leader_error(&pc->psocket, err, what);
+    leader_error(&pc->psocket, err, "on connect to");
   }
 }
 
-static void on_connect(uv_connect_t *connect, int err) {
-  leader_connect_accept((pconnection_t*)connect->data, err, "on connect");
-}
-
-static void on_accept(uv_stream_t* server, int err) {
+/* Incoming connection ready to be accepted */
+static void on_connection(uv_stream_t* server, int err) {
+  /* Unlike most on_* functions, this one can be called by the leader thread when the
+   * listener is ON_WORKER, because there's no way to stop libuv from calling
+   * on_connection() in leader_unwatch().  Just increase a counter and deal with it in the
+   * worker thread.
+   */
   pn_listener_t *l = (pn_listener_t*) server->data;
-  if (err) {
-    leader_error(&l->psocket, err, "on accept");
+  assert(l->psocket.state == ON_UV);
+  if (!err) {
+    ++l->connections;
+    leader_unwatch(&l->psocket);
+  } else {
+    leader_error(&l->psocket, err, "on incoming connection from");
   }
-  pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_ACCEPT);
-  leader_to_worker(&l->psocket); /* Let user call pn_listener_accept */
 }
 
-static void leader_accept(psocket_t *ps) {
-  pn_listener_t * l = as_listener(ps);
+static void leader_accept(pn_listener_t * l) {
+  assert(l->psocket.state == ON_UV);
+  assert(l->accepting);
   pconnection_t *pc = l->accepting;
   l->accepting = NULL;
-  if (pc) {
-    int err = leader_init(&pc->psocket);
-    if (!err) err = uv_accept((uv_stream_t*)&l->psocket.tcp, (uv_stream_t*)&pc->psocket.tcp);
-    leader_connect_accept(pc, err, "on accept");
+  int err = leader_init(&pc->psocket);
+  if (!err) {
+    err = uv_accept((uv_stream_t*)&l->psocket.tcp, (uv_stream_t*)&pc->psocket.tcp);
+  }
+  if (!err) {
+    leader_unwatch(&pc->psocket);
+  } else {
+    leader_error(&pc->psocket, err, "accepting from");
+    leader_error(&l->psocket, err, "accepting from");
   }
 }
 
 static int leader_resolve(psocket_t *ps, uv_getaddrinfo_t *info, bool server) {
+  assert(ps->state == ON_LEADER);
   int err = leader_init(ps);
   struct addrinfo hints = { 0 };
   if (server) hints.ai_flags = AI_PASSIVE;
@@ -466,55 +425,75 @@ static int leader_resolve(psocket_t *ps, uv_getaddrinfo_t *info, bool server) {
 }
 
 static void leader_connect(psocket_t *ps) {
-  pconnection_t *pc = as_pconnection_t(ps);
+  assert(ps->state == ON_LEADER);
+  pconnection_t *pc = as_pconnection(ps);
   uv_getaddrinfo_t info;
   int err = leader_resolve(ps, &info, false);
   if (!err) {
     err = uv_tcp_connect(&pc->connect, &pc->psocket.tcp, info.addrinfo->ai_addr, on_connect);
     uv_freeaddrinfo(info.addrinfo);
   }
-  if (err) {
-    leader_error(ps, err, "connect to");
+  if (!err) {
+    ps->state = ON_UV;
+  } else {
+    leader_error(ps, err, "connecting to");
   }
 }
 
 static void leader_listen(psocket_t *ps) {
+  assert(ps->state == ON_LEADER);
   pn_listener_t *l = as_listener(ps);
-   uv_getaddrinfo_t info;
+  uv_getaddrinfo_t info;
   int err = leader_resolve(ps, &info, true);
   if (!err) {
     err = uv_tcp_bind(&l->psocket.tcp, info.addrinfo->ai_addr, 0);
     uv_freeaddrinfo(info.addrinfo);
   }
-  if (!err) err = uv_listen((uv_stream_t*)&l->psocket.tcp, l->backlog, on_accept);
-  if (err) {
-    leader_error(ps, err, "listen on ");
+  if (!err) {
+    err = uv_listen((uv_stream_t*)&l->psocket.tcp, l->backlog, on_connection);
+  }
+  if (!err) {
+    set_state(ps, ON_UV, NULL);
+  } else {
+    leader_error(ps, err, "listening on");
   }
 }
 
-static void on_tick(uv_timer_t *timer) {
-  pconnection_t *pc = (pconnection_t*)timer->data;
+/* Generate tick events and return millis till next tick or 0 if no tick is required */
+static pn_millis_t leader_tick(pconnection_t *pc) {
+  assert(pc->psocket.state != ON_WORKER);
   pn_transport_t *t = pc->driver.transport;
   if (pn_transport_get_idle_timeout(t) || pn_transport_get_remote_idle_timeout(t)) {
-    uv_timer_stop(&pc->timer);
     uint64_t now = uv_now(pc->timer.loop);
     uint64_t next = pn_transport_tick(t, now);
-    if (next) {
-      uv_timer_start(&pc->timer, on_tick, next - now, 0);
-    }
+    return next ? next - now : 0;
+  }
+  return 0;
+}
+
+static void on_tick(uv_timer_t *timer) {
+  if (!timer->data) return;     /* timer closed */
+  pconnection_t *pc = (pconnection_t*)timer->data;
+  assert(pc->psocket.state == ON_UV);
+  uv_timer_stop(&pc->timer);
+  pn_millis_t next = leader_tick(pc);
+  if (pn_connection_driver_has_event(&pc->driver)) {
+    leader_unwatch(&pc->psocket);
+  } else if (next) {
+    uv_timer_start(&pc->timer, on_tick, next, 0);
   }
 }
 
 static void on_read(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) {
   pconnection_t *pc = (pconnection_t*)stream->data;
+  assert(pc->psocket.state == ON_UV);
   if (nread >= 0) {
     pn_connection_driver_read_done(&pc->driver, nread);
     on_tick(&pc->timer);         /* check for tick changes. */
-    leader_to_worker(&pc->psocket);
     /* Reading continues automatically until stopped. */
   } else if (nread == UV_EOF) { /* hangup */
     pn_connection_driver_read_close(&pc->driver);
-    leader_maybe_free(&pc->psocket);
+    leader_unwatch(&pc->psocket);
   } else {
     leader_error(&pc->psocket, nread, "on read from");
   }
@@ -522,16 +501,17 @@ static void on_read(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) {
 
 static void on_write(uv_write_t* write, int err) {
   pconnection_t *pc = (pconnection_t*)write->data;
-  write->data = NULL;
+  assert(pc->psocket.state == ON_UV);
+  size_t writing = pc->writing;
+  pc->writing = 0;              /* This write is done regardless of outcome */
   if (err == 0) {
-    pn_connection_driver_write_done(&pc->driver, pc->writing);
-    leader_to_worker(&pc->psocket);
+    pn_connection_driver_write_done(&pc->driver, writing);
+    leader_unwatch(&pc->psocket);
   } else if (err == UV_ECANCELED) {
-    leader_maybe_free(&pc->psocket);
+    leader_unwatch(&pc->psocket);    /* cancelled by leader_unwatch, complete the job */
   } else {
     leader_error(&pc->psocket, err, "on write to");
   }
-  pc->writing = 0;              /* Need to send a new write request */
 }
 
 static void on_timeout(uv_timer_t *timer) {
@@ -544,47 +524,93 @@ static void on_timeout(uv_timer_t *timer) {
 // Read buffer allocation function for uv, just returns the transports read buffer.
 static void alloc_read_buffer(uv_handle_t* stream, size_t size, uv_buf_t* buf) {
   pconnection_t *pc = (pconnection_t*)stream->data;
+  assert(pc->psocket.state == ON_UV);
   pn_rwbytes_t rbuf = pn_connection_driver_read_buffer(&pc->driver);
   *buf = uv_buf_init(rbuf.start, rbuf.size);
 }
 
-static void leader_rewatch(psocket_t *ps) {
+/* Monitor a socket in the UV loop */
+static void leader_watch(psocket_t *ps) {
+  assert(ps->state == ON_LEADER);
   int err = 0;
+  set_state(ps, ON_UV, NULL); /* Assume we are going to UV loop unless sent to worker or leader. */
+
   if (ps->is_conn) {
-    pconnection_t *pc = as_pconnection_t(ps);
-    if (pc->timer.data) {         /* uv-initialized */
-      on_tick(&pc->timer);        /* Re-enable ticks if required */
+    pconnection_t *pc = as_pconnection(ps);
+    if (pn_connection_driver_finished(&pc->driver)) {
+      leader_finished(ps);
+      return;
     }
+    pn_millis_t next_tick = leader_tick(pc);
     pn_rwbytes_t rbuf = pn_connection_driver_read_buffer(&pc->driver);
     pn_bytes_t wbuf = pn_connection_driver_write_buffer(&pc->driver);
-
-    /* Ticks and checking buffers can generate events, process before proceeding */
     if (pn_connection_driver_has_event(&pc->driver)) {
-      leader_to_worker(ps);
-    } else {                      /* Re-watch for IO */
-      if (wbuf.size > 0 && !pc->writing) {
-        pc->writing = wbuf.size;
-        uv_buf_t buf = uv_buf_init((char*)wbuf.start, wbuf.size);
-        pc->write.data = ps;
-        uv_write(&pc->write, (uv_stream_t*)&pc->psocket.tcp, &buf, 1, on_write);
-      } else if (wbuf.size == 0 && pn_connection_driver_write_closed(&pc->driver)) {
-        pc->shutdown.data = ps;
-        uv_shutdown(&pc->shutdown, (uv_stream_t*)&pc->psocket.tcp, on_shutdown);
-      }
-      if (rbuf.size > 0 && !pc->reading) {
-        pc->reading = true;
-        err = uv_read_start((uv_stream_t*)&pc->psocket.tcp, alloc_read_buffer, on_read);
-      }
+      /* Ticks and checking buffers have generated events, send back to worker to process */
+      set_state(ps, ON_WORKER, NULL);
+      return;
+    }
+    if (next_tick) {
+      uv_timer_start(&pc->timer, on_tick, next_tick, 0);
+    }
+    if (wbuf.size > 0 && !pc->writing) {
+      pc->writing = wbuf.size;
+      uv_buf_t buf = uv_buf_init((char*)wbuf.start, wbuf.size);
+      err = uv_write(&pc->write, (uv_stream_t*)&pc->psocket.tcp, &buf, 1, on_write);
+    } else if (wbuf.size == 0 && pn_connection_driver_write_closed(&pc->driver)) {
+      pc->shutdown.data = ps;
+      err = uv_shutdown(&pc->shutdown, (uv_stream_t*)&pc->psocket.tcp, NULL);
+    }
+    if (rbuf.size > 0 && !pc->reading) {
+      pc->reading = true;
+      err = uv_read_start((uv_stream_t*)&pc->psocket.tcp, alloc_read_buffer, on_read);
     }
   } else {
     pn_listener_t *l = as_listener(ps);
-    err = uv_listen((uv_stream_t*)&l->psocket.tcp, l->backlog, on_accept);
+    if (l->closing && pn_collector_peek(l->collector)) {
+        leader_finished(&l->psocket);
+    } else {
+      if (l->accepting) {
+        leader_accept(l);
+      }
+      if (l->connections) {
+        leader_unwatch(ps);
+      }
+    }
   }
   if (err) {
-    leader_error(ps, err, "rewatch");
+    leader_error(ps, err, "re-watching");
   }
 }
 
+/* Detach a socket from IO and put it on the worker queue */
+static void leader_unwatch(psocket_t *ps) {
+  assert(ps->state != ON_WORKER); /* From ON_UV or ON_LEADER */
+  if (ps->is_conn) {
+    pconnection_t *pc = as_pconnection(ps);
+    if (!pn_connection_driver_has_event(&pc->driver)) {
+      /* Don't return an empty event batch */
+      if (ps->state == ON_UV) {
+        return;                 /* Just leave it in the UV loop */
+      } else {
+        leader_watch(ps);     /* Re-attach to UV loop */
+      }
+      return;
+    } else {
+      if (pc->writing) {
+        uv_cancel((uv_req_t*)&pc->write);
+      }
+      if (pc->reading) {
+        pc->reading = false;
+        uv_read_stop((uv_stream_t*)&pc->psocket.tcp);
+      }
+      if (pc->timer.data && !uv_is_closing((uv_handle_t*)&pc->timer)) {
+        uv_timer_stop(&pc->timer);
+      }
+    }
+  }
+  set_state(ps, ON_WORKER, NULL);
+}
+
 /* Set the event in the proactor's batch  */
 static pn_event_batch_t *proactor_batch_lh(pn_proactor_t *p, pn_event_type_t t) {
   pn_collector_put(p->collector, pn_proactor__class(), p, t);
@@ -609,23 +635,32 @@ static pn_event_batch_t* get_batch_lh(pn_proactor_t *p) {
     }
   }
   for (psocket_t *ps = pop_lh(&p->worker_q); ps; ps = pop_lh(&p->worker_q)) {
+    assert(ps->state == ON_WORKER);
     if (ps->is_conn) {
-      pconnection_t *pc = as_pconnection_t(ps);
+      pconnection_t *pc = as_pconnection(ps);
       return &pc->driver.batch;
     } else {                    /* Listener */
       pn_listener_t *l = as_listener(ps);
+      /* Generate accept events one at a time */
+      if (l->connections && !pn_collector_peek(l->collector)) {
+        --l->connections;
+        pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_ACCEPT);
+      }
       return &l->batch;
     }
-    to_leader(ps);      /* No event, back to leader */
+    set_state_lh(ps, ON_LEADER, NULL); /* No event, back to leader */
   }
   return 0;
 }
 
-/* Called in any thread to set a wakeup action. Replaces any previous wakeup action. */
+/* Called in any thread to set a wakeup action */
 static void wakeup(psocket_t *ps, void (*action)(psocket_t*)) {
   uv_mutex_lock(&ps->proactor->lock);
-  ps->wakeup = action;
-  to_leader_lh(ps);
+  if (action && !ps->wakeup) {
+    ps->wakeup = action;
+  }
+  set_state_lh(ps, ON_LEADER, NULL);
+  uv_async_send(&ps->proactor->async); /* Wake leader */
   uv_mutex_unlock(&ps->proactor->lock);
 }
 
@@ -634,30 +669,36 @@ pn_listener_t *pn_event_listener(pn_event_t *e) {
 }
 
 pn_proactor_t *pn_event_proactor(pn_event_t *e) {
-  if (pn_event_class(e) == pn_proactor__class()) return (pn_proactor_t*)pn_event_context(e);
+  if (pn_event_class(e) == pn_proactor__class()) {
+    return (pn_proactor_t*)pn_event_context(e);
+  }
   pn_listener_t *l = pn_event_listener(e);
-  if (l) return l->psocket.proactor;
+  if (l) {
+    return l->psocket.proactor;
+  }
   pn_connection_t *c = pn_event_connection(e);
-  if (c) return pn_connection_proactor(pn_event_connection(e));
+  if (c) {
+    return pn_connection_proactor(pn_event_connection(e));
+  }
   return NULL;
 }
 
 void pn_proactor_done(pn_proactor_t *p, pn_event_batch_t *batch) {
   pconnection_t *pc = batch_pconnection(batch);
   if (pc) {
+    assert(pc->psocket.state == ON_WORKER);
     if (pn_connection_driver_has_event(&pc->driver)) {
-      /* Process all events before going back to IO. */
-      worker_requeue(&pc->psocket);
-    } else if (pn_connection_driver_finished(&pc->driver)) {
-      owner_to_leader(&pc->psocket, leader_close);
+      /* Process all events before going back to leader */
+      set_state(&pc->psocket, ON_WORKER, NULL);
     } else {
-      owner_to_leader(&pc->psocket, leader_rewatch);
+      set_state(&pc->psocket, ON_LEADER, leader_watch);
     }
     return;
   }
   pn_listener_t *l = batch_listener(batch);
   if (l) {
-    owner_to_leader(&l->psocket, leader_rewatch);
+    assert(l->psocket.state == ON_WORKER);
+    set_state(&l->psocket, ON_LEADER, leader_watch);
     return;
   }
   pn_proactor_t *bp = batch_proactor(batch);
@@ -692,14 +733,16 @@ pn_event_batch_t *pn_proactor_wait(struct pn_proactor_t* p) {
         }
       }
       for (psocket_t *ps = pop_lh(&p->leader_q); ps; ps = pop_lh(&p->leader_q)) {
-        void (*action)(psocket_t*) = ps->action;
-        void (*wakeup)(psocket_t*) = ps->wakeup;
-        ps->action = NULL;
-        ps->wakeup = NULL;
-        if (action || wakeup) {
+        assert(ps->state == ON_LEADER);
+        if (ps->wakeup) {
+          uv_mutex_unlock(&p->lock);
+          ps->wakeup(ps);
+          ps->wakeup = NULL;
+          uv_mutex_lock(&p->lock);
+        } else if (ps->action) {
           uv_mutex_unlock(&p->lock);
-          if (action) action(ps);
-          if (wakeup) wakeup(ps);
+          ps->action(ps);
+          ps->action = NULL;
           uv_mutex_lock(&p->lock);
         }
       }
@@ -734,12 +777,11 @@ void pn_proactor_set_timeout(pn_proactor_t *p, pn_millis_t t) {
 }
 
 int pn_proactor_connect(pn_proactor_t *p, pn_connection_t *c, const char *host, const char *port) {
-  pconnection_t *pc = new_pconnection_t(p, c, false, host, port);
+  pconnection_t *pc = new_pconnection(p, c, false, host, port);
   if (!pc) {
     return PN_OUT_OF_MEMORY;
   }
-  /* Process PN_CONNECTION_INIT before binding */
-  owner_to_worker(&pc->psocket, leader_connect);
+  set_state(&pc->psocket, ON_LEADER, leader_connect);
   return 0;
 }
 
@@ -747,24 +789,26 @@ int pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *host, con
 {
   psocket_init(&l->psocket, p, false, host, port);
   l->backlog = backlog;
-  owner_to_leader(&l->psocket, leader_listen);
+  set_state(&l->psocket, ON_LEADER, leader_listen);
   return 0;
 }
 
 pn_proactor_t *pn_connection_proactor(pn_connection_t* c) {
-  pconnection_t *pc = get_pconnection_t(c);
+  pconnection_t *pc = get_pconnection(c);
   return pc ? pc->psocket.proactor : NULL;
 }
 
 void leader_wake_connection(psocket_t *ps) {
-  pconnection_t *pc = as_pconnection_t(ps);
+  assert(ps->state == ON_LEADER);
+  pconnection_t *pc = as_pconnection(ps);
   pn_connection_t *c = pc->driver.connection;
   pn_collector_put(pn_connection_collector(c), PN_OBJECT, c, PN_CONNECTION_WAKE);
-  leader_to_worker(ps);
+  leader_unwatch(ps);
 }
 
 void pn_connection_wake(pn_connection_t* c) {
-  wakeup(&get_pconnection_t(c)->psocket, leader_wake_connection);
+  /* May be called from any thread */
+  wakeup(&get_pconnection(c)->psocket, leader_wake_connection);
 }
 
 pn_proactor_t *pn_proactor() {
@@ -782,9 +826,11 @@ pn_proactor_t *pn_proactor() {
 }
 
 static void on_stopping(uv_handle_t* h, void* v) {
-  uv_close(h, NULL);           /* Close this handle */
+  if (!uv_is_closing(h)) {
+    uv_close(h, NULL);           /* Close this handle */
+  }
   if (!uv_loop_alive(h->loop)) /* Everything closed */
-    uv_stop(h->loop);        /* Stop the loop, pn_proactor_destroy() can return */
+    uv_stop(h->loop);          /* Stop the loop, pn_proactor_destroy() can return */
 }
 
 void pn_proactor_free(pn_proactor_t *p) {
@@ -799,10 +845,7 @@ void pn_proactor_free(pn_proactor_t *p) {
 
 static pn_event_t *listener_batch_next(pn_event_batch_t *batch) {
   pn_listener_t *l = batch_listener(batch);
-  pn_event_t *handled = pn_collector_prev(l->collector);
-  if (handled && pn_event_type(handled) == PN_LISTENER_CLOSE) {
-    owner_to_leader(&l->psocket, leader_close); /* Close event handled, do close */
-  }
+  assert(l->psocket.state == ON_WORKER);
   return pn_collector_next(l->collector);
 }
 
@@ -811,6 +854,7 @@ static pn_event_t *proactor_batch_next(pn_event_batch_t *batch) {
 }
 
 static void pn_listener_free(pn_listener_t *l) {
+  assert(l->psocket.state == ON_WORKER);
   if (l) {
     if (!l->collector) pn_collector_free(l->collector);
     if (!l->condition) pn_condition_free(l->condition);
@@ -834,40 +878,51 @@ pn_listener_t *pn_listener() {
   return l;
 }
 
+void leader_listener_close(psocket_t *ps) {
+  assert(ps->state == ON_LEADER);
+  pn_listener_t *l = (pn_listener_t*)ps;
+  l->closing = true;
+  leader_watch(ps);
+}
+
 void pn_listener_close(pn_listener_t* l) {
-  wakeup(&l->psocket, leader_close);
+  /* This can be called from any thread, not just the owner of l */
+  wakeup(&l->psocket, leader_listener_close);
 }
 
 pn_proactor_t *pn_listener_proactor(pn_listener_t* l) {
+  assert(l->psocket.state == ON_WORKER);
   return l ? l->psocket.proactor : NULL;
 }
 
 pn_condition_t* pn_listener_condition(pn_listener_t* l) {
+  assert(l->psocket.state == ON_WORKER);
   return l->condition;
 }
 
 void *pn_listener_get_context(pn_listener_t *l) {
+  assert(l->psocket.state == ON_WORKER);
   return l->context;
 }
 
 void pn_listener_set_context(pn_listener_t *l, void *context) {
+  assert(l->psocket.state == ON_WORKER);
   l->context = context;
 }
 
 pn_record_t *pn_listener_attachments(pn_listener_t *l) {
+  assert(l->psocket.state == ON_WORKER);
   return l->attachments;
 }
 
 int pn_listener_accept(pn_listener_t *l, pn_connection_t *c) {
+  assert(l->psocket.state == ON_WORKER);
   if (l->accepting) {
     return PN_STATE_ERR;        /* Only one at a time */
   }
-  l->accepting = new_pconnection_t(
-      l->psocket.proactor, c, true, l->psocket.host, l->psocket.port);
+  l->accepting = new_pconnection(l->psocket.proactor, c, true, l->psocket.host, l->psocket.port);
   if (!l->accepting) {
     return UV_ENOMEM;
   }
-  owner_to_leader(&l->psocket, leader_accept);
   return 0;
 }
-


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org