You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@cassandra.apache.org by if...@apache.org on 2020/03/12 16:40:00 UTC
[cassandra] branch trunk updated: Refactor repair coordinator to centralize stage change logic and improve the public-facing errors
This is an automated email from the ASF dual-hosted git repository.
ifesdjeen pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new cd9fd9e Refactor repair coordinator to centralize stage change logic and improve the public-facing errors
cd9fd9e is described below
commit cd9fd9e83f507e2bab5075399d812e3fb4368920
Author: David Capwell <dc...@gmail.com>
AuthorDate: Wed Mar 11 12:06:42 2020 -0700
Refactor repair coordinator to centralize stage change logic and improve the public-facing errors
Patch by David Capwell; reviewed by Blake Eggleston, Zhao Yang, Dinesh Joshi, and Alex Petrov for CASSANDRA-15564
---
build.xml | 3 +
.../apache/cassandra/repair/RepairRunnable.java | 497 +++++++++++----------
.../repair/SomeRepairFailedException.java | 24 +-
.../repair/consistent/CoordinatorSession.java | 3 +-
.../cassandra/service/ActiveRepairService.java | 7 +-
src/java/org/apache/cassandra/tools/NodeProbe.java | 11 +
src/java/org/apache/cassandra/tools/NodeTool.java | 4 +-
.../cassandra/distributed/api/ICoordinator.java | 6 +-
.../cassandra/distributed/api/IInstance.java | 9 +-
.../cassandra/distributed/api/IMessageFilters.java | 52 ++-
.../LongTokenRange.java} | 23 +-
.../cassandra/distributed/api/NodeToolResult.java | 182 ++++++++
.../cassandra/distributed/api/QueryResult.java | 139 ++++++
.../org/apache/cassandra/distributed/api/Row.java | 119 +++++
.../distributed/impl/AbstractCluster.java | 16 +-
.../cassandra/distributed/impl/Coordinator.java | 20 +-
.../cassandra/distributed/impl/Instance.java | 111 ++++-
.../cassandra/distributed/impl/MessageFilters.java | 76 +++-
.../mock/nodetool/InternalNodeProbe.java | 32 +-
.../mock/nodetool/InternalNodeProbeFactory.java | 11 +-
.../distributed/test/DistributedRepairUtils.java | 208 +++++++++
.../FullRepairCoordinatorFastTest.java} | 21 +-
.../FullRepairCoordinatorSlowTest.java} | 21 +-
.../IncrementalRepairCoordinatorFastTest.java} | 21 +-
.../IncrementalRepairCoordinatorSlowTest.java} | 21 +-
.../distributed/test/MessageFiltersTest.java | 164 ++++---
.../PreviewRepairCoordinatorFastTest.java} | 21 +-
.../PreviewRepairCoordinatorSlowTest.java} | 21 +-
.../distributed/test/PreviewRepairTest.java | 4 +-
.../distributed/test/RepairCoordinatorBase.java | 102 +++++
.../test/RepairCoordinatorFailingMessageTest.java | 186 ++++++++
.../distributed/test/RepairCoordinatorFast.java | 384 ++++++++++++++++
.../distributed/test/RepairCoordinatorSlow.java | 230 ++++++++++
test/unit/org/apache/cassandra/utils/Retry.java | 222 +++++++++
34 files changed, 2527 insertions(+), 444 deletions(-)
diff --git a/build.xml b/build.xml
index bd1f557..6c816eb 100644
--- a/build.xml
+++ b/build.xml
@@ -541,6 +541,7 @@
<dependency groupId="org.yaml" artifactId="snakeyaml" version="1.11"/>
<dependency groupId="junit" artifactId="junit" version="4.12" />
+ <dependency groupId="org.mockito" artifactId="mockito-core" version="3.2.4" />
<dependency groupId="org.quicktheories" artifactId="quicktheories" version="0.25" />
<dependency groupId="com.google.code.java-allocation-instrumenter" artifactId="java-allocation-instrumenter" version="${allocation-instrumenter.version}" />
<dependency groupId="org.apache.rat" artifactId="apache-rat" version="0.10">
@@ -675,6 +676,7 @@
artifactId="cassandra-parent"
version="${version}"/>
<dependency groupId="junit" artifactId="junit"/>
+ <dependency groupId="org.mockito" artifactId="mockito-core" />
<dependency groupId="org.quicktheories" artifactId="quicktheories" />
<dependency groupId="com.google.code.java-allocation-instrumenter" artifactId="java-allocation-instrumenter" version="${allocation-instrumenter.version}" />
<dependency groupId="org.psjava" artifactId="psjava" version="0.1.19" />
@@ -707,6 +709,7 @@
artifactId="cassandra-parent"
version="${version}"/>
<dependency groupId="junit" artifactId="junit"/>
+ <dependency groupId="org.mockito" artifactId="mockito-core" />
<dependency groupId="com.datastax.cassandra" artifactId="cassandra-driver-core" classifier="shaded"/>
<dependency groupId="io.netty" artifactId="netty-all"/>
<dependency groupId="org.eclipse.jdt.core.compiler" artifactId="ecj"/>
diff --git a/src/java/org/apache/cassandra/repair/RepairRunnable.java b/src/java/org/apache/cassandra/repair/RepairRunnable.java
index ed7e208..c673a6c 100644
--- a/src/java/org/apache/cassandra/repair/RepairRunnable.java
+++ b/src/java/org/apache/cassandra/repair/RepairRunnable.java
@@ -19,7 +19,12 @@ package org.apache.cassandra.repair;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
@@ -28,17 +33,18 @@ import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableCollection;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.*;
-
-import org.apache.cassandra.locator.EndpointsForRange;
-import org.apache.cassandra.locator.Replica;
+import com.google.common.util.concurrent.AsyncFunction;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
import org.apache.commons.lang3.time.DurationFormatUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,23 +53,26 @@ import com.codahale.metrics.Timer;
import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.gms.FailureDetector;
-import org.apache.cassandra.repair.consistent.SyncStatSummary;
-import org.apache.cassandra.schema.SchemaConstants;
-import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.cql3.QueryOptions;
import org.apache.cassandra.cql3.QueryProcessor;
import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.cql3.statements.SelectStatement;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.repair.consistent.CoordinatorSession;
-import org.apache.cassandra.metrics.StorageMetrics;
+import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.locator.EndpointsForRange;
import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.metrics.StorageMetrics;
+import org.apache.cassandra.repair.consistent.CoordinatorSession;
+import org.apache.cassandra.repair.consistent.SyncStatSummary;
import org.apache.cassandra.repair.messages.RepairOption;
+import org.apache.cassandra.schema.SchemaConstants;
import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.service.ActiveRepairService.ParentRepairStatus;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.QueryState;
import org.apache.cassandra.service.StorageService;
@@ -81,7 +90,7 @@ import org.apache.cassandra.utils.progress.ProgressEventNotifier;
import org.apache.cassandra.utils.progress.ProgressEventType;
import org.apache.cassandra.utils.progress.ProgressListener;
-public class RepairRunnable extends WrappedRunnable implements ProgressEventNotifier
+public class RepairRunnable implements Runnable, ProgressEventNotifier
{
private static final Logger logger = LoggerFactory.getLogger(RepairRunnable.class);
@@ -91,13 +100,18 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
private final String keyspace;
private final String tag;
- private final AtomicInteger progress = new AtomicInteger();
+ private final AtomicInteger progressCounter = new AtomicInteger();
private final int totalProgress;
+ private final long creationTimeMillis = System.currentTimeMillis();
+ private final UUID parentSession = UUIDGen.getTimeUUID();
+
private final List<ProgressListener> listeners = new ArrayList<>();
private static final AtomicInteger threadCounter = new AtomicInteger(1);
+ private TraceState traceState;
+
public RepairRunnable(StorageService storageService, int cmd, RepairOption options, String keyspace)
{
this.storageService = storageService;
@@ -131,167 +145,243 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
}
}
- protected void fireErrorAndComplete(int progressCount, int totalProgress, String message)
+ public void notification(String msg)
+ {
+ logger.info(msg);
+ fireProgressEvent(new ProgressEvent(ProgressEventType.NOTIFICATION, progressCounter.get(), totalProgress, msg));
+ }
+
+ private void skip(String msg)
{
+ notification("Repair " + parentSession + " skipped: " + msg);
+ success(msg);
+ }
+
+ private void success(String msg)
+ {
+ fireProgressEvent(new ProgressEvent(ProgressEventType.SUCCESS, progressCounter.get(), totalProgress, msg));
+ ActiveRepairService.instance.recordRepairStatus(cmd, ActiveRepairService.ParentRepairStatus.COMPLETED,
+ ImmutableList.of(msg));
+ complete(null);
+ }
+
+ public void notifyError(Throwable error)
+ {
+ // exception should be ignored
+ if (error instanceof SomeRepairFailedException)
+ return;
+ logger.error("Repair {} failed:", parentSession, error);
+
StorageMetrics.repairExceptions.inc();
- String errorMessage = String.format("Repair command #%d failed with error %s", cmd, message);
- fireProgressEvent(new ProgressEvent(ProgressEventType.ERROR, progressCount, totalProgress, errorMessage));
- String completionMessage = String.format("Repair command #%d finished with error", cmd);
- fireProgressEvent(new ProgressEvent(ProgressEventType.COMPLETE, progressCount, totalProgress, completionMessage));
- recordFailure(errorMessage, completionMessage);
+ String errorMessage = String.format("Repair command #%d failed with error %s", cmd, error.getMessage());
+ fireProgressEvent(new ProgressEvent(ProgressEventType.ERROR, progressCounter.get(), totalProgress, errorMessage));
+
+ // since this can fail, update table only after updating in-memory and notification state
+ maybeStoreParentRepairFailure(error);
}
+ private void fail(String reason)
+ {
+ if (reason == null)
+ reason = "Some repair failed";
+ String completionMessage = String.format("Repair command #%d finished with error", cmd);
+
+ // Note we rely on the first message being the reason for the failure
+ // when inspecting this state from RepairRunner.queryForCompletedRepair
+ ActiveRepairService.instance.recordRepairStatus(cmd, ParentRepairStatus.FAILED,
+ ImmutableList.of(reason, completionMessage));
+
+ complete(completionMessage);
+ }
- protected void runMayThrow() throws Exception
+ private void complete(String msg)
{
- ActiveRepairService.instance.recordRepairStatus(cmd, ActiveRepairService.ParentRepairStatus.IN_PROGRESS, ImmutableList.of());
- final TraceState traceState;
- final UUID parentSession = UUIDGen.getTimeUUID();
- final String tag = "repair:" + cmd;
+ long durationMillis = System.currentTimeMillis() - creationTimeMillis;
+ if (msg == null)
+ {
+ String duration = DurationFormatUtils.formatDurationWords(durationMillis, true, true);
+ msg = String.format("Repair command #%d finished in %s", cmd, duration);
+ }
- final AtomicInteger progress = new AtomicInteger();
- final int totalProgress = 4 + options.getRanges().size(); // get valid column families, calculate neighbors, validation, prepare for repair + number of ranges to repair
+ fireProgressEvent(new ProgressEvent(ProgressEventType.COMPLETE, progressCounter.get(), totalProgress, msg));
+ logger.info(msg);
- String[] columnFamilies = options.getColumnFamilies().toArray(new String[options.getColumnFamilies().size()]);
- Iterable<ColumnFamilyStore> validColumnFamilies;
+ ActiveRepairService.instance.removeParentRepairSession(parentSession);
+ TraceState localState = traceState;
+ if (options.isTraced() && localState != null)
+ {
+ for (ProgressListener listener : listeners)
+ localState.removeProgressListener(listener);
+ // Because DebuggableThreadPoolExecutor#afterExecute and this callback
+ // run in a nondeterministic order (within the same thread), the
+ // TraceState may have been nulled out at this point. The TraceState
+ // should be traceState, so just set it without bothering to check if it
+ // actually was nulled out.
+ Tracing.instance.set(localState);
+ Tracing.traceRepair(msg);
+ Tracing.instance.stopSession();
+ }
+
+ Keyspace.open(keyspace).metric.repairTime.update(durationMillis, TimeUnit.MILLISECONDS);
+ }
+
+ public void run()
+ {
try
{
- validColumnFamilies = storageService.getValidColumnFamilies(false, false, keyspace, columnFamilies);
- progress.incrementAndGet();
+ runMayThrow();
}
- catch (IllegalArgumentException | IOException e)
+ catch (SkipRepairException e)
{
- logger.error("Repair {} failed:", parentSession, e);
- fireErrorAndComplete(progress.get(), totalProgress, e.getMessage());
- return;
+ skip(e.getMessage());
}
-
- if (Iterables.isEmpty(validColumnFamilies))
+ catch (Exception | Error e)
{
- String message = String.format("Empty keyspace, skipping repair: %s", keyspace);
- logger.info(message);
- fireProgressEvent(new ProgressEvent(ProgressEventType.COMPLETE, 0, 0, message));
- return;
+ notifyError(e);
+ fail(e.getMessage());
}
+ }
+
+ private void runMayThrow() throws Exception
+ {
+ ActiveRepairService.instance.recordRepairStatus(cmd, ParentRepairStatus.IN_PROGRESS, ImmutableList.of());
+
+ List<ColumnFamilyStore> columnFamilies = getColumnFamilies();
+ String[] cfnames = columnFamilies.stream().map(cfs -> cfs.name).toArray(String[]::new);
+
+ this.traceState = maybeCreateTraceState(columnFamilies);
+
+ notifyStarting();
+
+ NeighborsAndRanges neighborsAndRanges = getNeighborsAndRanges();
+
+ maybeStoreParentRepairStart(cfnames);
+
+ prepare(columnFamilies, neighborsAndRanges.allNeighbors, neighborsAndRanges.force);
+
+ repair(cfnames, neighborsAndRanges);
+ }
- final long startTime = System.currentTimeMillis();
+ private List<ColumnFamilyStore> getColumnFamilies() throws IOException
+ {
+ String[] columnFamilies = options.getColumnFamilies().toArray(new String[options.getColumnFamilies().size()]);
+ Iterable<ColumnFamilyStore> validColumnFamilies = storageService.getValidColumnFamilies(false, false, keyspace, columnFamilies);
+ progressCounter.incrementAndGet();
+
+ if (Iterables.isEmpty(validColumnFamilies))
+ throw new SkipRepairException(String.format("Empty keyspace, skipping repair: %s", keyspace));
+ return Lists.newArrayList(validColumnFamilies);
+ }
+
+ private TraceState maybeCreateTraceState(Iterable<ColumnFamilyStore> columnFamilyStores)
+ {
+ if (!options.isTraced())
+ return null;
+
+ StringBuilder cfsb = new StringBuilder();
+ for (ColumnFamilyStore cfs : columnFamilyStores)
+ cfsb.append(", ").append(cfs.keyspace.getName()).append(".").append(cfs.name);
+
+ UUID sessionId = Tracing.instance.newSession(Tracing.TraceType.REPAIR);
+ TraceState traceState = Tracing.instance.begin("repair", ImmutableMap.of("keyspace", keyspace, "columnFamilies",
+ cfsb.substring(2)));
+ traceState.enableActivityNotification(tag);
+ for (ProgressListener listener : listeners)
+ traceState.addProgressListener(listener);
+ Thread queryThread = createQueryThread(cmd, sessionId);
+ queryThread.setName("RepairTracePolling");
+ queryThread.start();
+ return traceState;
+ }
+
+ private void notifyStarting()
+ {
String message = String.format("Starting repair command #%d (%s), repairing keyspace %s with %s", cmd, parentSession, keyspace,
options);
logger.info(message);
- if (options.isTraced())
- {
- StringBuilder cfsb = new StringBuilder();
- for (ColumnFamilyStore cfs : validColumnFamilies)
- cfsb.append(", ").append(cfs.keyspace.getName()).append(".").append(cfs.name);
-
- UUID sessionId = Tracing.instance.newSession(Tracing.TraceType.REPAIR);
- traceState = Tracing.instance.begin("repair", ImmutableMap.of("keyspace", keyspace, "columnFamilies",
- cfsb.substring(2)));
- message = message + " tracing with " + sessionId;
- fireProgressEvent(new ProgressEvent(ProgressEventType.START, 0, 100, message));
- Tracing.traceRepair(message);
- traceState.enableActivityNotification(tag);
- for (ProgressListener listener : listeners)
- traceState.addProgressListener(listener);
- Thread queryThread = createQueryThread(cmd, sessionId);
- queryThread.setName("RepairTracePolling");
- queryThread.start();
- }
- else
- {
- fireProgressEvent(new ProgressEvent(ProgressEventType.START, 0, 100, message));
- traceState = null;
- }
+ Tracing.traceRepair(message);
+ fireProgressEvent(new ProgressEvent(ProgressEventType.START, 0, 100, message));
+ }
+ private NeighborsAndRanges getNeighborsAndRanges()
+ {
Set<InetAddressAndPort> allNeighbors = new HashSet<>();
List<CommonRange> commonRanges = new ArrayList<>();
- try
- {
- //pre-calculate output of getLocalReplicas and pass it to getNeighbors to increase performance and prevent
- //calculation multiple times
- Iterable<Range<Token>> keyspaceLocalRanges = storageService.getLocalReplicas(keyspace).ranges();
-
- for (Range<Token> range : options.getRanges())
- {
- EndpointsForRange neighbors = ActiveRepairService.getNeighbors(keyspace, keyspaceLocalRanges, range,
- options.getDataCenters(),
- options.getHosts());
-
- addRangeToNeighbors(commonRanges, range, neighbors);
- allNeighbors.addAll(neighbors.endpoints());
- }
+ //pre-calculate output of getLocalReplicas and pass it to getNeighbors to increase performance and prevent
+ //calculation multiple times
+ Iterable<Range<Token>> keyspaceLocalRanges = storageService.getLocalReplicas(keyspace).ranges();
- progress.incrementAndGet();
- }
- catch (IllegalArgumentException e)
+ for (Range<Token> range : options.getRanges())
{
- logger.error("Repair {} failed:", parentSession, e);
- fireErrorAndComplete(progress.get(), totalProgress, e.getMessage());
- return;
- }
+ EndpointsForRange neighbors = ActiveRepairService.getNeighbors(keyspace, keyspaceLocalRanges, range,
+ options.getDataCenters(),
+ options.getHosts());
- // Validate columnfamilies
- List<ColumnFamilyStore> columnFamilyStores = new ArrayList<>();
- try
- {
- Iterables.addAll(columnFamilyStores, validColumnFamilies);
- progress.incrementAndGet();
- }
- catch (IllegalArgumentException e)
- {
- logger.error("Repair {} failed:", parentSession, e);
- fireErrorAndComplete(progress.get(), totalProgress, e.getMessage());
- return;
+ addRangeToNeighbors(commonRanges, range, neighbors);
+ allNeighbors.addAll(neighbors.endpoints());
}
- String[] cfnames = new String[columnFamilyStores.size()];
- for (int i = 0; i < columnFamilyStores.size(); i++)
+ progressCounter.incrementAndGet();
+
+ boolean force = options.isForcedRepair();
+
+ if (force && options.isIncremental())
{
- cfnames[i] = columnFamilyStores.get(i).name;
+ Set<InetAddressAndPort> actualNeighbors = Sets.newHashSet(Iterables.filter(allNeighbors, FailureDetector.instance::isAlive));
+ force = !allNeighbors.equals(actualNeighbors);
+ allNeighbors = actualNeighbors;
}
+ return new NeighborsAndRanges(force, allNeighbors, commonRanges);
+ }
+ private void maybeStoreParentRepairStart(String[] cfnames)
+ {
if (!options.isPreview())
{
SystemDistributedKeyspace.startParentRepair(parentSession, keyspace, cfnames, options);
}
+ }
- boolean force = options.isForcedRepair();
-
- if (force && options.isIncremental())
+ private void maybeStoreParentRepairSuccess(Collection<Range<Token>> successfulRanges)
+ {
+ if (!options.isPreview())
{
- Set<InetAddressAndPort> actualNeighbors = Sets.newHashSet(Iterables.filter(allNeighbors, FailureDetector.instance::isAlive));
- force = !allNeighbors.equals(actualNeighbors);
- allNeighbors = actualNeighbors;
+ SystemDistributedKeyspace.successfulParentRepair(parentSession, successfulRanges);
}
+ }
- try (Timer.Context ctx = Keyspace.open(keyspace).metric.repairPrepareTime.time())
+ private void maybeStoreParentRepairFailure(Throwable error)
+ {
+ if (!options.isPreview())
{
- ActiveRepairService.instance.prepareForRepair(parentSession, FBUtilities.getBroadcastAddressAndPort(), allNeighbors, options, force, columnFamilyStores);
- progress.incrementAndGet();
+ SystemDistributedKeyspace.failParentRepair(parentSession, error);
}
- catch (Throwable t)
+ }
+
+ private void prepare(List<ColumnFamilyStore> columnFamilies, Set<InetAddressAndPort> allNeighbors, boolean force)
+ {
+ try (Timer.Context ignore = Keyspace.open(keyspace).metric.repairPrepareTime.time())
{
- logger.error("Repair {} failed:", parentSession, t);
- if (!options.isPreview())
- {
- SystemDistributedKeyspace.failParentRepair(parentSession, t);
- }
- fireErrorAndComplete(progress.get(), totalProgress, t.getMessage());
- return;
+ ActiveRepairService.instance.prepareForRepair(parentSession, FBUtilities.getBroadcastAddressAndPort(), allNeighbors, options, force, columnFamilies);
+ progressCounter.incrementAndGet();
}
+ }
+ private void repair(String[] cfnames, NeighborsAndRanges neighborsAndRanges)
+ {
if (options.isPreview())
{
- previewRepair(parentSession, startTime, commonRanges, cfnames);
+ previewRepair(parentSession, creationTimeMillis, neighborsAndRanges.commonRanges, cfnames);
}
else if (options.isIncremental())
{
- incrementalRepair(parentSession, startTime, force, traceState, allNeighbors, commonRanges, cfnames);
+ incrementalRepair(parentSession, creationTimeMillis, neighborsAndRanges.force, traceState,
+ neighborsAndRanges.allNeighbors, neighborsAndRanges.commonRanges, cfnames);
}
else
{
- normalRepair(parentSession, startTime, traceState, commonRanges, cfnames);
+ normalRepair(parentSession, creationTimeMillis, traceState, neighborsAndRanges.commonRanges, cfnames);
}
}
@@ -354,7 +444,7 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
{
List<CommonRange> filtered = new ArrayList<>(commonRanges.size());
- for (CommonRange commonRange: commonRanges)
+ for (CommonRange commonRange : commonRanges)
{
Set<InetAddressAndPort> endpoints = ImmutableSet.copyOf(Iterables.filter(commonRange.endpoints, liveEndpoints::contains));
Set<InetAddressAndPort> transEndpoints = ImmutableSet.copyOf(Iterables.filter(commonRange.transEndpoints, liveEndpoints::contains));
@@ -381,9 +471,9 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
{
// the local node also needs to be included in the set of participants, since coordinator sessions aren't persisted
Set<InetAddressAndPort> allParticipants = ImmutableSet.<InetAddressAndPort>builder()
- .addAll(allNeighbors)
- .add(FBUtilities.getBroadcastAddressAndPort())
- .build();
+ .addAll(allNeighbors)
+ .add(FBUtilities.getBroadcastAddressAndPort())
+ .build();
List<CommonRange> allRanges = filterCommonRanges(commonRanges, allParticipants, forceRepair);
@@ -418,8 +508,14 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
{
try
{
+ if (results == null || results.stream().anyMatch(s -> s == null))
+ {
+ // something failed
+ fail(null);
+ return;
+ }
PreviewKind previewKind = options.getPreviewKind();
- assert previewKind != PreviewKind.NONE;
+ Preconditions.checkState(previewKind != PreviewKind.NONE, "Preview is NONE");
SyncStatSummary summary = new SyncStatSummary(true);
summary.consumeSessionResults(results);
@@ -427,50 +523,31 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
if (summary.isEmpty())
{
message = previewKind == PreviewKind.REPAIRED ? "Repaired data is in sync" : "Previewed data was in sync";
- logger.info(message);
- fireProgressEvent(new ProgressEvent(ProgressEventType.NOTIFICATION, progress.get(), totalProgress, message));
}
else
{
- message = (previewKind == PreviewKind.REPAIRED ? "Repaired data is inconsistent\n" : "Preview complete\n") + summary.toString();
- logger.info(message);
- fireProgressEvent(new ProgressEvent(ProgressEventType.NOTIFICATION, progress.get(), totalProgress, message));
+ message = (previewKind == PreviewKind.REPAIRED ? "Repaired data is inconsistent\n" : "Preview complete\n") + summary.toString();
}
+ notification(message);
- String successMessage = "Repair preview completed successfully";
- fireProgressEvent(new ProgressEvent(ProgressEventType.SUCCESS, progress.get(), totalProgress,
- successMessage));
- String completionMessage = complete();
-
- ActiveRepairService.instance.recordRepairStatus(cmd, ActiveRepairService.ParentRepairStatus.COMPLETED,
- ImmutableList.of(message, successMessage, completionMessage));
+ success("Repair preview completed successfully");
}
catch (Throwable t)
{
logger.error("Error completing preview repair", t);
onFailure(t);
}
+ finally
+ {
+ executor.shutdownNow();
+ }
}
public void onFailure(Throwable t)
{
- StorageMetrics.repairExceptions.inc();
- fireProgressEvent(new ProgressEvent(ProgressEventType.ERROR, progress.get(), totalProgress, t.getMessage()));
- logger.error("Error completing preview repair", t);
- String completionMessage = complete();
- recordFailure(t.getMessage(), completionMessage);
- }
-
- private String complete()
- {
- logger.debug("Preview repair {} completed", parentSession);
-
- String duration = DurationFormatUtils.formatDurationWords(System.currentTimeMillis() - startTime,
- true, true);
- String message = String.format("Repair preview #%d finished in %s", cmd, duration);
- fireProgressEvent(new ProgressEvent(ProgressEventType.COMPLETE, progress.get(), totalProgress, message));
+ notifyError(t);
+ fail("Error completing preview repair: " + t.getMessage());
executor.shutdownNow();
- return message;
}
}, MoreExecutors.directExecutor());
}
@@ -533,22 +610,16 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
session.ranges().toString());
logger.info(message);
fireProgressEvent(new ProgressEvent(ProgressEventType.PROGRESS,
- progress.incrementAndGet(),
+ progressCounter.incrementAndGet(),
totalProgress,
message));
}
public void onFailure(Throwable t)
{
- StorageMetrics.repairExceptions.inc();
-
String message = String.format("Repair session %s for range %s failed with error %s",
session.getId(), session.ranges().toString(), t.getMessage());
- logger.error(message, t);
- fireProgressEvent(new ProgressEvent(ProgressEventType.ERROR,
- progress.incrementAndGet(),
- totalProgress,
- message));
+ notifyError(new RuntimeException(message, t));
}
}
@@ -578,86 +649,26 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
public void onSuccess(Object result)
{
- if (!options.isPreview())
- {
- SystemDistributedKeyspace.successfulParentRepair(parentSession, successfulRanges);
- }
- final String message;
+ maybeStoreParentRepairSuccess(successfulRanges);
if (hasFailure.get())
{
- StorageMetrics.repairExceptions.inc();
- message = "Some repair failed";
- fireProgressEvent(new ProgressEvent(ProgressEventType.ERROR, progress.get(), totalProgress,
- message));
+ fail(null);
}
else
{
- message = "Repair completed successfully";
- fireProgressEvent(new ProgressEvent(ProgressEventType.SUCCESS, progress.get(), totalProgress,
- message));
- }
- String completionMessage = repairComplete();
- if (hasFailure.get())
- {
- recordFailure(message, completionMessage);
- }
- else
- {
- ActiveRepairService.instance.recordRepairStatus(cmd, ActiveRepairService.ParentRepairStatus.COMPLETED,
- ImmutableList.of(message, completionMessage));
+ success("Repair completed successfully");
}
+ executor.shutdownNow();
}
public void onFailure(Throwable t)
{
- StorageMetrics.repairExceptions.inc();
- fireProgressEvent(new ProgressEvent(ProgressEventType.ERROR, progress.get(), totalProgress, t.getMessage()));
- if (!options.isPreview())
- {
- SystemDistributedKeyspace.failParentRepair(parentSession, t);
- }
- String completionMessage = repairComplete();
- recordFailure(t.getMessage(), completionMessage);
- }
-
- private String repairComplete()
- {
- ActiveRepairService.instance.removeParentRepairSession(parentSession);
- long durationMillis = System.currentTimeMillis() - startTime;
- String duration = DurationFormatUtils.formatDurationWords(durationMillis,true, true);
- String message = String.format("Repair command #%d finished in %s", cmd, duration);
- fireProgressEvent(new ProgressEvent(ProgressEventType.COMPLETE, progress.get(), totalProgress, message));
- logger.info(message);
- if (options.isTraced() && traceState != null)
- {
- for (ProgressListener listener : listeners)
- traceState.removeProgressListener(listener);
- // Because DebuggableThreadPoolExecutor#afterExecute and this callback
- // run in a nondeterministic order (within the same thread), the
- // TraceState may have been nulled out at this point. The TraceState
- // should be traceState, so just set it without bothering to check if it
- // actually was nulled out.
- Tracing.instance.set(traceState);
- Tracing.traceRepair(message);
- Tracing.instance.stopSession();
- }
+ notifyError(t);
+ fail(t.getMessage());
executor.shutdownNow();
- Keyspace.open(keyspace).metric.repairTime.update(durationMillis, TimeUnit.MILLISECONDS);
- return message;
}
}
- private void recordFailure(String failureMessage, String completionMessage)
- {
- // Note we rely on the first message being the reason for the failure
- // when inspecting this state from RepairRunner.queryForCompletedRepair
- String failure = failureMessage == null ? "unknown failure" : failureMessage;
- String completion = completionMessage == null ? "unknown completion" : completionMessage;
-
- ActiveRepairService.instance.recordRepairStatus(cmd, ActiveRepairService.ParentRepairStatus.FAILED,
- ImmutableList.of(failure, completion));
- }
-
private static void addRangeToNeighbors(List<CommonRange> neighborRangeList, Range<Token> range, EndpointsForRange neighbors)
{
Set<InetAddressAndPort> endpoints = neighbors.endpoints();
@@ -696,7 +707,7 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
ByteBuffer sessionIdBytes = ByteBufferUtil.bytes(sessionId);
InetAddressAndPort source = FBUtilities.getBroadcastAddressAndPort();
- HashSet<UUID>[] seen = new HashSet[] { new HashSet<>(), new HashSet<>() };
+ HashSet<UUID>[] seen = new HashSet[]{ new HashSet<>(), new HashSet<>() };
int si = 0;
UUID uuid;
@@ -741,7 +752,7 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
if (seen[si == 0 ? 1 : 0].contains(uuid))
continue;
String message = String.format("%s: %s", r.getInetAddress("source"), r.getString("activity"));
- fireProgressEvent(new ProgressEvent(ProgressEventType.NOTIFICATION, 0, 0, message));
+ notification(message);
}
tlast = tcur;
@@ -751,4 +762,26 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
}
}, "Repair-Runnable-" + threadCounter.incrementAndGet());
}
+
+ private static final class SkipRepairException extends RuntimeException
+ {
+ SkipRepairException(String message)
+ {
+ super(message);
+ }
+ }
+
+ private static final class NeighborsAndRanges
+ {
+ private final boolean force;
+ private final Set<InetAddressAndPort> allNeighbors;
+ private final List<CommonRange> commonRanges;
+
+ private NeighborsAndRanges(boolean force, Set<InetAddressAndPort> allNeighbors, List<CommonRange> commonRanges)
+ {
+ this.force = force;
+ this.allNeighbors = allNeighbors;
+ this.commonRanges = commonRanges;
+ }
+ }
}
diff --git a/test/distributed/org/apache/cassandra/distributed/mock/nodetool/InternalNodeProbeFactory.java b/src/java/org/apache/cassandra/repair/SomeRepairFailedException.java
similarity index 58%
copy from test/distributed/org/apache/cassandra/distributed/mock/nodetool/InternalNodeProbeFactory.java
copy to src/java/org/apache/cassandra/repair/SomeRepairFailedException.java
index f7c9dcf..4b077b8 100644
--- a/test/distributed/org/apache/cassandra/distributed/mock/nodetool/InternalNodeProbeFactory.java
+++ b/src/java/org/apache/cassandra/repair/SomeRepairFailedException.java
@@ -16,20 +16,16 @@
* limitations under the License.
*/
-package org.apache.cassandra.distributed.mock.nodetool;
+package org.apache.cassandra.repair;
-import java.io.IOException;
-
-import org.apache.cassandra.tools.NodeProbe;
-import org.apache.cassandra.tools.INodeProbeFactory;
-
-public class InternalNodeProbeFactory implements INodeProbeFactory
+/**
+ * This is a special exception which states "I know something failed but I don't have access to the failure". This
+ * is mostly used to make sure the error notifications are clean and the history table has a meaningful exception.
+ *
+ * The expected behavior is that when this is thrown, this error should be ignored from history table and not used
+ * for notifications
+ */
+public class SomeRepairFailedException extends RuntimeException
{
- public NodeProbe create(String host, int port) throws IOException {
- return new InternalNodeProbe();
- }
-
- public NodeProbe create(String host, int port, String username, String password) throws IOException {
- return new InternalNodeProbe();
- }
+ public static final SomeRepairFailedException INSTANCE = new SomeRepairFailedException();
}
diff --git a/src/java/org/apache/cassandra/repair/consistent/CoordinatorSession.java b/src/java/org/apache/cassandra/repair/consistent/CoordinatorSession.java
index 39549bd..d0b1f70 100644
--- a/src/java/org/apache/cassandra/repair/consistent/CoordinatorSession.java
+++ b/src/java/org/apache/cassandra/repair/consistent/CoordinatorSession.java
@@ -44,6 +44,7 @@ import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.Verb;
import org.apache.cassandra.repair.RepairSessionResult;
+import org.apache.cassandra.repair.SomeRepairFailedException;
import org.apache.cassandra.repair.messages.FailSession;
import org.apache.cassandra.repair.messages.FinalizeCommit;
import org.apache.cassandra.repair.messages.FinalizePropose;
@@ -322,7 +323,7 @@ public class CoordinatorSession extends ConsistentSession
logger.debug("Incremental repair {} validation/stream phase completed in {}", sessionID, formatDuration(repairStart, finalizeStart));
}
- return Futures.immediateFailedFuture(new RuntimeException());
+ return Futures.immediateFailedFuture(SomeRepairFailedException.INSTANCE);
}
else
{
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index 7499c36..467d2bc 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -520,12 +520,15 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
public synchronized ParentRepairSession removeParentRepairSession(UUID parentSessionId)
{
String snapshotName = parentSessionId.toString();
- for (ColumnFamilyStore cfs : getParentRepairSession(parentSessionId).columnFamilyStores.values())
+ ParentRepairSession session = parentRepairSessions.remove(parentSessionId);
+ if (session == null)
+ return null;
+ for (ColumnFamilyStore cfs : session.columnFamilyStores.values())
{
if (cfs.snapshotExists(snapshotName))
cfs.clearSnapshot(snapshotName);
}
- return parentRepairSessions.remove(parentSessionId);
+ return session;
}
public void handleMessage(Message<? extends RepairMessage> message)
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index 77efb0e..180b231 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -180,6 +180,13 @@ public class NodeProbe implements AutoCloseable
connect();
}
+ protected NodeProbe()
+ {
+ // this constructor is only used for extensions to rewrite their own connect method
+ this.host = "";
+ this.port = 0;
+ }
+
/**
* Create a connection to the JMX agent and setup the M[X]Bean proxies.
*
@@ -920,6 +927,10 @@ public class NodeProbe implements AutoCloseable
return spProxy;
}
+ public StorageServiceMBean getStorageService() {
+ return ssProxy;
+ }
+
public GossiperMBean getGossProxy()
{
return gossProxy;
diff --git a/src/java/org/apache/cassandra/tools/NodeTool.java b/src/java/org/apache/cassandra/tools/NodeTool.java
index e29f228..5af3fb1 100644
--- a/src/java/org/apache/cassandra/tools/NodeTool.java
+++ b/src/java/org/apache/cassandra/tools/NodeTool.java
@@ -276,13 +276,13 @@ public class NodeTool
}
}
- private static void badUse(Exception e)
+ protected void badUse(Exception e)
{
System.out.println("nodetool: " + e.getMessage());
System.out.println("See 'nodetool help' or 'nodetool help <command>'.");
}
- private static void err(Throwable e)
+ protected void err(Throwable e)
{
System.err.println("error: " + e.getMessage());
System.err.println("-- StackTrace --");
diff --git a/test/distributed/org/apache/cassandra/distributed/api/ICoordinator.java b/test/distributed/org/apache/cassandra/distributed/api/ICoordinator.java
index ef44853..fe969b1 100644
--- a/test/distributed/org/apache/cassandra/distributed/api/ICoordinator.java
+++ b/test/distributed/org/apache/cassandra/distributed/api/ICoordinator.java
@@ -27,7 +27,11 @@ public interface ICoordinator
{
// a bit hacky, but ConsistencyLevel draws in too many dependent classes, so we cannot have a cross-version
// method signature that accepts ConsistencyLevel directly. So we just accept an Enum<?> and cast.
- Object[][] execute(String query, Enum<?> consistencyLevel, Object... boundValues);
+ default Object[][] execute(String query, Enum<?> consistencyLevel, Object... boundValues)
+ {
+ return executeWithResult(query, consistencyLevel, boundValues).toObjectArrays();
+ }
+ QueryResult executeWithResult(String query, Enum<?> consistencyLevel, Object... boundValues);
Iterator<Object[]> executeWithPaging(String query, Enum<?> consistencyLevel, int pageSize, Object... boundValues);
diff --git a/test/distributed/org/apache/cassandra/distributed/api/IInstance.java b/test/distributed/org/apache/cassandra/distributed/api/IInstance.java
index 8e02e23..23ccd7c 100644
--- a/test/distributed/org/apache/cassandra/distributed/api/IInstance.java
+++ b/test/distributed/org/apache/cassandra/distributed/api/IInstance.java
@@ -43,7 +43,14 @@ public interface IInstance extends IIsolatedExecutor
int liveMemberCount();
- int nodetool(String... commandAndArgs);
+ NodeToolResult nodetoolResult(boolean withNotifications, String... commandAndArgs);
+ default NodeToolResult nodetoolResult(String... commandAndArgs)
+ {
+ return nodetoolResult(true, commandAndArgs);
+ }
+ default int nodetool(String... commandAndArgs) {
+ return nodetoolResult(commandAndArgs).getRc();
+ }
void uncaughtException(Thread t, Throwable e);
/**
diff --git a/test/distributed/org/apache/cassandra/distributed/api/IMessageFilters.java b/test/distributed/org/apache/cassandra/distributed/api/IMessageFilters.java
index 01fe972..f2cd6ee 100644
--- a/test/distributed/org/apache/cassandra/distributed/api/IMessageFilters.java
+++ b/test/distributed/org/apache/cassandra/distributed/api/IMessageFilters.java
@@ -18,6 +18,8 @@
package org.apache.cassandra.distributed.api;
+import java.util.function.Predicate;
+
public interface IMessageFilters
{
public interface Filter
@@ -31,6 +33,21 @@ public interface IMessageFilters
Builder from(int ... nums);
Builder to(int ... nums);
+ Builder verbs(int... verbs);
+ Builder allVerbs();
+
+ Builder inbound(boolean inbound);
+
+ default Builder inbound()
+ {
+ return inbound(true);
+ }
+
+ default Builder outbound()
+ {
+ return inbound(false);
+ }
+
/**
* Every message for which matcher returns `true` will be _dropped_ (assuming all
* other matchers in the chain will return `true` as well).
@@ -42,15 +59,42 @@ public interface IMessageFilters
public interface Matcher
{
boolean matches(int from, int to, IMessage message);
+
+ static Matcher of(Predicate<IMessage> fn) {
+ return (from, to, m) -> fn.test(m);
+ }
}
- Builder verbs(int... verbs);
- Builder allVerbs();
+ Builder inbound(boolean inbound);
+ default Builder inbound() {
+ return inbound(true);
+ }
+ default Builder outbound() {
+ return inbound(false);
+ }
+ default Builder verbs(int... verbs) {
+ return inbound().verbs(verbs);
+ }
+ default Builder allVerbs() {
+ return inbound().allVerbs();
+ }
void reset();
/**
- * {@code true} value returned by the implementation implies that the message was
+ * Checks if the message should be delivered. This is expected to run on "inbound", or on the reciever of
+ * the message (instance.config.num == to).
+ *
+ * @return {@code true} value returned by the implementation implies that the message was
+ * not matched by any filters and therefore should be delivered.
+ */
+ boolean permitInbound(int from, int to, IMessage msg);
+
+ /**
+ * Checks if the message should be delivered. This is expected to run on "outbound", or on the sender of
+ * the message (instance.config.num == from).
+ *
+ * @return {@code true} value returned by the implementation implies that the message was
* not matched by any filters and therefore should be delivered.
*/
- boolean permit(int from, int to, IMessage msg);
+ boolean permitOutbound(int from, int to, IMessage msg);
}
diff --git a/test/distributed/org/apache/cassandra/distributed/mock/nodetool/InternalNodeProbeFactory.java b/test/distributed/org/apache/cassandra/distributed/api/LongTokenRange.java
similarity index 61%
copy from test/distributed/org/apache/cassandra/distributed/mock/nodetool/InternalNodeProbeFactory.java
copy to test/distributed/org/apache/cassandra/distributed/api/LongTokenRange.java
index f7c9dcf..06327e8 100644
--- a/test/distributed/org/apache/cassandra/distributed/mock/nodetool/InternalNodeProbeFactory.java
+++ b/test/distributed/org/apache/cassandra/distributed/api/LongTokenRange.java
@@ -16,20 +16,23 @@
* limitations under the License.
*/
-package org.apache.cassandra.distributed.mock.nodetool;
+package org.apache.cassandra.distributed.api;
-import java.io.IOException;
+import java.io.Serializable;
-import org.apache.cassandra.tools.NodeProbe;
-import org.apache.cassandra.tools.INodeProbeFactory;
-
-public class InternalNodeProbeFactory implements INodeProbeFactory
+public final class LongTokenRange implements Serializable
{
- public NodeProbe create(String host, int port) throws IOException {
- return new InternalNodeProbe();
+ public final long minExclusive;
+ public final long maxInclusive;
+
+ public LongTokenRange(long minExclusive, long maxInclusive)
+ {
+ this.minExclusive = minExclusive;
+ this.maxInclusive = maxInclusive;
}
- public NodeProbe create(String host, int port, String username, String password) throws IOException {
- return new InternalNodeProbe();
+ public String toString()
+ {
+ return "(" + minExclusive + "," + maxInclusive + "]";
}
}
diff --git a/test/distributed/org/apache/cassandra/distributed/api/NodeToolResult.java b/test/distributed/org/apache/cassandra/distributed/api/NodeToolResult.java
new file mode 100644
index 0000000..9ba1127
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/api/NodeToolResult.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.api;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import javax.management.Notification;
+
+import org.junit.Assert;
+
+public class NodeToolResult
+{
+ private final String[] commandAndArgs;
+ private final int rc;
+ private final List<Notification> notifications;
+ private final Throwable error;
+
+ public NodeToolResult(String[] commandAndArgs, int rc, List<Notification> notifications, Throwable error)
+ {
+ this.commandAndArgs = commandAndArgs;
+ this.rc = rc;
+ this.notifications = notifications;
+ this.error = error;
+ }
+
+ public String[] getCommandAndArgs()
+ {
+ return commandAndArgs;
+ }
+
+ public int getRc()
+ {
+ return rc;
+ }
+
+ public List<Notification> getNotifications()
+ {
+ return notifications;
+ }
+
+ public Throwable getError()
+ {
+ return error;
+ }
+
+ public Asserts asserts()
+ {
+ return new Asserts();
+ }
+
+ public final class Asserts {
+ public Asserts success() {
+ Assert.assertEquals("nodetool command " + commandAndArgs[0] + " was not successful", 0, rc);
+ return this;
+ }
+
+ public Asserts failure() {
+ Assert.assertNotEquals("nodetool command " + commandAndArgs[0] + " was successful but not expected to be", 0, rc);
+ return this;
+ }
+
+ public Asserts errorContains(String msg) {
+ Assert.assertNotNull("No exception was found but expected one", error);
+ Assert.assertTrue("Error message '" + error.getMessage() + "' does not contain '" + msg + "'", error.getMessage().contains(msg));
+ return this;
+ }
+
+ public Asserts notificationContains(String msg) {
+ Assert.assertNotNull("notifications not defined", notifications);
+ Assert.assertFalse("notifications not defined", notifications.isEmpty());
+ for (Notification n : notifications) {
+ if (n.getMessage().contains(msg)) {
+ return this;
+ }
+ }
+ Assert.fail("Unable to locate message " + msg + " in notifications: " + notifications);
+ return this; // unreachable
+ }
+
+ public Asserts notificationContains(ProgressEventType type, String msg) {
+ int userType = type.ordinal();
+ Assert.assertNotNull("notifications not defined", notifications);
+ Assert.assertFalse("notifications not defined", notifications.isEmpty());
+ for (Notification n : notifications) {
+ if (notificationType(n) == userType) {
+ if (n.getMessage().contains(msg)) {
+ return this;
+ }
+ }
+ }
+ Assert.fail("Unable to locate message '" + msg + "' in notifications: " + notifications);
+ return this; // unreachable
+ }
+ }
+
+ private static int notificationType(Notification n)
+ {
+ return ((Map<String, Integer>) n.getUserData()).get("type").intValue();
+ }
+
+ public String toString()
+ {
+ return "NodeToolResult{" +
+ "commandAndArgs=" + Arrays.toString(commandAndArgs) +
+ ", rc=" + rc +
+ ", notifications=[" + notifications.stream().map(n -> ProgressEventType.values()[notificationType(n)].name()).collect(Collectors.joining(", ")) + "]" +
+ ", error=" + error +
+ '}';
+ }
+
+ /**
+ * Progress event type.
+ *
+ * <p>
+ * Progress starts by emitting {@link #START}, followed by emitting zero or more {@link #PROGRESS} events,
+ * then it emits either one of {@link #ERROR}/{@link #ABORT}/{@link #SUCCESS}.
+ * Progress indicates its completion by emitting {@link #COMPLETE} at the end of process.
+ * </p>
+ * <p>
+ * {@link #NOTIFICATION} event type is used to just notify message without progress.
+ * </p>
+ */
+ public enum ProgressEventType
+ {
+ /**
+ * Fired first when progress starts.
+ * Happens only once.
+ */
+ START,
+
+ /**
+ * Fire when progress happens.
+ * This can be zero or more time after START.
+ */
+ PROGRESS,
+
+ /**
+ * When observing process completes with error, this is sent once before COMPLETE.
+ */
+ ERROR,
+
+ /**
+ * When observing process is aborted by user, this is sent once before COMPLETE.
+ */
+ ABORT,
+
+ /**
+ * When observing process completes successfully, this is sent once before COMPLETE.
+ */
+ SUCCESS,
+
+ /**
+ * Fire when progress complete.
+ * This is fired once, after ERROR/ABORT/SUCCESS is fired.
+ * After this, no more ProgressEvent should be fired for the same event.
+ */
+ COMPLETE,
+
+ /**
+ * Used when sending message without progress.
+ */
+ NOTIFICATION
+ }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/api/QueryResult.java b/test/distributed/org/apache/cassandra/distributed/api/QueryResult.java
new file mode 100644
index 0000000..dcdfa14
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/api/QueryResult.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.api;
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.function.Predicate;
+
+/**
+ * A table of data representing a complete query result.
+ *
+ * A <code>QueryResult</code> is different from {@link java.sql.ResultSet} in several key ways:
+ *
+ * <ul>
+ * <li>represents a complete result rather than a cursor</li>
+ * <li>returns a {@link Row} to access the current row of data</li>
+ * <li>relies on object pooling; {@link #hasNext()} may return the same object just with different data, accessing a
+ * {@link Row} from a previous {@link #hasNext()} call has undefined behavior.</li>
+ * <li>includes {@link #filter(Predicate)}, this will do client side filtering since Apache Cassandra is more
+ * restrictive on server side filtering</li>
+ * </ul>
+ *
+ * <h2>Unsafe patterns</h2>
+ *
+ * Below are a few unsafe patterns which may lead to unexpected results
+ *
+ * <code>{@code
+ * while (rs.hasNext()) {
+ * list.add(rs.next());
+ * }
+ * }</code>
+ *
+ * <code>{@code
+ * rs.forEach(list::add)
+ * }</code>
+ *
+ * Both cases have the same issue; reference to a row from a previous call to {@link #hasNext()}. Since the same {@link Row}
+ * object can be used accross different calls to {@link #hasNext()} this would mean any attempt to access after the fact
+ * points to newer data. If this behavior is not desirable and access is needed between calls, then {@link Row#copy()}
+ * should be used; this will clone the {@link Row} and return a new object pointing to the same data.
+ */
+public class QueryResult implements Iterator<Row>
+{
+ public static final QueryResult EMPTY = new QueryResult(new String[0], null);
+
+ private final String[] names;
+ private final Object[][] results;
+ private final Predicate<Row> filter;
+ private final Row row;
+ private int offset = -1;
+
+ public QueryResult(String[] names, Object[][] results)
+ {
+ this.names = Objects.requireNonNull(names, "names");
+ this.results = results;
+ this.row = new Row(names);
+ this.filter = ignore -> true;
+ }
+
+ private QueryResult(String[] names, Object[][] results, Predicate<Row> filter, int offset)
+ {
+ this.names = names;
+ this.results = results;
+ this.filter = filter;
+ this.offset = offset;
+ this.row = new Row(names);
+ }
+
+ public String[] getNames()
+ {
+ return names;
+ }
+
+ public boolean isEmpty()
+ {
+ return results.length == 0;
+ }
+
+ public int size()
+ {
+ return results.length;
+ }
+
+ public QueryResult filter(Predicate<Row> fn)
+ {
+ return new QueryResult(names, results, filter.and(fn), offset);
+ }
+
+ /**
+ * Get all rows as a 2d array. Any calls to {@link #filter(Predicate)} will be ignored and the array returned will
+ * be the full set from the query.
+ */
+ public Object[][] toObjectArrays()
+ {
+ return results;
+ }
+
+ @Override
+ public boolean hasNext()
+ {
+ if (results == null)
+ return false;
+ while ((offset += 1) < results.length)
+ {
+ row.setResults(results[offset]);
+ if (filter.test(row))
+ {
+ return true;
+ }
+ }
+ row.setResults(null);
+ return false;
+ }
+
+ @Override
+ public Row next()
+ {
+ if (offset < 0 || offset >= results.length)
+ throw new NoSuchElementException();
+ return row;
+ }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/api/Row.java b/test/distributed/org/apache/cassandra/distributed/api/Row.java
new file mode 100644
index 0000000..43fa6d9
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/api/Row.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.api;
+
+import java.util.Arrays;
+import java.util.Date;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.Set;
+import java.util.UUID;
+import javax.annotation.Nullable;
+
+import com.carrotsearch.hppc.ObjectIntHashMap;
+import com.carrotsearch.hppc.ObjectIntMap;
+
+/**
+ * Data representing a single row in a query result.
+ *
+ * This class is mutable from the parent {@link QueryResult} and can have the row it points to changed between calls
+ * to {@link QueryResult#hasNext()}, for this reason it is unsafe to hold reference to this class after that call;
+ * to get around this, a call to {@link #copy()} will return a new object pointing to the same row.
+ */
+public class Row
+{
+ private final ObjectIntMap<String> nameIndex;
+ @Nullable private Object[] results; // mutable to avoid allocations in loops
+
+ public Row(String[] names)
+ {
+ Objects.requireNonNull(names, "names");
+ this.nameIndex = new ObjectIntHashMap<>(names.length);
+ for (int i = 0; i < names.length; i++) {
+ nameIndex.put(names[i], i);
+ }
+ }
+
+ private Row(ObjectIntMap<String> nameIndex)
+ {
+ this.nameIndex = nameIndex;
+ }
+
+ void setResults(@Nullable Object[] results)
+ {
+ this.results = results;
+ }
+
+ /**
+ * Creates a copy of the current row; can be used past calls to {@link QueryResult#hasNext()}.
+ */
+ public Row copy() {
+ Row copy = new Row(nameIndex);
+ copy.setResults(results);
+ return copy;
+ }
+
+ public <T> T get(String name)
+ {
+ checkAccess();
+ int idx = findIndex(name);
+ if (idx == -1)
+ return null;
+ return (T) results[idx];
+ }
+
+ public String getString(String name)
+ {
+ return get(name);
+ }
+
+ public UUID getUUID(String name)
+ {
+ return get(name);
+ }
+
+ public Date getTimestamp(String name)
+ {
+ return get(name);
+ }
+
+ public <T> Set<T> getSet(String name)
+ {
+ return get(name);
+ }
+
+ public String toString()
+ {
+ return "Row{" +
+ "names=" + nameIndex.keys() +
+ ", results=" + Arrays.toString(results) +
+ '}';
+ }
+
+ private void checkAccess()
+ {
+ if (results == null)
+ throw new NoSuchElementException();
+ }
+
+ private int findIndex(String name)
+ {
+ return nameIndex.getOrDefault(name, -1);
+ }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
index f03bae0..371de54 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
@@ -56,6 +56,7 @@ import org.apache.cassandra.distributed.api.IIsolatedExecutor;
import org.apache.cassandra.distributed.api.IListen;
import org.apache.cassandra.distributed.api.IMessage;
import org.apache.cassandra.distributed.api.IMessageFilters;
+import org.apache.cassandra.distributed.api.NodeToolResult;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.Verb;
@@ -151,9 +152,16 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster,
@Override
public synchronized void startup()
{
+ startup(AbstractCluster.this);
+ }
+
+ public synchronized void startup(ICluster cluster)
+ {
+ if (cluster != AbstractCluster.this)
+ throw new IllegalArgumentException("Only the owning cluster can be used for startup"); //TODO why have this in the API?
if (!isShutdown)
throw new IllegalStateException();
- delegate().startup(AbstractCluster.this);
+ delegate().startup(cluster);
isShutdown = false;
updateMessagingVersions();
}
@@ -183,9 +191,9 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster,
throw new IllegalStateException("Cannot get live member count on shutdown instance");
}
- public int nodetool(String... commandAndArgs)
+ public NodeToolResult nodetoolResult(boolean withNotifications, String... commandAndArgs)
{
- return delegate().nodetool(commandAndArgs);
+ return delegate().nodetoolResult(withNotifications, commandAndArgs);
}
public long killAttempts()
@@ -355,7 +363,7 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster,
return filters;
}
- public MessageFilters.Builder verbs(Verb... verbs)
+ public IMessageFilters.Builder verbs(Verb... verbs)
{
int[] ids = new int[verbs.length];
for (int i = 0; i < verbs.length; ++i)
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Coordinator.java b/test/distributed/org/apache/cassandra/distributed/impl/Coordinator.java
index 94c7e9e..dee9049 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Coordinator.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Coordinator.java
@@ -34,6 +34,7 @@ import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.cql3.statements.SelectStatement;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.distributed.api.ICoordinator;
+import org.apache.cassandra.distributed.api.QueryResult;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.QueryState;
import org.apache.cassandra.service.pager.QueryPager;
@@ -52,9 +53,9 @@ public class Coordinator implements ICoordinator
}
@Override
- public Object[][] execute(String query, Enum<?> consistencyLevelOrigin, Object... boundValues)
+ public QueryResult executeWithResult(String query, Enum<?> consistencyLevel, Object... boundValues)
{
- return instance.sync(() -> executeInternal(query, consistencyLevelOrigin, boundValues)).call();
+ return instance.sync(() -> executeInternal(query, consistencyLevel, boundValues)).call();
}
public Future<Object[][]> asyncExecuteWithTracing(UUID sessionId, String query, Enum<?> consistencyLevelOrigin, Object... boundValues)
@@ -63,7 +64,7 @@ public class Coordinator implements ICoordinator
try
{
Tracing.instance.newSession(sessionId, Collections.emptyMap());
- return executeInternal(query, consistencyLevelOrigin, boundValues);
+ return executeInternal(query, consistencyLevelOrigin, boundValues).toObjectArrays();
}
finally
{
@@ -72,7 +73,7 @@ public class Coordinator implements ICoordinator
}).call();
}
- private Object[][] executeInternal(String query, Enum<?> consistencyLevelOrigin, Object[] boundValues)
+ private QueryResult executeInternal(String query, Enum<?> consistencyLevelOrigin, Object[] boundValues)
{
ClientState clientState = makeFakeClientState();
CQLStatement prepared = QueryProcessor.getStatement(query, clientState);
@@ -94,9 +95,16 @@ public class Coordinator implements ICoordinator
System.nanoTime());
if (res != null && res.kind == ResultMessage.Kind.ROWS)
- return RowUtil.toObjects((ResultMessage.Rows) res);
+ {
+ ResultMessage.Rows rows = (ResultMessage.Rows) res;
+ String[] names = rows.result.metadata.names.stream().map(c -> c.name.toString()).toArray(String[]::new);
+ Object[][] results = RowUtil.toObjects(rows);
+ return new QueryResult(names, results);
+ }
else
- return new Object[][]{};
+ {
+ return QueryResult.EMPTY;
+ }
}
public Object[][] executeWithTracing(UUID sessionId, String query, Enum<?> consistencyLevelOrigin, Object... boundValues)
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index 1beb708..90d747e 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@ -25,10 +25,13 @@ import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
-import java.util.function.BiConsumer;
+import javax.management.ListenerNotFoundException;
+import javax.management.Notification;
+import javax.management.NotificationListener;
import com.google.common.annotations.VisibleForTesting;
@@ -58,6 +61,8 @@ import org.apache.cassandra.distributed.api.ICoordinator;
import org.apache.cassandra.distributed.api.IInstanceConfig;
import org.apache.cassandra.distributed.api.IListen;
import org.apache.cassandra.distributed.api.IMessage;
+import org.apache.cassandra.distributed.api.NodeToolResult;
+import org.apache.cassandra.distributed.mock.nodetool.InternalNodeProbe;
import org.apache.cassandra.distributed.mock.nodetool.InternalNodeProbeFactory;
import org.apache.cassandra.gms.ApplicationState;
import org.apache.cassandra.gms.Gossiper;
@@ -81,6 +86,7 @@ import org.apache.cassandra.service.DefaultFSErrorHandler;
import org.apache.cassandra.service.PendingRangeCalculatorService;
import org.apache.cassandra.service.QueryState;
import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.service.StorageServiceMBean;
import org.apache.cassandra.streaming.StreamReceiveTask;
import org.apache.cassandra.streaming.StreamTransferTask;
import org.apache.cassandra.streaming.async.StreamingInboundHandler;
@@ -199,31 +205,37 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
private void registerMockMessaging(ICluster cluster)
{
- BiConsumer<InetAddressAndPort, IMessage> deliverToInstance = (to, message) -> cluster.get(to).receiveMessage(message);
- BiConsumer<InetAddressAndPort, IMessage> deliverToInstanceIfNotFiltered = (to, message) -> {
- if (permitMessage(cluster, to, message))
- deliverToInstance.accept(to, message);
- };
-
MessagingService.instance().outboundSink.add((message, to) -> {
- deliverToInstanceIfNotFiltered.accept(to, serializeMessage(message.from(), to, message));
+ // deliver straight to the target instance; filtering is done by the sink registerOutboundFilter installs
+ cluster.get(to).receiveMessage(serializeMessage(message.from(), to, message));
return false;
});
}
- // unnecessary if registerMockMessaging used
- private void registerFilter(ICluster cluster)
+ /**
+ * Adds an inbound-sink predicate that checks every message received by this instance against
+ * {@code cluster.filters().permitInbound}; this instance's broadcast address is used as the destination.
+ */
+ private void registerInboundFilter(ICluster cluster)
{
- MessagingService.instance().outboundSink.add((message, to) -> {
- return permitMessage(cluster, to, serializeMessage(message.from(), to, message));
- });
+ MessagingService.instance().inboundSink.add(message ->
+ permitMessageInbound(cluster, serializeMessage(message.from(), broadcastAddressAndPort(), message)));
}
- private boolean permitMessage(ICluster cluster, InetAddressAndPort to, IMessage message)
+ /**
+ * Adds an outbound-sink predicate that checks every message sent by this instance against
+ * {@code cluster.filters().permitOutbound}.
+ */
+ private void registerOutboundFilter(ICluster cluster)
+ {
+ MessagingService.instance().outboundSink.add((message, to) ->
+ permitMessageOutbound(cluster, to, serializeMessage(message.from(), to, message)));
+ }
+
+ /**
+ * Resolves sender and receiver to instance numbers and asks the cluster's inbound filters whether the message
+ * may be delivered.
+ */
+ private boolean permitMessageInbound(ICluster cluster, IMessage message)
{
int fromNum = cluster.get(message.from()).config().num();
+ int toNum = config.num(); // since this instance is receiving the message, to will always be this instance
+ return cluster.filters().permitInbound(fromNum, toNum, message);
+ }
+
+ /**
+ * Resolves sender and receiver to instance numbers and asks the cluster's outbound filters whether the message
+ * may be sent.
+ */
+ private boolean permitMessageOutbound(ICluster cluster, InetAddressAndPort to, IMessage message)
+ {
+ int fromNum = config.num(); // since this instance is sending the message, from will always be this instance
int toNum = cluster.get(to).config().num();
- return cluster.filters().permit(fromNum, toNum, message);
+ return cluster.filters().permitOutbound(fromNum, toNum, message);
}
public void uncaughtException(Thread thread, Throwable throwable)
@@ -274,8 +286,9 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
Message.Header header = messageIn.header;
TraceState state = Tracing.instance.initializeFromMessage(header);
if (state != null) state.trace("{} message received from {}", header.verb, header.from);
- header.verb.stage.execute(ThrowingRunnable.toRunnable(() -> messageIn.verb().handler().doVerb((Message<Object>) messageIn)),
- ExecutorLocals.create(state));
+ header.verb.stage.execute(() -> {
+ MessagingService.instance().inboundSink.accept(messageIn);
+ }, ExecutorLocals.create(state));
}).run();
}
@@ -362,7 +375,6 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
if (config.has(NETWORK))
{
- registerFilter(cluster);
MessagingService.instance().listen();
}
else
@@ -372,6 +384,8 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
// -- not sure what that means? SocketFactory.instance.getClass();
registerMockMessaging(cluster);
}
+ registerInboundFilter(cluster);
+ registerOutboundFilter(cluster);
JVMStabilityInspector.replaceKiller(new InstanceKiller());
// TODO: this is more than just gossip
@@ -554,9 +568,66 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
}).call();
}
- public int nodetool(String... commandAndArgs)
+ /**
+ * Runs nodetool in-process against this instance and captures the outcome.
+ *
+ * @param withNotifications when {@code false}, the probe's StorageService notification listener registration is
+ * replaced by a no-op, so no notifications are collected
+ * @param commandAndArgs the nodetool command followed by its arguments
+ * @return the command, its exit code, a copy of the notifications received while it ran, and the last error
+ * passed to badUse/err (if any)
+ */
+ public NodeToolResult nodetoolResult(boolean withNotifications, String... commandAndArgs)
{
- return sync(() -> new NodeTool(new InternalNodeProbeFactory()).execute(commandAndArgs)).call();
+ return sync(() -> {
+ DTestNodeTool nodetool = new DTestNodeTool(withNotifications);
+ int rc = nodetool.execute(commandAndArgs);
+ return new NodeToolResult(commandAndArgs, rc, new ArrayList<>(nodetool.notifications.notifications), nodetool.latestError);
+ }).call();
+ }
+
+ /**
+ * NodeTool that remembers the last error handed to badUse/err and collects StorageService notifications.
+ * The listener is registered in the constructor and unregistered when execute() finishes.
+ */
+ private static class DTestNodeTool extends NodeTool
+ {
+ private final StorageServiceMBean storageProxy;
+ private final CollectingNotificationListener notifications = new CollectingNotificationListener();
+
+ private Throwable latestError;
+
+ DTestNodeTool(boolean withNotifications)
+ {
+ super(new InternalNodeProbeFactory(withNotifications));
+ storageProxy = new InternalNodeProbe(withNotifications).getStorageService();
+ storageProxy.addNotificationListener(notifications, null, null);
+ }
+
+ @Override
+ public int execute(String... args)
+ {
+ try
+ {
+ return super.execute(args);
+ }
+ finally
+ {
+ try
+ {
+ storageProxy.removeNotificationListener(notifications, null, null);
+ }
+ catch (ListenerNotFoundException e)
+ {
+ // ignored: the listener is no longer registered, so there is nothing to remove
+ }
+ }
+ }
+
+ protected void badUse(Exception e)
+ {
+ super.badUse(e);
+ latestError = e;
+ }
+
+ protected void err(Throwable e)
+ {
+ super.err(e);
+ latestError = e;
+ }
+ }
+
+ /**
+ * Appends every notification it receives to a CopyOnWriteArrayList (presumably because notifications may be
+ * delivered on a different thread than the one reading them -- TODO confirm).
+ */
+ private static final class CollectingNotificationListener implements NotificationListener
+ {
+ private final CopyOnWriteArrayList<Notification> notifications = new CopyOnWriteArrayList<>();
+
+ @Override
+ public void handleNotification(Notification notification, Object handback)
+ {
+ notifications.add(notification);
+ }
}
public long killAttempts()
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/MessageFilters.java b/test/distributed/org/apache/cassandra/distributed/impl/MessageFilters.java
index c92553f..b4017e1 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/MessageFilters.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/MessageFilters.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.distributed.impl;
import java.util.Arrays;
import java.util.List;
+import java.util.Objects;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.cassandra.distributed.api.IMessage;
@@ -27,9 +28,20 @@ import org.apache.cassandra.distributed.api.IMessageFilters;
public class MessageFilters implements IMessageFilters
{
- private final List<Filter> filters = new CopyOnWriteArrayList<>();
+ // One list per direction; each Filter adds/removes itself to/from its own list in on()/off().
+ private final List<Filter> inboundFilters = new CopyOnWriteArrayList<>();
+ private final List<Filter> outboundFilters = new CopyOnWriteArrayList<>();
- public boolean permit(int from, int to, IMessage msg)
+ /**
+ * Checks a message from instance {@code from} to instance {@code to} against the inbound filters;
+ * returns {@code false} when an enabled inbound filter drops it.
+ */
+ public boolean permitInbound(int from, int to, IMessage msg)
+ {
+ return permit(inboundFilters, from, to, msg);
+ }
+
+ /**
+ * Checks a message from instance {@code from} to instance {@code to} against the outbound filters;
+ * returns {@code false} when an enabled outbound filter drops it.
+ */
+ public boolean permitOutbound(int from, int to, IMessage msg)
+ {
+ return permit(outboundFilters, from, to, msg);
+ }
+
+ private static boolean permit(List<Filter> filters, int from, int to, IMessage msg)
{
for (Filter filter : filters)
{
@@ -39,14 +51,15 @@ public class MessageFilters implements IMessageFilters
return true;
}
- public class Filter implements IMessageFilters.Filter
+ public static class Filter implements IMessageFilters.Filter
{
+ final List<Filter> parent;
final int[] from;
final int[] to;
final int[] verbs;
final Matcher matcher;
- Filter(int[] from, int[] to, int[] verbs, Matcher matcher)
+ private Filter(List<Filter> parent, int[] from, int[] to, int[] verbs, Matcher matcher)
{
if (from != null)
{
@@ -63,6 +76,7 @@ public class MessageFilters implements IMessageFilters
verbs = verbs.clone();
Arrays.sort(verbs);
}
+ this.parent = Objects.requireNonNull(parent, "parent");
this.from = from;
this.to = to;
this.verbs = verbs;
@@ -73,7 +87,8 @@ public class MessageFilters implements IMessageFilters
{
return (from == null ? 0 : Arrays.hashCode(from))
+ (to == null ? 0 : Arrays.hashCode(to))
- + (verbs == null ? 0 : Arrays.hashCode(verbs));
+ + (verbs == null ? 0 : Arrays.hashCode(verbs))
+ + System.identityHashCode(parent); // by identity: parent contains this Filter while on(), so parent.hashCode() would recurse into this method
}
public boolean equals(Object that)
@@ -85,18 +100,19 @@ public class MessageFilters implements IMessageFilters
{
return Arrays.equals(from, that.from)
&& Arrays.equals(to, that.to)
- && Arrays.equals(verbs, that.verbs);
+ && Arrays.equals(verbs, that.verbs)
+ && parent == that.parent; // by identity, consistent with hashCode(); list equals() would call back into Filter.equals
}
public Filter off()
{
- filters.remove(this);
+ parent.remove(this); // disable: drop this filter from the inbound/outbound list it was created for
return this;
}
public Filter on()
{
- filters.add(this);
+ parent.add(this); // enable: register this filter in the inbound/outbound list it was created for
return this;
}
@@ -111,55 +127,69 @@ public class MessageFilters implements IMessageFilters
public class Builder implements IMessageFilters.Builder
{
+ // selects the list (inboundFilters or outboundFilters) that drop() creates the filter in
+ boolean inbound;
int[] from;
int[] to;
int[] verbs;
Matcher matcher;
- private Builder(int[] verbs)
+ private Builder(boolean inbound)
{
- this.verbs = verbs;
+ this.inbound = inbound;
}
- public Builder from(int... nums)
+ public IMessageFilters.Builder from(int... nums)
{
from = nums;
return this;
}
- public Builder to(int... nums)
+ public IMessageFilters.Builder to(int... nums)
{
to = nums;
return this;
}
+ /** Restricts the filter to the given verb ids. */
+ public IMessageFilters.Builder verbs(int... verbs)
+ {
+ this.verbs = verbs;
+ return this;
+ }
+
+ /** Clears any verb restriction (verbs = null) so the filter applies to every verb. */
+ public IMessageFilters.Builder allVerbs()
+ {
+ this.verbs = null;
+ return this;
+ }
+
+ /** Chooses whether the filter applies to inbound ({@code true}) or outbound ({@code false}) messages. */
+ public IMessageFilters.Builder inbound(boolean inbound)
+ {
+ this.inbound = inbound;
+ return this;
+ }
+
public IMessageFilters.Builder messagesMatching(Matcher matcher)
{
this.matcher = matcher;
return this;
}
- public Filter drop()
+ /** Creates the filter in the list chosen by {@code inbound} and enables it immediately via on(). */
+ public IMessageFilters.Filter drop()
{
- return new Filter(from, to, verbs, matcher).on();
+ return new Filter(inbound ? inboundFilters : outboundFilters, from, to, verbs, matcher).on();
}
}
- public Builder verbs(int... verbs)
- {
- return new Builder(verbs);
- }
-
- @Override
- public Builder allVerbs()
+ /** Starts a Builder for a filter on inbound ({@code true}) or outbound ({@code false}) messages. */
+ public IMessageFilters.Builder inbound(boolean inbound)
{
- return new Builder(null);
+ return new Builder(inbound);
}
@Override
public void reset()
{
- filters.clear();
+ // removes every filter in both directions
+ inboundFilters.clear();
+ outboundFilters.clear();
}
}
diff --git a/test/distributed/org/apache/cassandra/distributed/mock/nodetool/InternalNodeProbe.java b/test/distributed/org/apache/cassandra/distributed/mock/nodetool/InternalNodeProbe.java
index 6786519..9ab264e 100644
--- a/test/distributed/org/apache/cassandra/distributed/mock/nodetool/InternalNodeProbe.java
+++ b/test/distributed/org/apache/cassandra/distributed/mock/nodetool/InternalNodeProbe.java
@@ -18,10 +18,10 @@
package org.apache.cassandra.distributed.mock.nodetool;
-import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.util.Iterator;
import java.util.Map;
+import javax.management.ListenerNotFoundException;
import com.google.common.collect.Multimap;
@@ -45,14 +45,19 @@ import org.apache.cassandra.service.CacheServiceMBean;
import org.apache.cassandra.service.GCInspector;
import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.service.StorageServiceMBean;
import org.apache.cassandra.streaming.StreamManager;
import org.apache.cassandra.tools.NodeProbe;
+import org.mockito.Mockito;
public class InternalNodeProbe extends NodeProbe
{
- public InternalNodeProbe() throws IOException
+ // when false, connect() uses a Mockito spy of StorageService.instance whose add/removeNotificationListener
+ // methods do nothing
+ private final boolean withNotifications;
+
+ public InternalNodeProbe(boolean withNotifications)
{
- super("", 0);
+ this.withNotifications = withNotifications;
+ // connect() reads withNotifications, so it is called here, after the field has been assigned
+ connect();
}
protected void connect()
@@ -61,7 +66,26 @@ public class InternalNodeProbe extends NodeProbe
mbeanServerConn = null;
jmxc = null;
- ssProxy = StorageService.instance;
+ if (withNotifications)
+ {
+ ssProxy = StorageService.instance;
+ }
+ else
+ {
+ // replace the notification apis with a no-op method
+ StorageServiceMBean mock = Mockito.spy(StorageService.instance);
+ Mockito.doNothing().when(mock).addNotificationListener(Mockito.any(), Mockito.any(), Mockito.any());
+ try
+ {
+ Mockito.doNothing().when(mock).removeNotificationListener(Mockito.any(), Mockito.any(), Mockito.any());
+ Mockito.doNothing().when(mock).removeNotificationListener(Mockito.any());
+ }
+ catch (ListenerNotFoundException e)
+ {
+ throw new AssertionError(e);
+ }
+ ssProxy = mock;
+ }
msProxy = MessagingService.instance();
streamProxy = StreamManager.instance;
compactionProxy = CompactionManager.instance;
diff --git a/test/distributed/org/apache/cassandra/distributed/mock/nodetool/InternalNodeProbeFactory.java b/test/distributed/org/apache/cassandra/distributed/mock/nodetool/InternalNodeProbeFactory.java
index f7c9dcf..1904aa7 100644
--- a/test/distributed/org/apache/cassandra/distributed/mock/nodetool/InternalNodeProbeFactory.java
+++ b/test/distributed/org/apache/cassandra/distributed/mock/nodetool/InternalNodeProbeFactory.java
@@ -25,11 +25,18 @@ import org.apache.cassandra.tools.INodeProbeFactory;
+/**
+ * INodeProbeFactory that always returns an in-process InternalNodeProbe; host, port and credentials are ignored.
+ */
public class InternalNodeProbeFactory implements INodeProbeFactory
{
+ // forwarded to every InternalNodeProbe this factory creates
+ private final boolean withNotifications;
+
+ /**
+ * @param withNotifications passed to each created InternalNodeProbe; when false, its notification listener
+ * registration is a no-op
+ */
+ public InternalNodeProbeFactory(boolean withNotifications)
+ {
+ this.withNotifications = withNotifications;
+ }
+
public NodeProbe create(String host, int port) throws IOException {
- return new InternalNodeProbe();
+ return new InternalNodeProbe(withNotifications);
}
public NodeProbe create(String host, int port, String username, String password) throws IOException {
- return new InternalNodeProbe();
+ return new InternalNodeProbe(withNotifications);
}
}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/DistributedRepairUtils.java b/test/distributed/org/apache/cassandra/distributed/test/DistributedRepairUtils.java
new file mode 100644
index 0000000..7f162e1
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/DistributedRepairUtils.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.util.Collections;
+import java.util.Set;
+import java.util.function.Consumer;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.commons.lang3.ArrayUtils;
+import org.junit.Assert;
+
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.NodeToolResult;
+import org.apache.cassandra.distributed.api.QueryResult;
+import org.apache.cassandra.distributed.api.Row;
+import org.apache.cassandra.distributed.impl.AbstractCluster;
+import org.apache.cassandra.distributed.impl.IInvokableInstance;
+import org.apache.cassandra.metrics.StorageMetrics;
+
+import static org.apache.cassandra.utils.Retry.retryWithBackoffBlocking;
+
+/**
+ * Helpers shared by the in-jvm distributed repair tests: running {@code nodetool repair} on an instance and
+ * asserting on the rows found in {@code system_distributed.parent_repair_history}.
+ */
+public final class DistributedRepairUtils
+{
+ /** Node number used by the overloads that take no explicit node/coordinator argument. */
+ public static final int DEFAULT_COORDINATOR = 1;
+
+ private DistributedRepairUtils()
+ {
+ // static helpers only; never instantiated
+ }
+
+ /**
+ * Runs {@code nodetool repair} on {@link #DEFAULT_COORDINATOR}.
+ *
+ * @see #repair(AbstractCluster, int, RepairType, boolean, String...)
+ */
+ public static NodeToolResult repair(AbstractCluster<?> cluster, RepairType repairType, boolean withNotifications, String... args)
+ {
+ return repair(cluster, DEFAULT_COORDINATOR, repairType, withNotifications, args);
+ }
+
+ /**
+ * Runs {@code nodetool repair} on the given node.
+ *
+ * @param node node number, as passed to {@code cluster.get(int)}
+ * @param repairType appends the flags that select the repair type to {@code args}
+ * @param withNotifications forwarded to {@code nodetoolResult}; controls whether JMX notifications are collected
+ * @param args additional {@code nodetool repair} arguments
+ * @return the nodetool outcome
+ */
+ public static NodeToolResult repair(AbstractCluster<?> cluster, int node, RepairType repairType, boolean withNotifications, String... args)
+ {
+ String[] commandAndArgs = ArrayUtils.addAll(new String[] { "repair" }, repairType.append(args));
+ return cluster.get(node).nodetoolResult(withNotifications, commandAndArgs);
+ }
+
+ /** Returns the {@code StorageMetrics.repairExceptions} count on {@link #DEFAULT_COORDINATOR}. */
+ public static <I extends IInvokableInstance, C extends AbstractCluster<I>> long getRepairExceptions(C cluster)
+ {
+ return getRepairExceptions(cluster, DEFAULT_COORDINATOR);
+ }
+
+ /** Returns the {@code StorageMetrics.repairExceptions} count, read inside the given node. */
+ public static <I extends IInvokableInstance, C extends AbstractCluster<I>> long getRepairExceptions(C cluster, int node)
+ {
+ return cluster.get(node).callOnInstance(() -> StorageMetrics.repairExceptions.getCount());
+ }
+
+ /** Queries the parent repair history through {@link #DEFAULT_COORDINATOR}. */
+ public static QueryResult queryParentRepairHistory(AbstractCluster<?> cluster, String ks, String table)
+ {
+ return queryParentRepairHistory(cluster, DEFAULT_COORDINATOR, ks, table);
+ }
+
+ /**
+ * Reads {@code system_distributed.parent_repair_history} at QUORUM through the given coordinator and keeps only
+ * the rows for keyspace {@code ks} whose {@code columnfamily_names} equals {@code {table}} (the empty set when
+ * {@code table} is null). The read is wrapped in {@code retryWithBackoffBlocking(10, ...)}.
+ */
+ public static QueryResult queryParentRepairHistory(AbstractCluster<?> cluster, int coordinator, String ks, String table)
+ {
+ // This is brittle: the caller never receives the repair id and cannot ask for it, so it has to be inferred.
+ // The lookup assumes ks/table pairs are unique (table creation should fail otherwise), so any repair recorded
+ // for that pair is taken to be the repair in question.
+ Set<String> tableNames = table == null ? Collections.emptySet() : ImmutableSet.of(table);
+
+ return retryWithBackoffBlocking(10, () -> cluster.coordinator(coordinator)
+ .executeWithResult("SELECT * FROM system_distributed.parent_repair_history", ConsistencyLevel.QUORUM)
+ .filter(row -> ks.equals(row.getString("keyspace_name")))
+ .filter(row -> tableNames.equals(row.getSet("columnfamily_names"))));
+ }
+
+ /** Asserts that {@link #DEFAULT_COORDINATOR} has no parent repair recorded for {@code ks.table}. */
+ public static void assertParentRepairNotExist(AbstractCluster<?> cluster, String ks, String table)
+ {
+ assertParentRepairNotExist(cluster, DEFAULT_COORDINATOR, ks, table);
+ }
+
+ /** Asserts that the coordinator has no parent repair recorded for {@code ks.table}. */
+ public static void assertParentRepairNotExist(AbstractCluster<?> cluster, int coordinator, String ks, String table)
+ {
+ QueryResult rs = queryParentRepairHistory(cluster, coordinator, ks, table);
+ Assert.assertFalse("No repairs should be found but at least one found", rs.hasNext());
+ }
+
+ /** Asserts that {@link #DEFAULT_COORDINATOR} has no parent repair recorded for keyspace {@code ks} (no table). */
+ public static void assertParentRepairNotExist(AbstractCluster<?> cluster, String ks)
+ {
+ assertParentRepairNotExist(cluster, DEFAULT_COORDINATOR, ks);
+ }
+
+ /** Asserts that the coordinator has no parent repair recorded for keyspace {@code ks} (no table). */
+ public static void assertParentRepairNotExist(AbstractCluster<?> cluster, int coordinator, String ks)
+ {
+ assertParentRepairNotExist(cluster, coordinator, ks, null);
+ }
+
+ /** Asserts that exactly one completed, exception-free parent repair exists on {@link #DEFAULT_COORDINATOR}. */
+ public static void assertParentRepairSuccess(AbstractCluster<?> cluster, String ks, String table)
+ {
+ assertParentRepairSuccess(cluster, DEFAULT_COORDINATOR, ks, table);
+ }
+
+ /** Asserts that exactly one completed, exception-free parent repair exists for {@code ks.table}. */
+ public static void assertParentRepairSuccess(AbstractCluster<?> cluster, int coordinator, String ks, String table)
+ {
+ QueryResult rs = queryParentRepairHistory(cluster, coordinator, ks, table);
+ validateExistingParentRepair(rs, row -> {
+ // check completed
+ Assert.assertNotNull("finished_at not found, the repair is not complete?", row.getTimestamp("finished_at"));
+
+ // check not failed (aka success)
+ Assert.assertNull("Exception found", row.getString("exception_stacktrace"));
+ Assert.assertNull("Exception found", row.getString("exception_message"));
+ });
+ }
+
+ /** Asserts that exactly one completed parent repair on {@link #DEFAULT_COORDINATOR} failed with {@code message}. */
+ public static void assertParentRepairFailedWithMessageContains(AbstractCluster<?> cluster, String ks, String table, String message)
+ {
+ assertParentRepairFailedWithMessageContains(cluster, DEFAULT_COORDINATOR, ks, table, message);
+ }
+
+ /**
+ * Asserts that exactly one completed parent repair exists for {@code ks.table}, that it recorded an exception,
+ * and that the exception message contains {@code message}.
+ */
+ public static void assertParentRepairFailedWithMessageContains(AbstractCluster<?> cluster, int coordinator, String ks, String table, String message)
+ {
+ QueryResult rs = queryParentRepairHistory(cluster, coordinator, ks, table);
+ validateExistingParentRepair(rs, row -> {
+ // check completed
+ Assert.assertNotNull("finished_at not found, the repair is not complete?", row.getTimestamp("finished_at"));
+
+ // check failed
+ Assert.assertNotNull("Exception not found", row.getString("exception_stacktrace"));
+ String exceptionMessage = row.getString("exception_message");
+ Assert.assertNotNull("Exception not found", exceptionMessage);
+
+ Assert.assertTrue("Unable to locate message '" + message + "' in repair error message: " + exceptionMessage, exceptionMessage.contains(message));
+ });
+ }
+
+ /** Asserts that {@code rs} holds exactly one row with a non-null parent_id, and runs {@code fn} on it. */
+ private static void validateExistingParentRepair(QueryResult rs, Consumer<Row> fn)
+ {
+ Assert.assertTrue("No rows found", rs.hasNext());
+ Row row = rs.next();
+
+ Assert.assertNotNull("parent_id (which is the primary key) was null", row.getUUID("parent_id"));
+
+ fn.accept(row);
+
+ // make sure no other records found
+ Assert.assertFalse("Only one repair expected, but found more than one", rs.hasNext());
+ }
+
+ /** Repair type; {@link #append} adds the matching nodetool flag (none for incremental, the default). */
+ public enum RepairType
+ {
+ FULL
+ {
+ public String[] append(String... args)
+ {
+ return ArrayUtils.add(args, "--full");
+ }
+ },
+ INCREMENTAL
+ {
+ public String[] append(String... args)
+ {
+ // incremental is the default
+ return args;
+ }
+ },
+ PREVIEW
+ {
+ public String[] append(String... args)
+ {
+ return ArrayUtils.add(args, "--preview");
+ }
+ };
+
+ public abstract String[] append(String... args);
+ }
+
+ /** Repair parallelism; {@link #append} adds the matching nodetool flag (none for parallel, the default). */
+ public enum RepairParallelism
+ {
+ SEQUENTIAL
+ {
+ public String[] append(String... args)
+ {
+ return ArrayUtils.add(args, "--sequential");
+ }
+ },
+ PARALLEL
+ {
+ public String[] append(String... args)
+ {
+ // default is to be parallel
+ return args;
+ }
+ },
+ DATACENTER_AWARE
+ {
+ public String[] append(String... args)
+ {
+ return ArrayUtils.add(args, "--dc-parallel");
+ }
+ };
+
+ public abstract String[] append(String... args);
+ }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/mock/nodetool/InternalNodeProbeFactory.java b/test/distributed/org/apache/cassandra/distributed/test/FullRepairCoordinatorFastTest.java
similarity index 59%
copy from test/distributed/org/apache/cassandra/distributed/mock/nodetool/InternalNodeProbeFactory.java
copy to test/distributed/org/apache/cassandra/distributed/test/FullRepairCoordinatorFastTest.java
index f7c9dcf..e380985 100644
--- a/test/distributed/org/apache/cassandra/distributed/mock/nodetool/InternalNodeProbeFactory.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/FullRepairCoordinatorFastTest.java
@@ -16,20 +16,19 @@
* limitations under the License.
*/
-package org.apache.cassandra.distributed.mock.nodetool;
+package org.apache.cassandra.distributed.test;
-import java.io.IOException;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
-import org.apache.cassandra.tools.NodeProbe;
-import org.apache.cassandra.tools.INodeProbeFactory;
+import org.apache.cassandra.distributed.test.DistributedRepairUtils.RepairParallelism;
+import org.apache.cassandra.distributed.test.DistributedRepairUtils.RepairType;
-public class InternalNodeProbeFactory implements INodeProbeFactory
+/**
+ * Runs the RepairCoordinatorFast scenarios with full repairs ({@link RepairType#FULL}); the parallelism and
+ * withNotifications constructor arguments are injected by the {@link Parameterized} runner.
+ */
+@RunWith(Parameterized.class)
+public class FullRepairCoordinatorFastTest extends RepairCoordinatorFast
{
- public NodeProbe create(String host, int port) throws IOException {
- return new InternalNodeProbe();
- }
-
- public NodeProbe create(String host, int port, String username, String password) throws IOException {
- return new InternalNodeProbe();
+ public FullRepairCoordinatorFastTest(RepairParallelism parallelism, boolean withNotifications)
+ {
+ super(RepairType.FULL, parallelism, withNotifications);
}
}
diff --git a/test/distributed/org/apache/cassandra/distributed/mock/nodetool/InternalNodeProbeFactory.java b/test/distributed/org/apache/cassandra/distributed/test/FullRepairCoordinatorSlowTest.java
similarity index 59%
copy from test/distributed/org/apache/cassandra/distributed/mock/nodetool/InternalNodeProbeFactory.java
copy to test/distributed/org/apache/cassandra/distributed/test/FullRepairCoordinatorSlowTest.java
index f7c9dcf..d3904b3 100644
--- a/test/distributed/org/apache/cassandra/distributed/mock/nodetool/InternalNodeProbeFactory.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/FullRepairCoordinatorSlowTest.java
@@ -16,20 +16,19 @@
* limitations under the License.
*/
-package org.apache.cassandra.distributed.mock.nodetool;
+package org.apache.cassandra.distributed.test;
-import java.io.IOException;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
-import org.apache.cassandra.tools.NodeProbe;
-import org.apache.cassandra.tools.INodeProbeFactory;
+import org.apache.cassandra.distributed.test.DistributedRepairUtils.RepairParallelism;
+import org.apache.cassandra.distributed.test.DistributedRepairUtils.RepairType;
-public class InternalNodeProbeFactory implements INodeProbeFactory
+/**
+ * Runs the RepairCoordinatorSlow scenarios with full repairs ({@link RepairType#FULL}); the parallelism and
+ * withNotifications constructor arguments are injected by the {@link Parameterized} runner.
+ */
+@RunWith(Parameterized.class)
+public class FullRepairCoordinatorSlowTest extends RepairCoordinatorSlow
{
- public NodeProbe create(String host, int port) throws IOException {
- return new InternalNodeProbe();
- }
-
- public NodeProbe create(String host, int port, String username, String password) throws IOException {
- return new InternalNodeProbe();
+ public FullRepairCoordinatorSlowTest(RepairParallelism parallelism, boolean withNotifications)
+ {
+ super(RepairType.FULL, parallelism, withNotifications);
}
}
diff --git a/test/distributed/org/apache/cassandra/distributed/mock/nodetool/InternalNodeProbeFactory.java b/test/distributed/org/apache/cassandra/distributed/test/IncrementalRepairCoordinatorFastTest.java
similarity index 58%
copy from test/distributed/org/apache/cassandra/distributed/mock/nodetool/InternalNodeProbeFactory.java
copy to test/distributed/org/apache/cassandra/distributed/test/IncrementalRepairCoordinatorFastTest.java
index f7c9dcf..7a4c98e 100644
--- a/test/distributed/org/apache/cassandra/distributed/mock/nodetool/InternalNodeProbeFactory.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/IncrementalRepairCoordinatorFastTest.java
@@ -16,20 +16,19 @@
* limitations under the License.
*/
-package org.apache.cassandra.distributed.mock.nodetool;
+package org.apache.cassandra.distributed.test;
-import java.io.IOException;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
-import org.apache.cassandra.tools.NodeProbe;
-import org.apache.cassandra.tools.INodeProbeFactory;
+import org.apache.cassandra.distributed.test.DistributedRepairUtils.RepairParallelism;
+import org.apache.cassandra.distributed.test.DistributedRepairUtils.RepairType;
-public class InternalNodeProbeFactory implements INodeProbeFactory
+/**
+ * Runs the RepairCoordinatorFast scenarios with incremental repairs ({@link RepairType#INCREMENTAL}); the
+ * parallelism and withNotifications constructor arguments are injected by the {@link Parameterized} runner.
+ */
+@RunWith(Parameterized.class)
+public class IncrementalRepairCoordinatorFastTest extends RepairCoordinatorFast
{
- public NodeProbe create(String host, int port) throws IOException {
- return new InternalNodeProbe();
- }
-
- public NodeProbe create(String host, int port, String username, String password) throws IOException {
- return new InternalNodeProbe();
+ public IncrementalRepairCoordinatorFastTest(RepairParallelism parallelism, boolean withNotifications)
+ {
+ super(RepairType.INCREMENTAL, parallelism, withNotifications);
}
}
diff --git a/test/distributed/org/apache/cassandra/distributed/mock/nodetool/InternalNodeProbeFactory.java b/test/distributed/org/apache/cassandra/distributed/test/IncrementalRepairCoordinatorSlowTest.java
similarity index 58%
copy from test/distributed/org/apache/cassandra/distributed/mock/nodetool/InternalNodeProbeFactory.java
copy to test/distributed/org/apache/cassandra/distributed/test/IncrementalRepairCoordinatorSlowTest.java
index f7c9dcf..7f9b35f 100644
--- a/test/distributed/org/apache/cassandra/distributed/mock/nodetool/InternalNodeProbeFactory.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/IncrementalRepairCoordinatorSlowTest.java
@@ -16,20 +16,19 @@
* limitations under the License.
*/
-package org.apache.cassandra.distributed.mock.nodetool;
+package org.apache.cassandra.distributed.test;
-import java.io.IOException;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
-import org.apache.cassandra.tools.NodeProbe;
-import org.apache.cassandra.tools.INodeProbeFactory;
+import org.apache.cassandra.distributed.test.DistributedRepairUtils.RepairParallelism;
+import org.apache.cassandra.distributed.test.DistributedRepairUtils.RepairType;
-public class InternalNodeProbeFactory implements INodeProbeFactory
+/**
+ * Runs the RepairCoordinatorSlow scenarios with incremental repairs ({@link RepairType#INCREMENTAL}); the
+ * parallelism and withNotifications constructor arguments are injected by the {@link Parameterized} runner.
+ */
+@RunWith(Parameterized.class)
+public class IncrementalRepairCoordinatorSlowTest extends RepairCoordinatorSlow
{
- public NodeProbe create(String host, int port) throws IOException {
- return new InternalNodeProbe();
- }
-
- public NodeProbe create(String host, int port, String username, String password) throws IOException {
- return new InternalNodeProbe();
+ public IncrementalRepairCoordinatorSlowTest(RepairParallelism parallelism, boolean withNotifications)
+ {
+ super(RepairType.INCREMENTAL, parallelism, withNotifications);
}
}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/MessageFiltersTest.java b/test/distributed/org/apache/cassandra/distributed/test/MessageFiltersTest.java
index 3d73a5e..814e229 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/MessageFiltersTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/MessageFiltersTest.java
@@ -19,7 +19,10 @@
package org.apache.cassandra.distributed.test;
import java.util.Arrays;
+import java.util.HashSet;
import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.collect.Sets;
@@ -35,12 +38,30 @@ import org.apache.cassandra.distributed.impl.Instance;
import org.apache.cassandra.distributed.impl.MessageFilters;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.NoPayload;
import org.apache.cassandra.net.Verb;
public class MessageFiltersTest extends DistributedTestBase
{
@Test
- public void simpleFiltersTest() throws Throwable
+ public void simpleInboundFiltersTest()
+ {
+ // shared scenario, run against the inbound filter list
+ simpleFiltersTest(true);
+ }
+
+ @Test
+ public void simpleOutboundFiltersTest()
+ {
+ // shared scenario, run against the outbound filter list
+ simpleFiltersTest(false);
+ }
+
+ /** Abstracts over MessageFilters.permitInbound / permitOutbound so one scenario can test either direction. */
+ private interface Permit
+ {
+ boolean test(int from, int to, IMessage msg);
+ }
+
+ /**
+ * Exercises one direction of MessageFilters: every filter is built with {@code inbound(inbound)} and checked
+ * through the matching permitInbound/permitOutbound call.
+ */
+ private static void simpleFiltersTest(boolean inbound)
{
int VERB1 = Verb.READ_REQ.id;
int VERB2 = Verb.READ_RSP.id;
@@ -52,61 +73,62 @@ public class MessageFiltersTest extends DistributedTestBase
String MSG2 = "msg2";
MessageFilters filters = new MessageFilters();
- MessageFilters.Filter filter = filters.allVerbs().from(1).drop();
+ Permit permit = inbound ? (from, to, msg) -> filters.permitInbound(from, to, msg) : (from, to, msg) -> filters.permitOutbound(from, to, msg);
- Assert.assertFalse(filters.permit(i1, i2, msg(VERB1, MSG1)));
- Assert.assertFalse(filters.permit(i1, i2, msg(VERB2, MSG1)));
- Assert.assertFalse(filters.permit(i1, i2, msg(VERB3, MSG1)));
- Assert.assertTrue(filters.permit(i2, i1, msg(VERB1, MSG1)));
+ // every verb sent from 1 is dropped until the filter is switched off
+ IMessageFilters.Filter filter = filters.allVerbs().inbound(inbound).from(1).drop();
+ Assert.assertFalse(permit.test(i1, i2, msg(VERB1, MSG1)));
+ Assert.assertFalse(permit.test(i1, i2, msg(VERB2, MSG1)));
+ Assert.assertFalse(permit.test(i1, i2, msg(VERB3, MSG1)));
+ Assert.assertTrue(permit.test(i2, i1, msg(VERB1, MSG1)));
filter.off();
- Assert.assertTrue(filters.permit(i1, i2, msg(VERB1, MSG1)));
+ Assert.assertTrue(permit.test(i1, i2, msg(VERB1, MSG1)));
filters.reset();
- filters.verbs(VERB1).from(1).to(2).drop();
- Assert.assertFalse(filters.permit(i1, i2, msg(VERB1, MSG1)));
- Assert.assertTrue(filters.permit(i1, i2, msg(VERB2, MSG1)));
- Assert.assertTrue(filters.permit(i2, i1, msg(VERB1, MSG1)));
- Assert.assertTrue(filters.permit(i2, i3, msg(VERB2, MSG1)));
+ // only VERB1 from 1 to 2 is dropped
+ filters.verbs(VERB1).inbound(inbound).from(1).to(2).drop();
+ Assert.assertFalse(permit.test(i1, i2, msg(VERB1, MSG1)));
+ Assert.assertTrue(permit.test(i1, i2, msg(VERB2, MSG1)));
+ Assert.assertTrue(permit.test(i2, i1, msg(VERB1, MSG1)));
+ Assert.assertTrue(permit.test(i2, i3, msg(VERB2, MSG1)));
filters.reset();
AtomicInteger counter = new AtomicInteger();
- filters.verbs(VERB1).from(1).to(2).messagesMatching((from, to, msg) -> {
+ // the matcher is only consulted for messages already matching verb/from/to; counter records each consultation
+ filters.verbs(VERB1).inbound(inbound).from(1).to(2).messagesMatching((from, to, msg) -> {
counter.incrementAndGet();
return Arrays.equals(msg.bytes(), MSG1.getBytes());
}).drop();
- Assert.assertFalse(filters.permit(i1, i2, msg(VERB1, MSG1)));
+ Assert.assertFalse(permit.test(i1, i2, msg(VERB1, MSG1)));
Assert.assertEquals(counter.get(), 1);
- Assert.assertTrue(filters.permit(i1, i2, msg(VERB1, MSG2)));
+ Assert.assertTrue(permit.test(i1, i2, msg(VERB1, MSG2)));
Assert.assertEquals(counter.get(), 2);
// filter chain gets interrupted because a higher level filter returns no match
- Assert.assertTrue(filters.permit(i2, i1, msg(VERB1, MSG1)));
+ Assert.assertTrue(permit.test(i2, i1, msg(VERB1, MSG1)));
Assert.assertEquals(counter.get(), 2);
- Assert.assertTrue(filters.permit(i2, i1, msg(VERB2, MSG1)));
+ Assert.assertTrue(permit.test(i2, i1, msg(VERB2, MSG1)));
Assert.assertEquals(counter.get(), 2);
filters.reset();
- filters.allVerbs().from(3, 2).to(2, 1).drop();
- Assert.assertFalse(filters.permit(i3, i1, msg(VERB1, MSG1)));
- Assert.assertFalse(filters.permit(i3, i2, msg(VERB1, MSG1)));
- Assert.assertFalse(filters.permit(i2, i1, msg(VERB1, MSG1)));
- Assert.assertTrue(filters.permit(i2, i3, msg(VERB1, MSG1)));
- Assert.assertTrue(filters.permit(i1, i2, msg(VERB1, MSG1)));
- Assert.assertTrue(filters.permit(i1, i3, msg(VERB1, MSG1)));
+ // several senders and receivers: messages from 3 or 2 to 2 or 1 are dropped
+ filters.allVerbs().inbound(inbound).from(3, 2).to(2, 1).drop();
+ Assert.assertFalse(permit.test(i3, i1, msg(VERB1, MSG1)));
+ Assert.assertFalse(permit.test(i3, i2, msg(VERB1, MSG1)));
+ Assert.assertFalse(permit.test(i2, i1, msg(VERB1, MSG1)));
+ Assert.assertTrue(permit.test(i2, i3, msg(VERB1, MSG1)));
+ Assert.assertTrue(permit.test(i1, i2, msg(VERB1, MSG1)));
+ Assert.assertTrue(permit.test(i1, i3, msg(VERB1, MSG1)));
filters.reset();
counter.set(0);
- filters.allVerbs().from(1).to(2).messagesMatching((from, to, msg) -> {
+ // a matcher that never matches never drops; it is consulted only for messages from 1 to 2
+ filters.allVerbs().inbound(inbound).from(1).to(2).messagesMatching((from, to, msg) -> {
counter.incrementAndGet();
return false;
}).drop();
- Assert.assertTrue(filters.permit(i1, i2, msg(VERB1, MSG1)));
- Assert.assertTrue(filters.permit(i1, i3, msg(VERB1, MSG1)));
- Assert.assertTrue(filters.permit(i1, i2, msg(VERB1, MSG1)));
+ Assert.assertTrue(permit.test(i1, i2, msg(VERB1, MSG1)));
+ Assert.assertTrue(permit.test(i1, i3, msg(VERB1, MSG1)));
+ Assert.assertTrue(permit.test(i1, i2, msg(VERB1, MSG1)));
Assert.assertEquals(2, counter.get());
}
- IMessage msg(int verb, String msg)
+ private static IMessage msg(int verb, String msg)
{
return new IMessage()
{
@@ -166,29 +188,71 @@ public class MessageFiltersTest extends DistributedTestBase
Verb.MUTATION_REQ.id,
Verb.MUTATION_RSP.id));
- // Reads and writes are going to time out in both directions
- IMessageFilters.Filter filter = cluster.filters()
- .allVerbs()
- .from(1)
- .to(2)
- .messagesMatching((from, to, msg) -> {
- // Decode and verify message on instance; return the result back here
- Integer id = cluster.get(1).callsOnInstance((IIsolatedExecutor.SerializableCallable<Integer>) () -> {
- Message decoded = Instance.deserializeMessage(msg);
- return (Integer) decoded.verb().id;
- }).call();
- Assert.assertTrue(verbs.contains(id));
- counter.incrementAndGet();
- return false;
- }).drop();
+ for (boolean inbound : Arrays.asList(true, false))
+ {
+ counter.set(0);
+ // Reads and writes are going to time out in both directions
+ IMessageFilters.Filter filter = cluster.filters()
+ .allVerbs()
+ .inbound(inbound)
+ .from(1)
+ .to(2)
+ .messagesMatching((from, to, msg) -> {
+ // Decode and verify message on instance; return the result back here
+ Integer id = cluster.get(1).callsOnInstance((IIsolatedExecutor.SerializableCallable<Integer>) () -> {
+ Message decoded = Instance.deserializeMessage(msg);
+ return (Integer) decoded.verb().id;
+ }).call();
+ Assert.assertTrue(verbs.contains(id));
+ counter.incrementAndGet();
+ return false;
+ }).drop();
- for (int i : new int[]{ 1, 2 })
- cluster.coordinator(i).execute(read, ConsistencyLevel.ALL);
- for (int i : new int[]{ 1, 2 })
- cluster.coordinator(i).execute(write, ConsistencyLevel.ALL);
+ for (int i : new int[]{ 1, 2 })
+ cluster.coordinator(i).execute(read, ConsistencyLevel.ALL);
+ for (int i : new int[]{ 1, 2 })
+ cluster.coordinator(i).execute(write, ConsistencyLevel.ALL);
+
+ filter.off();
+ Assert.assertEquals(4, counter.get());
+ }
+ }
+ }
+
+    /**
+     * Verifies that for one ECHO round trip between two instances, every verb seen by an inbound filter was
+     * first seen by an outbound filter, i.e. outbound filters are applied before inbound filters.
+     */
+    @Test
+    public void outboundBeforeInbound() throws Throwable
+    {
+        try (Cluster cluster = Cluster.create(2))
+        {
+            InetAddressAndPort other = cluster.get(2).broadcastAddressAndPort();
+            CountDownLatch waitForIt = new CountDownLatch(1);
+            // the matchers below run on the instances' messaging threads, not on the test thread, so the sets they
+            // share with the test thread must be safe for concurrent use
+            Set<Integer> outboundMessagesSeen = java.util.concurrent.ConcurrentHashMap.newKeySet();
+            Set<Integer> inboundMessagesSeen = java.util.concurrent.ConcurrentHashMap.newKeySet();
+            AtomicBoolean outboundAfterInbound = new AtomicBoolean(false);
+            // every matcher returns false, so drop() never drops a message: the filters are only used as listeners
+            cluster.filters().outbound().verbs(Verb.ECHO_REQ.id, Verb.ECHO_RSP.id).messagesMatching((from, to, msg) -> {
+                outboundMessagesSeen.add(msg.verb());
+                if (inboundMessagesSeen.contains(msg.verb()))
+                    outboundAfterInbound.set(true);
+                return false;
+            }).drop();
+            cluster.filters().inbound().verbs(Verb.ECHO_REQ.id, Verb.ECHO_RSP.id).messagesMatching((from, to, msg) -> {
+                inboundMessagesSeen.add(msg.verb());
+                return false;
+            }).drop();
+            // the ECHO_RSP arriving back inbound marks the end of the round trip
+            cluster.filters().inbound().verbs(Verb.ECHO_RSP.id).messagesMatching((from, to, msg) -> {
+                waitForIt.countDown();
+                return false;
+            }).drop();
+            cluster.get(1).runOnInstance(() -> {
+                MessagingService.instance().send(Message.out(Verb.ECHO_REQ, NoPayload.noPayload), other);
+            });
+
+            // bounded wait: a lost message fails the test instead of hanging it
+            Assert.assertTrue("ECHO_RSP was not received within 1 minute",
+                              waitForIt.await(1, java.util.concurrent.TimeUnit.MINUTES));
-            filter.off();
-            Assert.assertEquals(4, counter.get());
+            Assert.assertEquals(outboundMessagesSeen, inboundMessagesSeen);
+            // since both are equal, only need to confirm the size of one
+            Assert.assertEquals(2, outboundMessagesSeen.size());
+            Assert.assertFalse("outbound message saw after inbound", outboundAfterInbound.get());
         }
     }
diff --git a/test/distributed/org/apache/cassandra/distributed/mock/nodetool/InternalNodeProbeFactory.java b/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairCoordinatorFastTest.java
similarity index 59%
copy from test/distributed/org/apache/cassandra/distributed/mock/nodetool/InternalNodeProbeFactory.java
copy to test/distributed/org/apache/cassandra/distributed/test/PreviewRepairCoordinatorFastTest.java
index f7c9dcf..bafef05 100644
--- a/test/distributed/org/apache/cassandra/distributed/mock/nodetool/InternalNodeProbeFactory.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairCoordinatorFastTest.java
@@ -16,20 +16,19 @@
* limitations under the License.
*/
-package org.apache.cassandra.distributed.mock.nodetool;
+package org.apache.cassandra.distributed.test;
-import java.io.IOException;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
-import org.apache.cassandra.tools.NodeProbe;
-import org.apache.cassandra.tools.INodeProbeFactory;
+import org.apache.cassandra.distributed.test.DistributedRepairUtils.RepairParallelism;
+import org.apache.cassandra.distributed.test.DistributedRepairUtils.RepairType;
-public class InternalNodeProbeFactory implements INodeProbeFactory
+/**
+ * Runs the {@link RepairCoordinatorFast} suite with {@code RepairType.PREVIEW}, once per parameter combination
+ * (repair parallelism x notifications on/off) supplied by {@code RepairCoordinatorBase#testsWithoutType()}.
+ */
+@RunWith(Parameterized.class)
+public class PreviewRepairCoordinatorFastTest extends RepairCoordinatorFast
{
- public NodeProbe create(String host, int port) throws IOException {
- return new InternalNodeProbe();
- }
-
- public NodeProbe create(String host, int port, String username, String password) throws IOException {
- return new InternalNodeProbe();
+ // only parallelism and notifications vary between runs; the repair type is always PREVIEW
+ public PreviewRepairCoordinatorFastTest(RepairParallelism parallelism, boolean withNotifications)
+ {
+ super(RepairType.PREVIEW, parallelism, withNotifications);
}
}
diff --git a/test/distributed/org/apache/cassandra/distributed/mock/nodetool/InternalNodeProbeFactory.java b/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairCoordinatorSlowTest.java
similarity index 59%
copy from test/distributed/org/apache/cassandra/distributed/mock/nodetool/InternalNodeProbeFactory.java
copy to test/distributed/org/apache/cassandra/distributed/test/PreviewRepairCoordinatorSlowTest.java
index f7c9dcf..2d52475 100644
--- a/test/distributed/org/apache/cassandra/distributed/mock/nodetool/InternalNodeProbeFactory.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairCoordinatorSlowTest.java
@@ -16,20 +16,19 @@
* limitations under the License.
*/
-package org.apache.cassandra.distributed.mock.nodetool;
+package org.apache.cassandra.distributed.test;
-import java.io.IOException;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
-import org.apache.cassandra.tools.NodeProbe;
-import org.apache.cassandra.tools.INodeProbeFactory;
+import org.apache.cassandra.distributed.test.DistributedRepairUtils.RepairParallelism;
+import org.apache.cassandra.distributed.test.DistributedRepairUtils.RepairType;
-public class InternalNodeProbeFactory implements INodeProbeFactory
+/**
+ * Runs the {@code RepairCoordinatorSlow} suite with {@code RepairType.PREVIEW}, once per parameter combination
+ * (repair parallelism x notifications on/off). NOTE(review): the parameters presumably come from
+ * {@code RepairCoordinatorBase#testsWithoutType()} via RepairCoordinatorSlow's superclass chain — confirm.
+ */
+@RunWith(Parameterized.class)
+public class PreviewRepairCoordinatorSlowTest extends RepairCoordinatorSlow
{
- public NodeProbe create(String host, int port) throws IOException {
- return new InternalNodeProbe();
- }
-
- public NodeProbe create(String host, int port, String username, String password) throws IOException {
- return new InternalNodeProbe();
+ // only parallelism and notifications vary between runs; the repair type is always PREVIEW
+ public PreviewRepairCoordinatorSlowTest(RepairParallelism parallelism, boolean withNotifications)
+ {
+ super(RepairType.PREVIEW, parallelism, withNotifications);
}
}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairTest.java b/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairTest.java
index ed29a30..c712948 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairTest.java
@@ -131,7 +131,7 @@ public class PreviewRepairTest extends DistributedTestBase
SimpleCondition continuePreviewRepair = new SimpleCondition();
DelayMessageFilter filter = new DelayMessageFilter(continuePreviewRepair);
// this pauses the validation request sent from node1 to node2 until we have run a full inc repair below
- cluster.filters().verbs(Verb.VALIDATION_REQ.id).from(1).to(2).messagesMatching(filter).drop();
+ cluster.filters().outbound().verbs(Verb.VALIDATION_REQ.id).from(1).to(2).messagesMatching(filter).drop();
Future<Pair<Boolean, Boolean>> rsFuture = es.submit(() -> cluster.get(1).callOnInstance(repair(options(true))));
Thread.sleep(1000);
@@ -170,7 +170,7 @@ public class PreviewRepairTest extends DistributedTestBase
// pause preview repair validation messages on node2 until node1 has finished
SimpleCondition continuePreviewRepair = new SimpleCondition();
DelayMessageFilter filter = new DelayMessageFilter(continuePreviewRepair);
- cluster.filters().verbs(Verb.VALIDATION_REQ.id).from(1).to(2).messagesMatching(filter).drop();
+ cluster.filters().outbound().verbs(Verb.VALIDATION_REQ.id).from(1).to(2).messagesMatching(filter).drop();
// get local ranges to repair two separate ranges:
List<String> localRanges = cluster.get(1).callOnInstance(() -> {
diff --git a/test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorBase.java b/test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorBase.java
new file mode 100644
index 0000000..4651376
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorBase.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.runners.Parameterized;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.NodeToolResult;
+import org.apache.cassandra.distributed.test.DistributedRepairUtils.RepairParallelism;
+import org.apache.cassandra.distributed.test.DistributedRepairUtils.RepairType;
+
+/**
+ * Shared fixture for the repair coordinator distributed tests: one static two-node cluster (networking and gossip
+ * enabled) plus per-parameter helpers for naming tables and invoking nodetool repair.
+ *
+ * <p>The cluster is created once per test class and shared by every parameter combination, so schema objects must
+ * be named via {@link #tableName(String)} / {@link #postfix()} to stay unique per run.
+ */
+public class RepairCoordinatorBase extends DistributedTestBase
+{
+    protected static Cluster CLUSTER;
+
+    protected final RepairType repairType;
+    protected final RepairParallelism parallelism;
+    protected final boolean withNotifications;
+
+    public RepairCoordinatorBase(RepairType repairType, RepairParallelism parallelism, boolean withNotifications)
+    {
+        this.repairType = repairType;
+        this.parallelism = parallelism;
+        this.withNotifications = withNotifications;
+    }
+
+    /**
+     * Parameters for subclasses that fix the {@link RepairType} themselves: every {@link RepairParallelism} value,
+     * each combined with notifications enabled and disabled.
+     */
+    @Parameterized.Parameters(name = "{0}/{1}")
+    public static Collection<Object[]> testsWithoutType()
+    {
+        List<Object[]> tests = new ArrayList<>();
+        for (RepairParallelism p : RepairParallelism.values())
+        {
+            tests.add(new Object[] { p, true });
+            tests.add(new Object[] { p, false });
+        }
+        return tests;
+    }
+
+    @BeforeClass
+    public static void before()
+    {
+        // This only works because of the way CI works: a new JVM is spun up for each test file, so this doesn't
+        // have to worry about another test file setting this property first
+        System.setProperty("cassandra.nodetool.jmx_notification_poll_interval_seconds", "1");
+    }
+
+    @BeforeClass
+    public static void setupCluster() throws IOException
+    {
+        // streaming requires networking ATM
+        // streaming also requires gossip, otherwise it isn't set up properly
+        CLUSTER = init(Cluster.build(2)
+                              .withConfig(c -> c.with(Feature.NETWORK)
+                                                .with(Feature.GOSSIP))
+                              .start());
+    }
+
+    @AfterClass
+    public static void teardownCluster()
+    {
+        if (CLUSTER != null)
+            CLUSTER.close();
+    }
+
+    /**
+     * Returns {@code prefix} suffixed with this run's parameters, so each parameterized run gets its own table on
+     * the shared cluster.
+     */
+    protected String tableName(String prefix)
+    {
+        return prefix + "_" + postfix();
+    }
+
+    /**
+     * Lower-cased {@code <repairType>_<parallelism>_<withNotifications>} suffix used in schema object names.
+     * {@code Locale.ROOT} keeps the result independent of the JVM default locale (a Turkish default locale would
+     * otherwise lower-case {@code I} to a dotless {@code ı}, producing an invalid unquoted CQL identifier).
+     */
+    protected String postfix()
+    {
+        return repairType.name().toLowerCase(java.util.Locale.ROOT)
+               + "_" + parallelism.name().toLowerCase(java.util.Locale.ROOT)
+               + "_" + withNotifications;
+    }
+
+    /**
+     * Runs nodetool repair on {@code node} using this run's repair type and notification mode; the parallelism
+     * option is added to {@code args} through {@code RepairParallelism#append}.
+     */
+    protected NodeToolResult repair(int node, String... args)
+    {
+        return DistributedRepairUtils.repair(CLUSTER, node, repairType, withNotifications, parallelism.append(args));
+    }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorFailingMessageTest.java b/test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorFailingMessageTest.java
new file mode 100644
index 0000000..ac31334
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorFailingMessageTest.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.junit.AfterClass;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IMessageFilters;
+import org.apache.cassandra.distributed.api.NodeToolResult;
+import org.apache.cassandra.distributed.test.DistributedRepairUtils.RepairType;
+import org.apache.cassandra.net.Verb;
+
+import static java.lang.String.format;
+import static org.apache.cassandra.distributed.api.IMessageFilters.Matcher.of;
+
+/**
+ * Repair coordinator tests that make a single repair message fail (a message filter throws when it sees the verb)
+ * and assert that nodetool reports the failure. Runs once per {@link RepairType} with notifications on and off,
+ * all against one static three-node cluster shared by every parameter combination.
+ */
+@RunWith(Parameterized.class)
+@Ignore("Until CASSANDRA-15566 is in these tests all time out")
+public class RepairCoordinatorFailingMessageTest extends DistributedTestBase implements Serializable
+{
+    private static Cluster CLUSTER;
+
+    private final RepairType repairType;
+    private final boolean withNotifications;
+
+    public RepairCoordinatorFailingMessageTest(RepairType repairType, boolean withNotifications)
+    {
+        this.repairType = repairType;
+        this.withNotifications = withNotifications;
+    }
+
+    @Parameterized.Parameters(name = "{0}/{1}")
+    public static Collection<Object[]> messages()
+    {
+        List<Object[]> tests = new ArrayList<>();
+        for (RepairType type : RepairType.values())
+        {
+            tests.add(new Object[] { type, true });
+            tests.add(new Object[] { type, false });
+        }
+        return tests;
+    }
+
+    @BeforeClass
+    public static void before()
+    {
+        DatabaseDescriptor.clientInitialization();
+    }
+
+    @BeforeClass
+    public static void setupCluster() throws IOException
+    {
+        // streaming requires networking ATM
+        // streaming also requires gossip, otherwise it isn't set up properly
+        CLUSTER = init(Cluster.build(3) // set to 3 so streaming hits non-local case
+                              .withConfig(c -> c.with(Feature.NETWORK)
+                                                .with(Feature.GOSSIP))
+                              .start());
+    }
+
+    @AfterClass
+    public static void teardownCluster()
+    {
+        if (CLUSTER != null)
+            CLUSTER.close();
+    }
+
+    /** Returns {@code prefix} suffixed with this run's parameters so each run uses its own table. */
+    private String tableName(String prefix)
+    {
+        return prefix + "_" + postfix() + "_" + withNotifications;
+    }
+
+    /** Locale-independent lower-cased repair type (a Turkish default locale would turn {@code I} into {@code ı}). */
+    private String postfix()
+    {
+        return repairType.name().toLowerCase(java.util.Locale.ROOT);
+    }
+
+    private NodeToolResult repair(int node, String... args)
+    {
+        return DistributedRepairUtils.repair(CLUSTER, node, repairType, withNotifications, args);
+    }
+
+    @Test(timeout = 1 * 60 * 1000)
+    public void prepareIrFailure()
+    {
+        Assume.assumeTrue("The Verb.PREPARE_CONSISTENT_REQ is only for incremental, so disable in non-incremental", repairType == RepairType.INCREMENTAL);
+        // Wait, isn't this copy paste of RepairCoordinatorTest::prepareFailure? NO!
+        // Incremental repair sends the PREPARE message the same way full does, but then after it does it sends
+        // a consistent prepare message... and that one doesn't handle errors...
+        // The cluster is shared by all parameter runs and both INCREMENTAL runs (with/without notifications) get
+        // here, so the table name must be unique per run; a fixed name makes the second CREATE TABLE fail.
+        String table = tableName("prepareirfailure");
+        CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, value text, PRIMARY KEY (key))", KEYSPACE, table));
+        IMessageFilters.Filter filter = CLUSTER.verbs(Verb.PREPARE_CONSISTENT_REQ).messagesMatching(of(m -> {
+            throw new RuntimeException("prepare fail");
+        })).drop();
+        try
+        {
+            NodeToolResult result = repair(1, KEYSPACE, table);
+            result.asserts()
+                  .failure()
+                  .errorContains("error prepare fail")
+                  .notificationContains(NodeToolResult.ProgressEventType.ERROR, "error prepare fail")
+                  .notificationContains(NodeToolResult.ProgressEventType.COMPLETE, "finished with error");
+        }
+        finally
+        {
+            filter.off();
+        }
+    }
+
+    //TODO failure reply merkle tree
+    //TODO failure reply merkle tree IR
+
+    @Test(timeout = 1 * 60 * 1000)
+    public void validationFailure()
+    {
+        String table = tableName("validationfailure");
+        CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, value text, PRIMARY KEY (key))", KEYSPACE, table));
+        IMessageFilters.Filter filter = CLUSTER.verbs(Verb.VALIDATION_REQ).messagesMatching(of(m -> {
+            throw new RuntimeException("validation fail");
+        })).drop();
+        try
+        {
+            NodeToolResult result = repair(1, KEYSPACE, table);
+            result.asserts()
+                  .failure()
+                  .errorContains("Some repair failed")
+                  .notificationContains(NodeToolResult.ProgressEventType.ERROR, "Some repair failed")
+                  .notificationContains(NodeToolResult.ProgressEventType.COMPLETE, "finished with error");
+        }
+        finally
+        {
+            filter.off();
+        }
+    }
+
+    @Test(timeout = 1 * 60 * 1000)
+    public void streamFailure()
+    {
+        String table = tableName("streamfailure");
+        CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, value text, PRIMARY KEY (key))", KEYSPACE, table));
+        // there needs to be a difference to cause streaming to happen, so add to one node
+        CLUSTER.get(2).executeInternal(format("INSERT INTO %s.%s (key) VALUES (?)", KEYSPACE, table), "some data");
+        IMessageFilters.Filter filter = CLUSTER.verbs(Verb.SYNC_REQ).messagesMatching(of(m -> {
+            throw new RuntimeException("stream fail");
+        })).drop();
+        try
+        {
+            NodeToolResult result = repair(1, KEYSPACE, table);
+            result.asserts()
+                  .failure()
+                  .errorContains("Some repair failed")
+                  .notificationContains(NodeToolResult.ProgressEventType.ERROR, "Some repair failed")
+                  .notificationContains(NodeToolResult.ProgressEventType.COMPLETE, "finished with error");
+        }
+        finally
+        {
+            filter.off();
+        }
+    }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorFast.java b/test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorFast.java
new file mode 100644
index 0000000..edcb9c3
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorFast.java
@@ -0,0 +1,384 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.util.Set;
+
+import com.google.common.collect.Iterables;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Test;
+
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.distributed.api.IMessageFilters;
+import org.apache.cassandra.distributed.api.LongTokenRange;
+import org.apache.cassandra.distributed.api.NodeToolResult;
+import org.apache.cassandra.distributed.api.NodeToolResult.ProgressEventType;
+import org.apache.cassandra.distributed.test.DistributedRepairUtils.RepairParallelism;
+import org.apache.cassandra.distributed.test.DistributedRepairUtils.RepairType;
+import org.apache.cassandra.net.Verb;
+import org.apache.cassandra.service.StorageService;
+
+import static java.lang.String.format;
+import static org.apache.cassandra.distributed.api.IMessageFilters.Matcher.of;
+import static org.apache.cassandra.distributed.test.DistributedRepairUtils.assertParentRepairFailedWithMessageContains;
+import static org.apache.cassandra.distributed.test.DistributedRepairUtils.assertParentRepairNotExist;
+import static org.apache.cassandra.distributed.test.DistributedRepairUtils.assertParentRepairSuccess;
+import static org.apache.cassandra.distributed.test.DistributedRepairUtils.getRepairExceptions;
+
+public abstract class RepairCoordinatorFast extends RepairCoordinatorBase
+{
+    /**
+     * @param repairType        repair type used by every test in this suite
+     * @param parallelism       parallelism option appended to each nodetool repair invocation
+     * @param withNotifications forwarded to {@code DistributedRepairUtils.repair}; the notification assertions in
+     *                          this suite only run when it is {@code true}
+     */
+    public RepairCoordinatorFast(RepairType repairType, RepairParallelism parallelism, boolean withNotifications)
+    {
+        super(repairType,
+              parallelism,
+              withNotifications);
+    }
+
+    @Test(timeout = 1 * 60 * 1000)
+    public void simple()
+    {
+        // repairing a one-row table must succeed and must not bump the repair exception counter
+        String table = tableName("simple");
+        CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, PRIMARY KEY (key))", KEYSPACE, table));
+        CLUSTER.coordinator(1).execute(format("INSERT INTO %s.%s (key) VALUES (?)", KEYSPACE, table), ConsistencyLevel.ANY, "some text");
+
+        long exceptionsBefore = getRepairExceptions(CLUSTER, 2);
+        NodeToolResult result = repair(2, KEYSPACE, table);
+        result.asserts().success();
+
+        boolean isPreview = repairType == RepairType.PREVIEW;
+        if (withNotifications)
+        {
+            String successMessage;
+            if (isPreview)
+                successMessage = "Repair preview completed successfully";
+            else
+                successMessage = "Repair completed successfully";
+
+            result.asserts()
+                  .notificationContains(ProgressEventType.START, "Starting repair command")
+                  .notificationContains(ProgressEventType.START, "repairing keyspace " + KEYSPACE + " with repair options")
+                  .notificationContains(ProgressEventType.SUCCESS, successMessage)
+                  .notificationContains(ProgressEventType.COMPLETE, "finished");
+        }
+
+        // the test expects preview repairs to leave no parent repair record, and other repairs to record success
+        if (isPreview)
+            assertParentRepairNotExist(CLUSTER, KEYSPACE, table);
+        else
+            assertParentRepairSuccess(CLUSTER, KEYSPACE, table);
+
+        Assert.assertEquals(exceptionsBefore, getRepairExceptions(CLUSTER, 2));
+    }
+
+    @Test(timeout = 1 * 60 * 1000)
+    public void missingKeyspace()
+    {
+        // As of this writing the keyspace check is done by nodetool itself, so JMX notifications are not important
+        // here and no repair history is stored.
+        String missing = "doesnotexist";
+        long exceptionsBefore = getRepairExceptions(CLUSTER, 2);
+
+        repair(2, missing).asserts()
+                          .failure()
+                          .errorContains("Keyspace [doesnotexist] does not exist.");
+
+        Assert.assertEquals(exceptionsBefore, getRepairExceptions(CLUSTER, 2));
+        assertParentRepairNotExist(CLUSTER, missing);
+    }
+
+ @Test(timeout = 1 * 60 * 1000)
+ public void missingTable()
+ {
+ long repairExceptions = getRepairExceptions(CLUSTER, 2);
+ NodeToolResult result = repair(2, KEYSPACE, "doesnotexist");
+ result.asserts()
+ .failure();
+ if (withNotifications)
+ {
+ result.asserts()
+ .errorContains("failed with error Unknown keyspace/cf pair (distributed_test_keyspace.doesnotexist)")
+ // Start notification is ignored since this is checked during setup (aka before start)
+ .notificationContains(ProgressEventType.ERROR, "failed with error Unknown keyspace/cf pair (distributed_test_keyspace.doesnotexist)")
+ .notificationContains(ProgressEventType.COMPLETE, "finished with error");
+ }
+
+ assertParentRepairNotExist(CLUSTER, KEYSPACE, "doesnotexist");
+
+ Assert.assertEquals(repairExceptions + 1, getRepairExceptions(CLUSTER, 2));
+ }
+
+    @Test(timeout = 1 * 60 * 1000)
+    public void noTablesToRepair()
+    {
+        // Index CFs currently don't support repair, so they get dropped when listed. Repairing only the index leaves
+        // the keyspace with 0 tables to repair, which makes repair no-op early and skip.
+        String table = tableName("withindex");
+        String indexName = "value_" + postfix();
+        CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, value text, PRIMARY KEY (key))", KEYSPACE, table));
+        CLUSTER.schemaChange(format("CREATE INDEX %s ON %s.%s (value)", indexName, KEYSPACE, table));
+
+        // a CF name containing '.' is assumed to be a 2i, which rejects repairs
+        String indexCf = table + ".value";
+        long exceptionsBefore = getRepairExceptions(CLUSTER, 2);
+        NodeToolResult result = repair(2, KEYSPACE, indexCf);
+        result.asserts().success();
+        if (withNotifications)
+        {
+            // Start notification is ignored since this is checked during setup (aka before start).
+            // NOTE(review): earlier comments claimed the SUCCESS and COMPLETE assertions below fail; this test is not
+            // @Ignore'd, so presumably they pass now — confirm and drop this note.
+            result.asserts()
+                  .notificationContains("Empty keyspace")
+                  .notificationContains("skipping repair: " + KEYSPACE)
+                  .notificationContains(ProgressEventType.SUCCESS, "Empty keyspace")
+                  .notificationContains(ProgressEventType.COMPLETE, "finished");
+        }
+
+        assertParentRepairNotExist(CLUSTER, KEYSPACE, indexCf);
+
+        // this is a SKIP rather than a FAILURE, so the counter must not move
+        Assert.assertEquals(exceptionsBefore, getRepairExceptions(CLUSTER, 2));
+    }
+
+ @Test(timeout = 1 * 60 * 1000)
+ public void intersectingRange()
+ {
+ // this test exists to show that this case will cause repair to finish; success or failure isn't imporant
+ // if repair is enhanced to allow intersecting ranges w/ local then this test will fail saying that we expected
+ // repair to fail but it didn't, this would be fine and this test should be updated to reflect the new
+ // semantic
+ String table = tableName("intersectingrange");
+ CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, value text, PRIMARY KEY (key))", KEYSPACE, table));
+
+ //TODO dtest api for this?
+ LongTokenRange tokenRange = CLUSTER.get(2).callOnInstance(() -> {
+ Set<Range<Token>> ranges = StorageService.instance.getLocalReplicas(KEYSPACE).ranges();
+ Range<Token> range = Iterables.getFirst(ranges, null);
+ long left = (long) range.left.getTokenValue();
+ long right = (long) range.right.getTokenValue();
+ return new LongTokenRange(left, right);
+ });
+ LongTokenRange intersectingRange = new LongTokenRange(tokenRange.maxInclusive - 7, tokenRange.maxInclusive + 7);
+
+ long repairExceptions = getRepairExceptions(CLUSTER, 2);
+ NodeToolResult result = repair(2, KEYSPACE, table,
+ "--start-token", Long.toString(intersectingRange.minExclusive),
+ "--end-token", Long.toString(intersectingRange.maxInclusive));
+ result.asserts()
+ .failure()
+ .errorContains("Requested range " + intersectingRange + " intersects a local range (" + tokenRange + ") but is not fully contained in one");
+ if (withNotifications)
+ {
+ result.asserts()
+ .notificationContains(ProgressEventType.START, "Starting repair command")
+ .notificationContains(ProgressEventType.START, "repairing keyspace " + KEYSPACE + " with repair options")
+ .notificationContains(ProgressEventType.ERROR, "Requested range " + intersectingRange + " intersects a local range (" + tokenRange + ") but is not fully contained in one")
+ .notificationContains(ProgressEventType.COMPLETE, "finished with error");
+ }
+
+ assertParentRepairNotExist(CLUSTER, KEYSPACE, table);
+
+ Assert.assertEquals(repairExceptions + 1, getRepairExceptions(CLUSTER, 2));
+ }
+
+ @Test(timeout = 1 * 60 * 1000)
+ public void unknownHost()
+ {
+ String table = tableName("unknownhost");
+ CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, value text, PRIMARY KEY (key))", KEYSPACE, table));
+
+ long repairExceptions = getRepairExceptions(CLUSTER, 2);
+ NodeToolResult result = repair(2, KEYSPACE, table, "--in-hosts", "thisreally.should.not.exist.apache.org");
+ result.asserts()
+ .failure()
+ .errorContains("Unknown host specified thisreally.should.not.exist.apache.org");
+ if (withNotifications)
+ {
+ result.asserts()
+ .notificationContains(ProgressEventType.START, "Starting repair command")
+ .notificationContains(ProgressEventType.START, "repairing keyspace " + KEYSPACE + " with repair options")
+ .notificationContains(ProgressEventType.ERROR, "Unknown host specified thisreally.should.not.exist.apache.org")
+ .notificationContains(ProgressEventType.COMPLETE, "finished with error");
+ }
+
+ assertParentRepairNotExist(CLUSTER, KEYSPACE, table);
+
+ Assert.assertEquals(repairExceptions + 1, getRepairExceptions(CLUSTER, 2));
+ }
+
+ @Test(timeout = 1 * 60 * 1000)
+ public void desiredHostNotCoordinator()
+ {
+ // current limitation is that the coordinator must be apart of the repair, so as long as that exists this test
+ // verifies that the validation logic will termniate the repair properly
+ String table = tableName("desiredhostnotcoordinator");
+ CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, value text, PRIMARY KEY (key))", KEYSPACE, table));
+
+ long repairExceptions = getRepairExceptions(CLUSTER, 2);
+ NodeToolResult result = repair(2, KEYSPACE, table, "--in-hosts", "localhost");
+ result.asserts()
+ .failure()
+ .errorContains("The current host must be part of the repair");
+ if (withNotifications)
+ {
+ result.asserts()
+ .notificationContains(ProgressEventType.START, "Starting repair command")
+ .notificationContains(ProgressEventType.START, "repairing keyspace " + KEYSPACE + " with repair options")
+ .notificationContains(ProgressEventType.ERROR, "The current host must be part of the repair")
+ .notificationContains(ProgressEventType.COMPLETE, "finished with error");
+ }
+
+ assertParentRepairNotExist(CLUSTER, KEYSPACE, table);
+
+ Assert.assertEquals(repairExceptions + 1, getRepairExceptions(CLUSTER, 2));
+ }
+
+ @Test(timeout = 1 * 60 * 1000)
+ public void onlyCoordinator()
+ {
+ // this is very similar to ::desiredHostNotCoordinator but has the difference that the only host to do repair
+ // is the coordinator
+ String table = tableName("onlycoordinator");
+ CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, value text, PRIMARY KEY (key))", KEYSPACE, table));
+
+ long repairExceptions = getRepairExceptions(CLUSTER, 2);
+ NodeToolResult result = repair(1, KEYSPACE, table, "--in-hosts", "localhost");
+ result.asserts()
+ .failure()
+ .errorContains("Specified hosts [localhost] do not share range");
+ if (withNotifications)
+ {
+ result.asserts()
+ .notificationContains(ProgressEventType.START, "Starting repair command")
+ .notificationContains(ProgressEventType.START, "repairing keyspace " + KEYSPACE + " with repair options")
+ .notificationContains(ProgressEventType.ERROR, "Specified hosts [localhost] do not share range")
+ .notificationContains(ProgressEventType.COMPLETE, "finished with error");
+ }
+
+ assertParentRepairNotExist(CLUSTER, KEYSPACE, table);
+
+ //TODO should this be marked as fail to match others? Should they not be marked?
+ Assert.assertEquals(repairExceptions, getRepairExceptions(CLUSTER, 2));
+ }
+
+ @Test(timeout = 1 * 60 * 1000)
+ public void replicationFactorOne()
+ {
+ // In the case of rf=1 repair fails to create a cmd handle so nodetool exits early
+ String keyspace = "replicationfactor";
+ String table = tableName("one");
+ // since cluster is shared and this test gets called multiple times, need "IF NOT EXISTS" so the second+ attempt
+ // does not fail
+ CLUSTER.schemaChange("CREATE KEYSPACE IF NOT EXISTS " + keyspace + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};");
+ CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, value text, PRIMARY KEY (key))", keyspace, table));
+
+ long repairExceptions = getRepairExceptions(CLUSTER, 1);
+ NodeToolResult result = repair(1, keyspace, table);
+ result.asserts().success();
+
+ // the table lives in the rf=1 keyspace (not KEYSPACE), so that is where leftover repair state must be checked
+ assertParentRepairNotExist(CLUSTER, keyspace, table);
+ Assert.assertEquals(repairExceptions, getRepairExceptions(CLUSTER, 1));
+ }
+
+ @Test(timeout = 1 * 60 * 1000)
+ public void prepareFailure()
+ {
+ String table = tableName("preparefailure");
+ CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, value text, PRIMARY KEY (key))", KEYSPACE, table));
+ IMessageFilters.Filter filter = CLUSTER.verbs(Verb.PREPARE_MSG).messagesMatching(of(m -> {
+ throw new RuntimeException("prepare fail");
+ })).drop();
+ try
+ {
+ long repairExceptions = getRepairExceptions(CLUSTER, 1);
+ NodeToolResult result = repair(1, KEYSPACE, table);
+ result.asserts()
+ .failure()
+ .errorContains("Got negative replies from endpoints");
+ if (withNotifications)
+ {
+ result.asserts()
+ .notificationContains(ProgressEventType.START, "Starting repair command")
+ .notificationContains(ProgressEventType.START, "repairing keyspace " + KEYSPACE + " with repair options")
+ .notificationContains(ProgressEventType.ERROR, "Got negative replies from endpoints")
+ .notificationContains(ProgressEventType.COMPLETE, "finished with error");
+ }
+
+ Assert.assertEquals(repairExceptions + 1, getRepairExceptions(CLUSTER, 1));
+ if (repairType != RepairType.PREVIEW)
+ {
+ assertParentRepairFailedWithMessageContains(CLUSTER, KEYSPACE, table, "Got negative replies from endpoints");
+ }
+ else
+ {
+ assertParentRepairNotExist(CLUSTER, KEYSPACE, table);
+ }
+ }
+ finally
+ {
+ filter.off();
+ }
+ }
+
+ @Test(timeout = 1 * 60 * 1000)
+ public void snapshotFailure()
+ {
+ Assume.assumeFalse("incremental does not do snapshot", repairType == RepairType.INCREMENTAL);
+ Assume.assumeFalse("Parallel repair does not perform snapshots", parallelism == RepairParallelism.PARALLEL);
+
+ String table = tableName("snapshotfailure");
+ CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, value text, PRIMARY KEY (key))", KEYSPACE, table));
+ IMessageFilters.Filter filter = CLUSTER.verbs(Verb.SNAPSHOT_MSG).messagesMatching(of(m -> {
+ throw new RuntimeException("snapshot fail");
+ })).drop();
+ try
+ {
+ long repairExceptions = getRepairExceptions(CLUSTER, 1);
+ NodeToolResult result = repair(1, KEYSPACE, table);
+ result.asserts()
+ .failure();
+ if (withNotifications)
+ {
+ result.asserts()
+ .errorContains("Could not create snapshot")
+ .notificationContains(ProgressEventType.START, "Starting repair command")
+ .notificationContains(ProgressEventType.START, "repairing keyspace " + KEYSPACE + " with repair options")
+ .notificationContains(ProgressEventType.ERROR, "Could not create snapshot ")
+ .notificationContains(ProgressEventType.COMPLETE, "finished with error");
+ }
+ else
+ {
+ // Right now coordination doesn't propgate the first exception, so we only know "there exists a issue".
+ // With notifications on nodetool will see the error then complete, so the cmd state (what nodetool
+ // polls on) is ignored. With notifications off, the poll await fails and queries cmd state, and that
+ // will have the below error.
+ // NOTE: this isn't desireable, would be good to propgate
+ result.asserts()
+ .errorContains("Some repair failed");
+ }
+
+ Assert.assertEquals(repairExceptions + 1, getRepairExceptions(CLUSTER, 1));
+ if (repairType != RepairType.PREVIEW)
+ {
+ assertParentRepairFailedWithMessageContains(CLUSTER, KEYSPACE, table, "Could not create snapshot");
+ }
+ else
+ {
+ assertParentRepairNotExist(CLUSTER, KEYSPACE, table);
+ }
+ }
+ finally
+ {
+ filter.off();
+ }
+ }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorSlow.java b/test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorSlow.java
new file mode 100644
index 0000000..32538ec
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorSlow.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.net.UnknownHostException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.api.IMessageFilters;
+import org.apache.cassandra.distributed.api.NodeToolResult;
+import org.apache.cassandra.distributed.impl.MessageFilters;
+import org.apache.cassandra.distributed.test.DistributedRepairUtils.RepairParallelism;
+import org.apache.cassandra.distributed.test.DistributedRepairUtils.RepairType;
+import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.net.Verb;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static java.lang.String.format;
+import static org.apache.cassandra.distributed.api.IMessageFilters.Matcher.of;
+import static org.apache.cassandra.distributed.test.DistributedRepairUtils.assertParentRepairFailedWithMessageContains;
+import static org.apache.cassandra.distributed.test.DistributedRepairUtils.assertParentRepairNotExist;
+import static org.apache.cassandra.distributed.test.DistributedRepairUtils.getRepairExceptions;
+
+/** Repair coordinator failure scenarios that involve RPC timeouts or stopping/restarting a node. */
+public abstract class RepairCoordinatorSlow extends RepairCoordinatorBase
+{
+ public RepairCoordinatorSlow(RepairType repairType, RepairParallelism parallelism, boolean withNotifications)
+ {
+ super(repairType, parallelism, withNotifications);
+ }
+
+ @Test(timeout = 1 * 60 * 1000)
+ public void prepareRPCTimeout()
+ {
+ String table = tableName("preparerpctimeout");
+ CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, value text, PRIMARY KEY (key))", KEYSPACE, table));
+ IMessageFilters.Filter filter = CLUSTER.verbs(Verb.PREPARE_MSG).drop();
+ try
+ {
+ long repairExceptions = getRepairExceptions(CLUSTER, 1);
+ NodeToolResult result = repair(1, KEYSPACE, table);
+ result.asserts().failure().errorContains("Got negative replies from endpoints [127.0.0.2:7012]");
+ if (withNotifications)
+ {
+ result.asserts()
+ .notificationContains(NodeToolResult.ProgressEventType.START, "Starting repair command")
+ .notificationContains(NodeToolResult.ProgressEventType.START, "repairing keyspace " + KEYSPACE + " with repair options")
+ .notificationContains(NodeToolResult.ProgressEventType.ERROR, "Got negative replies from endpoints [127.0.0.2:7012]")
+ .notificationContains(NodeToolResult.ProgressEventType.COMPLETE, "finished with error");
+ }
+
+ if (repairType != RepairType.PREVIEW)
+ {
+ assertParentRepairFailedWithMessageContains(CLUSTER, KEYSPACE, table, "Got negative replies from endpoints [127.0.0.2:7012]");
+ }
+ else
+ {
+ assertParentRepairNotExist(CLUSTER, KEYSPACE, table);
+ }
+
+ Assert.assertEquals(repairExceptions + 1, getRepairExceptions(CLUSTER, 1));
+ }
+ finally
+ {
+ filter.off();
+ }
+ }
+
+ @Test(timeout = 1 * 60 * 1000)
+ public void neighbourDown() throws InterruptedException, ExecutionException
+ {
+ String table = tableName("neighbourdown");
+ CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, value text, PRIMARY KEY (key))", KEYSPACE, table));
+ // read the address first: shutdown() is asynchronous, so calling into node 2 afterwards races with it
+ String downNodeAddress = CLUSTER.get(2).callOnInstance(() -> FBUtilities.getBroadcastAddressAndPort().toString());
+ Future<Void> shutdownFuture = CLUSTER.get(2).shutdown();
+ try
+ {
+ // wait for the node to stop
+ shutdownFuture.get();
+ // wait for the failure detector to detect this
+ CLUSTER.get(1).runOnInstance(() -> {
+ InetAddressAndPort neighbor;
+ try
+ {
+ neighbor = InetAddressAndPort.getByName(downNodeAddress);
+ }
+ catch (UnknownHostException e)
+ {
+ throw new RuntimeException(e);
+ }
+ while (FailureDetector.instance.isAlive(neighbor))
+ Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
+ });
+
+ long repairExceptions = getRepairExceptions(CLUSTER, 1);
+ NodeToolResult result = repair(1, KEYSPACE, table);
+ result.asserts().failure().errorContains("Endpoint not alive");
+ if (withNotifications)
+ {
+ result.asserts()
+ .notificationContains(NodeToolResult.ProgressEventType.START, "Starting repair command")
+ .notificationContains(NodeToolResult.ProgressEventType.START, "repairing keyspace " + KEYSPACE + " with repair options")
+ .notificationContains(NodeToolResult.ProgressEventType.ERROR, "Endpoint not alive")
+ .notificationContains(NodeToolResult.ProgressEventType.COMPLETE, "finished with error");
+ }
+
+ Assert.assertEquals(repairExceptions + 1, getRepairExceptions(CLUSTER, 1));
+ }
+ finally
+ {
+ CLUSTER.get(2).startup();
+ }
+
+ // make sure to call outside of the try/finally so the node is up so we can actually query
+ if (repairType != RepairType.PREVIEW)
+ {
+ assertParentRepairFailedWithMessageContains(CLUSTER, KEYSPACE, table, "Endpoint not alive");
+ }
+ else
+ {
+ assertParentRepairNotExist(CLUSTER, KEYSPACE, table);
+ }
+ }
+
+ @Test(timeout = 1 * 60 * 1000)
+ public void validationParticipentCrashesAndComesBack()
+ {
+ // Test what happens when a participant restarts in the middle of validation
+ // Currently this isn't recoverable but could be.
+ // TODO since this is a real restart, how would I test "long pause"? Can't send SIGSTOP since same process
+ String table = tableName("validationparticipentcrashesandcomesback");
+ CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, value text, PRIMARY KEY (key))", KEYSPACE, table));
+ AtomicReference<Future<Void>> participantShutdown = new AtomicReference<>();
+ IMessageFilters.Filter filter = CLUSTER.verbs(Verb.VALIDATION_REQ).to(2).messagesMatching(of(m -> {
+ // the nice thing about this is that this lambda is "capturing" and not "transfer", what this means is that
+ // this lambda isn't serialized and any object held isn't copied.
+ participantShutdown.set(CLUSTER.get(2).shutdown());
+ return true; // drop it so this node doesn't reply before shutdown.
+ })).drop();
+ try
+ {
+ // since nodetool is blocking, need to handle participantShutdown in the background
+ CompletableFuture<Void> recovered = CompletableFuture.runAsync(() -> {
+ try {
+ long deadline = System.nanoTime() + TimeUnit.MINUTES.toNanos(1); // match the test timeout so this pool thread can't spin forever
+ while (participantShutdown.get() == null) {
+ if (System.nanoTime() - deadline > 0)
+ throw new IllegalStateException("VALIDATION_REQ to node 2 was never intercepted");
+ // event not happened, wait for it
+ TimeUnit.MILLISECONDS.sleep(100);
+ }
+ Future<Void> f = participantShutdown.get();
+ f.get(); // wait for shutdown to complete
+ CLUSTER.get(2).startup();
+ } catch (Exception e) {
+ if (e instanceof RuntimeException) {
+ throw (RuntimeException) e;
+ }
+ throw new RuntimeException(e);
+ }
+ });
+
+ long repairExceptions = getRepairExceptions(CLUSTER, 1);
+ NodeToolResult result = repair(1, KEYSPACE, table);
+ recovered.join(); // if recovery didn't happen then the results are not what are being tested, so block here first
+ result.asserts().failure();
+ if (withNotifications)
+ {
+ result.asserts()
+ .errorContains("Endpoint 127.0.0.2:7012 died")
+ .notificationContains(NodeToolResult.ProgressEventType.ERROR, "Endpoint 127.0.0.2:7012 died")
+ .notificationContains(NodeToolResult.ProgressEventType.COMPLETE, "finished with error");
+ }
+ else
+ {
+ // Right now coordination doesn't propagate the first exception, so we only know "there exists an issue".
+ // With notifications on, nodetool will see the error then complete, so the cmd state (what nodetool
+ // polls on) is ignored. With notifications off, the poll await fails and queries cmd state, and that
+ // will have the below error.
+ // NOTE: this isn't desirable, would be good to propagate
+ result.asserts()
+ .errorContains("Some repair failed");
+ }
+
+ Assert.assertEquals(repairExceptions + 1, getRepairExceptions(CLUSTER, 1));
+ if (repairType != RepairType.PREVIEW)
+ {
+ assertParentRepairFailedWithMessageContains(CLUSTER, KEYSPACE, table, "Endpoint 127.0.0.2:7012 died");
+ }
+ else
+ {
+ assertParentRepairNotExist(CLUSTER, KEYSPACE, table);
+ }
+ }
+ finally
+ {
+ filter.off();
+ try {
+ CLUSTER.get(2).startup();
+ } catch (Exception ignored) {
+ // the recovery task may already have started node 2, and a second startup is allowed to fail, so ignore it... hope this didn't break the other tests =x
+ }
+ }
+ }
+}
diff --git a/test/unit/org/apache/cassandra/utils/Retry.java b/test/unit/org/apache/cassandra/utils/Retry.java
new file mode 100644
index 0000000..513b0f2
--- /dev/null
+++ b/test/unit/org/apache/cassandra/utils/Retry.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils;
+
+import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.function.IntToLongFunction;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+
+import org.apache.cassandra.concurrent.NamedThreadFactory;
+
+/**
+ * Class for retryable actions.
+ *
+ * @see {@link #retryWithBackoff(int, Supplier, Predicate)}
+ */
+public final class Retry
+{
+ private static final ScheduledExecutorService SCHEDULED = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("RetryScheduler"));
+
+ private Retry()
+ {
+
+ }
+
+ /**
+ * Schedule code to run after the defined duration.
+ *
+ * Since a executor was not defined, the global {@link ForkJoinPool#commonPool()} executor will be used, if this
+ * is not desirable then should use {@link #schedule(Duration, Executor, Runnable)}.
+ *
+ * @param duration how long to delay
+ * @param fn code to run
+ * @return future representing result
+ */
+ public static CompletableFuture<Void> schedule(final Duration duration, final Runnable fn)
+ {
+ return schedule(duration, ForkJoinPool.commonPool(), fn);
+ }
+
+ /**
+ * Schedule code to run after the defined duration on the provided executor.
+ *
+ * @param duration how long to delay
+ * @param executor to run on
+ * @param fn code to run
+ * @return future representing result
+ */
+ public static CompletableFuture<Void> schedule(final Duration duration, final Executor executor, final Runnable fn)
+ {
+ long nanos = duration.toNanos();
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ SCHEDULED.schedule(() -> run0(executor, future, fn), nanos, TimeUnit.NANOSECONDS);
+ return future;
+ }
+
+ private static void run0(final Executor executor, final CompletableFuture<Void> future, final Runnable fn)
+ {
+ try
+ {
+ executor.execute(() -> {
+ try
+ {
+ fn.run();
+ future.complete(null);
+ }
+ catch (Exception e)
+ {
+ future.completeExceptionally(e);
+ }
+ });
+ }
+ catch (Exception e)
+ {
+ future.completeExceptionally(e);
+ }
+ }
+
+ /**
+ * Continously attempting to call the provided future supplier until successful or until no longer able to retry.
+ *
+ * @param maxRetries to allow
+ * @param fn asyncronous operation to retry
+ * @param retryableException used to say if retry is allowed
+ * @return future representing the result. If retries were not able to get a successful result, the exception is the last exception seen.
+ */
+ public static <A> CompletableFuture<A> retryWithBackoff(final int maxRetries,
+ final Supplier<CompletableFuture<A>> fn,
+ final Predicate<Throwable> retryableException)
+ {
+ CompletableFuture<A> future = new CompletableFuture<>();
+ retryWithBackoff0(future, 0, maxRetries, fn, retryableException, retryCount -> computeSleepTimeMillis(retryCount, 50, 1000));
+ return future;
+ }
+
+ /**
+ * This is the same as {@link #retryWithBackoff(int, Supplier, Predicate)}, but takes a blocking retryable action
+ * and blocks the caller until done.
+ */
+ public static <A> A retryWithBackoffBlocking(final int maxRetries, final Supplier<A> fn)
+ {
+ return retryWithBackoffBlocking(maxRetries, fn, (ignore) -> true);
+ }
+
+ /**
+ * This is the same as {@link #retryWithBackoff(int, Supplier, Predicate)}, but takes a blocking retryable action
+ * and blocks the caller until done.
+ */
+ public static <A> A retryWithBackoffBlocking(final int maxRetries,
+ final Supplier<A> fn,
+ final Predicate<Throwable> retryableException)
+ {
+ return retryWithBackoff(maxRetries, () -> CompletableFuture.completedFuture(fn.get()), retryableException).join();
+ }
+
+ private static <A> void retryWithBackoff0(final CompletableFuture<A> result,
+ final int retryCount,
+ final int maxRetry,
+ final Supplier<CompletableFuture<A>> body,
+ final Predicate<Throwable> retryableException,
+ final IntToLongFunction completeSleep)
+ {
+ try
+ {
+ Consumer<Throwable> attemptRetry = cause -> {
+ if (retryCount >= maxRetry || !retryableException.test(cause))
+ {
+ // too many attempts or exception isn't retryable, so fail
+ result.completeExceptionally(cause);
+ }
+ else
+ {
+ long sleepMillis = completeSleep.applyAsLong(retryCount);
+ schedule(Duration.ofMillis(sleepMillis), () -> {
+ retryWithBackoff0(result, retryCount + 1, maxRetry, body, retryableException, completeSleep);
+ });
+ }
+ };
+
+ // sanity check that the future isn't filled
+ // the most likely cause for this is when the future is composed with other futures (such as .successAsList);
+ // the failure of a different future may cancel this one, so stop running
+ if (result.isDone())
+ {
+ if (!(result.isCancelled() || result.isCompletedExceptionally()))
+ {
+ // the result is success! But we didn't fill it...
+ new RuntimeException("Attempt to retry but found future was successful... aborting " + body).printStackTrace();
+ }
+ return;
+ }
+
+ CompletableFuture<A> future;
+ try
+ {
+ future = body.get();
+ }
+ catch (Exception e)
+ {
+ attemptRetry.accept(e);
+ return;
+ }
+
+ future.whenComplete((success, failure) -> {
+ if (failure == null)
+ {
+ result.complete(success);
+ }
+ else
+ {
+ attemptRetry.accept(failure instanceof CompletionException ? failure.getCause() : failure);
+ }
+ });
+ }
+ catch (Exception e)
+ {
+ result.completeExceptionally(e);
+ }
+ }
+
+ /**
+ * Compute a expoential delay based off the retry count and min/max delay.
+ */
+ private static long computeSleepTimeMillis(int retryCount, long baseSleepTimeMillis, long maxSleepMillis)
+ {
+ long baseTime = baseSleepTimeMillis * (1L << retryCount);
+ // its possible that this overflows, so fall back to max;
+ if (baseTime <= 0)
+ {
+ baseTime = maxSleepMillis;
+ }
+ // now make sure this is capped to target max
+ baseTime = Math.min(baseTime, maxSleepMillis);
+
+ return (long) (baseTime * (ThreadLocalRandom.current().nextDouble() + 0.5));
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org