You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by bl...@apache.org on 2020/04/01 13:11:59 UTC
[cassandra] branch trunk updated: Fix flaky test
o.a.c.d.test.*RepairCoordinatorFastTest
This is an automated email from the ASF dual-hosted git repository.
blerer pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new dfc279a Fix flaky test o.a.c.d.test.*RepairCoordinatorFastTest
dfc279a is described below
commit dfc279a22a5563ac7a832a586914d5410426e9b7
Author: David Capwell <dc...@gmail.com>
AuthorDate: Sat Mar 21 20:57:50 2020 -0700
Fix flaky test o.a.c.d.test.*RepairCoordinatorFastTest
patch by David Capwell; reviewed by Ekaterina Dimitrova and Benjamin Lerer
for CASSANDRA-15650
---
.../cassandra/concurrent/NamedThreadFactory.java | 5 +-
src/java/org/apache/cassandra/tools/NodeProbe.java | 2 +-
.../org/apache/cassandra/tools/RepairRunner.java | 40 +-
.../org/apache/cassandra/utils/Throwables.java | 10 +
.../cassandra/distributed/api/NodeToolResult.java | 48 +-
.../distributed/impl/IsolatedExecutor.java | 8 +-
...=> FullRepairCoordinatorNeighbourDownTest.java} | 4 +-
.../test/FullRepairCoordinatorTimeoutTest.java | 16 +
...ementalRepairCoordinatorNeighbourDownTest.java} | 4 +-
.../IncrementalRepairCoordinatorTimeoutTest.java | 16 +
...PreviewRepairCoordinatorNeighbourDownTest.java} | 4 +-
.../test/PreviewRepairCoordinatorTimeoutTest.java | 16 +
.../distributed/test/RepairCoordinatorFast.java | 482 +++++++++++----------
...ow.java => RepairCoordinatorNeighbourDown.java} | 180 ++++----
.../distributed/test/RepairCoordinatorTimeout.java | 67 +++
.../org/apache/cassandra/utils/AssertUtil.java | 128 ++++++
16 files changed, 665 insertions(+), 365 deletions(-)
diff --git a/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java b/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java
index 7cc73bd..bcf686f 100644
--- a/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java
+++ b/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java
@@ -36,7 +36,10 @@ public class NamedThreadFactory implements ThreadFactory
{
private static volatile String globalPrefix;
public static void setGlobalPrefix(String prefix) { globalPrefix = prefix; }
- public static String globalPrefix() { return globalPrefix == null ? "" : globalPrefix; }
+ public static String globalPrefix() {
+ String prefix = globalPrefix;
+ return prefix == null ? "" : prefix;
+ }
public final String id;
private final int priority;
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index 180b231..f9f2c1a 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -415,7 +415,7 @@ public class NodeProbe implements AutoCloseable
}
catch (Exception e)
{
- throw new IOException(e) ;
+ throw new IOException(e);
}
finally
{
diff --git a/src/java/org/apache/cassandra/tools/RepairRunner.java b/src/java/org/apache/cassandra/tools/RepairRunner.java
index d1b3409..593bc26 100644
--- a/src/java/org/apache/cassandra/tools/RepairRunner.java
+++ b/src/java/org/apache/cassandra/tools/RepairRunner.java
@@ -25,6 +25,9 @@ import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
+import com.google.common.base.Throwables;
+
+import org.apache.cassandra.repair.messages.RepairOption;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.service.StorageServiceMBean;
import org.apache.cassandra.utils.concurrent.SimpleCondition;
@@ -40,7 +43,7 @@ public class RepairRunner extends JMXNotificationProgressListener
private final StorageServiceMBean ssProxy;
private final String keyspace;
private final Map<String, String> options;
- private final Condition condition = new SimpleCondition();
+ private final SimpleCondition condition = new SimpleCondition();
private int cmd;
private volatile Exception error;
@@ -59,8 +62,8 @@ public class RepairRunner extends JMXNotificationProgressListener
if (cmd <= 0)
{
// repairAsync can only return 0 for replication factor 1.
- String message = String.format("[%s] Replication factor is 1. No repair is needed for keyspace '%s'", format.format(System.currentTimeMillis()), keyspace);
- out.println(message);
+ String message = String.format("Replication factor is 1. No repair is needed for keyspace '%s'", keyspace);
+ printMessage(message);
}
else
{
@@ -69,6 +72,14 @@ public class RepairRunner extends JMXNotificationProgressListener
queryForCompletedRepair(String.format("After waiting for poll interval of %s seconds",
NodeProbe.JMX_NOTIFICATION_POLL_INTERVAL_SECONDS));
}
+ Exception error = this.error;
+ if (error == null)
+ {
+ // notifications are lossy so it's possible to see complete and not error; request latest state
+ // from the server
+ queryForCompletedRepair("condition satisfied");
+ error = this.error;
+ }
if (error != null)
{
throw error;
@@ -111,12 +122,12 @@ public class RepairRunner extends JMXNotificationProgressListener
public void progress(String tag, ProgressEvent event)
{
ProgressEventType type = event.getType();
- String message = String.format("[%s] %s", format.format(System.currentTimeMillis()), event.getMessage());
+ String message = event.getMessage();
if (type == ProgressEventType.PROGRESS)
{
message = message + " (progress: " + (int) event.getProgressPercentage() + "%)";
}
- out.println(message);
+ printMessage(message);
if (type == ProgressEventType.ERROR)
{
error = new RuntimeException(String.format("Repair job has failed with the error message: %s. " +
@@ -136,9 +147,9 @@ public class RepairRunner extends JMXNotificationProgressListener
String queriedString = "queried for parent session status and";
if (status == null)
{
- String message = String.format("[%s] %s %s couldn't find repair status for cmd: %s", triggeringCondition,
- queriedString, format.format(System.currentTimeMillis()), cmd);
- out.println(message);
+ String message = String.format("%s %s couldn't find repair status for cmd: %s", triggeringCondition,
+ queriedString, cmd);
+ printMessage(message);
}
else
{
@@ -148,8 +159,8 @@ public class RepairRunner extends JMXNotificationProgressListener
{
case COMPLETED:
case FAILED:
- out.println(String.format("[%s] %s %s discovered repair %s.",
- this.format.format(System.currentTimeMillis()), triggeringCondition,
+ printMessage(String.format("%s %s discovered repair %s.",
+ triggeringCondition,
queriedString, parentRepairStatus.name().toLowerCase()));
if (parentRepairStatus == ActiveRepairService.ParentRepairStatus.FAILED)
{
@@ -161,7 +172,7 @@ public class RepairRunner extends JMXNotificationProgressListener
case IN_PROGRESS:
break;
default:
- out.println(String.format("[%s] WARNING Encountered unexpected RepairRunnable.ParentRepairStatus: %s", System.currentTimeMillis(), parentRepairStatus));
+ printMessage(String.format("WARNING Encountered unexpected RepairRunnable.ParentRepairStatus: %s", parentRepairStatus));
printMessages(messages);
break;
}
@@ -172,7 +183,12 @@ public class RepairRunner extends JMXNotificationProgressListener
{
for (String message : messages)
{
- out.println(String.format("[%s] %s", this.format.format(System.currentTimeMillis()), message));
+ printMessage(message);
}
}
+
+ private void printMessage(String message)
+ {
+ out.println(String.format("[%s] %s", this.format.format(System.currentTimeMillis()), message));
+ }
}
diff --git a/src/java/org/apache/cassandra/utils/Throwables.java b/src/java/org/apache/cassandra/utils/Throwables.java
index 9c6da60..f727b5a 100644
--- a/src/java/org/apache/cassandra/utils/Throwables.java
+++ b/src/java/org/apache/cassandra/utils/Throwables.java
@@ -231,6 +231,16 @@ public final class Throwables
}
/**
+ * throw the exception as an unchecked exception, wrapping if a checked exception, else rethrowing as is.
+ */
+ public static RuntimeException throwAsUncheckedException(Throwable t)
+ {
+ if (t instanceof Error)
+ throw (Error) t;
+ throw unchecked(t);
+ }
+
+ /**
* A shortcut for {@code unchecked(unwrapped(t))}. This is called "cleaned" because this basically removes the annoying
* cruft surrounding an exception :).
*/
diff --git a/test/distributed/org/apache/cassandra/distributed/api/NodeToolResult.java b/test/distributed/org/apache/cassandra/distributed/api/NodeToolResult.java
index 9ba1127..8f33ae5 100644
--- a/test/distributed/org/apache/cassandra/distributed/api/NodeToolResult.java
+++ b/test/distributed/org/apache/cassandra/distributed/api/NodeToolResult.java
@@ -19,11 +19,14 @@
package org.apache.cassandra.distributed.api;
import java.util.Arrays;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
import javax.management.Notification;
+import com.google.common.base.Throwables;
import org.junit.Assert;
public class NodeToolResult
@@ -68,18 +71,22 @@ public class NodeToolResult
public final class Asserts {
public Asserts success() {
- Assert.assertEquals("nodetool command " + commandAndArgs[0] + " was not successful", 0, rc);
+ if (rc != 0)
+ fail("was not successful");
return this;
}
public Asserts failure() {
- Assert.assertNotEquals("nodetool command " + commandAndArgs[0] + " was successful but not expected to be", 0, rc);
+ if (rc == 0)
+ fail("was successful but not expected to be");
return this;
}
- public Asserts errorContains(String msg) {
+ public Asserts errorContains(String... messages) {
+ Assert.assertNotEquals("no error messages defined to check against", 0, messages.length);
Assert.assertNotNull("No exception was found but expected one", error);
- Assert.assertTrue("Error message '" + error.getMessage() + "' does not contain '" + msg + "'", error.getMessage().contains(msg));
+ if (!Stream.of(messages).anyMatch(msg -> error.getMessage().contains(msg)))
+ fail("Error message '" + error.getMessage() + "' does not contain any of " + Arrays.toString(messages));
return this;
}
@@ -91,7 +98,7 @@ public class NodeToolResult
return this;
}
}
- Assert.fail("Unable to locate message " + msg + " in notifications: " + notifications);
+ fail("Unable to locate message " + msg + " in notifications: " + NodeToolResult.toString(notifications));
return this; // unreachable
}
@@ -106,9 +113,38 @@ public class NodeToolResult
}
}
}
- Assert.fail("Unable to locate message '" + msg + "' in notifications: " + notifications);
+ fail("Unable to locate message '" + msg + "' in notifications: " + NodeToolResult.toString(notifications));
return this; // unreachable
}
+
+ private void fail(String message)
+ {
+ StringBuilder sb = new StringBuilder();
+ sb.append("nodetool command ").append(Arrays.toString(commandAndArgs)).append(" ").append(message).append("\n");
+ sb.append("Notifications:\n");
+ for (Notification n : notifications)
+ sb.append(NodeToolResult.toString(n)).append("\n");
+ if (error != null)
+ sb.append("Error:\n").append(Throwables.getStackTraceAsString(error)).append("\n");
+ throw new AssertionError(sb.toString());
+ }
+ }
+
+ private static String toString(Collection<Notification> notifications)
+ {
+ return notifications.stream().map(NodeToolResult::toString).collect(Collectors.joining(", "));
+ }
+
+ private static String toString(Notification notification)
+ {
+ ProgressEventType type = ProgressEventType.values()[notificationType(notification)];
+ String msg = notification.getMessage();
+ Object src = notification.getSource();
+ return "Notification{" +
+ "type=" + type +
+ ", src=" + src +
+ ", message=" + msg +
+ "}";
}
private static int notificationType(Notification n)
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/IsolatedExecutor.java b/test/distributed/org/apache/cassandra/distributed/impl/IsolatedExecutor.java
index fc31fdf..0d8f96f 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/IsolatedExecutor.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/IsolatedExecutor.java
@@ -47,6 +47,7 @@ import ch.qos.logback.classic.LoggerContext;
import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.distributed.api.IIsolatedExecutor;
import org.apache.cassandra.utils.ExecutorUtils;
+import org.apache.cassandra.utils.Throwables;
public class IsolatedExecutor implements IIsolatedExecutor
{
@@ -65,7 +66,7 @@ public class IsolatedExecutor implements IIsolatedExecutor
public Future<Void> shutdown()
{
- isolatedExecutor.shutdown();
+ isolatedExecutor.shutdownNow();
/* Use a thread pool with a core pool size of zero to terminate the thread as soon as possible
** so the instance class loader can be garbage collected. Uses a custom thread factory
@@ -202,11 +203,12 @@ public class IsolatedExecutor implements IIsolatedExecutor
}
catch (InterruptedException e)
{
- throw new RuntimeException(e);
+ Thread.currentThread().interrupt();
+ throw Throwables.throwAsUncheckedException(e);
}
catch (ExecutionException e)
{
- throw new RuntimeException(e.getCause());
+ throw Throwables.throwAsUncheckedException(e.getCause());
}
}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/FullRepairCoordinatorSlowTest.java b/test/distributed/org/apache/cassandra/distributed/test/FullRepairCoordinatorNeighbourDownTest.java
similarity index 85%
rename from test/distributed/org/apache/cassandra/distributed/test/FullRepairCoordinatorSlowTest.java
rename to test/distributed/org/apache/cassandra/distributed/test/FullRepairCoordinatorNeighbourDownTest.java
index d3904b3..1053925 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/FullRepairCoordinatorSlowTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/FullRepairCoordinatorNeighbourDownTest.java
@@ -25,9 +25,9 @@ import org.apache.cassandra.distributed.test.DistributedRepairUtils.RepairParall
import org.apache.cassandra.distributed.test.DistributedRepairUtils.RepairType;
@RunWith(Parameterized.class)
-public class FullRepairCoordinatorSlowTest extends RepairCoordinatorSlow
+public class FullRepairCoordinatorNeighbourDownTest extends RepairCoordinatorNeighbourDown
{
- public FullRepairCoordinatorSlowTest(RepairParallelism parallelism, boolean withNotifications)
+ public FullRepairCoordinatorNeighbourDownTest(RepairParallelism parallelism, boolean withNotifications)
{
super(RepairType.FULL, parallelism, withNotifications);
}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/FullRepairCoordinatorTimeoutTest.java b/test/distributed/org/apache/cassandra/distributed/test/FullRepairCoordinatorTimeoutTest.java
new file mode 100644
index 0000000..d91cb5d
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/FullRepairCoordinatorTimeoutTest.java
@@ -0,0 +1,16 @@
+package org.apache.cassandra.distributed.test;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import org.apache.cassandra.distributed.test.DistributedRepairUtils.RepairParallelism;
+import org.apache.cassandra.distributed.test.DistributedRepairUtils.RepairType;
+
+@RunWith(Parameterized.class)
+public class FullRepairCoordinatorTimeoutTest extends RepairCoordinatorTimeout
+{
+ public FullRepairCoordinatorTimeoutTest(RepairParallelism parallelism, boolean withNotifications)
+ {
+ super(RepairType.FULL, parallelism, withNotifications);
+ }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/IncrementalRepairCoordinatorSlowTest.java b/test/distributed/org/apache/cassandra/distributed/test/IncrementalRepairCoordinatorNeighbourDownTest.java
similarity index 85%
rename from test/distributed/org/apache/cassandra/distributed/test/IncrementalRepairCoordinatorSlowTest.java
rename to test/distributed/org/apache/cassandra/distributed/test/IncrementalRepairCoordinatorNeighbourDownTest.java
index 7f9b35f..af17567 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/IncrementalRepairCoordinatorSlowTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/IncrementalRepairCoordinatorNeighbourDownTest.java
@@ -25,9 +25,9 @@ import org.apache.cassandra.distributed.test.DistributedRepairUtils.RepairParall
import org.apache.cassandra.distributed.test.DistributedRepairUtils.RepairType;
@RunWith(Parameterized.class)
-public class IncrementalRepairCoordinatorSlowTest extends RepairCoordinatorSlow
+public class IncrementalRepairCoordinatorNeighbourDownTest extends RepairCoordinatorNeighbourDown
{
- public IncrementalRepairCoordinatorSlowTest(RepairParallelism parallelism, boolean withNotifications)
+ public IncrementalRepairCoordinatorNeighbourDownTest(RepairParallelism parallelism, boolean withNotifications)
{
super(RepairType.INCREMENTAL, parallelism, withNotifications);
}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/IncrementalRepairCoordinatorTimeoutTest.java b/test/distributed/org/apache/cassandra/distributed/test/IncrementalRepairCoordinatorTimeoutTest.java
new file mode 100644
index 0000000..0fdae57
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/IncrementalRepairCoordinatorTimeoutTest.java
@@ -0,0 +1,16 @@
+package org.apache.cassandra.distributed.test;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import org.apache.cassandra.distributed.test.DistributedRepairUtils.RepairParallelism;
+import org.apache.cassandra.distributed.test.DistributedRepairUtils.RepairType;
+
+@RunWith(Parameterized.class)
+public class IncrementalRepairCoordinatorTimeoutTest extends RepairCoordinatorTimeout
+{
+ public IncrementalRepairCoordinatorTimeoutTest(RepairParallelism parallelism, boolean withNotifications)
+ {
+ super(RepairType.INCREMENTAL, parallelism, withNotifications);
+ }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairCoordinatorSlowTest.java b/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairCoordinatorNeighbourDownTest.java
similarity index 85%
rename from test/distributed/org/apache/cassandra/distributed/test/PreviewRepairCoordinatorSlowTest.java
rename to test/distributed/org/apache/cassandra/distributed/test/PreviewRepairCoordinatorNeighbourDownTest.java
index 2d52475..1926f9b 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairCoordinatorSlowTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairCoordinatorNeighbourDownTest.java
@@ -25,9 +25,9 @@ import org.apache.cassandra.distributed.test.DistributedRepairUtils.RepairParall
import org.apache.cassandra.distributed.test.DistributedRepairUtils.RepairType;
@RunWith(Parameterized.class)
-public class PreviewRepairCoordinatorSlowTest extends RepairCoordinatorSlow
+public class PreviewRepairCoordinatorNeighbourDownTest extends RepairCoordinatorNeighbourDown
{
- public PreviewRepairCoordinatorSlowTest(RepairParallelism parallelism, boolean withNotifications)
+ public PreviewRepairCoordinatorNeighbourDownTest(RepairParallelism parallelism, boolean withNotifications)
{
super(RepairType.PREVIEW, parallelism, withNotifications);
}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairCoordinatorTimeoutTest.java b/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairCoordinatorTimeoutTest.java
new file mode 100644
index 0000000..8b90909
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairCoordinatorTimeoutTest.java
@@ -0,0 +1,16 @@
+package org.apache.cassandra.distributed.test;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import org.apache.cassandra.distributed.test.DistributedRepairUtils.RepairParallelism;
+import org.apache.cassandra.distributed.test.DistributedRepairUtils.RepairType;
+
+@RunWith(Parameterized.class)
+public class PreviewRepairCoordinatorTimeoutTest extends RepairCoordinatorTimeout
+{
+ public PreviewRepairCoordinatorTimeoutTest(RepairParallelism parallelism, boolean withNotifications)
+ {
+ super(RepairType.PREVIEW, parallelism, withNotifications);
+ }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorFast.java b/test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorFast.java
index b05706a..8d26b2e 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorFast.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorFast.java
@@ -18,6 +18,7 @@
package org.apache.cassandra.distributed.test;
+import java.time.Duration;
import java.util.Set;
import com.google.common.collect.Iterables;
@@ -43,6 +44,7 @@ import static org.apache.cassandra.distributed.test.DistributedRepairUtils.asser
import static org.apache.cassandra.distributed.test.DistributedRepairUtils.assertParentRepairNotExist;
import static org.apache.cassandra.distributed.test.DistributedRepairUtils.assertParentRepairSuccess;
import static org.apache.cassandra.distributed.test.DistributedRepairUtils.getRepairExceptions;
+import static org.apache.cassandra.utils.AssertUtil.assertTimeoutPreemptively;
public abstract class RepairCoordinatorFast extends RepairCoordinatorBase
{
@@ -51,104 +53,113 @@ public abstract class RepairCoordinatorFast extends RepairCoordinatorBase
super(repairType, parallelism, withNotifications);
}
- @Test(timeout = 1 * 60 * 1000)
+ @Test
public void simple() {
String table = tableName("simple");
- CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, PRIMARY KEY (key))", KEYSPACE, table));
- CLUSTER.coordinator(1).execute(format("INSERT INTO %s.%s (key) VALUES (?)", KEYSPACE, table), ConsistencyLevel.ANY, "some text");
-
- long repairExceptions = getRepairExceptions(CLUSTER, 2);
- NodeToolResult result = repair(2, KEYSPACE, table);
- result.asserts().success();
- if (withNotifications)
- {
- result.asserts()
- .notificationContains(ProgressEventType.START, "Starting repair command")
- .notificationContains(ProgressEventType.START, "repairing keyspace " + KEYSPACE + " with repair options")
- .notificationContains(ProgressEventType.SUCCESS, repairType != RepairType.PREVIEW ? "Repair completed successfully": "Repair preview completed successfully")
- .notificationContains(ProgressEventType.COMPLETE, "finished");
- }
-
- if (repairType != RepairType.PREVIEW)
- {
- assertParentRepairSuccess(CLUSTER, KEYSPACE, table);
- }
- else
- {
- assertParentRepairNotExist(CLUSTER, KEYSPACE, table);
- }
+ assertTimeoutPreemptively(Duration.ofMinutes(1), () -> {
+ CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, PRIMARY KEY (key))", KEYSPACE, table));
+ CLUSTER.coordinator(1).execute(format("INSERT INTO %s.%s (key) VALUES (?)", KEYSPACE, table), ConsistencyLevel.ANY, "some text");
- Assert.assertEquals(repairExceptions, getRepairExceptions(CLUSTER, 2));
+ long repairExceptions = getRepairExceptions(CLUSTER, 2);
+ NodeToolResult result = repair(2, KEYSPACE, table);
+ result.asserts().success();
+ if (withNotifications)
+ {
+ result.asserts()
+ .notificationContains(ProgressEventType.START, "Starting repair command")
+ .notificationContains(ProgressEventType.START, "repairing keyspace " + KEYSPACE + " with repair options")
+ .notificationContains(ProgressEventType.SUCCESS, repairType != RepairType.PREVIEW ? "Repair completed successfully": "Repair preview completed successfully")
+ .notificationContains(ProgressEventType.COMPLETE, "finished");
+ }
+
+ if (repairType != RepairType.PREVIEW)
+ {
+ assertParentRepairSuccess(CLUSTER, KEYSPACE, table);
+ }
+ else
+ {
+ assertParentRepairNotExist(CLUSTER, KEYSPACE, table);
+ }
+
+ Assert.assertEquals(repairExceptions, getRepairExceptions(CLUSTER, 2));
+ });
}
- @Test(timeout = 1 * 60 * 1000)
+ @Test
public void missingKeyspace()
{
- // as of this moment the check is done in nodetool so the JMX notifications are not imporant
- // nor is the history stored
- long repairExceptions = getRepairExceptions(CLUSTER, 2);
- NodeToolResult result = repair(2, "doesnotexist");
- result.asserts()
- .failure()
- .errorContains("Keyspace [doesnotexist] does not exist.");
+ assertTimeoutPreemptively(Duration.ofMinutes(1), () -> {
+ // as of this moment the check is done in nodetool so the JMX notifications are not important
+ // nor is the history stored
+ long repairExceptions = getRepairExceptions(CLUSTER, 2);
+ NodeToolResult result = repair(2, "doesnotexist");
+ result.asserts()
+ .failure()
+ .errorContains("Keyspace [doesnotexist] does not exist.");
- Assert.assertEquals(repairExceptions, getRepairExceptions(CLUSTER, 2));
+ Assert.assertEquals(repairExceptions, getRepairExceptions(CLUSTER, 2));
- assertParentRepairNotExist(CLUSTER, "doesnotexist");
+ assertParentRepairNotExist(CLUSTER, "doesnotexist");
+ });
}
- @Test(timeout = 1 * 60 * 1000)
+ @Test
public void missingTable()
{
- long repairExceptions = getRepairExceptions(CLUSTER, 2);
- NodeToolResult result = repair(2, KEYSPACE, "doesnotexist");
- result.asserts()
- .failure();
- if (withNotifications)
- {
+ assertTimeoutPreemptively(Duration.ofMinutes(1), () -> {
+ long repairExceptions = getRepairExceptions(CLUSTER, 2);
+ String tableName = tableName("doesnotexist");
+ NodeToolResult result = repair(2, KEYSPACE, tableName);
result.asserts()
- .errorContains("failed with error Unknown keyspace/cf pair (distributed_test_keyspace.doesnotexist)")
- // Start notification is ignored since this is checked during setup (aka before start)
- .notificationContains(ProgressEventType.ERROR, "failed with error Unknown keyspace/cf pair (distributed_test_keyspace.doesnotexist)")
- .notificationContains(ProgressEventType.COMPLETE, "finished with error");
- }
+ .failure();
+ if (withNotifications)
+ {
+ result.asserts()
+ .errorContains("Unknown keyspace/cf pair (distributed_test_keyspace." + tableName + ")")
+ // Start notification is ignored since this is checked during setup (aka before start)
+ .notificationContains(ProgressEventType.ERROR, "failed with error Unknown keyspace/cf pair (distributed_test_keyspace." + tableName + ")")
+ .notificationContains(ProgressEventType.COMPLETE, "finished with error");
+ }
- assertParentRepairNotExist(CLUSTER, KEYSPACE, "doesnotexist");
+ assertParentRepairNotExist(CLUSTER, KEYSPACE, "doesnotexist");
- Assert.assertEquals(repairExceptions + 1, getRepairExceptions(CLUSTER, 2));
+ Assert.assertEquals(repairExceptions + 1, getRepairExceptions(CLUSTER, 2));
+ });
}
- @Test(timeout = 1 * 60 * 1000)
+ @Test
public void noTablesToRepair()
{
// index CF currently don't support repair, so they get dropped when listed
// this is done in this test to cause the keyspace to have 0 tables to repair, which causes repair to no-op
// early and skip.
String table = tableName("withindex");
- CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, value text, PRIMARY KEY (key))", KEYSPACE, table));
- CLUSTER.schemaChange(format("CREATE INDEX value_%s ON %s.%s (value)", postfix(), KEYSPACE, table));
-
- long repairExceptions = getRepairExceptions(CLUSTER, 2);
- // if CF has a . in it, it is assumed to be a 2i which rejects repairs
- NodeToolResult result = repair(2, KEYSPACE, table + ".value");
- result.asserts().success();
- if (withNotifications)
- {
- result.asserts()
- .notificationContains("Empty keyspace")
- .notificationContains("skipping repair: " + KEYSPACE)
- // Start notification is ignored since this is checked during setup (aka before start)
- .notificationContains(ProgressEventType.SUCCESS, "Empty keyspace") // will fail since success isn't returned; only complete
- .notificationContains(ProgressEventType.COMPLETE, "finished"); // will fail since it doesn't do this
- }
+ assertTimeoutPreemptively(Duration.ofMinutes(1), () -> {
+ CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, value text, PRIMARY KEY (key))", KEYSPACE, table));
+ CLUSTER.schemaChange(format("CREATE INDEX value_%s ON %s.%s (value)", postfix(), KEYSPACE, table));
+
+ long repairExceptions = getRepairExceptions(CLUSTER, 2);
+ // if CF has a . in it, it is assumed to be a 2i which rejects repairs
+ NodeToolResult result = repair(2, KEYSPACE, table + ".value");
+ result.asserts().success();
+ if (withNotifications)
+ {
+ result.asserts()
+ .notificationContains("Empty keyspace")
+ .notificationContains("skipping repair: " + KEYSPACE)
+ // Start notification is ignored since this is checked during setup (aka before start)
+ .notificationContains(ProgressEventType.SUCCESS, "Empty keyspace") // will fail since success isn't returned; only complete
+ .notificationContains(ProgressEventType.COMPLETE, "finished"); // will fail since it doesn't do this
+ }
- assertParentRepairNotExist(CLUSTER, KEYSPACE, table + ".value");
+ assertParentRepairNotExist(CLUSTER, KEYSPACE, table + ".value");
- // this is actually a SKIP and not a FAILURE, so shouldn't increment
- Assert.assertEquals(repairExceptions, getRepairExceptions(CLUSTER, 2));
+ // this is actually a SKIP and not a FAILURE, so shouldn't increment
+ Assert.assertEquals(repairExceptions, getRepairExceptions(CLUSTER, 2));
+ });
}
- @Test(timeout = 1 * 60 * 1000)
+ @Test
public void intersectingRange()
{
// this test exists to show that this case will cause repair to finish; success or failure isn't imporant
@@ -156,229 +167,238 @@ public abstract class RepairCoordinatorFast extends RepairCoordinatorBase
// repair to fail but it didn't, this would be fine and this test should be updated to reflect the new
// semantic
String table = tableName("intersectingrange");
- CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, value text, PRIMARY KEY (key))", KEYSPACE, table));
-
- //TODO dtest api for this?
- LongTokenRange tokenRange = CLUSTER.get(2).callOnInstance(() -> {
- Set<Range<Token>> ranges = StorageService.instance.getLocalReplicas(KEYSPACE).ranges();
- Range<Token> range = Iterables.getFirst(ranges, null);
- long left = (long) range.left.getTokenValue();
- long right = (long) range.right.getTokenValue();
- return new LongTokenRange(left, right);
- });
- LongTokenRange intersectingRange = new LongTokenRange(tokenRange.maxInclusive - 7, tokenRange.maxInclusive + 7);
-
- long repairExceptions = getRepairExceptions(CLUSTER, 2);
- NodeToolResult result = repair(2, KEYSPACE, table,
- "--start-token", Long.toString(intersectingRange.minExclusive),
- "--end-token", Long.toString(intersectingRange.maxInclusive));
- result.asserts()
- .failure()
- .errorContains("Requested range " + intersectingRange + " intersects a local range (" + tokenRange + ") but is not fully contained in one");
- if (withNotifications)
- {
+ assertTimeoutPreemptively(Duration.ofMinutes(1), () -> {
+ CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, value text, PRIMARY KEY (key))", KEYSPACE, table));
+
+ //TODO dtest api for this?
+ LongTokenRange tokenRange = CLUSTER.get(2).callOnInstance(() -> {
+ Set<Range<Token>> ranges = StorageService.instance.getLocalReplicas(KEYSPACE).ranges();
+ Range<Token> range = Iterables.getFirst(ranges, null);
+ long left = (long) range.left.getTokenValue();
+ long right = (long) range.right.getTokenValue();
+ return new LongTokenRange(left, right);
+ });
+ LongTokenRange intersectingRange = new LongTokenRange(tokenRange.maxInclusive - 7, tokenRange.maxInclusive + 7);
+
+ long repairExceptions = getRepairExceptions(CLUSTER, 2);
+ NodeToolResult result = repair(2, KEYSPACE, table,
+ "--start-token", Long.toString(intersectingRange.minExclusive),
+ "--end-token", Long.toString(intersectingRange.maxInclusive));
result.asserts()
- .notificationContains(ProgressEventType.START, "Starting repair command")
- .notificationContains(ProgressEventType.START, "repairing keyspace " + KEYSPACE + " with repair options")
- .notificationContains(ProgressEventType.ERROR, "Requested range " + intersectingRange + " intersects a local range (" + tokenRange + ") but is not fully contained in one")
- .notificationContains(ProgressEventType.COMPLETE, "finished with error");
- }
+ .failure()
+ .errorContains("Requested range " + intersectingRange + " intersects a local range (" + tokenRange + ") but is not fully contained in one");
+ if (withNotifications)
+ {
+ result.asserts()
+ .notificationContains(ProgressEventType.START, "Starting repair command")
+ .notificationContains(ProgressEventType.START, "repairing keyspace " + KEYSPACE + " with repair options")
+ .notificationContains(ProgressEventType.ERROR, "Requested range " + intersectingRange + " intersects a local range (" + tokenRange + ") but is not fully contained in one")
+ .notificationContains(ProgressEventType.COMPLETE, "finished with error");
+ }
- assertParentRepairNotExist(CLUSTER, KEYSPACE, table);
+ assertParentRepairNotExist(CLUSTER, KEYSPACE, table);
- Assert.assertEquals(repairExceptions + 1, getRepairExceptions(CLUSTER, 2));
+ Assert.assertEquals(repairExceptions + 1, getRepairExceptions(CLUSTER, 2));
+ });
}
- @Test(timeout = 1 * 60 * 1000)
+ @Test
public void unknownHost()
{
String table = tableName("unknownhost");
- CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, value text, PRIMARY KEY (key))", KEYSPACE, table));
-
- long repairExceptions = getRepairExceptions(CLUSTER, 2);
- NodeToolResult result = repair(2, KEYSPACE, table, "--in-hosts", "thisreally.should.not.exist.apache.org");
- result.asserts()
- .failure()
- .errorContains("Unknown host specified thisreally.should.not.exist.apache.org");
- if (withNotifications)
- {
+ assertTimeoutPreemptively(Duration.ofMinutes(1), () -> {
+ CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, value text, PRIMARY KEY (key))", KEYSPACE, table));
+
+ long repairExceptions = getRepairExceptions(CLUSTER, 2);
+ NodeToolResult result = repair(2, KEYSPACE, table, "--in-hosts", "thisreally.should.not.exist.apache.org");
result.asserts()
- .notificationContains(ProgressEventType.START, "Starting repair command")
- .notificationContains(ProgressEventType.START, "repairing keyspace " + KEYSPACE + " with repair options")
- .notificationContains(ProgressEventType.ERROR, "Unknown host specified thisreally.should.not.exist.apache.org")
- .notificationContains(ProgressEventType.COMPLETE, "finished with error");
- }
+ .failure()
+ .errorContains("Unknown host specified thisreally.should.not.exist.apache.org");
+ if (withNotifications)
+ {
+ result.asserts()
+ .notificationContains(ProgressEventType.START, "Starting repair command")
+ .notificationContains(ProgressEventType.START, "repairing keyspace " + KEYSPACE + " with repair options")
+ .notificationContains(ProgressEventType.ERROR, "Unknown host specified thisreally.should.not.exist.apache.org")
+ .notificationContains(ProgressEventType.COMPLETE, "finished with error");
+ }
- assertParentRepairNotExist(CLUSTER, KEYSPACE, table);
+ assertParentRepairNotExist(CLUSTER, KEYSPACE, table);
- Assert.assertEquals(repairExceptions + 1, getRepairExceptions(CLUSTER, 2));
+ Assert.assertEquals(repairExceptions + 1, getRepairExceptions(CLUSTER, 2));
+ });
}
- @Test(timeout = 1 * 60 * 1000)
+ @Test
public void desiredHostNotCoordinator()
{
// current limitation is that the coordinator must be apart of the repair, so as long as that exists this test
// verifies that the validation logic will termniate the repair properly
String table = tableName("desiredhostnotcoordinator");
- CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, value text, PRIMARY KEY (key))", KEYSPACE, table));
-
- long repairExceptions = getRepairExceptions(CLUSTER, 2);
- NodeToolResult result = repair(2, KEYSPACE, table, "--in-hosts", "localhost");
- result.asserts()
- .failure()
- .errorContains("The current host must be part of the repair");
- if (withNotifications)
- {
+ assertTimeoutPreemptively(Duration.ofMinutes(1), () -> {
+ CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, value text, PRIMARY KEY (key))", KEYSPACE, table));
+
+ long repairExceptions = getRepairExceptions(CLUSTER, 2);
+ NodeToolResult result = repair(2, KEYSPACE, table, "--in-hosts", "localhost");
result.asserts()
- .notificationContains(ProgressEventType.START, "Starting repair command")
- .notificationContains(ProgressEventType.START, "repairing keyspace " + KEYSPACE + " with repair options")
- .notificationContains(ProgressEventType.ERROR, "The current host must be part of the repair")
- .notificationContains(ProgressEventType.COMPLETE, "finished with error");
- }
+ .failure()
+ .errorContains("The current host must be part of the repair");
+ if (withNotifications)
+ {
+ result.asserts()
+ .notificationContains(ProgressEventType.START, "Starting repair command")
+ .notificationContains(ProgressEventType.START, "repairing keyspace " + KEYSPACE + " with repair options")
+ .notificationContains(ProgressEventType.ERROR, "The current host must be part of the repair")
+ .notificationContains(ProgressEventType.COMPLETE, "finished with error");
+ }
- assertParentRepairNotExist(CLUSTER, KEYSPACE, table);
+ assertParentRepairNotExist(CLUSTER, KEYSPACE, table);
- Assert.assertEquals(repairExceptions + 1, getRepairExceptions(CLUSTER, 2));
+ Assert.assertEquals(repairExceptions + 1, getRepairExceptions(CLUSTER, 2));
+ });
}
- @Test(timeout = 1 * 60 * 1000)
+ @Test
public void onlyCoordinator()
{
// this is very similar to ::desiredHostNotCoordinator but has the difference that the only host to do repair
// is the coordinator
String table = tableName("onlycoordinator");
- CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, value text, PRIMARY KEY (key))", KEYSPACE, table));
-
- long repairExceptions = getRepairExceptions(CLUSTER, 2);
- NodeToolResult result = repair(1, KEYSPACE, table, "--in-hosts", "localhost");
- result.asserts()
- .failure()
- .errorContains("Specified hosts [localhost] do not share range");
- if (withNotifications)
- {
+ assertTimeoutPreemptively(Duration.ofMinutes(1), () -> {
+ CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, value text, PRIMARY KEY (key))", KEYSPACE, table));
+
+ long repairExceptions = getRepairExceptions(CLUSTER, 2);
+ NodeToolResult result = repair(1, KEYSPACE, table, "--in-hosts", "localhost");
result.asserts()
- .notificationContains(ProgressEventType.START, "Starting repair command")
- .notificationContains(ProgressEventType.START, "repairing keyspace " + KEYSPACE + " with repair options")
- .notificationContains(ProgressEventType.ERROR, "Specified hosts [localhost] do not share range")
- .notificationContains(ProgressEventType.COMPLETE, "finished with error");
- }
+ .failure()
+ .errorContains("Specified hosts [localhost] do not share range");
+ if (withNotifications)
+ {
+ result.asserts()
+ .notificationContains(ProgressEventType.START, "Starting repair command")
+ .notificationContains(ProgressEventType.START, "repairing keyspace " + KEYSPACE + " with repair options")
+ .notificationContains(ProgressEventType.ERROR, "Specified hosts [localhost] do not share range")
+ .notificationContains(ProgressEventType.COMPLETE, "finished with error");
+ }
- assertParentRepairNotExist(CLUSTER, KEYSPACE, table);
+ assertParentRepairNotExist(CLUSTER, KEYSPACE, table);
- //TODO should this be marked as fail to match others? Should they not be marked?
- Assert.assertEquals(repairExceptions, getRepairExceptions(CLUSTER, 2));
+ //TODO should this be marked as fail to match others? Should they not be marked?
+ Assert.assertEquals(repairExceptions, getRepairExceptions(CLUSTER, 2));
+ });
}
- @Test(timeout = 1 * 60 * 1000)
+ @Test
public void replicationFactorOne()
{
// In the case of rf=1 repair fails to create a cmd handle so node tool exists early
String table = tableName("one");
- // since cluster is shared and this test gets called multiple times, need "IF NOT EXISTS" so the second+ attempt
- // does not fail
- CLUSTER.schemaChange("CREATE KEYSPACE IF NOT EXISTS replicationfactor WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};");
- CLUSTER.schemaChange(format("CREATE TABLE replicationfactor.%s (key text, value text, PRIMARY KEY (key))", table));
+ assertTimeoutPreemptively(Duration.ofMinutes(1), () -> {
+ // since cluster is shared and this test gets called multiple times, need "IF NOT EXISTS" so the second+ attempt
+ // does not fail
+ CLUSTER.schemaChange("CREATE KEYSPACE IF NOT EXISTS replicationfactor WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};");
+ CLUSTER.schemaChange(format("CREATE TABLE replicationfactor.%s (key text, value text, PRIMARY KEY (key))", table));
- long repairExceptions = getRepairExceptions(CLUSTER, 1);
- NodeToolResult result = repair(1, "replicationfactor", table);
- result.asserts()
- .success();
+ long repairExceptions = getRepairExceptions(CLUSTER, 1);
+ NodeToolResult result = repair(1, "replicationfactor", table);
+ result.asserts()
+ .success();
- assertParentRepairNotExist(CLUSTER, KEYSPACE, table);
+ assertParentRepairNotExist(CLUSTER, KEYSPACE, table);
- Assert.assertEquals(repairExceptions, getRepairExceptions(CLUSTER, 1));
+ Assert.assertEquals(repairExceptions, getRepairExceptions(CLUSTER, 1));
+ });
}
- @Test(timeout = 1 * 60 * 1000)
+ @Test
public void prepareFailure()
{
String table = tableName("preparefailure");
- CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, value text, PRIMARY KEY (key))", KEYSPACE, table));
- IMessageFilters.Filter filter = CLUSTER.verbs(Verb.PREPARE_MSG).messagesMatching(of(m -> {
- throw new RuntimeException("prepare fail");
- })).drop();
- try
- {
- long repairExceptions = getRepairExceptions(CLUSTER, 1);
- NodeToolResult result = repair(1, KEYSPACE, table);
- result.asserts()
- .failure()
- .errorContains("Got negative replies from endpoints");
- if (withNotifications)
+ assertTimeoutPreemptively(Duration.ofMinutes(1), () -> {
+ CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, value text, PRIMARY KEY (key))", KEYSPACE, table));
+ IMessageFilters.Filter filter = CLUSTER.verbs(Verb.PREPARE_MSG).messagesMatching(of(m -> {
+ throw new RuntimeException("prepare fail");
+ })).drop();
+ try
{
+ long repairExceptions = getRepairExceptions(CLUSTER, 1);
+ NodeToolResult result = repair(1, KEYSPACE, table);
result.asserts()
- .notificationContains(ProgressEventType.START, "Starting repair command")
- .notificationContains(ProgressEventType.START, "repairing keyspace " + KEYSPACE + " with repair options")
- .notificationContains(ProgressEventType.ERROR, "Got negative replies from endpoints")
- .notificationContains(ProgressEventType.COMPLETE, "finished with error");
- }
-
- Assert.assertEquals(repairExceptions + 1, getRepairExceptions(CLUSTER, 1));
- if (repairType != RepairType.PREVIEW)
- {
- assertParentRepairFailedWithMessageContains(CLUSTER, KEYSPACE, table, "Got negative replies from endpoints");
+ .failure()
+ .errorContains("Got negative replies from endpoints");
+ if (withNotifications)
+ {
+ result.asserts()
+ .notificationContains(ProgressEventType.START, "Starting repair command")
+ .notificationContains(ProgressEventType.START, "repairing keyspace " + KEYSPACE + " with repair options")
+ .notificationContains(ProgressEventType.ERROR, "Got negative replies from endpoints")
+ .notificationContains(ProgressEventType.COMPLETE, "finished with error");
+ }
+
+ Assert.assertEquals(repairExceptions + 1, getRepairExceptions(CLUSTER, 1));
+ if (repairType != RepairType.PREVIEW)
+ {
+ assertParentRepairFailedWithMessageContains(CLUSTER, KEYSPACE, table, "Got negative replies from endpoints");
+ }
+ else
+ {
+ assertParentRepairNotExist(CLUSTER, KEYSPACE, table);
+ }
}
- else
+ finally
{
- assertParentRepairNotExist(CLUSTER, KEYSPACE, table);
+ filter.off();
}
- }
- finally
- {
- filter.off();
- }
+ });
}
- @Test(timeout = 1 * 60 * 1000)
+ @Test
public void snapshotFailure()
{
Assume.assumeFalse("incremental does not do snapshot", repairType == RepairType.INCREMENTAL);
Assume.assumeFalse("Parallel repair does not perform snapshots", parallelism == RepairParallelism.PARALLEL);
String table = tableName("snapshotfailure");
- CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, value text, PRIMARY KEY (key))", KEYSPACE, table));
- IMessageFilters.Filter filter = CLUSTER.verbs(Verb.SNAPSHOT_MSG).messagesMatching(of(m -> {
- throw new RuntimeException("snapshot fail");
- })).drop();
- try
- {
- long repairExceptions = getRepairExceptions(CLUSTER, 1);
- NodeToolResult result = repair(1, KEYSPACE, table);
- result.asserts()
- .failure();
- if (withNotifications)
+ assertTimeoutPreemptively(Duration.ofMinutes(1), () -> {
+ CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, value text, PRIMARY KEY (key))", KEYSPACE, table));
+ IMessageFilters.Filter filter = CLUSTER.verbs(Verb.SNAPSHOT_MSG).messagesMatching(of(m -> {
+ throw new RuntimeException("snapshot fail");
+ })).drop();
+ try
{
+ long repairExceptions = getRepairExceptions(CLUSTER, 1);
+ NodeToolResult result = repair(1, KEYSPACE, table);
result.asserts()
- .errorContains("Could not create snapshot")
- .notificationContains(ProgressEventType.START, "Starting repair command")
- .notificationContains(ProgressEventType.START, "repairing keyspace " + KEYSPACE + " with repair options")
- .notificationContains(ProgressEventType.ERROR, "Could not create snapshot ")
- .notificationContains(ProgressEventType.COMPLETE, "finished with error");
- }
- else
- {
- // Right now coordination doesn't propgate the first exception, so we only know "there exists a issue".
- // With notifications on nodetool will see the error then complete, so the cmd state (what nodetool
- // polls on) is ignored. With notifications off, the poll await fails and queries cmd state, and that
- // will have the below error.
- // NOTE: this isn't desireable, would be good to propgate
- result.asserts()
- .errorContains("Some repair failed");
+ .failure()
+ // Right now coordination doesn't propagate the first exception, so we only know "there exists an issue".
+ // With notifications on nodetool will see the error then complete, so the cmd state (what nodetool
+ // polls on) is ignored. With notifications off or dropped, the poll await fails and queries cmd
+ // state, and that will have the below error.
+ // NOTE: this isn't desirable, would be good to propagate
+ .errorContains("Could not create snapshot", "Some repair failed");
+ if (withNotifications)
+ {
+ result.asserts()
+ .notificationContains(ProgressEventType.START, "Starting repair command")
+ .notificationContains(ProgressEventType.START, "repairing keyspace " + KEYSPACE + " with repair options")
+ .notificationContains(ProgressEventType.ERROR, "Could not create snapshot ")
+ .notificationContains(ProgressEventType.COMPLETE, "finished with error");
+ }
+
+ Assert.assertEquals(repairExceptions + 1, getRepairExceptions(CLUSTER, 1));
+ if (repairType != RepairType.PREVIEW)
+ {
+ assertParentRepairFailedWithMessageContains(CLUSTER, KEYSPACE, table, "Could not create snapshot");
+ }
+ else
+ {
+ assertParentRepairNotExist(CLUSTER, KEYSPACE, table);
+ }
}
-
- Assert.assertEquals(repairExceptions + 1, getRepairExceptions(CLUSTER, 1));
- if (repairType != RepairType.PREVIEW)
+ finally
{
- assertParentRepairFailedWithMessageContains(CLUSTER, KEYSPACE, table, "Could not create snapshot");
+ filter.off();
}
- else
- {
- assertParentRepairNotExist(CLUSTER, KEYSPACE, table);
- }
- }
- finally
- {
- filter.off();
- }
+ });
}
}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorSlow.java b/test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorNeighbourDown.java
similarity index 52%
rename from test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorSlow.java
rename to test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorNeighbourDown.java
index 7be8ed1..db01b13 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorSlow.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorNeighbourDown.java
@@ -19,17 +19,17 @@
package org.apache.cassandra.distributed.test;
import java.net.UnknownHostException;
+import java.time.Duration;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import com.google.common.util.concurrent.Uninterruptibles;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
-import org.apache.cassandra.distributed.api.IMessageFilters;
import org.apache.cassandra.distributed.api.NodeToolResult;
import org.apache.cassandra.distributed.test.DistributedRepairUtils.RepairParallelism;
import org.apache.cassandra.distributed.test.DistributedRepairUtils.RepairType;
@@ -43,128 +43,107 @@ import static org.apache.cassandra.distributed.api.IMessageFilters.Matcher.of;
import static org.apache.cassandra.distributed.test.DistributedRepairUtils.assertParentRepairFailedWithMessageContains;
import static org.apache.cassandra.distributed.test.DistributedRepairUtils.assertParentRepairNotExist;
import static org.apache.cassandra.distributed.test.DistributedRepairUtils.getRepairExceptions;
+import static org.apache.cassandra.utils.AssertUtil.assertTimeoutPreemptively;
-public abstract class RepairCoordinatorSlow extends RepairCoordinatorBase
+public abstract class RepairCoordinatorNeighbourDown extends RepairCoordinatorBase
{
- public RepairCoordinatorSlow(RepairType repairType, RepairParallelism parallelism, boolean withNotifications)
+ public RepairCoordinatorNeighbourDown(RepairType repairType, RepairParallelism parallelism, boolean withNotifications)
{
super(repairType, parallelism, withNotifications);
}
- @Test(timeout = 1 * 60 * 1000)
- public void prepareRPCTimeout()
+ @Before
+ public void beforeTest()
{
- String table = tableName("preparerpctimeout");
- CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, value text, PRIMARY KEY (key))", KEYSPACE, table));
- IMessageFilters.Filter filter = CLUSTER.verbs(Verb.PREPARE_MSG).drop();
- try
- {
- long repairExceptions = getRepairExceptions(CLUSTER, 1);
- NodeToolResult result = repair(1, KEYSPACE, table);
- result.asserts()
- .failure()
- .errorContains("Got negative replies from endpoints [127.0.0.2:7012]");
- if (withNotifications)
+ CLUSTER.filters().reset();
+ CLUSTER.forEach(i -> {
+ try
{
- result.asserts()
- .notificationContains(NodeToolResult.ProgressEventType.START, "Starting repair command")
- .notificationContains(NodeToolResult.ProgressEventType.START, "repairing keyspace " + KEYSPACE + " with repair options")
- .notificationContains(NodeToolResult.ProgressEventType.ERROR, "Got negative replies from endpoints [127.0.0.2:7012]")
- .notificationContains(NodeToolResult.ProgressEventType.COMPLETE, "finished with error");
+ i.startup();
}
-
- if (repairType != RepairType.PREVIEW)
+ catch (IllegalStateException e)
{
- assertParentRepairFailedWithMessageContains(CLUSTER, KEYSPACE, table, "Got negative replies from endpoints [127.0.0.2:7012]");
+ // ignore, node wasn't down
}
- else
- {
- assertParentRepairNotExist(CLUSTER, KEYSPACE, table);
- }
-
- Assert.assertEquals(repairExceptions + 1, getRepairExceptions(CLUSTER, 1));
- }
- finally
- {
- filter.off();
- }
+ });
}
- @Test(timeout = 1 * 60 * 1000)
- public void neighbourDown() throws InterruptedException, ExecutionException
+ @Test
+ public void neighbourDown()
{
String table = tableName("neighbourdown");
- CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, value text, PRIMARY KEY (key))", KEYSPACE, table));
- Future<Void> shutdownFuture = CLUSTER.get(2).shutdown();
- String downNodeAddress = CLUSTER.get(2).callOnInstance(() -> FBUtilities.getBroadcastAddressAndPort().toString());
- try
- {
- // wait for the node to stop
- shutdownFuture.get();
- // wait for the failure detector to detect this
- CLUSTER.get(1).runOnInstance(() -> {
- InetAddressAndPort neighbor;
- try
- {
- neighbor = InetAddressAndPort.getByName(downNodeAddress);
- }
- catch (UnknownHostException e)
+ assertTimeoutPreemptively(Duration.ofMinutes(1), () -> {
+ CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, value text, PRIMARY KEY (key))", KEYSPACE, table));
+ Future<Void> shutdownFuture = CLUSTER.get(2).shutdown();
+ String downNodeAddress = CLUSTER.get(2).callOnInstance(() -> FBUtilities.getBroadcastAddressAndPort().toString());
+ try
+ {
+ // wait for the node to stop
+ shutdownFuture.get();
+ // wait for the failure detector to detect this
+ CLUSTER.get(1).runOnInstance(() -> {
+ InetAddressAndPort neighbor;
+ try
+ {
+ neighbor = InetAddressAndPort.getByName(downNodeAddress);
+ }
+ catch (UnknownHostException e)
+ {
+ throw new RuntimeException(e);
+ }
+ while (FailureDetector.instance.isAlive(neighbor))
+ Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
+ });
+
+ long repairExceptions = getRepairExceptions(CLUSTER, 1);
+ NodeToolResult result = repair(1, KEYSPACE, table);
+ result.asserts()
+ .failure()
+ .errorContains("Endpoint not alive");
+ if (withNotifications)
{
- throw new RuntimeException(e);
+ result.asserts()
+ .notificationContains(NodeToolResult.ProgressEventType.START, "Starting repair command")
+ .notificationContains(NodeToolResult.ProgressEventType.START, "repairing keyspace " + KEYSPACE + " with repair options")
+ .notificationContains(NodeToolResult.ProgressEventType.ERROR, "Endpoint not alive")
+ .notificationContains(NodeToolResult.ProgressEventType.COMPLETE, "finished with error");
}
- while (FailureDetector.instance.isAlive(neighbor))
- Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
- });
- long repairExceptions = getRepairExceptions(CLUSTER, 1);
- NodeToolResult result = repair(1, KEYSPACE, table);
- result.asserts()
- .failure()
- .errorContains("Endpoint not alive");
- if (withNotifications)
+ Assert.assertEquals(repairExceptions + 1, getRepairExceptions(CLUSTER, 1));
+ }
+ finally
{
- result.asserts()
- .notificationContains(NodeToolResult.ProgressEventType.START, "Starting repair command")
- .notificationContains(NodeToolResult.ProgressEventType.START, "repairing keyspace " + KEYSPACE + " with repair options")
- .notificationContains(NodeToolResult.ProgressEventType.ERROR, "Endpoint not alive")
- .notificationContains(NodeToolResult.ProgressEventType.COMPLETE, "finished with error");
+ CLUSTER.get(2).startup();
}
- Assert.assertEquals(repairExceptions + 1, getRepairExceptions(CLUSTER, 1));
- }
- finally
- {
- CLUSTER.get(2).startup();
- }
-
- // make sure to call outside of the try/finally so the node is up so we can actually query
- if (repairType != RepairType.PREVIEW)
- {
- assertParentRepairFailedWithMessageContains(CLUSTER, KEYSPACE, table, "Endpoint not alive");
- }
- else
- {
- assertParentRepairNotExist(CLUSTER, KEYSPACE, table);
- }
+ // make sure to call outside of the try/finally so the node is up so we can actually query
+ if (repairType != RepairType.PREVIEW)
+ {
+ assertParentRepairFailedWithMessageContains(CLUSTER, KEYSPACE, table, "Endpoint not alive");
+ }
+ else
+ {
+ assertParentRepairNotExist(CLUSTER, KEYSPACE, table);
+ }
+ });
}
- @Test(timeout = 1 * 60 * 1000)
+ @Test
public void validationParticipentCrashesAndComesBack()
{
// Test what happens when a participant restarts in the middle of validation
// Currently this isn't recoverable but could be.
// TODO since this is a real restart, how would I test "long pause"? Can't send SIGSTOP since same procress
String table = tableName("validationparticipentcrashesandcomesback");
- CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, value text, PRIMARY KEY (key))", KEYSPACE, table));
- AtomicReference<Future<Void>> participantShutdown = new AtomicReference<>();
- IMessageFilters.Filter filter = CLUSTER.verbs(Verb.VALIDATION_REQ).to(2).messagesMatching(of(m -> {
- // the nice thing about this is that this lambda is "capturing" and not "transfer", what this means is that
- // this lambda isn't serialized and any object held isn't copied.
- participantShutdown.set(CLUSTER.get(2).shutdown());
- return true; // drop it so this node doesn't reply before shutdown.
- })).drop();
- try
- {
+ assertTimeoutPreemptively(Duration.ofMinutes(1), () -> {
+ CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, value text, PRIMARY KEY (key))", KEYSPACE, table));
+ AtomicReference<Future<Void>> participantShutdown = new AtomicReference<>();
+ CLUSTER.verbs(Verb.VALIDATION_REQ).to(2).messagesMatching(of(m -> {
+ // the nice thing about this is that this lambda is "capturing" and not "transfer", what this means is that
+ // this lambda isn't serialized and any object held isn't copied.
+ participantShutdown.set(CLUSTER.get(2).shutdown());
+ return true; // drop it so this node doesn't reply before shutdown.
+ })).drop();
// since nodetool is blocking, need to handle participantShutdown in the background
CompletableFuture<Void> recovered = CompletableFuture.runAsync(() -> {
try {
@@ -215,15 +194,6 @@ public abstract class RepairCoordinatorSlow extends RepairCoordinatorBase
{
assertParentRepairNotExist(CLUSTER, KEYSPACE, table);
}
- }
- finally
- {
- filter.off();
- try {
- CLUSTER.get(2).startup();
- } catch (Exception e) {
- // if you call startup twice it is allowed to fail, so ignore it... hope this didn't brike the other tests =x
- }
- }
+ });
}
}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorTimeout.java b/test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorTimeout.java
new file mode 100644
index 0000000..7a08187
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorTimeout.java
@@ -0,0 +1,67 @@
+package org.apache.cassandra.distributed.test;
+
+import java.time.Duration;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.api.NodeToolResult;
+import org.apache.cassandra.distributed.test.DistributedRepairUtils.RepairParallelism;
+import org.apache.cassandra.distributed.test.DistributedRepairUtils.RepairType;
+import org.apache.cassandra.net.Verb;
+
+import static java.lang.String.format;
+import static org.apache.cassandra.distributed.test.DistributedRepairUtils.assertParentRepairFailedWithMessageContains;
+import static org.apache.cassandra.distributed.test.DistributedRepairUtils.assertParentRepairNotExist;
+import static org.apache.cassandra.distributed.test.DistributedRepairUtils.getRepairExceptions;
+import static org.apache.cassandra.utils.AssertUtil.assertTimeoutPreemptively;
+
+public abstract class RepairCoordinatorTimeout extends RepairCoordinatorBase
+{
+ public RepairCoordinatorTimeout(RepairType repairType, RepairParallelism parallelism, boolean withNotifications)
+ {
+ super(repairType, parallelism, withNotifications);
+ }
+
+ @Before
+ public void beforeTest()
+ {
+ CLUSTER.filters().reset();
+ }
+
+ @Test
+ public void prepareRPCTimeout()
+ {
+ String table = tableName("preparerpctimeout");
+ assertTimeoutPreemptively(Duration.ofMinutes(1), () -> {
+ CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, value text, PRIMARY KEY (key))", KEYSPACE, table));
+ CLUSTER.verbs(Verb.PREPARE_MSG).drop();
+
+ long repairExceptions = getRepairExceptions(CLUSTER, 1);
+ NodeToolResult result = repair(1, KEYSPACE, table);
+ result.asserts()
+ .failure()
+ .errorContains("Got negative replies from endpoints [127.0.0.2:7012]");
+ if (withNotifications)
+ {
+ result.asserts()
+ .notificationContains(NodeToolResult.ProgressEventType.START, "Starting repair command")
+ .notificationContains(NodeToolResult.ProgressEventType.START, "repairing keyspace " + KEYSPACE + " with repair options")
+ .notificationContains(NodeToolResult.ProgressEventType.ERROR, "Got negative replies from endpoints [127.0.0.2:7012]")
+ .notificationContains(NodeToolResult.ProgressEventType.COMPLETE, "finished with error");
+ }
+
+ if (repairType != RepairType.PREVIEW)
+ {
+ assertParentRepairFailedWithMessageContains(CLUSTER, KEYSPACE, table, "Got negative replies from endpoints [127.0.0.2:7012]");
+ }
+ else
+ {
+ assertParentRepairNotExist(CLUSTER, KEYSPACE, table);
+ }
+
+ Assert.assertEquals(repairExceptions + 1, getRepairExceptions(CLUSTER, 1));
+ });
+ }
+}
diff --git a/test/unit/org/apache/cassandra/utils/AssertUtil.java b/test/unit/org/apache/cassandra/utils/AssertUtil.java
new file mode 100644
index 0000000..4d35ede
--- /dev/null
+++ b/test/unit/org/apache/cassandra/utils/AssertUtil.java
@@ -0,0 +1,128 @@
+package org.apache.cassandra.utils;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+
+import org.apache.cassandra.concurrent.NamedThreadFactory;
+
+public final class AssertUtil
+{
+ private AssertUtil()
+ {
+
+ }
+
+ /**
+ * Launch the input in another thread, throws an assert failure if it takes longer than the defined timeout.
+ *
+ * An attempt to halt the thread uses an interrupt, but only works if the underlying logic respects it.
+ *
+ * The assert message will contain the stacktrace at the time of the timeout; grouped by common threads.
+ */
+ public static void assertTimeoutPreemptively(Duration timeout, Executable fn)
+ {
+ StackTraceElement caller = Thread.currentThread().getStackTrace()[2];
+ assertTimeoutPreemptively(caller, timeout, () -> {
+ fn.execute();
+ return null;
+ });
+ }
+
+ /**
+ * Launch the input in another thread, throws an assert failure if it takes longer than the defined timeout.
+ *
+ * An attempt to halt the thread uses an interrupt, but only works if the underlying logic respects it.
+ *
+ * The assert message will contain the stacktrace at the time of the timeout; grouped by common threads.
+ */
+ public static <T> T assertTimeoutPreemptively(Duration timeout, ThrowingSupplier<T> supplier)
+ {
+ StackTraceElement caller = Thread.currentThread().getStackTrace()[2];
+ return assertTimeoutPreemptively(caller, timeout, supplier);
+ }
+
+ private static <T> T assertTimeoutPreemptively(StackTraceElement caller, Duration timeout, ThrowingSupplier<T> supplier)
+ {
+
+ String[] split = caller.getClassName().split("\\.");
+ String simpleClassName = split[split.length - 1];
+ ExecutorService executorService = Executors.newSingleThreadExecutor(new NamedThreadFactory("TimeoutTest-" + simpleClassName + "#" + caller.getMethodName()));
+ try
+ {
+ Future<T> future = executorService.submit(() -> {
+ try {
+ return supplier.get();
+ }
+ catch (Throwable throwable) {
+ throw Throwables.throwAsUncheckedException(throwable);
+ }
+ });
+
+ long timeoutInNanos = timeout.toNanos();
+ try
+ {
+ return future.get(timeoutInNanos, TimeUnit.NANOSECONDS);
+ }
+ catch (TimeoutException ex)
+ {
+ future.cancel(true);
+ Map<Thread, StackTraceElement[]> threadDump = Thread.getAllStackTraces();
+ StringBuilder sb = new StringBuilder("execution timed out after ").append(TimeUnit.NANOSECONDS.toMillis(timeoutInNanos)).append(" ms\n");
+ Multimap<List<StackTraceElement>, Thread> groupCommonThreads = HashMultimap.create();
+ for (Map.Entry<Thread, StackTraceElement[]> e : threadDump.entrySet())
+ groupCommonThreads.put(Arrays.asList(e.getValue()), e.getKey());
+
+ for (Map.Entry<List<StackTraceElement>, Collection<Thread>> e : groupCommonThreads.asMap().entrySet())
+ {
+ sb.append("Threads: ");
+ Joiner.on(", ").appendTo(sb, e.getValue().stream().map(Thread::getName).iterator());
+ sb.append("\n");
+ for (StackTraceElement elem : e.getKey())
+ sb.append("\t").append(elem.getClassName()).append(".").append(elem.getMethodName()).append("[").append(elem.getLineNumber()).append("]\n");
+ sb.append("\n");
+ }
+ throw new AssertionError(sb.toString());
+ }
+ catch (InterruptedException e)
+ {
+ Thread.currentThread().interrupt();
+ throw Throwables.throwAsUncheckedException(e);
+ }
+ catch (ExecutionException ex)
+ {
+ throw Throwables.throwAsUncheckedException(ex.getCause());
+ }
+ catch (Throwable ex)
+ {
+ throw Throwables.throwAsUncheckedException(ex);
+ }
+ }
+ finally
+ {
+ executorService.shutdownNow();
+ }
+ }
+
+ public interface ThrowingSupplier<T>
+ {
+ T get() throws Throwable;
+ }
+
+ public interface Executable
+ {
+ void execute() throws Throwable;
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org