You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2017/02/23 22:51:21 UTC

[14/38] qpid-proton git commit: PROTON-1403: c proactor memory leaks

PROTON-1403: c proactor memory leaks


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/85687373
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/85687373
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/85687373

Branch: refs/heads/go1
Commit: 85687373d8d5ec84cc9ac81ba4af2e1a81d9e294
Parents: a21a03e
Author: Alan Conway <ac...@redhat.com>
Authored: Tue Feb 14 12:06:26 2017 -0500
Committer: Alan Conway <ac...@redhat.com>
Committed: Tue Feb 14 12:06:26 2017 -0500

----------------------------------------------------------------------
 proton-c/src/proactor/libuv.c | 85 +++++++++++++++++++++-----------------
 1 file changed, 47 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/85687373/proton-c/src/proactor/libuv.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/libuv.c b/proton-c/src/proactor/libuv.c
index 29463d5..173f767 100644
--- a/proton-c/src/proactor/libuv.c
+++ b/proton-c/src/proactor/libuv.c
@@ -65,6 +65,12 @@
   - leader_* - called in leader thread (either leader_q processing or from an on_ function)
   - worker_* - called in worker thread
   - *_lh - called with the relevant lock held
+
+  LIFECYCLE: pconnection_t and pn_listener_t objects must not be deleted until all their
+  UV handles have received an on_close(). Freeing resources is always initiated by
+  uv_close() of the uv_tcp_t handle, and completed in on_close() handler functions when it
+  is safe. The only exception is when an error occurs that prevents a pn_connection_t or
+  pn_listener_t from being associated with a uv handle at all.
 */
 
 const char *COND_NAME = "proactor";
@@ -247,7 +253,7 @@ static inline pn_listener_t *as_listener(psocket_t* ps) {
   return ps->is_conn ? NULL: (pn_listener_t*)ps;
 }
 
-static pconnection_t *new_pconnection(pn_proactor_t *p, pn_connection_t *c, bool server, const char *host, const char *port) {
+static pconnection_t *pconnection(pn_proactor_t *p, pn_connection_t *c, bool server, const char *host, const char *port) {
   pconnection_t *pc = (pconnection_t*)calloc(1, sizeof(*pc));
   if (!pc || pn_connection_driver_init(&pc->driver, c, NULL) != 0) {
     return NULL;
@@ -288,33 +294,31 @@ static void leader_count(pn_proactor_t *p, int change) {
   uv_mutex_unlock(&p->lock);
 }
 
-/* Final close event for a a pconnection_t */
-static void on_close_pconnection_final(uv_handle_t *h) {
-  pconnection_t *pc = (pconnection_t*)h->data;
+static void pconnection_free(pconnection_t *pc) {
+  pn_connection_driver_destroy(&pc->driver);
   free(pc);
 }
 
-/* Close event for uv_tcp_t of a pconnection_t */
-static void on_close_pconnection(uv_handle_t *h) {
-  pconnection_t *pc = (pconnection_t*)h->data;
-  assert(pc->psocket.state == ON_UV);
-  leader_count(pc->psocket.proactor, -1);
-  pn_connection_driver_destroy(&pc->driver);
-  uv_timer_stop(&pc->timer);
-  /* Close the timer with the final event to free the pconnection_t */
-  uv_close((uv_handle_t*)&pc->timer, on_close_pconnection_final);
-}
+static void pn_listener_free(pn_listener_t *l);
 
-/* Close event for uv_tcp_t of a pn_listener_t */
-static void on_close_listener(uv_handle_t *h) {
-  pn_listener_t *l = (pn_listener_t*)h->data;
-  pn_condition_free(l->condition);
-  free(l);
+/* Final close event for a pconnection_t, closes the timer */
+static void on_close_pconnection_final(uv_handle_t *h) {
+  pconnection_free((pconnection_t*)h->data);
 }
 
-static inline void leader_finished(psocket_t *ps) {
-  set_state(ps, ON_UV, NULL);
-  uv_close((uv_handle_t*)&ps->tcp, ps->is_conn ? on_close_pconnection : on_close_listener);
+/* Close event for uv_tcp_t of a psocket_t */
+static void on_close_psocket(uv_handle_t *h) {
+  /* No assert(ps->state == ON_UV); may be called in other states during shutdown. */
+  psocket_t *ps = (psocket_t*)h->data;
+  if (ps->is_conn) {
+    leader_count(ps->proactor, -1);
+    pconnection_t *pc = as_pconnection(ps);
+    uv_timer_stop(&pc->timer);
+    /* Delay the free till the timer handle is also closed */
+    uv_close((uv_handle_t*)&pc->timer, on_close_pconnection_final);
+  } else {
+    pn_listener_free(as_listener(ps));
+  }
 }
 
 static pconnection_t *get_pconnection(pn_connection_t* c) {
@@ -358,7 +362,7 @@ static int leader_init(psocket_t *ps) {
       pc->connect.data = ps;
       int err = uv_timer_init(&ps->proactor->loop, &pc->timer);
       if (!err) {
-        pc->timer.data = pc;
+        pc->timer.data = ps;
       }
     }
   }
@@ -392,7 +396,7 @@ static void on_connection(uv_stream_t* server, int err) {
     ++l->connections;
     leader_unwatch(&l->psocket);
   } else {
-    leader_error(&l->psocket, err, "on incoming connection from");
+    leader_error(&l->psocket, err, "on connection from");
   }
 }
 
@@ -538,7 +542,7 @@ static void leader_watch(psocket_t *ps) {
   if (ps->is_conn) {
     pconnection_t *pc = as_pconnection(ps);
     if (pn_connection_driver_finished(&pc->driver)) {
-      leader_finished(ps);
+      uv_close((uv_handle_t*)&ps->tcp, on_close_psocket);
       return;
     }
     pn_millis_t next_tick = leader_tick(pc);
@@ -567,7 +571,8 @@ static void leader_watch(psocket_t *ps) {
   } else {
     pn_listener_t *l = as_listener(ps);
     if (l->closing && pn_collector_peek(l->collector)) {
-        leader_finished(&l->psocket);
+      uv_close((uv_handle_t*)&ps->tcp, on_close_psocket);
+      return;
     } else {
       if (l->accepting) {
         leader_accept(l);
@@ -777,7 +782,7 @@ void pn_proactor_set_timeout(pn_proactor_t *p, pn_millis_t t) {
 }
 
 int pn_proactor_connect(pn_proactor_t *p, pn_connection_t *c, const char *host, const char *port) {
-  pconnection_t *pc = new_pconnection(p, c, false, host, port);
+  pconnection_t *pc = pconnection(p, c, false, host, port);
   if (!pc) {
     return PN_OUT_OF_MEMORY;
   }
@@ -826,16 +831,20 @@ pn_proactor_t *pn_proactor() {
 }
 
 static void on_stopping(uv_handle_t* h, void* v) {
-  if (!uv_is_closing(h)) {
-    uv_close(h, NULL);           /* Close this handle */
+  /* Close all the TCP handles. on_close_psocket will close any other handles if needed */
+  if (h->type == UV_TCP && !uv_is_closing(h)) {
+    uv_close(h, on_close_psocket);
   }
-  if (!uv_loop_alive(h->loop)) /* Everything closed */
-    uv_stop(h->loop);          /* Stop the loop, pn_proactor_destroy() can return */
 }
 
 void pn_proactor_free(pn_proactor_t *p) {
-  uv_walk(&p->loop, on_stopping, NULL); /* Close all handles */
-  uv_run(&p->loop, UV_RUN_DEFAULT);     /* Run till stop, all handles closed */
+  uv_timer_stop(&p->timer);
+  uv_close((uv_handle_t*)&p->timer, NULL);
+  uv_close((uv_handle_t*)&p->async, NULL);
+  uv_walk(&p->loop, on_stopping, NULL); /* Close all TCP handles */
+  while (uv_loop_alive(&p->loop)) {
+    uv_run(&p->loop, UV_RUN_ONCE);       /* Run till all handles closed */
+  }
   uv_loop_close(&p->loop);
   uv_mutex_destroy(&p->lock);
   uv_cond_destroy(&p->cond);
@@ -854,11 +863,11 @@ static pn_event_t *proactor_batch_next(pn_event_batch_t *batch) {
 }
 
 static void pn_listener_free(pn_listener_t *l) {
-  assert(l->psocket.state == ON_WORKER);
+  /* No assert(l->psocket.state == ON_WORKER); may be called during shutdown. */
   if (l) {
-    if (!l->collector) pn_collector_free(l->collector);
-    if (!l->condition) pn_condition_free(l->condition);
-    if (!l->attachments) pn_free(l->attachments);
+    if (l->collector) pn_collector_free(l->collector);
+    if (l->condition) pn_condition_free(l->condition);
+    if (l->attachments) pn_free(l->attachments);
     free(l);
   }
 }
@@ -917,7 +926,7 @@ int pn_listener_accept(pn_listener_t *l, pn_connection_t *c) {
   if (l->accepting) {
     return PN_STATE_ERR;        /* Only one at a time */
   }
-  l->accepting = new_pconnection(l->psocket.proactor, c, true, l->psocket.host, l->psocket.port);
+  l->accepting = pconnection(l->psocket.proactor, c, true, l->psocket.host, l->psocket.port);
   if (!l->accepting) {
     return UV_ENOMEM;
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org