You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2017/03/24 19:18:27 UTC

[2/2] qpid-proton git commit: PROTON-1445: Change proactor ownership model

PROTON-1445: Change proactor ownership model

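Changed the ownership model for pn_connection_t and pn_listener_t objects
managed by the proactor: instead of the proactor freeing them automatically
after the final event, the user must now free them on or after the final event
(PN_TRANSPORT_CLOSED for a connection, PN_LISTENER_CLOSE for a listener).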

There are 2 basic use cases:

1. Free connection/listener immediately on the last event.

2. Keep connection/listener in memory until all application pointers are cleaned up.

Proactor ownership handles case 1 very well, but makes case 2 very difficult
without exposing reference counts.

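User ownership supports both reasonably easily: case 1 is implemented by
calling pn_connection_free() or pn_listener_free() in the event handler on the
last event; case 2 is implemented by making the same call later, once the
application has cleaned up its own pointers to the object.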


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/a85c89ac
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/a85c89ac
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/a85c89ac

Branch: refs/heads/master
Commit: a85c89ac8c1b8a19fae0b32cab6549e8d9d1ce24
Parents: 432fcb5
Author: Alan Conway <ac...@redhat.com>
Authored: Thu Mar 23 19:13:15 2017 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Fri Mar 24 15:17:58 2017 -0400

----------------------------------------------------------------------
 proton-c/include/proton/listener.h |  12 ++-
 proton-c/include/proton/proactor.h |   4 +-
 proton-c/src/proactor/libuv.c      |  20 +++-
 proton-c/src/tests/proactor.c      | 157 +++++++++++++++++---------------
 proton-c/src/tests/test_tools.h    |  19 +---
 5 files changed, 115 insertions(+), 97 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a85c89ac/proton-c/include/proton/listener.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/listener.h b/proton-c/include/proton/listener.h
index 6646dd1..bf52150 100644
--- a/proton-c/include/proton/listener.h
+++ b/proton-c/include/proton/listener.h
@@ -39,17 +39,21 @@ extern "C" {
  */
 
 /**
- * Create a listener.
+ * Create a listener to pass to pn_proactor_listen()
+ *
+ * Must be freed with pn_listener_free()
  *
  * You can use pn_listener_set_context() or pn_listener_attachments() to set
  * application data that can be accessed when accepting connections.
- *
- * You must pass the returned listener to pn_proactor_listen(), the proactor
- * will free the listener when it is no longer active.
  */
 PNP_EXTERN pn_listener_t *pn_listener(void);
 
 /**
+ * Free a listener. Must not be in use, see pn_proactor_listen()
+ */
+PNP_EXTERN void pn_listener_free(pn_listener_t *l);
+
+/**
  * Asynchronously accept a connection using the listener.
  *
  * @param[in] connection the listener takes ownership, do not free.

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a85c89ac/proton-c/include/proton/proactor.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/proactor.h b/proton-c/include/proton/proactor.h
index 345e3fc..974b432 100644
--- a/proton-c/include/proton/proactor.h
+++ b/proton-c/include/proton/proactor.h
@@ -65,7 +65,7 @@ PNP_EXTERN void pn_proactor_free(pn_proactor_t *proactor);
  * returned by pn_proactor_wait()
  *
  * @param[in] proactor the proactor object
- * @param[in] connection the proactor takes ownership, do not free
+ * @param[in] connection must not be freed until after the final PN_TRANSPORT_CLOSED event or pn_proactor_free()
  * @param[in] addr the network address (not AMQP address) to connect to. May
  * be in the form "host:port" or an "amqp://" or "amqps://" URL. The `/path` part of
  * the URL is ignored.
@@ -86,7 +86,7 @@ PNP_EXTERN int pn_proactor_connect(
  *
  *
  * @param[in] proactor the proactor object
- * @param[in] listener proactor takes ownership of listener, do not free
+ * @param[in] listener must not be freed until after the final PN_LISTENER_CLOSE event or pn_proactor_free()
  * @param[in] addr the network address (not AMQP address) to connect to in "host:port"
  *
  * The host can be a host name, IPV4 or IPV6 literal, or the empty string. The empty

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a85c89ac/proton-c/src/proactor/libuv.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/libuv.c b/proton-c/src/proactor/libuv.c
index 1ba840a..728ba7d 100644
--- a/proton-c/src/proactor/libuv.c
+++ b/proton-c/src/proactor/libuv.c
@@ -215,6 +215,7 @@ struct pn_listener_t {
   pn_collector_t *collector;
   pconnection_queue_t accept;   /* pconnection_t list for accepting */
   listener_state state;
+  int refcount;                 /* Free when proactor and user are done */
 };
 
 typedef enum { TM_NONE, TM_REQUEST, TM_PENDING, TM_FIRED } timeout_state_t;
@@ -304,6 +305,7 @@ static pconnection_t *pconnection(pn_proactor_t *p, pn_connection_t *c, bool ser
   if (!pc || pn_connection_driver_init(&pc->driver, c, NULL) != 0) {
     return NULL;
   }
+  pn_incref(c);                 /* User owns original ref, make one for the proactor */
   work_init(&pc->work, p,  T_CONNECTION);
   pc->next = pconnection_unqueued;
   pc->write.data = &pc->work;
@@ -366,10 +368,16 @@ static void pconnection_free(pconnection_t *pc) {
   if (pc->addr.getaddrinfo.addrinfo) {
     uv_freeaddrinfo(pc->addr.getaddrinfo.addrinfo); /* Interrupted after resolve */
   }
-  pn_incref(pc);                /* Make sure we don't do a circular free */
+  /* Don't let driver_destroy call pn_connection_free(), the user does that */
+  pn_connection_t *c = pc->driver.connection;
+  pc->driver.connection = NULL;
+  pn_collector_t *collector = pn_connection_collector(c);
+  if (collector) {
+    pn_collector_release(collector); /* Break circular refs */
+  }
   pn_connection_driver_destroy(&pc->driver);
-  pn_decref(pc);
-  /* Now pc is freed iff the connection is, otherwise remains till the pn_connection_t is freed. */
+  /* The user either has or will call pn_connection_free(), drop our ref. */
+  pn_decref(c);
 }
 
 /* Final close event for for a pconnection_t, disconnects from proactor */
@@ -628,8 +636,8 @@ static void leader_listen_lh(pn_listener_t *l) {
   }
 }
 
-static void pn_listener_free(pn_listener_t *l) {
-  if (l) {
+void pn_listener_free(pn_listener_t *l) {
+  if (l && --l->refcount == 0) {
     if (l->addr.getaddrinfo.addrinfo) { /* Interrupted after resolve */
       uv_freeaddrinfo(l->addr.getaddrinfo.addrinfo);
     }
@@ -1042,6 +1050,7 @@ int pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *addr, int
   work_init(&l->work, p, T_LISTENER);
   parse_addr(&l->addr, addr);
   l->backlog = backlog;
+  ++l->refcount;
   work_start(&l->work);
   return 0;
 }
@@ -1128,6 +1137,7 @@ void pn_connection_wake(pn_connection_t* c) {
 pn_listener_t *pn_listener(void) {
   pn_listener_t *l = (pn_listener_t*)calloc(1, sizeof(pn_listener_t));
   if (l) {
+    l->refcount = 1;
     l->batch.next_event = listener_batch_next;
     l->collector = pn_collector();
     l->condition = pn_condition();

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a85c89ac/proton-c/src/tests/proactor.c
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/proactor.c b/proton-c/src/tests/proactor.c
index 34a1880..f0431cb 100644
--- a/proton-c/src/tests/proactor.c
+++ b/proton-c/src/tests/proactor.c
@@ -32,7 +32,10 @@ static pn_millis_t timeout = 7*1000; /* timeout for hanging tests */
 
 static const char *localhost = "127.0.0.1"; /* host for connect/listen */
 
-typedef pn_event_t *(*test_handler_fn)(test_t *, pn_event_t*);
+typedef pn_event_type_t (*test_handler_fn)(test_t *, pn_event_t*);
+
+/* Save the last condition description of a handled event here  */
+char last_condition[1024] = {0};
 
 /* Proactor and handler that take part in a test */
 typedef struct proactor_test_t {
@@ -61,18 +64,34 @@ static void proactor_test_free(proactor_test_t *pts, size_t n) {
 
 #define PROACTOR_TEST_FREE(A) proactor_test_free(A, sizeof(A)/sizeof(*A))
 
+static void save_condition(pn_event_t *e) {
+  /* FIXME aconway 2017-03-23: extend pn_event_condition to include listener */
+  last_condition[0] = '\0';
+  pn_condition_t *cond = NULL;
+  if (pn_event_listener(e)) {
+    cond = pn_listener_condition(pn_event_listener(e));
+  } else {
+    cond = pn_event_condition(e);
+  }
+  if (cond && pn_condition_is_set(cond)) {
+    const char *desc = pn_condition_get_description(cond);
+    strncpy(last_condition, desc, sizeof(last_condition));
+  }
+}
+
 /* Process events on a proactor array until a handler returns an event, or
  * all proactors return NULL
  */
-static pn_event_t *proactor_test_get(proactor_test_t *pts, size_t n) {
+static pn_event_type_t proactor_test_get(proactor_test_t *pts, size_t n) {
   while (true) {
     bool busy = false;
     for (proactor_test_t *pt = pts; pt < pts + n; ++pt) {
       pn_event_batch_t *eb =  pn_proactor_get(pt->proactor);
       if (eb) {
         busy = true;
-        pn_event_t *ret = NULL;
+        pn_event_type_t ret = PN_EVENT_NONE;
         for (pn_event_t* e = pn_event_batch_next(eb); e; e = pn_event_batch_next(eb)) {
+          save_condition(e);
           ret = pt->handler(pt->t, e);
           if (ret) break;
         }
@@ -81,15 +100,15 @@ static pn_event_t *proactor_test_get(proactor_test_t *pts, size_t n) {
       }
     }
     if (!busy) {
-      return NULL;
+      return PN_EVENT_NONE;
     }
   }
 }
 
 /* Run an array of proactors till a handler returns an event. */
-static pn_event_t *proactor_test_run(proactor_test_t *pts, size_t n) {
-  pn_event_t *e;
-  while ((e = proactor_test_get(pts, n)) == NULL)
+static pn_event_type_t proactor_test_run(proactor_test_t *pts, size_t n) {
+  pn_event_type_t e;
+  while ((e = proactor_test_get(pts, n)) == PN_EVENT_NONE)
          ;
   return e;
 }
@@ -139,31 +158,37 @@ static void test_interrupt_timeout(test_t *t) {
 }
 
 /* Common handler for simple client/server interactions,  */
-static pn_event_t *common_handler(test_t *t, pn_event_t *e) {
+static pn_event_type_t common_handler(test_t *t, pn_event_t *e) {
   pn_connection_t *c = pn_event_connection(e);
   pn_listener_t *l = pn_event_listener(e);
 
   switch (pn_event_type(e)) {
 
-    /* Stop on these events */
+    /* Cleanup events */
    case PN_LISTENER_CLOSE:
+    pn_listener_free(pn_event_listener(e));
+    return PN_LISTENER_CLOSE;
+   case PN_TRANSPORT_CLOSED:
+    pn_connection_free(pn_event_connection(e));
+    return PN_TRANSPORT_CLOSED;
+
+    /* Stop on these events */
    case PN_LISTENER_OPEN:
    case PN_PROACTOR_INACTIVE:
    case PN_PROACTOR_TIMEOUT:
-   case PN_TRANSPORT_CLOSED:
-    return e;
+    return pn_event_type(e);
 
    case PN_LISTENER_ACCEPT:
     pn_listener_accept(l, pn_connection());
-    return NULL;
+    return PN_EVENT_NONE;
 
    case PN_CONNECTION_REMOTE_OPEN:
     pn_connection_open(c);      /* Return the open (no-op if already open) */
-    return NULL;
+    return PN_EVENT_NONE;
 
    case PN_CONNECTION_REMOTE_CLOSE:
     pn_connection_close(c);     /* Return the close */
-    return NULL;
+    return PN_EVENT_NONE;
 
     /* Ignored these events */
    case PN_CONNECTION_INIT:
@@ -174,20 +199,20 @@ static pn_event_t *common_handler(test_t *t, pn_event_t *e) {
    case PN_TRANSPORT_ERROR:
    case PN_TRANSPORT_HEAD_CLOSED:
    case PN_TRANSPORT_TAIL_CLOSED:
-    return NULL;
+    return PN_EVENT_NONE;
 
    default:
     TEST_ERRORF(t, "unexpected event %s", pn_event_type_name(pn_event_type(e)));
-    return NULL;                   /* Fail the test but keep going */
+    return PN_EVENT_NONE;                   /* Fail the test but keep going */
   }
 }
 
 /* close a connection when it is remote open */
-static pn_event_t *open_close_handler(test_t *t, pn_event_t *e) {
+static pn_event_type_t open_close_handler(test_t *t, pn_event_t *e) {
   switch (pn_event_type(e)) {
    case PN_CONNECTION_REMOTE_OPEN:
     pn_connection_close(pn_event_connection(e));
-    return NULL;          /* common_handler will finish on TRANSPORT_CLOSED */
+    return PN_EVENT_NONE;          /* common_handler will finish on TRANSPORT_CLOSED */
    default:
     return common_handler(t, e);
   }
@@ -200,28 +225,28 @@ static void test_client_server(test_t *t) {
   pn_proactor_t *client = pts[0].proactor, *server = pts[1].proactor;
   test_port_t port = test_port(localhost);
   pn_proactor_listen(server, pn_listener(), port.host_port, 4);
-  TEST_EVENT_TYPE(t, PN_LISTENER_OPEN, PROACTOR_TEST_RUN(pts));
+  TEST_ETYPE_EQUAL(t, PN_LISTENER_OPEN, PROACTOR_TEST_RUN(pts));
   sock_close(port.sock);
   /* Connect and wait for close at both ends */
   pn_proactor_connect(client, pn_connection(), port.host_port);
-  TEST_EVENT_TYPE(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));
-  TEST_EVENT_TYPE(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));
+  TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));
+  TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));
   /* Connect and wait for close at both ends */
   pn_proactor_connect(client, pn_connection(), port.host_port);
-  TEST_EVENT_TYPE(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));
-  TEST_EVENT_TYPE(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));
+  TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));
+  TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));
 
   PROACTOR_TEST_FREE(pts);
 }
 
 /* Return on connection open, close and return on wake */
-static pn_event_t *open_wake_handler(test_t *t, pn_event_t *e) {
+static pn_event_type_t open_wake_handler(test_t *t, pn_event_t *e) {
   switch (pn_event_type(e)) {
    case PN_CONNECTION_REMOTE_OPEN:
-    return e;
+    return pn_event_type(e);
    case PN_CONNECTION_WAKE:
     pn_connection_close(pn_event_connection(e));
-    return e;
+    return pn_event_type(e);
    default:
     return common_handler(t, e);
   }
@@ -234,17 +259,17 @@ static void test_connection_wake(test_t *t) {
   pn_proactor_t *client = pts[0].proactor, *server = pts[1].proactor;
   test_port_t port = test_port(localhost);          /* Hold a port */
   pn_proactor_listen(server, pn_listener(), port.host_port, 4);
-  TEST_EVENT_TYPE(t, PN_LISTENER_OPEN, PROACTOR_TEST_RUN(pts));
+  TEST_ETYPE_EQUAL(t, PN_LISTENER_OPEN, PROACTOR_TEST_RUN(pts));
   sock_close(port.sock);
 
   pn_connection_t *c = pn_connection();
   pn_incref(c);                 /* Keep c alive after proactor frees it */
   pn_proactor_connect(client, c, port.host_port);
-  TEST_EVENT_TYPE(t, PN_CONNECTION_REMOTE_OPEN, PROACTOR_TEST_RUN(pts));
+  TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, PROACTOR_TEST_RUN(pts));
   TEST_CHECK(t, pn_proactor_get(client) == NULL); /* Should be idle */
   pn_connection_wake(c);
-  TEST_EVENT_TYPE(t, PN_CONNECTION_WAKE, PROACTOR_TEST_RUN(pts));
-  TEST_EVENT_TYPE(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));
+  TEST_ETYPE_EQUAL(t, PN_CONNECTION_WAKE, PROACTOR_TEST_RUN(pts));
+  TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));
     /* The pn_connection_t is still valid so wake is legal but a no-op */
   pn_connection_wake(c);
 
@@ -263,21 +288,21 @@ static void test_inactive(test_t *t) {
 
   pn_listener_t *l = pn_listener();
   pn_proactor_listen(server, l, port.host_port,  4);
-  TEST_EVENT_TYPE(t, PN_LISTENER_OPEN, PROACTOR_TEST_RUN(pts));
+  TEST_ETYPE_EQUAL(t, PN_LISTENER_OPEN, PROACTOR_TEST_RUN(pts));
   pn_connection_t *c = pn_connection();
   pn_proactor_connect(client, c, port.host_port);
-  TEST_EVENT_TYPE(t, PN_CONNECTION_REMOTE_OPEN, PROACTOR_TEST_RUN(pts));
+  TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, PROACTOR_TEST_RUN(pts));
   pn_connection_wake(c);
-  TEST_EVENT_TYPE(t, PN_CONNECTION_WAKE, PROACTOR_TEST_RUN(pts));
+  TEST_ETYPE_EQUAL(t, PN_CONNECTION_WAKE, PROACTOR_TEST_RUN(pts));
   /* expect TRANSPORT_CLOSED from client and server, INACTIVE from client */
-  TEST_EVENT_TYPE(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));
-  TEST_EVENT_TYPE(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));
-  TEST_EVENT_TYPE(t, PN_PROACTOR_INACTIVE, PROACTOR_TEST_RUN(pts));
+  TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));
+  TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));
+  TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, PROACTOR_TEST_RUN(pts));
   /* server won't be INACTIVE until listener is closed */
   TEST_CHECK(t, pn_proactor_get(server) == NULL);
   pn_listener_close(l);
-  TEST_EVENT_TYPE(t, PN_LISTENER_CLOSE, PROACTOR_TEST_RUN(pts));
-  TEST_EVENT_TYPE(t, PN_PROACTOR_INACTIVE, PROACTOR_TEST_RUN(pts));
+  TEST_ETYPE_EQUAL(t, PN_LISTENER_CLOSE, PROACTOR_TEST_RUN(pts));
+  TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, PROACTOR_TEST_RUN(pts));
 
   sock_close(port.sock);
   PROACTOR_TEST_FREE(pts);
@@ -293,32 +318,28 @@ static void test_errors(test_t *t) {
   /* Invalid connect/listen parameters */
   pn_connection_t *c = pn_connection();
   pn_proactor_connect(client, c, "127.0.0.1:xxx");
-  TEST_EVENT_TYPE(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));
-  TEST_CHECK_COND(t, "xxx", pn_transport_condition(pn_connection_transport(c)));
-  TEST_EVENT_TYPE(t, PN_PROACTOR_INACTIVE, PROACTOR_TEST_RUN(pts));
+  TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));
+  TEST_STR_IN(t, "xxx", last_condition);
+  TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, PROACTOR_TEST_RUN(pts));
 
   pn_listener_t *l = pn_listener();
   pn_proactor_listen(server, l, "127.0.0.1:xxx", 1);
-  TEST_EVENT_TYPE(t, PN_LISTENER_OPEN, PROACTOR_TEST_RUN(pts));
-  TEST_EVENT_TYPE(t, PN_LISTENER_CLOSE, PROACTOR_TEST_RUN(pts));
-  TEST_CHECK_COND(t, "xxx", pn_listener_condition(l));
-  TEST_EVENT_TYPE(t, PN_PROACTOR_INACTIVE, PROACTOR_TEST_RUN(pts));
+  TEST_ETYPE_EQUAL(t, PN_LISTENER_OPEN, PROACTOR_TEST_RUN(pts));
+  TEST_ETYPE_EQUAL(t, PN_LISTENER_CLOSE, PROACTOR_TEST_RUN(pts));
+  TEST_STR_IN(t, "xxx", last_condition);
+  TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, PROACTOR_TEST_RUN(pts));
 
   /* Connect with no listener */
   c = pn_connection();
   pn_proactor_connect(client, c, port.host_port);
-  if (TEST_EVENT_TYPE(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts))) {
-    TEST_CHECK_COND(t, "connection refused", pn_transport_condition(pn_connection_transport(c)));
-    TEST_EVENT_TYPE(t, PN_PROACTOR_INACTIVE, PROACTOR_TEST_RUN(pts));
+  if (TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts))) {
+    TEST_STR_IN(t, "connection refused", last_condition);
+    TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, PROACTOR_TEST_RUN(pts));
     sock_close(port.sock);
     PROACTOR_TEST_FREE(pts);
   }
 }
 
-static inline const char *event_listener_desc(pn_event_t *e) {
-  return pn_condition_get_description(pn_listener_condition(pn_event_listener(e)));
-}
-
 /* Test that we can control listen/select on ipv6/v4 and listen on both by default */
 static void test_ipv4_ipv6(test_t *t) {
   proactor_test_t pts[] ={ { open_close_handler }, { common_handler } };
@@ -328,48 +349,42 @@ static void test_ipv4_ipv6(test_t *t) {
   /* Listen on all interfaces for IPv6 only. If this fails, skip IPv6 tests */
   test_port_t port6 = test_port("[::]");
   pn_proactor_listen(server, pn_listener(), port6.host_port, 4);
-  TEST_EVENT_TYPE(t, PN_LISTENER_OPEN, PROACTOR_TEST_RUN(pts));
+  TEST_ETYPE_EQUAL(t, PN_LISTENER_OPEN, PROACTOR_TEST_RUN(pts));
   sock_close(port6.sock);
-  pn_event_t *e = PROACTOR_TEST_GET(pts);
-  bool has_ipv6 = (pn_event_type(e) != PN_LISTENER_CLOSE);
+  pn_event_type_t e = PROACTOR_TEST_GET(pts);
+  bool has_ipv6 = (e != PN_LISTENER_CLOSE);
   if (!has_ipv6) {
-    TEST_LOGF(t, "skip IPv6 tests: %s", event_listener_desc(e));
+    TEST_LOGF(t, "skip IPv6 tests: %s", last_condition);
   }
   PROACTOR_TEST_DRAIN(pts);
 
   /* Listen on all interfaces for IPv4 only. */
   test_port_t port4 = test_port("0.0.0.0");
   pn_proactor_listen(server, pn_listener(), port4.host_port, 4);
-  TEST_EVENT_TYPE(t, PN_LISTENER_OPEN, PROACTOR_TEST_RUN(pts));
+  TEST_ETYPE_EQUAL(t, PN_LISTENER_OPEN, PROACTOR_TEST_RUN(pts));
   sock_close(port4.sock);
-  e = PROACTOR_TEST_GET(pts);
-  if (pn_event_type(e) == PN_LISTENER_CLOSE) {
-    TEST_ERRORF(t, "listener error: %s",  event_listener_desc(e));
-  }
+  TEST_CHECKF(t, PROACTOR_TEST_GET(pts) != PN_LISTENER_CLOSE, "listener error: %s",  last_condition);
   PROACTOR_TEST_DRAIN(pts);
 
   /* Empty address listens on both IPv4 and IPv6 on all interfaces */
   test_port_t port = test_port("");
   pn_proactor_listen(server, pn_listener(), port.host_port, 4);
-  TEST_EVENT_TYPE(t, PN_LISTENER_OPEN, PROACTOR_TEST_RUN(pts));
+  TEST_ETYPE_EQUAL(t, PN_LISTENER_OPEN, PROACTOR_TEST_RUN(pts));
   sock_close(port.sock);
   e = PROACTOR_TEST_GET(pts);
-  if (pn_event_type(e) == PN_LISTENER_CLOSE) {
-    TEST_ERRORF(t, "listener error: %s",  event_listener_desc(e));
-  }
-  PROACTOR_TEST_DRAIN(pts);
+  TEST_CHECKF(t, PROACTOR_TEST_GET(pts) != PN_LISTENER_CLOSE, "listener error: %s",  last_condition);   PROACTOR_TEST_DRAIN(pts);
 
 #define EXPECT_CONNECT(TP, HOST) do {                                   \
     pn_proactor_connect(client, pn_connection(), test_port_use_host(&(TP), (HOST))); \
-    pn_event_t *e = TEST_EVENT_TYPE(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts)); \
-    if (e) TEST_CHECK_NO_COND(t, pn_transport_condition(pn_event_transport(e))); \
+    TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));    \
+    TEST_STR_EQUAL(t, "", last_condition);                              \
     PROACTOR_TEST_DRAIN(pts);                                           \
   } while(0)
 
 #define EXPECT_FAIL(TP, HOST) do {                                      \
     pn_proactor_connect(client, pn_connection(), test_port_use_host(&(TP), (HOST))); \
-    pn_event_t *e = TEST_EVENT_TYPE(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts)); \
-    if (e) TEST_CHECK_COND(t, "refused", pn_transport_condition(pn_event_transport(e))); \
+    TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));    \
+    TEST_STR_IN(t, "refused", last_condition);                          \
     PROACTOR_TEST_DRAIN(pts);                                           \
   } while(0)
 
@@ -399,7 +414,7 @@ static void test_free_cleanup(test_t *t) {
   test_port_t ports[3] = { test_port(localhost), test_port(localhost), test_port(localhost) };
   for (int i = 0; i < 3; ++i) {
     pn_proactor_listen(server, pn_listener(), ports[i].host_port, 2);
-    TEST_EVENT_TYPE(t, PN_LISTENER_OPEN, PROACTOR_TEST_RUN(pts));
+    TEST_ETYPE_EQUAL(t, PN_LISTENER_OPEN, PROACTOR_TEST_RUN(pts));
     sock_close(ports[i].sock);
     pn_proactor_connect(client, pn_connection(), ports[i].host_port);
     pn_proactor_connect(client, pn_connection(), ports[i].host_port);

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a85c89ac/proton-c/src/tests/test_tools.h
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/test_tools.h b/proton-c/src/tests/test_tools.h
index 26f3b35..0c913ff 100644
--- a/proton-c/src/tests/test_tools.h
+++ b/proton-c/src/tests/test_tools.h
@@ -130,22 +130,11 @@ static inline bool test_etype_equal_(test_t *t, int want, int got, const char *f
                      pn_event_type_name((pn_event_type_t)got));
 }
 
-#define TEST_CHECK_COND(T, WANT, COND) do {                             \
-    pn_condition_t *cond = (COND);                                      \
-    if (TEST_CHECKF((T), pn_condition_is_set(cond), "expecting error")) { \
-      const char* description = pn_condition_get_description(cond);     \
-      if (!strstr(description, (WANT))) {                               \
-        TEST_ERRORF((T), "expected '%s' in '%s'", (WANT), description); \
-      }                                                                 \
-    }                                                                   \
-  } while(0)
+#define TEST_STR_EQUAL(TEST, WANT, GOT) \
+  TEST_CHECKF((TEST), !strcmp((WANT), (GOT)), " got '%s'", (GOT))
 
-#define TEST_CHECK_NO_COND(T, COND) do {                                \
-    pn_condition_t *cond = (COND);                                      \
-    if (cond && pn_condition_is_set(cond)) {                            \
-      TEST_ERRORF((T), "unexpected condition: %s", pn_condition_get_description(cond)); \
-    }                                                                   \
-  } while(0)
+#define TEST_STR_IN(TEST, WANT, GOT) \
+  TEST_CHECKF((TEST), strstr((GOT), (WANT)), " got '%s'", (GOT))
 
 #define TEST_ETYPE_EQUAL(TEST, WANT, GOT) \
   test_etype_equal_((TEST), (WANT), (GOT), __FILE__, __LINE__)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org