You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2021/10/07 11:25:48 UTC

[cassandra] 06/06: [CASSANDRA-17013] CEP-10 Phase 1: in-jvm-dtest-api changes and version bump

This is an automated email from the ASF dual-hosted git repository.

benedict pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit f5fb1b0bd32b5dc7da13ec66d43acbdad7fe9dbf
Author: Benedict Elliott Smith <be...@apache.org>
AuthorDate: Thu Jul 29 18:06:32 2021 +0100

    [CASSANDRA-17013] CEP-10 Phase 1: in-jvm-dtest-api changes and version bump
---
 build.xml                                          |   2 +-
 .../cassandra/concurrent/ExecutorFactory.java      |   2 +-
 .../cql3/functions/JavaBasedUDFunction.java        |   1 +
 .../cassandra/cql3/functions/UDFunction.java       |   2 +-
 .../org/apache/cassandra/gms/EndpointState.java    |   5 +-
 src/java/org/apache/cassandra/gms/Gossiper.java    |  10 +
 .../org/apache/cassandra/gms/HeartBeatState.java   |   5 +-
 .../cassandra/io/sstable/format/SSTableReader.java |   2 +
 src/java/org/apache/cassandra/io/util/File.java    |   8 +-
 .../apache/cassandra/triggers/TriggerExecutor.java |   2 +-
 .../org/apache/cassandra/distributed/Cluster.java  |   1 -
 .../distributed/impl/AbstractCluster.java          | 103 +++++-
 .../cassandra/distributed/impl/Coordinator.java    |   4 +-
 .../impl/DelegatingInvokableInstance.java          |  62 ++++
 .../impl/DirectStreamingConnectionFactory.java     | 386 +++++++++++++++++++++
 .../cassandra/distributed/impl/Instance.java       | 247 ++++++-------
 .../cassandra/distributed/impl/InstanceConfig.java |  22 +-
 .../cassandra/distributed/impl/InstanceKiller.java |   1 +
 .../distributed/impl/IsolatedExecutor.java         |  95 +++--
 .../apache/cassandra/distributed/impl/Query.java   | 110 ++++++
 .../distributed/impl/UnsafeGossipHelper.java       | 265 ++++++++++++++
 .../apache/cassandra/distributed/test/CASTest.java |  32 +-
 .../distributed/upgrade/UpgradeTestBase.java       |   3 +-
 .../cassandra/db/RangeTombstoneListTest.java       |   3 +-
 .../utils/concurrent/ImmediateFutureTest.java      |   3 +-
 25 files changed, 1141 insertions(+), 235 deletions(-)

diff --git a/build.xml b/build.xml
index f5acaff..b8eabb8 100644
--- a/build.xml
+++ b/build.xml
@@ -532,7 +532,7 @@
           <dependency groupId="com.google.code.java-allocation-instrumenter" artifactId="java-allocation-instrumenter" version="${allocation-instrumenter.version}" scope="test">
             <exclusion groupId="com.google.guava" artifactId="guava"/>
           </dependency>
-          <dependency groupId="org.apache.cassandra" artifactId="dtest-api" version="0.0.9" scope="test"/>
+          <dependency groupId="org.apache.cassandra" artifactId="dtest-api" version="0.0.10" scope="test"/>
           <dependency groupId="org.reflections" artifactId="reflections" version="0.9.12" scope="test"/>
           <dependency groupId="com.puppycrawl.tools" artifactId="checkstyle" version="8.40" scope="test"/>
           <dependency groupId="org.apache.hadoop" artifactId="hadoop-core" version="1.0.3" scope="provided">
diff --git a/src/java/org/apache/cassandra/concurrent/ExecutorFactory.java b/src/java/org/apache/cassandra/concurrent/ExecutorFactory.java
index 9c7a2cf..52ba94a 100644
--- a/src/java/org/apache/cassandra/concurrent/ExecutorFactory.java
+++ b/src/java/org/apache/cassandra/concurrent/ExecutorFactory.java
@@ -146,7 +146,7 @@ public interface ExecutorFactory extends ExecutorBuilderFactory.Jmxable<Executor
     {
         // deliberately not volatile to ensure zero overhead outside of testing;
         // depend on other memory visibility primitives to ensure visibility
-        private static ExecutorFactory FACTORY = new ExecutorFactory.Default(null, null, JVMStabilityInspector::uncaughtException);
+        private static ExecutorFactory FACTORY = new ExecutorFactory.Default(Global.class.getClassLoader(), null, JVMStabilityInspector::uncaughtException);
         public static ExecutorFactory executorFactory()
         {
             return FACTORY;
diff --git a/src/java/org/apache/cassandra/cql3/functions/JavaBasedUDFunction.java b/src/java/org/apache/cassandra/cql3/functions/JavaBasedUDFunction.java
index 3200c88..10be467 100644
--- a/src/java/org/apache/cassandra/cql3/functions/JavaBasedUDFunction.java
+++ b/src/java/org/apache/cassandra/cql3/functions/JavaBasedUDFunction.java
@@ -350,6 +350,7 @@ public final class JavaBasedUDFunction extends UDFunction
         catch (InvocationTargetException e)
         {
             // in case of an ITE, use the cause
+            logger.error(String.format("Could not compile function '%s' from Java source:%n%s", name, javaSource), e);
             throw new InvalidRequestException(String.format("Could not compile function '%s' from Java source: %s", name, e.getCause()));
         }
         catch (InvalidRequestException | VirtualMachineError e)
diff --git a/src/java/org/apache/cassandra/cql3/functions/UDFunction.java b/src/java/org/apache/cassandra/cql3/functions/UDFunction.java
index 172b998..eccea3c 100644
--- a/src/java/org/apache/cassandra/cql3/functions/UDFunction.java
+++ b/src/java/org/apache/cassandra/cql3/functions/UDFunction.java
@@ -743,7 +743,7 @@ public abstract class UDFunction extends AbstractFunction implements ScalarFunct
     private static class UDFClassLoader extends ClassLoader
     {
         // insecureClassLoader is the C* class loader
-        static final ClassLoader insecureClassLoader = Thread.currentThread().getContextClassLoader();
+        static final ClassLoader insecureClassLoader = UDFClassLoader.class.getClassLoader();
 
         private UDFClassLoader()
         {
diff --git a/src/java/org/apache/cassandra/gms/EndpointState.java b/src/java/org/apache/cassandra/gms/EndpointState.java
index a48a857..b7f6bdb 100644
--- a/src/java/org/apache/cassandra/gms/EndpointState.java
+++ b/src/java/org/apache/cassandra/gms/EndpointState.java
@@ -24,6 +24,8 @@ import java.util.stream.Collectors;
 
 import javax.annotation.Nullable;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import org.apache.cassandra.io.util.File;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -73,7 +75,8 @@ public class EndpointState
         isAlive = true;
     }
 
-    HeartBeatState getHeartBeatState()
+    @VisibleForTesting
+    public HeartBeatState getHeartBeatState()
     {
         return hbState;
     }
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java
index 98068db..3219145 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -662,6 +662,16 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
         GossiperDiagnostics.removedEndpoint(this, endpoint);
     }
 
+    @VisibleForTesting
+    public void unsafeAnnulEndpoint(InetAddressAndPort endpoint)
+    {
+        removeEndpoint(endpoint);
+        justRemovedEndpoints.remove(endpoint);
+        endpointStateMap.remove(endpoint);
+        expireTimeEndpointMap.remove(endpoint);
+        unreachableEndpoints.remove(endpoint);
+    }
+
     /**
      * Quarantines the endpoint for QUARANTINE_DELAY
      *
diff --git a/src/java/org/apache/cassandra/gms/HeartBeatState.java b/src/java/org/apache/cassandra/gms/HeartBeatState.java
index 104d957..d0a7142 100644
--- a/src/java/org/apache/cassandra/gms/HeartBeatState.java
+++ b/src/java/org/apache/cassandra/gms/HeartBeatState.java
@@ -19,6 +19,8 @@ package org.apache.cassandra.gms;
 
 import java.io.*;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
@@ -89,7 +91,8 @@ public class HeartBeatState
         generation += 1;
     }
 
-    void forceHighestPossibleVersionUnsafe()
+    @VisibleForTesting
+    public void forceHighestPossibleVersionUnsafe()
     {
         version = Integer.MAX_VALUE;
     }
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index 1748a43..ebdb2ef 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@ -32,6 +32,8 @@ import com.google.common.collect.Iterables;
 import com.google.common.collect.Ordering;
 import com.google.common.primitives.Longs;
 import com.google.common.util.concurrent.RateLimiter;
+
+import org.apache.cassandra.config.CassandraRelevantProperties;
 import org.apache.cassandra.io.util.File;
 import org.apache.cassandra.concurrent.ExecutorPlus;
 import org.slf4j.Logger;
diff --git a/src/java/org/apache/cassandra/io/util/File.java b/src/java/org/apache/cassandra/io/util/File.java
index 2b60904..5fa5c9d 100644
--- a/src/java/org/apache/cassandra/io/util/File.java
+++ b/src/java/org/apache/cassandra/io/util/File.java
@@ -48,7 +48,8 @@ import static org.apache.cassandra.utils.Throwables.maybeFail;
  */
 public class File implements Comparable<File>
 {
-    private static final FileSystem filesystem = FileSystems.getDefault();
+    private static FileSystem filesystem = FileSystems.getDefault();
+
     public enum WriteMode { OVERWRITE, APPEND }
 
     public static String pathSeparator()
@@ -604,5 +605,10 @@ public class File implements Comparable<File>
             throw new IllegalStateException("Cannot read from an empty path");
         return path;
     }
+
+    public static void unsafeSetFilesystem(FileSystem fs)
+    {
+        filesystem = fs;
+    }
 }
 
diff --git a/src/java/org/apache/cassandra/triggers/TriggerExecutor.java b/src/java/org/apache/cassandra/triggers/TriggerExecutor.java
index 298ac56..c76c6bd 100644
--- a/src/java/org/apache/cassandra/triggers/TriggerExecutor.java
+++ b/src/java/org/apache/cassandra/triggers/TriggerExecutor.java
@@ -44,7 +44,7 @@ public class TriggerExecutor
     public static final TriggerExecutor instance = new TriggerExecutor();
 
     private final Map<String, ITrigger> cachedTriggers = Maps.newConcurrentMap();
-    private final ClassLoader parent = Thread.currentThread().getContextClassLoader();
+    private final ClassLoader parent = TriggerExecutor.class.getClassLoader();
     private volatile ClassLoader customClassLoader;
 
     private TriggerExecutor()
diff --git a/test/distributed/org/apache/cassandra/distributed/Cluster.java b/test/distributed/org/apache/cassandra/distributed/Cluster.java
index a613fc5..05ea799 100644
--- a/test/distributed/org/apache/cassandra/distributed/Cluster.java
+++ b/test/distributed/org/apache/cassandra/distributed/Cluster.java
@@ -34,7 +34,6 @@ import org.apache.cassandra.distributed.shared.Versions;
 @Shared
 public class Cluster extends AbstractCluster<IInvokableInstance>
 {
-
     private Cluster(Builder builder)
     {
         super(builder);
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
index eca9088..2f146bf 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
@@ -20,6 +20,9 @@ package org.apache.cassandra.distributed.impl;
 
 import java.lang.annotation.Annotation;
 import java.net.InetSocketAddress;
+import java.nio.file.FileSystem;
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -29,11 +32,12 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executor;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.BiConsumer;
 import java.util.function.BiPredicate;
 import java.util.function.Consumer;
 import java.util.function.Predicate;
@@ -54,15 +58,18 @@ import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.distributed.Constants;
 import org.apache.cassandra.distributed.api.ConsistencyLevel;
 import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IClassTransformer;
 import org.apache.cassandra.distributed.api.ICluster;
 import org.apache.cassandra.distributed.api.ICoordinator;
 import org.apache.cassandra.distributed.api.IInstance;
 import org.apache.cassandra.distributed.api.IInstanceConfig;
+import org.apache.cassandra.distributed.api.IInstanceInitializer;
 import org.apache.cassandra.distributed.api.IInvokableInstance;
 import org.apache.cassandra.distributed.api.IIsolatedExecutor;
 import org.apache.cassandra.distributed.api.IListen;
 import org.apache.cassandra.distributed.api.IMessage;
 import org.apache.cassandra.distributed.api.IMessageFilters;
+import org.apache.cassandra.distributed.api.IMessageSink;
 import org.apache.cassandra.distributed.api.IUpgradeableInstance;
 import org.apache.cassandra.distributed.api.LogAction;
 import org.apache.cassandra.distributed.api.NodeToolResult;
@@ -75,8 +82,6 @@ import org.apache.cassandra.distributed.shared.NetworkTopology;
 import org.apache.cassandra.distributed.shared.Shared;
 import org.apache.cassandra.distributed.shared.ShutdownException;
 import org.apache.cassandra.distributed.shared.Versions;
-import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.io.util.File;
 import org.apache.cassandra.io.util.PathUtils;
 import org.apache.cassandra.net.Verb;
 import org.apache.cassandra.utils.concurrent.Condition;
@@ -125,7 +130,7 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster<I
     // include byteman so tests can use
     private static final Set<String> SHARED_CLASSES = findClassesMarkedForSharedClassLoader();
     private static final Set<String> ISOLATED_CLASSES = findClassesMarkedForInstanceClassLoader();
-    private static final Predicate<String> SHARED_PREDICATE = s -> {
+    public static final Predicate<String> SHARED_PREDICATE = s -> {
         if (ISOLATED_CLASSES.contains(s))
             return false;
 
@@ -135,8 +140,10 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster<I
     };
 
     private final UUID clusterId = UUID.randomUUID();
-    private final File root;
+    private final Path root;
     private final ClassLoader sharedClassLoader;
+    private final Predicate<String> sharedClassPredicate;
+    private final IClassTransformer classTransformer;
     private final int subnet;
     private final TokenSupplier tokenSupplier;
     private final Map<Integer, NetworkTopology.DcAndRack> nodeIdTopology;
@@ -152,12 +159,16 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster<I
     // mutated by user-facing API
     private final MessageFilters filters;
     private final INodeProvisionStrategy.Strategy nodeProvisionStrategy;
-    private final BiConsumer<ClassLoader, Integer> instanceInitializer;
+    private final IInstanceInitializer instanceInitializer;
     private final int datadirCount;
     private volatile Thread.UncaughtExceptionHandler previousHandler = null;
     private volatile BiPredicate<Integer, Throwable> ignoreUncaughtThrowable = null;
     private final List<Throwable> uncaughtExceptions = new CopyOnWriteArrayList<>();
 
+    private final ThreadGroup clusterThreadGroup = new ThreadGroup(clusterId.toString());
+
+    private volatile IMessageSink messageSink;
+
     /**
      * Common builder, add methods that are applicable to both Cluster and Upgradable cluster here.
      */
@@ -169,6 +180,7 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster<I
         public AbstractBuilder(Factory<I, C, B> factory)
         {
             super(factory);
+            withSharedClasses(SHARED_PREDICATE);
         }
 
         public B withNodeProvisionStrategy(INodeProvisionStrategy.Strategy nodeProvisionStrategy)
@@ -215,11 +227,33 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster<I
 
         private IInvokableInstance newInstance(int generation)
         {
-            ClassLoader classLoader = new InstanceClassLoader(generation, config.num(), version.classpath, sharedClassLoader, SHARED_PREDICATE);
+            ClassLoader classLoader = new InstanceClassLoader(generation, config.num(), version.classpath, sharedClassLoader, sharedClassPredicate, classTransformer);
+            ThreadGroup threadGroup = new ThreadGroup(clusterThreadGroup, "node" + config.num() + (generation > 1 ? "_" + generation : ""));
             if (instanceInitializer != null)
-                instanceInitializer.accept(classLoader, config.num());
-            return Instance.transferAdhoc((SerializableBiFunction<IInstanceConfig, ClassLoader, Instance>)Instance::new, classLoader)
-                                        .apply(config.forVersion(version.version), classLoader);
+                instanceInitializer.initialise(classLoader, threadGroup, config.num(), generation);
+
+            IInvokableInstance instance;
+            try
+            {
+                instance = Instance.transferAdhoc((SerializableTriFunction<IInstanceConfig, ClassLoader, FileSystem, Instance>)Instance::new, classLoader)
+                                   .apply(config.forVersion(version.version), classLoader, root.getFileSystem());
+            }
+            catch (NoSuchMethodError e)
+            {
+                instance = Instance.transferAdhoc((SerializableBiFunction<IInstanceConfig, ClassLoader, Instance>)Instance::new, classLoader)
+                                   .apply(config.forVersion(version.version), classLoader);
+            }
+
+            if (instanceInitializer != null)
+                instanceInitializer.beforeStartup(instance);
+
+            return instance;
+        }
+
+        public Executor executorFor(int verb)
+        {
+            // this method must be lock-free to avoid Simulator deadlock
+            return delegate().executorFor(verb);
         }
 
         public IInstanceConfig config()
@@ -241,7 +275,9 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster<I
         public synchronized void startup()
         {
             startup(AbstractCluster.this);
+            postStartup();
         }
+
         public synchronized void startup(ICluster cluster)
         {
             if (cluster != AbstractCluster.this)
@@ -284,6 +320,9 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster<I
                 throw t;
             }
             updateMessagingVersions();
+
+            if (instanceInitializer != null)
+                instanceInitializer.afterStartup(this);
         }
 
         @Override
@@ -342,6 +381,14 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster<I
         }
 
         @Override
+        public void receiveMessageWithInvokingThread(IMessage message)
+        {
+            IInvokableInstance delegate = this.delegate;
+            if (isRunning() && delegate != null) // since we sync directly on the other node, we drop messages immediately if we are shutdown
+                delegate.receiveMessageWithInvokingThread(message);
+        }
+
+        @Override
         public boolean getLogsEnabled()
         {
             return delegate().getLogsEnabled();
@@ -388,8 +435,10 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster<I
 
     protected AbstractCluster(AbstractBuilder<I, ? extends ICluster<I>, ?> builder)
     {
-        this.root = new File(builder.getRoot());
+        this.root = builder.getRootPath();
         this.sharedClassLoader = builder.getSharedClassLoader();
+        this.sharedClassPredicate = builder.getSharedClasses();
+        this.classTransformer = builder.getClassTransformer();
         this.subnet = builder.getSubnet();
         this.tokenSupplier = builder.getTokenSupplier();
         this.nodeIdTopology = builder.getNodeIdTopology();
@@ -400,7 +449,7 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster<I
         this.instanceMap = new ConcurrentHashMap<>();
         this.initialVersion = builder.getVersion();
         this.filters = new MessageFilters();
-        this.instanceInitializer = builder.getInstanceInitializer();
+        this.instanceInitializer = builder.getInstanceInitializer2();
         this.datadirCount = builder.getDatadirCount();
 
         int generation = GENERATION.incrementAndGet();
@@ -526,7 +575,7 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster<I
                                               i.config().localRack().equals(rackName));
     }
 
-    public void run(Consumer<? super I> action,  Predicate<I> filter)
+    public void run(Consumer<? super I> action, Predicate<I> filter)
     {
         run(Collections.singletonList(action), filter);
     }
@@ -594,6 +643,20 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster<I
         return filters;
     }
 
+    public synchronized void setMessageSink(IMessageSink sink)
+    {
+        if (messageSink != null && sink != null)
+            throw new IllegalStateException();
+        this.messageSink = sink;
+    }
+
+    public void deliverMessage(InetSocketAddress to, IMessage message)
+    {
+        IMessageSink sink = messageSink;
+        if (sink == null) get(to).receiveMessage(message);
+        else sink.accept(to, message);
+    }
+
     public IMessageFilters.Builder verbs(Verb... verbs)
     {
         int[] ids = new int[verbs.length];
@@ -815,8 +878,14 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster<I
                     startParallel.add(instance);
             }
 
-            forEach(startSequentially, I::startup);
-            parallelForEach(startParallel, I::startup, 0, null);
+            forEach(startSequentially, i -> {
+                i.startup(this);
+                i.postStartup();
+            });
+            parallelForEach(startParallel, i -> {
+                i.startup(this);
+                i.postStartup();
+            }, 0, null);
             monitor.waitForCompletion();
         }
     }
@@ -859,8 +928,8 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster<I
         instanceMap.clear();
         PathUtils.setDeletionListener(ignore -> {});
         // Make sure to only delete directory when threads are stopped
-        if (root.exists())
-            FileUtils.deleteRecursive(root);
+        if (Files.exists(root))
+            PathUtils.deleteRecursive(root);
         Thread.setDefaultUncaughtExceptionHandler(previousHandler);
         previousHandler = null;
         checkAndResetUncaughtExceptions();
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Coordinator.java b/test/distributed/org/apache/cassandra/distributed/impl/Coordinator.java
index 9c4f255..9fdebed 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Coordinator.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Coordinator.java
@@ -80,7 +80,7 @@ public class Coordinator implements ICoordinator
         }).call();
     }
 
-    protected org.apache.cassandra.db.ConsistencyLevel toCassandraCL(ConsistencyLevel cl)
+    static org.apache.cassandra.db.ConsistencyLevel toCassandraCL(ConsistencyLevel cl)
     {
         return org.apache.cassandra.db.ConsistencyLevel.fromCode(cl.ordinal());
     }
@@ -210,7 +210,7 @@ public class Coordinator implements ICoordinator
         }).call();
     }
 
-    private static final ClientState makeFakeClientState()
+    static ClientState makeFakeClientState()
     {
         return ClientState.forExternalCalls(new InetSocketAddress(FBUtilities.getJustLocalAddress(), 9042));
     }
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/DelegatingInvokableInstance.java b/test/distributed/org/apache/cassandra/distributed/impl/DelegatingInvokableInstance.java
index a6bd0f6..79bf946 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/DelegatingInvokableInstance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/DelegatingInvokableInstance.java
@@ -21,6 +21,8 @@ package org.apache.cassandra.distributed.impl;
 import java.io.Serializable;
 import java.net.InetSocketAddress;
 import java.util.UUID;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.function.BiConsumer;
 import java.util.function.BiFunction;
@@ -31,6 +33,7 @@ import org.apache.cassandra.distributed.api.ICluster;
 import org.apache.cassandra.distributed.api.ICoordinator;
 import org.apache.cassandra.distributed.api.IInstanceConfig;
 import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.IIsolatedExecutor;
 import org.apache.cassandra.distributed.api.IListen;
 import org.apache.cassandra.distributed.api.IMessage;
 import org.apache.cassandra.distributed.api.SimpleQueryResult;
@@ -134,18 +137,42 @@ public abstract class DelegatingInvokableInstance implements IInvokableInstance
     }
 
     @Override
+    public IIsolatedExecutor with(ExecutorService executor)
+    {
+        return delegate().with(executor);
+    }
+
+    @Override
+    public Executor executor()
+    {
+        return delegate().executor();
+    }
+
+    @Override
     public void startup(ICluster cluster)
     {
         delegateForStartup().startup(cluster);
     }
 
     @Override
+    public void postStartup()
+    {
+        delegateForStartup().postStartup();
+    }
+
+    @Override
     public void receiveMessage(IMessage message)
     {
         delegate().receiveMessage(message);
     }
 
     @Override
+    public void receiveMessageWithInvokingThread(IMessage message)
+    {
+        delegate().receiveMessageWithInvokingThread(message);
+    }
+
+    @Override
     public <O> CallableNoExcept<Future<O>> async(CallableNoExcept<O> call)
     {
         return delegate().async(call);
@@ -194,6 +221,18 @@ public abstract class DelegatingInvokableInstance implements IInvokableInstance
     }
 
     @Override
+    public <I1, I2, I3> TriFunction<I1, I2, I3, Future<?>> async(TriConsumer<I1, I2, I3> consumer)
+    {
+        return delegate().async(consumer);
+    }
+
+    @Override
+    public <I1, I2, I3> TriConsumer<I1, I2, I3> sync(TriConsumer<I1, I2, I3> consumer)
+    {
+        return delegate().sync(consumer);
+    }
+
+    @Override
     public <I, O> Function<I, Future<O>> async(Function<I, O> f)
     {
         return delegate().async(f);
@@ -229,4 +268,27 @@ public abstract class DelegatingInvokableInstance implements IInvokableInstance
         return delegate().sync(f);
     }
 
+    @Override
+    public <I1, I2, I3, I4, O> QuadFunction<I1, I2, I3, I4, Future<O>> async(QuadFunction<I1, I2, I3, I4, O> f)
+    {
+        return delegate().async(f);
+    }
+
+    @Override
+    public <I1, I2, I3, I4, O> QuadFunction<I1, I2, I3, I4, O> sync(QuadFunction<I1, I2, I3, I4, O> f)
+    {
+        return delegate().sync(f);
+    }
+
+    @Override
+    public <I1, I2, I3, I4, I5, O> QuintFunction<I1, I2, I3, I4, I5, Future<O>> async(QuintFunction<I1, I2, I3, I4, I5, O> f)
+    {
+        return delegate().async(f);
+    }
+
+    @Override
+    public <I1, I2, I3, I4, I5, O> QuintFunction<I1, I2, I3, I4, I5, O> sync(QuintFunction<I1, I2, I3, I4, I5, O> f)
+    {
+        return delegate().sync(f);
+    }
 }
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/DirectStreamingConnectionFactory.java b/test/distributed/org/apache/cassandra/distributed/impl/DirectStreamingConnectionFactory.java
new file mode 100644
index 0000000..e598dc1
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/impl/DirectStreamingConnectionFactory.java
@@ -0,0 +1,386 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.impl;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.FileChannel;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.function.IntFunction;
+
+import io.netty.util.concurrent.Future;
+import org.apache.cassandra.distributed.api.ICluster;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.io.util.BufferedDataOutputStreamPlus;
+import org.apache.cassandra.io.util.RebufferingInputStream;
+import org.apache.cassandra.streaming.StreamDeserializingTask;
+import org.apache.cassandra.streaming.StreamingDataInputPlus;
+import org.apache.cassandra.streaming.StreamingDataOutputPlus;
+import org.apache.cassandra.streaming.StreamingChannel;
+import org.apache.cassandra.streaming.StreamingDataOutputPlusFixed;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.concurrent.ImmediateFuture;
+import org.apache.cassandra.utils.concurrent.NotScheduledFuture;
+import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;
+
+import static org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory;
+import static org.apache.cassandra.net.MessagingService.*;
+
+public class DirectStreamingConnectionFactory
+{
+    static class DirectConnection
+    {
+        private static final AtomicInteger nextId = new AtomicInteger();
+
+        final int protocolVersion;
+
+        // TODO rename
+        private static class Buffer
+        {
+            boolean isClosed;
+            byte[] pending;
+        }
+
+        @SuppressWarnings({"InnerClassMayBeStatic","unused"}) // helpful for debugging
+        class DirectStreamingChannel implements StreamingChannel
+        {
+            class Out extends BufferedDataOutputStreamPlus implements StreamingDataOutputPlus
+            {
+                private final Buffer out;
+                private Thread thread;
+                private boolean inUse;
+
+                Out(Buffer out)
+                {
+                    super(ByteBuffer.allocate(16 << 10));
+                    this.out = out;
+                }
+
+                protected void doFlush(int count) throws IOException
+                {
+                    if (buffer.position() == 0)
+                        return;
+
+                    try
+                    {
+                        synchronized (out)
+                        {
+                            while (out.pending != null && !out.isClosed)
+                                out.wait();
+
+                            if (out.isClosed)
+                                throw new ClosedChannelException();
+
+                            buffer.flip();
+                            out.pending = ByteBufferUtil.getArray(buffer);
+                            buffer.clear();
+
+                            out.notify();
+                        }
+                    }
+                    catch (InterruptedException e)
+                    {
+                        throw new UncheckedInterruptedException(e);
+                    }
+                }
+
+                public synchronized Out acquire()
+                {
+                    if (inUse)
+                        throw new IllegalStateException();
+                    inUse = true;
+                    thread = Thread.currentThread();
+                    return this;
+                }
+
+                public synchronized void close() throws IOException
+                {
+                    flush();
+                    inUse = false;
+                }
+
+                void realClose()
+                {
+                    synchronized (out)
+                    {
+                        out.isClosed = true;
+                        out.notifyAll();
+                    }
+                }
+
+                @Override
+                public int writeToChannel(Write write, RateLimiter limiter) throws IOException
+                {
+                    class Holder
+                    {
+                        ByteBuffer buffer;
+                    }
+                    Holder holder = new Holder();
+
+                    write.write(size -> {
+                        if (holder.buffer != null)
+                            throw new IllegalStateException("Can only allocate one ByteBuffer");
+                        holder.buffer = ByteBuffer.allocate(size);
+                        return holder.buffer;
+                    });
+
+                    ByteBuffer buffer = holder.buffer;
+                    int length = buffer.limit();
+                    write(buffer);
+                    return length;
+                }
+
+                // TODO (future): support RateLimiter
+                @Override
+                public long writeFileToChannel(FileChannel file, RateLimiter limiter) throws IOException
+                {
+                    long count = 0;
+                    while (file.read(buffer) >= 0)
+                    {
+                        count += buffer.position();
+                        doFlush(0);
+                    }
+                    return count;
+                }
+            }
+
+            class In extends RebufferingInputStream implements StreamingDataInputPlus
+            {
+                private final Buffer in;
+                private Thread thread;
+
+                In(Buffer in)
+                {
+                    super(ByteBuffer.allocate(0));
+                    this.in = in;
+                }
+
+                protected void reBuffer() throws IOException
+                {
+                    try
+                    {
+                        synchronized (in)
+                        {
+                            byte[] bytes;
+                            while ((bytes = in.pending) == null && !in.isClosed)
+                                in.wait();
+
+                            if (bytes == null)
+                                throw new ClosedChannelException();
+
+                            in.pending = null;
+                            buffer = ByteBuffer.wrap(bytes);
+
+                            in.notify();
+                        }
+                    }
+                    catch (InterruptedException e)
+                    {
+                        throw new UncheckedInterruptedException(e);
+                    }
+                }
+
+                public void close()
+                {
+                    DirectStreamingChannel.this.close();
+                }
+
+                public void realClose()
+                {
+                    synchronized (in)
+                    {
+                        in.isClosed = true;
+                        in.notifyAll();
+                    }
+                }
+            }
+
+            final InetSocketAddress remoteAddress;
+
+            private final In in;
+            private final Out out;
+            private final Integer id = nextId.incrementAndGet();
+            Runnable onClose;
+            boolean isClosed;
+
+            DirectStreamingChannel(InetSocketAddress remoteAddress, Buffer outBuffer, Buffer inBuffer)
+            {
+                this.remoteAddress = remoteAddress;
+                this.in = new In(inBuffer);
+                this.out = new Out(outBuffer);
+            }
+
+            public StreamingDataOutputPlus acquireOut()
+            {
+                return out.acquire();
+            }
+
+            @Override
+            public synchronized Future<?> send(Send send) throws IOException
+            {
+                class Factory implements IntFunction<StreamingDataOutputPlus>
+                {
+                    ByteBuffer buffer;
+                    @Override
+                    public StreamingDataOutputPlus apply(int size)
+                    {
+                        buffer = ByteBuffer.allocate(size);
+                        return new StreamingDataOutputPlusFixed(buffer);
+                    }
+                }
+                Factory factory = new Factory();
+                send.send(factory);
+                factory.buffer.flip();
+                try (StreamingDataOutputPlus out = acquireOut())
+                {
+                    out.write(factory.buffer);
+                }
+                return ImmediateFuture.success(true);
+            }
+
+            @Override
+            public Object id()
+            {
+                return id;
+            }
+
+            @Override
+            public String description()
+            {
+                return remoteAddress.getAddress().getHostAddress() + "/in@" + id;
+            }
+
+            public StreamingDataInputPlus in()
+            {
+                in.thread = Thread.currentThread();
+                return in;
+            }
+
+            public InetSocketAddress peer()
+            {
+                return remoteAddress;
+            }
+
+            @Override
+            public InetSocketAddress connectedTo()
+            {
+                return remoteAddress;
+            }
+
+            @Override
+            public boolean connected()
+            {
+                return true;
+            }
+
+            @Override
+            public Future<?> close()
+            {
+                in.realClose();
+                out.realClose();
+                synchronized (this)
+                {
+                    if (!isClosed)
+                    {
+                        isClosed = true;
+                        if (onClose != null)
+                            onClose.run();
+                    }
+                }
+                return ImmediateFuture.success(null);
+            }
+
+            @Override
+            public synchronized void onClose(Runnable runOnClose)
+            {
+                if (isClosed) runOnClose.run();
+                else if (onClose == null) onClose = runOnClose;
+                else { Runnable tmp = onClose; onClose = () -> { tmp.run(); runOnClose.run(); }; }
+            }
+        }
+
+        private final DirectStreamingChannel outToRecipient, outToOriginator;
+
+        DirectConnection(int protocolVersion, InetSocketAddress originator, InetSocketAddress recipient)
+        {
+            this.protocolVersion = protocolVersion;
+            Buffer buffer1 = new Buffer(), buffer2 = new Buffer();
+            outToRecipient = new DirectStreamingChannel(recipient, buffer1, buffer2);
+            outToOriginator = new DirectStreamingChannel(originator, buffer2, buffer1);
+        }
+
+        StreamingChannel get(InetSocketAddress remoteAddress)
+        {
+            if (remoteAddress.equals(outToOriginator.remoteAddress)) return outToOriginator;
+            else if (remoteAddress.equals(outToRecipient.remoteAddress)) return outToRecipient;
+            else throw new IllegalArgumentException();
+        }
+    }
+
+    public class Factory implements StreamingChannel.Factory
+    {
+        final InetSocketAddress from;
+        Factory(InetSocketAddress from)
+        {
+            this.from = from;
+        }
+
+        @Override
+        public StreamingChannel create(InetSocketAddress to, int messagingVersion, StreamingChannel.Kind kind)
+        {
+            DirectConnection connection = new DirectConnection(messagingVersion, from, to);
+            IInvokableInstance instance = cluster.get(to);
+            instance.unsafeAcceptOnThisThread((channel, version) -> executorFactory().startThread(channel.description(), new StreamDeserializingTask(null, channel, version)),
+                         connection.get(from), messagingVersion);
+            return connection.get(to);
+        }
+    }
+
+    final ICluster<IInvokableInstance> cluster;
+    final int protocolVersion;
+
+    private DirectStreamingConnectionFactory(ICluster<IInvokableInstance> cluster)
+    {
+        this.cluster = cluster;
+        // we don't invoke this on the host ClassLoader as it initiates state like DatabaseDescriptor,
+        // potentially leading to resource leaks on the hosts (particularly in validateHeader which runs on the host threads)
+        this.protocolVersion = current_version;
+    }
+
+    public static Function<IInvokableInstance, StreamingChannel.Factory> create(ICluster<IInvokableInstance> cluster)
+    {
+        return cluster.get(1).unsafeApplyOnThisThread(c -> new DirectStreamingConnectionFactory(c)::get, cluster);
+    }
+
+    public static void setup(ICluster<IInvokableInstance> cluster)
+    {
+        Function<IInvokableInstance, StreamingChannel.Factory> streamingConnectionFactory = create(cluster);
+        cluster.stream().forEach(i -> i.unsafeAcceptOnThisThread(StreamingChannel.Factory.Global::unsafeSet, streamingConnectionFactory.apply(i)));
+    }
+
+    public Factory get(IInvokableInstance instance)
+    {
+        return new Factory(instance.config().broadcastAddress());
+    }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index 3f67b2f..18c1573 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@ -19,12 +19,12 @@
 package org.apache.cassandra.distributed.impl;
 
 import java.io.ByteArrayOutputStream;
-import java.io.Closeable;
 import java.io.IOException;
 import java.io.PrintStream;
 import java.net.BindException;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
+import java.nio.file.FileSystem;
 import java.security.Permission;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -49,6 +49,7 @@ import org.apache.cassandra.auth.AuthCache;
 import org.apache.cassandra.batchlog.Batch;
 import org.apache.cassandra.batchlog.BatchlogManager;
 import org.apache.cassandra.concurrent.ExecutorLocals;
+import org.apache.cassandra.concurrent.ExecutorPlus;
 import org.apache.cassandra.concurrent.ScheduledExecutors;
 import org.apache.cassandra.concurrent.SharedExecutorPool;
 import org.apache.cassandra.concurrent.Stage;
@@ -67,8 +68,6 @@ import org.apache.cassandra.db.SystemKeyspaceMigrator40;
 import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.db.compaction.CompactionLogger;
 import org.apache.cassandra.db.compaction.CompactionManager;
-import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.distributed.Cluster;
 import org.apache.cassandra.distributed.Constants;
 import org.apache.cassandra.distributed.action.GossipHelper;
@@ -85,9 +84,8 @@ import org.apache.cassandra.distributed.api.SimpleQueryResult;
 import org.apache.cassandra.distributed.mock.nodetool.InternalNodeProbe;
 import org.apache.cassandra.distributed.mock.nodetool.InternalNodeProbeFactory;
 import org.apache.cassandra.distributed.shared.Metrics;
-import org.apache.cassandra.gms.ApplicationState;
+import org.apache.cassandra.distributed.shared.ThrowingRunnable;
 import org.apache.cassandra.gms.Gossiper;
-import org.apache.cassandra.gms.VersionedValue;
 import org.apache.cassandra.hints.DTestSerializer;
 import org.apache.cassandra.hints.HintsService;
 import org.apache.cassandra.index.SecondaryIndexManager;
@@ -130,6 +128,7 @@ import org.apache.cassandra.tracing.TraceState;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.transport.messages.ResultMessage;
 import org.apache.cassandra.utils.ByteArrayUtil;
+import org.apache.cassandra.utils.Closeable;
 import org.apache.cassandra.utils.DiagnosticSnapshotService;
 import org.apache.cassandra.utils.ExecutorUtils;
 import org.apache.cassandra.utils.FBUtilities;
@@ -141,6 +140,7 @@ import org.apache.cassandra.utils.progress.jmx.JMXBroadcastExecutor;
 import org.apache.cassandra.utils.memory.BufferPools;
 
 import static java.util.concurrent.TimeUnit.MINUTES;
+import static org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory;
 import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
 import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL;
 import static org.apache.cassandra.distributed.api.Feature.NETWORK;
@@ -149,18 +149,27 @@ import static org.apache.cassandra.distributed.impl.DistributedTestSnitch.toCass
 import static org.apache.cassandra.net.Verb.BATCH_STORE_REQ;
 import static org.apache.cassandra.utils.Clock.Global.nanoTime;
 
+/**
+ * This class is instantiated on the relevant classloader, so its methods invoke the correct target classes automatically
+ */
 public class Instance extends IsolatedExecutor implements IInvokableInstance
 {
     public final IInstanceConfig config;
     private volatile boolean initialized = false;
     private final long startedAt;
 
-    // should never be invoked directly, so that it is instantiated on other class loader;
-    // only visible for inheritance
+    @Deprecated
     Instance(IInstanceConfig config, ClassLoader classLoader)
     {
-        super("node" + config.num(), classLoader);
+        this(config, classLoader, null);
+    }
+
+    Instance(IInstanceConfig config, ClassLoader classLoader, FileSystem fileSystem)
+    {
+        super("node" + config.num(), classLoader, executorFactory().pooled("isolatedExecutor", Integer.MAX_VALUE));
         this.config = config;
+        if (fileSystem != null)
+            File.unsafeSetFilesystem(fileSystem);
         Object clusterId = Objects.requireNonNull(config.get(Constants.KEY_DTEST_API_CLUSTER_ID), "cluster_id is not defined");
         ClusterIDDefiner.setId("cluster-" + clusterId);
         InstanceIDDefiner.setInstanceId(config.num());
@@ -230,31 +239,34 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
     @Override
     public SimpleQueryResult executeInternalWithResult(String query, Object... args)
     {
-        return sync(() -> {
-            ClientWarn.instance.captureWarnings();
-            CoordinatorWarnings.init();
-            try
-            {
-                QueryHandler.Prepared prepared = QueryProcessor.prepareInternal(query);
-                ResultMessage result = prepared.statement.executeLocally(QueryProcessor.internalQueryState(),
-                                                           QueryProcessor.makeInternalOptions(prepared.statement, args));
-                CoordinatorWarnings.done();
-
-                if (result != null)
-                    result.setWarnings(ClientWarn.instance.getWarnings());
-                return RowUtil.toQueryResult(result);
-            }
-            catch (Exception | Error e)
-            {
-                CoordinatorWarnings.done();
-                throw e;
-            }
-            finally
-            {
-                CoordinatorWarnings.reset();
-                ClientWarn.instance.resetWarnings();
-            }
-        }).call();
+        return sync(() -> unsafeExecuteInternalWithResult(query, args)).call();
+    }
+
+    public static SimpleQueryResult unsafeExecuteInternalWithResult(String query, Object ... args)
+    {
+        ClientWarn.instance.captureWarnings();
+        CoordinatorWarnings.init();
+        try
+        {
+            QueryHandler.Prepared prepared = QueryProcessor.prepareInternal(query);
+            ResultMessage result = prepared.statement.executeLocally(QueryProcessor.internalQueryState(),
+                                                                     QueryProcessor.makeInternalOptions(prepared.statement, args));
+            CoordinatorWarnings.done();
+
+            if (result != null)
+                result.setWarnings(ClientWarn.instance.getWarnings());
+            return RowUtil.toQueryResult(result);
+        }
+        catch (Exception | Error e)
+        {
+            CoordinatorWarnings.done();
+            throw e;
+        }
+        finally
+        {
+            CoordinatorWarnings.reset();
+            ClientWarn.instance.resetWarnings();
+        }
     }
 
     @Override
@@ -297,20 +309,19 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
         }).run();
     }
 
-    private void registerMockMessaging(ICluster cluster)
+    private void registerMockMessaging(ICluster<?> cluster)
     {
         MessagingService.instance().outboundSink.add((message, to) -> {
-            InetSocketAddress toAddr = fromCassandraInetAddressAndPort(to);
-            IInstance toInstance = cluster.get(toAddr);
-            if (toInstance != null)
-                toInstance.receiveMessage(serializeMessage(message.from(), to, message));
+            cluster.deliverMessage(to, serializeMessage(message.from(), to, message));
             return false;
         });
     }
 
-    private void registerInboundFilter(ICluster cluster)
+    private void registerInboundFilter(ICluster<?> cluster)
     {
         MessagingService.instance().inboundSink.add(message -> {
+            if (!cluster.filters().hasInbound())
+                return true;
             if (isShutdown())
                 return false;
             IMessage serialized = serializeMessage(message.from(), toCassandraInetAddressAndPort(broadcastAddress()), message);
@@ -343,7 +354,7 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
         sync(JVMStabilityInspector::uncaughtException).accept(thread, throwable);
     }
 
-    private static IMessage serializeMessage(InetAddressAndPort from, InetAddressAndPort to, Message<?> messageOut)
+    public static IMessage serializeMessage(InetAddressAndPort from, InetAddressAndPort to, Message<?> messageOut)
     {
         int fromVersion = MessagingService.instance().versions.get(from);
         int toVersion = MessagingService.instance().versions.get(to);
@@ -434,7 +445,20 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
     @Override
     public void receiveMessage(IMessage message)
     {
-        sync(() -> {
+        sync(receiveMessageRunnable(message)).accept(false);
+    }
+
+    @Override
+    public void receiveMessageWithInvokingThread(IMessage message)
+    {
+        if (classLoader != Thread.currentThread().getContextClassLoader())
+            throw new IllegalStateException("Must be invoked by a Thread utilising the node's class loader");
+        receiveMessageRunnable(message).accept(true);
+    }
+
+    private SerializableConsumer<Boolean> receiveMessageRunnable(IMessage message)
+    {
+        return runOnCaller -> {
             if (message.version() > MessagingService.current_version)
             {
                 throw new IllegalStateException(String.format("Node%d received message version %d but current version is %d",
@@ -446,15 +470,26 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
             Message<?> messageIn = deserializeMessage(message);
             Message.Header header = messageIn.header;
             TraceState state = Tracing.instance.initializeFromMessage(header);
-            if (state != null) state.trace("{} message received from {}", header.verb, header.from);
-            header.verb.stage.execute(ExecutorLocals.create(state), () -> MessagingService.instance().inboundSink.accept(messageIn)
-            );
-        }).run();
+            if (state != null)
+                state.trace("{} message received from {}", header.verb, header.from);
+
+            if (runOnCaller)
+            {
+                try (Closeable close = ExecutorLocals.create(state))
+                {
+                    MessagingService.instance().inboundSink.accept(messageIn);
+                }
+            }
+            else
+            {
+                header.verb.stage.executor().execute(ExecutorLocals.create(state), () -> MessagingService.instance().inboundSink.accept(messageIn));
+            }
+        };
     }
 
     public int getMessagingVersion()
     {
-        return callsOnInstance(() -> MessagingService.current_version).call();
+        return MessagingService.current_version;
     }
 
     @Override
@@ -466,26 +501,29 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
     @Override
     public String getReleaseVersionString()
     {
-        return callsOnInstance(() -> FBUtilities.getReleaseVersionString()).call();
+        return FBUtilities.getReleaseVersionString();
     }
 
     public void flush(String keyspace)
     {
-        runOnInstance(() -> FBUtilities.waitOnFutures(Keyspace.open(keyspace).flush()));
+        FBUtilities.waitOnFutures(Keyspace.open(keyspace).flush());
     }
 
     public void forceCompact(String keyspace, String table)
     {
-        runOnInstance(() -> {
-            try
-            {
-                Keyspace.open(keyspace).getColumnFamilyStore(table).forceMajorCompaction();
-            }
-            catch (Exception e)
-            {
-                throw new RuntimeException(e);
-            }
-        });
+        try
+        {
+            Keyspace.open(keyspace).getColumnFamilyStore(table).forceMajorCompaction();
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public ExecutorPlus executorFor(int verbId)
+    {
+        return Verb.fromId(verbId).stage.executor();
     }
 
     @Override
@@ -602,6 +640,7 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
 
                     StorageService.instance.setUpDistributedSystemKeyspaces();
                     StorageService.instance.setNormalModeUnsafe();
+                    Gossiper.instance.register(StorageService.instance);
                 }
 
                 // Populate tokenMetadata for the second time,
@@ -610,7 +649,6 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
 
                 SystemKeyspace.finishStartup();
 
-                StorageService.instance.doAuthSetup(false);
                 CassandraDaemon.getInstanceForTesting().completeSetup();
 
                 if (config.has(NATIVE_PROTOCOL))
@@ -624,6 +662,7 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
                     throw new IllegalStateException(String.format("%s != %s", FBUtilities.getBroadcastAddressAndPort(), broadcastAddress()));
 
                 ActiveRepairService.instance.start();
+                CassandraDaemon.getInstanceForTesting().completeSetup();
             }
             catch (Throwable t)
             {
@@ -636,6 +675,11 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
         initialized = true;
     }
 
+    @Override
+    public void postStartup()
+    {
+        StorageService.instance.doAuthSetup(false);
+    }
 
     private void mkdirs()
     {
@@ -648,90 +692,13 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
 
     private Config loadConfig(IInstanceConfig overrides)
     {
-        Map<String,Object> params = ((InstanceConfig) overrides).getParams();
+        Map<String, Object> params = overrides.getParams();
         boolean check = true;
         if (overrides.get(Constants.KEY_DTEST_API_CONFIG_CHECK) != null)
             check = (boolean) overrides.get(Constants.KEY_DTEST_API_CONFIG_CHECK);
         return YamlConfigurationLoader.fromMap(params, check, Config.class);
     }
 
-    public static void addToRing(boolean bootstrapping, IInstance peer)
-    {
-        try
-        {
-            IInstanceConfig config = peer.config();
-            IPartitioner partitioner = FBUtilities.newPartitioner(config.getString("partitioner"));
-            Token token = partitioner.getTokenFactory().fromString(config.getString("initial_token"));
-            InetAddressAndPort addressAndPort = toCassandraInetAddressAndPort(peer.broadcastAddress());
-
-            UUID hostId = config.hostId();
-            Gossiper.runInGossipStageBlocking(() -> {
-                Gossiper.instance.initializeNodeUnsafe(addressAndPort, hostId, 1);
-                Gossiper.instance.injectApplicationState(addressAndPort,
-                                                         ApplicationState.TOKENS,
-                                                         new VersionedValue.VersionedValueFactory(partitioner).tokens(Collections.singleton(token)));
-                StorageService.instance.onChange(addressAndPort,
-                                                 ApplicationState.STATUS,
-                                                 bootstrapping
-                                                 ? new VersionedValue.VersionedValueFactory(partitioner).bootstrapping(Collections.singleton(token))
-                                                 : new VersionedValue.VersionedValueFactory(partitioner).normal(Collections.singleton(token)));
-                Gossiper.instance.realMarkAlive(addressAndPort, Gossiper.instance.getEndpointStateForEndpoint(addressAndPort));
-            });
-            int messagingVersion = peer.isShutdown()
-                    ? MessagingService.current_version
-                    : Math.min(MessagingService.current_version, peer.getMessagingVersion());
-            MessagingService.instance().versions.set(addressAndPort, messagingVersion);
-
-            assert bootstrapping || StorageService.instance.getTokenMetadata().isMember(addressAndPort);
-            PendingRangeCalculatorService.instance.blockUntilFinished();
-        }
-        catch (Throwable e) // UnknownHostException
-        {
-            throw new RuntimeException(e);
-        }
-    }
-
-    public static void removeFromRing(IInstance peer)
-    {
-        try
-        {
-            IInstanceConfig config = peer.config();
-            IPartitioner partitioner = FBUtilities.newPartitioner(config.getString("partitioner"));
-            Token token = partitioner.getTokenFactory().fromString(config.getString("initial_token"));
-            InetAddressAndPort addressAndPort = toCassandraInetAddressAndPort(peer.broadcastAddress());
-
-            Gossiper.runInGossipStageBlocking(() -> {
-                StorageService.instance.onChange(addressAndPort,
-                        ApplicationState.STATUS,
-                        new VersionedValue.VersionedValueFactory(partitioner).left(Collections.singleton(token), 0L));
-            });
-        }
-        catch (Throwable e) // UnknownHostException
-        {
-            throw new RuntimeException(e);
-        }
-    }
-
-    public static void addToRingNormal(IInstance peer)
-    {
-        addToRing(false, peer);
-        assert StorageService.instance.getTokenMetadata().isMember(toCassandraInetAddressAndPort(peer.broadcastAddress()));
-    }
-
-    public static void addToRingBootstrapping(IInstance peer)
-    {
-        addToRing(true, peer);
-    }
-
-    private static void initializeRing(ICluster cluster)
-    {
-        for (int i = 1 ; i <= cluster.size() ; ++i)
-            addToRing(false, cluster.get(i));
-
-        for (int i = 1; i <= cluster.size(); ++i)
-            assert StorageService.instance.getTokenMetadata().isMember(toCassandraInetAddressAndPort(cluster.get(i).broadcastAddress()));
-    }
-
     public Future<Void> shutdown()
     {
         return shutdown(true);
@@ -769,7 +736,6 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
                                 () -> SecondaryIndexManager.shutdownAndWait(1L, MINUTES),
                                 () -> IndexSummaryManager.instance.shutdownAndWait(1L, MINUTES),
                                 () -> ColumnFamilyStore.shutdownExecutorsAndWait(1L, MINUTES),
-                                () -> PendingRangeCalculatorService.instance.shutdownAndWait(1L, MINUTES),
                                 () -> BufferPools.shutdownLocalCleaner(1L, MINUTES),
                                 () -> Ref.shutdownReferenceReaper(1L, MINUTES),
                                 () -> Memtable.MEMORY_POOL.shutdownAndWait(1L, MINUTES),
@@ -786,11 +752,12 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
                                 (IgnoreThrowingRunnable) () -> MessagingService.instance().shutdown(1L, MINUTES, false, true)
             );
             error = parallelRun(error, executor,
-                                () -> GlobalEventExecutor.INSTANCE.awaitInactivity(1l, MINUTES),
+                                () -> { try { GlobalEventExecutor.INSTANCE.awaitInactivity(1L, MINUTES); } catch (IllegalStateException ignore) {} },
                                 () -> Stage.shutdownAndWait(1L, MINUTES),
                                 () -> SharedExecutorPool.SHARED.shutdownAndWait(1L, MINUTES)
             );
             error = parallelRun(error, executor,
+                                () -> PendingRangeCalculatorService.instance.shutdownAndWait(1L, MINUTES),
                                 () -> shutdownAndWait(Collections.singletonList(JMXBroadcastExecutor.executor))
             );
 
@@ -817,7 +784,7 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
     @Override
     public Metrics metrics()
     {
-        return callOnInstance(() -> new InstanceMetrics(CassandraMetricsRegistry.Metrics));
+        return new InstanceMetrics(CassandraMetricsRegistry.Metrics);
     }
 
     @Override
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java b/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java
index 81e254d..9978697 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.distributed.impl;
 
 import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
+import java.nio.file.Path;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.Map;
@@ -28,24 +29,18 @@ import java.util.UUID;
 import java.util.function.Function;
 
 import com.vdurmont.semver4j.Semver;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.distributed.api.Feature;
 import org.apache.cassandra.distributed.api.IInstanceConfig;
 import org.apache.cassandra.distributed.shared.NetworkTopology;
 import org.apache.cassandra.distributed.shared.Shared;
 import org.apache.cassandra.distributed.upgrade.UpgradeTestBase;
-import org.apache.cassandra.io.util.File;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.locator.SimpleSeedProvider;
 
 @Shared
 public class InstanceConfig implements IInstanceConfig
 {
-    private static final Object NULL = new Object();
-    private static final Logger logger = LoggerFactory.getLogger(InstanceConfig.class);
-
     public final int num;
     public int num() { return num; }
 
@@ -130,7 +125,6 @@ public class InstanceConfig implements IInstanceConfig
         this.broadcastAddressAndPort = copy.broadcastAddressAndPort;
     }
 
-
     @Override
     public InetSocketAddress broadcastAddress()
     {
@@ -193,18 +187,12 @@ public class InstanceConfig implements IInstanceConfig
 
     public InstanceConfig set(String fieldName, Object value)
     {
-        if (value == null)
-            value = NULL;
         getParams(fieldName).put(fieldName, value);
         return this;
     }
 
-    private InstanceConfig forceSet(String fieldName, Object value)
+    public InstanceConfig forceSet(String fieldName, Object value)
     {
-        if (value == null)
-            value = NULL;
-
-        // test value
         getParams(fieldName).put(fieldName, value);
         return this;
     }
@@ -251,7 +239,7 @@ public class InstanceConfig implements IInstanceConfig
     public static InstanceConfig generate(int nodeNum,
                                           INodeProvisionStrategy provisionStrategy,
                                           NetworkTopology networkTopology,
-                                          File root,
+                                          Path root,
                                           String token,
                                           int datadirCount)
     {
@@ -273,9 +261,9 @@ public class InstanceConfig implements IInstanceConfig
                                   provisionStrategy.nativeTransportPort(nodeNum));
     }
 
-    private static String[] datadirs(int datadirCount, File root, int nodeNum)
+    private static String[] datadirs(int datadirCount, Path root, int nodeNum)
     {
-        String datadirFormat = String.format("%s/node%d/data%%d", root.path(), nodeNum);
+        String datadirFormat = String.format("%s/node%d/data%%d", root, nodeNum);
         String [] datadirs = new String[datadirCount];
         for (int i = 0; i < datadirs.length; i++)
             datadirs[i] = String.format(datadirFormat, i);
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/InstanceKiller.java b/test/distributed/org/apache/cassandra/distributed/impl/InstanceKiller.java
index e7ca49b..99fc75d 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/InstanceKiller.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/InstanceKiller.java
@@ -43,6 +43,7 @@ public class InstanceKiller extends JVMStabilityInspector.Killer
         // the bad part is that System.exit kills the JVM, so all code which calls kill won't hit the
         // next line; yet in in-JVM dtests System.exit is not desirable, so need to rely on a runtime exception
         // as a means to try to stop execution
+        // TODO (now): this is only used for one dtest, and can cause infinite loops with Simulator in e.g. AbstractCommitLogSegmentManager (failing its first assert, invoking the handler, throwing this exception, restarting)
         throw new InstanceShutdown();
     }
 
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/IsolatedExecutor.java b/test/distributed/org/apache/cassandra/distributed/impl/IsolatedExecutor.java
index a612781..2779110 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/IsolatedExecutor.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/IsolatedExecutor.java
@@ -28,8 +28,8 @@ import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.net.URLClassLoader;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
@@ -44,7 +44,8 @@ import java.util.function.Supplier;
 import org.slf4j.LoggerFactory;
 
 import ch.qos.logback.classic.LoggerContext;
-import org.apache.cassandra.concurrent.NamedThreadFactory;
+import io.netty.util.concurrent.FastThreadLocal;
+import org.apache.cassandra.concurrent.ExecutorFactory;
 import org.apache.cassandra.distributed.api.IIsolatedExecutor;
 import org.apache.cassandra.utils.ExecutorUtils;
 import org.apache.cassandra.utils.Throwables;
@@ -55,15 +56,33 @@ public class IsolatedExecutor implements IIsolatedExecutor
 {
     final ExecutorService isolatedExecutor;
     private final String name;
-    private final ClassLoader classLoader;
-    private final Method deserializeOnInstance;
+    final ClassLoader classLoader;
+    private final DynamicFunction<Serializable> transfer;
 
-    IsolatedExecutor(String name, ClassLoader classLoader)
+    IsolatedExecutor(String name, ClassLoader classLoader, ExecutorFactory executorFactory)
+    {
+        this(name, classLoader, executorFactory.pooled("isolatedExecutor", Integer.MAX_VALUE));
+    }
+
+    IsolatedExecutor(String name, ClassLoader classLoader, ExecutorService executorService)
     {
         this.name = name;
-        this.isolatedExecutor = Executors.newCachedThreadPool(new NamedThreadFactory("isolatedExecutor", Thread.NORM_PRIORITY, classLoader, new ThreadGroup(name)));
+        this.isolatedExecutor = executorService;
         this.classLoader = classLoader;
-        this.deserializeOnInstance = lookupDeserializeOneObject(classLoader);
+        this.transfer = transferTo(classLoader);
+    }
+
+    protected IsolatedExecutor(IsolatedExecutor from, ExecutorService executor)
+    {
+        this.name = from.name;
+        this.isolatedExecutor = executor;
+        this.classLoader = from.classLoader;
+        this.transfer = from.transfer;
+    }
+
+    public IIsolatedExecutor with(ExecutorService executor)
+    {
+        return new IsolatedExecutor(this, executor);
     }
 
     public Future<Void> shutdown()
@@ -93,6 +112,8 @@ public class IsolatedExecutor implements IIsolatedExecutor
                 LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory();
                 loggerContext.stop();
 
+                FastThreadLocal.destroy();
+
                 // Close the instance class loader after shutting down the isolatedExecutor and logging
                 // in case error handling triggers loading additional classes
                 ((URLClassLoader) classLoader).close();
@@ -120,6 +141,9 @@ public class IsolatedExecutor implements IIsolatedExecutor
     public <I1, I2> BiFunction<I1, I2, Future<?>> async(BiConsumer<I1, I2> consumer) { return (a, b) -> isolatedExecutor.submit(() -> consumer.accept(a, b)); }
     public <I1, I2> BiConsumer<I1, I2> sync(BiConsumer<I1, I2> consumer) { return (a, b) -> waitOn(async(consumer).apply(a, b)); }
 
+    public <I1, I2, I3> TriFunction<I1, I2, I3, Future<?>> async(TriConsumer<I1, I2, I3> consumer) { return (a, b, c) -> isolatedExecutor.submit(() -> consumer.accept(a, b, c)); }
+    public <I1, I2, I3> TriConsumer<I1, I2, I3> sync(TriConsumer<I1, I2, I3> consumer) { return (a, b, c) -> waitOn(async(consumer).apply(a, b, c)); }
+
     public <I, O> Function<I, Future<O>> async(Function<I, O> f) { return (a) -> isolatedExecutor.submit(() -> f.apply(a)); }
     public <I, O> Function<I, O> sync(Function<I, O> f) { return (a) -> waitOn(async(f).apply(a)); }
 
@@ -129,17 +153,42 @@ public class IsolatedExecutor implements IIsolatedExecutor
     public <I1, I2, I3, O> TriFunction<I1, I2, I3, Future<O>> async(TriFunction<I1, I2, I3, O> f) { return (a, b, c) -> isolatedExecutor.submit(() -> f.apply(a, b, c)); }
     public <I1, I2, I3, O> TriFunction<I1, I2, I3, O> sync(TriFunction<I1, I2, I3, O> f) { return (a, b, c) -> waitOn(async(f).apply(a, b, c)); }
 
-    public <E extends Serializable> E transfer(E object)
+    public <I1, I2, I3, I4, O> QuadFunction<I1, I2, I3, I4, Future<O>> async(QuadFunction<I1, I2, I3, I4, O> f) { return (a, b, c, d) -> isolatedExecutor.submit(() -> f.apply(a, b, c, d)); }
+    public <I1, I2, I3, I4, O> QuadFunction<I1, I2, I3, I4, O> sync(QuadFunction<I1, I2, I3, I4, O> f) { return (a, b, c, d) -> waitOn(async(f).apply(a, b, c, d)); }
+
+    public <I1, I2, I3, I4, I5, O> QuintFunction<I1, I2, I3, I4, I5, Future<O>> async(QuintFunction<I1, I2, I3, I4, I5, O> f) { return (a, b, c, d, e) -> isolatedExecutor.submit(() -> f.apply(a, b, c, d, e)); }
+    public <I1, I2, I3, I4, I5, O> QuintFunction<I1, I2, I3, I4, I5, O> sync(QuintFunction<I1, I2, I3, I4, I5, O> f) { return (a, b, c, d,e ) -> waitOn(async(f).apply(a, b, c, d, e)); }
+
+    public Executor executor()
+    {
+        return isolatedExecutor;
+    }
+
+    public <T extends Serializable> T transfer(T in)
     {
-        return (E) transferOneObject(object, classLoader, deserializeOnInstance);
+        return transfer.apply(in);
     }
 
-    static <E extends Serializable> E transferAdhoc(E object, ClassLoader classLoader)
+    public static <T extends Serializable> T transferAdhoc(T object, ClassLoader classLoader)
     {
-        return transferOneObject(object, classLoader, lookupDeserializeOneObject(classLoader));
+        return transferOneObjectAdhoc(object, classLoader, lookupDeserializeOneObject(classLoader));
     }
 
-    private static <E extends Serializable> E transferOneObject(E object, ClassLoader classLoader, Method deserializeOnInstance)
+    private static final SerializableFunction<byte[], Object> DESERIALIZE_ONE_OBJECT = IsolatedExecutor::deserializeOneObject;
+
+    public static DynamicFunction<Serializable> transferTo(ClassLoader classLoader)
+    {
+        SerializableFunction<byte[], Object> deserializeOneObject = transferAdhoc(DESERIALIZE_ONE_OBJECT, classLoader);
+        return new DynamicFunction<Serializable>()
+        {
+            public <T extends Serializable> T apply(T in)
+            {
+                return (T) deserializeOneObject.apply(serializeOneObject(in));
+            }
+        };
+    }
+
+    private static <T extends Serializable> T transferOneObjectAdhoc(T object, ClassLoader classLoader, Method deserializeOnInstance)
     {
         byte[] bytes = serializeOneObject(object);
         try
@@ -148,11 +197,11 @@ public class IsolatedExecutor implements IIsolatedExecutor
             if (onInstance.getClass().getClassLoader() != classLoader)
                 throw new IllegalStateException(onInstance + " seemingly from wrong class loader: " + onInstance.getClass().getClassLoader() + ", but expected " + classLoader);
 
-            return (E) onInstance;
+            return (T) onInstance;
         }
         catch (IllegalAccessException | InvocationTargetException e)
         {
-            throw new RuntimeException("Error while transfering object to " + classLoader, e);
+            throw new RuntimeException(e);
         }
     }
 
@@ -214,22 +263,4 @@ public class IsolatedExecutor implements IIsolatedExecutor
         }
     }
 
-    public interface ThrowingRunnable
-    {
-        public void run() throws Throwable;
-
-        public static Runnable toRunnable(ThrowingRunnable runnable)
-        {
-            return () -> {
-                try
-                {
-                    runnable.run();
-                }
-                catch (Throwable throwable)
-                {
-                    throw new RuntimeException(throwable);
-                }
-            };
-        }
-    }
 }
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Query.java b/test/distributed/org/apache/cassandra/distributed/impl/Query.java
new file mode 100644
index 0000000..1b03996
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Query.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.impl;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.cassandra.cql3.CQLStatement;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.IIsolatedExecutor;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.ClientWarn;
+import org.apache.cassandra.service.QueryState;
+import org.apache.cassandra.transport.ProtocolVersion;
+import org.apache.cassandra.transport.messages.ResultMessage;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.apache.cassandra.distributed.impl.Coordinator.toCassandraCL;
+
+// TODO: maybe just keep with Simulator?
+public class Query implements IIsolatedExecutor.SerializableCallable<Object[][]>
+{
+    final String query;
+    final long timestamp;
+    final org.apache.cassandra.distributed.api.ConsistencyLevel commitConsistencyOrigin;
+    final org.apache.cassandra.distributed.api.ConsistencyLevel serialConsistencyOrigin;
+    final Object[] boundValues;
+
+    public Query(String query, long timestamp, org.apache.cassandra.distributed.api.ConsistencyLevel commitConsistencyOrigin, org.apache.cassandra.distributed.api.ConsistencyLevel serialConsistencyOrigin, Object[] boundValues)
+    {
+        this.query = query;
+        this.timestamp = timestamp;
+        this.commitConsistencyOrigin = commitConsistencyOrigin;
+        this.serialConsistencyOrigin = serialConsistencyOrigin;
+        this.boundValues = boundValues;
+    }
+
+    public Object[][] call()
+    {
+        ConsistencyLevel commitConsistency = toCassandraCL(commitConsistencyOrigin);
+        ConsistencyLevel serialConsistency = serialConsistencyOrigin == null ? null : toCassandraCL(serialConsistencyOrigin);
+        ClientState clientState = Coordinator.makeFakeClientState();
+        CQLStatement prepared = QueryProcessor.getStatement(query, clientState);
+        List<ByteBuffer> boundBBValues = new ArrayList<>();
+        for (Object boundValue : boundValues)
+            boundBBValues.add(ByteBufferUtil.objectToBytes(boundValue));
+
+        prepared.validate(QueryState.forInternalCalls().getClientState());
+
+        // Start capturing warnings on this thread. Note that this will implicitly clear out any previous
+        // warnings as it sets a new State instance on the ThreadLocal.
+        ClientWarn.instance.captureWarnings();
+
+        ResultMessage res = prepared.execute(QueryState.forInternalCalls(),
+                                             QueryOptions.create(commitConsistency,
+                                                                 boundBBValues,
+                                                                 false,
+                                                                 Integer.MAX_VALUE,
+                                                                 null,
+                                                                 serialConsistency,
+                                                                 ProtocolVersion.V4,
+                                                                 null,
+                                                                 timestamp,
+                                                                 FBUtilities.nowInSeconds()),
+                                             System.nanoTime());
+
+        // Collect warnings reported during the query.
+        if (res != null)
+            res.setWarnings(ClientWarn.instance.getWarnings());
+
+        return RowUtil.toQueryResult(res).toObjectArrays();
+    }
+
+    public String toString()
+    {
+        return String.format(query.replaceAll("\\?", "%s") + " AT " + commitConsistencyOrigin, boundValues);
+    }
+
+    static org.apache.cassandra.db.ConsistencyLevel toCassandraCL(org.apache.cassandra.distributed.api.ConsistencyLevel cl)
+    {
+        return org.apache.cassandra.db.ConsistencyLevel.fromCode(cl.ordinal());
+    }
+
+    static final org.apache.cassandra.distributed.api.ConsistencyLevel[] API_CLs = org.apache.cassandra.distributed.api.ConsistencyLevel.values();
+    static org.apache.cassandra.distributed.api.ConsistencyLevel fromCassandraCL(org.apache.cassandra.db.ConsistencyLevel cl)
+    {
+        return API_CLs[cl.ordinal()];
+    }
+
+}
\ No newline at end of file
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/UnsafeGossipHelper.java b/test/distributed/org/apache/cassandra/distributed/impl/UnsafeGossipHelper.java
new file mode 100644
index 0000000..fd166f5
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/impl/UnsafeGossipHelper.java
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.impl;
+
+import java.io.Serializable;
+import java.net.InetSocketAddress;
+import java.util.Collection;
+import java.util.UUID;
+
+import com.google.common.collect.Iterables;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.distributed.api.IInstance;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.IIsolatedExecutor;
+import org.apache.cassandra.gms.ApplicationState;
+import org.apache.cassandra.gms.EndpointState;
+import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.gms.VersionedValue;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.PendingRangeCalculatorService;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static java.util.Collections.singleton;
+import static org.apache.cassandra.distributed.impl.DistributedTestSnitch.toCassandraInetAddressAndPort;
+import static org.apache.cassandra.locator.InetAddressAndPort.getByAddress;
+
+public class UnsafeGossipHelper
+{
+    public static class HostInfo implements Serializable
+    {
+        final InetSocketAddress address;
+        final UUID hostId;
+        final String tokenString;
+        final int messagingVersion;
+        final boolean isShutdown;
+
+        private HostInfo(InetSocketAddress address, UUID hostId, String tokenString, int messagingVersion, boolean isShutdown)
+        {
+            this.address = address;
+            this.hostId = hostId;
+            this.tokenString = tokenString;
+            this.messagingVersion = messagingVersion;
+            this.isShutdown = isShutdown;
+        }
+
+        private HostInfo(IInstance instance)
+        {
+            this(instance, instance.config().hostId(), instance.config().getString("initial_token"));
+        }
+
+        private HostInfo(IInstance instance, UUID hostId, String tokenString)
+        {
+            this(instance.broadcastAddress(), hostId, tokenString, instance.getMessagingVersion(), instance.isShutdown());
+        }
+    }
+
+    public static IIsolatedExecutor.SerializableRunnable addToRingRunner(IIsolatedExecutor.SerializableBiFunction<VersionedValue.VersionedValueFactory, Collection<Token>, VersionedValue> statusFactory, InetSocketAddress address, UUID hostId, String tokenString, int messagingVersion, boolean isShutdown)
+    {
+        return () -> {
+            try
+            {
+                IPartitioner partitioner = DatabaseDescriptor.getPartitioner();
+                InetAddressAndPort addressAndPort = getByAddress(address);
+                Token token;
+                if (FBUtilities.getBroadcastAddressAndPort().equals(addressAndPort))
+                {
+                    if (tokenString == null)
+                    {
+                        token = Iterables.getOnlyElement(SystemKeyspace.getSavedTokens());
+                    }
+                    else
+                    {
+                        token = DatabaseDescriptor.getPartitioner().getTokenFactory().fromString(tokenString);
+                        SystemKeyspace.setLocalHostId(hostId);
+                        SystemKeyspace.updateTokens(singleton(token));
+                    }
+                }
+                else
+                {
+                    if (tokenString == null)
+                        throw new IllegalArgumentException();
+
+                    token = DatabaseDescriptor.getPartitioner().getTokenFactory().fromString(tokenString);
+                }
+
+                Gossiper.runInGossipStageBlocking(() -> {
+                    EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(addressAndPort);
+                    if (state == null)
+                    {
+                        Gossiper.instance.initializeNodeUnsafe(addressAndPort, hostId, 1);
+                        state = Gossiper.instance.getEndpointStateForEndpoint(addressAndPort);
+                        Gossiper.instance.realMarkAlive(addressAndPort, state);
+                    }
+
+                    state.addApplicationState(ApplicationState.TOKENS, new VersionedValue.VersionedValueFactory(partitioner).tokens(singleton(token)));
+                    VersionedValue status = statusFactory.apply(new VersionedValue.VersionedValueFactory(partitioner), singleton(token));
+                    state.addApplicationState(ApplicationState.STATUS_WITH_PORT, status);
+                    StorageService.instance.onChange(addressAndPort, ApplicationState.STATUS_WITH_PORT, status);
+                });
+
+                int setMessagingVersion = isShutdown
+                                          ? MessagingService.current_version
+                                          : Math.min(MessagingService.current_version, messagingVersion);
+                MessagingService.instance().versions.set(addressAndPort, setMessagingVersion);
+
+                PendingRangeCalculatorService.instance.blockUntilFinished();
+            }
+            catch (Throwable e) // UnknownHostException
+            {
+                throw new RuntimeException(e);
+            }
+        };
+    }
+
+    public static IIsolatedExecutor.SerializableRunnable addToRingNormalRunner(IInstance peer)
+    {
+        return addToRingNormalRunner(new HostInfo(peer));
+    }
+
+    public static IIsolatedExecutor.SerializableRunnable addToRingNormalRunner(IInstance peer, UUID hostId, String tokenString)
+    {
+        return addToRingNormalRunner(new HostInfo(peer, hostId, tokenString));
+    }
+
+    public static IIsolatedExecutor.SerializableRunnable addToRingNormalRunner(HostInfo info)
+    {
+        return addToRingNormalRunner(info.address, info.hostId, info.tokenString, info.messagingVersion, info.isShutdown);
+    }
+
+    public static IIsolatedExecutor.SerializableRunnable addToRingNormalRunner(InetSocketAddress address, UUID hostId, String tokenString, int messagingVersion, boolean isShutdown)
+    {
+        return addToRingRunner(VersionedValue.VersionedValueFactory::normal, address, hostId, tokenString, messagingVersion, isShutdown);
+    }
+
+    public static IIsolatedExecutor.SerializableRunnable addToRingRunner(IIsolatedExecutor.SerializableBiFunction<VersionedValue.VersionedValueFactory, Collection<Token>, VersionedValue> statusFactory, HostInfo info)
+    {
+        return addToRingRunner(statusFactory, info.address, info.hostId, info.tokenString, info.messagingVersion, info.isShutdown);
+    }
+
+    // reset gossip state so we know of the node being alive only
+    public static IIsolatedExecutor.SerializableRunnable removeFromRingRunner(IInstance instance)
+    {
+        return removeFromRingRunner(new HostInfo(instance));
+    }
+
+    // reset gossip state so we know of the node being alive only
+    public static IIsolatedExecutor.SerializableRunnable removeFromRingRunner(HostInfo info)
+    {
+        return removeFromRingRunner(info.address, info.hostId, info.tokenString);
+    }
+
+    public static IIsolatedExecutor.SerializableRunnable removeFromRingRunner(InetSocketAddress address, UUID hostId, String tokenString)
+    {
+        return () -> {
+
+            try
+            {
+                IPartitioner partitioner = DatabaseDescriptor.getPartitioner();
+                Token token = partitioner.getTokenFactory().fromString(tokenString);
+                InetAddressAndPort addressAndPort = toCassandraInetAddressAndPort(address);
+
+                Gossiper.runInGossipStageBlocking(() -> {
+                    StorageService.instance.onChange(addressAndPort,
+                                                     ApplicationState.STATUS,
+                                                     new VersionedValue.VersionedValueFactory(partitioner).left(singleton(token), 0L));
+                    Gossiper.instance.unsafeAnnulEndpoint(addressAndPort);
+                    Gossiper.instance.initializeNodeUnsafe(addressAndPort, hostId, 1);
+                    Gossiper.instance.realMarkAlive(addressAndPort, Gossiper.instance.getEndpointStateForEndpoint(addressAndPort));
+                });
+                PendingRangeCalculatorService.instance.blockUntilFinished();
+            }
+            catch (Throwable e) // UnknownHostException
+            {
+                throw new RuntimeException(e);
+            }
+        };
+    }
+
+    public static IIsolatedExecutor.SerializableRunnable addToRingBootstrappingRunner(IInstance peer)
+    {
+        return addToRingRunner(VersionedValue.VersionedValueFactory::bootstrapping, new HostInfo(peer));
+    }
+
+    public static IIsolatedExecutor.SerializableRunnable addToRingBootstrapReplacingRunner(IInstance peer, IInvokableInstance replacing, UUID hostId, String tokenString)
+    {
+        return addToRingBootstrapReplacingRunner(peer, replacing.broadcastAddress(), hostId, tokenString);
+    }
+
+    public static IIsolatedExecutor.SerializableRunnable addToRingBootstrapReplacingRunner(IInstance peer, InetSocketAddress replacingAddress, UUID hostId, String tokenString)
+    {
+        return addToRingRunner((factory, ignore) -> factory.bootReplacingWithPort(getByAddress(replacingAddress)), new HostInfo(peer, hostId, tokenString));
+    }
+
+    public static IIsolatedExecutor.SerializableRunnable addToRingNormalReplacedRunner(IInstance peer, IInstance replaced)
+    {
+        return addToRingNormalReplacedRunner(peer, replaced.broadcastAddress());
+    }
+
+    public static IIsolatedExecutor.SerializableRunnable addToRingNormalReplacedRunner(IInstance peer, InetSocketAddress replacedAddress)
+    {
+        return addToRingRunner((factory, ignore) -> factory.bootReplacingWithPort(getByAddress(replacedAddress)), new HostInfo(peer, null, null));
+    }
+
+    public static IIsolatedExecutor.SerializableRunnable addToRingLeavingRunner(IInstance peer)
+    {
+        return addToRingRunner(VersionedValue.VersionedValueFactory::leaving, new HostInfo(peer, null, null));
+    }
+
+    public static IIsolatedExecutor.SerializableRunnable addToRingLeftRunner(IInstance peer)
+    {
+        return addToRingRunner((factory, tokens) -> factory.left(tokens, Long.MAX_VALUE), new HostInfo(peer, null, null));
+    }
+
+    public static void removeFromRing(IInstance peer)
+    {
+        removeFromRingRunner(peer).run();
+    }
+
+    public static void addToRingNormal(IInstance peer)
+    {
+        addToRingNormalRunner(peer).run();
+        assert StorageService.instance.getTokenMetadata().isMember(toCassandraInetAddressAndPort(peer.broadcastAddress()));
+    }
+
+    public static void addToRingBootstrapping(IInstance peer)
+    {
+        addToRingBootstrappingRunner(peer).run();
+    }
+
+    public static IIsolatedExecutor.SerializableRunnable markShutdownRunner(InetSocketAddress address)
+    {
+        return () -> {
+            IPartitioner partitioner = DatabaseDescriptor.getPartitioner();
+            Gossiper.runInGossipStageBlocking(() -> {
+                EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(getByAddress(address));
+                VersionedValue status = new VersionedValue.VersionedValueFactory(partitioner).shutdown(true);
+                state.addApplicationState(ApplicationState.STATUS, status);
+                state.getHeartBeatState().forceHighestPossibleVersionUnsafe();
+                StorageService.instance.onChange(getByAddress(address), ApplicationState.STATUS, status);
+            });
+        };
+    }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/CASTest.java b/test/distributed/org/apache/cassandra/distributed/test/CASTest.java
index 91d26f8..8dd5977 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/CASTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/CASTest.java
@@ -39,7 +39,7 @@ import org.apache.cassandra.distributed.api.ConsistencyLevel;
 import org.apache.cassandra.distributed.api.ICoordinator;
 import org.apache.cassandra.distributed.api.IInstance;
 import org.apache.cassandra.distributed.api.IMessageFilters;
-import org.apache.cassandra.distributed.impl.Instance;
+import org.apache.cassandra.distributed.impl.UnsafeGossipHelper;
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.UUIDGen;
@@ -384,7 +384,7 @@ public class CASTest extends TestBaseImpl
             cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, v2 int, PRIMARY KEY (pk, ck))");
 
             // make it so {1} is unaware (yet) that {4} is an owner of the token
-            cluster.get(1).acceptsOnInstance(Instance::removeFromRing).accept(cluster.get(4));
+            cluster.get(1).acceptsOnInstance(UnsafeGossipHelper::removeFromRing).accept(cluster.get(4));
 
             int pk = pk(cluster, 1, 2);
 
@@ -396,7 +396,7 @@ public class CASTest extends TestBaseImpl
                     row(true));
 
             for (int i = 1 ; i <= 3 ; ++i)
-                cluster.get(i).acceptsOnInstance(Instance::addToRingNormal).accept(cluster.get(4));
+                cluster.get(i).acceptsOnInstance(UnsafeGossipHelper::addToRingNormal).accept(cluster.get(4));
 
             // {4} reads from !{2} => {3, 4}
             cluster.filters().verbs(PAXOS_PREPARE_REQ.id, READ_REQ.id).from(4).to(2).drop();
@@ -425,7 +425,7 @@ public class CASTest extends TestBaseImpl
             cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, v2 int, PRIMARY KEY (pk, ck))");
 
             // make it so {1} is unaware (yet) that {4} is an owner of the token
-            cluster.get(1).acceptsOnInstance(Instance::removeFromRing).accept(cluster.get(4));
+            cluster.get(1).acceptsOnInstance(UnsafeGossipHelper::removeFromRing).accept(cluster.get(4));
 
             // {4} promises, accepts and commits on !{2} => {3, 4}
             int pk = pk(cluster, 1, 2);
@@ -466,8 +466,8 @@ public class CASTest extends TestBaseImpl
 
             // make it so {4} is bootstrapping, and this has not propagated to other nodes yet
             for (int i = 1 ; i <= 4 ; ++i)
-                cluster.get(1).acceptsOnInstance(Instance::removeFromRing).accept(cluster.get(4));
-            cluster.get(4).acceptsOnInstance(Instance::addToRingBootstrapping).accept(cluster.get(4));
+                cluster.get(1).acceptsOnInstance(UnsafeGossipHelper::removeFromRing).accept(cluster.get(4));
+            cluster.get(4).acceptsOnInstance(UnsafeGossipHelper::addToRingBootstrapping).accept(cluster.get(4));
 
             int pk = pk(cluster, 1, 2);
 
@@ -480,7 +480,7 @@ public class CASTest extends TestBaseImpl
 
             // finish topology change
             for (int i = 1 ; i <= 4 ; ++i)
-                cluster.get(i).acceptsOnInstance(Instance::addToRingNormal).accept(cluster.get(4));
+                cluster.get(i).acceptsOnInstance(UnsafeGossipHelper::addToRingNormal).accept(cluster.get(4));
 
             // {3} reads from !{2} => {3, 4}
             cluster.filters().verbs(PAXOS_PREPARE_REQ.id, READ_REQ.id).from(3).to(2).drop();
@@ -510,8 +510,8 @@ public class CASTest extends TestBaseImpl
 
             // make it so {4} is bootstrapping, and this has not propagated to other nodes yet
             for (int i = 1 ; i <= 4 ; ++i)
-                cluster.get(1).acceptsOnInstance(Instance::removeFromRing).accept(cluster.get(4));
-            cluster.get(4).acceptsOnInstance(Instance::addToRingBootstrapping).accept(cluster.get(4));
+                cluster.get(1).acceptsOnInstance(UnsafeGossipHelper::removeFromRing).accept(cluster.get(4));
+            cluster.get(4).acceptsOnInstance(UnsafeGossipHelper::addToRingBootstrapping).accept(cluster.get(4));
 
             int pk = pk(cluster, 1, 2);
 
@@ -524,7 +524,7 @@ public class CASTest extends TestBaseImpl
 
             // finish topology change
             for (int i = 1 ; i <= 4 ; ++i)
-                cluster.get(i).acceptsOnInstance(Instance::addToRingNormal).accept(cluster.get(4));
+                cluster.get(i).acceptsOnInstance(UnsafeGossipHelper::addToRingNormal).accept(cluster.get(4));
 
             // {3} reads from !{2} => {3, 4}
             cluster.filters().verbs(PAXOS_PREPARE_REQ.id, READ_REQ.id).from(3).to(2).drop();
@@ -562,8 +562,8 @@ public class CASTest extends TestBaseImpl
 
             // make it so {4} is bootstrapping, and this has not propagated to other nodes yet
             for (int i = 1 ; i <= 4 ; ++i)
-                cluster.get(1).acceptsOnInstance(Instance::removeFromRing).accept(cluster.get(4));
-            cluster.get(4).acceptsOnInstance(Instance::addToRingBootstrapping).accept(cluster.get(4));
+                cluster.get(1).acceptsOnInstance(UnsafeGossipHelper::removeFromRing).accept(cluster.get(4));
+            cluster.get(4).acceptsOnInstance(UnsafeGossipHelper::addToRingBootstrapping).accept(cluster.get(4));
 
             int pk = pk(cluster, 1, 2);
 
@@ -589,7 +589,7 @@ public class CASTest extends TestBaseImpl
 
             // finish topology change
             for (int i = 1 ; i <= 4 ; ++i)
-                cluster.get(i).acceptsOnInstance(Instance::addToRingNormal).accept(cluster.get(4));
+                cluster.get(i).acceptsOnInstance(UnsafeGossipHelper::addToRingNormal).accept(cluster.get(4));
 
             // {3} reads from !{2} => {3, 4}
             cluster.filters().verbs(PAXOS_PREPARE_REQ.id, READ_REQ.id).from(3).to(2).drop();
@@ -626,8 +626,8 @@ public class CASTest extends TestBaseImpl
 
             // make it so {4} is bootstrapping, and this has not propagated to other nodes yet
             for (int i = 1 ; i <= 4 ; ++i)
-                cluster.get(1).acceptsOnInstance(Instance::removeFromRing).accept(cluster.get(4));
-            cluster.get(4).acceptsOnInstance(Instance::addToRingBootstrapping).accept(cluster.get(4));
+                cluster.get(1).acceptsOnInstance(UnsafeGossipHelper::removeFromRing).accept(cluster.get(4));
+            cluster.get(4).acceptsOnInstance(UnsafeGossipHelper::addToRingBootstrapping).accept(cluster.get(4));
 
             int pk = pk(cluster, 1, 2);
 
@@ -653,7 +653,7 @@ public class CASTest extends TestBaseImpl
 
             // finish topology change
             for (int i = 1 ; i <= 4 ; ++i)
-                cluster.get(i).acceptsOnInstance(Instance::addToRingNormal).accept(cluster.get(4));
+                cluster.get(i).acceptsOnInstance(UnsafeGossipHelper::addToRingNormal).accept(cluster.get(4));
 
             // {3} reads from !{2} => {3, 4}
             cluster.filters().verbs(PAXOS_PREPARE_REQ.id, READ_REQ.id).from(3).to(2).drop();
diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTestBase.java b/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTestBase.java
index 316a5d5..6fb5805 100644
--- a/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTestBase.java
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTestBase.java
@@ -39,6 +39,7 @@ import org.apache.cassandra.distributed.api.ICluster;
 import org.apache.cassandra.distributed.api.IInstanceConfig;
 import org.apache.cassandra.distributed.impl.Instance;
 import org.apache.cassandra.distributed.shared.DistributedTestBase;
+import org.apache.cassandra.distributed.shared.ThrowingRunnable;
 import org.apache.cassandra.distributed.shared.Versions;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.Pair;
@@ -108,7 +109,7 @@ public class UpgradeTestBase extends DistributedTestBase
         }
     }
 
-    public static class TestCase implements Instance.ThrowingRunnable
+    public static class TestCase implements ThrowingRunnable
     {
         private final Versions versions;
         private final List<TestVersions> upgrade = new ArrayList<>();
diff --git a/test/unit/org/apache/cassandra/db/RangeTombstoneListTest.java b/test/unit/org/apache/cassandra/db/RangeTombstoneListTest.java
index bbe06a8..7c9c7d4 100644
--- a/test/unit/org/apache/cassandra/db/RangeTombstoneListTest.java
+++ b/test/unit/org/apache/cassandra/db/RangeTombstoneListTest.java
@@ -32,6 +32,7 @@ import org.junit.Test;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.marshal.Int32Type;
 import org.apache.cassandra.distributed.impl.IsolatedExecutor;
+import org.apache.cassandra.distributed.shared.ThrowingRunnable;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
@@ -610,7 +611,7 @@ public class RangeTombstoneListTest
         }
     }
 
-    private void assertHasException(IsolatedExecutor.ThrowingRunnable block, Consumer<Throwable> verifier)
+    private void assertHasException(ThrowingRunnable block, Consumer<Throwable> verifier)
     {
         try
         {
diff --git a/test/unit/org/apache/cassandra/utils/concurrent/ImmediateFutureTest.java b/test/unit/org/apache/cassandra/utils/concurrent/ImmediateFutureTest.java
index b042a28..f1ad6d5 100644
--- a/test/unit/org/apache/cassandra/utils/concurrent/ImmediateFutureTest.java
+++ b/test/unit/org/apache/cassandra/utils/concurrent/ImmediateFutureTest.java
@@ -28,6 +28,7 @@ import org.junit.Assert;
 import org.junit.Test;
 
 import org.apache.cassandra.distributed.impl.IsolatedExecutor;
+import org.apache.cassandra.distributed.shared.ThrowingRunnable;
 
 public class ImmediateFutureTest
 {
@@ -88,7 +89,7 @@ public class ImmediateFutureTest
         testSimple(p, isCancelled);
     }
 
-    private static void assertFailure(IsolatedExecutor.ThrowingRunnable run, Predicate<Throwable> test)
+    private static void assertFailure(ThrowingRunnable run, Predicate<Throwable> test)
     {
         Throwable failure = null;
         try

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org