Posted to commits@cassandra.apache.org by if...@apache.org on 2020/03/27 18:24:31 UTC

[cassandra] branch trunk updated (244d4c2 -> 0934730)

This is an automated email from the ASF dual-hosted git repository.

ifesdjeen pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git.


    from 244d4c2  Merge branch 'cassandra-3.11' into trunk
     new 1f72cc6  Extract in-jvm-dtest API
     new c2cfebf  Merge branch 'cassandra-2.2' into cassandra-3.0
     new a7820d1  Merge branch 'cassandra-3.0' into cassandra-3.11
     new 0934730  Merge branch 'cassandra-3.11' into trunk

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 build.xml                                          |   2 +
 .../org/apache/cassandra/distributed/Cluster.java  |  28 +-
 .../cassandra/distributed/UpgradeableCluster.java  |  31 +-
 .../apache/cassandra/distributed/api/Feature.java  |  24 --
 .../apache/cassandra/distributed/api/ICluster.java |  36 ---
 .../cassandra/distributed/api/ICoordinator.java    |  40 ---
 .../cassandra/distributed/api/IInstance.java       |  73 -----
 .../cassandra/distributed/api/IInstanceConfig.java |  60 ----
 .../distributed/api/IIsolatedExecutor.java         | 126 --------
 .../apache/cassandra/distributed/api/IListen.java  |  28 --
 .../apache/cassandra/distributed/api/IMessage.java |  38 ---
 .../cassandra/distributed/api/IMessageFilters.java | 100 -------
 .../distributed/impl/AbstractCluster.java          | 319 +++------------------
 .../cassandra/distributed/impl/Coordinator.java    |  31 +-
 .../impl/DelegatingInvokableInstance.java          |  10 +-
 .../distributed/impl/DistributedTestSnitch.java    |  31 +-
 .../distributed/impl/IInvokableInstance.java       |  67 -----
 .../distributed/impl/IUpgradeableInstance.java     |  28 --
 .../cassandra/distributed/impl/Instance.java       | 111 ++++---
 .../distributed/impl/InstanceClassLoader.java      | 142 ---------
 .../cassandra/distributed/impl/InstanceConfig.java | 100 ++++---
 .../distributed/impl/IsolatedExecutor.java         |   4 +
 .../cassandra/distributed/impl/MessageFilters.java | 195 -------------
 .../cassandra/distributed/impl/MessageImpl.java    |  10 +-
 .../distributed/impl/NetworkTopology.java          | 137 ---------
 .../cassandra/distributed/impl/TracingUtil.java    |   2 +-
 .../cassandra/distributed/impl/Versions.java       | 213 --------------
 .../distributed/shared}/RepairResult.java          |  16 +-
 .../cassandra/distributed/test/BootstrapTest.java  |  50 ++--
 .../cassandra/distributed/test/CasWriteTest.java   |  34 +--
 .../distributed/test/DistributedRepairUtils.java   |   4 +-
 .../distributed/test/DistributedTestBase.java      | 173 -----------
 .../distributed/{impl => test}/ExecUtil.java       |   2 +-
 .../distributed/test/FailingRepairTest.java        |  26 +-
 .../distributed/test/GossipSettlesTest.java        |  16 +-
 .../distributed/test/LargeColumnTest.java          |  37 +--
 .../distributed/test/MessageFiltersTest.java       |  24 +-
 .../distributed/test/MessageForwardingTest.java    |  34 +--
 .../distributed/test/NativeProtocolTest.java       |  49 ++--
 .../distributed/test/NetworkTopologyTest.java      |  40 ++-
 .../cassandra/distributed/test/NodeToolTest.java   |   6 +-
 .../distributed/test/PreviewRepairTest.java        |  36 +--
 .../test/QueryReplayerEndToEndTest.java            |  13 +-
 .../cassandra/distributed/test/ReadRepairTest.java |  22 +-
 .../distributed/test/RepairCoordinatorBase.java    |   2 +-
 .../test/RepairCoordinatorFailingMessageTest.java  |   2 +-
 .../distributed/test/RepairCoordinatorFast.java    |   2 +-
 .../distributed/test/RepairCoordinatorSlow.java    |   1 -
 .../distributed/test/RepairDigestTrackingTest.java |   9 +-
 .../cassandra/distributed/test/RepairTest.java     |  42 +--
 .../distributed/test/ResourceLeakTest.java         |  22 +-
 .../distributed/test/SimpleReadWriteTest.java      |  50 ++--
 .../cassandra/distributed/test/StreamingTest.java  |   4 +-
 .../{GossipSettlesTest.java => TestBaseImpl.java}  |  34 ++-
 .../upgrade/MixedModeReadRepairTest.java           |  21 +-
 .../cassandra/distributed/upgrade/UpgradeTest.java |  41 ++-
 .../distributed/upgrade/UpgradeTestBase.java       |  19 +-
 .../CassandraIsolatedJunit4ClassRunner.java        |   6 +-
 .../apache/cassandra/LogbackStatusListener.java    |   2 +-
 .../config/DatabaseDescriptorRefTest.java          |   4 +-
 60 files changed, 616 insertions(+), 2213 deletions(-)
 delete mode 100644 test/distributed/org/apache/cassandra/distributed/api/Feature.java
 delete mode 100644 test/distributed/org/apache/cassandra/distributed/api/ICluster.java
 delete mode 100644 test/distributed/org/apache/cassandra/distributed/api/ICoordinator.java
 delete mode 100644 test/distributed/org/apache/cassandra/distributed/api/IInstance.java
 delete mode 100644 test/distributed/org/apache/cassandra/distributed/api/IInstanceConfig.java
 delete mode 100644 test/distributed/org/apache/cassandra/distributed/api/IIsolatedExecutor.java
 delete mode 100644 test/distributed/org/apache/cassandra/distributed/api/IListen.java
 delete mode 100644 test/distributed/org/apache/cassandra/distributed/api/IMessage.java
 delete mode 100644 test/distributed/org/apache/cassandra/distributed/api/IMessageFilters.java
 delete mode 100644 test/distributed/org/apache/cassandra/distributed/impl/IInvokableInstance.java
 delete mode 100644 test/distributed/org/apache/cassandra/distributed/impl/IUpgradeableInstance.java
 delete mode 100644 test/distributed/org/apache/cassandra/distributed/impl/InstanceClassLoader.java
 delete mode 100644 test/distributed/org/apache/cassandra/distributed/impl/MessageFilters.java
 delete mode 100644 test/distributed/org/apache/cassandra/distributed/impl/NetworkTopology.java
 delete mode 100644 test/distributed/org/apache/cassandra/distributed/impl/Versions.java
 copy {src/java/org/apache/cassandra/repair => test/distributed/org/apache/cassandra/distributed/shared}/RepairResult.java (75%)
 delete mode 100644 test/distributed/org/apache/cassandra/distributed/test/DistributedTestBase.java
 rename test/distributed/org/apache/cassandra/distributed/{impl => test}/ExecUtil.java (96%)
 copy test/distributed/org/apache/cassandra/distributed/test/{GossipSettlesTest.java => TestBaseImpl.java} (51%)
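
For context, the "Extract in-jvm-dtest API" commit above moves the api/shared
classes into the separately published dtest-api artifact (version 0.0.1 in the
build.xml changes below). A minimal sketch of how a test consumes that API,
assuming Cluster.build/start, schemaChange and coordinator carry the signatures
used by the test sources touched above (class name is hypothetical):

    import org.apache.cassandra.distributed.Cluster;
    import org.apache.cassandra.distributed.api.ConsistencyLevel;

    public class InJvmDTestSketch
    {
        public static void main(String[] args) throws Throwable
        {
            // Cluster.build(n).start() comes from the shared Builder this commit
            // extracts; try-with-resources shuts the in-jvm cluster down.
            try (Cluster cluster = Cluster.build(2).start())
            {
                cluster.schemaChange("CREATE KEYSPACE ks WITH replication = " +
                                     "{'class': 'SimpleStrategy', 'replication_factor': 2};");
                cluster.schemaChange("CREATE TABLE ks.tbl (pk int PRIMARY KEY, v int);");
                // coordinator(1) routes the statement through node 1 at the
                // requested consistency level from the shared api enum.
                cluster.coordinator(1).execute("INSERT INTO ks.tbl (pk, v) VALUES (1, 1);",
                                               ConsistencyLevel.ALL);
            }
        }
    }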


---------------------------------------------------------------------


[cassandra] 01/01: Merge branch 'cassandra-3.11' into trunk

Posted by if...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ifesdjeen pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 0934730c2b89b2cc3cb430e4f60fc15b67af7e9f
Merge: 244d4c2 a7820d1
Author: Alex Petrov <ol...@gmail.com>
AuthorDate: Fri Mar 27 19:13:17 2020 +0100

    Merge branch 'cassandra-3.11' into trunk

 build.xml                                          |   2 +
 .../org/apache/cassandra/distributed/Cluster.java  |  28 +-
 .../cassandra/distributed/UpgradeableCluster.java  |  31 +-
 .../apache/cassandra/distributed/api/Feature.java  |  24 --
 .../apache/cassandra/distributed/api/ICluster.java |  36 ---
 .../cassandra/distributed/api/ICoordinator.java    |  40 ---
 .../cassandra/distributed/api/IInstance.java       |  73 -----
 .../cassandra/distributed/api/IInstanceConfig.java |  60 ----
 .../distributed/api/IIsolatedExecutor.java         | 126 --------
 .../apache/cassandra/distributed/api/IMessage.java |  38 ---
 .../cassandra/distributed/api/IMessageFilters.java | 100 -------
 .../distributed/impl/AbstractCluster.java          | 319 +++------------------
 .../cassandra/distributed/impl/Coordinator.java    |  31 +-
 .../impl/DelegatingInvokableInstance.java          |  10 +-
 .../distributed/impl/DistributedTestSnitch.java    |  31 +-
 .../distributed/impl/IInvokableInstance.java       |  67 -----
 .../distributed/impl/IUpgradeableInstance.java     |  28 --
 .../cassandra/distributed/impl/Instance.java       | 111 ++++---
 .../distributed/impl/InstanceClassLoader.java      | 142 ---------
 .../cassandra/distributed/impl/InstanceConfig.java | 100 ++++---
 .../distributed/impl/IsolatedExecutor.java         |   4 +
 .../cassandra/distributed/impl/MessageFilters.java | 195 -------------
 .../cassandra/distributed/impl/MessageImpl.java    |  10 +-
 .../distributed/impl/NetworkTopology.java          | 137 ---------
 .../cassandra/distributed/impl/TracingUtil.java    |   2 +-
 .../cassandra/distributed/impl/Versions.java       | 213 --------------
 .../{api/IListen.java => shared/RepairResult.java} |  15 +-
 .../cassandra/distributed/test/BootstrapTest.java  |  50 ++--
 .../cassandra/distributed/test/CasWriteTest.java   |  34 +--
 .../distributed/test/DistributedRepairUtils.java   |   4 +-
 .../distributed/test/DistributedTestBase.java      | 173 -----------
 .../distributed/{impl => test}/ExecUtil.java       |   2 +-
 .../distributed/test/FailingRepairTest.java        |  26 +-
 .../distributed/test/GossipSettlesTest.java        |  16 +-
 .../distributed/test/LargeColumnTest.java          |  37 +--
 .../distributed/test/MessageFiltersTest.java       |  24 +-
 .../distributed/test/MessageForwardingTest.java    |  34 +--
 .../distributed/test/NativeProtocolTest.java       |  49 ++--
 .../distributed/test/NetworkTopologyTest.java      |  40 ++-
 .../cassandra/distributed/test/NodeToolTest.java   |   6 +-
 .../distributed/test/PreviewRepairTest.java        |  36 +--
 .../test/QueryReplayerEndToEndTest.java            |  13 +-
 .../cassandra/distributed/test/ReadRepairTest.java |  22 +-
 .../distributed/test/RepairCoordinatorBase.java    |   2 +-
 .../test/RepairCoordinatorFailingMessageTest.java  |   2 +-
 .../distributed/test/RepairCoordinatorFast.java    |   2 +-
 .../distributed/test/RepairCoordinatorSlow.java    |   1 -
 .../distributed/test/RepairDigestTrackingTest.java |   9 +-
 .../cassandra/distributed/test/RepairTest.java     |  42 +--
 .../distributed/test/ResourceLeakTest.java         |  22 +-
 .../distributed/test/SimpleReadWriteTest.java      |  50 ++--
 .../cassandra/distributed/test/StreamingTest.java  |   4 +-
 .../{GossipSettlesTest.java => TestBaseImpl.java}  |  34 ++-
 .../upgrade/MixedModeReadRepairTest.java           |  21 +-
 .../cassandra/distributed/upgrade/UpgradeTest.java |  41 ++-
 .../distributed/upgrade/UpgradeTestBase.java       |  19 +-
 .../CassandraIsolatedJunit4ClassRunner.java        |   6 +-
 .../apache/cassandra/LogbackStatusListener.java    |   2 +-
 .../config/DatabaseDescriptorRefTest.java          |   4 +-
 59 files changed, 619 insertions(+), 2181 deletions(-)

diff --cc build.xml
index 50937e1,161150a..3be96e4
--- a/build.xml
+++ b/build.xml
@@@ -534,23 -401,28 +534,24 @@@
            <dependency groupId="org.antlr" artifactId="antlr-runtime" version="3.5.2">
              <exclusion groupId="org.antlr" artifactId="stringtemplate"/>
            </dependency>
 -          <dependency groupId="org.slf4j" artifactId="slf4j-api" version="1.7.7"/>
 -          <dependency groupId="org.slf4j" artifactId="log4j-over-slf4j" version="1.7.7"/>
 -          <dependency groupId="org.slf4j" artifactId="jcl-over-slf4j" version="1.7.7" />
 -          <dependency groupId="ch.qos.logback" artifactId="logback-core" version="1.1.3"/>
 -          <dependency groupId="ch.qos.logback" artifactId="logback-classic" version="1.1.3"/>
 -          <dependency groupId="org.codehaus.jackson" artifactId="jackson-core-asl" version="1.9.2"/>
 -          <dependency groupId="org.codehaus.jackson" artifactId="jackson-mapper-asl" version="1.9.2"/>
 +          <dependency groupId="org.slf4j" artifactId="slf4j-api" version="1.7.25"/>
 +          <dependency groupId="org.slf4j" artifactId="log4j-over-slf4j" version="1.7.25"/>
 +          <dependency groupId="org.slf4j" artifactId="jcl-over-slf4j" version="1.7.25" />
 +          <dependency groupId="ch.qos.logback" artifactId="logback-core" version="1.2.3"/>
 +          <dependency groupId="ch.qos.logback" artifactId="logback-classic" version="1.2.3"/>
 +          <dependency groupId="com.fasterxml.jackson.core" artifactId="jackson-core" version="2.9.5"/>
 +          <dependency groupId="com.fasterxml.jackson.core" artifactId="jackson-databind" version="2.9.5"/>
 +          <dependency groupId="com.fasterxml.jackson.core" artifactId="jackson-annotations" version="2.9.5"/>
            <dependency groupId="com.googlecode.json-simple" artifactId="json-simple" version="1.1"/>
            <dependency groupId="com.boundary" artifactId="high-scale-lib" version="1.0.6"/>
 -          <dependency groupId="com.github.jbellis" artifactId="jamm" version="0.3.0"/>
 +          <dependency groupId="com.github.jbellis" artifactId="jamm" version="${jamm.version}"/>
  
 -          <dependency groupId="com.thinkaurelius.thrift" artifactId="thrift-server" version="0.3.7">
 -            <exclusion groupId="org.slf4j" artifactId="slf4j-log4j12"/>
 -            <exclusion groupId="junit" artifactId="junit"/>
 -          </dependency>
            <dependency groupId="org.yaml" artifactId="snakeyaml" version="1.11"/>
 -          <dependency groupId="org.apache.thrift" artifactId="libthrift" version="0.9.2">
 -	         <exclusion groupId="commons-logging" artifactId="commons-logging"/>
 -          </dependency>
 -          <dependency groupId="junit" artifactId="junit" version="4.6" />
 +          <dependency groupId="junit" artifactId="junit" version="4.12" />
            <dependency groupId="org.mockito" artifactId="mockito-core" version="3.2.4" />
 +          <dependency groupId="org.quicktheories" artifactId="quicktheories" version="0.25" />
 +          <dependency groupId="com.google.code.java-allocation-instrumenter" artifactId="java-allocation-instrumenter" version="${allocation-instrumenter.version}" />
+           <dependency groupId="org.apache.cassandra" artifactId="dtest-api" version="0.0.1" />
            <dependency groupId="org.apache.rat" artifactId="apache-rat" version="0.10">
               <exclusion groupId="commons-lang" artifactId="commons-lang"/>
            </dependency>
@@@ -684,18 -537,21 +685,19 @@@
                  version="${version}"/>
          <dependency groupId="junit" artifactId="junit"/>
          <dependency groupId="org.mockito" artifactId="mockito-core" />
 +        <dependency groupId="org.quicktheories" artifactId="quicktheories" />
 +        <dependency groupId="com.google.code.java-allocation-instrumenter" artifactId="java-allocation-instrumenter" version="${allocation-instrumenter.version}" />
+         <dependency groupId="org.apache.cassandra" artifactId="dtest-api" />
 +        <dependency groupId="org.psjava" artifactId="psjava" version="0.1.19" />
          <dependency groupId="org.apache.rat" artifactId="apache-rat"/>
          <dependency groupId="org.apache.hadoop" artifactId="hadoop-core"/>
 -      	<dependency groupId="org.apache.hadoop" artifactId="hadoop-minicluster"/>
 -      	<dependency groupId="com.google.code.findbugs" artifactId="jsr305"/>
 +        <dependency groupId="org.apache.hadoop" artifactId="hadoop-minicluster"/>
 +        <dependency groupId="com.google.code.findbugs" artifactId="jsr305"/>
          <dependency groupId="org.antlr" artifactId="antlr"/>
 -        <dependency groupId="com.datastax.cassandra" artifactId="cassandra-driver-core" classifier="shaded">
 -          <exclusion groupId="io.netty" artifactId="netty-buffer"/>
 -          <exclusion groupId="io.netty" artifactId="netty-codec"/>
 -          <exclusion groupId="io.netty" artifactId="netty-handler"/>
 -          <exclusion groupId="io.netty" artifactId="netty-transport"/>
 -        </dependency>
 +        <dependency groupId="com.datastax.cassandra" artifactId="cassandra-driver-core" classifier="shaded"/>
          <dependency groupId="org.eclipse.jdt.core.compiler" artifactId="ecj"/>
 -        <dependency groupId="org.caffinitas.ohc" artifactId="ohc-core" version="0.4.4" />
 -        <dependency groupId="org.caffinitas.ohc" artifactId="ohc-core-j8" version="0.4.4" />
 +        <dependency groupId="org.caffinitas.ohc" artifactId="ohc-core"/>
 +        <dependency groupId="org.caffinitas.ohc" artifactId="ohc-core-j8"/>
          <dependency groupId="org.openjdk.jmh" artifactId="jmh-core"/>
          <dependency groupId="org.openjdk.jmh" artifactId="jmh-generator-annprocess"/>
          <dependency groupId="net.ju-n.compile-command-annotations" artifactId="compile-command-annotations"/>
diff --cc test/distributed/org/apache/cassandra/distributed/Cluster.java
index d3533d3,d657638..ee01874
--- a/test/distributed/org/apache/cassandra/distributed/Cluster.java
+++ b/test/distributed/org/apache/cassandra/distributed/Cluster.java
@@@ -23,11 -23,13 +23,14 @@@ import java.io.IOException
  import java.util.List;
  import java.util.function.Consumer;
  
 +import org.apache.cassandra.distributed.api.ICluster;
+ import org.apache.cassandra.distributed.api.IInstanceConfig;
 -import org.apache.cassandra.distributed.api.IInvokableInstance;
  import org.apache.cassandra.distributed.impl.AbstractCluster;
- import org.apache.cassandra.distributed.impl.IInvokableInstance;
++import org.apache.cassandra.distributed.api.IInvokableInstance;
  import org.apache.cassandra.distributed.impl.InstanceConfig;
- import org.apache.cassandra.distributed.impl.Versions;
+ import org.apache.cassandra.distributed.shared.Builder;
+ import org.apache.cassandra.distributed.shared.NetworkTopology;
+ import org.apache.cassandra.distributed.shared.Versions;
  
  /**
   * A simple cluster supporting only the 'current' Cassandra version, offering easy access to the convenience methods
diff --cc test/distributed/org/apache/cassandra/distributed/UpgradeableCluster.java
index 6ffdc62,a7899fe..8566897
--- a/test/distributed/org/apache/cassandra/distributed/UpgradeableCluster.java
+++ b/test/distributed/org/apache/cassandra/distributed/UpgradeableCluster.java
@@@ -21,12 -21,13 +21,14 @@@ package org.apache.cassandra.distribute
  import java.io.File;
  import java.util.List;
  
 +import org.apache.cassandra.distributed.api.ICluster;
+ import org.apache.cassandra.distributed.api.IInstanceConfig;
  import org.apache.cassandra.distributed.impl.AbstractCluster;
- import org.apache.cassandra.distributed.impl.IInvokableInstance;
- import org.apache.cassandra.distributed.impl.IUpgradeableInstance;
+ import org.apache.cassandra.distributed.api.IUpgradeableInstance;
  import org.apache.cassandra.distributed.impl.InstanceConfig;
- import org.apache.cassandra.distributed.impl.Versions;
+ import org.apache.cassandra.distributed.shared.Builder;
+ import org.apache.cassandra.distributed.shared.NetworkTopology;
+ import org.apache.cassandra.distributed.shared.Versions;
  
  /**
   * A multi-version cluster, offering only the cross-version API
diff --cc test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
index 371de54,82c06da..bcfaaf5
--- a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
@@@ -56,10 -53,14 +53,14 @@@ import org.apache.cassandra.distributed
  import org.apache.cassandra.distributed.api.IListen;
  import org.apache.cassandra.distributed.api.IMessage;
  import org.apache.cassandra.distributed.api.IMessageFilters;
+ import org.apache.cassandra.distributed.api.IUpgradeableInstance;
  import org.apache.cassandra.distributed.api.NodeToolResult;
+ import org.apache.cassandra.distributed.shared.InstanceClassLoader;
+ import org.apache.cassandra.distributed.shared.MessageFilters;
+ import org.apache.cassandra.distributed.shared.NetworkTopology;
+ import org.apache.cassandra.distributed.shared.Versions;
  import org.apache.cassandra.io.util.FileUtils;
- import org.apache.cassandra.locator.InetAddressAndPort;
 -import org.apache.cassandra.net.MessagingService;
 +import org.apache.cassandra.net.Verb;
  import org.apache.cassandra.utils.FBUtilities;
  import org.apache.cassandra.utils.concurrent.SimpleCondition;
  
@@@ -134,9 -137,9 +137,9 @@@ public abstract class AbstractCluster<
  
          private IInvokableInstance newInstance(int generation)
          {
-             ClassLoader classLoader = new InstanceClassLoader(generation, config.num, version.classpath, sharedClassLoader);
-             return Instance.transferAdhoc((SerializableBiFunction<IInstanceConfig, ClassLoader, Instance>) Instance::new, classLoader)
-                            .apply(config.forVersion(version.major), classLoader);
+             ClassLoader classLoader = new InstanceClassLoader(generation, config.num(), version.classpath, sharedClassLoader);
 -            return Instance.transferAdhoc((SerializableBiFunction<IInstanceConfig, ClassLoader, IInvokableInstance>)Instance::new, classLoader)
 -                                        .apply(config, classLoader);
++            return Instance.transferAdhoc((SerializableBiFunction<IInstanceConfig, ClassLoader, Instance>)Instance::new, classLoader)
++                                        .apply(config.forVersion(version.major), classLoader);
          }
  
          public IInstanceConfig config()
@@@ -152,16 -155,9 +155,16 @@@
          @Override
          public synchronized void startup()
          {
 +            startup(AbstractCluster.this);
 +        }
 +
 +        public synchronized void startup(ICluster cluster)
 +        {
 +            if (cluster != AbstractCluster.this)
-                 throw new IllegalArgumentException("Only the owning cluster can be used for startup"); //TODO why have this in the API?
++                throw new IllegalArgumentException("Only the owning cluster can be used for startup");
              if (!isShutdown)
                  throw new IllegalStateException();
 -            delegate().startup(AbstractCluster.this);
 +            delegate().startup(cluster);
              isShutdown = false;
              updateMessagingVersions();
          }
@@@ -568,258 -565,7 +572,7 @@@
          InstanceClassLoader cl = (InstanceClassLoader) thread.getContextClassLoader();
          get(cl.getInstanceId()).uncaughtException(thread, error);
      }
 -    
 +
-     protected interface Factory<I extends IInstance, C extends AbstractCluster<I>>
-     {
-         C newCluster(File root, Versions.Version version, List<InstanceConfig> configs, ClassLoader sharedClassLoader);
-     }
- 
-     public static class Builder<I extends IInstance, C extends AbstractCluster<I>>
-     {
-         private final Factory<I, C> factory;
-         private int nodeCount;
-         private int subnet;
-         private Map<Integer, NetworkTopology.DcAndRack> nodeIdTopology;
-         private TokenSupplier tokenSupplier;
-         private File root;
-         private Versions.Version version = Versions.CURRENT;
-         private Consumer<InstanceConfig> configUpdater;
- 
-         public Builder(Factory<I, C> factory)
-         {
-             this.factory = factory;
-         }
- 
-         public Builder<I, C> withTokenSupplier(TokenSupplier tokenSupplier)
-         {
-             this.tokenSupplier = tokenSupplier;
-             return this;
-         }
- 
-         public Builder<I, C> withSubnet(int subnet)
-         {
-             this.subnet = subnet;
-             return this;
-         }
- 
-         public Builder<I, C> withNodes(int nodeCount)
-         {
-             this.nodeCount = nodeCount;
-             return this;
-         }
- 
-         public Builder<I, C> withDCs(int dcCount)
-         {
-             return withRacks(dcCount, 1);
-         }
- 
-         public Builder<I, C> withRacks(int dcCount, int racksPerDC)
-         {
-             if (nodeCount == 0)
-                 throw new IllegalStateException("Node count will be calculated. Do not supply total node count in the builder");
- 
-             int totalRacks = dcCount * racksPerDC;
-             int nodesPerRack = (nodeCount + totalRacks - 1) / totalRacks; // round up to next integer
-             return withRacks(dcCount, racksPerDC, nodesPerRack);
-         }
- 
-         public Builder<I, C> withRacks(int dcCount, int racksPerDC, int nodesPerRack)
-         {
-             if (nodeIdTopology != null)
-                 throw new IllegalStateException("Network topology already created. Call withDCs/withRacks once or before withDC/withRack calls");
- 
-             nodeIdTopology = new HashMap<>();
-             int nodeId = 1;
-             for (int dc = 1; dc <= dcCount; dc++)
-             {
-                 for (int rack = 1; rack <= racksPerDC; rack++)
-                 {
-                     for (int rackNodeIdx = 0; rackNodeIdx < nodesPerRack; rackNodeIdx++)
-                         nodeIdTopology.put(nodeId++, NetworkTopology.dcAndRack(dcName(dc), rackName(rack)));
-                 }
-             }
-             // adjust the node count to match the allocation
-             final int adjustedNodeCount = dcCount * racksPerDC * nodesPerRack;
-             if (adjustedNodeCount != nodeCount)
-             {
-                 assert adjustedNodeCount > nodeCount : "withRacks should only ever increase the node count";
-                 logger.info("Network topology of {} DCs with {} racks per DC and {} nodes per rack required increasing total nodes to {}",
-                             dcCount, racksPerDC, nodesPerRack, adjustedNodeCount);
-                 nodeCount = adjustedNodeCount;
-             }
-             return this;
-         }
- 
-         public Builder<I, C> withDC(String dcName, int nodeCount)
-         {
-             return withRack(dcName, rackName(1), nodeCount);
-         }
- 
-         public Builder<I, C> withRack(String dcName, String rackName, int nodesInRack)
-         {
-             if (nodeIdTopology == null)
-             {
-                 if (nodeCount > 0)
-                     throw new IllegalStateException("Node count must not be explicitly set, or allocated using withDCs/withRacks");
- 
-                 nodeIdTopology = new HashMap<>();
-             }
-             for (int nodeId = nodeCount + 1; nodeId <= nodeCount + nodesInRack; nodeId++)
-                 nodeIdTopology.put(nodeId, NetworkTopology.dcAndRack(dcName, rackName));
- 
-             nodeCount += nodesInRack;
-             return this;
-         }
- 
-         // Map of node ids to dc and rack - must be contiguous with an entry nodeId 1 to nodeCount
-         public Builder<I, C> withNodeIdTopology(Map<Integer, NetworkTopology.DcAndRack> nodeIdTopology)
-         {
-             if (nodeIdTopology.isEmpty())
-                 throw new IllegalStateException("Topology is empty. It must have an entry for every nodeId.");
- 
-             IntStream.rangeClosed(1, nodeIdTopology.size()).forEach(nodeId -> {
-                 if (nodeIdTopology.get(nodeId) == null)
-                     throw new IllegalStateException("Topology is missing entry for nodeId " + nodeId);
-             });
- 
-             if (nodeCount != nodeIdTopology.size())
-             {
-                 nodeCount = nodeIdTopology.size();
-                 logger.info("Adjusting node count to {} for supplied network topology", nodeCount);
-             }
- 
-             this.nodeIdTopology = new HashMap<>(nodeIdTopology);
- 
-             return this;
-         }
- 
-         public Builder<I, C> withRoot(File root)
-         {
-             this.root = root;
-             return this;
-         }
- 
-         public Builder<I, C> withVersion(Versions.Version version)
-         {
-             this.version = version;
-             return this;
-         }
- 
-         public Builder<I, C> withConfig(Consumer<InstanceConfig> updater)
-         {
-             this.configUpdater = updater;
-             return this;
-         }
- 
-         public C createWithoutStarting() throws IOException
-         {
-             if (root == null)
-                 root = Files.createTempDirectory("dtests").toFile();
- 
-             if (nodeCount <= 0)
-                 throw new IllegalStateException("Cluster must have at least one node");
- 
-             if (nodeIdTopology == null)
-             {
-                 nodeIdTopology = IntStream.rangeClosed(1, nodeCount).boxed()
-                                           .collect(Collectors.toMap(nodeId -> nodeId,
-                                                                     nodeId -> NetworkTopology.dcAndRack(dcName(0), rackName(0))));
-             }
- 
-             root.mkdirs();
-             setupLogging(root);
- 
-             ClassLoader sharedClassLoader = Thread.currentThread().getContextClassLoader();
- 
-             List<InstanceConfig> configs = new ArrayList<>();
- 
-             if (tokenSupplier == null)
-                 tokenSupplier = evenlyDistributedTokens(nodeCount);
- 
-             for (int i = 0; i < nodeCount; ++i)
-             {
-                 int nodeNum = i + 1;
-                 configs.add(createInstanceConfig(nodeNum));
-             }
- 
-             return factory.newCluster(root, version, configs, sharedClassLoader);
-         }
- 
-         public InstanceConfig newInstanceConfig(C cluster)
-         {
-             return createInstanceConfig(cluster.size() + 1);
-         }
- 
-         private InstanceConfig createInstanceConfig(int nodeNum)
-         {
-             String ipPrefix = "127.0." + subnet + ".";
-             String seedIp = ipPrefix + "1";
-             String ipAddress = ipPrefix + nodeNum;
-             long token = tokenSupplier.token(nodeNum);
- 
-             NetworkTopology topology = NetworkTopology.build(ipPrefix, 7012, nodeIdTopology);
- 
-             InstanceConfig config = InstanceConfig.generate(nodeNum, ipAddress, topology, root, String.valueOf(token), seedIp);
-             if (configUpdater != null)
-                 configUpdater.accept(config);
- 
-             return config;
-         }
- 
-         public C start() throws IOException
-         {
-             C cluster = createWithoutStarting();
-             cluster.startup();
-             return cluster;
-         }
-     }
- 
-     public static TokenSupplier evenlyDistributedTokens(int numNodes)
-     {
-         long increment = (Long.MAX_VALUE / numNodes) * 2;
-         return (int nodeId) -> {
-             assert nodeId <= numNodes : String.format("Can not allocate a token for a node %s, since only %s nodes are allowed by the token allocation strategy",
-                                                       nodeId, numNodes);
-             return Long.MIN_VALUE + 1 + nodeId * increment;
-         };
-     }
- 
-     public static interface TokenSupplier
-     {
-         public long token(int nodeId);
-     }
- 
-     static String dcName(int index)
-     {
-         return "datacenter" + index;
-     }
- 
-     static String rackName(int index)
-     {
-         return "rack" + index;
-     }
- 
-     private static void setupLogging(File root)
-     {
-         try
-         {
-             String testConfPath = "test/conf/logback-dtest.xml";
-             Path logConfPath = Paths.get(root.getPath(), "/logback-dtest.xml");
- 
-             if (!logConfPath.toFile().exists())
-             {
-                 Files.copy(new File(testConfPath).toPath(),
-                            logConfPath);
-             }
- 
-             System.setProperty("logback.configurationFile", "file://" + logConfPath);
-         }
-         catch (IOException e)
-         {
-             throw new RuntimeException(e);
-         }
-     }
- 
      @Override
      public void close()
      {
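
The large block deleted above is the old inner AbstractCluster.Builder; after
this commit the equivalent fluent configuration is expected to come from
org.apache.cassandra.distributed.shared.Builder (per the new imports in
Cluster.java and UpgradeableCluster.java). A sketch under that assumption,
reusing the method names of the deleted code (class name is hypothetical):

    import org.apache.cassandra.distributed.Cluster;
    import org.apache.cassandra.distributed.api.Feature;

    public class TopologyBuilderSketch
    {
        public static void main(String[] args) throws Throwable
        {
            // 2 DCs x 1 rack per DC x 3 nodes per rack = 6 instances;
            // withRacks/withConfig mirror the fluent methods removed above.
            try (Cluster cluster = Cluster.build()
                                          .withRacks(2, 1, 3)
                                          .withConfig(c -> c.with(Feature.GOSSIP, Feature.NETWORK))
                                          .start())
            {
                System.out.println("started " + cluster.size() + " nodes");
            }
        }
    }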
diff --cc test/distributed/org/apache/cassandra/distributed/impl/Coordinator.java
index dee9049,d49679d..c1446d1
--- a/test/distributed/org/apache/cassandra/distributed/impl/Coordinator.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Coordinator.java
@@@ -73,10 -74,15 +74,15 @@@ public class Coordinator implements ICo
          }).call();
      }
  
-     private QueryResult executeInternal(String query, Enum<?> consistencyLevelOrigin, Object[] boundValues)
+     protected org.apache.cassandra.db.ConsistencyLevel toCassandraCL(ConsistencyLevel cl)
+     {
+         return org.apache.cassandra.db.ConsistencyLevel.fromCode(cl.ordinal());
+     }
+ 
+     private QueryResult executeInternal(String query, ConsistencyLevel consistencyLevelOrigin, Object[] boundValues)
      {
          ClientState clientState = makeFakeClientState();
 -        CQLStatement prepared = QueryProcessor.getStatement(query, clientState).statement;
 +        CQLStatement prepared = QueryProcessor.getStatement(query, clientState);
          List<ByteBuffer> boundBBValues = new ArrayList<>();
          ConsistencyLevel consistencyLevel = ConsistencyLevel.valueOf(consistencyLevelOrigin.name());
          for (Object boundValue : boundValues)
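
The toCassandraCL helper added above bridges the version-agnostic api
ConsistencyLevel to the internal enum via fromCode(cl.ordinal()), which only
holds while both enums declare their constants in matching order. A sketch
that makes that assumption explicit (hypothetical class, illustration only):

    import org.apache.cassandra.distributed.api.ConsistencyLevel;

    public class ConsistencyMappingSketch
    {
        public static void main(String[] args)
        {
            // fromCode(ordinal) is only safe while the api enum's declaration
            // order tracks the internal codes; print the mapping to verify.
            for (ConsistencyLevel cl : ConsistencyLevel.values())
            {
                org.apache.cassandra.db.ConsistencyLevel internal =
                    org.apache.cassandra.db.ConsistencyLevel.fromCode(cl.ordinal());
                System.out.println(cl + " -> " + internal);
            }
        }
    }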
diff --cc test/distributed/org/apache/cassandra/distributed/impl/DistributedTestSnitch.java
index 736e77b,8652bbc..e671f4d
--- a/test/distributed/org/apache/cassandra/distributed/impl/DistributedTestSnitch.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/DistributedTestSnitch.java
@@@ -19,7 -19,10 +19,9 @@@
  package org.apache.cassandra.distributed.impl;
  
  import java.net.InetAddress;
+ import java.net.InetSocketAddress;
 -import java.util.HashMap;
  import java.util.Map;
+ import java.util.concurrent.ConcurrentHashMap;
  
  import org.apache.cassandra.config.Config;
  import org.apache.cassandra.db.SystemKeyspace;
diff --cc test/distributed/org/apache/cassandra/distributed/impl/IUpgradeableInstance.java
index 3eb3657,d42e799..0000000
deleted file mode 100644,100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/IUpgradeableInstance.java
+++ /dev/null
@@@ -1,28 -1,29 +1,0 @@@
--/*
-- * Licensed to the Apache Software Foundation (ASF) under one
-- * or more contributor license agreements.  See the NOTICE file
-- * distributed with this work for additional information
-- * regarding copyright ownership.  The ASF licenses this file
-- * to you under the Apache License, Version 2.0 (the
-- * "License"); you may not use this file except in compliance
-- * with the License.  You may obtain a copy of the License at
-- *
-- *     http://www.apache.org/licenses/LICENSE-2.0
-- *
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
--
--package org.apache.cassandra.distributed.impl;
--
--import org.apache.cassandra.distributed.api.IInstance;
 -import org.apache.cassandra.distributed.shared.Versions;
--
--// this lives outside the api package so that we do not have to worry about inter-version compatibility
--public interface IUpgradeableInstance extends IInstance
--{
--    // only to be invoked while the node is shutdown!
--    public void setVersion(Versions.Version version);
--}
diff --cc test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index 90d747e,759a636..e79a182
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@@ -20,9 -20,14 +20,12 @@@ package org.apache.cassandra.distribute
  
  import java.io.File;
  import java.io.IOException;
 -import java.net.InetAddress;
+ import java.net.InetSocketAddress;
 -import java.nio.ByteBuffer;
  import java.util.ArrayList;
  import java.util.Collections;
+ import java.util.HashMap;
  import java.util.List;
+ import java.util.Map;
  import java.util.UUID;
  import java.util.concurrent.CompletableFuture;
  import java.util.concurrent.CopyOnWriteArrayList;
@@@ -58,7 -64,9 +64,8 @@@ import org.apache.cassandra.dht.IPartit
  import org.apache.cassandra.dht.Token;
  import org.apache.cassandra.distributed.api.ICluster;
  import org.apache.cassandra.distributed.api.ICoordinator;
 -import org.apache.cassandra.distributed.api.IInstance;
  import org.apache.cassandra.distributed.api.IInstanceConfig;
+ import org.apache.cassandra.distributed.api.IInvokableInstance;
  import org.apache.cassandra.distributed.api.IListen;
  import org.apache.cassandra.distributed.api.IMessage;
  import org.apache.cassandra.distributed.api.NodeToolResult;
@@@ -105,6 -110,6 +113,8 @@@ import static java.util.concurrent.Time
  import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
  import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL;
  import static org.apache.cassandra.distributed.api.Feature.NETWORK;
++import static org.apache.cassandra.distributed.impl.DistributedTestSnitch.fromCassandraInetAddressAndPort;
++import static org.apache.cassandra.distributed.impl.DistributedTestSnitch.toCassandraInetAddressAndPort;
  
  public class Instance extends IsolatedExecutor implements IInvokableInstance
  {
@@@ -117,7 -129,8 +134,9 @@@
          super("node" + config.num(), classLoader);
          this.config = config;
          InstanceIDDefiner.setInstanceId(config.num());
-         FBUtilities.setBroadcastInetAddressAndPort(config.broadcastAddressAndPort());
 -        FBUtilities.setBroadcastInetAddress(config.broadcastAddress().getAddress());
++        FBUtilities.setBroadcastInetAddressAndPort(InetAddressAndPort.getByAddressOverrideDefaults(config.broadcastAddress().getAddress(),
++                                                                                                   config.broadcastAddress().getPort()));
+ 
          // Set the config at instance creation, possibly before startup() has run on all other instances.
          // setMessagingVersions below will call runOnInstance which will instantiate
          // the MessagingService and dependencies preventing later changes to network parameters.
@@@ -203,53 -213,104 +222,47 @@@
          }).run();
      }
  
 -    private void registerMockMessaging(ICluster<IInstance> cluster)
 +    private void registerMockMessaging(ICluster cluster)
      {
 -        BiConsumer<InetSocketAddress, IMessage> deliverToInstance = (to, message) -> cluster.get(to).receiveMessage(message);
 -        BiConsumer<InetSocketAddress, IMessage> deliverToInstanceIfNotFiltered = (to, message) -> {
 -            int fromNum = config().num();
 -            int toNum = cluster.get(to).config().num();
 -
 -            if (cluster.filters().permitOutbound(fromNum, toNum, message)
 -                && cluster.filters().permitInbound(fromNum, toNum, message))
 -                deliverToInstance.accept(to, message);
 -        };
 -
 -        Map<InetAddress, InetSocketAddress> addressAndPortMap = new HashMap<>();
 -        cluster.stream().forEach(instance -> {
 -            InetSocketAddress addressAndPort = instance.broadcastAddress();
 -            if (!addressAndPort.equals(instance.config().broadcastAddress()))
 -                throw new IllegalStateException("addressAndPort mismatch: " + addressAndPort + " vs " + instance.config().broadcastAddress());
 -            InetSocketAddress prev = addressAndPortMap.put(addressAndPort.getAddress(),
 -                                                                        addressAndPort);
 -            if (null != prev)
 -                throw new IllegalStateException("This version of Cassandra does not support multiple nodes with the same InetAddress: " + addressAndPort + " vs " + prev);
 +        MessagingService.instance().outboundSink.add((message, to) -> {
-             cluster.get(to).receiveMessage(serializeMessage(message.from(), to, message));
++            InetSocketAddress toAddr = fromCassandraInetAddressAndPort(to);
++            cluster.get(toAddr).receiveMessage(serializeMessage(message.from(), to, message));
 +            return false;
          });
 -
 -        MessagingService.instance().addMessageSink(new MessageDeliverySink(deliverToInstanceIfNotFiltered, addressAndPortMap::get));
      }
  
 -    // unnecessary if registerMockMessaging used
 -    private void registerFilters(ICluster cluster)
 +    private void registerInboundFilter(ICluster cluster)
      {
-         MessagingService.instance().inboundSink.add(message ->
-                 permitMessageInbound(cluster, serializeMessage(message.from(), broadcastAddressAndPort(), message)));
 -        IInstance instance = this;
 -        MessagingService.instance().addMessageSink(new IMessageSink()
 -        {
 -            public boolean allowOutgoingMessage(MessageOut message, int id, InetAddress toAddress)
 -            {
 -                // Port is not passed in, so take a best guess at the destination port from this instance
 -                IInstance to = cluster.get(NetworkTopology.addressAndPort(toAddress,
 -                                                                          instance.config().broadcastAddress().getPort()));
 -                int fromNum = config().num();
 -                int toNum = to.config().num();
 -                return cluster.filters().permitOutbound(fromNum, toNum, serializeMessage(message, id,
 -                                                                                 broadcastAddress(),
 -                                                                                 to.broadcastAddress()));
 -            }
 -
 -            public boolean allowIncomingMessage(MessageIn message, int id)
 -            {
 -                // Port is not passed in, so take a best guess at the destination port from this instance
 -                IInstance from = cluster.get(NetworkTopology.addressAndPort(message.from,
 -                                                                            instance.config().broadcastAddress().getPort()));
 -                int fromNum = from.config().num();
 -                int toNum = config().num();
 -
 -
 -                IMessage msg = serializeMessage(message, id, from.broadcastAddress(), broadcastAddress());
++        MessagingService.instance().inboundSink.add(message -> {
++            IMessage serialized = serializeMessage(message.from(), toCassandraInetAddressAndPort(broadcastAddress()), message);
++            int fromNum = cluster.get(serialized.from()).config().num();
++            int toNum = config.num(); // since this instance is receiving the message, to will always be this instance
++            return cluster.filters().permitInbound(fromNum, toNum, serialized);
++        });
 +    }
  
 -                return cluster.filters().permitInbound(fromNum, toNum, msg);
 -            }
 +    private void registerOutboundFilter(ICluster cluster)
 +    {
- 
-         MessagingService.instance().outboundSink.add((message, to) ->
-                                                      permitMessageOutbound(cluster, to, serializeMessage(message.from(), to, message)));
-     }
- 
-     private boolean permitMessageInbound(ICluster cluster, IMessage message)
-     {
-         int fromNum = cluster.get(message.from()).config().num();
-         int toNum = config.num(); // since this instance is receiving the message, to will always be this instance
-         return cluster.filters().permitInbound(fromNum, toNum, message);
-     }
- 
-     private boolean permitMessageOutbound(ICluster cluster, InetAddressAndPort to, IMessage message)
-     {
-         int fromNum = config.num(); // since this instance is sending the message, from will always be this instance
-         int toNum = cluster.get(to).config().num();
-         return cluster.filters().permitOutbound(fromNum, toNum, message);
++        MessagingService.instance().outboundSink.add((message, to) -> {
++            IMessage serialized = serializeMessage(message.from(), to, message);
++            int fromNum = config.num(); // since this instance is sending the message, from will always be this instance
++            int toNum = cluster.get(fromCassandraInetAddressAndPort(to)).config().num();
++            return cluster.filters().permitOutbound(fromNum, toNum, serialized);
+         });
      }
  
 -    public static IMessage serializeMessage(MessageOut messageOut, int id, InetSocketAddress from, InetSocketAddress to)
 +    public void uncaughtException(Thread thread, Throwable throwable)
      {
 -        try (DataOutputBuffer out = new DataOutputBuffer(1024))
 -        {
 -            int version = MessagingService.instance().getVersion(to.getAddress());
 -
 -            out.writeInt(MessagingService.PROTOCOL_MAGIC);
 -            out.writeInt(id);
 -            long timestamp = System.currentTimeMillis();
 -            out.writeInt((int) timestamp);
 -            messageOut.serialize(out, version);
 -            return new MessageImpl(messageOut.verb.ordinal(), out.toByteArray(), id, version, from);
 -        }
 -        catch (IOException e)
 -        {
 -            throw new RuntimeException(e);
 -        }
 +        sync(CassandraDaemon::uncaughtException).accept(thread, throwable);
      }
  
 -    public static IMessage serializeMessage(MessageIn messageIn, int id, InetSocketAddress from, InetSocketAddress to)
 +    private static IMessage serializeMessage(InetAddressAndPort from, InetAddressAndPort to, Message<?> messageOut)
      {
          try (DataOutputBuffer out = new DataOutputBuffer(1024))
          {
 -            int version = MessagingService.instance().getVersion(to.getAddress());
 -
 -            out.writeInt(MessagingService.PROTOCOL_MAGIC);
 -            out.writeInt(id);
 -            long timestamp = System.currentTimeMillis();
 -            out.writeInt((int) timestamp);
 -
 -            MessageOut.serialize(out,
 -                                 from.getAddress(),
 -                                 messageIn.verb,
 -                                 messageIn.parameters,
 -                                 messageIn.payload,
 -                                 version);
 -
 -            return new MessageImpl(messageIn.verb.ordinal(), out.toByteArray(), id, version, from);
 +            int version = MessagingService.instance().versions.get(to);
 +            Message.serializer.serialize(messageOut, out, version);
-             return new MessageImpl(messageOut.verb().id, out.toByteArray(), messageOut.id(), version, from);
++            return new MessageImpl(messageOut.verb().id, out.toByteArray(), messageOut.id(), version, fromCassandraInetAddressAndPort(from));
          }
          catch (IOException e)
          {
@@@ -257,12 -318,80 +270,12 @@@
          }
      }
  
 -    private class MessageDeliverySink implements IMessageSink
 +    @VisibleForTesting
 +    public static Message<?> deserializeMessage(IMessage message)
      {
 -        private final BiConsumer<InetSocketAddress, IMessage> deliver;
 -        private final Function<InetAddress, InetSocketAddress> lookupAddressAndPort;
 -
 -        MessageDeliverySink(BiConsumer<InetSocketAddress, IMessage> deliver,
 -                            Function<InetAddress, InetSocketAddress> lookupAddressAndPort)
 +        try (DataInputBuffer in = new DataInputBuffer(message.bytes()))
          {
-             return Message.serializer.deserialize(in, message.from(), message.version());
 -            this.deliver = deliver;
 -            this.lookupAddressAndPort = lookupAddressAndPort;
 -        }
 -
 -        public boolean allowOutgoingMessage(MessageOut messageOut, int id, InetAddress to)
 -        {
 -            InetSocketAddress from = broadcastAddress();
 -            assert from.equals(lookupAddressAndPort.apply(messageOut.from));
 -
 -            // Tracing logic - similar to org.apache.cassandra.net.OutboundTcpConnection.writeConnected
 -            byte[] sessionBytes = (byte[]) messageOut.parameters.get(Tracing.TRACE_HEADER);
 -            if (sessionBytes != null)
 -            {
 -                UUID sessionId = UUIDGen.getUUID(ByteBuffer.wrap(sessionBytes));
 -                TraceState state = Tracing.instance.get(sessionId);
 -                String message = String.format("Sending %s message to %s", messageOut.verb, to);
 -                // session may have already finished; see CASSANDRA-5668
 -                if (state == null)
 -                {
 -                    byte[] traceTypeBytes = (byte[]) messageOut.parameters.get(Tracing.TRACE_TYPE);
 -                    Tracing.TraceType traceType = traceTypeBytes == null ? Tracing.TraceType.QUERY : Tracing.TraceType.deserialize(traceTypeBytes[0]);
 -                    Tracing.instance.trace(ByteBuffer.wrap(sessionBytes), message, traceType.getTTL());
 -                }
 -                else
 -                {
 -                    state.trace(message);
 -                    if (messageOut.verb == MessagingService.Verb.REQUEST_RESPONSE)
 -                        Tracing.instance.doneWithNonLocalSession(state);
 -                }
 -            }
 -
 -            InetSocketAddress toFull = lookupAddressAndPort.apply(to);
 -            deliver.accept(toFull,
 -                           serializeMessage(messageOut, id, broadcastAddress(), toFull));
 -
 -            return false;
 -        }
 -
 -        public boolean allowIncomingMessage(MessageIn message, int id)
 -        {
 -            // we can filter to our heart's content on the outgoing message; no need to worry about incoming
 -            return true;
 -        }
 -    }
 -
 -    public static MessageIn<Object> deserializeMessage(IMessage imessage)
 -    {
 -        // Based on org.apache.cassandra.net.IncomingTcpConnection.receiveMessage
 -        try (DataInputBuffer input = new DataInputBuffer(imessage.bytes()))
 -        {
 -            int version = imessage.version();
 -            if (version > MessagingService.current_version)
 -            {
 -                throw new IllegalStateException(String.format("Received message version %d but current version is %d",
 -                                                              version,
 -                                                              MessagingService.current_version));
 -            }
 -
 -            MessagingService.validateMagic(input.readInt());
 -            int id;
 -            if (version < MessagingService.VERSION_20)
 -                id = Integer.parseInt(input.readUTF());
 -            else
 -                id = input.readInt();
 -            long currentTime = ApproximateTime.currentTimeMillis();
 -            return MessageIn.read(input, version, id, MessageIn.readConstructionTime(imessage.from().getAddress(), input, currentTime));
++            return Message.serializer.deserialize(in, toCassandraInetAddressAndPort(message.from()), message.version());
          }
          catch (Throwable t)
          {
@@@ -270,25 -399,28 +283,24 @@@
          }
      }
  
 -    public void receiveMessage(IMessage imessage)
 +    @Override
 +    public void receiveMessage(IMessage message)
      {
          sync(() -> {
 -            // Based on org.apache.cassandra.net.IncomingTcpConnection.receiveMessage
 -            try
 -            {
 -                MessageIn message = deserializeMessage(imessage);
 -                if (message == null)
 -                {
 -                    // callback expired; nothing to do
 -                    return;
 -                }
 -                if (message.version <= MessagingService.current_version)
 -                {
 -                    MessagingService.instance().receive(message, imessage.id());
 -                }
 -                // else ignore message
 -            }
 -            catch (Throwable t)
 +            if (message.version() > MessagingService.current_version)
              {
 -                throw new RuntimeException("Exception occurred on node " + broadcastAddress(), t);
 +                throw new IllegalStateException(String.format("Node%d received message version %d but current version is %d",
 +                                                              this.config.num(),
 +                                                              message.version(),
 +                                                              MessagingService.current_version));
              }
 +
 +            Message<?> messageIn = deserializeMessage(message);
 +            Message.Header header = messageIn.header;
 +            TraceState state = Tracing.instance.initializeFromMessage(header);
 +            if (state != null) state.trace("{} message received from {}", header.verb, header.from);
-             header.verb.stage.execute(() -> {
-                 MessagingService.instance().inboundSink.accept(messageIn);
-             }, ExecutorLocals.create(state));
++            header.verb.stage.execute(() -> MessagingService.instance().inboundSink.accept(messageIn),
++                                      ExecutorLocals.create(state));
          }).run();
      }
  
@@@ -297,9 -429,9 +309,10 @@@
          return callsOnInstance(() -> MessagingService.current_version).call();
      }
  
-     public void setMessagingVersion(InetAddressAndPort endpoint, int version)
++    @Override
+     public void setMessagingVersion(InetSocketAddress endpoint, int version)
      {
-         MessagingService.instance().versions.set(endpoint, version);
 -        runOnInstance(() -> MessagingService.instance().setVersion(endpoint.getAddress(), version));
++        MessagingService.instance().versions.set(toCassandraInetAddressAndPort(endpoint), version);
      }
  
      public void flush(String keyspace)
@@@ -327,19 -459,9 +340,20 @@@
          sync(() -> {
              try
              {
 +                FileUtils.setFSErrorHandler(new DefaultFSErrorHandler());
 +
 +                if (config.has(GOSSIP))
 +                {
 +                    // TODO: hacky
 +                    System.setProperty("cassandra.ring_delay_ms", "5000");
 +                    System.setProperty("cassandra.consistent.rangemovement", "false");
 +                    System.setProperty("cassandra.consistent.simultaneousmoves.allow", "true");
 +                }
 +
                  mkdirs();
  
-                 assert config.networkTopology().contains(config.broadcastAddressAndPort());
 -                assert config.networkTopology().contains(config.broadcastAddress());
++                assert config.networkTopology().contains(config.broadcastAddress()) : String.format("Network topology %s doesn't contain the address %s",
++                                                                                                    config.networkTopology(), config.broadcastAddress());
                  DistributedTestSnitch.assign(config.networkTopology());
  
                  DatabaseDescriptor.daemonInitialization();
@@@ -384,15 -506,12 +398,17 @@@
  //                    -- not sure what that means?  SocketFactory.instance.getClass();
                      registerMockMessaging(cluster);
                  }
 +                registerInboundFilter(cluster);
 +                registerOutboundFilter(cluster);
++
 +                JVMStabilityInspector.replaceKiller(new InstanceKiller());
  
                  // TODO: this is more than just gossip
                  if (config.has(GOSSIP))
                  {
                      StorageService.instance.initServer();
                      StorageService.instance.removeShutdownHook();
++                    Gossiper.waitToSettle();
                  }
                  else
                  {
@@@ -413,10 -529,10 +429,11 @@@
                      StorageService.instance.setRpcReady(true);
                  }
  
-                 if (!FBUtilities.getBroadcastAddressAndPort().equals(broadcastAddressAndPort()))
 -                if (!FBUtilities.getBroadcastAddress().equals(broadcastAddress().getAddress()))
 -                    throw new IllegalStateException();
 -                if (DatabaseDescriptor.getStoragePort() != broadcastAddress().getPort())
--                    throw new IllegalStateException();
++                if (!FBUtilities.getBroadcastAddressAndPort().address.equals(broadcastAddress().getAddress()) ||
++                    FBUtilities.getBroadcastAddressAndPort().port != broadcastAddress().getPort())
++                    throw new IllegalStateException(String.format("%s != %s", FBUtilities.getBroadcastAddressAndPort(), broadcastAddress()));
 +
 +                ActiveRepairService.instance.start();
              }
              catch (Throwable t)
              {
@@@ -436,10 -552,10 +453,10 @@@
              new File(dir).mkdirs();
      }
  
--    private static Config loadConfig(IInstanceConfig overrides)
++    private Config loadConfig(IInstanceConfig overrides)
      {
          Config config = new Config();
-         overrides.propagate(config);
+         overrides.propagate(config, mapper);
          return config;
      }
  
@@@ -468,32 -584,29 +485,33 @@@
  
              for (int i = 0; i < tokens.size(); i++)
              {
-                 InetAddressAndPort ep = hosts.get(i);
+                 InetSocketAddress ep = hosts.get(i);
++                InetAddressAndPort addressAndPort = toCassandraInetAddressAndPort(ep);
                  UUID hostId = hostIds.get(i);
                  Token token = tokens.get(i);
                  Gossiper.runInGossipStageBlocking(() -> {
-                     Gossiper.instance.initializeNodeUnsafe(ep, hostId, 1);
-                     Gossiper.instance.injectApplicationState(ep,
 -                    Gossiper.instance.initializeNodeUnsafe(ep.getAddress(), hostId, 1);
 -                    Gossiper.instance.injectApplicationState(ep.getAddress(),
++                    Gossiper.instance.initializeNodeUnsafe(addressAndPort, hostId, 1);
++                    Gossiper.instance.injectApplicationState(addressAndPort,
                                                               ApplicationState.TOKENS,
                                                               new VersionedValue.VersionedValueFactory(partitioner).tokens(Collections.singleton(token)));
-                     storageService.onChange(ep,
 -                    storageService.onChange(ep.getAddress(),
++                    storageService.onChange(addressAndPort,
 +                                            ApplicationState.STATUS_WITH_PORT,
 +                                            new VersionedValue.VersionedValueFactory(partitioner).normal(Collections.singleton(token)));
-                     storageService.onChange(ep,
++                    storageService.onChange(addressAndPort,
                                              ApplicationState.STATUS,
                                              new VersionedValue.VersionedValueFactory(partitioner).normal(Collections.singleton(token)));
-                     Gossiper.instance.realMarkAlive(ep, Gossiper.instance.getEndpointStateForEndpoint(ep));
 -                    Gossiper.instance.realMarkAlive(ep.getAddress(), Gossiper.instance.getEndpointStateForEndpoint(ep.getAddress()));
++                    Gossiper.instance.realMarkAlive(addressAndPort, Gossiper.instance.getEndpointStateForEndpoint(addressAndPort));
                  });
  
                  int messagingVersion = cluster.get(ep).isShutdown()
                                         ? MessagingService.current_version
                                         : Math.min(MessagingService.current_version, cluster.get(ep).getMessagingVersion());
-                 MessagingService.instance().versions.set(ep, messagingVersion);
 -                MessagingService.instance().setVersion(ep.getAddress(), messagingVersion);
++                MessagingService.instance().versions.set(addressAndPort, messagingVersion);
              }
  
              // check that all nodes are in token metadata
              for (int i = 0; i < tokens.size(); ++i)
-                 assert storageService.getTokenMetadata().isMember(hosts.get(i));
 -                assert storageService.getTokenMetadata().isMember(hosts.get(i).getAddress());
++                assert storageService.getTokenMetadata().isMember(toCassandraInetAddressAndPort(hosts.get(i)));
          }
          catch (Throwable e) // UnknownHostException
          {
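
For orientation: the recurring change in the Instance.java hunks above swaps Cassandra's
internal InetAddressAndPort for the API-level java.net.InetSocketAddress at the interface
boundary, converting back internally. A minimal sketch of what such a conversion helper
might look like, assuming it mirrors the getByAddressOverrideDefaults call used later in
the MessageFiltersTest hunk:

    import java.net.InetSocketAddress;
    import org.apache.cassandra.locator.InetAddressAndPort;

    // Sketch only: wrap the resolved address, overriding the default storage
    // port with the one carried by the socket address.
    static InetAddressAndPort toCassandraInetAddressAndPort(InetSocketAddress address)
    {
        return InetAddressAndPort.getByAddressOverrideDefaults(address.getAddress(),
                                                               address.getPort());
    }
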
diff --cc test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java
index ad4f6bf,2aad9cd..9e2d9d6
--- a/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java
@@@ -117,9 -96,7 +96,9 @@@ public class InstanceConfig implements 
                  .set("storage_port", 7012)
                  .set("endpoint_snitch", DistributedTestSnitch.class.getName())
                  .set("seed_provider", new ParameterizedClass(SimpleSeedProvider.class.getName(),
-                         Collections.singletonMap("seeds", seedIp + ":7012")))
 -                        Collections.singletonMap("seeds", seedIp)))
++                                                             Collections.singletonMap("seeds", seedIp + ":7012")))
 +                // required settings for dtest functionality
 +                .set("diagnostic_events_enabled", true)
                  .set("auto_bootstrap", false)
                  // capacities that are based on `totalMemory` that should be fixed size
                  .set("index_summary_capacity_in_mb", 50l)
diff --cc test/distributed/org/apache/cassandra/distributed/shared/RepairResult.java
index c2e8dd6,0000000..7be381a
mode 100644,000000..100644
--- a/test/distributed/org/apache/cassandra/distributed/shared/RepairResult.java
+++ b/test/distributed/org/apache/cassandra/distributed/shared/RepairResult.java
@@@ -1,28 -1,0 +1,31 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
- package org.apache.cassandra.distributed.api;
++package org.apache.cassandra.distributed.shared;
 +
- public interface IListen
++public class RepairResult
 +{
-     public interface Cancel { void cancel(); }
++    public final boolean success;
++    public final boolean wasInconsistent;
 +
-     Cancel schema(Runnable onChange);
- 
-     Cancel liveMembers(Runnable onChange);
++    public RepairResult(boolean success, boolean wasInconsistent)
++    {
++        this.success = success;
++        this.wasInconsistent = wasInconsistent;
++    }
 +}
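
RepairResult is a bare value type, presumably so it can travel between the test and
instance classloaders. A trivial usage sketch:

    import org.apache.cassandra.distributed.shared.RepairResult;

    // Sketch: a repair hook records its outcome; the test asserts on the two flags.
    RepairResult result = new RepairResult(true, false);
    assert result.success;          // the repair completed
    assert !result.wasInconsistent; // no mismatching replicas were reported
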
diff --cc test/distributed/org/apache/cassandra/distributed/test/CasWriteTest.java
index 7e7c629,0000000..a5d7e72
mode 100644,000000..100644
--- a/test/distributed/org/apache/cassandra/distributed/test/CasWriteTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/CasWriteTest.java
@@@ -1,276 -1,0 +1,278 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.distributed.test;
 +
 +import java.util.ArrayList;
 +import java.util.Arrays;
 +import java.util.List;
 +import java.util.concurrent.CountDownLatch;
 +import java.util.concurrent.ExecutorService;
 +import java.util.concurrent.Executors;
 +import java.util.concurrent.Future;
 +import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.atomic.AtomicInteger;
 +import java.util.concurrent.atomic.AtomicReference;
 +import java.util.function.Consumer;
 +import java.util.function.Function;
 +import java.util.function.Supplier;
 +
 +import org.junit.After;
 +import org.junit.AfterClass;
 +import org.junit.Assert;
 +import org.junit.Before;
 +import org.junit.BeforeClass;
 +import org.junit.Rule;
 +import org.junit.Test;
 +import org.junit.rules.ExpectedException;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
- import org.apache.cassandra.db.ConsistencyLevel;
 +import org.apache.cassandra.distributed.Cluster;
- import org.apache.cassandra.distributed.impl.InstanceClassLoader;
++import org.apache.cassandra.distributed.api.ConsistencyLevel;
++import org.apache.cassandra.distributed.api.ICluster;
++import org.apache.cassandra.distributed.shared.InstanceClassLoader;
 +import org.apache.cassandra.exceptions.CasWriteTimeoutException;
 +import org.apache.cassandra.exceptions.CasWriteUnknownResultException;
 +import org.apache.cassandra.net.Verb;
 +import org.apache.cassandra.utils.FBUtilities;
 +import org.hamcrest.BaseMatcher;
 +import org.hamcrest.Description;
 +
 +import static org.hamcrest.CoreMatchers.containsString;
- import static org.junit.Assert.fail;
++import static org.apache.cassandra.distributed.shared.AssertUtils.*;
 +
- public class CasWriteTest extends DistributedTestBase
++// TODO: this test should be removed once running in-jvm dtests via the shared API repository is set up
++public class CasWriteTest extends TestBaseImpl
 +{
 +    // Sharing the same cluster to boost test speed. Using a pkGen to make sure queries have distinct pk values for paxos instances.
-     private static Cluster cluster;
++    private static ICluster cluster;
 +    private static final AtomicInteger pkGen = new AtomicInteger(1_000); // preserve any pk values less than 1000 for manual queries.
 +    private static final Logger logger = LoggerFactory.getLogger(CasWriteTest.class);
 +
 +    @Rule
 +    public ExpectedException thrown = ExpectedException.none();
 +
 +    @BeforeClass
 +    public static void setupCluster() throws Throwable
 +    {
-         cluster = init(Cluster.create(3));
++        cluster = init(Cluster.build().withNodes(3).start());
 +        cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
 +    }
 +
 +    @AfterClass
-     public static void close()
++    public static void close() throws Exception
 +    {
 +        cluster.close();
 +        cluster = null;
 +    }
 +
 +    @Before @After
 +    public void resetFilters()
 +    {
 +        cluster.filters().reset();
 +    }
 +
 +    @Test
 +    public void testCasWriteSuccessWithNoContention()
 +    {
 +        cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1) IF NOT EXISTS",
 +                                       ConsistencyLevel.QUORUM);
 +        assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1",
 +                                                  ConsistencyLevel.QUORUM),
 +                   row(1, 1, 1));
 +
 +        cluster.coordinator(1).execute("UPDATE " + KEYSPACE + ".tbl SET v = 2 WHERE pk = 1 AND ck = 1 IF v = 1",
 +                                       ConsistencyLevel.QUORUM);
 +        assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1",
 +                                                  ConsistencyLevel.QUORUM),
 +                   row(1, 1, 2));
 +    }
 +
 +    @Test
 +    public void testCasWriteTimeoutAtPreparePhase_ReqLost()
 +    {
 +        expectCasWriteTimeout();
-         cluster.verbs(Verb.PAXOS_PREPARE_REQ).from(1).to(2, 3).drop().on(); // drop the internode messages to acceptors
++        cluster.filters().verbs(Verb.PAXOS_PREPARE_REQ.id).from(1).to(2, 3).drop().on(); // drop the internode messages to acceptors
 +        cluster.coordinator(1).execute(mkUniqueCasInsertQuery(1), ConsistencyLevel.QUORUM);
 +    }
 +
 +    @Test
 +    public void testCasWriteTimeoutAtPreparePhase_RspLost()
 +    {
 +        expectCasWriteTimeout();
-         cluster.verbs(Verb.PAXOS_PREPARE_RSP).from(2, 3).to(1).drop().on(); // drop the internode messages to acceptors
++        cluster.filters().verbs(Verb.PAXOS_PREPARE_RSP.id).from(2, 3).to(1).drop().on(); // drop the acceptor responses back to the coordinator
 +        cluster.coordinator(1).execute(mkUniqueCasInsertQuery(1), ConsistencyLevel.QUORUM);
 +    }
 +
 +    @Test
 +    public void testCasWriteTimeoutAtProposePhase_ReqLost()
 +    {
 +        expectCasWriteTimeout();
-         cluster.verbs(Verb.PAXOS_PROPOSE_REQ).from(1).to(2, 3).drop().on();
++        cluster.filters().verbs(Verb.PAXOS_PROPOSE_REQ.id).from(1).to(2, 3).drop().on();
 +        cluster.coordinator(1).execute(mkUniqueCasInsertQuery(1), ConsistencyLevel.QUORUM);
 +    }
 +
 +    @Test
 +    public void testCasWriteTimeoutAtProposePhase_RspLost()
 +    {
 +        expectCasWriteTimeout();
-         cluster.verbs(Verb.PAXOS_PROPOSE_RSP).from(2, 3).to(1).drop().on();
++        cluster.filters().verbs(Verb.PAXOS_PROPOSE_RSP.id).from(2, 3).to(1).drop().on();
 +        cluster.coordinator(1).execute(mkUniqueCasInsertQuery(1), ConsistencyLevel.QUORUM);
 +    }
 +
 +    @Test
 +    public void testCasWriteTimeoutAtCommitPhase_ReqLost()
 +    {
 +        expectCasWriteTimeout();
-         cluster.verbs(Verb.PAXOS_COMMIT_REQ).from(1).to(2, 3).drop().on();
++        cluster.filters().verbs(Verb.PAXOS_COMMIT_REQ.id).from(1).to(2, 3).drop().on();
 +        cluster.coordinator(1).execute(mkUniqueCasInsertQuery(1), ConsistencyLevel.QUORUM);
 +    }
 +
 +    @Test
 +    public void testCasWriteTimeoutAtCommitPhase_RspLost()
 +    {
 +        expectCasWriteTimeout();
-         cluster.verbs(Verb.PAXOS_COMMIT_RSP).from(2, 3).to(1).drop().on();
++        cluster.filters().verbs(Verb.PAXOS_COMMIT_RSP.id).from(2, 3).to(1).drop().on();
 +        cluster.coordinator(1).execute(mkUniqueCasInsertQuery(1), ConsistencyLevel.QUORUM);
 +    }
 +
 +
 +
 +    @Test
 +    public void casWriteContentionTimeoutTest() throws InterruptedException
 +    {
 +        testWithContention(101,
 +                           Arrays.asList(1, 3),
 +                           c -> {
 +                               c.filters().reset();
-                                c.verbs(Verb.PAXOS_PREPARE_REQ).from(1).to(3).drop();
-                                c.verbs(Verb.PAXOS_PROPOSE_REQ).from(1).to(2).drop();
++                               c.filters().verbs(Verb.PAXOS_PREPARE_REQ.id).from(1).to(3).drop();
++                               c.filters().verbs(Verb.PAXOS_PROPOSE_REQ.id).from(1).to(2).drop();
 +                           },
 +                           failure ->
 +                               failure.get() != null &&
 +                               failure.get()
 +                                      .getMessage()
 +                                      .contains(CasWriteTimeoutException.class.getCanonicalName()),
 +                           "Expecting cause to be CasWriteTimeoutException");
 +    }
 +
 +    private void testWithContention(int testUid,
 +                                    List<Integer> contendingNodes,
-                                     Consumer<Cluster> setupForEachRound,
++                                    Consumer<ICluster> setupForEachRound,
 +                                    Function<AtomicReference<Throwable>, Boolean> expectedException,
 +                                    String assertHintMessage) throws InterruptedException
 +    {
 +        assert contendingNodes.size() == 2;
 +        AtomicInteger curPk = new AtomicInteger(1);
 +        ExecutorService es = Executors.newFixedThreadPool(3);
 +        AtomicReference<Throwable> failure = new AtomicReference<>();
 +        Supplier<Boolean> hasExpectedException = () -> expectedException.apply(failure);
 +        while (!hasExpectedException.get())
 +        {
 +            failure.set(null);
 +            setupForEachRound.accept(cluster);
 +
 +            List<Future<?>> futures = new ArrayList<>();
 +            CountDownLatch latch = new CountDownLatch(3);
 +            contendingNodes.forEach(nodeId -> {
 +                String query = mkCasInsertQuery((a) -> curPk.get(), testUid, nodeId);
 +                futures.add(es.submit(() -> {
 +                    try
 +                    {
 +                        latch.countDown();
 +                        latch.await(1, TimeUnit.SECONDS); // help threads start at approximately the same time
 +                        cluster.coordinator(nodeId).execute(query, ConsistencyLevel.QUORUM);
 +                    }
 +                    catch (Throwable t)
 +                    {
 +                        failure.set(t);
 +                    }
 +                }));
 +            });
 +
 +            FBUtilities.waitOnFutures(futures);
 +            curPk.incrementAndGet();
 +        }
 +
 +        es.shutdownNow();
 +        es.awaitTermination(1, TimeUnit.MINUTES);
 +        Assert.assertTrue(assertHintMessage, hasExpectedException.get());
 +    }
 +
 +    private void expectCasWriteTimeout()
 +    {
 +        thrown.expect(RuntimeException.class);
 +        thrown.expectCause(new BaseMatcher<Throwable>()
 +        {
 +            public boolean matches(Object item)
 +            {
 +                return InstanceClassLoader.wasLoadedByAnInstanceClassLoader(item.getClass());
 +            }
 +
 +            public void describeTo(Description description)
 +            {
 +                description.appendText("Cause should be loaded by InstanceClassLoader");
 +            }
 +        });
 +        // unable to assert on the class because the exception thrown was loaded by a different classloader, InstanceClassLoader;
 +        // therefore assert that the FQCN is present in the message as a workaround
 +        thrown.expectMessage(containsString(CasWriteTimeoutException.class.getCanonicalName()));
 +        thrown.expectMessage(containsString("CAS operation timed out"));
 +    }
 +
 +    @Test
 +    public void testWriteUnknownResult()
 +    {
 +        while (true)
 +        {
 +            cluster.filters().reset();
 +            int pk = pkGen.getAndIncrement();
 +            cluster.filters().verbs(Verb.PAXOS_PROPOSE_REQ.id).from(1).to(3).messagesMatching((from, to, msg) -> {
 +                // Inject a single CAS request in between the prepare and propose phases
 +                cluster.coordinator(2).execute(mkCasInsertQuery((a) -> pk, 1, 2),
 +                                               ConsistencyLevel.QUORUM);
 +                return false;
 +            }).drop();
 +
 +            try
 +            {
 +                cluster.coordinator(1).execute(mkCasInsertQuery((a) -> pk, 1, 1), ConsistencyLevel.QUORUM);
 +            }
 +            catch (Throwable t)
 +            {
 +                Assert.assertTrue("Expecting cause to be CasWriteUnknownResultException",
 +                                  t.getMessage().contains(CasWriteUnknownResultException.class.getCanonicalName()));
 +                return;
 +            }
 +        }
 +    }
 +
 +    // every invocation returns a query with a unique pk
 +    private String mkUniqueCasInsertQuery(int v)
 +    {
 +        return mkCasInsertQuery(AtomicInteger::getAndIncrement, 1, v);
 +    }
 +
 +    private String mkCasInsertQuery(Function<AtomicInteger, Integer> pkFunc, int ck, int v)
 +    {
 +        String query = String.format("INSERT INTO %s.tbl (pk, ck, v) VALUES (%d, %d, %d) IF NOT EXISTS", KEYSPACE, pkFunc.apply(pkGen), ck, v);
 +        logger.info("Generated query: " + query);
 +        return query;
 +    }
 +}
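
The timeout tests above all follow one pattern from the reworked filter API: drop a
single verb between specific nodes, issue the statement, and let the @Before/@After
hook restore delivery. A condensed sketch (the CAS statement is illustrative):

    // Drop Paxos prepare requests from node 1 to its acceptors, then issue a
    // CAS write that is expected to time out; reset restores delivery.
    cluster.filters().verbs(Verb.PAXOS_PREPARE_REQ.id).from(1).to(2, 3).drop().on();
    try
    {
        cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1) IF NOT EXISTS",
                                       ConsistencyLevel.QUORUM);
    }
    finally
    {
        cluster.filters().reset();
    }
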
diff --cc test/distributed/org/apache/cassandra/distributed/test/DistributedRepairUtils.java
index 7f162e1,0000000..023d02c
mode 100644,000000..100644
--- a/test/distributed/org/apache/cassandra/distributed/test/DistributedRepairUtils.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/DistributedRepairUtils.java
@@@ -1,208 -1,0 +1,208 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.distributed.test;
 +
 +import java.util.Collections;
 +import java.util.Set;
 +import java.util.function.Consumer;
 +
 +import com.google.common.collect.ImmutableSet;
 +import org.apache.commons.lang3.ArrayUtils;
 +import org.junit.Assert;
 +
- import org.apache.cassandra.db.ConsistencyLevel;
++import org.apache.cassandra.distributed.api.ConsistencyLevel;
++import org.apache.cassandra.distributed.api.IInvokableInstance;
 +import org.apache.cassandra.distributed.api.NodeToolResult;
 +import org.apache.cassandra.distributed.api.QueryResult;
 +import org.apache.cassandra.distributed.api.Row;
 +import org.apache.cassandra.distributed.impl.AbstractCluster;
- import org.apache.cassandra.distributed.impl.IInvokableInstance;
 +import org.apache.cassandra.metrics.StorageMetrics;
 +
 +import static org.apache.cassandra.utils.Retry.retryWithBackoffBlocking;
 +
 +public final class DistributedRepairUtils
 +{
 +    public static final int DEFAULT_COORDINATOR = 1;
 +
 +    private DistributedRepairUtils()
 +    {
 +
 +    }
 +
 +    public static NodeToolResult repair(AbstractCluster<?> cluster, RepairType repairType, boolean withNotifications, String... args) {
 +        return repair(cluster, DEFAULT_COORDINATOR, repairType, withNotifications, args);
 +    }
 +
 +    public static NodeToolResult repair(AbstractCluster<?> cluster, int node, RepairType repairType, boolean withNotifications, String... args) {
 +        args = repairType.append(args);
 +        args = ArrayUtils.addAll(new String[] { "repair" }, args);
 +        return cluster.get(node).nodetoolResult(withNotifications, args);
 +    }
 +
 +    public static <I extends IInvokableInstance, C extends AbstractCluster<I>> long getRepairExceptions(C cluster)
 +    {
 +        return getRepairExceptions(cluster, DEFAULT_COORDINATOR);
 +    }
 +
 +    public static <I extends IInvokableInstance, C extends AbstractCluster<I>> long getRepairExceptions(C cluster, int node)
 +    {
 +        return cluster.get(node).callOnInstance(() -> StorageMetrics.repairExceptions.getCount());
 +    }
 +
 +    public static QueryResult queryParentRepairHistory(AbstractCluster<?> cluster, String ks, String table)
 +    {
 +        return queryParentRepairHistory(cluster, DEFAULT_COORDINATOR, ks, table);
 +    }
 +
 +    public static QueryResult queryParentRepairHistory(AbstractCluster<?> cluster, int coordinator, String ks, String table)
 +    {
 +        // This is somewhat brittle since the caller never gets the repair ID and can't ask for it, so the ID has to be inferred;
 +        // the logic assumes ks/table pairs are unique (they should be, or else the create would have failed), so any
 +        // repair found for that pair is the repair in question
 +        Set<String> tableNames = table == null ? Collections.emptySet() : ImmutableSet.of(table);
 +
 +        QueryResult rs = retryWithBackoffBlocking(10, () -> cluster.coordinator(coordinator)
 +                                                                   .executeWithResult("SELECT * FROM system_distributed.parent_repair_history", ConsistencyLevel.QUORUM)
 +                                                                   .filter(row -> ks.equals(row.getString("keyspace_name")))
 +                                                                   .filter(row -> tableNames.equals(row.getSet("columnfamily_names"))));
 +        return rs;
 +    }
 +
 +    public static void assertParentRepairNotExist(AbstractCluster<?> cluster, String ks, String table)
 +    {
 +        assertParentRepairNotExist(cluster, DEFAULT_COORDINATOR, ks, table);
 +    }
 +
 +    public static void assertParentRepairNotExist(AbstractCluster<?> cluster, int coordinator, String ks, String table)
 +    {
 +        QueryResult rs = queryParentRepairHistory(cluster, coordinator, ks, table);
 +        Assert.assertFalse("No repairs should be found, but at least one was found", rs.hasNext());
 +    }
 +
 +    public static void assertParentRepairNotExist(AbstractCluster<?> cluster, String ks)
 +    {
 +        assertParentRepairNotExist(cluster, DEFAULT_COORDINATOR, ks);
 +    }
 +
 +    public static void assertParentRepairNotExist(AbstractCluster<?> cluster, int coordinator, String ks)
 +    {
 +        QueryResult rs = queryParentRepairHistory(cluster, coordinator, ks, null);
 +        Assert.assertFalse("No repairs should be found, but at least one was found", rs.hasNext());
 +    }
 +
 +    public static void assertParentRepairSuccess(AbstractCluster<?> cluster, String ks, String table)
 +    {
 +        assertParentRepairSuccess(cluster, DEFAULT_COORDINATOR, ks, table);
 +    }
 +
 +    public static void assertParentRepairSuccess(AbstractCluster<?> cluster, int coordinator, String ks, String table)
 +    {
 +        QueryResult rs = queryParentRepairHistory(cluster, coordinator, ks, table);
 +        validateExistingParentRepair(rs, row -> {
 +            // check completed
 +            Assert.assertNotNull("finished_at not found, the repair is not complete?", row.getTimestamp("finished_at"));
 +
 +            // check not failed (aka success)
 +            Assert.assertNull("Exception found", row.getString("exception_stacktrace"));
 +            Assert.assertNull("Exception found", row.getString("exception_message"));
 +        });
 +    }
 +
 +    public static void assertParentRepairFailedWithMessageContains(AbstractCluster<?> cluster, String ks, String table, String message)
 +    {
 +        assertParentRepairFailedWithMessageContains(cluster, DEFAULT_COORDINATOR, ks, table, message);
 +    }
 +
 +    public static void assertParentRepairFailedWithMessageContains(AbstractCluster<?> cluster, int coordinator, String ks, String table, String message)
 +    {
 +        QueryResult rs = queryParentRepairHistory(cluster, coordinator, ks, table);
 +        validateExistingParentRepair(rs, row -> {
 +            // check completed
 +            Assert.assertNotNull("finished_at not found, the repair is not complete?", row.getTimestamp("finished_at"));
 +
 +            // check failed
 +            Assert.assertNotNull("Exception not found", row.getString("exception_stacktrace"));
 +            String exceptionMessage = row.getString("exception_message");
 +            Assert.assertNotNull("Exception not found", exceptionMessage);
 +
 +            Assert.assertTrue("Unable to locate message '" + message + "' in repair error message: " + exceptionMessage, exceptionMessage.contains(message));
 +        });
 +    }
 +
 +    private static void validateExistingParentRepair(QueryResult rs, Consumer<Row> fn)
 +    {
 +        Assert.assertTrue("No rows found", rs.hasNext());
 +        Row row = rs.next();
 +
 +        Assert.assertNotNull("parent_id (which is the primary key) was null", row.getUUID("parent_id"));
 +
 +        fn.accept(row);
 +
 +        // make sure no other records found
 +        Assert.assertFalse("Only one repair expected, but found more than one", rs.hasNext());
 +    }
 +
 +    public enum RepairType {
 +        FULL {
 +            public String[] append(String... args)
 +            {
 +                return ArrayUtils.add(args, "--full");
 +            }
 +        },
 +        INCREMENTAL {
 +            public String[] append(String... args)
 +            {
 +                // incremental is the default
 +                return args;
 +            }
 +        },
 +        PREVIEW {
 +            public String[] append(String... args)
 +            {
 +                return ArrayUtils.addAll(args, "--preview");
 +            }
 +        };
 +
 +        public abstract String[] append(String... args);
 +    }
 +
 +    public enum RepairParallelism {
 +        SEQUENTIAL {
 +            public String[] append(String... args)
 +            {
 +                return ArrayUtils.add(args, "--sequential");
 +            }
 +        },
 +        PARALLEL {
 +            public String[] append(String... args)
 +            {
 +                // default is to be parallel
 +                return args;
 +            }
 +        },
 +        DATACENTER_AWARE {
 +            public String[] append(String... args)
 +            {
 +                return ArrayUtils.add(args, "--dc-parallel");
 +            }
 +        };
 +
 +        public abstract String[] append(String... args);
 +    }
 +}
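
A hedged usage sketch of the helpers above; the keyspace and table names are
illustrative, and NodeToolResult's assertion accessor is assumed rather than shown
in this diff:

    // Run a full repair via nodetool on the default coordinator, then verify
    // both the nodetool outcome and the parent_repair_history record.
    NodeToolResult result = DistributedRepairUtils.repair(cluster, RepairType.FULL, false, "ks", "tbl");
    result.asserts().success(); // assumed NodeToolResult API
    DistributedRepairUtils.assertParentRepairSuccess(cluster, "ks", "tbl");
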
diff --cc test/distributed/org/apache/cassandra/distributed/test/ExecUtil.java
index b907626,0000000..b9fbd5c
mode 100644,000000..100644
--- a/test/distributed/org/apache/cassandra/distributed/test/ExecUtil.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/ExecUtil.java
@@@ -1,51 -1,0 +1,51 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
- package org.apache.cassandra.distributed.impl;
++package org.apache.cassandra.distributed.test;
 +
 +import java.io.Serializable;
 +
 +import org.apache.cassandra.distributed.api.IIsolatedExecutor;
 +
 +public class ExecUtil
 +{
 +
 +    public interface ThrowingSerializableRunnable<T extends Throwable> extends Serializable
 +    {
 +        public void run() throws T;
 +    }
 +
 +    public static <T extends Throwable> IIsolatedExecutor.SerializableRunnable rethrow(ThrowingSerializableRunnable<T> run)
 +    {
 +        return () -> {
 +            try
 +            {
 +                run.run();
 +            }
 +            catch (RuntimeException | Error t)
 +            {
 +                throw t;
 +            }
 +            catch (Throwable t)
 +            {
 +                throw new RuntimeException(t);
 +            }
 +        };
 +    }
 +
 +}
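
A usage sketch for rethrow: it adapts a body that throws checked exceptions to the
SerializableRunnable shape that runOnInstance expects (the path is illustrative):

    import java.nio.file.Files;
    import java.nio.file.Paths;

    // Files.createDirectories throws a checked IOException; rethrow(...) wraps
    // it in a RuntimeException so the lambda fits SerializableRunnable.
    cluster.get(1).runOnInstance(ExecUtil.rethrow(
        () -> Files.createDirectories(Paths.get("build/test/instance-scratch"))));
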
diff --cc test/distributed/org/apache/cassandra/distributed/test/FailingRepairTest.java
index 25e299c,0000000..bad6d87
mode 100644,000000..100644
--- a/test/distributed/org/apache/cassandra/distributed/test/FailingRepairTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/FailingRepairTest.java
@@@ -1,346 -1,0 +1,348 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.distributed.test;
 +
 +import java.io.IOException;
 +import java.io.Serializable;
 +import java.util.ArrayList;
 +import java.util.Arrays;
 +import java.util.Collection;
 +import java.util.Collections;
 +import java.util.HashMap;
 +import java.util.HashSet;
 +import java.util.Iterator;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Set;
 +import java.util.concurrent.TimeUnit;
 +import java.util.stream.Collectors;
 +import java.util.stream.IntStream;
 +
 +import com.google.common.util.concurrent.Uninterruptibles;
 +import org.junit.AfterClass;
 +import org.junit.Assert;
 +import org.junit.Before;
 +import org.junit.BeforeClass;
 +import org.junit.Test;
 +import org.junit.runner.RunWith;
 +import org.junit.runners.Parameterized;
 +import org.junit.runners.Parameterized.Parameters;
 +
 +import org.apache.cassandra.db.ColumnFamilyStore;
- import org.apache.cassandra.db.ConsistencyLevel;
++import org.apache.cassandra.distributed.Cluster;
++import org.apache.cassandra.distributed.api.ConsistencyLevel;
 +import org.apache.cassandra.db.DataRange;
 +import org.apache.cassandra.db.Keyspace;
 +import org.apache.cassandra.db.PartitionPosition;
 +import org.apache.cassandra.db.filter.ColumnFilter;
 +import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 +import org.apache.cassandra.dht.AbstractBounds;
 +import org.apache.cassandra.dht.Range;
 +import org.apache.cassandra.dht.Token;
- import org.apache.cassandra.distributed.Cluster;
 +import org.apache.cassandra.distributed.api.Feature;
++import org.apache.cassandra.distributed.api.ICluster;
 +import org.apache.cassandra.distributed.api.IIsolatedExecutor.SerializableRunnable;
- import org.apache.cassandra.distributed.impl.IInvokableInstance;
++import org.apache.cassandra.distributed.api.IInvokableInstance;
 +import org.apache.cassandra.distributed.impl.InstanceKiller;
 +import org.apache.cassandra.io.sstable.CorruptSSTableException;
 +import org.apache.cassandra.io.sstable.ISSTableScanner;
 +import org.apache.cassandra.io.sstable.format.ForwardingSSTableReader;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.io.sstable.format.SSTableReadsListener;
 +import org.apache.cassandra.io.util.ChannelProxy;
 +import org.apache.cassandra.net.Verb;
 +import org.apache.cassandra.repair.RepairParallelism;
 +import org.apache.cassandra.repair.messages.RepairOption;
 +import org.apache.cassandra.schema.TableMetadata;
 +import org.apache.cassandra.service.ActiveRepairService.ParentRepairStatus;
 +import org.apache.cassandra.service.StorageService;
 +
 +@RunWith(Parameterized.class)
- public class FailingRepairTest extends DistributedTestBase implements Serializable
++public class FailingRepairTest extends TestBaseImpl implements Serializable
 +{
-     private static Cluster CLUSTER;
++    private static ICluster<IInvokableInstance> CLUSTER;
 +
 +    private final Verb messageType;
 +    private final RepairParallelism parallelism;
 +    private final boolean withTracing;
 +    private final SerializableRunnable setup;
 +
 +    public FailingRepairTest(Verb messageType, RepairParallelism parallelism, boolean withTracing, SerializableRunnable setup)
 +    {
 +        this.messageType = messageType;
 +        this.parallelism = parallelism;
 +        this.withTracing = withTracing;
 +        this.setup = setup;
 +    }
 +
 +    @Parameters(name = "{0}/{1}/{2}")
 +    public static Collection<Object[]> messages()
 +    {
 +        List<Object[]> tests = new ArrayList<>();
 +        for (RepairParallelism parallelism : RepairParallelism.values())
 +        {
 +            for (Boolean withTracing : Arrays.asList(Boolean.TRUE, Boolean.FALSE))
 +            {
 +                tests.add(new Object[]{ Verb.VALIDATION_REQ, parallelism, withTracing, failingReaders(Verb.VALIDATION_REQ, parallelism, withTracing) });
 +            }
 +        }
 +        return tests;
 +    }
 +
 +    private static SerializableRunnable failingReaders(Verb type, RepairParallelism parallelism, boolean withTracing)
 +    {
 +        return () -> {
 +            String cfName = getCfName(type, parallelism, withTracing);
 +            ColumnFamilyStore cf = Keyspace.open(KEYSPACE).getColumnFamilyStore(cfName);
 +            cf.forceBlockingFlush();
 +            Set<SSTableReader> remove = cf.getLiveSSTables();
 +            Set<SSTableReader> replace = new HashSet<>();
 +            if (type == Verb.VALIDATION_REQ)
 +            {
 +                for (SSTableReader r : remove)
 +                    replace.add(new FailingSSTableReader(r));
 +            }
 +            else
 +            {
 +                throw new UnsupportedOperationException("verb: " + type);
 +            }
 +            cf.getTracker().removeUnsafe(remove);
 +            cf.addSSTables(replace);
 +        };
 +    }
 +
 +    private static String getCfName(Verb type, RepairParallelism parallelism, boolean withTracing)
 +    {
 +        return type.name().toLowerCase() + "_" + parallelism.name().toLowerCase() + "_" + withTracing;
 +    }
 +
 +    @BeforeClass
 +    public static void setupCluster() throws IOException
 +    {
 +        // streaming requires networking at the moment
 +        // streaming also requires gossip, or it isn't set up properly
-         CLUSTER = init(Cluster.build(2)
-                     .withConfig(c -> c.with(Feature.NETWORK)
-                                       .with(Feature.GOSSIP)
-                                       .set("disk_failure_policy", "die"))
-                     .start());
++        CLUSTER = init(Cluster.build()
++                              .withNodes(2)
++                              .withConfig(c -> c.with(Feature.NETWORK)
++                                             .with(Feature.GOSSIP)
++                                             .set("disk_failure_policy", "die"))
++                              .start());
 +    }
 +
 +    @AfterClass
-     public static void teardownCluster()
++    public static void teardownCluster() throws Exception
 +    {
 +        if (CLUSTER != null)
 +            CLUSTER.close();
 +    }
 +
 +    @Before
 +    public void cleanupState()
 +    {
 +        for (int i = 1; i <= CLUSTER.size(); i++)
-             CLUSTER.get(i).runOnInstance(() -> InstanceKiller.clear());
++            CLUSTER.get(i).runOnInstance(InstanceKiller::clear);
 +    }
 +
 +    @Test(timeout = 10 * 60 * 1000)
 +    public void testFailingMessage() throws IOException
 +    {
 +        final int replica = 1;
 +        final int coordinator = 2;
 +        String tableName = getCfName(messageType, parallelism, withTracing);
 +        String fqtn = KEYSPACE + "." + tableName;
 +
 +        CLUSTER.schemaChange("CREATE TABLE " + fqtn + " (k INT, PRIMARY KEY (k))");
 +
 +        // create data which will NOT conflict
 +        int lhsOffset = 10;
 +        int rhsOffset = 20;
 +        int limit = rhsOffset + (rhsOffset - lhsOffset);
 +
 +        // setup data which is consistent on both sides
 +        for (int i = 0; i < lhsOffset; i++)
 +            CLUSTER.coordinator(replica)
 +                   .execute("INSERT INTO " + fqtn + " (k) VALUES (?)", ConsistencyLevel.ALL, i);
 +
 +        // create data on LHS which does NOT exist in RHS
 +        for (int i = lhsOffset; i < rhsOffset; i++)
 +            CLUSTER.get(replica).executeInternal("INSERT INTO " + fqtn + " (k) VALUES (?)", i);
 +
 +        // create data on RHS which does NOT exist in LHS
 +        for (int i = rhsOffset; i < limit; i++)
 +            CLUSTER.get(coordinator).executeInternal("INSERT INTO " + fqtn + " (k) VALUES (?)", i);
 +
 +        // at this point, the two nodes should be out of sync, so confirm missing data
 +        // node 1
 +        Object[][] node1Records = toRows(IntStream.range(0, rhsOffset));
 +        Object[][] node1Actuals = toNaturalOrder(CLUSTER.get(replica).executeInternal("SELECT k FROM " + fqtn));
 +        Assert.assertArrayEquals(node1Records, node1Actuals);
 +
 +        // node 2
 +        Object[][] node2Records = toRows(IntStream.concat(IntStream.range(0, lhsOffset), IntStream.range(rhsOffset, limit)));
 +        Object[][] node2Actuals = toNaturalOrder(CLUSTER.get(coordinator).executeInternal("SELECT k FROM " + fqtn));
 +        Assert.assertArrayEquals(node2Records, node2Actuals);
 +
 +        // Inject the failure
 +        CLUSTER.get(replica).runOnInstance(() -> setup.run());
 +
 +        // run a repair which is expected to fail
 +        List<String> repairStatus = CLUSTER.get(coordinator).callOnInstance(() -> {
 +            // need all ranges on the host
 +            String ranges = StorageService.instance.getLocalAndPendingRanges(KEYSPACE).stream()
 +                                                   .map(r -> r.left + ":" + r.right)
 +                                                   .collect(Collectors.joining(","));
 +            Map<String, String> args = new HashMap<String, String>()
 +            {{
 +                put(RepairOption.PARALLELISM_KEY, parallelism.getName());
 +                put(RepairOption.PRIMARY_RANGE_KEY, "false");
 +                put(RepairOption.INCREMENTAL_KEY, "false");
 +                put(RepairOption.TRACE_KEY, Boolean.toString(withTracing));
 +                put(RepairOption.PULL_REPAIR_KEY, "false");
 +                put(RepairOption.FORCE_REPAIR_KEY, "false");
 +                put(RepairOption.RANGES_KEY, ranges);
 +                put(RepairOption.COLUMNFAMILIES_KEY, tableName);
 +            }};
 +            int cmd = StorageService.instance.repairAsync(KEYSPACE, args);
 +            Assert.assertFalse("repair return status was 0, expected non-zero return status, 0 indicates repair not submitted", cmd == 0);
 +            List<String> status;
 +            do
 +            {
 +                Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
 +                status = StorageService.instance.getParentRepairStatus(cmd);
 +            } while (status == null || status.get(0).equals(ParentRepairStatus.IN_PROGRESS.name()));
 +
 +            return status;
 +        });
 +        Assert.assertEquals(repairStatus.toString(), ParentRepairStatus.FAILED, ParentRepairStatus.valueOf(repairStatus.get(0)));
 +
 +        // it's possible that the coordinator learns that the replica failed before the replica completes
 +        // shutting down, which means isKilled could be updated after the fact
 +        IInvokableInstance replicaInstance = CLUSTER.get(replica);
 +        while (replicaInstance.killAttempts() <= 0)
 +            Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
 +
 +        Assert.assertEquals("replica should be killed", 1, replicaInstance.killAttempts());
 +        Assert.assertEquals("coordinator should not be killed", 0, CLUSTER.get(coordinator).killAttempts());
 +    }
 +
 +    private static Object[][] toNaturalOrder(Object[][] actuals)
 +    {
 +        // data is returned in token order, so rather than trying to be fancy and ordering the expected results in token order,
 +        // convert the actuals to natural order
 +        int[] values = new int[actuals.length];
 +        for (int i = 0; i < values.length; i++)
 +            values[i] = (Integer) actuals[i][0];
 +        Arrays.sort(values);
 +        return toRows(IntStream.of(values));
 +    }
 +
 +    private static Object[][] toRows(IntStream values)
 +    {
 +        return values
 +               .mapToObj(v -> new Object[]{ v })
 +               .toArray(Object[][]::new);
 +    }
 +
 +    private static final class FailingSSTableReader extends ForwardingSSTableReader
 +    {
 +
 +        private FailingSSTableReader(SSTableReader delegate)
 +        {
 +            super(delegate);
 +        }
 +
 +        public ISSTableScanner getScanner()
 +        {
 +            return new FailingISSTableScanner();
 +        }
 +
 +        public ISSTableScanner getScanner(Collection<Range<Token>> ranges)
 +        {
 +            return new FailingISSTableScanner();
 +        }
 +
 +        public ISSTableScanner getScanner(Iterator<AbstractBounds<PartitionPosition>> rangeIterator)
 +        {
 +            return new FailingISSTableScanner();
 +        }
 +
 +        public ISSTableScanner getScanner(ColumnFilter columns, DataRange dataRange, SSTableReadsListener listener)
 +        {
 +            return new FailingISSTableScanner();
 +        }
 +
 +        public ChannelProxy getDataChannel()
 +        {
 +            throw new RuntimeException();
 +        }
 +
 +        public String toString()
 +        {
 +            return "FailingSSTableReader[" + super.toString() + "]";
 +        }
 +    }
 +
 +    private static final class FailingISSTableScanner implements ISSTableScanner
 +    {
 +        public long getLengthInBytes()
 +        {
 +            return 0;
 +        }
 +
 +        public long getCompressedLengthInBytes()
 +        {
 +            return 0;
 +        }
 +
 +        public long getCurrentPosition()
 +        {
 +            return 0;
 +        }
 +
 +        public long getBytesScanned()
 +        {
 +            return 0;
 +        }
 +
 +        public Set<SSTableReader> getBackingSSTables()
 +        {
 +            return Collections.emptySet();
 +        }
 +
 +        public TableMetadata metadata()
 +        {
 +            return null;
 +        }
 +
 +        public void close()
 +        {
 +
 +        }
 +
 +        public boolean hasNext()
 +        {
 +            throw new CorruptSSTableException(new IOException("Test commands it"), "mahahahaha!");
 +        }
 +
 +        public UnfilteredRowIterator next()
 +        {
 +            throw new CorruptSSTableException(new IOException("Test commands it"), "mahahahaha!");
 +        }
 +    }
 +}
diff --cc test/distributed/org/apache/cassandra/distributed/test/LargeColumnTest.java
index 0c28d38,0000000..0a81359
mode 100644,000000..100644
--- a/test/distributed/org/apache/cassandra/distributed/test/LargeColumnTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/LargeColumnTest.java
@@@ -1,96 -1,0 +1,97 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.distributed.test;
 +
 +import java.util.Random;
 +import java.util.concurrent.ThreadLocalRandom;
- import java.util.concurrent.TimeUnit;
 +
 +import org.junit.Assert;
 +import org.junit.Test;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
- import org.apache.cassandra.db.ConsistencyLevel;
- import org.apache.cassandra.distributed.Cluster;
++import org.apache.cassandra.distributed.api.ConsistencyLevel;
++import org.apache.cassandra.distributed.api.ICluster;
 +
 +import static java.util.concurrent.TimeUnit.SECONDS;
 +
- public class LargeColumnTest extends DistributedTestBase
++// TODO: this test should be removed once running in-jvm dtests via the shared API repository is set up
++public class LargeColumnTest extends TestBaseImpl
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(LargeColumnTest.class);
++
 +    private static String str(int length, Random random, long seed)
 +    {
 +        random.setSeed(seed);
 +        char[] chars = new char[length];
 +        int i = 0;
 +        int s = 0;
 +        long v = 0;
 +        while (i < length)
 +        {
 +            if (s == 0)
 +            {
 +                v = random.nextLong();
 +                s = 8;
 +            }
 +            chars[i] = (char) (((v & 127) + 32) & 127);
 +            v >>= 8;
 +            --s;
 +            ++i;
 +        }
 +        return new String(chars);
 +    }
 +
 +    private void testLargeColumns(int nodes, int columnSize, int rowCount) throws Throwable
 +    {
 +        Random random = new Random();
 +        long seed = ThreadLocalRandom.current().nextLong();
 +        logger.info("Using seed {}", seed);
 +
-         try (Cluster cluster = init(Cluster.build(nodes)
-                                            .withConfig(config ->
-                                                        config.set("commitlog_segment_size_in_mb", (columnSize * 3) >> 20)
-                                                              .set("internode_application_send_queue_reserve_endpoint_capacity_in_bytes", columnSize * 2)
-                                                              .set("internode_application_send_queue_reserve_global_capacity_in_bytes", columnSize * 3)
-                                                              .set("write_request_timeout_in_ms", SECONDS.toMillis(30L))
-                                                              .set("read_request_timeout_in_ms", SECONDS.toMillis(30L))
-                                                              .set("memtable_heap_space_in_mb", 1024)
-                                            )
-                                            .start()))
++        try (ICluster cluster = init(builder()
++                                     .withNodes(nodes)
++                                     .withConfig(config ->
++                                                 config.set("commitlog_segment_size_in_mb", (columnSize * 3) >> 20)
++                                                       .set("internode_application_send_queue_reserve_endpoint_capacity_in_bytes", columnSize * 2)
++                                                       .set("internode_application_send_queue_reserve_global_capacity_in_bytes", columnSize * 3)
++                                                       .set("write_request_timeout_in_ms", SECONDS.toMillis(30L))
++                                                       .set("read_request_timeout_in_ms", SECONDS.toMillis(30L))
++                                                       .set("memtable_heap_space_in_mb", 1024)
++                                     )
++                                     .start()))
 +        {
 +            cluster.schemaChange(String.format("CREATE TABLE %s.cf (k int, c text, PRIMARY KEY (k))", KEYSPACE));
 +
-             for (int i = 0 ; i < rowCount ; ++i)
++            for (int i = 0; i < rowCount; ++i)
 +                cluster.coordinator(1).execute(String.format("INSERT INTO %s.cf (k, c) VALUES (?, ?);", KEYSPACE), ConsistencyLevel.ALL, i, str(columnSize, random, seed | i));
 +
-             for (int i = 0 ; i < rowCount ; ++i)
++            for (int i = 0; i < rowCount; ++i)
 +            {
 +                Object[][] results = cluster.coordinator(1).execute(String.format("SELECT k, c FROM %s.cf WHERE k = ?;", KEYSPACE), ConsistencyLevel.ALL, i);
 +                Assert.assertTrue(str(columnSize, random, seed | i).equals(results[0][1]));
 +            }
 +        }
 +    }
 +
 +    @Test
 +    public void test() throws Throwable
 +    {
 +        testLargeColumns(2, 16 << 20, 5);
 +    }
- 
- }
++}
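
For a sense of scale, the configuration arithmetic above works out as follows for
the test() case, where columnSize = 16 << 20 (16 MiB):

    int columnSize = 16 << 20;                       // 16,777,216 bytes
    int commitlogSegmentMb = (columnSize * 3) >> 20; // 48 MB commitlog segments
    int endpointReserve = columnSize * 2;            // 32 MiB per-endpoint send-queue reserve
    int globalReserve = columnSize * 3;              // 48 MiB global send-queue reserve
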
diff --cc test/distributed/org/apache/cassandra/distributed/test/MessageFiltersTest.java
index 814e229,bb8d7fb..bd09891
--- a/test/distributed/org/apache/cassandra/distributed/test/MessageFiltersTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/MessageFiltersTest.java
@@@ -29,20 -27,17 +30,22 @@@ import com.google.common.collect.Sets
  import org.junit.Assert;
  import org.junit.Test;
  
- import org.apache.cassandra.db.ConsistencyLevel;
  import org.apache.cassandra.distributed.Cluster;
+ import org.apache.cassandra.distributed.api.ConsistencyLevel;
++import org.apache.cassandra.distributed.api.ICluster;
++import org.apache.cassandra.distributed.api.IInvokableInstance;
  import org.apache.cassandra.distributed.api.IIsolatedExecutor;
  import org.apache.cassandra.distributed.api.IMessage;
  import org.apache.cassandra.distributed.api.IMessageFilters;
  import org.apache.cassandra.distributed.impl.Instance;
- import org.apache.cassandra.distributed.impl.MessageFilters;
+ import org.apache.cassandra.distributed.shared.MessageFilters;
 -import org.apache.cassandra.net.MessageIn;
 +import org.apache.cassandra.locator.InetAddressAndPort;
 +import org.apache.cassandra.net.Message;
  import org.apache.cassandra.net.MessagingService;
 +import org.apache.cassandra.net.NoPayload;
 +import org.apache.cassandra.net.Verb;
  
- public class MessageFiltersTest extends DistributedTestBase
+ public class MessageFiltersTest extends TestBaseImpl
  {
      @Test
      public void simpleInboundFiltersTest()
@@@ -136,17 -132,16 +139,21 @@@
              public byte[] bytes() { return msg.getBytes(); }
              public int id() { return 0; }
              public int version() { return 0;  }
-             public InetAddressAndPort from() { return null; }
+             public InetSocketAddress from() { return null; }
++            public int fromPort()
++            {
++                return 0;
++            }
          };
      }
 +
      @Test
      public void testFilters() throws Throwable
      {
          String read = "SELECT * FROM " + KEYSPACE + ".tbl";
          String write = "INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)";
  
--        try (Cluster cluster = Cluster.create(2))
++        try (ICluster cluster = builder().withNodes(2).start())
          {
              cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': " + cluster.size() + "};");
              cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
@@@ -176,83 -171,42 +183,84 @@@
          String read = "SELECT * FROM " + KEYSPACE + ".tbl";
          String write = "INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)";
  
--        try (Cluster cluster = Cluster.create(2))
++        try (ICluster<IInvokableInstance> cluster = builder().withNodes(2).start())
          {
              cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': " + cluster.size() + "};");
              cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
  
              AtomicInteger counter = new AtomicInteger();
  
 -            Set<Integer> verbs = new HashSet<>(Arrays.asList(MessagingService.Verb.RANGE_SLICE.ordinal(),
 -                                                             MessagingService.Verb.MUTATION.ordinal()));
 +            Set<Integer> verbs = Sets.newHashSet(Arrays.asList(Verb.RANGE_REQ.id,
 +                                                               Verb.RANGE_RSP.id,
 +                                                               Verb.MUTATION_REQ.id,
 +                                                               Verb.MUTATION_RSP.id));
  
 -            // Reads and writes are going to time out in both directions
 -            IMessageFilters.Filter filter = cluster.filters()
 -                                                   .allVerbs()
 -                                                   .from(1)
 -                                                   .to(2)
 -                                                   .messagesMatching((from, to, msg) -> {
 -                                                       // Decode and verify message on instance; return the result back here
 -                                                       Integer id = cluster.get(1).callsOnInstance((IIsolatedExecutor.SerializableCallable<Integer>) () -> {
 -                                                           MessageIn decoded = Instance.deserializeMessage(msg);
 -                                                           if (decoded != null)
 -                                                               return (Integer) decoded.verb.ordinal();
 -                                                           return -1;
 -                                                       }).call();
 -                                                       if (id > 0)
 +            for (boolean inbound : Arrays.asList(true, false))
 +            {
 +                counter.set(0);
 +                // Reads and writes are going to time out in both directions
 +                IMessageFilters.Filter filter = cluster.filters()
 +                                                       .allVerbs()
 +                                                       .inbound(inbound)
 +                                                       .from(1)
 +                                                       .to(2)
 +                                                       .messagesMatching((from, to, msg) -> {
 +                                                           // Decode and verify message on instance; return the result back here
 +                                                           Integer id = cluster.get(1).callsOnInstance((IIsolatedExecutor.SerializableCallable<Integer>) () -> {
 +                                                               Message decoded = Instance.deserializeMessage(msg);
 +                                                               return (Integer) decoded.verb().id;
 +                                                           }).call();
                                                             Assert.assertTrue(verbs.contains(id));
 -                                                       counter.incrementAndGet();
 -                                                       return false;
 -                                                   }).drop();
 +                                                           counter.incrementAndGet();
 +                                                           return false;
 +                                                       }).drop();
  
 -            for (int i : new int[]{ 1, 2 })
 -                cluster.coordinator(i).execute(read, ConsistencyLevel.ALL);
 -            for (int i : new int[]{ 1, 2 })
 -                cluster.coordinator(i).execute(write, ConsistencyLevel.ALL);
 +                for (int i : new int[]{ 1, 2 })
 +                    cluster.coordinator(i).execute(read, ConsistencyLevel.ALL);
 +                for (int i : new int[]{ 1, 2 })
 +                    cluster.coordinator(i).execute(write, ConsistencyLevel.ALL);
 +
 +                filter.off();
 +                Assert.assertEquals(4, counter.get());
 +            }
 +        }
 +    }
 +
 +    @Test
 +    public void outboundBeforeInbound() throws Throwable
 +    {
 +        try (Cluster cluster = Cluster.create(2))
 +        {
-             InetAddressAndPort other = cluster.get(2).broadcastAddressAndPort();
++            InetAddressAndPort other = InetAddressAndPort.getByAddressOverrideDefaults(cluster.get(2).broadcastAddress().getAddress(),
++                                                                                       cluster.get(2).broadcastAddress().getPort());
 +            CountDownLatch waitForIt = new CountDownLatch(1);
 +            Set<Integer> outboundMessagesSeen = new HashSet<>();
 +            Set<Integer> inboundMessagesSeen = new HashSet<>();
 +            AtomicBoolean outboundAfterInbound = new AtomicBoolean(false);
 +            cluster.filters().outbound().verbs(Verb.ECHO_REQ.id, Verb.ECHO_RSP.id).messagesMatching((from, to, msg) -> {
 +                outboundMessagesSeen.add(msg.verb());
 +                if (inboundMessagesSeen.contains(msg.verb()))
 +                    outboundAfterInbound.set(true);
 +                return false;
 +            }).drop(); // drop is confusing since I am not dropping, I'm just listening...
 +            cluster.filters().inbound().verbs(Verb.ECHO_REQ.id, Verb.ECHO_RSP.id).messagesMatching((from, to, msg) -> {
 +                inboundMessagesSeen.add(msg.verb());
 +                return false;
 +            }).drop(); // drop is confusing since I am not dropping, I'm just listening...
 +            cluster.filters().inbound().verbs(Verb.ECHO_RSP.id).messagesMatching((from, to, msg) -> {
 +                waitForIt.countDown();
 +                return false;
 +            }).drop(); // drop is confusing since I am not dropping, I'm just listening...
 +            cluster.get(1).runOnInstance(() -> {
 +                MessagingService.instance().send(Message.out(Verb.ECHO_REQ, NoPayload.noPayload), other);
 +            });
 +
 +            waitForIt.await();
  
 -            filter.off();
 -            Assert.assertEquals(4, counter.get());
 +            Assert.assertEquals(outboundMessagesSeen, inboundMessagesSeen);
 +            // since both are equal, only need to confirm the size of one
 +            Assert.assertEquals(2, outboundMessagesSeen.size());
 +            Assert.assertFalse("outbound message seen after inbound", outboundAfterInbound.get());
          }
      }
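
Note on the filter usage above: every messagesMatching(...).drop() chain in
outboundBeforeInbound is a listener in disguise; the matcher records the message
and returns false, so nothing is actually dropped. A minimal sketch of that
listen-only idiom, mirroring the exact calls used in the test (the "seen" set is
local to the sketch):

    // Listen-only "filter": the matcher observes each ECHO message and
    // returns false, so the terminal drop() never discards anything.
    Set<Integer> seen = new HashSet<>();
    cluster.filters()
           .inbound()
           .verbs(Verb.ECHO_REQ.id, Verb.ECHO_RSP.id)
           .messagesMatching((from, to, msg) -> {
               seen.add(msg.verb()); // record which verb passed through
               return false;         // keep the message flowing
           })
           .drop();                  // installs the matcher despite its name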
  
diff --cc test/distributed/org/apache/cassandra/distributed/test/MessageForwardingTest.java
index 17bd6e1,61ccb5f..153d7de
--- a/test/distributed/org/apache/cassandra/distributed/test/MessageForwardingTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/MessageForwardingTest.java
@@@ -32,8 -31,8 +32,8 @@@ import java.util.stream.Stream
  import org.junit.Assert;
  import org.junit.Test;
  
- import org.apache.cassandra.db.ConsistencyLevel;
 -import org.apache.cassandra.distributed.Cluster;
+ import org.apache.cassandra.distributed.api.ConsistencyLevel;
 +import org.apache.cassandra.distributed.Cluster;
  import org.apache.cassandra.distributed.impl.IsolatedExecutor;
  import org.apache.cassandra.distributed.impl.TracingUtil;
  import org.apache.cassandra.utils.UUIDGen;
@@@ -45,31 -44,29 +45,31 @@@ public class MessageForwardingTest exte
      {
          String originalTraceTimeout = TracingUtil.setWaitForTracingEventTimeoutSecs("1");
          final int numInserts = 100;
-         Map<InetAddress,Integer> forwardFromCounts = new HashMap<>();
--        Map<InetAddress,Integer> commitCounts = new HashMap<>();
++        Map<InetAddress, Integer> forwardFromCounts = new HashMap<>();
++        Map<InetAddress, Integer> commitCounts = new HashMap<>();
  
--        try (Cluster cluster = init(Cluster.build()
--                                           .withDC("dc0", 1)
--                                           .withDC("dc1", 3)
--                                           .start()))
++        try (Cluster cluster = (Cluster) init(builder()
++                                              .withDC("dc0", 1)
++                                              .withDC("dc1", 3)
++                                              .start()))
          {
              cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v text, PRIMARY KEY (pk, ck))");
  
-             cluster.forEach(instance -> commitCounts.put(instance.broadcastAddressAndPort().address, 0));
+             cluster.forEach(instance -> commitCounts.put(instance.broadcastAddress().getAddress(), 0));
              final UUID sessionId = UUIDGen.getTimeUUID();
--            Stream<Future<Object[][]>> inserts = IntStream.range(0, numInserts).mapToObj((idx) ->
--                cluster.coordinator(1).asyncExecuteWithTracing(sessionId,
--                                                         "INSERT INTO " + KEYSPACE + ".tbl(pk,ck,v) VALUES (1, 1, 'x')",
-                                                          ConsistencyLevel.ALL)
 -                                                               ConsistencyLevel.ALL)
--            );
++            Stream<Future<Object[][]>> inserts = IntStream.range(0, numInserts).mapToObj((idx) -> {
++                return cluster.coordinator(1).asyncExecuteWithTracing(sessionId,
++                                                                      "INSERT INTO " + KEYSPACE + ".tbl(pk,ck,v) VALUES (1, 1, 'x')",
++                                                                      ConsistencyLevel.ALL);
++            });
  
              // Wait for each of the futures to complete before checking the traces; we don't care
              // about the result, so
              //noinspection ResultOfMethodCallIgnored
 -            inserts.map(IsolatedExecutor::waitOn).count();
 +            inserts.map(IsolatedExecutor::waitOn).collect(Collectors.toList());
  
-             cluster.stream("dc1").forEach(instance -> forwardFromCounts.put(instance.broadcastAddressAndPort().address, 0));
-             cluster.forEach(instance -> commitCounts.put(instance.broadcastAddressAndPort().address, 0));
++            cluster.stream("dc1").forEach(instance -> forwardFromCounts.put(instance.broadcastAddress().getAddress(), 0));
+             cluster.forEach(instance -> commitCounts.put(instance.broadcastAddress().getAddress(), 0));
              List<TracingUtil.TraceEntry> traces = TracingUtil.getTrace(cluster, sessionId, ConsistencyLevel.ALL);
              traces.forEach(traceEntry -> {
                  if (traceEntry.activity.contains("Appending to commitlog"))
@@@ -100,4 -89,4 +100,4 @@@
              TracingUtil.setWaitForTracingEventTimeoutSecs(originalTraceTimeout);
          }
      }
--}
++}
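
The trace-tallying loop above is cut off by the hunk boundary; the idea is to
count trace events per endpoint and compare them against the expected
replication. A sketch of that counting step, assuming TracingUtil.TraceEntry
exposes the logging node's address in a field named source (that field name is
an assumption, not confirmed by this hunk):

    // Tally commitlog appends per node; every replica that applied a mutation
    // logs an "Appending to commitlog" trace event.
    traces.forEach(traceEntry -> {
        if (traceEntry.activity.contains("Appending to commitlog"))
            commitCounts.merge(traceEntry.source, 1, Integer::sum); // source: assumed field
    });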
diff --cc test/distributed/org/apache/cassandra/distributed/test/NodeToolTest.java
index e209d1d,1d78152..1a1bdc7
--- a/test/distributed/org/apache/cassandra/distributed/test/NodeToolTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/NodeToolTest.java
@@@ -20,7 -20,7 +20,7 @@@ package org.apache.cassandra.distribute
  
  import org.junit.Test;
  
--import org.apache.cassandra.distributed.Cluster;
++import org.apache.cassandra.distributed.api.ICluster;
  
  import static org.junit.Assert.assertEquals;
  
@@@ -29,7 -29,7 +29,7 @@@ public class NodeToolTest extends TestB
      @Test
      public void test() throws Throwable
      {
--        try (Cluster cluster = init(Cluster.create(1)))
++        try (ICluster cluster = init(builder().withNodes(1).start()))
          {
              assertEquals(0, cluster.get(1).nodetool("help"));
              assertEquals(0, cluster.get(1).nodetool("flush"));
diff --cc test/distributed/org/apache/cassandra/distributed/test/PreviewRepairTest.java
index c712948,0000000..30c8f25
mode 100644,000000..100644
--- a/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairTest.java
@@@ -1,281 -1,0 +1,281 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.distributed.test;
 +
 +import java.io.IOException;
 +import java.util.ArrayList;
 +import java.util.HashMap;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.concurrent.ExecutionException;
 +import java.util.concurrent.ExecutorService;
 +import java.util.concurrent.Executors;
 +import java.util.concurrent.Future;
 +import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.atomic.AtomicBoolean;
 +
 +import com.google.common.collect.ImmutableList;
 +import org.junit.Test;
 +
 +import org.apache.cassandra.db.ColumnFamilyStore;
- import org.apache.cassandra.db.ConsistencyLevel;
 +import org.apache.cassandra.db.Keyspace;
 +import org.apache.cassandra.db.compaction.CompactionManager;
 +import org.apache.cassandra.dht.Range;
 +import org.apache.cassandra.dht.Token;
 +import org.apache.cassandra.distributed.Cluster;
++import org.apache.cassandra.distributed.api.ConsistencyLevel;
 +import org.apache.cassandra.distributed.api.ICoordinator;
 +import org.apache.cassandra.distributed.api.IIsolatedExecutor;
 +import org.apache.cassandra.distributed.api.IMessage;
 +import org.apache.cassandra.distributed.api.IMessageFilters;
++import org.apache.cassandra.distributed.shared.RepairResult;
 +import org.apache.cassandra.net.Verb;
 +import org.apache.cassandra.repair.RepairParallelism;
 +import org.apache.cassandra.repair.messages.RepairOption;
 +import org.apache.cassandra.service.StorageService;
 +import org.apache.cassandra.streaming.PreviewKind;
 +import org.apache.cassandra.utils.FBUtilities;
- import org.apache.cassandra.utils.Pair;
 +import org.apache.cassandra.utils.concurrent.SimpleCondition;
 +import org.apache.cassandra.utils.progress.ProgressEventType;
 +
 +import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
 +import static org.apache.cassandra.distributed.api.Feature.NETWORK;
 +import static org.junit.Assert.assertEquals;
 +import static org.junit.Assert.assertFalse;
 +import static org.junit.Assert.assertTrue;
 +
- public class PreviewRepairTest extends DistributedTestBase
++public class PreviewRepairTest extends TestBaseImpl
 +{
 +    /**
 +     * makes sure that the repaired sstables do not match on the two
 +     * nodes by disabling autocompaction on node2 and then running an
 +     * incremental repair
 +     */
 +    @Test
 +    public void testWithMismatchingPending() throws Throwable
 +    {
 +        try(Cluster cluster = init(Cluster.build(2).withConfig(config -> config.with(GOSSIP).with(NETWORK)).start()))
 +        {
 +            cluster.schemaChange("create table " + KEYSPACE + ".tbl (id int primary key, t int)");
 +            insert(cluster.coordinator(1), 0, 100);
 +            cluster.forEach((node) -> node.flush(KEYSPACE));
 +            cluster.get(1).callOnInstance(repair(options(false)));
 +            insert(cluster.coordinator(1), 100, 100);
 +            cluster.forEach((node) -> node.flush(KEYSPACE));
 +
 +            // make sure that all sstables have moved to repaired by triggering a compaction
 +            // also disables autocompaction on the nodes
 +            cluster.forEach((node) -> node.runOnInstance(() -> {
 +                ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl");
 +                FBUtilities.waitOnFutures(CompactionManager.instance.submitBackground(cfs));
 +                cfs.disableAutoCompaction();
 +            }));
 +            cluster.get(1).callOnInstance(repair(options(false)));
 +            // now re-enable autocompaction on node1, this moves the sstables for the new repair to repaired
 +            cluster.get(1).runOnInstance(() -> {
 +                ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl");
 +                cfs.enableAutoCompaction();
 +                FBUtilities.waitOnFutures(CompactionManager.instance.submitBackground(cfs));
 +            });
-             Pair<Boolean, Boolean> rs = cluster.get(1).callOnInstance(repair(options(true)));
-             assertTrue(rs.left); // preview repair should succeed
-             assertFalse(rs.right); // and we should see no mismatches
++            RepairResult rs = cluster.get(1).callOnInstance(repair(options(true)));
++            assertTrue(rs.success); // preview repair should succeed
++            assertFalse(rs.wasInconsistent); // and we should see no mismatches
 +        }
 +    }
 +
 +    /**
 +     * another case where the repaired datasets could mismatch is if an incremental repair finishes just as the preview
 +     * repair is starting up.
 +     *
 +     * This tests this case:
 +     * 1. we start a preview repair
 +     * 2. pause the validation requests from node1 -> node2
 +     * 3. node1 starts its validation
 +     * 4. run an incremental repair which completes fine
 +     * 5. node2 resumes its validation
 +     *
 +     * Now we will include sstables from the second incremental repair on node2 but not on node1
 +     * This should fail since we fail any preview repair which is ongoing when an incremental repair finishes (step 4 above)
 +     */
 +    @Test
 +    public void testFinishingIncRepairDuringPreview() throws IOException, InterruptedException, ExecutionException
 +    {
 +        ExecutorService es = Executors.newSingleThreadExecutor();
 +        try(Cluster cluster = init(Cluster.build(2).withConfig(config -> config.with(GOSSIP).with(NETWORK)).start()))
 +        {
 +            cluster.schemaChange("create table " + KEYSPACE + ".tbl (id int primary key, t int)");
 +
 +            insert(cluster.coordinator(1), 0, 100);
 +            cluster.forEach((node) -> node.flush(KEYSPACE));
 +            cluster.get(1).callOnInstance(repair(options(false)));
 +
 +            insert(cluster.coordinator(1), 100, 100);
 +            cluster.forEach((node) -> node.flush(KEYSPACE));
 +
 +            SimpleCondition continuePreviewRepair = new SimpleCondition();
 +            DelayMessageFilter filter = new DelayMessageFilter(continuePreviewRepair);
 +            // this pauses the validation request sent from node1 to node2 until we have run a full inc repair below
 +            cluster.filters().outbound().verbs(Verb.VALIDATION_REQ.id).from(1).to(2).messagesMatching(filter).drop();
 +
-             Future<Pair<Boolean, Boolean>> rsFuture = es.submit(() -> cluster.get(1).callOnInstance(repair(options(true))));
++            Future<RepairResult> rsFuture = es.submit(() -> cluster.get(1).callOnInstance(repair(options(true))));
 +            Thread.sleep(1000);
 +            // this needs to finish before the preview repair is unpaused on node2
 +            cluster.get(1).callOnInstance(repair(options(false)));
 +            continuePreviewRepair.signalAll();
-             Pair<Boolean, Boolean> rs = rsFuture.get();
-             assertFalse(rs.left); // preview repair should have failed
-             assertFalse(rs.right); // and no mismatches should have been reported
++            RepairResult rs = rsFuture.get();
++            assertFalse(rs.success); // preview repair should have failed
++            assertFalse(rs.wasInconsistent); // and no mismatches should have been reported
 +        }
 +        finally
 +        {
 +            es.shutdown();
 +        }
 +    }
 +
 +    /**
 +     * Same as testFinishingIncRepairDuringPreview but the previewed range does not intersect the incremental repair
 +     * so both preview and incremental repair should finish fine (without any mismatches)
 +     */
 +    @Test
 +    public void testFinishingNonIntersectingIncRepairDuringPreview() throws IOException, InterruptedException, ExecutionException
 +    {
 +        ExecutorService es = Executors.newSingleThreadExecutor();
 +        try(Cluster cluster = init(Cluster.build(2).withConfig(config -> config.with(GOSSIP).with(NETWORK)).start()))
 +        {
 +            cluster.schemaChange("create table " + KEYSPACE + ".tbl (id int primary key, t int)");
 +
 +            insert(cluster.coordinator(1), 0, 100);
 +            cluster.forEach((node) -> node.flush(KEYSPACE));
-             assertTrue(cluster.get(1).callOnInstance(repair(options(false))).left);
++            assertTrue(cluster.get(1).callOnInstance(repair(options(false))).success);
 +
 +            insert(cluster.coordinator(1), 100, 100);
 +            cluster.forEach((node) -> node.flush(KEYSPACE));
 +
 +            // pause preview repair validation messages on node2 until node1 has finished
 +            SimpleCondition continuePreviewRepair = new SimpleCondition();
 +            DelayMessageFilter filter = new DelayMessageFilter(continuePreviewRepair);
 +            cluster.filters().outbound().verbs(Verb.VALIDATION_REQ.id).from(1).to(2).messagesMatching(filter).drop();
 +
 +            // get local ranges to repair two separate ranges:
 +            List<String> localRanges = cluster.get(1).callOnInstance(() -> {
 +                List<String> res = new ArrayList<>();
 +                for (Range<Token> r : StorageService.instance.getLocalReplicas(KEYSPACE).ranges())
 +                    res.add(r.left.getTokenValue() + ":" + r.right.getTokenValue());
 +                return res;
 +            });
 +
 +            assertEquals(2, localRanges.size());
-             Future<Pair<Boolean, Boolean>> repairStatusFuture = es.submit(() -> cluster.get(1).callOnInstance(repair(options(true, localRanges.get(0)))));
++            Future<RepairResult> repairStatusFuture = es.submit(() -> cluster.get(1).callOnInstance(repair(options(true, localRanges.get(0)))));
 +            Thread.sleep(1000); // wait for node1 to start validation compaction
 +            // this needs to finish before the preview repair is unpaused on node2
-             assertTrue(cluster.get(1).callOnInstance(repair(options(false, localRanges.get(1)))).left);
++            assertTrue(cluster.get(1).callOnInstance(repair(options(false, localRanges.get(1)))).success);
 +
 +            continuePreviewRepair.signalAll();
-             Pair<Boolean, Boolean> rs = repairStatusFuture.get();
-             assertTrue(rs.left); // repair should succeed
-             assertFalse(rs.right); // and no mismatches
++            RepairResult rs = repairStatusFuture.get();
++            assertTrue(rs.success); // repair should succeed
++            assertFalse(rs.wasInconsistent); // and no mismatches
 +        }
 +        finally
 +        {
 +            es.shutdown();
 +        }
 +    }
 +
 +    private static class DelayMessageFilter implements IMessageFilters.Matcher
 +    {
 +        private final SimpleCondition condition;
 +        private final AtomicBoolean waitForRepair = new AtomicBoolean(true);
 +
 +        public DelayMessageFilter(SimpleCondition condition)
 +        {
 +            this.condition = condition;
 +        }
 +        public boolean matches(int from, int to, IMessage message)
 +        {
 +            try
 +            {
 +                // only the first validation req should be delayed:
 +                if (waitForRepair.compareAndSet(true, false))
 +                    condition.await();
 +            }
 +            catch (Exception e)
 +            {
 +                throw new RuntimeException(e);
 +            }
 +            return false; // don't drop the message
 +        }
 +    }
 +
 +    private static void insert(ICoordinator coordinator, int start, int count)
 +    {
 +        for (int i = start; i < start + count; i++)
 +            coordinator.execute("insert into " + KEYSPACE + ".tbl (id, t) values (?, ?)", ConsistencyLevel.ALL, i, i);
 +    }
 +
 +    /**
 +     * returns a RepairResult with [repair success, was inconsistent]
 +     */
-     private static IIsolatedExecutor.SerializableCallable<Pair<Boolean, Boolean>> repair(Map<String, String> options)
++    private static IIsolatedExecutor.SerializableCallable<RepairResult> repair(Map<String, String> options)
 +    {
 +        return () -> {
 +            SimpleCondition await = new SimpleCondition();
 +            AtomicBoolean success = new AtomicBoolean(true);
 +            AtomicBoolean wasInconsistent = new AtomicBoolean(false);
 +            StorageService.instance.repair(KEYSPACE, options, ImmutableList.of((tag, event) -> {
 +                if (event.getType() == ProgressEventType.ERROR)
 +                {
 +                    success.set(false);
 +                    await.signalAll();
 +                }
 +                else if (event.getType() == ProgressEventType.NOTIFICATION && event.getMessage().contains("Repaired data is inconsistent"))
 +                {
 +                    wasInconsistent.set(true);
 +                }
 +                else if (event.getType() == ProgressEventType.COMPLETE)
 +                    await.signalAll();
 +            }));
 +            try
 +            {
 +                await.await(1, TimeUnit.MINUTES);
 +            }
 +            catch (InterruptedException e)
 +            {
 +                throw new RuntimeException(e);
 +            }
-             return Pair.create(success.get(), wasInconsistent.get());
++            return new RepairResult(success.get(), wasInconsistent.get());
 +        };
 +    }
 +
 +    private static Map<String, String> options(boolean preview)
 +    {
 +        Map<String, String> config = new HashMap<>();
 +        config.put(RepairOption.INCREMENTAL_KEY, "true");
 +        config.put(RepairOption.PARALLELISM_KEY, RepairParallelism.PARALLEL.toString());
 +        if (preview)
 +            config.put(RepairOption.PREVIEW, PreviewKind.REPAIRED.toString());
 +        return config;
 +    }
 +
 +    private static Map<String, String> options(boolean preview, String range)
 +    {
 +        Map<String, String> options = options(preview);
 +        options.put(RepairOption.RANGES_KEY, range);
 +        return options;
 +    }
 +}
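
PreviewRepairTest now returns the shared RepairResult carrier instead of
Pair<Boolean, Boolean>. A minimal sketch of what that class has to provide for
the assertions above; the field names come straight from the usage, while
Serializable is an assumption (the value has to cross the in-jvm-dtest instance
class loaders):

    // Sketch of the RepairResult value object consumed by the assertions above.
    public class RepairResult implements java.io.Serializable
    {
        public final boolean success;         // repair finished without error
        public final boolean wasInconsistent; // preview reported a repaired-data mismatch

        public RepairResult(boolean success, boolean wasInconsistent)
        {
            this.success = success;
            this.wasInconsistent = wasInconsistent;
        }
    }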
diff --cc test/distributed/org/apache/cassandra/distributed/test/QueryReplayerEndToEndTest.java
index 9908fcc,0000000..9202cb4
mode 100644,000000..100644
--- a/test/distributed/org/apache/cassandra/distributed/test/QueryReplayerEndToEndTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/QueryReplayerEndToEndTest.java
@@@ -1,92 -1,0 +1,95 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.distributed.test;
 +
 +import java.io.IOException;
 +import java.util.Collections;
 +import java.util.List;
 +import java.util.concurrent.atomic.AtomicInteger;
 +import java.util.concurrent.atomic.AtomicLong;
 +import java.util.function.Predicate;
 +import java.util.stream.Collectors;
 +import java.util.stream.IntStream;
 +
 +import org.junit.Assert;
 +import org.junit.Test;
 +
 +import org.apache.cassandra.cql3.QueryOptions;
- import org.apache.cassandra.db.ConsistencyLevel;
- import org.apache.cassandra.distributed.Cluster;
++import org.apache.cassandra.distributed.api.ConsistencyLevel;
 +import org.apache.cassandra.distributed.api.Feature;
++import org.apache.cassandra.distributed.api.ICluster;
++import org.apache.cassandra.distributed.api.IInvokableInstance;
 +import org.apache.cassandra.fqltool.FQLQuery;
 +import org.apache.cassandra.fqltool.QueryReplayer;
 +
- public class QueryReplayerEndToEndTest extends DistributedTestBase
++public class QueryReplayerEndToEndTest extends TestBaseImpl
 +{
 +    private final AtomicLong queryStartTimeGenerator = new AtomicLong(1000);
 +    private final AtomicInteger ckGenerator = new AtomicInteger(1);
 +
 +    @Test
 +    public void testReplayAndCloseMultipleTimes() throws Throwable
 +    {
-         try (Cluster cluster = init(Cluster.create(3, conf -> conf.with(Feature.NATIVE_PROTOCOL, Feature.GOSSIP, Feature.NETWORK))))
++        try (ICluster<IInvokableInstance> cluster = init(builder().withNodes(3)
++                                                                  .withConfig(conf -> conf.with(Feature.NATIVE_PROTOCOL, Feature.GOSSIP, Feature.NETWORK))
++                                                                  .start()))
 +        {
 +            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
 +            List<String> hosts = cluster.stream()
-                                         .map(i -> i.config().broadcastAddressAndPort().address.getHostAddress())
++                                        .map(i -> i.config().broadcastAddress().getAddress().getHostAddress())
 +                                        .collect(Collectors.toList());
 +
 +            final int queriesCount = 3;
 +            // replay for the first time, it should pass
 +            replayAndClose(Collections.singletonList(makeFQLQueries(queriesCount)), hosts);
 +            // replay for the second time, it should pass too
 +            // however, if the cached sessions are not released, the second replay will reuse the closed sessions from the previous replay and fail to insert
 +            replayAndClose(Collections.singletonList(makeFQLQueries(queriesCount)), hosts);
 +            Object[][] result = cluster.coordinator(1)
 +                                       .execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = ?",
 +                                                ConsistencyLevel.QUORUM, 1);
 +            Assert.assertEquals(String.format("Expecting to see %d rows since it replayed twice and each with %d queries", queriesCount * 2, queriesCount),
 +                                queriesCount * 2, result.length);
 +        }
 +    }
 +
 +    private void replayAndClose(List<List<FQLQuery>> allFqlQueries, List<String> hosts) throws IOException
 +    {
 +        List<Predicate<FQLQuery>> allowAll = Collections.singletonList(fqlQuery -> true);
 +        try (QueryReplayer queryReplayer = new QueryReplayer(allFqlQueries.iterator(), hosts, null, allowAll, null))
 +        {
 +            queryReplayer.replay();
 +        }
 +    }
 +
 +    // generate a new list of FQLQuery for each invocation
 +    private List<FQLQuery> makeFQLQueries(int n)
 +    {
 +        return IntStream.range(0, n)
 +                        .boxed()
 +                        .map(i -> new FQLQuery.Single(KEYSPACE,
 +                                                      QueryOptions.DEFAULT.getProtocolVersion().asInt(),
 +                                                      QueryOptions.DEFAULT, queryStartTimeGenerator.incrementAndGet(),
 +                                                      2222,
 +                                                      3333,
 +                                                      String.format("INSERT INTO %s.tbl (pk, ck, v) VALUES (1, %d, %d)", KEYSPACE, ckGenerator.incrementAndGet(), i),
 +                                                      Collections.emptyList()))
 +                        .collect(Collectors.toList());
 +    }
 +}
diff --cc test/distributed/org/apache/cassandra/distributed/test/ReadRepairTest.java
index a8c5fa1,0000000..2a67476
mode 100644,000000..100644
--- a/test/distributed/org/apache/cassandra/distributed/test/ReadRepairTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/ReadRepairTest.java
@@@ -1,125 -1,0 +1,129 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.distributed.test;
 +
++import java.net.InetSocketAddress;
 +import java.util.List;
 +
 +import org.junit.Test;
 +
- import org.apache.cassandra.db.ConsistencyLevel;
 +import org.apache.cassandra.db.marshal.Int32Type;
 +import org.apache.cassandra.dht.Murmur3Partitioner;
 +import org.apache.cassandra.dht.Token;
 +import org.apache.cassandra.distributed.Cluster;
++import org.apache.cassandra.distributed.api.ConsistencyLevel;
++import org.apache.cassandra.distributed.api.ICluster;
++import org.apache.cassandra.distributed.shared.NetworkTopology;
 +import org.apache.cassandra.locator.InetAddressAndPort;
 +import org.apache.cassandra.service.PendingRangeCalculatorService;
 +import org.apache.cassandra.service.StorageService;
 +
 +import static org.apache.cassandra.net.Verb.READ_REPAIR_REQ;
 +import static org.apache.cassandra.net.Verb.READ_REQ;
++import static org.apache.cassandra.distributed.shared.AssertUtils.*;
 +
- public class ReadRepairTest extends DistributedTestBase
++public class ReadRepairTest extends TestBaseImpl
 +{
 +
 +    @Test
 +    public void readRepairTest() throws Throwable
 +    {
-         try (Cluster cluster = init(Cluster.create(3)))
++        try (ICluster cluster = init(builder().withNodes(3).start()))
 +        {
 +            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH read_repair='blocking'");
 +
 +            cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)");
 +            cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)");
 +
 +            assertRows(cluster.get(3).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1"));
 +
 +            assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1",
 +                                                      ConsistencyLevel.ALL),
 +                       row(1, 1, 1));
 +
 +            // Verify that data got repaired to the third node
 +            assertRows(cluster.get(3).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1"),
 +                       row(1, 1, 1));
 +        }
 +    }
 +
 +    @Test
 +    public void failingReadRepairTest() throws Throwable
 +    {
-         try (Cluster cluster = init(Cluster.create(3)))
++        try (ICluster cluster = init(builder().withNodes(3).start()))
 +        {
 +            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH read_repair='blocking'");
 +
 +            for (int i = 1 ; i <= 2 ; ++i)
 +                cluster.get(i).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)");
 +
 +            assertRows(cluster.get(3).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1"));
 +
-             cluster.verbs(READ_REPAIR_REQ).to(3).drop();
++            cluster.filters().verbs(READ_REPAIR_REQ.id).to(3).drop();
 +            assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1",
 +                                                      ConsistencyLevel.QUORUM),
 +                       row(1, 1, 1));
 +
 +            // Data was not repaired
 +            assertRows(cluster.get(3).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1"));
 +        }
 +    }
 +
 +    @Test
 +    public void movingTokenReadRepairTest() throws Throwable
 +    {
-         try (Cluster cluster = init(Cluster.create(4), 3))
++        try (Cluster cluster = (Cluster) init(Cluster.create(4), 3))
 +        {
 +            List<Token> tokens = cluster.tokens();
 +
 +            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH read_repair='blocking'");
 +
 +            int i = 0;
 +            while (true)
 +            {
 +                Token t = Murmur3Partitioner.instance.getToken(Int32Type.instance.decompose(i));
 +                if (t.compareTo(tokens.get(2 - 1)) < 0 && t.compareTo(tokens.get(1 - 1)) > 0)
 +                    break;
 +                ++i;
 +            }
 +
 +            // write only to #4
 +            cluster.get(4).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (?, 1, 1)", i);
 +            // mark #2 as leaving in #4
-             cluster.get(4).acceptsOnInstance((InetAddressAndPort endpoint) -> {
-                 StorageService.instance.getTokenMetadata().addLeavingEndpoint(endpoint);
++            cluster.get(4).acceptsOnInstance((InetSocketAddress endpoint) -> {
++                StorageService.instance.getTokenMetadata().addLeavingEndpoint(InetAddressAndPort.getByAddressOverrideDefaults(endpoint.getAddress(), endpoint.getPort()));
 +                PendingRangeCalculatorService.instance.update();
 +                PendingRangeCalculatorService.instance.blockUntilFinished();
-             }).accept(cluster.get(2).broadcastAddressAndPort());
++            }).accept(cluster.get(2).broadcastAddress());
 +
 +            // prevent #4 from reading or writing to #3, so our QUORUM must contain #2 and #4
 +            // since #1 is taking over the range, this means any read-repair must make it to #1 as well
 +            cluster.filters().verbs(READ_REQ.id).from(4).to(3).drop();
 +            cluster.filters().verbs(READ_REPAIR_REQ.id).from(4).to(3).drop();
 +            assertRows(cluster.coordinator(4).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = ?",
 +                                                      ConsistencyLevel.ALL, i),
 +                       row(i, 1, 1));
 +
 +            // verify that #1 receives the write
 +            assertRows(cluster.get(1).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = ?", i),
 +                       row(i, 1, 1));
 +        }
 +    }
 +
 +}
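
Both MessageFiltersTest and ReadRepairTest now perform the same by-hand
conversion from the API-level java.net.InetSocketAddress back to Cassandra's
internal InetAddressAndPort. A hypothetical helper for that recurring step (the
method itself is illustrative; the factory call is the one both call sites use):

    // Hypothetical convenience method wrapping the conversion done inline above.
    static InetAddressAndPort toInternal(InetSocketAddress addr)
    {
        return InetAddressAndPort.getByAddressOverrideDefaults(addr.getAddress(), addr.getPort());
    }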
diff --cc test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorBase.java
index 4651376,0000000..fc058db
mode 100644,000000..100644
--- a/test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorBase.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorBase.java
@@@ -1,102 -1,0 +1,102 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.distributed.test;
 +
 +import java.io.IOException;
 +import java.util.ArrayList;
 +import java.util.Collection;
 +import java.util.List;
 +
 +import org.junit.AfterClass;
 +import org.junit.BeforeClass;
 +import org.junit.runners.Parameterized;
 +
 +import org.apache.cassandra.distributed.Cluster;
 +import org.apache.cassandra.distributed.api.Feature;
 +import org.apache.cassandra.distributed.api.NodeToolResult;
 +import org.apache.cassandra.distributed.test.DistributedRepairUtils.RepairParallelism;
 +import org.apache.cassandra.distributed.test.DistributedRepairUtils.RepairType;
 +
- public class RepairCoordinatorBase extends DistributedTestBase
++public class RepairCoordinatorBase extends TestBaseImpl
 +{
 +    protected static Cluster CLUSTER;
 +
 +    protected final RepairType repairType;
 +    protected final RepairParallelism parallelism;
 +    protected final boolean withNotifications;
 +
 +    public RepairCoordinatorBase(RepairType repairType, RepairParallelism parallelism, boolean withNotifications)
 +    {
 +        this.repairType = repairType;
 +        this.parallelism = parallelism;
 +        this.withNotifications = withNotifications;
 +    }
 +
 +    @Parameterized.Parameters(name = "{0}/{1}")
 +    public static Collection<Object[]> testsWithoutType()
 +    {
 +        List<Object[]> tests = new ArrayList<>();
 +        for (RepairParallelism p : RepairParallelism.values())
 +        {
 +            tests.add(new Object[] { p, true });
 +            tests.add(new Object[] { p, false });
 +        }
 +        return tests;
 +    }
 +
 +    @BeforeClass
 +    public static void before()
 +    {
 +        // This only works because of the way CI works
 +        // In CI a new JVM is spun up for each test file, so this doesn't have to worry about another test file
 +        // getting this set first
 +        System.setProperty("cassandra.nodetool.jmx_notification_poll_interval_seconds", "1");
 +    }
 +
 +    @BeforeClass
 +    public static void setupCluster() throws IOException
 +    {
 +        // streaming requires networking ATM
 +        // streaming also requires gossip, or it isn't set up properly
 +        CLUSTER = init(Cluster.build(2)
 +                              .withConfig(c -> c.with(Feature.NETWORK)
 +                                                .with(Feature.GOSSIP))
 +                              .start());
 +    }
 +
 +    @AfterClass
 +    public static void teardownCluster()
 +    {
 +        if (CLUSTER != null)
 +            CLUSTER.close();
 +    }
 +
 +    protected String tableName(String prefix) {
 +        return prefix + "_" + postfix();
 +    }
 +
 +    protected String postfix()
 +    {
 +        return repairType.name().toLowerCase() + "_" + parallelism.name().toLowerCase() + "_" + withNotifications;
 +    }
 +
 +    protected NodeToolResult repair(int node, String... args) {
 +        return DistributedRepairUtils.repair(CLUSTER, node, repairType, withNotifications, parallelism.append(args));
 +    }
 +}
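
RepairCoordinatorBase deliberately leaves RepairType to its subclasses: the
parameterized pairs from testsWithoutType() supply (parallelism,
withNotifications), and each concrete test pins the repair type in its
constructor. A hypothetical subclass showing how the pieces line up (both the
class name and the RepairType.FULL constant are illustrative):

    // Hypothetical concrete test: pins the repair type and forwards the
    // (parallelism, withNotifications) pairs from testsWithoutType().
    @RunWith(Parameterized.class)
    public class RepairCoordinatorFullTest extends RepairCoordinatorBase
    {
        public RepairCoordinatorFullTest(RepairParallelism parallelism, boolean withNotifications)
        {
            super(RepairType.FULL, parallelism, withNotifications);
        }
    }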
diff --cc test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorFailingMessageTest.java
index ac31334,0000000..0d04649
mode 100644,000000..100644
--- a/test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorFailingMessageTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorFailingMessageTest.java
@@@ -1,186 -1,0 +1,186 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.distributed.test;
 +
 +import java.io.IOException;
 +import java.io.Serializable;
 +import java.util.ArrayList;
 +import java.util.Collection;
 +import java.util.List;
 +
 +import org.junit.AfterClass;
 +import org.junit.Assume;
 +import org.junit.BeforeClass;
 +import org.junit.Ignore;
 +import org.junit.Test;
 +import org.junit.runner.RunWith;
 +import org.junit.runners.Parameterized;
 +
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.distributed.Cluster;
 +import org.apache.cassandra.distributed.api.Feature;
 +import org.apache.cassandra.distributed.api.IMessageFilters;
 +import org.apache.cassandra.distributed.api.NodeToolResult;
 +import org.apache.cassandra.distributed.test.DistributedRepairUtils.RepairType;
 +import org.apache.cassandra.net.Verb;
 +
 +import static java.lang.String.format;
 +import static org.apache.cassandra.distributed.api.IMessageFilters.Matcher.of;
 +
 +@RunWith(Parameterized.class)
 +@Ignore("Until CASSANDRA-15566 is in these tests all time out")
- public class RepairCoordinatorFailingMessageTest extends DistributedTestBase implements Serializable
++public class RepairCoordinatorFailingMessageTest extends TestBaseImpl implements Serializable
 +{
 +    private static Cluster CLUSTER;
 +
 +    private final RepairType repairType;
 +    private final boolean withNotifications;
 +
 +    public RepairCoordinatorFailingMessageTest(RepairType repairType, boolean withNotifications)
 +    {
 +        this.repairType = repairType;
 +        this.withNotifications = withNotifications;
 +    }
 +
 +    @Parameterized.Parameters(name = "{0}/{1}")
 +    public static Collection<Object[]> messages()
 +    {
 +        List<Object[]> tests = new ArrayList<>();
 +        for (RepairType type : RepairType.values())
 +        {
 +            tests.add(new Object[] { type, true });
 +            tests.add(new Object[] { type, false });
 +        }
 +        return tests;
 +    }
 +
 +    @BeforeClass
 +    public static void before()
 +    {
 +        DatabaseDescriptor.clientInitialization();
 +    }
 +
 +    @BeforeClass
 +    public static void setupCluster() throws IOException
 +    {
 +        // streaming requires networking ATM
 +        // streaming also requires gossip, or it isn't set up properly
 +        CLUSTER = init(Cluster.build(3) // set to 3 so streaming hits non-local case
 +                              .withConfig(c -> c.with(Feature.NETWORK)
 +                                                .with(Feature.GOSSIP))
 +                              .start());
 +    }
 +
 +    @AfterClass
 +    public static void teardownCluster()
 +    {
 +        if (CLUSTER != null)
 +            CLUSTER.close();
 +    }
 +
 +    private String tableName(String prefix) {
 +        return prefix + "_" + postfix() + "_" + withNotifications;
 +    }
 +
 +    private String postfix()
 +    {
 +        return repairType.name().toLowerCase();
 +    }
 +
 +    private NodeToolResult repair(int node, String... args) {
 +        return DistributedRepairUtils.repair(CLUSTER, node, repairType, withNotifications, args);
 +    }
 +
 +    @Test(timeout = 1 * 60 * 1000)
 +    public void prepareIrFailure()
 +    {
 +        Assume.assumeTrue("The Verb.PREPARE_CONSISTENT_REQ is only for incremental, so disable in non-incremental", repairType == RepairType.INCREMENTAL);
 +        // Wait, isn't this a copy-paste of RepairCoordinatorTest::prepareFailure?  NO!
 +        // Incremental repair sends the PREPARE message the same way full repair does, but then it additionally sends
 +        // a consistent-prepare message... and that one doesn't handle errors...
 +        CLUSTER.schemaChange("CREATE TABLE " + KEYSPACE + ".prepareirfailure (key text, value text, PRIMARY KEY (key))");
 +        IMessageFilters.Filter filter = CLUSTER.verbs(Verb.PREPARE_CONSISTENT_REQ).messagesMatching(of(m -> {
 +            throw new RuntimeException("prepare fail");
 +        })).drop();
 +        try
 +        {
 +            NodeToolResult result = repair(1, KEYSPACE, "prepareirfailure");
 +            result.asserts()
 +                  .failure()
 +                  .errorContains("error prepare fail")
 +                  .notificationContains(NodeToolResult.ProgressEventType.ERROR, "error prepare fail")
 +                  .notificationContains(NodeToolResult.ProgressEventType.COMPLETE, "finished with error");
 +        }
 +        finally
 +        {
 +            filter.off();
 +        }
 +    }
 +
 +    //TODO failure reply merkle tree
 +    //TODO failure reply merkle tree IR
 +
 +    @Test(timeout = 1 * 60 * 1000)
 +    public void validationFailure()
 +    {
 +        String table = tableName("validationfailure");
 +        CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, value text, PRIMARY KEY (key))", KEYSPACE, table));
 +        IMessageFilters.Filter filter = CLUSTER.verbs(Verb.VALIDATION_REQ).messagesMatching(of(m -> {
 +            throw new RuntimeException("validation fail");
 +        })).drop();
 +        try
 +        {
 +            NodeToolResult result = repair(1, KEYSPACE, table);
 +            result.asserts()
 +                  .failure()
 +                  .errorContains("Some repair failed")
 +                  .notificationContains(NodeToolResult.ProgressEventType.ERROR, "Some repair failed")
 +                  .notificationContains(NodeToolResult.ProgressEventType.COMPLETE, "finished with error");
 +        }
 +        finally
 +        {
 +            filter.off();
 +        }
 +    }
 +
 +    @Test(timeout = 1 * 60 * 1000)
 +    public void streamFailure()
 +    {
 +        String table = tableName("streamfailure");
 +        CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, value text, PRIMARY KEY (key))", KEYSPACE, table));
 +        // there needs to be a difference to cause streaming to happen, so add to one node
 +        CLUSTER.get(2).executeInternal(format("INSERT INTO %s.%s (key) VALUES (?)", KEYSPACE, table), "some data");
 +        IMessageFilters.Filter filter = CLUSTER.verbs(Verb.SYNC_REQ).messagesMatching(of(m -> {
 +            throw new RuntimeException("stream fail");
 +        })).drop();
 +        try
 +        {
 +            NodeToolResult result = repair(1, KEYSPACE, table);
 +            result.asserts()
 +                  .failure()
 +                  .errorContains("Some repair failed")
 +                  .notificationContains(NodeToolResult.ProgressEventType.ERROR, "Some repair failed")
 +                  .notificationContains(NodeToolResult.ProgressEventType.COMPLETE, "finished with error");
 +        }
 +        finally
 +        {
 +            filter.off();
 +        }
 +    }
 +}
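
The failure-injection idiom used throughout this file: IMessageFilters.Matcher.of
wraps a predicate, and throwing from inside it fails delivery of the matched
verb, which drives each repair into its error path. The shape of the pattern,
condensed from validationFailure() above:

    // Inject a failure for one verb, assert on the surfaced error, and always
    // remove the filter afterwards so it cannot leak into later tests.
    IMessageFilters.Filter filter = CLUSTER.verbs(Verb.VALIDATION_REQ).messagesMatching(of(m -> {
        throw new RuntimeException("validation fail");
    })).drop();
    try
    {
        repair(1, KEYSPACE, table).asserts().failure().errorContains("Some repair failed");
    }
    finally
    {
        filter.off(); // a leaked filter would poison later tests against the shared CLUSTER
    }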
diff --cc test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorFast.java
index edcb9c3,0000000..b05706a
mode 100644,000000..100644
--- a/test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorFast.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorFast.java
@@@ -1,384 -1,0 +1,384 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.distributed.test;
 +
 +import java.util.Set;
 +
 +import com.google.common.collect.Iterables;
 +import org.junit.Assert;
 +import org.junit.Assume;
 +import org.junit.Test;
 +
- import org.apache.cassandra.db.ConsistencyLevel;
 +import org.apache.cassandra.dht.Range;
 +import org.apache.cassandra.dht.Token;
++import org.apache.cassandra.distributed.api.ConsistencyLevel;
 +import org.apache.cassandra.distributed.api.IMessageFilters;
 +import org.apache.cassandra.distributed.api.LongTokenRange;
 +import org.apache.cassandra.distributed.api.NodeToolResult;
 +import org.apache.cassandra.distributed.api.NodeToolResult.ProgressEventType;
 +import org.apache.cassandra.distributed.test.DistributedRepairUtils.RepairParallelism;
 +import org.apache.cassandra.distributed.test.DistributedRepairUtils.RepairType;
 +import org.apache.cassandra.net.Verb;
 +import org.apache.cassandra.service.StorageService;
 +
 +import static java.lang.String.format;
 +import static org.apache.cassandra.distributed.api.IMessageFilters.Matcher.of;
 +import static org.apache.cassandra.distributed.test.DistributedRepairUtils.assertParentRepairFailedWithMessageContains;
 +import static org.apache.cassandra.distributed.test.DistributedRepairUtils.assertParentRepairNotExist;
 +import static org.apache.cassandra.distributed.test.DistributedRepairUtils.assertParentRepairSuccess;
 +import static org.apache.cassandra.distributed.test.DistributedRepairUtils.getRepairExceptions;
 +
 +public abstract class RepairCoordinatorFast extends RepairCoordinatorBase
 +{
 +    public RepairCoordinatorFast(RepairType repairType, RepairParallelism parallelism, boolean withNotifications)
 +    {
 +        super(repairType, parallelism, withNotifications);
 +    }
 +
 +    @Test(timeout = 1 * 60 * 1000)
 +    public void simple() {
 +        String table = tableName("simple");
 +        CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, PRIMARY KEY (key))", KEYSPACE, table));
 +        CLUSTER.coordinator(1).execute(format("INSERT INTO %s.%s (key) VALUES (?)", KEYSPACE, table), ConsistencyLevel.ANY, "some text");
 +
 +        long repairExceptions = getRepairExceptions(CLUSTER, 2);
 +        NodeToolResult result = repair(2, KEYSPACE, table);
 +        result.asserts().success();
 +        if (withNotifications)
 +        {
 +            result.asserts()
 +                  .notificationContains(ProgressEventType.START, "Starting repair command")
 +                  .notificationContains(ProgressEventType.START, "repairing keyspace " + KEYSPACE + " with repair options")
 +                  .notificationContains(ProgressEventType.SUCCESS, repairType != RepairType.PREVIEW ? "Repair completed successfully": "Repair preview completed successfully")
 +                  .notificationContains(ProgressEventType.COMPLETE, "finished");
 +        }
 +
 +        if (repairType != RepairType.PREVIEW)
 +        {
 +            assertParentRepairSuccess(CLUSTER, KEYSPACE, table);
 +        }
 +        else
 +        {
 +            assertParentRepairNotExist(CLUSTER, KEYSPACE, table);
 +        }
 +
 +        Assert.assertEquals(repairExceptions, getRepairExceptions(CLUSTER, 2));
 +    }
 +
 +    @Test(timeout = 1 * 60 * 1000)
 +    public void missingKeyspace()
 +    {
 +        // as of this moment the check is done in nodetool, so the JMX notifications are not important
 +        // nor is the history stored
 +        long repairExceptions = getRepairExceptions(CLUSTER, 2);
 +        NodeToolResult result = repair(2, "doesnotexist");
 +        result.asserts()
 +              .failure()
 +              .errorContains("Keyspace [doesnotexist] does not exist.");
 +
 +        Assert.assertEquals(repairExceptions, getRepairExceptions(CLUSTER, 2));
 +
 +        assertParentRepairNotExist(CLUSTER, "doesnotexist");
 +    }
 +
 +    @Test(timeout = 1 * 60 * 1000)
 +    public void missingTable()
 +    {
 +        long repairExceptions = getRepairExceptions(CLUSTER, 2);
 +        NodeToolResult result = repair(2, KEYSPACE, "doesnotexist");
 +        result.asserts()
 +              .failure();
 +        if (withNotifications)
 +        {
 +            result.asserts()
 +                  .errorContains("failed with error Unknown keyspace/cf pair (distributed_test_keyspace.doesnotexist)")
 +                  // Start notification is ignored since this is checked during setup (aka before start)
 +                  .notificationContains(ProgressEventType.ERROR, "failed with error Unknown keyspace/cf pair (distributed_test_keyspace.doesnotexist)")
 +                  .notificationContains(ProgressEventType.COMPLETE, "finished with error");
 +        }
 +
 +        assertParentRepairNotExist(CLUSTER, KEYSPACE, "doesnotexist");
 +
 +        Assert.assertEquals(repairExceptions + 1, getRepairExceptions(CLUSTER, 2));
 +    }
 +
 +    @Test(timeout = 1 * 60 * 1000)
 +    public void noTablesToRepair()
 +    {
 +        // index CFs currently don't support repair, so they get dropped when listed
 +        // this is done in this test to cause the keyspace to have 0 tables to repair, which causes repair to no-op
 +        // early and skip.
 +        String table = tableName("withindex");
 +        CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, value text, PRIMARY KEY (key))", KEYSPACE, table));
 +        CLUSTER.schemaChange(format("CREATE INDEX value_%s ON %s.%s (value)", postfix(), KEYSPACE, table));
 +
 +        long repairExceptions = getRepairExceptions(CLUSTER, 2);
 +        // if CF has a . in it, it is assumed to be a 2i which rejects repairs
 +        NodeToolResult result = repair(2, KEYSPACE, table + ".value");
 +        result.asserts().success();
 +        if (withNotifications)
 +        {
 +            result.asserts()
 +                  .notificationContains("Empty keyspace")
 +                  .notificationContains("skipping repair: " + KEYSPACE)
 +                  // Start notification is ignored since this is checked during setup (aka before start)
 +                  .notificationContains(ProgressEventType.SUCCESS, "Empty keyspace") // will fail since success isn't returned; only complete
 +                  .notificationContains(ProgressEventType.COMPLETE, "finished"); // will fail since it doesn't do this
 +        }
 +
 +        assertParentRepairNotExist(CLUSTER, KEYSPACE, table + ".value");
 +
 +        // this is actually a SKIP and not a FAILURE, so shouldn't increment
 +        Assert.assertEquals(repairExceptions, getRepairExceptions(CLUSTER, 2));
 +    }
 +
 +    @Test(timeout = 1 * 60 * 1000)
 +    public void intersectingRange()
 +    {
 +        // this test exists to show that this case will cause repair to finish; success or failure isn't important
 +        // if repair is enhanced to allow ranges that intersect local ranges, this test will fail saying that we
 +        // expected repair to fail but it didn't; that would be fine, and this test should be updated to reflect
 +        // the new semantics
 +        String table = tableName("intersectingrange");
 +        CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, value text, PRIMARY KEY (key))", KEYSPACE, table));
 +
 +        //TODO dtest api for this?
 +        LongTokenRange tokenRange = CLUSTER.get(2).callOnInstance(() -> {
 +            Set<Range<Token>> ranges = StorageService.instance.getLocalReplicas(KEYSPACE).ranges();
 +            Range<Token> range = Iterables.getFirst(ranges, null);
 +            long left = (long) range.left.getTokenValue();
 +            long right = (long) range.right.getTokenValue();
 +            return new LongTokenRange(left, right);
 +        });
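 +        // build a range that straddles the local range's upper token: it intersects the local range without being fully contained in it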
 +        LongTokenRange intersectingRange = new LongTokenRange(tokenRange.maxInclusive - 7, tokenRange.maxInclusive + 7);
 +
 +        long repairExceptions = getRepairExceptions(CLUSTER, 2);
 +        NodeToolResult result = repair(2, KEYSPACE, table,
 +                                       "--start-token", Long.toString(intersectingRange.minExclusive),
 +                                       "--end-token", Long.toString(intersectingRange.maxInclusive));
 +        result.asserts()
 +              .failure()
 +              .errorContains("Requested range " + intersectingRange + " intersects a local range (" + tokenRange + ") but is not fully contained in one");
 +        if (withNotifications)
 +        {
 +            result.asserts()
 +                  .notificationContains(ProgressEventType.START, "Starting repair command")
 +                  .notificationContains(ProgressEventType.START, "repairing keyspace " + KEYSPACE + " with repair options")
 +                  .notificationContains(ProgressEventType.ERROR, "Requested range " + intersectingRange + " intersects a local range (" + tokenRange + ") but is not fully contained in one")
 +                  .notificationContains(ProgressEventType.COMPLETE, "finished with error");
 +        }
 +
 +        assertParentRepairNotExist(CLUSTER, KEYSPACE, table);
 +
 +        Assert.assertEquals(repairExceptions + 1, getRepairExceptions(CLUSTER, 2));
 +    }
 +
 +    @Test(timeout = 1 * 60 * 1000)
 +    public void unknownHost()
 +    {
 +        String table = tableName("unknownhost");
 +        CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, value text, PRIMARY KEY (key))", KEYSPACE, table));
 +
 +        long repairExceptions = getRepairExceptions(CLUSTER, 2);
 +        NodeToolResult result = repair(2, KEYSPACE, table, "--in-hosts", "thisreally.should.not.exist.apache.org");
 +        result.asserts()
 +              .failure()
 +              .errorContains("Unknown host specified thisreally.should.not.exist.apache.org");
 +        if (withNotifications)
 +        {
 +            result.asserts()
 +                  .notificationContains(ProgressEventType.START, "Starting repair command")
 +                  .notificationContains(ProgressEventType.START, "repairing keyspace " + KEYSPACE + " with repair options")
 +                  .notificationContains(ProgressEventType.ERROR, "Unknown host specified thisreally.should.not.exist.apache.org")
 +                  .notificationContains(ProgressEventType.COMPLETE, "finished with error");
 +        }
 +
 +        assertParentRepairNotExist(CLUSTER, KEYSPACE, table);
 +
 +        Assert.assertEquals(repairExceptions + 1, getRepairExceptions(CLUSTER, 2));
 +    }
 +
 +    @Test(timeout = 1 * 60 * 1000)
 +    public void desiredHostNotCoordinator()
 +    {
 +        // the current limitation is that the coordinator must be a part of the repair, so as long as that holds
 +        // this test verifies that the validation logic will terminate the repair properly
 +        String table = tableName("desiredhostnotcoordinator");
 +        CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, value text, PRIMARY KEY (key))", KEYSPACE, table));
 +
 +        long repairExceptions = getRepairExceptions(CLUSTER, 2);
 +        NodeToolResult result = repair(2, KEYSPACE, table, "--in-hosts", "localhost");
 +        result.asserts()
 +              .failure()
 +              .errorContains("The current host must be part of the repair");
 +        if (withNotifications)
 +        {
 +            result.asserts()
 +                  .notificationContains(ProgressEventType.START, "Starting repair command")
 +                  .notificationContains(ProgressEventType.START, "repairing keyspace " + KEYSPACE + " with repair options")
 +                  .notificationContains(ProgressEventType.ERROR, "The current host must be part of the repair")
 +                  .notificationContains(ProgressEventType.COMPLETE, "finished with error");
 +        }
 +
 +        assertParentRepairNotExist(CLUSTER, KEYSPACE, table);
 +
 +        Assert.assertEquals(repairExceptions + 1, getRepairExceptions(CLUSTER, 2));
 +    }
 +
 +    @Test(timeout = 1 * 60 * 1000)
 +    public void onlyCoordinator()
 +    {
 +        // this is very similar to ::desiredHostNotCoordinator, the difference being that the only host to do
 +        // repair is the coordinator
 +        String table = tableName("onlycoordinator");
 +        CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, value text, PRIMARY KEY (key))", KEYSPACE, table));
 +
 +        long repairExceptions = getRepairExceptions(CLUSTER, 2);
 +        NodeToolResult result = repair(1, KEYSPACE, table, "--in-hosts", "localhost");
 +        result.asserts()
 +              .failure()
 +              .errorContains("Specified hosts [localhost] do not share range");
 +        if (withNotifications)
 +        {
 +            result.asserts()
 +                  .notificationContains(ProgressEventType.START, "Starting repair command")
 +                  .notificationContains(ProgressEventType.START, "repairing keyspace " + KEYSPACE + " with repair options")
 +                  .notificationContains(ProgressEventType.ERROR, "Specified hosts [localhost] do not share range")
 +                  .notificationContains(ProgressEventType.COMPLETE, "finished with error");
 +        }
 +
 +        assertParentRepairNotExist(CLUSTER, KEYSPACE, table);
 +
 +        //TODO should this be marked as fail to match others?  Should they not be marked?
 +        Assert.assertEquals(repairExceptions, getRepairExceptions(CLUSTER, 2));
 +    }
 +
 +    @Test(timeout = 1 * 60 * 1000)
 +    public void replicationFactorOne()
 +    {
 +        // In the case of RF=1, repair fails to create a cmd handle, so nodetool exits early
 +        String table = tableName("one");
 +        // since the cluster is shared and this test gets called multiple times, we need "IF NOT EXISTS" so the
 +        // second+ attempts do not fail
 +        CLUSTER.schemaChange("CREATE KEYSPACE IF NOT EXISTS replicationfactor WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};");
 +        CLUSTER.schemaChange(format("CREATE TABLE replicationfactor.%s (key text, value text, PRIMARY KEY (key))", table));
 +
 +        long repairExceptions = getRepairExceptions(CLUSTER, 1);
 +        NodeToolResult result = repair(1, "replicationfactor", table);
 +        result.asserts()
 +              .success();
 +
 +        assertParentRepairNotExist(CLUSTER, "replicationfactor", table);
 +
 +        Assert.assertEquals(repairExceptions, getRepairExceptions(CLUSTER, 1));
 +    }
 +
 +    @Test(timeout = 1 * 60 * 1000)
 +    public void prepareFailure()
 +    {
 +        String table = tableName("preparefailure");
 +        CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, value text, PRIMARY KEY (key))", KEYSPACE, table));
 +        IMessageFilters.Filter filter = CLUSTER.verbs(Verb.PREPARE_MSG).messagesMatching(of(m -> {
 +            throw new RuntimeException("prepare fail");
 +        })).drop();
 +        try
 +        {
 +            long repairExceptions = getRepairExceptions(CLUSTER, 1);
 +            NodeToolResult result = repair(1, KEYSPACE, table);
 +            result.asserts()
 +                  .failure()
 +                  .errorContains("Got negative replies from endpoints");
 +            if (withNotifications)
 +            {
 +                result.asserts()
 +                      .notificationContains(ProgressEventType.START, "Starting repair command")
 +                      .notificationContains(ProgressEventType.START, "repairing keyspace " + KEYSPACE + " with repair options")
 +                      .notificationContains(ProgressEventType.ERROR, "Got negative replies from endpoints")
 +                      .notificationContains(ProgressEventType.COMPLETE, "finished with error");
 +            }
 +
 +            Assert.assertEquals(repairExceptions + 1, getRepairExceptions(CLUSTER, 1));
 +            if (repairType != RepairType.PREVIEW)
 +            {
 +                assertParentRepairFailedWithMessageContains(CLUSTER, KEYSPACE, table, "Got negative replies from endpoints");
 +            }
 +            else
 +            {
 +                assertParentRepairNotExist(CLUSTER, KEYSPACE, table);
 +            }
 +        }
 +        finally
 +        {
 +            filter.off();
 +        }
 +    }
 +
 +    @Test(timeout = 1 * 60 * 1000)
 +    public void snapshotFailure()
 +    {
 +        Assume.assumeFalse("incremental does not do snapshot", repairType == RepairType.INCREMENTAL);
 +        Assume.assumeFalse("Parallel repair does not perform snapshots", parallelism == RepairParallelism.PARALLEL);
 +
 +        String table = tableName("snapshotfailure");
 +        CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, value text, PRIMARY KEY (key))", KEYSPACE, table));
 +        IMessageFilters.Filter filter = CLUSTER.verbs(Verb.SNAPSHOT_MSG).messagesMatching(of(m -> {
 +            throw new RuntimeException("snapshot fail");
 +        })).drop();
 +        try
 +        {
 +            long repairExceptions = getRepairExceptions(CLUSTER, 1);
 +            NodeToolResult result = repair(1, KEYSPACE, table);
 +            result.asserts()
 +                  .failure();
 +            if (withNotifications)
 +            {
 +                result.asserts()
 +                      .errorContains("Could not create snapshot")
 +                      .notificationContains(ProgressEventType.START, "Starting repair command")
 +                      .notificationContains(ProgressEventType.START, "repairing keyspace " + KEYSPACE + " with repair options")
 +                      .notificationContains(ProgressEventType.ERROR, "Could not create snapshot ")
 +                      .notificationContains(ProgressEventType.COMPLETE, "finished with error");
 +            }
 +            else
 +            {
 +                // Right now coordination doesn't propagate the first exception, so we only know "there exists an issue".
 +                // With notifications on, nodetool will see the error then complete, so the cmd state (what nodetool
 +                // polls on) is ignored.  With notifications off, the poll await fails and queries cmd state, and that
 +                // will have the below error.
 +                // NOTE: this isn't desirable; it would be good to propagate the first exception
 +                result.asserts()
 +                      .errorContains("Some repair failed");
 +            }
 +
 +            Assert.assertEquals(repairExceptions + 1, getRepairExceptions(CLUSTER, 1));
 +            if (repairType != RepairType.PREVIEW)
 +            {
 +                assertParentRepairFailedWithMessageContains(CLUSTER, KEYSPACE, table, "Could not create snapshot");
 +            }
 +            else
 +            {
 +                assertParentRepairNotExist(CLUSTER, KEYSPACE, table);
 +            }
 +        }
 +        finally
 +        {
 +            filter.off();
 +        }
 +    }
 +}
diff --cc test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorSlow.java
index 32538ec,0000000..7be8ed1
mode 100644,000000..100644
--- a/test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorSlow.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorSlow.java
@@@ -1,230 -1,0 +1,229 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.distributed.test;
 +
 +import java.net.UnknownHostException;
 +import java.util.concurrent.CompletableFuture;
 +import java.util.concurrent.ExecutionException;
 +import java.util.concurrent.Future;
 +import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.atomic.AtomicReference;
 +
 +import com.google.common.util.concurrent.Uninterruptibles;
 +import org.junit.Assert;
 +import org.junit.Test;
 +
 +import org.apache.cassandra.distributed.api.IMessageFilters;
 +import org.apache.cassandra.distributed.api.NodeToolResult;
- import org.apache.cassandra.distributed.impl.MessageFilters;
 +import org.apache.cassandra.distributed.test.DistributedRepairUtils.RepairParallelism;
 +import org.apache.cassandra.distributed.test.DistributedRepairUtils.RepairType;
 +import org.apache.cassandra.gms.FailureDetector;
 +import org.apache.cassandra.locator.InetAddressAndPort;
 +import org.apache.cassandra.net.Verb;
 +import org.apache.cassandra.utils.FBUtilities;
 +
 +import static java.lang.String.format;
 +import static org.apache.cassandra.distributed.api.IMessageFilters.Matcher.of;
 +import static org.apache.cassandra.distributed.test.DistributedRepairUtils.assertParentRepairFailedWithMessageContains;
 +import static org.apache.cassandra.distributed.test.DistributedRepairUtils.assertParentRepairNotExist;
 +import static org.apache.cassandra.distributed.test.DistributedRepairUtils.getRepairExceptions;
 +
 +public abstract class RepairCoordinatorSlow extends RepairCoordinatorBase
 +{
 +    public RepairCoordinatorSlow(RepairType repairType, RepairParallelism parallelism, boolean withNotifications)
 +    {
 +        super(repairType, parallelism, withNotifications);
 +    }
 +
 +    @Test(timeout = 1 * 60 * 1000)
 +    public void prepareRPCTimeout()
 +    {
 +        String table = tableName("preparerpctimeout");
 +        CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, value text, PRIMARY KEY (key))", KEYSPACE, table));
 +        IMessageFilters.Filter filter = CLUSTER.verbs(Verb.PREPARE_MSG).drop();
 +        try
 +        {
 +            long repairExceptions = getRepairExceptions(CLUSTER, 1);
 +            NodeToolResult result = repair(1, KEYSPACE, table);
 +            result.asserts()
 +                  .failure()
 +                  .errorContains("Got negative replies from endpoints [127.0.0.2:7012]");
 +            if (withNotifications)
 +            {
 +                result.asserts()
 +                      .notificationContains(NodeToolResult.ProgressEventType.START, "Starting repair command")
 +                      .notificationContains(NodeToolResult.ProgressEventType.START, "repairing keyspace " + KEYSPACE + " with repair options")
 +                      .notificationContains(NodeToolResult.ProgressEventType.ERROR, "Got negative replies from endpoints [127.0.0.2:7012]")
 +                      .notificationContains(NodeToolResult.ProgressEventType.COMPLETE, "finished with error");
 +            }
 +
 +            if (repairType != RepairType.PREVIEW)
 +            {
 +                assertParentRepairFailedWithMessageContains(CLUSTER, KEYSPACE, table, "Got negative replies from endpoints [127.0.0.2:7012]");
 +            }
 +            else
 +            {
 +                assertParentRepairNotExist(CLUSTER, KEYSPACE, table);
 +            }
 +
 +            Assert.assertEquals(repairExceptions + 1, getRepairExceptions(CLUSTER, 1));
 +        }
 +        finally
 +        {
 +            filter.off();
 +        }
 +    }
 +
 +    @Test(timeout = 1 * 60 * 1000)
 +    public void neighbourDown() throws InterruptedException, ExecutionException
 +    {
 +        String table = tableName("neighbourdown");
 +        CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, value text, PRIMARY KEY (key))", KEYSPACE, table));
 +        Future<Void> shutdownFuture = CLUSTER.get(2).shutdown();
 +        String downNodeAddress = CLUSTER.get(2).callOnInstance(() -> FBUtilities.getBroadcastAddressAndPort().toString());
 +        try
 +        {
 +            // wait for the node to stop
 +            shutdownFuture.get();
 +            // wait for the failure detector to detect this
 +            CLUSTER.get(1).runOnInstance(() -> {
 +                InetAddressAndPort neighbor;
 +                try
 +                {
 +                    neighbor = InetAddressAndPort.getByName(downNodeAddress);
 +                }
 +                catch (UnknownHostException e)
 +                {
 +                    throw new RuntimeException(e);
 +                }
 +                while (FailureDetector.instance.isAlive(neighbor))
 +                    Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
 +            });
 +
 +            long repairExceptions = getRepairExceptions(CLUSTER, 1);
 +            NodeToolResult result = repair(1, KEYSPACE, table);
 +            result.asserts()
 +                  .failure()
 +                  .errorContains("Endpoint not alive");
 +            if (withNotifications)
 +            {
 +                result.asserts()
 +                      .notificationContains(NodeToolResult.ProgressEventType.START, "Starting repair command")
 +                      .notificationContains(NodeToolResult.ProgressEventType.START, "repairing keyspace " + KEYSPACE + " with repair options")
 +                      .notificationContains(NodeToolResult.ProgressEventType.ERROR, "Endpoint not alive")
 +                      .notificationContains(NodeToolResult.ProgressEventType.COMPLETE, "finished with error");
 +            }
 +
 +            Assert.assertEquals(repairExceptions + 1, getRepairExceptions(CLUSTER, 1));
 +        }
 +        finally
 +        {
 +            CLUSTER.get(2).startup();
 +        }
 +
 +        // make sure to call this outside of the try/finally, so the node is up and we can actually query
 +        if (repairType != RepairType.PREVIEW)
 +        {
 +            assertParentRepairFailedWithMessageContains(CLUSTER, KEYSPACE, table, "Endpoint not alive");
 +        }
 +        else
 +        {
 +            assertParentRepairNotExist(CLUSTER, KEYSPACE, table);
 +        }
 +    }
 +
 +    @Test(timeout = 1 * 60 * 1000)
 +    public void validationParticipantCrashesAndComesBack()
 +    {
 +        // Test what happens when a participant restarts in the middle of validation
 +        // Currently this isn't recoverable but could be.
 +        // TODO since this is a real restart, how would I test "long pause"? Can't send SIGSTOP since same process
 +        String table = tableName("validationparticipantcrashesandcomesback");
 +        CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, value text, PRIMARY KEY (key))", KEYSPACE, table));
 +        AtomicReference<Future<Void>> participantShutdown = new AtomicReference<>();
 +        IMessageFilters.Filter filter = CLUSTER.verbs(Verb.VALIDATION_REQ).to(2).messagesMatching(of(m -> {
 +            // the nice thing about this is that this lambda is "capturing" and not "transfer": it isn't serialized,
 +            // so any object it holds isn't copied.
 +            participantShutdown.set(CLUSTER.get(2).shutdown());
 +            return true; // drop it so this node doesn't reply before shutdown.
 +        })).drop();
 +        try
 +        {
 +            // since nodetool is blocking, need to handle participantShutdown in the background
 +            CompletableFuture<Void> recovered = CompletableFuture.runAsync(() -> {
 +                try {
 +                    while (participantShutdown.get() == null) {
 +                        // event hasn't happened yet, wait for it
 +                        TimeUnit.MILLISECONDS.sleep(100);
 +                    }
 +                    Future<Void> f = participantShutdown.get();
 +                    f.get(); // wait for shutdown to complete
 +                    CLUSTER.get(2).startup();
 +                } catch (Exception e) {
 +                    if (e instanceof RuntimeException) {
 +                        throw (RuntimeException) e;
 +                    }
 +                    throw new RuntimeException(e);
 +                }
 +            });
 +
 +            long repairExceptions = getRepairExceptions(CLUSTER, 1);
 +            NodeToolResult result = repair(1, KEYSPACE, table);
 +            recovered.join(); // if recovery didn't happen then the results aren't the ones being tested, so block here first
 +            result.asserts()
 +                  .failure();
 +            if (withNotifications)
 +            {
 +                result.asserts()
 +                      .errorContains("Endpoint 127.0.0.2:7012 died")
 +                      .notificationContains(NodeToolResult.ProgressEventType.ERROR, "Endpoint 127.0.0.2:7012 died")
 +                      .notificationContains(NodeToolResult.ProgressEventType.COMPLETE, "finished with error");
 +            }
 +            else
 +            {
 +                // Right now coordination doesn't propagate the first exception, so we only know "there exists an issue".
 +                // With notifications on, nodetool will see the error then complete, so the cmd state (what nodetool
 +                // polls on) is ignored.  With notifications off, the poll await fails and queries cmd state, and that
 +                // will have the below error.
 +                // NOTE: this isn't desirable; it would be good to propagate the first exception
 +                result.asserts()
 +                      .errorContains("Some repair failed");
 +            }
 +
 +            Assert.assertEquals(repairExceptions + 1, getRepairExceptions(CLUSTER, 1));
 +            if (repairType != RepairType.PREVIEW)
 +            {
 +                assertParentRepairFailedWithMessageContains(CLUSTER, KEYSPACE, table, "Endpoint 127.0.0.2:7012 died");
 +            }
 +            else
 +            {
 +                assertParentRepairNotExist(CLUSTER, KEYSPACE, table);
 +            }
 +        }
 +        finally
 +        {
 +            filter.off();
 +            try {
 +                CLUSTER.get(2).startup();
 +            } catch (Exception e) {
 +                // if you call startup twice it is allowed to fail, so ignore it... hope this didn't break the other tests =x
 +            }
 +        }
 +    }
 +}
diff --cc test/distributed/org/apache/cassandra/distributed/test/RepairDigestTrackingTest.java
index 1af329f,0000000..4e44543
mode 100644,000000..100644
--- a/test/distributed/org/apache/cassandra/distributed/test/RepairDigestTrackingTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/RepairDigestTrackingTest.java
@@@ -1,206 -1,0 +1,205 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.distributed.test;
 +
 +import java.io.IOException;
 +import java.io.Serializable;
 +import java.util.EnumSet;
 +import java.util.Map;
- import java.util.Set;
 +import java.util.concurrent.TimeUnit;
 +
 +import org.junit.Assert;
 +import org.junit.Test;
 +
- import org.apache.cassandra.db.ConsistencyLevel;
 +import org.apache.cassandra.db.Keyspace;
 +import org.apache.cassandra.distributed.Cluster;
++import org.apache.cassandra.distributed.api.ConsistencyLevel;
 +import org.apache.cassandra.io.sstable.Descriptor;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.io.sstable.metadata.MetadataComponent;
 +import org.apache.cassandra.io.sstable.metadata.MetadataType;
 +import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
 +import org.apache.cassandra.service.ActiveRepairService;
 +import org.apache.cassandra.service.StorageProxy;
 +
- public class RepairDigestTrackingTest extends DistributedTestBase implements Serializable
++public class RepairDigestTrackingTest extends TestBaseImpl implements Serializable // TODO: why Serializable?
 +{
 +
 +    @Test
 +    public void testInconsistenciesFound() throws Throwable
 +    {
-         try (Cluster cluster = init(Cluster.create(2)))
++        try (Cluster cluster = (Cluster) init(builder().withNodes(2).start()))
 +        {
 +
 +            cluster.get(1).runOnInstance(() -> {
 +                StorageProxy.instance.enableRepairedDataTrackingForRangeReads();
 +            });
 +
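 +            // read_repair is disabled so the range read below exercises repaired-data tracking instead of repairing the mismatch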
 +            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (k INT, c INT, v INT, PRIMARY KEY (k,c)) with read_repair='NONE'");
 +            for (int i = 0; i < 10; i++)
 +            {
 +                cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (k, c, v) VALUES (?, ?, ?)",
 +                                               ConsistencyLevel.ALL,
 +                                               i, i, i);
 +            }
 +
 +            cluster.get(1).runOnInstance(() ->
 +               Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").forceBlockingFlush()
 +            );
 +            cluster.get(2).runOnInstance(() ->
 +               Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").forceBlockingFlush()
 +            );
 +
 +            for (int i = 10; i < 20; i++)
 +            {
 +                cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (k, c, v) VALUES (?, ?, ?)",
 +                                               ConsistencyLevel.ALL,
 +                                               i, i, i);
 +            }
 +
 +            cluster.get(1).runOnInstance(() ->
 +                                         Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").forceBlockingFlush()
 +            );
 +            cluster.get(2).runOnInstance(() ->
 +                                         Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").forceBlockingFlush()
 +            );
 +
 +            cluster.get(1).runOnInstance(() ->
 +                Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").getLiveSSTables().forEach(this::assertNotRepaired)
 +            );
 +            cluster.get(2).runOnInstance(() ->
 +                Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").getLiveSSTables().forEach(this::assertNotRepaired)
 +            );
 +
 +            cluster.get(2).runOnInstance(() ->
 +                Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").getLiveSSTables().forEach(this::markRepaired)
 +            );
 +
 +
 +            cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (k, c, v) VALUES (?, ?, ?)", 5, 5, 55);
 +            cluster.get(1).runOnInstance(() ->
 +              Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").getLiveSSTables().forEach(this::assertNotRepaired)
 +            );
 +            cluster.get(2).runOnInstance(() ->
 +              Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").getLiveSSTables().forEach(this::assertRepaired)
 +            );
 +
 +            long ccBefore = cluster.get(1).callOnInstance(() ->
 +                Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").metric.confirmedRepairedInconsistencies.table.getCount()
 +            );
 +
 +            cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl", ConsistencyLevel.ALL);
 +            long ccAfter = cluster.get(1).callOnInstance(() ->
 +                Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").metric.confirmedRepairedInconsistencies.table.getCount()
 +            );
 +
 +            Assert.assertEquals("confirmed count should differ by 1 after range read", ccBefore + 1, ccAfter);
 +        }
 +    }
 +
 +    @Test
 +    public void testPurgeableTombstonesAreIgnored() throws Throwable
 +    {
-         try (Cluster cluster = init(Cluster.create(2)))
++        try (Cluster cluster = (Cluster) init(builder().withNodes(2).start()))
 +        {
 +
 +            cluster.get(1).runOnInstance(() -> {
 +                StorageProxy.instance.enableRepairedDataTrackingForRangeReads();
 +            });
 +
 +            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl2 (k INT, c INT, v1 INT, v2 INT, PRIMARY KEY (k,c)) WITH gc_grace_seconds=0");
 +            // on node1 only, insert some tombstones, then flush
 +            for (int i = 0; i < 10; i++)
 +            {
 +                cluster.get(1).executeInternal("DELETE v1 FROM " + KEYSPACE + ".tbl2 USING TIMESTAMP 0 WHERE k=? and c=? ", i, i);
 +            }
 +            cluster.get(1).runOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl2").forceBlockingFlush());
 +
 +            // insert data on both nodes and flush
 +            for (int i = 0; i < 10; i++)
 +            {
 +                cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl2 (k, c, v2) VALUES (?, ?, ?) USING TIMESTAMP 1",
 +                                               ConsistencyLevel.ALL,
 +                                               i, i, i);
 +            }
 +            cluster.get(1).runOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl2").forceBlockingFlush());
 +            cluster.get(2).runOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl2").forceBlockingFlush());
 +
 +            // nothing is repaired yet
 +            cluster.get(1).runOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl2").getLiveSSTables().forEach(this::assertNotRepaired));
 +            cluster.get(2).runOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl2").getLiveSSTables().forEach(this::assertNotRepaired));
 +            // mark everything repaired
 +            cluster.get(1).runOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl2").getLiveSSTables().forEach(this::markRepaired));
 +            cluster.get(2).runOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl2").getLiveSSTables().forEach(this::markRepaired));
 +            cluster.get(1).runOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl2").getLiveSSTables().forEach(this::assertRepaired));
 +            cluster.get(2).runOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl2").getLiveSSTables().forEach(this::assertRepaired));
 +
 +            // now overwrite on node2 only to generate digest mismatches, but don't flush so the repaired dataset is not affected
 +            for (int i = 0; i < 10; i++)
 +            {
 +                cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl2 (k, c, v2) VALUES (?, ?, ?) USING TIMESTAMP 2", i, i, i * 2);
 +            }
 +
 +            long ccBefore = cluster.get(1).callOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl2").metric.confirmedRepairedInconsistencies.table.getCount());
 +            // Unfortunately we need to sleep here to ensure that nowInSec > the local deletion time of the tombstones
 +            TimeUnit.SECONDS.sleep(2);
 +            cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl2", ConsistencyLevel.ALL);
 +            long ccAfter = cluster.get(1).callOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl2").metric.confirmedRepairedInconsistencies.table.getCount());
 +
 +            Assert.assertEquals("No repaired data inconsistencies should be detected", ccBefore, ccAfter);
 +        }
 +    }
 +
 +    private void assertNotRepaired(SSTableReader reader) {
 +        Assert.assertTrue("repaired at is set for sstable: " + reader.descriptor, getRepairedAt(reader) == ActiveRepairService.UNREPAIRED_SSTABLE);
 +    }
 +
 +    private void assertRepaired(SSTableReader reader) {
 +        Assert.assertTrue("repaired at is not set for sstable: " + reader.descriptor, getRepairedAt(reader) > 0);
 +    }
 +
 +    private long getRepairedAt(SSTableReader reader)
 +    {
 +        Descriptor descriptor = reader.descriptor;
 +        try
 +        {
 +            Map<MetadataType, MetadataComponent> metadata = descriptor.getMetadataSerializer()
 +                                                                      .deserialize(descriptor, EnumSet.of(MetadataType.STATS));
 +
 +            StatsMetadata stats = (StatsMetadata) metadata.get(MetadataType.STATS);
 +            return stats.repairedAt;
 +        } catch (IOException e) {
 +            throw new RuntimeException(e);
 +        }
 +
 +    }
 +
 +    private void markRepaired(SSTableReader reader) {
 +        Descriptor descriptor = reader.descriptor;
 +        try
 +        {
 +            descriptor.getMetadataSerializer().mutateRepairMetadata(descriptor, System.currentTimeMillis(), null, false);
 +            reader.reloadSSTableMetadata();
 +        } catch (IOException e) {
 +            throw new RuntimeException(e);
 +        }
 +
 +    }
 +
 +}
diff --cc test/distributed/org/apache/cassandra/distributed/test/RepairTest.java
index 0460dec,0000000..f37a3d8
mode 100644,000000..100644
--- a/test/distributed/org/apache/cassandra/distributed/test/RepairTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/RepairTest.java
@@@ -1,181 -1,0 +1,185 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.distributed.test;
 +
 +import java.io.IOException;
 +import java.util.Arrays;
 +import java.util.Map;
 +import java.util.function.Consumer;
 +
 +import com.google.common.collect.ImmutableList;
 +import com.google.common.collect.ImmutableMap;
 +import org.junit.AfterClass;
 +import org.junit.BeforeClass;
 +import org.junit.Test;
 +
 +import org.apache.cassandra.distributed.Cluster;
- import org.apache.cassandra.distributed.impl.InstanceConfig;
++import org.apache.cassandra.distributed.api.ICluster;
++import org.apache.cassandra.distributed.api.IInstanceConfig;
++import org.apache.cassandra.distributed.api.IInvokableInstance;
 +import org.apache.cassandra.service.StorageService;
 +import org.apache.cassandra.utils.concurrent.SimpleCondition;
 +import org.apache.cassandra.utils.progress.ProgressEventType;
 +
 +import static java.util.concurrent.TimeUnit.MINUTES;
- import static org.apache.cassandra.distributed.impl.ExecUtil.rethrow;
++import static org.apache.cassandra.distributed.test.ExecUtil.rethrow;
 +import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
 +import static org.apache.cassandra.distributed.api.Feature.NETWORK;
++import static org.apache.cassandra.distributed.shared.AssertUtils.*;
 +
- public class RepairTest extends DistributedTestBase
++public class RepairTest extends TestBaseImpl
 +{
 +    private static final String insert = withKeyspace("INSERT INTO %s.test (k, c1, c2) VALUES (?, 'value1', 'value2');");
 +    private static final String query = withKeyspace("SELECT k, c1, c2 FROM %s.test WHERE k = ?;");
-     private static Cluster cluster;
 +
-     private static void insert(Cluster cluster, int start, int end, int ... nodes)
++    private static ICluster<IInvokableInstance> cluster;
++
++    private static void insert(ICluster<IInvokableInstance> cluster, int start, int end, int ... nodes)
 +    {
 +        for (int i = start ; i < end ; ++i)
 +            for (int node : nodes)
 +                cluster.get(node).executeInternal(insert, Integer.toString(i));
 +    }
 +
-     private static void verify(Cluster cluster, int start, int end, int ... nodes)
++    private static void verify(ICluster<IInvokableInstance> cluster, int start, int end, int ... nodes)
 +    {
 +        for (int i = start ; i < end ; ++i)
 +        {
 +            for (int node = 1 ; node <= cluster.size() ; ++node)
 +            {
 +                Object[][] rows = cluster.get(node).executeInternal(query, Integer.toString(i));
 +                if (Arrays.binarySearch(nodes, node) >= 0)
 +                    assertRows(rows, new Object[] { Integer.toString(i), "value1", "value2" });
 +                else
 +                    assertRows(rows);
 +            }
 +        }
 +    }
 +
-     private static void flush(Cluster cluster, int ... nodes)
++    private static void flush(ICluster<IInvokableInstance> cluster, int ... nodes)
 +    {
 +        for (int node : nodes)
 +            cluster.get(node).runOnInstance(rethrow(() -> StorageService.instance.forceKeyspaceFlush(KEYSPACE)));
 +    }
 +
-     private static Cluster create(Consumer<InstanceConfig> configModifier) throws IOException
++    private static ICluster create(Consumer<IInstanceConfig> configModifier) throws IOException
 +    {
 +        configModifier = configModifier.andThen(
 +        config -> config.set("hinted_handoff_enabled", false)
 +                        .set("commitlog_sync_batch_window_in_ms", 5)
 +                        .with(NETWORK)
 +                        .with(GOSSIP)
 +        );
 +
-         return init(Cluster.build(3).withConfig(configModifier).start());
++        return init(Cluster.build().withNodes(3).withConfig(configModifier).start());
 +    }
 +
-     private void repair(Cluster cluster, Map<String, String> options)
++    private void repair(ICluster<IInvokableInstance> cluster, Map<String, String> options)
 +    {
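 +        // run repair on node 1 and block until the progress listener reports COMPLETE (bounded by a one minute wait)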
 +        cluster.get(1).runOnInstance(rethrow(() -> {
 +            SimpleCondition await = new SimpleCondition();
 +            StorageService.instance.repair(KEYSPACE, options, ImmutableList.of((tag, event) -> {
 +                if (event.getType() == ProgressEventType.COMPLETE)
 +                    await.signalAll();
 +            })).right.get();
 +            await.await(1L, MINUTES);
 +        }));
 +    }
 +
-     void populate(Cluster cluster, String compression)
++    void populate(ICluster<IInvokableInstance> cluster, String compression) throws Exception
 +    {
 +        try
 +        {
 +            cluster.schemaChange(withKeyspace("DROP TABLE IF EXISTS %s.test;"));
 +            cluster.schemaChange(withKeyspace("CREATE TABLE %s.test (k text, c1 text, c2 text, PRIMARY KEY (k)) WITH compression = " + compression));
 +
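 +            // rows [0,1000) land on all three nodes, row 1000 only on nodes 1 and 2, rows [1001,2001) on all three again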
 +            insert(cluster,    0, 1000, 1, 2, 3);
 +            flush(cluster, 1);
 +            insert(cluster, 1000, 1001, 1, 2);
 +            insert(cluster, 1001, 2001, 1, 2, 3);
 +            flush(cluster, 1, 2, 3);
 +
 +            verify(cluster,    0, 1000, 1, 2, 3);
 +            verify(cluster, 1000, 1001, 1, 2);
 +            verify(cluster, 1001, 2001, 1, 2, 3);
 +        }
 +        catch (Throwable t)
 +        {
 +            cluster.close();
 +            throw t;
 +        }
 +
 +    }
 +
-     void repair(Cluster cluster, boolean sequential, String compression)
++    void repair(ICluster<IInvokableInstance> cluster, boolean sequential, String compression) throws Exception
 +    {
 +        populate(cluster, compression);
 +        repair(cluster, ImmutableMap.of("parallelism", sequential ? "sequential" : "parallel"));
 +        verify(cluster, 0, 2001, 1, 2, 3);
 +    }
 +
 +    @BeforeClass
 +    public static void setupCluster() throws IOException
 +    {
 +        cluster = create(config -> {});
 +    }
 +
 +    @AfterClass
-     public static void closeCluster()
++    public static void closeCluster() throws Exception
 +    {
 +        if (cluster != null)
 +            cluster.close();
 +    }
 +
 +    @Test
-     public void testSequentialRepairWithDefaultCompression()
++    public void testSequentialRepairWithDefaultCompression() throws Exception
 +    {
 +        repair(cluster, true, "{'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}");
 +    }
 +
 +    @Test
-     public void testParallelRepairWithDefaultCompression()
++    public void testParallelRepairWithDefaultCompression() throws Exception
 +    {
 +        repair(cluster, false, "{'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}");
 +    }
 +
 +    @Test
-     public void testSequentialRepairWithMinCompressRatio()
++    public void testSequentialRepairWithMinCompressRatio() throws Exception
 +    {
 +        repair(cluster, true, "{'class': 'org.apache.cassandra.io.compress.LZ4Compressor', 'min_compress_ratio': 4.0}");
 +    }
 +
 +    @Test
-     public void testParallelRepairWithMinCompressRatio()
++    public void testParallelRepairWithMinCompressRatio() throws Exception
 +    {
 +        repair(cluster, false, "{'class': 'org.apache.cassandra.io.compress.LZ4Compressor', 'min_compress_ratio': 4.0}");
 +    }
 +
 +    @Test
-     public void testSequentialRepairWithoutCompression()
++    public void testSequentialRepairWithoutCompression() throws Exception
 +    {
 +        repair(cluster, true, "{'enabled': false}");
 +    }
 +
 +    @Test
-     public void testParallelRepairWithoutCompression()
++    public void testParallelRepairWithoutCompression() throws Exception
 +    {
 +        repair(cluster, false, "{'enabled': false}");
 +    }
 +}
diff --cc test/distributed/org/apache/cassandra/distributed/test/ResourceLeakTest.java
index fb5f758,1c4850a..5430800
--- a/test/distributed/org/apache/cassandra/distributed/test/ResourceLeakTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/ResourceLeakTest.java
@@@ -34,13 -33,12 +33,9 @@@ import org.junit.Ignore
  import org.junit.Test;
  
  import com.sun.management.HotSpotDiagnosticMXBean;
- import org.apache.cassandra.db.ConsistencyLevel;
--import org.apache.cassandra.db.Keyspace;
  import org.apache.cassandra.distributed.Cluster;
- import org.apache.cassandra.distributed.impl.InstanceConfig;
- import org.apache.cassandra.gms.Gossiper;
- import org.apache.cassandra.service.CassandraDaemon;
- import org.apache.cassandra.utils.FBUtilities;
+ import org.apache.cassandra.distributed.api.ConsistencyLevel;
+ import org.apache.cassandra.distributed.api.IInstanceConfig;
 -import org.apache.cassandra.gms.Gossiper;
 -import org.apache.cassandra.utils.FBUtilities;
  import org.apache.cassandra.utils.SigarLibrary;
  
  import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
@@@ -147,16 -145,16 +142,13 @@@ public class ResourceLeakTest extends T
          for (int loop = 0; loop < numTestLoops; loop++)
          {
              System.out.println(String.format("========== Starting loop %03d ========", loop));
--            try (Cluster cluster = Cluster.build(numClusterNodes).withConfig(updater).start())
++            try (Cluster cluster = (Cluster) builder().withNodes(numClusterNodes).withConfig(updater).start())
              {
--                if (cluster.get(1).config().has(GOSSIP)) // Wait for gossip to settle on the seed node
--                    cluster.get(1).runOnInstance(() -> Gossiper.waitToSettle());
--
                  init(cluster);
                  String tableName = "tbl" + loop;
                  cluster.schemaChange("CREATE TABLE " + KEYSPACE + "." + tableName + " (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
                  cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + "." + tableName + "(pk,ck,v) VALUES (0,0,0)", ConsistencyLevel.ALL);
--                cluster.get(1).callOnInstance(() -> FBUtilities.waitOnFutures(Keyspace.open(KEYSPACE).flush()));
++                cluster.get(1).flush(KEYSPACE);
                  if (dumpEveryLoop)
                  {
                      dumpResources(String.format("loop%03d", loop));
diff --cc test/distributed/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java
index e5ee240,f1f8674..f635a28
--- a/test/distributed/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java
@@@ -19,336 -1,207 +19,330 @@@
  package org.apache.cassandra.distributed.test;
  
  import org.junit.Assert;
- import org.junit.BeforeClass;
  import org.junit.Test;
  
 +import org.apache.cassandra.config.DatabaseDescriptor;
- import org.apache.cassandra.db.ConsistencyLevel;
  import org.apache.cassandra.db.Keyspace;
 +import org.apache.cassandra.distributed.Cluster;
- import org.apache.cassandra.distributed.impl.IInvokableInstance;
+ import org.apache.cassandra.distributed.api.ConsistencyLevel;
+ import org.apache.cassandra.distributed.api.ICluster;
+ import org.apache.cassandra.distributed.api.IInvokableInstance;
 +import org.apache.cassandra.metrics.ReadRepairMetrics;
  
 -import static org.junit.Assert.assertEquals;
 -
 +import static org.apache.cassandra.distributed.api.Feature.NETWORK;
- import static org.apache.cassandra.net.Verb.READ_REPAIR_RSP;
- import static org.junit.Assert.assertEquals;
- 
- import static org.apache.cassandra.net.Verb.READ_REPAIR_REQ;
+ import static org.apache.cassandra.distributed.shared.AssertUtils.*;
 +import static org.apache.cassandra.net.OutboundConnections.LARGE_MESSAGE_THRESHOLD;
++import static org.apache.cassandra.net.Verb.READ_REPAIR_REQ;
++import static org.apache.cassandra.net.Verb.READ_REPAIR_RSP;
 +import static org.junit.Assert.fail;
  
- public class SimpleReadWriteTest extends DistributedTestBase
+ // TODO: this test should be removed once running in-jvm dtests via the shared API repository is set up
 -public class SimpleReadWriteTest extends SharedClusterTestBase
++public class SimpleReadWriteTest extends TestBaseImpl
  {
-     @BeforeClass
-     public static void before()
-     {
-         DatabaseDescriptor.clientInitialization();
-     }
- 
      @Test
      public void coordinatorReadTest() throws Throwable
      {
-         try (Cluster cluster = init(Cluster.create(3)))
 -        cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
 -
 -        cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)");
 -        cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 2, 2)");
 -        cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 3, 3)");
 -
 -        assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = ?",
 -                                                  ConsistencyLevel.ALL,
 -                                                  1),
 -                   row(1, 1, 1),
 -                   row(1, 2, 2),
 -                   row(1, 3, 3));
++        try (ICluster cluster = init(builder().withNodes(3).start()))
 +        {
 +            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH read_repair='none'");
 +
 +            cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)");
 +            cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 2, 2)");
 +            cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 3, 3)");
 +
 +            assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = ?",
 +                                                     ConsistencyLevel.ALL,
 +                                                     1),
 +                       row(1, 1, 1),
 +                       row(1, 2, 2),
 +                       row(1, 3, 3));
 +        }
      }
  
      @Test
      public void largeMessageTest() throws Throwable
      {
-         try (Cluster cluster = init(Cluster.create(2)))
 -        int largeMessageThreshold = 1024 * 64;
 -        cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v text, PRIMARY KEY (pk, ck))");
 -        StringBuilder builder = new StringBuilder();
 -        for (int i = 0; i < largeMessageThreshold; i++)
 -            builder.append('a');
 -        String s = builder.toString();
 -        cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, ?)",
 -                                       ConsistencyLevel.ALL,
 -                                       s);
 -        assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = ?",
 -                                                  ConsistencyLevel.ALL,
 -                                                  1),
 -                   row(1, 1, s));
++        try (ICluster cluster = init(builder().withNodes(2).start()))
 +        {
 +            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v text, PRIMARY KEY (pk, ck))");
 +            StringBuilder builder = new StringBuilder();
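 +            // build a payload of LARGE_MESSAGE_THRESHOLD characters so the mutation travels over the large-message connection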
 +            for (int i = 0; i < LARGE_MESSAGE_THRESHOLD ; i++)
 +                builder.append('a');
 +            String s = builder.toString();
 +            cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, ?)",
 +                                           ConsistencyLevel.ALL,
 +                                           s);
 +            assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = ?",
 +                                                      ConsistencyLevel.ALL,
 +                                                      1),
 +                       row(1, 1, s));
 +        }
      }
  
      @Test
      public void coordinatorWriteTest() throws Throwable
      {
-         try (Cluster cluster = init(Cluster.create(3)))
 -        cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
++        try (ICluster cluster = init(builder().withNodes(3).start()))
 +        {
 +            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH read_repair='none'");
  
 -        cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)",
 -                                       ConsistencyLevel.QUORUM);
 +            cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)",
 +                                          ConsistencyLevel.QUORUM);
  
 -        for (int i = 0; i < 3; i++)
 -        {
 -            assertRows(cluster.get(1).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1"),
 +            for (int i = 0; i < 3; i++)
 +            {
 +                assertRows(cluster.get(1).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1"),
 +                           row(1, 1, 1));
 +            }
 +
 +            assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1",
 +                                                     ConsistencyLevel.QUORUM),
                         row(1, 1, 1));
          }
 -
 -        assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1",
 -                                                  ConsistencyLevel.QUORUM),
 -                   row(1, 1, 1));
      }
  
      @Test
      public void readRepairTest() throws Throwable
      {
-         try (Cluster cluster = init(Cluster.create(3)))
 -        cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
++        try (ICluster cluster = init(builder().withNodes(3).start()))
 +        {
 +            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH read_repair='blocking'");
  
 -        cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)");
 -        cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)");
 +            cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)");
 +            cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)");
  
 -        assertRows(cluster.get(3).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1"));
 +            assertRows(cluster.get(3).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1"));
  
 -        assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1",
 -                                                  ConsistencyLevel.ALL), // ensure node3 in preflist
 -                   row(1, 1, 1));
 +            assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1",
 +                                                     ConsistencyLevel.ALL), // ensure node3 in preflist
 +                       row(1, 1, 1));
  
 -        // Verify that data got repaired to the third node
 -        assertRows(cluster.get(3).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1"),
 -                   row(1, 1, 1));
 +            // Verify that data got repaired to the third node
 +            assertRows(cluster.get(3).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1"),
 +                       row(1, 1, 1));
 +        }
      }
  
      @Test
 -    public void writeWithSchemaDisagreement() throws Throwable
 +    public void readRepairTimeoutTest() throws Throwable
      {
 -        cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, PRIMARY KEY (pk, ck))");
 +        final long reducedReadTimeout = 3000L;
-         try (Cluster cluster = init(Cluster.create(3)))
++        try (Cluster cluster = (Cluster) init(builder().withNodes(3).start()))
 +        {
 +            cluster.forEach(i -> i.runOnInstance(() -> DatabaseDescriptor.setReadRpcTimeout(reducedReadTimeout)));
 +            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH read_repair='blocking'");
 +            cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)");
 +            cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)");
 +            assertRows(cluster.get(3).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1"));
 +            cluster.verbs(READ_REPAIR_RSP).to(1).drop();
 +            final long start = System.currentTimeMillis();
 +            try
 +            {
 +                cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", ConsistencyLevel.ALL);
 +                fail("Read timeout expected but it did not occur");
 +            }
 +            catch (Exception ex)
 +            {
 +                // the wrapped exception class was loaded by another classloader, so compare the fully-qualified class name in the message as a workaround to assert the exception type (see the sketch after this test)
 +                Assert.assertTrue(ex.getMessage().contains("org.apache.cassandra.exceptions.ReadTimeoutException"));
 +                long actualTimeTaken = System.currentTimeMillis() - start;
 +                long magicDelayAmount = 100L; // this may not be the best way to check that the time taken is close to the timeout value
 +                // Due to processing delays, the actual time taken from the client's perspective is slightly more than the timeout value
 +                Assert.assertTrue(actualTimeTaken > reducedReadTimeout);
 +                // But it should not exceed the timeout by much
 +                Assert.assertTrue(actualTimeTaken < reducedReadTimeout + magicDelayAmount);
 +                assertRows(cluster.get(3).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1"),
 +                           row(1, 1, 1)); // the partition happened while the repaired node was sending back the ack, so the mutation should in fact have been applied
 +            }
 +        }
 +    }
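
A note on the assertion pattern above: each in-jvm dtest instance loads server classes in its
own classloader, so exception types thrown on a node cannot be matched with instanceof on the
test side; the tests compare fully-qualified class names in the message instead. A minimal
sketch of that pattern as a reusable helper (the helper itself is hypothetical, not part of
this patch):

    // Hypothetical helper: assert that an action fails with the named server-side
    // exception, matching by fully-qualified class name because the class itself
    // was loaded by the instance's classloader.
    static void assertFailsWith(String exceptionFqcn, Runnable action)
    {
        try
        {
            action.run();
            Assert.fail("expected " + exceptionFqcn + " but no exception was thrown");
        }
        catch (RuntimeException ex)
        {
            Assert.assertTrue(ex.getMessage(), ex.getMessage().contains(exceptionFqcn));
        }
    }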
  
 -        cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
 -        cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
 -        cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
 +    @Test
 +    public void failingReadRepairTest() throws Throwable
 +    {
 +        // This test makes an explicit assumption about which nodes are read from: nodes 1 and 2 will be "contacts", and node 3 will be ignored.
 +        // This is an implementation detail of org.apache.cassandra.locator.ReplicaPlans#contactForRead and
 +        // org.apache.cassandra.locator.AbstractReplicationStrategy.getNaturalReplicasForToken that may change
 +        // in a future release; if that happens, this test should fail only at the explicit
 +        // checks that detect that this behavior has changed.
 +        // see CASSANDRA-15507
 +        try (Cluster cluster = init(Cluster.create(3)))
 +        {
 +            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH read_repair='blocking'");
  
 -        // Introduce schema disagreement
 -        cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl ADD v2 int", 1);
 +            // nodes 1 and 3 are identical
 +            cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1) USING TIMESTAMP 43");
 +            cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1) USING TIMESTAMP 43");
  
 -        Exception thrown = null;
 -        try
 -        {
 -            cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1, v2) VALUES (2, 2, 2, 2)",
 -                                           ConsistencyLevel.QUORUM);
 +            // node 2 is different because of the timestamp; a repair is needed
 +            cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1) USING TIMESTAMP 42");
 +
 +            // node 2 is out of date and needs to be repaired. Dropping the repair message ensures the repair never
 +            // completes on that node, which triggers the coordinator to speculatively write to node 3
 +            cluster.verbs(READ_REPAIR_REQ).to(2).drop();
 +
 +            // save the state of the counters so it is known whether the contact list changed
 +            long readRepairRequestsBefore = cluster.get(1).callOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").metric.readRepairRequests.getCount());
 +            long speculatedWriteBefore = cluster.get(1).callOnInstance(() -> ReadRepairMetrics.speculatedWrite.getCount());
 +
 +            Object[][] rows = cluster.coordinator(1)
 +                       .execute("SELECT pk, ck, v, WRITETIME(v) FROM " + KEYSPACE + ".tbl WHERE pk = 1", ConsistencyLevel.QUORUM);
 +
 +            // check the counters first, so it can be detected whether read repair executed as expected
 +            long readRepairRequestsAfter = cluster.get(1).callOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").metric.readRepairRequests.getCount());
 +            long speculatedWriteAfter = cluster.get(1).callOnInstance(() -> ReadRepairMetrics.speculatedWrite.getCount());
 +
 +            // defensive checks to make sure the nodes selected are the ones expected
 +            Assert.assertEquals("number of read repairs after query does not match expected; its possible the nodes involved with the query did not match expected", readRepairRequestsBefore + 1, readRepairRequestsAfter);
 +            Assert.assertEquals("number of read repairs speculated writes after query does not match expected; its possible the nodes involved with the query did not match expected", speculatedWriteBefore + 1, speculatedWriteAfter);
 +
 +            // node 1 has newer data than node 2, so its write timestamp will be used for the result
 +            assertRows(rows, row(1, 1, 1, 43L));
 +
 +            // check each node's local state
 +            // 1 and 3 should match quorum response
 +            assertRows(cluster.get(1).executeInternal("SELECT pk, ck, v, WRITETIME(v) FROM " + KEYSPACE + ".tbl WHERE pk = 1"), row(1, 1, 1, 43L));
 +            assertRows(cluster.get(3).executeInternal("SELECT pk, ck, v, WRITETIME(v) FROM " + KEYSPACE + ".tbl WHERE pk = 1"), row(1, 1, 1, 43L));
 +
 +            // 2 was not repaired (message was dropped), so still has old data
 +            assertRows(cluster.get(2).executeInternal("SELECT pk, ck, v, WRITETIME(v) FROM " + KEYSPACE + ".tbl WHERE pk = 1"), row(1, 1, 1, 42L));
          }
 -        catch (RuntimeException e)
 +    }
 +
 +    @Test
 +    public void writeWithSchemaDisagreement() throws Throwable
 +    {
-         try (Cluster cluster = init(Cluster.build(3).withConfig(config -> config.with(NETWORK)).start()))
++        try (ICluster cluster = init(builder().withNodes(3).withConfig(config -> config.with(NETWORK)).start()))
          {
 -            thrown = e;
 -        }
 +            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, PRIMARY KEY (pk, ck))");
 +
 +            cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
 +            cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
 +            cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
  
 -        Assert.assertTrue(thrown.getMessage().contains("Exception occurred on node"));
 -        Assert.assertTrue(thrown.getCause().getCause().getCause().getMessage().contains("Unknown column v2 during deserialization"));
 +            // Introduce schema disagreement
 +            cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl ADD v2 int", 1);
 +
 +            try
 +            {
 +                cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1, v2) VALUES (2, 2, 2, 2)",
 +                                              ConsistencyLevel.QUORUM);
 +                fail("Should have failed because of schema disagreement.");
 +            }
 +            catch (Exception e)
 +            {
 +                Assert.assertTrue(e instanceof RuntimeException);
 +                RuntimeException re = ((RuntimeException) e);
 +                // the exception type was loaded by the instance's classloader, so we cannot
 +                // check the class directly and compare its name instead
 +                Assert.assertTrue(re.getCause().getClass().toString().contains("WriteFailureException"));
 +                // we may see 1 or 2 failures in here, because of the fail-fast behavior of AbstractWriteResponseHandler
 +                Assert.assertTrue(re.getMessage().contains("INCOMPATIBLE_SCHEMA from 127.0.0.2")
 +                                  || re.getMessage().contains("INCOMPATIBLE_SCHEMA from 127.0.0.3"));
 +            }
 +        }
      }
  
      @Test
      public void readWithSchemaDisagreement() throws Throwable
      {
-         try (Cluster cluster = init(Cluster.create(3, config -> config.with(NETWORK))))
 -        cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, PRIMARY KEY (pk, ck))");
++        try (ICluster cluster = init(builder().withNodes(3).withConfig(config -> config.with(NETWORK)).start()))
 +        {
 +            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, PRIMARY KEY (pk, ck))");
  
 -        cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
 -        cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
 -        cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
 +            cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
 +            cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
 +            cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
  
 -        // Introduce schema disagreement
 -        cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl ADD v2 int", 1);
 +            // Introduce schema disagreement
 +            cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl ADD v2 int", 1);
  
 -        Exception thrown = null;
 -        try
 -        {
 -            assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1",
 -                                                      ConsistencyLevel.ALL),
 -                       row(1, 1, 1, null));
 -        }
 -        catch (Exception e)
 -        {
 -            thrown = e;
 -        }
 +            try
 +            {
 +                cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", ConsistencyLevel.ALL);
 +                fail("Should have failed because of schema disagreement.");
 +            }
 +            catch (Exception e)
 +            {
 +                Assert.assertTrue(e instanceof RuntimeException);
 +                RuntimeException re = ((RuntimeException) e);
 +                // the exception type was loaded by the instance's classloader, so we cannot
 +                // check the class directly and compare its name instead
 +                Assert.assertTrue(re.getCause().getClass().toString().contains("ReadFailureException"));
 +                // we may see 1 or 2 failures in here, because of the fail-fast behavior of ReadCallback
 +                Assert.assertTrue(re.getMessage().contains("INCOMPATIBLE_SCHEMA from 127.0.0.2")
 +                                  || re.getMessage().contains("INCOMPATIBLE_SCHEMA from 127.0.0.3"));
 +            }
  
 -        Assert.assertTrue(thrown.getMessage().contains("Exception occurred on node"));
 -        Assert.assertTrue(thrown.getCause().getCause().getCause().getMessage().contains("Unknown column v2 during deserialization"));
 +        }
      }
  
      @Test
      public void simplePagedReadsTest() throws Throwable
      {
-         try (Cluster cluster = init(Cluster.create(3)))
 -
 -        cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
 -
 -        int size = 100;
 -        Object[][] results = new Object[size][];
 -        for (int i = 0; i < size; i++)
++        try (ICluster cluster = init(builder().withNodes(3).start()))
          {
 -            cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, ?, ?)",
 -                                           ConsistencyLevel.QUORUM,
 -                                           i, i);
 -            results[i] = new Object[]{ 1, i, i };
 -        }
 +            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
  
 -        // Make sure paged read returns same results with different page sizes
 -        for (int pageSize : new int[]{ 1, 2, 3, 5, 10, 20, 50 })
 -        {
 -            assertRows(cluster.coordinator(1).executeWithPaging("SELECT * FROM " + KEYSPACE + ".tbl",
 -                                                                ConsistencyLevel.QUORUM,
 -                                                                pageSize),
 -                       results);
 +            int size = 100;
 +            Object[][] results = new Object[size][];
 +            for (int i = 0; i < size; i++)
 +            {
 +                cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, ?, ?)",
 +                                               ConsistencyLevel.QUORUM,
 +                                               i, i);
 +                results[i] = new Object[] { 1, i, i };
 +            }
 +
 +            // Make sure paged read returns same results with different page sizes
 +            for (int pageSize : new int[] { 1, 2, 3, 5, 10, 20, 50 })
 +            {
 +                assertRows(cluster.coordinator(1).executeWithPaging("SELECT * FROM " + KEYSPACE + ".tbl",
 +                                                                    ConsistencyLevel.QUORUM,
 +                                                                    pageSize),
 +                           results);
 +            }
          }
      }
  
      @Test
      public void pagingWithRepairTest() throws Throwable
      {
-         try (Cluster cluster = init(Cluster.create(3)))
 -        cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
 -
 -        int size = 100;
 -        Object[][] results = new Object[size][];
 -        for (int i = 0; i < size; i++)
++        try (ICluster cluster = init(builder().withNodes(3).start()))
          {
 -            // Make sure that data lands on different nodes and not coordinator
 -            cluster.get(i % 2 == 0 ? 2 : 3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, ?, ?)",
 -                                                            i, i);
 +            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
  
 -            results[i] = new Object[]{ 1, i, i };
 -        }
 +            int size = 100;
 +            Object[][] results = new Object[size][];
 +            for (int i = 0; i < size; i++)
 +            {
 +                // Make sure that data lands on different nodes and not coordinator
 +                cluster.get(i % 2 == 0 ? 2 : 3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, ?, ?)",
 +                                                                i, i);
  
 -        // Make sure paged read returns same results with different page sizes
 -        for (int pageSize : new int[]{ 1, 2, 3, 5, 10, 20, 50 })
 -        {
 -            assertRows(cluster.coordinator(1).executeWithPaging("SELECT * FROM " + KEYSPACE + ".tbl",
 -                                                                ConsistencyLevel.ALL,
 -                                                                pageSize),
 +                results[i] = new Object[] { 1, i, i };
 +            }
 +
 +            // Make sure paged read returns same results with different page sizes
 +            for (int pageSize : new int[] { 1, 2, 3, 5, 10, 20, 50 })
 +            {
 +                assertRows(cluster.coordinator(1).executeWithPaging("SELECT * FROM " + KEYSPACE + ".tbl",
 +                                                                    ConsistencyLevel.ALL,
 +                                                                    pageSize),
 +                           results);
 +            }
 +
 +            assertRows(cluster.get(1).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl"),
                         results);
          }
 -
 -        assertRows(cluster.get(1).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl"),
 -                   results);
      }
  
      @Test
      public void pagingTests() throws Throwable
      {
-         try (Cluster cluster = init(Cluster.create(3));
-              Cluster singleNode = init(Cluster.build(1).withSubnet(1).start()))
 -        try (ICluster singleNode = init(builder().withNodes(1).withSubnet(1).start()))
++        try (ICluster cluster = init(builder().withNodes(3).start());
++             ICluster singleNode = init(builder().withNodes(1).withSubnet(1).start()))
          {
              cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
              singleNode.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
@@@ -402,22 -254,19 +396,22 @@@
      @Test
      public void metricsCountQueriesTest() throws Throwable
      {
-         try (Cluster cluster = init(Cluster.create(2)))
 -        cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
 -        for (int i = 0; i < 100; i++)
 -            cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (?,?,?)", ConsistencyLevel.ALL, i, i, i);
 -
 -        long readCount1 = readCount((IInvokableInstance) cluster.get(1));
 -        long readCount2 = readCount((IInvokableInstance) cluster.get(2));
 -        for (int i = 0; i < 100; i++)
 -            cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = ? and ck = ?", ConsistencyLevel.ALL, i, i);
 -
 -        readCount1 = readCount((IInvokableInstance) cluster.get(1)) - readCount1;
 -        readCount2 = readCount((IInvokableInstance) cluster.get(2)) - readCount2;
 -        assertEquals(readCount1, readCount2);
 -        assertEquals(100, readCount1);
++        try (ICluster<IInvokableInstance> cluster = init(Cluster.create(2)))
 +        {
 +            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
 +            for (int i = 0; i < 100; i++)
 +                cluster.coordinator(1).execute("INSERT INTO "+KEYSPACE+".tbl (pk, ck, v) VALUES (?,?,?)", ConsistencyLevel.ALL, i, i, i);
 +
 +            long readCount1 = readCount(cluster.get(1));
 +            long readCount2 = readCount(cluster.get(2));
 +            for (int i = 0; i < 100; i++)
 +                cluster.coordinator(1).execute("SELECT * FROM "+KEYSPACE+".tbl WHERE pk = ? and ck = ?", ConsistencyLevel.ALL, i, i);
 +
 +            readCount1 = readCount(cluster.get(1)) - readCount1;
 +            readCount2 = readCount(cluster.get(2)) - readCount2;
-             assertEquals(readCount1, readCount2);
-             assertEquals(100, readCount1);
++            Assert.assertEquals(readCount1, readCount2);
++            Assert.assertEquals(100, readCount1);
 +        }
      }
  
      private long readCount(IInvokableInstance instance)
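
The diff truncates readCount's body here; given the metric access pattern used earlier in this
file, it presumably looks something like the following sketch (the table name and exact metric
are assumptions, not part of the visible patch):

    // Sketch: read the node-local read count from inside the instance's classloader.
    return instance.callOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl")
                                                 .metric.readLatency.latency.getCount());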
diff --cc test/distributed/org/apache/cassandra/distributed/test/StreamingTest.java
index 22cd590,0000000..bafd03d
mode 100644,000000..100644
--- a/test/distributed/org/apache/cassandra/distributed/test/StreamingTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/StreamingTest.java
@@@ -1,75 -1,0 +1,75 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.distributed.test;
 +
 +import java.util.Arrays;
 +import java.util.Comparator;
 +
 +import org.junit.Assert;
 +import org.junit.Test;
 +
 +import org.apache.cassandra.distributed.Cluster;
 +import org.apache.cassandra.service.StorageService;
 +
 +import static org.apache.cassandra.distributed.api.Feature.NETWORK;
 +
- public class StreamingTest extends DistributedTestBase
++public class StreamingTest extends TestBaseImpl
 +{
 +
 +    private void testStreaming(int nodes, int replicationFactor, int rowCount, String compactionStrategy) throws Throwable
 +    {
-         try (Cluster cluster = Cluster.create(nodes, config -> config.with(NETWORK)))
++        try (Cluster cluster = (Cluster) builder().withNodes(nodes).withConfig(config -> config.with(NETWORK)).start())
 +        {
 +            cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': " + replicationFactor + "};");
 +            cluster.schemaChange(String.format("CREATE TABLE %s.cf (k text, c1 text, c2 text, PRIMARY KEY (k)) WITH compaction = {'class': '%s', 'enabled': 'true'}", KEYSPACE, compactionStrategy));
 +
 +            for (int i = 0 ; i < rowCount ; ++i)
 +            {
 +                for (int n = 1 ; n < nodes ; ++n)
 +                    cluster.get(n).executeInternal(String.format("INSERT INTO %s.cf (k, c1, c2) VALUES (?, 'value1', 'value2');", KEYSPACE), Integer.toString(i));
 +            }
 +
 +            cluster.get(nodes).executeInternal("TRUNCATE system.available_ranges;");
 +            {
 +                Object[][] results = cluster.get(nodes).executeInternal(String.format("SELECT k, c1, c2 FROM %s.cf;", KEYSPACE));
 +                Assert.assertEquals(0, results.length);
 +            }
 +
 +            cluster.get(nodes).runOnInstance(() -> StorageService.instance.rebuild(null, KEYSPACE, null, null));
 +            {
 +                Object[][] results = cluster.get(nodes).executeInternal(String.format("SELECT k, c1, c2 FROM %s.cf;", KEYSPACE));
 +                Assert.assertEquals(rowCount, results.length);
 +                Arrays.sort(results, Comparator.comparingInt(a -> Integer.parseInt((String) a[0])));
 +                for (int i = 0 ; i < results.length ; ++i)
 +                {
 +                    Assert.assertEquals(Integer.toString(i), results[i][0]);
 +                    Assert.assertEquals("value1", results[i][1]);
 +                    Assert.assertEquals("value2", results[i][2]);
 +                }
 +            }
 +        }
 +    }
 +
 +    @Test
 +    public void test() throws Throwable
 +    {
 +        testStreaming(2, 2, 1000, "LeveledCompactionStrategy");
 +    }
 +
 +}
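
The rebuild above runs through runOnInstance, which executes a serializable lambda inside the
target node's classloader; callOnInstance is the value-returning variant. A minimal sketch of
the value-returning form, assuming org.apache.cassandra.db.Keyspace is importable and reusing
the test's keyspace and table (the assertion is illustrative only):

    // Executes inside the node's isolated classloader; only the serializable
    // return value crosses back into the test's classloader.
    int sstables = cluster.get(nodes).callOnInstance(
        () -> Keyspace.open(KEYSPACE).getColumnFamilyStore("cf").getLiveSSTables().size());
    Assert.assertTrue(sstables >= 0);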
diff --cc test/distributed/org/apache/cassandra/distributed/test/TestBaseImpl.java
index 5f1263a,0e0561a..9752430
--- a/test/distributed/org/apache/cassandra/distributed/test/TestBaseImpl.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/TestBaseImpl.java
@@@ -18,25 -18,30 +18,31 @@@
  
  package org.apache.cassandra.distributed.test;
  
- import java.io.IOException;
- 
- import org.junit.Test;
+ import org.junit.After;
+ import org.junit.BeforeClass;
  
 +import org.apache.cassandra.distributed.Cluster;
+ import org.apache.cassandra.distributed.api.ICluster;
+ import org.apache.cassandra.distributed.api.IInstance;
+ import org.apache.cassandra.distributed.shared.Builder;
+ import org.apache.cassandra.distributed.shared.DistributedTestBase;
  
- import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
- import static org.apache.cassandra.distributed.api.Feature.NETWORK;
- 
- public class GossipSettlesTest extends DistributedTestBase
+ public class TestBaseImpl extends DistributedTestBase
  {
+     @After
+     public void afterEach()
+     {
+         super.afterEach();
+     }
  
-     @Test
-     public void testGossipSettles() throws IOException
-     {
-         // Use withSubnet(1) to prove seed provider is set correctly - without the fix to pass a seed provider, this test fails
-         try (Cluster cluster = Cluster.build(3).withConfig(config -> config.with(GOSSIP).with(NETWORK)).withSubnet(1).start())
-         {
-         }
+     @BeforeClass
+     public static void beforeClass() throws Throwable
+     {
+         ICluster.setup();
      }
  
+     @Override
+     public <I extends IInstance, C extends ICluster> Builder<I, C> builder() {
+         // This is definitely not the smartest solution, but given the complexity of the alternatives and low risk, we can just rely on the
+         // fact that this code is going to work accross _all_ versions.
 -        return (Builder<I, C>) org.apache.cassandra.distributed.Cluster.build();
++        return (Builder<I, C>) Cluster.build();
+     }
 -}
 +}
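
With this override in place, a subclass obtains version-agnostic clusters through the shared
builder API; a minimal usage sketch (the table, values, and test name are illustrative, not
part of this patch):

    @Test
    public void exampleUsage() throws Throwable
    {
        try (ICluster cluster = init(builder().withNodes(2).start()))
        {
            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int PRIMARY KEY, v int)");
            cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, v) VALUES (1, 1)",
                                           ConsistencyLevel.QUORUM);
        }
    }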
diff --cc test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeReadRepairTest.java
index 31f4b84,b98829d..6df72b8
--- a/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeReadRepairTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeReadRepairTest.java
@@@ -20,11 -20,11 +20,10 @@@ package org.apache.cassandra.distribute
  
  import org.junit.Test;
  
- import org.apache.cassandra.db.ConsistencyLevel;
- import org.apache.cassandra.distributed.impl.Versions;
- import org.apache.cassandra.distributed.test.DistributedTestBase;
+ import org.apache.cassandra.distributed.api.ConsistencyLevel;
 -import org.apache.cassandra.distributed.shared.DistributedTestBase;
+ import org.apache.cassandra.distributed.shared.Versions;
  
- import static org.apache.cassandra.distributed.impl.Versions.find;
+ import static org.apache.cassandra.distributed.shared.Versions.find;
  
  public class MixedModeReadRepairTest extends UpgradeTestBase
  {
@@@ -35,17 -35,17 +34,17 @@@
          .nodes(2)
          .upgrade(Versions.Major.v22, Versions.Major.v30)
          .nodesToUpgrade(2)
--        .setup((cluster) -> cluster.schemaChange("CREATE TABLE " + DistributedTestBase.KEYSPACE + ".tbl (pk ascii, b boolean, v blob, PRIMARY KEY (pk)) WITH COMPACT STORAGE"))
++        .setup((cluster) -> cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk ascii, b boolean, v blob, PRIMARY KEY (pk)) WITH COMPACT STORAGE"))
          .runAfterClusterUpgrade((cluster) -> {
              // now node2 is 3.0 and node1 is 2.2
              // make sure 2.2 side does not get the mutation
--            cluster.get(2).executeInternal("DELETE FROM " + DistributedTestBase.KEYSPACE + ".tbl WHERE pk = ?",
--                                                                          "something");
++            cluster.get(2).executeInternal("DELETE FROM " + KEYSPACE + ".tbl WHERE pk = ?",
++                                           "something");
              // trigger a read repair
--            cluster.coordinator(1).execute("SELECT * FROM " + DistributedTestBase.KEYSPACE + ".tbl WHERE pk = ?",
++            cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = ?",
                                             ConsistencyLevel.ALL,
                                             "something");
--            cluster.get(1).flush(DistributedTestBase.KEYSPACE);
++            cluster.get(1).flush(KEYSPACE);
              // upgrade node1 to 3.0
              cluster.get(1).shutdown().get();
              Versions allVersions = find();
@@@ -53,7 -53,7 +52,7 @@@
              cluster.get(1).startup();
  
              // and make sure the sstables are readable
--            cluster.get(1).forceCompact(DistributedTestBase.KEYSPACE, "tbl");
++            cluster.get(1).forceCompact(KEYSPACE, "tbl");
          }).run();
      }
--}
++}
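
For context, the hunk boundary above elides what happens between find() and startup(). The
usual bounce-onto-another-version pattern, shown here only as a sketch of assumed dtest-api
usage (Versions.getLatest and the instance's setVersion):

    // Sketch: restart node 1 on the latest locally-available 3.0.x build.
    cluster.get(1).shutdown().get();
    Versions allVersions = Versions.find();
    cluster.get(1).setVersion(allVersions.getLatest(Versions.Major.v30));
    cluster.get(1).startup();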
diff --cc test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTest.java
index 2e63725,5970992..88dd0f9
--- a/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTest.java
@@@ -18,11 -18,16 +18,11 @@@
  
  package org.apache.cassandra.distributed.upgrade;
  
 -import java.util.Iterator;
 -
 -import com.google.common.collect.Iterators;
  import org.junit.Test;
  
- import org.apache.cassandra.db.ConsistencyLevel;
- import org.apache.cassandra.distributed.impl.Versions;
- import org.apache.cassandra.distributed.test.DistributedTestBase;
+ import org.apache.cassandra.distributed.api.ConsistencyLevel;
+ import org.apache.cassandra.distributed.shared.Versions;
 -
 -import junit.framework.Assert;
+ import static org.apache.cassandra.distributed.shared.AssertUtils.*;
  
  public class UpgradeTest extends UpgradeTestBase
  {
@@@ -31,23 -36,54 +31,22 @@@
      public void upgradeTest() throws Throwable
      {
          new TestCase()
-             .upgrade(Versions.Major.v22, Versions.Major.v30, Versions.Major.v3X)
-             .upgrade(Versions.Major.v30, Versions.Major.v3X, Versions.Major.v4)
-             .setup((cluster) -> {
-                 cluster.schemaChange("CREATE TABLE " + DistributedTestBase.KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
+         .upgrade(Versions.Major.v22, Versions.Major.v30, Versions.Major.v3X)
++        .upgrade(Versions.Major.v30, Versions.Major.v3X, Versions.Major.v4)
+         .setup((cluster) -> {
+             cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
  
-                 cluster.get(1).executeInternal("INSERT INTO " + DistributedTestBase.KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)");
-                 cluster.get(2).executeInternal("INSERT INTO " + DistributedTestBase.KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 2, 2)");
-                 cluster.get(3).executeInternal("INSERT INTO " + DistributedTestBase.KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 3, 3)");
-             })
-             .runAfterClusterUpgrade((cluster) -> {
-                 DistributedTestBase.assertRows(cluster.coordinator(1).execute("SELECT * FROM " + DistributedTestBase.KEYSPACE + ".tbl WHERE pk = ?",
-                                                                               ConsistencyLevel.ALL,
-                                                                               1),
-                                                DistributedTestBase.row(1, 1, 1),
-                                                DistributedTestBase.row(1, 2, 2),
-                                                DistributedTestBase.row(1, 3, 3));
-             }).run();
+             cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)");
+             cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 2, 2)");
+             cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 3, 3)");
+         })
+         .runAfterClusterUpgrade((cluster) -> {
+             assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = ?",
 -                                                                          ConsistencyLevel.ALL,
 -                                                                          1),
 -                                           row(1, 1, 1),
 -                                           row(1, 2, 2),
 -                                           row(1, 3, 3));
 -        }).run();
 -    }
 -
 -    @Test
 -    public void mixedModePagingTest() throws Throwable
 -    {
 -        new TestCase()
 -        .upgrade(Versions.Major.v22, Versions.Major.v30)
 -        .nodes(2)
 -        .nodesToUpgrade(2)
 -        .setup((cluster) -> {
 -            cluster.schemaChange("ALTER KEYSPACE " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}");
 -            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck)) with compact storage");
 -            for (int i = 0; i < 100; i++)
 -                for (int j = 0; j < 200; j++)
 -                    cluster.coordinator(2).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (?, ?, 1)", ConsistencyLevel.ALL, i, j);
 -            cluster.forEach((i) -> i.flush(KEYSPACE));
 -            for (int i = 0; i < 100; i++)
 -                for (int j = 10; j < 30; j++)
 -                    cluster.coordinator(2).execute("DELETE FROM " + KEYSPACE + ".tbl where pk=? and ck=?", ConsistencyLevel.ALL, i, j);
 -            cluster.forEach((i) -> i.flush(KEYSPACE));
 -        })
 -        .runAfterClusterUpgrade((cluster) -> {
 -            for (int i = 0; i < 100; i++)
 -            {
 -                for (int pageSize = 10; pageSize < 100; pageSize++)
 -                {
 -                    Iterator<Object[]> res = cluster.coordinator(1).executeWithPaging("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = ?",
 -                                                                                      ConsistencyLevel.ALL,
 -                                                                                      pageSize, i);
 -                    Assert.assertEquals(180, Iterators.size(res));
 -                }
 -            }
++                                                      ConsistencyLevel.ALL,
++                                                      1),
++                       row(1, 1, 1),
++                       row(1, 2, 2),
++                       row(1, 3, 3));
+         }).run();
      }
- 
- }
+ }
diff --cc test/unit/org/apache/cassandra/CassandraIsolatedJunit4ClassRunner.java
index 5af6b1a,0000000..90e3896
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/CassandraIsolatedJunit4ClassRunner.java
+++ b/test/unit/org/apache/cassandra/CassandraIsolatedJunit4ClassRunner.java
@@@ -1,107 -1,0 +1,109 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra;
 +
 +import java.io.IOException;
 +import java.net.URLClassLoader;
 +import java.util.function.Predicate;
 +
 +import org.junit.runners.BlockJUnit4ClassRunner;
 +import org.junit.runners.model.InitializationError;
 +
- import org.apache.cassandra.distributed.impl.Versions;
++import org.apache.cassandra.distributed.impl.AbstractCluster;
++import org.apache.cassandra.distributed.shared.Versions;
++import org.apache.cassandra.utils.FBUtilities;
 +
 +/**
 + *
 + * This class is usually used to test singletons. It ensures singletons are unique in each test case.
 + *
 + */
 +public class CassandraIsolatedJunit4ClassRunner extends BlockJUnit4ClassRunner
 +{
 +
 +    private static final Predicate<String> isolatedPackage = name ->
 +                                                             name.startsWith("org.apache.cassandra.") ||
 +                                                             // YAML cannot be shared because
 +                                                             // org.apache.cassandra.config.Config is loaded by org.yaml.snakeyaml.YAML
 +                                                             name.startsWith("org.yaml.snakeyaml.");
 +
 +
 +    /**
 +     * Creates a CassandraIsolatedJunit4ClassRunner to run {@code clazz}.
 +     *
 +     * @param clazz the test class to run in an isolated classloader
 +     * @throws InitializationError if the test class is malformed.
 +     */
 +    public CassandraIsolatedJunit4ClassRunner(Class<?> clazz) throws InitializationError
 +    {
 +        super(createClassLoader(clazz));
 +    }
 +
 +    private static Class<?> createClassLoader(Class<?> clazz) throws InitializationError {
 +        try {
 +            ClassLoader testClassLoader = new CassandraIsolatedClassLoader();
 +            return Class.forName(clazz.getName(), true, testClassLoader);
 +        } catch (ClassNotFoundException e) {
 +            throw new InitializationError(e);
 +        }
 +    }
 +
 +    public static class CassandraIsolatedClassLoader extends URLClassLoader
 +    {
 +        public CassandraIsolatedClassLoader()
 +        {
-             super(Versions.CURRENT.classpath);
++            super(AbstractCluster.CURRENT_VERSION.classpath);
 +        }
 +
 +        @Override
 +        public Class<?> loadClass(String name) throws ClassNotFoundException
 +        {
 +            if (isolatedPackage.test(name))
 +            {
 +                synchronized (getClassLoadingLock(name))
 +                {
 +                    // First, check if the class has already been loaded
 +                    Class<?> c = findLoadedClass(name);
 +
 +                    if (c == null)
 +                        c = findClass(name);
 +
 +                    return c;
 +                }
 +            }
 +            else
 +            {
 +                return super.loadClass(name);
 +            }
 +        }
 +
 +        protected void finalize()
 +        {
 +            try
 +            {
 +                close();
 +            }
 +            catch (IOException e)
 +            {
 +                e.printStackTrace();
 +            }
 +        }
 +    }
 +}
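
A hedged usage sketch for the runner above (the test class and assertions are illustrative,
not part of this patch):

    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.apache.cassandra.CassandraIsolatedJunit4ClassRunner;
    import org.apache.cassandra.config.DatabaseDescriptor;

    // Each class run this way is loaded by a fresh CassandraIsolatedClassLoader,
    // so static singleton state cannot leak between test classes.
    @RunWith(CassandraIsolatedJunit4ClassRunner.class)
    public class SingletonIsolationTest
    {
        @Test
        public void initialisesItsOwnSingletons()
        {
            DatabaseDescriptor.daemonInitialization();
            org.junit.Assert.assertNotNull(DatabaseDescriptor.getPartitioner());
        }
    }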
diff --cc test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
index 534bea9,ce59c01..e787082
--- a/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
+++ b/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
@@@ -86,20 -82,10 +86,20 @@@ public class DatabaseDescriptorRefTes
      "org.apache.cassandra.config.YamlConfigurationLoader$PropertiesChecker$1",
      "org.apache.cassandra.config.YamlConfigurationLoader$CustomConstructor",
      "org.apache.cassandra.config.TransparentDataEncryptionOptions",
 +    "org.apache.cassandra.db.ConsistencyLevel",
 +    "org.apache.cassandra.db.commitlog.CommitLogSegmentManagerFactory",
 +    "org.apache.cassandra.db.commitlog.DefaultCommitLogSegmentMgrFactory",
 +    "org.apache.cassandra.db.commitlog.AbstractCommitLogSegmentManager",
 +    "org.apache.cassandra.db.commitlog.CommitLogSegmentManagerCDC",
 +    "org.apache.cassandra.db.commitlog.CommitLogSegmentManagerStandard",
 +    "org.apache.cassandra.db.commitlog.CommitLog",
 +    "org.apache.cassandra.db.commitlog.CommitLogMBean",
      "org.apache.cassandra.dht.IPartitioner",
 -    "org.apache.cassandra.distributed.impl.InstanceClassLoader",
 +    "org.apache.cassandra.distributed.api.IInstance",
 +    "org.apache.cassandra.distributed.api.IIsolatedExecutor",
-     "org.apache.cassandra.distributed.impl.InstanceClassLoader",
++    "org.apache.cassandra.distributed.shared.InstanceClassLoader",
      "org.apache.cassandra.distributed.impl.InstanceConfig",
-     "org.apache.cassandra.distributed.impl.IInvokableInstance",
 -    "org.apache.cassandra.distributed.impl.InvokableInstance",
++    "org.apache.cassandra.distributed.api.IInvokableInstance",
      "org.apache.cassandra.distributed.impl.InvokableInstance$CallableNoExcept",
      "org.apache.cassandra.distributed.impl.InvokableInstance$InstanceFunction",
      "org.apache.cassandra.distributed.impl.InvokableInstance$SerializableBiConsumer",

