You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by if...@apache.org on 2020/03/27 18:24:26 UTC

[cassandra] branch cassandra-2.2 updated: Extract in-jvm-dtest API

This is an automated email from the ASF dual-hosted git repository.

ifesdjeen pushed a commit to branch cassandra-2.2
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-2.2 by this push:
     new 1f72cc6  Extract in-jvm-dtest API
1f72cc6 is described below

commit 1f72cc6197187abac5b1f70a19589dd4883e8d98
Author: Alex Petrov <ol...@gmail.com>
AuthorDate: Fri Feb 7 18:47:33 2020 -0800

    Extract in-jvm-dtest API
    
    Patch by Alex Petrov for CASSANDRA-15539; reviewed by David Capwell and Dinesh Joshi.
---
 build.xml                                          |   5 +
 .../org/apache/cassandra/net/MessagingService.java |   2 +-
 src/java/org/apache/cassandra/tools/NodeProbe.java |  16 +
 src/java/org/apache/cassandra/tools/NodeTool.java  |   4 +-
 .../org/apache/cassandra/distributed/Cluster.java  |  32 +-
 .../cassandra/distributed/UpgradeableCluster.java  |  32 +-
 .../apache/cassandra/distributed/api/Feature.java  |  24 --
 .../apache/cassandra/distributed/api/ICluster.java |  36 --
 .../cassandra/distributed/api/ICoordinator.java    |  36 --
 .../cassandra/distributed/api/IInstance.java       |  57 ----
 .../cassandra/distributed/api/IInstanceConfig.java |  58 ----
 .../distributed/api/IIsolatedExecutor.java         | 126 -------
 .../apache/cassandra/distributed/api/IListen.java  |  28 --
 .../apache/cassandra/distributed/api/IMessage.java |  37 --
 .../cassandra/distributed/api/IMessageFilters.java |  56 ----
 .../distributed/impl/AbstractCluster.java          | 373 +++++----------------
 .../cassandra/distributed/impl/Coordinator.java    |  52 ++-
 .../impl/DelegatingInvokableInstance.java          |  10 +-
 .../distributed/impl/DistributedTestSnitch.java    |  33 +-
 .../distributed/impl/IInvokableInstance.java       |  67 ----
 .../distributed/impl/IUpgradeableInstance.java     |   1 +
 .../cassandra/distributed/impl/Instance.java       | 290 ++++++++++++----
 .../distributed/impl/InstanceClassLoader.java      | 132 --------
 .../cassandra/distributed/impl/InstanceConfig.java |  98 +++---
 .../cassandra/distributed/impl/InstanceKiller.java |  50 +++
 .../distributed/impl/IsolatedExecutor.java         |   4 +
 .../apache/cassandra/distributed/impl/Listen.java  |   1 -
 .../cassandra/distributed/impl/MessageFilters.java | 165 ---------
 .../impl/{Message.java => MessageImpl.java}        |  27 +-
 .../distributed/impl/NetworkTopology.java          | 137 --------
 .../apache/cassandra/distributed/impl/RowUtil.java |   7 +
 .../cassandra/distributed/impl/TracingUtil.java    |   2 +-
 .../cassandra/distributed/impl/Versions.java       | 190 -----------
 .../mock/nodetool/InternalNodeProbe.java           |  33 +-
 .../mock/nodetool/InternalNodeProbeFactory.java    |  11 +-
 .../cassandra/distributed/test/BootstrapTest.java  |  50 +--
 .../test/DistributedReadWritePathTest.java         | 210 ------------
 .../distributed/test/DistributedTestBase.java      | 169 ----------
 .../distributed/test/GossipSettlesTest.java        |  16 +-
 .../distributed/test/MessageFiltersTest.java       |  92 +++--
 .../distributed/test/MessageForwardingTest.java    |  10 +-
 .../distributed/test/NativeProtocolTest.java       |  49 +--
 .../distributed/test/NetworkTopologyTest.java      |  40 ++-
 .../cassandra/distributed/test/NodeToolTest.java   |   2 +-
 .../distributed/test/ResourceLeakTest.java         |  13 +-
 .../distributed/test/SimpleReadWritePathTest.java  | 225 +++++++++++++
 .../cassandra/distributed/test/TestBaseImpl.java   |  49 +++
 .../upgrade/MixedModeReadRepairTest.java           |  17 +-
 .../cassandra/distributed/upgrade/UpgradeTest.java |  42 +--
 .../distributed/upgrade/UpgradeTestBase.java       |  20 +-
 50 files changed, 1099 insertions(+), 2137 deletions(-)

diff --git a/build.xml b/build.xml
index d86cc99..ed9c1a2 100644
--- a/build.xml
+++ b/build.xml
@@ -394,6 +394,8 @@
 	         <exclusion groupId="commons-logging" artifactId="commons-logging"/>
           </dependency>
           <dependency groupId="junit" artifactId="junit" version="4.6" />
+          <dependency groupId="org.mockito" artifactId="mockito-core" version="3.2.4" />
+          <dependency groupId="org.apache.cassandra" artifactId="dtest-api" version="0.0.1" />
           <dependency groupId="org.apache.rat" artifactId="apache-rat" version="0.10">
              <exclusion groupId="commons-lang" artifactId="commons-lang"/>
           </dependency>
@@ -506,6 +508,8 @@
                 artifactId="cassandra-parent"
                 version="${version}"/>
         <dependency groupId="junit" artifactId="junit"/>
+        <dependency groupId="org.mockito" artifactId="mockito-core" />
+        <dependency groupId="org.apache.cassandra" artifactId="dtest-api" />
         <dependency groupId="org.apache.rat" artifactId="apache-rat"/>
         <dependency groupId="org.apache.hadoop" artifactId="hadoop-core"/>
       	<dependency groupId="org.apache.hadoop" artifactId="hadoop-minicluster"/>
@@ -530,6 +534,7 @@
                 artifactId="cassandra-parent"
                 version="${version}"/>
         <dependency groupId="junit" artifactId="junit"/>
+        <dependency groupId="org.mockito" artifactId="mockito-core" />
         <dependency groupId="org.apache.pig" artifactId="pig">
           <exclusion groupId="xmlenc" artifactId="xmlenc"/>
           <exclusion groupId="tomcat" artifactId="jasper-runtime"/>
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index e42b91b..f125b09 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -254,7 +254,7 @@ public final class MessagingService implements MessagingServiceMBean
      * a placeholder class that means "deserialize using the callback." We can't implement this without
      * special-case code in InboundTcpConnection because there is no way to pass the message id to IVersionedSerializer.
      */
-    static class CallbackDeterminedSerializer implements IVersionedSerializer<Object>
+    public static class CallbackDeterminedSerializer implements IVersionedSerializer<Object>
     {
         public static final CallbackDeterminedSerializer instance = new CallbackDeterminedSerializer();
 
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index 3d7db43..9798763 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -163,6 +163,13 @@ public class NodeProbe implements AutoCloseable
         connect();
     }
 
+    protected NodeProbe()
+    {
+        // this constructor is only used for extensions to rewrite their own connect method
+        this.host = "";
+        this.port = 0;
+    }
+
     /**
      * Create a connection to the JMX agent and setup the M[X]Bean proxies.
      *
@@ -763,6 +770,15 @@ public class NodeProbe implements AutoCloseable
         return spProxy;
     }
 
+    public StorageServiceMBean getStorageService() {
+        return ssProxy;
+    }
+
+    public GossiperMBean getGossProxy()
+    {
+        return gossProxy;
+    }
+
     public String getEndpoint()
     {
         Map<String, String> hostIdToEndpoint = ssProxy.getHostIdToEndpoint();
diff --git a/src/java/org/apache/cassandra/tools/NodeTool.java b/src/java/org/apache/cassandra/tools/NodeTool.java
index a11b80e..b6dadd6 100644
--- a/src/java/org/apache/cassandra/tools/NodeTool.java
+++ b/src/java/org/apache/cassandra/tools/NodeTool.java
@@ -210,13 +210,13 @@ public class NodeTool
         }
     }
 
-    private static void badUse(Exception e)
+    protected void badUse(Exception e)
     {
         System.out.println("nodetool: " + e.getMessage());
         System.out.println("See 'nodetool help' or 'nodetool help <command>'.");
     }
 
-    private static void err(Throwable e)
+    protected void err(Throwable e)
     {
         System.err.println("error: " + e.getMessage());
         System.err.println("-- StackTrace --");
diff --git a/test/distributed/org/apache/cassandra/distributed/Cluster.java b/test/distributed/org/apache/cassandra/distributed/Cluster.java
index d3533d3..d657638 100644
--- a/test/distributed/org/apache/cassandra/distributed/Cluster.java
+++ b/test/distributed/org/apache/cassandra/distributed/Cluster.java
@@ -23,31 +23,44 @@ import java.io.IOException;
 import java.util.List;
 import java.util.function.Consumer;
 
-import org.apache.cassandra.distributed.api.ICluster;
+import org.apache.cassandra.distributed.api.IInstanceConfig;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
 import org.apache.cassandra.distributed.impl.AbstractCluster;
-import org.apache.cassandra.distributed.impl.IInvokableInstance;
 import org.apache.cassandra.distributed.impl.InstanceConfig;
-import org.apache.cassandra.distributed.impl.Versions;
+import org.apache.cassandra.distributed.shared.Builder;
+import org.apache.cassandra.distributed.shared.NetworkTopology;
+import org.apache.cassandra.distributed.shared.Versions;
 
 /**
  * A simple cluster supporting only the 'current' Cassandra version, offering easy access to the convenience methods
  * of IInvokableInstance on each node.
  */
-public class Cluster extends AbstractCluster<IInvokableInstance> implements ICluster, AutoCloseable
+public class Cluster extends AbstractCluster<IInvokableInstance>
 {
-    private Cluster(File root, Versions.Version version, List<InstanceConfig> configs, ClassLoader sharedClassLoader)
+
+    private Cluster(File root, Versions.Version version, List<IInstanceConfig> configs, ClassLoader sharedClassLoader)
     {
         super(root, version, configs, sharedClassLoader);
     }
 
-    protected IInvokableInstance newInstanceWrapper(int generation, Versions.Version version, InstanceConfig config)
+    protected IInvokableInstance newInstanceWrapper(int generation, Versions.Version version, IInstanceConfig config)
     {
         return new Wrapper(generation, version, config);
     }
 
     public static Builder<IInvokableInstance, Cluster> build()
     {
-        return new Builder<>(Cluster::new);
+        return new Builder<IInvokableInstance, Cluster>(Cluster::new)
+        {
+            {
+                withVersion(CURRENT_VERSION);
+            }
+
+            protected IInstanceConfig generateConfig(int nodeNum, String ipAddress, NetworkTopology networkTopology, File root, String token, String seedIp)
+            {
+                return InstanceConfig.generate(nodeNum, ipAddress, networkTopology, root, token, seedIp);
+            }
+        };
     }
 
     public static Builder<IInvokableInstance, Cluster> build(int nodeCount)
@@ -55,7 +68,7 @@ public class Cluster extends AbstractCluster<IInvokableInstance> implements IClu
         return build().withNodes(nodeCount);
     }
 
-    public static Cluster create(int nodeCount, Consumer<InstanceConfig> configUpdater) throws IOException
+    public static Cluster create(int nodeCount, Consumer<IInstanceConfig> configUpdater) throws IOException
     {
         return build(nodeCount).withConfig(configUpdater).start();
     }
@@ -64,5 +77,4 @@ public class Cluster extends AbstractCluster<IInvokableInstance> implements IClu
     {
         return build(nodeCount).start();
     }
-}
-
+}
\ No newline at end of file
diff --git a/test/distributed/org/apache/cassandra/distributed/UpgradeableCluster.java b/test/distributed/org/apache/cassandra/distributed/UpgradeableCluster.java
index 1fe960a..a7899fe 100644
--- a/test/distributed/org/apache/cassandra/distributed/UpgradeableCluster.java
+++ b/test/distributed/org/apache/cassandra/distributed/UpgradeableCluster.java
@@ -21,11 +21,13 @@ package org.apache.cassandra.distributed;
 import java.io.File;
 import java.util.List;
 
-import org.apache.cassandra.distributed.api.ICluster;
+import org.apache.cassandra.distributed.api.IInstanceConfig;
 import org.apache.cassandra.distributed.impl.AbstractCluster;
-import org.apache.cassandra.distributed.impl.IUpgradeableInstance;
+import org.apache.cassandra.distributed.api.IUpgradeableInstance;
 import org.apache.cassandra.distributed.impl.InstanceConfig;
-import org.apache.cassandra.distributed.impl.Versions;
+import org.apache.cassandra.distributed.shared.Builder;
+import org.apache.cassandra.distributed.shared.NetworkTopology;
+import org.apache.cassandra.distributed.shared.Versions;
 
 /**
  * A multi-version cluster, offering only the cross-version API
@@ -34,21 +36,36 @@ import org.apache.cassandra.distributed.impl.Versions;
  * to permit upgrade tests to perform cluster operations without updating the cross-version API,
  * so long as one node is up-to-date.
  */
-public class UpgradeableCluster extends AbstractCluster<IUpgradeableInstance> implements ICluster, AutoCloseable
+public class UpgradeableCluster extends AbstractCluster<IUpgradeableInstance> implements AutoCloseable
 {
-    private UpgradeableCluster(File root, Versions.Version version, List<InstanceConfig> configs, ClassLoader sharedClassLoader)
+    private UpgradeableCluster(File root, Versions.Version version, List<IInstanceConfig> configs, ClassLoader sharedClassLoader)
     {
         super(root, version, configs, sharedClassLoader);
     }
 
-    protected IUpgradeableInstance newInstanceWrapper(int generation, Versions.Version version, InstanceConfig config)
+    protected IUpgradeableInstance newInstanceWrapper(int generation, Versions.Version version, IInstanceConfig config)
     {
         return new Wrapper(generation, version, config);
     }
 
     public static Builder<IUpgradeableInstance, UpgradeableCluster> build()
     {
-        return new Builder<>(UpgradeableCluster::new);
+        return new Builder<IUpgradeableInstance, UpgradeableCluster>(UpgradeableCluster::new)
+        {
+            {
+                withVersion(CURRENT_VERSION);
+            }
+
+            protected void setupLogging(File file)
+            {
+                setupLogging(file);
+            }
+
+            protected IInstanceConfig generateConfig(int nodeNum, String ipAddress, NetworkTopology networkTopology, File root, String token, String seedIp)
+            {
+                return InstanceConfig.generate(nodeNum, ipAddress, networkTopology, root, token, seedIp);
+            }
+        };
     }
 
     public static Builder<IUpgradeableInstance, UpgradeableCluster> build(int nodeCount)
@@ -66,4 +83,3 @@ public class UpgradeableCluster extends AbstractCluster<IUpgradeableInstance> im
         return build(nodeCount).withVersion(version).start();
     }
 }
-
diff --git a/test/distributed/org/apache/cassandra/distributed/api/Feature.java b/test/distributed/org/apache/cassandra/distributed/api/Feature.java
deleted file mode 100644
index b4ba036..0000000
--- a/test/distributed/org/apache/cassandra/distributed/api/Feature.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.distributed.api;
-
-public enum Feature
-{
-    NETWORK, GOSSIP, NATIVE_PROTOCOL
-}
diff --git a/test/distributed/org/apache/cassandra/distributed/api/ICluster.java b/test/distributed/org/apache/cassandra/distributed/api/ICluster.java
deleted file mode 100644
index 091e5f0..0000000
--- a/test/distributed/org/apache/cassandra/distributed/api/ICluster.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.distributed.api;
-
-import org.apache.cassandra.locator.InetAddressAndPort;
-
-import java.util.stream.Stream;
-
-public interface ICluster
-{
-
-    IInstance get(int i);
-    IInstance get(InetAddressAndPort endpoint);
-    int size();
-    Stream<? extends IInstance> stream();
-    Stream<? extends IInstance> stream(String dcName);
-    Stream<? extends IInstance> stream(String dcName, String rackName);
-    IMessageFilters filters();
-
-}
diff --git a/test/distributed/org/apache/cassandra/distributed/api/ICoordinator.java b/test/distributed/org/apache/cassandra/distributed/api/ICoordinator.java
deleted file mode 100644
index ef44853..0000000
--- a/test/distributed/org/apache/cassandra/distributed/api/ICoordinator.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.distributed.api;
-
-import java.util.Iterator;
-import java.util.UUID;
-import java.util.concurrent.Future;
-
-// The cross-version API requires that a Coordinator can be constructed without any constructor arguments
-public interface ICoordinator
-{
-    // a bit hacky, but ConsistencyLevel draws in too many dependent classes, so we cannot have a cross-version
-    // method signature that accepts ConsistencyLevel directly.  So we just accept an Enum<?> and cast.
-    Object[][] execute(String query, Enum<?> consistencyLevel, Object... boundValues);
-
-    Iterator<Object[]> executeWithPaging(String query, Enum<?> consistencyLevel, int pageSize, Object... boundValues);
-
-    Future<Object[][]> asyncExecuteWithTracing(UUID sessionId, String query, Enum<?> consistencyLevel, Object... boundValues);
-    Object[][] executeWithTracing(UUID sessionId, String query, Enum<?> consistencyLevel, Object... boundValues);
-}
diff --git a/test/distributed/org/apache/cassandra/distributed/api/IInstance.java b/test/distributed/org/apache/cassandra/distributed/api/IInstance.java
deleted file mode 100644
index 240c7ec..0000000
--- a/test/distributed/org/apache/cassandra/distributed/api/IInstance.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.distributed.api;
-
-import org.apache.cassandra.locator.InetAddressAndPort;
-
-import java.util.UUID;
-import java.util.concurrent.Future;
-
-// The cross-version API requires that an Instance has a constructor signature of (IInstanceConfig, ClassLoader)
-public interface IInstance extends IIsolatedExecutor
-{
-    ICoordinator coordinator();
-    IListen listen();
-
-    void schemaChangeInternal(String query);
-    public Object[][] executeInternal(String query, Object... args);
-
-    IInstanceConfig config();
-    public InetAddressAndPort broadcastAddressAndPort();
-    UUID schemaVersion();
-
-    void startup();
-    boolean isShutdown();
-    Future<Void> shutdown();
-    Future<Void> shutdown(boolean graceful);
-
-    int liveMemberCount();
-
-    int nodetool(String... commandAndArgs);
-
-    // these methods are not for external use, but for simplicity we leave them public and on the normal IInstance interface
-    void startup(ICluster cluster);
-    void receiveMessage(IMessage message);
-
-    int getMessagingVersion();
-    void setMessagingVersion(InetAddressAndPort endpoint, int version);
-
-    void flush(String keyspace);
-    void forceCompact(String keyspace, String table);
-}
diff --git a/test/distributed/org/apache/cassandra/distributed/api/IInstanceConfig.java b/test/distributed/org/apache/cassandra/distributed/api/IInstanceConfig.java
deleted file mode 100644
index d2804c2..0000000
--- a/test/distributed/org/apache/cassandra/distributed/api/IInstanceConfig.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.distributed.api;
-
-import java.util.UUID;
-
-import org.apache.cassandra.distributed.impl.NetworkTopology;
-import org.apache.cassandra.locator.InetAddressAndPort;
-
-public interface IInstanceConfig
-{
-    int num();
-    UUID hostId();
-    InetAddressAndPort broadcastAddressAndPort();
-    NetworkTopology networkTopology();
-
-    default public String localRack()
-    {
-        return networkTopology().localRack(broadcastAddressAndPort());
-    }
-
-    default public String localDatacenter()
-    {
-        return networkTopology().localDC(broadcastAddressAndPort());
-    }
-
-    /**
-     * write the specified parameters to the Config object; we do not specify Config as the type to support a Config
-     * from any ClassLoader; the implementation must not directly access any fields of the Object, or cast it, but
-     * must use the reflection API to modify the state
-     */
-    void propagate(Object writeToConfig);
-
-    /**
-     * Validates whether the config properties are within range of accepted values.
-     */
-    void validate();
-    Object get(String fieldName);
-    String getString(String fieldName);
-    int getInt(String fieldName);
-    boolean has(Feature featureFlag);
-}
diff --git a/test/distributed/org/apache/cassandra/distributed/api/IIsolatedExecutor.java b/test/distributed/org/apache/cassandra/distributed/api/IIsolatedExecutor.java
deleted file mode 100644
index 6bb41d7..0000000
--- a/test/distributed/org/apache/cassandra/distributed/api/IIsolatedExecutor.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.distributed.api;
-
-import java.io.Serializable;
-import java.util.concurrent.Callable;
-import java.util.concurrent.Future;
-import java.util.function.BiConsumer;
-import java.util.function.BiFunction;
-import java.util.function.Consumer;
-import java.util.function.Function;
-
-/**
- * Represents a clean way to handoff evaluation of some work to an executor associated
- * with a node's lifetime.
- *
- * There is no transfer of execution to the parallel class hierarchy.
- *
- * Classes, such as Instance, that are themselves instantiated on the correct ClassLoader, utilise this class
- * to ensure the lifetime of any thread evaluating one of its method invocations matches the lifetime of the class itself.
- * Since they are instantiated on the correct ClassLoader, sharing only the interface, there is no serialization necessary.
- */
-public interface IIsolatedExecutor
-{
-    public interface CallableNoExcept<O> extends Callable<O> { public O call(); }
-    public interface SerializableCallable<O> extends CallableNoExcept<O>, Serializable { }
-    public interface SerializableRunnable extends Runnable, Serializable {}
-    public interface SerializableConsumer<O> extends Consumer<O>, Serializable {}
-    public interface SerializableBiConsumer<I1, I2> extends BiConsumer<I1, I2>, Serializable {}
-    public interface SerializableFunction<I, O> extends Function<I, O>, Serializable {}
-    public interface SerializableBiFunction<I1, I2, O> extends BiFunction<I1, I2, O>, Serializable {}
-    public interface TriFunction<I1, I2, I3, O>
-    {
-        O apply(I1 i1, I2 i2, I3 i3);
-    }
-    public interface SerializableTriFunction<I1, I2, I3, O> extends Serializable, TriFunction<I1, I2, I3, O> { }
-
-    Future<Void> shutdown();
-
-    /**
-     * Convert the execution to one performed asynchronously on the IsolatedExecutor, returning a Future of the execution result
-     */
-    <O> CallableNoExcept<Future<O>> async(CallableNoExcept<O> call);
-
-    /**
-     * Convert the execution to one performed synchronously on the IsolatedExecutor
-     */
-    <O> CallableNoExcept<O> sync(CallableNoExcept<O> call);
-
-    /**
-     * Convert the execution to one performed asynchronously on the IsolatedExecutor, returning a Future of the execution result
-     */
-    CallableNoExcept<Future<?>> async(Runnable run);
-
-    /**
-     * Convert the execution to one performed synchronously on the IsolatedExecutor
-     */
-    Runnable sync(Runnable run);
-
-    /**
-     * Convert the execution to one performed asynchronously on the IsolatedExecutor, returning a Future of the execution result
-     */
-    <I> Function<I, Future<?>> async(Consumer<I> consumer);
-
-    /**
-     * Convert the execution to one performed synchronously on the IsolatedExecutor
-     */
-    <I> Consumer<I> sync(Consumer<I> consumer);
-
-    /**
-     * Convert the execution to one performed asynchronously on the IsolatedExecutor, returning a Future of the execution result
-     */
-    <I1, I2> BiFunction<I1, I2, Future<?>> async(BiConsumer<I1, I2> consumer);
-
-    /**
-     * Convert the execution to one performed synchronously on the IsolatedExecutor
-     */
-    <I1, I2> BiConsumer<I1, I2> sync(BiConsumer<I1, I2> consumer);
-
-    /**
-     * Convert the execution to one performed asynchronously on the IsolatedExecutor, returning a Future of the execution result
-     */
-    <I, O> Function<I, Future<O>> async(Function<I, O> f);
-
-    /**
-     * Convert the execution to one performed synchronously on the IsolatedExecutor
-     */
-    <I, O> Function<I, O> sync(Function<I, O> f);
-
-    /**
-     * Convert the execution to one performed asynchronously on the IsolatedExecutor, returning a Future of the execution result
-     */
-    <I1, I2, O> BiFunction<I1, I2, Future<O>> async(BiFunction<I1, I2, O> f);
-
-    /**
-     * Convert the execution to one performed synchronously on the IsolatedExecutor
-     */
-    <I1, I2, O> BiFunction<I1, I2, O> sync(BiFunction<I1, I2, O> f);
-
-    /**
-     * Convert the execution to one performed asynchronously on the IsolatedExecutor, returning a Future of the execution result
-     */
-    <I1, I2, I3, O> TriFunction<I1, I2, I3, Future<O>> async(TriFunction<I1, I2, I3, O> f);
-
-    /**
-     * Convert the execution to one performed synchronously on the IsolatedExecutor
-     */
-    <I1, I2, I3, O> TriFunction<I1, I2, I3, O> sync(TriFunction<I1, I2, I3, O> f);
-
-}
diff --git a/test/distributed/org/apache/cassandra/distributed/api/IListen.java b/test/distributed/org/apache/cassandra/distributed/api/IListen.java
deleted file mode 100644
index c2e8dd6..0000000
--- a/test/distributed/org/apache/cassandra/distributed/api/IListen.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.distributed.api;
-
-public interface IListen
-{
-    public interface Cancel { void cancel(); }
-
-    Cancel schema(Runnable onChange);
-
-    Cancel liveMembers(Runnable onChange);
-}
diff --git a/test/distributed/org/apache/cassandra/distributed/api/IMessage.java b/test/distributed/org/apache/cassandra/distributed/api/IMessage.java
deleted file mode 100644
index cd98543..0000000
--- a/test/distributed/org/apache/cassandra/distributed/api/IMessage.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.distributed.api;
-
-import java.io.Serializable;
-
-import org.apache.cassandra.locator.InetAddressAndPort;
-
-/**
- * A cross-version interface for delivering internode messages via message sinks.
- *
- * Message implementations should be serializable so we could load into instances.
- */
-public interface IMessage extends Serializable
-{
-    int verb();
-    byte[] bytes();
-    int id();
-    int version();
-    InetAddressAndPort from();
-}
diff --git a/test/distributed/org/apache/cassandra/distributed/api/IMessageFilters.java b/test/distributed/org/apache/cassandra/distributed/api/IMessageFilters.java
deleted file mode 100644
index 01fe972..0000000
--- a/test/distributed/org/apache/cassandra/distributed/api/IMessageFilters.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.distributed.api;
-
-public interface IMessageFilters
-{
-    public interface Filter
-    {
-        Filter off();
-        Filter on();
-    }
-
-    public interface Builder
-    {
-        Builder from(int ... nums);
-        Builder to(int ... nums);
-
-        /**
-         * Every message for which matcher returns `true` will be _dropped_ (assuming all
-         * other matchers in the chain will return `true` as well).
-         */
-        Builder messagesMatching(Matcher filter);
-        Filter drop();
-    }
-
-    public interface Matcher
-    {
-        boolean matches(int from, int to, IMessage message);
-    }
-
-    Builder verbs(int... verbs);
-    Builder allVerbs();
-    void reset();
-
-    /**
-     * {@code true} value returned by the implementation implies that the message was
-     * not matched by any filters and therefore should be delivered.
-     */
-    boolean permit(int from, int to, IMessage msg);
-}
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
index 474ade8..05c8af8 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
@@ -19,10 +19,7 @@
 package org.apache.cassandra.distributed.impl;
 
 import java.io.File;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
+import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -35,28 +32,34 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
-import java.util.stream.IntStream;
 import java.util.stream.Stream;
 
 import com.google.common.collect.Sets;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
 import org.apache.cassandra.distributed.api.Feature;
 import org.apache.cassandra.distributed.api.ICluster;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.distributed.api.ICoordinator;
 import org.apache.cassandra.distributed.api.IInstance;
 import org.apache.cassandra.distributed.api.IInstanceConfig;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
 import org.apache.cassandra.distributed.api.IIsolatedExecutor;
 import org.apache.cassandra.distributed.api.IListen;
 import org.apache.cassandra.distributed.api.IMessage;
 import org.apache.cassandra.distributed.api.IMessageFilters;
+import org.apache.cassandra.distributed.api.IUpgradeableInstance;
+import org.apache.cassandra.distributed.api.NodeToolResult;
+import org.apache.cassandra.distributed.shared.InstanceClassLoader;
+import org.apache.cassandra.distributed.shared.MessageFilters;
+import org.apache.cassandra.distributed.shared.NetworkTopology;
+import org.apache.cassandra.distributed.shared.Versions;
 import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.concurrent.SimpleCondition;
@@ -85,8 +88,10 @@ import org.apache.cassandra.utils.concurrent.SimpleCondition;
  * handlers for internode to have more control over it. Messaging is wired by passing verbs manually.
  * coordinator-handling code and hooks to the callbacks can be found in {@link Coordinator}.
  */
-public abstract class AbstractCluster<I extends IInstance> implements ICluster, AutoCloseable
+public abstract class AbstractCluster<I extends IInstance> implements ICluster<I>, AutoCloseable
 {
+    public static Versions.Version CURRENT_VERSION = new Versions.Version(FBUtilities.getReleaseVersionString(), Versions.getClassPath());;
+
     // WARNING: we have this logger not (necessarily) for logging, but
     // to ensure we have instantiated the main classloader's LoggerFactory (and any LogbackStatusListener)
     // before we instantiate any for a new instance
@@ -98,17 +103,18 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster,
 
     // mutated by starting/stopping a node
     private final List<I> instances;
-    private final Map<InetAddressAndPort, I> instanceMap;
+    private final Map<InetSocketAddress, I> instanceMap;
 
     private final Versions.Version initialVersion;
 
     // mutated by user-facing API
     private final MessageFilters filters;
+    private volatile Thread.UncaughtExceptionHandler previousHandler = null;
 
     protected class Wrapper extends DelegatingInvokableInstance implements IUpgradeableInstance
     {
         private final int generation;
-        private final InstanceConfig config;
+        private final IInstanceConfig config;
         private volatile IInvokableInstance delegate;
         private volatile Versions.Version version;
         private volatile boolean isShutdown = true;
@@ -120,7 +126,7 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster,
             return delegate;
         }
 
-        public Wrapper(int generation, Versions.Version version, InstanceConfig config)
+        public Wrapper(int generation, Versions.Version version, IInstanceConfig config)
         {
             this.generation = generation;
             this.config = config;
@@ -131,9 +137,9 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster,
 
         private IInvokableInstance newInstance(int generation)
         {
-            ClassLoader classLoader = new InstanceClassLoader(generation, config.num, version.classpath, sharedClassLoader);
-            return Instance.transferAdhoc((SerializableBiFunction<IInstanceConfig, ClassLoader, Instance>) Instance::new, classLoader)
-                           .apply(config, classLoader);
+            ClassLoader classLoader = new InstanceClassLoader(generation, config.num(), version.classpath, sharedClassLoader);
+            return Instance.transferAdhoc((SerializableBiFunction<IInstanceConfig, ClassLoader, IInvokableInstance>)Instance::new, classLoader)
+                                        .apply(config, classLoader);
         }
 
         public IInstanceConfig config()
@@ -181,10 +187,18 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster,
             throw new IllegalStateException("Cannot get live member count on shutdown instance");
         }
 
-        public int nodetool(String... commandAndArgs)
+        public NodeToolResult nodetoolResult(boolean withNotifications, String... commandAndArgs)
         {
+            return delegate().nodetoolResult(withNotifications, commandAndArgs);
+        }
 
-            return delegate().nodetool(commandAndArgs);
+        public long killAttempts()
+        {
+            IInvokableInstance local = delegate;
+            // if shutdown cleared the delegate, then no longer know how many kill attempts happened, so return -1
+            if (local == null)
+                return -1;
+            return local.killAttempts();
         }
 
         @Override
@@ -209,9 +223,18 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster,
                 delegate = null;
             }
         }
+
+        public void uncaughtException(Thread thread, Throwable throwable)
+        {
+            IInvokableInstance delegate = this.delegate;
+            if (delegate != null)
+                delegate.uncaughtException(thread, throwable);
+            else
+                logger.error("uncaught exception in thread {}", thread, throwable);
+        }
     }
 
-    protected AbstractCluster(File root, Versions.Version initialVersion, List<InstanceConfig> configs,
+    protected AbstractCluster(File root, Versions.Version initialVersion, List<IInstanceConfig> configs,
                               ClassLoader sharedClassLoader)
     {
         this.root = root;
@@ -221,27 +244,27 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster,
         this.initialVersion = initialVersion;
         int generation = AbstractCluster.generation.incrementAndGet();
 
-        for (InstanceConfig config : configs)
+        for (IInstanceConfig config : configs)
         {
             I instance = newInstanceWrapperInternal(generation, initialVersion, config);
             instances.add(instance);
             // we use the config().broadcastAddressAndPort() here because we have not initialised the Instance
-            I prev = instanceMap.put(instance.broadcastAddressAndPort(), instance);
+            I prev = instanceMap.put(instance.broadcastAddress(), instance);
             if (null != prev)
-                throw new IllegalStateException("Cluster cannot have multiple nodes with same InetAddressAndPort: " + instance.broadcastAddressAndPort() + " vs " + prev.broadcastAddressAndPort());
+                throw new IllegalStateException("Cluster cannot have multiple nodes with same InetAddressAndPort: " + instance.broadcastAddress() + " vs " + prev.broadcastAddress());
         }
         this.filters = new MessageFilters();
     }
 
-    protected abstract I newInstanceWrapper(int generation, Versions.Version version, InstanceConfig config);
+    protected abstract I newInstanceWrapper(int generation, Versions.Version version, IInstanceConfig config);
 
-    protected I newInstanceWrapperInternal(int generation, Versions.Version version, InstanceConfig config)
+    protected I newInstanceWrapperInternal(int generation, Versions.Version version, IInstanceConfig config)
     {
         config.validate();
         return newInstanceWrapper(generation, version, config);
     }
 
-    public I bootstrap(InstanceConfig config)
+    public I bootstrap(IInstanceConfig config)
     {
         if (!config.has(Feature.GOSSIP) || !config.has(Feature.NETWORK))
             throw new IllegalStateException("New nodes can only be bootstrapped when gossip and networking is enabled.");
@@ -249,12 +272,13 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster,
         I instance = newInstanceWrapperInternal(0, initialVersion, config);
 
         instances.add(instance);
-        I prev = instanceMap.put(config.broadcastAddressAndPort(), instance);
+
+        I prev = instanceMap.put(config.broadcastAddress(), instance);
 
         if (null != prev)
         {
             throw new IllegalStateException(String.format("This cluster already contains a node (%d) with with same address and port: %s",
-                                                          config.num,
+                                                          config.num(),
                                                           instance));
         }
 
@@ -277,7 +301,7 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster,
         return instances.get(node - 1);
     }
 
-    public I get(InetAddressAndPort addr)
+    public I get(InetSocketAddress addr)
     {
         return instanceMap.get(addr);
     }
@@ -336,7 +360,7 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster,
         return filters;
     }
 
-    public MessageFilters.Builder verbs(MessagingService.Verb... verbs)
+    public IMessageFilters.Builder verbs(MessagingService.Verb... verbs)
     {
         int[] ids = new int[verbs.length];
         for (int i = 0; i < verbs.length; ++i)
@@ -364,6 +388,11 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster,
         }).run();
     }
 
+    public void schemaChange(String statement, int instance)
+    {
+        get(instance).schemaChangeInternal(statement);
+    }
+
     private void updateMessagingVersions()
     {
         for (IInstance reportTo : instances)
@@ -377,7 +406,7 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster,
                     continue;
 
                 int minVersion = Math.min(reportFrom.getMessagingVersion(), reportTo.getMessagingVersion());
-                reportTo.setMessagingVersion(reportFrom.broadcastAddressAndPort(), minVersion);
+                reportTo.setMessagingVersion(reportFrom.broadcastAddress(), minVersion);
             }
         }
     }
@@ -497,13 +526,10 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster,
         }
     }
 
-    public void schemaChange(String statement, int instance)
-    {
-        get(instance).schemaChangeInternal(statement);
-    }
-
-    void startup()
+    public void startup()
     {
+        previousHandler = Thread.getDefaultUncaughtExceptionHandler();
+        Thread.setDefaultUncaughtExceptionHandler(this::uncaughtExceptions);
         try (AllMembersAliveMonitor monitor = new AllMembersAliveMonitor())
         {
             // Start any instances with auto_bootstrap enabled first, and in series to avoid issues
@@ -527,255 +553,17 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster,
         }
     }
 
-    protected interface Factory<I extends IInstance, C extends AbstractCluster<I>>
-    {
-        C newCluster(File root, Versions.Version version, List<InstanceConfig> configs, ClassLoader sharedClassLoader);
-    }
-
-    public static class Builder<I extends IInstance, C extends AbstractCluster<I>>
-    {
-        private final Factory<I, C> factory;
-        private int nodeCount;
-        private int subnet;
-        private Map<Integer, NetworkTopology.DcAndRack> nodeIdTopology;
-        private TokenSupplier tokenSupplier;
-        private File root;
-        private Versions.Version version = Versions.CURRENT;
-        private Consumer<InstanceConfig> configUpdater;
-
-        public Builder(Factory<I, C> factory)
-        {
-            this.factory = factory;
-        }
-
-        public Builder<I, C> withTokenSupplier(TokenSupplier tokenSupplier)
-        {
-            this.tokenSupplier = tokenSupplier;
-            return this;
-        }
-
-        public Builder<I, C> withSubnet(int subnet)
-        {
-            this.subnet = subnet;
-            return this;
-        }
-
-        public Builder<I, C> withNodes(int nodeCount)
-        {
-            this.nodeCount = nodeCount;
-            return this;
-        }
-
-        public Builder<I, C> withDCs(int dcCount)
-        {
-            return withRacks(dcCount, 1);
-        }
-
-        public Builder<I, C> withRacks(int dcCount, int racksPerDC)
-        {
-            if (nodeCount == 0)
-                throw new IllegalStateException("Node count will be calculated. Do not supply total node count in the builder");
-
-            int totalRacks = dcCount * racksPerDC;
-            int nodesPerRack = (nodeCount + totalRacks - 1) / totalRacks; // round up to next integer
-            return withRacks(dcCount, racksPerDC, nodesPerRack);
-        }
-
-        public Builder<I, C> withRacks(int dcCount, int racksPerDC, int nodesPerRack)
-        {
-            if (nodeIdTopology != null)
-                throw new IllegalStateException("Network topology already created. Call withDCs/withRacks once or before withDC/withRack calls");
-
-            nodeIdTopology = new HashMap<>();
-            int nodeId = 1;
-            for (int dc = 1; dc <= dcCount; dc++)
-            {
-                for (int rack = 1; rack <= racksPerDC; rack++)
-                {
-                    for (int rackNodeIdx = 0; rackNodeIdx < nodesPerRack; rackNodeIdx++)
-                        nodeIdTopology.put(nodeId++, NetworkTopology.dcAndRack(dcName(dc), rackName(rack)));
-                }
-            }
-            // adjust the node count to match the allocatation
-            final int adjustedNodeCount = dcCount * racksPerDC * nodesPerRack;
-            if (adjustedNodeCount != nodeCount)
-            {
-                assert adjustedNodeCount > nodeCount : "withRacks should only ever increase the node count";
-                logger.info("Network topology of {} DCs with {} racks per DC and {} nodes per rack required increasing total nodes to {}",
-                            dcCount, racksPerDC, nodesPerRack, adjustedNodeCount);
-                nodeCount = adjustedNodeCount;
-            }
-            return this;
-        }
-
-        public Builder<I, C> withDC(String dcName, int nodeCount)
-        {
-            return withRack(dcName, rackName(1), nodeCount);
-        }
-
-        public Builder<I, C> withRack(String dcName, String rackName, int nodesInRack)
-        {
-            if (nodeIdTopology == null)
-            {
-                if (nodeCount > 0)
-                    throw new IllegalStateException("Node count must not be explicitly set, or allocated using withDCs/withRacks");
-
-                nodeIdTopology = new HashMap<>();
-            }
-            for (int nodeId = nodeCount + 1; nodeId <= nodeCount + nodesInRack; nodeId++)
-                nodeIdTopology.put(nodeId, NetworkTopology.dcAndRack(dcName, rackName));
-
-            nodeCount += nodesInRack;
-            return this;
-        }
-
-        // Map of node ids to dc and rack - must be contiguous with an entry nodeId 1 to nodeCount
-        public Builder<I, C> withNodeIdTopology(Map<Integer, NetworkTopology.DcAndRack> nodeIdTopology)
-        {
-            if (nodeIdTopology.isEmpty())
-                throw new IllegalStateException("Topology is empty. It must have an entry for every nodeId.");
-
-            IntStream.rangeClosed(1, nodeIdTopology.size()).forEach(nodeId -> {
-                if (nodeIdTopology.get(nodeId) == null)
-                    throw new IllegalStateException("Topology is missing entry for nodeId " + nodeId);
-            });
-
-            if (nodeCount != nodeIdTopology.size())
-            {
-                nodeCount = nodeIdTopology.size();
-                logger.info("Adjusting node count to {} for supplied network topology", nodeCount);
-            }
-
-            this.nodeIdTopology = new HashMap<>(nodeIdTopology);
-
-            return this;
-        }
-
-        public Builder<I, C> withRoot(File root)
-        {
-            this.root = root;
-            return this;
-        }
-
-        public Builder<I, C> withVersion(Versions.Version version)
-        {
-            this.version = version;
-            return this;
-        }
-
-        public Builder<I, C> withConfig(Consumer<InstanceConfig> updater)
-        {
-            this.configUpdater = updater;
-            return this;
-        }
-
-        public C createWithoutStarting() throws IOException
-        {
-            if (root == null)
-                root = Files.createTempDirectory("dtests").toFile();
-
-            if (nodeCount <= 0)
-                throw new IllegalStateException("Cluster must have at least one node");
-
-            if (nodeIdTopology == null)
-            {
-                nodeIdTopology = IntStream.rangeClosed(1, nodeCount).boxed()
-                                          .collect(Collectors.toMap(nodeId -> nodeId,
-                                                                    nodeId -> NetworkTopology.dcAndRack(dcName(0), rackName(0))));
-            }
-
-            root.mkdirs();
-            setupLogging(root);
-
-            ClassLoader sharedClassLoader = Thread.currentThread().getContextClassLoader();
-
-            List<InstanceConfig> configs = new ArrayList<>();
-
-            if (tokenSupplier == null)
-                tokenSupplier = evenlyDistributedTokens(nodeCount);
-
-            for (int i = 0; i < nodeCount; ++i)
-            {
-                int nodeNum = i + 1;
-                configs.add(createInstanceConfig(nodeNum));
-            }
-
-            return factory.newCluster(root, version, configs, sharedClassLoader);
-        }
-
-        public InstanceConfig newInstanceConfig(C cluster)
-        {
-            return createInstanceConfig(cluster.size() + 1);
-        }
-
-        private InstanceConfig createInstanceConfig(int nodeNum)
-        {
-            String ipPrefix = "127.0." + subnet + ".";
-            String seedIp = ipPrefix + "1";
-            String ipAddress = ipPrefix + nodeNum;
-            long token = tokenSupplier.token(nodeNum);
-
-            NetworkTopology topology = NetworkTopology.build(ipPrefix, 7012, nodeIdTopology);
-
-            InstanceConfig config = InstanceConfig.generate(nodeNum, ipAddress, topology, root, String.valueOf(token), seedIp);
-            if (configUpdater != null)
-                configUpdater.accept(config);
-
-            return config;
-        }
-
-        public C start() throws IOException
-        {
-            C cluster = createWithoutStarting();
-            cluster.startup();
-            return cluster;
-        }
-    }
-
-    public static TokenSupplier evenlyDistributedTokens(int numNodes)
-    {
-        long increment = (Long.MAX_VALUE / numNodes) * 2;
-        return (int nodeId) -> {
-            assert nodeId <= numNodes : String.format("Can not allocate a token for a node %s, since only %s nodes are allowed by the token allocation strategy",
-                                                      nodeId, numNodes);
-            return Long.MIN_VALUE + 1 + nodeId * increment;
-        };
-    }
-
-    public static interface TokenSupplier
-    {
-        public long token(int nodeId);
-    }
-
-    static String dcName(int index)
-    {
-        return "datacenter" + index;
-    }
-
-    static String rackName(int index)
+    private void uncaughtExceptions(Thread thread, Throwable error)
     {
-        return "rack" + index;
-    }
-
-    private static void setupLogging(File root)
-    {
-        try
-        {
-            String testConfPath = "test/conf/logback-dtest.xml";
-            Path logConfPath = Paths.get(root.getPath(), "/logback-dtest.xml");
-
-            if (!logConfPath.toFile().exists())
-            {
-                Files.copy(new File(testConfPath).toPath(),
-                           logConfPath);
-            }
-
-            System.setProperty("logback.configurationFile", "file://" + logConfPath);
-        }
-        catch (IOException e)
+        if (!(thread.getContextClassLoader() instanceof InstanceClassLoader))
         {
-            throw new RuntimeException(e);
+            Thread.UncaughtExceptionHandler handler = previousHandler;
+            if (null != handler)
+                handler.uncaughtException(thread, error);
+            return;
         }
+        InstanceClassLoader cl = (InstanceClassLoader) thread.getContextClassLoader();
+        get(cl.getInstanceId()).uncaughtException(thread, error);
     }
 
     @Override
@@ -790,7 +578,10 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster,
         instances.clear();
         instanceMap.clear();
         // Make sure to only delete directory when threads are stopped
-        FileUtils.deleteRecursive(root);
+        if (root.exists())
+            FileUtils.deleteRecursive(root);
+        Thread.setDefaultUncaughtExceptionHandler(previousHandler);
+        previousHandler = null;
 
         //withThreadLeakCheck(futures);
     }
@@ -813,5 +604,23 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster,
         }
     }
 
+    public List<Token> tokens()
+    {
+        return stream()
+               .map(i ->
+                    {
+                        try
+                        {
+                            IPartitioner partitioner = ((IPartitioner)Class.forName(i.config().getString("partitioner")).newInstance());
+                            return partitioner.getTokenFactory().fromString(i.config().getString("initial_token"));
+                        }
+                        catch (Throwable t)
+                        {
+                            throw new RuntimeException(t);
+                        }
+                    })
+               .collect(Collectors.toList());
+    }
+
 }
 
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Coordinator.java b/test/distributed/org/apache/cassandra/distributed/impl/Coordinator.java
index cca3643..91a2aaf 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Coordinator.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Coordinator.java
@@ -18,6 +18,7 @@
 
 package org.apache.cassandra.distributed.impl;
 
+import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Iterator;
@@ -30,8 +31,10 @@ import org.apache.cassandra.cql3.QueryOptions;
 import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.cql3.UntypedResultSet;
 import org.apache.cassandra.cql3.statements.SelectStatement;
-import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
 import org.apache.cassandra.distributed.api.ICoordinator;
+import org.apache.cassandra.distributed.api.IInstance;
+import org.apache.cassandra.distributed.api.QueryResult;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.service.pager.Pageable;
@@ -41,6 +44,7 @@ import org.apache.cassandra.transport.Server;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.transport.messages.ResultMessage;
 import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
 
 public class Coordinator implements ICoordinator
 {
@@ -51,18 +55,18 @@ public class Coordinator implements ICoordinator
     }
 
     @Override
-    public Object[][] execute(String query, Enum<?> consistencyLevelOrigin, Object... boundValues)
+    public QueryResult executeWithResult(String query, ConsistencyLevel consistencyLevel, Object... boundValues)
     {
-        return instance.sync(() -> executeInternal(query, consistencyLevelOrigin, boundValues)).call();
+        return instance().sync(() -> executeInternal(query, consistencyLevel, boundValues)).call();
     }
 
-    public Future<Object[][]> asyncExecuteWithTracing(UUID sessionId, String query, Enum<?> consistencyLevelOrigin, Object... boundValues)
+    public Future<Object[][]> asyncExecuteWithTracing(UUID sessionId, String query, ConsistencyLevel consistencyLevelOrigin, Object... boundValues)
     {
         return instance.async(() -> {
             try
             {
                 Tracing.instance.newSession(sessionId);
-                return executeInternal(query, consistencyLevelOrigin, boundValues);
+                return executeInternal(query, consistencyLevelOrigin, boundValues).toObjectArrays();
             }
             finally
             {
@@ -71,7 +75,12 @@ public class Coordinator implements ICoordinator
         }).call();
     }
 
-    private Object[][] executeInternal(String query, Enum<?> consistencyLevelOrigin, Object[] boundValues)
+    protected org.apache.cassandra.db.ConsistencyLevel toCassandraCL(ConsistencyLevel cl)
+    {
+        return org.apache.cassandra.db.ConsistencyLevel.fromCode(cl.ordinal());
+    }
+
+    private QueryResult executeInternal(String query, ConsistencyLevel consistencyLevelOrigin, Object[] boundValues)
     {
         ClientState clientState = ClientState.forInternalCalls();
         CQLStatement prepared = QueryProcessor.getStatement(query, clientState).statement;
@@ -82,7 +91,7 @@ public class Coordinator implements ICoordinator
 
         prepared.validate(QueryState.forInternalCalls().getClientState());
         ResultMessage res = prepared.execute(QueryState.forInternalCalls(),
-                                             QueryOptions.create(consistencyLevel,
+                                             QueryOptions.create(toCassandraCL(consistencyLevel),
                                                                  boundBBValues,
                                                                  false,
                                                                  Integer.MAX_VALUE,
@@ -91,18 +100,30 @@ public class Coordinator implements ICoordinator
                                                                  Server.CURRENT_VERSION));
 
         if (res != null && res.kind == ResultMessage.Kind.ROWS)
-            return RowUtil.toObjects((ResultMessage.Rows) res);
+        {
+            ResultMessage.Rows rows = (ResultMessage.Rows) res;
+            String[] names = rows.result.metadata.names.stream().map(c -> c.name.toString()).toArray(String[]::new);
+            Object[][] results = RowUtil.toObjects(rows);
+            return new QueryResult(names, results);
+        }
         else
-            return new Object[][]{};
+        {
+            return QueryResult.EMPTY;
+        }
     }
 
-    public Object[][] executeWithTracing(UUID sessionId, String query, Enum<?> consistencyLevelOrigin, Object... boundValues)
+    public Object[][] executeWithTracing(UUID sessionId, String query, ConsistencyLevel consistencyLevelOrigin, Object... boundValues)
     {
         return IsolatedExecutor.waitOn(asyncExecuteWithTracing(sessionId, query, consistencyLevelOrigin, boundValues));
     }
 
+    public IInstance instance()
+    {
+        return instance;
+    }
+
     @Override
-    public Iterator<Object[]> executeWithPaging(String query, Enum<?> consistencyLevelOrigin, int pageSize, Object... boundValues)
+    public Iterator<Object[]> executeWithPaging(String query, ConsistencyLevel consistencyLevelOrigin, int pageSize, Object... boundValues)
     {
         if (pageSize <= 0)
             throw new IllegalArgumentException("Page size should be strictly positive but was " + pageSize);
@@ -121,7 +142,7 @@ public class Coordinator implements ICoordinator
 
             ClientState clientState = QueryState.forInternalCalls().getClientState();
             SelectStatement selectStatement = (SelectStatement) prepared;
-            QueryOptions queryOptions = QueryOptions.create(consistencyLevel,
+            QueryOptions queryOptions = QueryOptions.create(toCassandraCL(consistencyLevel),
                                                             boundBBValues,
                                                             false,
                                                             pageSize,
@@ -132,7 +153,7 @@ public class Coordinator implements ICoordinator
 
             // Usually pager fetches a single page (see SelectStatement#execute). We need to iterate over all
             // of the results lazily.
-            QueryPager pager = QueryPagers.pager(pageable, consistencyLevel, clientState, null);
+            QueryPager pager = QueryPagers.pager(pageable, toCassandraCL(consistencyLevel), clientState, null);
             Iterator<Object[]> iter = RowUtil.toObjects(selectStatement.getResultMetadata().names,
                                                         UntypedResultSet.create(selectStatement,
                                                                                 pager,
@@ -152,4 +173,9 @@ public class Coordinator implements ICoordinator
             };
         }).call();
     }
+
+    private static ClientState makeFakeClientState()
+    {
+        return ClientState.forExternalCalls(new InetSocketAddress(FBUtilities.getLocalAddress(), 9042));
+    }
 }
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/DelegatingInvokableInstance.java b/test/distributed/org/apache/cassandra/distributed/impl/DelegatingInvokableInstance.java
index 7294944..2f7a043 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/DelegatingInvokableInstance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/DelegatingInvokableInstance.java
@@ -19,6 +19,7 @@
 package org.apache.cassandra.distributed.impl;
 
 import java.io.Serializable;
+import java.net.InetSocketAddress;
 import java.util.UUID;
 import java.util.concurrent.Future;
 import java.util.function.BiConsumer;
@@ -29,9 +30,10 @@ import java.util.function.Function;
 import org.apache.cassandra.distributed.api.ICluster;
 import org.apache.cassandra.distributed.api.ICoordinator;
 import org.apache.cassandra.distributed.api.IInstanceConfig;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
 import org.apache.cassandra.distributed.api.IListen;
 import org.apache.cassandra.distributed.api.IMessage;
-import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.distributed.shared.NetworkTopology;
 
 public abstract class DelegatingInvokableInstance implements IInvokableInstance
 {
@@ -44,9 +46,9 @@ public abstract class DelegatingInvokableInstance implements IInvokableInstance
     }
 
     @Override
-    public InetAddressAndPort broadcastAddressAndPort()
+    public InetSocketAddress broadcastAddress()
     {
-        return delegate().broadcastAddressAndPort();
+        return delegate().broadcastAddress();
     }
 
     @Override
@@ -80,7 +82,7 @@ public abstract class DelegatingInvokableInstance implements IInvokableInstance
     }
 
     @Override
-    public void setMessagingVersion(InetAddressAndPort endpoint, int version)
+    public void setMessagingVersion(InetSocketAddress endpoint, int version)
     {
         delegate().setMessagingVersion(endpoint, version);
     }
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/DistributedTestSnitch.java b/test/distributed/org/apache/cassandra/distributed/impl/DistributedTestSnitch.java
index f8f157a..8652bbc 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/DistributedTestSnitch.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/DistributedTestSnitch.java
@@ -19,11 +19,14 @@
 package org.apache.cassandra.distributed.impl;
 
 import java.net.InetAddress;
+import java.net.InetSocketAddress;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.cassandra.config.Config;
 import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.distributed.shared.NetworkTopology;
 import org.apache.cassandra.gms.ApplicationState;
 import org.apache.cassandra.gms.EndpointState;
 import org.apache.cassandra.gms.Gossiper;
@@ -35,6 +38,30 @@ import org.apache.cassandra.utils.FBUtilities;
 public class DistributedTestSnitch extends AbstractNetworkTopologySnitch
 {
     private static NetworkTopology mapping = null;
+    private static final Map<InetAddressAndPort, InetSocketAddress> cache = new ConcurrentHashMap<>();
+    private static final Map<InetSocketAddress, InetAddressAndPort> cacheInverse = new ConcurrentHashMap<>();
+
+    static InetAddressAndPort toCassandraInetAddressAndPort(InetSocketAddress addressAndPort)
+    {
+        InetAddressAndPort m = cacheInverse.get(addressAndPort);
+        if (m == null)
+        {
+            m = InetAddressAndPort.getByAddressOverrideDefaults(addressAndPort.getAddress(), addressAndPort.getPort());
+            cache.put(m, addressAndPort);
+        }
+        return m;
+    }
+
+    static InetSocketAddress fromCassandraInetAddressAndPort(InetAddressAndPort addressAndPort)
+    {
+        InetSocketAddress m = cache.get(addressAndPort);
+        if (m == null)
+        {
+            m = NetworkTopology.addressAndPort(addressAndPort.address, addressAndPort.port);
+            cache.put(addressAndPort, m);
+        }
+        return m;
+    }
 
     private Map<InetAddressAndPort, Map<String, String>> savedEndpoints;
     private static final String DEFAULT_DC = "UNKNOWN_DC";
@@ -49,7 +76,7 @@ public class DistributedTestSnitch extends AbstractNetworkTopologySnitch
     public String getRack(InetAddressAndPort endpoint)
     {
         assert mapping != null : "network topology must be assigned before using snitch";
-        return maybeGetFromEndpointState(mapping.localRack(endpoint), endpoint, ApplicationState.RACK, DEFAULT_RACK);
+        return maybeGetFromEndpointState(mapping.localRack(fromCassandraInetAddressAndPort(endpoint)), endpoint, ApplicationState.RACK, DEFAULT_RACK);
     }
 
     public String getDatacenter(InetAddress endpoint)
@@ -61,7 +88,7 @@ public class DistributedTestSnitch extends AbstractNetworkTopologySnitch
     public String getDatacenter(InetAddressAndPort endpoint)
     {
         assert mapping != null : "network topology must be assigned before using snitch";
-        return maybeGetFromEndpointState(mapping.localDC(endpoint), endpoint, ApplicationState.DC, DEFAULT_DC);
+        return maybeGetFromEndpointState(mapping.localDC(fromCassandraInetAddressAndPort(endpoint)), endpoint, ApplicationState.DC, DEFAULT_DC);
     }
 
     // Here, the logic is slightly different from what we have in GossipingPropertyFileSnitch since we have a different
@@ -84,7 +111,6 @@ public class DistributedTestSnitch extends AbstractNetworkTopologySnitch
                                        entry.getValue());
                 }
             }
-
             if (savedEndpoints.containsKey(endpoint))
                 return savedEndpoints.get(endpoint).get("data_center");
 
@@ -103,6 +129,7 @@ public class DistributedTestSnitch extends AbstractNetworkTopologySnitch
     {
         super.gossiperStarting();
 
+
         Gossiper.instance.addLocalApplicationState(ApplicationState.INTERNAL_IP,
                                                    StorageService.instance.valueFactory.internalIP(FBUtilities.getLocalAddress().getHostAddress()));
     }
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/IInvokableInstance.java b/test/distributed/org/apache/cassandra/distributed/impl/IInvokableInstance.java
deleted file mode 100644
index d0f8a5f..0000000
--- a/test/distributed/org/apache/cassandra/distributed/impl/IInvokableInstance.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.distributed.impl;
-
-import java.io.Serializable;
-import java.util.concurrent.Future;
-import java.util.function.BiConsumer;
-import java.util.function.BiFunction;
-import java.util.function.Consumer;
-import java.util.function.Function;
-
-import org.apache.cassandra.distributed.api.IInstance;
-
-/**
- * This version is only supported for a Cluster running the same code as the test environment, and permits
- * ergonomic cross-node behaviours, without editing the cross-version API.
- *
- * A lambda can be written tto be invoked on any or all of the nodes.
- *
- * The reason this cannot (easily) be made cross-version is that the lambda is tied to the declaring class, which will
- * not be the same in the alternate version.  Even were it not, there would likely be a runtime linkage error given
- * any code divergence.
- */
-public interface IInvokableInstance extends IInstance
-{
-    public default <O> CallableNoExcept<Future<O>> asyncCallsOnInstance(SerializableCallable<O> call) { return async(transfer(call)); }
-    public default <O> CallableNoExcept<O> callsOnInstance(SerializableCallable<O> call) { return sync(transfer(call)); }
-    public default <O> O callOnInstance(SerializableCallable<O> call) { return callsOnInstance(call).call(); }
-
-    public default CallableNoExcept<Future<?>> asyncRunsOnInstance(SerializableRunnable run) { return async(transfer(run)); }
-    public default Runnable runsOnInstance(SerializableRunnable run) { return sync(transfer(run)); }
-    public default void runOnInstance(SerializableRunnable run) { runsOnInstance(run).run(); }
-
-    public default <I> Function<I, Future<?>> asyncAcceptsOnInstance(SerializableConsumer<I> consumer) { return async(transfer(consumer)); }
-    public default <I> Consumer<I> acceptsOnInstance(SerializableConsumer<I> consumer) { return sync(transfer(consumer)); }
-
-    public default <I1, I2> BiFunction<I1, I2, Future<?>> asyncAcceptsOnInstance(SerializableBiConsumer<I1, I2> consumer) { return async(transfer(consumer)); }
-    public default <I1, I2> BiConsumer<I1, I2> acceptsOnInstance(SerializableBiConsumer<I1, I2> consumer) { return sync(transfer(consumer)); }
-
-    public default <I, O> Function<I, Future<O>> asyncAppliesOnInstance(SerializableFunction<I, O> f) { return async(transfer(f)); }
-    public default <I, O> Function<I, O> appliesOnInstance(SerializableFunction<I, O> f) { return sync(transfer(f)); }
-
-    public default <I1, I2, O> BiFunction<I1, I2, Future<O>> asyncAppliesOnInstance(SerializableBiFunction<I1, I2, O> f) { return async(transfer(f)); }
-    public default <I1, I2, O> BiFunction<I1, I2, O> appliesOnInstance(SerializableBiFunction<I1, I2, O> f) { return sync(transfer(f)); }
-
-    public default <I1, I2, I3, O> TriFunction<I1, I2, I3, Future<O>> asyncAppliesOnInstance(SerializableTriFunction<I1, I2, I3, O> f) { return async(transfer(f)); }
-    public default <I1, I2, I3, O> TriFunction<I1, I2, I3, O> appliesOnInstance(SerializableTriFunction<I1, I2, I3, O> f) { return sync(transfer(f)); }
-
-    public <E extends Serializable> E transfer(E object);
-
-}
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/IUpgradeableInstance.java b/test/distributed/org/apache/cassandra/distributed/impl/IUpgradeableInstance.java
index 3eb3657..d42e799 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/IUpgradeableInstance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/IUpgradeableInstance.java
@@ -19,6 +19,7 @@
 package org.apache.cassandra.distributed.impl;
 
 import org.apache.cassandra.distributed.api.IInstance;
+import org.apache.cassandra.distributed.shared.Versions;
 
 // this lives outside the api package so that we do not have to worry about inter-version compatibility
 public interface IUpgradeableInstance extends IInstance
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index 0647198..1c19bca 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@ -23,6 +23,7 @@ import java.io.DataInputStream;
 import java.io.File;
 import java.io.IOException;
 import java.net.InetAddress;
+import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -31,11 +32,17 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeoutException;
 import java.util.function.BiConsumer;
 import java.util.function.Function;
 
+import javax.management.ListenerNotFoundException;
+import javax.management.Notification;
+import javax.management.NotificationListener;
+
 import org.apache.cassandra.concurrent.ScheduledExecutors;
 import org.apache.cassandra.concurrent.SharedExecutorPool;
 import org.apache.cassandra.concurrent.StageManager;
@@ -60,16 +67,21 @@ import org.apache.cassandra.distributed.api.ICluster;
 import org.apache.cassandra.distributed.api.ICoordinator;
 import org.apache.cassandra.distributed.api.IInstance;
 import org.apache.cassandra.distributed.api.IInstanceConfig;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
 import org.apache.cassandra.distributed.api.IListen;
 import org.apache.cassandra.distributed.api.IMessage;
+import org.apache.cassandra.distributed.api.NodeToolResult;
+import org.apache.cassandra.distributed.mock.nodetool.InternalNodeProbe;
 import org.apache.cassandra.distributed.mock.nodetool.InternalNodeProbeFactory;
+import org.apache.cassandra.distributed.shared.NetworkTopology;
 import org.apache.cassandra.gms.ApplicationState;
 import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.gms.VersionedValue;
+import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.sstable.IndexSummaryManager;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.DataOutputBuffer;
-import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.net.CompactEndpointSerializationHelper;
 import org.apache.cassandra.net.IMessageSink;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessageOut;
@@ -79,18 +91,19 @@ import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.PendingRangeCalculatorService;
 import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.service.StorageServiceMBean;
 import org.apache.cassandra.streaming.StreamCoordinator;
 import org.apache.cassandra.tools.NodeTool;
 import org.apache.cassandra.tracing.TraceState;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.transport.messages.ResultMessage;
+import org.apache.cassandra.utils.ExecutorUtils;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.NanoTimeToCurrentTimeMillis;
 import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.Throwables;
 import org.apache.cassandra.utils.UUIDGen;
 import org.apache.cassandra.utils.concurrent.Ref;
-import org.w3c.dom.events.UIEvent;
 
 import static java.util.concurrent.TimeUnit.MINUTES;
 import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
@@ -99,6 +112,13 @@ import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL;
 
 public class Instance extends IsolatedExecutor implements IInvokableInstance
 {
+    private static final Map<Class<?>, Function<Object, Object>> mapper = new HashMap<Class<?>, Function<Object, Object>>() {{
+        this.put(IInstanceConfig.ParameterizedClass.class, (obj) -> {
+            IInstanceConfig.ParameterizedClass pc = (IInstanceConfig.ParameterizedClass) obj;
+            return new org.apache.cassandra.config.ParameterizedClass(pc.class_name, pc.parameters);
+        });
+    }};
+
     public final IInstanceConfig config;
 
     // should never be invoked directly, so that it is instantiated on other class loader;
@@ -108,18 +128,21 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
         super("node" + config.num(), classLoader);
         this.config = config;
         InstanceIDDefiner.setInstanceId(config.num());
-        FBUtilities.setBroadcastInetAddress(config.broadcastAddressAndPort().address);
+        FBUtilities.setBroadcastInetAddress(config.broadcastAddress().getAddress());
+
         // Set the config at instance creation, possibly before startup() has run on all other instances.
         // setMessagingVersions below will call runOnInstance which will instantiate
         // the MessagingService and dependencies preventing later changes to network parameters.
         Config.setOverrideLoadConfig(() -> loadConfig(config));
     }
 
+    @Override
     public IInstanceConfig config()
     {
         return config;
     }
 
+    @Override
     public ICoordinator coordinator()
     {
         return new Coordinator(this);
@@ -131,8 +154,9 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
     }
 
     @Override
-    public InetAddressAndPort broadcastAddressAndPort() { return config.broadcastAddressAndPort(); }
+    public InetSocketAddress broadcastAddress() { return config.broadcastAddress(); }
 
+    @Override
     public Object[][] executeInternal(String query, Object... args)
     {
         return sync(() -> {
@@ -162,7 +186,7 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
 
     public boolean isShutdown()
     {
-        throw new UnsupportedOperationException();
+        return isolatedExecutor.isShutdown();
     }
 
     @Override
@@ -188,32 +212,34 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
         }).run();
     }
 
-    private void registerMockMessaging(ICluster cluster)
+    private void registerMockMessaging(ICluster<IInstance> cluster)
     {
-        BiConsumer<InetAddressAndPort, IMessage> deliverToInstance = (to, message) -> cluster.get(to).receiveMessage(message);
-        BiConsumer<InetAddressAndPort, IMessage> deliverToInstanceIfNotFiltered = (to, message) -> {
+        BiConsumer<InetSocketAddress, IMessage> deliverToInstance = (to, message) -> cluster.get(to).receiveMessage(message);
+        BiConsumer<InetSocketAddress, IMessage> deliverToInstanceIfNotFiltered = (to, message) -> {
             int fromNum = config().num();
             int toNum = cluster.get(to).config().num();
-            if (cluster.filters().permit(fromNum, toNum, message))
+
+            if (cluster.filters().permitOutbound(fromNum, toNum, message)
+                && cluster.filters().permitInbound(fromNum, toNum, message))
                 deliverToInstance.accept(to, message);
         };
 
-        Map<InetAddress, InetAddressAndPort> addressAndPortMap = new HashMap<>();
+        Map<InetAddress, InetSocketAddress> addressAndPortMap = new HashMap<>();
         cluster.stream().forEach(instance -> {
-            InetAddressAndPort addressAndPort = instance.broadcastAddressAndPort();
-            if (!addressAndPort.equals(instance.config().broadcastAddressAndPort()))
-                throw new IllegalStateException("addressAndPort mismatch: " + addressAndPort + " vs " + instance.config().broadcastAddressAndPort());
-            InetAddressAndPort prev = addressAndPortMap.put(addressAndPort.address, addressAndPort);
+            InetSocketAddress addressAndPort = instance.broadcastAddress();
+            if (!addressAndPort.equals(instance.config().broadcastAddress()))
+                throw new IllegalStateException("addressAndPort mismatch: " + addressAndPort + " vs " + instance.config().broadcastAddress());
+            InetSocketAddress prev = addressAndPortMap.put(addressAndPort.getAddress(),
+                                                                        addressAndPort);
             if (null != prev)
                 throw new IllegalStateException("This version of Cassandra does not support multiple nodes with the same InetAddress: " + addressAndPort + " vs " + prev);
         });
 
-        MessagingService.instance().addMessageSink(
-                new MessageDeliverySink(deliverToInstanceIfNotFiltered, addressAndPortMap::get));
+        MessagingService.instance().addMessageSink(new MessageDeliverySink(deliverToInstanceIfNotFiltered, addressAndPortMap::get));
     }
 
     // unnecessary if registerMockMessaging used
-    private void registerFilter(ICluster cluster)
+    private void registerFilters(ICluster cluster)
     {
         IInstance instance = this;
         MessagingService.instance().addMessageSink(new IMessageSink()
@@ -221,31 +247,93 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
             public boolean allowOutgoingMessage(MessageOut message, int id, InetAddress toAddress)
             {
                 // Port is not passed in, so take a best guess at the destination port from this instance
-                IInstance to = cluster.get(InetAddressAndPort.getByAddressOverrideDefaults(toAddress, instance.config().broadcastAddressAndPort().port));
+                IInstance to = cluster.get(NetworkTopology.addressAndPort(toAddress,
+                                                                          instance.config().broadcastAddress().getPort()));
                 int fromNum = config().num();
                 int toNum = to.config().num();
-                return cluster.filters().permit(fromNum, toNum, serializeMessage(message, id, broadcastAddressAndPort(), to.broadcastAddressAndPort()));
+                return cluster.filters().permitOutbound(fromNum, toNum, serializeMessage(message, id,
+                                                                                 broadcastAddress(),
+                                                                                 to.broadcastAddress()));
             }
 
             public boolean allowIncomingMessage(MessageIn message, int id)
             {
-                return true;
+                // Port is not passed in, so take a best guess at the destination port from this instance
+                IInstance from = cluster.get(NetworkTopology.addressAndPort(message.from,
+                                                                            instance.config().broadcastAddress().getPort()));
+                int fromNum = from.config().num();
+                int toNum = config().num();
+
+
+                IMessage msg = serializeMessage(message, id, from.broadcastAddress(), broadcastAddress());
+
+                return cluster.filters().permitInbound(fromNum, toNum, msg);
             }
         });
     }
 
-    public static IMessage serializeMessage(MessageOut messageOut, int id, InetAddressAndPort from, InetAddressAndPort to)
+    public static IMessage serializeMessage(MessageOut messageOut, int id, InetSocketAddress from, InetSocketAddress to)
     {
         try (DataOutputBuffer out = new DataOutputBuffer(1024))
         {
-            int version = MessagingService.instance().getVersion(to.address);
+            int version = MessagingService.instance().getVersion(to.getAddress());
 
             out.writeInt(MessagingService.PROTOCOL_MAGIC);
             out.writeInt(id);
             long timestamp = System.currentTimeMillis();
             out.writeInt((int) timestamp);
             messageOut.serialize(out, version);
-            return new Message(messageOut.verb.ordinal(), out.toByteArray(), id, version, from);
+            return new MessageImpl(messageOut.verb.ordinal(), out.toByteArray(), id, version, from);
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public static IMessage serializeMessage(MessageIn<?> messageIn, int id, InetSocketAddress from, InetSocketAddress to)
+    {
+        try (DataOutputBuffer out = new DataOutputBuffer(1024))
+        {
+            // Serialize header
+            int version = MessagingService.instance().getVersion(to.getAddress());
+
+            out.writeInt(MessagingService.PROTOCOL_MAGIC);
+            out.writeInt(id);
+            long timestamp = System.currentTimeMillis();
+            out.writeInt((int) timestamp);
+
+            // Serialize the message itself
+            IVersionedSerializer serializer = MessagingService.instance().verbSerializers.get(messageIn.verb);
+            CompactEndpointSerializationHelper.serialize(from.getAddress(), out);
+
+            out.writeInt(messageIn.verb.ordinal());
+            out.writeInt(messageIn.parameters.size());
+            for (Map.Entry<String, byte[]> entry : messageIn.parameters.entrySet())
+            {
+                out.writeUTF(entry.getKey());
+                out.writeInt(entry.getValue().length);
+                out.write(entry.getValue());
+            }
+
+            if (messageIn.payload != null && serializer != MessagingService.CallbackDeterminedSerializer.instance)
+            {
+                try (DataOutputBuffer dob = new DataOutputBuffer())
+                {
+                    serializer.serialize(messageIn.payload, dob, version);
+
+                    int size = dob.getLength();
+                    out.writeInt(size);
+                    out.write(dob.getData(), 0, size);
+                }
+            }
+            else
+            {
+                out.writeInt(0);
+            }
+
+
+            return new MessageImpl(messageIn.verb.ordinal(), out.toByteArray(), id, version, from);
         }
         catch (IOException e)
         {
@@ -255,9 +343,11 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
 
     private class MessageDeliverySink implements IMessageSink
     {
-        private final BiConsumer<InetAddressAndPort, IMessage> deliver;
-        private final Function<InetAddress, InetAddressAndPort> lookupAddressAndPort;
-        MessageDeliverySink(BiConsumer<InetAddressAndPort, IMessage> deliver, Function<InetAddress, InetAddressAndPort> lookupAddressAndPort)
+        private final BiConsumer<InetSocketAddress, IMessage> deliver;
+        private final Function<InetAddress, InetSocketAddress> lookupAddressAndPort;
+
+        MessageDeliverySink(BiConsumer<InetSocketAddress, IMessage> deliver,
+                            Function<InetAddress, InetSocketAddress> lookupAddressAndPort)
         {
             this.deliver = deliver;
             this.lookupAddressAndPort = lookupAddressAndPort;
@@ -265,35 +355,35 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
 
         public boolean allowOutgoingMessage(MessageOut messageOut, int id, InetAddress to)
         {
-            InetAddressAndPort from = broadcastAddressAndPort();
-            InetAddressAndPort toFull = lookupAddressAndPort.apply(to);
+            InetSocketAddress from = broadcastAddress();
             assert from.equals(lookupAddressAndPort.apply(messageOut.from));
 
-            IMessage serialized = serializeMessage(messageOut, id, broadcastAddressAndPort(), lookupAddressAndPort.apply(messageOut.from));
-
             // Tracing logic - similar to org.apache.cassandra.net.OutboundTcpConnection.writeConnected
             byte[] sessionBytes = (byte[]) messageOut.parameters.get(Tracing.TRACE_HEADER);
             if (sessionBytes != null)
             {
                 UUID sessionId = UUIDGen.getUUID(ByteBuffer.wrap(sessionBytes));
                 TraceState state = Tracing.instance.get(sessionId);
-                String message = String.format("Sending %s message to %s", messageOut.verb, to);
+                String traceMessage = String.format("Sending %s message to %s", messageOut.verb, to);
                 // session may have already finished; see CASSANDRA-5668
                 if (state == null)
                 {
                     byte[] traceTypeBytes = (byte[]) messageOut.parameters.get(Tracing.TRACE_TYPE);
                     Tracing.TraceType traceType = traceTypeBytes == null ? Tracing.TraceType.QUERY : Tracing.TraceType.deserialize(traceTypeBytes[0]);
-                    TraceState.mutateWithTracing(ByteBuffer.wrap(sessionBytes), message, -1, traceType.getTTL());
+                    TraceState.mutateWithTracing(ByteBuffer.wrap(sessionBytes), traceMessage, -1, traceType.getTTL());
                 }
                 else
                 {
-                    state.trace(message);
+                    state.trace(traceMessage);
                     if (messageOut.verb == MessagingService.Verb.REQUEST_RESPONSE)
                         Tracing.instance.doneWithNonLocalSession(state);
                 }
             }
 
-            deliver.accept(toFull, serialized);
+            InetSocketAddress toFull = lookupAddressAndPort.apply(to);
+            deliver.accept(toFull,
+                           serializeMessage(messageOut, id, broadcastAddress(), toFull));
+
             return false;
         }
 
@@ -304,12 +394,12 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
         }
     }
 
-    public static Pair<MessageIn<Object>, Integer> deserializeMessage(IMessage msg)
+    public static Pair<MessageIn<Object>, Integer> deserializeMessage(IMessage imessage)
     {
         // Based on org.apache.cassandra.net.IncomingTcpConnection.receiveMessage
-        try (DataInputStream input = new DataInputStream(new ByteArrayInputStream(msg.bytes())))
+        try (DataInputStream input = new DataInputStream(new ByteArrayInputStream(imessage.bytes())))
         {
-            int version = msg.version();
+            int version = imessage.version();
             if (version > MessagingService.current_version)
             {
                 throw new IllegalStateException(String.format("Received message version %d but current version is %d",
@@ -323,8 +413,8 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
                 id = Integer.parseInt(input.readUTF());
             else
                 id = input.readInt();
-            if (msg.id() != id)
-                throw new IllegalStateException(String.format("Message id mismatch: %d != %d", msg.id(), id));
+            if (imessage.id() != id)
+                throw new IllegalStateException(String.format("Message id mismatch: %d != %d", imessage.id(), id));
 
             // make sure to readInt, even if cross_node_to is not enabled
             int partial = input.readInt();
@@ -347,7 +437,7 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
             }
             catch (Throwable t)
             {
-                throw new RuntimeException("Exception occurred on node " + broadcastAddressAndPort(), t);
+                throw new RuntimeException("Exception occurred on node " + broadcastAddress(), t);
             }
 
             MessageIn<Object> message = deserialized.left;
@@ -380,9 +470,9 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
         return callsOnInstance(() -> MessagingService.current_version).call();
     }
 
-    public void setMessagingVersion(InetAddressAndPort endpoint, int version)
+    public void setMessagingVersion(InetSocketAddress endpoint, int version)
     {
-        runOnInstance(() -> MessagingService.instance().setVersion(endpoint.address, version));
+        runOnInstance(() -> MessagingService.instance().setVersion(endpoint.getAddress(), version));
     }
 
     public void flush(String keyspace)
@@ -412,7 +502,7 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
             {
                 mkdirs();
 
-                assert config.networkTopology().contains(config.broadcastAddressAndPort());
+                assert config.networkTopology().contains(config.broadcastAddress());
                 DistributedTestSnitch.assign(config.networkTopology());
 
                 DatabaseDescriptor.setDaemonInitialized();
@@ -446,7 +536,7 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
 
                 if (config.has(NETWORK))
                 {
-                    registerFilter(cluster);
+                    registerFilters(cluster);
                     MessagingService.instance().listen();
                 }
                 else
@@ -479,9 +569,9 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
                     StorageService.instance.setRpcReady(true);
                 }
 
-                if (!FBUtilities.getBroadcastAddress().equals(broadcastAddressAndPort().address))
+                if (!FBUtilities.getBroadcastAddress().equals(broadcastAddress().getAddress()))
                     throw new IllegalStateException();
-                if (DatabaseDescriptor.getStoragePort() != broadcastAddressAndPort().port)
+                if (DatabaseDescriptor.getStoragePort() != broadcastAddress().getPort())
                     throw new IllegalStateException();
             }
             catch (Throwable t)
@@ -504,7 +594,7 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
     private static Config loadConfig(IInstanceConfig overrides)
     {
         Config config = new Config();
-        overrides.propagate(config);
+        overrides.propagate(config, mapper);
         return config;
     }
 
@@ -513,13 +603,13 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
         // This should be done outside instance in order to avoid serializing config
         String partitionerName = config.getString("partitioner");
         List<String> initialTokens = new ArrayList<>();
-        List<InetAddressAndPort> hosts = new ArrayList<>();
+        List<InetSocketAddress> hosts = new ArrayList<>();
         List<UUID> hostIds = new ArrayList<>();
         for (int i = 1 ; i <= cluster.size() ; ++i)
         {
             IInstanceConfig config = cluster.get(i).config();
             initialTokens.add(config.getString("initial_token"));
-            hosts.add(config.broadcastAddressAndPort());
+            hosts.add(config.broadcastAddress());
             hostIds.add(config.hostId());
         }
 
@@ -533,24 +623,24 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
 
             for (int i = 0; i < tokens.size(); i++)
             {
-                InetAddressAndPort ep = hosts.get(i);
-                Gossiper.instance.initializeNodeUnsafe(ep.address, hostIds.get(i), 1);
-                Gossiper.instance.injectApplicationState(ep.address,
+                InetSocketAddress ep = hosts.get(i);
+                Gossiper.instance.initializeNodeUnsafe(ep.getAddress(), hostIds.get(i), 1);
+                Gossiper.instance.injectApplicationState(ep.getAddress(),
                         ApplicationState.TOKENS,
                         new VersionedValue.VersionedValueFactory(partitioner).tokens(Collections.singleton(tokens.get(i))));
-                storageService.onChange(ep.address,
+                storageService.onChange(ep.getAddress(),
                         ApplicationState.STATUS,
                         new VersionedValue.VersionedValueFactory(partitioner).normal(Collections.singleton(tokens.get(i))));
-                Gossiper.instance.realMarkAlive(ep.address, Gossiper.instance.getEndpointStateForEndpoint(ep.address));
+                Gossiper.instance.realMarkAlive(ep.getAddress(), Gossiper.instance.getEndpointStateForEndpoint(ep.getAddress()));
                 int messagingVersion = cluster.get(ep).isShutdown()
                                        ? MessagingService.current_version
                                        : Math.min(MessagingService.current_version, cluster.get(ep).getMessagingVersion());
-                MessagingService.instance().setVersion(ep.address, messagingVersion);
+                MessagingService.instance().setVersion(ep.getAddress(), messagingVersion);
             }
 
             // check that all nodes are in token metadata
             for (int i = 0; i < tokens.size(); ++i)
-                assert storageService.getTokenMetadata().isMember(hosts.get(i).address);
+                assert storageService.getTokenMetadata().isMember(hosts.get(i).getAddress());
         }
         catch (Throwable e) // UnknownHostException
         {
@@ -563,6 +653,7 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
         return shutdown(true);
     }
 
+    @Override
     public Future<Void> shutdown(boolean graceful)
     {
         Future<?> future = async((ExecutorService executor) -> {
@@ -582,6 +673,10 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
             }
 
             error = parallelRun(error, executor,
+                                MessagingService.instance()::shutdown
+            );
+
+            error = parallelRun(error, executor,
                                 () -> Gossiper.instance.stopShutdownAndWait(1L, MINUTES),
                                 CompactionManager.instance::forceShutdown,
                                 () -> BatchlogManager.shutdownAndWait(1L, MINUTES),
@@ -597,8 +692,7 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
                                 () -> ScheduledExecutors.shutdownAndWait(1L, MINUTES)
             );
             error = parallelRun(error, executor,
-                                CommitLog.instance::shutdownBlocking,
-                                MessagingService.instance()::shutdown
+                                CommitLog.instance::shutdownBlocking
             );
             error = parallelRun(error, executor,
                                 () -> StageManager.shutdownAndWait(1L, MINUTES),
@@ -621,11 +715,85 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
         }).call();
     }
 
-    public int nodetool(String... commandAndArgs)
+    public NodeToolResult nodetoolResult(boolean withNotifications, String... commandAndArgs)
+    {
+        return sync(() -> {
+            DTestNodeTool nodetool = new DTestNodeTool(withNotifications);
+            int rc =  nodetool.execute(commandAndArgs);
+            return new NodeToolResult(commandAndArgs, rc, new ArrayList<>(nodetool.notifications.notifications), nodetool.latestError);
+        }).call();
+    }
+
+    private static class DTestNodeTool extends NodeTool {
+        private final StorageServiceMBean storageProxy;
+        private final CollectingNotificationListener notifications = new CollectingNotificationListener();
+
+        private Throwable latestError;
+
+        DTestNodeTool(boolean withNotifications) {
+            super(new InternalNodeProbeFactory(withNotifications));
+            storageProxy = new InternalNodeProbe(withNotifications).getStorageService();
+            storageProxy.addNotificationListener(notifications, null, null);
+        }
+
+        public int execute(String... args)
+        {
+            try
+            {
+                return super.execute(args);
+            }
+            finally
+            {
+                try
+                {
+                    storageProxy.removeNotificationListener(notifications, null, null);
+                }
+                catch (ListenerNotFoundException e)
+                {
+                    // ignored
+                }
+            }
+        }
+
+        protected void badUse(Exception e)
+        {
+            super.badUse(e);
+            latestError = e;
+        }
+
+        protected void err(Throwable e)
+        {
+            super.err(e);
+            latestError = e;
+        }
+    }
+
+    private static final class CollectingNotificationListener implements NotificationListener
     {
-        return sync(() -> new NodeTool(new InternalNodeProbeFactory()).execute(commandAndArgs)).call();
+        private final List<Notification> notifications = new CopyOnWriteArrayList<>();
+
+        public void handleNotification(Notification notification, Object handback)
+        {
+            notifications.add(notification);
+        }
+    }
+
+    public void uncaughtException(Thread thread, Throwable throwable)
+    {
+        System.out.println(String.format("Exception %s occurred on thread %s", throwable.getMessage(), thread.getName()));
+        throwable.printStackTrace();
     }
 
+    public long killAttempts()
+    {
+        return callOnInstance(InstanceKiller::getKillAttempts);
+    }
+
+    private static void shutdownAndWait(List<ExecutorService> executors) throws TimeoutException, InterruptedException
+    {
+        ExecutorUtils.shutdownNow(executors);
+        ExecutorUtils.awaitTermination(1L, MINUTES, executors);
+    }
 
     private static Throwable parallelRun(Throwable accumulate, ExecutorService runOn, ThrowingRunnable ... runnables)
     {
@@ -659,4 +827,4 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
         }
         return accumulate;
     }
-}
+}
\ No newline at end of file
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/InstanceClassLoader.java b/test/distributed/org/apache/cassandra/distributed/impl/InstanceClassLoader.java
deleted file mode 100644
index fb78902..0000000
--- a/test/distributed/org/apache/cassandra/distributed/impl/InstanceClassLoader.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.distributed.impl;
-
-import com.google.common.base.Predicate;
-import org.apache.cassandra.config.ParameterizedClass;
-import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.utils.Pair;
-import org.apache.cassandra.utils.SigarLibrary;
-
-import java.io.IOException;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.util.Arrays;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-public class InstanceClassLoader extends URLClassLoader
-{
-    // Classes that have to be shared between instances, for configuration or returning values
-    private static final Set<String> sharedClassNames = Arrays.stream(new Class[]
-            {
-                    Pair.class,
-                    InetAddressAndPort.class,
-                    ParameterizedClass.class,
-                    SigarLibrary.class,
-                    IInvokableInstance.class,
-                    NetworkTopology.class
-            })
-            .map(Class::getName)
-            .collect(Collectors.toSet());
-
-    private static final Predicate<String> sharePackage = name ->
-               name.startsWith("org.apache.cassandra.distributed.api.")
-            || name.startsWith("sun.")
-            || name.startsWith("oracle.")
-            || name.startsWith("com.intellij.")
-            || name.startsWith("com.sun.")
-            || name.startsWith("com.oracle.")
-            || name.startsWith("java.")
-            || name.startsWith("javax.")
-            || name.startsWith("jdk.")
-            || name.startsWith("netscape.")
-            || name.startsWith("org.xml.sax.");
-
-    private static final Predicate<String> shareClass = name -> sharePackage.apply(name) || sharedClassNames.contains(name);
-
-    public static interface Factory
-    {
-        InstanceClassLoader create(int id, URL[] urls, ClassLoader sharedClassLoader);
-    }
-
-    private volatile boolean isClosed = false;
-    private final URL[] urls;
-    private final int generation; // used to help debug class loader leaks, by helping determine which classloaders should have been collected
-    private final int id;
-    private final ClassLoader sharedClassLoader;
-
-    InstanceClassLoader(int generation, int id, URL[] urls, ClassLoader sharedClassLoader)
-    {
-        super(urls, null);
-        this.urls = urls;
-        this.sharedClassLoader = sharedClassLoader;
-        this.generation = generation;
-        this.id = id;
-    }
-
-    @Override
-    public Class<?> loadClass(String name) throws ClassNotFoundException
-    {
-        if (shareClass.apply(name))
-            return sharedClassLoader.loadClass(name);
-
-        return loadClassInternal(name);
-    }
-
-    Class<?> loadClassInternal(String name) throws ClassNotFoundException
-    {
-        if (isClosed)
-            throw new IllegalStateException(String.format("Can't load %s. Instance class loader is already closed.", name));
-
-        synchronized (getClassLoadingLock(name))
-        {
-            // First, check if the class has already been loaded
-            Class<?> c = findLoadedClass(name);
-
-            if (c == null)
-                c = findClass(name);
-
-            return c;
-        }
-    }
-
-    /**
-     * @return true iff this class was loaded by an InstanceClassLoader, and as such is used by a dtest node
-     */
-    public static boolean wasLoadedByAnInstanceClassLoader(Class<?> clazz)
-    {
-        return clazz.getClassLoader().getClass().getName().equals(InstanceClassLoader.class.getName());
-    }
-
-    public String toString()
-    {
-        return "InstanceClassLoader{" +
-               "generation=" + generation +
-               ", id = " + id +
-               ", urls=" + Arrays.toString(urls) +
-               '}';
-    }
-
-    public void close() throws IOException
-    {
-        isClosed = true;
-        super.close();
-    }
-}
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java b/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java
index 6d668e6..0aafbca 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java
@@ -18,16 +18,9 @@
 
 package org.apache.cassandra.distributed.impl;
 
-import org.apache.cassandra.config.Config;
-import org.apache.cassandra.config.ParameterizedClass;
-import org.apache.cassandra.distributed.api.Feature;
-import org.apache.cassandra.distributed.api.IInstanceConfig;
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.locator.SimpleSeedProvider;
-
 import java.io.File;
 import java.lang.reflect.Field;
+import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
 import java.util.Arrays;
 import java.util.Collections;
@@ -35,6 +28,14 @@ import java.util.EnumSet;
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.UUID;
+import java.util.function.Function;
+
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInstanceConfig;
+import org.apache.cassandra.distributed.shared.NetworkTopology;
+import org.apache.cassandra.distributed.shared.Versions;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.SimpleSeedProvider;
 
 public class InstanceConfig implements IInstanceConfig
 {
@@ -54,28 +55,6 @@ public class InstanceConfig implements IInstanceConfig
 
     private volatile InetAddressAndPort broadcastAddressAndPort;
 
-    @Override
-    public InetAddressAndPort broadcastAddressAndPort()
-    {
-        if (broadcastAddressAndPort == null)
-        {
-            broadcastAddressAndPort = getAddressAndPortFromConfig("broadcast_address", "storage_port");
-        }
-        return broadcastAddressAndPort;
-    }
-
-    private InetAddressAndPort getAddressAndPortFromConfig(String addressProp, String portProp)
-    {
-        try
-        {
-            return InetAddressAndPort.getByNameOverrideDefaults(getString(addressProp), getInt(portProp));
-        }
-        catch (UnknownHostException e)
-        {
-            throw new IllegalStateException(e);
-        }
-    }
-
     private InstanceConfig(int num,
                            NetworkTopology networkTopology,
                            String broadcast_address,
@@ -138,6 +117,44 @@ public class InstanceConfig implements IInstanceConfig
         this.broadcastAddressAndPort = copy.broadcastAddressAndPort;
     }
 
+
+    @Override
+    public InetSocketAddress broadcastAddress()
+    {
+        return DistributedTestSnitch.fromCassandraInetAddressAndPort(getBroadcastAddressAndPort());
+    }
+
+    protected InetAddressAndPort getBroadcastAddressAndPort()
+    {
+        if (broadcastAddressAndPort == null)
+        {
+            broadcastAddressAndPort = getAddressAndPortFromConfig("broadcast_address", "storage_port");
+        }
+        return broadcastAddressAndPort;
+    }
+
+    private InetAddressAndPort getAddressAndPortFromConfig(String addressProp, String portProp)
+    {
+        try
+        {
+            return InetAddressAndPort.getByNameOverrideDefaults(getString(addressProp), getInt(portProp));
+        }
+        catch (UnknownHostException e)
+        {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    public String localRack()
+    {
+        return networkTopology().localRack(broadcastAddress());
+    }
+
+    public String localDatacenter()
+    {
+        return networkTopology().localDC(broadcastAddress());
+    }
+
     public InstanceConfig with(Feature featureFlag)
     {
         featureFlags.add(featureFlag);
@@ -161,8 +178,6 @@ public class InstanceConfig implements IInstanceConfig
         if (value == null)
             value = NULL;
 
-        // test value
-        propagate(new Config(), fieldName, value, false);
         params.put(fieldName, value);
         return this;
     }
@@ -177,16 +192,10 @@ public class InstanceConfig implements IInstanceConfig
         return this;
     }
 
-    public void propagateIfSet(Object writeToConfig, String fieldName)
-    {
-        if (params.containsKey(fieldName))
-            propagate(writeToConfig, fieldName, params.get(fieldName), true);
-    }
-
-    public void propagate(Object writeToConfig)
+    public void propagate(Object writeToConfig, Map<Class<?>, Function<Object, Object>> mapping)
     {
         for (Map.Entry<String, Object> e : params.entrySet())
-            propagate(writeToConfig, e.getKey(), e.getValue(), true);
+            propagate(writeToConfig, e.getKey(), e.getValue(), mapping);
     }
 
     public void validate()
@@ -195,11 +204,14 @@ public class InstanceConfig implements IInstanceConfig
             throw new IllegalArgumentException("In-JVM dtests do not support vnodes as of now.");
     }
 
-    private void propagate(Object writeToConfig, String fieldName, Object value, boolean ignoreMissing)
+    private void propagate(Object writeToConfig, String fieldName, Object value, Map<Class<?>, Function<Object, Object>> mapping)
     {
         if (value == NULL)
             value = null;
 
+        if (mapping != null && mapping.containsKey(value.getClass()))
+            value = mapping.get(value.getClass()).apply(value);
+
         Class<?> configClass = writeToConfig.getClass();
         Field valueField;
         try
@@ -208,9 +220,7 @@ public class InstanceConfig implements IInstanceConfig
         }
         catch (NoSuchFieldException e)
         {
-            if (!ignoreMissing)
-                throw new IllegalStateException(e);
-            return;
+            throw new IllegalStateException(e);
         }
 
         if (valueField.getType().isEnum() && value instanceof String)
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/InstanceKiller.java b/test/distributed/org/apache/cassandra/distributed/impl/InstanceKiller.java
new file mode 100644
index 0000000..e7ca49b
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/impl/InstanceKiller.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.impl;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.cassandra.utils.JVMStabilityInspector;
+
+public class InstanceKiller extends JVMStabilityInspector.Killer
+{
+    private static final AtomicLong KILL_ATTEMPTS = new AtomicLong(0);
+
+    public static long getKillAttempts()
+    {
+        return KILL_ATTEMPTS.get();
+    }
+
+    public static void clear()
+    {
+        KILL_ATTEMPTS.set(0);
+    }
+
+    @Override
+    protected void killCurrentJVM(Throwable t, boolean quiet)
+    {
+        KILL_ATTEMPTS.incrementAndGet();
+        // the bad part is that System.exit kills the JVM, so all code which calls kill won't hit the
+        // next line; yet in in-JVM dtests System.exit is not desirable, so need to rely on a runtime exception
+        // as a means to try to stop execution
+        throw new InstanceShutdown();
+    }
+
+    public static final class InstanceShutdown extends RuntimeException { }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/IsolatedExecutor.java b/test/distributed/org/apache/cassandra/distributed/impl/IsolatedExecutor.java
index 248155a..fc31fdf 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/IsolatedExecutor.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/IsolatedExecutor.java
@@ -39,6 +39,7 @@ import java.util.function.BiConsumer;
 import java.util.function.BiFunction;
 import java.util.function.Consumer;
 import java.util.function.Function;
+import java.util.function.Supplier;
 
 import org.slf4j.LoggerFactory;
 
@@ -101,6 +102,9 @@ public class IsolatedExecutor implements IIsolatedExecutor
         });
     }
 
+    public <O> Supplier<Future<O>> supplyAsync(SerializableSupplier<O> call) { return () -> isolatedExecutor.submit(call::get); }
+    public <O> Supplier<O> supplySync(SerializableSupplier<O> call) { return () -> waitOn(supplyAsync(call).get()); }
+
     public <O> CallableNoExcept<Future<O>> async(CallableNoExcept<O> call) { return () -> isolatedExecutor.submit(call); }
     public <O> CallableNoExcept<O> sync(CallableNoExcept<O> call) { return () -> waitOn(async(call).call()); }
 
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Listen.java b/test/distributed/org/apache/cassandra/distributed/impl/Listen.java
index 27ae156..9d9beea 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Listen.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Listen.java
@@ -24,7 +24,6 @@ import java.util.concurrent.locks.LockSupport;
 import java.util.function.Supplier;
 
 import org.apache.cassandra.distributed.api.IListen;
-import org.apache.cassandra.gms.Gossiper;
 
 public class Listen implements IListen
 {
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/MessageFilters.java b/test/distributed/org/apache/cassandra/distributed/impl/MessageFilters.java
deleted file mode 100644
index c92553f..0000000
--- a/test/distributed/org/apache/cassandra/distributed/impl/MessageFilters.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.distributed.impl;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.CopyOnWriteArrayList;
-
-import org.apache.cassandra.distributed.api.IMessage;
-import org.apache.cassandra.distributed.api.IMessageFilters;
-
-public class MessageFilters implements IMessageFilters
-{
-    private final List<Filter> filters = new CopyOnWriteArrayList<>();
-
-    public boolean permit(int from, int to, IMessage msg)
-    {
-        for (Filter filter : filters)
-        {
-            if (filter.matches(from, to, msg))
-                return false;
-        }
-        return true;
-    }
-
-    public class Filter implements IMessageFilters.Filter
-    {
-        final int[] from;
-        final int[] to;
-        final int[] verbs;
-        final Matcher matcher;
-
-        Filter(int[] from, int[] to, int[] verbs, Matcher matcher)
-        {
-            if (from != null)
-            {
-                from = from.clone();
-                Arrays.sort(from);
-            }
-            if (to != null)
-            {
-                to = to.clone();
-                Arrays.sort(to);
-            }
-            if (verbs != null)
-            {
-                verbs = verbs.clone();
-                Arrays.sort(verbs);
-            }
-            this.from = from;
-            this.to = to;
-            this.verbs = verbs;
-            this.matcher = matcher;
-        }
-
-        public int hashCode()
-        {
-            return (from == null ? 0 : Arrays.hashCode(from))
-                   + (to == null ? 0 : Arrays.hashCode(to))
-                   + (verbs == null ? 0 : Arrays.hashCode(verbs));
-        }
-
-        public boolean equals(Object that)
-        {
-            return that instanceof Filter && equals((Filter) that);
-        }
-
-        public boolean equals(Filter that)
-        {
-            return Arrays.equals(from, that.from)
-                   && Arrays.equals(to, that.to)
-                   && Arrays.equals(verbs, that.verbs);
-        }
-
-        public Filter off()
-        {
-            filters.remove(this);
-            return this;
-        }
-
-        public Filter on()
-        {
-            filters.add(this);
-            return this;
-        }
-
-        public boolean matches(int from, int to, IMessage msg)
-        {
-            return (this.from == null || Arrays.binarySearch(this.from, from) >= 0)
-                   && (this.to == null || Arrays.binarySearch(this.to, to) >= 0)
-                   && (this.verbs == null || Arrays.binarySearch(this.verbs, msg.verb()) >= 0)
-                   && (this.matcher == null || this.matcher.matches(from, to, msg));
-        }
-    }
-
-    public class Builder implements IMessageFilters.Builder
-    {
-        int[] from;
-        int[] to;
-        int[] verbs;
-        Matcher matcher;
-
-        private Builder(int[] verbs)
-        {
-            this.verbs = verbs;
-        }
-
-        public Builder from(int... nums)
-        {
-            from = nums;
-            return this;
-        }
-
-        public Builder to(int... nums)
-        {
-            to = nums;
-            return this;
-        }
-
-        public IMessageFilters.Builder messagesMatching(Matcher matcher)
-        {
-            this.matcher = matcher;
-            return this;
-        }
-
-        public Filter drop()
-        {
-            return new Filter(from, to, verbs, matcher).on();
-        }
-    }
-
-
-    public Builder verbs(int... verbs)
-    {
-        return new Builder(verbs);
-    }
-
-    @Override
-    public Builder allVerbs()
-    {
-        return new Builder(null);
-    }
-
-    @Override
-    public void reset()
-    {
-        filters.clear();
-    }
-}
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Message.java b/test/distributed/org/apache/cassandra/distributed/impl/MessageImpl.java
similarity index 74%
rename from test/distributed/org/apache/cassandra/distributed/impl/Message.java
rename to test/distributed/org/apache/cassandra/distributed/impl/MessageImpl.java
index 6f8085c..ebc31b1 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Message.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/MessageImpl.java
@@ -18,19 +18,21 @@
 
 package org.apache.cassandra.distributed.impl;
 
+import java.net.InetSocketAddress;
+
 import org.apache.cassandra.distributed.api.IMessage;
-import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.distributed.shared.NetworkTopology;
 
 // a container for simplifying the method signature for per-instance message handling/delivery
-public class Message implements IMessage
+public class MessageImpl implements IMessage
 {
-    private final int verb;
-    private final byte[] bytes;
-    private final int id;
-    private final int version;
-    private final InetAddressAndPort from;
+    public final int verb;
+    public final byte[] bytes;
+    public final long id;
+    public final int version;
+    public final InetSocketAddress from;
 
-    public Message(int verb, byte[] bytes, int id, int version, InetAddressAndPort from)
+    public MessageImpl(int verb, byte[] bytes, long id, int version, InetSocketAddress from)
     {
         this.verb = verb;
         this.bytes = bytes;
@@ -39,32 +41,27 @@ public class Message implements IMessage
         this.from = from;
     }
 
-    @Override
     public int verb()
     {
         return verb;
     }
 
-    @Override
     public byte[] bytes()
     {
         return bytes;
     }
 
-    @Override
     public int id()
     {
-        return id;
+        return (int) id;
     }
 
-    @Override
     public int version()
     {
         return version;
     }
 
-    @Override
-    public InetAddressAndPort from()
+    public InetSocketAddress from()
     {
         return from;
     }
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/NetworkTopology.java b/test/distributed/org/apache/cassandra/distributed/impl/NetworkTopology.java
deleted file mode 100644
index f7c31ff..0000000
--- a/test/distributed/org/apache/cassandra/distributed/impl/NetworkTopology.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.distributed.impl;
-
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.function.IntFunction;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.utils.Pair;
-
-public class NetworkTopology
-{
-    private final Map<InetAddressAndPort, DcAndRack> map;
-
-    public static class DcAndRack
-    {
-        private final String dc;
-        private final String rack;
-
-        private DcAndRack(String dc, String rack)
-        {
-            this.dc = dc;
-            this.rack = rack;
-        }
-    }
-
-    public static DcAndRack dcAndRack(String dc, String rack)
-    {
-        return new DcAndRack(dc, rack);
-    }
-
-    private NetworkTopology() {
-        map = new HashMap<>();
-    }
-
-    @SuppressWarnings("WeakerAccess")
-    public NetworkTopology(NetworkTopology networkTopology)
-    {
-        map = new HashMap<>(networkTopology.map);
-    }
-
-    public static NetworkTopology build(String ipPrefix, int broadcastPort, Map<Integer, DcAndRack> nodeIdTopology)
-    {
-        final NetworkTopology topology = new NetworkTopology();
-
-        for (int nodeId = 1; nodeId <= nodeIdTopology.size(); nodeId++)
-        {
-            String broadcastAddress = ipPrefix + nodeId;
-
-            try
-            {
-                DcAndRack dcAndRack = nodeIdTopology.get(nodeId);
-                if (dcAndRack == null)
-                    throw new IllegalStateException("nodeId " + nodeId + "not found in instanceMap");
-
-                InetAddressAndPort broadcastAddressAndPort = InetAddressAndPort.getByAddressOverrideDefaults(
-                    InetAddress.getByName(broadcastAddress), broadcastPort);
-                topology.put(broadcastAddressAndPort, dcAndRack);
-            }
-            catch (UnknownHostException e)
-            {
-                throw new ConfigurationException("Unknown broadcast_address '" + broadcastAddress + '\'', false);
-            }
-        }
-        return topology;
-    }
-
-    public DcAndRack put(InetAddressAndPort key, DcAndRack value)
-    {
-        return map.put(key, value);
-    }
-
-    public String localRack(InetAddressAndPort key)
-    {
-        DcAndRack p  = map.get(key);
-        if (p == null)
-            return null;
-        return p.rack;
-    }
-
-    public String localDC(InetAddressAndPort key)
-    {
-        DcAndRack p = map.get(key);
-        if (p == null)
-            return null;
-        return p.dc;
-    }
-
-    public boolean contains(InetAddressAndPort key)
-    {
-        return map.containsKey(key);
-    }
-
-    public String toString()
-    {
-        return "NetworkTopology{" + map + '}';
-    }
-
-
-    public static Map<Integer, NetworkTopology.DcAndRack> singleDcNetworkTopology(int nodeCount,
-                                                                                  String dc,
-                                                                                  String rack)
-    {
-        return networkTopology(nodeCount, (nodeid) -> NetworkTopology.dcAndRack(dc, rack));
-    }
-
-    public static Map<Integer, NetworkTopology.DcAndRack> networkTopology(int nodeCount,
-                                                                          IntFunction<DcAndRack> dcAndRackSupplier)
-    {
-
-        return IntStream.rangeClosed(1, nodeCount).boxed()
-                        .collect(Collectors.toMap(nodeId -> nodeId,
-                                                  dcAndRackSupplier::apply));
-    }
-}
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/RowUtil.java b/test/distributed/org/apache/cassandra/distributed/impl/RowUtil.java
index ded3708..2da3676 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/RowUtil.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/RowUtil.java
@@ -51,6 +51,11 @@ public class RowUtil
         return result;
     }
 
+    public static Iterator<Object[]> toObjects(UntypedResultSet rs)
+    {
+        return toObjects(rs.metadata(), rs.iterator());
+    }
+
     public static Iterator<Object[]> toObjects(List<ColumnSpecification> columnSpecs, Iterator<UntypedResultSet.Row> rs)
     {
         return Iterators.transform(rs,
@@ -60,8 +65,10 @@ public class RowUtil
                                        {
                                            ColumnSpecification columnSpec = columnSpecs.get(i);
                                            ByteBuffer bb = row.getBytes(columnSpec.name.toString());
+
                                            if (bb != null)
                                                objectRow[i] = columnSpec.type.getSerializer().deserialize(bb);
+
                                        }
                                        return objectRow;
                                    });
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/TracingUtil.java b/test/distributed/org/apache/cassandra/distributed/impl/TracingUtil.java
index d671e55..439d4b7 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/TracingUtil.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/TracingUtil.java
@@ -24,7 +24,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.UUID;
 
-import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
 
 
 /**
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Versions.java b/test/distributed/org/apache/cassandra/distributed/impl/Versions.java
deleted file mode 100644
index d5d6e7d..0000000
--- a/test/distributed/org/apache/cassandra/distributed/impl/Versions.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.distributed.impl;
-
-import java.io.File;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import java.util.stream.Collectors;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.config.Config;
-import org.apache.cassandra.utils.FBUtilities;
-
-public class Versions
-{
-    private static final Logger logger = LoggerFactory.getLogger(Versions.class);
-    public static Version CURRENT = new Version(FBUtilities.getReleaseVersionString(), ((URLClassLoader)Versions.class.getClassLoader()).getURLs());
-
-    public enum Major
-    {
-        v22("2\\.2\\.([0-9]+)"),
-        v30("3\\.0\\.([0-9]+)"),
-        v3X("3\\.([1-9]|1[01])(\\.([0-9]+))?"),
-        v4("4\\.([0-9]+)");
-        final Pattern pattern;
-        Major(String verify)
-        {
-            this.pattern = Pattern.compile(verify);
-        }
-
-        static Major fromFull(String version)
-        {
-            if (version.isEmpty())
-                throw new IllegalArgumentException(version);
-            switch (version.charAt(0))
-            {
-                case '2':
-                    if (version.startsWith("2.2"))
-                        return v22;
-                    throw new IllegalArgumentException(version);
-                case '3':
-                    if (version.startsWith("3.0"))
-                        return v30;
-                    return v3X;
-                case '4':
-                    return v4;
-                default:
-                    throw new IllegalArgumentException(version);
-            }
-        }
-
-        // verify that the version string is valid for this major version
-        boolean verify(String version)
-        {
-            return pattern.matcher(version).matches();
-        }
-
-        // sort two strings of the same major version as this enum instance
-        int compare(String a, String b)
-        {
-            Matcher ma = pattern.matcher(a);
-            Matcher mb = pattern.matcher(a);
-            if (!ma.matches()) throw new IllegalArgumentException(a);
-            if (!mb.matches()) throw new IllegalArgumentException(b);
-            int result = Integer.compare(Integer.parseInt(ma.group(1)), Integer.parseInt(mb.group(1)));
-            if (result == 0 && this == v3X && (ma.group(3) != null || mb.group(3) != null))
-            {
-                if (ma.group(3) != null && mb.group(3) != null)
-                {
-                    result = Integer.compare(Integer.parseInt(ma.group(3)), Integer.parseInt(mb.group(3)));
-                }
-                else
-                {
-                    result = ma.group(3) != null ? 1 : -1;
-                }
-            }
-            // sort descending
-            return -result;
-        }
-    }
-
-    public static class Version
-    {
-        public final Major major;
-        public final String version;
-        public final URL[] classpath;
-
-        public Version(String version, URL[] classpath)
-        {
-            this(Major.fromFull(version), version, classpath);
-        }
-        public Version(Major major, String version, URL[] classpath)
-        {
-            this.major = major;
-            this.version = version;
-            this.classpath = classpath;
-        }
-    }
-
-    private final Map<Major, List<Version>> versions;
-    public Versions(Map<Major, List<Version>> versions)
-    {
-        this.versions = versions;
-    }
-
-    public Version get(String full)
-    {
-        return versions.get(Major.fromFull(full))
-                       .stream()
-                       .filter(v -> full.equals(v.version))
-                       .findFirst()
-                       .orElseThrow(() -> new RuntimeException("No version " + full + " found"));
-    }
-
-    public Version getLatest(Major major)
-    {
-        return versions.get(major).stream().findFirst().orElseThrow(() -> new RuntimeException("No " + major + " versions found"));
-    }
-
-    public static Versions find()
-    {
-        final String dtestJarDirectory = System.getProperty(Config.PROPERTY_PREFIX + "test.dtest_jar_path","build");
-        final File sourceDirectory = new File(dtestJarDirectory);
-        logger.info("Looking for dtest jars in " + sourceDirectory.getAbsolutePath());
-        final Pattern pattern = Pattern.compile("dtest-(?<fullversion>(\\d+)\\.(\\d+)(\\.\\d+)?(\\.\\d+)?)([~\\-]\\w[.\\w]*(?:\\-\\w[.\\w]*)*)?(\\+[.\\w]+)?\\.jar");
-        final Map<Major, List<Version>> versions = new HashMap<>();
-        for (Major major : Major.values())
-            versions.put(major, new ArrayList<>());
-
-        for (File file : sourceDirectory.listFiles())
-        {
-            Matcher m = pattern.matcher(file.getName());
-            if (!m.matches())
-                continue;
-            String version = m.group("fullversion");
-            Major major = Major.fromFull(version);
-            versions.get(major).add(new Version(major, version, new URL[] { toURL(file) }));
-        }
-
-        for (Map.Entry<Major, List<Version>> e : versions.entrySet())
-        {
-            if (e.getValue().isEmpty())
-                continue;
-            Collections.sort(e.getValue(), Comparator.comparing(v -> v.version, e.getKey()::compare));
-            logger.info("Found " + e.getValue().stream().map(v -> v.version).collect(Collectors.joining(", ")));
-        }
-
-        return new Versions(versions);
-    }
-
-    public static URL toURL(File file)
-    {
-        try
-        {
-            return file.toURI().toURL();
-        }
-        catch (MalformedURLException e)
-        {
-            throw new IllegalArgumentException(e);
-        }
-    }
-
-}
diff --git a/test/distributed/org/apache/cassandra/distributed/mock/nodetool/InternalNodeProbe.java b/test/distributed/org/apache/cassandra/distributed/mock/nodetool/InternalNodeProbe.java
index ca1d53a..625b4aa 100644
--- a/test/distributed/org/apache/cassandra/distributed/mock/nodetool/InternalNodeProbe.java
+++ b/test/distributed/org/apache/cassandra/distributed/mock/nodetool/InternalNodeProbe.java
@@ -23,6 +23,8 @@ import java.lang.management.ManagementFactory;
 import java.util.Iterator;
 import java.util.Map;
 
+import javax.management.ListenerNotFoundException;
+
 import com.google.common.collect.Multimap;
 
 import org.apache.cassandra.db.ColumnFamilyStoreMBean;
@@ -41,14 +43,19 @@ import org.apache.cassandra.service.CacheServiceMBean;
 import org.apache.cassandra.service.GCInspector;
 import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.service.StorageServiceMBean;
 import org.apache.cassandra.streaming.StreamManager;
 import org.apache.cassandra.tools.NodeProbe;
+import org.mockito.Mockito;
 
 public class InternalNodeProbe extends NodeProbe
 {
-    public InternalNodeProbe() throws IOException
+    private final boolean withNotifications;
+
+    public InternalNodeProbe(boolean withNotifications)
     {
-        super("", 0);
+        this.withNotifications = withNotifications;
+        connect();
     }
 
     protected void connect()
@@ -57,6 +64,28 @@ public class InternalNodeProbe extends NodeProbe
         mbeanServerConn = null;
         jmxc = null;
 
+
+        if (withNotifications)
+        {
+            ssProxy = StorageService.instance;
+        }
+        else
+        {
+            // replace the notification apis with a no-op method
+            StorageServiceMBean mock = Mockito.spy(StorageService.instance);
+            Mockito.doNothing().when(mock).addNotificationListener(Mockito.any(), Mockito.any(), Mockito.any());
+            try
+            {
+                Mockito.doNothing().when(mock).removeNotificationListener(Mockito.any(), Mockito.any(), Mockito.any());
+                Mockito.doNothing().when(mock).removeNotificationListener(Mockito.any());
+            }
+            catch (ListenerNotFoundException e)
+            {
+                throw new AssertionError(e);
+            }
+            ssProxy = mock;
+        }
+
         ssProxy = StorageService.instance;
         msProxy = MessagingService.instance();
         streamProxy = StreamManager.instance;
diff --git a/test/distributed/org/apache/cassandra/distributed/mock/nodetool/InternalNodeProbeFactory.java b/test/distributed/org/apache/cassandra/distributed/mock/nodetool/InternalNodeProbeFactory.java
index f7c9dcf..1904aa7 100644
--- a/test/distributed/org/apache/cassandra/distributed/mock/nodetool/InternalNodeProbeFactory.java
+++ b/test/distributed/org/apache/cassandra/distributed/mock/nodetool/InternalNodeProbeFactory.java
@@ -25,11 +25,18 @@ import org.apache.cassandra.tools.INodeProbeFactory;
 
 public class InternalNodeProbeFactory implements INodeProbeFactory
 {
+    private final boolean withNotifications;
+
+    public InternalNodeProbeFactory(boolean withNotifications)
+    {
+        this.withNotifications = withNotifications;
+    }
+
     public NodeProbe create(String host, int port) throws IOException {
-        return new InternalNodeProbe();
+        return new InternalNodeProbe(withNotifications);
     }
 
     public NodeProbe create(String host, int port, String username, String password) throws IOException {
-        return new InternalNodeProbe();
+        return new InternalNodeProbe(withNotifications);
     }
 }
diff --git a/test/distributed/org/apache/cassandra/distributed/test/BootstrapTest.java b/test/distributed/org/apache/cassandra/distributed/test/BootstrapTest.java
index 0af26ea..11c2f9d 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/BootstrapTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/BootstrapTest.java
@@ -25,17 +25,21 @@ import java.util.stream.IntStream;
 import org.junit.Assert;
 import org.junit.Test;
 
-import org.apache.cassandra.db.ConsistencyLevel;
-import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.ICluster;
 import org.apache.cassandra.distributed.api.IInstance;
-import org.apache.cassandra.distributed.impl.IInvokableInstance;
-import org.apache.cassandra.distributed.impl.InstanceConfig;
-import org.apache.cassandra.distributed.impl.NetworkTopology;
+import org.apache.cassandra.distributed.api.IInstanceConfig;
+import org.apache.cassandra.distributed.api.TokenSupplier;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.shared.Builder;
+import org.apache.cassandra.distributed.shared.NetworkTopology;
 
 import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
 import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+import static org.apache.cassandra.distributed.shared.DistributedTestBase.KEYSPACE;
 
-public class BootstrapTest extends DistributedTestBase
+// TODO: this test should be removed after running in-jvm dtests is set up via the shared API repository
+public class BootstrapTest extends TestBaseImpl
 {
 
     @Test
@@ -43,24 +47,22 @@ public class BootstrapTest extends DistributedTestBase
     {
         int originalNodeCount = 2;
         int expandedNodeCount = originalNodeCount + 1;
-        Cluster.Builder<IInvokableInstance, Cluster> builder = Cluster.build(originalNodeCount)
-                                                                      .withTokenSupplier(Cluster.evenlyDistributedTokens(expandedNodeCount))
-                                                                      .withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(originalNodeCount, "dc0", "rack0"))
-                                                                      .withConfig(config -> config.with(NETWORK, GOSSIP));
+        Builder<IInstance, ICluster> builder = builder().withNodes(originalNodeCount)
+                                                        .withTokenSupplier(TokenSupplier.evenlyDistributedTokens(expandedNodeCount))
+                                                        .withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(originalNodeCount, "dc0", "rack0"))
+                                                        .withConfig(config -> config.with(NETWORK, GOSSIP));
 
         Map<Integer, Long> withBootstrap = null;
         Map<Integer, Long> naturally = null;
-
-        try (Cluster cluster = builder.start())
+        try (ICluster<IInvokableInstance> cluster = builder.withNodes(originalNodeCount).start())
         {
             populate(cluster);
 
-            InstanceConfig config = builder.withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(expandedNodeCount, "dc0", "rack0"))
-                                           .newInstanceConfig(cluster);
+            IInstanceConfig config = builder.withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(expandedNodeCount, "dc0", "rack0"))
+                                            .newInstanceConfig(cluster);
             config.set("auto_bootstrap", true);
 
-            IInstance newInstance = cluster.bootstrap(config);
-            newInstance.startup();
+            cluster.bootstrap(config).startup();
 
             cluster.stream().forEach(instance -> {
                 instance.nodetool("cleanup", KEYSPACE, "tbl");
@@ -69,20 +71,21 @@ public class BootstrapTest extends DistributedTestBase
             withBootstrap = count(cluster);
         }
 
-        builder = Cluster.build(expandedNodeCount)
-                         .withTokenSupplier(Cluster.evenlyDistributedTokens(expandedNodeCount))
+        builder = builder.withNodes(expandedNodeCount)
+                         .withTokenSupplier(TokenSupplier.evenlyDistributedTokens(expandedNodeCount))
                          .withConfig(config -> config.with(NETWORK, GOSSIP));
 
-        try (Cluster cluster = builder.start())
+        try (ICluster cluster = builder.start())
         {
             populate(cluster);
             naturally = count(cluster);
         }
 
-        Assert.assertEquals(withBootstrap, naturally);
+        for (Map.Entry<Integer, Long> e : withBootstrap.entrySet())
+            Assert.assertTrue(e.getValue() >= naturally.get(e.getKey()));
     }
 
-    public void populate(Cluster cluster)
+    public void populate(ICluster cluster)
     {
         cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': " + 3 + "};");
         cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
@@ -93,12 +96,11 @@ public class BootstrapTest extends DistributedTestBase
                                            i, i, i);
     }
 
-    public Map<Integer, Long> count(Cluster cluster)
+    public Map<Integer, Long> count(ICluster cluster)
     {
         return IntStream.rangeClosed(1, cluster.size())
                         .boxed()
                         .collect(Collectors.toMap(nodeId -> nodeId,
                                                   nodeId -> (Long) cluster.get(nodeId).executeInternal("SELECT count(*) FROM " + KEYSPACE + ".tbl")[0][0]));
     }
-
-}
+}
\ No newline at end of file
diff --git a/test/distributed/org/apache/cassandra/distributed/test/DistributedReadWritePathTest.java b/test/distributed/org/apache/cassandra/distributed/test/DistributedReadWritePathTest.java
deleted file mode 100644
index 8cd731d..0000000
--- a/test/distributed/org/apache/cassandra/distributed/test/DistributedReadWritePathTest.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.distributed.test;
-
-import org.junit.Test;
-
-import org.apache.cassandra.db.ConsistencyLevel;
-import org.apache.cassandra.distributed.Cluster;
-
-public class DistributedReadWritePathTest extends DistributedTestBase
-{
-    @Test
-    public void coordinatorReadTest() throws Throwable
-    {
-        try (Cluster cluster = init(Cluster.create(3)))
-        {
-            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
-
-            cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)");
-            cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 2, 2)");
-            cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 3, 3)");
-
-            assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = ?",
-                                                     ConsistencyLevel.ALL,
-                                                     1),
-                       row(1, 1, 1),
-                       row(1, 2, 2),
-                       row(1, 3, 3));
-        }
-    }
-
-    @Test
-    public void coordinatorWriteTest() throws Throwable
-    {
-        try (Cluster cluster = init(Cluster.create(3)))
-        {
-            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
-
-            cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)",
-                                          ConsistencyLevel.QUORUM);
-
-            for (int i = 0; i < 3; i++)
-            {
-                assertRows(cluster.get(1).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1"),
-                           row(1, 1, 1));
-            }
-
-            assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1",
-                                                     ConsistencyLevel.QUORUM),
-                       row(1, 1, 1));
-        }
-    }
-
-    @Test
-    public void readRepairTest() throws Throwable
-    {
-        try (Cluster cluster = init(Cluster.create(3)))
-        {
-            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
-
-            cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)");
-            cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)");
-
-            assertRows(cluster.get(3).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1"));
-
-            assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1",
-                                                     ConsistencyLevel.ALL), // ensure node3 in preflist
-                       row(1, 1, 1));
-
-            // Verify that data got repaired to the third node
-            assertRows(cluster.get(3).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1"),
-                       row(1, 1, 1));
-        }
-    }
-
-    @Test
-    public void simplePagedReadsTest() throws Throwable
-    {
-        try (Cluster cluster = init(Cluster.create(3)))
-        {
-            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
-
-            int size = 100;
-            Object[][] results = new Object[size][];
-            for (int i = 0; i < size; i++)
-            {
-                cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, ?, ?)",
-                                               ConsistencyLevel.QUORUM,
-                                               i, i);
-                results[i] = new Object[] { 1, i, i};
-            }
-
-            // First, make sure that non-paged reads are able to fetch the results
-            assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl", ConsistencyLevel.QUORUM),
-                       results);
-
-            // Make sure paged read returns same results with different page sizes
-            for (int pageSize : new int[] { 1, 2, 3, 5, 10, 20, 50})
-            {
-                assertRows(cluster.coordinator(1).executeWithPaging("SELECT * FROM " + KEYSPACE + ".tbl",
-                                                                    ConsistencyLevel.QUORUM,
-                                                                    pageSize),
-                           results);
-            }
-        }
-    }
-
-    @Test
-    public void pagingWithRepairTest() throws Throwable
-    {
-        try (Cluster cluster = init(Cluster.create(3)))
-        {
-            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
-
-            int size = 10;
-            Object[][] results = new Object[size][];
-            for (int i = 0; i < size; i++)
-            {
-                // Make sure that data lands on different nodes and not coordinator
-                cluster.get(i % 2 == 0 ? 2 : 3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, ?, ?)",
-                                                                i, i);
-
-                results[i] = new Object[] { 1, i, i};
-            }
-
-            // Make sure paged read returns same results with different page sizes
-            for (int pageSize : new int[] { 1, 2, 3, 5, 10, 20, 50})
-            {
-                assertRows(cluster.coordinator(1).executeWithPaging("SELECT * FROM " + KEYSPACE + ".tbl",
-                                                                    ConsistencyLevel.ALL,
-                                                                    pageSize),
-                           results);
-            }
-
-            assertRows(cluster.get(1).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl"),
-                       results);
-        }
-    }
-
-    @Test
-    public void pagingTests() throws Throwable
-    {
-        try (Cluster cluster = init(Cluster.create(3));
-             Cluster singleNode = init(Cluster.create(1)))
-        {
-            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
-            singleNode.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
-
-            for (int i = 0; i < 10; i++)
-            {
-                for (int j = 0; j < 10; j++)
-                {
-                    cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, ?, ?)",
-                                                   ConsistencyLevel.QUORUM,
-                                                   i, j, i + i);
-                    singleNode.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, ?, ?)",
-                                                      ConsistencyLevel.QUORUM,
-                                                      i, j, i + i);
-                }
-            }
-
-            int[] pageSizes = new int[] { 1, 2, 3, 5, 10, 20, 50};
-            String[] statements = new String [] {"SELECT * FROM " + KEYSPACE  + ".tbl WHERE pk = 1 AND ck > 5",
-                                                 "SELECT * FROM " + KEYSPACE  + ".tbl WHERE pk = 1 AND ck >= 5",
-                                                 "SELECT * FROM " + KEYSPACE  + ".tbl WHERE pk = 1 AND ck > 5 AND ck <= 10",
-                                                 "SELECT * FROM " + KEYSPACE  + ".tbl WHERE pk = 1 AND ck > 5 LIMIT 3",
-                                                 "SELECT * FROM " + KEYSPACE  + ".tbl WHERE pk = 1 AND ck >= 5 LIMIT 2",
-                                                 "SELECT * FROM " + KEYSPACE  + ".tbl WHERE pk = 1 AND ck > 5 AND ck <= 10 LIMIT 2",
-                                                 "SELECT * FROM " + KEYSPACE  + ".tbl WHERE pk = 1 AND ck > 5 ORDER BY ck DESC",
-                                                 "SELECT * FROM " + KEYSPACE  + ".tbl WHERE pk = 1 AND ck >= 5 ORDER BY ck DESC",
-                                                 "SELECT * FROM " + KEYSPACE  + ".tbl WHERE pk = 1 AND ck > 5 AND ck <= 10 ORDER BY ck DESC",
-                                                 "SELECT * FROM " + KEYSPACE  + ".tbl WHERE pk = 1 AND ck > 5 ORDER BY ck DESC LIMIT 3",
-                                                 "SELECT * FROM " + KEYSPACE  + ".tbl WHERE pk = 1 AND ck >= 5 ORDER BY ck DESC LIMIT 2",
-                                                 "SELECT * FROM " + KEYSPACE  + ".tbl WHERE pk = 1 AND ck > 5 AND ck <= 10 ORDER BY ck DESC LIMIT 2",
-                                                 "SELECT DISTINCT pk FROM " + KEYSPACE  + ".tbl LIMIT 3",
-                                                 "SELECT DISTINCT pk FROM " + KEYSPACE  + ".tbl WHERE pk IN (3,5,8,10)",
-                                                 "SELECT DISTINCT pk FROM " + KEYSPACE  + ".tbl WHERE pk IN (3,5,8,10) LIMIT 2"
-            };
-            for (String statement : statements)
-            {
-                for (int pageSize : pageSizes)
-                {
-                    assertRows(cluster.coordinator(1)
-                                      .executeWithPaging(statement,
-                                                         ConsistencyLevel.QUORUM,  pageSize),
-                               singleNode.coordinator(1)
-                                         .executeWithPaging(statement,
-                                                            ConsistencyLevel.QUORUM,  Integer.MAX_VALUE));
-                }
-            }
-
-        }
-    }
-}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/DistributedTestBase.java b/test/distributed/org/apache/cassandra/distributed/test/DistributedTestBase.java
deleted file mode 100644
index 7a3d52d..0000000
--- a/test/distributed/org/apache/cassandra/distributed/test/DistributedTestBase.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.distributed.test;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
-
-import com.google.common.collect.Iterators;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-
-import com.datastax.driver.core.ResultSet;
-import org.apache.cassandra.distributed.impl.AbstractCluster;
-import org.apache.cassandra.distributed.impl.IsolatedExecutor;
-import org.apache.cassandra.distributed.impl.RowUtil;
-
-public class DistributedTestBase
-{
-    @After
-    public void afterEach()
-    {
-        System.runFinalization();
-        System.gc();
-    }
-
-    public static String KEYSPACE = "distributed_test_keyspace";
-
-    public static void nativeLibraryWorkaround()
-    {
-        // Disable the C library for in-JVM dtests otherwise it holds a gcroot against the InstanceClassLoader
-        System.setProperty("cassandra.disable_clibrary", "true");
-
-        // Disable the Netty tcnative library otherwise the io.netty.internal.tcnative.CertificateCallbackTask,
-        // CertificateVerifierTask, SSLPrivateKeyMethodDecryptTask, SSLPrivateKeyMethodSignTask,
-        // SSLPrivateKeyMethodTask, and SSLTask hold a gcroot against the InstanceClassLoader.
-        System.setProperty("cassandra.disable_tcactive_openssl", "true");
-        System.setProperty("io.netty.transport.noNative", "true");
-    }
-
-    public static void processReaperWorkaround()
-    {
-        // Make sure the 'process reaper' thread is initially created under the main classloader,
-        // otherwise it gets created with the contextClassLoader pointing to an InstanceClassLoader
-        // which prevents it from being garbage collected.
-        IsolatedExecutor.ThrowingRunnable.toRunnable(() -> new ProcessBuilder().command("true").start().waitFor()).run();
-    }
-
-    @BeforeClass
-    public static void setup()
-    {
-        System.setProperty("cassandra.ring_delay_ms", Integer.toString(10 * 1000));
-        System.setProperty("org.apache.cassandra.disable_mbean_registration", "true");
-        nativeLibraryWorkaround();
-        processReaperWorkaround();
-    }
-
-    static String withKeyspace(String replaceIn)
-    {
-        return String.format(replaceIn, KEYSPACE);
-    }
-
-    protected static <C extends AbstractCluster<?>> C init(C cluster)
-    {
-        cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': " + cluster.size() + "};");
-        return cluster;
-    }
-
-    public static void assertRows(ResultSet actual,Object[]... expected)
-    {
-        assertRows(RowUtil.toObjects(actual), expected);
-    }
-
-    public static void assertRows(Object[][] actual, Object[]... expected)
-    {
-        Assert.assertEquals(rowsNotEqualErrorMessage(actual, expected),
-                            expected.length, actual.length);
-
-        for (int i = 0; i < expected.length; i++)
-        {
-            Object[] expectedRow = expected[i];
-            Object[] actualRow = actual[i];
-            Assert.assertTrue(rowsNotEqualErrorMessage(actual, expected),
-                              Arrays.equals(expectedRow, actualRow));
-        }
-    }
-
-    public static void assertRow(Object[] actual, Object... expected)
-    {
-        Assert.assertTrue(rowNotEqualErrorMessage(actual, expected),
-                          Arrays.equals(actual, expected));
-    }
-
-    public static void assertRows(Iterator<Object[]> actual, Iterator<Object[]> expected)
-    {
-        while (actual.hasNext() && expected.hasNext())
-            assertRow(actual.next(), expected.next());
-
-        Assert.assertEquals("Resultsets have different sizes", actual.hasNext(), expected.hasNext());
-    }
-
-    public static void assertRows(Iterator<Object[]> actual, Object[]... expected)
-    {
-        assertRows(actual, Iterators.forArray(expected));
-    }
-
-    public static String rowNotEqualErrorMessage(Object[] actual, Object[] expected)
-    {
-        return String.format("Expected: %s\nActual:%s\n",
-                             Arrays.toString(expected),
-                             Arrays.toString(actual));
-    }
-
-    public static String rowsNotEqualErrorMessage(Object[][] actual, Object[][] expected)
-    {
-        return String.format("Expected: %s\nActual: %s\n",
-                             rowsToString(expected),
-                             rowsToString(actual));
-    }
-
-    public static String rowsToString(Object[][] rows)
-    {
-        StringBuilder builder = new StringBuilder();
-        builder.append("[");
-        boolean isFirst = true;
-        for (Object[] row : rows)
-        {
-            if (isFirst)
-                isFirst = false;
-            else
-                builder.append(",");
-            builder.append(Arrays.toString(row));
-        }
-        builder.append("]");
-        return builder.toString();
-    }
-
-    public static Object[][] toObjectArray(Iterator<Object[]> iter)
-    {
-        List<Object[]> res = new ArrayList<>();
-        while (iter.hasNext())
-            res.add(iter.next());
-
-        return res.toArray(new Object[res.size()][]);
-    }
-
-    public static Object[] row(Object... expected)
-    {
-        return expected;
-    }
-}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/GossipSettlesTest.java b/test/distributed/org/apache/cassandra/distributed/test/GossipSettlesTest.java
index 5f1263a..509fe6f 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/GossipSettlesTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/GossipSettlesTest.java
@@ -18,23 +18,25 @@
 
 package org.apache.cassandra.distributed.test;
 
-import java.io.IOException;
-
 import org.junit.Test;
 
-import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ICluster;
 
 import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
 import static org.apache.cassandra.distributed.api.Feature.NETWORK;
 
-public class GossipSettlesTest extends DistributedTestBase
+// TODO: this test should be removed after running in-jvm dtests is set up via the shared API repository
+public class GossipSettlesTest extends TestBaseImpl
 {
 
     @Test
-    public void testGossipSettles() throws IOException
+    public void testGossipSettles() throws Throwable
     {
-        // Use withSubnet(1) to prove seed provider is set correctly - without the fix to pass a seed provider, this test fails
-        try (Cluster cluster = Cluster.build(3).withConfig(config -> config.with(GOSSIP).with(NETWORK)).withSubnet(1).start())
+        /* Use withSubnet(1) to prove seed provider is set correctly - without the fix to pass a seed provider, this test fails */
+        try (ICluster cluster = builder().withNodes(3)
+                                         .withConfig(config -> config.with(GOSSIP).with(NETWORK))
+                                         .withSubnet(1)
+                                         .start())
         {
         }
     }
diff --git a/test/distributed/org/apache/cassandra/distributed/test/MessageFiltersTest.java b/test/distributed/org/apache/cassandra/distributed/test/MessageFiltersTest.java
index 96974d8..062f401 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/MessageFiltersTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/MessageFiltersTest.java
@@ -18,29 +18,46 @@
 
 package org.apache.cassandra.distributed.test;
 
+import java.net.InetSocketAddress;
 import java.util.Arrays;
-import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import com.google.common.collect.Sets;
 import org.junit.Assert;
 import org.junit.Test;
 
-import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
 import org.apache.cassandra.distributed.api.IIsolatedExecutor;
 import org.apache.cassandra.distributed.api.IMessage;
 import org.apache.cassandra.distributed.api.IMessageFilters;
 import org.apache.cassandra.distributed.impl.Instance;
-import org.apache.cassandra.distributed.impl.MessageFilters;
-import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.distributed.shared.MessageFilters;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessagingService;
 
-public class MessageFiltersTest extends DistributedTestBase
+public class MessageFiltersTest extends TestBaseImpl
 {
+
+    @Test
+    public void simpleInboundFiltersTest()
+    {
+        simpleFiltersTest(true);
+    }
+
     @Test
-    public void simpleFiltersTest() throws Throwable
+    public void simpleOutboundFiltersTest()
+    {
+        simpleFiltersTest(false);
+    }
+
+    private interface Permit
+    {
+        boolean test(int from, int to, IMessage msg);
+    }
+
+    private static void simpleFiltersTest(boolean inbound)
     {
         int VERB1 = MessagingService.Verb.READ.ordinal();
         int VERB2 = MessagingService.Verb.REQUEST_RESPONSE.ordinal();
@@ -52,61 +69,62 @@ public class MessageFiltersTest extends DistributedTestBase
         String MSG2 = "msg2";
 
         MessageFilters filters = new MessageFilters();
-        MessageFilters.Filter filter = filters.allVerbs().from(1).drop();
+        Permit permit = inbound ? filters::permitInbound : filters::permitOutbound;
 
-        Assert.assertFalse(filters.permit(i1, i2, msg(VERB1, MSG1)));
-        Assert.assertFalse(filters.permit(i1, i2, msg(VERB2, MSG1)));
-        Assert.assertFalse(filters.permit(i1, i2, msg(VERB3, MSG1)));
-        Assert.assertTrue(filters.permit(i2, i1, msg(VERB1, MSG1)));
+        IMessageFilters.Filter filter = filters.allVerbs().inbound(inbound).from(1).drop();
+        Assert.assertFalse(permit.test(i1, i2, msg(VERB1, MSG1)));
+        Assert.assertFalse(permit.test(i1, i2, msg(VERB2, MSG1)));
+        Assert.assertFalse(permit.test(i1, i2, msg(VERB3, MSG1)));
+        Assert.assertTrue(permit.test(i2, i1, msg(VERB1, MSG1)));
         filter.off();
-        Assert.assertTrue(filters.permit(i1, i2, msg(VERB1, MSG1)));
+        Assert.assertTrue(permit.test(i1, i2, msg(VERB1, MSG1)));
         filters.reset();
 
-        filters.verbs(VERB1).from(1).to(2).drop();
-        Assert.assertFalse(filters.permit(i1, i2, msg(VERB1, MSG1)));
-        Assert.assertTrue(filters.permit(i1, i2, msg(VERB2, MSG1)));
-        Assert.assertTrue(filters.permit(i2, i1, msg(VERB1, MSG1)));
-        Assert.assertTrue(filters.permit(i2, i3, msg(VERB2, MSG1)));
+        filters.verbs(VERB1).inbound(inbound).from(1).to(2).drop();
+        Assert.assertFalse(permit.test(i1, i2, msg(VERB1, MSG1)));
+        Assert.assertTrue(permit.test(i1, i2, msg(VERB2, MSG1)));
+        Assert.assertTrue(permit.test(i2, i1, msg(VERB1, MSG1)));
+        Assert.assertTrue(permit.test(i2, i3, msg(VERB2, MSG1)));
 
         filters.reset();
         AtomicInteger counter = new AtomicInteger();
-        filters.verbs(VERB1).from(1).to(2).messagesMatching((from, to, msg) -> {
+        filters.verbs(VERB1).inbound(inbound).from(1).to(2).messagesMatching((from, to, msg) -> {
             counter.incrementAndGet();
             return Arrays.equals(msg.bytes(), MSG1.getBytes());
         }).drop();
-        Assert.assertFalse(filters.permit(i1, i2, msg(VERB1, MSG1)));
+        Assert.assertFalse(permit.test(i1, i2, msg(VERB1, MSG1)));
         Assert.assertEquals(counter.get(), 1);
-        Assert.assertTrue(filters.permit(i1, i2, msg(VERB1, MSG2)));
+        Assert.assertTrue(permit.test(i1, i2, msg(VERB1, MSG2)));
         Assert.assertEquals(counter.get(), 2);
 
         // filter chain gets interrupted because a higher level filter returns no match
-        Assert.assertTrue(filters.permit(i2, i1, msg(VERB1, MSG1)));
+        Assert.assertTrue(permit.test(i2, i1, msg(VERB1, MSG1)));
         Assert.assertEquals(counter.get(), 2);
-        Assert.assertTrue(filters.permit(i2, i1, msg(VERB2, MSG1)));
+        Assert.assertTrue(permit.test(i2, i1, msg(VERB2, MSG1)));
         Assert.assertEquals(counter.get(), 2);
         filters.reset();
 
-        filters.allVerbs().from(3, 2).to(2, 1).drop();
-        Assert.assertFalse(filters.permit(i3, i1, msg(VERB1, MSG1)));
-        Assert.assertFalse(filters.permit(i3, i2, msg(VERB1, MSG1)));
-        Assert.assertFalse(filters.permit(i2, i1, msg(VERB1, MSG1)));
-        Assert.assertTrue(filters.permit(i2, i3, msg(VERB1, MSG1)));
-        Assert.assertTrue(filters.permit(i1, i2, msg(VERB1, MSG1)));
-        Assert.assertTrue(filters.permit(i1, i3, msg(VERB1, MSG1)));
+        filters.allVerbs().inbound(inbound).from(3, 2).to(2, 1).drop();
+        Assert.assertFalse(permit.test(i3, i1, msg(VERB1, MSG1)));
+        Assert.assertFalse(permit.test(i3, i2, msg(VERB1, MSG1)));
+        Assert.assertFalse(permit.test(i2, i1, msg(VERB1, MSG1)));
+        Assert.assertTrue(permit.test(i2, i3, msg(VERB1, MSG1)));
+        Assert.assertTrue(permit.test(i1, i2, msg(VERB1, MSG1)));
+        Assert.assertTrue(permit.test(i1, i3, msg(VERB1, MSG1)));
         filters.reset();
 
         counter.set(0);
-        filters.allVerbs().from(1).to(2).messagesMatching((from, to, msg) -> {
+        filters.allVerbs().inbound(inbound).from(1).to(2).messagesMatching((from, to, msg) -> {
             counter.incrementAndGet();
             return false;
         }).drop();
-        Assert.assertTrue(filters.permit(i1, i2, msg(VERB1, MSG1)));
-        Assert.assertTrue(filters.permit(i1, i3, msg(VERB1, MSG1)));
-        Assert.assertTrue(filters.permit(i1, i2, msg(VERB1, MSG1)));
+        Assert.assertTrue(permit.test(i1, i2, msg(VERB1, MSG1)));
+        Assert.assertTrue(permit.test(i1, i3, msg(VERB1, MSG1)));
+        Assert.assertTrue(permit.test(i1, i2, msg(VERB1, MSG1)));
         Assert.assertEquals(2, counter.get());
     }
 
-    IMessage msg(int verb, String msg)
+    private static IMessage msg(int verb, String msg)
     {
         return new IMessage()
         {
@@ -114,7 +132,7 @@ public class MessageFiltersTest extends DistributedTestBase
             public byte[] bytes() { return msg.getBytes(); }
             public int id() { return 0; }
             public int version() { return 0;  }
-            public InetAddressAndPort from() { return null; }
+            public InetSocketAddress from() { return null; }
         };
     }
 
@@ -161,8 +179,8 @@ public class MessageFiltersTest extends DistributedTestBase
 
             AtomicInteger counter = new AtomicInteger();
 
-            Set<Integer> verbs = new HashSet<>(Arrays.asList(MessagingService.Verb.RANGE_SLICE.ordinal(),
-                                                             MessagingService.Verb.MUTATION.ordinal()));
+            Set<Integer> verbs = Sets.newHashSet(Arrays.asList(MessagingService.Verb.RANGE_SLICE.ordinal(),
+                                                               MessagingService.Verb.MUTATION.ordinal()));
 
             // Reads and writes are going to time out in both directions
             IMessageFilters.Filter filter = cluster.filters()
diff --git a/test/distributed/org/apache/cassandra/distributed/test/MessageForwardingTest.java b/test/distributed/org/apache/cassandra/distributed/test/MessageForwardingTest.java
index 72928e4..61ccb5f 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/MessageForwardingTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/MessageForwardingTest.java
@@ -31,13 +31,13 @@ import java.util.stream.Stream;
 import org.junit.Assert;
 import org.junit.Test;
 
-import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
 import org.apache.cassandra.distributed.impl.IsolatedExecutor;
 import org.apache.cassandra.distributed.impl.TracingUtil;
 import org.apache.cassandra.utils.UUIDGen;
 
-public class MessageForwardingTest extends DistributedTestBase
+public class MessageForwardingTest extends TestBaseImpl
 {
     @Test
     public void mutationsForwardedToAllReplicasTest()
@@ -53,12 +53,12 @@ public class MessageForwardingTest extends DistributedTestBase
         {
             cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v text, PRIMARY KEY (pk, ck))");
 
-            cluster.forEach(instance -> commitCounts.put(instance.broadcastAddressAndPort().address, 0));
+            cluster.forEach(instance -> commitCounts.put(instance.broadcastAddress().getAddress(), 0));
             final UUID sessionId = UUIDGen.getTimeUUID();
             Stream<Future<Object[][]>> inserts = IntStream.range(0, numInserts).mapToObj((idx) ->
                 cluster.coordinator(1).asyncExecuteWithTracing(sessionId,
                                                          "INSERT INTO " + KEYSPACE + ".tbl(pk,ck,v) VALUES (1, 1, 'x')",
-                                                         ConsistencyLevel.ALL)
+                                                               ConsistencyLevel.ALL)
             );
 
             // Wait for each of the futures to complete before checking the traces, don't care
@@ -66,7 +66,7 @@ public class MessageForwardingTest extends DistributedTestBase
             //noinspection ResultOfMethodCallIgnored
             inserts.map(IsolatedExecutor::waitOn).count();
 
-            cluster.forEach(instance -> commitCounts.put(instance.broadcastAddressAndPort().address, 0));
+            cluster.forEach(instance -> commitCounts.put(instance.broadcastAddress().getAddress(), 0));
             List<TracingUtil.TraceEntry> traces = TracingUtil.getTrace(cluster, sessionId, ConsistencyLevel.ALL);
             traces.forEach(traceEntry -> {
                 if (traceEntry.activity.contains("Appending to commitlog"))
diff --git a/test/distributed/org/apache/cassandra/distributed/test/NativeProtocolTest.java b/test/distributed/org/apache/cassandra/distributed/test/NativeProtocolTest.java
index 45d9840..0905b92 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/NativeProtocolTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/NativeProtocolTest.java
@@ -18,50 +18,53 @@
 
 package org.apache.cassandra.distributed.test;
 
+import org.apache.cassandra.distributed.impl.RowUtil;
+import org.junit.Assert;
+import org.junit.Test;
+
 import com.datastax.driver.core.ConsistencyLevel;
 import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.Session;
 import com.datastax.driver.core.SimpleStatement;
 import com.datastax.driver.core.Statement;
-import org.apache.cassandra.distributed.Cluster;
-import org.apache.cassandra.distributed.impl.RowUtil;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.Iterator;
+import org.apache.cassandra.distributed.api.ICluster;
 
 import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
 import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL;
 import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+import static org.apache.cassandra.distributed.shared.AssertUtils.*;
 
-public class NativeProtocolTest extends DistributedTestBase
+// TODO: this test should be removed after running in-jvm dtests is set up via the shared API repository
+public class NativeProtocolTest extends TestBaseImpl
 {
 
     @Test
     public void withClientRequests() throws Throwable
     {
-        try (Cluster ignored = init(Cluster.create(3,
-                                                   config -> config.with(GOSSIP, NETWORK, NATIVE_PROTOCOL))))
+        try (ICluster ignored = init(builder().withNodes(3)
+                                              .withConfig(config -> config.with(GOSSIP, NETWORK, NATIVE_PROTOCOL))
+                                              .start()))
         {
-            final com.datastax.driver.core.Cluster cluster = com.datastax.driver.core.Cluster.builder().addContactPoint("127.0.0.1").build();
-            Session session = cluster.connect();
-            session.execute("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck));");
-            session.execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) values (1,1,1);");
-            Statement select = new SimpleStatement("select * from " + KEYSPACE + ".tbl;").setConsistencyLevel(ConsistencyLevel.ALL);
-            final ResultSet resultSet = session.execute(select);
-            assertRows(resultSet, row(1, 1, 1));
-            Assert.assertEquals(3, cluster.getMetadata().getAllHosts().size());
-            session.close();
-            cluster.close();
+
+            try (com.datastax.driver.core.Cluster cluster = com.datastax.driver.core.Cluster.builder().addContactPoint("127.0.0.1").build();
+                 Session session = cluster.connect())
+            {
+                session.execute("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck));");
+                session.execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) values (1,1,1);");
+                Statement select = new SimpleStatement("select * from " + KEYSPACE + ".tbl;").setConsistencyLevel(ConsistencyLevel.ALL);
+                final ResultSet resultSet = session.execute(select);
+                assertRows(RowUtil.toObjects(resultSet), row(1, 1, 1));
+                Assert.assertEquals(3, cluster.getMetadata().getAllHosts().size());
+            }
         }
     }
 
     @Test
     public void withCounters() throws Throwable
     {
-        try (Cluster dtCluster = init(Cluster.create(3,
-                config -> config.with(GOSSIP, NETWORK, NATIVE_PROTOCOL))))
+        try (ICluster ignored = init(builder().withNodes(3)
+                                              .withConfig(config -> config.with(GOSSIP, NETWORK, NATIVE_PROTOCOL))
+                                              .start()))
         {
             final com.datastax.driver.core.Cluster cluster = com.datastax.driver.core.Cluster.builder().addContactPoint("127.0.0.1").build();
             Session session = cluster.connect();
@@ -69,7 +72,7 @@ public class NativeProtocolTest extends DistributedTestBase
             session.execute("UPDATE " + KEYSPACE + ".tbl set ck = ck + 10 where pk = 1;");
             Statement select = new SimpleStatement("select * from " + KEYSPACE + ".tbl;").setConsistencyLevel(ConsistencyLevel.ALL);
             final ResultSet resultSet = session.execute(select);
-            assertRows(resultSet, row(1, 10L));
+            assertRows(RowUtil.toObjects(resultSet), row(1, 10L));
             Assert.assertEquals(3, cluster.getMetadata().getAllHosts().size());
             session.close();
             cluster.close();
diff --git a/test/distributed/org/apache/cassandra/distributed/test/NetworkTopologyTest.java b/test/distributed/org/apache/cassandra/distributed/test/NetworkTopologyTest.java
index a9c2cee..8230fd5 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/NetworkTopologyTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/NetworkTopologyTest.java
@@ -26,22 +26,23 @@ import java.util.stream.Stream;
 import org.junit.Assert;
 import org.junit.Test;
 
-import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ICluster;
 import org.apache.cassandra.distributed.api.IInstance;
-import org.apache.cassandra.distributed.impl.NetworkTopology;
-import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.shared.NetworkTopology;
 
-public class NetworkTopologyTest extends DistributedTestBase
+// TODO: this test should be removed after running in-jvm dtests is set up via the shared API repository
+public class NetworkTopologyTest extends TestBaseImpl
 {
     @Test
     public void namedDcTest() throws Throwable
     {
-        try (Cluster cluster = Cluster.build()
-                                      .withNodeIdTopology(Collections.singletonMap(1, NetworkTopology.dcAndRack("somewhere", "rack0")))
-                                      .withRack("elsewhere", "firstrack", 1)
-                                      .withRack("elsewhere", "secondrack", 2)
-                                      .withDC("nearthere", 4)
-                                      .start())
+        try (ICluster<IInvokableInstance> cluster = builder()
+                                                    .withNodeIdTopology(Collections.singletonMap(1, NetworkTopology.dcAndRack("somewhere", "rack0")))
+                                                    .withRack("elsewhere", "firstrack", 1)
+                                                    .withRack("elsewhere", "secondrack", 2)
+                                                    .withDC("nearthere", 4)
+                                                    .start())
         {
             Assert.assertEquals(1, cluster.stream("somewhere").count());
             Assert.assertEquals(1, cluster.stream("elsewhere", "firstrack").count());
@@ -61,9 +62,8 @@ public class NetworkTopologyTest extends DistributedTestBase
     public void automaticNamedDcTest() throws Throwable
 
     {
-        try (Cluster cluster = Cluster.build()
-                                      .withRacks(2, 1, 3)
-                                      .start())
+        try (ICluster cluster = builder().withRacks(2, 1, 3)
+                                         .start())
         {
             Assert.assertEquals(6, cluster.stream().count());
             Assert.assertEquals(3, cluster.stream("datacenter1").count());
@@ -74,27 +74,25 @@ public class NetworkTopologyTest extends DistributedTestBase
     @Test(expected = IllegalStateException.class)
     public void noCountsAfterNamingDCsTest()
     {
-        Cluster.build()
-               .withDC("nameddc", 1)
-               .withDCs(1);
+        builder().withDC("nameddc", 1)
+                 .withDCs(1);
     }
 
     @Test(expected = IllegalStateException.class)
     public void mustProvideNodeCountBeforeWithDCsTest()
     {
-        Cluster.build()
-               .withDCs(1);
+        builder().withDCs(1);
     }
 
     @Test(expected = IllegalStateException.class)
     public void noEmptyNodeIdTopologyTest()
     {
-        Cluster.build().withNodeIdTopology(Collections.emptyMap());
+        builder().withNodeIdTopology(Collections.emptyMap());
     }
 
     @Test(expected = IllegalStateException.class)
     public void noHolesInNodeIdTopologyTest()
     {
-        Cluster.build().withNodeIdTopology(Collections.singletonMap(2, NetworkTopology.dcAndRack("doomed", "rack")));
+        builder().withNodeIdTopology(Collections.singletonMap(2, NetworkTopology.dcAndRack("doomed", "rack")));
     }
-}
+}
\ No newline at end of file
diff --git a/test/distributed/org/apache/cassandra/distributed/test/NodeToolTest.java b/test/distributed/org/apache/cassandra/distributed/test/NodeToolTest.java
index e209d1d..1d78152 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/NodeToolTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/NodeToolTest.java
@@ -24,7 +24,7 @@ import org.apache.cassandra.distributed.Cluster;
 
 import static org.junit.Assert.assertEquals;
 
-public class NodeToolTest extends DistributedTestBase
+public class NodeToolTest extends TestBaseImpl
 {
     @Test
     public void test() throws Throwable
diff --git a/test/distributed/org/apache/cassandra/distributed/test/ResourceLeakTest.java b/test/distributed/org/apache/cassandra/distributed/test/ResourceLeakTest.java
index 09f40e4..808b95c 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/ResourceLeakTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/ResourceLeakTest.java
@@ -26,7 +26,6 @@ import java.nio.file.Path;
 import java.sql.Date;
 import java.text.SimpleDateFormat;
 import java.time.Instant;
-import java.util.List;
 import java.util.function.Consumer;
 import javax.management.MBeanServer;
 
@@ -34,17 +33,17 @@ import org.junit.Ignore;
 import org.junit.Test;
 
 import com.sun.management.HotSpotDiagnosticMXBean;
-import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.distributed.Cluster;
-import org.apache.cassandra.distributed.impl.InstanceConfig;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.IInstanceConfig;
 import org.apache.cassandra.service.CassandraDaemon;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.SigarLibrary;
 
 import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
-import static org.apache.cassandra.distributed.api.Feature.NETWORK;
 import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL;
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
 
 /* Resource Leak Test - useful when tracking down issues with in-JVM framework cleanup.
  * All objects referencing the InstanceClassLoader need to be garbage collected or
@@ -59,7 +58,7 @@ import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL;
  * but it shows that the file handles for Data/Index files are being leaked.
  */
 @Ignore
-public class ResourceLeakTest extends DistributedTestBase
+public class ResourceLeakTest extends TestBaseImpl
 {
     // Parameters to adjust while hunting for leaks
     final int numTestLoops = 1;            // Set this value high to crash on leaks, or low when tracking down an issue.
@@ -141,7 +140,7 @@ public class ResourceLeakTest extends DistributedTestBase
         }
     }
 
-    void doTest(int numClusterNodes, Consumer<InstanceConfig> updater) throws Throwable
+    void doTest(int numClusterNodes, Consumer<IInstanceConfig> updater) throws Throwable
     {
         for (int loop = 0; loop < numTestLoops; loop++)
         {
@@ -149,7 +148,7 @@ public class ResourceLeakTest extends DistributedTestBase
             try (Cluster cluster = Cluster.build(numClusterNodes).withConfig(updater).start())
             {
                 if (cluster.get(1).config().has(GOSSIP)) // Wait for gossip to settle on the seed node
-                    cluster.get(1).runOnInstance(() -> CassandraDaemon.waitForGossipToSettle());
+                    cluster.get(1).runOnInstance(CassandraDaemon::waitForGossipToSettle);
 
                 init(cluster);
                 String tableName = "tbl" + loop;
diff --git a/test/distributed/org/apache/cassandra/distributed/test/SimpleReadWritePathTest.java b/test/distributed/org/apache/cassandra/distributed/test/SimpleReadWritePathTest.java
new file mode 100644
index 0000000..8c9e8af
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/SimpleReadWritePathTest.java
@@ -0,0 +1,225 @@
+package org.apache.cassandra.distributed.test;
+
+import java.io.IOException;
+
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.ICluster;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+
+import static org.apache.cassandra.distributed.shared.AssertUtils.*;
+
+// TODO: this test should be removed after running in-jvm dtests is set up via the shared API repository
+public class SimpleReadWritePathTest extends TestBaseImpl
+{
+    private static final TestBaseImpl impl = new TestBaseImpl();
+    private static ICluster cluster;
+
+    @BeforeClass
+    public static void before() throws IOException
+    {
+        cluster = init(impl.builder().withNodes(3).start());
+    }
+
+    @AfterClass
+    public static void after() throws Exception
+    {
+        cluster.close();
+    }
+
+    @After
+    public void afterEach()
+    {
+        cluster.schemaChange("DROP KEYSPACE IF EXISTS " + KEYSPACE);
+        init(cluster);
+    }
+
+    @Test
+    public void coordinatorReadTest() throws Throwable
+    {
+        cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
+
+        cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)");
+        cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 2, 2)");
+        cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 3, 3)");
+
+        assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = ?",
+                                                  ConsistencyLevel.ALL,
+                                                  1),
+                   row(1, 1, 1),
+                   row(1, 2, 2),
+                   row(1, 3, 3));
+    }
+
+    @Test
+    public void largeMessageTest() throws Throwable
+    {
+        int largeMessageThreshold = 1024 * 64;
+        cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v text, PRIMARY KEY (pk, ck))");
+        StringBuilder builder = new StringBuilder();
+        for (int i = 0; i < largeMessageThreshold; i++)
+            builder.append('a');
+        String s = builder.toString();
+        cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, ?)",
+                                       ConsistencyLevel.ALL,
+                                       s);
+        assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = ?",
+                                                  ConsistencyLevel.ALL,
+                                                  1),
+                   row(1, 1, s));
+    }
+
+    @Test
+    public void coordinatorWriteTest() throws Throwable
+    {
+        cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
+
+        cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)",
+                                       ConsistencyLevel.QUORUM);
+
+        for (int i = 0; i < 3; i++)
+        {
+            assertRows(cluster.get(1).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1"),
+                       row(1, 1, 1));
+        }
+
+        assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1",
+                                                  ConsistencyLevel.QUORUM),
+                   row(1, 1, 1));
+    }
+
+    @Test
+    public void readRepairTest() throws Throwable
+    {
+
+        cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
+
+        cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)");
+        cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)");
+
+        assertRows(cluster.get(3).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1"));
+
+        assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1",
+                                                  ConsistencyLevel.ALL), // ensure node3 in preflist
+                   row(1, 1, 1));
+
+        // Verify that data got repaired to the third node
+        assertRows(cluster.get(3).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1"),
+                   row(1, 1, 1));
+    }
+
+    @Test
+    public void simplePagedReadsTest() throws Throwable
+    {
+
+        cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
+
+        int size = 100;
+        Object[][] results = new Object[size][];
+        for (int i = 0; i < size; i++)
+        {
+            cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, ?, ?)",
+                                           ConsistencyLevel.QUORUM,
+                                           i, i);
+            results[i] = new Object[]{ 1, i, i };
+        }
+
+        // Make sure paged read returns same results with different page sizes
+        for (int pageSize : new int[]{ 1, 2, 3, 5, 10, 20, 50 })
+        {
+            assertRows(cluster.coordinator(1).executeWithPaging("SELECT * FROM " + KEYSPACE + ".tbl",
+                                                                ConsistencyLevel.QUORUM,
+                                                                pageSize),
+                       results);
+        }
+    }
+
+    @Test
+    public void pagingWithRepairTest() throws Throwable
+    {
+
+        cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
+
+        int size = 100;
+        Object[][] results = new Object[size][];
+        for (int i = 0; i < size; i++)
+        {
+            // Make sure that data lands on different nodes and not coordinator
+            cluster.get(i % 2 == 0 ? 2 : 3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, ?, ?)",
+                                                            i, i);
+
+            results[i] = new Object[]{ 1, i, i };
+        }
+
+        // Make sure paged read returns same results with different page sizes
+        for (int pageSize : new int[]{ 1, 2, 3, 5, 10, 20, 50 })
+        {
+            assertRows(cluster.coordinator(1).executeWithPaging("SELECT * FROM " + KEYSPACE + ".tbl",
+                                                                ConsistencyLevel.ALL,
+                                                                pageSize),
+                       results);
+        }
+
+        assertRows(cluster.get(1).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl"),
+                   results);
+    }
+
+    @Test
+    public void pagingTests() throws Throwable
+    {
+        try (ICluster singleNode = init(builder().withNodes(1).withSubnet(1).start()))
+        {
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
+            singleNode.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
+
+            for (int i = 0; i < 10; i++)
+            {
+                for (int j = 0; j < 10; j++)
+                {
+                    cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, ?, ?)",
+                                                   ConsistencyLevel.QUORUM,
+                                                   i, j, i + i);
+                    singleNode.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, ?, ?)",
+                                                      ConsistencyLevel.QUORUM,
+                                                      i, j, i + i);
+                }
+            }
+
+            int[] pageSizes = new int[]{ 1, 2, 3, 5, 10, 20, 50 };
+            String[] statements = new String[]{ "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5",
+                                                "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck >= 5",
+                                                "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5 AND ck <= 10",
+                                                "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5 LIMIT 3",
+                                                "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck >= 5 LIMIT 2",
+                                                "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5 AND ck <= 10 LIMIT 2",
+                                                "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5 ORDER BY ck DESC",
+                                                "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck >= 5 ORDER BY ck DESC",
+                                                "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5 AND ck <= 10 ORDER BY ck DESC",
+                                                "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5 ORDER BY ck DESC LIMIT 3",
+                                                "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck >= 5 ORDER BY ck DESC LIMIT 2",
+                                                "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5 AND ck <= 10 ORDER BY ck DESC LIMIT 2",
+                                                "SELECT DISTINCT pk FROM " + KEYSPACE + ".tbl LIMIT 3",
+                                                "SELECT DISTINCT pk FROM " + KEYSPACE + ".tbl WHERE pk IN (3,5,8,10)",
+                                                "SELECT DISTINCT pk FROM " + KEYSPACE + ".tbl WHERE pk IN (3,5,8,10) LIMIT 2"
+            };
+            for (String statement : statements)
+            {
+                for (int pageSize : pageSizes)
+                {
+                    assertRows(cluster.coordinator(1)
+                                      .executeWithPaging(statement,
+                                                         ConsistencyLevel.QUORUM, pageSize),
+                               singleNode.coordinator(1)
+                                         .executeWithPaging(statement,
+                                                            ConsistencyLevel.QUORUM, Integer.MAX_VALUE));
+                }
+            }
+        }
+    }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/TestBaseImpl.java b/test/distributed/org/apache/cassandra/distributed/test/TestBaseImpl.java
new file mode 100644
index 0000000..a89a352
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/TestBaseImpl.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import org.junit.After;
+import org.junit.BeforeClass;
+
+import org.apache.cassandra.distributed.api.ICluster;
+import org.apache.cassandra.distributed.api.IInstance;
+import org.apache.cassandra.distributed.shared.Builder;
+import org.apache.cassandra.distributed.shared.DistributedTestBase;
+
+public class TestBaseImpl extends DistributedTestBase
+{
+    protected static final TestBaseImpl impl = new TestBaseImpl();
+
+    @After
+    public void afterEach() {
+        super.afterEach();
+    }
+
+    @BeforeClass
+    public static void beforeClass() throws Throwable {
+        ICluster.setup();
+    }
+
+    @Override
+    public <I extends IInstance, C extends ICluster> Builder<I, C> builder() {
+        // This is definitely not the smartest solution, but given the complexity of the alternatives and low risk, we can just rely on the
+        // fact that this code is going to work accross _all_ versions.
+        return (Builder<I, C>) org.apache.cassandra.distributed.Cluster.build();
+    }
+}
\ No newline at end of file
diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeReadRepairTest.java b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeReadRepairTest.java
index 31f4b84..e69e38a 100644
--- a/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeReadRepairTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeReadRepairTest.java
@@ -20,11 +20,10 @@ package org.apache.cassandra.distributed.upgrade;
 
 import org.junit.Test;
 
-import org.apache.cassandra.db.ConsistencyLevel;
-import org.apache.cassandra.distributed.impl.Versions;
-import org.apache.cassandra.distributed.test.DistributedTestBase;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.shared.Versions;
 
-import static org.apache.cassandra.distributed.impl.Versions.find;
+import static org.apache.cassandra.distributed.shared.Versions.find;
 
 public class MixedModeReadRepairTest extends UpgradeTestBase
 {
@@ -35,17 +34,17 @@ public class MixedModeReadRepairTest extends UpgradeTestBase
         .nodes(2)
         .upgrade(Versions.Major.v22, Versions.Major.v30)
         .nodesToUpgrade(2)
-        .setup((cluster) -> cluster.schemaChange("CREATE TABLE " + DistributedTestBase.KEYSPACE + ".tbl (pk ascii, b boolean, v blob, PRIMARY KEY (pk)) WITH COMPACT STORAGE"))
+        .setup((cluster) -> cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk ascii, b boolean, v blob, PRIMARY KEY (pk)) WITH COMPACT STORAGE"))
         .runAfterClusterUpgrade((cluster) -> {
             // now node2 is 3.0 and node1 is 2.2
             // make sure 2.2 side does not get the mutation
-            cluster.get(2).executeInternal("DELETE FROM " + DistributedTestBase.KEYSPACE + ".tbl WHERE pk = ?",
+            cluster.get(2).executeInternal("DELETE FROM " + KEYSPACE + ".tbl WHERE pk = ?",
                                                                           "something");
             // trigger a read repair
-            cluster.coordinator(1).execute("SELECT * FROM " + DistributedTestBase.KEYSPACE + ".tbl WHERE pk = ?",
+            cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = ?",
                                            ConsistencyLevel.ALL,
                                            "something");
-            cluster.get(1).flush(DistributedTestBase.KEYSPACE);
+            cluster.get(1).flush(KEYSPACE);
             // upgrade node1 to 3.0
             cluster.get(1).shutdown().get();
             Versions allVersions = find();
@@ -53,7 +52,7 @@ public class MixedModeReadRepairTest extends UpgradeTestBase
             cluster.get(1).startup();
 
             // and make sure the sstables are readable
-            cluster.get(1).forceCompact(DistributedTestBase.KEYSPACE, "tbl");
+            cluster.get(1).forceCompact(KEYSPACE, "tbl");
         }).run();
     }
 }
diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTest.java b/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTest.java
index 9d2534a..93ae78e 100644
--- a/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTest.java
@@ -20,9 +20,11 @@ package org.apache.cassandra.distributed.upgrade;
 
 import org.junit.Test;
 
-import org.apache.cassandra.db.ConsistencyLevel;
-import org.apache.cassandra.distributed.impl.Versions;
-import org.apache.cassandra.distributed.test.DistributedTestBase;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.shared.Versions;
+
+import junit.framework.Assert;
+import static org.apache.cassandra.distributed.shared.AssertUtils.*;
 
 public class UpgradeTest extends UpgradeTestBase
 {
@@ -31,22 +33,22 @@ public class UpgradeTest extends UpgradeTestBase
     public void upgradeTest() throws Throwable
     {
         new TestCase()
-            .upgrade(Versions.Major.v22, Versions.Major.v30, Versions.Major.v3X)
-            .setup((cluster) -> {
-                cluster.schemaChange("CREATE TABLE " + DistributedTestBase.KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
-
-                cluster.get(1).executeInternal("INSERT INTO " + DistributedTestBase.KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)");
-                cluster.get(2).executeInternal("INSERT INTO " + DistributedTestBase.KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 2, 2)");
-                cluster.get(3).executeInternal("INSERT INTO " + DistributedTestBase.KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 3, 3)");
-            })
-            .runAfterClusterUpgrade((cluster) -> {
-                DistributedTestBase.assertRows(cluster.coordinator(1).execute("SELECT * FROM " + DistributedTestBase.KEYSPACE + ".tbl WHERE pk = ?",
-                                                                              ConsistencyLevel.ALL,
-                                                                              1),
-                                               DistributedTestBase.row(1, 1, 1),
-                                               DistributedTestBase.row(1, 2, 2),
-                                               DistributedTestBase.row(1, 3, 3));
-            }).run();
+        .upgrade(Versions.Major.v22, Versions.Major.v30, Versions.Major.v3X)
+        .setup((cluster) -> {
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
+
+            cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)");
+            cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 2, 2)");
+            cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 3, 3)");
+        })
+        .runAfterClusterUpgrade((cluster) -> {
+            assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = ?",
+                                                      ConsistencyLevel.ALL,
+                                                      1),
+                       row(1, 1, 1),
+                       row(1, 2, 2),
+                       row(1, 3, 3));
+        }).run();
     }
 
-}
+}
\ No newline at end of file
diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTestBase.java b/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTestBase.java
index 27094d8..6ae1d7b 100644
--- a/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTestBase.java
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTestBase.java
@@ -25,16 +25,24 @@ import java.util.List;
 import java.util.Set;
 
 import org.apache.cassandra.distributed.UpgradeableCluster;
+import org.apache.cassandra.distributed.api.ICluster;
+import org.apache.cassandra.distributed.api.IInstance;
 import org.apache.cassandra.distributed.impl.Instance;
-import org.apache.cassandra.distributed.impl.Versions;
-import org.apache.cassandra.distributed.impl.Versions.Version;
-import org.apache.cassandra.distributed.test.DistributedTestBase;
 
-import static org.apache.cassandra.distributed.impl.Versions.Major;
-import static org.apache.cassandra.distributed.impl.Versions.find;
+import org.apache.cassandra.distributed.shared.Builder;
+import org.apache.cassandra.distributed.shared.DistributedTestBase;
+import org.apache.cassandra.distributed.shared.Versions;
+import static org.apache.cassandra.distributed.shared.Versions.Version;
+import static org.apache.cassandra.distributed.shared.Versions.Major;
+import static org.apache.cassandra.distributed.shared.Versions.find;
 
 public class UpgradeTestBase extends DistributedTestBase
 {
+    public <I extends IInstance, C extends ICluster> Builder<I, C> builder()
+    {
+        return (Builder<I, C>) UpgradeableCluster.build();
+    }
+
     public static interface RunOnCluster
     {
         public void run(UpgradeableCluster cluster) throws Throwable;
@@ -164,4 +172,4 @@ public class UpgradeTestBase extends DistributedTestBase
         }
     }
 
-}
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org