You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2017/03/27 22:24:53 UTC

qpid-proton git commit: PROTON-1445: tests/proactor.c - fix memory management

Repository: qpid-proton
Updated Branches:
  refs/heads/master a85c89ac8 -> 6e63fd787


PROTON-1445: tests/proactor.c - fix memory management

Fix the unit tests to respect the new memory management rules.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/6e63fd78
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/6e63fd78
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/6e63fd78

Branch: refs/heads/master
Commit: 6e63fd7878a033d5822987c4b943021e74b6fc83
Parents: a85c89a
Author: Alan Conway <ac...@redhat.com>
Authored: Mon Mar 27 18:05:40 2017 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Mon Mar 27 18:23:53 2017 -0400

----------------------------------------------------------------------
 examples/c/proactor/broker.c  |  4 ++
 examples/c/proactor/direct.c  |  2 +
 examples/c/proactor/receive.c |  4 ++
 examples/c/proactor/send.c    |  1 +
 proton-c/src/proactor/libuv.c | 24 ++++++-----
 proton-c/src/tests/proactor.c | 83 +++++++++++++++++++++++++++++---------
 6 files changed, 87 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6e63fd78/examples/c/proactor/broker.c
----------------------------------------------------------------------
diff --git a/examples/c/proactor/broker.c b/examples/c/proactor/broker.c
index d322ad0..6a7d1eb 100644
--- a/examples/c/proactor/broker.c
+++ b/examples/c/proactor/broker.c
@@ -340,6 +340,7 @@ static void handle(broker_t* b, pn_event_t* e) {
    case PN_TRANSPORT_CLOSED:
     connection_unsub(b, pn_event_connection(e));
     check_condition(e, pn_transport_condition(pn_event_transport(e)));
+    pn_connection_free(pn_event_connection(e));
     break;
 
    case PN_CONNECTION_REMOTE_CLOSE:
@@ -365,8 +366,11 @@ static void handle(broker_t* b, pn_event_t* e) {
    case PN_LISTENER_CLOSE:
     check_condition(e, pn_listener_condition(pn_event_listener(e)));
     broker_stop(b);
+    pn_listener_free(pn_event_listener(e));
     break;
 
+ break;
+
    case PN_PROACTOR_INACTIVE: /* listener and all connections closed */
     broker_stop(b);
     break;

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6e63fd78/examples/c/proactor/direct.c
----------------------------------------------------------------------
diff --git a/examples/c/proactor/direct.c b/examples/c/proactor/direct.c
index f76895c..bda66db 100644
--- a/examples/c/proactor/direct.c
+++ b/examples/c/proactor/direct.c
@@ -246,6 +246,7 @@ static bool handle(app_data_t* app, pn_event_t* event) {
    case PN_TRANSPORT_CLOSED:
     check_condition(event, pn_transport_condition(pn_event_transport(event)));
     pn_listener_close(app->listener); /* Finished */
+    pn_connection_free(pn_event_connection(event));
     break;
 
    case PN_CONNECTION_REMOTE_CLOSE:
@@ -271,6 +272,7 @@ static bool handle(app_data_t* app, pn_event_t* event) {
 
    case PN_LISTENER_CLOSE:
     check_condition(event, pn_listener_condition(pn_event_listener(event)));
+    pn_listener_free(pn_event_listener(event));
     break;
 
    case PN_PROACTOR_INACTIVE:

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6e63fd78/examples/c/proactor/receive.c
----------------------------------------------------------------------
diff --git a/examples/c/proactor/receive.c b/examples/c/proactor/receive.c
index 43a68cd..6b4f02c 100644
--- a/examples/c/proactor/receive.c
+++ b/examples/c/proactor/receive.c
@@ -125,6 +125,10 @@ static bool handle(app_data_t* app, pn_event_t* event) {
      }
    } break;
 
+   case PN_TRANSPORT_CLOSED:
+    pn_connection_free(pn_event_connection(event));
+    break;
+
    case PN_TRANSPORT_ERROR:
     check_condition(event, pn_transport_condition(pn_event_transport(event)));
     break;

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6e63fd78/examples/c/proactor/send.c
----------------------------------------------------------------------
diff --git a/examples/c/proactor/send.c b/examples/c/proactor/send.c
index c21ac68..0b2e68f 100644
--- a/examples/c/proactor/send.c
+++ b/examples/c/proactor/send.c
@@ -137,6 +137,7 @@ static bool handle(app_data_t* app, pn_event_t* event) {
 
    case PN_TRANSPORT_CLOSED:
     check_condition(event, pn_transport_condition(pn_event_transport(event)));
+    pn_connection_free(pn_event_connection(event));
     break;
 
    case PN_CONNECTION_REMOTE_CLOSE:

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6e63fd78/proton-c/src/proactor/libuv.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/libuv.c b/proton-c/src/proactor/libuv.c
index 728ba7d..18d9101 100644
--- a/proton-c/src/proactor/libuv.c
+++ b/proton-c/src/proactor/libuv.c
@@ -655,6 +655,19 @@ static bool leader_process_listener(pn_listener_t *l) {
   /* NOTE: l may be concurrently accessed by on_connection() */
   bool closed = false;
   uv_mutex_lock(&l->lock);
+
+  /* Process accepted connections */
+  for (pconnection_t *pc = pconnection_pop(&l->accept); pc; pc = pconnection_pop(&l->accept)) {
+    int err = pconnection_init(pc);
+    if (!err) {
+      err = uv_accept((uv_stream_t*)&pc->lsocket->tcp, (uv_stream_t*)&pc->tcp);
+    } else {
+      listener_error(l, err, "accepting from");
+      pconnection_error(pc, err, "accepting from");
+    }
+    work_start(&pc->work);      /* Process events for the accepted/failed connection */
+  }
+
   switch (l->state) {
 
    case L_UNINIT:
@@ -685,17 +698,6 @@ static bool leader_process_listener(pn_listener_t *l) {
       closed = true;
     }
   }
-  /* Process accepted connections - if we are closed they will get an error */
-  for (pconnection_t *pc = pconnection_pop(&l->accept); pc; pc = pconnection_pop(&l->accept)) {
-    int err = pconnection_init(pc);
-    if (!err) {
-      err = uv_accept((uv_stream_t*)&pc->lsocket->tcp, (uv_stream_t*)&pc->tcp);
-    } else {
-      listener_error(l, err, "accepting from");
-      pconnection_error(pc, err, "accepting from");
-    }
-    work_start(&pc->work);      /* Process events for the accepted/failed connection */
-  }
   bool has_work = !closed && pn_collector_peek(l->collector);
   uv_mutex_unlock(&l->lock);
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6e63fd78/proton-c/src/tests/proactor.c
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/proactor.c b/proton-c/src/tests/proactor.c
index f0431cb..b8c4088 100644
--- a/proton-c/src/tests/proactor.c
+++ b/proton-c/src/tests/proactor.c
@@ -167,7 +167,8 @@ static pn_event_type_t common_handler(test_t *t, pn_event_t *e) {
     /* Cleanup events */
    case PN_LISTENER_CLOSE:
     pn_listener_free(pn_event_listener(e));
-    return PN_LISTENER_CLOSE;
+    return PN_EVENT_NONE;
+
    case PN_TRANSPORT_CLOSED:
     pn_connection_free(pn_event_connection(e));
     return PN_TRANSPORT_CLOSED;
@@ -180,6 +181,7 @@ static pn_event_type_t common_handler(test_t *t, pn_event_t *e) {
 
    case PN_LISTENER_ACCEPT:
     pn_listener_accept(l, pn_connection());
+    pn_listener_close(l);       /* Only accept one connection */
     return PN_EVENT_NONE;
 
    case PN_CONNECTION_REMOTE_OPEN:
@@ -207,6 +209,23 @@ static pn_event_type_t common_handler(test_t *t, pn_event_t *e) {
   }
 }
 
+/* Like common_handler but does not auto-close the listener after one accept */
+static pn_event_type_t listen_handler(test_t *t, pn_event_t *e) {
+  switch (pn_event_type(e)) {
+   case PN_LISTENER_ACCEPT:
+    /* No automatic listener close/free for the inactive test */
+    pn_listener_accept(pn_event_listener(e), pn_connection());
+    return PN_EVENT_NONE;
+
+   case PN_LISTENER_CLOSE:
+    /* No automatic free */
+    return PN_LISTENER_CLOSE;
+
+   default:
+    return common_handler(t, e);
+  }
+}
+
 /* close a connection when it is remote open */
 static pn_event_type_t open_close_handler(test_t *t, pn_event_t *e) {
   switch (pn_event_type(e)) {
@@ -218,7 +237,7 @@ static pn_event_type_t open_close_handler(test_t *t, pn_event_t *e) {
   }
 }
 
-/* Test several client/server connection with 2 proactors */
+/* Test simple client/server connection with 2 proactors */
 static void test_client_server(test_t *t) {
   proactor_test_t pts[] ={ { open_close_handler }, { common_handler } };
   PROACTOR_TEST_INIT(pts, t);
@@ -231,11 +250,6 @@ static void test_client_server(test_t *t) {
   pn_proactor_connect(client, pn_connection(), port.host_port);
   TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));
   TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));
-  /* Connect and wait for close at both ends */
-  pn_proactor_connect(client, pn_connection(), port.host_port);
-  TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));
-  TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));
-
   PROACTOR_TEST_FREE(pts);
 }
 
@@ -263,7 +277,6 @@ static void test_connection_wake(test_t *t) {
   sock_close(port.sock);
 
   pn_connection_t *c = pn_connection();
-  pn_incref(c);                 /* Keep c alive after proactor frees it */
   pn_proactor_connect(client, c, port.host_port);
   TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, PROACTOR_TEST_RUN(pts));
   TEST_CHECK(t, pn_proactor_get(client) == NULL); /* Should be idle */
@@ -276,12 +289,12 @@ static void test_connection_wake(test_t *t) {
   PROACTOR_TEST_FREE(pts);
   /* The pn_connection_t is still valid so wake is legal but a no-op */
   pn_connection_wake(c);
-  pn_decref(c);
+  pn_connection_free(c);
 }
 
 /* Test that INACTIVE event is generated when last connections/listeners closes. */
 static void test_inactive(test_t *t) {
-  proactor_test_t pts[] =  { { open_wake_handler }, { common_handler } };
+  proactor_test_t pts[] =  { { open_wake_handler }, { listen_handler } };
   PROACTOR_TEST_INIT(pts, t);
   pn_proactor_t *client = pts[0].proactor, *server = pts[1].proactor;
   test_port_t port = test_port(localhost);          /* Hold a port */
@@ -303,6 +316,7 @@ static void test_inactive(test_t *t) {
   pn_listener_close(l);
   TEST_ETYPE_EQUAL(t, PN_LISTENER_CLOSE, PROACTOR_TEST_RUN(pts));
   TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, PROACTOR_TEST_RUN(pts));
+  pn_listener_free(l);
 
   sock_close(port.sock);
   PROACTOR_TEST_FREE(pts);
@@ -310,7 +324,7 @@ static void test_inactive(test_t *t) {
 
 /* Tests for error handling */
 static void test_errors(test_t *t) {
-  proactor_test_t pts[] =  { { open_wake_handler }, { common_handler } };
+  proactor_test_t pts[] =  { { open_wake_handler }, { listen_handler } };
   PROACTOR_TEST_INIT(pts, t);
   pn_proactor_t *client = pts[0].proactor, *server = pts[1].proactor;
   test_port_t port = test_port(localhost);          /* Hold a port */
@@ -328,6 +342,7 @@ static void test_errors(test_t *t) {
   TEST_ETYPE_EQUAL(t, PN_LISTENER_CLOSE, PROACTOR_TEST_RUN(pts));
   TEST_STR_IN(t, "xxx", last_condition);
   TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, PROACTOR_TEST_RUN(pts));
+  pn_listener_free(l);
 
   /* Connect with no listener */
   c = pn_connection();
@@ -342,13 +357,14 @@ static void test_errors(test_t *t) {
 
 /* Test that we can control listen/select on ipv6/v4 and listen on both by default */
 static void test_ipv4_ipv6(test_t *t) {
-  proactor_test_t pts[] ={ { open_close_handler }, { common_handler } };
+  proactor_test_t pts[] ={ { open_close_handler }, { listen_handler } };
   PROACTOR_TEST_INIT(pts, t);
   pn_proactor_t *client = pts[0].proactor, *server = pts[1].proactor;
 
   /* Listen on all interfaces for IPv6 only. If this fails, skip IPv6 tests */
   test_port_t port6 = test_port("[::]");
-  pn_proactor_listen(server, pn_listener(), port6.host_port, 4);
+  pn_listener_t *l6 = pn_listener();
+  pn_proactor_listen(server, l6, port6.host_port, 4);
   TEST_ETYPE_EQUAL(t, PN_LISTENER_OPEN, PROACTOR_TEST_RUN(pts));
   sock_close(port6.sock);
   pn_event_type_t e = PROACTOR_TEST_GET(pts);
@@ -360,7 +376,8 @@ static void test_ipv4_ipv6(test_t *t) {
 
   /* Listen on all interfaces for IPv4 only. */
   test_port_t port4 = test_port("0.0.0.0");
-  pn_proactor_listen(server, pn_listener(), port4.host_port, 4);
+  pn_listener_t *l4 = pn_listener();
+  pn_proactor_listen(server, l4, port4.host_port, 4);
   TEST_ETYPE_EQUAL(t, PN_LISTENER_OPEN, PROACTOR_TEST_RUN(pts));
   sock_close(port4.sock);
   TEST_CHECKF(t, PROACTOR_TEST_GET(pts) != PN_LISTENER_CLOSE, "listener error: %s",  last_condition);
@@ -368,11 +385,13 @@ static void test_ipv4_ipv6(test_t *t) {
 
   /* Empty address listens on both IPv4 and IPv6 on all interfaces */
   test_port_t port = test_port("");
-  pn_proactor_listen(server, pn_listener(), port.host_port, 4);
+  pn_listener_t *l = pn_listener();
+  pn_proactor_listen(server, l, port.host_port, 4);
   TEST_ETYPE_EQUAL(t, PN_LISTENER_OPEN, PROACTOR_TEST_RUN(pts));
   sock_close(port.sock);
   e = PROACTOR_TEST_GET(pts);
-  TEST_CHECKF(t, PROACTOR_TEST_GET(pts) != PN_LISTENER_CLOSE, "listener error: %s",  last_condition);   PROACTOR_TEST_DRAIN(pts);
+  TEST_CHECKF(t, PROACTOR_TEST_GET(pts) != PN_LISTENER_CLOSE, "listener error: %s",  last_condition);
+  PROACTOR_TEST_DRAIN(pts);
 
 #define EXPECT_CONNECT(TP, HOST) do {                                   \
     pn_proactor_connect(client, pn_connection(), test_port_use_host(&(TP), (HOST))); \
@@ -402,24 +421,48 @@ static void test_ipv4_ipv6(test_t *t) {
     EXPECT_FAIL(port6, "127.0.0.1"); /* fail v4->v6 */
     EXPECT_FAIL(port4, "[::1]");     /* fail v6->v4 */
   }
+  PROACTOR_TEST_DRAIN(pts);
+
+  pn_listener_close(l);
+  TEST_ETYPE_EQUAL(t, PN_LISTENER_CLOSE, PROACTOR_TEST_RUN(pts));
+  pn_listener_free(l);
+
+  pn_listener_close(l6);
+  TEST_ETYPE_EQUAL(t, PN_LISTENER_CLOSE, PROACTOR_TEST_RUN(pts));
+  pn_listener_free(l6);
+
+  pn_listener_close(l4);
+  TEST_ETYPE_EQUAL(t, PN_LISTENER_CLOSE, PROACTOR_TEST_RUN(pts));
+  pn_listener_free(l4);
 
   PROACTOR_TEST_FREE(pts);
 }
 
 /* Make sure pn_proactor_free cleans up open sockets */
 static void test_free_cleanup(test_t *t) {
-  proactor_test_t pts[] = { { open_wake_handler }, { common_handler } };
+  proactor_test_t pts[] = { { open_wake_handler }, { listen_handler } };
   PROACTOR_TEST_INIT(pts, t);
   pn_proactor_t *client = pts[0].proactor, *server = pts[1].proactor;
   test_port_t ports[3] = { test_port(localhost), test_port(localhost), test_port(localhost) };
+  pn_listener_t *l[3];
+  pn_connection_t *c[3];
   for (int i = 0; i < 3; ++i) {
-    pn_proactor_listen(server, pn_listener(), ports[i].host_port, 2);
+    l[i] = pn_listener();
+    pn_proactor_listen(server, l[i], ports[i].host_port, 2);
     TEST_ETYPE_EQUAL(t, PN_LISTENER_OPEN, PROACTOR_TEST_RUN(pts));
     sock_close(ports[i].sock);
-    pn_proactor_connect(client, pn_connection(), ports[i].host_port);
-    pn_proactor_connect(client, pn_connection(), ports[i].host_port);
+    c[i] = pn_connection();
+    pn_proactor_connect(client, c[i], ports[i].host_port);
   }
   PROACTOR_TEST_FREE(pts);
+  /* Safe to free after proactor is gone */
+  for (int i = 0; i < 3; ++i) {
+    pn_listener_free(l[i]);
+    pn_connection_free(c[i]);
+  }
+  /* Freeing an unused listener/connector should be safe */
+  pn_listener_free(pn_listener());
+  pn_connection_free(pn_connection());
 }
 
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org