You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2017/02/23 22:51:21 UTC
[14/38] qpid-proton git commit: PROTON-1403: c proactor memory leaks
PROTON-1403: c proactor memory leaks
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/85687373
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/85687373
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/85687373
Branch: refs/heads/go1
Commit: 85687373d8d5ec84cc9ac81ba4af2e1a81d9e294
Parents: a21a03e
Author: Alan Conway <ac...@redhat.com>
Authored: Tue Feb 14 12:06:26 2017 -0500
Committer: Alan Conway <ac...@redhat.com>
Committed: Tue Feb 14 12:06:26 2017 -0500
----------------------------------------------------------------------
proton-c/src/proactor/libuv.c | 85 +++++++++++++++++++++-----------------
1 file changed, 47 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/85687373/proton-c/src/proactor/libuv.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/libuv.c b/proton-c/src/proactor/libuv.c
index 29463d5..173f767 100644
--- a/proton-c/src/proactor/libuv.c
+++ b/proton-c/src/proactor/libuv.c
@@ -65,6 +65,12 @@
- leader_* - called in leader thread (either leader_q processing or from an on_ function)
- worker_* - called in worker thread
- *_lh - called with the relevant lock held
+
+ LIFECYCLE: pconnection_t and pn_listener_t objects must not be deleted until all their
+ UV handles have received an on_close(). Freeing resources is always initiated by
+ uv_close() of the uv_tcp_t handle, and completed in on_close() handler functions when it
+ is safe. The only exception is when an error occurs that prevents a pn_connection_t or
+ pn_listener_t from being associated with a uv handle at all.
*/
const char *COND_NAME = "proactor";
@@ -247,7 +253,7 @@ static inline pn_listener_t *as_listener(psocket_t* ps) {
return ps->is_conn ? NULL: (pn_listener_t*)ps;
}
-static pconnection_t *new_pconnection(pn_proactor_t *p, pn_connection_t *c, bool server, const char *host, const char *port) {
+static pconnection_t *pconnection(pn_proactor_t *p, pn_connection_t *c, bool server, const char *host, const char *port) {
pconnection_t *pc = (pconnection_t*)calloc(1, sizeof(*pc));
if (!pc || pn_connection_driver_init(&pc->driver, c, NULL) != 0) {
return NULL;
@@ -288,33 +294,31 @@ static void leader_count(pn_proactor_t *p, int change) {
uv_mutex_unlock(&p->lock);
}
-/* Final close event for a a pconnection_t */
-static void on_close_pconnection_final(uv_handle_t *h) {
- pconnection_t *pc = (pconnection_t*)h->data;
+static void pconnection_free(pconnection_t *pc) {
+ pn_connection_driver_destroy(&pc->driver);
free(pc);
}
-/* Close event for uv_tcp_t of a pconnection_t */
-static void on_close_pconnection(uv_handle_t *h) {
- pconnection_t *pc = (pconnection_t*)h->data;
- assert(pc->psocket.state == ON_UV);
- leader_count(pc->psocket.proactor, -1);
- pn_connection_driver_destroy(&pc->driver);
- uv_timer_stop(&pc->timer);
- /* Close the timer with the final event to free the pconnection_t */
- uv_close((uv_handle_t*)&pc->timer, on_close_pconnection_final);
-}
+static void pn_listener_free(pn_listener_t *l);
-/* Close event for uv_tcp_t of a pn_listener_t */
-static void on_close_listener(uv_handle_t *h) {
- pn_listener_t *l = (pn_listener_t*)h->data;
- pn_condition_free(l->condition);
- free(l);
+/* Final close event for a pconnection_t: runs once the timer handle has closed, then frees the pconnection_t */
+static void on_close_pconnection_final(uv_handle_t *h) {
+ pconnection_free((pconnection_t*)h->data);
}
-static inline void leader_finished(psocket_t *ps) {
- set_state(ps, ON_UV, NULL);
- uv_close((uv_handle_t*)&ps->tcp, ps->is_conn ? on_close_pconnection : on_close_listener);
+/* Close event for uv_tcp_t of a psocket_t */
+static void on_close_psocket(uv_handle_t *h) {
+ /* No assert(ps->state == ON_UV); may be called in other states during shutdown. */
+ psocket_t *ps = (psocket_t*)h->data;
+ if (ps->is_conn) {
+ leader_count(ps->proactor, -1);
+ pconnection_t *pc = as_pconnection(ps);
+ uv_timer_stop(&pc->timer);
+ /* Delay the free till the timer handle is also closed */
+ uv_close((uv_handle_t*)&pc->timer, on_close_pconnection_final);
+ } else {
+ pn_listener_free(as_listener(ps));
+ }
}
static pconnection_t *get_pconnection(pn_connection_t* c) {
@@ -358,7 +362,7 @@ static int leader_init(psocket_t *ps) {
pc->connect.data = ps;
int err = uv_timer_init(&ps->proactor->loop, &pc->timer);
if (!err) {
- pc->timer.data = pc;
+ pc->timer.data = ps;
}
}
}
@@ -392,7 +396,7 @@ static void on_connection(uv_stream_t* server, int err) {
++l->connections;
leader_unwatch(&l->psocket);
} else {
- leader_error(&l->psocket, err, "on incoming connection from");
+ leader_error(&l->psocket, err, "on connection from");
}
}
@@ -538,7 +542,7 @@ static void leader_watch(psocket_t *ps) {
if (ps->is_conn) {
pconnection_t *pc = as_pconnection(ps);
if (pn_connection_driver_finished(&pc->driver)) {
- leader_finished(ps);
+ uv_close((uv_handle_t*)&ps->tcp, on_close_psocket);
return;
}
pn_millis_t next_tick = leader_tick(pc);
@@ -567,7 +571,8 @@ static void leader_watch(psocket_t *ps) {
} else {
pn_listener_t *l = as_listener(ps);
if (l->closing && pn_collector_peek(l->collector)) {
- leader_finished(&l->psocket);
+ uv_close((uv_handle_t*)&ps->tcp, on_close_psocket);
+ return;
} else {
if (l->accepting) {
leader_accept(l);
@@ -777,7 +782,7 @@ void pn_proactor_set_timeout(pn_proactor_t *p, pn_millis_t t) {
}
int pn_proactor_connect(pn_proactor_t *p, pn_connection_t *c, const char *host, const char *port) {
- pconnection_t *pc = new_pconnection(p, c, false, host, port);
+ pconnection_t *pc = pconnection(p, c, false, host, port);
if (!pc) {
return PN_OUT_OF_MEMORY;
}
@@ -826,16 +831,20 @@ pn_proactor_t *pn_proactor() {
}
static void on_stopping(uv_handle_t* h, void* v) {
- if (!uv_is_closing(h)) {
- uv_close(h, NULL); /* Close this handle */
+ /* Close all the TCP handles. on_close_psocket will close any other handles if needed */
+ if (h->type == UV_TCP && !uv_is_closing(h)) {
+ uv_close(h, on_close_psocket);
}
- if (!uv_loop_alive(h->loop)) /* Everything closed */
- uv_stop(h->loop); /* Stop the loop, pn_proactor_destroy() can return */
}
void pn_proactor_free(pn_proactor_t *p) {
- uv_walk(&p->loop, on_stopping, NULL); /* Close all handles */
- uv_run(&p->loop, UV_RUN_DEFAULT); /* Run till stop, all handles closed */
+ uv_timer_stop(&p->timer);
+ uv_close((uv_handle_t*)&p->timer, NULL);
+ uv_close((uv_handle_t*)&p->async, NULL);
+ uv_walk(&p->loop, on_stopping, NULL); /* Close all TCP handles */
+ while (uv_loop_alive(&p->loop)) {
+ uv_run(&p->loop, UV_RUN_ONCE); /* Run till all handles closed */
+ }
uv_loop_close(&p->loop);
uv_mutex_destroy(&p->lock);
uv_cond_destroy(&p->cond);
@@ -854,11 +863,11 @@ static pn_event_t *proactor_batch_next(pn_event_batch_t *batch) {
}
static void pn_listener_free(pn_listener_t *l) {
- assert(l->psocket.state == ON_WORKER);
+ /* No assert(l->psocket.state == ON_WORKER); can be called during shutdown */
if (l) {
- if (!l->collector) pn_collector_free(l->collector);
- if (!l->condition) pn_condition_free(l->condition);
- if (!l->attachments) pn_free(l->attachments);
+ if (l->collector) pn_collector_free(l->collector);
+ if (l->condition) pn_condition_free(l->condition);
+ if (l->attachments) pn_free(l->attachments);
free(l);
}
}
@@ -917,7 +926,7 @@ int pn_listener_accept(pn_listener_t *l, pn_connection_t *c) {
if (l->accepting) {
return PN_STATE_ERR; /* Only one at a time */
}
- l->accepting = new_pconnection(l->psocket.proactor, c, true, l->psocket.host, l->psocket.port);
+ l->accepting = pconnection(l->psocket.proactor, c, true, l->psocket.host, l->psocket.port);
if (!l->accepting) {
return UV_ENOMEM;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org