You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2018/07/03 22:14:04 UTC

[75/89] [abbrv] qpid-proton git commit: PROTON-1771: [c] add -close-connect, -cancel-timeout to threaderciser

PROTON-1771: [c] add -close-connect, -cancel-timeout to threaderciser

Also added -no-xxx flags to disable selected actions


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/94dfe1bf
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/94dfe1bf
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/94dfe1bf

Branch: refs/heads/go1
Commit: 94dfe1bf033f7d4b9183bbad75b1801d688a300d
Parents: 3d46b4f
Author: Alan Conway <ac...@redhat.com>
Authored: Tue May 8 14:30:46 2018 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Tue May 8 14:32:30 2018 -0400

----------------------------------------------------------------------
 c/tests/threaderciser.c     | 110 +++++++++++++++++++++++++++------------
 c/tests/threaderciser.tsupp |   5 ++
 2 files changed, 83 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/94dfe1bf/c/tests/threaderciser.c
----------------------------------------------------------------------
diff --git a/c/tests/threaderciser.c b/c/tests/threaderciser.c
index 82f39ea..e74db2c 100644
--- a/c/tests/threaderciser.c
+++ b/c/tests/threaderciser.c
@@ -31,11 +31,17 @@
    unpredictable scheduling. Currently using plain old rand(), if quality of
    randomness is problem we can upgrade.
 
-   TODO
-   - closing connections
+   NOTE: to narrow down race conditions you have two tools:
+   - specify a limited set of actions with command line flags, e.g.
+   $ threaderciser -no-close-connect # Do everything but close connections
+   $ threaderciser -timeout -cancel-timeout # Only do timeout actions
+   - use suppressions to hide races you know about but are not ready to fix
+   $ TSAN_OPTIONS="suppressions=/my/suppressions/file" threaderciser
+   $ valgrind --tool=helgrind --suppressions=/my/suppressions/file threaderciser
+
+   TODO:
    - pn_proactor_release_connection and re-use with pn_proactor_connect/accept
    - sending/receiving/tracking messages
-   - cancel timeout
 */
 
 #include "thread.h"
@@ -60,8 +66,8 @@
 #define SLEEP_MAX 100           /* Milliseconds */
 
 /* Set of actions that can be enabled/disabled/counted */
-typedef enum { A_LISTEN, A_LCLOSE, A_CONNECT, A_CCLOSE, A_WAKE, A_TIMEOUT } action;
-const char* action_name[] = { "listen", "lclose", "connect", "cclose", "wake", "timeout" };
+typedef enum { A_LISTEN, A_CLOSE_LISTEN, A_CONNECT, A_CLOSE_CONNECT, A_WAKE, A_TIMEOUT, A_CANCEL_TIMEOUT } action;
+const char* action_name[] = { "listen", "close-listen", "connect", "close-connect", "wake", "timeout", "cancel-timeout" };
 #define action_size (sizeof(action_name)/sizeof(*action_name))
 bool action_enabled[action_size] = { 0 } ;
 
@@ -90,7 +96,7 @@ static void debug_impl(const char *fmt, ...) {
     i += assert_no_err(vsnprintf(i, end-i, fmt, ap));
     va_end(ap);
   }
-  fputs(msg, stderr);
+  fprintf(stderr, "%s\n", msg);
 }
 
 /* Shorthand for debugging an action using id as identifier */
@@ -237,6 +243,20 @@ void cpool_wake(cpool *cp) {
   }
 }
 
+void cpool_close(cpool *cp) {
+  if (!action_enabled[A_CLOSE_CONNECT]) return;
+  connection_ctx *ctx = cpool_pick(cp);
+  if (ctx) {
+    pthread_mutex_lock(&ctx->lock);
+    if (ctx->pn_connection) {
+      pn_connection_close(ctx->pn_connection);
+      debuga(A_CLOSE_CONNECT, ctx->pn_connection);
+    }
+    pthread_mutex_unlock(&ctx->lock);
+    cpool_unref(ctx);
+  }
+}
+
 static void connection_ctx_on_close(connection_ctx *ctx) {
   /* Required locking: mark connection (possibly) closed no more wake calls */
   pthread_mutex_lock(&ctx->lock);
@@ -318,13 +338,13 @@ static void lpool_addr(lpool *lp, char* a, size_t s) {
 }
 
 void lpool_close(lpool *lp) {
-  if (!action_enabled[A_LCLOSE]) return;
+  if (!action_enabled[A_CLOSE_LISTEN]) return;
   listener_ctx *ctx = lpool_pick(lp);
   if (ctx) {
     pthread_mutex_lock(&ctx->lock);
     if (ctx->pn_listener) {
       pn_listener_close(ctx->pn_listener);
-      debuga(A_LCLOSE, ctx->pn_listener);
+      debuga(A_CLOSE_LISTEN, ctx->pn_listener);
     }
     pthread_mutex_unlock(&ctx->lock);
     lpool_unref(ctx);
@@ -391,13 +411,18 @@ static bool maybe(double probability) {
 static void global_do_stuff(global *g) {
   if (maybe(0.5)) global_connect(g);
   if (maybe(0.3)) lpool_listen(&g->listeners, g->proactor);
+  if (maybe(0.1)) lpool_close(&g->listeners);
   if (maybe(0.5)) cpool_wake(&g->connections_active);
   if (maybe(0.5)) cpool_wake(&g->connections_idle);
-  if (maybe(0.1)) lpool_close(&g->listeners);
+  if (maybe(0.1)) cpool_close(&g->connections_active);
   if (action_enabled[A_TIMEOUT] && maybe(0.5)) {
     debuga(A_TIMEOUT, g->proactor);
     pn_proactor_set_timeout(g->proactor, rand() % TIMEOUT_MAX);
   }
+  if (action_enabled[A_CANCEL_TIMEOUT] && maybe(0.1)) {
+    debuga(A_CANCEL_TIMEOUT, g->proactor);
+    pn_proactor_cancel_timeout(g->proactor);
+  }
 }
 
 static void* user_thread(void* void_g) {
@@ -470,66 +495,87 @@ static void* proactor_thread(void* void_g) {
 static const int default_runtime = 1;
 static const int default_threads = 8;
 
-void usage(const char **argv, const char **arg, const char **end) {
+void usage(const char **argv, const char **arg) {
   fprintf(stderr, "usage: %s [options]\n", argv[0]);
   fprintf(stderr, "  -time TIME: total run-time in seconds (default %d)\n", default_runtime);
   fprintf(stderr, "  -threads THREADS: total number of threads (default %d)\n", default_threads);
   fprintf(stderr, "  -debug: print debug messages\n");
+  fprintf(stderr, "Flags to enable specific actions (all enabled by default)\n");
   fprintf(stderr, " ");
   for (int i = 0; i < (int)action_size; ++i) fprintf(stderr, " -%s", action_name[i]);
-  fprintf(stderr, ": enable actions\n\n");
-
-  fprintf(stderr, "bad argument: ");
-  for (const char **a = argv+1; a < arg; ++a) fprintf(stderr, "%s ", *a);
-  fprintf(stderr, ">>> %s <<<", *arg++);
-  for (; arg < end; ++arg) fprintf(stderr, " %s", *arg);
+  fprintf(stderr, "\n");
+  fprintf(stderr, "Flags to disable specific actions (all enabled by default)\n");
+  for (int i = 0; i < (int)action_size; ++i) fprintf(stderr, " -no-%s", action_name[i]);
+  fprintf(stderr, "\n\n");
+  fprintf(stderr, "bad argument: %s\n", *arg);
   exit(1);
 }
 
+size_t find_action(const char *name, const char **argv, const char **arg) {
+  for (size_t i = 0; i < action_size; ++i) {
+    if (!strcmp(name, action_name[i])) return i;
+  }
+  usage(argv, arg);
+  return 0;                     /* Can't get here. */
+}
+
 int main(int argc, const char* argv[]) {
   const char **arg = argv + 1;
   const char **end = argv + argc;
   int runtime = default_runtime;
   int threads = default_threads;
+  bool action_default = true;
+  for (size_t i = 0; i < action_size; ++i) action_enabled[i] = action_default;
 
   while (arg < end) {
     if (!strcmp(*arg, "-time") && ++arg < end) {
       runtime = atoi(*arg);
-      if (runtime <= 0) usage(argv, arg, end);
+      if (runtime <= 0) usage(argv, arg);
     }
     else if (!strcmp(*arg, "-threads") && ++arg < end) {
       threads = atoi(*arg);
-      if (threads <= 0) usage(argv, arg, end);
+      if (threads <= 0) usage(argv, arg);
       if (threads % 2) threads += 1; /* Round up to even: half proactor, half user */
     }
     else if (!strcmp(*arg, "-debug")) {
       debug_enable = true;
     }
-    else if (**arg == '-') {
-      size_t i = 0;
-      while (i < action_size && strcmp((*arg)+1, action_name[i])) ++i;
-      if (i == action_size) usage(argv, arg, end);
-      action_enabled[i] = true;
-    } else {
-      break;
+    else if (!strncmp(*arg, "-no-", 4)) {
+      action_enabled[find_action((*arg) + 4, argv, arg)] = false;
+    }
+    else if (!strncmp(*arg, "-", 1)) {
+      if (action_default) {   /* First enable-action flag, switch to default-off */
+        action_default = false;
+        for (size_t i = 0; i < action_size; ++i) action_enabled[i] = action_default;
+      }
+      action_enabled[find_action((*arg) + 1, argv, arg)] = true;
+    }
+    else {
+      usage(argv, arg);
     }
     ++arg;
   }
-  int i = 0;
-  /* If no actions are requested on command line, enable them all */
-  while (i < (int)action_size && !action_enabled[i]) ++i;
-  if (i == action_size) {
-    for (i = 0; i < (int)action_size; ++i) action_enabled[i] = true;
-  }
 
   /* Set up global state, start threads */
-  debug("threaderciser start threads=%d, time=%d\n", threads, runtime);
+
+  printf("threaderciser start: threads=%d, time=%d, actions=[", threads, runtime);
+  bool comma = false;
+  for (size_t i = 0; i < action_size; ++i) {
+    if (action_enabled[i]) {
+      printf("%s%s", (comma ? ", " : ""), action_name[i]);
+      comma = true;
+    }
+  }
+  printf("]\n");
+  fflush(stdout);
+
   global g;
   global_init(&g, threads);
   lpool_listen(&g.listeners, g.proactor); /* Start initial listener */
 
   pthread_t *user_threads = (pthread_t*)calloc(threads/2, sizeof(pthread_t));
   pthread_t *proactor_threads = (pthread_t*)calloc(threads/2, sizeof(pthread_t));
+  int i;
   for (i = 0; i < threads/2; ++i) {
     pthread_create(&user_threads[i], NULL, user_thread, &g);
     pthread_create(&proactor_threads[i], NULL, proactor_thread, &g);

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/94dfe1bf/c/tests/threaderciser.tsupp
----------------------------------------------------------------------
diff --git a/c/tests/threaderciser.tsupp b/c/tests/threaderciser.tsupp
new file mode 100644
index 0000000..18ceb23
--- /dev/null
+++ b/c/tests/threaderciser.tsupp
@@ -0,0 +1,5 @@
+# TSAN suppressions for threaderciser
+
+# Benign race in pni_log_enabled
+race:pni_log_enabled
+


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org