You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2018/07/03 22:12:58 UTC
[09/89] [abbrv] [partial] qpid-proton git commit: PROTON-1728:
Reorganize the source tree
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/c/tests/object.c
----------------------------------------------------------------------
diff --git a/c/tests/object.c b/c/tests/object.c
new file mode 100644
index 0000000..8a1d00e
--- /dev/null
+++ b/c/tests/object.c
@@ -0,0 +1,1115 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <stdarg.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <proton/object.h>
+
+#define assert(E) ((E) ? 0 : (abort(), 0))
+
+static char mem;
+static void *END = &mem;
+
+static pn_list_t *build_list(size_t capacity, ...)
+{
+ pn_list_t *result = pn_list(PN_OBJECT, capacity);
+ va_list ap;
+
+ va_start(ap, capacity);
+ while (true) {
+ void *arg = va_arg(ap, void *);
+ if (arg == END) {
+ break;
+ }
+
+ pn_list_add(result, arg);
+ pn_class_decref(PN_OBJECT, arg);
+ }
+ va_end(ap);
+
+ return result;
+}
+
+static pn_map_t *build_map(float load_factor, size_t capacity, ...)
+{
+ pn_map_t *result = pn_map(PN_OBJECT, PN_OBJECT, capacity, load_factor);
+ va_list ap;
+
+ void *prev = NULL;
+
+ va_start(ap, capacity);
+ int count = 0;
+ while (true) {
+ void *arg = va_arg(ap, void *);
+ bool last = arg == END;
+ if (arg == END) {
+ arg = NULL;
+ }
+
+ if (count % 2) {
+ pn_map_put(result, prev, arg);
+ pn_class_decref(PN_OBJECT, prev);
+ pn_class_decref(PN_OBJECT, arg);
+ } else {
+ prev = arg;
+ }
+
+ if (last) {
+ break;
+ }
+
+ count++;
+ }
+ va_end(ap);
+
+ return result;
+}
+
+static void noop(void *o) {}
+static uintptr_t zero(void *o) { return 0; }
+static intptr_t delta(void *a, void *b) { return (uintptr_t) b - (uintptr_t) a; }
+
+#define CID_noop CID_pn_object
+#define noop_initialize noop
+#define noop_finalize noop
+#define noop_hashcode zero
+#define noop_compare delta
+#define noop_inspect NULL
+
+static const pn_class_t noop_class = PN_CLASS(noop);
+
+static void test_class(const pn_class_t *clazz, size_t size)
+{
+ void *a = pn_class_new(clazz, size);
+ void *b = pn_class_new(clazz, size);
+
+ assert(!pn_class_equals(clazz, a, b));
+ assert(pn_class_equals(clazz, a, a));
+ assert(pn_class_equals(clazz, b, b));
+ assert(!pn_class_equals(clazz, a, NULL));
+ assert(!pn_class_equals(clazz, NULL, a));
+
+ int rca = pn_class_refcount(clazz, a);
+ int rcb = pn_class_refcount(clazz, b);
+
+ assert(rca == -1 || rca == 1);
+ assert(rcb == -1 || rcb == 1);
+
+ pn_class_incref(clazz, a);
+
+ rca = pn_class_refcount(clazz, a);
+ assert(rca == -1 || rca == 2);
+
+ pn_class_decref(clazz, a);
+
+ rca = pn_class_refcount(clazz, a);
+ assert(rca == -1 || rca == 1);
+
+ pn_class_free(clazz, a);
+ pn_class_free(clazz, b);
+}
+
+static void test_new(size_t size, const pn_class_t *clazz)
+{
+ void *obj = pn_class_new(clazz, size);
+ assert(obj);
+ assert(pn_class_refcount(PN_OBJECT, obj) == 1);
+ assert(pn_class(obj) == clazz);
+ char *bytes = (char *) obj;
+ for (size_t i = 0; i < size; i++) {
+ // touch everything for valgrind
+ bytes[i] = i;
+ }
+ pn_free(obj);
+}
+
+static void finalizer(void *object)
+{
+ int **called = (int **) object;
+ (**called)++;
+}
+
+#define CID_finalizer CID_pn_object
+#define finalizer_initialize NULL
+#define finalizer_finalize finalizer
+#define finalizer_hashcode NULL
+#define finalizer_compare NULL
+#define finalizer_inspect NULL
+
+static void test_finalize(void)
+{
+ static pn_class_t clazz = PN_CLASS(finalizer);
+
+ int **obj = (int **) pn_class_new(&clazz, sizeof(int *));
+ assert(obj);
+
+ int called = 0;
+ *obj = &called;
+ pn_free(obj);
+
+ assert(called == 1);
+}
+
+static void test_free(void)
+{
+ // just to make sure it doesn't seg fault or anything
+ pn_free(NULL);
+}
+
+static uintptr_t hashcode(void *obj) { return (uintptr_t) obj; }
+
+#define CID_hashcode CID_pn_object
+#define hashcode_initialize NULL
+#define hashcode_finalize NULL
+#define hashcode_compare NULL
+#define hashcode_hashcode hashcode
+#define hashcode_inspect NULL
+
+static void test_hashcode(void)
+{
+ static pn_class_t clazz = PN_CLASS(hashcode);
+ void *obj = pn_class_new(&clazz, 0);
+ assert(obj);
+ assert(pn_hashcode(obj) == (uintptr_t) obj);
+ assert(pn_hashcode(NULL) == 0);
+ pn_free(obj);
+}
+
+#define CID_compare CID_pn_object
+#define compare_initialize NULL
+#define compare_finalize NULL
+#define compare_compare delta
+#define compare_hashcode NULL
+#define compare_inspect NULL
+
+static void test_compare(void)
+{
+ static pn_class_t clazz = PN_CLASS(compare);
+
+ void *a = pn_class_new(&clazz, 0);
+ assert(a);
+ void *b = pn_class_new(&clazz, 0);
+ assert(b);
+
+ assert(pn_compare(a, b));
+ assert(!pn_equals(a, b));
+ assert(!pn_compare(a, a));
+ assert(pn_equals(a, a));
+ assert(!pn_compare(b, b));
+ assert(pn_equals(b, b));
+ assert(pn_compare(a, b) == (intptr_t) ((uintptr_t) b - (uintptr_t) a));
+
+ assert(pn_compare(NULL, b));
+ assert(!pn_equals(NULL, b));
+
+ assert(pn_compare(a, NULL));
+ assert(!pn_equals(a, NULL));
+
+ assert(!pn_compare(NULL, NULL));
+ assert(pn_equals(NULL, NULL));
+
+ pn_free(a);
+ pn_free(b);
+}
+
+static void test_refcounting(int refs)
+{
+ void *obj = pn_class_new(PN_OBJECT, 0);
+
+ assert(pn_refcount(obj) == 1);
+
+ for (int i = 0; i < refs; i++) {
+ pn_incref(obj);
+ assert(pn_refcount(obj) == i + 2);
+ }
+
+ assert(pn_refcount(obj) == refs + 1);
+
+ for (int i = 0; i < refs; i++) {
+ pn_decref(obj);
+ assert(pn_refcount(obj) == refs - i);
+ }
+
+ assert(pn_refcount(obj) == 1);
+
+ pn_free(obj);
+}
+
+static void test_list(size_t capacity)
+{
+ pn_list_t *list = pn_list(PN_WEAKREF, 0);
+ assert(pn_list_size(list) == 0);
+ assert(!pn_list_add(list, (void *) 0));
+ assert(!pn_list_add(list, (void *) 1));
+ assert(!pn_list_add(list, (void *) 2));
+ assert(!pn_list_add(list, (void *) 3));
+ assert(pn_list_get(list, 0) == (void *) 0);
+ assert(pn_list_get(list, 1) == (void *) 1);
+ assert(pn_list_get(list, 2) == (void *) 2);
+ assert(pn_list_get(list, 3) == (void *) 3);
+ assert(pn_list_size(list) == 4);
+ pn_list_del(list, 1, 2);
+ assert(pn_list_size(list) == 2);
+ assert(pn_list_get(list, 0) == (void *) 0);
+ assert(pn_list_get(list, 1) == (void *) 3);
+ pn_decref(list);
+}
+
+static void test_list_refcount(size_t capacity)
+{
+ void *one = pn_class_new(PN_OBJECT, 0);
+ void *two = pn_class_new(PN_OBJECT, 0);
+ void *three = pn_class_new(PN_OBJECT, 0);
+ void *four = pn_class_new(PN_OBJECT, 0);
+
+ pn_list_t *list = pn_list(PN_OBJECT, 0);
+ assert(!pn_list_add(list, one));
+ assert(!pn_list_add(list, two));
+ assert(!pn_list_add(list, three));
+ assert(!pn_list_add(list, four));
+ assert(pn_list_get(list, 0) == one);
+ assert(pn_list_get(list, 1) == two);
+ assert(pn_list_get(list, 2) == three);
+ assert(pn_list_get(list, 3) == four);
+ assert(pn_list_size(list) == 4);
+
+ assert(pn_refcount(one) == 2);
+ assert(pn_refcount(two) == 2);
+ assert(pn_refcount(three) == 2);
+ assert(pn_refcount(four) == 2);
+
+ pn_list_del(list, 1, 2);
+ assert(pn_list_size(list) == 2);
+
+ assert(pn_refcount(one) == 2);
+ assert(pn_refcount(two) == 1);
+ assert(pn_refcount(three) == 1);
+ assert(pn_refcount(four) == 2);
+
+ assert(pn_list_get(list, 0) == one);
+ assert(pn_list_get(list, 1) == four);
+
+ assert(!pn_list_add(list, one));
+
+ assert(pn_list_size(list) == 3);
+ assert(pn_refcount(one) == 3);
+
+ pn_decref(list);
+
+ assert(pn_refcount(one) == 1);
+ assert(pn_refcount(two) == 1);
+ assert(pn_refcount(three) == 1);
+ assert(pn_refcount(four) == 1);
+
+ pn_decref(one);
+ pn_decref(two);
+ pn_decref(three);
+ pn_decref(four);
+}
+
+static void check_list_index(pn_list_t *list, void *value, ssize_t idx)
+{
+ assert(pn_list_index(list, value) == idx);
+}
+
+static void test_list_index(void)
+{
+ pn_list_t *l = pn_list(PN_WEAKREF, 0);
+ void *one = pn_string("one");
+ void *two = pn_string("two");
+ void *three = pn_string("three");
+ void *dup1 = pn_string("dup");
+ void *dup2 = pn_string("dup");
+ void *last = pn_string("last");
+
+ pn_list_add(l, one);
+ pn_list_add(l, two);
+ pn_list_add(l, three);
+ pn_list_add(l, dup1);
+ pn_list_add(l, dup2);
+ pn_list_add(l, last);
+
+ check_list_index(l, one, 0);
+ check_list_index(l, two, 1);
+ check_list_index(l, three, 2);
+ check_list_index(l, dup1, 3);
+ check_list_index(l, dup2, 3);
+ check_list_index(l, last, 5);
+
+ void *nonexistent = pn_string("nonexistent");
+
+ check_list_index(l, nonexistent, -1);
+
+ pn_free(l);
+ pn_free(one);
+ pn_free(two);
+ pn_free(three);
+ pn_free(dup1);
+ pn_free(dup2);
+ pn_free(last);
+ pn_free(nonexistent);
+}
+
+static bool pn_strequals(const char *a, const char *b)
+{
+ return !strcmp(a, b);
+}
+
+static void test_build_list(void)
+{
+ pn_list_t *l = build_list(0,
+ pn_string("one"),
+ pn_string("two"),
+ pn_string("three"),
+ END);
+
+ assert(pn_list_size(l) == 3);
+
+ assert(pn_strequals(pn_string_get((pn_string_t *) pn_list_get(l, 0)),
+ "one"));
+ assert(pn_strequals(pn_string_get((pn_string_t *) pn_list_get(l, 1)),
+ "two"));
+ assert(pn_strequals(pn_string_get((pn_string_t *) pn_list_get(l, 2)),
+ "three"));
+
+ pn_free(l);
+}
+
+static void test_build_map(void)
+{
+ pn_map_t *m = build_map(0.75, 0,
+ pn_string("key"),
+ pn_string("value"),
+ pn_string("key2"),
+ pn_string("value2"),
+ END);
+
+ assert(pn_map_size(m) == 2);
+
+ pn_string_t *key = pn_string(NULL);
+
+ pn_string_set(key, "key");
+ assert(pn_strequals(pn_string_get((pn_string_t *) pn_map_get(m, key)),
+ "value"));
+ pn_string_set(key, "key2");
+ assert(pn_strequals(pn_string_get((pn_string_t *) pn_map_get(m, key)),
+ "value2"));
+
+ pn_free(m);
+ pn_free(key);
+}
+
+static void test_build_map_odd(void)
+{
+ pn_map_t *m = build_map(0.75, 0,
+ pn_string("key"),
+ pn_string("value"),
+ pn_string("key2"),
+ pn_string("value2"),
+ pn_string("key3"),
+ END);
+
+ assert(pn_map_size(m) == 3);
+
+ pn_string_t *key = pn_string(NULL);
+
+ pn_string_set(key, "key");
+ assert(pn_strequals(pn_string_get((pn_string_t *) pn_map_get(m, key)),
+ "value"));
+ pn_string_set(key, "key2");
+ assert(pn_strequals(pn_string_get((pn_string_t *) pn_map_get(m, key)),
+ "value2"));
+ pn_string_set(key, "key3");
+ assert(pn_map_get(m, key) == NULL);
+
+ pn_free(m);
+ pn_free(key);
+}
+
+static void test_map(void)
+{
+ void *one = pn_class_new(PN_OBJECT, 0);
+ void *two = pn_class_new(PN_OBJECT, 0);
+ void *three = pn_class_new(PN_OBJECT, 0);
+
+ pn_map_t *map = pn_map(PN_OBJECT, PN_OBJECT, 4, 0.75);
+ assert(pn_map_size(map) == 0);
+
+ pn_string_t *key = pn_string("key");
+ pn_string_t *dup = pn_string("key");
+ pn_string_t *key1 = pn_string("key1");
+ pn_string_t *key2 = pn_string("key2");
+
+ assert(!pn_map_put(map, key, one));
+ assert(pn_map_size(map) == 1);
+ assert(!pn_map_put(map, key1, two));
+ assert(pn_map_size(map) == 2);
+ assert(!pn_map_put(map, key2, three));
+ assert(pn_map_size(map) == 3);
+
+ assert(pn_map_get(map, dup) == one);
+
+ assert(!pn_map_put(map, dup, one));
+ assert(pn_map_size(map) == 3);
+
+ assert(!pn_map_put(map, dup, two));
+ assert(pn_map_size(map) == 3);
+ assert(pn_map_get(map, dup) == two);
+
+ assert(pn_refcount(key) == 2);
+ assert(pn_refcount(dup) == 1);
+ assert(pn_refcount(key1) == 2);
+ assert(pn_refcount(key2) == 2);
+
+ assert(pn_refcount(one) == 1);
+ assert(pn_refcount(two) == 3);
+ assert(pn_refcount(three) == 2);
+
+ pn_map_del(map, key1);
+ assert(pn_map_size(map) == 2);
+
+ assert(pn_refcount(key) == 2);
+ assert(pn_refcount(dup) == 1);
+ assert(pn_refcount(key1) == 1);
+ assert(pn_refcount(key2) == 2);
+
+ assert(pn_refcount(one) == 1);
+ assert(pn_refcount(two) == 2);
+ assert(pn_refcount(three) == 2);
+
+ pn_decref(one);
+ pn_decref(two);
+ pn_decref(three);
+
+ pn_decref(key);
+ pn_decref(dup);
+ pn_decref(key1);
+ pn_decref(key2);
+
+ pn_decref(map);
+}
+
+static void test_hash(void)
+{
+ void *one = pn_class_new(PN_OBJECT, 0);
+ void *two = pn_class_new(PN_OBJECT, 0);
+ void *three = pn_class_new(PN_OBJECT, 0);
+
+ pn_hash_t *hash = pn_hash(PN_OBJECT, 4, 0.75);
+ pn_hash_put(hash, 0, NULL);
+ pn_hash_put(hash, 1, one);
+ pn_hash_put(hash, 2, two);
+ pn_hash_put(hash, 3, three);
+ pn_hash_put(hash, 4, one);
+ pn_hash_put(hash, 5, two);
+ pn_hash_put(hash, 6, three);
+ pn_hash_put(hash, 7, one);
+ pn_hash_put(hash, 8, two);
+ pn_hash_put(hash, 9, three);
+ pn_hash_put(hash, 10, one);
+ pn_hash_put(hash, 11, two);
+ pn_hash_put(hash, 12, three);
+ pn_hash_put(hash, 18, one);
+
+ assert(pn_hash_get(hash, 2) == two);
+ assert(pn_hash_get(hash, 5) == two);
+ assert(pn_hash_get(hash, 18) == one);
+ assert(pn_hash_get(hash, 0) == NULL);
+
+ assert(pn_hash_size(hash) == 14);
+
+ pn_hash_del(hash, 5);
+ assert(pn_hash_get(hash, 5) == NULL);
+ assert(pn_hash_size(hash) == 13);
+ pn_hash_del(hash, 18);
+ assert(pn_hash_get(hash, 18) == NULL);
+ assert(pn_hash_size(hash) == 12);
+
+ pn_decref(hash);
+
+ pn_decref(one);
+ pn_decref(two);
+ pn_decref(three);
+}
+
+
+// collider class: all objects have same hash, no two objects compare equal
+static intptr_t collider_compare(void *a, void *b)
+{
+ if (a == b) return 0;
+ return (a > b) ? 1 : -1;
+}
+
+static uintptr_t collider_hashcode(void *obj)
+{
+ return 23;
+}
+
+#define CID_collider CID_pn_object
+#define collider_initialize NULL
+#define collider_finalize NULL
+#define collider_inspect NULL
+
+static void test_map_links(void)
+{
+ const pn_class_t collider_clazz = PN_CLASS(collider);
+ void *keys[3];
+ for (int i = 0; i < 3; i++)
+ keys[i] = pn_class_new(&collider_clazz, 0);
+
+ // test deleting a head, middle link, tail
+
+ for (int delete_idx=0; delete_idx < 3; delete_idx++) {
+ pn_map_t *map = pn_map(PN_WEAKREF, PN_WEAKREF, 0, 0.75);
+ // create a chain of entries that have same head (from identical key hashcode)
+ for (int i = 0; i < 3; i++) {
+ pn_map_put(map, keys[i], keys[i]);
+ }
+ pn_map_del(map, keys[delete_idx]);
+ for (int i = 0; i < 3; i++) {
+ void *value = (i == delete_idx) ? NULL : keys[i];
+ assert (pn_map_get(map, keys[i]) == value);
+ }
+ pn_free(map);
+ }
+ for (int i = 0; i < 3; i++)
+ pn_free(keys[i]);
+}
+
+
+static bool equals(const char *a, const char *b)
+{
+ if (a == NULL && b == NULL) {
+ return true;
+ }
+
+ if (a == NULL || b == NULL) {
+ return false;
+ }
+
+ return !strcmp(a, b);
+}
+
+static void test_string(const char *value)
+{
+ size_t size = value ? strlen(value) : 0;
+
+ pn_string_t *str = pn_string(value);
+ assert(equals(pn_string_get(str), value));
+ assert(pn_string_size(str) == size);
+
+ pn_string_t *strn = pn_stringn(value, size);
+ assert(equals(pn_string_get(strn), value));
+ assert(pn_string_size(strn) == size);
+
+ pn_string_t *strset = pn_string(NULL);
+ pn_string_set(strset, value);
+ assert(equals(pn_string_get(strset), value));
+ assert(pn_string_size(strset) == size);
+
+ pn_string_t *strsetn = pn_string(NULL);
+ pn_string_setn(strsetn, value, size);
+ assert(equals(pn_string_get(strsetn), value));
+ assert(pn_string_size(strsetn) == size);
+
+ assert(pn_hashcode(str) == pn_hashcode(strn));
+ assert(pn_hashcode(str) == pn_hashcode(strset));
+ assert(pn_hashcode(str) == pn_hashcode(strsetn));
+
+ assert(!pn_compare(str, str));
+ assert(!pn_compare(str, strn));
+ assert(!pn_compare(str, strset));
+ assert(!pn_compare(str, strsetn));
+
+ pn_free(str);
+ pn_free(strn);
+ pn_free(strset);
+ pn_free(strsetn);
+}
+
+static void test_stringn(const char *value, size_t size)
+{
+ pn_string_t *strn = pn_stringn(value, size);
+ assert(equals(pn_string_get(strn), value));
+ assert(pn_string_size(strn) == size);
+
+ pn_string_t *strsetn = pn_string(NULL);
+ pn_string_setn(strsetn, value, size);
+ assert(equals(pn_string_get(strsetn), value));
+ assert(pn_string_size(strsetn) == size);
+
+ assert(pn_hashcode(strn) == pn_hashcode(strsetn));
+ assert(!pn_compare(strn, strsetn));
+
+ pn_free(strn);
+ pn_free(strsetn);
+}
+
+static void test_string_format(void)
+{
+ pn_string_t *str = pn_string("");
+ assert(str);
+ int err = pn_string_format(str, "%s", "this is a string that should be long "
+ "enough to force growth but just in case we'll "
+ "tack this other really long string on for the "
+ "heck of it");
+ assert(err == 0);
+ pn_free(str);
+}
+
+static void test_string_addf(void)
+{
+ pn_string_t *str = pn_string("hello ");
+ assert(str);
+ int err = pn_string_addf(str, "%s", "this is a string that should be long "
+ "enough to force growth but just in case we'll "
+ "tack this other really long string on for the "
+ "heck of it");
+ assert(err == 0);
+ pn_free(str);
+}
+
+static void test_map_iteration(int n)
+{
+ pn_list_t *pairs = pn_list(PN_OBJECT, 2*n);
+ for (int i = 0; i < n; i++) {
+ void *key = pn_class_new(PN_OBJECT, 0);
+ void *value = pn_class_new(PN_OBJECT, 0);
+ pn_list_add(pairs, key);
+ pn_list_add(pairs, value);
+ pn_decref(key);
+ pn_decref(value);
+ }
+
+ pn_map_t *map = pn_map(PN_OBJECT, PN_OBJECT, 0, 0.75);
+
+ assert(pn_map_head(map) == 0);
+
+ for (int i = 0; i < n; i++) {
+ pn_map_put(map, pn_list_get(pairs, 2*i), pn_list_get(pairs, 2*i + 1));
+ }
+
+ for (pn_handle_t entry = pn_map_head(map); entry; entry = pn_map_next(map, entry))
+ {
+ void *key = pn_map_key(map, entry);
+ void *value = pn_map_value(map, entry);
+ ssize_t idx = pn_list_index(pairs, key);
+ assert(idx >= 0);
+
+ assert(pn_list_get(pairs, idx) == key);
+ assert(pn_list_get(pairs, idx + 1) == value);
+
+ pn_list_del(pairs, idx, 2);
+ }
+
+ assert(pn_list_size(pairs) == 0);
+
+ pn_decref(map);
+ pn_decref(pairs);
+}
+
+void test_inspect(void *o, const char *expected)
+{
+ pn_string_t *dst = pn_string(NULL);
+ pn_inspect(o, dst);
+ assert(pn_strequals(pn_string_get(dst), expected));
+ pn_free(dst);
+}
+
+void test_list_inspect(void)
+{
+ pn_list_t *l = build_list(0, END);
+ test_inspect(l, "[]");
+ pn_free(l);
+
+ l = build_list(0, pn_string("one"), END);
+ test_inspect(l, "[\"one\"]");
+ pn_free(l);
+
+ l = build_list(0,
+ pn_string("one"),
+ pn_string("two"),
+ END);
+ test_inspect(l, "[\"one\", \"two\"]");
+ pn_free(l);
+
+ l = build_list(0,
+ pn_string("one"),
+ pn_string("two"),
+ pn_string("three"),
+ END);
+ test_inspect(l, "[\"one\", \"two\", \"three\"]");
+ pn_free(l);
+}
+
+void test_map_inspect(void)
+{
+ // note that when there is more than one entry in a map, the order
+ // of the entries is dependent on the hashes involved, it will be
+ // deterministic though
+ pn_map_t *m = build_map(0.75, 0, END);
+ test_inspect(m, "{}");
+ pn_free(m);
+
+ m = build_map(0.75, 0,
+ pn_string("key"), pn_string("value"),
+ END);
+ test_inspect(m, "{\"key\": \"value\"}");
+ pn_free(m);
+
+ m = build_map(0.75, 0,
+ pn_string("k1"), pn_string("v1"),
+ pn_string("k2"), pn_string("v2"),
+ END);
+ test_inspect(m, "{\"k1\": \"v1\", \"k2\": \"v2\"}");
+ pn_free(m);
+
+ m = build_map(0.75, 0,
+ pn_string("k1"), pn_string("v1"),
+ pn_string("k2"), pn_string("v2"),
+ pn_string("k3"), pn_string("v3"),
+ END);
+ test_inspect(m, "{\"k3\": \"v3\", \"k1\": \"v1\", \"k2\": \"v2\"}");
+ pn_free(m);
+}
+
+void test_map_coalesced_chain(void)
+{
+ pn_hash_t *map = pn_hash(PN_OBJECT, 16, 0.75);
+ pn_string_t *values[9] = {
+ pn_string("a"),
+ pn_string("b"),
+ pn_string("c"),
+ pn_string("d"),
+ pn_string("e"),
+ pn_string("f"),
+ pn_string("g"),
+ pn_string("h"),
+ pn_string("i")
+ };
+ //add some items:
+ pn_hash_put(map, 1, values[0]);
+ pn_hash_put(map, 2, values[1]);
+ pn_hash_put(map, 3, values[2]);
+
+ //use up all non-addressable elements:
+ pn_hash_put(map, 14, values[3]);
+ pn_hash_put(map, 15, values[4]);
+ pn_hash_put(map, 16, values[5]);
+
+ //use an addressable element for a key that doesn't map to it:
+ pn_hash_put(map, 4, values[6]);
+ pn_hash_put(map, 17, values[7]);
+ assert(pn_hash_size(map) == 8);
+
+ //free up one non-addressable entry:
+ pn_hash_del(map, 16);
+ assert(pn_hash_get(map, 16) == NULL);
+ assert(pn_hash_size(map) == 7);
+
+ //add a key whose addressable slot is already taken (by 17),
+ //generating a coalesced chain:
+ pn_hash_put(map, 12, values[8]);
+
+ //remove an entry from the coalesced chain:
+ pn_hash_del(map, 4);
+ assert(pn_hash_get(map, 4) == NULL);
+
+ //test lookup of all entries:
+ assert(pn_strequals(pn_string_get((pn_string_t *) pn_hash_get(map, 1)), "a"));
+ assert(pn_strequals(pn_string_get((pn_string_t *) pn_hash_get(map, 2)), "b"));
+ assert(pn_strequals(pn_string_get((pn_string_t *) pn_hash_get(map, 3)), "c"));
+ assert(pn_strequals(pn_string_get((pn_string_t *) pn_hash_get(map, 14)), "d"));
+ assert(pn_strequals(pn_string_get((pn_string_t *) pn_hash_get(map, 15)), "e"));
+ assert(pn_strequals(pn_string_get((pn_string_t *) pn_hash_get(map, 17)), "h"));
+ assert(pn_strequals(pn_string_get((pn_string_t *) pn_hash_get(map, 12)), "i"));
+ assert(pn_hash_size(map) == 7);
+
+ //cleanup:
+ for (pn_handle_t i = pn_hash_head(map); i; i = pn_hash_head(map)) {
+ pn_hash_del(map, pn_hash_key(map, i));
+ }
+ assert(pn_hash_size(map) == 0);
+
+ for (size_t i = 0; i < 9; ++i) {
+ pn_free(values[i]);
+ }
+ pn_free(map);
+}
+
+void test_map_coalesced_chain2(void)
+{
+ pn_hash_t *map = pn_hash(PN_OBJECT, 16, 0.75);
+ pn_string_t *values[10] = {
+ pn_string("a"),
+ pn_string("b"),
+ pn_string("c"),
+ pn_string("d"),
+ pn_string("e"),
+ pn_string("f"),
+ pn_string("g"),
+ pn_string("h"),
+ pn_string("i"),
+ pn_string("j")
+ };
+ //add some items:
+ pn_hash_put(map, 1, values[0]);//a
+ pn_hash_put(map, 2, values[1]);//b
+ pn_hash_put(map, 3, values[2]);//c
+
+ //use up all non-addressable elements:
+ pn_hash_put(map, 14, values[3]);//d
+ pn_hash_put(map, 15, values[4]);//e
+ pn_hash_put(map, 16, values[5]);//f
+ //take slot from addressable region
+ pn_hash_put(map, 29, values[6]);//g, goes into slot 12
+
+ //free up one non-addressable entry:
+ pn_hash_del(map, 14);
+ assert(pn_hash_get(map, 14) == NULL);
+
+ //add a key whose addressable slot is already taken (by 29),
+ //generating a coalesced chain:
+ pn_hash_put(map, 12, values[7]);//h
+ assert(pn_strequals(pn_string_get((pn_string_t *) pn_hash_get(map, 12)), "h"));
+ //delete from tail of coalesced chain:
+ pn_hash_del(map, 12);
+ assert(pn_hash_get(map, 12) == NULL);
+
+ //extend chain into cellar again, then coalesce again extending back
+ //into addressable region
+ pn_hash_put(map, 42, values[8]);//i
+ pn_hash_put(map, 25, values[9]);//j
+ //delete entry from coalesced chain, where next element in chain is
+ //in cellar:
+ assert(pn_strequals(pn_string_get((pn_string_t *) pn_hash_get(map, 29)), "g"));
+ pn_hash_del(map, 29);
+
+ //test lookup of all entries:
+ assert(pn_strequals(pn_string_get((pn_string_t *) pn_hash_get(map, 1)), "a"));
+ assert(pn_strequals(pn_string_get((pn_string_t *) pn_hash_get(map, 2)), "b"));
+ assert(pn_strequals(pn_string_get((pn_string_t *) pn_hash_get(map, 3)), "c"));
+ //d was deleted
+ assert(pn_strequals(pn_string_get((pn_string_t *) pn_hash_get(map, 15)), "e"));
+ assert(pn_strequals(pn_string_get((pn_string_t *) pn_hash_get(map, 16)), "f"));
+ //g was deleted, h was deleted
+ assert(pn_strequals(pn_string_get((pn_string_t *) pn_hash_get(map, 42)), "i"));
+ assert(pn_strequals(pn_string_get((pn_string_t *) pn_hash_get(map, 25)), "j"));
+ assert(pn_hash_size(map) == 7);
+
+ //cleanup:
+ for (pn_handle_t i = pn_hash_head(map); i; i = pn_hash_head(map)) {
+ pn_hash_del(map, pn_hash_key(map, i));
+ }
+ assert(pn_hash_size(map) == 0);
+
+ for (size_t i = 0; i < 10; ++i) {
+ pn_free(values[i]);
+ }
+ pn_free(map);
+}
+
+void test_list_compare(void)
+{
+ pn_list_t *a = pn_list(PN_OBJECT, 0);
+ pn_list_t *b = pn_list(PN_OBJECT, 0);
+
+ assert(pn_equals(a, b));
+
+ void *one = pn_class_new(PN_OBJECT, 0);
+ void *two = pn_class_new(PN_OBJECT, 0);
+ void *three = pn_class_new(PN_OBJECT, 0);
+
+ pn_list_add(a, one);
+ assert(!pn_equals(a, b));
+ pn_list_add(b, one);
+ assert(pn_equals(a, b));
+
+ pn_list_add(b, two);
+ assert(!pn_equals(a, b));
+ pn_list_add(a, two);
+ assert(pn_equals(a, b));
+
+ pn_list_add(a, three);
+ assert(!pn_equals(a, b));
+ pn_list_add(b, three);
+ assert(pn_equals(a, b));
+
+ pn_free(a); pn_free(b);
+ pn_free(one); pn_free(two); pn_free(three);
+}
+
+typedef struct {
+ pn_list_t *list;
+ size_t index;
+} pn_it_state_t;
+
+static void *pn_it_next(void *state) {
+ pn_it_state_t *it = (pn_it_state_t *) state;
+ if (it->index < pn_list_size(it->list)) {
+ return pn_list_get(it->list, it->index++);
+ } else {
+ return NULL;
+ }
+}
+
+void test_iterator(void)
+{
+ pn_list_t *list = build_list(0,
+ pn_string("one"),
+ pn_string("two"),
+ pn_string("three"),
+ pn_string("four"),
+ END);
+ pn_iterator_t *it = pn_iterator();
+ pn_it_state_t *state = (pn_it_state_t *) pn_iterator_start
+ (it, pn_it_next, sizeof(pn_it_state_t));
+ state->list = list;
+ state->index = 0;
+
+ void *obj;
+ int index = 0;
+ while ((obj = pn_iterator_next(it))) {
+ assert(obj == pn_list_get(list, index));
+ ++index;
+ }
+ assert(index == 4);
+
+ pn_free(list);
+ pn_free(it);
+}
+
+void test_heap(int seed, int size)
+{
+ srand(seed);
+ pn_list_t *list = pn_list(PN_VOID, 0);
+
+ intptr_t min = 0;
+ intptr_t max = 0;
+
+ for (int i = 0; i < size; i++) {
+ intptr_t r = rand();
+
+ if (i == 0) {
+ min = r;
+ max = r;
+ } else {
+ if (r < min) {
+ min = r;
+ }
+ if (r > max) {
+ max = r;
+ }
+ }
+
+ pn_list_minpush(list, (void *) r);
+ }
+
+ intptr_t prev = (intptr_t) pn_list_minpop(list);
+ assert(prev == min);
+ assert(pn_list_size(list) == (size_t)(size - 1));
+ int count = 0;
+ while (pn_list_size(list)) {
+ intptr_t r = (intptr_t) pn_list_minpop(list);
+ assert(r >= prev);
+ prev = r;
+ count++;
+ }
+ assert(count == size - 1);
+ assert(prev == max);
+
+ pn_free(list);
+}
+
+int main(int argc, char **argv)
+{
+ for (size_t i = 0; i < 128; i++) {
+ test_class(PN_OBJECT, i);
+ test_class(PN_VOID, i);
+ test_class(&noop_class, i);
+ }
+
+ for (size_t i = 0; i < 128; i++) {
+ test_new(i, PN_OBJECT);
+ test_new(i, &noop_class);
+ }
+
+ test_finalize();
+ test_free();
+ test_hashcode();
+ test_compare();
+
+ for (int i = 0; i < 1024; i++) {
+ test_refcounting(i);
+ }
+
+ for (size_t i = 0; i < 4; i++) {
+ test_list(i);
+ }
+
+ for (size_t i = 0; i < 4; i++) {
+ test_list_refcount(i);
+ }
+
+ test_list_index();
+
+ test_map();
+ test_map_links();
+
+ test_hash();
+
+ test_string(NULL);
+ test_string("");
+ test_string("this is a test");
+ test_string("012345678910111213151617181920212223242526272829303132333435363"
+ "738394041424344454647484950515253545556575859606162636465666768");
+ test_string("this has an embedded \000 in it");
+ test_stringn("this has an embedded \000 in it", 28);
+
+ test_string_format();
+ test_string_addf();
+
+ test_build_list();
+ test_build_map();
+ test_build_map_odd();
+
+ for (int i = 0; i < 64; i++)
+ {
+ test_map_iteration(i);
+ }
+
+ test_list_inspect();
+ test_map_inspect();
+ test_list_compare();
+ test_iterator();
+ for (int seed = 0; seed < 64; seed++) {
+ for (int size = 1; size <= 64; size++) {
+ test_heap(seed, size);
+ }
+ }
+
+ test_map_coalesced_chain();
+ test_map_coalesced_chain2();
+
+ return 0;
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/c/tests/parse-url.c
----------------------------------------------------------------------
diff --git a/c/tests/parse-url.c b/c/tests/parse-url.c
new file mode 100644
index 0000000..b632237
--- /dev/null
+++ b/c/tests/parse-url.c
@@ -0,0 +1,123 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <stdarg.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include "proton/type_compat.h"
+#include "proton/error.h"
+#include "proton/url.h"
+
+static bool verify(const char* what, const char* want, const char* got) {
+ bool eq = (want == got || (want && got && strcmp(want, got) == 0));
+ if (!eq) printf(" %s: '%s' != '%s'\n", what, want, got);
+ return eq;
+}
+
+static bool test(const char* url, const char* scheme, const char* user, const char* pass, const char* host, const char* port, const char*path)
+{
+ pn_url_t *purl = pn_url_parse(url);
+ bool ok =
+ verify("scheme", scheme, pn_url_get_scheme(purl)) &&
+ verify("user", user, pn_url_get_username(purl)) &&
+ verify("pass", pass, pn_url_get_password(purl)) &&
+ verify("host", host, pn_url_get_host(purl)) &&
+ verify("port", port, pn_url_get_port(purl)) &&
+ verify("path", path, pn_url_get_path(purl));
+ pn_url_free(purl);
+ return ok;
+}
+
+// Run test and additionally verify the round trip of parse and stringify
+// matches original string.
+static bool testrt(const char* url, const char* scheme, const char* user, const char* pass, const char* host, const char* port, const char*path)
+{
+ bool ok = test(url, scheme, user, pass, host, port, path);
+ pn_url_t *purl = pn_url_parse(url);
+ ok = ok && verify("url", url, pn_url_str(purl));
+ pn_url_free(purl);
+ return ok;
+}
+
+#define TEST(EXPR) \
+ do { if (!(EXPR)) { printf("%s:%d: %s\n\n", __FILE__, __LINE__, #EXPR); failed++; } } while(0)
+
+int main(int argc, char **argv)
+{
+ int failed = 0;
+ TEST(testrt("/Foo.bar:90087@somewhere", 0, 0, 0, 0, 0, "Foo.bar:90087@somewhere"));
+ TEST(testrt("host", 0, 0, 0, "host", 0, 0));
+ TEST(testrt("host:423", 0, 0, 0, "host", "423", 0));
+ TEST(testrt("user@host", 0, "user", 0, "host", 0, 0));
+
+ // Can't round-trip passwords with ':', not strictly legal but the parser allows it.
+ TEST(test("user:1243^&^:pw@host:423", 0, "user", "1243^&^:pw", "host", "423", 0));
+ TEST(test("user:1243^&^:pw@host:423/Foo.bar:90087", 0, "user", "1243^&^:pw", "host", "423", "Foo.bar:90087"));
+ TEST(test("user:1243^&^:pw@host:423/Foo.bar:90087@somewhere", 0, "user", "1243^&^:pw", "host", "423", "Foo.bar:90087@somewhere"));
+
+ TEST(testrt("[::1]", 0, 0, 0, "::1", 0, 0));
+ TEST(testrt("[::1]:amqp", 0, 0, 0, "::1", "amqp", 0));
+ TEST(testrt("user@[::1]", 0, "user", 0, "::1", 0, 0));
+ TEST(testrt("user@[::1]:amqp", 0, "user", 0, "::1", "amqp", 0));
+
+ // Can't round-trip passwords with ':', not strictly legal but the parser allows it.
+ TEST(test("user:1243^&^:pw@[::1]:amqp", 0, "user", "1243^&^:pw", "::1", "amqp", 0));
+ TEST(test("user:1243^&^:pw@[::1]:amqp/Foo.bar:90087", 0, "user", "1243^&^:pw", "::1", "amqp", "Foo.bar:90087"));
+ TEST(test("user:1243^&^:pw@[::1:amqp/Foo.bar:90087", 0, "user", "1243^&^:pw", "[::1", "amqp", "Foo.bar:90087"));
+ TEST(test("user:1243^&^:pw@::1]:amqp/Foo.bar:90087", 0, "user", "1243^&^:pw", "::1]", "amqp", "Foo.bar:90087"));
+
+ TEST(testrt("amqp://user@[::1]", "amqp", "user", 0, "::1", 0, 0));
+ TEST(testrt("amqp://user@[::1]:amqp", "amqp", "user", 0, "::1", "amqp", 0));
+ TEST(testrt("amqp://user@[1234:52:0:1260:f2de:f1ff:fe59:8f87]:amqp", "amqp", "user", 0, "1234:52:0:1260:f2de:f1ff:fe59:8f87", "amqp", 0));
+
+ // Can't round-trip passwords with ':', not strictly legal but the parser allows it.
+ TEST(test("amqp://user:1243^&^:pw@[::1]:amqp", "amqp", "user", "1243^&^:pw", "::1", "amqp", 0));
+ TEST(test("amqp://user:1243^&^:pw@[::1]:amqp/Foo.bar:90087", "amqp", "user", "1243^&^:pw", "::1", "amqp", "Foo.bar:90087"));
+
+ TEST(testrt("amqp://host", "amqp", 0, 0, "host", 0, 0));
+ TEST(testrt("amqp://user@host", "amqp", "user", 0, "host", 0, 0));
+ TEST(testrt("amqp://user@host/path:%", "amqp", "user", 0, "host", 0, "path:%"));
+ TEST(testrt("amqp://user@host:5674/path:%", "amqp", "user", 0, "host", "5674", "path:%"));
+ TEST(testrt("amqp://user@host/path:%", "amqp", "user", 0, "host", 0, "path:%"));
+ TEST(testrt("amqp://bigbird@host/queue@host", "amqp", "bigbird", 0, "host", 0, "queue@host"));
+ TEST(testrt("amqp://host/queue@host", "amqp", 0, 0, "host", 0, "queue@host"));
+ TEST(testrt("amqp://host:9765/queue@host", "amqp", 0, 0, "host", "9765", "queue@host"));
+ TEST(test("user:pass%2fword@host", 0, "user", "pass/word", "host", 0, 0));
+ TEST(testrt("user:pass%2Fword@host", 0, "user", "pass/word", "host", 0, 0));
+ // Can't round-trip passwords with lowercase hex encoding
+ TEST(test("us%2fer:password@host", 0, "us/er", "password", "host", 0, 0));
+ TEST(testrt("us%2Fer:password@host", 0, "us/er", "password", "host", 0, 0));
+ // Can't round-trip passwords with lowercase hex encoding
+ TEST(test("user:pass%2fword%@host", 0, "user", "pass/word%", "host", 0, 0));
+ TEST(testrt("localhost/temp-queue://ID:ganymede-36663-1408448359876-2:123:0", 0, 0, 0, "localhost", 0, "temp-queue://ID:ganymede-36663-1408448359876-2:123:0"));
+ TEST(testrt("/temp-queue://ID:ganymede-36663-1408448359876-2:123:0", 0, 0, 0, 0, 0, "temp-queue://ID:ganymede-36663-1408448359876-2:123:0"));
+ TEST(testrt("amqp://localhost/temp-queue://ID:ganymede-36663-1408448359876-2:123:0", "amqp", 0, 0, "localhost", 0, "temp-queue://ID:ganymede-36663-1408448359876-2:123:0"));
+ // PROTON-995
+ TEST(testrt("amqps://%40user%2F%3A:%40pass%2F%3A@example.net/some_topic",
+ "amqps", "@user/:", "@pass/:", "example.net", 0, "some_topic"));
+ TEST(testrt("amqps://user%2F%3A=:pass%2F%3A=@example.net/some_topic",
+ "amqps", "user/:=", "pass/:=", "example.net", 0, "some_topic"));
+ // Really perverse url
+ TEST(testrt("://:@://:", "", "", "", 0, "", "/:"));
+ return failed;
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/c/tests/proactor.c
----------------------------------------------------------------------
diff --git a/c/tests/proactor.c b/c/tests/proactor.c
new file mode 100644
index 0000000..216d68a
--- /dev/null
+++ b/c/tests/proactor.c
@@ -0,0 +1,1102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "test_tools.h"
+#include "test_handler.h"
+#include "test_config.h"
+#include "../src/proactor/proactor-internal.h"
+
+#include <proton/condition.h>
+#include <proton/connection.h>
+#include <proton/event.h>
+#include <proton/listener.h>
+#include <proton/session.h>
+#include <proton/netaddr.h>
+#include <proton/proactor.h>
+#include <proton/ssl.h>
+#include <proton/transport.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#define ARRAYLEN(A) (sizeof(A)/sizeof((A)[0]))
+
+/* Proactor and handler that take part in a test */
+typedef struct test_proactor_t {
+ test_handler_t handler;
+ pn_proactor_t *proactor;
+} test_proactor_t;
+
+static test_proactor_t test_proactor(test_t *t, test_handler_fn f) {
+ test_proactor_t tp;
+ test_handler_init(&tp.handler, t, f);
+ tp.proactor = pn_proactor();
+ TEST_ASSERT(tp.proactor);
+ return tp;
+}
+
+static void test_proactor_destroy(test_proactor_t *tp) {
+ pn_proactor_free(tp->proactor);
+}
+
+/* Set this to a pn_condition() to save condition data */
+pn_condition_t *last_condition = NULL;
+
+static void save_condition(pn_event_t *e) {
+ if (last_condition) {
+ pn_condition_t *cond = NULL;
+ if (pn_event_listener(e)) {
+ cond = pn_listener_condition(pn_event_listener(e));
+ } else {
+ cond = pn_event_condition(e);
+ }
+ if (cond) {
+ pn_condition_copy(last_condition, cond);
+ } else {
+ pn_condition_clear(last_condition);
+ }
+ }
+}
+
+/* Process events on a proactor array until a handler returns an event, or
+ * all proactors return NULL
+ */
+static pn_event_type_t test_proactors_get(test_proactor_t *tps, size_t n) {
+ if (last_condition) pn_condition_clear(last_condition);
+ while (true) {
+ bool busy = false;
+ for (test_proactor_t *tp = tps; tp < tps + n; ++tp) {
+ pn_event_batch_t *eb = pn_proactor_get(tp->proactor);
+ if (eb) {
+ busy = true;
+ pn_event_type_t ret = PN_EVENT_NONE;
+ for (pn_event_t* e = pn_event_batch_next(eb); e; e = pn_event_batch_next(eb)) {
+ test_handler_log(&tp->handler, e);
+ save_condition(e);
+ ret = tp->handler.f(&tp->handler, e);
+ if (ret) break;
+ }
+ pn_proactor_done(tp->proactor, eb);
+ if (ret) return ret;
+ }
+ }
+ if (!busy) {
+ return PN_EVENT_NONE;
+ }
+ }
+}
+
+/* Run an array of proactors till a handler returns an event. */
+static pn_event_type_t test_proactors_run(test_proactor_t *tps, size_t n) {
+ pn_event_type_t e;
+ while ((e = test_proactors_get(tps, n)) == PN_EVENT_NONE)
+ ;
+ return e;
+}
+
+/* Run an array of proactors till a handler returns the desired event. */
+void test_proactors_run_until(test_proactor_t *tps, size_t n, pn_event_type_t want) {
+ while (test_proactors_get(tps, n) != want)
+ ;
+}
+
+/* Drain and discard outstanding events from an array of proactors */
+static void test_proactors_drain(test_proactor_t *tps, size_t n) {
+ while (test_proactors_get(tps, n))
+ ;
+}
+
+
+#define TEST_PROACTORS_GET(A) test_proactors_get((A), ARRAYLEN(A))
+#define TEST_PROACTORS_RUN(A) test_proactors_run((A), ARRAYLEN(A))
+#define TEST_PROACTORS_RUN_UNTIL(A, WANT) test_proactors_run_until((A), ARRAYLEN(A), WANT)
+#define TEST_PROACTORS_DRAIN(A) test_proactors_drain((A), ARRAYLEN(A))
+
+#define TEST_PROACTORS_DESTROY(A) do { \
+ for (size_t i = 0; i < ARRAYLEN(A); ++i) \
+ test_proactor_destroy((A)+i); \
+ } while (0)
+
+
+#define MAX_STR 256
+struct addrinfo {
+ char host[MAX_STR];
+ char port[MAX_STR];
+ char connect[MAX_STR];
+ char host_port[MAX_STR];
+};
+
+struct addrinfo listener_info(pn_listener_t *l) {
+ struct addrinfo ai = {{0}};
+ const pn_netaddr_t *na = pn_listener_addr(l);
+ TEST_ASSERT(0 == pn_netaddr_host_port(na, ai.host, sizeof(ai.host), ai.port, sizeof(ai.port)));
+ for (na = pn_netaddr_next(na); na; na = pn_netaddr_next(na)) { /* Check that ports are consistent */
+ char port[MAX_STR];
+ TEST_ASSERT(0 == pn_netaddr_host_port(na, NULL, 0, port, sizeof(port)));
+ TEST_ASSERTF(0 == strcmp(port, ai.port), "%s != %s", port, ai.port);
+ }
+ (void)pn_proactor_addr(ai.connect, sizeof(ai.connect), "", ai.port); /* Address for connecting */
+ (void)pn_netaddr_str(na, ai.host_port, sizeof(ai.host_port)); /* host:port listening address */
+ return ai;
+}
+
+/* Return a pn_listener_t*, raise errors if not successful */
+pn_listener_t *test_listen(test_proactor_t *tp, const char *host) {
+ char addr[1024];
+ pn_listener_t *l = pn_listener();
+ (void)pn_proactor_addr(addr, sizeof(addr), host, "0");
+ pn_proactor_listen(tp->proactor, l, addr, 4);
+ TEST_ETYPE_EQUAL(tp->handler.t, PN_LISTENER_OPEN, test_proactors_run(tp, 1));
+ TEST_COND_EMPTY(tp->handler.t, last_condition);
+ return l;
+}
+
+
+/* Wait for the next single event, return its type */
+static pn_event_type_t wait_next(pn_proactor_t *proactor) {
+ pn_event_batch_t *events = pn_proactor_wait(proactor);
+ pn_event_type_t etype = pn_event_type(pn_event_batch_next(events));
+ pn_proactor_done(proactor, events);
+ return etype;
+}
+
+/* Test that interrupt and timeout events cause pn_proactor_wait() to return. */
+static void test_interrupt_timeout(test_t *t) {
+ pn_proactor_t *p = pn_proactor();
+ TEST_CHECK(t, pn_proactor_get(p) == NULL); /* idle */
+ pn_proactor_interrupt(p);
+ TEST_ETYPE_EQUAL(t, PN_PROACTOR_INTERRUPT, wait_next(p));
+ TEST_CHECK(t, pn_proactor_get(p) == NULL); /* idle */
+
+ /* Set an immediate timeout */
+ pn_proactor_set_timeout(p, 0);
+ TEST_ETYPE_EQUAL(t, PN_PROACTOR_TIMEOUT, wait_next(p));
+ TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, wait_next(p)); /* Inactive because timeout expired */
+
+ /* Set a (very short) timeout */
+ pn_proactor_set_timeout(p, 1);
+ TEST_ETYPE_EQUAL(t, PN_PROACTOR_TIMEOUT, wait_next(p));
+ TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, wait_next(p));
+
+ /* Set and cancel a timeout, make sure we don't get the timeout event */
+ pn_proactor_set_timeout(p, 10000000);
+ pn_proactor_cancel_timeout(p);
+ TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, wait_next(p));
+ TEST_CHECK(t, pn_proactor_get(p) == NULL); /* idle */
+
+ pn_proactor_free(p);
+}
+
+/* Save the last connection accepted by the common_handler */
+pn_connection_t *last_accepted = NULL;
+
+/* Common handler for simple client/server interactions, */
+static pn_event_type_t common_handler(test_handler_t *th, pn_event_t *e) {
+ pn_connection_t *c = pn_event_connection(e);
+ pn_listener_t *l = pn_event_listener(e);
+
+ switch (pn_event_type(e)) {
+
+ /* Stop on these events */
+ case PN_TRANSPORT_CLOSED:
+ case PN_PROACTOR_INACTIVE:
+ case PN_PROACTOR_TIMEOUT:
+ case PN_LISTENER_OPEN:
+ return pn_event_type(e);
+
+ case PN_LISTENER_ACCEPT:
+ last_accepted = pn_connection();
+ pn_listener_accept2(l, last_accepted, NULL);
+ pn_listener_close(l); /* Only accept one connection */
+ return PN_EVENT_NONE;
+
+ case PN_CONNECTION_REMOTE_OPEN:
+ pn_connection_open(c); /* Return the open (no-op if already open) */
+ return PN_EVENT_NONE;
+
+ case PN_SESSION_REMOTE_OPEN:
+ pn_session_open(pn_event_session(e));
+ return PN_EVENT_NONE;
+
+ case PN_LINK_REMOTE_OPEN:
+ pn_link_open(pn_event_link(e));
+ return PN_EVENT_NONE;
+
+ case PN_CONNECTION_REMOTE_CLOSE:
+ pn_connection_close(c); /* Return the close */
+ return PN_EVENT_NONE;
+
+ /* Ignore these events */
+ case PN_CONNECTION_BOUND:
+ case PN_CONNECTION_INIT:
+ case PN_CONNECTION_LOCAL_CLOSE:
+ case PN_CONNECTION_LOCAL_OPEN:
+ case PN_LINK_INIT:
+ case PN_LINK_LOCAL_OPEN:
+ case PN_LISTENER_CLOSE:
+ case PN_SESSION_INIT:
+ case PN_SESSION_LOCAL_OPEN:
+ case PN_TRANSPORT:
+ case PN_TRANSPORT_ERROR:
+ case PN_TRANSPORT_HEAD_CLOSED:
+ case PN_TRANSPORT_TAIL_CLOSED:
+ return PN_EVENT_NONE;
+
+ default:
+ TEST_ERRORF(th->t, "unexpected event %s", pn_event_type_name(pn_event_type(e)));
+ return PN_EVENT_NONE; /* Fail the test but keep going */
+ }
+}
+
+/* Like common_handler but does not auto-close the listener after one accept,
+ and returns on LISTENER_CLOSE
+*/
+static pn_event_type_t listen_handler(test_handler_t *th, pn_event_t *e) {
+ switch (pn_event_type(e)) {
+ case PN_LISTENER_ACCEPT:
+ /* No automatic listener close/free for tests that accept multiple connections */
+ last_accepted = pn_connection();
+ pn_listener_accept2(pn_event_listener(e), last_accepted, NULL);
+ /* No automatic close */
+ return PN_EVENT_NONE;
+
+ case PN_LISTENER_CLOSE:
+ return PN_LISTENER_CLOSE;
+
+ default:
+ return common_handler(th, e);
+ }
+}
+
+/* close a connection when it is remote open */
+static pn_event_type_t open_close_handler(test_handler_t *th, pn_event_t *e) {
+ switch (pn_event_type(e)) {
+ case PN_CONNECTION_REMOTE_OPEN:
+ pn_connection_close(pn_event_connection(e));
+ return PN_EVENT_NONE; /* common_handler will finish on TRANSPORT_CLOSED */
+ default:
+ return common_handler(th, e);
+ }
+}
+
+/* Test simple client/server connection with 2 proactors */
+static void test_client_server(test_t *t) {
+ test_proactor_t tps[] ={ test_proactor(t, open_close_handler), test_proactor(t, common_handler) };
+ pn_listener_t *l = test_listen(&tps[1], "");
+ /* Connect and wait for close at both ends */
+ pn_proactor_connect2(tps[0].proactor, NULL, NULL, listener_info(l).connect);
+ TEST_PROACTORS_RUN_UNTIL(tps, PN_TRANSPORT_CLOSED);
+ TEST_PROACTORS_RUN_UNTIL(tps, PN_TRANSPORT_CLOSED);
+ TEST_PROACTORS_DESTROY(tps);
+}
+
+/* Return on connection open, close and return on wake */
+static pn_event_type_t open_wake_handler(test_handler_t *th, pn_event_t *e) {
+ switch (pn_event_type(e)) {
+ case PN_CONNECTION_REMOTE_OPEN:
+ return pn_event_type(e);
+ case PN_CONNECTION_WAKE:
+ pn_connection_close(pn_event_connection(e));
+ return pn_event_type(e);
+ default:
+ return common_handler(th, e);
+ }
+}
+
+/* Test waking up a connection that is idle */
+static void test_connection_wake(test_t *t) {
+ test_proactor_t tps[] = { test_proactor(t, open_wake_handler), test_proactor(t, listen_handler) };
+ pn_proactor_t *client = tps[0].proactor;
+ pn_listener_t *l = test_listen(&tps[1], "");
+
+ pn_connection_t *c = pn_connection();
+ pn_incref(c); /* Keep a reference for wake() after free */
+ pn_proactor_connect2(client, c, NULL, listener_info(l).connect);
+ TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, TEST_PROACTORS_RUN(tps));
+ TEST_CHECK(t, pn_proactor_get(client) == NULL); /* Should be idle */
+ pn_connection_wake(c);
+ TEST_ETYPE_EQUAL(t, PN_CONNECTION_WAKE, TEST_PROACTORS_RUN(tps));
+ TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, TEST_PROACTORS_RUN(tps));
+ TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, TEST_PROACTORS_RUN(tps)); /* Both ends */
+ /* The pn_connection_t is still valid so wake is legal but a no-op */
+ TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, TEST_PROACTORS_RUN(tps));
+ TEST_ETYPE_EQUAL(t, PN_EVENT_NONE, TEST_PROACTORS_GET(tps)); /* No more wake */
+
+ /* Verify we don't get a wake after close even if they happen together */
+ pn_connection_t *c2 = pn_connection();
+ pn_proactor_connect2(client, c2, NULL, listener_info(l).connect);
+ TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, TEST_PROACTORS_RUN(tps));
+ pn_connection_wake(c2);
+ pn_proactor_disconnect(client, NULL);
+ pn_connection_wake(c2);
+
+ TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, test_proactors_run(&tps[0], 1));
+ TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, test_proactors_run(&tps[0], 1));
+ TEST_ETYPE_EQUAL(t, PN_EVENT_NONE, test_proactors_get(&tps[0], 1)); /* No late wake */
+
+ TEST_PROACTORS_DESTROY(tps);
+ /* The pn_connection_t is still valid so wake is legal but a no-op */
+ pn_connection_wake(c);
+ pn_decref(c);
+}
+
+/* Close the transport to abort a connection, i.e. close the socket without an AMQP close */
+static pn_event_type_t listen_abort_handler(test_handler_t *th, pn_event_t *e) {
+ switch (pn_event_type(e)) {
+ case PN_CONNECTION_REMOTE_OPEN:
+ /* Close the transport - abruptly closes the socket */
+ pn_transport_close_tail(pn_connection_transport(pn_event_connection(e)));
+ pn_transport_close_head(pn_connection_transport(pn_event_connection(e)));
+ return PN_EVENT_NONE;
+
+ default:
+ /* Don't auto-close the listener to keep the event sequences simple */
+ return listen_handler(th, e);
+ }
+}
+
+/* Verify that pn_transport_close_head/tail aborts a connection without an AMQP protocol close */
+static void test_abort(test_t *t) {
+ test_proactor_t tps[] = { test_proactor(t, open_close_handler), test_proactor(t, listen_abort_handler) };
+ pn_proactor_t *client = tps[0].proactor;
+ pn_listener_t *l = test_listen(&tps[1], "");
+ pn_proactor_connect2(client, NULL, NULL, listener_info(l).connect);
+
+ /* server transport closes */
+ if (TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, TEST_PROACTORS_RUN(tps))) {
+ TEST_COND_NAME(t, "amqp:connection:framing-error",last_condition);
+ TEST_COND_DESC(t, "abort", last_condition);
+ }
+ /* client transport closes */
+ if (TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, TEST_PROACTORS_RUN(tps))) {
+ TEST_COND_NAME(t, "amqp:connection:framing-error", last_condition);
+ TEST_COND_DESC(t, "abort", last_condition);
+ }
+
+ pn_listener_close(l);
+
+ while (TEST_PROACTORS_RUN(tps) != PN_PROACTOR_INACTIVE) {}
+ while (TEST_PROACTORS_RUN(tps) != PN_PROACTOR_INACTIVE) {}
+
+ /* Verify expected event sequences, no unexpected events */
+ TEST_HANDLER_EXPECT(
+ &tps[0].handler,
+ PN_CONNECTION_INIT, PN_CONNECTION_LOCAL_OPEN, PN_CONNECTION_BOUND,
+ PN_TRANSPORT_TAIL_CLOSED, PN_TRANSPORT_ERROR, PN_TRANSPORT_HEAD_CLOSED, PN_TRANSPORT_CLOSED,
+ PN_PROACTOR_INACTIVE,
+ 0);
+
+ TEST_HANDLER_EXPECT(
+ &tps[1].handler,
+ PN_LISTENER_OPEN, PN_LISTENER_ACCEPT,
+ PN_CONNECTION_INIT, PN_CONNECTION_BOUND, PN_CONNECTION_REMOTE_OPEN,
+ PN_TRANSPORT_TAIL_CLOSED, PN_TRANSPORT_ERROR, PN_TRANSPORT_HEAD_CLOSED, PN_TRANSPORT_CLOSED,
+ PN_LISTENER_CLOSE,
+ PN_PROACTOR_INACTIVE,
+ 0);
+
+ TEST_PROACTORS_DESTROY(tps);
+}
+
+/* Refuse a connection: abort before the AMQP open sequence begins. */
+static pn_event_type_t listen_refuse_handler(test_handler_t *th, pn_event_t *e) {
+ switch (pn_event_type(e)) {
+
+ case PN_CONNECTION_BOUND:
+ /* Close the transport - abruptly closes the socket */
+ pn_transport_close_tail(pn_connection_transport(pn_event_connection(e)));
+ pn_transport_close_head(pn_connection_transport(pn_event_connection(e)));
+ return PN_EVENT_NONE;
+
+ default:
+ /* Don't auto-close the listener to keep the event sequences simple */
+ return listen_handler(th, e);
+ }
+}
+
+/* Verify that pn_transport_close_head/tail aborts a connection without an AMQP protocol close */
+static void test_refuse(test_t *t) {
+ test_proactor_t tps[] = { test_proactor(t, open_close_handler), test_proactor(t, listen_refuse_handler) };
+ pn_proactor_t *client = tps[0].proactor;
+ pn_listener_t *l = test_listen(&tps[1], "");
+ pn_proactor_connect2(client, NULL, NULL, listener_info(l).connect);
+
+ /* client transport closes */
+ TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, TEST_PROACTORS_RUN(tps)); /* client */
+ TEST_COND_NAME(t, "amqp:connection:framing-error", last_condition);
+
+ pn_listener_close(l);
+ while (TEST_PROACTORS_RUN(tps) != PN_PROACTOR_INACTIVE) {}
+ while (TEST_PROACTORS_RUN(tps) != PN_PROACTOR_INACTIVE) {}
+
+ /* Verify expected event sequences, no unexpected events */
+ TEST_HANDLER_EXPECT(
+ &tps[0].handler,
+ PN_CONNECTION_INIT, PN_CONNECTION_LOCAL_OPEN, PN_CONNECTION_BOUND,
+ PN_TRANSPORT_TAIL_CLOSED, PN_TRANSPORT_ERROR, PN_TRANSPORT_HEAD_CLOSED, PN_TRANSPORT_CLOSED,
+ PN_PROACTOR_INACTIVE,
+ 0);
+
+ TEST_HANDLER_EXPECT(
+ &tps[1].handler,
+ PN_LISTENER_OPEN, PN_LISTENER_ACCEPT,
+ PN_CONNECTION_INIT, PN_CONNECTION_BOUND,
+ PN_TRANSPORT_TAIL_CLOSED, PN_TRANSPORT_ERROR, PN_TRANSPORT_HEAD_CLOSED, PN_TRANSPORT_CLOSED,
+ PN_LISTENER_CLOSE,
+ PN_PROACTOR_INACTIVE,
+ 0);
+
+ TEST_PROACTORS_DESTROY(tps);
+}
+
+/* Test that INACTIVE event is generated when last connections/listeners closes. */
+static void test_inactive(test_t *t) {
+ test_proactor_t tps[] = { test_proactor(t, open_wake_handler), test_proactor(t, listen_handler) };
+ pn_proactor_t *client = tps[0].proactor, *server = tps[1].proactor;
+
+ /* Listen, connect, disconnect */
+ pn_listener_t *l = test_listen(&tps[1], "");
+ pn_connection_t *c = pn_connection();
+ pn_proactor_connect2(client, c, NULL, listener_info(l).connect);
+ TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, TEST_PROACTORS_RUN(tps));
+ pn_connection_wake(c);
+ TEST_ETYPE_EQUAL(t, PN_CONNECTION_WAKE, TEST_PROACTORS_RUN(tps));
+ /* Expect TRANSPORT_CLOSED from client and server, INACTIVE from client */
+ TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, TEST_PROACTORS_RUN(tps));
+ TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, TEST_PROACTORS_RUN(tps));
+ TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, TEST_PROACTORS_RUN(tps));
+
+ /* Immediate timer generates INACTIVE on client (no connections) */
+ pn_proactor_set_timeout(client, 0);
+ TEST_ETYPE_EQUAL(t, PN_PROACTOR_TIMEOUT, TEST_PROACTORS_RUN(tps));
+ TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, TEST_PROACTORS_RUN(tps));
+
+ /* Connect, set-timer, disconnect */
+ pn_proactor_set_timeout(client, 1000000);
+ c = pn_connection();
+ pn_proactor_connect2(client, c, NULL, listener_info(l).connect);
+ TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, TEST_PROACTORS_RUN(tps));
+ pn_connection_wake(c);
+ TEST_ETYPE_EQUAL(t, PN_CONNECTION_WAKE, TEST_PROACTORS_RUN(tps));
+ /* Expect TRANSPORT_CLOSED from client and server */
+ TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, TEST_PROACTORS_RUN(tps));
+ TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, TEST_PROACTORS_RUN(tps));
+ /* No INACTIVE till timer is cancelled */
+ TEST_CHECK(t, pn_proactor_get(server) == NULL);
+ pn_proactor_cancel_timeout(client);
+ TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, TEST_PROACTORS_RUN(tps));
+
+ /* Server won't be INACTIVE until listener is closed */
+ TEST_CHECK(t, pn_proactor_get(server) == NULL);
+ pn_listener_close(l);
+ TEST_ETYPE_EQUAL(t, PN_LISTENER_CLOSE, TEST_PROACTORS_RUN(tps));
+ TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, TEST_PROACTORS_RUN(tps));
+
+ TEST_PROACTORS_DESTROY(tps);
+}
+
+/* Tests for error handling */
+static void test_errors(test_t *t) {
+ test_proactor_t tps[] = { test_proactor(t, open_wake_handler), test_proactor(t, listen_handler) };
+ pn_proactor_t *client = tps[0].proactor, *server = tps[1].proactor;
+
+ /* Invalid connect/listen service name */
+ pn_connection_t *c = pn_connection();
+ pn_proactor_connect2(client, c, NULL, "127.0.0.1:xxx");
+ TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, TEST_PROACTORS_RUN(tps));
+ TEST_COND_DESC(t, "xxx", last_condition);
+ TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, TEST_PROACTORS_RUN(tps));
+
+ pn_proactor_listen(server, pn_listener(), "127.0.0.1:xxx", 1);
+ TEST_PROACTORS_RUN(tps);
+ TEST_HANDLER_EXPECT(&tps[1].handler, PN_LISTENER_CLOSE, 0); /* CLOSE only, no OPEN */
+ TEST_COND_DESC(t, "xxx", last_condition);
+ TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, TEST_PROACTORS_RUN(tps));
+
+ /* Invalid connect/listen host name */
+ c = pn_connection();
+ pn_proactor_connect2(client, c, NULL, "nosuch.example.com:");
+ TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, TEST_PROACTORS_RUN(tps));
+ TEST_COND_DESC(t, "nosuch", last_condition);
+ TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, TEST_PROACTORS_RUN(tps));
+
+ test_handler_clear(&tps[1].handler, 0);
+ pn_proactor_listen(server, pn_listener(), "nosuch.example.com:", 1);
+ TEST_PROACTORS_RUN(tps);
+ TEST_HANDLER_EXPECT(&tps[1].handler, PN_LISTENER_CLOSE, 0); /* CLOSE only, no OPEN */
+ TEST_COND_DESC(t, "nosuch", last_condition);
+ TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, TEST_PROACTORS_RUN(tps));
+
+ /* Listen on a port already in use */
+ pn_listener_t *l = pn_listener();
+ pn_proactor_listen(server, l, ":0", 1);
+ TEST_ETYPE_EQUAL(t, PN_LISTENER_OPEN, TEST_PROACTORS_RUN(tps));
+ test_handler_clear(&tps[1].handler, 0);
+ struct addrinfo laddr = listener_info(l);
+ pn_proactor_listen(server, pn_listener(), laddr.connect, 1); /* Busy */
+ TEST_PROACTORS_RUN(tps);
+ TEST_HANDLER_EXPECT(&tps[1].handler, PN_LISTENER_CLOSE, 0); /* CLOSE only, no OPEN */
+ TEST_COND_NAME(t, "proton:io", last_condition);
+ pn_listener_close(l);
+ TEST_ETYPE_EQUAL(t, PN_LISTENER_CLOSE, TEST_PROACTORS_RUN(tps));
+ TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, TEST_PROACTORS_RUN(tps));
+
+ /* Connect with no listener */
+ c = pn_connection();
+ pn_proactor_connect2(client, c, NULL, laddr.connect);
+ if (TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, TEST_PROACTORS_RUN(tps))) {
+ TEST_COND_DESC(t, "refused", last_condition);
+ TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, TEST_PROACTORS_RUN(tps));
+ }
+
+ TEST_PROACTORS_DESTROY(tps);
+}
+
+/* Closing the connection during PN_TRANSPORT_ERROR should be a no-op
+ * Regression test for: https://issues.apache.org/jira/browse/PROTON-1586
+ */
+static pn_event_type_t transport_close_connection_handler(test_handler_t *th, pn_event_t *e) {
+ switch (pn_event_type(e)) {
+ case PN_TRANSPORT_ERROR:
+ pn_connection_close(pn_event_connection(e));
+ break;
+ default:
+ return open_wake_handler(th, e);
+ }
+ return PN_EVENT_NONE;
+}
+
+/* Closing the connection during PN_TRANSPORT_ERROR due to connection failure should be a no-op
+ * Regression test for: https://issues.apache.org/jira/browse/PROTON-1586
+ */
+static void test_proton_1586(test_t *t) {
+ test_proactor_t tps[] = { test_proactor(t, transport_close_connection_handler) };
+ pn_proactor_connect2(tps[0].proactor, NULL, NULL, ":yyy");
+ TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, TEST_PROACTORS_RUN(tps));
+ TEST_COND_DESC(t, ":yyy", last_condition);
+ test_handler_clear(&tps[0].handler, 0); /* Clear events */
+ /* There should be no events generated after PN_TRANSPORT_CLOSED */
+ TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, TEST_PROACTORS_RUN(tps));
+ TEST_HANDLER_EXPECT(&tps[0].handler,PN_PROACTOR_INACTIVE, 0);
+
+ TEST_PROACTORS_DESTROY(tps);
+}
+
+/* Test that we can control listen/select on ipv6/v4 and listen on both by default */
+static void test_ipv4_ipv6(test_t *t) {
+ test_proactor_t tps[] ={ test_proactor(t, open_close_handler), test_proactor(t, listen_handler) };
+ pn_proactor_t *client = tps[0].proactor, *server = tps[1].proactor;
+
+ /* Listen on all interfaces for IPv4 only. */
+ pn_listener_t *l4 = test_listen(&tps[1], "0.0.0.0");
+ TEST_PROACTORS_DRAIN(tps);
+
+ /* Empty address listens on both IPv4 and IPv6 on all interfaces */
+ pn_listener_t *l = test_listen(&tps[1], "");
+ TEST_PROACTORS_DRAIN(tps);
+
+#define EXPECT_CONNECT(LISTENER, HOST) do { \
+ char addr[1024]; \
+ pn_proactor_addr(addr, sizeof(addr), HOST, listener_info(LISTENER).port); \
+ pn_proactor_connect2(client, NULL, NULL, addr); \
+ TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, TEST_PROACTORS_RUN(tps)); \
+ TEST_COND_EMPTY(t, last_condition); \
+ TEST_PROACTORS_DRAIN(tps); \
+ } while(0)
+
+ EXPECT_CONNECT(l4, "127.0.0.1"); /* v4->v4 */
+ EXPECT_CONNECT(l4, ""); /* local->v4*/
+
+ EXPECT_CONNECT(l, "127.0.0.1"); /* v4->all */
+ EXPECT_CONNECT(l, ""); /* local->all */
+
+ /* Listen on ipv6 loopback, if it fails skip ipv6 tests.
+
+ NOTE: Don't use the unspecified address "::" here - ipv6-disabled platforms
+ may allow listening on "::" without complaining. However they won't have a
+ local ipv6 loopback configured, so "::1" will force an error.
+ */
+ TEST_PROACTORS_DRAIN(tps);
+ pn_listener_t *l6 = pn_listener();
+ pn_proactor_listen(server, l6, "::1:0", 4);
+ pn_event_type_t e = TEST_PROACTORS_RUN(tps);
+ if (e == PN_LISTENER_OPEN && !pn_condition_is_set(last_condition)) {
+ TEST_PROACTORS_DRAIN(tps);
+
+ EXPECT_CONNECT(l6, "::1"); /* v6->v6 */
+ EXPECT_CONNECT(l6, ""); /* local->v6 */
+ EXPECT_CONNECT(l, "::1"); /* v6->all */
+
+ pn_listener_close(l6);
+ } else {
+ const char *d = pn_condition_get_description(last_condition);
+ TEST_LOGF(t, "skip IPv6 tests: %s %s", pn_event_type_name(e), d ? d : "no condition");
+ }
+
+ pn_listener_close(l);
+ pn_listener_close(l4);
+ TEST_PROACTORS_DESTROY(tps);
+}
+
+/* Make sure we clean up released connections and open sockets correctly */
+static void test_release_free(test_t *t) {
+ test_proactor_t tps[] = { test_proactor(t, open_wake_handler), test_proactor(t, listen_handler) };
+ pn_proactor_t *client = tps[0].proactor;
+ pn_listener_t *l = test_listen(&tps[1], "");
+
+ /* leave one connection to the proactor */
+ pn_proactor_connect2(client, NULL, NULL, listener_info(l).connect);
+ TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, TEST_PROACTORS_RUN(tps));
+
+ /* release c1 and free immediately */
+ pn_connection_t *c1 = pn_connection();
+ pn_proactor_connect2(client, c1, NULL, listener_info(l).connect);
+ TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, TEST_PROACTORS_RUN(tps));
+ pn_proactor_release_connection(c1); /* We free but socket should still be cleaned up */
+ pn_connection_free(c1);
+ TEST_CHECK(t, pn_proactor_get(client) == NULL); /* Should be idle */
+ TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, TEST_PROACTORS_RUN(tps)); /* Server closed */
+
+ /* release c2 and but don't free till after proactor free */
+ pn_connection_t *c2 = pn_connection();
+ pn_proactor_connect2(client, c2, NULL, listener_info(l).connect);
+ TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, TEST_PROACTORS_RUN(tps));
+ pn_proactor_release_connection(c2);
+ TEST_CHECK(t, pn_proactor_get(client) == NULL); /* Should be idle */
+ TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, TEST_PROACTORS_RUN(tps)); /* Server closed */
+
+ TEST_PROACTORS_DESTROY(tps);
+ pn_connection_free(c2);
+
+ /* Check freeing a listener or connection that was never given to a proactor */
+ pn_listener_free(pn_listener());
+ pn_connection_free(pn_connection());
+}
+
+#define SSL_FILE(NAME) CMAKE_CURRENT_SOURCE_DIR "/ssl-certs/" NAME
+#define SSL_PW "tserverpw"
+/* Windows vs. OpenSSL certificates */
+#if defined(_WIN32)
+# define CERTIFICATE(NAME) SSL_FILE(NAME "-certificate.p12")
+# define SET_CREDENTIALS(DOMAIN, NAME) \
+ pn_ssl_domain_set_credentials(DOMAIN, SSL_FILE(NAME "-full.p12"), "", SSL_PW)
+#else
+# define CERTIFICATE(NAME) SSL_FILE(NAME "-certificate.pem")
+# define SET_CREDENTIALS(DOMAIN, NAME) \
+ pn_ssl_domain_set_credentials(DOMAIN, CERTIFICATE(NAME), SSL_FILE(NAME "-private-key.pem"), SSL_PW)
+#endif
+
+static pn_event_type_t ssl_handler(test_handler_t *h, pn_event_t *e) {
+ switch (pn_event_type(e)) {
+
+ case PN_CONNECTION_BOUND:
+ TEST_CHECK(h->t, 0 == pn_ssl_init(pn_ssl(pn_event_transport(e)), h->ssl_domain, NULL));
+ return PN_EVENT_NONE;
+
+ case PN_CONNECTION_REMOTE_OPEN: {
+ pn_ssl_t *ssl = pn_ssl(pn_event_transport(e));
+ TEST_CHECK(h->t, ssl);
+ char protocol[256];
+ TEST_CHECK(h->t, pn_ssl_get_protocol_name(ssl, protocol, sizeof(protocol)));
+ TEST_STR_IN(h->t, "TLS", protocol);
+ return PN_CONNECTION_REMOTE_OPEN;
+ }
+ default:
+ return PN_EVENT_NONE;
+ }
+}
+
+static pn_event_type_t ssl_server_handler(test_handler_t *h, pn_event_t *e) {
+ switch (pn_event_type(e)) {
+ case PN_CONNECTION_BOUND:
+ return ssl_handler(h, e);
+ case PN_CONNECTION_REMOTE_OPEN: {
+ pn_event_type_t et = ssl_handler(h, e);
+ pn_connection_open(pn_event_connection(e));
+ return et;
+ }
+ default:
+ return listen_handler(h, e);
+ }
+}
+
+static pn_event_type_t ssl_client_handler(test_handler_t *h, pn_event_t *e) {
+ switch (pn_event_type(e)) {
+ case PN_CONNECTION_BOUND:
+ return ssl_handler(h, e);
+ case PN_CONNECTION_REMOTE_OPEN: {
+ pn_event_type_t et = ssl_handler(h, e);
+ pn_connection_close(pn_event_connection(e));
+ return et;
+ }
+ break;
+ default:
+ return common_handler(h, e);
+ }
+}
+
+/* Test various SSL connections between proactors*/
+static void test_ssl(test_t *t) {
+ if (!pn_ssl_present()) {
+ TEST_LOGF(t, "Skip SSL test, no support");
+ return;
+ }
+
+ test_proactor_t tps[] ={ test_proactor(t, ssl_client_handler), test_proactor(t, ssl_server_handler) };
+ test_proactor_t *client = &tps[0], *server = &tps[1];
+ pn_ssl_domain_t *cd = client->handler.ssl_domain = pn_ssl_domain(PN_SSL_MODE_CLIENT);
+ pn_ssl_domain_t *sd = server->handler.ssl_domain = pn_ssl_domain(PN_SSL_MODE_SERVER);
+ TEST_CHECK(t, 0 == SET_CREDENTIALS(sd, "tserver"));
+ pn_listener_t *l = test_listen(server, "");
+
+ /* Basic SSL connection */
+ pn_proactor_connect2(client->proactor, NULL, NULL, listener_info(l).connect);
+ /* Open ok at both ends */
+ TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, TEST_PROACTORS_RUN(tps));
+ TEST_COND_EMPTY(t, last_condition);
+ TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, TEST_PROACTORS_RUN(tps));
+ TEST_COND_EMPTY(t, last_condition);
+ TEST_PROACTORS_RUN_UNTIL(tps, PN_TRANSPORT_CLOSED);
+ TEST_PROACTORS_RUN_UNTIL(tps, PN_TRANSPORT_CLOSED);
+
+ /* Verify peer with good hostname */
+ TEST_INT_EQUAL(t, 0, pn_ssl_domain_set_trusted_ca_db(cd, CERTIFICATE("tserver")));
+ TEST_INT_EQUAL(t, 0, pn_ssl_domain_set_peer_authentication(cd, PN_SSL_VERIFY_PEER_NAME, NULL));
+ pn_connection_t *c = pn_connection();
+ pn_connection_set_hostname(c, "test_server");
+ pn_proactor_connect2(client->proactor, c, NULL, listener_info(l).connect);
+ TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, TEST_PROACTORS_RUN(tps));
+ TEST_COND_EMPTY(t, last_condition);
+ TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, TEST_PROACTORS_RUN(tps));
+ TEST_COND_EMPTY(t, last_condition);
+ TEST_PROACTORS_RUN_UNTIL(tps, PN_TRANSPORT_CLOSED);
+ TEST_PROACTORS_RUN_UNTIL(tps, PN_TRANSPORT_CLOSED);
+
+ /* Verify peer with bad hostname */
+ c = pn_connection();
+ pn_connection_set_hostname(c, "wrongname");
+ pn_proactor_connect2(client->proactor, c, NULL, listener_info(l).connect);
+ TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, TEST_PROACTORS_RUN(tps));
+ TEST_COND_NAME(t, "amqp:connection:framing-error", last_condition);
+ TEST_COND_DESC(t, "SSL", last_condition);
+ TEST_PROACTORS_DRAIN(tps);
+
+ pn_ssl_domain_free(cd);
+ pn_ssl_domain_free(sd);
+ TEST_PROACTORS_DESTROY(tps);
+}
+
+static void test_proactor_addr(test_t *t) {
+ /* Test the address formatter */
+ char addr[PN_MAX_ADDR];
+ pn_proactor_addr(addr, sizeof(addr), "foo", "bar");
+ TEST_STR_EQUAL(t, "foo:bar", addr);
+ pn_proactor_addr(addr, sizeof(addr), "foo", "");
+ TEST_STR_EQUAL(t, "foo:", addr);
+ pn_proactor_addr(addr, sizeof(addr), "foo", NULL);
+ TEST_STR_EQUAL(t, "foo:", addr);
+ pn_proactor_addr(addr, sizeof(addr), "", "bar");
+ TEST_STR_EQUAL(t, ":bar", addr);
+ pn_proactor_addr(addr, sizeof(addr), NULL, "bar");
+ TEST_STR_EQUAL(t, ":bar", addr);
+ pn_proactor_addr(addr, sizeof(addr), "1:2:3:4", "5");
+ TEST_STR_EQUAL(t, "1:2:3:4:5", addr);
+ pn_proactor_addr(addr, sizeof(addr), "1:2:3:4", "");
+ TEST_STR_EQUAL(t, "1:2:3:4:", addr);
+ pn_proactor_addr(addr, sizeof(addr), "1:2:3:4", NULL);
+ TEST_STR_EQUAL(t, "1:2:3:4:", addr);
+}
+
+static void test_parse_addr(test_t *t) {
+ char buf[1024];
+ const char *host, *port;
+
+ TEST_CHECK(t, 0 == pni_parse_addr("foo:bar", buf, sizeof(buf), &host, &port));
+ TEST_STR_EQUAL(t, "foo", host);
+ TEST_STR_EQUAL(t, "bar", port);
+
+ TEST_CHECK(t, 0 == pni_parse_addr("foo:", buf, sizeof(buf), &host, &port));
+ TEST_STR_EQUAL(t, "foo", host);
+ TEST_STR_EQUAL(t, "5672", port);
+
+ TEST_CHECK(t, 0 == pni_parse_addr(":bar", buf, sizeof(buf), &host, &port));
+ TEST_CHECKF(t, NULL == host, "expected null, got: %s", host);
+ TEST_STR_EQUAL(t, "bar", port);
+
+ TEST_CHECK(t, 0 == pni_parse_addr(":", buf, sizeof(buf), &host, &port));
+ TEST_CHECKF(t, NULL == host, "expected null, got: %s", host);
+ TEST_STR_EQUAL(t, "5672", port);
+
+ TEST_CHECK(t, 0 == pni_parse_addr(":amqps", buf, sizeof(buf), &host, &port));
+ TEST_STR_EQUAL(t, "5671", port);
+
+ TEST_CHECK(t, 0 == pni_parse_addr(":amqp", buf, sizeof(buf), &host, &port));
+ TEST_STR_EQUAL(t, "5672", port);
+
+ TEST_CHECK(t, 0 == pni_parse_addr("::1:2:3", buf, sizeof(buf), &host, &port));
+ TEST_STR_EQUAL(t, "::1:2", host);
+ TEST_STR_EQUAL(t, "3", port);
+
+ TEST_CHECK(t, 0 == pni_parse_addr(":::", buf, sizeof(buf), &host, &port));
+ TEST_STR_EQUAL(t, "::", host);
+ TEST_STR_EQUAL(t, "5672", port);
+
+ TEST_CHECK(t, 0 == pni_parse_addr("", buf, sizeof(buf), &host, &port));
+ TEST_CHECKF(t, NULL == host, "expected null, got: %s", host);
+ TEST_STR_EQUAL(t, "5672", port);
+}
+
+/* Test pn_proactor_addr functions */
+
+static void test_netaddr(test_t *t) {
+ test_proactor_t tps[] ={ test_proactor(t, open_wake_handler), test_proactor(t, listen_handler) };
+ pn_proactor_t *client = tps[0].proactor;
+ /* Use IPv4 to get consistent results all platforms */
+ pn_listener_t *l = test_listen(&tps[1], "127.0.0.1");
+ pn_connection_t *c = pn_connection();
+ pn_proactor_connect2(client, c, NULL, listener_info(l).connect);
+ if (!TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, TEST_PROACTORS_RUN(tps))) {
+ TEST_COND_EMPTY(t, last_condition); /* Show the last condition */
+ return; /* don't continue if connection is closed */
+ }
+
+ /* client remote, client local, server remote and server local address strings */
+ char cr[1024], cl[1024], sr[1024], sl[1024];
+
+ pn_transport_t *ct = pn_connection_transport(c);
+ const pn_netaddr_t *na = pn_transport_remote_addr(ct);
+ pn_netaddr_str(na, cr, sizeof(cr));
+ TEST_STR_IN(t, listener_info(l).port, cr); /* remote address has listening port */
+
+ pn_connection_t *s = last_accepted; /* server side of the connection */
+
+ pn_transport_t *st = pn_connection_transport(s);
+ if (!TEST_CHECK(t, st)) return;
+ pn_netaddr_str(pn_transport_local_addr(st), sl, sizeof(sl));
+ TEST_STR_EQUAL(t, cr, sl); /* client remote == server local */
+
+ pn_netaddr_str(pn_transport_local_addr(ct), cl, sizeof(cl));
+ pn_netaddr_str(pn_transport_remote_addr(st), sr, sizeof(sr));
+ TEST_STR_EQUAL(t, cl, sr); /* client local == server remote */
+
+ char host[MAX_STR] = "";
+ char serv[MAX_STR] = "";
+ int err = pn_netaddr_host_port(na, host, sizeof(host), serv, sizeof(serv));
+ TEST_CHECK(t, 0 == err);
+ TEST_STR_EQUAL(t, "127.0.0.1", host);
+ TEST_STR_EQUAL(t, listener_info(l).port, serv);
+
+ /* Make sure you can use NULL, 0 to get length of address string without a crash */
+ size_t len = pn_netaddr_str(pn_transport_local_addr(ct), NULL, 0);
+ TEST_CHECKF(t, strlen(cl) == len, "%d != %d", strlen(cl), len);
+
+ TEST_PROACTORS_DRAIN(tps);
+ TEST_PROACTORS_DESTROY(tps);
+}
+
+/* Test pn_proactor_disconnect */
+static void test_disconnect(test_t *t) {
+ test_proactor_t tps[] ={ test_proactor(t, open_wake_handler), test_proactor(t, listen_handler) };
+ pn_proactor_t *client = tps[0].proactor, *server = tps[1].proactor;
+
+ /* Start two listeners */
+ pn_listener_t *l = test_listen(&tps[1], "");
+ pn_listener_t *l2 = test_listen(&tps[1], "");
+
+ /* Only wait for one connection to remote-open before disconnect */
+ pn_connection_t *c = pn_connection();
+ pn_proactor_connect2(client, c, NULL, listener_info(l).connect);
+ TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, TEST_PROACTORS_RUN(tps));
+ pn_connection_t *c2 = pn_connection();
+ pn_proactor_connect2(client, c2, NULL, listener_info(l2).connect);
+ TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, TEST_PROACTORS_RUN(tps));
+ TEST_PROACTORS_DRAIN(tps);
+
+ /* Disconnect the client proactor */
+ pn_condition_t *cond = pn_condition();
+ pn_condition_set_name(cond, "test-name");
+ pn_condition_set_description(cond, "test-description");
+ pn_proactor_disconnect(client, cond);
+ /* Verify expected client side first */
+ TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, test_proactors_run(&tps[0], 1));
+ TEST_COND_NAME(t, "test-name", last_condition);
+ TEST_COND_DESC(t, "test-description", last_condition);
+ TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, test_proactors_run(&tps[0], 1));
+ TEST_COND_NAME(t, "test-name", last_condition);
+ TEST_COND_DESC(t, "test-description", last_condition);
+ TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, test_proactors_run(&tps[0], 1));
+
+ /* Now check server sees the disconnects */
+ TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, TEST_PROACTORS_RUN(tps));
+ TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, TEST_PROACTORS_RUN(tps));
+
+ /* Now disconnect the server end (the listeners) */
+ pn_proactor_disconnect(server, cond);
+ pn_condition_free(cond);
+
+ TEST_ETYPE_EQUAL(t, PN_LISTENER_CLOSE, TEST_PROACTORS_RUN(tps));
+ TEST_ETYPE_EQUAL(t, PN_LISTENER_CLOSE, TEST_PROACTORS_RUN(tps));
+ TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, TEST_PROACTORS_RUN(tps));
+
+ /* Make sure the proactors are still functional */
+ pn_listener_t *l3 = test_listen(&tps[1], "");
+ pn_proactor_connect2(client, NULL, NULL, listener_info(l3).connect);
+ TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, TEST_PROACTORS_RUN(tps));
+ pn_proactor_disconnect(client, NULL);
+
+ TEST_PROACTORS_DRAIN(tps);
+ TEST_PROACTORS_DESTROY(tps);
+}
+
+struct message_stream_context {
+ pn_link_t *sender;
+ pn_delivery_t *dlv;
+ pn_rwbytes_t send_buf, recv_buf;
+ ssize_t size, sent, received;
+ bool complete;
+};
+
+#define FRAME 512 /* Smallest legal frame */
+#define CHUNK (FRAME + FRAME/2) /* Chunk overflows frame */
+#define BODY (CHUNK*3 + CHUNK/2) /* Body doesn't fit into chunks */
+
+static pn_event_type_t message_stream_handler(test_handler_t *th, pn_event_t *e) {
+ struct message_stream_context *ctx = (struct message_stream_context*)th->context;
+ switch (pn_event_type(e)) {
+ case PN_CONNECTION_BOUND:
+ pn_transport_set_max_frame(pn_event_transport(e), FRAME);
+ return PN_EVENT_NONE;
+
+ case PN_SESSION_INIT:
+ pn_session_set_incoming_capacity(pn_event_session(e), FRAME); /* Single frame incoming */
+ pn_session_set_outgoing_window(pn_event_session(e), 1); /* Single frame outgoing */
+ return PN_EVENT_NONE;
+
+ case PN_LINK_REMOTE_OPEN:
+ common_handler(th, e);
+ if (pn_link_is_receiver(pn_event_link(e))) {
+ pn_link_flow(pn_event_link(e), 1);
+ } else {
+ ctx->sender = pn_event_link(e);
+ }
+ return PN_EVENT_NONE;
+
+ case PN_LINK_FLOW: /* Start a delivery */
+ if (pn_link_is_sender(pn_event_link(e)) && !ctx->dlv) {
+ ctx->dlv = pn_delivery(pn_event_link(e), pn_dtag("x", 1));
+ }
+ return PN_LINK_FLOW;
+
+ case PN_CONNECTION_WAKE: { /* Send a chunk */
+ ssize_t remains = ctx->size - ctx->sent;
+ ssize_t n = (CHUNK < remains) ? CHUNK : remains;
+ TEST_CHECK(th->t, n == pn_link_send(ctx->sender, ctx->send_buf.start + ctx->sent, n));
+ ctx->sent += n;
+ if (ctx->sent == ctx->size) {
+ TEST_CHECK(th->t, pn_link_advance(ctx->sender));
+ }
+ return PN_CONNECTION_WAKE;
+ }
+
+ case PN_DELIVERY: { /* Receive a delivery - smaller than a chunk? */
+ pn_delivery_t *dlv = pn_event_delivery(e);
+ if (pn_delivery_readable(dlv)) {
+ ssize_t n = pn_delivery_pending(dlv);
+ rwbytes_ensure(&ctx->recv_buf, ctx->received + n);
+ TEST_ASSERT(n == pn_link_recv(pn_event_link(e), ctx->recv_buf.start + ctx->received, n));
+ ctx->received += n;
+ }
+ ctx->complete = !pn_delivery_partial(dlv);
+ return PN_DELIVERY;
+ }
+
+ default:
+ return common_handler(th, e);
+ }
+}
+
+/* Test sending/receiving a message in chunks */
+static void test_message_stream(test_t *t) {
+ test_proactor_t tps[] ={
+ test_proactor(t, message_stream_handler),
+ test_proactor(t, message_stream_handler)
+ };
+ pn_proactor_t *client = tps[0].proactor;
+ pn_listener_t *l = test_listen(&tps[1], "");
+ struct message_stream_context ctx = { 0 };
+ tps[0].handler.context = &ctx;
+ tps[1].handler.context = &ctx;
+
+ /* Encode a large (not very) message to send in chunks */
+ char *body = (char*)malloc(BODY);
+ memset(body, 'x', BODY);
+ pn_message_t *m = pn_message();
+ pn_data_put_binary(pn_message_body(m), pn_bytes(BODY, body));
+ free(body);
+ ctx.size = message_encode(m, &ctx.send_buf);
+ pn_message_free(m);
+
+ pn_connection_t *c = pn_connection();
+ pn_proactor_connect2(client, c, NULL, listener_info(l).connect);
+ pn_session_t *ssn = pn_session(c);
+ pn_session_open(ssn);
+ pn_link_t *snd = pn_sender(ssn, "x");
+ pn_link_open(snd);
+ TEST_PROACTORS_RUN_UNTIL(tps, PN_LINK_FLOW);
+
+ /* Send and receive the message in chunks */
+ do {
+ pn_connection_wake(c); /* Initiate send/receive of one chunk */
+ do { /* May be multiple receives for one send */
+ TEST_PROACTORS_RUN_UNTIL(tps, PN_DELIVERY);
+ } while (ctx.received < ctx.sent);
+ } while (!ctx.complete);
+ TEST_CHECK(t, ctx.received == ctx.size);
+ TEST_CHECK(t, ctx.sent == ctx.size);
+ TEST_CHECK(t, !memcmp(ctx.send_buf.start, ctx.recv_buf.start, ctx.size));
+
+ free(ctx.send_buf.start);
+ free(ctx.recv_buf.start);
+ TEST_PROACTORS_DESTROY(tps);
+}
+
+int main(int argc, char **argv) {
+ int failed = 0;
+ last_condition = pn_condition();
+ RUN_ARGV_TEST(failed, t, test_inactive(&t));
+ RUN_ARGV_TEST(failed, t, test_interrupt_timeout(&t));
+ RUN_ARGV_TEST(failed, t, test_errors(&t));
+ RUN_ARGV_TEST(failed, t, test_proton_1586(&t));
+ RUN_ARGV_TEST(failed, t, test_client_server(&t));
+ RUN_ARGV_TEST(failed, t, test_connection_wake(&t));
+ RUN_ARGV_TEST(failed, t, test_ipv4_ipv6(&t));
+ RUN_ARGV_TEST(failed, t, test_release_free(&t));
+#if !defined(_WIN32)
+ RUN_ARGV_TEST(failed, t, test_ssl(&t));
+#endif
+ RUN_ARGV_TEST(failed, t, test_proactor_addr(&t));
+ RUN_ARGV_TEST(failed, t, test_parse_addr(&t));
+ RUN_ARGV_TEST(failed, t, test_netaddr(&t));
+ RUN_ARGV_TEST(failed, t, test_disconnect(&t));
+ RUN_ARGV_TEST(failed, t, test_abort(&t));
+ RUN_ARGV_TEST(failed, t, test_refuse(&t));
+ RUN_ARGV_TEST(failed, t, test_message_stream(&t));
+ pn_condition_free(last_condition);
+ return failed;
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org