You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2017/03/27 22:24:53 UTC
qpid-proton git commit: PROTON-1445: tests/proactor.c - fix memory
management
Repository: qpid-proton
Updated Branches:
refs/heads/master a85c89ac8 -> 6e63fd787
PROTON-1445: tests/proactor.c - fix memory management
Fix the unit tests to respect the new memory management rules.
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/6e63fd78
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/6e63fd78
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/6e63fd78
Branch: refs/heads/master
Commit: 6e63fd7878a033d5822987c4b943021e74b6fc83
Parents: a85c89a
Author: Alan Conway <ac...@redhat.com>
Authored: Mon Mar 27 18:05:40 2017 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Mon Mar 27 18:23:53 2017 -0400
----------------------------------------------------------------------
examples/c/proactor/broker.c | 4 ++
examples/c/proactor/direct.c | 2 +
examples/c/proactor/receive.c | 4 ++
examples/c/proactor/send.c | 1 +
proton-c/src/proactor/libuv.c | 24 ++++++-----
proton-c/src/tests/proactor.c | 83 +++++++++++++++++++++++++++++---------
6 files changed, 87 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6e63fd78/examples/c/proactor/broker.c
----------------------------------------------------------------------
diff --git a/examples/c/proactor/broker.c b/examples/c/proactor/broker.c
index d322ad0..6a7d1eb 100644
--- a/examples/c/proactor/broker.c
+++ b/examples/c/proactor/broker.c
@@ -340,6 +340,7 @@ static void handle(broker_t* b, pn_event_t* e) {
case PN_TRANSPORT_CLOSED:
connection_unsub(b, pn_event_connection(e));
check_condition(e, pn_transport_condition(pn_event_transport(e)));
+ pn_connection_free(pn_event_connection(e));
break;
case PN_CONNECTION_REMOTE_CLOSE:
@@ -365,8 +366,11 @@ static void handle(broker_t* b, pn_event_t* e) {
case PN_LISTENER_CLOSE:
check_condition(e, pn_listener_condition(pn_event_listener(e)));
broker_stop(b);
+ pn_listener_free(pn_event_listener(e));
break;
+ break;
+
case PN_PROACTOR_INACTIVE: /* listener and all connections closed */
broker_stop(b);
break;
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6e63fd78/examples/c/proactor/direct.c
----------------------------------------------------------------------
diff --git a/examples/c/proactor/direct.c b/examples/c/proactor/direct.c
index f76895c..bda66db 100644
--- a/examples/c/proactor/direct.c
+++ b/examples/c/proactor/direct.c
@@ -246,6 +246,7 @@ static bool handle(app_data_t* app, pn_event_t* event) {
case PN_TRANSPORT_CLOSED:
check_condition(event, pn_transport_condition(pn_event_transport(event)));
pn_listener_close(app->listener); /* Finished */
+ pn_connection_free(pn_event_connection(event));
break;
case PN_CONNECTION_REMOTE_CLOSE:
@@ -271,6 +272,7 @@ static bool handle(app_data_t* app, pn_event_t* event) {
case PN_LISTENER_CLOSE:
check_condition(event, pn_listener_condition(pn_event_listener(event)));
+ pn_listener_free(pn_event_listener(event));
break;
case PN_PROACTOR_INACTIVE:
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6e63fd78/examples/c/proactor/receive.c
----------------------------------------------------------------------
diff --git a/examples/c/proactor/receive.c b/examples/c/proactor/receive.c
index 43a68cd..6b4f02c 100644
--- a/examples/c/proactor/receive.c
+++ b/examples/c/proactor/receive.c
@@ -125,6 +125,10 @@ static bool handle(app_data_t* app, pn_event_t* event) {
}
} break;
+ case PN_TRANSPORT_CLOSED:
+ pn_connection_free(pn_event_connection(event));
+ break;
+
case PN_TRANSPORT_ERROR:
check_condition(event, pn_transport_condition(pn_event_transport(event)));
break;
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6e63fd78/examples/c/proactor/send.c
----------------------------------------------------------------------
diff --git a/examples/c/proactor/send.c b/examples/c/proactor/send.c
index c21ac68..0b2e68f 100644
--- a/examples/c/proactor/send.c
+++ b/examples/c/proactor/send.c
@@ -137,6 +137,7 @@ static bool handle(app_data_t* app, pn_event_t* event) {
case PN_TRANSPORT_CLOSED:
check_condition(event, pn_transport_condition(pn_event_transport(event)));
+ pn_connection_free(pn_event_connection(event));
break;
case PN_CONNECTION_REMOTE_CLOSE:
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6e63fd78/proton-c/src/proactor/libuv.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/libuv.c b/proton-c/src/proactor/libuv.c
index 728ba7d..18d9101 100644
--- a/proton-c/src/proactor/libuv.c
+++ b/proton-c/src/proactor/libuv.c
@@ -655,6 +655,19 @@ static bool leader_process_listener(pn_listener_t *l) {
/* NOTE: l may be concurrently accessed by on_connection() */
bool closed = false;
uv_mutex_lock(&l->lock);
+
+ /* Process accepted connections */
+ for (pconnection_t *pc = pconnection_pop(&l->accept); pc; pc = pconnection_pop(&l->accept)) {
+ int err = pconnection_init(pc);
+ if (!err) {
+ err = uv_accept((uv_stream_t*)&pc->lsocket->tcp, (uv_stream_t*)&pc->tcp);
+ } else {
+ listener_error(l, err, "accepting from");
+ pconnection_error(pc, err, "accepting from");
+ }
+ work_start(&pc->work); /* Process events for the accepted/failed connection */
+ }
+
switch (l->state) {
case L_UNINIT:
@@ -685,17 +698,6 @@ static bool leader_process_listener(pn_listener_t *l) {
closed = true;
}
}
- /* Process accepted connections - if we are closed they will get an error */
- for (pconnection_t *pc = pconnection_pop(&l->accept); pc; pc = pconnection_pop(&l->accept)) {
- int err = pconnection_init(pc);
- if (!err) {
- err = uv_accept((uv_stream_t*)&pc->lsocket->tcp, (uv_stream_t*)&pc->tcp);
- } else {
- listener_error(l, err, "accepting from");
- pconnection_error(pc, err, "accepting from");
- }
- work_start(&pc->work); /* Process events for the accepted/failed connection */
- }
bool has_work = !closed && pn_collector_peek(l->collector);
uv_mutex_unlock(&l->lock);
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6e63fd78/proton-c/src/tests/proactor.c
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/proactor.c b/proton-c/src/tests/proactor.c
index f0431cb..b8c4088 100644
--- a/proton-c/src/tests/proactor.c
+++ b/proton-c/src/tests/proactor.c
@@ -167,7 +167,8 @@ static pn_event_type_t common_handler(test_t *t, pn_event_t *e) {
/* Cleanup events */
case PN_LISTENER_CLOSE:
pn_listener_free(pn_event_listener(e));
- return PN_LISTENER_CLOSE;
+ return PN_EVENT_NONE;
+
case PN_TRANSPORT_CLOSED:
pn_connection_free(pn_event_connection(e));
return PN_TRANSPORT_CLOSED;
@@ -180,6 +181,7 @@ static pn_event_type_t common_handler(test_t *t, pn_event_t *e) {
case PN_LISTENER_ACCEPT:
pn_listener_accept(l, pn_connection());
+ pn_listener_close(l); /* Only accept one connection */
return PN_EVENT_NONE;
case PN_CONNECTION_REMOTE_OPEN:
@@ -207,6 +209,23 @@ static pn_event_type_t common_handler(test_t *t, pn_event_t *e) {
}
}
+/* Like common_handler but does not auto-close the listener after one accept */
+static pn_event_type_t listen_handler(test_t *t, pn_event_t *e) {
+ switch (pn_event_type(e)) {
+ case PN_LISTENER_ACCEPT:
+ /* No automatic listener close/free for the inactive test */
+ pn_listener_accept(pn_event_listener(e), pn_connection());
+ return PN_EVENT_NONE;
+
+ case PN_LISTENER_CLOSE:
+ /* No automatic free */
+ return PN_LISTENER_CLOSE;
+
+ default:
+ return common_handler(t, e);
+ }
+}
+
/* close a connection when it is remote open */
static pn_event_type_t open_close_handler(test_t *t, pn_event_t *e) {
switch (pn_event_type(e)) {
@@ -218,7 +237,7 @@ static pn_event_type_t open_close_handler(test_t *t, pn_event_t *e) {
}
}
-/* Test several client/server connection with 2 proactors */
+/* Test simple client/server connection with 2 proactors */
static void test_client_server(test_t *t) {
proactor_test_t pts[] ={ { open_close_handler }, { common_handler } };
PROACTOR_TEST_INIT(pts, t);
@@ -231,11 +250,6 @@ static void test_client_server(test_t *t) {
pn_proactor_connect(client, pn_connection(), port.host_port);
TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));
TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));
- /* Connect and wait for close at both ends */
- pn_proactor_connect(client, pn_connection(), port.host_port);
- TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));
- TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));
-
PROACTOR_TEST_FREE(pts);
}
@@ -263,7 +277,6 @@ static void test_connection_wake(test_t *t) {
sock_close(port.sock);
pn_connection_t *c = pn_connection();
- pn_incref(c); /* Keep c alive after proactor frees it */
pn_proactor_connect(client, c, port.host_port);
TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, PROACTOR_TEST_RUN(pts));
TEST_CHECK(t, pn_proactor_get(client) == NULL); /* Should be idle */
@@ -276,12 +289,12 @@ static void test_connection_wake(test_t *t) {
PROACTOR_TEST_FREE(pts);
/* The pn_connection_t is still valid so wake is legal but a no-op */
pn_connection_wake(c);
- pn_decref(c);
+ pn_connection_free(c);
}
/* Test that INACTIVE event is generated when last connections/listeners closes. */
static void test_inactive(test_t *t) {
- proactor_test_t pts[] = { { open_wake_handler }, { common_handler } };
+ proactor_test_t pts[] = { { open_wake_handler }, { listen_handler } };
PROACTOR_TEST_INIT(pts, t);
pn_proactor_t *client = pts[0].proactor, *server = pts[1].proactor;
test_port_t port = test_port(localhost); /* Hold a port */
@@ -303,6 +316,7 @@ static void test_inactive(test_t *t) {
pn_listener_close(l);
TEST_ETYPE_EQUAL(t, PN_LISTENER_CLOSE, PROACTOR_TEST_RUN(pts));
TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, PROACTOR_TEST_RUN(pts));
+ pn_listener_free(l);
sock_close(port.sock);
PROACTOR_TEST_FREE(pts);
@@ -310,7 +324,7 @@ static void test_inactive(test_t *t) {
/* Tests for error handling */
static void test_errors(test_t *t) {
- proactor_test_t pts[] = { { open_wake_handler }, { common_handler } };
+ proactor_test_t pts[] = { { open_wake_handler }, { listen_handler } };
PROACTOR_TEST_INIT(pts, t);
pn_proactor_t *client = pts[0].proactor, *server = pts[1].proactor;
test_port_t port = test_port(localhost); /* Hold a port */
@@ -328,6 +342,7 @@ static void test_errors(test_t *t) {
TEST_ETYPE_EQUAL(t, PN_LISTENER_CLOSE, PROACTOR_TEST_RUN(pts));
TEST_STR_IN(t, "xxx", last_condition);
TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, PROACTOR_TEST_RUN(pts));
+ pn_listener_free(l);
/* Connect with no listener */
c = pn_connection();
@@ -342,13 +357,14 @@ static void test_errors(test_t *t) {
/* Test that we can control listen/select on ipv6/v4 and listen on both by default */
static void test_ipv4_ipv6(test_t *t) {
- proactor_test_t pts[] ={ { open_close_handler }, { common_handler } };
+ proactor_test_t pts[] ={ { open_close_handler }, { listen_handler } };
PROACTOR_TEST_INIT(pts, t);
pn_proactor_t *client = pts[0].proactor, *server = pts[1].proactor;
/* Listen on all interfaces for IPv6 only. If this fails, skip IPv6 tests */
test_port_t port6 = test_port("[::]");
- pn_proactor_listen(server, pn_listener(), port6.host_port, 4);
+ pn_listener_t *l6 = pn_listener();
+ pn_proactor_listen(server, l6, port6.host_port, 4);
TEST_ETYPE_EQUAL(t, PN_LISTENER_OPEN, PROACTOR_TEST_RUN(pts));
sock_close(port6.sock);
pn_event_type_t e = PROACTOR_TEST_GET(pts);
@@ -360,7 +376,8 @@ static void test_ipv4_ipv6(test_t *t) {
/* Listen on all interfaces for IPv4 only. */
test_port_t port4 = test_port("0.0.0.0");
- pn_proactor_listen(server, pn_listener(), port4.host_port, 4);
+ pn_listener_t *l4 = pn_listener();
+ pn_proactor_listen(server, l4, port4.host_port, 4);
TEST_ETYPE_EQUAL(t, PN_LISTENER_OPEN, PROACTOR_TEST_RUN(pts));
sock_close(port4.sock);
TEST_CHECKF(t, PROACTOR_TEST_GET(pts) != PN_LISTENER_CLOSE, "listener error: %s", last_condition);
@@ -368,11 +385,13 @@ static void test_ipv4_ipv6(test_t *t) {
/* Empty address listens on both IPv4 and IPv6 on all interfaces */
test_port_t port = test_port("");
- pn_proactor_listen(server, pn_listener(), port.host_port, 4);
+ pn_listener_t *l = pn_listener();
+ pn_proactor_listen(server, l, port.host_port, 4);
TEST_ETYPE_EQUAL(t, PN_LISTENER_OPEN, PROACTOR_TEST_RUN(pts));
sock_close(port.sock);
e = PROACTOR_TEST_GET(pts);
- TEST_CHECKF(t, PROACTOR_TEST_GET(pts) != PN_LISTENER_CLOSE, "listener error: %s", last_condition); PROACTOR_TEST_DRAIN(pts);
+ TEST_CHECKF(t, PROACTOR_TEST_GET(pts) != PN_LISTENER_CLOSE, "listener error: %s", last_condition);
+ PROACTOR_TEST_DRAIN(pts);
#define EXPECT_CONNECT(TP, HOST) do { \
pn_proactor_connect(client, pn_connection(), test_port_use_host(&(TP), (HOST))); \
@@ -402,24 +421,48 @@ static void test_ipv4_ipv6(test_t *t) {
EXPECT_FAIL(port6, "127.0.0.1"); /* fail v4->v6 */
EXPECT_FAIL(port4, "[::1]"); /* fail v6->v4 */
}
+ PROACTOR_TEST_DRAIN(pts);
+
+ pn_listener_close(l);
+ TEST_ETYPE_EQUAL(t, PN_LISTENER_CLOSE, PROACTOR_TEST_RUN(pts));
+ pn_listener_free(l);
+
+ pn_listener_close(l6);
+ TEST_ETYPE_EQUAL(t, PN_LISTENER_CLOSE, PROACTOR_TEST_RUN(pts));
+ pn_listener_free(l6);
+
+ pn_listener_close(l4);
+ TEST_ETYPE_EQUAL(t, PN_LISTENER_CLOSE, PROACTOR_TEST_RUN(pts));
+ pn_listener_free(l4);
PROACTOR_TEST_FREE(pts);
}
/* Make sure pn_proactor_free cleans up open sockets */
static void test_free_cleanup(test_t *t) {
- proactor_test_t pts[] = { { open_wake_handler }, { common_handler } };
+ proactor_test_t pts[] = { { open_wake_handler }, { listen_handler } };
PROACTOR_TEST_INIT(pts, t);
pn_proactor_t *client = pts[0].proactor, *server = pts[1].proactor;
test_port_t ports[3] = { test_port(localhost), test_port(localhost), test_port(localhost) };
+ pn_listener_t *l[3];
+ pn_connection_t *c[3];
for (int i = 0; i < 3; ++i) {
- pn_proactor_listen(server, pn_listener(), ports[i].host_port, 2);
+ l[i] = pn_listener();
+ pn_proactor_listen(server, l[i], ports[i].host_port, 2);
TEST_ETYPE_EQUAL(t, PN_LISTENER_OPEN, PROACTOR_TEST_RUN(pts));
sock_close(ports[i].sock);
- pn_proactor_connect(client, pn_connection(), ports[i].host_port);
- pn_proactor_connect(client, pn_connection(), ports[i].host_port);
+ c[i] = pn_connection();
+ pn_proactor_connect(client, c[i], ports[i].host_port);
}
PROACTOR_TEST_FREE(pts);
+ /* Safe to free after proactor is gone */
+ for (int i = 0; i < 3; ++i) {
+ pn_listener_free(l[i]);
+ pn_connection_free(c[i]);
+ }
+ /* Freeing an unused listener/connector should be safe */
+ pn_listener_free(pn_listener());
+ pn_connection_free(pn_connection());
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org