You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2017/02/16 23:26:24 UTC

[1/6] qpid-proton git commit: PROTON-1351: link go binding with qpid-proton-core library

Repository: qpid-proton
Updated Branches:
  refs/heads/master cd612ffec -> 42d5e89ef


PROTON-1351: link go binding with qpid-proton-core library

The go binding was still being linked with the qpid-proton library, even though it
only depends on symbols in qpid-proton-core. Link it with qpid-proton-core.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/a1d84b0e
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/a1d84b0e
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/a1d84b0e

Branch: refs/heads/master
Commit: a1d84b0e212ebd1b7a8b2f9a52dc83337995dca3
Parents: cd612ff
Author: Alan Conway <ac...@redhat.com>
Authored: Wed Feb 15 21:27:00 2017 -0500
Committer: Alan Conway <ac...@redhat.com>
Committed: Thu Feb 16 16:52:27 2017 -0500

----------------------------------------------------------------------
 proton-c/bindings/go/src/qpid.apache.org/amqp/doc.go     | 2 +-
 proton-c/bindings/go/src/qpid.apache.org/electron/doc.go | 2 +-
 proton-c/bindings/go/src/qpid.apache.org/proton/doc.go   | 2 +-
 proton-c/bindings/go/src/qpid.apache.org/proton/error.go | 2 +-
 4 files changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a1d84b0e/proton-c/bindings/go/src/qpid.apache.org/amqp/doc.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/amqp/doc.go b/proton-c/bindings/go/src/qpid.apache.org/amqp/doc.go
index 701af55..c04c2b0 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/amqp/doc.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/amqp/doc.go
@@ -32,7 +32,7 @@ AMQP 1.0 is an open standard for inter-operable message exchange, see <http://ww
 */
 package amqp
 
-// #cgo LDFLAGS: -lqpid-proton
+// #cgo LDFLAGS: -lqpid-proton-core
 import "C"
 
 // This file is just for the package comment.

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a1d84b0e/proton-c/bindings/go/src/qpid.apache.org/electron/doc.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/doc.go b/proton-c/bindings/go/src/qpid.apache.org/electron/doc.go
index f4baa31..39137c0 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/doc.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/doc.go
@@ -50,7 +50,7 @@ More realistic examples: https://github.com/apache/qpid-proton/blob/master/examp
 */
 package electron
 
-//#cgo LDFLAGS: -lqpid-proton
+//#cgo LDFLAGS: -lqpid-proton-core
 import "C"
 
 // Just for package comment

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a1d84b0e/proton-c/bindings/go/src/qpid.apache.org/proton/doc.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/doc.go b/proton-c/bindings/go/src/qpid.apache.org/proton/doc.go
index 1049e71..39716e2 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/doc.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/proton/doc.go
@@ -60,7 +60,7 @@ applications.
 */
 package proton
 
-// #cgo LDFLAGS: -lqpid-proton
+// #cgo LDFLAGS: -lqpid-proton-core
 import "C"
 
 // This file is just for the package comment.

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a1d84b0e/proton-c/bindings/go/src/qpid.apache.org/proton/error.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/error.go b/proton-c/bindings/go/src/qpid.apache.org/proton/error.go
index 80d9680..5232fec 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/error.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/proton/error.go
@@ -20,7 +20,7 @@ under the License.
 // Internal implementation details - ignore.
 package proton
 
-// #cgo LDFLAGS: -lqpid-proton
+// #cgo LDFLAGS: -lqpid-proton-core
 // #include <proton/error.h>
 // #include <proton/codec.h>
 import "C"


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[5/6] qpid-proton git commit: PROTON-1403: Better port allocation for test servers using bind(0)

Posted by ac...@apache.org.
PROTON-1403: Better port allocation for test servers using bind(0)

Use bind(0) with SO_REUSEADDR to hold onto the bound port until the test server
is listening on it. Works in python and C.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/9abc67de
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/9abc67de
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/9abc67de

Branch: refs/heads/master
Commit: 9abc67de8c40ffea0c988ad9408d513fbb400196
Parents: b987a6a
Author: Alan Conway <ac...@redhat.com>
Authored: Thu Feb 16 11:25:07 2017 -0500
Committer: Alan Conway <ac...@redhat.com>
Committed: Thu Feb 16 18:25:17 2017 -0500

----------------------------------------------------------------------
 examples/c/proactor/test.py           |  24 +++---
 examples/exampletest.py               |  24 ++++--
 proton-c/src/core/connection_driver.c |   1 -
 proton-c/src/proactor/libuv.c         |   1 -
 proton-c/src/tests/proactor.c         |  12 ++-
 proton-c/src/tests/test_tools.h       | 118 ++++++++++++++++-------------
 6 files changed, 105 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9abc67de/examples/c/proactor/test.py
----------------------------------------------------------------------
diff --git a/examples/c/proactor/test.py b/examples/c/proactor/test.py
index 45bb817..7d6f3fc 100644
--- a/examples/c/proactor/test.py
+++ b/examples/c/proactor/test.py
@@ -61,23 +61,25 @@ class CExampleTest(BrokerTestCase):
             except ProcError, e:
                 if "connection refused" in e.args[0] and max > 0:
                     max -= 1
-                    time.sleep(.01)
                     continue
                 raise
 
     def test_send_direct(self):
-        """Send first then receive"""
-        addr = "127.0.0.1:%s/examples" % (pick_port())
-        d = self.proc(["direct", "-a", addr])
-        self.assertEqual("100 messages sent and acknowledged\n", self.retry(["send", "-a", addr]))
-        self.assertIn(receive_expect(100), d.wait_out())
+        """Send to direct server"""
+        with bind0() as sock:
+            addr = "127.0.0.1:%s/examples" % sock.port()
+            d = self.proc(["direct", "-a", addr])
+            self.assertEqual("100 messages sent and acknowledged\n", self.retry(["send", "-a", addr]))
+            self.assertIn(receive_expect(100), d.wait_out())
 
     def test_receive_direct(self):
-        """Send first then receive"""
-        addr = "127.0.0.1:%s/examples" % (pick_port())
-        d = self.proc(["direct", "-a", addr])
-        self.assertEqual(receive_expect(100), self.retry(["receive", "-a", addr]))
-        self.assertIn("100 messages sent and acknowledged\n", d.wait_out())
+        """Receive from direct server"""
+        with bind0() as sock:
+            addr = "127.0.0.1:%s/examples" % sock.port()
+            d = self.proc(["direct", "-a", addr])
+            self.assertEqual(receive_expect(100), self.retry(["receive", "-a", addr]))
+            self.assertIn("100 messages sent and acknowledged\n", d.wait_out())
+
 
 if __name__ == "__main__":
     unittest.main()

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9abc67de/examples/exampletest.py
----------------------------------------------------------------------
diff --git a/examples/exampletest.py b/examples/exampletest.py
index bf7ea9b..8dbd1ed 100644
--- a/examples/exampletest.py
+++ b/examples/exampletest.py
@@ -29,10 +29,18 @@ from copy import copy
 import platform
 from os.path import dirname as dirname
 
-def pick_port():
-    """Pick a random port."""
-    p =  randrange(10000, 20000)
-    return p
+def bind0():
+    """Bind a socket with bind(0) and SO_REUSEADDR to get a free port to listen on"""
+    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)    
+    sock.bind(('', 0))
+    return sock
+
+# Monkeypatch add port() and with support to socket.socket
+socket.socket.port = lambda(self): socket.getnameinfo(self.getsockname(), 0)[1]
+socket.socket.__enter__ = lambda(self): self
+def socket__exit__(self, *args): self.close()
+socket.socket.__exit__ = socket__exit__
 
 class ProcError(Exception):
     """An exception that captures failed process output"""
@@ -91,7 +99,7 @@ class Proc(Popen):
         t.join(timeout)
         if self.poll() is None:      # Still running
             self.kill()
-            raise ProcError(self, "timeout")
+            raise ProcError(self, "still running after %ss" % timeout)
         if expect is not None and self.poll() != expect:
             raise ProcError(self)
         return self.out
@@ -148,7 +156,7 @@ def wait_port(port, timeout=10):
         except socket.error, e:
             if e.errno != errno.ECONNREFUSED: # Only retry on connection refused error.
                 raise
-    raise socket.timeout()
+    raise socket.timeout("waiting for port %s" % port)
 
 
 class BrokerTestCase(ExampleTestCase):
@@ -159,7 +167,8 @@ class BrokerTestCase(ExampleTestCase):
 
     @classmethod
     def setUpClass(cls):
-        cls.port = pick_port()
+        sock = bind0()
+        cls.port = sock.port()
         cls.addr = "127.0.0.1:%s/examples" % (cls.port)
         cls.broker = None       # In case Proc throws, create the attribute.
         cls.broker = Proc(cls.broker_exe + ["-a", cls.addr])
@@ -168,6 +177,7 @@ class BrokerTestCase(ExampleTestCase):
         except Exception, e:
             cls.broker.kill()
             raise ProcError(cls.broker, "timed out waiting for port")
+        sock.close()
 
     @classmethod
     def tearDownClass(cls):

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9abc67de/proton-c/src/core/connection_driver.c
----------------------------------------------------------------------
diff --git a/proton-c/src/core/connection_driver.c b/proton-c/src/core/connection_driver.c
index 0d1db21..63eed09 100644
--- a/proton-c/src/core/connection_driver.c
+++ b/proton-c/src/core/connection_driver.c
@@ -127,7 +127,6 @@ pn_event_t* pn_connection_driver_next_event(pn_connection_driver_t *d) {
 }
 
 bool pn_connection_driver_has_event(pn_connection_driver_t *d) {
-  /* FIXME aconway 2017-02-15: this is ugly */
   pn_collector_t *c = pn_connection_collector(d->connection);
   return pn_collector_more(c) || (pn_collector_peek(c) && pn_collector_peek(c) != pn_collector_prev(c));
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9abc67de/proton-c/src/proactor/libuv.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/libuv.c b/proton-c/src/proactor/libuv.c
index 0ccfcda..4ec7b03 100644
--- a/proton-c/src/proactor/libuv.c
+++ b/proton-c/src/proactor/libuv.c
@@ -427,7 +427,6 @@ static void on_connection(uv_stream_t* server, int err) {
   listener_to_worker(l);        /* If already ON_WORKER it will stay there */
 }
 
-/* FIXME aconway 2017-02-16:  listener events in unwatch*/
 static void leader_accept(pn_listener_t * l) {
   assert(l->accepting);
   pconnection_t *pc = l->accepting;

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9abc67de/proton-c/src/tests/proactor.c
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/proactor.c b/proton-c/src/tests/proactor.c
index ae5b1f6..e90c45a 100644
--- a/proton-c/src/tests/proactor.c
+++ b/proton-c/src/tests/proactor.c
@@ -24,6 +24,7 @@
 #include <proton/listener.h>
 #include <proton/proactor.h>
 #include <proton/transport.h>
+#include <stdio.h>
 #include <stdlib.h>
 #include <string.h>
 
@@ -176,15 +177,16 @@ static void test_listen_connect(test_t *t) {
   proactor_test_t *client = &pts[0], *server = &pts[1];
   proactor_test_init(pts, 2);
 
-  int port = pick_port();
+  sock_t sock = sock_bind0();          /* Hold a port */
   char port_str[16];
-  snprintf(port_str, sizeof(port_str), "%d", port);
+  snprintf(port_str, sizeof(port_str), "%d", sock_port(sock));
   pn_proactor_listen(server->proactor, pn_listener(), localhost, port_str, 4);
   pn_event_type_t etype = wait_for(server->proactor, PN_LISTENER_OPEN);
   if (TEST_CHECK(t, PN_LISTENER_OPEN == etype, pn_event_type_name(etype))) {
     pn_proactor_connect(client->proactor, pn_connection(), localhost, port_str);
     proactor_test_run(pts, 2);
   }
+  sock_close(sock);
   pn_proactor_free(client->proactor);
   pn_proactor_free(server->proactor);
 }
@@ -210,8 +212,10 @@ static void test_listen_connect_error(test_t *t) {
 
 int main(int argv, char** argc) {
   int failed = 0;
-  RUN_TEST(failed, t, test_interrupt_timeout(&t));
+  if (0) {
+    RUN_TEST(failed, t, test_interrupt_timeout(&t));
+    RUN_TEST(failed, t, test_listen_connect_error(&t));
+  }
   RUN_TEST(failed, t, test_listen_connect(&t));
-  RUN_TEST(failed, t, test_listen_connect_error(&t));
   return failed;
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9abc67de/proton-c/src/tests/test_tools.h
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/test_tools.h b/proton-c/src/tests/test_tools.h
index 047ab42..2619337 100644
--- a/proton-c/src/tests/test_tools.h
+++ b/proton-c/src/tests/test_tools.h
@@ -22,30 +22,45 @@
 
 #include <proton/type_compat.h>
 
+#include <errno.h>
 #include <stdarg.h>
 #include <stdio.h>
 #include <stdlib.h>
+#include <string.h>
 
-/* Call via ASSERT macro. */
-static void assert_fail_(const char* cond, const char* file, int line) {
-  printf("%s:%d: Assertion failed: %s\n", file, line, cond);
+/*
+  All output from test macros goes to stdout not stderr, error messages are normal for a test.
+  Some errno handling functions are thread-unsafe
+  */
+
+
+/* Call via TEST_ASSERT macros */
+static void assert_fail_(const char* cond, const char* file, int line, const char *fmt, ...) {
+  printf("%s:%d: Assertion failed: %s", file, line, cond);
+  if (fmt && *fmt) {
+    va_list ap;
+    va_start(ap, fmt);
+    printf(" - ");
+    vprintf(fmt, ap);
+    printf("\n");
+    fflush(stdout);
+    va_end(ap);
+  }
   abort();
 }
 
 /* Unconditional assert (does not depend on NDEBUG) for tests. */
-#define ASSERT(expr)                                            \
-  ((expr) ?  (void)0 : assert_fail_(#expr, __FILE__, __LINE__))
+#define TEST_ASSERT(expr) \
+  ((expr) ?  (void)0 : assert_fail_(#expr, __FILE__, __LINE__, NULL))
 
-/* Call via macro ASSERT_PERROR */
-static void assert_perror_fail_(const char* cond, const char* file, int line) {
-  perror(cond);
-  printf("%s:%d: Assertion failed (error above): %s\n", file, line, cond);
-  abort();
-}
+/* Unconditional assert with printf-style message (does not depend on NDEBUG) for tests. */
+#define TEST_ASSERTF(expr, ...) \
+  ((expr) ?  (void)0 : assert_fail_(#expr, __FILE__, __LINE__, __VA_ARGS__))
 
-/* Like ASSERT but also calls perror() to print the current errno error. */
-#define ASSERT_PERROR(expr)                                             \
-  ((expr) ?  (void)0 : assert_perror_fail_(#expr, __FILE__, __LINE__))
+/* Like TEST_ASSERT but includes  errno string for err */
+/* TODO aconway 2017-02-16: not thread safe, replace with safe strerror_r or similar */
+#define TEST_ASSERT_ERRNO(expr, err) \
+  TEST_ASSERTF((expr), "%s", strerror(err))
 
 
 /* A struct to collect the results of a test.
@@ -61,12 +76,12 @@ static inline bool test_check_(test_t *t, bool expr, const char *sexpr, const ch
   if (!expr) {
     va_list ap;
     va_start(ap, fmt);
-    fprintf(stderr, "%s:%d:[%s] check failed: (%s)", file, line, t->name, sexpr);
+    printf("%s:%d:[%s] check failed: (%s)", file, line, t->name, sexpr);
     if (fmt && *fmt) {
-      fprintf(stderr, " - ");
-      vfprintf(stderr, fmt, ap);
+      printf(" - ");
+      vprintf(fmt, ap);
     }
-    fprintf(stderr, "\n");
+    printf("\n");
     fflush(stderr);
     ++t->errors;
   }
@@ -89,6 +104,8 @@ static inline bool test_check_(test_t *t, bool expr, const char *sexpr, const ch
     }                                                           \
   } while(0)
 
+
+/* Some very simple platform-specifics to acquire an unused socket */
 #if defined(WIN32)
 
 #include <winsock2.h>
@@ -96,45 +113,44 @@ static inline bool test_check_(test_t *t, bool expr, const char *sexpr, const ch
 typedef SOCKET sock_t;
 static inline void sock_close(sock_t sock) { closesocket(sock); }
 
-#else
+#else  /* POSIX */
+
+typedef int sock_t;
+# include <netinet/in.h>
+# include <unistd.h>
+static inline void sock_close(sock_t sock) { close(sock); }
+#endif
 
-#include <netdb.h>
-#include <netinet/in.h>
-#include <time.h>
-#include <unistd.h>
 
-static int port_in_use(int port) {
-  /* Attempt to bind a dummy socket to test if the port is in use. */
-  int dummy_socket = socket(AF_INET, SOCK_STREAM, 0);
-  ASSERT_PERROR(dummy_socket >= 0);
+/* Create a socket and bind(LOOPBACK:0) to get a free port.
+   Use SO_REUSEADDR so other processes can bind and listen on this port.
+   Close the returned fd when the other process is listening.
+   Asserts on error.
+*/
+static sock_t sock_bind0(void) {
+  int sock =  socket(AF_INET, SOCK_STREAM, 0);
+  TEST_ASSERT_ERRNO(sock >= 0, errno);
+  int on = 1;
+  TEST_ASSERT_ERRNO(setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (const char*)&on, sizeof(on)) == 0, errno);
   struct sockaddr_in addr = {0};
-  addr.sin_family = AF_INET;
-  addr.sin_addr.s_addr = INADDR_ANY;
-  addr.sin_port = htons(port);
-  int ret = bind(dummy_socket, (struct sockaddr *) &addr, sizeof(addr));
-  close(dummy_socket);
-  return ret < 0;
+  addr.sin_family = AF_INET;    /* set the type of connection to TCP/IP */
+  addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
+  addr.sin_port = 0;            /* bind to port 0 */
+  TEST_ASSERT_ERRNO(bind(sock, (struct sockaddr*)&addr, sizeof(addr)) == 0, errno);
+  return sock;
 }
 
-/* Try to pick an unused port by picking random ports till we find one
-   that is not in use. This is not foolproof as some other process may
-   grab it before the caller binds or connects.
-*/
-static int pick_port(void) {
-  srand(time(NULL));
-  static int MAX_TRIES = 10;
+static int sock_port(sock_t sock) {
+  struct sockaddr addr = {0};
+  socklen_t len = sizeof(addr);
+  TEST_ASSERT_ERRNO(getsockname(sock, &addr, &len) == 0, errno);
   int port = -1;
-  int i = 0;
-  do {
-    /* Pick a random port. Avoid the standard OS ephemeral port range used by
-       bind(0) - ports can be allocated and re-allocated very rapidly there.
-    */
-    port =  (rand()%10000) + 10000;
-  } while (i++ < MAX_TRIES && port_in_use(port));
-  ASSERT(i < MAX_TRIES && "cannot pick a port");
-  return port;
+  switch (addr.sa_family) {
+   case AF_INET: port = ((struct sockaddr_in*)&addr)->sin_port; break;
+   case AF_INET6: port = ((struct sockaddr_in6*)&addr)->sin6_port; break;
+   default: TEST_ASSERTF(false, "unknown protocol type %d\n", addr.sa_family); break;
+  }
+  return ntohs(port);
 }
 
-#endif
-
 #endif // TESTS_TEST_TOOLS_H


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[3/6] qpid-proton git commit: NO-JIRA: Set the correct COMPILE_WARNING_FLAGS for clang C++

Posted by ac...@apache.org.
NO-JIRA: Set the correct COMPILE_WARNING_FLAGS for clang C++


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/ce6626be
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/ce6626be
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/ce6626be

Branch: refs/heads/master
Commit: ce6626be74c0cb824bab91e80e4e81428a84c360
Parents: a1d84b0
Author: Alan Conway <ac...@redhat.com>
Authored: Thu Feb 16 00:07:00 2017 -0500
Committer: Alan Conway <ac...@redhat.com>
Committed: Thu Feb 16 16:52:44 2017 -0500

----------------------------------------------------------------------
 proton-c/CMakeLists.txt | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ce6626be/proton-c/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/proton-c/CMakeLists.txt b/proton-c/CMakeLists.txt
index c73f9df..30a77e2 100644
--- a/proton-c/CMakeLists.txt
+++ b/proton-c/CMakeLists.txt
@@ -274,8 +274,9 @@ elseif (CMAKE_CXX_COMPILER_ID STREQUAL "SunPro")
 endif (CMAKE_COMPILER_IS_GNUCC)
 
 if (CMAKE_C_COMPILER_ID MATCHES "Clang")
+  set (COMPILE_WARNING_FLAGS  "-Wall -pedantic")
   if (ENABLE_WARNING_ERROR)
-    set (COMPILE_WARNING_FLAGS "-Werror -Wall -pedantic")
+    set (COMPILE_WARNING_FLAGS "-Werror ${COMPILE_WARNING_FLAGS}")
   endif (ENABLE_WARNING_ERROR)
 endif()
 
@@ -285,7 +286,7 @@ if (CMAKE_CXX_COMPILER_ID MATCHES "Clang")
   endif (ENABLE_WARNING_ERROR)
   # TODO aconway 2016-01-06: we should be able to clean up the code and turn on
   # some of these warnings.
-  set (CXX_WARNING_FLAGS "${WERROR} -pedantic -Weverything -Wno-c++98-compat -Wno-c++98-compat-pedantic -Wno-float-equal -Wno-padded -Wno-sign-conversion -Wno-switch-enum -Wno-weak-vtables -Wno-exit-time-destructors -Wno-global-constructors -Wno-shorten-64-to-32 -Wno-documentation -Wno-documentation-unknown-command -Wno-old-style-cast -Wno-missing-noreturn")
+  set (CXX_WARNING_FLAGS "${COMPILE_WARNING_FLAGS} -Wno-c++98-compat -Wno-c++98-compat-pedantic -Wno-float-equal -Wno-padded -Wno-sign-conversion -Wno-switch-enum -Wno-weak-vtables -Wno-exit-time-destructors -Wno-global-constructors -Wno-shorten-64-to-32 -Wno-documentation -Wno-documentation-unknown-command -Wno-old-style-cast -Wno-missing-noreturn")
 endif()
 
 # Make flags visible to examples.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[2/6] qpid-proton git commit: NO-JIRA: go binding build results in binary dir, not source dir

Posted by ac...@apache.org.
NO-JIRA: go binding build results in binary dir, not source dir

Avoid polluting source directory and avoid races when multiple builds are done
in parallel from the same source.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/9bd99ebf
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/9bd99ebf
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/9bd99ebf

Branch: refs/heads/master
Commit: 9bd99ebf096ed83bd23ae05fe9cc0e6f65d9ac2e
Parents: ce6626b
Author: Alan Conway <ac...@redhat.com>
Authored: Thu Feb 16 00:44:52 2017 -0500
Committer: Alan Conway <ac...@redhat.com>
Committed: Thu Feb 16 16:52:44 2017 -0500

----------------------------------------------------------------------
 .gitignore                          |  7 -------
 examples/go/CMakeLists.txt          |  8 +++-----
 proton-c/bindings/go/CMakeLists.txt | 17 ++++++++---------
 3 files changed, 11 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9bd99ebf/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index aa9bc36..b769990 100644
--- a/.gitignore
+++ b/.gitignore
@@ -31,16 +31,9 @@ eclipse-classes
 # The usual location for proton-c build files
 proton-c/build
 
-# Executables built by go binding tests
-proton-c/bindings/go/bin
-
 # Testresults from the jenkins build script
 testresults
 
-# Go binding build output
-/proton-c/bindings/go/pkg
-/proton-c/bindings/go/bin
-
 # Python TOX test build output
 /proton-c/bindings/python/MANIFEST
 /proton-c/bindings/python/build

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9bd99ebf/examples/go/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/examples/go/CMakeLists.txt b/examples/go/CMakeLists.txt
index c9aba01..861cdd8 100644
--- a/examples/go/CMakeLists.txt
+++ b/examples/go/CMakeLists.txt
@@ -20,12 +20,11 @@
 if(BUILD_GO)
 
   set(examples electron/broker electron/receive electron/send proton/broker)
-  file(GLOB_RECURSE example_source FOLLOW_SYMLINKS ${CMAKE_CURRENT_SOURCE_DIR}/*.go)
 
   # Build example exes
   foreach(example ${examples})
-    string(REPLACE / _ target ${example})
-    set(target "go_example_${target}")
+    string(REPLACE / - target ${example})
+    set(target "go-example-${target}")
     set(output ${CMAKE_CURRENT_BINARY_DIR}/${example})
     # Always run go_build, it will do nothing if there is nothing to do.
     # Otherwise it's too hard to get the dependencies right.
@@ -33,12 +32,11 @@ if(BUILD_GO)
       COMMAND ${GO_BUILD} ${GO_EXAMPLE_FLAGS} -o ${output} ${CMAKE_CURRENT_SOURCE_DIR}/${example}.go
       WORKING_DIRECTORY ${CMAKE_BINARY_DIR}
       DEPENDS go-build)
-    list(APPEND example_targets ${target})
   endforeach()
 
   # Build test driver exe
   set(test_exe ${CMAKE_CURRENT_BINARY_DIR}/example_test)
-  add_custom_target(go_example_test ALL
+  add_custom_target(go-example-test ALL
     COMMAND ${GO_TEST} -c -o ${test_exe} ${CMAKE_CURRENT_SOURCE_DIR}/example_test.go
     WORKING_DIRECTORY ${CMAKE_BINARY_DIR})
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9bd99ebf/proton-c/bindings/go/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/CMakeLists.txt b/proton-c/bindings/go/CMakeLists.txt
index d0ffe9b..1cb9d6c 100644
--- a/proton-c/bindings/go/CMakeLists.txt
+++ b/proton-c/bindings/go/CMakeLists.txt
@@ -26,7 +26,6 @@ set(GO_TEST_FLAGS "-v" CACHE STRING "Flags for 'go test'")
 
 # Flags that differ for golang go and gcc go.
 if (go_ver MATCHES "gccgo")
-  # TODO aconway 2015-10-08: import cycles with -race under gccgo, investigate.
   set(GO_RPATH_FLAGS -gccgoflags "-Wl,-rpath=${CMAKE_BINARY_DIR}/proton-c")
 else()
   set(GO_RPATH_FLAGS -ldflags "-r ${CMAKE_BINARY_DIR}/proton-c")
@@ -37,7 +36,7 @@ separate_arguments(GO_TEST_FLAGS)
 
 # Following are CACHE INTERNAL so examples/CMakeLists.txt can see them.
 set(GO_ENV ${env_py} --
-  "GOPATH=${CMAKE_CURRENT_SOURCE_DIR}"
+  "GOPATH=${CMAKE_CURRENT_BINARY_DIR}"
   "CGO_CFLAGS=-I${CMAKE_SOURCE_DIR}/proton-c/include -I${CMAKE_BINARY_DIR}/proton-c/include"
   "CGO_LDFLAGS=-L${CMAKE_BINARY_DIR}/proton-c"
   "PN_INTEROP_DIR=${CMAKE_SOURCE_DIR}/tests/interop"
@@ -47,15 +46,15 @@ set(GO ${GO_ENV} ${GO_EXE} CACHE INTERNAL "Run go with environment set")
 
 set(GO_BUILD ${GO} build ${GO_BUILD_FLAGS} ${GO_RPATH_FLAGS} CACHE INTERNAL "Run go build")
 set(GO_INSTALL ${GO} install ${GO_BUILD_FLAGS} CACHE INTERNAL "Run go install" )
-set(GO_TEST ${GO} test ${GO_BUILD_FLAGS} ${GO_RPATH_FLAGS} ${GO_TEST_FLAGS} CACHE INTERNAL "Run go test")
+set(GO_TEST ${GO} test ${GO_TEST_FLAGS} ${GO_RPATH_FLAGS} CACHE INTERNAL "Run go test")
 
-# Go tools insist on standard Go layout which puts compiled code in the source tree :(
-# Build output is all under git-ignored pkg or bin subdirectories, they are removed by make clean.
-
-# The go build tools handle dependency checks and incremental builds better than
-# CMake so just run them every time, they do nothing if nothing needs to be
-# done.
+# The go build tools handle dependency checks and incremental builds better than CMake so
+# just run them every time, they do nothing if nothing needs to be done.
+#
+# The Go sources are copied to the binary directory so we can respect the standard Go tree
+# layout without polluting the source tree.
 add_custom_target(go-build ALL
+  COMMAND ${CMAKE_COMMAND} -E copy_directory ${CMAKE_CURRENT_SOURCE_DIR}/src ${CMAKE_CURRENT_BINARY_DIR}/src
   COMMAND ${GO_INSTALL} qpid.apache.org/...
   DEPENDS qpid-proton-core
   WORKING_DIRECTORY $ENV{PWD})


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[6/6] qpid-proton git commit: PROTON-1403: Fixes for windows and platforms without libuv

Posted by ac...@apache.org.
PROTON-1403: Fixes for windows and platforms without libuv


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/42d5e89e
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/42d5e89e
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/42d5e89e

Branch: refs/heads/master
Commit: 42d5e89ef248e5dd85ca892e79696f845c34796f
Parents: 9abc67d
Author: Alan Conway <ac...@redhat.com>
Authored: Thu Feb 16 13:16:30 2017 -0500
Committer: Alan Conway <ac...@redhat.com>
Committed: Thu Feb 16 18:25:21 2017 -0500

----------------------------------------------------------------------
 examples/c/proactor/CMakeLists.txt | 5 +----
 proton-c/CMakeLists.txt            | 1 +
 proton-c/src/proactor/libuv.c      | 4 ++--
 proton-c/src/tests/CMakeLists.txt  | 4 +++-
 4 files changed, 7 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/42d5e89e/examples/c/proactor/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/examples/c/proactor/CMakeLists.txt b/examples/c/proactor/CMakeLists.txt
index 153f35f..bb6cb0f 100644
--- a/examples/c/proactor/CMakeLists.txt
+++ b/examples/c/proactor/CMakeLists.txt
@@ -23,11 +23,8 @@ include_directories(${CMAKE_CURRENT_SOURCE_DIR} ${Proton_INCLUDE_DIRS})
 
 # Check if the proton library has a proactor implementation.
 include(CheckFunctionExists)
-include(CMakePushCheckState)
-cmake_push_check_state()
-set(CMAKE_REQUIRED_LIBRARIES ${CMAKE_REQUIRED_LIBRARIES} ${Proton_LIBRARIES})
+set(CMAKE_REQUIRED_LIBRARIES ${Proton_LIBRARIES})
 check_function_exists(pn_proactor HAS_PROACTOR)
-cmake_pop_check_state()
 
 if(HAS_PROACTOR)
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/42d5e89e/proton-c/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/proton-c/CMakeLists.txt b/proton-c/CMakeLists.txt
index 0731b67..68ccc2c 100644
--- a/proton-c/CMakeLists.txt
+++ b/proton-c/CMakeLists.txt
@@ -618,6 +618,7 @@ configure_lib(PROTONLIB qpid-proton)
 configure_lib(PROTONCORELIB qpid-proton-core)
 
 if (qpid-proton-proactor)
+  set(HAS_PROACTOR 1)
   add_library (
     qpid-proton-proactor SHARED
     ${qpid-proton-proactor}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/42d5e89e/proton-c/src/proactor/libuv.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/libuv.c b/proton-c/src/proactor/libuv.c
index 4ec7b03..0a10ac9 100644
--- a/proton-c/src/proactor/libuv.c
+++ b/proton-c/src/proactor/libuv.c
@@ -19,8 +19,6 @@
  *
  */
 
-#include <uv.h>
-
 #include <proton/condition.h>
 #include <proton/connection_driver.h>
 #include <proton/engine.h>
@@ -31,6 +29,8 @@
 #include <proton/transport.h>
 #include <proton/url.h>
 
+#include <uv.h>
+
 /* All asserts are cheap and should remain in a release build for debugability */
 #undef NDEBUG
 #include <assert.h>

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/42d5e89e/proton-c/src/tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/CMakeLists.txt b/proton-c/src/tests/CMakeLists.txt
index 70b30a0..3ad114a 100644
--- a/proton-c/src/tests/CMakeLists.txt
+++ b/proton-c/src/tests/CMakeLists.txt
@@ -49,4 +49,6 @@ pn_add_c_test (c-reactor-tests reactor.c)
 pn_add_c_test (c-event-tests event.c)
 pn_add_c_test (c-data-tests data.c)
 pn_add_c_test (c-condition-tests condition.c)
-pn_add_c_test (c-proactor-tests proactor.c)
+if(HAS_PROACTOR)
+  pn_add_c_test (c-proactor-tests proactor.c)
+endif(HAS_PROACTOR)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[4/6] qpid-proton git commit: PROTON-1403: C proactor tests, fixes & additions

Posted by ac...@apache.org.
PROTON-1403: C proactor tests, fixes & additions

proactor API additions:
- PN_LISTENER_OPEN: event when listener is listening and connects will succeed.
- pn_proactor_grab(): non-blocking version of pn_proactor_wait(), used in tests

src/tests/test_tools.h - simple C test framework
src/tests/proactor.c - initial tests for basic proactor functionality
src/proactor/libuv.c
 - fixed some assertion bugs and memory leaks
 - renaming and simplifying the code
examples/broker.c: exit with non-0 if broker stops because of an error


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/b987a6a7
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/b987a6a7
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/b987a6a7

Branch: refs/heads/master
Commit: b987a6a70f30dc593bbea6c98ebae36ec77e4b7d
Parents: 9bd99eb
Author: Alan Conway <ac...@redhat.com>
Authored: Fri Jan 13 16:41:43 2017 -0500
Committer: Alan Conway <ac...@redhat.com>
Committed: Thu Feb 16 17:58:20 2017 -0500

----------------------------------------------------------------------
 CMakeLists.txt                                  |   2 +-
 examples/c/proactor/CMakeLists.txt              |   6 +-
 examples/c/proactor/broker.c                    |   6 +-
 examples/c/proactor/direct.c                    |   5 +-
 examples/c/proactor/send.c                      |   2 +-
 proton-c/CMakeLists.txt                         |   6 -
 .../cpp/include/proton/io/connection_driver.hpp |   1 +
 proton-c/include/proton/event.h                 |  17 +-
 proton-c/include/proton/listener.h              |   2 +-
 proton-c/include/proton/proactor.h              |  34 +-
 proton-c/src/core/connection_driver.c           |   4 +-
 proton-c/src/core/event.c                       |   2 +
 proton-c/src/proactor/libuv.c                   | 518 ++++++++++---------
 proton-c/src/tests/CMakeLists.txt               |   9 +-
 proton-c/src/tests/proactor.c                   | 217 ++++++++
 proton-c/src/tests/test_tools.h                 | 140 +++++
 16 files changed, 698 insertions(+), 273 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b987a6a7/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index b538ffd..294fd03 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -71,7 +71,7 @@ if (CMAKE_BUILD_TYPE MATCHES "Coverage")
   make_directory(coverage_results)
   add_custom_target(coverage
     WORKING_DIRECTORY ./coverage_results
-    COMMAND ${CMAKE_SOURCE_DIR}/bin/record-coverage.sh ${CMAKE_SOURCE_DIR} ${CMAKE_BINARY_DIR})
+    COMMAND ${CMAKE_SOURCE_DIR}/bin/record-coverage.sh ${CMAKE_SOURCE_DIR} ${CMAKE_BINARY_DIR})
 endif()
 
 if (${CMAKE_SYSTEM_NAME} MATCHES "Darwin")

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b987a6a7/examples/c/proactor/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/examples/c/proactor/CMakeLists.txt b/examples/c/proactor/CMakeLists.txt
index 4189cf5..153f35f 100644
--- a/examples/c/proactor/CMakeLists.txt
+++ b/examples/c/proactor/CMakeLists.txt
@@ -29,7 +29,7 @@ set(CMAKE_REQUIRED_LIBRARIES ${CMAKE_REQUIRED_LIBRARIES} ${Proton_LIBRARIES})
 check_function_exists(pn_proactor HAS_PROACTOR)
 cmake_pop_check_state()
 
-if (HAS_PROACTOR)
+if(HAS_PROACTOR)
 
 add_definitions(${COMPILE_WARNING_FLAGS} ${WERROR} ${COMPILE_PLATFORM_FLAGS} ${LINK_TIME_OPTIMIZATION})
 
@@ -48,6 +48,6 @@ foreach(name broker send receive direct)
 endforeach()
 
 set(run_env ${PYTHON_EXECUTABLE} ${CMAKE_SOURCE_DIR}/proton-c/env.py ${EXAMPLE_ENV} "PATH=${test_path}" ${VALGRIND_ENV})
-add_test(c-proactor ${run_env} -- ${PYTHON_EXECUTABLE} ${CMAKE_CURRENT_SOURCE_DIR}/test.py -v)
+add_test(c-example-proactor ${run_env} -- ${PYTHON_EXECUTABLE} ${CMAKE_CURRENT_SOURCE_DIR}/test.py -v)
 
-endif()
+endif(HAS_PROACTOR)

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b987a6a7/examples/c/proactor/broker.c
----------------------------------------------------------------------
diff --git a/examples/c/proactor/broker.c b/examples/c/proactor/broker.c
index ebf4068..2a338e1 100644
--- a/examples/c/proactor/broker.c
+++ b/examples/c/proactor/broker.c
@@ -22,6 +22,7 @@
 #include <proton/connection_driver.h>
 #include <proton/proactor.h>
 #include <proton/engine.h>
+#include <proton/listener.h>
 #include <proton/sasl.h>
 #include <proton/transport.h>
 #include <proton/url.h>
@@ -288,8 +289,11 @@ static void session_unsub(broker_t *b, pn_session_t *ssn) {
   }
 }
 
+static int exit_code = 0;
+
 static void check_condition(pn_event_t *e, pn_condition_t *cond) {
   if (pn_condition_is_set(cond)) {
+    exit_code = 1;
     const char *ename = e ? pn_event_type_name(pn_event_type(e)) : "UNKNOWN";
     fprintf(stderr, "%s: %s: %s\n", ename,
             pn_condition_get_name(cond), pn_condition_get_description(cond));
@@ -483,5 +487,5 @@ int main(int argc, char **argv) {
   }
   pn_proactor_free(b.proactor);
   free(threads);
-  return 0;
+  return exit_code;
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b987a6a7/examples/c/proactor/direct.c
----------------------------------------------------------------------
diff --git a/examples/c/proactor/direct.c b/examples/c/proactor/direct.c
index 26f1b33..3d0a7d1 100644
--- a/examples/c/proactor/direct.c
+++ b/examples/c/proactor/direct.c
@@ -22,9 +22,10 @@
 #include <proton/connection.h>
 #include <proton/connection_driver.h>
 #include <proton/delivery.h>
-#include <proton/proactor.h>
 #include <proton/link.h>
+#include <proton/listener.h>
 #include <proton/message.h>
+#include <proton/proactor.h>
 #include <proton/sasl.h>
 #include <proton/session.h>
 #include <proton/transport.h>
@@ -59,7 +60,7 @@ typedef struct app_data_t {
 
 static const int BATCH = 1000; /* Batch size for unlimited receive */
 
-int exit_code = 0;
+static int exit_code = 0;
 
 static void check_condition(pn_event_t *e, pn_condition_t *cond) {
   if (pn_condition_is_set(cond)) {

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b987a6a7/examples/c/proactor/send.c
----------------------------------------------------------------------
diff --git a/examples/c/proactor/send.c b/examples/c/proactor/send.c
index 48fcecd..bba5d3e 100644
--- a/examples/c/proactor/send.c
+++ b/examples/c/proactor/send.c
@@ -50,7 +50,7 @@ typedef struct app_data_t {
   bool finished;
 } app_data_t;
 
-int exit_code = 0;
+static int exit_code = 0;
 
 static void check_condition(pn_event_t *e, pn_condition_t *cond) {
   if (pn_condition_is_set(cond)) {

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b987a6a7/proton-c/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/proton-c/CMakeLists.txt b/proton-c/CMakeLists.txt
index 30a77e2..0731b67 100644
--- a/proton-c/CMakeLists.txt
+++ b/proton-c/CMakeLists.txt
@@ -278,12 +278,6 @@ if (CMAKE_C_COMPILER_ID MATCHES "Clang")
   if (ENABLE_WARNING_ERROR)
     set (COMPILE_WARNING_FLAGS "-Werror ${COMPILE_WARNING_FLAGS}")
   endif (ENABLE_WARNING_ERROR)
-endif()
-
-if (CMAKE_CXX_COMPILER_ID MATCHES "Clang")
-  if (ENABLE_WARNING_ERROR)
-    set (WERROR "-Werror")
-  endif (ENABLE_WARNING_ERROR)
   # TODO aconway 2016-01-06: we should be able to clean up the code and turn on
   # some of these warnings.
   set (CXX_WARNING_FLAGS "${COMPILE_WARNING_FLAGS} -Wno-c++98-compat -Wno-c++98-compat-pedantic -Wno-float-equal -Wno-padded -Wno-sign-conversion -Wno-switch-enum -Wno-weak-vtables -Wno-exit-time-destructors -Wno-global-constructors -Wno-shorten-64-to-32 -Wno-documentation -Wno-documentation-unknown-command -Wno-old-style-cast -Wno-missing-noreturn")

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b987a6a7/proton-c/bindings/cpp/include/proton/io/connection_driver.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/io/connection_driver.hpp b/proton-c/bindings/cpp/include/proton/io/connection_driver.hpp
index 4a0efe9..759b1fc 100644
--- a/proton-c/bindings/cpp/include/proton/io/connection_driver.hpp
+++ b/proton-c/bindings/cpp/include/proton/io/connection_driver.hpp
@@ -113,6 +113,7 @@ PN_CPP_CLASS_EXTERN connection_driver {
     ///
     PN_CPP_EXTERN connection_driver(proton::container&);
 #if PN_CPP_HAS_RVALUE_REFERENCES
+    /// @copydoc connection_driver()
     PN_CPP_EXTERN connection_driver(proton::container&, event_loop&& loop);
 #endif
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b987a6a7/proton-c/include/proton/event.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/event.h b/proton-c/include/proton/event.h
index 4a88368..3cfcc82 100644
--- a/proton-c/include/proton/event.h
+++ b/proton-c/include/proton/event.h
@@ -321,7 +321,8 @@ typedef enum {
   PN_CONNECTION_WAKE,
 
   /**
-   * Indicates the listener is ready to call pn_listener_accept() 
+   * Indicates the listener has an incoming connection, call pn_listener_accept()
+   * to accept it.
    * Events of this type point to the @ref pn_listener_t.
    */
   PN_LISTENER_ACCEPT,
@@ -350,7 +351,13 @@ typedef enum {
    *
    * Events of this type point to the @ref pn_proactor_t.
    */
-  PN_PROACTOR_INACTIVE
+  PN_PROACTOR_INACTIVE,
+
+  /**
+   * Indicates the listener is listening.
+   * Events of this type point to the @ref pn_listener_t.
+   */
+  PN_LISTENER_OPEN
 
 } pn_event_type_t;
 
@@ -537,9 +544,9 @@ PN_EXTERN pn_transport_t *pn_event_transport(pn_event_t *event);
 PN_EXTERN pn_record_t *pn_event_attachments(pn_event_t *event);
 
 /**
- * **Experimental** - A batch of events to handle. Call
- * pn_event_batch_next() in a loop until it returns NULL to handle
- * them.
+ * **Experimental** - A batch of events that must be handled in sequence.
+ * Call pn_event_batch_next() in a loop until it returns NULL to extract
+ * the events.
  */
 typedef struct pn_event_batch_t pn_event_batch_t;
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b987a6a7/proton-c/include/proton/listener.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/listener.h b/proton-c/include/proton/listener.h
index 18feca7..2038c06 100644
--- a/proton-c/include/proton/listener.h
+++ b/proton-c/include/proton/listener.h
@@ -20,7 +20,7 @@
  * under the License.
  */
 
-#include <proton/condition.h>
+#include <proton/import_export.h>
 #include <proton/types.h>
 
 #ifdef __cplusplus

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b987a6a7/proton-c/include/proton/proactor.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/proactor.h b/proton-c/include/proton/proactor.h
index 71a7dda..af3acbc 100644
--- a/proton-c/include/proton/proactor.h
+++ b/proton-c/include/proton/proactor.h
@@ -20,10 +20,9 @@
  * under the License.
  */
 
-#include <proton/types.h>
-#include <proton/import_export.h>
-#include <proton/listener.h>
 #include <proton/event.h>
+#include <proton/import_export.h>
+#include <proton/types.h>
 
 #ifdef __cplusplus
 extern "C" {
@@ -95,25 +94,34 @@ PNP_EXTERN int pn_proactor_listen(pn_proactor_t *proactor, pn_listener_t *listen
                        const char *host, const char *port, int backlog);
 
 /**
- * Wait for events to handle.
+ * Wait until there is at least one event to handle.
+ * Always returns a non-empty batch of events.
  *
- * Handle events in the returned batch by calling
- * pn_event_batch_next() until it returns NULL. You must call
- * pn_proactor_done() when you are finished with the batch.
+ * You must call pn_proactor_done() when you are finished with the batch, you
+ * must not use the batch pointer after calling pn_proactor_done().
  *
- * If you call pn_proactor_done() before finishing the batch, the
- * remaining events will be returned again by another call
- * pn_proactor_wait().  This is less efficient, but allows you to
- * handle part of a batch and then hand off the rest to another
- * thread.
+ * Normally it is most efficient to handle the entire batch in one thread, but
+ * you can call pn_proactor_done() on an unfinished batch. The remaining
+ * events will be returned by another call to pn_proactor_wait(), possibly in a
+ * different thread.
+ *
+ * @note You can generate events to force threads to wake up from
+ * pn_proactor_wait() using pn_proactor_interrupt(), pn_proactor_set_timeout()
+ * and pn_connection_wake()
  *
  * @note Thread-safe: can be called concurrently. Events in a single
  * batch must be handled in sequence, but batches returned by separate
- * calls to pn_proactor_wait() can be handled concurrently.
+ * calls can be handled concurrently.
  */
 PNP_EXTERN pn_event_batch_t *pn_proactor_wait(pn_proactor_t *proactor);
 
 /**
+ * Return a batch of events if one is available immediately, otherwise return NULL.  If it
+ * does return an event batch, the rules are the same as for pn_proactor_wait()
+ */
+PNP_EXTERN pn_event_batch_t *pn_proactor_grab(pn_proactor_t *proactor);
+
+/**
  * Call when done handling a batch of events.
  *
  * Must be called exactly once to match each call to

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b987a6a7/proton-c/src/core/connection_driver.c
----------------------------------------------------------------------
diff --git a/proton-c/src/core/connection_driver.c b/proton-c/src/core/connection_driver.c
index 3393e64..0d1db21 100644
--- a/proton-c/src/core/connection_driver.c
+++ b/proton-c/src/core/connection_driver.c
@@ -127,7 +127,9 @@ pn_event_t* pn_connection_driver_next_event(pn_connection_driver_t *d) {
 }
 
 bool pn_connection_driver_has_event(pn_connection_driver_t *d) {
-  return pn_collector_peek(pn_connection_collector(d->connection));
+  /* FIXME aconway 2017-02-15: this is ugly */
+  pn_collector_t *c = pn_connection_collector(d->connection);
+  return pn_collector_more(c) || (pn_collector_peek(c) && pn_collector_peek(c) != pn_collector_prev(c));
 }
 
 bool pn_connection_driver_finished(pn_connection_driver_t *d) {

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b987a6a7/proton-c/src/core/event.c
----------------------------------------------------------------------
diff --git a/proton-c/src/core/event.c b/proton-c/src/core/event.c
index 41ff6d1..e213c8f 100644
--- a/proton-c/src/core/event.c
+++ b/proton-c/src/core/event.c
@@ -395,6 +395,8 @@ const char *pn_event_type_name(pn_event_type_t type)
     return "PN_PROACTOR_TIMEOUT";
    case PN_PROACTOR_INACTIVE:
     return "PN_PROACTOR_INACTIVE";
+   case PN_LISTENER_OPEN:
+    return "PN_LISTENER_OPEN";
    default:
     return "PN_UNKNOWN";
   }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b987a6a7/proton-c/src/proactor/libuv.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/libuv.c b/proton-c/src/proactor/libuv.c
index d17136a..0ccfcda 100644
--- a/proton-c/src/proactor/libuv.c
+++ b/proton-c/src/proactor/libuv.c
@@ -24,6 +24,7 @@
 #include <proton/condition.h>
 #include <proton/connection_driver.h>
 #include <proton/engine.h>
+#include <proton/listener.h>
 #include <proton/message.h>
 #include <proton/object.h>
 #include <proton/proactor.h>
@@ -40,37 +41,34 @@
 #include <string.h>
 
 /*
-  libuv loop functions are thread unsafe. The only exception is uv_async_send()
-  which is a thread safe "wakeup" that can wake the uv_loop from another thread.
+  libuv functions are thread unsafe. The exception is uv_async_send(), a thread safe
+  "wakeup" that can wake the uv_loop from another thread.
 
-  To provide concurrency the proactor uses a "leader-worker-follower" model,
-  threads take turns at the roles:
+  To provide concurrency proactor uses a "leader-worker-follower" model, threads take
+  turns at the roles:
 
-  - a single "leader" calls libuv functions and runs the uv_loop in short bursts
-    to generate work. When there is work available it gives up leadership and
-    becomes a "worker"
+  - a single "leader" thread uses libuv, it runs the uv_loop in short bursts to
+  generate work. Once there is work it becomes a "worker" thread, another thread
+  takes over as leader.
 
-  - "workers" handle events concurrently for distinct connections/listeners
-    They do as much work as they can get, when none is left they become "followers"
+  - "workers" handle events for separate connections or listeners concurrently. They do as
+  much work as they can, when none is left they become "followers"
 
-  - "followers" wait for the leader to generate work and become workers.
-    When the leader itself becomes a worker, one of the followers takes over.
+  - "followers" wait for the leader to generate work. One follower becomes the new leader,
+  the others become workers or continue to follow till they can get work.
 
-  This model is symmetric: any thread can take on any role based on run-time
-  requirements. It also allows the IO and non-IO work associated with an IO
-  wake-up to be processed in a single thread with no context switches.
+  Any thread in a pool can take on any role necessary at run-time. All the work generated
+  by an IO wake-up for a single connection can be processed in a single worker
+  thread to minimize context switching.
 
   Function naming:
-  - on_* - called in leader thread by uv_run().
+  - on_* - called in leader thread via  uv_run().
   - leader_* - called in leader thread (either leader_q processing or from an on_ function)
-  - worker_* - called in worker thread
   - *_lh - called with the relevant lock held
 
   LIFECYCLE: pconnection_t and pn_listener_t objects must not be deleted until all their
-  UV handles have received an on_close(). Freeing resources is always initiated by
-  uv_close() of the uv_tcp_t handle, and completed in on_close() handler functions when it
-  is safe. The only exception is when an error occurs that prevents a pn_connection_t or
-  pn_listener_t from being associated with a uv handle at all.
+  UV handles have received a close callback. Freeing resources is initiated by uv_close()
+  of the uv_tcp_t handle, and executed in an on_close() handler when it is safe.
 */
 
 const char *COND_NAME = "proactor";
@@ -82,12 +80,12 @@ const char *AMQPS_PORT_NAME = "amqps";
 PN_HANDLE(PN_PROACTOR)
 
 /* pn_proactor_t and pn_listener_t are plain C structs with normal memory management.
-   Class definitions are for identification as pn_event_t context only.
+   CLASSDEF is for identification when used as a pn_event_t context.
 */
 PN_STRUCT_CLASSDEF(pn_proactor, CID_pn_proactor)
 PN_STRUCT_CLASSDEF(pn_listener, CID_pn_listener)
 
-/* A psocket (connection or listener) has the following *mutually exclusive* states. */
+/* A psocket (connection or listener) has the following mutually exclusive states. */
 typedef enum {
   ON_WORKER,               /* On worker_q or in use by user code in worker thread  */
   ON_LEADER,               /* On leader_q or in use the leader loop  */
@@ -105,12 +103,11 @@ typedef struct psocket_t {
   void (*action)(struct psocket_t*); /* deferred action for leader */
   void (*wakeup)(struct psocket_t*); /* wakeup action for leader */
 
-  /* Only used by leader when it owns the psocket */
+  /* Only used by leader thread when it owns the psocket */
   uv_tcp_t tcp;
   char host[NI_MAXHOST];
   char port[NI_MAXSERV];
   bool is_conn;
-
 } psocket_t;
 
 /* Special value for psocket.next pointer when socket is not on any any list. */
@@ -138,6 +135,7 @@ static inline const char* fixstr(const char* str) {
   return str[0] == '\001' ? NULL : str;
 }
 
+/* Holds a psocket and a pn_connection_driver  */
 typedef struct pconnection_t {
   psocket_t psocket;
 
@@ -150,10 +148,11 @@ typedef struct pconnection_t {
   uv_write_t write;
   uv_shutdown_t shutdown;
   size_t writing;               /* size of pending write request, 0 if none pending */
-  bool reading;                 /* true if a read request is pending */
-  bool server;                  /* accept, not connect */
+  bool server;                  /* accepting not connecting */
 } pconnection_t;
 
+
+/* pn_listener_t with a psocket_t  */
 struct pn_listener_t {
   psocket_t psocket;
 
@@ -169,6 +168,8 @@ struct pn_listener_t {
 
   /* Only used in leader thread */
   size_t connections;           /* number of connections waiting to be accepted  */
+  int err;                      /* uv error code, 0 = OK, UV_EOF = closed */
+  const char *what;             /* static description string */
 };
 
 typedef struct queue { psocket_t *front, *back; } queue;
@@ -198,9 +199,9 @@ struct pn_proactor_t {
   bool batch_working;          /* batch is being processed in a worker thread */
 };
 
-static bool push_lh(queue *q, psocket_t *ps) {
-  if (ps->next != &UNLISTED)  /* Don't move if already listed. */
-    return false;
+/* Push ps to back of q. Must not be on a different queue */
+static void push_lh(queue *q, psocket_t *ps) {
+  assert(ps->next == &UNLISTED);
   ps->next = NULL;
   if (!q->front) {
     q->front = q->back = ps;
@@ -208,9 +209,9 @@ static bool push_lh(queue *q, psocket_t *ps) {
     q->back->next = ps;
     q->back =  ps;
   }
-  return true;
 }
 
+/* Pop returns front of q or NULL if empty */
 static psocket_t* pop_lh(queue *q) {
   psocket_t *ps = q->front;
   if (ps) {
@@ -220,29 +221,49 @@ static psocket_t* pop_lh(queue *q) {
   return ps;
 }
 
-/* Set state and action and push to relevant queue */
-static inline void set_state_lh(psocket_t *ps, psocket_state_t state, void (*action)(psocket_t*)) {
-  /* Illegal if ps is already listed under a different state */
-  assert(ps->next == &UNLISTED || ps->state == state);
-  ps->state = state;
-  if (action && !ps->action) {
-    ps->action = action;
+/* Queue an action for the leader thread */
+static void to_leader(psocket_t *ps, void (*action)(psocket_t*)) {
+  uv_mutex_lock(&ps->proactor->lock);
+  ps->action = action;
+  if (ps->next == &UNLISTED) {
+    ps->state = ON_LEADER;
+    push_lh(&ps->proactor->leader_q, ps);
+  }
+  uv_mutex_unlock(&ps->proactor->lock);
+  uv_async_send(&ps->proactor->async); /* Wake leader */
+}
+
+/* Push to the worker thread */
+static void to_worker(psocket_t *ps) {
+  uv_mutex_lock(&ps->proactor->lock);
+  /* If already ON_WORKER do nothing */
+  if (ps->next == &UNLISTED && ps->state != ON_WORKER) {
+    ps->state = ON_WORKER;
+    push_lh(&ps->proactor->worker_q, ps);
   }
-  switch(state) {
-   case ON_LEADER: push_lh(&ps->proactor->leader_q, ps); break;
-   case ON_WORKER: push_lh(&ps->proactor->worker_q, ps); break;
-   case ON_UV:
-    assert(ps->next == &UNLISTED);
-    break;           /* No queue for UV loop */
+  uv_mutex_unlock(&ps->proactor->lock);
+}
+
+/* Set state to ON_UV */
+static void to_uv(psocket_t *ps) {
+  uv_mutex_lock(&ps->proactor->lock);
+  if (ps->next == &UNLISTED) {
+    ps->state = ON_UV;
   }
+  uv_mutex_unlock(&ps->proactor->lock);
 }
 
-/* Set state and action, push to queue and notify leader. Thread safe. */
-static void set_state(psocket_t *ps, psocket_state_t state, void (*action)(psocket_t*)) {
+/* Called in any thread to set a wakeup action */
+static void wakeup(psocket_t *ps, void (*action)(psocket_t*)) {
   uv_mutex_lock(&ps->proactor->lock);
-  set_state_lh(ps, state, action);
-  uv_async_send(&ps->proactor->async);
+  ps->wakeup = action;
+  /* If ON_WORKER we'll do the wakeup in pn_proactor_done() */
+  if (ps->next == &UNLISTED && ps->state != ON_WORKER) {
+    push_lh(&ps->proactor->leader_q, ps);
+    ps->state = ON_LEADER;      /* Otherwise notify the leader */
+  }
   uv_mutex_unlock(&ps->proactor->lock);
+  uv_async_send(&ps->proactor->async); /* Wake leader */
 }
 
 static inline pconnection_t *as_pconnection(psocket_t* ps) {
@@ -308,7 +329,6 @@ static void on_close_pconnection_final(uv_handle_t *h) {
 
 /* Close event for uv_tcp_t of a psocket_t */
 static void on_close_psocket(uv_handle_t *h) {
-  /* No assert(ps->state == ON_UV); may be called in other states during shutdown. */
   psocket_t *ps = (psocket_t*)h->data;
   if (ps->is_conn) {
     leader_count(ps->proactor, -1);
@@ -329,29 +349,43 @@ static pconnection_t *get_pconnection(pn_connection_t* c) {
   return (pconnection_t*) pn_record_get(r, PN_PROACTOR);
 }
 
-static void leader_unwatch(psocket_t *ps);
+static void pconnection_to_worker(pconnection_t *pc);
+static void listener_to_worker(pn_listener_t *l);
 
-static void leader_error(psocket_t *ps, int err, const char* what) {
-  assert(ps->state != ON_WORKER);
-  if (ps->is_conn) {
-    pn_connection_driver_t *driver = &as_pconnection(ps)->driver;
+int pconnection_error(pconnection_t *pc, int err, const char* what) {
+  if (err) {
+    pn_connection_driver_t *driver = &pc->driver;
     pn_connection_driver_bind(driver); /* Bind so errors will be reported */
     pn_connection_driver_errorf(driver, COND_NAME, "%s %s:%s: %s",
-                                what, fixstr(ps->host), fixstr(ps->port),
+                                what, fixstr(pc->psocket.host), fixstr(pc->psocket.port),
                                 uv_strerror(err));
     pn_connection_driver_close(driver);
-  } else {
-    pn_listener_t *l = as_listener(ps);
-    pn_condition_format(l->condition, COND_NAME, "%s %s:%s: %s",
-                        what, fixstr(ps->host), fixstr(ps->port),
-                        uv_strerror(err));
-    pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_CLOSE);
-    l->closing = true;
+    pconnection_to_worker(pc);
+  }
+  return err;
+}
+
+static int listener_error(pn_listener_t *l, int err, const char* what) {
+  if (err) {
+    l->err = err;
+    l->what = what;
+    listener_to_worker(l);
   }
-  leader_unwatch(ps);               /* Worker to handle the error */
+  return err;
 }
 
-/* uv-initialization */
+static int psocket_error(psocket_t *ps, int err, const char* what) {
+  if (err) {
+    if (ps->is_conn) {
+      pconnection_error(as_pconnection(ps), err, "initialization");
+    } else {
+      listener_error(as_listener(ps), err, "initialization");
+    }
+  }
+  return err;
+}
+
+/* psocket uv-initialization */
 static int leader_init(psocket_t *ps) {
   ps->state = ON_LEADER;
   leader_count(ps->proactor, +1);
@@ -365,9 +399,8 @@ static int leader_init(psocket_t *ps) {
         pc->timer.data = ps;
       }
     }
-  }
-  if (err) {
-    leader_error(ps, err, "initialization");
+  } else {
+    psocket_error(ps, err, "initialization");
   }
   return err;
 }
@@ -375,33 +408,27 @@ static int leader_init(psocket_t *ps) {
 /* Outgoing connection */
 static void on_connect(uv_connect_t *connect, int err) {
   pconnection_t *pc = (pconnection_t*)connect->data;
-  assert(pc->psocket.state == ON_UV);
   if (!err) {
-    leader_unwatch(&pc->psocket);
+    pconnection_to_worker(pc);
   } else {
-    leader_error(&pc->psocket, err, "on connect to");
+    pconnection_error(pc, err, "on connect to");
   }
 }
 
 /* Incoming connection ready to be accepted */
 static void on_connection(uv_stream_t* server, int err) {
-  /* Unlike most on_* functions, this one can be called by the leader thrad when the
+  /* Unlike most on_* functions, this one can be called by the leader thread when the
    * listener is ON_WORKER, because there's no way to stop libuv from calling
-   * on_connection() in leader_unwatch().  Just increase a counter and deal with it in the
-   * worker thread.
+   * on_connection().  Just increase a counter and generate events in to_worker.
    */
   pn_listener_t *l = (pn_listener_t*) server->data;
-  assert(l->psocket.state == ON_UV);
-  if (!err) {
-    ++l->connections;
-    leader_unwatch(&l->psocket);
-  } else {
-    leader_error(&l->psocket, err, "on connection from");
-  }
+  l->err = err;
+  if (!err) ++l->connections;
+  listener_to_worker(l);        /* If already ON_WORKER it will stay there */
 }
 
+/* FIXME aconway 2017-02-16:  listener events in unwatch*/
 static void leader_accept(pn_listener_t * l) {
-  assert(l->psocket.state == ON_UV);
   assert(l->accepting);
   pconnection_t *pc = l->accepting;
   l->accepting = NULL;
@@ -410,10 +437,10 @@ static void leader_accept(pn_listener_t * l) {
     err = uv_accept((uv_stream_t*)&l->psocket.tcp, (uv_stream_t*)&pc->psocket.tcp);
   }
   if (!err) {
-    leader_unwatch(&pc->psocket);
+    pconnection_to_worker(pc);
   } else {
-    leader_error(&pc->psocket, err, "accepting from");
-    leader_error(&l->psocket, err, "accepting from");
+    pconnection_error(pc, err, "accepting from");
+    listener_error(l, err, "accepting from");
   }
 }
 
@@ -440,7 +467,7 @@ static void leader_connect(psocket_t *ps) {
   if (!err) {
     ps->state = ON_UV;
   } else {
-    leader_error(ps, err, "connecting to");
+    psocket_error(ps, err, "connecting to");
   }
 }
 
@@ -457,32 +484,26 @@ static void leader_listen(psocket_t *ps) {
     err = uv_listen((uv_stream_t*)&l->psocket.tcp, l->backlog, on_connection);
   }
   if (!err) {
-    set_state(ps, ON_UV, NULL);
+    pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_OPEN);
+    listener_to_worker(l);      /* Let worker see the OPEN event */
   } else {
-    leader_error(ps, err, "listening on");
+    listener_error(l, err, "listening on");
   }
 }
 
 /* Generate tick events and return millis till next tick or 0 if no tick is required */
 static pn_millis_t leader_tick(pconnection_t *pc) {
   assert(pc->psocket.state != ON_WORKER);
-  pn_transport_t *t = pc->driver.transport;
-  if (pn_transport_get_idle_timeout(t) || pn_transport_get_remote_idle_timeout(t)) {
-    uint64_t now = uv_now(pc->timer.loop);
-    uint64_t next = pn_transport_tick(t, now);
-    return next ? next - now : 0;
-  }
-  return 0;
+  uint64_t now = uv_now(pc->timer.loop);
+  uint64_t next = pn_transport_tick(pc->driver.transport, now);
+  return next ? next - now : 0;
 }
 
 static void on_tick(uv_timer_t *timer) {
-  if (!timer->data) return;     /* timer closed */
   pconnection_t *pc = (pconnection_t*)timer->data;
-  assert(pc->psocket.state == ON_UV);
-  uv_timer_stop(&pc->timer);
-  pn_millis_t next = leader_tick(pc);
+  pn_millis_t next = leader_tick(pc); /* May generate events */
   if (pn_connection_driver_has_event(&pc->driver)) {
-    leader_unwatch(&pc->psocket);
+    pconnection_to_worker(pc);
   } else if (next) {
     uv_timer_start(&pc->timer, on_tick, next, 0);
   }
@@ -490,31 +511,28 @@ static void on_tick(uv_timer_t *timer) {
 
 static void on_read(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) {
   pconnection_t *pc = (pconnection_t*)stream->data;
-  assert(pc->psocket.state == ON_UV);
   if (nread >= 0) {
     pn_connection_driver_read_done(&pc->driver, nread);
-    leader_unwatch(&pc->psocket); /* Handle events */
+    pconnection_to_worker(pc);
   } else if (nread == UV_EOF) { /* hangup */
     pn_connection_driver_read_close(&pc->driver);
-    leader_unwatch(&pc->psocket);
+    pconnection_to_worker(pc);
   } else {
-    leader_error(&pc->psocket, nread, "on read from");
+    pconnection_error(pc, nread, "on read from");
   }
 }
 
 static void on_write(uv_write_t* write, int err) {
   pconnection_t *pc = (pconnection_t*)write->data;
-  assert(pc->psocket.state == ON_UV);
-  size_t writing = pc->writing;
-  pc->writing = 0;              /* This write is done regardless of outcome */
   if (err == 0) {
-    pn_connection_driver_write_done(&pc->driver, writing);
-    leader_unwatch(&pc->psocket);
+    pn_connection_driver_write_done(&pc->driver, pc->writing);
+    pconnection_to_worker(pc);
   } else if (err == UV_ECANCELED) {
-    leader_unwatch(&pc->psocket);    /* cancelled by leader_unwatch, complete the job */
+    pconnection_to_worker(pc);
   } else {
-    leader_error(&pc->psocket, err, "on write to");
+    pconnection_error(pc, err, "on write to");
   }
+  pc->writing = 0;
 }
 
 static void on_timeout(uv_timer_t *timer) {
@@ -527,88 +545,111 @@ static void on_timeout(uv_timer_t *timer) {
 // Read buffer allocation function for uv, just returns the transports read buffer.
 static void alloc_read_buffer(uv_handle_t* stream, size_t size, uv_buf_t* buf) {
   pconnection_t *pc = (pconnection_t*)stream->data;
-  assert(pc->psocket.state == ON_UV);
   pn_rwbytes_t rbuf = pn_connection_driver_read_buffer(&pc->driver);
   *buf = uv_buf_init(rbuf.start, rbuf.size);
 }
 
-/* Monitor a socket in the UV loop */
-static void leader_watch(psocket_t *ps) {
-  assert(ps->state != ON_WORKER);
-  int err = 0;
-  set_state(ps, ON_UV, NULL); /* Assume we are going to UV loop unless sent to worker or leader. */
-
-  if (ps->is_conn) {
-    pconnection_t *pc = as_pconnection(ps);
-    if (pn_connection_driver_finished(&pc->driver)) {
-      uv_close((uv_handle_t*)&ps->tcp, on_close_psocket);
-      return;
+static void pconnection_to_uv(pconnection_t *pc) { /* Close pc if its driver is finished, else arm timer/write/read */
+  to_uv(&pc->psocket);          /* Assume we're going to UV unless sent elsewhere */
+  if (pn_connection_driver_finished(&pc->driver)) {
+    if (!uv_is_closing((uv_handle_t*)&pc->psocket.tcp)) { /* test the same tcp handle that is closed below */
+      uv_close((uv_handle_t*)&pc->psocket.tcp, on_close_psocket);
     }
-    pn_millis_t next_tick = leader_tick(pc);
-    pn_rwbytes_t rbuf = pn_connection_driver_read_buffer(&pc->driver);
-    pn_bytes_t wbuf = pn_connection_driver_write_buffer(&pc->driver);
-    if (pn_connection_driver_has_event(&pc->driver)) {
-      /* Ticks and checking buffers have generated events, send back to worker to process */
-      set_state(ps, ON_WORKER, NULL);
+    return;
+  }
+  pn_millis_t next_tick = leader_tick(pc);
+  pn_rwbytes_t rbuf = pn_connection_driver_read_buffer(&pc->driver);
+  pn_bytes_t wbuf = pn_connection_driver_write_buffer(&pc->driver);
+  if (pn_connection_driver_has_event(&pc->driver)) {
+    to_worker(&pc->psocket);  /* Ticks/buffer checks generated events */
+    return;
+  }
+  if (next_tick &&
+      pconnection_error(pc, uv_timer_start(&pc->timer, on_tick, next_tick, 0), "timer start")) {
+    return;
+  }
+  if (wbuf.size > 0) {
+    uv_buf_t buf = uv_buf_init((char*)wbuf.start, wbuf.size);
+    if (pconnection_error(
+          pc, uv_write(&pc->write, (uv_stream_t*)&pc->psocket.tcp, &buf, 1, on_write), "write"))
       return;
+    pc->writing = wbuf.size;    /* remembered so on_write can report the byte count */
+  } else if (pn_connection_driver_write_closed(&pc->driver)) {
+    pc->shutdown.data = &pc->psocket;
+    if (pconnection_error(
+          pc, uv_shutdown(&pc->shutdown, (uv_stream_t*)&pc->psocket.tcp, NULL), "shutdown write"))
+      return;
+  }
+  if (rbuf.size > 0) {          /* only read when the driver has buffer space */
+    if (pconnection_error(
+          pc, uv_read_start((uv_stream_t*)&pc->psocket.tcp, alloc_read_buffer, on_read), "read"))
+        return;
+  }
+}
+
+static void listener_to_uv(pn_listener_t *l) {
+  to_uv(&l->psocket);           /* Assume we're going to UV unless sent elsewhere */
+  if (l->err) {
+    if (!uv_is_closing((uv_handle_t*)&l->psocket.tcp)) {
+      uv_close((uv_handle_t*)&l->psocket.tcp, on_close_psocket);
     }
-    if (next_tick) {
-      uv_timer_start(&pc->timer, on_tick, next_tick, 0);
-    }
-    if (wbuf.size > 0 && !pc->writing) {
-      pc->writing = wbuf.size;
-      uv_buf_t buf = uv_buf_init((char*)wbuf.start, wbuf.size);
-      err = uv_write(&pc->write, (uv_stream_t*)&pc->psocket.tcp, &buf, 1, on_write);
-    } else if (wbuf.size == 0 && pn_connection_driver_write_closed(&pc->driver)) {
-      pc->shutdown.data = ps;
-      err = uv_shutdown(&pc->shutdown, (uv_stream_t*)&pc->psocket.tcp, NULL);
+  } else {
+    if (l->accepting) {
+      leader_accept(l);
     }
-    if (rbuf.size > 0 && !pc->reading) {
-      pc->reading = true;
-      err = uv_read_start((uv_stream_t*)&pc->psocket.tcp, alloc_read_buffer, on_read);
+    if (l->connections) {
+      listener_to_worker(l);
     }
+  }
+}
+
+/* Monitor a psocket_t in the UV loop */
+static void psocket_to_uv(psocket_t *ps) {
+  if (ps->is_conn) {
+    pconnection_to_uv(as_pconnection(ps));
   } else {
-    pn_listener_t *l = as_listener(ps);
-    if (l->closing && pn_collector_peek(l->collector)) {
-      uv_close((uv_handle_t*)&ps->tcp, on_close_psocket);
-      return;
-    } else {
-      if (l->accepting) {
-        leader_accept(l);
-      }
-      if (l->connections) {
-        leader_unwatch(ps);
-      }
-    }
+    listener_to_uv(as_listener(ps));
   }
-  if (err) {
-    leader_error(ps, err, "re-watching");
+}
+
+/* Put a connection on the worker queue, detaching it from IO first when that is possible */
+static void pconnection_to_worker(pconnection_t *pc) {
+  /* Reading and the tick timer are only stopped if no write is outstanding and the batch is non-empty */
+  if (!pc->writing && pn_connection_driver_has_event(&pc->driver)) {
+    uv_read_stop((uv_stream_t*)&pc->psocket.tcp);
+    uv_timer_stop(&pc->timer);
+  }
+  to_worker(&pc->psocket);      /* NOTE(review): runs even when the guard above fails - confirm intended */
+}
 
-/* Detach a socket from IO and put it on the worker queue */
-static void leader_unwatch(psocket_t *ps) {
-  assert(ps->state != ON_WORKER); /* From ON_UV or ON_LEADER */
-  if (ps->is_conn) {
-    pconnection_t *pc = as_pconnection(ps);
-    if (!pn_connection_driver_has_event(&pc->driver)) {
-      /* Don't return an empty event batch, re-attach to UV loop */
-      leader_watch(ps);
-      return;
-    } else {
-      if (pc->writing) {
-        uv_cancel((uv_req_t*)&pc->write);
-      }
-      if (pc->reading) {
-        pc->reading = false;
-        uv_read_stop((uv_stream_t*)&pc->psocket.tcp);
-      }
-      if (pc->timer.data && !uv_is_closing((uv_handle_t*)&pc->timer)) {
-        uv_timer_stop(&pc->timer);
-      }
+/* True if pn_collector_more(c), or peek is non-NULL and differs from prev. TODO aconway 2017-02-16: simplify collector API*/
+static bool collector_has_next(pn_collector_t *c) {
+  return pn_collector_more(c) ||
+    (pn_collector_peek(c) && pn_collector_peek(c) != pn_collector_prev(c));
+}
+
+/* Can't really detach a listener, as on_connection can always be called.
+   Generate events here safely.
+*/
+static void listener_to_worker(pn_listener_t *l) {
+  if (collector_has_next(l->collector)) { /* Already have events */
+    to_worker(&l->psocket);
+  } else if (l->err) {
+    if (l->err != UV_EOF) {
+      pn_condition_format(l->condition, COND_NAME, "%s %s:%s: %s",
+                          l->what, fixstr(l->psocket.host), fixstr(l->psocket.port),
+                          uv_strerror(l->err));
     }
+    l->err = 0;
+    pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_CLOSE);
+    to_worker(&l->psocket);
+  } else if (l->connections) {    /* Generate accept events one at a time */
+    --l->connections;
+    pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_ACCEPT);
+    to_worker(&l->psocket);
+  } else {
+    listener_to_uv(l);
   }
-  set_state(ps, ON_WORKER, NULL);
 }
 
 /* Set the event in the proactor's batch  */
@@ -618,7 +659,7 @@ static pn_event_batch_t *proactor_batch_lh(pn_proactor_t *p, pn_event_type_t t)
   return &p->batch;
 }
 
-/* Return the next event batch or 0 if no events are ready */
+/* Return the next event batch or 0 if no events are available in the worker_q */
 static pn_event_batch_t* get_batch_lh(pn_proactor_t *p) {
   if (!p->batch_working) {       /* Can generate proactor events */
     if (p->inactive) {
@@ -637,33 +678,14 @@ static pn_event_batch_t* get_batch_lh(pn_proactor_t *p) {
   for (psocket_t *ps = pop_lh(&p->worker_q); ps; ps = pop_lh(&p->worker_q)) {
     assert(ps->state == ON_WORKER);
     if (ps->is_conn) {
-      pconnection_t *pc = as_pconnection(ps);
-      return &pc->driver.batch;
+      return &as_pconnection(ps)->driver.batch;
     } else {                    /* Listener */
-      pn_listener_t *l = as_listener(ps);
-      /* Generate accept events one at a time */
-      if (l->connections && !pn_collector_peek(l->collector)) {
-        --l->connections;
-        pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_ACCEPT);
-      }
-      return &l->batch;
+      return &as_listener(ps)->batch;
     }
-    set_state_lh(ps, ON_LEADER, NULL); /* No event, back to leader */
   }
   return 0;
 }
 
-/* Called in any thread to set a wakeup action */
-static void wakeup(psocket_t *ps, void (*action)(psocket_t*)) {
-  uv_mutex_lock(&ps->proactor->lock);
-  if (action && !ps->wakeup) {
-    ps->wakeup = action;
-  }
-  set_state_lh(ps, ON_LEADER, NULL);
-  uv_async_send(&ps->proactor->async); /* Wake leader */
-  uv_mutex_unlock(&ps->proactor->lock);
-}
-
 pn_listener_t *pn_event_listener(pn_event_t *e) {
   return (pn_event_class(e) == pn_listener__class()) ? (pn_listener_t*)pn_event_context(e) : NULL;
 }
@@ -689,26 +711,52 @@ void pn_proactor_done(pn_proactor_t *p, pn_event_batch_t *batch) {
     assert(pc->psocket.state == ON_WORKER);
     if (pn_connection_driver_has_event(&pc->driver)) {
       /* Process all events before going back to leader */
-      set_state(&pc->psocket, ON_WORKER, NULL);
+      pconnection_to_worker(pc);
     } else {
-      set_state(&pc->psocket, ON_LEADER, leader_watch);
+      to_leader(&pc->psocket, psocket_to_uv);
     }
     return;
   }
   pn_listener_t *l = batch_listener(batch);
   if (l) {
     assert(l->psocket.state == ON_WORKER);
-    set_state(&l->psocket, ON_LEADER, leader_watch);
+    to_leader(&l->psocket, psocket_to_uv);
     return;
   }
   pn_proactor_t *bp = batch_proactor(batch);
   if (bp == p) {
     uv_mutex_lock(&p->lock);
     p->batch_working = false;
-    uv_async_send(&p->async); /* Wake leader */
     uv_mutex_unlock(&p->lock);
     return;
   }
+  uv_async_send(&p->async); /* Wake leader */
+}
+
+/* Process the leader_q, in the leader thread */
+static void leader_process_lh(pn_proactor_t *p) {
+  if (p->timeout_request) {
+    p->timeout_request = false;
+    if (p->timeout) {
+      uv_timer_start(&p->timer, on_timeout, p->timeout, 0);
+    } else {
+      uv_timer_stop(&p->timer);
+    }
+  }
+  for (psocket_t *ps = pop_lh(&p->leader_q); ps; ps = pop_lh(&p->leader_q)) {
+    assert(ps->state == ON_LEADER);
+    if (ps->wakeup) {
+      uv_mutex_unlock(&p->lock);
+      ps->wakeup(ps);
+      ps->wakeup = NULL;
+      uv_mutex_lock(&p->lock);
+    } else if (ps->action) {
+      uv_mutex_unlock(&p->lock);
+      ps->action(ps);
+      ps->action = NULL;
+      uv_mutex_lock(&p->lock);
+    }
+  }
 }
 
 /* Run follower/leader loop till we can return an event and be a worker */
@@ -724,28 +772,7 @@ pn_event_batch_t *pn_proactor_wait(struct pn_proactor_t* p) {
     /* Lead till there is work to do. */
     p->has_leader = true;
     while (batch == NULL) {
-      if (p->timeout_request) {
-        p->timeout_request = false;
-        if (p->timeout) {
-          uv_timer_start(&p->timer, on_timeout, p->timeout, 0);
-        } else {
-          uv_timer_stop(&p->timer);
-        }
-      }
-      for (psocket_t *ps = pop_lh(&p->leader_q); ps; ps = pop_lh(&p->leader_q)) {
-        assert(ps->state == ON_LEADER);
-        if (ps->wakeup) {
-          uv_mutex_unlock(&p->lock);
-          ps->wakeup(ps);
-          ps->wakeup = NULL;
-          uv_mutex_lock(&p->lock);
-        } else if (ps->action) {
-          uv_mutex_unlock(&p->lock);
-          ps->action(ps);
-          ps->action = NULL;
-          uv_mutex_lock(&p->lock);
-        }
-      }
+      leader_process_lh(p);
       batch = get_batch_lh(p);
       if (batch == NULL) {
         uv_mutex_unlock(&p->lock);
@@ -753,7 +780,7 @@ pn_event_batch_t *pn_proactor_wait(struct pn_proactor_t* p) {
         uv_mutex_lock(&p->lock);
       }
     }
-    /* Signal the next leader and return to work */
+    /* Signal the next leader and go to work */
     p->has_leader = false;
     uv_cond_signal(&p->cond);
   }
@@ -761,19 +788,36 @@ pn_event_batch_t *pn_proactor_wait(struct pn_proactor_t* p) {
   return batch;
 }
 
+pn_event_batch_t *pn_proactor_grab(struct pn_proactor_t* p) { /* Return a ready batch or NULL, without waiting for events */
+  uv_mutex_lock(&p->lock);
+  pn_event_batch_t *batch = get_batch_lh(p);
+  if (batch == NULL && !p->has_leader) {
+    /* If there is no leader, try a non-waiting lead to generate some work */
+    p->has_leader = true;
+    leader_process_lh(p);
+    uv_mutex_unlock(&p->lock);  /* the lock is not held while the UV loop runs */
+    uv_run(&p->loop, UV_RUN_NOWAIT);
+    uv_mutex_lock(&p->lock);
+    batch = get_batch_lh(p);
+    p->has_leader = false;      /* NOTE(review): pn_proactor_wait also signals p->cond here - confirm not needed */
+  }
+  uv_mutex_unlock(&p->lock);
+  return batch;
+}
+
 void pn_proactor_interrupt(pn_proactor_t *p) {
   uv_mutex_lock(&p->lock);
   ++p->interrupt;
-  uv_async_send(&p->async);   /* Interrupt the UV loop */
   uv_mutex_unlock(&p->lock);
+  uv_async_send(&p->async);   /* Interrupt the UV loop */
 }
 
 void pn_proactor_set_timeout(pn_proactor_t *p, pn_millis_t t) {
   uv_mutex_lock(&p->lock);
   p->timeout = t;
   p->timeout_request = true;
-  uv_async_send(&p->async);   /* Interrupt the UV loop */
   uv_mutex_unlock(&p->lock);
+  uv_async_send(&p->async);   /* Interrupt the UV loop */
 }
 
 int pn_proactor_connect(pn_proactor_t *p, pn_connection_t *c, const char *host, const char *port) {
@@ -781,7 +825,7 @@ int pn_proactor_connect(pn_proactor_t *p, pn_connection_t *c, const char *host,
   if (!pc) {
     return PN_OUT_OF_MEMORY;
   }
-  set_state(&pc->psocket, ON_LEADER, leader_connect);
+  to_leader(&pc->psocket, leader_connect);
   return 0;
 }
 
@@ -789,7 +833,7 @@ int pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *host, con
 {
   psocket_init(&l->psocket, p, false, host, port);
   l->backlog = backlog;
-  set_state(&l->psocket, ON_LEADER, leader_listen);
+  to_leader(&l->psocket, leader_listen);
   return 0;
 }
 
@@ -803,7 +847,7 @@ void leader_wake_connection(psocket_t *ps) {
   pconnection_t *pc = as_pconnection(ps);
   pn_connection_t *c = pc->driver.connection;
   pn_collector_put(pn_connection_collector(c), PN_OBJECT, c, PN_CONNECTION_WAKE);
-  leader_unwatch(ps);
+  pconnection_to_worker(pc);
 }
 
 void pn_connection_wake(pn_connection_t* c) {
@@ -850,11 +894,17 @@ void pn_proactor_free(pn_proactor_t *p) {
 static pn_event_t *listener_batch_next(pn_event_batch_t *batch) {
   pn_listener_t *l = batch_listener(batch);
   assert(l->psocket.state == ON_WORKER);
+  pn_event_t *prev = pn_collector_prev(l->collector);
+  if (prev && pn_event_type(prev) == PN_LISTENER_CLOSE) {
+    l->err = UV_EOF;
+  }
   return pn_collector_next(l->collector);
 }
 
 static pn_event_t *proactor_batch_next(pn_event_batch_t *batch) {
-  return pn_collector_next(batch_proactor(batch)->collector);
+  pn_proactor_t *p = batch_proactor(batch);
+  assert(p->batch_working);
+  return pn_collector_next(p->collector);
 }
 
 static void pn_listener_free(pn_listener_t *l) {
@@ -867,7 +917,7 @@ static void pn_listener_free(pn_listener_t *l) {
   }
 }
 
-pn_listener_t *pn_listener() {
+pn_listener_t *pn_listener(void) {
   pn_listener_t *l = (pn_listener_t*)calloc(1, sizeof(pn_listener_t));
   if (l) {
     l->batch.next_event = listener_batch_next;
@@ -885,8 +935,8 @@ pn_listener_t *pn_listener() {
 void leader_listener_close(psocket_t *ps) {
   assert(ps->state = ON_LEADER);
   pn_listener_t *l = (pn_listener_t*)ps;
-  l->closing = true;
-  leader_watch(ps);
+  l->err = UV_EOF;
+  listener_to_uv(l);
 }
 
 void pn_listener_close(pn_listener_t* l) {
@@ -895,12 +945,10 @@ void pn_listener_close(pn_listener_t* l) {
 }
 
 pn_proactor_t *pn_listener_proactor(pn_listener_t* l) {
-  assert(l->psocket.state == ON_WORKER);
   return l ? l->psocket.proactor : NULL;
 }
 
 pn_condition_t* pn_listener_condition(pn_listener_t* l) {
-  assert(l->psocket.state == ON_WORKER);
   return l->condition;
 }
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b987a6a7/proton-c/src/tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/CMakeLists.txt b/proton-c/src/tests/CMakeLists.txt
index 59c7665..70b30a0 100644
--- a/proton-c/src/tests/CMakeLists.txt
+++ b/proton-c/src/tests/CMakeLists.txt
@@ -20,15 +20,15 @@
 add_definitions(${COMPILE_WARNING_FLAGS} ${COMPILE_PLATFORM_FLAGS})
 
 if (ENABLE_VALGRIND AND VALGRIND_EXE)
-  set(memcheck-cmd ${VALGRIND_EXE} --error-exitcode=1 --quiet
+  set(memcheck-cmd ${VALGRIND_EXE} --error-exitcode=42 --quiet
                    --leak-check=full --trace-children=yes)
 endif ()
 
-macro (pn_add_c_test test file)
-  add_executable (${test} ${file})
+macro (pn_add_c_test test)
+  add_executable (${test} ${ARGN})
   target_link_libraries (${test} qpid-proton)
   if (BUILD_WITH_CXX)
-    set_source_files_properties (${file} PROPERTIES LANGUAGE CXX)
+    set_source_files_properties (${ARGN} PROPERTIES LANGUAGE CXX)
   endif (BUILD_WITH_CXX)
   if (CMAKE_SYSTEM_NAME STREQUAL Windows)
     add_test (NAME ${test}
@@ -49,3 +49,4 @@ pn_add_c_test (c-reactor-tests reactor.c)
 pn_add_c_test (c-event-tests event.c)
 pn_add_c_test (c-data-tests data.c)
 pn_add_c_test (c-condition-tests condition.c)
+pn_add_c_test (c-proactor-tests proactor.c)

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b987a6a7/proton-c/src/tests/proactor.c
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/proactor.c b/proton-c/src/tests/proactor.c
new file mode 100644
index 0000000..ae5b1f6
--- /dev/null
+++ b/proton-c/src/tests/proactor.c
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "test_tools.h"
+#include <proton/condition.h>
+#include <proton/connection.h>
+#include <proton/event.h>
+#include <proton/listener.h>
+#include <proton/proactor.h>
+#include <proton/transport.h>
+#include <stdlib.h>
+#include <string.h>
+
+static pn_millis_t timeout = 5*1000; /* timeout for hanging tests */
+
+static const char *localhost = "127.0.0.1"; /* host for connect/listen */
+
+/* Block in pn_proactor_wait() for a batch and report the type of its first event */
+static pn_event_type_t wait_next(pn_proactor_t *proactor) {
+  pn_event_batch_t *batch = pn_proactor_wait(proactor);
+  pn_event_type_t first = pn_event_type(pn_event_batch_next(batch));
+  pn_proactor_done(proactor, batch); /* hand the batch back before returning */
+  return first;
+}
+
+/* Get events until an event of type `etype` or a PN_PROACTOR_TIMEOUT, return the type that ended the wait */
+static pn_event_type_t wait_for(pn_proactor_t *proactor, pn_event_type_t etype) {
+  while (true) {
+    pn_event_type_t t = wait_next(proactor);
+    if (t == etype || t == PN_PROACTOR_TIMEOUT) {
+      return t;
+    }
+  }
+}
+
+/* Test that interrupt and timeout events cause pn_proactor_wait() to return. */
+static void test_interrupt_timeout(test_t *t) {
+  pn_proactor_t *p = pn_proactor();
+  pn_proactor_interrupt(p);     /* requested before waiting; the next wait is expected to deliver it */
+  pn_event_type_t etype = wait_next(p);
+  TEST_CHECK(t, PN_PROACTOR_INTERRUPT == etype, pn_event_type_name(etype));
+  pn_proactor_set_timeout(p, 1); /* very short timeout */
+  etype = wait_next(p);
+  TEST_CHECK(t, PN_PROACTOR_TIMEOUT == etype, pn_event_type_name(etype));
+  pn_proactor_free(p);
+}
+
+/* Test handler return value  */
+typedef enum {
+  H_CONTINUE,                   /**< handler wants more events */
+  H_FINISHED,                   /**< handler completed without error */
+  H_FAILED                      /**< handler hit an error and cannot continue */
+} handler_state_t;
+
+typedef handler_state_t (*test_handler_fn)(test_t *, pn_event_t*);
+
+/* Proactor and handler that take part in a test */
+typedef struct proactor_test_t {
+  test_t *t;                                /* Passed to handler as its first argument */
+  test_handler_fn handler;                  /* Called by proactor_test_run() while state is H_CONTINUE */
+  pn_proactor_t *proactor;                  /* Created by proactor_test_init() if NULL */
+  handler_state_t state;                    /* Result of last handler call */
+} proactor_test_t;
+
+
+/* Prepare n tests: create any missing proactor, set the hang timeout, reset state to H_CONTINUE */
+static void proactor_test_init(proactor_test_t *pts, size_t n) {
+  for (size_t i = 0; i < n; ++i) {
+    if (pts[i].proactor == NULL) pts[i].proactor = pn_proactor();
+    pn_proactor_set_timeout(pts[i].proactor, timeout);
+    pts[i].state = H_CONTINUE;
+  }
+}
+
+/* Drive every test's proactor with the non-blocking pn_proactor_grab(), passing each
+   event to that test's handler while its state is H_CONTINUE (later events are still
+   drained, but not handled). Returns 0 once a pass finds all n tests H_FINISHED, or
+   non-0 as soon as any test is H_FAILED. */
+int proactor_test_run(proactor_test_t *pts, size_t n) {
+  proactor_test_init(pts, n);   /* Make sure pts are initialized */
+  for (;;) {
+    size_t done = 0;            /* tests found finished during this pass */
+    for (size_t i = 0; i < n; ++i) {
+      proactor_test_t *pt = &pts[i];
+      pn_event_batch_t *batch = pn_proactor_grab(pt->proactor);
+      if (batch != NULL) {
+        for (pn_event_t *ev = pn_event_batch_next(batch); ev; ev = pn_event_batch_next(batch)) {
+          if (pt->state == H_CONTINUE) {
+            pt->state = pt->handler(pt->t, ev);
+          }
+        }
+        pn_proactor_done(pt->proactor, batch);
+      }
+      if (pt->state == H_FAILED) {
+        return 1;
+      } else if (pt->state == H_FINISHED) {
+        ++done;
+      }
+    }
+    if (done >= n) {
+      return 0;
+    }
+  }
+}
+
+
+/* Server side of the listen/connect test: accept, open, and finish when the peer closes */
+handler_state_t listen_connect_server(test_t *t, pn_event_t *e) {
+  handler_state_t result = H_CONTINUE;
+  switch (pn_event_type(e)) {
+   case PN_LISTENER_ACCEPT:          /* accept using a newly created connection */
+    pn_listener_accept(pn_event_listener(e), pn_connection());
+    break;
+   case PN_CONNECTION_INIT:          /* open our end of the connection */
+    pn_connection_open(pn_event_connection(e));
+    break;
+   case PN_CONNECTION_REMOTE_CLOSE:  /* peer closed: this handler is done */
+    result = H_FINISHED;
+    break;
+    /* Expected events that need no action */
+   case PN_LISTENER_OPEN:
+   case PN_CONNECTION_LOCAL_OPEN:
+   case PN_CONNECTION_REMOTE_OPEN:
+   case PN_CONNECTION_BOUND:
+    break;
+   default:                          /* anything else fails the test */
+    TEST_CHECK(t, false, "unexpected event %s", pn_event_type_name(pn_event_type(e)));
+    result = H_FAILED;
+    break;
+  }
+  return result;
+}
+
+/* Client side of the listen/connect test: open on INIT, then close and finish on REMOTE_OPEN */
+handler_state_t listen_connect_client(test_t *t, pn_event_t *e) {
+  handler_state_t result = H_CONTINUE;
+  switch (pn_event_type(e)) {
+   case PN_CONNECTION_INIT:          /* open our end of the connection */
+    pn_connection_open(pn_event_connection(e));
+    break;
+   case PN_CONNECTION_REMOTE_OPEN:   /* peer opened: close and finish */
+    pn_connection_close(pn_event_connection(e));
+    result = H_FINISHED;
+    break;
+    /* Expected events that need no action */
+   case PN_CONNECTION_LOCAL_OPEN:
+   case PN_CONNECTION_BOUND:
+    break;
+   default:                          /* anything else fails the test */
+    TEST_CHECK(t, false, "unexpected event %s", pn_event_type_name(pn_event_type(e)));
+    result = H_FAILED;
+    break;
+  }
+  return result;
+}
+
+/* Simplest client/server interaction: listen on a picked port, connect a client, run both handlers */
+static void test_listen_connect(test_t *t) {
+  proactor_test_t pts[] =  { { t, listen_connect_client }, { t, listen_connect_server } };
+  proactor_test_t *client = &pts[0], *server = &pts[1];
+  proactor_test_init(pts, 2);   /* creates the proactors, which the initializers above leave NULL */
+
+  int port = pick_port();
+  char port_str[16];
+  snprintf(port_str, sizeof(port_str), "%d", port);
+  pn_proactor_listen(server->proactor, pn_listener(), localhost, port_str, 4);
+  pn_event_type_t etype = wait_for(server->proactor, PN_LISTENER_OPEN);
+  if (TEST_CHECK(t, PN_LISTENER_OPEN == etype, pn_event_type_name(etype))) {
+    pn_proactor_connect(client->proactor, pn_connection(), localhost, port_str); /* only connect once listening */
+    proactor_test_run(pts, 2);
+  }
+  pn_proactor_free(client->proactor);
+  pn_proactor_free(server->proactor);
+}
+
+/* Test error handling: a bad connect and a bad listen must each end in a close event with a condition set */
+static void test_listen_connect_error(test_t *t) {
+  pn_proactor_t *p = pn_proactor();
+  pn_proactor_set_timeout(p, timeout); /* In case of hang */
+  pn_connection_t *c = pn_connection();
+  pn_proactor_connect(p, c, "nosuchost", "nosuchport"); /* address is expected to fail */
+  pn_event_type_t etype = wait_for(p, PN_TRANSPORT_CLOSED);
+  TEST_CHECK(t, PN_TRANSPORT_CLOSED == etype, pn_event_type_name(etype));
+  TEST_CHECK(t, pn_condition_is_set(pn_transport_condition(pn_connection_transport(c))), "");
+
+  pn_listener_t *l = pn_listener();
+  pn_proactor_listen(p, l, "nosuchost", "nosuchport", 1); /* address is expected to fail */
+  etype = wait_for(p, PN_LISTENER_CLOSE);
+  TEST_CHECK(t, PN_LISTENER_CLOSE == etype, pn_event_type_name(etype));
+  TEST_CHECK(t, pn_condition_is_set(pn_listener_condition(l)), "");
+
+  pn_proactor_free(p);
+}
+
+int main(int argc, char **argv) {
+  int failed = 0;               /* number of tests that had errors; also the exit status */
+  RUN_TEST(failed, t, test_interrupt_timeout(&t));
+  RUN_TEST(failed, t, test_listen_connect(&t));
+  RUN_TEST(failed, t, test_listen_connect_error(&t));
+  return failed;
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b987a6a7/proton-c/src/tests/test_tools.h
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/test_tools.h b/proton-c/src/tests/test_tools.h
new file mode 100644
index 0000000..047ab42
--- /dev/null
+++ b/proton-c/src/tests/test_tools.h
@@ -0,0 +1,140 @@
+#ifndef TESTS_TEST_TOOLS_H
+#define TESTS_TEST_TOOLS_H
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <proton/type_compat.h>
+
+#include <stdarg.h>
+#include <stdio.h>
+#include <stdlib.h>
+
+/* Call via ASSERT macro. */
+static void assert_fail_(const char* cond, const char* file, int line) {
+  printf("%s:%d: Assertion failed: %s\n", file, line, cond);
+  abort();
+}
+
+/* Unconditional assert (does not depend on NDEBUG) for tests. */
+#define ASSERT(expr)                                            \
+  ((expr) ?  (void)0 : assert_fail_(#expr, __FILE__, __LINE__))
+
+/* Call via macro ASSERT_PERROR */
+static void assert_perror_fail_(const char* cond, const char* file, int line) {
+  perror(cond);
+  printf("%s:%d: Assertion failed (error above): %s\n", file, line, cond);
+  abort();
+}
+
+/* Like ASSERT but also calls perror() to print the current errno error. */
+#define ASSERT_PERROR(expr)                                             \
+  ((expr) ?  (void)0 : assert_perror_fail_(#expr, __FILE__, __LINE__))
+
+
+/* A struct to collect the results of a test.
+ * RUN_TEST(FAILED, T, EXPR) declares and initializes one, named T, for use in EXPR.
+ */
+typedef struct test_t {
+  const char* name;             /* Printed in check-failure messages */
+  int errors;                   /* Number of failed checks */
+} test_t;
+
+/* If !expr, print file:line, test name, sexpr and the optional printf-style message to stderr and increment t->errors. Use via macros. Returns expr. */
+static inline bool test_check_(test_t *t, bool expr, const char *sexpr, const char *file, int line, const char* fmt, ...) {
+  if (!expr) {
+    va_list ap;
+    va_start(ap, fmt);
+    fprintf(stderr, "%s:%d:[%s] check failed: (%s)%s", file, line, t->name, sexpr, (fmt && *fmt) ? " - " : "");
+    if (fmt && *fmt) {
+      vfprintf(stderr, fmt, ap);
+    }
+    va_end(ap);                 /* every va_start must be paired with va_end */
+    fprintf(stderr, "\n");
+    fflush(stderr);
+    ++t->errors;
+  }
+  return expr;
+}
+
+#define TEST_CHECK(TEST, EXPR, ...) test_check_((TEST), (EXPR), #EXPR, __FILE__, __LINE__, __VA_ARGS__)
+
+/* T is name of a test_t variable, EXPR is the test expression (which should update T)
+   FAILED is incremented if the test has errors
+*/
+#define RUN_TEST(FAILED, T, EXPR) do {                          \
+    printf("TEST: %s\n", #EXPR);                                \
+    fflush(stdout);                                             \
+    test_t T = { #EXPR, 0 };                                    \
+    (EXPR);                                                     \
+    if (T.errors) {                                             \
+      printf("FAIL: %s (%d errors)\n", #EXPR, T.errors);        \
+      ++(FAILED);                                               \
+    }                                                           \
+  } while(0)
+
+#if defined(WIN32)
+
+#include <winsock2.h>
+#include <ws2tcpip.h>
+typedef SOCKET sock_t;
+static inline void sock_close(sock_t sock) { closesocket(sock); }
+
+#else
+
+#include <netdb.h>
+#include <netinet/in.h>
+#include <time.h>
+#include <unistd.h>
+
+static int port_in_use(int port) {
+  /* Attempt to bind a dummy IPv4 TCP socket to INADDR_ANY:port to test if the port is in use. */
+  int dummy_socket = socket(AF_INET, SOCK_STREAM, 0);
+  ASSERT_PERROR(dummy_socket >= 0);
+  struct sockaddr_in addr = {0};
+  addr.sin_family = AF_INET;
+  addr.sin_addr.s_addr = INADDR_ANY;
+  addr.sin_port = htons(port);
+  int ret = bind(dummy_socket, (struct sockaddr *) &addr, sizeof(addr));
+  close(dummy_socket);          /* the socket is only a probe */
+  return ret < 0;               /* any bind() failure is reported as "in use" */
+}
+
+/* Try to pick an unused port by picking random ports till we find one
+   that is not in use. This is not foolproof as some other process may
+   grab it before the caller binds or connects. Makes at most MAX_TRIES
+   attempts and aborts via ASSERT if every one of them is in use. */
+static int pick_port(void) {
+  srand(time(NULL));
+  static int MAX_TRIES = 10;
+  int port = -1;
+  int i = 0;                    /* number of failed attempts so far */
+  do {
+    /* Pick a random port. Avoid the standard OS ephemeral port range used by
+       bind(0) - ports can be allocated and re-allocated very rapidly there.
+    */
+    port =  (rand()%10000) + 10000;
+  } while (port_in_use(port) && ++i < MAX_TRIES);
+  ASSERT(i < MAX_TRIES && "cannot pick a port");
+  return port;
+}
+
+#endif
+
+#endif // TESTS_TEST_TOOLS_H


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org