You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2018/07/03 22:12:58 UTC

[09/89] [abbrv] [partial] qpid-proton git commit: PROTON-1728: Reorganize the source tree

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/c/tests/object.c
----------------------------------------------------------------------
diff --git a/c/tests/object.c b/c/tests/object.c
new file mode 100644
index 0000000..8a1d00e
--- /dev/null
+++ b/c/tests/object.c
@@ -0,0 +1,1115 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <stdarg.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <proton/object.h>
+
+#define assert(E) ((E) ? 0 : (abort(), 0))
+
+static char mem;
+static void *END = &mem;
+
+/*
+ * Build a PN_OBJECT list holding every variadic argument, in call order.
+ * capacity is passed to pn_list() unchanged. Arguments are read as void *
+ * until the END sentinel, which is not stored. Each element gets one
+ * pn_class_decref() right after pn_list_add(), giving up the caller's reference.
+ */
+static pn_list_t *build_list(size_t capacity, ...) {
+  pn_list_t *list = pn_list(PN_OBJECT, capacity);
+  va_list args;
+
+  va_start(args, capacity);
+  for (void *item = va_arg(args, void *); item != END; item = va_arg(args, void *)) {
+    pn_list_add(list, item);
+    pn_class_decref(PN_OBJECT, item);
+  }
+  va_end(args);
+
+  return list;
+}
+
+/*
+ * Build a PN_OBJECT -> PN_OBJECT map from alternating key, value arguments.
+ * load_factor and capacity are forwarded to pn_map(). Arguments are read as
+ * void * up to the END sentinel. Every completed pair is stored with
+ * pn_map_put(), after which key and value each get one pn_class_decref().
+ * If END shows up where a value is expected, the pending key is stored with
+ * a NULL value (pn_class_decref() is still called on that NULL).
+ */
+static pn_map_t *build_map(float load_factor, size_t capacity, ...) {
+  pn_map_t *map = pn_map(PN_OBJECT, PN_OBJECT, capacity, load_factor);
+  va_list args;
+  void *key = NULL;
+  bool have_key = false;  // true while a key is waiting for its value
+  bool done = false;
+
+  va_start(args, capacity);
+  while (!done) {
+    void *item = va_arg(args, void *);
+    done = (item == END);
+    if (done) {
+      item = NULL;  // END itself is never stored
+    }
+    if (have_key) {
+      pn_map_put(map, key, item);
+      pn_class_decref(PN_OBJECT, key);
+      pn_class_decref(PN_OBJECT, item);
+    } else {
+      key = item;
+    }
+    have_key = !have_key;
+  }
+  va_end(args);
+  return map;
+}
+
+static void noop(void *o) {}  // ignores its argument and does nothing
+static uintptr_t zero(void *o) { return 0; }  // returns 0 for any argument
+static intptr_t delta(void *a, void *b) { return (uintptr_t) b - (uintptr_t) a; }  // address of b minus address of a
+// Per-hook names for the "noop" prefix; presumably PN_CLASS(noop) below picks them up by name (confirm in proton/object.h).
+#define CID_noop CID_pn_object
+#define noop_initialize noop
+#define noop_finalize noop
+#define noop_hashcode zero
+#define noop_compare delta
+#define noop_inspect NULL
+
+static const pn_class_t noop_class = PN_CLASS(noop);
+
+/*
+ * Exercise identity equality and reference counting for one class: two
+ * instances of `size` bytes must each equal themselves, differ from each other
+ * and from NULL, and follow incref/decref. A refcount of -1 is also accepted.
+ */
+static void test_class(const pn_class_t *clazz, size_t size) {
+  void *first = pn_class_new(clazz, size);
+  void *second = pn_class_new(clazz, size);
+
+  assert(!pn_class_equals(clazz, first, second));
+  assert(pn_class_equals(clazz, first, first));
+  assert(pn_class_equals(clazz, second, second));
+  assert(!pn_class_equals(clazz, first, NULL));
+  assert(!pn_class_equals(clazz, NULL, first));
+
+  int count_first = pn_class_refcount(clazz, first);
+  int count_second = pn_class_refcount(clazz, second);
+  assert(count_first == -1 || count_first == 1);
+  assert(count_second == -1 || count_second == 1);
+
+  pn_class_incref(clazz, first);
+  count_first = pn_class_refcount(clazz, first);
+  assert(count_first == -1 || count_first == 2);
+
+  pn_class_decref(clazz, first);
+  count_first = pn_class_refcount(clazz, first);
+  assert(count_first == -1 || count_first == 1);
+  pn_class_free(clazz, first);
+  pn_class_free(clazz, second);
+}
+
+// Allocate `size` bytes of clazz, check the new object's refcount (1) and
+// class (clazz), write every byte so valgrind sees the block, then free it.
+static void test_new(size_t size, const pn_class_t *clazz) {
+  void *object = pn_class_new(clazz, size);
+  assert(object);
+  assert(pn_class_refcount(PN_OBJECT, object) == 1);
+  assert(pn_class(object) == clazz);
+  char *cursor = (char *) object;
+  for (size_t n = 0; n < size; n++) {
+    *cursor++ = n;
+  }
+  pn_free(object);
+}
+
+// Finalizer hook: the object body stores an int *; bump the pointed-to counter.
+static void finalizer(void *object) {
+  int *counter = *(int **) object;
+  ++*counter;
+}
+
+#define CID_finalizer CID_pn_object
+#define finalizer_initialize NULL
+#define finalizer_finalize finalizer
+#define finalizer_hashcode NULL
+#define finalizer_compare NULL
+#define finalizer_inspect NULL
+
+// Check that freeing an object runs its class finalizer exactly once. The
+// object body holds a pointer to a local counter that the finalizer increments.
+static void test_finalize(void) {
+  static pn_class_t clazz = PN_CLASS(finalizer);
+  int calls = 0;
+
+  int **slot = (int **) pn_class_new(&clazz, sizeof(int *));
+  assert(slot);
+  *slot = &calls;
+
+  pn_free(slot);
+  assert(calls == 1);
+}
+
+// pn_free(NULL) only has to return without crashing; nothing else is checked.
+static void test_free(void) {
+  void *nothing = NULL;
+  pn_free(nothing);
+}
+
+// Hash hook: an object's hash is its own address.
+static uintptr_t hashcode(void *obj) { return (uintptr_t) obj; }
+#define CID_hashcode CID_pn_object
+#define hashcode_initialize NULL
+#define hashcode_finalize NULL
+#define hashcode_compare NULL
+#define hashcode_hashcode hashcode
+#define hashcode_inspect NULL
+
+// pn_hashcode() must return the hook's value for an object and 0 for NULL.
+static void test_hashcode(void) {
+  static pn_class_t clazz = PN_CLASS(hashcode);
+  void *object = pn_class_new(&clazz, 0);
+  assert(object);
+  assert((uintptr_t) object == pn_hashcode(object));
+  assert(0 == pn_hashcode(NULL));
+  pn_free(object);
+}
+
+#define CID_compare CID_pn_object
+#define compare_initialize NULL
+#define compare_finalize NULL
+#define compare_compare delta
+#define compare_hashcode NULL
+#define compare_inspect NULL
+
+// Check pn_compare()/pn_equals() on a class built with compare_compare mapped
+// to delta(): distinct objects compare non-zero and unequal, an object equals
+// itself, and NULL equals only NULL.
+static void test_compare(void) {
+  static pn_class_t clazz = PN_CLASS(compare);
+  void *lhs = pn_class_new(&clazz, 0);
+  assert(lhs);
+  void *rhs = pn_class_new(&clazz, 0);
+  assert(rhs);
+
+  assert(pn_compare(lhs, rhs));
+  assert(!pn_equals(lhs, rhs));
+  assert(pn_compare(lhs, rhs) == (intptr_t) ((uintptr_t) rhs - (uintptr_t) lhs));
+  assert(!pn_compare(lhs, lhs));
+  assert(pn_equals(lhs, lhs));
+  assert(!pn_compare(rhs, rhs));
+  assert(pn_equals(rhs, rhs));
+
+  // NULL on one side, then on both
+  assert(pn_compare(NULL, rhs));
+  assert(!pn_equals(NULL, rhs));
+  assert(pn_compare(lhs, NULL));
+  assert(!pn_equals(lhs, NULL));
+  assert(!pn_compare(NULL, NULL));
+  assert(pn_equals(NULL, NULL));
+
+  pn_free(lhs);
+  pn_free(rhs);
+}
+
+/*
+ * Take `refs` extra references on a fresh PN_OBJECT and release them again,
+ * checking pn_refcount() after every step: the count starts at 1, peaks at
+ * refs + 1 and is back at 1 before the final pn_free().
+ */
+static void test_refcounting(int refs) {
+  void *object = pn_class_new(PN_OBJECT, 0);
+  assert(pn_refcount(object) == 1);
+
+  for (int taken = 1; taken <= refs; taken++) {
+    pn_incref(object);
+    assert(pn_refcount(object) == taken + 1);
+  }
+  assert(pn_refcount(object) == refs + 1);
+
+  for (int left = refs; left > 0; left--) {
+    pn_decref(object);
+    assert(pn_refcount(object) == left);
+  }
+  assert(pn_refcount(object) == 1);
+  pn_free(object);
+}
+
+// Add/get/del checks on a PN_WEAKREF list created with the given capacity.
+static void test_list(size_t capacity) {
+  pn_list_t *list = pn_list(PN_WEAKREF, capacity);  // was hard-coded to 0, ignoring the parameter
+  assert(pn_list_size(list) == 0);
+  assert(!pn_list_add(list, (void *) 0));  // small integers stand in for element pointers
+  assert(!pn_list_add(list, (void *) 1));
+  assert(!pn_list_add(list, (void *) 2));
+  assert(!pn_list_add(list, (void *) 3));
+  assert(pn_list_get(list, 0) == (void *) 0);
+  assert(pn_list_get(list, 1) == (void *) 1);
+  assert(pn_list_get(list, 2) == (void *) 2);
+  assert(pn_list_get(list, 3) == (void *) 3);
+  assert(pn_list_size(list) == 4);
+  pn_list_del(list, 1, 2);  // remove two entries starting at index 1
+  assert(pn_list_size(list) == 2);
+  assert(pn_list_get(list, 0) == (void *) 0);
+  assert(pn_list_get(list, 1) == (void *) 3);
+  pn_decref(list);
+}
+
+// Reference counting of a PN_OBJECT list created with the given capacity:
+// each add takes a reference; delete and list teardown give them back.
+static void test_list_refcount(size_t capacity) {
+  void *one = pn_class_new(PN_OBJECT, 0);
+  void *two = pn_class_new(PN_OBJECT, 0);
+  void *three = pn_class_new(PN_OBJECT, 0);
+  void *four = pn_class_new(PN_OBJECT, 0);
+
+  pn_list_t *list = pn_list(PN_OBJECT, capacity);  // was hard-coded to 0, ignoring the parameter
+  assert(!pn_list_add(list, one));
+  assert(!pn_list_add(list, two));
+  assert(!pn_list_add(list, three));
+  assert(!pn_list_add(list, four));
+  assert(pn_list_get(list, 0) == one);
+  assert(pn_list_get(list, 1) == two);
+  assert(pn_list_get(list, 2) == three);
+  assert(pn_list_get(list, 3) == four);
+  assert(pn_list_size(list) == 4);
+  // the list holds one reference on every element besides ours
+  assert(pn_refcount(one) == 2);
+  assert(pn_refcount(two) == 2);
+  assert(pn_refcount(three) == 2);
+  assert(pn_refcount(four) == 2);
+
+  pn_list_del(list, 1, 2);
+  assert(pn_list_size(list) == 2);
+  // the two removed elements are back to our single reference
+  assert(pn_refcount(one) == 2);
+  assert(pn_refcount(two) == 1);
+  assert(pn_refcount(three) == 1);
+  assert(pn_refcount(four) == 2);
+  assert(pn_list_get(list, 0) == one);
+  assert(pn_list_get(list, 1) == four);
+
+  // an object stored twice is counted once per slot
+  assert(!pn_list_add(list, one));
+  assert(pn_list_size(list) == 3);
+  assert(pn_refcount(one) == 3);
+
+  // dropping the last reference to the list releases every slot
+  pn_decref(list);
+  assert(pn_refcount(one) == 1);
+  assert(pn_refcount(two) == 1);
+  assert(pn_refcount(three) == 1);
+  assert(pn_refcount(four) == 1);
+
+  pn_decref(one);
+  pn_decref(two);
+  pn_decref(three);
+  pn_decref(four);
+}
+
+// Assert that pn_list_index(list, value) reports exactly idx.
+static void check_list_index(pn_list_t *list, void *value, ssize_t idx) {
+  assert(idx == pn_list_index(list, value));
+}
+
+// pn_list_index() on a PN_WEAKREF list of strings: every stored string is
+// found at its position, except that the second "dup" string is reported at
+// the first one's index; a string that is not in the list gives -1.
+static void test_list_index(void) {
+  pn_list_t *list = pn_list(PN_WEAKREF, 0);
+  void *one = pn_string("one");
+  void *two = pn_string("two");
+  void *three = pn_string("three");
+  void *dup_a = pn_string("dup");
+  void *dup_b = pn_string("dup");
+  void *last = pn_string("last");
+  void *absent = pn_string("nonexistent");
+
+  pn_list_add(list, one);
+  pn_list_add(list, two);
+  pn_list_add(list, three);
+  pn_list_add(list, dup_a);
+  pn_list_add(list, dup_b);
+  pn_list_add(list, last);
+
+  check_list_index(list, one, 0);
+  check_list_index(list, two, 1);
+  check_list_index(list, three, 2);
+  check_list_index(list, dup_a, 3);
+  check_list_index(list, dup_b, 3);  // stored at 4, but found at 3
+  check_list_index(list, last, 5);
+  check_list_index(list, absent, -1);
+
+  pn_free(list);
+  pn_free(one);
+  pn_free(two);
+  pn_free(three);
+  pn_free(dup_a);
+  pn_free(dup_b);
+  pn_free(last);
+  pn_free(absent);
+}
+
+// True when the two NUL-terminated strings match; neither may be NULL.
+static bool pn_strequals(const char *a, const char *b) {
+  return strcmp(a, b) == 0;
+}
+
+// build_list() must return a list of exactly the three strings passed in, in
+// call order. The strings are created inline; build_list() gives up the
+// caller's reference to each, so only the list is freed here.
+static void test_build_list(void) {
+  static const char *const expected[] = { "one", "two", "three" };
+  pn_list_t *list = build_list(0,
+                               pn_string("one"),
+                               pn_string("two"),
+                               pn_string("three"),
+                               END);
+
+  assert(pn_list_size(list) == 3);
+  for (int i = 0; i < 3; i++) {
+    pn_string_t *element = (pn_string_t *) pn_list_get(list, i);
+    assert(pn_strequals(pn_string_get(element), expected[i]));
+  }
+
+  pn_free(list);
+}
+
+// build_map() with two complete pairs: both values must be retrievable through
+// a separate probe string whose contents match the stored key.
+static void test_build_map(void) {
+  pn_map_t *map = build_map(0.75, 0,
+                            pn_string("key"), pn_string("value"),
+                            pn_string("key2"), pn_string("value2"),
+                            END);
+  assert(pn_map_size(map) == 2);
+
+  pn_string_t *probe = pn_string(NULL);
+  pn_string_t *found;
+
+  pn_string_set(probe, "key");
+  found = (pn_string_t *) pn_map_get(map, probe);
+  assert(pn_strequals(pn_string_get(found), "value"));
+
+  pn_string_set(probe, "key2");
+  found = (pn_string_t *) pn_map_get(map, probe);
+  assert(pn_strequals(pn_string_get(found), "value2"));
+
+  pn_free(map);
+  pn_free(probe);
+}
+
+// build_map() with a trailing key that has no value: the key still counts as
+// an entry (size 3) and looking it up returns NULL.
+static void test_build_map_odd(void) {
+  pn_map_t *map = build_map(0.75, 0,
+                            pn_string("key"), pn_string("value"),
+                            pn_string("key2"), pn_string("value2"),
+                            pn_string("key3"),
+                            END);
+  assert(pn_map_size(map) == 3);
+
+  pn_string_t *probe = pn_string(NULL);
+  pn_string_t *found;
+
+  pn_string_set(probe, "key");
+  found = (pn_string_t *) pn_map_get(map, probe);
+  assert(pn_strequals(pn_string_get(found), "value"));
+
+  pn_string_set(probe, "key2");
+  found = (pn_string_t *) pn_map_get(map, probe);
+  assert(pn_strequals(pn_string_get(found), "value2"));
+
+  pn_string_set(probe, "key3");
+  assert(pn_map_get(map, probe) == NULL);
+  pn_free(map);
+  pn_free(probe);
+}
+
+/*
+ * pn_map put/get/del with string keys and plain PN_OBJECT values, checking
+ * the reference counts held on keys and values after each stage.
+ */
+static void test_map(void) {
+  void *one = pn_class_new(PN_OBJECT, 0);
+  void *two = pn_class_new(PN_OBJECT, 0);
+  void *three = pn_class_new(PN_OBJECT, 0);
+  pn_string_t *key = pn_string("key");
+  pn_string_t *twin = pn_string("key");  // distinct object, same text as key
+  pn_string_t *key1 = pn_string("key1");
+  pn_string_t *key2 = pn_string("key2");
+
+  pn_map_t *table = pn_map(PN_OBJECT, PN_OBJECT, 4, 0.75);
+  assert(pn_map_size(table) == 0);
+
+  assert(!pn_map_put(table, key, one));
+  assert(pn_map_size(table) == 1);
+  assert(!pn_map_put(table, key1, two));
+  assert(pn_map_size(table) == 2);
+  assert(!pn_map_put(table, key2, three));
+  assert(pn_map_size(table) == 3);
+
+  // a lookup through the twin finds the entry stored under key
+  assert(pn_map_get(table, twin) == one);
+  // a put through the twin adds no entry but can replace the value
+  assert(!pn_map_put(table, twin, one));
+  assert(pn_map_size(table) == 3);
+  assert(!pn_map_put(table, twin, two));
+  assert(pn_map_size(table) == 3);
+  assert(pn_map_get(table, twin) == two);
+
+  // the twin was never retained, and the replaced value `one` was released
+  assert(pn_refcount(key) == 2);
+  assert(pn_refcount(twin) == 1);
+  assert(pn_refcount(key1) == 2);
+  assert(pn_refcount(key2) == 2);
+  assert(pn_refcount(one) == 1);
+  assert(pn_refcount(two) == 3);
+  assert(pn_refcount(three) == 2);
+
+  pn_map_del(table, key1);
+  assert(pn_map_size(table) == 2);
+
+  // deleting key1 released key1 itself and one reference on `two`
+  assert(pn_refcount(key) == 2);
+  assert(pn_refcount(twin) == 1);
+  assert(pn_refcount(key1) == 1);
+  assert(pn_refcount(key2) == 2);
+  assert(pn_refcount(one) == 1);
+  assert(pn_refcount(two) == 2);
+  assert(pn_refcount(three) == 2);
+
+  pn_decref(one);
+  pn_decref(two);
+  pn_decref(three);
+  pn_decref(key);
+  pn_decref(twin);
+  pn_decref(key1);
+  pn_decref(key2);
+  pn_decref(table);
+}
+
+/*
+ * pn_hash put/get/del with integer keys.
+ *
+ * Fourteen entries are put into a table created with capacity 4 and load
+ * factor 0.75: key 0 maps to NULL, keys 1..12 cycle through three objects,
+ * and key 18 maps to the first object again.
+ */
+static void test_hash(void) {
+  void *one = pn_class_new(PN_OBJECT, 0);
+  void *two = pn_class_new(PN_OBJECT, 0);
+  void *three = pn_class_new(PN_OBJECT, 0);
+  void *cycle[3] = { one, two, three };
+
+  pn_hash_t *hash = pn_hash(PN_OBJECT, 4, 0.75);
+  pn_hash_put(hash, 0, NULL);
+  // keys 1, 4, 7, 10 -> one; 2, 5, 8, 11 -> two; 3, 6, 9, 12 -> three
+  for (uintptr_t key = 1; key <= 12; key++) {
+    pn_hash_put(hash, key, cycle[(key - 1) % 3]);
+  }
+  pn_hash_put(hash, 18, one);
+
+  // spot checks, including the NULL stored under key 0
+  assert(pn_hash_get(hash, 2) == two);
+  assert(pn_hash_get(hash, 5) == two);
+  assert(pn_hash_get(hash, 18) == one);
+  assert(pn_hash_get(hash, 0) == NULL);
+  assert(pn_hash_size(hash) == 14);
+
+  // a deleted key reads back as NULL and the size drops by one
+  pn_hash_del(hash, 5);
+  assert(pn_hash_get(hash, 5) == NULL);
+  assert(pn_hash_size(hash) == 13);
+  pn_hash_del(hash, 18);
+  assert(pn_hash_get(hash, 18) == NULL);
+  assert(pn_hash_size(hash) == 12);
+
+  // release the table first, then our own references
+  pn_decref(hash);
+  pn_decref(one);
+  pn_decref(two);
+  pn_decref(three);
+}
+
+
+// collider class: all objects have same hash, no two objects compare equal.
+// Compare hook: 0 for the same object, otherwise +1/-1 by address. Addresses
+// are compared as uintptr_t because < and > on pointers into different
+// objects are undefined behaviour in C.
+static intptr_t collider_compare(void *a, void *b) {
+  uintptr_t lhs = (uintptr_t) a, rhs = (uintptr_t) b;
+  if (lhs == rhs) return 0;
+  return (lhs > rhs) ? 1 : -1;
+}
+// Hash hook: every object gets the same hash value, 23.
+static uintptr_t collider_hashcode(void *obj) { return 23; }
+
+#define CID_collider CID_pn_object
+#define collider_initialize NULL
+#define collider_finalize NULL
+#define collider_inspect NULL
+
+// Put three keys that all hash alike into a map (so, per the original note,
+// they form one chain), then delete the first, second and third key in turn:
+// only the deleted key may stop resolving.
+static void test_map_links(void) {
+  const pn_class_t collider_clazz = PN_CLASS(collider);
+  void *keys[3];
+  for (int k = 0; k < 3; k++) {
+    keys[k] = pn_class_new(&collider_clazz, 0);
+  }
+  for (int victim = 0; victim < 3; victim++) {
+    pn_map_t *map = pn_map(PN_WEAKREF, PN_WEAKREF, 0, 0.75);
+    for (int k = 0; k < 3; k++) {
+      pn_map_put(map, keys[k], keys[k]);  // every key maps to itself
+    }
+    pn_map_del(map, keys[victim]);
+    for (int k = 0; k < 3; k++) {
+      void *expected = (k != victim) ? keys[k] : NULL;
+      assert(pn_map_get(map, keys[k]) == expected);
+    }
+    pn_free(map);
+  }
+  for (int k = 0; k < 3; k++) {
+    pn_free(keys[k]);
+  }
+}
+
+
+/*
+ * NULL-tolerant string equality.
+ *
+ * Two NULL pointers are equal and a NULL never equals a non-NULL string;
+ * otherwise the result is whether strcmp() finds the strings identical.
+ */
+static bool equals(const char *a, const char *b) {
+  if (a == NULL || b == NULL) {
+    return a == b;  // equal only when both are NULL
+  }
+  return strcmp(a, b) == 0;
+}
+
+/*
+ * Build the same text four ways - pn_string(), pn_stringn(), pn_string_set()
+ * and pn_string_setn() - and check that each result has the expected content
+ * and size, and that all four hash alike and compare equal.
+ *
+ * value may be NULL, in which case the expected size is 0.
+ */
+static void test_string(const char *value) {
+  size_t size = value ? strlen(value) : 0;
+  pn_string_t *made[4];
+
+  made[0] = pn_string(value);
+  made[1] = pn_stringn(value, size);
+  made[2] = pn_string(NULL);
+  pn_string_set(made[2], value);
+  made[3] = pn_string(NULL);
+  pn_string_setn(made[3], value, size);
+
+  for (int i = 0; i < 4; i++) {
+    assert(equals(pn_string_get(made[i]), value));
+    assert(pn_string_size(made[i]) == size);
+  }
+
+  // every other construction must hash like the first one
+  for (int i = 1; i < 4; i++) {
+    assert(pn_hashcode(made[0]) == pn_hashcode(made[i]));
+  }
+  // and compare equal to it (the first is also compared with itself)
+  for (int i = 0; i < 4; i++) {
+    assert(!pn_compare(made[0], made[i]));
+  }
+
+  for (int i = 0; i < 4; i++) {
+    pn_free(made[i]);
+  }
+}
+
+// Explicit-length variant of test_string(): pn_stringn() and pn_string_setn()
+// must agree on size, hash and ordering. Content is checked with equals(),
+// i.e. only up to the first NUL byte of value.
+static void test_stringn(const char *value, size_t size) {
+  pn_string_t *built = pn_stringn(value, size);
+  pn_string_t *assigned = pn_string(NULL);
+  pn_string_setn(assigned, value, size);
+
+  assert(equals(pn_string_get(built), value));
+  assert(pn_string_size(built) == size);
+  assert(equals(pn_string_get(assigned), value));
+  assert(pn_string_size(assigned) == size);
+  assert(pn_hashcode(built) == pn_hashcode(assigned));
+  assert(!pn_compare(built, assigned));
+  pn_free(built);
+  pn_free(assigned);
+}
+
+// pn_string_format() of a long text into an empty string must return 0.
+static void test_string_format(void) {
+  pn_string_t *target = pn_string("");
+  assert(target);
+  int rc = pn_string_format(target, "%s", "this is a string that should be long "
+                            "enough to force growth but just in case we'll "
+                            "tack this other really long string on for the "
+                            "heck of it");
+  assert(rc == 0);
+  pn_free(target);
+}
+
+// pn_string_addf() of a long text onto "hello " must return 0.
+static void test_string_addf(void) {
+  pn_string_t *target = pn_string("hello ");
+  assert(target);
+  int rc = pn_string_addf(target, "%s", "this is a string that should be long "
+                          "enough to force growth but just in case we'll "
+                          "tack this other really long string on for the "
+                          "heck of it");
+  assert(rc == 0);
+  pn_free(target);
+}
+
+/*
+ * Put n key/value pairs into a map and walk it with pn_map_head() and
+ * pn_map_next(). Every visited entry must match a pair still present in the
+ * `pairs` list (key at idx, value at idx + 1); the pair is then removed, so
+ * an empty list after the walk means no pair was missed.
+ */
+static void test_map_iteration(int n) {
+  pn_list_t *pairs = pn_list(PN_OBJECT, 2*n);
+  for (int i = 0; i < n; i++) {
+    void *k = pn_class_new(PN_OBJECT, 0);
+    void *v = pn_class_new(PN_OBJECT, 0);
+    pn_list_add(pairs, k);
+    pn_list_add(pairs, v);
+    pn_decref(k);  // give up our references; the list keeps its own
+    pn_decref(v);
+  }
+
+  pn_map_t *map = pn_map(PN_OBJECT, PN_OBJECT, 0, 0.75);
+  assert(pn_map_head(map) == 0);  // nothing to iterate in an empty map
+  for (int i = 0; i < n; i++) {
+    pn_map_put(map, pn_list_get(pairs, 2*i), pn_list_get(pairs, 2*i + 1));
+  }
+
+  pn_handle_t entry = pn_map_head(map);
+  while (entry) {
+    void *k = pn_map_key(map, entry);
+    void *v = pn_map_value(map, entry);
+    ssize_t idx = pn_list_index(pairs, k);
+    assert(idx >= 0);
+    assert(pn_list_get(pairs, idx) == k);
+    assert(pn_list_get(pairs, idx + 1) == v);
+    pn_list_del(pairs, idx, 2);
+    entry = pn_map_next(map, entry);
+  }
+  assert(pn_list_size(pairs) == 0);
+  pn_decref(map);
+  pn_decref(pairs);
+}
+
+// Render o with pn_inspect() into a scratch string and require exactly `expected`.
+void test_inspect(void *o, const char *expected) {
+  pn_string_t *rendered = pn_string(NULL);
+  pn_inspect(o, rendered);
+  assert(pn_strequals(pn_string_get(rendered), expected));
+  pn_free(rendered);
+}
+
+// pn_inspect() output for lists of zero to three strings: each element is
+// printed in double quotes, elements are separated by ", ", and the whole
+// list is wrapped in square brackets.
+void test_list_inspect(void) {
+  pn_list_t *list;
+
+  // empty list
+  list = build_list(0, END);
+  test_inspect(list, "[]");
+  pn_free(list);
+
+  // one element: no separator
+  list = build_list(0, pn_string("one"), END);
+  test_inspect(list, "[\"one\"]");
+  pn_free(list);
+
+  // two and three elements are printed in insertion order
+  list = build_list(0, pn_string("one"), pn_string("two"), END);
+  test_inspect(list, "[\"one\", \"two\"]");
+  pn_free(list);
+
+  list = build_list(0, pn_string("one"), pn_string("two"), pn_string("three"), END);
+  test_inspect(list, "[\"one\", \"two\", \"three\"]");
+  pn_free(list);
+}
+
+// pn_inspect() output for maps of zero to three string pairs. With more than
+// one entry the order of the printed entries depends on the key hashes; it is
+// deterministic, but not necessarily insertion order.
+void test_map_inspect(void) {
+  pn_map_t *map;
+
+  map = build_map(0.75, 0, END);
+  test_inspect(map, "{}");
+  pn_free(map);
+
+  map = build_map(0.75, 0, pn_string("key"), pn_string("value"), END);
+  test_inspect(map, "{\"key\": \"value\"}");
+  pn_free(map);
+
+  map = build_map(0.75, 0,
+                  pn_string("k1"), pn_string("v1"),
+                  pn_string("k2"), pn_string("v2"),
+                  END);
+  test_inspect(map, "{\"k1\": \"v1\", \"k2\": \"v2\"}");
+  pn_free(map);
+
+  // with three entries the expected text is no longer in insertion order
+  map = build_map(0.75, 0,
+                  pn_string("k1"), pn_string("v1"),
+                  pn_string("k2"), pn_string("v2"),
+                  pn_string("k3"), pn_string("v3"),
+                  END);
+  test_inspect(map, "{\"k3\": \"v3\", \"k1\": \"v1\", \"k2\": \"v2\"}");
+  pn_free(map);
+}
+
+/*
+ * Delete an entry from a coalesced pn_hash chain and check that every other
+ * entry can still be looked up. The remarks about addressable and
+ * non-addressable slots are kept from the original test (capacity 16).
+ */
+void test_map_coalesced_chain(void) {
+  pn_hash_t *hash = pn_hash(PN_OBJECT, 16, 0.75);
+  pn_string_t *texts[9] = {
+      pn_string("a"), pn_string("b"), pn_string("c"),
+      pn_string("d"), pn_string("e"), pn_string("f"),
+      pn_string("g"), pn_string("h"), pn_string("i")
+  };
+
+  //add some items:
+  pn_hash_put(hash, 1, texts[0]);
+  pn_hash_put(hash, 2, texts[1]);
+  pn_hash_put(hash, 3, texts[2]);
+
+  //use up all non-addressable elements:
+  pn_hash_put(hash, 14, texts[3]);
+  pn_hash_put(hash, 15, texts[4]);
+  pn_hash_put(hash, 16, texts[5]);
+
+  //use an addressable element for a key that doesn't map to it:
+  pn_hash_put(hash, 4, texts[6]);
+  pn_hash_put(hash, 17, texts[7]);
+  assert(pn_hash_size(hash) == 8);
+
+  //free up one non-addressable entry:
+  pn_hash_del(hash, 16);
+  assert(pn_hash_get(hash, 16) == NULL);
+  assert(pn_hash_size(hash) == 7);
+
+  //add a key whose addressable slot is already taken (by 17),
+  //generating a coalesced chain:
+  pn_hash_put(hash, 12, texts[8]);
+
+  //remove an entry from the coalesced chain:
+  pn_hash_del(hash, 4);
+  assert(pn_hash_get(hash, 4) == NULL);
+
+  //test lookup of all entries:
+  static const struct { uintptr_t key; const char *text; } expected[] = {
+    {1, "a"}, {2, "b"}, {3, "c"}, {14, "d"}, {15, "e"}, {17, "h"}, {12, "i"}
+  };
+  for (size_t n = 0; n < sizeof expected / sizeof expected[0]; n++) {
+    pn_string_t *found = (pn_string_t *) pn_hash_get(hash, expected[n].key);
+    assert(pn_strequals(pn_string_get(found), expected[n].text));
+  }
+  assert(pn_hash_size(hash) == 7);
+
+  //cleanup:
+  pn_handle_t entry;
+  while ((entry = pn_hash_head(hash))) {
+    pn_hash_del(hash, pn_hash_key(hash, entry));
+  }
+  assert(pn_hash_size(hash) == 0);
+
+  for (size_t i = 0; i < 9; ++i) {
+      pn_free(texts[i]);
+  }
+  pn_free(hash);
+}
+
+/*
+ * Second coalesced-chain scenario for pn_hash. Following the original
+ * remarks kept below: a key is pushed into the addressable region, a chain
+ * coalesces on it and is shortened from its tail, the chain is extended
+ * again, and finally an entry whose next element sits in the cellar is
+ * deleted. Afterwards every surviving key must still resolve to its own
+ * string and the table must hold exactly seven entries.
+ */
+void test_map_coalesced_chain2(void) {
+  pn_hash_t *hash = pn_hash(PN_OBJECT, 16, 0.75);
+  pn_string_t *texts[10] = {
+      pn_string("a"), pn_string("b"), pn_string("c"), pn_string("d"),
+      pn_string("e"), pn_string("f"), pn_string("g"), pn_string("h"),
+      pn_string("i"), pn_string("j")
+  };
+
+  //add some items:
+  pn_hash_put(hash, 1, texts[0]);//a
+  pn_hash_put(hash, 2, texts[1]);//b
+  pn_hash_put(hash, 3, texts[2]);//c
+
+  //use up all non-addressable elements:
+  pn_hash_put(hash, 14, texts[3]);//d
+  pn_hash_put(hash, 15, texts[4]);//e
+  pn_hash_put(hash, 16, texts[5]);//f
+  //take slot from addressable region
+  pn_hash_put(hash, 29, texts[6]);//g, goes into slot 12
+
+  //free up one non-addressable entry:
+  pn_hash_del(hash, 14);
+  assert(pn_hash_get(hash, 14) == NULL);
+
+  //add a key whose addressable slot is already taken (by 29),
+  //generating a coalesced chain:
+  pn_hash_put(hash, 12, texts[7]);//h
+  assert(pn_strequals(pn_string_get((pn_string_t *) pn_hash_get(hash, 12)), "h"));
+  //delete from tail of coalesced chain:
+  pn_hash_del(hash, 12);
+  assert(pn_hash_get(hash, 12) == NULL);
+
+  //extend chain into cellar again, then coalesce again extending back
+  //into addressable region
+  pn_hash_put(hash, 42, texts[8]);//i
+  pn_hash_put(hash, 25, texts[9]);//j
+  //delete entry from coalesced chain, where next element in chain is
+  //in cellar:
+  assert(pn_strequals(pn_string_get((pn_string_t *) pn_hash_get(hash, 29)), "g"));
+  pn_hash_del(hash, 29);
+
+  //test lookup of all entries (d, g and h were deleted above):
+  static const struct { uintptr_t key; const char *text; } expected[] = {
+    {1, "a"}, {2, "b"}, {3, "c"}, {15, "e"}, {16, "f"}, {42, "i"}, {25, "j"}
+  };
+  for (size_t n = 0; n < sizeof expected / sizeof expected[0]; n++) {
+    pn_string_t *found = (pn_string_t *) pn_hash_get(hash, expected[n].key);
+    assert(pn_strequals(pn_string_get(found), expected[n].text));
+  }
+  assert(pn_hash_size(hash) == 7);
+
+  //cleanup:
+  pn_handle_t entry;
+  while ((entry = pn_hash_head(hash))) {
+    pn_hash_del(hash, pn_hash_key(hash, entry));
+  }
+  assert(pn_hash_size(hash) == 0);
+
+  for (size_t i = 0; i < 10; ++i) {
+      pn_free(texts[i]);
+  }
+  pn_free(hash);
+}
+
+// Two PN_OBJECT lists must be pn_equals() whenever they hold the same elements
+// and unequal while one is an element ahead; checked after every single add.
+void test_list_compare(void) {
+  pn_list_t *left = pn_list(PN_OBJECT, 0);
+  pn_list_t *right = pn_list(PN_OBJECT, 0);
+  void *one = pn_class_new(PN_OBJECT, 0);
+  void *two = pn_class_new(PN_OBJECT, 0);
+  void *three = pn_class_new(PN_OBJECT, 0);
+
+  assert(pn_equals(left, right));  // both empty
+
+  pn_list_add(left, one);
+  assert(!pn_equals(left, right));
+  pn_list_add(right, one);
+  assert(pn_equals(left, right));
+
+  pn_list_add(right, two);
+  assert(!pn_equals(left, right));
+  pn_list_add(left, two);
+  assert(pn_equals(left, right));
+
+  pn_list_add(left, three);
+  assert(!pn_equals(left, right));
+  pn_list_add(right, three);
+  assert(pn_equals(left, right));
+
+  pn_free(left); pn_free(right);
+  pn_free(one); pn_free(two); pn_free(three);
+}
+
+// Iterator state for pn_it_next(): the list being walked and the next index.
+typedef struct {
+  pn_list_t *list;
+  size_t index;
+} pn_it_state_t;
+
+// Step function passed to pn_iterator_start(): returns the element at the
+// current index and advances, or NULL once the index reaches the list size.
+static void *pn_it_next(void *state) {
+  pn_it_state_t *cursor = (pn_it_state_t *) state;
+  bool more = cursor->index < pn_list_size(cursor->list);
+  return more ? pn_list_get(cursor->list, cursor->index++) : NULL;
+}
+
+// Drive a pn_iterator_t with pn_it_next() over a four-string list: elements
+// must come back in list order and the iteration must stop after the fourth.
+void test_iterator(void) {
+  pn_list_t *list = build_list(0,
+                               pn_string("one"), pn_string("two"),
+                               pn_string("three"), pn_string("four"),
+                               END);
+  pn_iterator_t *iter = pn_iterator();
+
+  // pn_iterator_start() returns the state block, which is filled in here
+  pn_it_state_t *state =
+    (pn_it_state_t *) pn_iterator_start(iter, pn_it_next, sizeof(pn_it_state_t));
+  state->list = list;
+  state->index = 0;
+
+  int seen = 0;
+  for (void *obj = pn_iterator_next(iter); obj; obj = pn_iterator_next(iter)) {
+    assert(obj == pn_list_get(list, seen));
+    ++seen;
+  }
+  assert(seen == 4);
+
+  pn_free(list);
+  pn_free(iter);
+}
+
+/*
+ * Push `size` pseudo-random values (seeded with `seed`) through
+ * pn_list_minpush() and drain them with pn_list_minpop(): the first value
+ * popped must be the minimum, the last the maximum, and values must never
+ * decrease in between. Callers pass size >= 1; the first pop is unconditional.
+ */
+void test_heap(int seed, int size) {
+  srand(seed);
+  pn_list_t *heap = pn_list(PN_VOID, 0);
+  intptr_t lowest = 0;
+  intptr_t highest = 0;
+
+  // push the samples, tracking the smallest and largest one seen
+  for (int i = 0; i < size; i++) {
+    intptr_t sample = rand();
+    if (i == 0 || sample < lowest) {
+      lowest = sample;
+    }
+    if (i == 0 || sample > highest) {
+      highest = sample;
+    }
+    pn_list_minpush(heap, (void *) sample);
+  }
+
+  // the first pop must yield the smallest sample
+  intptr_t last = (intptr_t) pn_list_minpop(heap);
+  assert(last == lowest);
+  assert(pn_list_size(heap) == (size_t)(size - 1));
+
+  // every later pop must be at least as large as the one before
+  int popped = 0;
+  for (; pn_list_size(heap); popped++) {
+    intptr_t next = (intptr_t) pn_list_minpop(heap);
+    assert(next >= last);
+    last = next;
+  }
+  assert(popped == size - 1);
+  assert(last == highest);
+
+  pn_free(heap);
+}
+
+// Run every test in this file in a fixed order. A failed check aborts the
+// process (the file's assert macro calls abort()), so returning 0 means all passed.
+int main(int argc, char **argv) {
+  // class-level checks for object sizes 0..127
+  for (size_t size = 0; size < 128; size++) {
+    test_class(PN_OBJECT, size);
+    test_class(PN_VOID, size);
+    test_class(&noop_class, size);
+  }
+  for (size_t size = 0; size < 128; size++) {
+    test_new(size, PN_OBJECT);
+    test_new(size, &noop_class);
+  }
+
+  test_finalize();
+  test_free();
+  test_hashcode();
+  test_compare();
+  for (int refs = 0; refs < 1024; refs++) {
+    test_refcounting(refs);
+  }
+
+  // lists
+  for (size_t capacity = 0; capacity < 4; capacity++) {
+    test_list(capacity);
+  }
+  for (size_t capacity = 0; capacity < 4; capacity++) {
+    test_list_refcount(capacity);
+  }
+  test_list_index();
+
+  // maps and hashes
+  test_map();
+  test_map_links();
+  test_hash();
+
+  // strings: NULL, empty, ordinary, long, and with an embedded NUL
+  test_string(NULL);
+  test_string("");
+  test_string("this is a test");
+  test_string("012345678910111213151617181920212223242526272829303132333435363"
+              "738394041424344454647484950515253545556575859606162636465666768");
+  test_string("this has an embedded \000 in it");
+  test_stringn("this has an embedded \000 in it", 28);
+  test_string_format();
+  test_string_addf();
+
+  // the variadic list/map builders
+  test_build_list();
+  test_build_map();
+  test_build_map_odd();
+
+  for (int pairs = 0; pairs < 64; pairs++) {
+    test_map_iteration(pairs);
+  }
+
+  test_list_inspect();
+  test_map_inspect();
+  test_list_compare();
+  test_iterator();
+
+  // heap ordering: 64 seeds, sizes 1..64 each
+  for (int seed = 0; seed < 64; seed++) {
+    for (int size = 1; size <= 64; size++) {
+      test_heap(seed, size);
+    }
+  }
+
+  test_map_coalesced_chain();
+  test_map_coalesced_chain2();
+  return 0;
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/c/tests/parse-url.c
----------------------------------------------------------------------
diff --git a/c/tests/parse-url.c b/c/tests/parse-url.c
new file mode 100644
index 0000000..b632237
--- /dev/null
+++ b/c/tests/parse-url.c
@@ -0,0 +1,123 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <stdarg.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include "proton/type_compat.h"
+#include "proton/error.h"
+#include "proton/url.h"
+
+/* Compare an expected URL component against the one actually parsed.
+ *
+ * what: label printed in the mismatch message (e.g. "host").
+ * want: expected value, may be NULL.
+ * got:  actual value, may be NULL.
+ *
+ * Returns true if both are NULL/the same pointer or compare equal with
+ * strcmp(); otherwise prints a one-line diagnostic to stdout and returns false.
+ */
+static bool verify(const char* what, const char* want, const char* got) {
+  bool eq = (want == got || (want && got && strcmp(want, got) == 0));
+  if (!eq) {
+    /* Passing a null pointer for %s is undefined behavior, and on a mismatch
+     * one side is frequently NULL, so substitute an explicit marker.
+     */
+    printf("  %s: '%s' != '%s'\n", what,
+           want ? want : "(null)", got ? got : "(null)");
+  }
+  return eq;
+}
+
+/* Parse url and check each component against the expected value (0/NULL
+ * means the component is expected to be absent). Checking stops at the first
+ * mismatch, which verify() reports. Returns true if every component matched.
+ */
+static bool test(const char* url, const char* scheme, const char* user, const char* pass, const char* host, const char* port, const char*path)
+{
+  pn_url_t *parsed = pn_url_parse(url);
+
+  /* Each step only runs while all previous components matched */
+  bool ok = verify("scheme", scheme, pn_url_get_scheme(parsed));
+  ok = ok && verify("user", user, pn_url_get_username(parsed));
+  ok = ok && verify("pass", pass, pn_url_get_password(parsed));
+  ok = ok && verify("host", host, pn_url_get_host(parsed));
+  ok = ok && verify("port", port, pn_url_get_port(parsed));
+  ok = ok && verify("path", path, pn_url_get_path(parsed));
+
+  pn_url_free(parsed);
+  return ok;
+}
+
+// Same checks as test(), plus a round trip: parsing url and converting it
+// back to a string with pn_url_str() must reproduce the original text.
+// The round-trip comparison is only made when the component checks passed.
+static bool testrt(const char* url, const char* scheme, const char* user, const char* pass, const char* host, const char* port, const char*path)
+{
+  bool ok = test(url, scheme, user, pass, host, port, path);
+  pn_url_t *reparsed = pn_url_parse(url);
+  if (ok) {
+    ok = verify("url", url, pn_url_str(reparsed));
+  }
+  pn_url_free(reparsed);
+  return ok;
+}
+
+/* Evaluate EXPR once; if it is false, print the source location and the text
+ * of EXPR to stdout and increment `failed`. Requires an integer variable
+ * named `failed` to be in scope at the point of use.
+ */
+#define TEST(EXPR) \
+  do { if (!(EXPR)) { printf("%s:%d: %s\n\n", __FILE__, __LINE__, #EXPR); failed++; } } while(0)
+
+/* Run the URL parsing tests. Each TEST() that fails prints a diagnostic and
+ * bumps `failed`; the number of failed tests is returned as the exit status,
+ * so 0 means every test passed.
+ *
+ * In the test()/testrt() calls the arguments after the URL are the expected
+ * scheme, user, password, host, port and path; 0 means "expected absent".
+ */
+int main(int argc, char **argv)
+{
+  int failed = 0;
+  /* URLs without a scheme */
+  TEST(testrt("/Foo.bar:90087@somewhere", 0, 0, 0, 0, 0, "Foo.bar:90087@somewhere"));
+  TEST(testrt("host", 0, 0, 0, "host", 0, 0));
+  TEST(testrt("host:423", 0, 0, 0, "host", "423", 0));
+  TEST(testrt("user@host", 0, "user", 0, "host", 0, 0));
+
+  // Can't round-trip passwords with ':', not strictly legal but the parser allows it.
+  TEST(test("user:1243^&^:pw@host:423", 0, "user", "1243^&^:pw", "host", "423", 0));
+  TEST(test("user:1243^&^:pw@host:423/Foo.bar:90087", 0, "user", "1243^&^:pw", "host", "423", "Foo.bar:90087"));
+  TEST(test("user:1243^&^:pw@host:423/Foo.bar:90087@somewhere", 0, "user", "1243^&^:pw", "host", "423", "Foo.bar:90087@somewhere"));
+
+  /* Bracketed IPv6 literal hosts; the brackets are not part of the host */
+  TEST(testrt("[::1]", 0, 0, 0, "::1", 0, 0));
+  TEST(testrt("[::1]:amqp", 0, 0, 0, "::1", "amqp", 0));
+  TEST(testrt("user@[::1]", 0, "user", 0, "::1", 0, 0));
+  TEST(testrt("user@[::1]:amqp", 0, "user", 0, "::1", "amqp", 0));
+
+  // Can't round-trip passwords with ':', not strictly legal but the parser allows it.
+  TEST(test("user:1243^&^:pw@[::1]:amqp", 0, "user", "1243^&^:pw", "::1", "amqp", 0));
+  TEST(test("user:1243^&^:pw@[::1]:amqp/Foo.bar:90087", 0, "user", "1243^&^:pw", "::1", "amqp", "Foo.bar:90087"));
+  /* Unbalanced brackets are expected to be kept as part of the host */
+  TEST(test("user:1243^&^:pw@[::1:amqp/Foo.bar:90087", 0, "user", "1243^&^:pw", "[::1", "amqp", "Foo.bar:90087"));
+  TEST(test("user:1243^&^:pw@::1]:amqp/Foo.bar:90087", 0, "user", "1243^&^:pw", "::1]", "amqp", "Foo.bar:90087"));
+
+  /* URLs with an explicit scheme */
+  TEST(testrt("amqp://user@[::1]", "amqp", "user", 0, "::1", 0, 0));
+  TEST(testrt("amqp://user@[::1]:amqp", "amqp", "user", 0, "::1", "amqp", 0));
+  TEST(testrt("amqp://user@[1234:52:0:1260:f2de:f1ff:fe59:8f87]:amqp", "amqp", "user", 0, "1234:52:0:1260:f2de:f1ff:fe59:8f87", "amqp", 0));
+
+  // Can't round-trip passwords with ':', not strictly legal but the parser allows it.
+  TEST(test("amqp://user:1243^&^:pw@[::1]:amqp", "amqp", "user", "1243^&^:pw", "::1", "amqp", 0));
+  TEST(test("amqp://user:1243^&^:pw@[::1]:amqp/Foo.bar:90087", "amqp", "user", "1243^&^:pw", "::1", "amqp", "Foo.bar:90087"));
+
+  TEST(testrt("amqp://host", "amqp", 0, 0, "host", 0, 0));
+  TEST(testrt("amqp://user@host", "amqp", "user", 0, "host", 0, 0));
+  TEST(testrt("amqp://user@host/path:%", "amqp", "user", 0, "host", 0, "path:%"));
+  TEST(testrt("amqp://user@host:5674/path:%", "amqp", "user", 0, "host", "5674", "path:%"));
+  /* NOTE(review): the next test is identical to the one two lines above */
+  TEST(testrt("amqp://user@host/path:%", "amqp", "user", 0, "host", 0, "path:%"));
+  TEST(testrt("amqp://bigbird@host/queue@host", "amqp", "bigbird", 0, "host", 0, "queue@host"));
+  TEST(testrt("amqp://host/queue@host", "amqp", 0, 0, "host", 0, "queue@host"));
+  TEST(testrt("amqp://host:9765/queue@host", "amqp", 0, 0, "host", "9765", "queue@host"));
+  /* Percent-encoded passwords: lowercase hex ("%2f") is parsed but is not
+     round-tripped, so it uses test(); uppercase ("%2F") uses testrt() */
+  TEST(test("user:pass%2fword@host", 0, "user", "pass/word", "host", 0, 0));
+  TEST(testrt("user:pass%2Fword@host", 0, "user", "pass/word", "host", 0, 0));
+  // Can't round-trip usernames with lowercase hex encoding either
+  TEST(test("us%2fer:password@host", 0, "us/er", "password", "host", 0, 0));
+  TEST(testrt("us%2Fer:password@host", 0, "us/er", "password", "host", 0, 0));
+  // Can't round-trip passwords with lowercase hex encoding (here also with a trailing bare '%')
+  TEST(test("user:pass%2fword%@host", 0, "user", "pass/word%", "host", 0, 0));
+  TEST(testrt("localhost/temp-queue://ID:ganymede-36663-1408448359876-2:123:0", 0, 0, 0, "localhost", 0, "temp-queue://ID:ganymede-36663-1408448359876-2:123:0"));
+  TEST(testrt("/temp-queue://ID:ganymede-36663-1408448359876-2:123:0", 0, 0, 0, 0, 0, "temp-queue://ID:ganymede-36663-1408448359876-2:123:0"));
+  TEST(testrt("amqp://localhost/temp-queue://ID:ganymede-36663-1408448359876-2:123:0", "amqp", 0, 0, "localhost", 0, "temp-queue://ID:ganymede-36663-1408448359876-2:123:0"));
+  // PROTON-995
+  TEST(testrt("amqps://%40user%2F%3A:%40pass%2F%3A@example.net/some_topic",
+              "amqps", "@user/:", "@pass/:", "example.net", 0, "some_topic"));
+  TEST(testrt("amqps://user%2F%3A=:pass%2F%3A=@example.net/some_topic",
+              "amqps", "user/:=", "pass/:=", "example.net", 0, "some_topic"));
+  // Really perverse url
+  TEST(testrt("://:@://:", "", "", "", 0, "", "/:"));
+  return failed;
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/c/tests/proactor.c
----------------------------------------------------------------------
diff --git a/c/tests/proactor.c b/c/tests/proactor.c
new file mode 100644
index 0000000..216d68a
--- /dev/null
+++ b/c/tests/proactor.c
@@ -0,0 +1,1102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "test_tools.h"
+#include "test_handler.h"
+#include "test_config.h"
+#include "../src/proactor/proactor-internal.h"
+
+#include <proton/condition.h>
+#include <proton/connection.h>
+#include <proton/event.h>
+#include <proton/listener.h>
+#include <proton/session.h>
+#include <proton/netaddr.h>
+#include <proton/proactor.h>
+#include <proton/ssl.h>
+#include <proton/transport.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+/* Number of elements in A. A must be an actual array, not a pointer: for a
+ * pointer the sizeof division does not yield an element count.
+ */
+#define ARRAYLEN(A) (sizeof(A)/sizeof((A)[0]))
+
+/* Proactor and handler that take part in a test */
+typedef struct test_proactor_t {
+  test_handler_t handler;   /* Receives (and logs) each event read from the proactor */
+  pn_proactor_t *proactor;  /* Created by test_proactor(), freed by test_proactor_destroy() */
+} test_proactor_t;
+
+/* Build a test_proactor_t by value: initialize its handler for test t with
+ * callback f, then create a new proactor. TEST_ASSERT checks that the
+ * proactor was actually created.
+ */
+static test_proactor_t test_proactor(test_t *t, test_handler_fn f) {
+  test_proactor_t tp;
+  test_handler_t *h = &tp.handler;
+
+  test_handler_init(h, t, f);
+  tp.proactor = pn_proactor();
+  TEST_ASSERT(tp.proactor);
+
+  return tp;
+}
+
+/* Free the proactor held by tp. The handler member is not touched. */
+static void test_proactor_destroy(test_proactor_t *tp) {
+  pn_proactor_t *p = tp->proactor;
+  pn_proactor_free(p);
+}
+
+/* Set this to a pn_condition() to save condition data */
+pn_condition_t *last_condition = NULL;
+
+/* If last_condition is set, update it from event e: copy the listener's
+ * condition for listener events, otherwise the event's own condition. When
+ * the event carries no condition, last_condition is cleared instead.
+ */
+static void save_condition(pn_event_t *e) {
+  if (!last_condition) {
+    return;                     /* Nobody asked for conditions to be saved */
+  }
+
+  pn_listener_t *listener = pn_event_listener(e);
+  pn_condition_t *found = listener ? pn_listener_condition(listener)
+                                   : pn_event_condition(e);
+  if (!found) {
+    pn_condition_clear(last_condition);
+    return;
+  }
+  pn_condition_copy(last_condition, found);
+}
+
+/* Poll the n proactors in tps, in order, without blocking. Every event is
+ * logged, passed to save_condition() and then to the proactor's handler. As
+ * soon as a handler returns a non-zero event type, the current batch is
+ * handed back with pn_proactor_done() and that type is returned. Returns
+ * PN_EVENT_NONE after a full pass in which no proactor had a batch.
+ *
+ * last_condition, if set, is cleared on entry.
+ */
+static pn_event_type_t test_proactors_get(test_proactor_t *tps, size_t n) {
+  if (last_condition) pn_condition_clear(last_condition);
+  for (;;) {
+    bool any_batch = false;
+    for (size_t i = 0; i < n; ++i) {
+      test_proactor_t *tp = &tps[i];
+      pn_event_batch_t *batch = pn_proactor_get(tp->proactor);
+      if (!batch) {
+        continue;
+      }
+      any_batch = true;
+
+      pn_event_type_t result = PN_EVENT_NONE;
+      pn_event_t *ev = NULL;
+      /* Stop pulling events from the batch once a handler returns non-zero */
+      while (!result && (ev = pn_event_batch_next(batch)) != NULL) {
+        test_handler_log(&tp->handler, ev);
+        save_condition(ev);
+        result = tp->handler.f(&tp->handler, ev);
+      }
+      pn_proactor_done(tp->proactor, batch);
+      if (result) {
+        return result;
+      }
+    }
+    if (!any_batch) {
+      return PN_EVENT_NONE;
+    }
+  }
+}
+
+/* Keep polling the n proactors in tps until some handler returns an event
+ * type other than PN_EVENT_NONE, and return that type. Busy-waits.
+ */
+static pn_event_type_t test_proactors_run(test_proactor_t *tps, size_t n) {
+  pn_event_type_t got;
+  do {
+    got = test_proactors_get(tps, n);
+  } while (got == PN_EVENT_NONE);
+  return got;
+}
+
+/* Keep polling the n proactors in tps until a poll returns exactly `want`.
+ * Any other event types returned in the meantime are discarded. Busy-waits.
+ */
+void test_proactors_run_until(test_proactor_t *tps, size_t n, pn_event_type_t want) {
+  for (;;) {
+    if (test_proactors_get(tps, n) == want) {
+      return;
+    }
+  }
+}
+
+/* Poll the n proactors in tps, discarding whatever the handlers return,
+ * until a poll comes back with PN_EVENT_NONE (no outstanding events).
+ */
+static void test_proactors_drain(test_proactor_t *tps, size_t n) {
+  pn_event_type_t got;
+  do {
+    got = test_proactors_get(tps, n);
+  } while (got);
+}
+
+
+/* Convenience wrappers that pass the length of A automatically. A must be a
+ * real test_proactor_t array (not a pointer) because ARRAYLEN uses sizeof.
+ */
+#define TEST_PROACTORS_GET(A) test_proactors_get((A), ARRAYLEN(A))
+#define TEST_PROACTORS_RUN(A) test_proactors_run((A), ARRAYLEN(A))
+#define TEST_PROACTORS_RUN_UNTIL(A, WANT) test_proactors_run_until((A), ARRAYLEN(A), WANT)
+#define TEST_PROACTORS_DRAIN(A) test_proactors_drain((A), ARRAYLEN(A))
+
+/* Call test_proactor_destroy() on every element of array A */
+#define TEST_PROACTORS_DESTROY(A) do {           \
+    for (size_t i = 0; i < ARRAYLEN(A); ++i)       \
+      test_proactor_destroy((A)+i);             \
+  } while (0)
+
+
+/* Size of each string buffer in struct addrinfo */
+#define MAX_STR 256
+/* Address strings describing a listener, as filled in by listener_info().
+ * NOTE(review): this tag has the same name as the POSIX `struct addrinfo`
+ * from <netdb.h>; presumably that header is not visible here - confirm.
+ */
+struct addrinfo {
+  char host[MAX_STR];       /* Host of the listener's first address */
+  char port[MAX_STR];       /* Port of the listener's first address */
+  char connect[MAX_STR];    /* pn_proactor_addr() string built from "" and port */
+  char host_port[MAX_STR];  /* Filled by pn_netaddr_str(): host:port listening address */
+};
+
+/* Collect address strings for listener l.
+ *
+ * host/port come from the listener's first address. Every further address
+ * the listener reports must use the same port (checked with TEST_ASSERTF).
+ * connect is an address string with an empty host and that port, suitable
+ * for connecting; host_port is the first address formatted by
+ * pn_netaddr_str().
+ *
+ * Returns the filled structure by value; all fields start zeroed.
+ */
+struct addrinfo listener_info(pn_listener_t *l) {
+  struct addrinfo ai = {{0}};
+  const pn_netaddr_t *na = pn_listener_addr(l);
+  /* Remember the first address: the loop below advances na until it is NULL,
+   * so na itself must not be used for host_port afterwards.
+   */
+  const pn_netaddr_t *first = na;
+  TEST_ASSERT(0 == pn_netaddr_host_port(na, ai.host, sizeof(ai.host), ai.port, sizeof(ai.port)));
+  for (na = pn_netaddr_next(na); na; na = pn_netaddr_next(na)) { /* Check that ports are consistent */
+    char port[MAX_STR];
+    TEST_ASSERT(0 == pn_netaddr_host_port(na, NULL, 0, port, sizeof(port)));
+    TEST_ASSERTF(0 == strcmp(port, ai.port), "%s !=  %s", port, ai.port);
+  }
+  (void)pn_proactor_addr(ai.connect, sizeof(ai.connect), "", ai.port); /* Address for connecting */
+  (void)pn_netaddr_str(first, ai.host_port, sizeof(ai.host_port)); /* host:port listening address */
+  return ai;
+}
+
+/* Start a new listener on tp's proactor for `host` with port "0" and a
+ * backlog of 4, then run tp until its handler returns an event. The test
+ * records an error unless that event is PN_LISTENER_OPEN and last_condition
+ * is empty. Returns the listener.
+ */
+pn_listener_t *test_listen(test_proactor_t *tp, const char *host) {
+  pn_listener_t *listener = pn_listener();
+  char listen_addr[1024];
+
+  (void)pn_proactor_addr(listen_addr, sizeof(listen_addr), host, "0");
+  pn_proactor_listen(tp->proactor, listener, listen_addr, 4);
+
+  TEST_ETYPE_EQUAL(tp->handler.t, PN_LISTENER_OPEN, test_proactors_run(tp, 1));
+  TEST_COND_EMPTY(tp->handler.t, last_condition);
+  return listener;
+}
+
+
+/* Block in pn_proactor_wait() for the next batch, take the type of its
+ * first event, hand the batch back and return that type. Any further
+ * events in the batch are not examined.
+ */
+static pn_event_type_t wait_next(pn_proactor_t *proactor) {
+  pn_event_batch_t *batch = pn_proactor_wait(proactor);
+  pn_event_t *first = pn_event_batch_next(batch);
+  pn_event_type_t first_type = pn_event_type(first);
+  pn_proactor_done(proactor, batch);
+  return first_type;
+}
+
+/* Test that interrupt and timeout events cause pn_proactor_wait() to return.
+ * Uses a private proactor with no connections or listeners, and reads events
+ * one at a time with wait_next(). Results are recorded against test t.
+ */
+static void test_interrupt_timeout(test_t *t) {
+  pn_proactor_t *p = pn_proactor();
+  TEST_CHECK(t, pn_proactor_get(p) == NULL); /* idle */
+  pn_proactor_interrupt(p);
+  TEST_ETYPE_EQUAL(t, PN_PROACTOR_INTERRUPT, wait_next(p));
+  TEST_CHECK(t, pn_proactor_get(p) == NULL); /* idle */
+
+  /* Set an immediate timeout */
+  pn_proactor_set_timeout(p, 0);
+  TEST_ETYPE_EQUAL(t, PN_PROACTOR_TIMEOUT, wait_next(p));
+  TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, wait_next(p)); /* Inactive because timeout expired */
+
+  /* Set a (very short) timeout */
+  pn_proactor_set_timeout(p, 1);
+  TEST_ETYPE_EQUAL(t, PN_PROACTOR_TIMEOUT, wait_next(p));
+  TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, wait_next(p));
+
+  /* Set and cancel a timeout, make sure we don't get the timeout event */
+  pn_proactor_set_timeout(p, 10000000);
+  pn_proactor_cancel_timeout(p);
+  /* Cancelling still yields INACTIVE, just without a preceding TIMEOUT */
+  TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, wait_next(p));
+  TEST_CHECK(t, pn_proactor_get(p) == NULL); /* idle */
+
+  pn_proactor_free(p);
+}
+
+/* Save the last connection accepted by the common_handler */
+pn_connection_t *last_accepted = NULL;
+
+/* Common handler for simple client/server interactions.
+ *
+ * Returns the event type for the events a test normally waits on
+ * (TRANSPORT_CLOSED, PROACTOR_INACTIVE, PROACTOR_TIMEOUT, LISTENER_OPEN) and
+ * PN_EVENT_NONE for everything else. Along the way it accepts one incoming
+ * connection and then closes the listener, answers remote open with a local
+ * open for connections, sessions and links, and answers a remote connection
+ * close with a local close. An event type not listed here is recorded as a
+ * test error on th->t.
+ */
+static pn_event_type_t common_handler(test_handler_t *th, pn_event_t *e) {
+  pn_connection_t *conn = pn_event_connection(e);
+  pn_listener_t *listener = pn_event_listener(e);
+  const pn_event_type_t etype = pn_event_type(e);
+  pn_event_type_t result = PN_EVENT_NONE;
+
+  switch (etype) {
+
+    /* Events that are handed back to the caller */
+   case PN_LISTENER_OPEN:
+   case PN_PROACTOR_INACTIVE:
+   case PN_PROACTOR_TIMEOUT:
+   case PN_TRANSPORT_CLOSED:
+    result = etype;
+    break;
+
+    /* Events that need a reply */
+   case PN_LISTENER_ACCEPT:
+    last_accepted = pn_connection();
+    pn_listener_accept2(listener, last_accepted, NULL);
+    pn_listener_close(listener); /* Only accept one connection */
+    break;
+
+   case PN_CONNECTION_REMOTE_OPEN:
+    pn_connection_open(conn);    /* Return the open (no-op if already open) */
+    break;
+
+   case PN_CONNECTION_REMOTE_CLOSE:
+    pn_connection_close(conn);   /* Return the close */
+    break;
+
+   case PN_SESSION_REMOTE_OPEN:
+    pn_session_open(pn_event_session(e));
+    break;
+
+   case PN_LINK_REMOTE_OPEN:
+    pn_link_open(pn_event_link(e));
+    break;
+
+    /* Expected events that need no action */
+   case PN_CONNECTION_INIT:
+   case PN_CONNECTION_BOUND:
+   case PN_CONNECTION_LOCAL_OPEN:
+   case PN_CONNECTION_LOCAL_CLOSE:
+   case PN_SESSION_INIT:
+   case PN_SESSION_LOCAL_OPEN:
+   case PN_LINK_INIT:
+   case PN_LINK_LOCAL_OPEN:
+   case PN_LISTENER_CLOSE:
+   case PN_TRANSPORT:
+   case PN_TRANSPORT_ERROR:
+   case PN_TRANSPORT_HEAD_CLOSED:
+   case PN_TRANSPORT_TAIL_CLOSED:
+    break;
+
+   default:
+    /* Fail the test but keep going */
+    TEST_ERRORF(th->t, "unexpected event %s", pn_event_type_name(pn_event_type(e)));
+    break;
+  }
+  return result;
+}
+
+/* Variant of common_handler for tests that accept several connections: the
+ * listener is left open after an accept, and PN_LISTENER_CLOSE is returned
+ * to the caller. All other events are delegated to common_handler.
+ */
+static pn_event_type_t listen_handler(test_handler_t *th, pn_event_t *e) {
+  const pn_event_type_t etype = pn_event_type(e);
+
+  if (etype == PN_LISTENER_ACCEPT) {
+    /* Accept, but do not close the listener afterwards */
+    last_accepted = pn_connection();
+    pn_listener_accept2(pn_event_listener(e), last_accepted, NULL);
+    return PN_EVENT_NONE;
+  }
+  if (etype == PN_LISTENER_CLOSE) {
+    return PN_LISTENER_CLOSE;
+  }
+  return common_handler(th, e);
+}
+
+/* Close the connection as soon as the remote open arrives; every other
+ * event goes to common_handler, which finishes on TRANSPORT_CLOSED.
+ */
+static pn_event_type_t open_close_handler(test_handler_t *th, pn_event_t *e) {
+  if (pn_event_type(e) != PN_CONNECTION_REMOTE_OPEN) {
+    return common_handler(th, e);
+  }
+  pn_connection_close(pn_event_connection(e));
+  return PN_EVENT_NONE;
+}
+
+/* Test simple client/server connection with 2 proactors.
+ * tps[0] is the client (closes on remote open), tps[1] is the server.
+ */
+static void test_client_server(test_t *t) {
+  test_proactor_t tps[] ={ test_proactor(t, open_close_handler), test_proactor(t, common_handler) };
+  pn_listener_t *l = test_listen(&tps[1], "");
+  /* Connect and wait for close at both ends */
+  pn_proactor_connect2(tps[0].proactor, NULL, NULL, listener_info(l).connect);
+  TEST_PROACTORS_RUN_UNTIL(tps, PN_TRANSPORT_CLOSED);
+  TEST_PROACTORS_RUN_UNTIL(tps, PN_TRANSPORT_CLOSED);  
+  TEST_PROACTORS_DESTROY(tps);
+}
+
+/* Hand PN_CONNECTION_REMOTE_OPEN back to the caller untouched. On
+ * PN_CONNECTION_WAKE, close the connection and hand the wake back as well.
+ * Everything else is delegated to common_handler.
+ */
+static pn_event_type_t open_wake_handler(test_handler_t *th, pn_event_t *e) {
+  const pn_event_type_t etype = pn_event_type(e);
+
+  if (etype == PN_CONNECTION_REMOTE_OPEN) {
+    return etype;
+  }
+  if (etype == PN_CONNECTION_WAKE) {
+    pn_connection_close(pn_event_connection(e));
+    return etype;
+  }
+  return common_handler(th, e);
+}
+
+/* Test waking up a connection that is idle.
+ * tps[0] is the client (open_wake_handler closes the connection on wake),
+ * tps[1] is the server, whose listener stays open for the second connection.
+ */
+static void test_connection_wake(test_t *t) {
+  test_proactor_t tps[] =  { test_proactor(t, open_wake_handler), test_proactor(t,  listen_handler) };
+  pn_proactor_t *client = tps[0].proactor;
+  pn_listener_t *l = test_listen(&tps[1], "");
+
+  pn_connection_t *c = pn_connection();
+  pn_incref(c);                 /* Keep a reference for wake() after free */
+  pn_proactor_connect2(client, c, NULL, listener_info(l).connect);
+  TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, TEST_PROACTORS_RUN(tps));
+  TEST_CHECK(t, pn_proactor_get(client) == NULL); /* Should be idle */
+  pn_connection_wake(c);
+  TEST_ETYPE_EQUAL(t, PN_CONNECTION_WAKE, TEST_PROACTORS_RUN(tps));
+  TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, TEST_PROACTORS_RUN(tps));
+  TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, TEST_PROACTORS_RUN(tps)); /* Both ends */
+  /* Expect INACTIVE next (presumably from the client, whose only connection has closed) */
+  TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, TEST_PROACTORS_RUN(tps));
+  TEST_ETYPE_EQUAL(t, PN_EVENT_NONE, TEST_PROACTORS_GET(tps)); /* No more wake */
+
+  /* Verify we don't get a wake after close even if they happen together */
+  pn_connection_t *c2 = pn_connection();
+  pn_proactor_connect2(client, c2, NULL, listener_info(l).connect);
+  TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, TEST_PROACTORS_RUN(tps));
+  /* Wake on both sides of the disconnect */
+  pn_connection_wake(c2);
+  pn_proactor_disconnect(client, NULL);
+  pn_connection_wake(c2);
+
+  /* Only the client proactor is run from here on */
+  TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, test_proactors_run(&tps[0], 1));
+  TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, test_proactors_run(&tps[0], 1));
+  TEST_ETYPE_EQUAL(t, PN_EVENT_NONE, test_proactors_get(&tps[0], 1)); /* No late wake */
+
+  TEST_PROACTORS_DESTROY(tps);
+  /* The pn_connection_t is still valid so wake is legal but a no-op */
+  pn_connection_wake(c);
+  pn_decref(c);                 /* Drop the reference taken with pn_incref() above */
+}
+
+/* Abort a connection on remote open: close both directions of its transport,
+ * i.e. close the socket without an AMQP close. All other events go to
+ * listen_handler, so the listener is not auto-closed and the event sequences
+ * stay simple.
+ */
+static pn_event_type_t listen_abort_handler(test_handler_t *th, pn_event_t *e) {
+  if (pn_event_type(e) != PN_CONNECTION_REMOTE_OPEN) {
+    return listen_handler(th, e);
+  }
+
+  /* Close the transport - abruptly closes the socket */
+  pn_transport_t *transport = pn_connection_transport(pn_event_connection(e));
+  pn_transport_close_tail(transport);
+  pn_transport_close_head(transport);
+  return PN_EVENT_NONE;
+}
+
+/* Verify that pn_transport_close_head/tail aborts a connection without an AMQP protocol close.
+ * tps[0] is the client; tps[1] is the server, which aborts on remote open.
+ */
+static void test_abort(test_t *t) {
+  test_proactor_t tps[] = { test_proactor(t, open_close_handler), test_proactor(t, listen_abort_handler) };
+  pn_proactor_t *client = tps[0].proactor;
+  pn_listener_t *l = test_listen(&tps[1], "");
+  pn_proactor_connect2(client, NULL, NULL, listener_info(l).connect);
+
+  /* server transport closes */
+  if (TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, TEST_PROACTORS_RUN(tps))) {
+    TEST_COND_NAME(t, "amqp:connection:framing-error",last_condition);
+    TEST_COND_DESC(t, "abort", last_condition);
+  }
+  /* client transport closes */
+  if (TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, TEST_PROACTORS_RUN(tps))) {
+    TEST_COND_NAME(t, "amqp:connection:framing-error", last_condition);
+    TEST_COND_DESC(t, "abort", last_condition);
+  }
+
+  pn_listener_close(l);
+
+  /* Run until INACTIVE has been returned twice; each handler is expected to
+     log one PN_PROACTOR_INACTIVE (see the sequences below) */
+  while (TEST_PROACTORS_RUN(tps) != PN_PROACTOR_INACTIVE) {}
+  while (TEST_PROACTORS_RUN(tps) != PN_PROACTOR_INACTIVE) {}
+
+  /* Verify expected event sequences, no unexpected events */
+  TEST_HANDLER_EXPECT(
+    &tps[0].handler,
+    PN_CONNECTION_INIT, PN_CONNECTION_LOCAL_OPEN, PN_CONNECTION_BOUND,
+    PN_TRANSPORT_TAIL_CLOSED, PN_TRANSPORT_ERROR, PN_TRANSPORT_HEAD_CLOSED, PN_TRANSPORT_CLOSED,
+    PN_PROACTOR_INACTIVE,
+    0);
+
+  TEST_HANDLER_EXPECT(
+    &tps[1].handler,
+    PN_LISTENER_OPEN, PN_LISTENER_ACCEPT,
+    PN_CONNECTION_INIT, PN_CONNECTION_BOUND, PN_CONNECTION_REMOTE_OPEN,
+    PN_TRANSPORT_TAIL_CLOSED, PN_TRANSPORT_ERROR, PN_TRANSPORT_HEAD_CLOSED, PN_TRANSPORT_CLOSED,
+    PN_LISTENER_CLOSE,
+    PN_PROACTOR_INACTIVE,
+    0);
+
+  TEST_PROACTORS_DESTROY(tps);
+}
+
+/* Refuse a connection: as soon as it is bound (before the AMQP open sequence
+ * begins) close both directions of its transport. All other events go to
+ * listen_handler, so the listener is not auto-closed and the event sequences
+ * stay simple.
+ */
+static pn_event_type_t listen_refuse_handler(test_handler_t *th, pn_event_t *e) {
+  if (pn_event_type(e) != PN_CONNECTION_BOUND) {
+    return listen_handler(th, e);
+  }
+
+  /* Close the transport - abruptly closes the socket */
+  pn_transport_t *transport = pn_connection_transport(pn_event_connection(e));
+  pn_transport_close_tail(transport);
+  pn_transport_close_head(transport);
+  return PN_EVENT_NONE;
+}
+
+/* Verify that closing the transport head/tail on PN_CONNECTION_BOUND refuses a
+ * connection: the server's expected event sequence contains no
+ * PN_CONNECTION_REMOTE_OPEN, and the client sees a framing error.
+ * tps[0] is the client; tps[1] is the refusing server.
+ */
+static void test_refuse(test_t *t) {
+  test_proactor_t tps[] = { test_proactor(t, open_close_handler), test_proactor(t, listen_refuse_handler) };
+  pn_proactor_t *client = tps[0].proactor;
+  pn_listener_t *l = test_listen(&tps[1], "");
+  pn_proactor_connect2(client, NULL, NULL, listener_info(l).connect);
+
+  /* client transport closes */
+  TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, TEST_PROACTORS_RUN(tps)); /* client */
+  TEST_COND_NAME(t, "amqp:connection:framing-error", last_condition);
+
+  pn_listener_close(l);
+  /* Run until INACTIVE has been returned twice; each handler is expected to
+     log one PN_PROACTOR_INACTIVE (see the sequences below) */
+  while (TEST_PROACTORS_RUN(tps) != PN_PROACTOR_INACTIVE) {}
+  while (TEST_PROACTORS_RUN(tps) != PN_PROACTOR_INACTIVE) {}
+
+  /* Verify expected event sequences, no unexpected events */
+  TEST_HANDLER_EXPECT(
+    &tps[0].handler,
+    PN_CONNECTION_INIT, PN_CONNECTION_LOCAL_OPEN, PN_CONNECTION_BOUND,
+    PN_TRANSPORT_TAIL_CLOSED, PN_TRANSPORT_ERROR, PN_TRANSPORT_HEAD_CLOSED, PN_TRANSPORT_CLOSED,
+    PN_PROACTOR_INACTIVE,
+    0);
+
+  TEST_HANDLER_EXPECT(
+    &tps[1].handler,
+    PN_LISTENER_OPEN, PN_LISTENER_ACCEPT,
+    PN_CONNECTION_INIT, PN_CONNECTION_BOUND,
+    PN_TRANSPORT_TAIL_CLOSED, PN_TRANSPORT_ERROR, PN_TRANSPORT_HEAD_CLOSED, PN_TRANSPORT_CLOSED,
+    PN_LISTENER_CLOSE,
+    PN_PROACTOR_INACTIVE,
+    0);
+
+  TEST_PROACTORS_DESTROY(tps);
+}
+
+/* Test that INACTIVE event is generated when last connections/listeners closes.
+ * tps[0] is the client (open_wake_handler closes the connection on wake);
+ * tps[1] is the server, whose listener stays open until closed explicitly.
+ */
+static void test_inactive(test_t *t) {
+  test_proactor_t tps[] =  { test_proactor(t, open_wake_handler), test_proactor(t, listen_handler) };
+  pn_proactor_t *client = tps[0].proactor, *server = tps[1].proactor;
+
+  /* Listen, connect, disconnect */
+  pn_listener_t *l = test_listen(&tps[1], "");
+  pn_connection_t *c = pn_connection();
+  pn_proactor_connect2(client, c, NULL, listener_info(l).connect);
+  TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, TEST_PROACTORS_RUN(tps));
+  pn_connection_wake(c);        /* The wake makes open_wake_handler close the connection */
+  TEST_ETYPE_EQUAL(t, PN_CONNECTION_WAKE, TEST_PROACTORS_RUN(tps));
+  /* Expect TRANSPORT_CLOSED from client and server, INACTIVE from client */
+  TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, TEST_PROACTORS_RUN(tps));
+  TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, TEST_PROACTORS_RUN(tps));
+  TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, TEST_PROACTORS_RUN(tps));
+
+  /* Immediate timer generates INACTIVE on client (no connections) */
+  pn_proactor_set_timeout(client, 0);
+  TEST_ETYPE_EQUAL(t, PN_PROACTOR_TIMEOUT, TEST_PROACTORS_RUN(tps));
+  TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, TEST_PROACTORS_RUN(tps));
+
+  /* Connect, set-timer, disconnect */
+  pn_proactor_set_timeout(client, 1000000);
+  c = pn_connection();
+  pn_proactor_connect2(client, c, NULL, listener_info(l).connect);
+  TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, TEST_PROACTORS_RUN(tps));
+  pn_connection_wake(c);
+  TEST_ETYPE_EQUAL(t, PN_CONNECTION_WAKE, TEST_PROACTORS_RUN(tps));
+  /* Expect TRANSPORT_CLOSED from client and server */
+  TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, TEST_PROACTORS_RUN(tps));
+  TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, TEST_PROACTORS_RUN(tps));
+  /* No INACTIVE till timer is cancelled */
+  /* NOTE(review): the pending timer is on `client` but this polls `server`;
+     looks like `client` may have been intended - confirm */
+  TEST_CHECK(t, pn_proactor_get(server) == NULL);
+  pn_proactor_cancel_timeout(client);
+  TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, TEST_PROACTORS_RUN(tps));
+
+  /* Server won't be INACTIVE until listener is closed */
+  TEST_CHECK(t, pn_proactor_get(server) == NULL);
+  pn_listener_close(l);
+  TEST_ETYPE_EQUAL(t, PN_LISTENER_CLOSE, TEST_PROACTORS_RUN(tps));
+  TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, TEST_PROACTORS_RUN(tps));
+
+  TEST_PROACTORS_DESTROY(tps);
+}
+
+/* Tests for error handling: bad service names, bad host names, a port that is
+ * already in use, and connecting where nothing listens. For each failure the
+ * saved last_condition is checked for the expected name or description text.
+ * tps[0] is the client, tps[1] is the server.
+ */
+static void test_errors(test_t *t) {
+  test_proactor_t tps[] =  { test_proactor(t, open_wake_handler), test_proactor(t, listen_handler) };
+  pn_proactor_t *client = tps[0].proactor, *server = tps[1].proactor;
+
+  /* Invalid connect/listen service name */
+  pn_connection_t *c = pn_connection();
+  pn_proactor_connect2(client, c, NULL, "127.0.0.1:xxx");
+  TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, TEST_PROACTORS_RUN(tps));
+  TEST_COND_DESC(t, "xxx", last_condition);
+  TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, TEST_PROACTORS_RUN(tps));
+
+  pn_proactor_listen(server, pn_listener(), "127.0.0.1:xxx", 1);
+  TEST_PROACTORS_RUN(tps);
+  TEST_HANDLER_EXPECT(&tps[1].handler, PN_LISTENER_CLOSE, 0); /* CLOSE only, no OPEN */
+  TEST_COND_DESC(t, "xxx", last_condition);
+  TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, TEST_PROACTORS_RUN(tps));
+
+  /* Invalid connect/listen host name */
+  c = pn_connection();
+  pn_proactor_connect2(client, c, NULL, "nosuch.example.com:");
+  TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, TEST_PROACTORS_RUN(tps));
+  TEST_COND_DESC(t, "nosuch", last_condition);
+  TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, TEST_PROACTORS_RUN(tps));
+
+  /* Reset the server handler's event log before the next expectation */
+  test_handler_clear(&tps[1].handler, 0);
+  pn_proactor_listen(server, pn_listener(), "nosuch.example.com:", 1);
+  TEST_PROACTORS_RUN(tps);
+  TEST_HANDLER_EXPECT(&tps[1].handler, PN_LISTENER_CLOSE, 0); /* CLOSE only, no OPEN */
+  TEST_COND_DESC(t, "nosuch", last_condition);
+  TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, TEST_PROACTORS_RUN(tps));
+
+  /* Listen on a port already in use */
+  pn_listener_t *l = pn_listener();
+  pn_proactor_listen(server, l, ":0", 1);
+  TEST_ETYPE_EQUAL(t, PN_LISTENER_OPEN, TEST_PROACTORS_RUN(tps));
+  test_handler_clear(&tps[1].handler, 0);
+  struct addrinfo laddr = listener_info(l); /* Also reused below, after l is closed */
+  pn_proactor_listen(server, pn_listener(), laddr.connect, 1); /* Busy */
+  TEST_PROACTORS_RUN(tps);
+  TEST_HANDLER_EXPECT(&tps[1].handler, PN_LISTENER_CLOSE, 0); /* CLOSE only, no OPEN */
+  TEST_COND_NAME(t, "proton:io", last_condition);
+  pn_listener_close(l);
+  TEST_ETYPE_EQUAL(t, PN_LISTENER_CLOSE, TEST_PROACTORS_RUN(tps));
+  TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, TEST_PROACTORS_RUN(tps));
+
+  /* Connect with no listener */
+  c = pn_connection();
+  pn_proactor_connect2(client, c, NULL, laddr.connect);
+  if (TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, TEST_PROACTORS_RUN(tps))) {
+    TEST_COND_DESC(t, "refused", last_condition);
+    TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, TEST_PROACTORS_RUN(tps));
+  }
+
+  TEST_PROACTORS_DESTROY(tps);
+}
+
+/* Handler that calls pn_connection_close() from inside PN_TRANSPORT_ERROR,
+ * which should be a no-op; all other events go to open_wake_handler.
+ * Regression test for: https://issues.apache.org/jira/browse/PROTON-1586
+ */
+static pn_event_type_t transport_close_connection_handler(test_handler_t *th, pn_event_t *e) {
+  if (pn_event_type(e) == PN_TRANSPORT_ERROR) {
+    pn_connection_close(pn_event_connection(e));
+    return PN_EVENT_NONE;
+  }
+  return open_wake_handler(th, e);
+}
+
+/* Closing the connection during PN_TRANSPORT_ERROR due to connection failure should be a no-op
+ * Regression test for: https://issues.apache.org/jira/browse/PROTON-1586
+ *
+ * Uses a single proactor and connects to the invalid address ":yyy" so that
+ * the connection fails and the handler sees PN_TRANSPORT_ERROR.
+ */
+static void test_proton_1586(test_t *t) {
+  test_proactor_t tps[] =  { test_proactor(t, transport_close_connection_handler) };
+  pn_proactor_connect2(tps[0].proactor, NULL, NULL, ":yyy");
+  TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, TEST_PROACTORS_RUN(tps));
+  TEST_COND_DESC(t, ":yyy", last_condition);
+  test_handler_clear(&tps[0].handler, 0); /* Clear events */
+  /* There should be no events generated after PN_TRANSPORT_CLOSED */
+  TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, TEST_PROACTORS_RUN(tps));
+  /* The log was cleared above, so INACTIVE must be the only entry */
+  TEST_HANDLER_EXPECT(&tps[0].handler,PN_PROACTOR_INACTIVE, 0);
+
+  TEST_PROACTORS_DESTROY(tps);
+}
+
+/* Test that we can control listen/select on ipv6/v4 and listen on both by default.
+ * tps[0] is the client (closes on remote open), tps[1] is the server.
+ * The IPv6 part is skipped, with a log message, if listening on the IPv6
+ * loopback does not succeed.
+ */
+static void test_ipv4_ipv6(test_t *t) {
+  test_proactor_t tps[] ={ test_proactor(t, open_close_handler), test_proactor(t, listen_handler) };
+  pn_proactor_t *client = tps[0].proactor, *server = tps[1].proactor;
+
+  /* Listen on all interfaces for IPv4 only. */
+  pn_listener_t *l4 = test_listen(&tps[1], "0.0.0.0");
+  TEST_PROACTORS_DRAIN(tps);
+
+  /* Empty address listens on both IPv4 and IPv6 on all interfaces */
+  pn_listener_t *l = test_listen(&tps[1], "");
+  TEST_PROACTORS_DRAIN(tps);
+
+/* Connect the client to HOST on LISTENER's port and expect the connection to
+   end in TRANSPORT_CLOSED with no error condition, then drain leftover
+   events. Uses the locals t, tps and client of the enclosing function.
+   Note: the macro stays defined after this function (there is no #undef). */
+#define EXPECT_CONNECT(LISTENER, HOST) do {                             \
+    char addr[1024];                                                    \
+    pn_proactor_addr(addr, sizeof(addr), HOST, listener_info(LISTENER).port); \
+    pn_proactor_connect2(client, NULL, NULL, addr);                     \
+    TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, TEST_PROACTORS_RUN(tps));  \
+    TEST_COND_EMPTY(t, last_condition);                                 \
+    TEST_PROACTORS_DRAIN(tps);                                          \
+  } while(0)
+
+  EXPECT_CONNECT(l4, "127.0.0.1"); /* v4->v4 */
+  EXPECT_CONNECT(l4, "");          /* local->v4*/
+
+  EXPECT_CONNECT(l, "127.0.0.1"); /* v4->all */
+  EXPECT_CONNECT(l, "");          /* local->all */
+
+  /* Listen on ipv6 loopback, if it fails skip ipv6 tests.
+
+     NOTE: Don't use the unspecified address "::" here - ipv6-disabled platforms
+     may allow listening on "::" without complaining. However they won't have a
+     local ipv6 loopback configured, so "::1" will force an error.
+  */
+  TEST_PROACTORS_DRAIN(tps);
+  pn_listener_t *l6 = pn_listener();
+  pn_proactor_listen(server, l6, "::1:0", 4);
+  pn_event_type_t e = TEST_PROACTORS_RUN(tps);
+  if (e == PN_LISTENER_OPEN && !pn_condition_is_set(last_condition)) {
+    TEST_PROACTORS_DRAIN(tps);
+
+    EXPECT_CONNECT(l6, "::1"); /* v6->v6 */
+    EXPECT_CONNECT(l6, "");    /* local->v6 */
+    EXPECT_CONNECT(l, "::1");  /* v6->all */
+
+    pn_listener_close(l6);
+  } else  {
+    /* Log why IPv6 was skipped; this path is not recorded as a test error here */
+    const char *d = pn_condition_get_description(last_condition);
+    TEST_LOGF(t, "skip IPv6 tests: %s %s", pn_event_type_name(e), d ? d : "no condition");
+  }
+
+  pn_listener_close(l);
+  pn_listener_close(l4);
+  TEST_PROACTORS_DESTROY(tps);
+}
+
+/* Make sure we clean up released connections and open sockets correctly.
+ *
+ * Three client connections are made to one listener: one created with a NULL
+ * connection argument and left alone, c1 which is released and freed
+ * immediately, and c2 which is released but only freed after the proactors
+ * have been destroyed.
+ */
+static void test_release_free(test_t *t) {
+  test_proactor_t tps[] = { test_proactor(t, open_wake_handler), test_proactor(t, listen_handler) };
+  pn_proactor_t *client = tps[0].proactor;
+  pn_listener_t *l = test_listen(&tps[1], "");
+
+  /* leave one connection to the proactor  */
+  pn_proactor_connect2(client, NULL, NULL, listener_info(l).connect);
+  TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, TEST_PROACTORS_RUN(tps));
+
+  /* release c1 and free immediately */
+  pn_connection_t *c1 = pn_connection();
+  pn_proactor_connect2(client, c1, NULL, listener_info(l).connect);
+  TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, TEST_PROACTORS_RUN(tps));
+  pn_proactor_release_connection(c1); /* We free but socket should still be cleaned up */
+  pn_connection_free(c1);
+  TEST_CHECK(t, pn_proactor_get(client) == NULL); /* Should be idle */
+  TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, TEST_PROACTORS_RUN(tps)); /* Server closed */
+
+  /* release c2 but don't free it until after the proactors are destroyed */
+  pn_connection_t *c2 = pn_connection();
+  pn_proactor_connect2(client, c2, NULL, listener_info(l).connect);
+  TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, TEST_PROACTORS_RUN(tps));
+  pn_proactor_release_connection(c2);
+  TEST_CHECK(t, pn_proactor_get(client) == NULL); /* Should be idle */
+  TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, TEST_PROACTORS_RUN(tps)); /* Server closed */
+
+  TEST_PROACTORS_DESTROY(tps);
+  pn_connection_free(c2);
+
+  /* Check freeing a listener or connection that was never given to a proactor */
+  pn_listener_free(pn_listener());
+  pn_connection_free(pn_connection());
+}
+
+/* Path of a file in the "ssl-certs" directory under CMAKE_CURRENT_SOURCE_DIR.
+   NAME must be a string literal: the path is built by literal concatenation. */
+#define SSL_FILE(NAME) CMAKE_CURRENT_SOURCE_DIR "/ssl-certs/" NAME
+/* Password passed to pn_ssl_domain_set_credentials() by SET_CREDENTIALS */
+#define SSL_PW "tserverpw"
+/* Windows vs. OpenSSL certificates */
+/* CERTIFICATE(NAME) is the certificate file for NAME; SET_CREDENTIALS(DOMAIN, NAME)
+   loads NAME's credentials into DOMAIN. Windows uses .p12 files with an empty
+   private-key argument, other platforms use separate .pem certificate and
+   private-key files. */
+#if defined(_WIN32)
+#  define CERTIFICATE(NAME) SSL_FILE(NAME "-certificate.p12")
+#  define SET_CREDENTIALS(DOMAIN, NAME)                                 \
+  pn_ssl_domain_set_credentials(DOMAIN, SSL_FILE(NAME "-full.p12"), "", SSL_PW)
+#else
+#  define CERTIFICATE(NAME) SSL_FILE(NAME "-certificate.pem")
+#  define SET_CREDENTIALS(DOMAIN, NAME)                                 \
+  pn_ssl_domain_set_credentials(DOMAIN, CERTIFICATE(NAME), SSL_FILE(NAME "-private-key.pem"), SSL_PW)
+#endif
+
+/* SSL event handling shared by the client and server test handlers.
+ *
+ * PN_CONNECTION_BOUND: initialise SSL on the transport from h->ssl_domain.
+ * PN_CONNECTION_REMOTE_OPEN: check an SSL object exists and that its protocol
+ * name contains "TLS"; this is the only event returned to the caller.
+ * Every other event yields PN_EVENT_NONE.
+ */
+static pn_event_type_t ssl_handler(test_handler_t *h, pn_event_t *e) {
+  switch (pn_event_type(e)) {
+
+   case PN_CONNECTION_BOUND:
+    TEST_CHECK(h->t, 0 == pn_ssl_init(pn_ssl(pn_event_transport(e)), h->ssl_domain, NULL));
+    return PN_EVENT_NONE;
+
+   case PN_CONNECTION_REMOTE_OPEN: {
+     pn_ssl_t *ssl = pn_ssl(pn_event_transport(e));
+     TEST_CHECK(h->t, ssl);
+     /* Start with an empty string so the substring check below never reads
+        uninitialised memory if the protocol name lookup fails. */
+     char protocol[256] = "";
+     TEST_CHECK(h->t, pn_ssl_get_protocol_name(ssl, protocol, sizeof(protocol)));
+     TEST_STR_IN(h->t, "TLS", protocol);
+     return PN_CONNECTION_REMOTE_OPEN;
+   }
+   default:
+    return PN_EVENT_NONE;
+  }
+}
+
+/* Server-side SSL test handler.
+ *
+ * Bound and remote-open events go through ssl_handler(); on remote-open the
+ * server also opens its end of the connection. All other events are passed
+ * to listen_handler().
+ */
+static pn_event_type_t ssl_server_handler(test_handler_t *h, pn_event_t *e) {
+  const pn_event_type_t kind = pn_event_type(e);
+
+  if (kind == PN_CONNECTION_BOUND) {
+    return ssl_handler(h, e);
+  }
+  if (kind == PN_CONNECTION_REMOTE_OPEN) {
+    /* Run the SSL checks first, then open our end of the connection */
+    const pn_event_type_t result = ssl_handler(h, e);
+    pn_connection_open(pn_event_connection(e));
+    return result;
+  }
+  return listen_handler(h, e);
+}
+
+/* Client-side SSL test handler.
+ *
+ * Bound and remote-open events go through ssl_handler(); on remote-open the
+ * client also closes the connection. All other events are passed to
+ * common_handler().
+ */
+static pn_event_type_t ssl_client_handler(test_handler_t *h, pn_event_t *e) {
+  switch (pn_event_type(e)) {
+   case PN_CONNECTION_BOUND:
+    return ssl_handler(h, e);
+   case PN_CONNECTION_REMOTE_OPEN: {
+     /* Run the SSL checks first, then close the connection */
+     pn_event_type_t et = ssl_handler(h, e);
+     pn_connection_close(pn_event_connection(e));
+     return et;
+   }
+   default:
+    return common_handler(h, e);
+  }
+}
+
+/* Test various SSL connections between proactors.
+ *
+ * Covers three cases against one listener: a basic SSL connection, peer-name
+ * verification with the hostname "test_server", and peer-name verification
+ * with the hostname "wrongname", which must fail with a framing error.
+ * The test is skipped when pn_ssl_present() reports no SSL support.
+ */
+static void test_ssl(test_t *t) {
+  if (!pn_ssl_present()) {
+    TEST_LOGF(t, "Skip SSL test, no support");
+    return;
+  }
+
+  test_proactor_t tps[] ={ test_proactor(t, ssl_client_handler), test_proactor(t, ssl_server_handler) };
+  test_proactor_t *client = &tps[0], *server = &tps[1];
+  /* Each handler keeps a pointer to its domain so ssl_handler() can use it on PN_CONNECTION_BOUND */
+  pn_ssl_domain_t *cd = client->handler.ssl_domain = pn_ssl_domain(PN_SSL_MODE_CLIENT);
+  pn_ssl_domain_t *sd =  server->handler.ssl_domain = pn_ssl_domain(PN_SSL_MODE_SERVER);
+  TEST_CHECK(t, 0 == SET_CREDENTIALS(sd, "tserver"));
+  pn_listener_t *l = test_listen(server, "");
+
+  /* Basic SSL connection */
+  pn_proactor_connect2(client->proactor, NULL, NULL, listener_info(l).connect);
+  /* Open ok at both ends */
+  TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, TEST_PROACTORS_RUN(tps));
+  TEST_COND_EMPTY(t, last_condition);
+  TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, TEST_PROACTORS_RUN(tps));
+  TEST_COND_EMPTY(t, last_condition);
+  /* Wait for two transport-closed events before starting the next case */
+  TEST_PROACTORS_RUN_UNTIL(tps, PN_TRANSPORT_CLOSED);
+  TEST_PROACTORS_RUN_UNTIL(tps, PN_TRANSPORT_CLOSED);
+
+  /* Verify peer with good hostname */
+  TEST_INT_EQUAL(t, 0, pn_ssl_domain_set_trusted_ca_db(cd, CERTIFICATE("tserver")));
+  TEST_INT_EQUAL(t, 0, pn_ssl_domain_set_peer_authentication(cd, PN_SSL_VERIFY_PEER_NAME, NULL));
+  pn_connection_t *c = pn_connection();
+  pn_connection_set_hostname(c, "test_server");
+  pn_proactor_connect2(client->proactor, c, NULL, listener_info(l).connect);
+  TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, TEST_PROACTORS_RUN(tps));
+  TEST_COND_EMPTY(t, last_condition);
+  TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, TEST_PROACTORS_RUN(tps));
+  TEST_COND_EMPTY(t, last_condition);
+  TEST_PROACTORS_RUN_UNTIL(tps, PN_TRANSPORT_CLOSED);
+  TEST_PROACTORS_RUN_UNTIL(tps, PN_TRANSPORT_CLOSED);
+
+  /* Verify peer with bad hostname */
+  c = pn_connection();
+  pn_connection_set_hostname(c, "wrongname");
+  pn_proactor_connect2(client->proactor, c, NULL, listener_info(l).connect);
+  /* The connection must close without ever reaching remote-open */
+  TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, TEST_PROACTORS_RUN(tps));
+  TEST_COND_NAME(t, "amqp:connection:framing-error",  last_condition);
+  TEST_COND_DESC(t, "SSL",  last_condition);
+  TEST_PROACTORS_DRAIN(tps);
+
+  pn_ssl_domain_free(cd);
+  pn_ssl_domain_free(sd);
+  TEST_PROACTORS_DESTROY(tps);
+}
+
+/* Test pn_proactor_addr(): host and port are joined as "host:port". */
+static void test_proactor_addr(test_t *t) {
+  /* Test the address formatter */
+  char addr[PN_MAX_ADDR];
+  pn_proactor_addr(addr, sizeof(addr), "foo", "bar");
+  TEST_STR_EQUAL(t, "foo:bar", addr);
+  /* An empty or NULL port leaves nothing after the colon */
+  pn_proactor_addr(addr, sizeof(addr), "foo", "");
+  TEST_STR_EQUAL(t, "foo:", addr);
+  pn_proactor_addr(addr, sizeof(addr), "foo", NULL);
+  TEST_STR_EQUAL(t, "foo:", addr);
+  /* An empty or NULL host leaves nothing before the colon */
+  pn_proactor_addr(addr, sizeof(addr), "", "bar");
+  TEST_STR_EQUAL(t, ":bar", addr);
+  pn_proactor_addr(addr, sizeof(addr), NULL, "bar");
+  TEST_STR_EQUAL(t, ":bar", addr);
+  /* A host that itself contains colons is copied through unchanged */
+  pn_proactor_addr(addr, sizeof(addr), "1:2:3:4", "5");
+  TEST_STR_EQUAL(t, "1:2:3:4:5", addr);
+  pn_proactor_addr(addr, sizeof(addr), "1:2:3:4", "");
+  TEST_STR_EQUAL(t, "1:2:3:4:", addr);
+  pn_proactor_addr(addr, sizeof(addr), "1:2:3:4", NULL);
+  TEST_STR_EQUAL(t, "1:2:3:4:", addr);
+}
+
+/* Test pni_parse_addr(): split an address string into host and port.
+ *
+ * The expectations below show that the text after the last colon is the port,
+ * a missing port becomes "5672", the names "amqp" and "amqps" become "5672"
+ * and "5671", and a missing host is reported as NULL.
+ */
+static void test_parse_addr(test_t *t) {
+  char buf[1024];
+  const char *host, *port;
+
+  TEST_CHECK(t, 0 == pni_parse_addr("foo:bar", buf, sizeof(buf), &host, &port));
+  TEST_STR_EQUAL(t, "foo", host);
+  TEST_STR_EQUAL(t, "bar", port);
+
+  /* Missing port: expect the default */
+  TEST_CHECK(t, 0 == pni_parse_addr("foo:", buf, sizeof(buf), &host, &port));
+  TEST_STR_EQUAL(t, "foo", host);
+  TEST_STR_EQUAL(t, "5672", port);
+
+  /* Missing host: expect NULL */
+  TEST_CHECK(t, 0 == pni_parse_addr(":bar", buf, sizeof(buf), &host, &port));
+  TEST_CHECKF(t, NULL == host, "expected null, got: %s", host);
+  TEST_STR_EQUAL(t, "bar", port);
+
+  TEST_CHECK(t, 0 == pni_parse_addr(":", buf, sizeof(buf), &host, &port));
+  TEST_CHECKF(t, NULL == host, "expected null, got: %s", host);
+  TEST_STR_EQUAL(t, "5672", port);
+
+  /* Service names are translated to port numbers */
+  TEST_CHECK(t, 0 == pni_parse_addr(":amqps", buf, sizeof(buf), &host, &port));
+  TEST_STR_EQUAL(t, "5671", port);
+
+  TEST_CHECK(t, 0 == pni_parse_addr(":amqp", buf, sizeof(buf), &host, &port));
+  TEST_STR_EQUAL(t, "5672", port);
+
+  /* Hosts containing colons: only the last colon separates the port */
+  TEST_CHECK(t, 0 == pni_parse_addr("::1:2:3", buf, sizeof(buf), &host, &port));
+  TEST_STR_EQUAL(t, "::1:2", host);
+  TEST_STR_EQUAL(t, "3", port);
+
+  TEST_CHECK(t, 0 == pni_parse_addr(":::", buf, sizeof(buf), &host, &port));
+  TEST_STR_EQUAL(t, "::", host);
+  TEST_STR_EQUAL(t, "5672", port);
+
+  /* Empty address: no host, default port */
+  TEST_CHECK(t, 0 == pni_parse_addr("", buf, sizeof(buf), &host, &port));
+  TEST_CHECKF(t, NULL == host, "expected null, got: %s", host);
+  TEST_STR_EQUAL(t, "5672", port);
+}
+
+/* Test pn_netaddr functions.
+ *
+ * Connects a client to an IPv4 loopback listener, then checks that the local
+ * and remote addresses reported by the client and server transports mirror
+ * each other, that the remote address splits into the expected host and port,
+ * and that pn_netaddr_str() can be called with a NULL buffer to get a length.
+ */
+
+static void test_netaddr(test_t *t) {
+  test_proactor_t tps[] ={ test_proactor(t, open_wake_handler), test_proactor(t, listen_handler) };
+  pn_proactor_t *client = tps[0].proactor;
+  /* Use IPv4 to get consistent results all platforms */
+  pn_listener_t *l = test_listen(&tps[1], "127.0.0.1");
+  pn_connection_t *c = pn_connection();
+  pn_proactor_connect2(client, c, NULL, listener_info(l).connect);
+  if (!TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, TEST_PROACTORS_RUN(tps))) {
+    TEST_COND_EMPTY(t, last_condition); /* Show the last condition */
+    /* NOTE(review): this early return skips TEST_PROACTORS_DESTROY(tps) */
+    return;                     /* don't continue if connection is closed */
+  }
+
+  /* client remote, client local, server remote and server local address strings */
+  char cr[1024], cl[1024], sr[1024], sl[1024];
+
+  pn_transport_t *ct = pn_connection_transport(c);
+  const pn_netaddr_t *na = pn_transport_remote_addr(ct);
+  pn_netaddr_str(na, cr, sizeof(cr));
+  TEST_STR_IN(t, listener_info(l).port, cr); /* remote address has listening port */
+
+  pn_connection_t *s = last_accepted; /* server side of the connection */
+
+  pn_transport_t *st = pn_connection_transport(s);
+  if (!TEST_CHECK(t, st)) return;
+  pn_netaddr_str(pn_transport_local_addr(st), sl, sizeof(sl));
+  TEST_STR_EQUAL(t, cr, sl);  /* client remote == server local */
+
+  pn_netaddr_str(pn_transport_local_addr(ct), cl, sizeof(cl));
+  pn_netaddr_str(pn_transport_remote_addr(st), sr, sizeof(sr));
+  TEST_STR_EQUAL(t, cl, sr);    /* client local == server remote */
+
+  char host[MAX_STR] = "";
+  char serv[MAX_STR] = "";
+  int err = pn_netaddr_host_port(na, host, sizeof(host), serv, sizeof(serv));
+  TEST_CHECK(t, 0 == err);
+  TEST_STR_EQUAL(t, "127.0.0.1", host);
+  TEST_STR_EQUAL(t, listener_info(l).port, serv);
+
+  /* Make sure you can use NULL, 0 to get length of address string without a crash */
+  size_t len = pn_netaddr_str(pn_transport_local_addr(ct), NULL, 0);
+  /* Cast to int: passing size_t to "%d" is undefined where size_t is wider
+     than int. Address strings are far shorter than INT_MAX. */
+  TEST_CHECKF(t, strlen(cl) == len, "%d != %d", (int)strlen(cl), (int)len);
+
+  TEST_PROACTORS_DRAIN(tps);
+  TEST_PROACTORS_DESTROY(tps);
+}
+
+/* Test pn_proactor_disconnect.
+ *
+ * Opens two client connections to two listeners, disconnects the client
+ * proactor with a condition and checks that both client transports close
+ * carrying that condition, then disconnects the server proactor and checks
+ * that both listeners close. Finally verifies the proactors can still listen
+ * and connect afterwards.
+ */
+static void test_disconnect(test_t *t) {
+  test_proactor_t tps[] ={ test_proactor(t, open_wake_handler), test_proactor(t, listen_handler) };
+  pn_proactor_t *client = tps[0].proactor, *server = tps[1].proactor;
+
+  /* Start two listeners */
+  pn_listener_t *l = test_listen(&tps[1], "");
+  pn_listener_t *l2 = test_listen(&tps[1], "");
+
+  /* Only wait for one connection to remote-open before disconnect */
+  pn_connection_t *c = pn_connection();
+  pn_proactor_connect2(client, c, NULL, listener_info(l).connect);
+  TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, TEST_PROACTORS_RUN(tps));
+  pn_connection_t *c2 = pn_connection();
+  pn_proactor_connect2(client, c2, NULL, listener_info(l2).connect);
+  TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, TEST_PROACTORS_RUN(tps));
+  TEST_PROACTORS_DRAIN(tps);
+
+  /* Disconnect the client proactor */
+  pn_condition_t *cond = pn_condition();
+  pn_condition_set_name(cond, "test-name");
+  pn_condition_set_description(cond, "test-description");
+  pn_proactor_disconnect(client, cond);
+  /* Verify expected client side first */
+  /* Run only the client proactor (tps[0]): one close per connection, then inactive */
+  TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, test_proactors_run(&tps[0], 1));
+  TEST_COND_NAME(t, "test-name", last_condition);
+  TEST_COND_DESC(t, "test-description", last_condition);
+  TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, test_proactors_run(&tps[0], 1));
+  TEST_COND_NAME(t, "test-name", last_condition);
+  TEST_COND_DESC(t, "test-description", last_condition);
+  TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, test_proactors_run(&tps[0], 1));
+
+  /* Now check server sees the disconnects */
+  TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, TEST_PROACTORS_RUN(tps));
+  TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, TEST_PROACTORS_RUN(tps));
+
+  /* Now disconnect the server end (the listeners) */
+  pn_proactor_disconnect(server, cond);
+  pn_condition_free(cond);
+
+  TEST_ETYPE_EQUAL(t, PN_LISTENER_CLOSE, TEST_PROACTORS_RUN(tps));
+  TEST_ETYPE_EQUAL(t, PN_LISTENER_CLOSE, TEST_PROACTORS_RUN(tps));
+  TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, TEST_PROACTORS_RUN(tps));
+
+  /* Make sure the proactors are still functional */
+  pn_listener_t *l3 = test_listen(&tps[1], "");
+  pn_proactor_connect2(client, NULL, NULL, listener_info(l3).connect);
+  TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, TEST_PROACTORS_RUN(tps));
+  /* Disconnecting with a NULL condition is also exercised here */
+  pn_proactor_disconnect(client, NULL);
+
+  TEST_PROACTORS_DRAIN(tps);
+  TEST_PROACTORS_DESTROY(tps);
+}
+
+/* State shared by both ends of the message streaming test */
+struct message_stream_context {
+  pn_link_t *sender;               /* Sending link, recorded on PN_LINK_REMOTE_OPEN */
+  pn_delivery_t *dlv;              /* Outgoing delivery, created on the first sender PN_LINK_FLOW */
+  pn_rwbytes_t send_buf, recv_buf; /* Encoded message to send, and the bytes received so far */
+  ssize_t size, sent, received;    /* Encoded message size, bytes sent and bytes received */
+  bool complete;                   /* Set when the last delivery seen was not partial */
+};
+
+/* Sizes are chosen so that chunks straddle frames and the body straddles chunks */
+#define FRAME 512                   /* Smallest legal frame */
+#define CHUNK (FRAME + FRAME/2)     /* Chunk overflows frame */
+#define BODY (CHUNK*3 + CHUNK/2)    /* Body doesn't fit into chunks */
+
+/* Handler used by both ends of the message streaming test.
+ *
+ * Limits frame size and session windows, sends one CHUNK of the encoded
+ * message per PN_CONNECTION_WAKE, and appends whatever is readable to
+ * recv_buf on each PN_DELIVERY. State is kept in the message_stream_context
+ * pointed to by th->context.
+ *
+ * Returns PN_LINK_FLOW, PN_CONNECTION_WAKE and PN_DELIVERY for those events,
+ * PN_EVENT_NONE for the setup events, and common_handler()'s result otherwise.
+ */
+static pn_event_type_t message_stream_handler(test_handler_t *th, pn_event_t *e) {
+  struct message_stream_context *ctx = (struct message_stream_context*)th->context;
+  switch (pn_event_type(e)) {
+   case PN_CONNECTION_BOUND:
+    pn_transport_set_max_frame(pn_event_transport(e), FRAME);
+    return PN_EVENT_NONE;
+
+   case PN_SESSION_INIT:
+    pn_session_set_incoming_capacity(pn_event_session(e), FRAME); /* Single frame incoming */
+    pn_session_set_outgoing_window(pn_event_session(e), 1);       /* Single frame outgoing */
+    return PN_EVENT_NONE;
+
+   case PN_LINK_REMOTE_OPEN:
+    common_handler(th, e);
+    /* The receiver grants one credit; the sender link is remembered for sending */
+    if (pn_link_is_receiver(pn_event_link(e))) {
+      pn_link_flow(pn_event_link(e), 1);
+    } else {
+      ctx->sender = pn_event_link(e);
+    }
+    return PN_EVENT_NONE;
+
+   case PN_LINK_FLOW:           /* Start a delivery */
+    /* Only create the delivery once, on the sending link */
+    if (pn_link_is_sender(pn_event_link(e)) && !ctx->dlv) {
+      ctx->dlv = pn_delivery(pn_event_link(e), pn_dtag("x", 1));
+    }
+    return PN_LINK_FLOW;
+
+   case PN_CONNECTION_WAKE: {     /* Send a chunk */
+     /* Send at most CHUNK bytes, or whatever is left of the message */
+     ssize_t remains = ctx->size - ctx->sent;
+     ssize_t n = (CHUNK < remains) ? CHUNK : remains;
+     TEST_CHECK(th->t, n == pn_link_send(ctx->sender, ctx->send_buf.start + ctx->sent, n));
+     ctx->sent += n;
+     /* Advance the link once the whole message has been handed over */
+     if (ctx->sent == ctx->size) {
+       TEST_CHECK(th->t, pn_link_advance(ctx->sender));
+     }
+     return PN_CONNECTION_WAKE;
+   }
+
+   case PN_DELIVERY: {          /* Receive a delivery - smaller than a chunk? */
+     pn_delivery_t *dlv = pn_event_delivery(e);
+     if (pn_delivery_readable(dlv)) {
+       /* Grow recv_buf to fit the pending bytes, then append them */
+       ssize_t n = pn_delivery_pending(dlv);
+       rwbytes_ensure(&ctx->recv_buf, ctx->received + n);
+       TEST_ASSERT(n == pn_link_recv(pn_event_link(e), ctx->recv_buf.start + ctx->received, n));
+       ctx->received += n;
+     }
+     ctx->complete = !pn_delivery_partial(dlv);
+     return PN_DELIVERY;
+   }
+
+   default:
+    return common_handler(th, e);
+  }
+}
+
+/* Test sending/receiving a message in chunks.
+ *
+ * Encodes a message with a BODY-byte binary body, sends it one chunk per
+ * connection wake, and checks that the bytes received equal the bytes sent.
+ * Both proactors share one message_stream_context.
+ */
+static void test_message_stream(test_t *t) {
+  test_proactor_t tps[] ={
+    test_proactor(t, message_stream_handler),
+    test_proactor(t, message_stream_handler)
+  };
+  pn_proactor_t *client = tps[0].proactor;
+  pn_listener_t *l = test_listen(&tps[1], "");
+  struct message_stream_context ctx = { 0 };
+  tps[0].handler.context = &ctx;
+  tps[1].handler.context = &ctx;
+
+  /* Encode a large (not very) message to send in chunks */
+  /* NOTE(review): the malloc() result is not checked before memset() */
+  char *body = (char*)malloc(BODY);
+  memset(body, 'x', BODY);
+  pn_message_t *m = pn_message();
+  pn_data_put_binary(pn_message_body(m), pn_bytes(BODY, body));
+  free(body);
+  ctx.size = message_encode(m, &ctx.send_buf);
+  pn_message_free(m);
+
+  /* Open a connection, session and sending link, then wait for link flow */
+  pn_connection_t *c = pn_connection();
+  pn_proactor_connect2(client, c, NULL, listener_info(l).connect);
+  pn_session_t *ssn = pn_session(c);
+  pn_session_open(ssn);
+  pn_link_t *snd = pn_sender(ssn, "x");
+  pn_link_open(snd);
+  TEST_PROACTORS_RUN_UNTIL(tps, PN_LINK_FLOW);
+
+  /* Send and receive the message in chunks */
+  do {
+    pn_connection_wake(c);      /* Initiate send/receive of one chunk */
+    do {                        /* May be multiple receives for one send */
+      TEST_PROACTORS_RUN_UNTIL(tps, PN_DELIVERY);
+    } while (ctx.received < ctx.sent);
+  } while (!ctx.complete);
+  TEST_CHECK(t, ctx.received == ctx.size);
+  TEST_CHECK(t, ctx.sent == ctx.size);
+  TEST_CHECK(t, !memcmp(ctx.send_buf.start, ctx.recv_buf.start, ctx.size));
+
+  /* The test owns both buffers, so release them with free() */
+  free(ctx.send_buf.start);
+  free(ctx.recv_buf.start);
+  TEST_PROACTORS_DESTROY(tps);
+}
+
+/* Test driver: runs each test through RUN_ARGV_TEST and returns `failed`.
+ *
+ * NOTE(review): RUN_ARGV_TEST is defined outside this view; presumably it
+ * selects tests from argv and counts failures in `failed` - confirm.
+ * The global last_condition is allocated before the tests and freed after.
+ */
+int main(int argc, char **argv) {
+  int failed = 0;
+  last_condition = pn_condition();
+  RUN_ARGV_TEST(failed, t, test_inactive(&t));
+  RUN_ARGV_TEST(failed, t, test_interrupt_timeout(&t));
+  RUN_ARGV_TEST(failed, t, test_errors(&t));
+  RUN_ARGV_TEST(failed, t, test_proton_1586(&t));
+  RUN_ARGV_TEST(failed, t, test_client_server(&t));
+  RUN_ARGV_TEST(failed, t, test_connection_wake(&t));
+  RUN_ARGV_TEST(failed, t, test_ipv4_ipv6(&t));
+  RUN_ARGV_TEST(failed, t, test_release_free(&t));
+  /* The SSL test is not run on Windows */
+#if !defined(_WIN32)
+  RUN_ARGV_TEST(failed, t, test_ssl(&t));
+#endif
+  RUN_ARGV_TEST(failed, t, test_proactor_addr(&t));
+  RUN_ARGV_TEST(failed, t, test_parse_addr(&t));
+  RUN_ARGV_TEST(failed, t, test_netaddr(&t));
+  RUN_ARGV_TEST(failed, t, test_disconnect(&t));
+  RUN_ARGV_TEST(failed, t, test_abort(&t));
+  RUN_ARGV_TEST(failed, t, test_refuse(&t));
+  RUN_ARGV_TEST(failed, t, test_message_stream(&t));
+  pn_condition_free(last_condition);
+  return failed;
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org