Posted to commits@qpid.apache.org by ac...@apache.org on 2017/02/27 18:04:51 UTC

[3/6] qpid-proton git commit: PROTON-1403: c libuv proactor fixes and tests

PROTON-1403: c libuv proactor fixes and tests

- c-proactor-tests run to transport closed
- c-proactor-tests allow running selected tests
- c-proactor-tests add more tests
- fix casting error and race conditions
- re-organize get/wait loops
- log proactor and listener events via PN_TRACE_LOG

exampletest.py greps for "listening" in the output instead of retrying connects:
this is faster than dummy connects and avoids confusing "no protocol header" errors.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/85585202
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/85585202
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/85585202

Branch: refs/heads/master
Commit: 855852029e1b4d8b5d4c939f5b9c616808701813
Parents: 54e8099
Author: Alan Conway <ac...@redhat.com>
Authored: Tue Feb 21 10:55:32 2017 -0500
Committer: Alan Conway <ac...@redhat.com>
Committed: Mon Feb 27 11:32:13 2017 -0500

----------------------------------------------------------------------
 examples/c/proactor/broker.c    |   1 +
 examples/c/proactor/direct.c    |   1 +
 examples/c/proactor/test.py     |  17 +-
 examples/exampletest.py         |  38 ++--
 proton-c/src/proactor/libuv.c   | 331 ++++++++++++++++++-----------------
 proton-c/src/tests/proactor.c   |  72 ++++++--
 proton-c/src/tests/test_tools.h |  13 ++
 7 files changed, 261 insertions(+), 212 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/85585202/examples/c/proactor/broker.c
----------------------------------------------------------------------
diff --git a/examples/c/proactor/broker.c b/examples/c/proactor/broker.c
index a7d8863..370d2e8 100644
--- a/examples/c/proactor/broker.c
+++ b/examples/c/proactor/broker.c
@@ -472,6 +472,7 @@ int main(int argc, char **argv) {
 
   pn_proactor_listen(b.proactor, pn_listener(), host, port, 16);
   printf("listening on '%s:%s' %zd threads\n", host, port, b.threads);
+  fflush(stdout);
 
   if (url) pn_url_free(url);
   if (b.threads <= 0) {
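
The new fflush(stdout) is what lets the test harness see "listening" promptly: when
stdout is a pipe rather than a terminal, stdio switches from line buffering to block
buffering, so a short line can sit in the buffer indefinitely. A minimal sketch of the
effect (not Proton code; with the fflush removed, "./a.out | cat" shows nothing for ten
seconds, while plain "./a.out" prints at once):

    #include <stdio.h>
    #include <unistd.h>

    int main(void) {
      printf("listening\n");  /* line-buffered on a tty, block-buffered on a pipe */
      fflush(stdout);         /* without this, a parent reading the pipe may wait forever */
      sleep(10);              /* simulate a long-running server */
      return 0;
    }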

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/85585202/examples/c/proactor/direct.c
----------------------------------------------------------------------
diff --git a/examples/c/proactor/direct.c b/examples/c/proactor/direct.c
index de2d6aa..126ae4f 100644
--- a/examples/c/proactor/direct.c
+++ b/examples/c/proactor/direct.c
@@ -344,6 +344,7 @@ int main(int argc, char **argv) {
   app.proactor = pn_proactor();
   pn_proactor_listen(app.proactor, pn_listener(), host, port, 16);
   printf("listening on '%s:%s'\n", host, port);
+  fflush(stdout);
   if (url) pn_url_free(url);
 
   do {

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/85585202/examples/c/proactor/test.py
----------------------------------------------------------------------
diff --git a/examples/c/proactor/test.py b/examples/c/proactor/test.py
index 7d6f3fc..f6a6562 100644
--- a/examples/c/proactor/test.py
+++ b/examples/c/proactor/test.py
@@ -53,23 +53,13 @@ class CExampleTest(BrokerTestCase):
         r = self.proc(["receive", "-a", self.addr, "-m3"])
         self.assertEqual(receive_expect(3), r.wait_out())
 
-    def retry(self, args, max=10):
-        """Run until output does not contain "connection refused", up to max retries"""
-        while True:
-            try:
-                return self.proc(args).wait_out()
-            except ProcError, e:
-                if "connection refused" in e.args[0] and max > 0:
-                    max -= 1
-                    continue
-                raise
-
     def test_send_direct(self):
         """Send to direct server"""
         with bind0() as sock:
             addr = "127.0.0.1:%s/examples" % sock.port()
             d = self.proc(["direct", "-a", addr])
-            self.assertEqual("100 messages sent and acknowledged\n", self.retry(["send", "-a", addr]))
+            d.wait_re("listening")
+            self.assertEqual("100 messages sent and acknowledged\n", self.proc(["send", "-a", addr]).wait_out())
             self.assertIn(receive_expect(100), d.wait_out())
 
     def test_receive_direct(self):
@@ -77,7 +67,8 @@ class CExampleTest(BrokerTestCase):
         with bind0() as sock:
             addr = "127.0.0.1:%s/examples" % sock.port()
             d = self.proc(["direct", "-a", addr])
-            self.assertEqual(receive_expect(100), self.retry(["receive", "-a", addr]))
+            d.wait_re("listenin")
+            self.assertEqual(receive_expect(100), self.proc(["receive", "-a", addr]).wait_out())
             self.assertIn("100 messages sent and acknowledged\n", d.wait_out())
 
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/85585202/examples/exampletest.py
----------------------------------------------------------------------
diff --git a/examples/exampletest.py b/examples/exampletest.py
index 8dbd1ed..546b426 100644
--- a/examples/exampletest.py
+++ b/examples/exampletest.py
@@ -104,6 +104,20 @@ class Proc(Popen):
             raise ProcError(self)
         return self.out
 
+    def wait_re(self, regexp, timeout=10):
+        """
+        Wait for regexp to appear in the output; return the re.search match result.
+        The target process should flush() important output to ensure it appears.
+        """
+        if timeout:
+            deadline = time.time() + timeout
+        while timeout is None or time.time() < deadline:
+            match = re.search(regexp, self.out)
+            if match:
+                return match
+            time.sleep(0.01)    # Not very efficient
+        raise ProcError(self, "gave up waiting for '%s' after %ss" % (regexp, timeout))
+
 # Work-around older python unittest that lacks setUpClass.
 if hasattr(unittest.TestCase, 'setUpClass') and  hasattr(unittest.TestCase, 'tearDownClass'):
     TestCase = unittest.TestCase
@@ -144,21 +158,6 @@ class ExampleTestCase(TestCase):
         self.procs.append(p)
         return p
 
-def wait_port(port, timeout=10):
-    """Wait up to timeout for port to be connectable."""
-    if timeout:
-        deadline = time.time() + timeout
-    while (timeout is None or time.time() < deadline):
-        try:
-            s = socket.create_connection((None, port), timeout) # Works for IPv6 and v4
-            s.close()
-            return
-        except socket.error, e:
-            if e.errno != errno.ECONNREFUSED: # Only retry on connection refused error.
-                raise
-    raise socket.timeout("waiting for port %s" % port)
-
-
 class BrokerTestCase(ExampleTestCase):
     """
     ExampleTest that starts a broker in setUpClass and kills it in tearDownClass.
@@ -171,13 +170,14 @@ class BrokerTestCase(ExampleTestCase):
         cls.port = sock.port()
         cls.addr = "127.0.0.1:%s/examples" % (cls.port)
         cls.broker = None       # In case Proc throws, create the attribute.
-        cls.broker = Proc(cls.broker_exe + ["-a", cls.addr])
+        cls.broker = Proc(cls.broker_exe + ["-a", cls.addr], bufsize=0)
         try:
-            wait_port(cls.port)
+            cls.broker.wait_re("listening")
         except Exception, e:
             cls.broker.kill()
-            raise ProcError(cls.broker, "timed out waiting for port")
-        sock.close()
+            raise
+        finally:
+            sock.close()
 
     @classmethod
     def tearDownClass(cls):

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/85585202/proton-c/src/proactor/libuv.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/libuv.c b/proton-c/src/proactor/libuv.c
index 8e2ff05..322f353 100644
--- a/proton-c/src/proactor/libuv.c
+++ b/proton-c/src/proactor/libuv.c
@@ -44,7 +44,7 @@
 
 /*
   libuv functions are thread unsafe. The exception is uv_async_send(), a thread safe
-  "wakeup" that can wake the uv_loop from another thread.
+  call that we use to make uv_run() return.
 
   To provide concurrency proactor uses a "leader-worker-follower" model, threads take
   turns at the roles:
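
A minimal standalone illustration of that one thread-safe call (a sketch, not Proton
code): uv_async_send() may be called from any thread, and makes the thread blocked in
uv_run() fire the async callback:

    #include <stdio.h>
    #include <uv.h>

    static void on_wake(uv_async_t *async) {
      printf("loop thread woken\n");
      uv_close((uv_handle_t*)async, NULL);  /* closing the last handle lets uv_run() return */
    }

    static void other_thread(void *arg) {
      uv_async_send((uv_async_t*)arg);      /* the one thread-safe libuv call */
    }

    int main(void) {
      uv_loop_t loop;
      uv_async_t async;
      uv_thread_t t;
      uv_loop_init(&loop);
      uv_async_init(&loop, &async, on_wake);
      uv_thread_create(&t, other_thread, &async);
      uv_run(&loop, UV_RUN_DEFAULT);        /* blocks until on_wake closes the handle */
      uv_thread_join(&t);
      uv_loop_close(&loop);
      return 0;
    }
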
@@ -97,6 +97,9 @@ typedef enum {
 typedef struct psocket_t {
   /* Immutable */
   pn_proactor_t *proactor;
+  char host[NI_MAXHOST];
+  char port[NI_MAXSERV];
+  bool is_conn;
 
   /* Protected by proactor.lock */
   struct psocket_t* next;
@@ -106,9 +109,6 @@ typedef struct psocket_t {
 
   /* Only used by leader thread when it owns the psocket */
   uv_tcp_t tcp;
-  char host[NI_MAXHOST];
-  char port[NI_MAXSERV];
-  bool is_conn;
 } psocket_t;
 
 /* Special value for psocket.next pointer when socket is not on any list. */
@@ -165,7 +165,6 @@ struct pn_listener_t {
   pn_record_t *attachments;
   void *context;
   size_t backlog;
-  bool closing;                 /* close requested or closed by error */
 
   /* Only used in leader thread */
   size_t connections;           /* number of connections waiting to be accepted  */
@@ -201,15 +200,18 @@ struct pn_proactor_t {
 };
 
 /* Push ps to back of q. Must not be on a different queue */
-static void push_lh(queue *q, psocket_t *ps) {
-  assert(ps->next == &UNLISTED);
-  ps->next = NULL;
-  if (!q->front) {
-    q->front = q->back = ps;
-  } else {
-    q->back->next = ps;
-    q->back =  ps;
+static bool push_lh(queue *q, psocket_t *ps) {
+  if (ps->next == &UNLISTED) {
+    ps->next = NULL;
+    if (!q->front) {
+      q->front = q->back = ps;
+    } else {
+      q->back->next = ps;
+      q->back =  ps;
+    }
+    return true;
   }
+  return false;
 }
 
 /* Pop returns front of q or NULL if empty */
@@ -222,23 +224,33 @@ static psocket_t* pop_lh(queue *q) {
   return ps;
 }
 
+/* Notify the leader thread that there is something to do outside of uv_run() */
+static inline void notify(pn_proactor_t* p) {
+  uv_async_send(&p->async);
+}
+
+static void to_leader_lh(psocket_t *ps) {
+  if (push_lh(&ps->proactor->leader_q, ps)) {
+    ps->state = ON_LEADER;
+  }
+}
+
 /* Queue an action for the leader thread */
 static void to_leader(psocket_t *ps, void (*action)(psocket_t*)) {
   uv_mutex_lock(&ps->proactor->lock);
   ps->action = action;
-  if (ps->next == &UNLISTED) {
-    ps->state = ON_LEADER;
-    push_lh(&ps->proactor->leader_q, ps);
-  }
+  to_leader_lh(ps);
   uv_mutex_unlock(&ps->proactor->lock);
-  uv_async_send(&ps->proactor->async); /* Wake leader */
+  notify(ps->proactor);
 }
 
 /* Push to the worker thread */
 static void to_worker(psocket_t *ps) {
   uv_mutex_lock(&ps->proactor->lock);
-  ps->state = ON_WORKER;
-  push_lh(&ps->proactor->worker_q, ps);
+  if (push_lh(&ps->proactor->worker_q, ps)) {
+      ps->state = ON_WORKER;
+  }
+  notify(ps->proactor);
   uv_mutex_unlock(&ps->proactor->lock);
 }
 
@@ -260,8 +272,8 @@ static void wakeup(psocket_t *ps, void (*action)(psocket_t*)) {
     push_lh(&ps->proactor->leader_q, ps);
     ps->state = ON_LEADER;      /* Otherwise notify the leader */
   }
-  uv_mutex_unlock(&ps->proactor->lock);
   uv_async_send(&ps->proactor->async); /* Wake leader */
+  uv_mutex_unlock(&ps->proactor->lock);
 }
 
 static inline pconnection_t *as_pconnection(psocket_t* ps) {
@@ -311,7 +323,7 @@ static void leader_count(pn_proactor_t *p, int change) {
   p->count += change;
   if (p->count == 0) {
     p->inactive = true;
-    uv_async_send(&p->async); /* Wake leader */
+    notify(p);
   }
   uv_mutex_unlock(&p->lock);
 }
@@ -409,10 +421,8 @@ static int leader_init(psocket_t *ps) {
 /* Outgoing connection */
 static void on_connect(uv_connect_t *connect, int err) {
   pconnection_t *pc = (pconnection_t*)connect->data;
-  if (!err) {
+  if (!pconnection_error(pc, err, "on connect to")) {
     pconnection_to_worker(pc);
-  } else {
-    pconnection_error(pc, err, "on connect to");
   }
 }
 
@@ -552,7 +562,7 @@ static void alloc_read_buffer(uv_handle_t* stream, size_t size, uv_buf_t* buf) {
 static void pconnection_to_uv(pconnection_t *pc) {
   to_uv(&pc->psocket);          /* Assume we're going to UV unless sent elsewhere */
   if (pn_connection_driver_finished(&pc->driver)) {
-    if (!uv_is_closing((uv_handle_t*)&pc->psocket)) {
+    if (!uv_is_closing((uv_handle_t*)&pc->psocket.tcp)) {
       uv_close((uv_handle_t*)&pc->psocket.tcp, on_close_psocket);
     }
     return;
@@ -653,8 +663,64 @@ static pn_event_batch_t *proactor_batch_lh(pn_proactor_t *p, pn_event_type_t t)
   return &p->batch;
 }
 
+void leader_wake_connection(psocket_t *ps) {
+  assert(ps->state == ON_LEADER);
+  pconnection_t *pc = as_pconnection(ps);
+  pn_connection_t *c = pc->driver.connection;
+  pn_collector_put(pn_connection_collector(c), PN_OBJECT, c, PN_CONNECTION_WAKE);
+  pconnection_to_worker(pc);
+}
+
+static void on_stopping(uv_handle_t* h, void* v) {
+  /* Close all the TCP handles. on_close_psocket will close any other handles if needed */
+  if (h->type == UV_TCP && !uv_is_closing(h)) {
+    uv_close(h, on_close_psocket);
+  }
+}
+
+static pn_event_t *log_event(void* p, pn_event_t *e) {
+  if (e) {
+    pn_logf("[%p]:(%s)\n", (void*)p, pn_event_type_name(pn_event_type(e)));
+  }
+  return e;
+}
+
+static pn_event_t *listener_batch_next(pn_event_batch_t *batch) {
+  pn_listener_t *l = batch_listener(batch);
+  assert(l->psocket.state == ON_WORKER);
+  pn_event_t *prev = pn_collector_prev(l->collector);
+  if (prev && pn_event_type(prev) == PN_LISTENER_CLOSE) {
+    l->err = UV_EOF;
+  }
+  return log_event(l, pn_collector_next(l->collector));
+}
+
+static pn_event_t *proactor_batch_next(pn_event_batch_t *batch) {
+  pn_proactor_t *p = batch_proactor(batch);
+  assert(p->batch_working);
+  return log_event(p, pn_collector_next(p->collector));
+}
+
+static void pn_listener_free(pn_listener_t *l) {
+  /* No  assert(l->psocket.state == ON_WORKER);  can be called during shutdown */
+  if (l) {
+    if (l->collector) pn_collector_free(l->collector);
+    if (l->condition) pn_condition_free(l->condition);
+    if (l->attachments) pn_free(l->attachments);
+    free(l);
+  }
+}
+
+void leader_listener_close(psocket_t *ps) {
+  assert(ps->state = ON_LEADER);
+  pn_listener_t *l = (pn_listener_t*)ps;
+  l->err = UV_EOF;
+  listener_to_uv(l);
+}
+
 /* Return the next event batch or 0 if no events are available in the worker_q */
-static pn_event_batch_t* get_batch_lh(pn_proactor_t *p) {
+static pn_event_batch_t *get_batch_lh(pn_proactor_t *p) {
+  /* FIXME aconway 2017-02-21: generate these in parallel? */
   if (!p->batch_working) {       /* Can generate proactor events */
     if (p->inactive) {
       p->inactive = false;
@@ -680,55 +746,9 @@ static pn_event_batch_t* get_batch_lh(pn_proactor_t *p) {
   return 0;
 }
 
-pn_listener_t *pn_event_listener(pn_event_t *e) {
-  return (pn_event_class(e) == pn_listener__class()) ? (pn_listener_t*)pn_event_context(e) : NULL;
-}
+/* Process the leader_q and the UV loop, in the leader thread */
+static pn_event_batch_t *leader_lead_lh(pn_proactor_t *p, uv_run_mode mode) {
 
-pn_proactor_t *pn_event_proactor(pn_event_t *e) {
-  if (pn_event_class(e) == pn_proactor__class()) {
-    return (pn_proactor_t*)pn_event_context(e);
-  }
-  pn_listener_t *l = pn_event_listener(e);
-  if (l) {
-    return l->psocket.proactor;
-  }
-  pn_connection_t *c = pn_event_connection(e);
-  if (c) {
-    return pn_connection_proactor(pn_event_connection(e));
-  }
-  return NULL;
-}
-
-void pn_proactor_done(pn_proactor_t *p, pn_event_batch_t *batch) {
-  pconnection_t *pc = batch_pconnection(batch);
-  if (pc) {
-    assert(pc->psocket.state == ON_WORKER);
-    if (pn_connection_driver_has_event(&pc->driver)) {
-      /* Process all events before going back to leader */
-      pconnection_to_worker(pc);
-    } else {
-      to_leader(&pc->psocket, psocket_to_uv);
-    }
-    return;
-  }
-  pn_listener_t *l = batch_listener(batch);
-  if (l) {
-    assert(l->psocket.state == ON_WORKER);
-    to_leader(&l->psocket, psocket_to_uv);
-    return;
-  }
-  pn_proactor_t *bp = batch_proactor(batch);
-  if (bp == p) {
-    uv_mutex_lock(&p->lock);
-    p->batch_working = false;
-    uv_mutex_unlock(&p->lock);
-    uv_async_send(&p->async); /* Wake leader */
-    return;
-  }
-}
-
-/* Process the leader_q, in the leader thread */
-static void leader_process_lh(pn_proactor_t *p) {
   if (p->timeout_request) {
     p->timeout_request = false;
     if (p->timeout) {
@@ -738,7 +758,6 @@ static void leader_process_lh(pn_proactor_t *p) {
     }
   }
   for (psocket_t *ps = pop_lh(&p->leader_q); ps; ps = pop_lh(&p->leader_q)) {
-    assert(ps->state == ON_LEADER);
     if (ps->action) {
       uv_mutex_unlock(&p->lock);
       ps->action(ps);
@@ -750,68 +769,111 @@ static void leader_process_lh(pn_proactor_t *p) {
       ps->wakeup = NULL;
       uv_mutex_lock(&p->lock);
     }
+    pn_event_batch_t *batch = get_batch_lh(p);
+    if (batch) return batch;
   }
+  uv_mutex_unlock(&p->lock);
+  uv_run(&p->loop, mode);
+  uv_mutex_lock(&p->lock);
+  return get_batch_lh(p);
 }
 
-/* Run follower/leader loop till we can return an event and be a worker */
-pn_event_batch_t *pn_proactor_wait(struct pn_proactor_t* p) {
+/*  ==== public API ==== */
+
+pn_event_batch_t *pn_proactor_get(struct pn_proactor_t* p) {
   uv_mutex_lock(&p->lock);
-  /* Try to grab work immediately. */
   pn_event_batch_t *batch = get_batch_lh(p);
-  if (batch == NULL) {
-    /* No work available, follow the leader */
-    while (p->has_leader) {
-      uv_cond_wait(&p->cond, &p->lock);
-    }
-    /* Lead till there is work to do. */
+  if (batch == NULL && !p->has_leader) {
+    /* Try a non-blocking lead to generate some work */
     p->has_leader = true;
-    while (batch == NULL) {
-      leader_process_lh(p);
-      batch = get_batch_lh(p);
-      if (batch == NULL) {
-        uv_mutex_unlock(&p->lock);
-        uv_run(&p->loop, UV_RUN_ONCE);
-        uv_mutex_lock(&p->lock);
-      }
-    }
-    /* Signal the next leader and go to work */
+    batch = leader_lead_lh(p, UV_RUN_NOWAIT);
     p->has_leader = false;
-    uv_cond_signal(&p->cond);
+    uv_cond_signal(&p->cond);   /* Notify next leader */
   }
   uv_mutex_unlock(&p->lock);
   return batch;
 }
 
-pn_event_batch_t *pn_proactor_get(struct pn_proactor_t* p) {
+pn_event_batch_t *pn_proactor_wait(struct pn_proactor_t* p) {
   uv_mutex_lock(&p->lock);
   pn_event_batch_t *batch = get_batch_lh(p);
-  if (batch == NULL && !p->has_leader) {
-    /* If there is no leader, try a non-waiting lead to generate some work */
-    p->has_leader = true;
-    leader_process_lh(p);
-    uv_mutex_unlock(&p->lock);
-    uv_run(&p->loop, UV_RUN_NOWAIT);
-    uv_mutex_lock(&p->lock);
+  while (!batch && p->has_leader) {
+    uv_cond_wait(&p->cond, &p->lock); /* Follow the leader */
     batch = get_batch_lh(p);
+  }
+  if (!batch) {                 /* Become leader */
+    assert(!p->has_leader);     /* Implied by loop condition */
+    p->has_leader = true;
+    do {
+      batch = leader_lead_lh(p, UV_RUN_ONCE);
+    } while (!batch);
     p->has_leader = false;
+    uv_cond_signal(&p->cond);
   }
   uv_mutex_unlock(&p->lock);
   return batch;
 }
 
+void pn_proactor_done(pn_proactor_t *p, pn_event_batch_t *batch) {
+  pconnection_t *pc = batch_pconnection(batch);
+  if (pc) {
+    assert(pc->psocket.state == ON_WORKER);
+    if (pn_connection_driver_has_event(&pc->driver)) {
+      /* Process all events before going back to leader */
+      pconnection_to_worker(pc);
+    } else {
+      to_leader(&pc->psocket, psocket_to_uv);
+    }
+    return;
+  }
+  pn_listener_t *l = batch_listener(batch);
+  if (l) {
+    assert(l->psocket.state == ON_WORKER);
+    to_leader(&l->psocket, psocket_to_uv);
+    return;
+  }
+  pn_proactor_t *bp = batch_proactor(batch);
+  if (bp == p) {
+    uv_mutex_lock(&p->lock);
+    p->batch_working = false;
+    notify(p);
+    uv_mutex_unlock(&p->lock);
+    return;
+  }
+}
+
+pn_listener_t *pn_event_listener(pn_event_t *e) {
+  return (pn_event_class(e) == pn_listener__class()) ? (pn_listener_t*)pn_event_context(e) : NULL;
+}
+
+pn_proactor_t *pn_event_proactor(pn_event_t *e) {
+  if (pn_event_class(e) == pn_proactor__class()) {
+    return (pn_proactor_t*)pn_event_context(e);
+  }
+  pn_listener_t *l = pn_event_listener(e);
+  if (l) {
+    return l->psocket.proactor;
+  }
+  pn_connection_t *c = pn_event_connection(e);
+  if (c) {
+    return pn_connection_proactor(pn_event_connection(e));
+  }
+  return NULL;
+}
+
 void pn_proactor_interrupt(pn_proactor_t *p) {
   uv_mutex_lock(&p->lock);
   ++p->interrupt;
+  notify(p);
   uv_mutex_unlock(&p->lock);
-  uv_async_send(&p->async);   /* Wake the UV loop */
 }
 
 void pn_proactor_set_timeout(pn_proactor_t *p, pn_millis_t t) {
   uv_mutex_lock(&p->lock);
   p->timeout = t;
   p->timeout_request = true;
+  notify(p);
   uv_mutex_unlock(&p->lock);
-  uv_async_send(&p->async);   /* Wake the UV loop */
 }
 
 int pn_proactor_connect(pn_proactor_t *p, pn_connection_t *c, const char *host, const char *port) {
@@ -836,14 +898,6 @@ pn_proactor_t *pn_connection_proactor(pn_connection_t* c) {
   return pc ? pc->psocket.proactor : NULL;
 }
 
-void leader_wake_connection(psocket_t *ps) {
-  assert(ps->state == ON_LEADER);
-  pconnection_t *pc = as_pconnection(ps);
-  pn_connection_t *c = pc->driver.connection;
-  pn_collector_put(pn_connection_collector(c), PN_OBJECT, c, PN_CONNECTION_WAKE);
-  pconnection_to_worker(pc);
-}
-
 void pn_connection_wake(pn_connection_t* c) {
   /* May be called from any thread */
   wakeup(&get_pconnection(c)->psocket, leader_wake_connection);
@@ -858,18 +912,11 @@ pn_proactor_t *pn_proactor() {
   uv_mutex_init(&p->lock);
   uv_cond_init(&p->cond);
   uv_async_init(&p->loop, &p->async, NULL);
-  uv_timer_init(&p->loop, &p->timer); /* Just wake the loop */
+  uv_timer_init(&p->loop, &p->timer);
   p->timer.data = p;
   return p;
 }
 
-static void on_stopping(uv_handle_t* h, void* v) {
-  /* Close all the TCP handles. on_close_psocket will close any other handles if needed */
-  if (h->type == UV_TCP && !uv_is_closing(h)) {
-    uv_close(h, on_close_psocket);
-  }
-}
-
 void pn_proactor_free(pn_proactor_t *p) {
   uv_timer_stop(&p->timer);
   uv_close((uv_handle_t*)&p->timer, NULL);
@@ -885,39 +932,6 @@ void pn_proactor_free(pn_proactor_t *p) {
   free(p);
 }
 
-static pn_event_t *log_event(void* p, pn_event_t *e) {
-  if (e) {
-    pn_logf("[%p]:(%s)", (void*)p, pn_event_type_name(pn_event_type(e)));
-  }
-  return e;
-}
-
-static pn_event_t *listener_batch_next(pn_event_batch_t *batch) {
-  pn_listener_t *l = batch_listener(batch);
-  assert(l->psocket.state == ON_WORKER);
-  pn_event_t *prev = pn_collector_prev(l->collector);
-  if (prev && pn_event_type(prev) == PN_LISTENER_CLOSE) {
-    l->err = UV_EOF;
-  }
-  return log_event(l, pn_collector_next(l->collector));
-}
-
-static pn_event_t *proactor_batch_next(pn_event_batch_t *batch) {
-  pn_proactor_t *p = batch_proactor(batch);
-  assert(p->batch_working);
-  return log_event(p, pn_collector_next(p->collector));
-}
-
-static void pn_listener_free(pn_listener_t *l) {
-  /* No  assert(l->psocket.state == ON_WORKER);  can be called during shutdown */
-  if (l) {
-    if (l->collector) pn_collector_free(l->collector);
-    if (l->condition) pn_condition_free(l->condition);
-    if (l->attachments) pn_free(l->attachments);
-    free(l);
-  }
-}
-
 pn_listener_t *pn_listener(void) {
   pn_listener_t *l = (pn_listener_t*)calloc(1, sizeof(pn_listener_t));
   if (l) {
@@ -933,13 +947,6 @@ pn_listener_t *pn_listener(void) {
   return l;
 }
 
-void leader_listener_close(psocket_t *ps) {
-  assert(ps->state = ON_LEADER);
-  pn_listener_t *l = (pn_listener_t*)ps;
-  l->err = UV_EOF;
-  listener_to_uv(l);
-}
-
 void pn_listener_close(pn_listener_t* l) {
   /* This can be called from any thread, not just the owner of l */
   wakeup(&l->psocket, leader_listener_close);

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/85585202/proton-c/src/tests/proactor.c
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/proactor.c b/proton-c/src/tests/proactor.c
index 6b95c11..6595a0b 100644
--- a/proton-c/src/tests/proactor.c
+++ b/proton-c/src/tests/proactor.c
@@ -98,7 +98,7 @@ static void proactor_test_init(proactor_test_t *pts, size_t n) {
    pn_proactor_get.  Continue till all handlers return H_FINISHED (and return 0) or one
    returns H_FAILED  (and return non-0)
 */
-int proactor_test_run(proactor_test_t *pts, size_t n) {
+static int proactor_test_run(proactor_test_t *pts, size_t n) {
   /* Make sure pts are initialized */
   proactor_test_init(pts, n);
   size_t finished = 0;
@@ -127,17 +127,11 @@ int proactor_test_run(proactor_test_t *pts, size_t n) {
 
 
 /* Handler for test_listen_connect, does both sides of the connection */
-handler_state_t listen_connect_handler(test_t *t, pn_event_t *e) {
+static handler_state_t listen_connect_handler(test_t *t, pn_event_t *e) {
   pn_connection_t *c = pn_event_connection(e);
   pn_listener_t *l = pn_event_listener(e);
 
   switch (pn_event_type(e)) {
-    /* Ignore these events */
-   case PN_CONNECTION_LOCAL_OPEN:
-   case PN_CONNECTION_BOUND:
-   case PN_CONNECTION_INIT:
-    return H_CONTINUE;
-
     /* Act on these events */
    case PN_LISTENER_ACCEPT: {
     pn_connection_t *accepted = pn_connection();
@@ -149,20 +143,22 @@ handler_state_t listen_connect_handler(test_t *t, pn_event_t *e) {
    case PN_CONNECTION_REMOTE_OPEN:
     if (pn_connection_state(c) | PN_LOCAL_ACTIVE) { /* Client is fully open - the test is done */
       pn_connection_close(c);
-      return H_FINISHED;
     }  else {                   /* Server returns the open */
       pn_connection_open(c);
     }
+    return H_CONTINUE;
 
    case PN_CONNECTION_REMOTE_CLOSE:
     if (pn_connection_state(c) | PN_LOCAL_ACTIVE) {
       pn_connection_close(c);    /* Return the close */
     }
+    return H_CONTINUE;
+
+   case PN_TRANSPORT_CLOSED:
     return H_FINISHED;
 
    default:
-    TEST_CHECK(t, false, "unexpected event %s", pn_event_type_name(pn_event_type(e)));
-    return H_FAILED;
+    return H_CONTINUE;
     break;
   }
 }
@@ -172,13 +168,13 @@ static void test_early_error(test_t *t) {
   pn_proactor_t *p = pn_proactor();
   pn_proactor_set_timeout(p, timeout); /* In case of hang */
   pn_connection_t *c = pn_connection();
-  pn_proactor_connect(p, c, "badhost", "amqp");
+  pn_proactor_connect(p, c, localhost, "1"); /* Bad port */
   pn_event_type_t etype = wait_for(p, PN_TRANSPORT_CLOSED);
   TEST_CHECK(t, PN_TRANSPORT_CLOSED == etype, pn_event_type_name(etype));
   TEST_CHECK(t, pn_condition_is_set(pn_transport_condition(pn_connection_transport(c))), "");
 
   pn_listener_t *l = pn_listener();
-  pn_proactor_listen(p, l, "badhost", "amqp", 1);
+  pn_proactor_listen(p, l, localhost, "1", 1); /* Bad port */
   etype = wait_for(p, PN_LISTENER_CLOSE);
   TEST_CHECK(t, PN_LISTENER_CLOSE == etype, pn_event_type_name(etype));
   TEST_CHECK(t, pn_condition_is_set(pn_listener_condition(l)), "");
@@ -204,6 +200,45 @@ static void test_listen_connect(test_t *t) {
   pn_proactor_free(server);
 }
 
+static handler_state_t connection_wakeup_handler(test_t *t, pn_event_t *e) {
+  pn_connection_t *c = pn_event_connection(e);
+  switch (pn_event_type(e)) {
+
+   case PN_CONNECTION_REMOTE_OPEN:
+    if (pn_connection_state(c) & PN_LOCAL_UNINIT) {
+      pn_connection_open(c);    /* Server returns the open */
+    }
+    return H_FINISHED;          /* Finish when open at both ends */
+
+   default:
+    /* Otherwise same as listen_connect_handler */
+    return listen_connect_handler(t, e);
+  }
+}
+
+/* Test waking up a connection that is idle */
+static void test_connection_wakeup(test_t *t) {
+  proactor_test_t pts[] =  { { t, connection_wakeup_handler }, { t, connection_wakeup_handler } };
+  proactor_test_init(pts, 2);
+  pn_proactor_t *client = pts[0].proactor, *server = pts[1].proactor;
+  test_port_t port = test_port();          /* Hold a port */
+  pn_proactor_listen(server, pn_listener(), localhost, port.str, 4);
+  pn_event_type_t etype = wait_for(server, PN_LISTENER_OPEN);
+  if (TEST_CHECK(t, PN_LISTENER_OPEN == etype, pn_event_type_name(etype))) {
+    sock_close(port.sock);
+    pn_connection_t *c = pn_connection();
+    pn_proactor_connect(client, c, localhost, port.str);
+    proactor_test_run(pts, 2);                          /* Will finish when client is connected */
+    TEST_CHECK(t, NULL == pn_proactor_get(client), ""); /* Should be idle */
+    pn_connection_wake(c);
+    etype = wait_next(client);
+    /* FIXME aconway 2017-02-21: TEST_EVENT_TYPE */
+    TEST_CHECK(t, PN_CONNECTION_WAKE == etype, pn_event_type_name(etype));
+  }
+  pn_proactor_free(client);
+  pn_proactor_free(server);
+}
+
 /* Test that INACTIVE event is generated when last connections/listeners closes. */
 static void test_inactive(test_t *t) {
   proactor_test_t pts[] =  { { t, listen_connect_handler }, { t, listen_connect_handler }};
@@ -226,11 +261,12 @@ static void test_inactive(test_t *t) {
   pn_proactor_free(server);
 }
 
-int main(int argv, char** argc) {
+int main(int argc, char **argv) {
   int failed = 0;
-  RUN_TEST(failed, t, test_inactive(&t));
-  RUN_TEST(failed, t, test_interrupt_timeout(&t));
-  RUN_TEST(failed, t, test_early_error(&t));
-  RUN_TEST(failed, t, test_listen_connect(&t));
+  RUN_ARGV_TEST(failed, t, test_inactive(&t));
+  RUN_ARGV_TEST(failed, t, test_interrupt_timeout(&t));
+  RUN_ARGV_TEST(failed, t, test_early_error(&t));
+  RUN_ARGV_TEST(failed, t, test_listen_connect(&t));
+  RUN_ARGV_TEST(failed, t, test_connection_wakeup(&t));
   return failed;
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/85585202/proton-c/src/tests/test_tools.h
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/test_tools.h b/proton-c/src/tests/test_tools.h
index 2663dcf..7006334 100644
--- a/proton-c/src/tests/test_tools.h
+++ b/proton-c/src/tests/test_tools.h
@@ -104,6 +104,19 @@ static inline bool test_check_(test_t *t, bool expr, const char *sexpr, const ch
     }                                                           \
   } while(0)
 
+/* Like RUN_TEST but only if one of the argv strings is found in the test EXPR */
+#define RUN_ARGV_TEST(FAILED, T, EXPR) do {                             \
+    if (argc == 1) {                                                    \
+      RUN_TEST(FAILED, T, EXPR);                                        \
+    } else {                                                            \
+      for (int i = 1; i < argc; ++i) {                                  \
+        if (strstr(#EXPR, argv[i])) {                                   \
+          RUN_TEST(FAILED, T, EXPR);                                    \
+          break;                                                        \
+        }                                                               \
+      }                                                                 \
+    }                                                                   \
+  } while(0)
 
 /* Some very simple platform-specifics to acquire an unused socket */
 #if defined(WIN32)
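
The RUN_ARGV_TEST macro is what makes "running selected tests" work: with no arguments
every test runs; otherwise a test runs only when some argument is a substring of the
literal call text (strstr on #EXPR, so <string.h> must be in scope in the test program).
Given the main() in tests/proactor.c above, a hypothetical session:

    ./c-proactor-tests                   # runs all five tests
    ./c-proactor-tests listen            # runs only test_listen_connect
    ./c-proactor-tests wakeup inactive   # runs test_connection_wakeup and test_inactive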

