You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2018/07/03 22:14:04 UTC
[75/89] [abbrv] qpid-proton git commit: PROTON-1771: [c] add
-close-connect, -cancel-timeout to threaderciser
PROTON-1771: [c] add -close-connect, -cancel-timeout to threaderciser
Also added -no-xxx flags to disable selected actions
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/94dfe1bf
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/94dfe1bf
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/94dfe1bf
Branch: refs/heads/go1
Commit: 94dfe1bf033f7d4b9183bbad75b1801d688a300d
Parents: 3d46b4f
Author: Alan Conway <ac...@redhat.com>
Authored: Tue May 8 14:30:46 2018 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Tue May 8 14:32:30 2018 -0400
----------------------------------------------------------------------
c/tests/threaderciser.c | 110 +++++++++++++++++++++++++++------------
c/tests/threaderciser.tsupp | 5 ++
2 files changed, 83 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/94dfe1bf/c/tests/threaderciser.c
----------------------------------------------------------------------
diff --git a/c/tests/threaderciser.c b/c/tests/threaderciser.c
index 82f39ea..e74db2c 100644
--- a/c/tests/threaderciser.c
+++ b/c/tests/threaderciser.c
@@ -31,11 +31,17 @@
unpredictable scheduling. Currently using plain old rand(), if quality of
randomness is problem we can upgrade.
- TODO
- - closing connections
+ NOTE: to narrow down race conditions you have two tools
+ - specify a limited set of actions with command line flags, e.g.
+ $ threaderciser -no-close-connect # Do everything but close connections
+ $ threaderciser -timeout -cancel-timeout # Only do timeout actions
+ - use suppressions to hide races you know about but are not ready to fix
+ $ TSAN_OPTIONS="suppressions=/my/suppressions/file" threaderciser
+ $ valgrind --tool=helgrind --suppressions=/my/suppressions/file threaderciser
+
+ TODO:
- pn_proactor_release_connection and re-use with pn_proactor_connect/accept
- sending/receiving/tracking messages
- - cancel timeout
*/
#include "thread.h"
@@ -60,8 +66,8 @@
#define SLEEP_MAX 100 /* Milliseconds */
/* Set of actions that can be enabled/disabled/counted */
-typedef enum { A_LISTEN, A_LCLOSE, A_CONNECT, A_CCLOSE, A_WAKE, A_TIMEOUT } action;
-const char* action_name[] = { "listen", "lclose", "connect", "cclose", "wake", "timeout" };
+typedef enum { A_LISTEN, A_CLOSE_LISTEN, A_CONNECT, A_CLOSE_CONNECT, A_WAKE, A_TIMEOUT, A_CANCEL_TIMEOUT } action;
+const char* action_name[] = { "listen", "close-listen", "connect", "close-connect", "wake", "timeout", "cancel-timeout" };
#define action_size (sizeof(action_name)/sizeof(*action_name))
bool action_enabled[action_size] = { 0 } ;
@@ -90,7 +96,7 @@ static void debug_impl(const char *fmt, ...) {
i += assert_no_err(vsnprintf(i, end-i, fmt, ap));
va_end(ap);
}
- fputs(msg, stderr);
+ fprintf(stderr, "%s\n", msg);
}
/* Shorthand for debugging an action using id as identifier */
@@ -237,6 +243,20 @@ void cpool_wake(cpool *cp) {
}
}
+void cpool_close(cpool *cp) {
+ if (!action_enabled[A_CLOSE_CONNECT]) return;
+ connection_ctx *ctx = cpool_pick(cp);
+ if (ctx) {
+ pthread_mutex_lock(&ctx->lock);
+ if (ctx->pn_connection) {
+ pn_connection_close(ctx->pn_connection);
+ debuga(A_CLOSE_CONNECT, ctx->pn_connection);
+ }
+ pthread_mutex_unlock(&ctx->lock);
+ cpool_unref(ctx);
+ }
+}
+
static void connection_ctx_on_close(connection_ctx *ctx) {
/* Required locking: mark connection (possibly) closed no more wake calls */
pthread_mutex_lock(&ctx->lock);
@@ -318,13 +338,13 @@ static void lpool_addr(lpool *lp, char* a, size_t s) {
}
void lpool_close(lpool *lp) {
- if (!action_enabled[A_LCLOSE]) return;
+ if (!action_enabled[A_CLOSE_LISTEN]) return;
listener_ctx *ctx = lpool_pick(lp);
if (ctx) {
pthread_mutex_lock(&ctx->lock);
if (ctx->pn_listener) {
pn_listener_close(ctx->pn_listener);
- debuga(A_LCLOSE, ctx->pn_listener);
+ debuga(A_CLOSE_LISTEN, ctx->pn_listener);
}
pthread_mutex_unlock(&ctx->lock);
lpool_unref(ctx);
@@ -391,13 +411,18 @@ static bool maybe(double probability) {
static void global_do_stuff(global *g) {
if (maybe(0.5)) global_connect(g);
if (maybe(0.3)) lpool_listen(&g->listeners, g->proactor);
+ if (maybe(0.1)) lpool_close(&g->listeners);
if (maybe(0.5)) cpool_wake(&g->connections_active);
if (maybe(0.5)) cpool_wake(&g->connections_idle);
- if (maybe(0.1)) lpool_close(&g->listeners);
+ if (maybe(0.1)) cpool_close(&g->connections_active);
if (action_enabled[A_TIMEOUT] && maybe(0.5)) {
debuga(A_TIMEOUT, g->proactor);
pn_proactor_set_timeout(g->proactor, rand() % TIMEOUT_MAX);
}
+ if (action_enabled[A_CANCEL_TIMEOUT] && maybe(0.1)) {
+ debuga(A_CANCEL_TIMEOUT, g->proactor);
+ pn_proactor_cancel_timeout(g->proactor);
+ }
}
static void* user_thread(void* void_g) {
@@ -470,66 +495,87 @@ static void* proactor_thread(void* void_g) {
static const int default_runtime = 1;
static const int default_threads = 8;
-void usage(const char **argv, const char **arg, const char **end) {
+void usage(const char **argv, const char **arg) {
fprintf(stderr, "usage: %s [options]\n", argv[0]);
fprintf(stderr, " -time TIME: total run-time in seconds (default %d)\n", default_runtime);
fprintf(stderr, " -threads THREADS: total number of threads (default %d)\n", default_threads);
fprintf(stderr, " -debug: print debug messages\n");
+ fprintf(stderr, "Flags to enable specific actions (all enabled by default)\n");
fprintf(stderr, " ");
for (int i = 0; i < (int)action_size; ++i) fprintf(stderr, " -%s", action_name[i]);
- fprintf(stderr, ": enable actions\n\n");
-
- fprintf(stderr, "bad argument: ");
- for (const char **a = argv+1; a < arg; ++a) fprintf(stderr, "%s ", *a);
- fprintf(stderr, ">>> %s <<<", *arg++);
- for (; arg < end; ++arg) fprintf(stderr, " %s", *arg);
+ fprintf(stderr, "\n");
+ fprintf(stderr, "Flags to disable specific actions (all enabled by default)\n");
+ for (int i = 0; i < (int)action_size; ++i) fprintf(stderr, " -no-%s", action_name[i]);
+ fprintf(stderr, "\n\n");
+ fprintf(stderr, "bad argument: %s\n", *arg);
exit(1);
}
+size_t find_action(const char *name, const char **argv, const char **arg) {
+ for (size_t i = 0; i < action_size; ++i) {
+ if (!strcmp(name, action_name[i])) return i;
+ }
+ usage(argv, arg);
+ return 0; /* Can't get here. */
+}
+
int main(int argc, const char* argv[]) {
const char **arg = argv + 1;
const char **end = argv + argc;
int runtime = default_runtime;
int threads = default_threads;
+ bool action_default = true;
+ for (size_t i = 0; i < action_size; ++i) action_enabled[i] = action_default;
while (arg < end) {
if (!strcmp(*arg, "-time") && ++arg < end) {
runtime = atoi(*arg);
- if (runtime <= 0) usage(argv, arg, end);
+ if (runtime <= 0) usage(argv, arg);
}
else if (!strcmp(*arg, "-threads") && ++arg < end) {
threads = atoi(*arg);
- if (threads <= 0) usage(argv, arg, end);
+ if (threads <= 0) usage(argv, arg);
if (threads % 2) threads += 1; /* Round up to even: half proactor, half user */
}
else if (!strcmp(*arg, "-debug")) {
debug_enable = true;
}
- else if (**arg == '-') {
- size_t i = 0;
- while (i < action_size && strcmp((*arg)+1, action_name[i])) ++i;
- if (i == action_size) usage(argv, arg, end);
- action_enabled[i] = true;
- } else {
- break;
+ else if (!strncmp(*arg, "-no-", 4)) {
+ action_enabled[find_action((*arg) + 4, argv, arg)] = false;
+ }
+ else if (!strncmp(*arg, "-", 1)) {
+ if (action_default) { /* First enable-action flag, switch to default-off */
+ action_default = false;
+ for (size_t i = 0; i < action_size; ++i) action_enabled[i] = action_default;
+ }
+ action_enabled[find_action((*arg) + 1, argv, arg)] = true;
+ }
+ else {
+ usage(argv, arg);
}
++arg;
}
- int i = 0;
- /* If no actions are requested on command line, enable them all */
- while (i < (int)action_size && !action_enabled[i]) ++i;
- if (i == action_size) {
- for (i = 0; i < (int)action_size; ++i) action_enabled[i] = true;
- }
/* Set up global state, start threads */
- debug("threaderciser start threads=%d, time=%d\n", threads, runtime);
+
+ printf("threaderciser start: threads=%d, time=%d, actions=[", threads, runtime);
+ bool comma = false;
+ for (size_t i = 0; i < action_size; ++i) {
+ if (action_enabled[i]) {
+ printf("%s%s", (comma ? ", " : ""), action_name[i]);
+ comma = true;
+ }
+ }
+ printf("]\n");
+ fflush(stdout);
+
global g;
global_init(&g, threads);
lpool_listen(&g.listeners, g.proactor); /* Start initial listener */
pthread_t *user_threads = (pthread_t*)calloc(threads/2, sizeof(pthread_t));
pthread_t *proactor_threads = (pthread_t*)calloc(threads/2, sizeof(pthread_t));
+ int i;
for (i = 0; i < threads/2; ++i) {
pthread_create(&user_threads[i], NULL, user_thread, &g);
pthread_create(&proactor_threads[i], NULL, proactor_thread, &g);
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/94dfe1bf/c/tests/threaderciser.tsupp
----------------------------------------------------------------------
diff --git a/c/tests/threaderciser.tsupp b/c/tests/threaderciser.tsupp
new file mode 100644
index 0000000..18ceb23
--- /dev/null
+++ b/c/tests/threaderciser.tsupp
@@ -0,0 +1,5 @@
+# TSAN suppressions for threaderciser
+
+# Benign race in pni_log_enabled
+race:pni_log_enabled
+
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org