Posted to commits@cassandra.apache.org by bl...@apache.org on 2020/04/01 13:11:59 UTC

[cassandra] branch trunk updated: Fix flaky test o.a.c.d.test.*RepairCoordinatorFastTest

This is an automated email from the ASF dual-hosted git repository.

blerer pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new dfc279a  Fix flaky test o.a.c.d.test.*RepairCoordinatorFastTest
dfc279a is described below

commit dfc279a22a5563ac7a832a586914d5410426e9b7
Author: David Capwell <dc...@gmail.com>
AuthorDate: Sat Mar 21 20:57:50 2020 -0700

    Fix flaky test o.a.c.d.test.*RepairCoordinatorFastTest
    
    patch by David Capwell; reviewed by Ekaterina Dimitrova and Benjamin Lerer
    for CASSANDRA-15650
---
 .../cassandra/concurrent/NamedThreadFactory.java   |   5 +-
 src/java/org/apache/cassandra/tools/NodeProbe.java |   2 +-
 .../org/apache/cassandra/tools/RepairRunner.java   |  40 +-
 .../org/apache/cassandra/utils/Throwables.java     |  10 +
 .../cassandra/distributed/api/NodeToolResult.java  |  48 +-
 .../distributed/impl/IsolatedExecutor.java         |   8 +-
 ...=> FullRepairCoordinatorNeighbourDownTest.java} |   4 +-
 .../test/FullRepairCoordinatorTimeoutTest.java     |  16 +
 ...ementalRepairCoordinatorNeighbourDownTest.java} |   4 +-
 .../IncrementalRepairCoordinatorTimeoutTest.java   |  16 +
 ...PreviewRepairCoordinatorNeighbourDownTest.java} |   4 +-
 .../test/PreviewRepairCoordinatorTimeoutTest.java  |  16 +
 .../distributed/test/RepairCoordinatorFast.java    | 482 +++++++++++----------
 ...ow.java => RepairCoordinatorNeighbourDown.java} | 180 ++++----
 .../distributed/test/RepairCoordinatorTimeout.java |  67 +++
 .../org/apache/cassandra/utils/AssertUtil.java     | 128 ++++++
 16 files changed, 665 insertions(+), 365 deletions(-)

diff --git a/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java b/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java
index 7cc73bd..bcf686f 100644
--- a/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java
+++ b/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java
@@ -36,7 +36,10 @@ public class NamedThreadFactory implements ThreadFactory
 {
     private static volatile String globalPrefix;
     public static void setGlobalPrefix(String prefix) { globalPrefix = prefix; }
-    public static String globalPrefix() { return globalPrefix == null ? "" : globalPrefix; }
+    public static String globalPrefix() {
+        String prefix = globalPrefix;
+        return prefix == null ? "" : prefix;
+    }
 
     public final String id;
     private final int priority;
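
The NamedThreadFactory change above fixes a time-of-check/time-of-use race on the volatile field: the old
code read globalPrefix twice (once for the null check, once for the return), so a concurrent
setGlobalPrefix(null) between the two reads could make the method return null despite the check. Reading
the volatile once into a local is the standard fix; a minimal illustration of the pattern (hypothetical
class, not the Cassandra code):

    class VolatileReadRace
    {
        private static volatile String value = "v";

        // Racy: two reads of the volatile; may return null if another thread
        // nulls out value between the check and the return.
        static String racy() { return value == null ? "" : value; }

        // Safe: a single read into a local; all later decisions use the local.
        static String safe()
        {
            String local = value;
            return local == null ? "" : local;
        }
    }
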
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index 180b231..f9f2c1a 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -415,7 +415,7 @@ public class NodeProbe implements AutoCloseable
         }
         catch (Exception e)
         {
-            throw new IOException(e) ;
+            throw new IOException(e);
         }
         finally
         {
diff --git a/src/java/org/apache/cassandra/tools/RepairRunner.java b/src/java/org/apache/cassandra/tools/RepairRunner.java
index d1b3409..593bc26 100644
--- a/src/java/org/apache/cassandra/tools/RepairRunner.java
+++ b/src/java/org/apache/cassandra/tools/RepairRunner.java
@@ -25,6 +25,9 @@ import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Condition;
 
+import com.google.common.base.Throwables;
+
+import org.apache.cassandra.repair.messages.RepairOption;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.service.StorageServiceMBean;
 import org.apache.cassandra.utils.concurrent.SimpleCondition;
@@ -40,7 +43,7 @@ public class RepairRunner extends JMXNotificationProgressListener
     private final StorageServiceMBean ssProxy;
     private final String keyspace;
     private final Map<String, String> options;
-    private final Condition condition = new SimpleCondition();
+    private final SimpleCondition condition = new SimpleCondition();
 
     private int cmd;
     private volatile Exception error;
@@ -59,8 +62,8 @@ public class RepairRunner extends JMXNotificationProgressListener
         if (cmd <= 0)
         {
             // repairAsync can only return 0 for replication factor 1.
-            String message = String.format("[%s] Replication factor is 1. No repair is needed for keyspace '%s'", format.format(System.currentTimeMillis()), keyspace);
-            out.println(message);
+            String message = String.format("Replication factor is 1. No repair is needed for keyspace '%s'", keyspace);
+            printMessage(message);
         }
         else
         {
@@ -69,6 +72,14 @@ public class RepairRunner extends JMXNotificationProgressListener
                 queryForCompletedRepair(String.format("After waiting for poll interval of %s seconds",
                                                       NodeProbe.JMX_NOTIFICATION_POLL_INTERVAL_SECONDS));
             }
+            Exception error = this.error;
+            if (error == null)
+            {
+            // notifications are lossy, so it's possible to see completion but miss an error; request the latest state
+                // from the server
+                queryForCompletedRepair("condition satisfied");
+                error = this.error;
+            }
             if (error != null)
             {
                 throw error;
@@ -111,12 +122,12 @@ public class RepairRunner extends JMXNotificationProgressListener
     public void progress(String tag, ProgressEvent event)
     {
         ProgressEventType type = event.getType();
-        String message = String.format("[%s] %s", format.format(System.currentTimeMillis()), event.getMessage());
+        String message = event.getMessage();
         if (type == ProgressEventType.PROGRESS)
         {
             message = message + " (progress: " + (int) event.getProgressPercentage() + "%)";
         }
-        out.println(message);
+        printMessage(message);
         if (type == ProgressEventType.ERROR)
         {
             error = new RuntimeException(String.format("Repair job has failed with the error message: %s. " +
@@ -136,9 +147,9 @@ public class RepairRunner extends JMXNotificationProgressListener
         String queriedString = "queried for parent session status and";
         if (status == null)
         {
-            String message = String.format("[%s] %s %s couldn't find repair status for cmd: %s", triggeringCondition,
-                                           queriedString, format.format(System.currentTimeMillis()), cmd);
-            out.println(message);
+            String message = String.format("%s %s couldn't find repair status for cmd: %s", triggeringCondition,
+                                           queriedString, cmd);
+            printMessage(message);
         }
         else
         {
@@ -148,8 +159,8 @@ public class RepairRunner extends JMXNotificationProgressListener
             {
                 case COMPLETED:
                 case FAILED:
-                    out.println(String.format("[%s] %s %s discovered repair %s.",
-                                              this.format.format(System.currentTimeMillis()), triggeringCondition,
+                    printMessage(String.format("%s %s discovered repair %s.",
+                                              triggeringCondition,
                                               queriedString, parentRepairStatus.name().toLowerCase()));
                     if (parentRepairStatus == ActiveRepairService.ParentRepairStatus.FAILED)
                     {
@@ -161,7 +172,7 @@ public class RepairRunner extends JMXNotificationProgressListener
                 case IN_PROGRESS:
                     break;
                 default:
-                    out.println(String.format("[%s] WARNING Encountered unexpected RepairRunnable.ParentRepairStatus: %s", System.currentTimeMillis(), parentRepairStatus));
+                    printMessage(String.format("WARNING Encountered unexpected RepairRunnable.ParentRepairStatus: %s", parentRepairStatus));
                     printMessages(messages);
                     break;
             }
@@ -172,7 +183,12 @@ public class RepairRunner extends JMXNotificationProgressListener
     {
         for (String message : messages)
         {
-            out.println(String.format("[%s] %s", this.format.format(System.currentTimeMillis()), message));
+            printMessage(message);
         }
     }
+
+    private void printMessage(String message)
+    {
+        out.println(String.format("[%s] %s", this.format.format(System.currentTimeMillis()), message));
+    }
 }
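
The RepairRunner change above is the heart of the flaky-test fix: JMX notifications are lossy, so even when
the completion condition trips without an error, the runner now re-queries the server's authoritative repair
status rather than trusting notifications alone. Condensed from the diff (error is the volatile field
declared earlier in the class):

    // after waiting, never trust notifications alone
    Exception error = this.error;        // single volatile read
    if (error == null)
    {
        // a SUCCESS/ERROR notification may have been dropped; ask the
        // server for the final state of this repair command
        queryForCompletedRepair("condition satisfied");
        error = this.error;
    }
    if (error != null)
        throw error;

The timestamp-prefixed println calls are also consolidated into the new printMessage helper, removing
several slightly different copies of the same formatting code.
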
diff --git a/src/java/org/apache/cassandra/utils/Throwables.java b/src/java/org/apache/cassandra/utils/Throwables.java
index 9c6da60..f727b5a 100644
--- a/src/java/org/apache/cassandra/utils/Throwables.java
+++ b/src/java/org/apache/cassandra/utils/Throwables.java
@@ -231,6 +231,16 @@ public final class Throwables
     }
 
     /**
+     * Throws the exception as an unchecked exception, wrapping it if it is a checked exception, else rethrowing it as is.
+     */
+    public static RuntimeException throwAsUncheckedException(Throwable t)
+    {
+        if (t instanceof Error)
+            throw (Error) t;
+        throw unchecked(t);
+    }
+
+    /**
      * A shortcut for {@code unchecked(unwrapped(t))}. This is called "cleaned" because this basically removes the annoying
      * cruft surrounding an exception :).
      */
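
The new throwAsUncheckedException above rethrows Errors as-is and defers to the existing unchecked(...) for
everything else. Declaring RuntimeException as the return type is a common trick that lets callers prefix
the call with throw, so the compiler treats the statement as terminating. A sketch of the intended call
shape (mirroring the IsolatedExecutor change further down; future stands for any java.util.concurrent.Future):

    try
    {
        future.get();
    }
    catch (ExecutionException e)
    {
        // always throws; the throw keyword keeps the compiler's flow analysis happy
        throw Throwables.throwAsUncheckedException(e.getCause());
    }
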
diff --git a/test/distributed/org/apache/cassandra/distributed/api/NodeToolResult.java b/test/distributed/org/apache/cassandra/distributed/api/NodeToolResult.java
index 9ba1127..8f33ae5 100644
--- a/test/distributed/org/apache/cassandra/distributed/api/NodeToolResult.java
+++ b/test/distributed/org/apache/cassandra/distributed/api/NodeToolResult.java
@@ -19,11 +19,14 @@
 package org.apache.cassandra.distributed.api;
 
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import javax.management.Notification;
 
+import com.google.common.base.Throwables;
 import org.junit.Assert;
 
 public class NodeToolResult
@@ -68,18 +71,22 @@ public class NodeToolResult
 
     public final class Asserts {
         public Asserts success() {
-            Assert.assertEquals("nodetool command " + commandAndArgs[0] + " was not successful", 0, rc);
+            if (rc != 0)
+                fail("was not successful");
             return this;
         }
 
         public Asserts failure() {
-            Assert.assertNotEquals("nodetool command " + commandAndArgs[0] + " was successful but not expected to be", 0, rc);
+            if (rc == 0)
+                fail("was successful but not expected to be");
             return this;
         }
 
-        public Asserts errorContains(String msg) {
+        public Asserts errorContains(String... messages) {
+            Assert.assertNotEquals("no error messages defined to check against", 0, messages.length);
             Assert.assertNotNull("No exception was found but expected one", error);
-            Assert.assertTrue("Error message '" + error.getMessage() + "' does not contain '" + msg + "'", error.getMessage().contains(msg));
+            if (!Stream.of(messages).anyMatch(msg -> error.getMessage().contains(msg)))
+                fail("Error message '" + error.getMessage() + "' does not contain any of " + Arrays.toString(messages));
             return this;
         }
 
@@ -91,7 +98,7 @@ public class NodeToolResult
                     return this;
                 }
             }
-            Assert.fail("Unable to locate message " + msg + " in notifications: " + notifications);
+            fail("Unable to locate message " + msg + " in notifications: " + NodeToolResult.toString(notifications));
             return this; // unreachable
         }
 
@@ -106,9 +113,38 @@ public class NodeToolResult
                     }
                 }
             }
-            Assert.fail("Unable to locate message '" + msg + "' in notifications: " + notifications);
+            fail("Unable to locate message '" + msg + "' in notifications: " + NodeToolResult.toString(notifications));
             return this; // unreachable
         }
+
+        private void fail(String message)
+        {
+            StringBuilder sb = new StringBuilder();
+            sb.append("nodetool command ").append(Arrays.toString(commandAndArgs)).append(" ").append(message).append("\n");
+            sb.append("Notifications:\n");
+            for (Notification n : notifications)
+                sb.append(NodeToolResult.toString(n)).append("\n");
+            if (error != null)
+                sb.append("Error:\n").append(Throwables.getStackTraceAsString(error)).append("\n");
+            throw new AssertionError(sb.toString());
+        }
+    }
+
+    private static String toString(Collection<Notification> notifications)
+    {
+        return notifications.stream().map(NodeToolResult::toString).collect(Collectors.joining(", "));
+    }
+
+    private static String toString(Notification notification)
+    {
+        ProgressEventType type = ProgressEventType.values()[notificationType(notification)];
+        String msg = notification.getMessage();
+        Object src = notification.getSource();
+        return "Notification{" +
+               "type=" + type +
+               ", src=" + src +
+               ", message=" + msg +
+               "}";
     }
 
     private static int notificationType(Notification n)
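
With errorContains now taking varargs, an assertion can accept any one of several expected messages. This is
what lets the reworked snapshotFailure test below pass whether or not the notification carrying the real
cause survived; its call site, from the test diff:

    result.asserts()
          .failure()
          // passes if the error message contains either string
          .errorContains("Could not create snapshot", "Some repair failed");
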
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/IsolatedExecutor.java b/test/distributed/org/apache/cassandra/distributed/impl/IsolatedExecutor.java
index fc31fdf..0d8f96f 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/IsolatedExecutor.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/IsolatedExecutor.java
@@ -47,6 +47,7 @@ import ch.qos.logback.classic.LoggerContext;
 import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.distributed.api.IIsolatedExecutor;
 import org.apache.cassandra.utils.ExecutorUtils;
+import org.apache.cassandra.utils.Throwables;
 
 public class IsolatedExecutor implements IIsolatedExecutor
 {
@@ -65,7 +66,7 @@ public class IsolatedExecutor implements IIsolatedExecutor
 
     public Future<Void> shutdown()
     {
-        isolatedExecutor.shutdown();
+        isolatedExecutor.shutdownNow();
 
         /* Use a thread pool with a core pool size of zero to terminate the thread as soon as possible
         ** so the instance class loader can be garbage collected.  Uses a custom thread factory
@@ -202,11 +203,12 @@ public class IsolatedExecutor implements IIsolatedExecutor
         }
         catch (InterruptedException e)
         {
-            throw new RuntimeException(e);
+            Thread.currentThread().interrupt();
+            throw Throwables.throwAsUncheckedException(e);
         }
         catch (ExecutionException e)
         {
-            throw new RuntimeException(e.getCause());
+            throw Throwables.throwAsUncheckedException(e.getCause());
         }
     }
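
Besides switching shutdown() to shutdownNow(), the IsolatedExecutor change applies the standard rule for a
caught InterruptedException: restore the thread's interrupt flag before rethrowing, so callers that poll the
flag still observe the interruption. The pattern in isolation (someBlockingCall is a stand-in for any
interruptible call):

    try
    {
        someBlockingCall();
    }
    catch (InterruptedException e)
    {
        Thread.currentThread().interrupt();  // re-assert the flag
        throw Throwables.throwAsUncheckedException(e);
    }
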
 
diff --git a/test/distributed/org/apache/cassandra/distributed/test/FullRepairCoordinatorSlowTest.java b/test/distributed/org/apache/cassandra/distributed/test/FullRepairCoordinatorNeighbourDownTest.java
similarity index 85%
rename from test/distributed/org/apache/cassandra/distributed/test/FullRepairCoordinatorSlowTest.java
rename to test/distributed/org/apache/cassandra/distributed/test/FullRepairCoordinatorNeighbourDownTest.java
index d3904b3..1053925 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/FullRepairCoordinatorSlowTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/FullRepairCoordinatorNeighbourDownTest.java
@@ -25,9 +25,9 @@ import org.apache.cassandra.distributed.test.DistributedRepairUtils.RepairParall
 import org.apache.cassandra.distributed.test.DistributedRepairUtils.RepairType;
 
 @RunWith(Parameterized.class)
-public class FullRepairCoordinatorSlowTest extends RepairCoordinatorSlow
+public class FullRepairCoordinatorNeighbourDownTest extends RepairCoordinatorNeighbourDown
 {
-    public FullRepairCoordinatorSlowTest(RepairParallelism parallelism, boolean withNotifications)
+    public FullRepairCoordinatorNeighbourDownTest(RepairParallelism parallelism, boolean withNotifications)
     {
         super(RepairType.FULL, parallelism, withNotifications);
     }
diff --git a/test/distributed/org/apache/cassandra/distributed/test/FullRepairCoordinatorTimeoutTest.java b/test/distributed/org/apache/cassandra/distributed/test/FullRepairCoordinatorTimeoutTest.java
new file mode 100644
index 0000000..d91cb5d
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/FullRepairCoordinatorTimeoutTest.java
@@ -0,0 +1,16 @@
+package org.apache.cassandra.distributed.test;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import org.apache.cassandra.distributed.test.DistributedRepairUtils.RepairParallelism;
+import org.apache.cassandra.distributed.test.DistributedRepairUtils.RepairType;
+
+@RunWith(Parameterized.class)
+public class FullRepairCoordinatorTimeoutTest extends RepairCoordinatorTimeout
+{
+    public FullRepairCoordinatorTimeoutTest(RepairParallelism parallelism, boolean withNotifications)
+    {
+        super(RepairType.FULL, parallelism, withNotifications);
+    }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/IncrementalRepairCoordinatorSlowTest.java b/test/distributed/org/apache/cassandra/distributed/test/IncrementalRepairCoordinatorNeighbourDownTest.java
similarity index 85%
rename from test/distributed/org/apache/cassandra/distributed/test/IncrementalRepairCoordinatorSlowTest.java
rename to test/distributed/org/apache/cassandra/distributed/test/IncrementalRepairCoordinatorNeighbourDownTest.java
index 7f9b35f..af17567 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/IncrementalRepairCoordinatorSlowTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/IncrementalRepairCoordinatorNeighbourDownTest.java
@@ -25,9 +25,9 @@ import org.apache.cassandra.distributed.test.DistributedRepairUtils.RepairParall
 import org.apache.cassandra.distributed.test.DistributedRepairUtils.RepairType;
 
 @RunWith(Parameterized.class)
-public class IncrementalRepairCoordinatorSlowTest extends RepairCoordinatorSlow
+public class IncrementalRepairCoordinatorNeighbourDownTest extends RepairCoordinatorNeighbourDown
 {
-    public IncrementalRepairCoordinatorSlowTest(RepairParallelism parallelism, boolean withNotifications)
+    public IncrementalRepairCoordinatorNeighbourDownTest(RepairParallelism parallelism, boolean withNotifications)
     {
         super(RepairType.INCREMENTAL, parallelism, withNotifications);
     }
diff --git a/test/distributed/org/apache/cassandra/distributed/test/IncrementalRepairCoordinatorTimeoutTest.java b/test/distributed/org/apache/cassandra/distributed/test/IncrementalRepairCoordinatorTimeoutTest.java
new file mode 100644
index 0000000..0fdae57
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/IncrementalRepairCoordinatorTimeoutTest.java
@@ -0,0 +1,16 @@
+package org.apache.cassandra.distributed.test;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import org.apache.cassandra.distributed.test.DistributedRepairUtils.RepairParallelism;
+import org.apache.cassandra.distributed.test.DistributedRepairUtils.RepairType;
+
+@RunWith(Parameterized.class)
+public class IncrementalRepairCoordinatorTimeoutTest extends RepairCoordinatorTimeout
+{
+    public IncrementalRepairCoordinatorTimeoutTest(RepairParallelism parallelism, boolean withNotifications)
+    {
+        super(RepairType.INCREMENTAL, parallelism, withNotifications);
+    }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairCoordinatorSlowTest.java b/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairCoordinatorNeighbourDownTest.java
similarity index 85%
rename from test/distributed/org/apache/cassandra/distributed/test/PreviewRepairCoordinatorSlowTest.java
rename to test/distributed/org/apache/cassandra/distributed/test/PreviewRepairCoordinatorNeighbourDownTest.java
index 2d52475..1926f9b 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairCoordinatorSlowTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairCoordinatorNeighbourDownTest.java
@@ -25,9 +25,9 @@ import org.apache.cassandra.distributed.test.DistributedRepairUtils.RepairParall
 import org.apache.cassandra.distributed.test.DistributedRepairUtils.RepairType;
 
 @RunWith(Parameterized.class)
-public class PreviewRepairCoordinatorSlowTest extends RepairCoordinatorSlow
+public class PreviewRepairCoordinatorNeighbourDownTest extends RepairCoordinatorNeighbourDown
 {
-    public PreviewRepairCoordinatorSlowTest(RepairParallelism parallelism, boolean withNotifications)
+    public PreviewRepairCoordinatorNeighbourDownTest(RepairParallelism parallelism, boolean withNotifications)
     {
         super(RepairType.PREVIEW, parallelism, withNotifications);
     }
diff --git a/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairCoordinatorTimeoutTest.java b/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairCoordinatorTimeoutTest.java
new file mode 100644
index 0000000..8b90909
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairCoordinatorTimeoutTest.java
@@ -0,0 +1,16 @@
+package org.apache.cassandra.distributed.test;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import org.apache.cassandra.distributed.test.DistributedRepairUtils.RepairParallelism;
+import org.apache.cassandra.distributed.test.DistributedRepairUtils.RepairType;
+
+@RunWith(Parameterized.class)
+public class PreviewRepairCoordinatorTimeoutTest extends RepairCoordinatorTimeout
+{
+    public PreviewRepairCoordinatorTimeoutTest(RepairParallelism parallelism, boolean withNotifications)
+    {
+        super(RepairType.PREVIEW, parallelism, withNotifications);
+    }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorFast.java b/test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorFast.java
index b05706a..8d26b2e 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorFast.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorFast.java
@@ -18,6 +18,7 @@
 
 package org.apache.cassandra.distributed.test;
 
+import java.time.Duration;
 import java.util.Set;
 
 import com.google.common.collect.Iterables;
@@ -43,6 +44,7 @@ import static org.apache.cassandra.distributed.test.DistributedRepairUtils.asser
 import static org.apache.cassandra.distributed.test.DistributedRepairUtils.assertParentRepairNotExist;
 import static org.apache.cassandra.distributed.test.DistributedRepairUtils.assertParentRepairSuccess;
 import static org.apache.cassandra.distributed.test.DistributedRepairUtils.getRepairExceptions;
+import static org.apache.cassandra.utils.AssertUtil.assertTimeoutPreemptively;
 
 public abstract class RepairCoordinatorFast extends RepairCoordinatorBase
 {
@@ -51,104 +53,113 @@ public abstract class RepairCoordinatorFast extends RepairCoordinatorBase
         super(repairType, parallelism, withNotifications);
     }
 
-    @Test(timeout = 1 * 60 * 1000)
+    @Test
     public void simple() {
         String table = tableName("simple");
-        CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, PRIMARY KEY (key))", KEYSPACE, table));
-        CLUSTER.coordinator(1).execute(format("INSERT INTO %s.%s (key) VALUES (?)", KEYSPACE, table), ConsistencyLevel.ANY, "some text");
-
-        long repairExceptions = getRepairExceptions(CLUSTER, 2);
-        NodeToolResult result = repair(2, KEYSPACE, table);
-        result.asserts().success();
-        if (withNotifications)
-        {
-            result.asserts()
-                  .notificationContains(ProgressEventType.START, "Starting repair command")
-                  .notificationContains(ProgressEventType.START, "repairing keyspace " + KEYSPACE + " with repair options")
-                  .notificationContains(ProgressEventType.SUCCESS, repairType != RepairType.PREVIEW ? "Repair completed successfully": "Repair preview completed successfully")
-                  .notificationContains(ProgressEventType.COMPLETE, "finished");
-        }
-
-        if (repairType != RepairType.PREVIEW)
-        {
-            assertParentRepairSuccess(CLUSTER, KEYSPACE, table);
-        }
-        else
-        {
-            assertParentRepairNotExist(CLUSTER, KEYSPACE, table);
-        }
+        assertTimeoutPreemptively(Duration.ofMinutes(1), () -> {
+            CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, PRIMARY KEY (key))", KEYSPACE, table));
+            CLUSTER.coordinator(1).execute(format("INSERT INTO %s.%s (key) VALUES (?)", KEYSPACE, table), ConsistencyLevel.ANY, "some text");
 
-        Assert.assertEquals(repairExceptions, getRepairExceptions(CLUSTER, 2));
+            long repairExceptions = getRepairExceptions(CLUSTER, 2);
+            NodeToolResult result = repair(2, KEYSPACE, table);
+            result.asserts().success();
+            if (withNotifications)
+            {
+                result.asserts()
+                      .notificationContains(ProgressEventType.START, "Starting repair command")
+                      .notificationContains(ProgressEventType.START, "repairing keyspace " + KEYSPACE + " with repair options")
+                      .notificationContains(ProgressEventType.SUCCESS, repairType != RepairType.PREVIEW ? "Repair completed successfully": "Repair preview completed successfully")
+                      .notificationContains(ProgressEventType.COMPLETE, "finished");
+            }
+
+            if (repairType != RepairType.PREVIEW)
+            {
+                assertParentRepairSuccess(CLUSTER, KEYSPACE, table);
+            }
+            else
+            {
+                assertParentRepairNotExist(CLUSTER, KEYSPACE, table);
+            }
+
+            Assert.assertEquals(repairExceptions, getRepairExceptions(CLUSTER, 2));
+        });
     }
 
-    @Test(timeout = 1 * 60 * 1000)
+    @Test
     public void missingKeyspace()
     {
-        // as of this moment the check is done in nodetool so the JMX notifications are not imporant
-        // nor is the history stored
-        long repairExceptions = getRepairExceptions(CLUSTER, 2);
-        NodeToolResult result = repair(2, "doesnotexist");
-        result.asserts()
-              .failure()
-              .errorContains("Keyspace [doesnotexist] does not exist.");
+        assertTimeoutPreemptively(Duration.ofMinutes(1), () -> {
+            // as of this moment the check is done in nodetool, so the JMX notifications are not important
+            // nor is the history stored
+            long repairExceptions = getRepairExceptions(CLUSTER, 2);
+            NodeToolResult result = repair(2, "doesnotexist");
+            result.asserts()
+                  .failure()
+                  .errorContains("Keyspace [doesnotexist] does not exist.");
 
-        Assert.assertEquals(repairExceptions, getRepairExceptions(CLUSTER, 2));
+            Assert.assertEquals(repairExceptions, getRepairExceptions(CLUSTER, 2));
 
-        assertParentRepairNotExist(CLUSTER, "doesnotexist");
+            assertParentRepairNotExist(CLUSTER, "doesnotexist");
+        });
     }
 
-    @Test(timeout = 1 * 60 * 1000)
+    @Test
     public void missingTable()
     {
-        long repairExceptions = getRepairExceptions(CLUSTER, 2);
-        NodeToolResult result = repair(2, KEYSPACE, "doesnotexist");
-        result.asserts()
-              .failure();
-        if (withNotifications)
-        {
+        assertTimeoutPreemptively(Duration.ofMinutes(1), () -> {
+            long repairExceptions = getRepairExceptions(CLUSTER, 2);
+            String tableName = tableName("doesnotexist");
+            NodeToolResult result = repair(2, KEYSPACE, tableName);
             result.asserts()
-                  .errorContains("failed with error Unknown keyspace/cf pair (distributed_test_keyspace.doesnotexist)")
-                  // Start notification is ignored since this is checked during setup (aka before start)
-                  .notificationContains(ProgressEventType.ERROR, "failed with error Unknown keyspace/cf pair (distributed_test_keyspace.doesnotexist)")
-                  .notificationContains(ProgressEventType.COMPLETE, "finished with error");
-        }
+                  .failure();
+            if (withNotifications)
+            {
+                result.asserts()
+                      .errorContains("Unknown keyspace/cf pair (distributed_test_keyspace." + tableName + ")")
+                      // Start notification is ignored since this is checked during setup (aka before start)
+                      .notificationContains(ProgressEventType.ERROR, "failed with error Unknown keyspace/cf pair (distributed_test_keyspace." + tableName + ")")
+                      .notificationContains(ProgressEventType.COMPLETE, "finished with error");
+            }
 
-        assertParentRepairNotExist(CLUSTER, KEYSPACE, "doesnotexist");
+            assertParentRepairNotExist(CLUSTER, KEYSPACE, "doesnotexist");
 
-        Assert.assertEquals(repairExceptions + 1, getRepairExceptions(CLUSTER, 2));
+            Assert.assertEquals(repairExceptions + 1, getRepairExceptions(CLUSTER, 2));
+        });
     }
 
-    @Test(timeout = 1 * 60 * 1000)
+    @Test
     public void noTablesToRepair()
     {
         // index CF currently don't support repair, so they get dropped when listed
         // this is done in this test to cause the keyspace to have 0 tables to repair, which causes repair to no-op
         // early and skip.
         String table = tableName("withindex");
-        CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, value text, PRIMARY KEY (key))", KEYSPACE, table));
-        CLUSTER.schemaChange(format("CREATE INDEX value_%s ON %s.%s (value)", postfix(), KEYSPACE, table));
-
-        long repairExceptions = getRepairExceptions(CLUSTER, 2);
-        // if CF has a . in it, it is assumed to be a 2i which rejects repairs
-        NodeToolResult result = repair(2, KEYSPACE, table + ".value");
-        result.asserts().success();
-        if (withNotifications)
-        {
-            result.asserts()
-                  .notificationContains("Empty keyspace")
-                  .notificationContains("skipping repair: " + KEYSPACE)
-                  // Start notification is ignored since this is checked during setup (aka before start)
-                  .notificationContains(ProgressEventType.SUCCESS, "Empty keyspace") // will fail since success isn't returned; only complete
-                  .notificationContains(ProgressEventType.COMPLETE, "finished"); // will fail since it doesn't do this
-        }
+        assertTimeoutPreemptively(Duration.ofMinutes(1), () -> {
+            CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, value text, PRIMARY KEY (key))", KEYSPACE, table));
+            CLUSTER.schemaChange(format("CREATE INDEX value_%s ON %s.%s (value)", postfix(), KEYSPACE, table));
+
+            long repairExceptions = getRepairExceptions(CLUSTER, 2);
+            // if CF has a . in it, it is assumed to be a 2i which rejects repairs
+            NodeToolResult result = repair(2, KEYSPACE, table + ".value");
+            result.asserts().success();
+            if (withNotifications)
+            {
+                result.asserts()
+                      .notificationContains("Empty keyspace")
+                      .notificationContains("skipping repair: " + KEYSPACE)
+                      // Start notification is ignored since this is checked during setup (aka before start)
+                      .notificationContains(ProgressEventType.SUCCESS, "Empty keyspace") // will fail since success isn't returned; only complete
+                      .notificationContains(ProgressEventType.COMPLETE, "finished"); // will fail since it doesn't do this
+            }
 
-        assertParentRepairNotExist(CLUSTER, KEYSPACE, table + ".value");
+            assertParentRepairNotExist(CLUSTER, KEYSPACE, table + ".value");
 
-        // this is actually a SKIP and not a FAILURE, so shouldn't increment
-        Assert.assertEquals(repairExceptions, getRepairExceptions(CLUSTER, 2));
+            // this is actually a SKIP and not a FAILURE, so shouldn't increment
+            Assert.assertEquals(repairExceptions, getRepairExceptions(CLUSTER, 2));
+        });
     }
 
-    @Test(timeout = 1 * 60 * 1000)
+    @Test
     public void intersectingRange()
     {
         // this test exists to show that this case will cause repair to finish; success or failure isn't imporant
@@ -156,229 +167,238 @@ public abstract class RepairCoordinatorFast extends RepairCoordinatorBase
         // repair to fail but it didn't, this would be fine and this test should be updated to reflect the new
         // semantic
         String table = tableName("intersectingrange");
-        CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, value text, PRIMARY KEY (key))", KEYSPACE, table));
-
-        //TODO dtest api for this?
-        LongTokenRange tokenRange = CLUSTER.get(2).callOnInstance(() -> {
-            Set<Range<Token>> ranges = StorageService.instance.getLocalReplicas(KEYSPACE).ranges();
-            Range<Token> range = Iterables.getFirst(ranges, null);
-            long left = (long) range.left.getTokenValue();
-            long right = (long) range.right.getTokenValue();
-            return new LongTokenRange(left, right);
-        });
-        LongTokenRange intersectingRange = new LongTokenRange(tokenRange.maxInclusive - 7, tokenRange.maxInclusive + 7);
-
-        long repairExceptions = getRepairExceptions(CLUSTER, 2);
-        NodeToolResult result = repair(2, KEYSPACE, table,
-                                       "--start-token", Long.toString(intersectingRange.minExclusive),
-                                       "--end-token", Long.toString(intersectingRange.maxInclusive));
-        result.asserts()
-              .failure()
-              .errorContains("Requested range " + intersectingRange + " intersects a local range (" + tokenRange + ") but is not fully contained in one");
-        if (withNotifications)
-        {
+        assertTimeoutPreemptively(Duration.ofMinutes(1), () -> {
+            CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, value text, PRIMARY KEY (key))", KEYSPACE, table));
+
+            //TODO dtest api for this?
+            LongTokenRange tokenRange = CLUSTER.get(2).callOnInstance(() -> {
+                Set<Range<Token>> ranges = StorageService.instance.getLocalReplicas(KEYSPACE).ranges();
+                Range<Token> range = Iterables.getFirst(ranges, null);
+                long left = (long) range.left.getTokenValue();
+                long right = (long) range.right.getTokenValue();
+                return new LongTokenRange(left, right);
+            });
+            LongTokenRange intersectingRange = new LongTokenRange(tokenRange.maxInclusive - 7, tokenRange.maxInclusive + 7);
+
+            long repairExceptions = getRepairExceptions(CLUSTER, 2);
+            NodeToolResult result = repair(2, KEYSPACE, table,
+                                           "--start-token", Long.toString(intersectingRange.minExclusive),
+                                           "--end-token", Long.toString(intersectingRange.maxInclusive));
             result.asserts()
-                  .notificationContains(ProgressEventType.START, "Starting repair command")
-                  .notificationContains(ProgressEventType.START, "repairing keyspace " + KEYSPACE + " with repair options")
-                  .notificationContains(ProgressEventType.ERROR, "Requested range " + intersectingRange + " intersects a local range (" + tokenRange + ") but is not fully contained in one")
-                  .notificationContains(ProgressEventType.COMPLETE, "finished with error");
-        }
+                  .failure()
+                  .errorContains("Requested range " + intersectingRange + " intersects a local range (" + tokenRange + ") but is not fully contained in one");
+            if (withNotifications)
+            {
+                result.asserts()
+                      .notificationContains(ProgressEventType.START, "Starting repair command")
+                      .notificationContains(ProgressEventType.START, "repairing keyspace " + KEYSPACE + " with repair options")
+                      .notificationContains(ProgressEventType.ERROR, "Requested range " + intersectingRange + " intersects a local range (" + tokenRange + ") but is not fully contained in one")
+                      .notificationContains(ProgressEventType.COMPLETE, "finished with error");
+            }
 
-        assertParentRepairNotExist(CLUSTER, KEYSPACE, table);
+            assertParentRepairNotExist(CLUSTER, KEYSPACE, table);
 
-        Assert.assertEquals(repairExceptions + 1, getRepairExceptions(CLUSTER, 2));
+            Assert.assertEquals(repairExceptions + 1, getRepairExceptions(CLUSTER, 2));
+        });
     }
 
-    @Test(timeout = 1 * 60 * 1000)
+    @Test
     public void unknownHost()
     {
         String table = tableName("unknownhost");
-        CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, value text, PRIMARY KEY (key))", KEYSPACE, table));
-
-        long repairExceptions = getRepairExceptions(CLUSTER, 2);
-        NodeToolResult result = repair(2, KEYSPACE, table, "--in-hosts", "thisreally.should.not.exist.apache.org");
-        result.asserts()
-              .failure()
-              .errorContains("Unknown host specified thisreally.should.not.exist.apache.org");
-        if (withNotifications)
-        {
+        assertTimeoutPreemptively(Duration.ofMinutes(1), () -> {
+            CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, value text, PRIMARY KEY (key))", KEYSPACE, table));
+
+            long repairExceptions = getRepairExceptions(CLUSTER, 2);
+            NodeToolResult result = repair(2, KEYSPACE, table, "--in-hosts", "thisreally.should.not.exist.apache.org");
             result.asserts()
-                  .notificationContains(ProgressEventType.START, "Starting repair command")
-                  .notificationContains(ProgressEventType.START, "repairing keyspace " + KEYSPACE + " with repair options")
-                  .notificationContains(ProgressEventType.ERROR, "Unknown host specified thisreally.should.not.exist.apache.org")
-                  .notificationContains(ProgressEventType.COMPLETE, "finished with error");
-        }
+                  .failure()
+                  .errorContains("Unknown host specified thisreally.should.not.exist.apache.org");
+            if (withNotifications)
+            {
+                result.asserts()
+                      .notificationContains(ProgressEventType.START, "Starting repair command")
+                      .notificationContains(ProgressEventType.START, "repairing keyspace " + KEYSPACE + " with repair options")
+                      .notificationContains(ProgressEventType.ERROR, "Unknown host specified thisreally.should.not.exist.apache.org")
+                      .notificationContains(ProgressEventType.COMPLETE, "finished with error");
+            }
 
-        assertParentRepairNotExist(CLUSTER, KEYSPACE, table);
+            assertParentRepairNotExist(CLUSTER, KEYSPACE, table);
 
-        Assert.assertEquals(repairExceptions + 1, getRepairExceptions(CLUSTER, 2));
+            Assert.assertEquals(repairExceptions + 1, getRepairExceptions(CLUSTER, 2));
+        });
     }
 
-    @Test(timeout = 1 * 60 * 1000)
+    @Test
     public void desiredHostNotCoordinator()
     {
         // current limitation is that the coordinator must be apart of the repair, so as long as that exists this test
         // verifies that the validation logic will termniate the repair properly
         String table = tableName("desiredhostnotcoordinator");
-        CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, value text, PRIMARY KEY (key))", KEYSPACE, table));
-
-        long repairExceptions = getRepairExceptions(CLUSTER, 2);
-        NodeToolResult result = repair(2, KEYSPACE, table, "--in-hosts", "localhost");
-        result.asserts()
-              .failure()
-              .errorContains("The current host must be part of the repair");
-        if (withNotifications)
-        {
+        assertTimeoutPreemptively(Duration.ofMinutes(1), () -> {
+            CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, value text, PRIMARY KEY (key))", KEYSPACE, table));
+
+            long repairExceptions = getRepairExceptions(CLUSTER, 2);
+            NodeToolResult result = repair(2, KEYSPACE, table, "--in-hosts", "localhost");
             result.asserts()
-                  .notificationContains(ProgressEventType.START, "Starting repair command")
-                  .notificationContains(ProgressEventType.START, "repairing keyspace " + KEYSPACE + " with repair options")
-                  .notificationContains(ProgressEventType.ERROR, "The current host must be part of the repair")
-                  .notificationContains(ProgressEventType.COMPLETE, "finished with error");
-        }
+                  .failure()
+                  .errorContains("The current host must be part of the repair");
+            if (withNotifications)
+            {
+                result.asserts()
+                      .notificationContains(ProgressEventType.START, "Starting repair command")
+                      .notificationContains(ProgressEventType.START, "repairing keyspace " + KEYSPACE + " with repair options")
+                      .notificationContains(ProgressEventType.ERROR, "The current host must be part of the repair")
+                      .notificationContains(ProgressEventType.COMPLETE, "finished with error");
+            }
 
-        assertParentRepairNotExist(CLUSTER, KEYSPACE, table);
+            assertParentRepairNotExist(CLUSTER, KEYSPACE, table);
 
-        Assert.assertEquals(repairExceptions + 1, getRepairExceptions(CLUSTER, 2));
+            Assert.assertEquals(repairExceptions + 1, getRepairExceptions(CLUSTER, 2));
+        });
     }
 
-    @Test(timeout = 1 * 60 * 1000)
+    @Test
     public void onlyCoordinator()
     {
         // this is very similar to ::desiredHostNotCoordinator but has the difference that the only host to do repair
         // is the coordinator
         String table = tableName("onlycoordinator");
-        CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, value text, PRIMARY KEY (key))", KEYSPACE, table));
-
-        long repairExceptions = getRepairExceptions(CLUSTER, 2);
-        NodeToolResult result = repair(1, KEYSPACE, table, "--in-hosts", "localhost");
-        result.asserts()
-              .failure()
-              .errorContains("Specified hosts [localhost] do not share range");
-        if (withNotifications)
-        {
+        assertTimeoutPreemptively(Duration.ofMinutes(1), () -> {
+            CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, value text, PRIMARY KEY (key))", KEYSPACE, table));
+
+            long repairExceptions = getRepairExceptions(CLUSTER, 2);
+            NodeToolResult result = repair(1, KEYSPACE, table, "--in-hosts", "localhost");
             result.asserts()
-                  .notificationContains(ProgressEventType.START, "Starting repair command")
-                  .notificationContains(ProgressEventType.START, "repairing keyspace " + KEYSPACE + " with repair options")
-                  .notificationContains(ProgressEventType.ERROR, "Specified hosts [localhost] do not share range")
-                  .notificationContains(ProgressEventType.COMPLETE, "finished with error");
-        }
+                  .failure()
+                  .errorContains("Specified hosts [localhost] do not share range");
+            if (withNotifications)
+            {
+                result.asserts()
+                      .notificationContains(ProgressEventType.START, "Starting repair command")
+                      .notificationContains(ProgressEventType.START, "repairing keyspace " + KEYSPACE + " with repair options")
+                      .notificationContains(ProgressEventType.ERROR, "Specified hosts [localhost] do not share range")
+                      .notificationContains(ProgressEventType.COMPLETE, "finished with error");
+            }
 
-        assertParentRepairNotExist(CLUSTER, KEYSPACE, table);
+            assertParentRepairNotExist(CLUSTER, KEYSPACE, table);
 
-        //TODO should this be marked as fail to match others?  Should they not be marked?
-        Assert.assertEquals(repairExceptions, getRepairExceptions(CLUSTER, 2));
+            //TODO should this be marked as fail to match others?  Should they not be marked?
+            Assert.assertEquals(repairExceptions, getRepairExceptions(CLUSTER, 2));
+        });
     }
 
-    @Test(timeout = 1 * 60 * 1000)
+    @Test
     public void replicationFactorOne()
     {
         // In the case of rf=1 repair fails to create a cmd handle so node tool exists early
         String table = tableName("one");
-        // since cluster is shared and this test gets called multiple times, need "IF NOT EXISTS" so the second+ attempt
-        // does not fail
-        CLUSTER.schemaChange("CREATE KEYSPACE IF NOT EXISTS replicationfactor WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};");
-        CLUSTER.schemaChange(format("CREATE TABLE replicationfactor.%s (key text, value text, PRIMARY KEY (key))", table));
+        assertTimeoutPreemptively(Duration.ofMinutes(1), () -> {
+            // since cluster is shared and this test gets called multiple times, need "IF NOT EXISTS" so the second+ attempt
+            // does not fail
+            CLUSTER.schemaChange("CREATE KEYSPACE IF NOT EXISTS replicationfactor WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};");
+            CLUSTER.schemaChange(format("CREATE TABLE replicationfactor.%s (key text, value text, PRIMARY KEY (key))", table));
 
-        long repairExceptions = getRepairExceptions(CLUSTER, 1);
-        NodeToolResult result = repair(1, "replicationfactor", table);
-        result.asserts()
-              .success();
+            long repairExceptions = getRepairExceptions(CLUSTER, 1);
+            NodeToolResult result = repair(1, "replicationfactor", table);
+            result.asserts()
+                  .success();
 
-        assertParentRepairNotExist(CLUSTER, KEYSPACE, table);
+            assertParentRepairNotExist(CLUSTER, KEYSPACE, table);
 
-        Assert.assertEquals(repairExceptions, getRepairExceptions(CLUSTER, 1));
+            Assert.assertEquals(repairExceptions, getRepairExceptions(CLUSTER, 1));
+        });
     }
 
-    @Test(timeout = 1 * 60 * 1000)
+    @Test
     public void prepareFailure()
     {
         String table = tableName("preparefailure");
-        CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, value text, PRIMARY KEY (key))", KEYSPACE, table));
-        IMessageFilters.Filter filter = CLUSTER.verbs(Verb.PREPARE_MSG).messagesMatching(of(m -> {
-            throw new RuntimeException("prepare fail");
-        })).drop();
-        try
-        {
-            long repairExceptions = getRepairExceptions(CLUSTER, 1);
-            NodeToolResult result = repair(1, KEYSPACE, table);
-            result.asserts()
-                  .failure()
-                  .errorContains("Got negative replies from endpoints");
-            if (withNotifications)
+        assertTimeoutPreemptively(Duration.ofMinutes(1), () -> {
+            CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, value text, PRIMARY KEY (key))", KEYSPACE, table));
+            IMessageFilters.Filter filter = CLUSTER.verbs(Verb.PREPARE_MSG).messagesMatching(of(m -> {
+                throw new RuntimeException("prepare fail");
+            })).drop();
+            try
             {
+                long repairExceptions = getRepairExceptions(CLUSTER, 1);
+                NodeToolResult result = repair(1, KEYSPACE, table);
                 result.asserts()
-                      .notificationContains(ProgressEventType.START, "Starting repair command")
-                      .notificationContains(ProgressEventType.START, "repairing keyspace " + KEYSPACE + " with repair options")
-                      .notificationContains(ProgressEventType.ERROR, "Got negative replies from endpoints")
-                      .notificationContains(ProgressEventType.COMPLETE, "finished with error");
-            }
-
-            Assert.assertEquals(repairExceptions + 1, getRepairExceptions(CLUSTER, 1));
-            if (repairType != RepairType.PREVIEW)
-            {
-                assertParentRepairFailedWithMessageContains(CLUSTER, KEYSPACE, table, "Got negative replies from endpoints");
+                      .failure()
+                      .errorContains("Got negative replies from endpoints");
+                if (withNotifications)
+                {
+                    result.asserts()
+                          .notificationContains(ProgressEventType.START, "Starting repair command")
+                          .notificationContains(ProgressEventType.START, "repairing keyspace " + KEYSPACE + " with repair options")
+                          .notificationContains(ProgressEventType.ERROR, "Got negative replies from endpoints")
+                          .notificationContains(ProgressEventType.COMPLETE, "finished with error");
+                }
+
+                Assert.assertEquals(repairExceptions + 1, getRepairExceptions(CLUSTER, 1));
+                if (repairType != RepairType.PREVIEW)
+                {
+                    assertParentRepairFailedWithMessageContains(CLUSTER, KEYSPACE, table, "Got negative replies from endpoints");
+                }
+                else
+                {
+                    assertParentRepairNotExist(CLUSTER, KEYSPACE, table);
+                }
             }
-            else
+            finally
             {
-                assertParentRepairNotExist(CLUSTER, KEYSPACE, table);
+                filter.off();
             }
-        }
-        finally
-        {
-            filter.off();
-        }
+        });
     }
 
-    @Test(timeout = 1 * 60 * 1000)
+    @Test
     public void snapshotFailure()
     {
         Assume.assumeFalse("incremental does not do snapshot", repairType == RepairType.INCREMENTAL);
         Assume.assumeFalse("Parallel repair does not perform snapshots", parallelism == RepairParallelism.PARALLEL);
 
         String table = tableName("snapshotfailure");
-        CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, value text, PRIMARY KEY (key))", KEYSPACE, table));
-        IMessageFilters.Filter filter = CLUSTER.verbs(Verb.SNAPSHOT_MSG).messagesMatching(of(m -> {
-            throw new RuntimeException("snapshot fail");
-        })).drop();
-        try
-        {
-            long repairExceptions = getRepairExceptions(CLUSTER, 1);
-            NodeToolResult result = repair(1, KEYSPACE, table);
-            result.asserts()
-                  .failure();
-            if (withNotifications)
+        assertTimeoutPreemptively(Duration.ofMinutes(1), () -> {
+            CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, value text, PRIMARY KEY (key))", KEYSPACE, table));
+            IMessageFilters.Filter filter = CLUSTER.verbs(Verb.SNAPSHOT_MSG).messagesMatching(of(m -> {
+                throw new RuntimeException("snapshot fail");
+            })).drop();
+            try
             {
+                long repairExceptions = getRepairExceptions(CLUSTER, 1);
+                NodeToolResult result = repair(1, KEYSPACE, table);
                 result.asserts()
-                      .errorContains("Could not create snapshot")
-                      .notificationContains(ProgressEventType.START, "Starting repair command")
-                      .notificationContains(ProgressEventType.START, "repairing keyspace " + KEYSPACE + " with repair options")
-                      .notificationContains(ProgressEventType.ERROR, "Could not create snapshot ")
-                      .notificationContains(ProgressEventType.COMPLETE, "finished with error");
-            }
-            else
-            {
-                // Right now coordination doesn't propgate the first exception, so we only know "there exists a issue".
-                // With notifications on nodetool will see the error then complete, so the cmd state (what nodetool
-                // polls on) is ignored.  With notifications off, the poll await fails and queries cmd state, and that
-                // will have the below error.
-                // NOTE: this isn't desireable, would be good to propgate
-                result.asserts()
-                      .errorContains("Some repair failed");
+                      .failure()
+                      // Right now coordination doesn't propagate the first exception, so we only know "there exists an issue".
+                      // With notifications on, nodetool will see the error then complete, so the cmd state (what nodetool
+                      // polls on) is ignored.  With notifications off or dropped, the poll await fails and queries the cmd
+                      // state, and that will have the below error.
+                      // NOTE: this isn't desirable; it would be good to propagate the first exception
+                      .errorContains("Could not create snapshot", "Some repair failed");
+                if (withNotifications)
+                {
+                    result.asserts()
+                          .notificationContains(ProgressEventType.START, "Starting repair command")
+                          .notificationContains(ProgressEventType.START, "repairing keyspace " + KEYSPACE + " with repair options")
+                          .notificationContains(ProgressEventType.ERROR, "Could not create snapshot ")
+                          .notificationContains(ProgressEventType.COMPLETE, "finished with error");
+                }
+
+                Assert.assertEquals(repairExceptions + 1, getRepairExceptions(CLUSTER, 1));
+                if (repairType != RepairType.PREVIEW)
+                {
+                    assertParentRepairFailedWithMessageContains(CLUSTER, KEYSPACE, table, "Could not create snapshot");
+                }
+                else
+                {
+                    assertParentRepairNotExist(CLUSTER, KEYSPACE, table);
+                }
             }
-
-            Assert.assertEquals(repairExceptions + 1, getRepairExceptions(CLUSTER, 1));
-            if (repairType != RepairType.PREVIEW)
+            finally
             {
-                assertParentRepairFailedWithMessageContains(CLUSTER, KEYSPACE, table, "Could not create snapshot");
+                filter.off();
             }
-            else
-            {
-                assertParentRepairNotExist(CLUSTER, KEYSPACE, table);
-            }
-        }
-        finally
-        {
-            filter.off();
-        }
+        });
     }
 }
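
Throughout RepairCoordinatorFast the JUnit @Test(timeout = ...) annotations are replaced with
AssertUtil.assertTimeoutPreemptively, a new helper whose source is not included in this mail. Judging by
the name and the call sites, it runs the body under a watchdog and fails the test preemptively when the
duration elapses, much like JUnit 5's Assertions.assertTimeoutPreemptively. A rough sketch of that idea,
under the assumption of a simple executor-based implementation and a hypothetical ThrowingRunnable
interface (the real AssertUtil may differ):

    // imports: java.time.Duration, java.util.concurrent.*
    public interface ThrowingRunnable { void run() throws Exception; }

    public static void assertTimeoutPreemptively(Duration timeout, ThrowingRunnable body)
    {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try
        {
            Future<?> task = executor.submit(() -> { body.run(); return null; });
            task.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
        }
        catch (TimeoutException e)
        {
            throw new AssertionError("test did not complete within " + timeout, e);
        }
        catch (Exception e)
        {
            throw new AssertionError(e); // a real helper would unwrap ExecutionException
        }
        finally
        {
            executor.shutdownNow(); // interrupt a stuck test body
        }
    }
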
diff --git a/test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorSlow.java b/test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorNeighbourDown.java
similarity index 52%
rename from test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorSlow.java
rename to test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorNeighbourDown.java
index 7be8ed1..db01b13 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorSlow.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorNeighbourDown.java
@@ -19,17 +19,17 @@
 package org.apache.cassandra.distributed.test;
 
 import java.net.UnknownHostException;
+import java.time.Duration;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
 import com.google.common.util.concurrent.Uninterruptibles;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 
-import org.apache.cassandra.distributed.api.IMessageFilters;
 import org.apache.cassandra.distributed.api.NodeToolResult;
 import org.apache.cassandra.distributed.test.DistributedRepairUtils.RepairParallelism;
 import org.apache.cassandra.distributed.test.DistributedRepairUtils.RepairType;
@@ -43,128 +43,107 @@ import static org.apache.cassandra.distributed.api.IMessageFilters.Matcher.of;
 import static org.apache.cassandra.distributed.test.DistributedRepairUtils.assertParentRepairFailedWithMessageContains;
 import static org.apache.cassandra.distributed.test.DistributedRepairUtils.assertParentRepairNotExist;
 import static org.apache.cassandra.distributed.test.DistributedRepairUtils.getRepairExceptions;
+import static org.apache.cassandra.utils.AssertUtil.assertTimeoutPreemptively;
 
-public abstract class RepairCoordinatorSlow extends RepairCoordinatorBase
+public abstract class RepairCoordinatorNeighbourDown extends RepairCoordinatorBase
 {
-    public RepairCoordinatorSlow(RepairType repairType, RepairParallelism parallelism, boolean withNotifications)
+    public RepairCoordinatorNeighbourDown(RepairType repairType, RepairParallelism parallelism, boolean withNotifications)
     {
         super(repairType, parallelism, withNotifications);
     }
 
-    @Test(timeout = 1 * 60 * 1000)
-    public void prepareRPCTimeout()
+    @Before
+    public void beforeTest()
     {
-        String table = tableName("preparerpctimeout");
-        CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, value text, PRIMARY KEY (key))", KEYSPACE, table));
-        IMessageFilters.Filter filter = CLUSTER.verbs(Verb.PREPARE_MSG).drop();
-        try
-        {
-            long repairExceptions = getRepairExceptions(CLUSTER, 1);
-            NodeToolResult result = repair(1, KEYSPACE, table);
-            result.asserts()
-                  .failure()
-                  .errorContains("Got negative replies from endpoints [127.0.0.2:7012]");
-            if (withNotifications)
+        CLUSTER.filters().reset();
+        CLUSTER.forEach(i -> {
+            try
             {
-                result.asserts()
-                      .notificationContains(NodeToolResult.ProgressEventType.START, "Starting repair command")
-                      .notificationContains(NodeToolResult.ProgressEventType.START, "repairing keyspace " + KEYSPACE + " with repair options")
-                      .notificationContains(NodeToolResult.ProgressEventType.ERROR, "Got negative replies from endpoints [127.0.0.2:7012]")
-                      .notificationContains(NodeToolResult.ProgressEventType.COMPLETE, "finished with error");
+                i.startup();
             }
-
-            if (repairType != RepairType.PREVIEW)
+            catch (IllegalStateException e)
             {
-                assertParentRepairFailedWithMessageContains(CLUSTER, KEYSPACE, table, "Got negative replies from endpoints [127.0.0.2:7012]");
+                // ignore, node wasn't down
             }
-            else
-            {
-                assertParentRepairNotExist(CLUSTER, KEYSPACE, table);
-            }
-
-            Assert.assertEquals(repairExceptions + 1, getRepairExceptions(CLUSTER, 1));
-        }
-        finally
-        {
-            filter.off();
-        }
+        });
     }
 
-    @Test(timeout = 1 * 60 * 1000)
-    public void neighbourDown() throws InterruptedException, ExecutionException
+    @Test
+    public void neighbourDown()
     {
         String table = tableName("neighbourdown");
-        CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, value text, PRIMARY KEY (key))", KEYSPACE, table));
-        Future<Void> shutdownFuture = CLUSTER.get(2).shutdown();
-        String downNodeAddress = CLUSTER.get(2).callOnInstance(() -> FBUtilities.getBroadcastAddressAndPort().toString());
-        try
-        {
-            // wait for the node to stop
-            shutdownFuture.get();
-            // wait for the failure detector to detect this
-            CLUSTER.get(1).runOnInstance(() -> {
-                InetAddressAndPort neighbor;
-                try
-                {
-                    neighbor = InetAddressAndPort.getByName(downNodeAddress);
-                }
-                catch (UnknownHostException e)
+        assertTimeoutPreemptively(Duration.ofMinutes(1), () -> {
+            CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, value text, PRIMARY KEY (key))", KEYSPACE, table));
+            Future<Void> shutdownFuture = CLUSTER.get(2).shutdown();
+            String downNodeAddress = CLUSTER.get(2).callOnInstance(() -> FBUtilities.getBroadcastAddressAndPort().toString());
+            try
+            {
+                // wait for the node to stop
+                shutdownFuture.get();
+                // wait for the failure detector to detect this
+                CLUSTER.get(1).runOnInstance(() -> {
+                    InetAddressAndPort neighbor;
+                    try
+                    {
+                        neighbor = InetAddressAndPort.getByName(downNodeAddress);
+                    }
+                    catch (UnknownHostException e)
+                    {
+                        throw new RuntimeException(e);
+                    }
+                    while (FailureDetector.instance.isAlive(neighbor))
+                        Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
+                });
+
+                long repairExceptions = getRepairExceptions(CLUSTER, 1);
+                NodeToolResult result = repair(1, KEYSPACE, table);
+                result.asserts()
+                      .failure()
+                      .errorContains("Endpoint not alive");
+                if (withNotifications)
                 {
-                    throw new RuntimeException(e);
+                    result.asserts()
+                          .notificationContains(NodeToolResult.ProgressEventType.START, "Starting repair command")
+                          .notificationContains(NodeToolResult.ProgressEventType.START, "repairing keyspace " + KEYSPACE + " with repair options")
+                          .notificationContains(NodeToolResult.ProgressEventType.ERROR, "Endpoint not alive")
+                          .notificationContains(NodeToolResult.ProgressEventType.COMPLETE, "finished with error");
                 }
-                while (FailureDetector.instance.isAlive(neighbor))
-                    Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
-            });
 
-            long repairExceptions = getRepairExceptions(CLUSTER, 1);
-            NodeToolResult result = repair(1, KEYSPACE, table);
-            result.asserts()
-                  .failure()
-                  .errorContains("Endpoint not alive");
-            if (withNotifications)
+                Assert.assertEquals(repairExceptions + 1, getRepairExceptions(CLUSTER, 1));
+            }
+            finally
             {
-                result.asserts()
-                      .notificationContains(NodeToolResult.ProgressEventType.START, "Starting repair command")
-                      .notificationContains(NodeToolResult.ProgressEventType.START, "repairing keyspace " + KEYSPACE + " with repair options")
-                      .notificationContains(NodeToolResult.ProgressEventType.ERROR, "Endpoint not alive")
-                      .notificationContains(NodeToolResult.ProgressEventType.COMPLETE, "finished with error");
+                CLUSTER.get(2).startup();
             }
 
-            Assert.assertEquals(repairExceptions + 1, getRepairExceptions(CLUSTER, 1));
-        }
-        finally
-        {
-            CLUSTER.get(2).startup();
-        }
-
-        // make sure to call outside of the try/finally so the node is up so we can actually query
-        if (repairType != RepairType.PREVIEW)
-        {
-            assertParentRepairFailedWithMessageContains(CLUSTER, KEYSPACE, table, "Endpoint not alive");
-        }
-        else
-        {
-            assertParentRepairNotExist(CLUSTER, KEYSPACE, table);
-        }
+            // make sure to call outside of the try/finally so the node is up so we can actually query
+            if (repairType != RepairType.PREVIEW)
+            {
+                assertParentRepairFailedWithMessageContains(CLUSTER, KEYSPACE, table, "Endpoint not alive");
+            }
+            else
+            {
+                assertParentRepairNotExist(CLUSTER, KEYSPACE, table);
+            }
+        });
     }
 
-    @Test(timeout = 1 * 60 * 1000)
+    @Test
     public void validationParticipentCrashesAndComesBack()
     {
         // Test what happens when a participant restarts in the middle of validation
         // Currently this isn't recoverable but could be.
         // TODO since this is a real restart, how would I test "long pause"? Can't send SIGSTOP since it's the same process
         String table = tableName("validationparticipentcrashesandcomesback");
-        CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, value text, PRIMARY KEY (key))", KEYSPACE, table));
-        AtomicReference<Future<Void>> participantShutdown = new AtomicReference<>();
-        IMessageFilters.Filter filter = CLUSTER.verbs(Verb.VALIDATION_REQ).to(2).messagesMatching(of(m -> {
-            // the nice thing about this is that this lambda is "capturing" and not "transfer", what this means is that
-            // this lambda isn't serialized and any object held isn't copied.
-            participantShutdown.set(CLUSTER.get(2).shutdown());
-            return true; // drop it so this node doesn't reply before shutdown.
-        })).drop();
-        try
-        {
+        assertTimeoutPreemptively(Duration.ofMinutes(1), () -> {
+            CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, value text, PRIMARY KEY (key))", KEYSPACE, table));
+            AtomicReference<Future<Void>> participantShutdown = new AtomicReference<>();
+            CLUSTER.verbs(Verb.VALIDATION_REQ).to(2).messagesMatching(of(m -> {
+                // The nice thing about this is that the lambda is "capturing" rather than "transfer": it is not
+                // serialized, so any object it holds is shared by reference instead of being copied.
+                participantShutdown.set(CLUSTER.get(2).shutdown());
+                return true; // drop it so this node doesn't reply before shutdown.
+            })).drop();
             // since nodetool is blocking, need to handle participantShutdown in the background
             CompletableFuture<Void> recovered = CompletableFuture.runAsync(() -> {
                 try {
@@ -215,15 +194,6 @@ public abstract class RepairCoordinatorSlow extends RepairCoordinatorBase
             {
                 assertParentRepairNotExist(CLUSTER, KEYSPACE, table);
             }
-        }
-        finally
-        {
-            filter.off();
-            try {
-                CLUSTER.get(2).startup();
-            } catch (Exception e) {
-                // if you call startup twice it is allowed to fail, so ignore it... hope this didn't brike the other tests =x
-            }
-        }
+        });
     }
 }
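
[Editor's note] The message-matcher comment above (capturing vs. transfer lambdas) is why participantShutdown can be a plain AtomicReference: the lambda runs in the test JVM and shares state by reference. A self-contained sketch of that behaviour, independent of the dtest API (all names below are illustrative):

    import java.util.concurrent.atomic.AtomicReference;
    import java.util.function.Predicate;

    final class CapturingLambdaSketch
    {
        public static void main(String[] args)
        {
            AtomicReference<String> captured = new AtomicReference<>();
            // The lambda captures 'captured' by reference; nothing is serialized or copied.
            Predicate<String> matcher = message -> {
                captured.set("saw " + message);
                return true; // analogous to "drop the message" in the test's filter
            };
            matcher.test("VALIDATION_REQ");
            // The caller observes the side effect because both sides share one object.
            System.out.println(captured.get()); // prints: saw VALIDATION_REQ
        }
    }
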
diff --git a/test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorTimeout.java b/test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorTimeout.java
new file mode 100644
index 0000000..7a08187
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorTimeout.java
@@ -0,0 +1,67 @@
+package org.apache.cassandra.distributed.test;
+
+import java.time.Duration;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.api.NodeToolResult;
+import org.apache.cassandra.distributed.test.DistributedRepairUtils.RepairParallelism;
+import org.apache.cassandra.distributed.test.DistributedRepairUtils.RepairType;
+import org.apache.cassandra.net.Verb;
+
+import static java.lang.String.format;
+import static org.apache.cassandra.distributed.test.DistributedRepairUtils.assertParentRepairFailedWithMessageContains;
+import static org.apache.cassandra.distributed.test.DistributedRepairUtils.assertParentRepairNotExist;
+import static org.apache.cassandra.distributed.test.DistributedRepairUtils.getRepairExceptions;
+import static org.apache.cassandra.utils.AssertUtil.assertTimeoutPreemptively;
+
+public abstract class RepairCoordinatorTimeout extends RepairCoordinatorBase
+{
+    public RepairCoordinatorTimeout(RepairType repairType, RepairParallelism parallelism, boolean withNotifications)
+    {
+        super(repairType, parallelism, withNotifications);
+    }
+
+    @Before
+    public void beforeTest()
+    {
+        CLUSTER.filters().reset();
+    }
+
+    @Test
+    public void prepareRPCTimeout()
+    {
+        String table = tableName("preparerpctimeout");
+        assertTimeoutPreemptively(Duration.ofMinutes(1), () -> {
+            CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, value text, PRIMARY KEY (key))", KEYSPACE, table));
+            CLUSTER.verbs(Verb.PREPARE_MSG).drop();
+
+            long repairExceptions = getRepairExceptions(CLUSTER, 1);
+            NodeToolResult result = repair(1, KEYSPACE, table);
+            result.asserts()
+                  .failure()
+                  .errorContains("Got negative replies from endpoints [127.0.0.2:7012]");
+            if (withNotifications)
+            {
+                result.asserts()
+                      .notificationContains(NodeToolResult.ProgressEventType.START, "Starting repair command")
+                      .notificationContains(NodeToolResult.ProgressEventType.START, "repairing keyspace " + KEYSPACE + " with repair options")
+                      .notificationContains(NodeToolResult.ProgressEventType.ERROR, "Got negative replies from endpoints [127.0.0.2:7012]")
+                      .notificationContains(NodeToolResult.ProgressEventType.COMPLETE, "finished with error");
+            }
+
+            if (repairType != RepairType.PREVIEW)
+            {
+                assertParentRepairFailedWithMessageContains(CLUSTER, KEYSPACE, table, "Got negative replies from endpoints [127.0.0.2:7012]");
+            }
+            else
+            {
+                assertParentRepairNotExist(CLUSTER, KEYSPACE, table);
+            }
+
+            Assert.assertEquals(repairExceptions + 1, getRepairExceptions(CLUSTER, 1));
+        });
+    }
+}
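
[Editor's note] The tests above drop JUnit's @Test(timeout = ...) in favour of the new assertTimeoutPreemptively, which fails with a grouped thread dump rather than a bare timeout and runs the test body on a named worker thread. Typical usage follows the shape below (a sketch; the body is a stand-in for a real test):

    import java.time.Duration;

    import static org.apache.cassandra.utils.AssertUtil.assertTimeoutPreemptively;

    public class TimeoutUsageSketch
    {
        public void someTest()
        {
            assertTimeoutPreemptively(Duration.ofMinutes(1), () -> {
                // The test body runs on a worker thread; if it overruns the budget it is
                // interrupted and the assertion failure carries every thread's stack trace.
            });
        }
    }
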
diff --git a/test/unit/org/apache/cassandra/utils/AssertUtil.java b/test/unit/org/apache/cassandra/utils/AssertUtil.java
new file mode 100644
index 0000000..4d35ede
--- /dev/null
+++ b/test/unit/org/apache/cassandra/utils/AssertUtil.java
@@ -0,0 +1,128 @@
+package org.apache.cassandra.utils;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+
+import org.apache.cassandra.concurrent.NamedThreadFactory;
+
+public final class AssertUtil
+{
+    private AssertUtil()
+    {
+    }
+
+    /**
+     * Runs the input in another thread and throws an assertion failure if it takes longer than the defined timeout.
+     *
+     * The attempt to halt the thread uses an interrupt, which only works if the underlying logic respects it.
+     *
+     * The assertion message will contain the stack traces at the time of the timeout, grouped by threads sharing a common trace.
+     */
+    public static void assertTimeoutPreemptively(Duration timeout, Executable fn)
+    {
+        StackTraceElement caller = Thread.currentThread().getStackTrace()[2];
+        assertTimeoutPreemptively(caller, timeout, () -> {
+            fn.execute();
+            return null;
+        });
+    }
+
+    /**
+     * Runs the input in another thread and throws an assertion failure if it takes longer than the defined timeout.
+     *
+     * The attempt to halt the thread uses an interrupt, which only works if the underlying logic respects it.
+     *
+     * The assertion message will contain the stack traces at the time of the timeout, grouped by threads sharing a common trace.
+     */
+    public static <T> T assertTimeoutPreemptively(Duration timeout, ThrowingSupplier<T> supplier)
+    {
+        StackTraceElement caller = Thread.currentThread().getStackTrace()[2];
+        return assertTimeoutPreemptively(caller, timeout, supplier);
+    }
+
+    private static <T> T assertTimeoutPreemptively(StackTraceElement caller, Duration timeout, ThrowingSupplier<T> supplier)
+    {
+        String[] split = caller.getClassName().split("\\.");
+        String simpleClassName = split[split.length - 1];
+        ExecutorService executorService = Executors.newSingleThreadExecutor(new NamedThreadFactory("TimeoutTest-" + simpleClassName + "#" + caller.getMethodName()));
+        try
+        {
+            Future<T> future = executorService.submit(() -> {
+                try
+                {
+                    return supplier.get();
+                }
+                catch (Throwable throwable)
+                {
+                    throw Throwables.throwAsUncheckedException(throwable);
+                }
+            });
+
+            long timeoutInNanos = timeout.toNanos();
+            try
+            {
+                return future.get(timeoutInNanos, TimeUnit.NANOSECONDS);
+            }
+            catch (TimeoutException ex)
+            {
+                future.cancel(true);
+                Map<Thread, StackTraceElement[]> threadDump = Thread.getAllStackTraces();
+                StringBuilder sb = new StringBuilder("execution timed out after ").append(TimeUnit.NANOSECONDS.toMillis(timeoutInNanos)).append(" ms\n");
+                Multimap<List<StackTraceElement>, Thread> groupCommonThreads = HashMultimap.create();
+                for (Map.Entry<Thread, StackTraceElement[]> e : threadDump.entrySet())
+                    groupCommonThreads.put(Arrays.asList(e.getValue()), e.getKey());
+
+                for (Map.Entry<List<StackTraceElement>, Collection<Thread>> e : groupCommonThreads.asMap().entrySet())
+                {
+                    sb.append("Threads: ");
+                    Joiner.on(", ").appendTo(sb, e.getValue().stream().map(Thread::getName).iterator());
+                    sb.append("\n");
+                    for (StackTraceElement elem : e.getKey())
+                        sb.append("\t").append(elem.getClassName()).append(".").append(elem.getMethodName()).append("[").append(elem.getLineNumber()).append("]\n");
+                    sb.append("\n");
+                }
+                throw new AssertionError(sb.toString());
+            }
+            catch (InterruptedException e)
+            {
+                Thread.currentThread().interrupt();
+                throw Throwables.throwAsUncheckedException(e);
+            }
+            catch (ExecutionException ex)
+            {
+                throw Throwables.throwAsUncheckedException(ex.getCause());
+            }
+            catch (Throwable ex)
+            {
+                throw Throwables.throwAsUncheckedException(ex);
+            }
+        }
+        finally
+        {
+            executorService.shutdownNow();
+        }
+    }
+
+    public interface ThrowingSupplier<T>
+    {
+        T get() throws Throwable;
+    }
+
+    public interface Executable
+    {
+        void execute() throws Throwable;
+    }
+}
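
[Editor's note] AssertUtil leans on Throwables.throwAsUncheckedException (extended by this commit) to rethrow checked exceptions without wrapping them. The usual shape of that idiom is the generic "sneaky throw" sketched below; this illustrates the pattern only and is not necessarily the exact body of the Cassandra helper:

    final class SneakyThrowSketch
    {
        // Rethrows any Throwable as-is while the compiler treats it as unchecked.
        static RuntimeException throwAsUncheckedException(Throwable t)
        {
            throwUnchecked(t);
            return null; // unreachable; the return type lets callers write "throw throwAsUncheckedException(t);"
        }

        @SuppressWarnings("unchecked")
        private static <T extends Throwable> void throwUnchecked(Throwable t) throws T
        {
            throw (T) t; // erasure makes this an unchecked rethrow of the original
        }
    }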

