You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by if...@apache.org on 2020/03/12 16:40:00 UTC

[cassandra] branch trunk updated: Refactor repair coordinator to centralize stage change logic and improved the public facing errors

This is an automated email from the ASF dual-hosted git repository.

ifesdjeen pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new cd9fd9e  Refactor repair coordinator to centralize stage change logic and improved the public facing errors
cd9fd9e is described below

commit cd9fd9e83f507e2bab5075399d812e3fb4368920
Author: David Capwell <dc...@gmail.com>
AuthorDate: Wed Mar 11 12:06:42 2020 -0700

    Refactor repair coordinator to centralize stage change logic and improved the public facing errors
    
    Patch by David Capwell; reviewed by Blake Eggleston, Zhao Yang, Dinesh Joshi, and Alex Petrov for CASSANDRA-15564
---
 build.xml                                          |   3 +
 .../apache/cassandra/repair/RepairRunnable.java    | 497 +++++++++++----------
 .../repair/SomeRepairFailedException.java          |  24 +-
 .../repair/consistent/CoordinatorSession.java      |   3 +-
 .../cassandra/service/ActiveRepairService.java     |   7 +-
 src/java/org/apache/cassandra/tools/NodeProbe.java |  11 +
 src/java/org/apache/cassandra/tools/NodeTool.java  |   4 +-
 .../cassandra/distributed/api/ICoordinator.java    |   6 +-
 .../cassandra/distributed/api/IInstance.java       |   9 +-
 .../cassandra/distributed/api/IMessageFilters.java |  52 ++-
 .../LongTokenRange.java}                           |  23 +-
 .../cassandra/distributed/api/NodeToolResult.java  | 182 ++++++++
 .../cassandra/distributed/api/QueryResult.java     | 139 ++++++
 .../org/apache/cassandra/distributed/api/Row.java  | 119 +++++
 .../distributed/impl/AbstractCluster.java          |  16 +-
 .../cassandra/distributed/impl/Coordinator.java    |  20 +-
 .../cassandra/distributed/impl/Instance.java       | 111 ++++-
 .../cassandra/distributed/impl/MessageFilters.java |  76 +++-
 .../mock/nodetool/InternalNodeProbe.java           |  32 +-
 .../mock/nodetool/InternalNodeProbeFactory.java    |  11 +-
 .../distributed/test/DistributedRepairUtils.java   | 208 +++++++++
 .../FullRepairCoordinatorFastTest.java}            |  21 +-
 .../FullRepairCoordinatorSlowTest.java}            |  21 +-
 .../IncrementalRepairCoordinatorFastTest.java}     |  21 +-
 .../IncrementalRepairCoordinatorSlowTest.java}     |  21 +-
 .../distributed/test/MessageFiltersTest.java       | 164 ++++---
 .../PreviewRepairCoordinatorFastTest.java}         |  21 +-
 .../PreviewRepairCoordinatorSlowTest.java}         |  21 +-
 .../distributed/test/PreviewRepairTest.java        |   4 +-
 .../distributed/test/RepairCoordinatorBase.java    | 102 +++++
 .../test/RepairCoordinatorFailingMessageTest.java  | 186 ++++++++
 .../distributed/test/RepairCoordinatorFast.java    | 384 ++++++++++++++++
 .../distributed/test/RepairCoordinatorSlow.java    | 230 ++++++++++
 test/unit/org/apache/cassandra/utils/Retry.java    | 222 +++++++++
 34 files changed, 2527 insertions(+), 444 deletions(-)

diff --git a/build.xml b/build.xml
index bd1f557..6c816eb 100644
--- a/build.xml
+++ b/build.xml
@@ -541,6 +541,7 @@
 
           <dependency groupId="org.yaml" artifactId="snakeyaml" version="1.11"/>
           <dependency groupId="junit" artifactId="junit" version="4.12" />
+          <dependency groupId="org.mockito" artifactId="mockito-core" version="3.2.4" />
           <dependency groupId="org.quicktheories" artifactId="quicktheories" version="0.25" />
           <dependency groupId="com.google.code.java-allocation-instrumenter" artifactId="java-allocation-instrumenter" version="${allocation-instrumenter.version}" />
           <dependency groupId="org.apache.rat" artifactId="apache-rat" version="0.10">
@@ -675,6 +676,7 @@
                 artifactId="cassandra-parent"
                 version="${version}"/>
         <dependency groupId="junit" artifactId="junit"/>
+        <dependency groupId="org.mockito" artifactId="mockito-core" />
         <dependency groupId="org.quicktheories" artifactId="quicktheories" />
         <dependency groupId="com.google.code.java-allocation-instrumenter" artifactId="java-allocation-instrumenter" version="${allocation-instrumenter.version}" />
         <dependency groupId="org.psjava" artifactId="psjava" version="0.1.19" />
@@ -707,6 +709,7 @@
                 artifactId="cassandra-parent"
                 version="${version}"/>
         <dependency groupId="junit" artifactId="junit"/>
+        <dependency groupId="org.mockito" artifactId="mockito-core" />
         <dependency groupId="com.datastax.cassandra" artifactId="cassandra-driver-core" classifier="shaded"/>
         <dependency groupId="io.netty" artifactId="netty-all"/>
         <dependency groupId="org.eclipse.jdt.core.compiler" artifactId="ecj"/>
diff --git a/src/java/org/apache/cassandra/repair/RepairRunnable.java b/src/java/org/apache/cassandra/repair/RepairRunnable.java
index ed7e208..c673a6c 100644
--- a/src/java/org/apache/cassandra/repair/RepairRunnable.java
+++ b/src/java/org/apache/cassandra/repair/RepairRunnable.java
@@ -19,7 +19,12 @@ package org.apache.cassandra.repair;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
@@ -28,17 +33,18 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableCollection;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.*;
-
-import org.apache.cassandra.locator.EndpointsForRange;
-import org.apache.cassandra.locator.Replica;
+import com.google.common.util.concurrent.AsyncFunction;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
 import org.apache.commons.lang3.time.DurationFormatUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -47,23 +53,26 @@ import com.codahale.metrics.Timer;
 import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
 import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.gms.FailureDetector;
-import org.apache.cassandra.repair.consistent.SyncStatSummary;
-import org.apache.cassandra.schema.SchemaConstants;
-import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.cql3.QueryOptions;
 import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.cql3.UntypedResultSet;
 import org.apache.cassandra.cql3.statements.SelectStatement;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.repair.consistent.CoordinatorSession;
-import org.apache.cassandra.metrics.StorageMetrics;
+import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.locator.EndpointsForRange;
 import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.metrics.StorageMetrics;
+import org.apache.cassandra.repair.consistent.CoordinatorSession;
+import org.apache.cassandra.repair.consistent.SyncStatSummary;
 import org.apache.cassandra.repair.messages.RepairOption;
+import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.service.ActiveRepairService.ParentRepairStatus;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.service.StorageService;
@@ -81,7 +90,7 @@ import org.apache.cassandra.utils.progress.ProgressEventNotifier;
 import org.apache.cassandra.utils.progress.ProgressEventType;
 import org.apache.cassandra.utils.progress.ProgressListener;
 
-public class RepairRunnable extends WrappedRunnable implements ProgressEventNotifier
+public class RepairRunnable implements Runnable, ProgressEventNotifier
 {
     private static final Logger logger = LoggerFactory.getLogger(RepairRunnable.class);
 
@@ -91,13 +100,18 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
     private final String keyspace;
 
     private final String tag;
-    private final AtomicInteger progress = new AtomicInteger();
+    private final AtomicInteger progressCounter = new AtomicInteger();
     private final int totalProgress;
 
+    private final long creationTimeMillis = System.currentTimeMillis();
+    private final UUID parentSession = UUIDGen.getTimeUUID();
+
     private final List<ProgressListener> listeners = new ArrayList<>();
 
     private static final AtomicInteger threadCounter = new AtomicInteger(1);
 
+    private TraceState traceState;
+
     public RepairRunnable(StorageService storageService, int cmd, RepairOption options, String keyspace)
     {
         this.storageService = storageService;
@@ -131,167 +145,243 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
         }
     }
 
-    protected void fireErrorAndComplete(int progressCount, int totalProgress, String message)
+    public void notification(String msg)
+    {
+        logger.info(msg);
+        fireProgressEvent(new ProgressEvent(ProgressEventType.NOTIFICATION, progressCounter.get(), totalProgress, msg));
+    }
+
+    private void skip(String msg)
     {
+        notification("Repair " + parentSession + " skipped: " + msg);
+        success(msg);
+    }
+
+    private void success(String msg)
+    {
+        fireProgressEvent(new ProgressEvent(ProgressEventType.SUCCESS, progressCounter.get(), totalProgress, msg));
+        ActiveRepairService.instance.recordRepairStatus(cmd, ActiveRepairService.ParentRepairStatus.COMPLETED,
+                                                        ImmutableList.of(msg));
+        complete(null);
+    }
+
+    public void notifyError(Throwable error)
+    {
+        // SomeRepairFailedException is deliberately not reported: skip the log, metric, ERROR event and table update
+        if (error instanceof SomeRepairFailedException)
+            return;
+        logger.error("Repair {} failed:", parentSession, error);
+
         StorageMetrics.repairExceptions.inc();
-        String errorMessage = String.format("Repair command #%d failed with error %s", cmd, message);
-        fireProgressEvent(new ProgressEvent(ProgressEventType.ERROR, progressCount, totalProgress, errorMessage));
-        String completionMessage = String.format("Repair command #%d finished with error", cmd);
-        fireProgressEvent(new ProgressEvent(ProgressEventType.COMPLETE, progressCount, totalProgress, completionMessage));
-        recordFailure(errorMessage, completionMessage);
+        String errorMessage = String.format("Repair command #%d failed with error %s", cmd, error.getMessage());
+        fireProgressEvent(new ProgressEvent(ProgressEventType.ERROR, progressCounter.get(), totalProgress, errorMessage));
+
+        // since this can fail, update table only after updating in-memory and notification state
+        maybeStoreParentRepairFailure(error);
     }
 
+    private void fail(String reason)
+    {
+        if (reason == null)
+            reason = "Some repair failed";
+        String completionMessage = String.format("Repair command #%d finished with error", cmd);
+
+        // Note we rely on the first message being the reason for the failure
+        // when inspecting this state from RepairRunner.queryForCompletedRepair
+        ActiveRepairService.instance.recordRepairStatus(cmd, ParentRepairStatus.FAILED,
+                                                        ImmutableList.of(reason, completionMessage));
+
+        complete(completionMessage);
+    }
 
-    protected void runMayThrow() throws Exception
+    private void complete(String msg)
     {
-        ActiveRepairService.instance.recordRepairStatus(cmd, ActiveRepairService.ParentRepairStatus.IN_PROGRESS, ImmutableList.of());
-        final TraceState traceState;
-        final UUID parentSession = UUIDGen.getTimeUUID();
-        final String tag = "repair:" + cmd;
+        long durationMillis = System.currentTimeMillis() - creationTimeMillis;
+        if (msg == null)
+        {
+            String duration = DurationFormatUtils.formatDurationWords(durationMillis, true, true);
+            msg = String.format("Repair command #%d finished in %s", cmd, duration);
+        }
 
-        final AtomicInteger progress = new AtomicInteger();
-        final int totalProgress = 4 + options.getRanges().size(); // get valid column families, calculate neighbors, validation, prepare for repair + number of ranges to repair
+        fireProgressEvent(new ProgressEvent(ProgressEventType.COMPLETE, progressCounter.get(), totalProgress, msg));
+        logger.info(msg);
 
-        String[] columnFamilies = options.getColumnFamilies().toArray(new String[options.getColumnFamilies().size()]);
-        Iterable<ColumnFamilyStore> validColumnFamilies;
+        ActiveRepairService.instance.removeParentRepairSession(parentSession);
+        TraceState localState = traceState;
+        if (options.isTraced() && localState != null)
+        {
+            for (ProgressListener listener : listeners)
+                localState.removeProgressListener(listener);
+            // Because DebuggableThreadPoolExecutor#afterExecute and this callback
+            // run in a nondeterministic order (within the same thread), the
+            // TraceState may have been nulled out at this point. It should be
+            // the one held in localState, so just set it again without
+            // bothering to check whether it actually was nulled out.
+            Tracing.instance.set(localState);
+            Tracing.traceRepair(msg);
+            Tracing.instance.stopSession();
+        }
+
+        Keyspace.open(keyspace).metric.repairTime.update(durationMillis, TimeUnit.MILLISECONDS);
+    }
+
+    public void run()
+    {
         try
         {
-            validColumnFamilies = storageService.getValidColumnFamilies(false, false, keyspace, columnFamilies);
-            progress.incrementAndGet();
+            runMayThrow();
         }
-        catch (IllegalArgumentException | IOException e)
+        catch (SkipRepairException e)
         {
-            logger.error("Repair {} failed:", parentSession, e);
-            fireErrorAndComplete(progress.get(), totalProgress, e.getMessage());
-            return;
+            skip(e.getMessage());
         }
-
-        if (Iterables.isEmpty(validColumnFamilies))
+        catch (Exception | Error e)
         {
-            String message = String.format("Empty keyspace, skipping repair: %s", keyspace);
-            logger.info(message);
-            fireProgressEvent(new ProgressEvent(ProgressEventType.COMPLETE, 0, 0, message));
-            return;
+            notifyError(e);
+            fail(e.getMessage());
         }
+    }
+
+    private void runMayThrow() throws Exception
+    {
+        ActiveRepairService.instance.recordRepairStatus(cmd, ParentRepairStatus.IN_PROGRESS, ImmutableList.of());
+
+        List<ColumnFamilyStore> columnFamilies = getColumnFamilies();
+        String[] cfnames = columnFamilies.stream().map(cfs -> cfs.name).toArray(String[]::new);
+
+        this.traceState = maybeCreateTraceState(columnFamilies);
+
+        notifyStarting();
+
+        NeighborsAndRanges neighborsAndRanges = getNeighborsAndRanges();
+
+        maybeStoreParentRepairStart(cfnames);
+
+        prepare(columnFamilies, neighborsAndRanges.allNeighbors, neighborsAndRanges.force);
+
+        repair(cfnames, neighborsAndRanges);
+    }
 
-        final long startTime = System.currentTimeMillis();
+    private List<ColumnFamilyStore> getColumnFamilies() throws IOException
+    {
+        String[] columnFamilies = options.getColumnFamilies().toArray(new String[options.getColumnFamilies().size()]);
+        Iterable<ColumnFamilyStore> validColumnFamilies = storageService.getValidColumnFamilies(false, false, keyspace, columnFamilies);
+        progressCounter.incrementAndGet();
+
+        if (Iterables.isEmpty(validColumnFamilies))
+            throw new SkipRepairException(String.format("Empty keyspace, skipping repair: %s", keyspace));
+        return Lists.newArrayList(validColumnFamilies);
+    }
+
+    private TraceState maybeCreateTraceState(Iterable<ColumnFamilyStore> columnFamilyStores)
+    {
+        if (!options.isTraced())
+            return null;
+
+        StringBuilder cfsb = new StringBuilder();
+        for (ColumnFamilyStore cfs : columnFamilyStores)
+            cfsb.append(", ").append(cfs.keyspace.getName()).append(".").append(cfs.name);
+
+        UUID sessionId = Tracing.instance.newSession(Tracing.TraceType.REPAIR);
+        TraceState traceState = Tracing.instance.begin("repair", ImmutableMap.of("keyspace", keyspace, "columnFamilies",
+                                                                                 cfsb.substring(2)));
+        traceState.enableActivityNotification(tag);
+        for (ProgressListener listener : listeners)
+            traceState.addProgressListener(listener);
+        Thread queryThread = createQueryThread(cmd, sessionId);
+        queryThread.setName("RepairTracePolling");
+        queryThread.start();
+        return traceState;
+    }
+
+    private void notifyStarting()
+    {
         String message = String.format("Starting repair command #%d (%s), repairing keyspace %s with %s", cmd, parentSession, keyspace,
                                        options);
         logger.info(message);
-        if (options.isTraced())
-        {
-            StringBuilder cfsb = new StringBuilder();
-            for (ColumnFamilyStore cfs : validColumnFamilies)
-                cfsb.append(", ").append(cfs.keyspace.getName()).append(".").append(cfs.name);
-
-            UUID sessionId = Tracing.instance.newSession(Tracing.TraceType.REPAIR);
-            traceState = Tracing.instance.begin("repair", ImmutableMap.of("keyspace", keyspace, "columnFamilies",
-                                                                          cfsb.substring(2)));
-            message = message + " tracing with " + sessionId;
-            fireProgressEvent(new ProgressEvent(ProgressEventType.START, 0, 100, message));
-            Tracing.traceRepair(message);
-            traceState.enableActivityNotification(tag);
-            for (ProgressListener listener : listeners)
-                traceState.addProgressListener(listener);
-            Thread queryThread = createQueryThread(cmd, sessionId);
-            queryThread.setName("RepairTracePolling");
-            queryThread.start();
-        }
-        else
-        {
-            fireProgressEvent(new ProgressEvent(ProgressEventType.START, 0, 100, message));
-            traceState = null;
-        }
+        Tracing.traceRepair(message);
+        fireProgressEvent(new ProgressEvent(ProgressEventType.START, 0, 100, message));
+    }
 
+    private NeighborsAndRanges getNeighborsAndRanges()
+    {
         Set<InetAddressAndPort> allNeighbors = new HashSet<>();
         List<CommonRange> commonRanges = new ArrayList<>();
 
-        try
-        {
-            //pre-calculate output of getLocalReplicas and pass it to getNeighbors to increase performance and prevent
-            //calculation multiple times
-            Iterable<Range<Token>> keyspaceLocalRanges = storageService.getLocalReplicas(keyspace).ranges();
-
-            for (Range<Token> range : options.getRanges())
-            {
-                EndpointsForRange neighbors = ActiveRepairService.getNeighbors(keyspace, keyspaceLocalRanges, range,
-                                                                               options.getDataCenters(),
-                                                                               options.getHosts());
-
-                addRangeToNeighbors(commonRanges, range, neighbors);
-                allNeighbors.addAll(neighbors.endpoints());
-            }
+        //pre-calculate output of getLocalReplicas and pass it to getNeighbors to increase performance and prevent
+        //calculation multiple times
+        Iterable<Range<Token>> keyspaceLocalRanges = storageService.getLocalReplicas(keyspace).ranges();
 
-            progress.incrementAndGet();
-        }
-        catch (IllegalArgumentException e)
+        for (Range<Token> range : options.getRanges())
         {
-            logger.error("Repair {} failed:", parentSession, e);
-            fireErrorAndComplete(progress.get(), totalProgress, e.getMessage());
-            return;
-        }
+            EndpointsForRange neighbors = ActiveRepairService.getNeighbors(keyspace, keyspaceLocalRanges, range,
+                                                                           options.getDataCenters(),
+                                                                           options.getHosts());
 
-        // Validate columnfamilies
-        List<ColumnFamilyStore> columnFamilyStores = new ArrayList<>();
-        try
-        {
-            Iterables.addAll(columnFamilyStores, validColumnFamilies);
-            progress.incrementAndGet();
-        }
-        catch (IllegalArgumentException e)
-        {
-            logger.error("Repair {} failed:", parentSession, e);
-            fireErrorAndComplete(progress.get(), totalProgress, e.getMessage());
-            return;
+            addRangeToNeighbors(commonRanges, range, neighbors);
+            allNeighbors.addAll(neighbors.endpoints());
         }
 
-        String[] cfnames = new String[columnFamilyStores.size()];
-        for (int i = 0; i < columnFamilyStores.size(); i++)
+        progressCounter.incrementAndGet();
+
+        boolean force = options.isForcedRepair();
+
+        if (force && options.isIncremental())
         {
-            cfnames[i] = columnFamilyStores.get(i).name;
+            Set<InetAddressAndPort> actualNeighbors = Sets.newHashSet(Iterables.filter(allNeighbors, FailureDetector.instance::isAlive));
+            force = !allNeighbors.equals(actualNeighbors);
+            allNeighbors = actualNeighbors;
         }
+        return new NeighborsAndRanges(force, allNeighbors, commonRanges);
+    }
 
+    private void maybeStoreParentRepairStart(String[] cfnames)
+    {
         if (!options.isPreview())
         {
             SystemDistributedKeyspace.startParentRepair(parentSession, keyspace, cfnames, options);
         }
+    }
 
-        boolean force = options.isForcedRepair();
-
-        if (force && options.isIncremental())
+    private void maybeStoreParentRepairSuccess(Collection<Range<Token>> successfulRanges)
+    {
+        if (!options.isPreview())
         {
-            Set<InetAddressAndPort> actualNeighbors = Sets.newHashSet(Iterables.filter(allNeighbors, FailureDetector.instance::isAlive));
-            force = !allNeighbors.equals(actualNeighbors);
-            allNeighbors = actualNeighbors;
+            SystemDistributedKeyspace.successfulParentRepair(parentSession, successfulRanges);
         }
+    }
 
-        try (Timer.Context ctx = Keyspace.open(keyspace).metric.repairPrepareTime.time())
+    private void maybeStoreParentRepairFailure(Throwable error)
+    {
+        if (!options.isPreview())
         {
-            ActiveRepairService.instance.prepareForRepair(parentSession, FBUtilities.getBroadcastAddressAndPort(), allNeighbors, options, force, columnFamilyStores);
-            progress.incrementAndGet();
+            SystemDistributedKeyspace.failParentRepair(parentSession, error);
         }
-        catch (Throwable t)
+    }
+
+    private void prepare(List<ColumnFamilyStore> columnFamilies, Set<InetAddressAndPort> allNeighbors, boolean force)
+    {
+        try (Timer.Context ignore = Keyspace.open(keyspace).metric.repairPrepareTime.time())
         {
-            logger.error("Repair {} failed:", parentSession, t);
-            if (!options.isPreview())
-            {
-                SystemDistributedKeyspace.failParentRepair(parentSession, t);
-            }
-            fireErrorAndComplete(progress.get(), totalProgress, t.getMessage());
-            return;
+            ActiveRepairService.instance.prepareForRepair(parentSession, FBUtilities.getBroadcastAddressAndPort(), allNeighbors, options, force, columnFamilies);
+            progressCounter.incrementAndGet();
         }
+    }
 
+    private void repair(String[] cfnames, NeighborsAndRanges neighborsAndRanges)
+    {
         if (options.isPreview())
         {
-            previewRepair(parentSession, startTime, commonRanges, cfnames);
+            previewRepair(parentSession, creationTimeMillis, neighborsAndRanges.commonRanges, cfnames);
         }
         else if (options.isIncremental())
         {
-            incrementalRepair(parentSession, startTime, force, traceState, allNeighbors, commonRanges, cfnames);
+            incrementalRepair(parentSession, creationTimeMillis, neighborsAndRanges.force, traceState,
+                              neighborsAndRanges.allNeighbors, neighborsAndRanges.commonRanges, cfnames);
         }
         else
         {
-            normalRepair(parentSession, startTime, traceState, commonRanges, cfnames);
+            normalRepair(parentSession, creationTimeMillis, traceState, neighborsAndRanges.commonRanges, cfnames);
         }
     }
 
@@ -354,7 +444,7 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
         {
             List<CommonRange> filtered = new ArrayList<>(commonRanges.size());
 
-            for (CommonRange commonRange: commonRanges)
+            for (CommonRange commonRange : commonRanges)
             {
                 Set<InetAddressAndPort> endpoints = ImmutableSet.copyOf(Iterables.filter(commonRange.endpoints, liveEndpoints::contains));
                 Set<InetAddressAndPort> transEndpoints = ImmutableSet.copyOf(Iterables.filter(commonRange.transEndpoints, liveEndpoints::contains));
@@ -381,9 +471,9 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
     {
         // the local node also needs to be included in the set of participants, since coordinator sessions aren't persisted
         Set<InetAddressAndPort> allParticipants = ImmutableSet.<InetAddressAndPort>builder()
-                                           .addAll(allNeighbors)
-                                           .add(FBUtilities.getBroadcastAddressAndPort())
-                                           .build();
+                                                  .addAll(allNeighbors)
+                                                  .add(FBUtilities.getBroadcastAddressAndPort())
+                                                  .build();
 
         List<CommonRange> allRanges = filterCommonRanges(commonRanges, allParticipants, forceRepair);
 
@@ -418,8 +508,14 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
             {
                 try
                 {
+                    if (results == null || results.stream().anyMatch(s -> s == null))
+                    {
+                        // something failed
+                        fail(null);
+                        return;
+                    }
                     PreviewKind previewKind = options.getPreviewKind();
-                    assert previewKind != PreviewKind.NONE;
+                    Preconditions.checkState(previewKind != PreviewKind.NONE, "Preview is NONE");
                     SyncStatSummary summary = new SyncStatSummary(true);
                     summary.consumeSessionResults(results);
 
@@ -427,50 +523,31 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
                     if (summary.isEmpty())
                     {
                         message = previewKind == PreviewKind.REPAIRED ? "Repaired data is in sync" : "Previewed data was in sync";
-                        logger.info(message);
-                        fireProgressEvent(new ProgressEvent(ProgressEventType.NOTIFICATION, progress.get(), totalProgress, message));
                     }
                     else
                     {
-                        message =  (previewKind == PreviewKind.REPAIRED ? "Repaired data is inconsistent\n" : "Preview complete\n") + summary.toString();
-                        logger.info(message);
-                        fireProgressEvent(new ProgressEvent(ProgressEventType.NOTIFICATION, progress.get(), totalProgress, message));
+                        message = (previewKind == PreviewKind.REPAIRED ? "Repaired data is inconsistent\n" : "Preview complete\n") + summary.toString();
                     }
+                    notification(message);
 
-                    String successMessage = "Repair preview completed successfully";
-                    fireProgressEvent(new ProgressEvent(ProgressEventType.SUCCESS, progress.get(), totalProgress,
-                                                        successMessage));
-                    String completionMessage = complete();
-
-                    ActiveRepairService.instance.recordRepairStatus(cmd, ActiveRepairService.ParentRepairStatus.COMPLETED,
-                                                           ImmutableList.of(message, successMessage, completionMessage));
+                    success("Repair preview completed successfully");
                 }
                 catch (Throwable t)
                 {
                     logger.error("Error completing preview repair", t);
                     onFailure(t);
                 }
+                finally
+                {
+                    executor.shutdownNow();
+                }
             }
 
             public void onFailure(Throwable t)
             {
-                StorageMetrics.repairExceptions.inc();
-                fireProgressEvent(new ProgressEvent(ProgressEventType.ERROR, progress.get(), totalProgress, t.getMessage()));
-                logger.error("Error completing preview repair", t);
-                String completionMessage = complete();
-                recordFailure(t.getMessage(), completionMessage);
-            }
-
-            private String complete()
-            {
-                logger.debug("Preview repair {} completed", parentSession);
-
-                String duration = DurationFormatUtils.formatDurationWords(System.currentTimeMillis() - startTime,
-                                                                          true, true);
-                String message = String.format("Repair preview #%d finished in %s", cmd, duration);
-                fireProgressEvent(new ProgressEvent(ProgressEventType.COMPLETE, progress.get(), totalProgress, message));
+                notifyError(t);
+                fail("Error completing preview repair: " + t.getMessage());
                 executor.shutdownNow();
-                return message;
             }
         }, MoreExecutors.directExecutor());
     }
@@ -533,22 +610,16 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
                                            session.ranges().toString());
             logger.info(message);
             fireProgressEvent(new ProgressEvent(ProgressEventType.PROGRESS,
-                                                progress.incrementAndGet(),
+                                                progressCounter.incrementAndGet(),
                                                 totalProgress,
                                                 message));
         }
 
         public void onFailure(Throwable t)
         {
-            StorageMetrics.repairExceptions.inc();
-
             String message = String.format("Repair session %s for range %s failed with error %s",
                                            session.getId(), session.ranges().toString(), t.getMessage());
-            logger.error(message, t);
-            fireProgressEvent(new ProgressEvent(ProgressEventType.ERROR,
-                                                progress.incrementAndGet(),
-                                                totalProgress,
-                                                message));
+            notifyError(new RuntimeException(message, t));
         }
     }
 
@@ -578,86 +649,26 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
 
         public void onSuccess(Object result)
         {
-            if (!options.isPreview())
-            {
-                SystemDistributedKeyspace.successfulParentRepair(parentSession, successfulRanges);
-            }
-            final String message;
+            maybeStoreParentRepairSuccess(successfulRanges);
             if (hasFailure.get())
             {
-                StorageMetrics.repairExceptions.inc();
-                message = "Some repair failed";
-                fireProgressEvent(new ProgressEvent(ProgressEventType.ERROR, progress.get(), totalProgress,
-                                                    message));
+                fail(null);
             }
             else
             {
-                message = "Repair completed successfully";
-                fireProgressEvent(new ProgressEvent(ProgressEventType.SUCCESS, progress.get(), totalProgress,
-                                                    message));
-            }
-            String completionMessage = repairComplete();
-            if (hasFailure.get())
-            {
-                recordFailure(message, completionMessage);
-            }
-            else
-            {
-                ActiveRepairService.instance.recordRepairStatus(cmd, ActiveRepairService.ParentRepairStatus.COMPLETED,
-                                                       ImmutableList.of(message, completionMessage));
+                success("Repair completed successfully");
             }
+            executor.shutdownNow();
         }
 
         public void onFailure(Throwable t)
         {
-            StorageMetrics.repairExceptions.inc();
-            fireProgressEvent(new ProgressEvent(ProgressEventType.ERROR, progress.get(), totalProgress, t.getMessage()));
-            if (!options.isPreview())
-            {
-                SystemDistributedKeyspace.failParentRepair(parentSession, t);
-            }
-            String completionMessage = repairComplete();
-            recordFailure(t.getMessage(), completionMessage);
-        }
-
-        private String repairComplete()
-        {
-            ActiveRepairService.instance.removeParentRepairSession(parentSession);
-            long durationMillis = System.currentTimeMillis() - startTime;
-            String duration = DurationFormatUtils.formatDurationWords(durationMillis,true, true);
-            String message = String.format("Repair command #%d finished in %s", cmd, duration);
-            fireProgressEvent(new ProgressEvent(ProgressEventType.COMPLETE, progress.get(), totalProgress, message));
-            logger.info(message);
-            if (options.isTraced() && traceState != null)
-            {
-                for (ProgressListener listener : listeners)
-                    traceState.removeProgressListener(listener);
-                // Because DebuggableThreadPoolExecutor#afterExecute and this callback
-                // run in a nondeterministic order (within the same thread), the
-                // TraceState may have been nulled out at this point. The TraceState
-                // should be traceState, so just set it without bothering to check if it
-                // actually was nulled out.
-                Tracing.instance.set(traceState);
-                Tracing.traceRepair(message);
-                Tracing.instance.stopSession();
-            }
+            notifyError(t);
+            fail(t.getMessage());
             executor.shutdownNow();
-            Keyspace.open(keyspace).metric.repairTime.update(durationMillis, TimeUnit.MILLISECONDS);
-            return message;
         }
     }
 
-    private void recordFailure(String failureMessage, String completionMessage)
-    {
-        // Note we rely on the first message being the reason for the failure
-        // when inspecting this state from RepairRunner.queryForCompletedRepair
-        String failure = failureMessage == null ? "unknown failure" : failureMessage;
-        String completion = completionMessage == null ? "unknown completion" : completionMessage;
-
-        ActiveRepairService.instance.recordRepairStatus(cmd, ActiveRepairService.ParentRepairStatus.FAILED,
-                                               ImmutableList.of(failure, completion));
-    }
-
     private static void addRangeToNeighbors(List<CommonRange> neighborRangeList, Range<Token> range, EndpointsForRange neighbors)
     {
         Set<InetAddressAndPort> endpoints = neighbors.endpoints();
@@ -696,7 +707,7 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
                 ByteBuffer sessionIdBytes = ByteBufferUtil.bytes(sessionId);
                 InetAddressAndPort source = FBUtilities.getBroadcastAddressAndPort();
 
-                HashSet<UUID>[] seen = new HashSet[] { new HashSet<>(), new HashSet<>() };
+                HashSet<UUID>[] seen = new HashSet[]{ new HashSet<>(), new HashSet<>() };
                 int si = 0;
                 UUID uuid;
 
@@ -741,7 +752,7 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
                         if (seen[si == 0 ? 1 : 0].contains(uuid))
                             continue;
                         String message = String.format("%s: %s", r.getInetAddress("source"), r.getString("activity"));
-                        fireProgressEvent(new ProgressEvent(ProgressEventType.NOTIFICATION, 0, 0, message));
+                        notification(message);
                     }
                     tlast = tcur;
 
@@ -751,4 +762,26 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
             }
         }, "Repair-Runnable-" + threadCounter.incrementAndGet());
     }
+
+    private static final class SkipRepairException extends RuntimeException
+    {
+        SkipRepairException(String message)
+        {
+            super(message);
+        }
+    }
+
+    private static final class NeighborsAndRanges
+    {
+        private final boolean force;
+        private final Set<InetAddressAndPort> allNeighbors;
+        private final List<CommonRange> commonRanges;
+
+        private NeighborsAndRanges(boolean force, Set<InetAddressAndPort> allNeighbors, List<CommonRange> commonRanges)
+        {
+            this.force = force;
+            this.allNeighbors = allNeighbors;
+            this.commonRanges = commonRanges;
+        }
+    }
 }
diff --git a/test/distributed/org/apache/cassandra/distributed/mock/nodetool/InternalNodeProbeFactory.java b/src/java/org/apache/cassandra/repair/SomeRepairFailedException.java
similarity index 58%
copy from test/distributed/org/apache/cassandra/distributed/mock/nodetool/InternalNodeProbeFactory.java
copy to src/java/org/apache/cassandra/repair/SomeRepairFailedException.java
index f7c9dcf..4b077b8 100644
--- a/test/distributed/org/apache/cassandra/distributed/mock/nodetool/InternalNodeProbeFactory.java
+++ b/src/java/org/apache/cassandra/repair/SomeRepairFailedException.java
@@ -16,20 +16,16 @@
  * limitations under the License.
  */
 
-package org.apache.cassandra.distributed.mock.nodetool;
+package org.apache.cassandra.repair;
 
-import java.io.IOException;
-
-import org.apache.cassandra.tools.NodeProbe;
-import org.apache.cassandra.tools.INodeProbeFactory;
-
-public class InternalNodeProbeFactory implements INodeProbeFactory
+/**
+ * This is a special exception which states "I know something failed but I don't have access to the failure". This
+ * is mostly used to make sure the error notifications are clean and the history table has a meaningful exception.
+ *
+ * The expected behavior is that when this is thrown, this error should be ignored from history table and not used
+ * for notifications
+ */
+public class SomeRepairFailedException extends RuntimeException
 {
-    public NodeProbe create(String host, int port) throws IOException {
-        return new InternalNodeProbe();
-    }
-
-    public NodeProbe create(String host, int port, String username, String password) throws IOException {
-        return new InternalNodeProbe();
-    }
+    public static final SomeRepairFailedException INSTANCE = new SomeRepairFailedException();
 }
diff --git a/src/java/org/apache/cassandra/repair/consistent/CoordinatorSession.java b/src/java/org/apache/cassandra/repair/consistent/CoordinatorSession.java
index 39549bd..d0b1f70 100644
--- a/src/java/org/apache/cassandra/repair/consistent/CoordinatorSession.java
+++ b/src/java/org/apache/cassandra/repair/consistent/CoordinatorSession.java
@@ -44,6 +44,7 @@ import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.net.Verb;
 import org.apache.cassandra.repair.RepairSessionResult;
+import org.apache.cassandra.repair.SomeRepairFailedException;
 import org.apache.cassandra.repair.messages.FailSession;
 import org.apache.cassandra.repair.messages.FinalizeCommit;
 import org.apache.cassandra.repair.messages.FinalizePropose;
@@ -322,7 +323,7 @@ public class CoordinatorSession extends ConsistentSession
                         logger.debug("Incremental repair {} validation/stream phase completed in {}", sessionID, formatDuration(repairStart, finalizeStart));
 
                     }
-                    return Futures.immediateFailedFuture(new RuntimeException());
+                    return Futures.immediateFailedFuture(SomeRepairFailedException.INSTANCE);
                 }
                 else
                 {
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index 7499c36..467d2bc 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -520,12 +520,15 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
     public synchronized ParentRepairSession removeParentRepairSession(UUID parentSessionId)
     {
         String snapshotName = parentSessionId.toString();
-        for (ColumnFamilyStore cfs : getParentRepairSession(parentSessionId).columnFamilyStores.values())
+        ParentRepairSession session = parentRepairSessions.remove(parentSessionId);
+        if (session == null)
+            return null;
+        for (ColumnFamilyStore cfs : session.columnFamilyStores.values())
         {
             if (cfs.snapshotExists(snapshotName))
                 cfs.clearSnapshot(snapshotName);
         }
-        return parentRepairSessions.remove(parentSessionId);
+        return session;
     }
 
     public void handleMessage(Message<? extends RepairMessage> message)
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index 77efb0e..180b231 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -180,6 +180,13 @@ public class NodeProbe implements AutoCloseable
         connect();
     }
 
+    protected NodeProbe()
+    {
+        // this constructor is only used for extensions to rewrite their own connect method
+        this.host = "";
+        this.port = 0;
+    }
+
     /**
      * Create a connection to the JMX agent and setup the M[X]Bean proxies.
      *
@@ -920,6 +927,10 @@ public class NodeProbe implements AutoCloseable
         return spProxy;
     }
 
+    public StorageServiceMBean getStorageService() {
+        return ssProxy;
+    }
+
     public GossiperMBean getGossProxy()
     {
         return gossProxy;
diff --git a/src/java/org/apache/cassandra/tools/NodeTool.java b/src/java/org/apache/cassandra/tools/NodeTool.java
index e29f228..5af3fb1 100644
--- a/src/java/org/apache/cassandra/tools/NodeTool.java
+++ b/src/java/org/apache/cassandra/tools/NodeTool.java
@@ -276,13 +276,13 @@ public class NodeTool
         }
     }
 
-    private static void badUse(Exception e)
+    protected void badUse(Exception e)
     {
         System.out.println("nodetool: " + e.getMessage());
         System.out.println("See 'nodetool help' or 'nodetool help <command>'.");
     }
 
-    private static void err(Throwable e)
+    protected void err(Throwable e)
     {
         System.err.println("error: " + e.getMessage());
         System.err.println("-- StackTrace --");
diff --git a/test/distributed/org/apache/cassandra/distributed/api/ICoordinator.java b/test/distributed/org/apache/cassandra/distributed/api/ICoordinator.java
index ef44853..fe969b1 100644
--- a/test/distributed/org/apache/cassandra/distributed/api/ICoordinator.java
+++ b/test/distributed/org/apache/cassandra/distributed/api/ICoordinator.java
@@ -27,7 +27,11 @@ public interface ICoordinator
 {
     // a bit hacky, but ConsistencyLevel draws in too many dependent classes, so we cannot have a cross-version
     // method signature that accepts ConsistencyLevel directly.  So we just accept an Enum<?> and cast.
-    Object[][] execute(String query, Enum<?> consistencyLevel, Object... boundValues);
+    default Object[][] execute(String query, Enum<?> consistencyLevel, Object... boundValues)
+    {
+        return executeWithResult(query, consistencyLevel, boundValues).toObjectArrays();
+    }
+    QueryResult executeWithResult(String query, Enum<?> consistencyLevel, Object... boundValues);
 
     Iterator<Object[]> executeWithPaging(String query, Enum<?> consistencyLevel, int pageSize, Object... boundValues);
 
diff --git a/test/distributed/org/apache/cassandra/distributed/api/IInstance.java b/test/distributed/org/apache/cassandra/distributed/api/IInstance.java
index 8e02e23..23ccd7c 100644
--- a/test/distributed/org/apache/cassandra/distributed/api/IInstance.java
+++ b/test/distributed/org/apache/cassandra/distributed/api/IInstance.java
@@ -43,7 +43,14 @@ public interface IInstance extends IIsolatedExecutor
 
     int liveMemberCount();
 
-    int nodetool(String... commandAndArgs);
+    NodeToolResult nodetoolResult(boolean withNotifications, String... commandAndArgs);
+    default NodeToolResult nodetoolResult(String... commandAndArgs)
+    {
+        return nodetoolResult(true, commandAndArgs);
+    }
+    default int nodetool(String... commandAndArgs) {
+        return nodetoolResult(commandAndArgs).getRc();
+    }
     void uncaughtException(Thread t, Throwable e);
 
     /**
diff --git a/test/distributed/org/apache/cassandra/distributed/api/IMessageFilters.java b/test/distributed/org/apache/cassandra/distributed/api/IMessageFilters.java
index 01fe972..f2cd6ee 100644
--- a/test/distributed/org/apache/cassandra/distributed/api/IMessageFilters.java
+++ b/test/distributed/org/apache/cassandra/distributed/api/IMessageFilters.java
@@ -18,6 +18,8 @@
 
 package org.apache.cassandra.distributed.api;
 
+import java.util.function.Predicate;
+
 public interface IMessageFilters
 {
     public interface Filter
@@ -31,6 +33,21 @@ public interface IMessageFilters
         Builder from(int ... nums);
         Builder to(int ... nums);
 
+        Builder verbs(int... verbs);
+        Builder allVerbs();
+
+        Builder inbound(boolean inbound);
+
+        default Builder inbound()
+        {
+            return inbound(true);
+        }
+
+        default Builder outbound()
+        {
+            return inbound(false);
+        }
+
         /**
          * Every message for which matcher returns `true` will be _dropped_ (assuming all
          * other matchers in the chain will return `true` as well).
@@ -42,15 +59,42 @@ public interface IMessageFilters
     public interface Matcher
     {
         boolean matches(int from, int to, IMessage message);
+
+        static Matcher of(Predicate<IMessage> fn) {
+            return (from, to, m) -> fn.test(m);
+        }
     }
 
-    Builder verbs(int... verbs);
-    Builder allVerbs();
+    Builder inbound(boolean inbound);
+    default Builder inbound() {
+        return inbound(true);
+    }
+    default Builder outbound() {
+        return inbound(false);
+    }
+    default Builder verbs(int... verbs) {
+        return inbound().verbs(verbs);
+    }
+    default Builder allVerbs() {
+        return inbound().allVerbs();
+    }
     void reset();
 
     /**
-     * {@code true} value returned by the implementation implies that the message was
+     * Checks if the message should be delivered.  This is expected to run on "inbound", or on the receiver of
+     * the message (instance.config.num == to).
+     *
+     * @return {@code true} value returned by the implementation implies that the message was
+     * not matched by any filters and therefore should be delivered.
+     */
+    boolean permitInbound(int from, int to, IMessage msg);
+
+    /**
+     * Checks if the message should be delivered.  This is expected to run on "outbound", or on the sender of
+     * the message (instance.config.num == from).
+     *
+     * @return {@code true} value returned by the implementation implies that the message was
      * not matched by any filters and therefore should be delivered.
      */
-    boolean permit(int from, int to, IMessage msg);
+    boolean permitOutbound(int from, int to, IMessage msg);
 }
diff --git a/test/distributed/org/apache/cassandra/distributed/mock/nodetool/InternalNodeProbeFactory.java b/test/distributed/org/apache/cassandra/distributed/api/LongTokenRange.java
similarity index 61%
copy from test/distributed/org/apache/cassandra/distributed/mock/nodetool/InternalNodeProbeFactory.java
copy to test/distributed/org/apache/cassandra/distributed/api/LongTokenRange.java
index f7c9dcf..06327e8 100644
--- a/test/distributed/org/apache/cassandra/distributed/mock/nodetool/InternalNodeProbeFactory.java
+++ b/test/distributed/org/apache/cassandra/distributed/api/LongTokenRange.java
@@ -16,20 +16,23 @@
  * limitations under the License.
  */
 
-package org.apache.cassandra.distributed.mock.nodetool;
+package org.apache.cassandra.distributed.api;
 
-import java.io.IOException;
+import java.io.Serializable;
 
-import org.apache.cassandra.tools.NodeProbe;
-import org.apache.cassandra.tools.INodeProbeFactory;
-
-public class InternalNodeProbeFactory implements INodeProbeFactory
+public final class LongTokenRange implements Serializable
 {
-    public NodeProbe create(String host, int port) throws IOException {
-        return new InternalNodeProbe();
+    public final long minExclusive;
+    public final long maxInclusive;
+
+    public LongTokenRange(long minExclusive, long maxInclusive)
+    {
+        this.minExclusive = minExclusive;
+        this.maxInclusive = maxInclusive;
     }
 
-    public NodeProbe create(String host, int port, String username, String password) throws IOException {
-        return new InternalNodeProbe();
+    public String toString()
+    {
+        return "(" + minExclusive + "," + maxInclusive + "]";
     }
 }
diff --git a/test/distributed/org/apache/cassandra/distributed/api/NodeToolResult.java b/test/distributed/org/apache/cassandra/distributed/api/NodeToolResult.java
new file mode 100644
index 0000000..9ba1127
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/api/NodeToolResult.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.api;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import javax.management.Notification;
+
+import org.junit.Assert;
+
+public class NodeToolResult
+{
+    private final String[] commandAndArgs;
+    private final int rc;
+    private final List<Notification> notifications;
+    private final Throwable error;
+
+    public NodeToolResult(String[] commandAndArgs, int rc, List<Notification> notifications, Throwable error)
+    {
+        this.commandAndArgs = commandAndArgs;
+        this.rc = rc;
+        this.notifications = notifications;
+        this.error = error;
+    }
+
+    public String[] getCommandAndArgs()
+    {
+        return commandAndArgs;
+    }
+
+    public int getRc()
+    {
+        return rc;
+    }
+
+    public List<Notification> getNotifications()
+    {
+        return notifications;
+    }
+
+    public Throwable getError()
+    {
+        return error;
+    }
+
+    public Asserts asserts()
+    {
+        return new Asserts();
+    }
+
+    public final class Asserts {
+        public Asserts success() {
+            Assert.assertEquals("nodetool command " + commandAndArgs[0] + " was not successful", 0, rc);
+            return this;
+        }
+
+        public Asserts failure() {
+            Assert.assertNotEquals("nodetool command " + commandAndArgs[0] + " was successful but not expected to be", 0, rc);
+            return this;
+        }
+
+        public Asserts errorContains(String msg) { // asserts an error exists and that its message contains msg
+            Assert.assertNotNull("No exception was found but expected one", error);
+            Assert.assertTrue("Error message '" + error.getMessage() + "' does not contain '" + msg + "'", error.getMessage() != null && error.getMessage().contains(msg)); // getMessage() may be null
+            return this;
+        }
+
+        public Asserts notificationContains(String msg) {
+            Assert.assertNotNull("notifications not defined", notifications);
+            Assert.assertFalse("notifications not defined", notifications.isEmpty());
+            for (Notification n : notifications) {
+                if (n.getMessage().contains(msg)) {
+                    return this;
+                }
+            }
+            Assert.fail("Unable to locate message " + msg + " in notifications: " + notifications);
+            return this; // unreachable
+        }
+
+        public Asserts notificationContains(ProgressEventType type, String msg) {
+            int userType = type.ordinal();
+            Assert.assertNotNull("notifications not defined", notifications);
+            Assert.assertFalse("notifications not defined", notifications.isEmpty());
+            for (Notification n : notifications) {
+                if (notificationType(n) == userType) {
+                    if (n.getMessage().contains(msg)) {
+                        return this;
+                    }
+                }
+            }
+            Assert.fail("Unable to locate message '" + msg + "' in notifications: " + notifications);
+            return this; // unreachable
+        }
+    }
+
+    private static int notificationType(Notification n)
+    {
+        return ((Map<String, Integer>) n.getUserData()).get("type").intValue();
+    }
+
+    public String toString() // renders notifications by their ProgressEventType name only
+    {
+        return "NodeToolResult{" +
+               "commandAndArgs=" + Arrays.toString(commandAndArgs) +
+               ", rc=" + rc + // notifications may be null (Asserts checks for that), so guard before streaming
+               ", notifications=[" + (notifications == null ? "" : notifications.stream().map(n -> ProgressEventType.values()[notificationType(n)].name()).collect(Collectors.joining(", "))) + "]" +
+               ", error=" + error +
+               '}';
+    }
+
+    /**
+     * Progress event type.
+     *
+     * <p>
+     * Progress starts by emitting {@link #START}, followed by emitting zero or more {@link #PROGRESS} events,
+     * then it emits either one of {@link #ERROR}/{@link #ABORT}/{@link #SUCCESS}.
+     * Progress indicates its completion by emitting {@link #COMPLETE} at the end of process.
+     * </p>
+     * <p>
+     * {@link #NOTIFICATION} event type is used to just notify message without progress.
+     * </p>
+     */
+    public enum ProgressEventType
+    {
+        /**
+         * Fired first when progress starts.
+         * Happens only once.
+         */
+        START,
+
+        /**
+         * Fire when progress happens.
+         * This can be zero or more time after START.
+         */
+        PROGRESS,
+
+        /**
+         * When observing process completes with error, this is sent once before COMPLETE.
+         */
+        ERROR,
+
+        /**
+         * When observing process is aborted by user, this is sent once before COMPLETE.
+         */
+        ABORT,
+
+        /**
+         * When observing process completes successfully, this is sent once before COMPLETE.
+         */
+        SUCCESS,
+
+        /**
+         * Fire when progress complete.
+         * This is fired once, after ERROR/ABORT/SUCCESS is fired.
+         * After this, no more ProgressEvent should be fired for the same event.
+         */
+        COMPLETE,
+
+        /**
+         * Used when sending message without progress.
+         */
+        NOTIFICATION
+    }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/api/QueryResult.java b/test/distributed/org/apache/cassandra/distributed/api/QueryResult.java
new file mode 100644
index 0000000..dcdfa14
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/api/QueryResult.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.api;
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.function.Predicate;
+
+/**
+ * A table of data representing a complete query result.
+ *
+ * A <code>QueryResult</code> is different from {@link java.sql.ResultSet} in several key ways:
+ *
+ * <ul>
+ *     <li>represents a complete result rather than a cursor</li>
+ *     <li>returns a {@link Row} to access the current row of data</li>
+ *     <li>relies on object pooling; {@link #hasNext()} may return the same object just with different data, accessing a
+ *     {@link Row} from a previous {@link #hasNext()} call has undefined behavior.</li>
+ *     <li>includes {@link #filter(Predicate)}, this will do client side filtering since Apache Cassandra is more
+ *     restrictive on server side filtering</li>
+ * </ul>
+ *
+ * <h2>Unsafe patterns</h2>
+ *
+ * Below are a few unsafe patterns which may lead to unexpected results
+ *
+ * <code>{@code
+ * while (rs.hasNext()) {
+ *   list.add(rs.next());
+ * }
+ * }</code>
+ *
+ * <code>{@code
+ * rs.forEach(list::add)
+ * }</code>
+ *
+ * Both cases have the same issue; reference to a row from a previous call to {@link #hasNext()}.  Since the same {@link Row}
+ * object can be used across different calls to {@link #hasNext()} this would mean any attempt to access after the fact
+ * points to newer data.  If this behavior is not desirable and access is needed between calls, then {@link Row#copy()}
+ * should be used; this will clone the {@link Row} and return a new object pointing to the same data.
+ */
+public class QueryResult implements Iterator<Row>
+{
+    public static final QueryResult EMPTY = new QueryResult(new String[0], null);
+
+    private final String[] names;
+    private final Object[][] results;
+    private final Predicate<Row> filter;
+    private final Row row;
+    private int offset = -1;
+
+    public QueryResult(String[] names, Object[][] results)
+    {
+        this.names = Objects.requireNonNull(names, "names");
+        this.results = results;
+        this.row = new Row(names);
+        this.filter = ignore -> true;
+    }
+
+    private QueryResult(String[] names, Object[][] results, Predicate<Row> filter, int offset)
+    {
+        this.names = names;
+        this.results = results;
+        this.filter = filter;
+        this.offset = offset;
+        this.row = new Row(names);
+    }
+
+    public String[] getNames()
+    {
+        return names;
+    }
+
+    public boolean isEmpty() // true when there are no rows; EMPTY is constructed with null results
+    {
+        return results == null || results.length == 0;
+    }
+
+    public int size() // number of rows before filtering; 0 when results is null (as in EMPTY)
+    {
+        return results == null ? 0 : results.length;
+    }
+
+    public QueryResult filter(Predicate<Row> fn)
+    {
+        return new QueryResult(names, results, filter.and(fn), offset);
+    }
+
+    /**
+     * Get all rows as a 2d array.  Any calls to {@link #filter(Predicate)} will be ignored and the array returned will
+     * be the full set from the query.
+     */
+    public Object[][] toObjectArrays()
+    {
+        return results;
+    }
+
+    @Override
+    public boolean hasNext()
+    {
+        if (results == null)
+            return false;
+        while ((offset += 1) < results.length)
+        {
+            row.setResults(results[offset]);
+            if (filter.test(row))
+            {
+                return true;
+            }
+        }
+        row.setResults(null);
+        return false;
+    }
+
+    @Override
+    public Row next()
+    {
+        if (offset < 0 || offset >= results.length)
+            throw new NoSuchElementException();
+        return row;
+    }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/api/Row.java b/test/distributed/org/apache/cassandra/distributed/api/Row.java
new file mode 100644
index 0000000..43fa6d9
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/api/Row.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.api;
+
+import java.util.Arrays;
+import java.util.Date;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.Set;
+import java.util.UUID;
+import javax.annotation.Nullable;
+
+import com.carrotsearch.hppc.ObjectIntHashMap;
+import com.carrotsearch.hppc.ObjectIntMap;
+
+/**
+ * Data representing a single row in a query result.
+ *
+ * This class is mutated by the parent {@link QueryResult}: the row it points to can change on every call to
+ * {@link QueryResult#hasNext()}, so it is unsafe to hold a reference to an instance across such a call.
+ * To get around this, call {@link #copy()}, which returns a new object pointing to the same row.
+ */
+public class Row
+{
+    // column name -> position of that column inside results; only written by the public constructor
+    private final ObjectIntMap<String> nameIndex;
+    @Nullable private Object[] results; // mutable to avoid allocations in loops
+
+    /**
+     * @param names column names; the position of a name is the position of its value in each row
+     * @throws NullPointerException if {@code names} is null
+     */
+    public Row(String[] names)
+    {
+        Objects.requireNonNull(names, "names");
+        this.nameIndex = new ObjectIntHashMap<>(names.length);
+        for (int i = 0; i < names.length; i++)
+            nameIndex.put(names[i], i);
+    }
+
+    // Used by copy() to share the already-built name index.
+    private Row(ObjectIntMap<String> nameIndex)
+    {
+        this.nameIndex = nameIndex;
+    }
+
+    /**
+     * Points this row at the given values; {@code null} means there is no current row.
+     */
+    void setResults(@Nullable Object[] results)
+    {
+        this.results = results;
+    }
+
+    /**
+     * Creates a copy of the current row; can be used past calls to {@link QueryResult#hasNext()}.
+     * The copy shares the name index and the value array with this row.
+     */
+    public Row copy()
+    {
+        Row copy = new Row(nameIndex);
+        copy.setResults(results);
+        return copy;
+    }
+
+    /**
+     * Looks up the value of the named column in the current row.
+     *
+     * @param name column name
+     * @return the value cast (unchecked) to the caller's expected type, or {@code null} if the column name is unknown
+     * @throws NoSuchElementException if there is no current row
+     */
+    @SuppressWarnings("unchecked") // the caller picks T; the stored value is not checked against it here
+    public <T> T get(String name)
+    {
+        checkAccess();
+        int idx = findIndex(name);
+        if (idx == -1)
+            return null;
+        return (T) results[idx];
+    }
+
+    /** Same as {@link #get(String)}, typed as {@link String}. */
+    public String getString(String name)
+    {
+        return get(name);
+    }
+
+    /** Same as {@link #get(String)}, typed as {@link UUID}. */
+    public UUID getUUID(String name)
+    {
+        return get(name);
+    }
+
+    /** Same as {@link #get(String)}, typed as {@link Date}. */
+    public Date getTimestamp(String name)
+    {
+        return get(name);
+    }
+
+    /** Same as {@link #get(String)}, typed as {@link Set}. */
+    public <T> Set<T> getSet(String name)
+    {
+        return get(name);
+    }
+
+    @Override
+    public String toString()
+    {
+        return "Row{" +
+               "names=" + nameIndex.keys() +
+               ", results=" + Arrays.toString(results) +
+               '}';
+    }
+
+    // Fails fast when the row is read while it does not point at any data.
+    private void checkAccess()
+    {
+        if (results == null)
+            throw new NoSuchElementException();
+    }
+
+    // Returns the column position for the name, or -1 when the name is not a known column.
+    private int findIndex(String name)
+    {
+        return nameIndex.getOrDefault(name, -1);
+    }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
index f03bae0..371de54 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
@@ -56,6 +56,7 @@ import org.apache.cassandra.distributed.api.IIsolatedExecutor;
 import org.apache.cassandra.distributed.api.IListen;
 import org.apache.cassandra.distributed.api.IMessage;
 import org.apache.cassandra.distributed.api.IMessageFilters;
+import org.apache.cassandra.distributed.api.NodeToolResult;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.Verb;
@@ -151,9 +152,16 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster,
         @Override
         public synchronized void startup()
         {
+            startup(AbstractCluster.this);
+        }
+
+        public synchronized void startup(ICluster cluster)
+        {
+            if (cluster != AbstractCluster.this)
+                throw new IllegalArgumentException("Only the owning cluster can be used for startup"); //TODO why have this in the API?
             if (!isShutdown)
                 throw new IllegalStateException();
-            delegate().startup(AbstractCluster.this);
+            delegate().startup(cluster);
             isShutdown = false;
             updateMessagingVersions();
         }
@@ -183,9 +191,9 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster,
             throw new IllegalStateException("Cannot get live member count on shutdown instance");
         }
 
-        public int nodetool(String... commandAndArgs)
+        public NodeToolResult nodetoolResult(boolean withNotifications, String... commandAndArgs)
         {
-            return delegate().nodetool(commandAndArgs);
+            return delegate().nodetoolResult(withNotifications, commandAndArgs);
         }
 
         public long killAttempts()
@@ -355,7 +363,7 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster,
         return filters;
     }
 
-    public MessageFilters.Builder verbs(Verb... verbs)
+    public IMessageFilters.Builder verbs(Verb... verbs)
     {
         int[] ids = new int[verbs.length];
         for (int i = 0; i < verbs.length; ++i)
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Coordinator.java b/test/distributed/org/apache/cassandra/distributed/impl/Coordinator.java
index 94c7e9e..dee9049 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Coordinator.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Coordinator.java
@@ -34,6 +34,7 @@ import org.apache.cassandra.cql3.UntypedResultSet;
 import org.apache.cassandra.cql3.statements.SelectStatement;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.distributed.api.ICoordinator;
+import org.apache.cassandra.distributed.api.QueryResult;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.service.pager.QueryPager;
@@ -52,9 +53,9 @@ public class Coordinator implements ICoordinator
     }
 
     @Override
-    public Object[][] execute(String query, Enum<?> consistencyLevelOrigin, Object... boundValues)
+    public QueryResult executeWithResult(String query, Enum<?> consistencyLevel, Object... boundValues)
     {
-        return instance.sync(() -> executeInternal(query, consistencyLevelOrigin, boundValues)).call();
+        return instance.sync(() -> executeInternal(query, consistencyLevel, boundValues)).call();
     }
 
     public Future<Object[][]> asyncExecuteWithTracing(UUID sessionId, String query, Enum<?> consistencyLevelOrigin, Object... boundValues)
@@ -63,7 +64,7 @@ public class Coordinator implements ICoordinator
             try
             {
                 Tracing.instance.newSession(sessionId, Collections.emptyMap());
-                return executeInternal(query, consistencyLevelOrigin, boundValues);
+                return executeInternal(query, consistencyLevelOrigin, boundValues).toObjectArrays();
             }
             finally
             {
@@ -72,7 +73,7 @@ public class Coordinator implements ICoordinator
         }).call();
     }
 
-    private Object[][] executeInternal(String query, Enum<?> consistencyLevelOrigin, Object[] boundValues)
+    private QueryResult executeInternal(String query, Enum<?> consistencyLevelOrigin, Object[] boundValues)
     {
         ClientState clientState = makeFakeClientState();
         CQLStatement prepared = QueryProcessor.getStatement(query, clientState);
@@ -94,9 +95,16 @@ public class Coordinator implements ICoordinator
                                              System.nanoTime());
 
         if (res != null && res.kind == ResultMessage.Kind.ROWS)
-            return RowUtil.toObjects((ResultMessage.Rows) res);
+        {
+            ResultMessage.Rows rows = (ResultMessage.Rows) res;
+            String[] names = rows.result.metadata.names.stream().map(c -> c.name.toString()).toArray(String[]::new);
+            Object[][] results = RowUtil.toObjects(rows);
+            return new QueryResult(names, results);
+        }
         else
-            return new Object[][]{};
+        {
+            return QueryResult.EMPTY;
+        }
     }
 
     public Object[][] executeWithTracing(UUID sessionId, String query, Enum<?> consistencyLevelOrigin, Object... boundValues)
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index 1beb708..90d747e 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@ -25,10 +25,13 @@ import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeoutException;
-import java.util.function.BiConsumer;
+import javax.management.ListenerNotFoundException;
+import javax.management.Notification;
+import javax.management.NotificationListener;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -58,6 +61,8 @@ import org.apache.cassandra.distributed.api.ICoordinator;
 import org.apache.cassandra.distributed.api.IInstanceConfig;
 import org.apache.cassandra.distributed.api.IListen;
 import org.apache.cassandra.distributed.api.IMessage;
+import org.apache.cassandra.distributed.api.NodeToolResult;
+import org.apache.cassandra.distributed.mock.nodetool.InternalNodeProbe;
 import org.apache.cassandra.distributed.mock.nodetool.InternalNodeProbeFactory;
 import org.apache.cassandra.gms.ApplicationState;
 import org.apache.cassandra.gms.Gossiper;
@@ -81,6 +86,7 @@ import org.apache.cassandra.service.DefaultFSErrorHandler;
 import org.apache.cassandra.service.PendingRangeCalculatorService;
 import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.service.StorageServiceMBean;
 import org.apache.cassandra.streaming.StreamReceiveTask;
 import org.apache.cassandra.streaming.StreamTransferTask;
 import org.apache.cassandra.streaming.async.StreamingInboundHandler;
@@ -199,31 +205,37 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
 
     private void registerMockMessaging(ICluster cluster)
     {
-        BiConsumer<InetAddressAndPort, IMessage> deliverToInstance = (to, message) -> cluster.get(to).receiveMessage(message);
-        BiConsumer<InetAddressAndPort, IMessage> deliverToInstanceIfNotFiltered = (to, message) -> {
-            if (permitMessage(cluster, to, message))
-                deliverToInstance.accept(to, message);
-        };
-
         MessagingService.instance().outboundSink.add((message, to) -> {
-            deliverToInstanceIfNotFiltered.accept(to, serializeMessage(message.from(), to, message));
+            cluster.get(to).receiveMessage(serializeMessage(message.from(), to, message));
             return false;
         });
     }
 
-    // unnecessary if registerMockMessaging used
-    private void registerFilter(ICluster cluster)
+    private void registerInboundFilter(ICluster cluster)
     {
-        MessagingService.instance().outboundSink.add((message, to) -> {
-            return permitMessage(cluster, to, serializeMessage(message.from(), to, message));
-        });
+        MessagingService.instance().inboundSink.add(message ->
+                permitMessageInbound(cluster, serializeMessage(message.from(), broadcastAddressAndPort(), message)));
     }
 
-    private boolean permitMessage(ICluster cluster, InetAddressAndPort to, IMessage message)
+    private void registerOutboundFilter(ICluster cluster)
+    {
+
+        MessagingService.instance().outboundSink.add((message, to) ->
+                                                     permitMessageOutbound(cluster, to, serializeMessage(message.from(), to, message)));
+    }
+
+    /**
+     * Asks the cluster's message filters whether an inbound message may be delivered to this instance.
+     */
+    private boolean permitMessageInbound(ICluster cluster, IMessage message)
     {
         int fromNum = cluster.get(message.from()).config().num();
+        int toNum = config.num(); // since this instance is receiving the message, to will always be this instance
+        return cluster.filters().permitInbound(fromNum, toNum, message);
+    }
+
+    private boolean permitMessageOutbound(ICluster cluster, InetAddressAndPort to, IMessage message)
+    {
+        int fromNum = config.num(); // since this instance is sending the message, from will always be this instance
         int toNum = cluster.get(to).config().num();
-        return cluster.filters().permit(fromNum, toNum, message);
+        return cluster.filters().permitOutbound(fromNum, toNum, message);
     }
 
     public void uncaughtException(Thread thread, Throwable throwable)
@@ -274,8 +286,9 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
             Message.Header header = messageIn.header;
             TraceState state = Tracing.instance.initializeFromMessage(header);
             if (state != null) state.trace("{} message received from {}", header.verb, header.from);
-            header.verb.stage.execute(ThrowingRunnable.toRunnable(() -> messageIn.verb().handler().doVerb((Message<Object>) messageIn)),
-                                      ExecutorLocals.create(state));
+            header.verb.stage.execute(() -> {
+                MessagingService.instance().inboundSink.accept(messageIn);
+            }, ExecutorLocals.create(state));
         }).run();
     }
 
@@ -362,7 +375,6 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
 
                 if (config.has(NETWORK))
                 {
-                    registerFilter(cluster);
                     MessagingService.instance().listen();
                 }
                 else
@@ -372,6 +384,8 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
 //                    -- not sure what that means?  SocketFactory.instance.getClass();
                     registerMockMessaging(cluster);
                 }
+                registerInboundFilter(cluster);
+                registerOutboundFilter(cluster);
                 JVMStabilityInspector.replaceKiller(new InstanceKiller());
 
                 // TODO: this is more than just gossip
@@ -554,9 +568,66 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
         }).call();
     }
 
-    public int nodetool(String... commandAndArgs)
+    public NodeToolResult nodetoolResult(boolean withNotifications, String... commandAndArgs)
     {
-        return sync(() -> new NodeTool(new InternalNodeProbeFactory()).execute(commandAndArgs)).call();
+        return sync(() -> {
+            DTestNodeTool nodetool = new DTestNodeTool(withNotifications);
+            int rc =  nodetool.execute(commandAndArgs);
+            return new NodeToolResult(commandAndArgs, rc, new ArrayList<>(nodetool.notifications.notifications), nodetool.latestError);
+        }).call();
+    }
+
+    /**
+     * NodeTool variant used by {@code nodetoolResult}: registers a listener on the storage service when constructed,
+     * removes it once {@link #execute(String...)} finishes, and remembers the last error reported through
+     * {@link #badUse(Exception)} or {@link #err(Throwable)}.
+     */
+    private static class DTestNodeTool extends NodeTool
+    {
+        private final StorageServiceMBean storageProxy;
+        private final CollectingNotificationListener notifications = new CollectingNotificationListener();
+
+        // last error passed to badUse/err, or null if none was reported
+        private Throwable latestError;
+
+        DTestNodeTool(boolean withNotifications)
+        {
+            super(new InternalNodeProbeFactory(withNotifications));
+            storageProxy = new InternalNodeProbe(withNotifications).getStorageService();
+            storageProxy.addNotificationListener(notifications, null, null);
+        }
+
+        @Override
+        public int execute(String... args)
+        {
+            try
+            {
+                return super.execute(args);
+            }
+            finally
+            {
+                // always unregister, whether the command succeeded or threw
+                try
+                {
+                    storageProxy.removeNotificationListener(notifications, null, null);
+                }
+                catch (ListenerNotFoundException ignored)
+                {
+                    // the listener is already gone, so there is nothing left to clean up
+                }
+            }
+        }
+
+        @Override
+        protected void badUse(Exception e)
+        {
+            super.badUse(e);
+            latestError = e;
+        }
+
+        @Override
+        protected void err(Throwable e)
+        {
+            super.err(e);
+            latestError = e;
+        }
+    }
+
+    /**
+     * Notification listener that appends every notification it receives to a list, in arrival order.
+     */
+    private static final class CollectingNotificationListener implements NotificationListener
+    {
+        private final CopyOnWriteArrayList<Notification> notifications = new CopyOnWriteArrayList<>();
+
+        @Override
+        public void handleNotification(Notification notification, Object handback)
+        {
+            notifications.add(notification);
+        }
+    }
 
     public long killAttempts()
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/MessageFilters.java b/test/distributed/org/apache/cassandra/distributed/impl/MessageFilters.java
index c92553f..b4017e1 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/MessageFilters.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/MessageFilters.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.distributed.impl;
 
 import java.util.Arrays;
 import java.util.List;
+import java.util.Objects;
 import java.util.concurrent.CopyOnWriteArrayList;
 
 import org.apache.cassandra.distributed.api.IMessage;
@@ -27,9 +28,20 @@ import org.apache.cassandra.distributed.api.IMessageFilters;
 
 public class MessageFilters implements IMessageFilters
 {
-    private final List<Filter> filters = new CopyOnWriteArrayList<>();
+    private final List<Filter> inboundFilters = new CopyOnWriteArrayList<>();
+    private final List<Filter> outboundFilters = new CopyOnWriteArrayList<>();
 
-    public boolean permit(int from, int to, IMessage msg)
+    public boolean permitInbound(int from, int to, IMessage msg)
+    {
+        return permit(inboundFilters, from, to, msg);
+    }
+
+    public boolean permitOutbound(int from, int to, IMessage msg)
+    {
+        return permit(outboundFilters, from, to, msg);
+    }
+
+    private static boolean permit(List<Filter> filters, int from, int to, IMessage msg)
     {
         for (Filter filter : filters)
         {
@@ -39,14 +51,15 @@ public class MessageFilters implements IMessageFilters
         return true;
     }
 
-    public class Filter implements IMessageFilters.Filter
+    public static class Filter implements IMessageFilters.Filter
     {
+        final List<Filter> parent;
         final int[] from;
         final int[] to;
         final int[] verbs;
         final Matcher matcher;
 
-        Filter(int[] from, int[] to, int[] verbs, Matcher matcher)
+        private Filter(List<Filter> parent, int[] from, int[] to, int[] verbs, Matcher matcher)
         {
             if (from != null)
             {
@@ -63,6 +76,7 @@ public class MessageFilters implements IMessageFilters
                 verbs = verbs.clone();
                 Arrays.sort(verbs);
             }
+            this.parent = Objects.requireNonNull(parent, "parent");
             this.from = from;
             this.to = to;
             this.verbs = verbs;
@@ -73,7 +87,8 @@ public class MessageFilters implements IMessageFilters
         {
             return (from == null ? 0 : Arrays.hashCode(from))
                    + (to == null ? 0 : Arrays.hashCode(to))
-                   + (verbs == null ? 0 : Arrays.hashCode(verbs));
+                   + (verbs == null ? 0 : Arrays.hashCode(verbs))
+                   + System.identityHashCode(parent); // identity: parent holds this filter, List#hashCode would recurse
         }
 
         public boolean equals(Object that)
@@ -85,18 +100,19 @@ public class MessageFilters implements IMessageFilters
         {
             return Arrays.equals(from, that.from)
                    && Arrays.equals(to, that.to)
-                   && Arrays.equals(verbs, that.verbs);
+                   && Arrays.equals(verbs, that.verbs)
+                   && parent == that.parent; // same owning list by identity, consistent with hashCode
         }
 
         public Filter off()
         {
-            filters.remove(this);
+            parent.remove(this);
             return this;
         }
 
         public Filter on()
         {
-            filters.add(this);
+            parent.add(this);
             return this;
         }
 
@@ -111,55 +127,69 @@ public class MessageFilters implements IMessageFilters
 
     public class Builder implements IMessageFilters.Builder
     {
+        boolean inbound;
         int[] from;
         int[] to;
         int[] verbs;
         Matcher matcher;
 
-        private Builder(int[] verbs)
+        private Builder(boolean inbound)
         {
-            this.verbs = verbs;
+            this.inbound = inbound;
         }
 
-        public Builder from(int... nums)
+        public IMessageFilters.Builder from(int... nums)
         {
             from = nums;
             return this;
         }
 
-        public Builder to(int... nums)
+        public IMessageFilters.Builder to(int... nums)
         {
             to = nums;
             return this;
         }
 
+        public IMessageFilters.Builder verbs(int... verbs)
+        {
+            this.verbs = verbs;
+            return this;
+        }
+
+        public IMessageFilters.Builder allVerbs()
+        {
+            this.verbs = null;
+            return this;
+        }
+
+        public IMessageFilters.Builder inbound(boolean inbound)
+        {
+            this.inbound = inbound;
+            return this;
+        }
+
         public IMessageFilters.Builder messagesMatching(Matcher matcher)
         {
             this.matcher = matcher;
             return this;
         }
 
-        public Filter drop()
+        public IMessageFilters.Filter drop()
         {
-            return new Filter(from, to, verbs, matcher).on();
+            return new Filter(inbound ? inboundFilters : outboundFilters, from, to, verbs, matcher).on();
         }
     }
 
 
-    public Builder verbs(int... verbs)
-    {
-        return new Builder(verbs);
-    }
-
-    @Override
-    public Builder allVerbs()
+    public IMessageFilters.Builder inbound(boolean inbound)
     {
-        return new Builder(null);
+        return new Builder(inbound);
     }
 
     @Override
     public void reset()
     {
-        filters.clear();
+        inboundFilters.clear();
+        outboundFilters.clear();
     }
 }
diff --git a/test/distributed/org/apache/cassandra/distributed/mock/nodetool/InternalNodeProbe.java b/test/distributed/org/apache/cassandra/distributed/mock/nodetool/InternalNodeProbe.java
index 6786519..9ab264e 100644
--- a/test/distributed/org/apache/cassandra/distributed/mock/nodetool/InternalNodeProbe.java
+++ b/test/distributed/org/apache/cassandra/distributed/mock/nodetool/InternalNodeProbe.java
@@ -18,10 +18,10 @@
 
 package org.apache.cassandra.distributed.mock.nodetool;
 
-import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.util.Iterator;
 import java.util.Map;
+import javax.management.ListenerNotFoundException;
 
 import com.google.common.collect.Multimap;
 
@@ -45,14 +45,19 @@ import org.apache.cassandra.service.CacheServiceMBean;
 import org.apache.cassandra.service.GCInspector;
 import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.service.StorageServiceMBean;
 import org.apache.cassandra.streaming.StreamManager;
 import org.apache.cassandra.tools.NodeProbe;
+import org.mockito.Mockito;
 
 public class InternalNodeProbe extends NodeProbe
 {
-    public InternalNodeProbe() throws IOException
+    private final boolean withNotifications;
+
+    public InternalNodeProbe(boolean withNotifications)
     {
-        super("", 0);
+        this.withNotifications = withNotifications;
+        connect();
     }
 
     protected void connect()
@@ -61,7 +66,26 @@ public class InternalNodeProbe extends NodeProbe
         mbeanServerConn = null;
         jmxc = null;
 
-        ssProxy = StorageService.instance;
+        if (withNotifications)
+        {
+            ssProxy = StorageService.instance;
+        }
+        else
+        {
+            // replace the notification apis with a no-op method
+            StorageServiceMBean mock = Mockito.spy(StorageService.instance);
+            Mockito.doNothing().when(mock).addNotificationListener(Mockito.any(), Mockito.any(), Mockito.any());
+            try
+            {
+                Mockito.doNothing().when(mock).removeNotificationListener(Mockito.any(), Mockito.any(), Mockito.any());
+                Mockito.doNothing().when(mock).removeNotificationListener(Mockito.any());
+            }
+            catch (ListenerNotFoundException e)
+            {
+                throw new AssertionError(e);
+            }
+            ssProxy = mock;
+        }
         msProxy = MessagingService.instance();
         streamProxy = StreamManager.instance;
         compactionProxy = CompactionManager.instance;
diff --git a/test/distributed/org/apache/cassandra/distributed/mock/nodetool/InternalNodeProbeFactory.java b/test/distributed/org/apache/cassandra/distributed/mock/nodetool/InternalNodeProbeFactory.java
index f7c9dcf..1904aa7 100644
--- a/test/distributed/org/apache/cassandra/distributed/mock/nodetool/InternalNodeProbeFactory.java
+++ b/test/distributed/org/apache/cassandra/distributed/mock/nodetool/InternalNodeProbeFactory.java
@@ -25,11 +25,18 @@ import org.apache.cassandra.tools.INodeProbeFactory;
 
 public class InternalNodeProbeFactory implements INodeProbeFactory
 {
+    private final boolean withNotifications;
+
+    public InternalNodeProbeFactory(boolean withNotifications)
+    {
+        this.withNotifications = withNotifications;
+    }
+
     public NodeProbe create(String host, int port) throws IOException {
-        return new InternalNodeProbe();
+        return new InternalNodeProbe(withNotifications);
     }
 
     public NodeProbe create(String host, int port, String username, String password) throws IOException {
-        return new InternalNodeProbe();
+        return new InternalNodeProbe(withNotifications);
     }
 }
diff --git a/test/distributed/org/apache/cassandra/distributed/test/DistributedRepairUtils.java b/test/distributed/org/apache/cassandra/distributed/test/DistributedRepairUtils.java
new file mode 100644
index 0000000..7f162e1
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/DistributedRepairUtils.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.util.Collections;
+import java.util.Set;
+import java.util.function.Consumer;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.commons.lang3.ArrayUtils;
+import org.junit.Assert;
+
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.NodeToolResult;
+import org.apache.cassandra.distributed.api.QueryResult;
+import org.apache.cassandra.distributed.api.Row;
+import org.apache.cassandra.distributed.impl.AbstractCluster;
+import org.apache.cassandra.distributed.impl.IInvokableInstance;
+import org.apache.cassandra.metrics.StorageMetrics;
+
+import static org.apache.cassandra.utils.Retry.retryWithBackoffBlocking;
+
+public final class DistributedRepairUtils
+{
+    public static final int DEFAULT_COORDINATOR = 1;
+
+    private DistributedRepairUtils()
+    {
+
+    }
+
+    public static NodeToolResult repair(AbstractCluster<?> cluster, RepairType repairType, boolean withNotifications, String... args) {
+        return repair(cluster, DEFAULT_COORDINATOR, repairType, withNotifications, args);
+    }
+
+    public static NodeToolResult repair(AbstractCluster<?> cluster, int node, RepairType repairType, boolean withNotifications, String... args) {
+        args = repairType.append(args);
+        args = ArrayUtils.addAll(new String[] { "repair" }, args);
+        return cluster.get(node).nodetoolResult(withNotifications, args);
+    }
+
+    public static <I extends IInvokableInstance, C extends AbstractCluster<I>> long getRepairExceptions(C cluster)
+    {
+        return getRepairExceptions(cluster, DEFAULT_COORDINATOR);
+    }
+
+    public static <I extends IInvokableInstance, C extends AbstractCluster<I>> long getRepairExceptions(C cluster, int node)
+    {
+        return cluster.get(node).callOnInstance(() -> StorageMetrics.repairExceptions.getCount());
+    }
+
+    public static QueryResult queryParentRepairHistory(AbstractCluster<?> cluster, String ks, String table)
+    {
+        return queryParentRepairHistory(cluster, DEFAULT_COORDINATOR, ks, table);
+    }
+
+    public static QueryResult queryParentRepairHistory(AbstractCluster<?> cluster, int coordinator, String ks, String table)
+    {
+        // This is somewhat brittle: the caller never receives the repair ID and cannot ask for it, so the ID must be
+        // inferred. The logic assumes each ks/table pair is unique (it should be, or else create should fail), so any
+        // repair recorded for that pair is treated as the repair being looked up.
+        Set<String> tableNames = table == null? Collections.emptySet() : ImmutableSet.of(table);
+
+        QueryResult rs = retryWithBackoffBlocking(10, () -> cluster.coordinator(coordinator)
+                                                                   .executeWithResult("SELECT * FROM system_distributed.parent_repair_history", ConsistencyLevel.QUORUM)
+                                                                   .filter(row -> ks.equals(row.getString("keyspace_name")))
+                                                                   .filter(row -> tableNames.equals(row.getSet("columnfamily_names"))));
+        return rs;
+    }
+
+    public static void assertParentRepairNotExist(AbstractCluster<?> cluster, String ks, String table)
+    {
+        assertParentRepairNotExist(cluster, DEFAULT_COORDINATOR, ks, table);
+    }
+
+    public static void assertParentRepairNotExist(AbstractCluster<?> cluster, int coordinator, String ks, String table)
+    {
+        QueryResult rs = queryParentRepairHistory(cluster, coordinator, ks, table);
+        Assert.assertFalse("No repairs should be found but at least one found", rs.hasNext());
+    }
+
+    public static void assertParentRepairNotExist(AbstractCluster<?> cluster, String ks)
+    {
+        assertParentRepairNotExist(cluster, DEFAULT_COORDINATOR, ks);
+    }
+
+    public static void assertParentRepairNotExist(AbstractCluster<?> cluster, int coordinator, String ks)
+    {
+        QueryResult rs = queryParentRepairHistory(cluster, coordinator, ks, null);
+        Assert.assertFalse("No repairs should be found but at least one found", rs.hasNext());
+    }
+
+    public static void assertParentRepairSuccess(AbstractCluster<?> cluster, String ks, String table)
+    {
+        assertParentRepairSuccess(cluster, DEFAULT_COORDINATOR, ks, table);
+    }
+
+    public static void assertParentRepairSuccess(AbstractCluster<?> cluster, int coordinator, String ks, String table)
+    {
+        QueryResult rs = queryParentRepairHistory(cluster, coordinator, ks, table);
+        validateExistingParentRepair(rs, row -> {
+            // check completed
+            Assert.assertNotNull("finished_at not found, the repair is not complete?", row.getTimestamp("finished_at"));
+
+            // check not failed (aka success)
+            Assert.assertNull("Exception found", row.getString("exception_stacktrace"));
+            Assert.assertNull("Exception found", row.getString("exception_message"));
+        });
+    }
+
+    public static void assertParentRepairFailedWithMessageContains(AbstractCluster<?> cluster, String ks, String table, String message)
+    {
+        assertParentRepairFailedWithMessageContains(cluster, DEFAULT_COORDINATOR, ks, table, message);
+    }
+
+    public static void assertParentRepairFailedWithMessageContains(AbstractCluster<?> cluster, int coordinator, String ks, String table, String message)
+    {
+        QueryResult rs = queryParentRepairHistory(cluster, coordinator, ks, table);
+        validateExistingParentRepair(rs, row -> {
+            // check completed
+            Assert.assertNotNull("finished_at not found, the repair is not complete?", row.getTimestamp("finished_at"));
+
+            // check failed
+            Assert.assertNotNull("Exception not found", row.getString("exception_stacktrace"));
+            String exceptionMessage = row.getString("exception_message");
+            Assert.assertNotNull("Exception not found", exceptionMessage);
+
+            Assert.assertTrue("Unable to locate message '" + message + "' in repair error message: " + exceptionMessage, exceptionMessage.contains(message));
+        });
+    }
+
+    private static void validateExistingParentRepair(QueryResult rs, Consumer<Row> fn)
+    {
+        Assert.assertTrue("No rows found", rs.hasNext());
+        Row row = rs.next();
+
+        Assert.assertNotNull("parent_id (which is the primary key) was null", row.getUUID("parent_id"));
+
+        fn.accept(row);
+
+        // make sure no other records found
+        Assert.assertFalse("Only one repair expected, but found more than one", rs.hasNext());
+    }
+
+    public enum RepairType {
+        FULL {
+            public String[] append(String... args)
+            {
+                return ArrayUtils.add(args, "--full");
+            }
+        },
+        INCREMENTAL {
+            public String[] append(String... args)
+            {
+                // incremental is the default
+                return args;
+            }
+        },
+        PREVIEW {
+            public String[] append(String... args)
+            {
+                return ArrayUtils.addAll(args, "--preview");
+            }
+        };
+
+        public abstract String[] append(String... args);
+    }
+
+    public enum RepairParallelism {
+        SEQUENTIAL {
+            public String[] append(String... args)
+            {
+                return ArrayUtils.add(args, "--sequential");
+            }
+        },
+        PARALLEL {
+            public String[] append(String... args)
+            {
+                // default is to be parallel
+                return args;
+            }
+        },
+        DATACENTER_AWARE {
+            public String[] append(String... args)
+            {
+                return ArrayUtils.add(args, "--dc-parallel");
+            }
+        };
+
+        public abstract String[] append(String... args);
+    }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/mock/nodetool/InternalNodeProbeFactory.java b/test/distributed/org/apache/cassandra/distributed/test/FullRepairCoordinatorFastTest.java
similarity index 59%
copy from test/distributed/org/apache/cassandra/distributed/mock/nodetool/InternalNodeProbeFactory.java
copy to test/distributed/org/apache/cassandra/distributed/test/FullRepairCoordinatorFastTest.java
index f7c9dcf..e380985 100644
--- a/test/distributed/org/apache/cassandra/distributed/mock/nodetool/InternalNodeProbeFactory.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/FullRepairCoordinatorFastTest.java
@@ -16,20 +16,19 @@
  * limitations under the License.
  */
 
-package org.apache.cassandra.distributed.mock.nodetool;
+package org.apache.cassandra.distributed.test;
 
-import java.io.IOException;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
-import org.apache.cassandra.tools.NodeProbe;
-import org.apache.cassandra.tools.INodeProbeFactory;
+import org.apache.cassandra.distributed.test.DistributedRepairUtils.RepairParallelism;
+import org.apache.cassandra.distributed.test.DistributedRepairUtils.RepairType;
 
-public class InternalNodeProbeFactory implements INodeProbeFactory
+@RunWith(Parameterized.class)
+public class FullRepairCoordinatorFastTest extends RepairCoordinatorFast
 {
-    public NodeProbe create(String host, int port) throws IOException {
-        return new InternalNodeProbe();
-    }
-
-    public NodeProbe create(String host, int port, String username, String password) throws IOException {
-        return new InternalNodeProbe();
+    public FullRepairCoordinatorFastTest(RepairParallelism parallelism, boolean withNotifications)
+    {
+        super(RepairType.FULL, parallelism, withNotifications);
     }
 }
diff --git a/test/distributed/org/apache/cassandra/distributed/mock/nodetool/InternalNodeProbeFactory.java b/test/distributed/org/apache/cassandra/distributed/test/FullRepairCoordinatorSlowTest.java
similarity index 59%
copy from test/distributed/org/apache/cassandra/distributed/mock/nodetool/InternalNodeProbeFactory.java
copy to test/distributed/org/apache/cassandra/distributed/test/FullRepairCoordinatorSlowTest.java
index f7c9dcf..d3904b3 100644
--- a/test/distributed/org/apache/cassandra/distributed/mock/nodetool/InternalNodeProbeFactory.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/FullRepairCoordinatorSlowTest.java
@@ -16,20 +16,19 @@
  * limitations under the License.
  */
 
-package org.apache.cassandra.distributed.mock.nodetool;
+package org.apache.cassandra.distributed.test;
 
-import java.io.IOException;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
-import org.apache.cassandra.tools.NodeProbe;
-import org.apache.cassandra.tools.INodeProbeFactory;
+import org.apache.cassandra.distributed.test.DistributedRepairUtils.RepairParallelism;
+import org.apache.cassandra.distributed.test.DistributedRepairUtils.RepairType;
 
-public class InternalNodeProbeFactory implements INodeProbeFactory
+@RunWith(Parameterized.class)
+public class FullRepairCoordinatorSlowTest extends RepairCoordinatorSlow
 {
-    public NodeProbe create(String host, int port) throws IOException {
-        return new InternalNodeProbe();
-    }
-
-    public NodeProbe create(String host, int port, String username, String password) throws IOException {
-        return new InternalNodeProbe();
+    public FullRepairCoordinatorSlowTest(RepairParallelism parallelism, boolean withNotifications)
+    {
+        super(RepairType.FULL, parallelism, withNotifications);
     }
 }
diff --git a/test/distributed/org/apache/cassandra/distributed/mock/nodetool/InternalNodeProbeFactory.java b/test/distributed/org/apache/cassandra/distributed/test/IncrementalRepairCoordinatorFastTest.java
similarity index 58%
copy from test/distributed/org/apache/cassandra/distributed/mock/nodetool/InternalNodeProbeFactory.java
copy to test/distributed/org/apache/cassandra/distributed/test/IncrementalRepairCoordinatorFastTest.java
index f7c9dcf..7a4c98e 100644
--- a/test/distributed/org/apache/cassandra/distributed/mock/nodetool/InternalNodeProbeFactory.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/IncrementalRepairCoordinatorFastTest.java
@@ -16,20 +16,19 @@
  * limitations under the License.
  */
 
-package org.apache.cassandra.distributed.mock.nodetool;
+package org.apache.cassandra.distributed.test;
 
-import java.io.IOException;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
-import org.apache.cassandra.tools.NodeProbe;
-import org.apache.cassandra.tools.INodeProbeFactory;
+import org.apache.cassandra.distributed.test.DistributedRepairUtils.RepairParallelism;
+import org.apache.cassandra.distributed.test.DistributedRepairUtils.RepairType;
 
-public class InternalNodeProbeFactory implements INodeProbeFactory
+@RunWith(Parameterized.class)
+public class IncrementalRepairCoordinatorFastTest extends RepairCoordinatorFast
 {
-    public NodeProbe create(String host, int port) throws IOException {
-        return new InternalNodeProbe();
-    }
-
-    public NodeProbe create(String host, int port, String username, String password) throws IOException {
-        return new InternalNodeProbe();
+    public IncrementalRepairCoordinatorFastTest(RepairParallelism parallelism, boolean withNotifications)
+    {
+        super(RepairType.INCREMENTAL, parallelism, withNotifications);
     }
 }
diff --git a/test/distributed/org/apache/cassandra/distributed/mock/nodetool/InternalNodeProbeFactory.java b/test/distributed/org/apache/cassandra/distributed/test/IncrementalRepairCoordinatorSlowTest.java
similarity index 58%
copy from test/distributed/org/apache/cassandra/distributed/mock/nodetool/InternalNodeProbeFactory.java
copy to test/distributed/org/apache/cassandra/distributed/test/IncrementalRepairCoordinatorSlowTest.java
index f7c9dcf..7f9b35f 100644
--- a/test/distributed/org/apache/cassandra/distributed/mock/nodetool/InternalNodeProbeFactory.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/IncrementalRepairCoordinatorSlowTest.java
@@ -16,20 +16,19 @@
  * limitations under the License.
  */
 
-package org.apache.cassandra.distributed.mock.nodetool;
+package org.apache.cassandra.distributed.test;
 
-import java.io.IOException;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
-import org.apache.cassandra.tools.NodeProbe;
-import org.apache.cassandra.tools.INodeProbeFactory;
+import org.apache.cassandra.distributed.test.DistributedRepairUtils.RepairParallelism;
+import org.apache.cassandra.distributed.test.DistributedRepairUtils.RepairType;
 
-public class InternalNodeProbeFactory implements INodeProbeFactory
+@RunWith(Parameterized.class)
+public class IncrementalRepairCoordinatorSlowTest extends RepairCoordinatorSlow
 {
-    public NodeProbe create(String host, int port) throws IOException {
-        return new InternalNodeProbe();
-    }
-
-    public NodeProbe create(String host, int port, String username, String password) throws IOException {
-        return new InternalNodeProbe();
+    public IncrementalRepairCoordinatorSlowTest(RepairParallelism parallelism, boolean withNotifications)
+    {
+        super(RepairType.INCREMENTAL, parallelism, withNotifications);
     }
 }
diff --git a/test/distributed/org/apache/cassandra/distributed/test/MessageFiltersTest.java b/test/distributed/org/apache/cassandra/distributed/test/MessageFiltersTest.java
index 3d73a5e..814e229 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/MessageFiltersTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/MessageFiltersTest.java
@@ -19,7 +19,10 @@
 package org.apache.cassandra.distributed.test;
 
 import java.util.Arrays;
+import java.util.HashSet;
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import com.google.common.collect.Sets;
@@ -35,12 +38,30 @@ import org.apache.cassandra.distributed.impl.Instance;
 import org.apache.cassandra.distributed.impl.MessageFilters;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.NoPayload;
 import org.apache.cassandra.net.Verb;
 
 public class MessageFiltersTest extends DistributedTestBase
 {
     @Test
-    public void simpleFiltersTest() throws Throwable
+    public void simpleInboundFiltersTest()
+    {
+        simpleFiltersTest(true);
+    }
+
+    @Test
+    public void simpleOutboundFiltersTest()
+    {
+        simpleFiltersTest(false);
+    }
+
+    private interface Permit
+    {
+        boolean test(int from, int to, IMessage msg);
+    }
+
+    private static void simpleFiltersTest(boolean inbound)
     {
         int VERB1 = Verb.READ_REQ.id;
         int VERB2 = Verb.READ_RSP.id;
@@ -52,61 +73,62 @@ public class MessageFiltersTest extends DistributedTestBase
         String MSG2 = "msg2";
 
         MessageFilters filters = new MessageFilters();
-        MessageFilters.Filter filter = filters.allVerbs().from(1).drop();
+        Permit permit = inbound ? (from, to, msg) -> filters.permitInbound(from, to, msg) : (from, to, msg) -> filters.permitOutbound(from, to, msg);
 
-        Assert.assertFalse(filters.permit(i1, i2, msg(VERB1, MSG1)));
-        Assert.assertFalse(filters.permit(i1, i2, msg(VERB2, MSG1)));
-        Assert.assertFalse(filters.permit(i1, i2, msg(VERB3, MSG1)));
-        Assert.assertTrue(filters.permit(i2, i1, msg(VERB1, MSG1)));
+        IMessageFilters.Filter filter = filters.allVerbs().inbound(inbound).from(1).drop();
+        Assert.assertFalse(permit.test(i1, i2, msg(VERB1, MSG1)));
+        Assert.assertFalse(permit.test(i1, i2, msg(VERB2, MSG1)));
+        Assert.assertFalse(permit.test(i1, i2, msg(VERB3, MSG1)));
+        Assert.assertTrue(permit.test(i2, i1, msg(VERB1, MSG1)));
         filter.off();
-        Assert.assertTrue(filters.permit(i1, i2, msg(VERB1, MSG1)));
+        Assert.assertTrue(permit.test(i1, i2, msg(VERB1, MSG1)));
         filters.reset();
 
-        filters.verbs(VERB1).from(1).to(2).drop();
-        Assert.assertFalse(filters.permit(i1, i2, msg(VERB1, MSG1)));
-        Assert.assertTrue(filters.permit(i1, i2, msg(VERB2, MSG1)));
-        Assert.assertTrue(filters.permit(i2, i1, msg(VERB1, MSG1)));
-        Assert.assertTrue(filters.permit(i2, i3, msg(VERB2, MSG1)));
+        filters.verbs(VERB1).inbound(inbound).from(1).to(2).drop();
+        Assert.assertFalse(permit.test(i1, i2, msg(VERB1, MSG1)));
+        Assert.assertTrue(permit.test(i1, i2, msg(VERB2, MSG1)));
+        Assert.assertTrue(permit.test(i2, i1, msg(VERB1, MSG1)));
+        Assert.assertTrue(permit.test(i2, i3, msg(VERB2, MSG1)));
 
         filters.reset();
         AtomicInteger counter = new AtomicInteger();
-        filters.verbs(VERB1).from(1).to(2).messagesMatching((from, to, msg) -> {
+        filters.verbs(VERB1).inbound(inbound).from(1).to(2).messagesMatching((from, to, msg) -> {
             counter.incrementAndGet();
             return Arrays.equals(msg.bytes(), MSG1.getBytes());
         }).drop();
-        Assert.assertFalse(filters.permit(i1, i2, msg(VERB1, MSG1)));
+        Assert.assertFalse(permit.test(i1, i2, msg(VERB1, MSG1)));
         Assert.assertEquals(counter.get(), 1);
-        Assert.assertTrue(filters.permit(i1, i2, msg(VERB1, MSG2)));
+        Assert.assertTrue(permit.test(i1, i2, msg(VERB1, MSG2)));
         Assert.assertEquals(counter.get(), 2);
 
         // filter chain gets interrupted because a higher level filter returns no match
-        Assert.assertTrue(filters.permit(i2, i1, msg(VERB1, MSG1)));
+        Assert.assertTrue(permit.test(i2, i1, msg(VERB1, MSG1)));
         Assert.assertEquals(counter.get(), 2);
-        Assert.assertTrue(filters.permit(i2, i1, msg(VERB2, MSG1)));
+        Assert.assertTrue(permit.test(i2, i1, msg(VERB2, MSG1)));
         Assert.assertEquals(counter.get(), 2);
         filters.reset();
 
-        filters.allVerbs().from(3, 2).to(2, 1).drop();
-        Assert.assertFalse(filters.permit(i3, i1, msg(VERB1, MSG1)));
-        Assert.assertFalse(filters.permit(i3, i2, msg(VERB1, MSG1)));
-        Assert.assertFalse(filters.permit(i2, i1, msg(VERB1, MSG1)));
-        Assert.assertTrue(filters.permit(i2, i3, msg(VERB1, MSG1)));
-        Assert.assertTrue(filters.permit(i1, i2, msg(VERB1, MSG1)));
-        Assert.assertTrue(filters.permit(i1, i3, msg(VERB1, MSG1)));
+        filters.allVerbs().inbound(inbound).from(3, 2).to(2, 1).drop();
+        Assert.assertFalse(permit.test(i3, i1, msg(VERB1, MSG1)));
+        Assert.assertFalse(permit.test(i3, i2, msg(VERB1, MSG1)));
+        Assert.assertFalse(permit.test(i2, i1, msg(VERB1, MSG1)));
+        Assert.assertTrue(permit.test(i2, i3, msg(VERB1, MSG1)));
+        Assert.assertTrue(permit.test(i1, i2, msg(VERB1, MSG1)));
+        Assert.assertTrue(permit.test(i1, i3, msg(VERB1, MSG1)));
         filters.reset();
 
         counter.set(0);
-        filters.allVerbs().from(1).to(2).messagesMatching((from, to, msg) -> {
+        filters.allVerbs().inbound(inbound).from(1).to(2).messagesMatching((from, to, msg) -> {
             counter.incrementAndGet();
             return false;
         }).drop();
-        Assert.assertTrue(filters.permit(i1, i2, msg(VERB1, MSG1)));
-        Assert.assertTrue(filters.permit(i1, i3, msg(VERB1, MSG1)));
-        Assert.assertTrue(filters.permit(i1, i2, msg(VERB1, MSG1)));
+        Assert.assertTrue(permit.test(i1, i2, msg(VERB1, MSG1)));
+        Assert.assertTrue(permit.test(i1, i3, msg(VERB1, MSG1)));
+        Assert.assertTrue(permit.test(i1, i2, msg(VERB1, MSG1)));
         Assert.assertEquals(2, counter.get());
     }
 
-    IMessage msg(int verb, String msg)
+    private static IMessage msg(int verb, String msg)
     {
         return new IMessage()
         {
@@ -166,29 +188,71 @@ public class MessageFiltersTest extends DistributedTestBase
                                                                Verb.MUTATION_REQ.id,
                                                                Verb.MUTATION_RSP.id));
 
-            // Reads and writes are going to time out in both directions
-            IMessageFilters.Filter filter = cluster.filters()
-                                                   .allVerbs()
-                                                   .from(1)
-                                                   .to(2)
-                                                   .messagesMatching((from, to, msg) -> {
-                                                       // Decode and verify message on instance; return the result back here
-                                                       Integer id = cluster.get(1).callsOnInstance((IIsolatedExecutor.SerializableCallable<Integer>) () -> {
-                                                           Message decoded = Instance.deserializeMessage(msg);
-                                                           return (Integer) decoded.verb().id;
-                                                       }).call();
-                                                       Assert.assertTrue(verbs.contains(id));
-                                                       counter.incrementAndGet();
-                                                       return false;
-                                                   }).drop();
+            for (boolean inbound : Arrays.asList(true, false))
+            {
+                counter.set(0);
+                // Reads and writes are going to time out in both directions
+                IMessageFilters.Filter filter = cluster.filters()
+                                                       .allVerbs()
+                                                       .inbound(inbound)
+                                                       .from(1)
+                                                       .to(2)
+                                                       .messagesMatching((from, to, msg) -> {
+                                                           // Decode and verify message on instance; return the result back here
+                                                           Integer id = cluster.get(1).callsOnInstance((IIsolatedExecutor.SerializableCallable<Integer>) () -> {
+                                                               Message decoded = Instance.deserializeMessage(msg);
+                                                               return (Integer) decoded.verb().id;
+                                                           }).call();
+                                                           Assert.assertTrue(verbs.contains(id));
+                                                           counter.incrementAndGet();
+                                                           return false;
+                                                       }).drop();
 
-            for (int i : new int[]{ 1, 2 })
-                cluster.coordinator(i).execute(read, ConsistencyLevel.ALL);
-            for (int i : new int[]{ 1, 2 })
-                cluster.coordinator(i).execute(write, ConsistencyLevel.ALL);
+                for (int i : new int[]{ 1, 2 })
+                    cluster.coordinator(i).execute(read, ConsistencyLevel.ALL);
+                for (int i : new int[]{ 1, 2 })
+                    cluster.coordinator(i).execute(write, ConsistencyLevel.ALL);
+
+                filter.off();
+                Assert.assertEquals(4, counter.get());
+            }
+        }
+    }
+
+    @Test
+    public void outboundBeforeInbound() throws Throwable
+    {
+        try (Cluster cluster = Cluster.create(2))
+        {
+            InetAddressAndPort other = cluster.get(2).broadcastAddressAndPort();
+            CountDownLatch waitForIt = new CountDownLatch(1);
+            Set<Integer> outboundMessagesSeen = new HashSet<>();
+            Set<Integer> inboundMessagesSeen = new HashSet<>();
+            AtomicBoolean outboundAfterInbound = new AtomicBoolean(false);
+            cluster.filters().outbound().verbs(Verb.ECHO_REQ.id, Verb.ECHO_RSP.id).messagesMatching((from, to, msg) -> {
+                outboundMessagesSeen.add(msg.verb());
+                if (inboundMessagesSeen.contains(msg.verb()))
+                    outboundAfterInbound.set(true);
+                return false;
+            }).drop(); // drop() is misleading here: the matcher always returns false, so nothing is dropped and we are only listening
+            cluster.filters().inbound().verbs(Verb.ECHO_REQ.id, Verb.ECHO_RSP.id).messagesMatching((from, to, msg) -> {
+                inboundMessagesSeen.add(msg.verb());
+                return false;
+            }).drop(); // drop() is misleading here: the matcher always returns false, so nothing is dropped and we are only listening
+            cluster.filters().inbound().verbs(Verb.ECHO_RSP.id).messagesMatching((from, to, msg) -> {
+                waitForIt.countDown();
+                return false;
+            }).drop(); // drop() is misleading here: the matcher always returns false, so nothing is dropped and we are only listening
+            cluster.get(1).runOnInstance(() -> {
+                MessagingService.instance().send(Message.out(Verb.ECHO_REQ, NoPayload.noPayload), other);
+            });
+
+            waitForIt.await();
 
-            filter.off();
-            Assert.assertEquals(4, counter.get());
+            Assert.assertEquals(outboundMessagesSeen, inboundMessagesSeen);
+            // since both are equal, only need to confirm the size of one
+            Assert.assertEquals(2, outboundMessagesSeen.size());
+            Assert.assertFalse("outbound message saw after inbound", outboundAfterInbound.get());
         }
     }
 
diff --git a/test/distributed/org/apache/cassandra/distributed/mock/nodetool/InternalNodeProbeFactory.java b/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairCoordinatorFastTest.java
similarity index 59%
copy from test/distributed/org/apache/cassandra/distributed/mock/nodetool/InternalNodeProbeFactory.java
copy to test/distributed/org/apache/cassandra/distributed/test/PreviewRepairCoordinatorFastTest.java
index f7c9dcf..bafef05 100644
--- a/test/distributed/org/apache/cassandra/distributed/mock/nodetool/InternalNodeProbeFactory.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairCoordinatorFastTest.java
@@ -16,20 +16,19 @@
  * limitations under the License.
  */
 
-package org.apache.cassandra.distributed.mock.nodetool;
+package org.apache.cassandra.distributed.test;
 
-import java.io.IOException;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
-import org.apache.cassandra.tools.NodeProbe;
-import org.apache.cassandra.tools.INodeProbeFactory;
+import org.apache.cassandra.distributed.test.DistributedRepairUtils.RepairParallelism;
+import org.apache.cassandra.distributed.test.DistributedRepairUtils.RepairType;
 
-public class InternalNodeProbeFactory implements INodeProbeFactory
+@RunWith(Parameterized.class)
+public class PreviewRepairCoordinatorFastTest extends RepairCoordinatorFast
 {
-    public NodeProbe create(String host, int port) throws IOException {
-        return new InternalNodeProbe();
-    }
-
-    public NodeProbe create(String host, int port, String username, String password) throws IOException {
-        return new InternalNodeProbe();
+    public PreviewRepairCoordinatorFastTest(RepairParallelism parallelism, boolean withNotifications)
+    {
+        super(RepairType.PREVIEW, parallelism, withNotifications);
     }
 }
diff --git a/test/distributed/org/apache/cassandra/distributed/mock/nodetool/InternalNodeProbeFactory.java b/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairCoordinatorSlowTest.java
similarity index 59%
copy from test/distributed/org/apache/cassandra/distributed/mock/nodetool/InternalNodeProbeFactory.java
copy to test/distributed/org/apache/cassandra/distributed/test/PreviewRepairCoordinatorSlowTest.java
index f7c9dcf..2d52475 100644
--- a/test/distributed/org/apache/cassandra/distributed/mock/nodetool/InternalNodeProbeFactory.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairCoordinatorSlowTest.java
@@ -16,20 +16,19 @@
  * limitations under the License.
  */
 
-package org.apache.cassandra.distributed.mock.nodetool;
+package org.apache.cassandra.distributed.test;
 
-import java.io.IOException;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
-import org.apache.cassandra.tools.NodeProbe;
-import org.apache.cassandra.tools.INodeProbeFactory;
+import org.apache.cassandra.distributed.test.DistributedRepairUtils.RepairParallelism;
+import org.apache.cassandra.distributed.test.DistributedRepairUtils.RepairType;
 
-public class InternalNodeProbeFactory implements INodeProbeFactory
+@RunWith(Parameterized.class)
+public class PreviewRepairCoordinatorSlowTest extends RepairCoordinatorSlow
 {
-    public NodeProbe create(String host, int port) throws IOException {
-        return new InternalNodeProbe();
-    }
-
-    public NodeProbe create(String host, int port, String username, String password) throws IOException {
-        return new InternalNodeProbe();
+    public PreviewRepairCoordinatorSlowTest(RepairParallelism parallelism, boolean withNotifications)
+    {
+        super(RepairType.PREVIEW, parallelism, withNotifications);
     }
 }
diff --git a/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairTest.java b/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairTest.java
index ed29a30..c712948 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairTest.java
@@ -131,7 +131,7 @@ public class PreviewRepairTest extends DistributedTestBase
             SimpleCondition continuePreviewRepair = new SimpleCondition();
             DelayMessageFilter filter = new DelayMessageFilter(continuePreviewRepair);
             // this pauses the validation request sent from node1 to node2 until we have run a full inc repair below
-            cluster.filters().verbs(Verb.VALIDATION_REQ.id).from(1).to(2).messagesMatching(filter).drop();
+            cluster.filters().outbound().verbs(Verb.VALIDATION_REQ.id).from(1).to(2).messagesMatching(filter).drop();
 
             Future<Pair<Boolean, Boolean>> rsFuture = es.submit(() -> cluster.get(1).callOnInstance(repair(options(true))));
             Thread.sleep(1000);
@@ -170,7 +170,7 @@ public class PreviewRepairTest extends DistributedTestBase
             // pause preview repair validation messages on node2 until node1 has finished
             SimpleCondition continuePreviewRepair = new SimpleCondition();
             DelayMessageFilter filter = new DelayMessageFilter(continuePreviewRepair);
-            cluster.filters().verbs(Verb.VALIDATION_REQ.id).from(1).to(2).messagesMatching(filter).drop();
+            cluster.filters().outbound().verbs(Verb.VALIDATION_REQ.id).from(1).to(2).messagesMatching(filter).drop();
 
             // get local ranges to repair two separate ranges:
             List<String> localRanges = cluster.get(1).callOnInstance(() -> {
diff --git a/test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorBase.java b/test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorBase.java
new file mode 100644
index 0000000..4651376
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorBase.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.runners.Parameterized;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.NodeToolResult;
+import org.apache.cassandra.distributed.test.DistributedRepairUtils.RepairParallelism;
+import org.apache.cassandra.distributed.test.DistributedRepairUtils.RepairType;
+
+public class RepairCoordinatorBase extends DistributedTestBase
+{
+    protected static Cluster CLUSTER;
+
+    protected final RepairType repairType;
+    protected final RepairParallelism parallelism;
+    protected final boolean withNotifications;
+
+    public RepairCoordinatorBase(RepairType repairType, RepairParallelism parallelism, boolean withNotifications)
+    {
+        this.repairType = repairType;
+        this.parallelism = parallelism;
+        this.withNotifications = withNotifications;
+    }
+
+    @Parameterized.Parameters(name = "{0}/{1}")
+    public static Collection<Object[]> testsWithoutType()
+    {
+        List<Object[]> tests = new ArrayList<>();
+        for (RepairParallelism p : RepairParallelism.values())
+        {
+            tests.add(new Object[] { p, true });
+            tests.add(new Object[] { p, false });
+        }
+        return tests;
+    }
+
+    @BeforeClass
+    public static void before()
+    {
+        // This only works because of the way CI works
+        // In CI a new JVM is spun up for each test file, so this doesn't have to worry about another test file
+        // getting this set first
+        System.setProperty("cassandra.nodetool.jmx_notification_poll_interval_seconds", "1");
+    }
+
+    @BeforeClass
+    public static void setupCluster() throws IOException
+    {
+        // streaming requires networking ATM
+        // streaming also requires gossip or isn't setup properly
+        CLUSTER = init(Cluster.build(2)
+                              .withConfig(c -> c.with(Feature.NETWORK)
+                                                .with(Feature.GOSSIP))
+                              .start());
+    }
+
+    @AfterClass
+    public static void teardownCluster()
+    {
+        if (CLUSTER != null)
+            CLUSTER.close();
+    }
+
+    protected String tableName(String prefix) {
+        return prefix + "_" + postfix();
+    }
+
+    protected String postfix()
+    {
+        return repairType.name().toLowerCase() + "_" + parallelism.name().toLowerCase() + "_" + withNotifications;
+    }
+
+    protected NodeToolResult repair(int node, String... args) {
+        return DistributedRepairUtils.repair(CLUSTER, node, repairType, withNotifications, parallelism.append(args));
+    }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorFailingMessageTest.java b/test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorFailingMessageTest.java
new file mode 100644
index 0000000..ac31334
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorFailingMessageTest.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.junit.AfterClass;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IMessageFilters;
+import org.apache.cassandra.distributed.api.NodeToolResult;
+import org.apache.cassandra.distributed.test.DistributedRepairUtils.RepairType;
+import org.apache.cassandra.net.Verb;
+
+import static java.lang.String.format;
+import static org.apache.cassandra.distributed.api.IMessageFilters.Matcher.of;
+
+@RunWith(Parameterized.class)
+@Ignore("Until CASSANDRA-15566 is in these tests all time out")
+public class RepairCoordinatorFailingMessageTest extends DistributedTestBase implements Serializable
+{
+    private static Cluster CLUSTER;
+
+    private final RepairType repairType;
+    private final boolean withNotifications;
+
+    public RepairCoordinatorFailingMessageTest(RepairType repairType, boolean withNotifications)
+    {
+        this.repairType = repairType;
+        this.withNotifications = withNotifications;
+    }
+
+    @Parameterized.Parameters(name = "{0}/{1}")
+    public static Collection<Object[]> messages()
+    {
+        List<Object[]> tests = new ArrayList<>();
+        for (RepairType type : RepairType.values())
+        {
+            tests.add(new Object[] { type, true });
+            tests.add(new Object[] { type, false });
+        }
+        return tests;
+    }
+
+    @BeforeClass
+    public static void before()
+    {
+        DatabaseDescriptor.clientInitialization();
+    }
+
+    @BeforeClass
+    public static void setupCluster() throws IOException
+    {
+        // streaming requires networking ATM
+        // streaming also requires gossip or isn't setup properly
+        CLUSTER = init(Cluster.build(3) // set to 3 so streaming hits non-local case
+                              .withConfig(c -> c.with(Feature.NETWORK)
+                                                .with(Feature.GOSSIP))
+                              .start());
+    }
+
+    @AfterClass
+    public static void teardownCluster()
+    {
+        if (CLUSTER != null)
+            CLUSTER.close();
+    }
+
+    private String tableName(String prefix) {
+        return prefix + "_" + postfix() + "_" + withNotifications;
+    }
+
+    private String postfix()
+    {
+        return repairType.name().toLowerCase();
+    }
+
+    private NodeToolResult repair(int node, String... args) {
+        return DistributedRepairUtils.repair(CLUSTER, node, repairType, withNotifications, args);
+    }
+
+    @Test(timeout = 1 * 60 * 1000)
+    public void prepareIrFailure()
+    {
+        Assume.assumeTrue("The Verb.PREPARE_CONSISTENT_REQ is only for incremental, so disable in non-incremental", repairType == RepairType.INCREMENTAL);
+        // Wait, isn't this copy paste of RepairCoordinatorFast::prepareFailure?  NO!
+        // Incremental repair sends the PREPARE message the same way full does, but then after it does it sends
+        // a consistent prepare message... and that one doesn't handle errors...
+        CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, value text, PRIMARY KEY (key))", KEYSPACE, tableName("prepareirfailure")));
+        IMessageFilters.Filter filter = CLUSTER.verbs(Verb.PREPARE_CONSISTENT_REQ).messagesMatching(of(m -> {
+            throw new RuntimeException("prepare fail");
+        })).drop();
+        try
+        {
+            NodeToolResult result = repair(1, KEYSPACE, tableName("prepareirfailure"));
+            result.asserts()
+                  .failure()
+                  .errorContains("error prepare fail")
+                  .notificationContains(NodeToolResult.ProgressEventType.ERROR, "error prepare fail")
+                  .notificationContains(NodeToolResult.ProgressEventType.COMPLETE, "finished with error");
+        }
+        finally
+        {
+            filter.off();
+        }
+    }
+
+    //TODO failure reply merkle tree
+    //TODO failure reply merkle tree IR
+
+    @Test(timeout = 1 * 60 * 1000)
+    public void validationFailure()
+    {
+        String table = tableName("validationfailure");
+        CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, value text, PRIMARY KEY (key))", KEYSPACE, table));
+        IMessageFilters.Filter filter = CLUSTER.verbs(Verb.VALIDATION_REQ).messagesMatching(of(m -> {
+            throw new RuntimeException("validation fail");
+        })).drop();
+        try
+        {
+            NodeToolResult result = repair(1, KEYSPACE, table);
+            result.asserts()
+                  .failure()
+                  .errorContains("Some repair failed")
+                  .notificationContains(NodeToolResult.ProgressEventType.ERROR, "Some repair failed")
+                  .notificationContains(NodeToolResult.ProgressEventType.COMPLETE, "finished with error");
+        }
+        finally
+        {
+            filter.off();
+        }
+    }
+
+    @Test(timeout = 1 * 60 * 1000)
+    public void streamFailure()
+    {
+        String table = tableName("streamfailure");
+        CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, value text, PRIMARY KEY (key))", KEYSPACE, table));
+        // there needs to be a difference to cause streaming to happen, so add to one node
+        CLUSTER.get(2).executeInternal(format("INSERT INTO %s.%s (key) VALUES (?)", KEYSPACE, table), "some data");
+        IMessageFilters.Filter filter = CLUSTER.verbs(Verb.SYNC_REQ).messagesMatching(of(m -> {
+            throw new RuntimeException("stream fail");
+        })).drop();
+        try
+        {
+            NodeToolResult result = repair(1, KEYSPACE, table);
+            result.asserts()
+                  .failure()
+                  .errorContains("Some repair failed")
+                  .notificationContains(NodeToolResult.ProgressEventType.ERROR, "Some repair failed")
+                  .notificationContains(NodeToolResult.ProgressEventType.COMPLETE, "finished with error");
+        }
+        finally
+        {
+            filter.off();
+        }
+    }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorFast.java b/test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorFast.java
new file mode 100644
index 0000000..edcb9c3
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorFast.java
@@ -0,0 +1,384 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.util.Set;
+
+import com.google.common.collect.Iterables;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Test;
+
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.distributed.api.IMessageFilters;
+import org.apache.cassandra.distributed.api.LongTokenRange;
+import org.apache.cassandra.distributed.api.NodeToolResult;
+import org.apache.cassandra.distributed.api.NodeToolResult.ProgressEventType;
+import org.apache.cassandra.distributed.test.DistributedRepairUtils.RepairParallelism;
+import org.apache.cassandra.distributed.test.DistributedRepairUtils.RepairType;
+import org.apache.cassandra.net.Verb;
+import org.apache.cassandra.service.StorageService;
+
+import static java.lang.String.format;
+import static org.apache.cassandra.distributed.api.IMessageFilters.Matcher.of;
+import static org.apache.cassandra.distributed.test.DistributedRepairUtils.assertParentRepairFailedWithMessageContains;
+import static org.apache.cassandra.distributed.test.DistributedRepairUtils.assertParentRepairNotExist;
+import static org.apache.cassandra.distributed.test.DistributedRepairUtils.assertParentRepairSuccess;
+import static org.apache.cassandra.distributed.test.DistributedRepairUtils.getRepairExceptions;
+
+public abstract class RepairCoordinatorFast extends RepairCoordinatorBase
+{
+    public RepairCoordinatorFast(RepairType repairType, RepairParallelism parallelism, boolean withNotifications)
+    {
+        super(repairType, parallelism, withNotifications);
+    }
+
+    @Test(timeout = 1 * 60 * 1000)
+    public void simple() {
+        String table = tableName("simple");
+        CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, PRIMARY KEY (key))", KEYSPACE, table));
+        CLUSTER.coordinator(1).execute(format("INSERT INTO %s.%s (key) VALUES (?)", KEYSPACE, table), ConsistencyLevel.ANY, "some text");
+
+        long repairExceptions = getRepairExceptions(CLUSTER, 2);
+        NodeToolResult result = repair(2, KEYSPACE, table);
+        result.asserts().success();
+        if (withNotifications)
+        {
+            result.asserts()
+                  .notificationContains(ProgressEventType.START, "Starting repair command")
+                  .notificationContains(ProgressEventType.START, "repairing keyspace " + KEYSPACE + " with repair options")
+                  .notificationContains(ProgressEventType.SUCCESS, repairType != RepairType.PREVIEW ? "Repair completed successfully": "Repair preview completed successfully")
+                  .notificationContains(ProgressEventType.COMPLETE, "finished");
+        }
+
+        if (repairType != RepairType.PREVIEW)
+        {
+            assertParentRepairSuccess(CLUSTER, KEYSPACE, table);
+        }
+        else
+        {
+            assertParentRepairNotExist(CLUSTER, KEYSPACE, table);
+        }
+
+        Assert.assertEquals(repairExceptions, getRepairExceptions(CLUSTER, 2));
+    }
+
+    @Test(timeout = 1 * 60 * 1000)
+    public void missingKeyspace()
+    {
+        // as of this moment the check is done in nodetool so the JMX notifications are not important
+        // nor is the history stored
+        long repairExceptions = getRepairExceptions(CLUSTER, 2);
+        NodeToolResult result = repair(2, "doesnotexist");
+        result.asserts()
+              .failure()
+              .errorContains("Keyspace [doesnotexist] does not exist.");
+
+        Assert.assertEquals(repairExceptions, getRepairExceptions(CLUSTER, 2));
+
+        assertParentRepairNotExist(CLUSTER, "doesnotexist");
+    }
+
+    @Test(timeout = 1 * 60 * 1000)
+    public void missingTable()
+    {
+        long repairExceptions = getRepairExceptions(CLUSTER, 2);
+        NodeToolResult result = repair(2, KEYSPACE, "doesnotexist");
+        result.asserts()
+              .failure();
+        if (withNotifications)
+        {
+            result.asserts()
+                  .errorContains("failed with error Unknown keyspace/cf pair (distributed_test_keyspace.doesnotexist)")
+                  // Start notification is ignored since this is checked during setup (aka before start)
+                  .notificationContains(ProgressEventType.ERROR, "failed with error Unknown keyspace/cf pair (distributed_test_keyspace.doesnotexist)")
+                  .notificationContains(ProgressEventType.COMPLETE, "finished with error");
+        }
+
+        assertParentRepairNotExist(CLUSTER, KEYSPACE, "doesnotexist");
+
+        Assert.assertEquals(repairExceptions + 1, getRepairExceptions(CLUSTER, 2));
+    }
+
+    @Test(timeout = 1 * 60 * 1000)
+    public void noTablesToRepair()
+    {
+        // index CF currently don't support repair, so they get dropped when listed
+        // this is done in this test to cause the keyspace to have 0 tables to repair, which causes repair to no-op
+        // early and skip.
+        String table = tableName("withindex");
+        CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, value text, PRIMARY KEY (key))", KEYSPACE, table));
+        CLUSTER.schemaChange(format("CREATE INDEX value_%s ON %s.%s (value)", postfix(), KEYSPACE, table));
+
+        long repairExceptions = getRepairExceptions(CLUSTER, 2);
+        // if CF has a . in it, it is assumed to be a 2i which rejects repairs
+        NodeToolResult result = repair(2, KEYSPACE, table + ".value");
+        result.asserts().success();
+        if (withNotifications)
+        {
+            result.asserts()
+                  .notificationContains("Empty keyspace")
+                  .notificationContains("skipping repair: " + KEYSPACE)
+                  // Start notification is ignored since this is checked during setup (aka before start)
+                  .notificationContains(ProgressEventType.SUCCESS, "Empty keyspace") // will fail since success isn't returned; only complete
+                  .notificationContains(ProgressEventType.COMPLETE, "finished"); // will fail since it doesn't do this
+        }
+
+        assertParentRepairNotExist(CLUSTER, KEYSPACE, table + ".value");
+
+        // this is actually a SKIP and not a FAILURE, so shouldn't increment
+        Assert.assertEquals(repairExceptions, getRepairExceptions(CLUSTER, 2));
+    }
+
+    @Test(timeout = 1 * 60 * 1000)
+    public void intersectingRange()
+    {
+        // this test exists to show that this case will cause repair to finish; success or failure isn't important
+        // if repair is enhanced to allow intersecting ranges w/ local then this test will fail saying that we expected
+        // repair to fail but it didn't, this would be fine and this test should be updated to reflect the new
+        // semantic
+        String table = tableName("intersectingrange");
+        CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, value text, PRIMARY KEY (key))", KEYSPACE, table));
+
+        //TODO dtest api for this?
+        LongTokenRange tokenRange = CLUSTER.get(2).callOnInstance(() -> {
+            Set<Range<Token>> ranges = StorageService.instance.getLocalReplicas(KEYSPACE).ranges();
+            Range<Token> range = Iterables.getFirst(ranges, null);
+            long left = (long) range.left.getTokenValue();
+            long right = (long) range.right.getTokenValue();
+            return new LongTokenRange(left, right);
+        });
+        LongTokenRange intersectingRange = new LongTokenRange(tokenRange.maxInclusive - 7, tokenRange.maxInclusive + 7);
+
+        long repairExceptions = getRepairExceptions(CLUSTER, 2);
+        NodeToolResult result = repair(2, KEYSPACE, table,
+                                       "--start-token", Long.toString(intersectingRange.minExclusive),
+                                       "--end-token", Long.toString(intersectingRange.maxInclusive));
+        result.asserts()
+              .failure()
+              .errorContains("Requested range " + intersectingRange + " intersects a local range (" + tokenRange + ") but is not fully contained in one");
+        if (withNotifications)
+        {
+            result.asserts()
+                  .notificationContains(ProgressEventType.START, "Starting repair command")
+                  .notificationContains(ProgressEventType.START, "repairing keyspace " + KEYSPACE + " with repair options")
+                  .notificationContains(ProgressEventType.ERROR, "Requested range " + intersectingRange + " intersects a local range (" + tokenRange + ") but is not fully contained in one")
+                  .notificationContains(ProgressEventType.COMPLETE, "finished with error");
+        }
+
+        assertParentRepairNotExist(CLUSTER, KEYSPACE, table);
+
+        Assert.assertEquals(repairExceptions + 1, getRepairExceptions(CLUSTER, 2));
+    }
+
+    @Test(timeout = 1 * 60 * 1000)
+    public void unknownHost()
+    {
+        String table = tableName("unknownhost");
+        CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, value text, PRIMARY KEY (key))", KEYSPACE, table));
+
+        long repairExceptions = getRepairExceptions(CLUSTER, 2);
+        NodeToolResult result = repair(2, KEYSPACE, table, "--in-hosts", "thisreally.should.not.exist.apache.org");
+        result.asserts()
+              .failure()
+              .errorContains("Unknown host specified thisreally.should.not.exist.apache.org");
+        if (withNotifications)
+        {
+            result.asserts()
+                  .notificationContains(ProgressEventType.START, "Starting repair command")
+                  .notificationContains(ProgressEventType.START, "repairing keyspace " + KEYSPACE + " with repair options")
+                  .notificationContains(ProgressEventType.ERROR, "Unknown host specified thisreally.should.not.exist.apache.org")
+                  .notificationContains(ProgressEventType.COMPLETE, "finished with error");
+        }
+
+        assertParentRepairNotExist(CLUSTER, KEYSPACE, table);
+
+        Assert.assertEquals(repairExceptions + 1, getRepairExceptions(CLUSTER, 2));
+    }
+
+    @Test(timeout = 1 * 60 * 1000)
+    public void desiredHostNotCoordinator()
+    {
+        // current limitation is that the coordinator must be a part of the repair, so as long as that exists this test
+        // verifies that the validation logic will terminate the repair properly
+        String table = tableName("desiredhostnotcoordinator");
+        CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, value text, PRIMARY KEY (key))", KEYSPACE, table));
+
+        long repairExceptions = getRepairExceptions(CLUSTER, 2);
+        NodeToolResult result = repair(2, KEYSPACE, table, "--in-hosts", "localhost");
+        result.asserts()
+              .failure()
+              .errorContains("The current host must be part of the repair");
+        if (withNotifications)
+        {
+            result.asserts()
+                  .notificationContains(ProgressEventType.START, "Starting repair command")
+                  .notificationContains(ProgressEventType.START, "repairing keyspace " + KEYSPACE + " with repair options")
+                  .notificationContains(ProgressEventType.ERROR, "The current host must be part of the repair")
+                  .notificationContains(ProgressEventType.COMPLETE, "finished with error");
+        }
+
+        assertParentRepairNotExist(CLUSTER, KEYSPACE, table);
+
+        Assert.assertEquals(repairExceptions + 1, getRepairExceptions(CLUSTER, 2));
+    }
+
+    @Test(timeout = 1 * 60 * 1000)
+    public void onlyCoordinator()
+    {
+        // this is very similar to ::desiredHostNotCoordinator but has the difference that the only host to do repair
+        // is the coordinator
+        String table = tableName("onlycoordinator");
+        CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, value text, PRIMARY KEY (key))", KEYSPACE, table));
+
+        long repairExceptions = getRepairExceptions(CLUSTER, 2);
+        NodeToolResult result = repair(1, KEYSPACE, table, "--in-hosts", "localhost");
+        result.asserts()
+              .failure()
+              .errorContains("Specified hosts [localhost] do not share range");
+        if (withNotifications)
+        {
+            result.asserts()
+                  .notificationContains(ProgressEventType.START, "Starting repair command")
+                  .notificationContains(ProgressEventType.START, "repairing keyspace " + KEYSPACE + " with repair options")
+                  .notificationContains(ProgressEventType.ERROR, "Specified hosts [localhost] do not share range")
+                  .notificationContains(ProgressEventType.COMPLETE, "finished with error");
+        }
+
+        assertParentRepairNotExist(CLUSTER, KEYSPACE, table);
+
+        //TODO should this be marked as fail to match others?  Should they not be marked?
+        Assert.assertEquals(repairExceptions, getRepairExceptions(CLUSTER, 2));
+    }
+
+    @Test(timeout = 1 * 60 * 1000)
+    public void replicationFactorOne()
+    {
+        // In the case of rf=1 repair fails to create a cmd handle so nodetool exits early
+        String table = tableName("one");
+        // since cluster is shared and this test gets called multiple times, need "IF NOT EXISTS" so the second+ attempt
+        // does not fail
+        CLUSTER.schemaChange("CREATE KEYSPACE IF NOT EXISTS replicationfactor WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};");
+        CLUSTER.schemaChange(format("CREATE TABLE replicationfactor.%s (key text, value text, PRIMARY KEY (key))", table));
+
+        long repairExceptions = getRepairExceptions(CLUSTER, 1);
+        NodeToolResult result = repair(1, "replicationfactor", table);
+        result.asserts()
+              .success();
+
+        assertParentRepairNotExist(CLUSTER, "replicationfactor", table);
+
+        Assert.assertEquals(repairExceptions, getRepairExceptions(CLUSTER, 1));
+    }
+
+    @Test(timeout = 1 * 60 * 1000)
+    public void prepareFailure()
+    {
+        String table = tableName("preparefailure");
+        CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, value text, PRIMARY KEY (key))", KEYSPACE, table));
+        IMessageFilters.Filter filter = CLUSTER.verbs(Verb.PREPARE_MSG).messagesMatching(of(m -> {
+            throw new RuntimeException("prepare fail");
+        })).drop();
+        try
+        {
+            long repairExceptions = getRepairExceptions(CLUSTER, 1);
+            NodeToolResult result = repair(1, KEYSPACE, table);
+            result.asserts()
+                  .failure()
+                  .errorContains("Got negative replies from endpoints");
+            if (withNotifications)
+            {
+                result.asserts()
+                      .notificationContains(ProgressEventType.START, "Starting repair command")
+                      .notificationContains(ProgressEventType.START, "repairing keyspace " + KEYSPACE + " with repair options")
+                      .notificationContains(ProgressEventType.ERROR, "Got negative replies from endpoints")
+                      .notificationContains(ProgressEventType.COMPLETE, "finished with error");
+            }
+
+            Assert.assertEquals(repairExceptions + 1, getRepairExceptions(CLUSTER, 1));
+            if (repairType != RepairType.PREVIEW)
+            {
+                assertParentRepairFailedWithMessageContains(CLUSTER, KEYSPACE, table, "Got negative replies from endpoints");
+            }
+            else
+            {
+                assertParentRepairNotExist(CLUSTER, KEYSPACE, table);
+            }
+        }
+        finally
+        {
+            filter.off();
+        }
+    }
+
+    @Test(timeout = 1 * 60 * 1000)
+    public void snapshotFailure()
+    {
+        Assume.assumeFalse("incremental does not do snapshot", repairType == RepairType.INCREMENTAL);
+        Assume.assumeFalse("Parallel repair does not perform snapshots", parallelism == RepairParallelism.PARALLEL);
+
+        String table = tableName("snapshotfailure");
+        CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, value text, PRIMARY KEY (key))", KEYSPACE, table));
+        IMessageFilters.Filter filter = CLUSTER.verbs(Verb.SNAPSHOT_MSG).messagesMatching(of(m -> {
+            throw new RuntimeException("snapshot fail");
+        })).drop();
+        try
+        {
+            long repairExceptions = getRepairExceptions(CLUSTER, 1);
+            NodeToolResult result = repair(1, KEYSPACE, table);
+            result.asserts()
+                  .failure();
+            if (withNotifications)
+            {
+                result.asserts()
+                      .errorContains("Could not create snapshot")
+                      .notificationContains(ProgressEventType.START, "Starting repair command")
+                      .notificationContains(ProgressEventType.START, "repairing keyspace " + KEYSPACE + " with repair options")
+                      .notificationContains(ProgressEventType.ERROR, "Could not create snapshot ")
+                      .notificationContains(ProgressEventType.COMPLETE, "finished with error");
+            }
+            else
+            {
+                // Right now coordination doesn't propagate the first exception, so we only know "there exists an issue".
+                // With notifications on, nodetool will see the error then complete, so the cmd state (what nodetool
+                // polls on) is ignored.  With notifications off, the poll await fails and queries cmd state, and that
+                // will have the below error.
+                // NOTE: this isn't desirable; it would be good to propagate the first exception
+                result.asserts()
+                      .errorContains("Some repair failed");
+            }
+
+            Assert.assertEquals(repairExceptions + 1, getRepairExceptions(CLUSTER, 1));
+            if (repairType != RepairType.PREVIEW)
+            {
+                assertParentRepairFailedWithMessageContains(CLUSTER, KEYSPACE, table, "Could not create snapshot");
+            }
+            else
+            {
+                assertParentRepairNotExist(CLUSTER, KEYSPACE, table);
+            }
+        }
+        finally
+        {
+            filter.off();
+        }
+    }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorSlow.java b/test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorSlow.java
new file mode 100644
index 0000000..32538ec
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorSlow.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.net.UnknownHostException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.api.IMessageFilters;
+import org.apache.cassandra.distributed.api.NodeToolResult;
+import org.apache.cassandra.distributed.impl.MessageFilters;
+import org.apache.cassandra.distributed.test.DistributedRepairUtils.RepairParallelism;
+import org.apache.cassandra.distributed.test.DistributedRepairUtils.RepairType;
+import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.net.Verb;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static java.lang.String.format;
+import static org.apache.cassandra.distributed.api.IMessageFilters.Matcher.of;
+import static org.apache.cassandra.distributed.test.DistributedRepairUtils.assertParentRepairFailedWithMessageContains;
+import static org.apache.cassandra.distributed.test.DistributedRepairUtils.assertParentRepairNotExist;
+import static org.apache.cassandra.distributed.test.DistributedRepairUtils.getRepairExceptions;
+
+/** Repair coordinator failure scenarios: dropped prepare messages, a neighbour that is down, a participant that restarts. */
+public abstract class RepairCoordinatorSlow extends RepairCoordinatorBase
+{
+    public RepairCoordinatorSlow(RepairType repairType, RepairParallelism parallelism, boolean withNotifications)
+    {
+        super(repairType, parallelism, withNotifications);
+    }
+
+    /**
+     * Drops every {@code PREPARE_MSG} and expects the repair to fail with "Got negative replies from endpoints", both in
+     * the nodetool result and, unless this is a preview repair, in the parent repair state.
+     */
+    @Test(timeout = 1 * 60 * 1000)
+    public void prepareRPCTimeout()
+    {
+        String table = tableName("preparerpctimeout");
+        CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, value text, PRIMARY KEY (key))", KEYSPACE, table));
+        IMessageFilters.Filter filter = CLUSTER.verbs(Verb.PREPARE_MSG).drop();
+        try
+        {
+            long repairExceptions = getRepairExceptions(CLUSTER, 1);
+            NodeToolResult result = repair(1, KEYSPACE, table);
+            result.asserts()
+                  .failure()
+                  .errorContains("Got negative replies from endpoints [127.0.0.2:7012]");
+            if (withNotifications)
+            {
+                result.asserts()
+                      .notificationContains(NodeToolResult.ProgressEventType.START, "Starting repair command")
+                      .notificationContains(NodeToolResult.ProgressEventType.START, "repairing keyspace " + KEYSPACE + " with repair options")
+                      .notificationContains(NodeToolResult.ProgressEventType.ERROR, "Got negative replies from endpoints [127.0.0.2:7012]")
+                      .notificationContains(NodeToolResult.ProgressEventType.COMPLETE, "finished with error");
+            }
+
+            if (repairType != RepairType.PREVIEW)
+                assertParentRepairFailedWithMessageContains(CLUSTER, KEYSPACE, table, "Got negative replies from endpoints [127.0.0.2:7012]");
+            else
+                assertParentRepairNotExist(CLUSTER, KEYSPACE, table);
+
+            Assert.assertEquals(repairExceptions + 1, getRepairExceptions(CLUSTER, 1));
+        }
+        finally
+        {
+            filter.off();
+        }
+    }
+
+    /**
+     * Stops node 2, waits until the failure detector on node 1 no longer sees it alive, and expects the repair to fail
+     * with "Endpoint not alive".  Node 2 is started again before the parent repair state is checked.
+     */
+    @Test(timeout = 1 * 60 * 1000)
+    public void neighbourDown() throws InterruptedException, ExecutionException
+    {
+        String table = tableName("neighbourdown");
+        CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, value text, PRIMARY KEY (key))", KEYSPACE, table));
+        // resolve the address while node 2 is still fully up; running this after shutdown() has been requested would
+        // race with the instance going away
+        String downNodeAddress = CLUSTER.get(2).callOnInstance(() -> FBUtilities.getBroadcastAddressAndPort().toString());
+        Future<Void> shutdownFuture = CLUSTER.get(2).shutdown();
+        try
+        {
+            // wait for the node to stop
+            shutdownFuture.get();
+            // wait for the failure detector to detect this
+            CLUSTER.get(1).runOnInstance(() -> {
+                InetAddressAndPort neighbor;
+                try
+                {
+                    neighbor = InetAddressAndPort.getByName(downNodeAddress);
+                }
+                catch (UnknownHostException e)
+                {
+                    throw new RuntimeException(e);
+                }
+                while (FailureDetector.instance.isAlive(neighbor))
+                    Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
+            });
+
+            long repairExceptions = getRepairExceptions(CLUSTER, 1);
+            NodeToolResult result = repair(1, KEYSPACE, table);
+            result.asserts()
+                  .failure()
+                  .errorContains("Endpoint not alive");
+            if (withNotifications)
+            {
+                result.asserts()
+                      .notificationContains(NodeToolResult.ProgressEventType.START, "Starting repair command")
+                      .notificationContains(NodeToolResult.ProgressEventType.START, "repairing keyspace " + KEYSPACE + " with repair options")
+                      .notificationContains(NodeToolResult.ProgressEventType.ERROR, "Endpoint not alive")
+                      .notificationContains(NodeToolResult.ProgressEventType.COMPLETE, "finished with error");
+            }
+
+            Assert.assertEquals(repairExceptions + 1, getRepairExceptions(CLUSTER, 1));
+        }
+        finally
+        {
+            CLUSTER.get(2).startup();
+        }
+
+        // make sure to call outside of the try/finally so the node is up so we can actually query
+        if (repairType != RepairType.PREVIEW)
+            assertParentRepairFailedWithMessageContains(CLUSTER, KEYSPACE, table, "Endpoint not alive");
+        else
+            assertParentRepairNotExist(CLUSTER, KEYSPACE, table);
+    }
+
+    /**
+     * Shuts node 2 down when a {@code VALIDATION_REQ} addressed to it is seen (the request is dropped), restarts it in
+     * the background, and expects the repair to fail.  Currently this isn't recoverable but could be.
+     */
+    @Test(timeout = 1 * 60 * 1000)
+    public void validationParticipentCrashesAndComesBack()
+    {
+        // TODO since this is a real restart, how would I test "long pause"? Can't send SIGSTOP since same process
+        String table = tableName("validationparticipentcrashesandcomesback");
+        CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, value text, PRIMARY KEY (key))", KEYSPACE, table));
+        AtomicReference<Future<Void>> participantShutdown = new AtomicReference<>();
+        IMessageFilters.Filter filter = CLUSTER.verbs(Verb.VALIDATION_REQ).to(2).messagesMatching(of(m -> {
+            // the nice thing about this is that this lambda is "capturing" and not "transfer", what this means is that
+            // this lambda isn't serialized and any object held isn't copied.
+            participantShutdown.set(CLUSTER.get(2).shutdown());
+            return true; // drop it so this node doesn't reply before shutdown.
+        })).drop();
+        try
+        {
+            // since nodetool is blocking, need to handle participantShutdown in the background
+            CompletableFuture<Void> recovered = CompletableFuture.runAsync(() -> {
+                try {
+                    while (participantShutdown.get() == null) {
+                        // event not happened, wait for it
+                        TimeUnit.MILLISECONDS.sleep(100);
+                    }
+                    Future<Void> f = participantShutdown.get();
+                    f.get(); // wait for shutdown to complete
+                    CLUSTER.get(2).startup();
+                } catch (Exception e) {
+                    if (e instanceof RuntimeException)
+                        throw (RuntimeException) e;
+                    throw new RuntimeException(e);
+                }
+            });
+
+            long repairExceptions = getRepairExceptions(CLUSTER, 1);
+            NodeToolResult result = repair(1, KEYSPACE, table);
+            recovered.join(); // if recovery didn't happen then the results are not what are being tested, so block here first
+            result.asserts()
+                  .failure();
+            if (withNotifications)
+            {
+                result.asserts()
+                      .errorContains("Endpoint 127.0.0.2:7012 died")
+                      .notificationContains(NodeToolResult.ProgressEventType.ERROR, "Endpoint 127.0.0.2:7012 died")
+                      .notificationContains(NodeToolResult.ProgressEventType.COMPLETE, "finished with error");
+            }
+            else
+            {
+                // Right now coordination doesn't propagate the first exception, so we only know "there exists an issue".
+                // With notifications on nodetool will see the error then complete, so the cmd state (what nodetool
+                // polls on) is ignored.  With notifications off, the poll await fails and queries cmd state, and that
+                // will have the below error.
+                // NOTE: this isn't desirable, would be good to propagate
+                result.asserts()
+                      .errorContains("Some repair failed");
+            }
+
+            Assert.assertEquals(repairExceptions + 1, getRepairExceptions(CLUSTER, 1));
+            if (repairType != RepairType.PREVIEW)
+                assertParentRepairFailedWithMessageContains(CLUSTER, KEYSPACE, table, "Endpoint 127.0.0.2:7012 died");
+            else
+                assertParentRepairNotExist(CLUSTER, KEYSPACE, table);
+        }
+        finally
+        {
+            filter.off();
+            try {
+                CLUSTER.get(2).startup();
+            } catch (Exception ignored) {
+                // if you call startup twice it is allowed to fail, so ignore it... hope this didn't break the other tests =x
+            }
+        }
+    }
+}
diff --git a/test/unit/org/apache/cassandra/utils/Retry.java b/test/unit/org/apache/cassandra/utils/Retry.java
new file mode 100644
index 0000000..513b0f2
--- /dev/null
+++ b/test/unit/org/apache/cassandra/utils/Retry.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils;
+
+import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.function.IntToLongFunction;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+
+import org.apache.cassandra.concurrent.NamedThreadFactory;
+
+/**
+ * Class for retryable actions.
+ *
+ * @see #retryWithBackoff(int, Supplier, Predicate)
+ */
+public final class Retry
+{
+    private static final ScheduledExecutorService SCHEDULED = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("RetryScheduler"));
+
+    private Retry()
+    {
+    }
+
+    /**
+     * Schedule code to run after the defined duration.
+     *
+     * Since an executor was not defined, the global {@link ForkJoinPool#commonPool()} executor will be used; if this
+     * is not desirable then use {@link #schedule(Duration, Executor, Runnable)}.
+     *
+     * @param duration how long to delay
+     * @param fn code to run
+     * @return future representing result
+     */
+    public static CompletableFuture<Void> schedule(final Duration duration, final Runnable fn)
+    {
+        return schedule(duration, ForkJoinPool.commonPool(), fn);
+    }
+
+    /**
+     * Schedule code to run after the defined duration on the provided executor.
+     *
+     * @param duration how long to delay
+     * @param executor to run on
+     * @param fn code to run
+     * @return future representing result; it fails with whatever {@code fn} or {@code executor} throws
+     */
+    public static CompletableFuture<Void> schedule(final Duration duration, final Executor executor, final Runnable fn)
+    {
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        SCHEDULED.schedule(() -> run0(executor, future, fn), duration.toNanos(), TimeUnit.NANOSECONDS);
+        return future;
+    }
+
+    /** Runs {@code fn} on {@code executor} and completes {@code future} with the outcome. */
+    private static void run0(final Executor executor, final CompletableFuture<Void> future, final Runnable fn)
+    {
+        try
+        {
+            executor.execute(() -> {
+                try
+                {
+                    fn.run();
+                    future.complete(null);
+                }
+                catch (Throwable t)
+                {
+                    // Throwable, not Exception: an Error (e.g. AssertionError) must fail the future rather than leave it pending
+                    future.completeExceptionally(t);
+                }
+            });
+        }
+        catch (Throwable t)
+        {
+            future.completeExceptionally(t);
+        }
+    }
+
+    /**
+     * Continuously attempt to call the provided future supplier until successful or until no longer able to retry.
+     *
+     * @param maxRetries to allow
+     * @param fn asynchronous operation to retry
+     * @param retryableException used to say if retry is allowed
+     * @return future representing the result.  If retries were not able to get a successful result, the exception is the last exception seen.
+     */
+    public static <A> CompletableFuture<A> retryWithBackoff(final int maxRetries,
+                                                            final Supplier<CompletableFuture<A>> fn,
+                                                            final Predicate<Throwable> retryableException)
+    {
+        CompletableFuture<A> future = new CompletableFuture<>();
+        retryWithBackoff0(future, 0, maxRetries, fn, retryableException, retryCount -> computeSleepTimeMillis(retryCount, 50, 1000));
+        return future;
+    }
+
+    /**
+     * This is the same as {@link #retryWithBackoff(int, Supplier, Predicate)}, but takes a blocking retryable action
+     * and blocks the caller until done.
+     */
+    public static <A> A retryWithBackoffBlocking(final int maxRetries, final Supplier<A> fn)
+    {
+        return retryWithBackoffBlocking(maxRetries, fn, (ignore) -> true);
+    }
+
+    /**
+     * This is the same as {@link #retryWithBackoff(int, Supplier, Predicate)}, but takes a blocking retryable action
+     * and blocks the caller until done.
+     */
+    public static <A> A retryWithBackoffBlocking(final int maxRetries,
+                                                 final Supplier<A> fn,
+                                                 final Predicate<Throwable> retryableException)
+    {
+        return retryWithBackoff(maxRetries, () -> CompletableFuture.completedFuture(fn.get()), retryableException).join();
+    }
+
+    /** Runs one attempt of {@code body}; completes {@code result} or schedules the next attempt after a backoff delay. */
+    private static <A> void retryWithBackoff0(final CompletableFuture<A> result,
+                                              final int retryCount,
+                                              final int maxRetry,
+                                              final Supplier<CompletableFuture<A>> body,
+                                              final Predicate<Throwable> retryableException,
+                                              final IntToLongFunction completeSleep)
+    {
+        try
+        {
+            Consumer<Throwable> attemptRetry = cause -> {
+                if (retryCount >= maxRetry || !retryableException.test(cause))
+                    result.completeExceptionally(cause); // too many attempts or exception isn't retryable, so fail
+                else
+                    schedule(Duration.ofMillis(completeSleep.applyAsLong(retryCount)),
+                             () -> retryWithBackoff0(result, retryCount + 1, maxRetry, body, retryableException, completeSleep));
+            };
+
+            // sanity check that the future isn't filled
+            // the most likely cause for this is when the future is composed with other futures (such as .successAsList);
+            // the failure of a different future may cancel this one, so stop running
+            if (result.isDone())
+            {
+                if (!(result.isCancelled() || result.isCompletedExceptionally()))
+                    new RuntimeException("Attempt to retry but found future was successful... aborting " + body).printStackTrace(); // success, but not filled by us
+                return;
+            }
+
+            CompletableFuture<A> future;
+            try
+            {
+                future = body.get();
+            }
+            catch (Exception e)
+            {
+                attemptRetry.accept(e);
+                return;
+            }
+
+            future.whenComplete((success, failure) -> {
+                try
+                {
+                    // unwrap a CompletionException only when it carries a cause: completeExceptionally(null) would throw
+                    if (failure == null)
+                        result.complete(success);
+                    else
+                        attemptRetry.accept(failure instanceof CompletionException && failure.getCause() != null ? failure.getCause() : failure);
+                }
+                catch (Throwable t)
+                {
+                    // whatever this callback throws is swallowed by whenComplete, so fail result here or it never completes
+                    result.completeExceptionally(t);
+                }
+            });
+        }
+        catch (Throwable t)
+        {
+            // fail result first so a scheduled attempt can't leave it pending, then let Errors keep propagating
+            result.completeExceptionally(t);
+            if (t instanceof Error)
+                throw (Error) t;
+        }
+    }
+
+    /**
+     * Compute an exponential delay from the retry count, capped at the max delay, then scaled by a random factor in [0.5, 1.5).
+     */
+    private static long computeSleepTimeMillis(int retryCount, long baseSleepTimeMillis, long maxSleepMillis)
+    {
+        long baseTime = maxSleepMillis;
+        // only shift while the factor stays a positive long; the shift distance is taken mod 64, so 1L << 64 == 1L
+        if (baseSleepTimeMillis > 0 && retryCount >= 0 && retryCount < Long.SIZE - 1)
+        {
+            long factor = 1L << retryCount;
+            // overflow-safe way of asking "baseSleepTimeMillis * factor <= maxSleepMillis"
+            if (baseSleepTimeMillis <= maxSleepMillis / factor)
+                baseTime = baseSleepTimeMillis * factor;
+        }
+
+        return (long) (baseTime * (ThreadLocalRandom.current().nextDouble() + 0.5));
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org