You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by if...@apache.org on 2020/03/20 15:49:26 UTC

[cassandra-in-jvm-dtest-api] 01/03: Introduce the extracted in-JVM DTest API

This is an automated email from the ASF dual-hosted git repository.

ifesdjeen pushed a commit to annotated tag 0.0.1-1
in repository https://gitbox.apache.org/repos/asf/cassandra-in-jvm-dtest-api.git

commit 10a3729e259c47101da2c53505177f5159e603da
Author: Alex Petrov <ol...@gmail.com>
AuthorDate: Mon Feb 24 12:06:09 2020 +0100

    Introduce the extracted in-JVM DTest API
    
    Patch by Alex Petrov; reviewed by David Capwell for CASSANDRA-15539.
---
 pom.xml                                            |  94 +++++++
 .../distributed/api/ConsistencyLevel.java          |  34 +++
 .../apache/cassandra/distributed/api/Feature.java  |  24 ++
 .../apache/cassandra/distributed/api/ICluster.java |  41 +++
 .../cassandra/distributed/api/ICoordinator.java    |  34 +++
 .../cassandra/distributed/api/IInstance.java       |  66 +++++
 .../cassandra/distributed/api/IInstanceConfig.java | 101 +++++++
 .../distributed/api/IInvokableInstance.java        |  67 +++++
 .../distributed/api/IIsolatedExecutor.java         | 128 +++++++++
 .../apache/cassandra/distributed/api/IListen.java  |  28 ++
 .../apache/cassandra/distributed/api/IMessage.java |  38 +++
 .../cassandra/distributed/api/IMessageFilters.java |  56 ++++
 .../distributed/api/IUpgradeableInstance.java      |  29 +++
 .../cassandra/distributed/api/TokenSupplier.java   |  32 +++
 .../cassandra/distributed/shared/Builder.java      | 233 +++++++++++++++++
 .../distributed/shared/DistributedTestBase.java    | 205 +++++++++++++++
 .../distributed/shared/InstanceClassLoader.java    | 116 +++++++++
 .../distributed/shared/MessageFilters.java         | 165 ++++++++++++
 .../distributed/shared/MessageFiltersTest.java     | 113 ++++++++
 .../distributed/shared/NetworkTopology.java        | 216 +++++++++++++++
 .../distributed/shared/ThrowingRunnable.java       |  38 +++
 .../cassandra/distributed/shared/Versions.java     | 201 ++++++++++++++
 .../cassandra/distributed/test/BootstrapTest.java  | 104 ++++++++
 .../test/DistributedReadWritePathTest.java         | 290 +++++++++++++++++++++
 .../distributed/test/GossipSettlesTest.java        |  43 +++
 .../distributed/test/LargeColumnTest.java          |  96 +++++++
 .../distributed/test/NativeProtocolTest.java       |  79 ++++++
 .../distributed/test/NetworkTopologyTest.java      |  98 +++++++
 .../distributed/test/SimpleReadWriteTest.java      | 270 +++++++++++++++++++
 test/conf/logback-dtest.xml                        |  77 ++++++
 30 files changed, 3116 insertions(+)

diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..4b92a62
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,94 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>org.apache</groupId>
+        <artifactId>apache</artifactId>
+        <version>10</version>
+    </parent>
+
+    <modelVersion>4.0.0</modelVersion>
+    <groupId>org.apache.cassandra</groupId>
+    <artifactId>dtest-api</artifactId>
+    <version>0.0.1-SNAPSHOT</version>
+    <name>In JVM Test API</name>
+    <description>In JVM Test API</description>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    </properties>
+
+    <licenses>
+        <license>
+            <name>Apache License, Version 2.0</name>
+            <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+            <distribution>repo</distribution>
+        </license>
+    </licenses>
+
+    <dependencies>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.12</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.cassandra</groupId>
+            <artifactId>in-jvm-dtest-cassandra-tryout</artifactId>
+            <version>0.0.7-3.11-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.8.0</version>
+                <configuration>
+                    <source>1.8</source>
+                    <target>1.8</target>
+                </configuration>
+            </plugin>
+
+
+            <plugin>
+                <artifactId>maven-deploy-plugin</artifactId>
+                <version>2.8.2</version>
+                <executions>
+                    <execution>
+                        <id>default-deploy</id>
+                        <phase>deploy</phase>
+                        <goals>
+                            <goal>deploy</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-gpg-plugin</artifactId>
+                <version>1.5</version>
+                <executions>
+                    <execution>
+                        <id>sign-artifacts</id>
+                        <phase>verify</phase>
+                        <goals>
+                            <goal>sign</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+
+        </plugins>
+    </build>
+
+    <scm>
+        <connection>scm:git:git@github.com:apache/cassandra-in-jvm-dtests.git</connection>
+        <developerConnection>scm:git:git@github.com:apache/cassandra-in-jvm-dtests.git</developerConnection>
+        <url>git@github.com:apache/cassandra-in-jvm-dtests.git</url>
+        <tag>HEAD</tag>
+    </scm>
+</project>
+
diff --git a/src/main/java/org/apache/cassandra/distributed/api/ConsistencyLevel.java b/src/main/java/org/apache/cassandra/distributed/api/ConsistencyLevel.java
new file mode 100644
index 0000000..3c057f8
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/distributed/api/ConsistencyLevel.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.api;
+
+public enum  ConsistencyLevel {
+    ANY,
+    ONE,
+    TWO,
+    THREE,
+    QUORUM,
+    ALL,
+    LOCAL_QUORUM,
+    EACH_QUORUM,
+    SERIAL,
+    LOCAL_SERIAL,
+    LOCAL_ONE,
+    NODE_LOCAL
+}
diff --git a/src/main/java/org/apache/cassandra/distributed/api/Feature.java b/src/main/java/org/apache/cassandra/distributed/api/Feature.java
new file mode 100644
index 0000000..b4ba036
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/distributed/api/Feature.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.api;
+
+public enum Feature
+{
+    NETWORK, GOSSIP, NATIVE_PROTOCOL
+}
diff --git a/src/main/java/org/apache/cassandra/distributed/api/ICluster.java b/src/main/java/org/apache/cassandra/distributed/api/ICluster.java
new file mode 100644
index 0000000..6d86e99
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/distributed/api/ICluster.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.api;
+
+import org.apache.cassandra.distributed.shared.NetworkTopology;
+
+import java.util.stream.Stream;
+
+public interface ICluster<I extends IInstance> extends AutoCloseable
+{
+    void startup();
+    I bootstrap(IInstanceConfig config);
+    I get(int i);
+    I get(NetworkTopology.AddressAndPort endpoint);
+    ICoordinator coordinator(int node);
+    void schemaChange(String query);
+    void schemaChange(String statement, int instance);
+
+    int size();
+
+    Stream<I> stream();
+    Stream<I> stream(String dcName);
+    Stream<I> stream(String dcName, String rackName);
+    IMessageFilters filters();
+}
diff --git a/src/main/java/org/apache/cassandra/distributed/api/ICoordinator.java b/src/main/java/org/apache/cassandra/distributed/api/ICoordinator.java
new file mode 100644
index 0000000..da17df6
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/distributed/api/ICoordinator.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.api;
+
+import java.util.Iterator;
+import java.util.UUID;
+import java.util.concurrent.Future;
+
+// The cross-version API requires that a Coordinator can be constructed without any constructor arguments
+public interface ICoordinator
+{
+    Object[][] execute(String query, ConsistencyLevel consistencyLevel, Object... boundValues);
+    Iterator<Object[]> executeWithPaging(String query, ConsistencyLevel consistencyLevel, int pageSize, Object... boundValues);
+
+    Future<Object[][]> asyncExecuteWithTracing(UUID sessionId, String query, ConsistencyLevel consistencyLevel, Object... boundValues);
+    Object[][] executeWithTracing(UUID sessionId, String query, ConsistencyLevel consistencyLevel, Object... boundValues);
+    IInstance instance();
+}
diff --git a/src/main/java/org/apache/cassandra/distributed/api/IInstance.java b/src/main/java/org/apache/cassandra/distributed/api/IInstance.java
new file mode 100644
index 0000000..dec3549
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/distributed/api/IInstance.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.api;
+
+import org.apache.cassandra.distributed.shared.NetworkTopology;
+
+import java.util.UUID;
+import java.util.concurrent.Future;
+
+// The cross-version API requires that an Instance has a constructor signature of (IInstanceConfig, ClassLoader)
+public interface IInstance extends IIsolatedExecutor
+{
+    ICoordinator coordinator();
+    IListen listen();
+
+    void schemaChangeInternal(String query);
+    public Object[][] executeInternal(String query, Object... args);
+
+    IInstanceConfig config();
+    NetworkTopology.AddressAndPort broadcastAddress();
+    UUID schemaVersion();
+
+    void startup();
+    boolean isShutdown();
+    Future<Void> shutdown();
+    Future<Void> shutdown(boolean graceful);
+
+    int liveMemberCount();
+
+    int nodetool(String... commandAndArgs);
+    void uncaughtException(Thread t, Throwable e);
+
+    /**
+     * Return the number of times the instance tried to call {@link System#exit(int)}.
+     *
+     * When the instance is shutdown, this state should be saved, but in case not possible should return {@code -1}
+     * to indicate "unknown".
+     */
+    long killAttempts();
+
+    // these methods are not for external use, but for simplicity we leave them public and on the normal IInstance interface
+    void startup(ICluster cluster);
+    void receiveMessage(IMessage message);
+
+    int getMessagingVersion();
+    void setMessagingVersion(NetworkTopology.AddressAndPort addressAndPort, int version);
+
+    void flush(String keyspace);
+    void forceCompact(String keyspace, String table);
+}
diff --git a/src/main/java/org/apache/cassandra/distributed/api/IInstanceConfig.java b/src/main/java/org/apache/cassandra/distributed/api/IInstanceConfig.java
new file mode 100644
index 0000000..05969f8
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/distributed/api/IInstanceConfig.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.api;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.UUID;
+import java.util.function.Function;
+
+import org.apache.cassandra.distributed.shared.NetworkTopology;
+import org.apache.cassandra.distributed.shared.Versions;
+
+public interface IInstanceConfig
+{
+    IInstanceConfig with(Feature featureFlag);
+    IInstanceConfig with(Feature... flags);
+
+    int num();
+    UUID hostId();
+    NetworkTopology.AddressAndPort broadcastAddress();
+    NetworkTopology networkTopology();
+
+    String localRack();
+    String localDatacenter();
+
+    /**
+     * write the specified parameters to the Config object; we do not specify Config as the type to support a Config
+     * from any ClassLoader; the implementation must not directly access any fields of the Object, or cast it, but
+     * must use the reflection API to modify the state
+     */
+    void propagate(Object writeToConfig, Map<Class<?>, Function<Object, Object>>executor);
+
+    /**
+     * Validates whether the config properties are within range of accepted values.
+     */
+    void validate();
+    IInstanceConfig set(String fieldName, Object value);
+    Object get(String fieldName);
+    String getString(String fieldName);
+    int getInt(String fieldName);
+    boolean has(Feature featureFlag);
+
+    public IInstanceConfig forVersion(Versions.Major major);
+
+    public static class ParameterizedClass
+    {
+        public static final String CLASS_NAME = "class_name";
+        public static final String PARAMETERS = "parameters";
+
+        public String class_name;
+        public Map<String, String> parameters;
+
+        public ParameterizedClass(String class_name, Map<String, String> parameters)
+        {
+            this.class_name = class_name;
+            this.parameters = parameters;
+        }
+
+        @SuppressWarnings("unchecked")
+        public ParameterizedClass(Map<String, ?> p)
+        {
+            this((String)p.get(CLASS_NAME),
+                 p.containsKey(PARAMETERS) ? (Map<String, String>)((List<?>)p.get(PARAMETERS)).get(0) : null);
+        }
+
+        @Override
+        public boolean equals(Object that)
+        {
+            return that instanceof ParameterizedClass && equals((ParameterizedClass) that);
+        }
+
+        public boolean equals(ParameterizedClass that)
+        {
+            return Objects.equals(class_name, that.class_name) && Objects.equals(parameters, that.parameters);
+        }
+
+        @Override
+        public String toString()
+        {
+            return class_name + (parameters == null ? "" : parameters.toString());
+        }
+    }
+
+}
diff --git a/src/main/java/org/apache/cassandra/distributed/api/IInvokableInstance.java b/src/main/java/org/apache/cassandra/distributed/api/IInvokableInstance.java
new file mode 100644
index 0000000..1dbc4cc
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/distributed/api/IInvokableInstance.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.api;
+
+import java.io.Serializable;
+import java.util.concurrent.Future;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+import org.apache.cassandra.distributed.api.IInstance;
+
+/**
+ * This version is only supported for a Cluster running the same code as the test environment, and permits
+ * ergonomic cross-node behaviours, without editing the cross-version API.
+ *
+ * A lambda can be written tto be invoked on any or all of the nodes.
+ *
+ * The reason this cannot (easily) be made cross-version is that the lambda is tied to the declaring class, which will
+ * not be the same in the alternate version.  Even were it not, there would likely be a runtime linkage error given
+ * any code divergence.
+ */
+public interface IInvokableInstance extends IInstance
+{
+    public default <O> CallableNoExcept<Future<O>> asyncCallsOnInstance(SerializableCallable<O> call) { return async(transfer(call)); }
+    public default <O> CallableNoExcept<O> callsOnInstance(SerializableCallable<O> call) { return sync(transfer(call)); }
+    public default <O> O callOnInstance(SerializableCallable<O> call) { return callsOnInstance(call).call(); }
+
+    public default CallableNoExcept<Future<?>> asyncRunsOnInstance(SerializableRunnable run) { return async(transfer(run)); }
+    public default Runnable runsOnInstance(SerializableRunnable run) { return sync(transfer(run)); }
+    public default void runOnInstance(SerializableRunnable run) { runsOnInstance(run).run(); }
+
+    public default <I> Function<I, Future<?>> asyncAcceptsOnInstance(SerializableConsumer<I> consumer) { return async(transfer(consumer)); }
+    public default <I> Consumer<I> acceptsOnInstance(SerializableConsumer<I> consumer) { return sync(transfer(consumer)); }
+
+    public default <I1, I2> BiFunction<I1, I2, Future<?>> asyncAcceptsOnInstance(SerializableBiConsumer<I1, I2> consumer) { return async(transfer(consumer)); }
+    public default <I1, I2> BiConsumer<I1, I2> acceptsOnInstance(SerializableBiConsumer<I1, I2> consumer) { return sync(transfer(consumer)); }
+
+    public default <I, O> Function<I, Future<O>> asyncAppliesOnInstance(SerializableFunction<I, O> f) { return async(transfer(f)); }
+    public default <I, O> Function<I, O> appliesOnInstance(SerializableFunction<I, O> f) { return sync(transfer(f)); }
+
+    public default <I1, I2, O> BiFunction<I1, I2, Future<O>> asyncAppliesOnInstance(SerializableBiFunction<I1, I2, O> f) { return async(transfer(f)); }
+    public default <I1, I2, O> BiFunction<I1, I2, O> appliesOnInstance(SerializableBiFunction<I1, I2, O> f) { return sync(transfer(f)); }
+
+    public default <I1, I2, I3, O> TriFunction<I1, I2, I3, Future<O>> asyncAppliesOnInstance(SerializableTriFunction<I1, I2, I3, O> f) { return async(transfer(f)); }
+    public default <I1, I2, I3, O> TriFunction<I1, I2, I3, O> appliesOnInstance(SerializableTriFunction<I1, I2, I3, O> f) { return sync(transfer(f)); }
+
+    public <E extends Serializable> E transfer(E object);
+
+}
diff --git a/src/main/java/org/apache/cassandra/distributed/api/IIsolatedExecutor.java b/src/main/java/org/apache/cassandra/distributed/api/IIsolatedExecutor.java
new file mode 100644
index 0000000..d811b17
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/distributed/api/IIsolatedExecutor.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.api;
+
+import java.io.Serializable;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+/**
+ * Represents a clean way to handoff evaluation of some work to an executor associated
+ * with a node's lifetime.
+ *
+ * There is no transfer of execution to the parallel class hierarchy.
+ *
+ * Classes, such as Instance, that are themselves instantiated on the correct ClassLoader, utilise this class
+ * to ensure the lifetime of any thread evaluating one of its method invocations matches the lifetime of the class itself.
+ * Since they are instantiated on the correct ClassLoader, sharing only the interface, there is no serialization necessary.
+ */
+public interface IIsolatedExecutor
+{
+    public interface CallableNoExcept<O> extends Callable<O> { public O call(); }
+    public interface SerializableCallable<O> extends CallableNoExcept<O>, Serializable { }
+    public interface SerializableRunnable extends Runnable, Serializable {}
+    public interface SerializableConsumer<O> extends Consumer<O>, Serializable {}
+    public interface SerializableSupplier<O> extends Supplier<O>, Serializable {}
+    public interface SerializableBiConsumer<I1, I2> extends BiConsumer<I1, I2>, Serializable {}
+    public interface SerializableFunction<I, O> extends Function<I, O>, Serializable {}
+    public interface SerializableBiFunction<I1, I2, O> extends BiFunction<I1, I2, O>, Serializable {}
+    public interface TriFunction<I1, I2, I3, O>
+    {
+        O apply(I1 i1, I2 i2, I3 i3);
+    }
+    public interface SerializableTriFunction<I1, I2, I3, O> extends Serializable, TriFunction<I1, I2, I3, O> { }
+
+    Future<Void> shutdown();
+
+    /**
+     * Convert the execution to one performed asynchronously on the IsolatedExecutor, returning a Future of the execution result
+     */
+    <O> CallableNoExcept<Future<O>> async(CallableNoExcept<O> call);
+
+    /**
+     * Convert the execution to one performed synchronously on the IsolatedExecutor
+     */
+    <O> CallableNoExcept<O> sync(CallableNoExcept<O> call);
+
+    /**
+     * Convert the execution to one performed asynchronously on the IsolatedExecutor, returning a Future of the execution result
+     */
+    CallableNoExcept<Future<?>> async(Runnable run);
+
+    /**
+     * Convert the execution to one performed synchronously on the IsolatedExecutor
+     */
+    Runnable sync(Runnable run);
+
+    /**
+     * Convert the execution to one performed asynchronously on the IsolatedExecutor, returning a Future of the execution result
+     */
+    <I> Function<I, Future<?>> async(Consumer<I> consumer);
+
+    /**
+     * Convert the execution to one performed synchronously on the IsolatedExecutor
+     */
+    <I> Consumer<I> sync(Consumer<I> consumer);
+
+    /**
+     * Convert the execution to one performed asynchronously on the IsolatedExecutor, returning a Future of the execution result
+     */
+    <I1, I2> BiFunction<I1, I2, Future<?>> async(BiConsumer<I1, I2> consumer);
+
+    /**
+     * Convert the execution to one performed synchronously on the IsolatedExecutor
+     */
+    <I1, I2> BiConsumer<I1, I2> sync(BiConsumer<I1, I2> consumer);
+
+    /**
+     * Convert the execution to one performed asynchronously on the IsolatedExecutor, returning a Future of the execution result
+     */
+    <I, O> Function<I, Future<O>> async(Function<I, O> f);
+
+    /**
+     * Convert the execution to one performed synchronously on the IsolatedExecutor
+     */
+    <I, O> Function<I, O> sync(Function<I, O> f);
+
+    /**
+     * Convert the execution to one performed asynchronously on the IsolatedExecutor, returning a Future of the execution result
+     */
+    <I1, I2, O> BiFunction<I1, I2, Future<O>> async(BiFunction<I1, I2, O> f);
+
+    /**
+     * Convert the execution to one performed synchronously on the IsolatedExecutor
+     */
+    <I1, I2, O> BiFunction<I1, I2, O> sync(BiFunction<I1, I2, O> f);
+
+    /**
+     * Convert the execution to one performed asynchronously on the IsolatedExecutor, returning a Future of the execution result
+     */
+    <I1, I2, I3, O> TriFunction<I1, I2, I3, Future<O>> async(TriFunction<I1, I2, I3, O> f);
+
+    /**
+     * Convert the execution to one performed synchronously on the IsolatedExecutor
+     */
+    <I1, I2, I3, O> TriFunction<I1, I2, I3, O> sync(TriFunction<I1, I2, I3, O> f);
+
+}
diff --git a/src/main/java/org/apache/cassandra/distributed/api/IListen.java b/src/main/java/org/apache/cassandra/distributed/api/IListen.java
new file mode 100644
index 0000000..c2e8dd6
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/distributed/api/IListen.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.api;
+
+public interface IListen
+{
+    public interface Cancel { void cancel(); }
+
+    Cancel schema(Runnable onChange);
+
+    Cancel liveMembers(Runnable onChange);
+}
diff --git a/src/main/java/org/apache/cassandra/distributed/api/IMessage.java b/src/main/java/org/apache/cassandra/distributed/api/IMessage.java
new file mode 100644
index 0000000..da536cc
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/distributed/api/IMessage.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.api;
+
+import org.apache.cassandra.distributed.shared.NetworkTopology;
+
+import java.io.Serializable;
+
+/**
+ * A cross-version interface for delivering internode messages via message sinks.
+ *
+ * Message implementations should be serializable so we could load into instances.
+ */
+public interface IMessage extends Serializable
+{
+    int verb();
+    byte[] bytes();
+    // TODO: need to make this a long
+    int id();
+    int version();
+    NetworkTopology.AddressAndPort from();
+}
diff --git a/src/main/java/org/apache/cassandra/distributed/api/IMessageFilters.java b/src/main/java/org/apache/cassandra/distributed/api/IMessageFilters.java
new file mode 100644
index 0000000..01fe972
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/distributed/api/IMessageFilters.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.api;
+
+public interface IMessageFilters
+{
+    public interface Filter
+    {
+        Filter off();
+        Filter on();
+    }
+
+    public interface Builder
+    {
+        Builder from(int ... nums);
+        Builder to(int ... nums);
+
+        /**
+         * Every message for which matcher returns `true` will be _dropped_ (assuming all
+         * other matchers in the chain will return `true` as well).
+         */
+        Builder messagesMatching(Matcher filter);
+        Filter drop();
+    }
+
+    public interface Matcher
+    {
+        boolean matches(int from, int to, IMessage message);
+    }
+
+    Builder verbs(int... verbs);
+    Builder allVerbs();
+    void reset();
+
+    /**
+     * {@code true} value returned by the implementation implies that the message was
+     * not matched by any filters and therefore should be delivered.
+     */
+    boolean permit(int from, int to, IMessage msg);
+}
diff --git a/src/main/java/org/apache/cassandra/distributed/api/IUpgradeableInstance.java b/src/main/java/org/apache/cassandra/distributed/api/IUpgradeableInstance.java
new file mode 100644
index 0000000..da864e0
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/distributed/api/IUpgradeableInstance.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.api;
+
+import org.apache.cassandra.distributed.api.IInstance;
+import org.apache.cassandra.distributed.shared.Versions;
+
+// this lives outside the api package so that we do not have to worry about inter-version compatibility
+public interface IUpgradeableInstance extends IInstance
+{
+    // only to be invoked while the node is shutdown!
+    public void setVersion(Versions.Version version);
+}
diff --git a/src/main/java/org/apache/cassandra/distributed/api/TokenSupplier.java b/src/main/java/org/apache/cassandra/distributed/api/TokenSupplier.java
new file mode 100644
index 0000000..a714cd5
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/distributed/api/TokenSupplier.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.api;
+
+public interface TokenSupplier {
+    long token(int nodeId);
+
+    static TokenSupplier evenlyDistributedTokens(int numNodes) {
+        long increment = (Long.MAX_VALUE / numNodes) * 2;
+        return (int nodeId) -> {
+            assert nodeId <= numNodes : String.format("Can not allocate a token for a node %s, since only %s nodes are allowed by the token allocation strategy",
+                                                      nodeId, numNodes);
+            return Long.MIN_VALUE + 1 + nodeId * increment;
+        };
+    }
+}
diff --git a/src/main/java/org/apache/cassandra/distributed/shared/Builder.java b/src/main/java/org/apache/cassandra/distributed/shared/Builder.java
new file mode 100644
index 0000000..6f6adfb
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/distributed/shared/Builder.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.shared;
+
+import org.apache.cassandra.distributed.api.ICluster;
+import org.apache.cassandra.distributed.api.IInstance;
+import org.apache.cassandra.distributed.api.IInstanceConfig;
+import org.apache.cassandra.distributed.api.TokenSupplier;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.cassandra.distributed.api.TokenSupplier.evenlyDistributedTokens;
+
+public abstract class Builder<I extends IInstance, C extends ICluster> {
+
+    private final int BROADCAST_PORT = 7012;
+
+    public interface Factory<I extends IInstance, C extends ICluster> {
+        C newCluster(File root, Versions.Version version, List<IInstanceConfig> configs, ClassLoader sharedClassLoader);
+    }
+
+    private final Factory<I, C> factory;
+    private int nodeCount;
+    private int subnet;
+    private Map<Integer, NetworkTopology.DcAndRack> nodeIdTopology;
+    private TokenSupplier tokenSupplier;
+    private File root;
+    private Versions.Version version;
+    private Consumer<IInstanceConfig> configUpdater;
+
+    public Builder(Factory<I, C> factory) {
+        this.factory = factory;
+    }
+
+    public C start() throws IOException {
+        C cluster = createWithoutStarting();
+        cluster.startup();
+        return cluster;
+    }
+
+    public C createWithoutStarting() throws IOException {
+        if (root == null)
+            root = Files.createTempDirectory("dtests").toFile();
+
+        if (nodeCount <= 0)
+            throw new IllegalStateException("Cluster must have at least one node");
+
+        if (nodeIdTopology == null) {
+            nodeIdTopology = IntStream.rangeClosed(1, nodeCount).boxed()
+                    .collect(Collectors.toMap(nodeId -> nodeId,
+                                              nodeId -> NetworkTopology.dcAndRack(dcName(0), rackName(0))));
+        }
+
+        root.mkdirs();
+
+        ClassLoader sharedClassLoader = Thread.currentThread().getContextClassLoader();
+
+        List<IInstanceConfig> configs = new ArrayList<>();
+
+        // TODO: make token allocation strategy configurable
+        if (tokenSupplier == null)
+            tokenSupplier = evenlyDistributedTokens(nodeCount);
+
+        for (int i = 0; i < nodeCount; ++i) {
+            int nodeNum = i + 1;
+            configs.add(createInstanceConfig(nodeNum));
+        }
+
+        return factory.newCluster(root, version, configs, sharedClassLoader);
+    }
+
+    public IInstanceConfig newInstanceConfig(C cluster) {
+        return createInstanceConfig(cluster.size() + 1);
+    }
+
+    protected IInstanceConfig createInstanceConfig(int nodeNum) {
+        String ipPrefix = "127.0." + subnet + ".";
+        String seedIp = ipPrefix + "1";
+        String ipAddress = ipPrefix + nodeNum;
+        long token = tokenSupplier.token(nodeNum);
+
+        NetworkTopology topology = NetworkTopology.build(ipPrefix, BROADCAST_PORT, nodeIdTopology);
+
+        IInstanceConfig config = generateConfig(nodeNum, ipAddress, topology, root, String.valueOf(token), seedIp);
+        if (configUpdater != null)
+            configUpdater.accept(config);
+
+        return config;
+    }
+
+    protected abstract IInstanceConfig generateConfig(int nodeNum, String ipAddress, NetworkTopology networkTopology, File root, String token, String seedIp);
+
+    public Builder<I, C> withTokenSupplier(TokenSupplier tokenSupplier) {
+        this.tokenSupplier = tokenSupplier;
+        return this;
+    }
+
+    public Builder<I, C> withSubnet(int subnet) {
+        this.subnet = subnet;
+        return this;
+    }
+
+    public Builder<I, C> withNodes(int nodeCount) {
+        this.nodeCount = nodeCount;
+        return this;
+    }
+
+    public Builder<I, C> withDCs(int dcCount) {
+        return withRacks(dcCount, 1);
+    }
+
+    public Builder<I, C> withRacks(int dcCount, int racksPerDC) {
+        if (nodeCount == 0)
+            throw new IllegalStateException("Node count will be calculated. Do not supply total node count in the builder");
+
+        int totalRacks = dcCount * racksPerDC;
+        int nodesPerRack = (nodeCount + totalRacks - 1) / totalRacks; // round up to next integer
+        return withRacks(dcCount, racksPerDC, nodesPerRack);
+    }
+
+    public Builder<I, C> withRacks(int dcCount, int racksPerDC, int nodesPerRack) {
+        if (nodeIdTopology != null)
+            throw new IllegalStateException("Network topology already created. Call withDCs/withRacks once or before withDC/withRack calls");
+
+        nodeIdTopology = new HashMap<>();
+        int nodeId = 1;
+        for (int dc = 1; dc <= dcCount; dc++) {
+            for (int rack = 1; rack <= racksPerDC; rack++) {
+                for (int rackNodeIdx = 0; rackNodeIdx < nodesPerRack; rackNodeIdx++)
+                    nodeIdTopology.put(nodeId++, NetworkTopology.dcAndRack(dcName(dc), rackName(rack)));
+            }
+        }
+        // adjust the node count to match the allocatation
+        final int adjustedNodeCount = dcCount * racksPerDC * nodesPerRack;
+        if (adjustedNodeCount != nodeCount) {
+            assert adjustedNodeCount > nodeCount : "withRacks should only ever increase the node count";
+            System.out.println(String.format("Network topology of %s DCs with %s racks per DC and %s nodes per rack required increasing total nodes to %s",
+                                             dcCount, racksPerDC, nodesPerRack, adjustedNodeCount));
+            nodeCount = adjustedNodeCount;
+        }
+        return this;
+    }
+
+    public Builder<I, C> withDC(String dcName, int nodeCount) {
+        return withRack(dcName, rackName(1), nodeCount);
+    }
+
+    public Builder<I, C> withRack(String dcName, String rackName, int nodesInRack) {
+        if (nodeIdTopology == null) {
+            if (nodeCount > 0)
+                throw new IllegalStateException("Node count must not be explicitly set, or allocated using withDCs/withRacks");
+
+            nodeIdTopology = new HashMap<>();
+        }
+        for (int nodeId = nodeCount + 1; nodeId <= nodeCount + nodesInRack; nodeId++)
+            nodeIdTopology.put(nodeId, NetworkTopology.dcAndRack(dcName, rackName));
+
+        nodeCount += nodesInRack;
+        return this;
+    }
+
+    // Map of node ids to dc and rack - must be contiguous with an entry nodeId 1 to nodeCount
+    public Builder<I, C> withNodeIdTopology(Map<Integer, NetworkTopology.DcAndRack> nodeIdTopology) {
+        if (nodeIdTopology.isEmpty())
+            throw new IllegalStateException("Topology is empty. It must have an entry for every nodeId.");
+
+        IntStream.rangeClosed(1, nodeIdTopology.size()).forEach(nodeId -> {
+            if (nodeIdTopology.get(nodeId) == null)
+                throw new IllegalStateException("Topology is missing entry for nodeId " + nodeId);
+        });
+
+        if (nodeCount != nodeIdTopology.size()) {
+            nodeCount = nodeIdTopology.size();
+            System.out.println(String.format("Adjusting node count to %s for supplied network topology", nodeCount));
+        }
+
+        this.nodeIdTopology = new HashMap<>(nodeIdTopology);
+
+        return this;
+    }
+
+    public Builder<I, C> withRoot(File root) {
+        this.root = root;
+        return this;
+    }
+
+    public Builder<I, C> withVersion(Versions.Version version) {
+        this.version = version;
+        return this;
+    }
+
+    public Builder<I, C> withConfig(Consumer<IInstanceConfig> updater) {
+        this.configUpdater = updater;
+        return this;
+    }
+
+    static String dcName(int index)
+    {
+        return "datacenter" + index;
+    }
+
+    static String rackName(int index)
+    {
+        return "rack" + index;
+    }
+}
+
+
diff --git a/src/main/java/org/apache/cassandra/distributed/shared/DistributedTestBase.java b/src/main/java/org/apache/cassandra/distributed/shared/DistributedTestBase.java
new file mode 100644
index 0000000..89dc0cd
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/distributed/shared/DistributedTestBase.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.shared;
+
+import org.apache.cassandra.distributed.api.ICluster;
+import org.apache.cassandra.distributed.api.IInstance;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+
+public abstract class DistributedTestBase
+{
+    @After
+    public void afterEach()
+    {
+        System.runFinalization();
+        System.gc();
+    }
+
+    public abstract <I extends IInstance, C extends ICluster> Builder<I, C> builder();
+
+    public static String KEYSPACE = "distributed_test_keyspace";
+
+    // TODO: move this to Cluster.java?
+    public static void nativeLibraryWorkaround()
+    {
+        // Disable the Netty tcnative library otherwise the io.netty.internal.tcnative.CertificateCallbackTask,
+        // CertificateVerifierTask, SSLPrivateKeyMethodDecryptTask, SSLPrivateKeyMethodSignTask,
+        // SSLPrivateKeyMethodTask, and SSLTask hold a gcroot against the InstanceClassLoader.
+        System.setProperty("cassandra.disable_tcactive_openssl", "true");
+        System.setProperty("io.netty.transport.noNative", "true");
+    }
+
+    public static void processReaperWorkaround() throws Throwable {
+        // Make sure the 'process reaper' thread is initially created under the main classloader,
+        // otherwise it gets created with the contextClassLoader pointing to an InstanceClassLoader
+        // which prevents it from being garbage collected.
+        new ProcessBuilder().command("true").start().waitFor();
+    }
+
+    public static void setupLogging()
+    {
+        try
+        {
+            File root = Files.createTempDirectory("in-jvm-dtest").toFile();
+            root.deleteOnExit();
+            String testConfPath = "test/conf/logback-dtest.xml";
+            Path logConfPath = Paths.get(root.getPath(), "/logback-dtest.xml");
+
+            if (!logConfPath.toFile().exists())
+            {
+                Files.copy(new File(testConfPath).toPath(),
+                           logConfPath);
+            }
+
+            System.setProperty("logback.configurationFile", "file://" + logConfPath);
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @BeforeClass
+    public static void setup() throws Throwable {
+        setupLogging();
+        System.setProperty("cassandra.ring_delay_ms", Integer.toString(10 * 1000));
+        System.setProperty("org.apache.cassandra.disable_mbean_registration", "true");
+        nativeLibraryWorkaround();
+        processReaperWorkaround();
+    }
+
+    public static String withKeyspace(String replaceIn)
+    {
+        return String.format(replaceIn, KEYSPACE);
+    }
+
+    protected static <C extends ICluster<?>> C init(C cluster)
+    {
+        return init(cluster, cluster.size());
+    }
+
+    protected static <C extends ICluster<?>> C init(C cluster, int replicationFactor)
+    {
+        cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': " + replicationFactor + "};");
+        return cluster;
+    }
+
+    public static void assertRows(Object[][] actual, Object[]... expected)
+    {
+        Assert.assertEquals(rowsNotEqualErrorMessage(actual, expected),
+                            expected.length, actual.length);
+
+        for (int i = 0; i < expected.length; i++)
+        {
+            Object[] expectedRow = expected[i];
+            Object[] actualRow = actual[i];
+            Assert.assertTrue(rowsNotEqualErrorMessage(actual, expected),
+                              Arrays.equals(expectedRow, actualRow));
+        }
+    }
+
+    public static void assertRow(Object[] actual, Object... expected)
+    {
+        Assert.assertTrue(rowNotEqualErrorMessage(actual, expected),
+                          Arrays.equals(actual, expected));
+    }
+
+    public static void assertRows(Iterator<Object[]> actual, Iterator<Object[]> expected)
+    {
+        while (actual.hasNext() && expected.hasNext())
+            assertRow(actual.next(), expected.next());
+
+        Assert.assertEquals("Resultsets have different sizes", actual.hasNext(), expected.hasNext());
+    }
+
+    public static void assertRows(Iterator<Object[]> actual, Object[]... expected)
+    {
+        assertRows(actual, new Iterator<Object[]>() {
+
+            int i = 0;
+            @Override
+            public boolean hasNext() {
+                return i < expected.length;
+            }
+
+            @Override
+            public Object[] next() {
+                return expected[i++];
+            }
+        });
+    }
+
+    public static String rowNotEqualErrorMessage(Object[] actual, Object[] expected)
+    {
+        return String.format("Expected: %s\nActual:%s\n",
+                             Arrays.toString(expected),
+                             Arrays.toString(actual));
+    }
+
+    public static String rowsNotEqualErrorMessage(Object[][] actual, Object[][] expected)
+    {
+        return String.format("Expected: %s\nActual: %s\n",
+                             rowsToString(expected),
+                             rowsToString(actual));
+    }
+
+    public static String rowsToString(Object[][] rows)
+    {
+        StringBuilder builder = new StringBuilder();
+        builder.append("[");
+        boolean isFirst = true;
+        for (Object[] row : rows)
+        {
+            if (isFirst)
+                isFirst = false;
+            else
+                builder.append(",");
+            builder.append(Arrays.toString(row));
+        }
+        builder.append("]");
+        return builder.toString();
+    }
+
+    public static Object[][] toObjectArray(Iterator<Object[]> iter)
+    {
+        List<Object[]> res = new ArrayList<>();
+        while (iter.hasNext())
+            res.add(iter.next());
+
+        return res.toArray(new Object[res.size()][]);
+    }
+
+    public static Object[] row(Object... expected)
+    {
+        return expected;
+    }
+
+}
diff --git a/src/main/java/org/apache/cassandra/distributed/shared/InstanceClassLoader.java b/src/main/java/org/apache/cassandra/distributed/shared/InstanceClassLoader.java
new file mode 100644
index 0000000..c37a520
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/distributed/shared/InstanceClassLoader.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.shared;
+
+import java.io.IOException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.Arrays;
+import java.util.function.Predicate;
+
+public class InstanceClassLoader extends URLClassLoader
+{
+    private static final Predicate<String> sharePackage = name ->
+               name.startsWith("org.apache.cassandra.distributed.api.")
+            || name.startsWith("org.apache.cassandra.distributed.shared.")
+            || name.startsWith("sun.")
+            || name.startsWith("oracle.")
+            || name.startsWith("com.intellij.")
+            || name.startsWith("com.sun.")
+            || name.startsWith("com.oracle.")
+            || name.startsWith("java.")
+            || name.startsWith("javax.")
+            || name.startsWith("jdk.")
+            || name.startsWith("netscape.")
+            || name.startsWith("org.xml.sax.");
+
+    private volatile boolean isClosed = false;
+    private final URL[] urls;
+    private final int generation; // used to help debug class loader leaks, by helping determine which classloaders should have been collected
+    private final int id;
+    private final ClassLoader sharedClassLoader;
+
+    public InstanceClassLoader(int generation, int id, URL[] urls, ClassLoader sharedClassLoader)
+    {
+        super(urls, null);
+        this.urls = urls;
+        this.sharedClassLoader = sharedClassLoader;
+        this.generation = generation;
+        this.id = id;
+    }
+
+    public int getClusterGeneration()
+    {
+        return generation;
+    }
+
+    public int getInstanceId()
+    {
+        return id;
+    }
+
+    @Override
+    public Class<?> loadClass(String name) throws ClassNotFoundException
+    {
+        if (sharePackage.test(name))
+            return sharedClassLoader.loadClass(name);
+
+        return loadClassInternal(name);
+    }
+
+    Class<?> loadClassInternal(String name) throws ClassNotFoundException
+    {
+        if (isClosed)
+            throw new IllegalStateException(String.format("Can't load %s. Instance class loader is already closed.", name));
+
+        synchronized (getClassLoadingLock(name))
+        {
+            // First, check if the class has already been loaded
+            Class<?> c = findLoadedClass(name);
+
+            if (c == null)
+                c = findClass(name);
+
+            return c;
+        }
+    }
+
+    /**
+     * @return true iff this class was loaded by an InstanceClassLoader, and as such is used by a dtest node
+     */
+    public static boolean wasLoadedByAnInstanceClassLoader(Class<?> clazz)
+    {
+        return clazz.getClassLoader().getClass().getName().equals(InstanceClassLoader.class.getName());
+    }
+
+    public String toString()
+    {
+        return "InstanceClassLoader{" +
+               "generation=" + generation +
+               ", id = " + id +
+               ", urls=" + Arrays.toString(urls) +
+               '}';
+    }
+
+    public void close() throws IOException
+    {
+        isClosed = true;
+        super.close();
+    }
+}
diff --git a/src/main/java/org/apache/cassandra/distributed/shared/MessageFilters.java b/src/main/java/org/apache/cassandra/distributed/shared/MessageFilters.java
new file mode 100644
index 0000000..57b8f57
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/distributed/shared/MessageFilters.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.shared;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import org.apache.cassandra.distributed.api.IMessage;
+import org.apache.cassandra.distributed.api.IMessageFilters;
+
+public class MessageFilters implements IMessageFilters
+{
+    private final List<Filter> filters = new CopyOnWriteArrayList<>();
+
+    public boolean permit(int from, int to, IMessage msg)
+    {
+        for (Filter filter : filters)
+        {
+            if (filter.matches(from, to, msg))
+                return false;
+        }
+        return true;
+    }
+
+    public class Filter implements IMessageFilters.Filter
+    {
+        final int[] from;
+        final int[] to;
+        final int[] verbs;
+        final Matcher matcher;
+
+        Filter(int[] from, int[] to, int[] verbs, Matcher matcher)
+        {
+            if (from != null)
+            {
+                from = from.clone();
+                Arrays.sort(from);
+            }
+            if (to != null)
+            {
+                to = to.clone();
+                Arrays.sort(to);
+            }
+            if (verbs != null)
+            {
+                verbs = verbs.clone();
+                Arrays.sort(verbs);
+            }
+            this.from = from;
+            this.to = to;
+            this.verbs = verbs;
+            this.matcher = matcher;
+        }
+
+        public int hashCode()
+        {
+            return (from == null ? 0 : Arrays.hashCode(from))
+                   + (to == null ? 0 : Arrays.hashCode(to))
+                   + (verbs == null ? 0 : Arrays.hashCode(verbs));
+        }
+
+        public boolean equals(Object that)
+        {
+            return that instanceof Filter && equals((Filter) that);
+        }
+
+        public boolean equals(Filter that)
+        {
+            return Arrays.equals(from, that.from)
+                   && Arrays.equals(to, that.to)
+                   && Arrays.equals(verbs, that.verbs);
+        }
+
+        public Filter off()
+        {
+            filters.remove(this);
+            return this;
+        }
+
+        public Filter on()
+        {
+            filters.add(this);
+            return this;
+        }
+
+        public boolean matches(int from, int to, IMessage msg)
+        {
+            return (this.from == null || Arrays.binarySearch(this.from, from) >= 0)
+                   && (this.to == null || Arrays.binarySearch(this.to, to) >= 0)
+                   && (this.verbs == null || Arrays.binarySearch(this.verbs, msg.verb()) >= 0)
+                   && (this.matcher == null || this.matcher.matches(from, to, msg));
+        }
+    }
+
+    public class Builder implements IMessageFilters.Builder
+    {
+        int[] from;
+        int[] to;
+        int[] verbs;
+        Matcher matcher;
+
+        private Builder(int[] verbs)
+        {
+            this.verbs = verbs;
+        }
+
+        public Builder from(int... nums)
+        {
+            from = nums;
+            return this;
+        }
+
+        public Builder to(int... nums)
+        {
+            to = nums;
+            return this;
+        }
+
+        public IMessageFilters.Builder messagesMatching(Matcher matcher)
+        {
+            this.matcher = matcher;
+            return this;
+        }
+
+        public Filter drop()
+        {
+            return new Filter(from, to, verbs, matcher).on();
+        }
+    }
+
+
+    public Builder verbs(int... verbs)
+    {
+        return new Builder(verbs);
+    }
+
+    @Override
+    public Builder allVerbs()
+    {
+        return new Builder(null);
+    }
+
+    @Override
+    public void reset()
+    {
+        filters.clear();
+    }
+}
diff --git a/src/main/java/org/apache/cassandra/distributed/shared/MessageFiltersTest.java b/src/main/java/org/apache/cassandra/distributed/shared/MessageFiltersTest.java
new file mode 100644
index 0000000..6eedb16
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/distributed/shared/MessageFiltersTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.shared;
+
+import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.api.IMessage;
+
+public class MessageFiltersTest
+{
+    @Test
+    public void simpleFiltersTest() throws Throwable
+    {
+        int VERB1 = 1;
+        int VERB2 = 2;
+        int VERB3 = 3;
+        int i1 = 1;
+        int i2 = 2;
+        int i3 = 3;
+        String MSG1 = "msg1";
+        String MSG2 = "msg2";
+
+        MessageFilters filters = new MessageFilters();
+        MessageFilters.Filter filter = filters.allVerbs().from(1).drop();
+
+        Assert.assertFalse(filters.permit(i1, i2, msg(VERB1, MSG1)));
+        Assert.assertFalse(filters.permit(i1, i2, msg(VERB2, MSG1)));
+        Assert.assertFalse(filters.permit(i1, i2, msg(VERB3, MSG1)));
+        Assert.assertTrue(filters.permit(i2, i1, msg(VERB1, MSG1)));
+        filter.off();
+        Assert.assertTrue(filters.permit(i1, i2, msg(VERB1, MSG1)));
+        filters.reset();
+
+        filters.verbs(VERB1).from(1).to(2).drop();
+        Assert.assertFalse(filters.permit(i1, i2, msg(VERB1, MSG1)));
+        Assert.assertTrue(filters.permit(i1, i2, msg(VERB2, MSG1)));
+        Assert.assertTrue(filters.permit(i2, i1, msg(VERB1, MSG1)));
+        Assert.assertTrue(filters.permit(i2, i3, msg(VERB2, MSG1)));
+
+        filters.reset();
+        AtomicInteger counter = new AtomicInteger();
+        filters.verbs(VERB1).from(1).to(2).messagesMatching((from, to, msg) -> {
+            counter.incrementAndGet();
+            return Arrays.equals(msg.bytes(), MSG1.getBytes());
+        }).drop();
+        Assert.assertFalse(filters.permit(i1, i2, msg(VERB1, MSG1)));
+        Assert.assertEquals(counter.get(), 1);
+        Assert.assertTrue(filters.permit(i1, i2, msg(VERB1, MSG2)));
+        Assert.assertEquals(counter.get(), 2);
+
+        // filter chain gets interrupted because a higher level filter returns no match
+        Assert.assertTrue(filters.permit(i2, i1, msg(VERB1, MSG1)));
+        Assert.assertEquals(counter.get(), 2);
+        Assert.assertTrue(filters.permit(i2, i1, msg(VERB2, MSG1)));
+        Assert.assertEquals(counter.get(), 2);
+        filters.reset();
+
+        filters.allVerbs().from(3, 2).to(2, 1).drop();
+        Assert.assertFalse(filters.permit(i3, i1, msg(VERB1, MSG1)));
+        Assert.assertFalse(filters.permit(i3, i2, msg(VERB1, MSG1)));
+        Assert.assertFalse(filters.permit(i2, i1, msg(VERB1, MSG1)));
+        Assert.assertTrue(filters.permit(i2, i3, msg(VERB1, MSG1)));
+        Assert.assertTrue(filters.permit(i1, i2, msg(VERB1, MSG1)));
+        Assert.assertTrue(filters.permit(i1, i3, msg(VERB1, MSG1)));
+        filters.reset();
+
+        counter.set(0);
+        filters.allVerbs().from(1).to(2).messagesMatching((from, to, msg) -> {
+            counter.incrementAndGet();
+            return false;
+        }).drop();
+        Assert.assertTrue(filters.permit(i1, i2, msg(VERB1, MSG1)));
+        Assert.assertTrue(filters.permit(i1, i3, msg(VERB1, MSG1)));
+        Assert.assertTrue(filters.permit(i1, i2, msg(VERB1, MSG1)));
+        Assert.assertEquals(2, counter.get());
+    }
+
+    IMessage msg(int verb, String msg)
+    {
+        return new IMessage()
+        {
+            public int verb() { return verb; }
+            public byte[] bytes() { return msg.getBytes(); }
+            public int id() { return 0; }
+            public int version() { return 0;  }
+            public NetworkTopology.AddressAndPort from() { return null; }
+            public int fromPort()
+            {
+                return 0;
+            }
+        };
+    }
+}
\ No newline at end of file
diff --git a/src/main/java/org/apache/cassandra/distributed/shared/NetworkTopology.java b/src/main/java/org/apache/cassandra/distributed/shared/NetworkTopology.java
new file mode 100644
index 0000000..9a8e8f6
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/distributed/shared/NetworkTopology.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.shared;
+
+import java.io.Serializable;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.IntFunction;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+public class NetworkTopology
+{
+    private final Map<AddressAndPort, DcAndRack> map;
+
+    public static class DcAndRack
+    {
+        private final String dc;
+        private final String rack;
+
+        private DcAndRack(String dc, String rack)
+        {
+            this.dc = dc;
+            this.rack = rack;
+        }
+
+        @Override
+        public String toString()
+        {
+            return "DcAndRack{" +
+                   "dc='" + dc + '\'' +
+                   ", rack='" + rack + '\'' +
+                   '}';
+        }
+
+        @Override
+        public boolean equals(Object o)
+        {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            DcAndRack dcAndRack = (DcAndRack) o;
+            return Objects.equals(dc, dcAndRack.dc) &&
+                   Objects.equals(rack, dcAndRack.rack);
+        }
+
+        @Override
+        public int hashCode()
+        {
+            return Objects.hash(dc, rack);
+        }
+    }
+
+    public static class AddressAndPort implements Serializable
+    {
+        private final InetAddress address;
+        private final int port;
+
+        public AddressAndPort(InetAddress address, int port)
+        {
+            this.address = address;
+            this.port = port;
+        }
+
+        public int getPort()
+        {
+            return port;
+        }
+
+        public InetAddress getAddress()
+        {
+            return address;
+        }
+
+        @Override
+        public String toString()
+        {
+            return "AddressAndPort{" +
+                   "address=" + address +
+                   ", port=" + port +
+                   '}';
+        }
+
+        @Override
+        public boolean equals(Object o)
+        {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            AddressAndPort that = (AddressAndPort) o;
+            return port == that.port &&
+                   Objects.equals(address, that.address);
+        }
+
+        @Override
+        public int hashCode()
+        {
+            return Objects.hash(address, port);
+        }
+    }
+
+    public static DcAndRack dcAndRack(String dc, String rack)
+    {
+        return new DcAndRack(dc, rack);
+    }
+
+    public static AddressAndPort addressAndPort(InetAddress address, int port)
+    {
+        return new AddressAndPort(address, port);
+    }
+
+    public static AddressAndPort addressAndPort(String address, int port)
+    {
+        try
+        {
+            return new AddressAndPort(InetAddress.getByName(address), port);
+        }
+        catch (UnknownHostException e)
+        {
+            throw new IllegalArgumentException("Unknown address '" + address + '\'');
+        }
+    }
+
+    private NetworkTopology()
+    {
+        map = new HashMap<>();
+    }
+
+    @SuppressWarnings("WeakerAccess")
+    public NetworkTopology(NetworkTopology networkTopology)
+    {
+        map = new HashMap<>(networkTopology.map);
+    }
+
+    public static NetworkTopology build(String ipPrefix, int broadcastPort, Map<Integer, DcAndRack> nodeIdTopology)
+    {
+        final NetworkTopology topology = new NetworkTopology();
+
+        for (int nodeId = 1; nodeId <= nodeIdTopology.size(); nodeId++)
+        {
+            String broadcastAddress = ipPrefix + nodeId;
+
+            DcAndRack dcAndRack = nodeIdTopology.get(nodeId);
+            if (dcAndRack == null)
+                throw new IllegalStateException("nodeId " + nodeId + "not found in instanceMap");
+
+            topology.put(addressAndPort(broadcastAddress, broadcastPort), dcAndRack);
+        }
+        return topology;
+    }
+
+    public DcAndRack put(AddressAndPort addressAndPort, DcAndRack value)
+    {
+        return map.put(addressAndPort, value);
+    }
+
+    public String localRack(NetworkTopology.AddressAndPort key)
+    {
+        DcAndRack p = map.get(key);
+        if (p == null)
+            return null;
+        return p.rack;
+    }
+
+    public String localDC(NetworkTopology.AddressAndPort key)
+    {
+        DcAndRack p = map.get(key);
+        if (p == null)
+            return null;
+        return p.dc;
+    }
+
+    public boolean contains(NetworkTopology.AddressAndPort key)
+    {
+        return map.containsKey(key);
+    }
+
+    public String toString()
+    {
+        return "NetworkTopology{" + map + '}';
+    }
+
+
+    public static Map<Integer, NetworkTopology.DcAndRack> singleDcNetworkTopology(int nodeCount,
+                                                                                  String dc,
+                                                                                  String rack)
+    {
+        return networkTopology(nodeCount, (nodeid) -> NetworkTopology.dcAndRack(dc, rack));
+    }
+
+    public static Map<Integer, NetworkTopology.DcAndRack> networkTopology(int nodeCount,
+                                                                          IntFunction<DcAndRack> dcAndRackSupplier)
+    {
+
+        return IntStream.rangeClosed(1, nodeCount).boxed()
+                        .collect(Collectors.toMap(nodeId -> nodeId,
+                                                  dcAndRackSupplier::apply));
+    }
+}
\ No newline at end of file
diff --git a/src/main/java/org/apache/cassandra/distributed/shared/ThrowingRunnable.java b/src/main/java/org/apache/cassandra/distributed/shared/ThrowingRunnable.java
new file mode 100644
index 0000000..01f8d29
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/distributed/shared/ThrowingRunnable.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.shared;
+
+public interface ThrowingRunnable
+{
+    public void run() throws Throwable;
+
+    public static Runnable toRunnable(ThrowingRunnable runnable)
+    {
+        return () -> {
+            try
+            {
+                runnable.run();
+            }
+            catch (Throwable throwable)
+            {
+                throw new RuntimeException(throwable);
+            }
+        };
+    }
+}
\ No newline at end of file
diff --git a/src/main/java/org/apache/cassandra/distributed/shared/Versions.java b/src/main/java/org/apache/cassandra/distributed/shared/Versions.java
new file mode 100644
index 0000000..30c0c7c
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/distributed/shared/Versions.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.shared;
+
+import java.io.File;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.nio.file.Paths;
+import java.util.*;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+public class Versions
+{
+    public static final String PROPERTY_PREFIX = "cassandra.";
+
+    public static URL[] getClassPath()
+    {
+        // In Java 9 the default system ClassLoader got changed from URLClassLoader to AppClassLoader which
+        // does not extend URLClassLoader nor does it give access to the class path array.
+        // Java requires the system property 'java.class.path' to be setup with the classpath seperated by :
+        // so this logic parses that string into the URL[] needed to build later
+        String cp = System.getProperty("java.class.path");
+        assert cp != null && !cp.isEmpty();
+        String[] split = cp.split(File.pathSeparator);
+        assert split.length > 0;
+        URL[] urls = new URL[split.length];
+        try
+        {
+            for (int i = 0; i < split.length; i++)
+                urls[i] = Paths.get(split[i]).toUri().toURL();
+        }
+        catch (MalformedURLException e)
+        {
+            throw new RuntimeException(e);
+        }
+        return urls;
+    }
+
+    public enum Major
+    {
+        v22("2\\.2\\.([0-9]+)"),
+        v30("3\\.0\\.([0-9]+)"),
+        v3X("3\\.([1-9]|1[01])(\\.([0-9]+))?"),
+        v4("4\\.([0-9]+)");
+        final Pattern pattern;
+        Major(String verify)
+        {
+            this.pattern = Pattern.compile(verify);
+        }
+
+        static Major fromFull(String version)
+        {
+            if (version.isEmpty())
+                throw new IllegalArgumentException(version);
+            switch (version.charAt(0))
+            {
+                case '2':
+                    if (version.startsWith("2.2"))
+                        return v22;
+                    throw new IllegalArgumentException(version);
+                case '3':
+                    if (version.startsWith("3.0"))
+                        return v30;
+                    return v3X;
+                case '4':
+                    return v4;
+                default:
+                    throw new IllegalArgumentException(version);
+            }
+        }
+
+        // verify that the version string is valid for this major version
+        boolean verify(String version)
+        {
+            return pattern.matcher(version).matches();
+        }
+
+        // sort two strings of the same major version as this enum instance
+        int compare(String a, String b)
+        {
+            Matcher ma = pattern.matcher(a);
+            Matcher mb = pattern.matcher(a);
+            if (!ma.matches()) throw new IllegalArgumentException(a);
+            if (!mb.matches()) throw new IllegalArgumentException(b);
+            int result = Integer.compare(Integer.parseInt(ma.group(1)), Integer.parseInt(mb.group(1)));
+            if (result == 0 && this == v3X && (ma.group(3) != null || mb.group(3) != null))
+            {
+                if (ma.group(3) != null && mb.group(3) != null)
+                {
+                    result = Integer.compare(Integer.parseInt(ma.group(3)), Integer.parseInt(mb.group(3)));
+                }
+                else
+                {
+                    result = ma.group(3) != null ? 1 : -1;
+                }
+            }
+            // sort descending
+            return -result;
+        }
+    }
+
+    public static class Version
+    {
+        public final Major major;
+        public final String version;
+        public final URL[] classpath;
+
+        public Version(String version, URL[] classpath)
+        {
+            this(Major.fromFull(version), version, classpath);
+        }
+        public Version(Major major, String version, URL[] classpath)
+        {
+            this.major = major;
+            this.version = version;
+            this.classpath = classpath;
+        }
+    }
+
+    private final Map<Major, List<Version>> versions;
+    public Versions(Map<Major, List<Version>> versions)
+    {
+        this.versions = versions;
+    }
+
+    public Version get(String full)
+    {
+        return versions.get(Major.fromFull(full))
+                .stream()
+                .filter(v -> full.equals(v.version))
+                .findFirst()
+                .orElseThrow(() -> new RuntimeException("No version " + full + " found"));
+    }
+
+    public Version getLatest(Major major)
+    {
+        return versions.get(major).stream().findFirst().orElseThrow(() -> new RuntimeException("No " + major + " versions found"));
+    }
+
+    public static Versions find()
+    {
+        final String dtestJarDirectory = System.getProperty(PROPERTY_PREFIX + "test.dtest_jar_path","build");
+        final File sourceDirectory = new File(dtestJarDirectory);
+        System.out.println("Looking for dtest jars in " + sourceDirectory.getAbsolutePath());
+        final Pattern pattern = Pattern.compile("dtest-(?<fullversion>(\\d+)\\.(\\d+)(\\.\\d+)?(\\.\\d+)?)([~\\-]\\w[.\\w]*(?:\\-\\w[.\\w]*)*)?(\\+[.\\w]+)?\\.jar");
+        final Map<Major, List<Version>> versions = new HashMap<>();
+        for (Major major : Major.values())
+            versions.put(major, new ArrayList<>());
+
+        for (File file : sourceDirectory.listFiles())
+        {
+            Matcher m = pattern.matcher(file.getName());
+            if (!m.matches())
+                continue;
+            String version = m.group(1);
+            Major major = Major.fromFull(version);
+            versions.get(major).add(new Version(major, version, new URL[] { toURL(file) }));
+        }
+
+        for (Map.Entry<Major, List<Version>> e : versions.entrySet())
+        {
+            if (e.getValue().isEmpty())
+                continue;
+            Collections.sort(e.getValue(), Comparator.comparing(v -> v.version, e.getKey()::compare));
+            System.out.println("Found " + e.getValue().stream().map(v -> v.version).collect(Collectors.joining(", ")));
+        }
+
+        return new Versions(versions);
+    }
+
+    public static URL toURL(File file)
+    {
+        try
+        {
+            return file.toURI().toURL();
+        }
+        catch (MalformedURLException e)
+        {
+            throw new IllegalArgumentException(e);
+        }
+    }
+
+}
diff --git a/src/test/java/org/apache/cassandra/distributed/test/BootstrapTest.java b/src/test/java/org/apache/cassandra/distributed/test/BootstrapTest.java
new file mode 100644
index 0000000..e61faa6
--- /dev/null
+++ b/src/test/java/org/apache/cassandra/distributed/test/BootstrapTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.ICluster;
+import org.apache.cassandra.distributed.api.IInstance;
+import org.apache.cassandra.distributed.api.IInstanceConfig;
+import org.apache.cassandra.distributed.api.TokenSupplier;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.shared.Builder;
+import org.apache.cassandra.distributed.shared.NetworkTopology;
+
+import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+import static org.apache.cassandra.distributed.shared.DistributedTestBase.KEYSPACE;
+
+public class BootstrapTest extends TestBaseImpl
+{
+
+    @Test
+    public void bootstrapTest() throws Throwable
+    {
+        int originalNodeCount = 2;
+        int expandedNodeCount = originalNodeCount + 1;
+        Builder<IInstance, ICluster> builder = builder().withNodes(originalNodeCount)
+                                                        .withTokenSupplier(TokenSupplier.evenlyDistributedTokens(expandedNodeCount))
+                                                        .withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(originalNodeCount, "dc0", "rack0"))
+                                                        .withConfig(config -> config.with(NETWORK, GOSSIP));
+
+        Map<Integer, Long> withBootstrap = null;
+        Map<Integer, Long> naturally = null;
+        try (ICluster<IInvokableInstance> cluster = builder.withNodes(originalNodeCount).start())
+        {
+            populate(cluster);
+
+            IInstanceConfig config = builder.withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(expandedNodeCount, "dc0", "rack0"))
+                                            .newInstanceConfig(cluster);
+            config.set("auto_bootstrap", true);
+
+            cluster.bootstrap(config).startup();
+
+            cluster.stream().forEach(instance -> {
+                instance.nodetool("cleanup", KEYSPACE, "tbl");
+            });
+
+            withBootstrap = count(cluster);
+        }
+
+        builder = builder.withNodes(expandedNodeCount)
+                         .withTokenSupplier(TokenSupplier.evenlyDistributedTokens(expandedNodeCount))
+                         .withConfig(config -> config.with(NETWORK, GOSSIP));
+
+        try (ICluster cluster = builder.start())
+        {
+            populate(cluster);
+            naturally = count(cluster);
+        }
+
+        Assert.assertEquals(withBootstrap, naturally);
+    }
+
+    public void populate(ICluster cluster)
+    {
+        cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': " + 3 + "};");
+        cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
+
+        for (int i = 0; i < 1000; i++)
+            cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (?, ?, ?)",
+                                           ConsistencyLevel.QUORUM,
+                                           i, i, i);
+    }
+
+    public Map<Integer, Long> count(ICluster cluster)
+    {
+        return IntStream.rangeClosed(1, cluster.size())
+                        .boxed()
+                        .collect(Collectors.toMap(nodeId -> nodeId,
+                                                  nodeId -> (Long) cluster.get(nodeId).executeInternal("SELECT count(*) FROM " + KEYSPACE + ".tbl")[0][0]));
+    }
+}
\ No newline at end of file
diff --git a/src/test/java/org/apache/cassandra/distributed/test/DistributedReadWritePathTest.java b/src/test/java/org/apache/cassandra/distributed/test/DistributedReadWritePathTest.java
new file mode 100644
index 0000000..0ce0e74
--- /dev/null
+++ b/src/test/java/org/apache/cassandra/distributed/test/DistributedReadWritePathTest.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.ICluster;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+
+public class DistributedReadWritePathTest extends TestBaseImpl
+{
+    @Test
+    public void coordinatorReadTest() throws Throwable
+    {
+        try (ICluster cluster = init(builder().withNodes(3).start()))
+        {
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH read_repair='none'");
+
+            cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)");
+            cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 2, 2)");
+            cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 3, 3)");
+
+            assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = ?",
+                                                      ConsistencyLevel.ALL,
+                                                      1),
+                       row(1, 1, 1),
+                       row(1, 2, 2),
+                       row(1, 3, 3));
+        }
+    }
+
+    @Test
+    public void largeMessageTest() throws Throwable
+    {
+        int largeMessageThreshold = 1024 * 64;
+        try (ICluster cluster = init(builder().withNodes(2).start()))
+        {
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v text, PRIMARY KEY (pk, ck))");
+            StringBuilder builder = new StringBuilder();
+            for (int i = 0; i < largeMessageThreshold ; i++)
+                builder.append('a');
+            String s = builder.toString();
+            cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, ?)",
+                                           ConsistencyLevel.ALL,
+                                           s);
+            assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = ?",
+                                                      ConsistencyLevel.ALL,
+                                                      1),
+                       row(1, 1, s));
+        }
+    }
+
+    @Test
+    public void coordinatorWriteTest() throws Throwable
+    {
+        try (ICluster cluster = init(builder().withNodes(3).start()))
+        {
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH read_repair='none'");
+
+            cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)",
+                                           ConsistencyLevel.QUORUM);
+
+            for (int i = 0; i < 3; i++)
+            {
+                assertRows(cluster.get(1).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1"),
+                           row(1, 1, 1));
+            }
+
+            assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1",
+                                                      ConsistencyLevel.QUORUM),
+                       row(1, 1, 1));
+        }
+    }
+
+    @Test
+    public void readRepairTest() throws Throwable
+    {
+        try (ICluster cluster = init(builder().withNodes(3).start()))
+        {
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH read_repair='blocking'");
+
+            cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)");
+            cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)");
+
+            assertRows(cluster.get(3).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1"));
+
+            assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1",
+                                                      ConsistencyLevel.ALL), // ensure node3 in preflist
+                       row(1, 1, 1));
+
+            // Verify that data got repaired to the third node
+            assertRows(cluster.get(3).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1"),
+                       row(1, 1, 1));
+        }
+    }
+
+    @Test
+    public void writeWithSchemaDisagreement() throws Throwable
+    {
+        try (ICluster cluster = init(builder().withNodes(3).withConfig(config -> config.with(NETWORK)).start()))
+        {
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, PRIMARY KEY (pk, ck))");
+
+            cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
+            cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
+            cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
+
+            // Introduce schema disagreement
+            cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl ADD v2 int", 1);
+
+            Exception thrown = null;
+            try
+            {
+                cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1, v2) VALUES (2, 2, 2, 2)",
+                                               ConsistencyLevel.QUORUM);
+            }
+            catch (RuntimeException e)
+            {
+                thrown = e;
+            }
+
+            Assert.assertTrue(thrown.getMessage().contains("INCOMPATIBLE_SCHEMA from 127.0.0.2"));
+            Assert.assertTrue(thrown.getMessage().contains("INCOMPATIBLE_SCHEMA from 127.0.0.3"));
+        }
+    }
+
+    @Test
+    public void readWithSchemaDisagreement() throws Throwable
+    {
+        try (ICluster cluster = init(builder().withNodes(3).withConfig(config -> config.with(NETWORK)).start()))
+        {
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, PRIMARY KEY (pk, ck))");
+
+            cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
+            cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
+            cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
+
+            // Introduce schema disagreement
+            cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl ADD v2 int", 1);
+
+            Exception thrown = null;
+            try
+            {
+                assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1",
+                                                          ConsistencyLevel.ALL),
+                           row(1, 1, 1, null));
+            }
+            catch (Exception e)
+            {
+                thrown = e;
+            }
+
+            Assert.assertTrue(thrown.getMessage().contains("INCOMPATIBLE_SCHEMA from 127.0.0.2"));
+            Assert.assertTrue(thrown.getMessage().contains("INCOMPATIBLE_SCHEMA from 127.0.0.3"));
+        }
+    }
+
+    @Test
+    public void simplePagedReadsTest() throws Throwable
+    {
+        try (ICluster cluster = init(builder().withNodes(3).start()))
+        {
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
+
+            int size = 100;
+            Object[][] results = new Object[size][];
+            for (int i = 0; i < size; i++)
+            {
+                cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, ?, ?)",
+                                               ConsistencyLevel.QUORUM,
+                                               i, i);
+                results[i] = new Object[] { 1, i, i};
+            }
+
+            // Make sure paged read returns same results with different page sizes
+            for (int pageSize : new int[] { 1, 2, 3, 5, 10, 20, 50})
+            {
+                assertRows(cluster.coordinator(1).executeWithPaging("SELECT * FROM " + KEYSPACE + ".tbl",
+                                                                    ConsistencyLevel.QUORUM,
+                                                                    pageSize),
+                           results);
+            }
+        }
+    }
+
+    @Test
+    public void pagingWithRepairTest() throws Throwable
+    {
+        try (ICluster cluster = init(builder().withNodes(3).start()))
+        {
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
+
+            int size = 100;
+            Object[][] results = new Object[size][];
+            for (int i = 0; i < size; i++)
+            {
+                // Make sure that data lands on different nodes and not coordinator
+                cluster.get(i % 2 == 0 ? 2 : 3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, ?, ?)",
+                                                                i, i);
+
+                results[i] = new Object[] { 1, i, i};
+            }
+
+            // Make sure paged read returns same results with different page sizes
+            for (int pageSize : new int[] { 1, 2, 3, 5, 10, 20, 50})
+            {
+                assertRows(cluster.coordinator(1).executeWithPaging("SELECT * FROM " + KEYSPACE + ".tbl",
+                                                                    ConsistencyLevel.ALL,
+                                                                    pageSize),
+                           results);
+            }
+
+            assertRows(cluster.get(1).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl"),
+                       results);
+        }
+    }
+
+    @Test
+    public void pagingTests() throws Throwable
+    {
+        try (ICluster cluster = init(builder().withNodes(3).start());
+             ICluster singleNode = init(builder().withNodes(1).withSubnet(1).start()))
+        {
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
+            singleNode.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
+
+            for (int i = 0; i < 10; i++)
+            {
+                for (int j = 0; j < 10; j++)
+                {
+                    cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, ?, ?)",
+                                                   ConsistencyLevel.QUORUM,
+                                                   i, j, i + i);
+                    singleNode.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, ?, ?)",
+                                                      ConsistencyLevel.QUORUM,
+                                                      i, j, i + i);
+                }
+            }
+
+            int[] pageSizes = new int[] { 1, 2, 3, 5, 10, 20, 50};
+            String[] statements = new String [] {"SELECT * FROM " + KEYSPACE  + ".tbl WHERE pk = 1 AND ck > 5",
+                    "SELECT * FROM " + KEYSPACE  + ".tbl WHERE pk = 1 AND ck >= 5",
+                    "SELECT * FROM " + KEYSPACE  + ".tbl WHERE pk = 1 AND ck > 5 AND ck <= 10",
+                    "SELECT * FROM " + KEYSPACE  + ".tbl WHERE pk = 1 AND ck > 5 LIMIT 3",
+                    "SELECT * FROM " + KEYSPACE  + ".tbl WHERE pk = 1 AND ck >= 5 LIMIT 2",
+                    "SELECT * FROM " + KEYSPACE  + ".tbl WHERE pk = 1 AND ck > 5 AND ck <= 10 LIMIT 2",
+                    "SELECT * FROM " + KEYSPACE  + ".tbl WHERE pk = 1 AND ck > 5 ORDER BY ck DESC",
+                    "SELECT * FROM " + KEYSPACE  + ".tbl WHERE pk = 1 AND ck >= 5 ORDER BY ck DESC",
+                    "SELECT * FROM " + KEYSPACE  + ".tbl WHERE pk = 1 AND ck > 5 AND ck <= 10 ORDER BY ck DESC",
+                    "SELECT * FROM " + KEYSPACE  + ".tbl WHERE pk = 1 AND ck > 5 ORDER BY ck DESC LIMIT 3",
+                    "SELECT * FROM " + KEYSPACE  + ".tbl WHERE pk = 1 AND ck >= 5 ORDER BY ck DESC LIMIT 2",
+                    "SELECT * FROM " + KEYSPACE  + ".tbl WHERE pk = 1 AND ck > 5 AND ck <= 10 ORDER BY ck DESC LIMIT 2",
+                    "SELECT DISTINCT pk FROM " + KEYSPACE  + ".tbl LIMIT 3",
+                    "SELECT DISTINCT pk FROM " + KEYSPACE  + ".tbl WHERE pk IN (3,5,8,10)",
+                    "SELECT DISTINCT pk FROM " + KEYSPACE  + ".tbl WHERE pk IN (3,5,8,10) LIMIT 2"
+            };
+            for (String statement : statements)
+            {
+                for (int pageSize : pageSizes)
+                {
+                    assertRows(cluster.coordinator(1)
+                                       .executeWithPaging(statement,
+                                                          ConsistencyLevel.QUORUM,  pageSize),
+                               singleNode.coordinator(1)
+                                       .executeWithPaging(statement,
+                                                          ConsistencyLevel.QUORUM,  Integer.MAX_VALUE));
+                }
+            }
+        }
+    }
+}
+
diff --git a/src/test/java/org/apache/cassandra/distributed/test/GossipSettlesTest.java b/src/test/java/org/apache/cassandra/distributed/test/GossipSettlesTest.java
new file mode 100644
index 0000000..e3d3c68
--- /dev/null
+++ b/src/test/java/org/apache/cassandra/distributed/test/GossipSettlesTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.api.ICluster;
+
+import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+
+public class GossipSettlesTest extends TestBaseImpl
+{
+
+    @Test
+    public void testGossipSettles() throws Throwable
+    {
+        /* Use withSubnet(1) to prove seed provider is set correctly - without the fix to pass a seed provider, this test fails */
+        try (ICluster cluster = builder().withNodes(3)
+                                         .withConfig(config -> config.with(GOSSIP).with(NETWORK))
+                                         .withSubnet(1)
+                                         .start())
+        {
+        }
+    }
+
+}
diff --git a/src/test/java/org/apache/cassandra/distributed/test/LargeColumnTest.java b/src/test/java/org/apache/cassandra/distributed/test/LargeColumnTest.java
new file mode 100644
index 0000000..3c3ee47
--- /dev/null
+++ b/src/test/java/org/apache/cassandra/distributed/test/LargeColumnTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.ICluster;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+public class LargeColumnTest extends TestBaseImpl
+{
+    private static final Logger logger = LoggerFactory.getLogger(LargeColumnTest.class);
+
+    private static String str(int length, Random random, long seed)
+    {
+        random.setSeed(seed);
+        char[] chars = new char[length];
+        int i = 0;
+        int s = 0;
+        long v = 0;
+        while (i < length)
+        {
+            if (s == 0)
+            {
+                v = random.nextLong();
+                s = 8;
+            }
+            chars[i] = (char) (((v & 127) + 32) & 127);
+            v >>= 8;
+            --s;
+            ++i;
+        }
+        return new String(chars);
+    }
+
+    private void testLargeColumns(int nodes, int columnSize, int rowCount) throws Throwable
+    {
+        Random random = new Random();
+        long seed = ThreadLocalRandom.current().nextLong();
+        logger.info("Using seed {}", seed);
+
+        try (ICluster cluster = init(builder()
+                                     .withNodes(nodes)
+                                     .withConfig(config ->
+                                                 config.set("commitlog_segment_size_in_mb", (columnSize * 3) >> 20)
+                                                       .set("internode_application_send_queue_reserve_endpoint_capacity_in_bytes", columnSize * 2)
+                                                       .set("internode_application_send_queue_reserve_global_capacity_in_bytes", columnSize * 3)
+                                                       .set("write_request_timeout_in_ms", SECONDS.toMillis(30L))
+                                                       .set("read_request_timeout_in_ms", SECONDS.toMillis(30L))
+                                                       .set("memtable_heap_space_in_mb", 1024)
+                                     )
+                                     .start()))
+        {
+            cluster.schemaChange(String.format("CREATE TABLE %s.cf (k int, c text, PRIMARY KEY (k))", KEYSPACE));
+
+            for (int i = 0; i < rowCount; ++i)
+                cluster.coordinator(1).execute(String.format("INSERT INTO %s.cf (k, c) VALUES (?, ?);", KEYSPACE), ConsistencyLevel.ALL, i, str(columnSize, random, seed | i));
+
+            for (int i = 0; i < rowCount; ++i)
+            {
+                Object[][] results = cluster.coordinator(1).execute(String.format("SELECT k, c FROM %s.cf WHERE k = ?;", KEYSPACE), ConsistencyLevel.ALL, i);
+                Assert.assertTrue(str(columnSize, random, seed | i).equals(results[0][1]));
+            }
+        }
+    }
+
+    @Test
+    public void test() throws Throwable
+    {
+        testLargeColumns(2, 16 << 20, 5);
+    }
+}
\ No newline at end of file
diff --git a/src/test/java/org/apache/cassandra/distributed/test/NativeProtocolTest.java b/src/test/java/org/apache/cassandra/distributed/test/NativeProtocolTest.java
new file mode 100644
index 0000000..c7e9b26
--- /dev/null
+++ b/src/test/java/org/apache/cassandra/distributed/test/NativeProtocolTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import org.apache.cassandra.distributed.impl.RowUtil;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.SimpleStatement;
+import com.datastax.driver.core.Statement;
+import org.apache.cassandra.distributed.api.ICluster;
+
+import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL;
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+
+public class NativeProtocolTest extends TestBaseImpl
+{
+
+    @Test
+    public void withClientRequests() throws Throwable
+    {
+        try (ICluster ignored = init(builder().withNodes(3)
+                                              .withConfig(config -> config.with(GOSSIP, NETWORK, NATIVE_PROTOCOL))
+                                              .start()))
+        {
+
+            try (com.datastax.driver.core.Cluster cluster = com.datastax.driver.core.Cluster.builder().addContactPoint("127.0.0.1").build();
+                 Session session = cluster.connect())
+            {
+                session.execute("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck));");
+                session.execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) values (1,1,1);");
+                Statement select = new SimpleStatement("select * from " + KEYSPACE + ".tbl;").setConsistencyLevel(ConsistencyLevel.ALL);
+                final ResultSet resultSet = session.execute(select);
+                assertRows(RowUtil.toObjects(resultSet), row(1, 1, 1));
+                Assert.assertEquals(3, cluster.getMetadata().getAllHosts().size());
+            }
+        }
+    }
+
+    @Test
+    public void withCounters() throws Throwable
+    {
+        try (ICluster ignored = init(builder().withNodes(3)
+                                              .withConfig(config -> config.with(GOSSIP, NETWORK, NATIVE_PROTOCOL))
+                                              .start()))
+        {
+            final com.datastax.driver.core.Cluster cluster = com.datastax.driver.core.Cluster.builder().addContactPoint("127.0.0.1").build();
+            Session session = cluster.connect();
+            session.execute("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck counter, PRIMARY KEY (pk));");
+            session.execute("UPDATE " + KEYSPACE + ".tbl set ck = ck + 10 where pk = 1;");
+            Statement select = new SimpleStatement("select * from " + KEYSPACE + ".tbl;").setConsistencyLevel(ConsistencyLevel.ALL);
+            final ResultSet resultSet = session.execute(select);
+            assertRows(RowUtil.toObjects(resultSet), row(1, 10L));
+            Assert.assertEquals(3, cluster.getMetadata().getAllHosts().size());
+            session.close();
+            cluster.close();
+        }
+    }
+}
\ No newline at end of file
diff --git a/src/test/java/org/apache/cassandra/distributed/test/NetworkTopologyTest.java b/src/test/java/org/apache/cassandra/distributed/test/NetworkTopologyTest.java
new file mode 100644
index 0000000..53154e3
--- /dev/null
+++ b/src/test/java/org/apache/cassandra/distributed/test/NetworkTopologyTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.util.Collections;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.api.ICluster;
+import org.apache.cassandra.distributed.api.IInstance;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.shared.NetworkTopology;
+
+public class NetworkTopologyTest extends TestBaseImpl
+{
+    @Test
+    public void namedDcTest() throws Throwable
+    {
+        try (ICluster<IInvokableInstance> cluster = builder()
+                                                    .withNodeIdTopology(Collections.singletonMap(1, NetworkTopology.dcAndRack("somewhere", "rack0")))
+                                                    .withRack("elsewhere", "firstrack", 1)
+                                                    .withRack("elsewhere", "secondrack", 2)
+                                                    .withDC("nearthere", 4)
+                                                    .start())
+        {
+            Assert.assertEquals(1, cluster.stream("somewhere").count());
+            Assert.assertEquals(1, cluster.stream("elsewhere", "firstrack").count());
+            Assert.assertEquals(2, cluster.stream("elsewhere", "secondrack").count());
+            Assert.assertEquals(3, cluster.stream("elsewhere").count());
+            Assert.assertEquals(4, cluster.stream("nearthere").count());
+
+            Set<IInstance> expect = cluster.stream().collect(Collectors.toSet());
+            Set<IInstance> result = Stream.concat(Stream.concat(cluster.stream("somewhere"),
+                                                                cluster.stream("elsewhere")),
+                                                  cluster.stream("nearthere")).collect(Collectors.toSet());
+            Assert.assertEquals(expect, result);
+        }
+    }
+
+    @Test
+    public void automaticNamedDcTest() throws Throwable
+
+    {
+        try (ICluster cluster = builder()
+                                .withRacks(2, 1, 3)
+                                .start())
+        {
+            Assert.assertEquals(6, cluster.stream().count());
+            Assert.assertEquals(3, cluster.stream("datacenter1").count());
+            Assert.assertEquals(3, cluster.stream("datacenter2", "rack1").count());
+        }
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void noCountsAfterNamingDCsTest()
+    {
+        builder().withDC("nameddc", 1)
+                 .withDCs(1);
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void mustProvideNodeCountBeforeWithDCsTest()
+    {
+        builder().withDCs(1);
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void noEmptyNodeIdTopologyTest()
+    {
+        builder().withNodeIdTopology(Collections.emptyMap());
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void noHolesInNodeIdTopologyTest()
+    {
+        builder().withNodeIdTopology(Collections.singletonMap(2, NetworkTopology.dcAndRack("doomed", "rack")));
+    }
+}
\ No newline at end of file
diff --git a/src/test/java/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java b/src/test/java/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java
new file mode 100644
index 0000000..70790bc
--- /dev/null
+++ b/src/test/java/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.ICluster;
+
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+
+public class SimpleReadWriteTest extends TestBaseImpl
+{
+    @Test
+    public void coordinatorReadTest() throws Throwable
+    {
+        try (ICluster cluster = init(builder().withNodes(3).start()))
+        {
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH read_repair='none'");
+
+            cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)");
+            cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 2, 2)");
+            cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 3, 3)");
+
+            assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = ?",
+                                                     ConsistencyLevel.ALL,
+                                                     1),
+                       row(1, 1, 1),
+                       row(1, 2, 2),
+                       row(1, 3, 3));
+        }
+    }
+
+    @Test
+    public void coordinatorWriteTest() throws Throwable
+    {
+        try (ICluster cluster = init(builder().withNodes(3).start()))
+        {
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH read_repair='none'");
+
+            cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)",
+                                          ConsistencyLevel.QUORUM);
+
+            for (int i = 0; i < 3; i++)
+            {
+                assertRows(cluster.get(1).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1"),
+                           row(1, 1, 1));
+            }
+
+            assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1",
+                                                     ConsistencyLevel.QUORUM),
+                       row(1, 1, 1));
+        }
+    }
+
+    @Test
+    public void readRepairTest() throws Throwable
+    {
+        try (ICluster cluster = init(builder().withNodes(3).start()))
+        {
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH read_repair='blocking'");
+
+            cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)");
+            cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)");
+
+            assertRows(cluster.get(3).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1"));
+
+            assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1",
+                                                     ConsistencyLevel.ALL), // ensure node3 in preflist
+                       row(1, 1, 1));
+
+            // Verify that data got repaired to the third node
+            assertRows(cluster.get(3).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1"),
+                       row(1, 1, 1));
+        }
+    }
+
+    @Test
+    public void writeWithSchemaDisagreement() throws Throwable
+    {
+        try (ICluster cluster = init(builder().withNodes(3).withConfig(config -> config.with(NETWORK)).start()))
+        {
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, PRIMARY KEY (pk, ck))");
+
+            cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
+            cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
+            cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
+
+            // Introduce schema disagreement
+            cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl ADD v2 int", 1);
+
+            Exception thrown = null;
+            try
+            {
+                cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1, v2) VALUES (2, 2, 2, 2)",
+                                              ConsistencyLevel.QUORUM);
+            }
+            catch (RuntimeException e)
+            {
+                thrown = e;
+            }
+
+            Assert.assertTrue(thrown.getMessage().contains("INCOMPATIBLE_SCHEMA from 127.0.0.2"));
+            Assert.assertTrue(thrown.getMessage().contains("INCOMPATIBLE_SCHEMA from 127.0.0.3"));
+        }
+    }
+
+    @Test
+    public void readWithSchemaDisagreement() throws Throwable
+    {
+        try (ICluster cluster = init(builder().withNodes(3).withConfig(config -> config.with(NETWORK)).start()))
+        {
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, PRIMARY KEY (pk, ck))");
+
+            cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
+            cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
+            cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
+
+            // Introduce schema disagreement
+            cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl ADD v2 int", 1);
+
+            Exception thrown = null;
+            try
+            {
+                assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1",
+                                                         ConsistencyLevel.ALL),
+                           row(1, 1, 1, null));
+            }
+            catch (Exception e)
+            {
+                thrown = e;
+            }
+
+            Assert.assertTrue(thrown.getMessage(), thrown.getMessage().contains("INCOMPATIBLE_SCHEMA from 127.0.0.2"));
+            Assert.assertTrue(thrown.getMessage(), thrown.getMessage().contains("INCOMPATIBLE_SCHEMA from 127.0.0.3"));
+        }
+    }
+
+    @Test
+    public void simplePagedReadsTest() throws Throwable
+    {
+        try (ICluster cluster = init(builder().withNodes(3).start()))
+        {
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
+
+            int size = 100;
+            Object[][] results = new Object[size][];
+            for (int i = 0; i < size; i++)
+            {
+                cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, ?, ?)",
+                                               ConsistencyLevel.QUORUM,
+                                               i, i);
+                results[i] = new Object[] { 1, i, i};
+            }
+
+            // Make sure paged read returns same results with different page sizes
+            for (int pageSize : new int[] { 1, 2, 3, 5, 10, 20, 50})
+            {
+                assertRows(cluster.coordinator(1).executeWithPaging("SELECT * FROM " + KEYSPACE + ".tbl",
+                                                                    ConsistencyLevel.QUORUM,
+                                                                    pageSize),
+                           results);
+            }
+        }
+    }
+
+    @Test
+    public void pagingWithRepairTest() throws Throwable
+    {
+        try (ICluster cluster = init(builder().withNodes(3).start()))
+        {
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
+
+            int size = 100;
+            Object[][] results = new Object[size][];
+            for (int i = 0; i < size; i++)
+            {
+                // Make sure that data lands on different nodes and not coordinator
+                cluster.get(i % 2 == 0 ? 2 : 3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, ?, ?)",
+                                                                i, i);
+
+                results[i] = new Object[] { 1, i, i};
+            }
+
+            // Make sure paged read returns same results with different page sizes
+            for (int pageSize : new int[] { 1, 2, 3, 5, 10, 20, 50})
+            {
+                assertRows(cluster.coordinator(1).executeWithPaging("SELECT * FROM " + KEYSPACE + ".tbl",
+                                                                    ConsistencyLevel.ALL,
+                                                                    pageSize),
+                           results);
+            }
+
+            assertRows(cluster.get(1).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl"),
+                       results);
+        }
+    }
+
+    @Test
+    public void pagingTests() throws Throwable
+    {
+        try (ICluster cluster = init(builder().withNodes(3).start());
+             ICluster singleNode = init(builder().withNodes(1).withSubnet(1).start()))
+        {
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
+            singleNode.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
+
+            for (int i = 0; i < 10; i++)
+            {
+                for (int j = 0; j < 10; j++)
+                {
+                    cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, ?, ?)",
+                                                   ConsistencyLevel.QUORUM,
+                                                   i, j, i + i);
+                    singleNode.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, ?, ?)",
+                                                      ConsistencyLevel.QUORUM,
+                                                      i, j, i + i);
+                }
+            }
+
+            int[] pageSizes = new int[] { 1, 2, 3, 5, 10, 20, 50};
+            String[] statements = new String [] {"SELECT * FROM " + KEYSPACE  + ".tbl WHERE pk = 1 AND ck > 5",
+                                                 "SELECT * FROM " + KEYSPACE  + ".tbl WHERE pk = 1 AND ck >= 5",
+                                                 "SELECT * FROM " + KEYSPACE  + ".tbl WHERE pk = 1 AND ck > 5 AND ck <= 10",
+                                                 "SELECT * FROM " + KEYSPACE  + ".tbl WHERE pk = 1 AND ck > 5 LIMIT 3",
+                                                 "SELECT * FROM " + KEYSPACE  + ".tbl WHERE pk = 1 AND ck >= 5 LIMIT 2",
+                                                 "SELECT * FROM " + KEYSPACE  + ".tbl WHERE pk = 1 AND ck > 5 AND ck <= 10 LIMIT 2",
+                                                 "SELECT * FROM " + KEYSPACE  + ".tbl WHERE pk = 1 AND ck > 5 ORDER BY ck DESC",
+                                                 "SELECT * FROM " + KEYSPACE  + ".tbl WHERE pk = 1 AND ck >= 5 ORDER BY ck DESC",
+                                                 "SELECT * FROM " + KEYSPACE  + ".tbl WHERE pk = 1 AND ck > 5 AND ck <= 10 ORDER BY ck DESC",
+                                                 "SELECT * FROM " + KEYSPACE  + ".tbl WHERE pk = 1 AND ck > 5 ORDER BY ck DESC LIMIT 3",
+                                                 "SELECT * FROM " + KEYSPACE  + ".tbl WHERE pk = 1 AND ck >= 5 ORDER BY ck DESC LIMIT 2",
+                                                 "SELECT * FROM " + KEYSPACE  + ".tbl WHERE pk = 1 AND ck > 5 AND ck <= 10 ORDER BY ck DESC LIMIT 2",
+                                                 "SELECT DISTINCT pk FROM " + KEYSPACE  + ".tbl LIMIT 3",
+                                                 "SELECT DISTINCT pk FROM " + KEYSPACE  + ".tbl WHERE pk IN (3,5,8,10)",
+                                                 "SELECT DISTINCT pk FROM " + KEYSPACE  + ".tbl WHERE pk IN (3,5,8,10) LIMIT 2"
+            };
+            for (String statement : statements)
+            {
+                for (int pageSize : pageSizes)
+                {
+                    assertRows(cluster.coordinator(1)
+                                      .executeWithPaging(statement,
+                                                         ConsistencyLevel.QUORUM,  pageSize),
+                               singleNode.coordinator(1)
+                                         .executeWithPaging(statement,
+                                                            ConsistencyLevel.QUORUM,  Integer.MAX_VALUE));
+                }
+            }
+
+        }
+    }
+}
diff --git a/test/conf/logback-dtest.xml b/test/conf/logback-dtest.xml
new file mode 100644
index 0000000..370e1e5
--- /dev/null
+++ b/test/conf/logback-dtest.xml
@@ -0,0 +1,77 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+<configuration debug="false" scan="true" scanPeriod="60 seconds">
+  <define name="instance_id" class="org.apache.cassandra.distributed.impl.InstanceIDDefiner" />
+
+  <!-- Shutdown hook ensures that async appender flushes -->
+  <shutdownHook class="ch.qos.logback.core.hook.DelayingShutdownHook"/>
+
+  <appender name="INSTANCEFILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
+
+    <file>./build/test/logs/${cassandra.testtag}/TEST-${suitename}.log</file>
+    <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
+      <fileNamePattern>./build/test/logs/${cassandra.testtag}/TEST-${suitename}.log.%i.gz</fileNamePattern>
+      <minIndex>1</minIndex>
+      <maxIndex>20</maxIndex>
+    </rollingPolicy>
+
+    <triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
+      <maxFileSize>20MB</maxFileSize>
+    </triggeringPolicy>
+
+    <encoder>
+      <pattern>%-5level [%thread] ${instance_id} %date{ISO8601} %msg%n</pattern>
+    </encoder>
+    <immediateFlush>false</immediateFlush>
+  </appender>
+
+  <appender name="INSTANCEASYNCFILE" class="ch.qos.logback.classic.AsyncAppender">
+    <discardingThreshold>0</discardingThreshold>
+    <maxFlushTime>0</maxFlushTime>
+    <queueSize>1024</queueSize>
+    <appender-ref ref="INSTANCEFILE"/>
+  </appender>
+
+  <appender name="INSTANCESTDERR" target="System.err" class="ch.qos.logback.core.ConsoleAppender">
+    <encoder>
+      <pattern>%-5level %date{HH:mm:ss,SSS} %msg%n</pattern>
+    </encoder>
+    <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
+      <level>WARN</level>
+    </filter>
+  </appender>
+
+  <appender name="INSTANCESTDOUT" target="System.out" class="ch.qos.logback.core.ConsoleAppender">
+    <encoder>
+      <pattern>%-5level [%thread] ${instance_id} %date{ISO8601} %F:%L - %msg%n</pattern>
+    </encoder>
+    <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
+      <level>DEBUG</level>
+    </filter>
+  </appender>
+
+  <logger name="org.apache.hadoop" level="WARN"/>
+
+  <root level="DEBUG">
+    <appender-ref ref="INSTANCEASYNCFILE" />
+    <appender-ref ref="INSTANCESTDERR" />
+    <appender-ref ref="INSTANCESTDOUT" />
+  </root>
+</configuration>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org