You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2018/05/16 19:18:51 UTC
qpid-proton git commit: PROTON-1771: [c] fix race conditions in threaderciser.c
Repository: qpid-proton
Updated Branches:
refs/heads/master 50a4e8353 -> 3206378b0
PROTON-1771: [c] fix race conditions in threaderciser.c
The test was incorrectly calling pn_connection_close() from non-handler threads; connections are now closed from the PN_CONNECTION_WAKE handler instead.
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/3206378b
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/3206378b
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/3206378b
Branch: refs/heads/master
Commit: 3206378b0149c903f0507fc6429ae899e38f2368
Parents: 50a4e83
Author: Alan Conway <ac...@redhat.com>
Authored: Wed May 16 15:15:49 2018 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Wed May 16 15:17:40 2018 -0400
----------------------------------------------------------------------
c/tests/threaderciser.c | 82 +++++++++++++++++++-------------------------
1 file changed, 35 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/3206378b/c/tests/threaderciser.c
----------------------------------------------------------------------
diff --git a/c/tests/threaderciser.c b/c/tests/threaderciser.c
index e74db2c..552ccb0 100644
--- a/c/tests/threaderciser.c
+++ b/c/tests/threaderciser.c
@@ -243,28 +243,6 @@ void cpool_wake(cpool *cp) {
}
}
-void cpool_close(cpool *cp) {
- if (!action_enabled[A_CLOSE_CONNECT]) return;
- connection_ctx *ctx = cpool_pick(cp);
- if (ctx) {
- pthread_mutex_lock(&ctx->lock);
- if (ctx->pn_connection) {
- pn_connection_close(ctx->pn_connection);
- debuga(A_CLOSE_CONNECT, ctx->pn_connection);
- }
- pthread_mutex_unlock(&ctx->lock);
- cpool_unref(ctx);
- }
-}
-
-static void connection_ctx_on_close(connection_ctx *ctx) {
- /* Required locking: mark connection (possibly) closed no more wake calls */
- pthread_mutex_lock(&ctx->lock);
- ctx->pn_connection = NULL;
- pthread_mutex_unlock(&ctx->lock);
- cpool_unref(ctx);
-}
-
/* Listener pool */
typedef struct listener_ctx {
@@ -306,23 +284,6 @@ static void lpool_listen(lpool *lp, pn_proactor_t *proactor) {
}
}
-/* Advertise address once open */
-static void listener_ctx_on_open(listener_ctx *ctx) {
- pthread_mutex_lock(&ctx->lock);
- if (ctx->pn_listener) {
- pn_netaddr_str(pn_listener_addr(ctx->pn_listener), ctx->addr, sizeof(ctx->addr));
- }
- debug("[%p] listening on %s", ctx->pn_listener, ctx->addr);
- pthread_mutex_unlock(&ctx->lock);
-}
-
-static void listener_ctx_on_close(listener_ctx *ctx) {
- pthread_mutex_lock(&ctx->lock);
- ctx->pn_listener = NULL;
- pthread_mutex_unlock(&ctx->lock);
- lpool_unref(ctx);
-}
-
/* Pick a random listening address from the listener pool.
Returns "invalid:address" for no address.
*/
@@ -414,7 +375,6 @@ static void global_do_stuff(global *g) {
if (maybe(0.1)) lpool_close(&g->listeners);
if (maybe(0.5)) cpool_wake(&g->connections_active);
if (maybe(0.5)) cpool_wake(&g->connections_idle);
- if (maybe(0.1)) cpool_close(&g->connections_active);
if (action_enabled[A_TIMEOUT] && maybe(0.5)) {
debuga(A_TIMEOUT, g->proactor);
pn_proactor_set_timeout(g->proactor, rand() % TIMEOUT_MAX);
@@ -437,6 +397,11 @@ static void* user_thread(void* void_g) {
}
static bool handle(global *g, pn_event_t *e) {
+ pn_connection_t *c = pn_event_connection(e);
+ connection_ctx *cctx = c ? (connection_ctx*)pn_connection_get_context(c) : NULL;
+ pn_listener_t *l = pn_event_listener(e);
+ listener_ctx *lctx = l ? (listener_ctx*)pn_listener_get_context(l) : NULL;
+
switch (pn_event_type(e)) {
case PN_PROACTOR_TIMEOUT: {
@@ -444,24 +409,47 @@ static bool handle(global *g, pn_event_t *e) {
global_do_stuff(g);
break;
}
+
case PN_LISTENER_OPEN: {
- listener_ctx *ctx = (listener_ctx*)pn_listener_get_context(pn_event_listener(e));
- listener_ctx_on_open(ctx);
- cpool_connect(&g->connections_active, g->proactor, ctx->addr); /* Initial connection */
+ pthread_mutex_lock(&lctx->lock);
+ if (lctx->pn_listener) {
+ pn_netaddr_str(pn_listener_addr(lctx->pn_listener), lctx->addr, sizeof(lctx->addr));
+ }
+ debug("[%p] listening on %s", lctx->pn_listener, lctx->addr);
+ pthread_mutex_unlock(&lctx->lock);
+ cpool_connect(&g->connections_active, g->proactor, lctx->addr); /* Initial connection */
break;
}
case PN_LISTENER_CLOSE: {
- listener_ctx_on_close((listener_ctx*)pn_listener_get_context(pn_event_listener(e)));
+ pthread_mutex_lock(&lctx->lock);
+ lctx->pn_listener = NULL;
+ pthread_mutex_unlock(&lctx->lock);
+ lpool_unref(lctx);
+ break;
+ }
+
+ case PN_CONNECTION_WAKE: {
+ if (!action_enabled[A_CLOSE_CONNECT] && maybe(0.5)) pn_connection_close(c);
+ /* FIXME aconway 2018-05-16: connection release/re-use */
break;
}
+
case PN_TRANSPORT_CLOSED: {
- connection_ctx_on_close((connection_ctx*)pn_connection_get_context(pn_event_connection(e)));
+ if (cctx) {
+ /* Required locking: mark connection as closed, no more wake calls */
+ pthread_mutex_lock(&cctx->lock);
+ cctx->pn_connection = NULL;
+ pthread_mutex_unlock(&cctx->lock);
+ cpool_unref(cctx);
+ }
break;
}
- case PN_PROACTOR_INACTIVE: /* Shutting down */
+ // Following events are used to shut down the threaderciser
+
+ case PN_PROACTOR_INACTIVE: { /* Shutting down */
pn_proactor_interrupt(g->proactor); /* Interrupt remaining threads */
return false;
-
+ }
case PN_PROACTOR_INTERRUPT:
pn_proactor_interrupt(g->proactor); /* Pass the interrupt along */
return false;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org