You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by if...@apache.org on 2020/03/27 18:24:27 UTC

[cassandra] branch cassandra-3.0 updated (f2c9b4c -> c2cfebf)

This is an automated email from the ASF dual-hosted git repository.

ifesdjeen pushed a change to branch cassandra-3.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git.


    from f2c9b4c  Merge branch 'cassandra-2.2' into cassandra-3.0
     new 1f72cc6  Extract in-jvm-dtest API
     new c2cfebf  Merge branch 'cassandra-2.2' into cassandra-3.0

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 build.xml                                          |   5 +
 src/java/org/apache/cassandra/net/MessageOut.java  |   1 +
 .../org/apache/cassandra/net/MessagingService.java |   2 +-
 src/java/org/apache/cassandra/tools/NodeProbe.java |  11 +
 src/java/org/apache/cassandra/tools/NodeTool.java  |   4 +-
 .../org/apache/cassandra/distributed/Cluster.java  |  32 +-
 .../cassandra/distributed/UpgradeableCluster.java  |  32 +-
 .../apache/cassandra/distributed/api/Feature.java  |  24 --
 .../apache/cassandra/distributed/api/ICluster.java |  36 --
 .../cassandra/distributed/api/ICoordinator.java    |  36 --
 .../cassandra/distributed/api/IInstance.java       |  57 ----
 .../cassandra/distributed/api/IInstanceConfig.java |  60 ----
 .../distributed/api/IIsolatedExecutor.java         | 126 -------
 .../apache/cassandra/distributed/api/IListen.java  |  28 --
 .../apache/cassandra/distributed/api/IMessage.java |  37 --
 .../cassandra/distributed/api/IMessageFilters.java |  56 ----
 .../distributed/impl/AbstractCluster.java          | 373 +++++----------------
 .../cassandra/distributed/impl/Coordinator.java    |  56 ++--
 .../impl/DelegatingInvokableInstance.java          |  11 +-
 .../distributed/impl/DistributedTestSnitch.java    |  31 +-
 .../distributed/impl/IInvokableInstance.java       |  67 ----
 .../distributed/impl/IUpgradeableInstance.java     |   1 +
 .../cassandra/distributed/impl/Instance.java       | 272 ++++++++++++---
 .../distributed/impl/InstanceClassLoader.java      | 130 -------
 .../cassandra/distributed/impl/InstanceConfig.java |  98 +++---
 .../distributed/impl/InstanceKiller.java}          |  38 +--
 .../distributed/impl/IsolatedExecutor.java         |   4 +
 .../apache/cassandra/distributed/impl/Listen.java  |   1 -
 .../cassandra/distributed/impl/MessageFilters.java | 168 ----------
 .../impl/{Message.java => MessageImpl.java}        |  27 +-
 .../distributed/impl/NetworkTopology.java          | 137 --------
 .../apache/cassandra/distributed/impl/RowUtil.java |   1 +
 .../cassandra/distributed/impl/TracingUtil.java    |   2 +-
 .../cassandra/distributed/impl/Versions.java       | 190 -----------
 .../mock/nodetool/InternalNodeProbe.java           |  33 +-
 .../mock/nodetool/InternalNodeProbeFactory.java    |  11 +-
 .../cassandra/distributed/test/BootstrapTest.java  |  50 +--
 .../test/DistributedReadWritePathTest.java         | 300 -----------------
 .../distributed/test/DistributedTestBase.java      | 166 ---------
 .../distributed/test/GossipSettlesTest.java        |  16 +-
 .../cassandra/distributed/test/GossipTest.java     |   4 +-
 .../distributed/test/MessageFiltersTest.java       |  85 +++--
 .../distributed/test/MessageForwardingTest.java    |  10 +-
 .../distributed/test/NativeProtocolTest.java       |  49 +--
 .../distributed/test/NetworkTopologyTest.java      |  40 ++-
 .../cassandra/distributed/test/NodeToolTest.java   |   2 +-
 .../distributed/test/ResourceLeakTest.java         |  13 +-
 ...SettlesTest.java => SharedClusterTestBase.java} |  32 +-
 .../distributed/test/SimpleReadWriteTest.java      | 276 +++++++++++++++
 .../cassandra/distributed/test/TestBaseImpl.java   |  47 +++
 .../upgrade/CompactStorage2to3UpgradeTest.java     |  33 +-
 .../upgrade/MixedModeReadRepairTest.java           |   8 +-
 .../cassandra/distributed/upgrade/UpgradeTest.java |  57 ++--
 .../distributed/upgrade/UpgradeTestBase.java       |  21 +-
 .../apache/cassandra/LogbackStatusListener.java    |   2 +-
 55 files changed, 1133 insertions(+), 2276 deletions(-)
 delete mode 100644 test/distributed/org/apache/cassandra/distributed/api/Feature.java
 delete mode 100644 test/distributed/org/apache/cassandra/distributed/api/ICluster.java
 delete mode 100644 test/distributed/org/apache/cassandra/distributed/api/ICoordinator.java
 delete mode 100644 test/distributed/org/apache/cassandra/distributed/api/IInstance.java
 delete mode 100644 test/distributed/org/apache/cassandra/distributed/api/IInstanceConfig.java
 delete mode 100644 test/distributed/org/apache/cassandra/distributed/api/IIsolatedExecutor.java
 delete mode 100644 test/distributed/org/apache/cassandra/distributed/api/IListen.java
 delete mode 100644 test/distributed/org/apache/cassandra/distributed/api/IMessage.java
 delete mode 100644 test/distributed/org/apache/cassandra/distributed/api/IMessageFilters.java
 delete mode 100644 test/distributed/org/apache/cassandra/distributed/impl/IInvokableInstance.java
 delete mode 100644 test/distributed/org/apache/cassandra/distributed/impl/InstanceClassLoader.java
 copy test/{unit/org/apache/cassandra/utils/KillerForTests.java => distributed/org/apache/cassandra/distributed/impl/InstanceKiller.java} (52%)
 delete mode 100644 test/distributed/org/apache/cassandra/distributed/impl/MessageFilters.java
 rename test/distributed/org/apache/cassandra/distributed/impl/{Message.java => MessageImpl.java} (74%)
 delete mode 100644 test/distributed/org/apache/cassandra/distributed/impl/NetworkTopology.java
 delete mode 100644 test/distributed/org/apache/cassandra/distributed/impl/Versions.java
 delete mode 100644 test/distributed/org/apache/cassandra/distributed/test/DistributedReadWritePathTest.java
 delete mode 100644 test/distributed/org/apache/cassandra/distributed/test/DistributedTestBase.java
 copy test/distributed/org/apache/cassandra/distributed/test/{GossipSettlesTest.java => SharedClusterTestBase.java} (61%)
 create mode 100644 test/distributed/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java
 create mode 100644 test/distributed/org/apache/cassandra/distributed/test/TestBaseImpl.java


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[cassandra] 01/01: Merge branch 'cassandra-2.2' into cassandra-3.0

Posted by if...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ifesdjeen pushed a commit to branch cassandra-3.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit c2cfebf44f93af061131d73e4dcbf2a9ff582fe8
Merge: f2c9b4c 1f72cc6
Author: Alex Petrov <ol...@gmail.com>
AuthorDate: Fri Mar 27 19:04:38 2020 +0100

    Merge branch 'cassandra-2.2' into cassandra-3.0

 build.xml                                          |   5 +
 src/java/org/apache/cassandra/net/MessageOut.java  |   1 +
 .../org/apache/cassandra/net/MessagingService.java |   2 +-
 src/java/org/apache/cassandra/tools/NodeProbe.java |  11 +
 src/java/org/apache/cassandra/tools/NodeTool.java  |   4 +-
 .../org/apache/cassandra/distributed/Cluster.java  |  32 +-
 .../cassandra/distributed/UpgradeableCluster.java  |  32 +-
 .../apache/cassandra/distributed/api/Feature.java  |  24 --
 .../cassandra/distributed/api/ICoordinator.java    |  36 --
 .../cassandra/distributed/api/IInstance.java       |  57 ----
 .../cassandra/distributed/api/IInstanceConfig.java |  60 ----
 .../distributed/api/IIsolatedExecutor.java         | 126 -------
 .../apache/cassandra/distributed/api/IListen.java  |  28 --
 .../apache/cassandra/distributed/api/IMessage.java |  37 --
 .../cassandra/distributed/api/IMessageFilters.java |  56 ----
 .../distributed/impl/AbstractCluster.java          | 373 +++++----------------
 .../cassandra/distributed/impl/Coordinator.java    |  56 ++--
 .../impl/DelegatingInvokableInstance.java          |  11 +-
 .../distributed/impl/DistributedTestSnitch.java    |  31 +-
 .../distributed/impl/IInvokableInstance.java       |  67 ----
 .../distributed/impl/IUpgradeableInstance.java     |   1 +
 .../cassandra/distributed/impl/Instance.java       | 272 ++++++++++++---
 .../distributed/impl/InstanceClassLoader.java      | 130 -------
 .../cassandra/distributed/impl/InstanceConfig.java |  98 +++---
 .../cassandra/distributed/impl/InstanceKiller.java |  50 +++
 .../distributed/impl/IsolatedExecutor.java         |   4 +
 .../apache/cassandra/distributed/impl/Listen.java  |   1 -
 .../cassandra/distributed/impl/MessageFilters.java | 168 ----------
 .../impl/{Message.java => MessageImpl.java}        |  27 +-
 .../distributed/impl/NetworkTopology.java          | 137 --------
 .../apache/cassandra/distributed/impl/RowUtil.java |   1 +
 .../cassandra/distributed/impl/TracingUtil.java    |   2 +-
 .../cassandra/distributed/impl/Versions.java       | 190 -----------
 .../mock/nodetool/InternalNodeProbe.java           |  33 +-
 .../mock/nodetool/InternalNodeProbeFactory.java    |  11 +-
 .../cassandra/distributed/test/BootstrapTest.java  |  50 +--
 .../test/DistributedReadWritePathTest.java         | 300 -----------------
 .../distributed/test/DistributedTestBase.java      | 166 ---------
 .../distributed/test/GossipSettlesTest.java        |  16 +-
 .../cassandra/distributed/test/GossipTest.java     |   4 +-
 .../distributed/test/MessageFiltersTest.java       |  85 +++--
 .../distributed/test/MessageForwardingTest.java    |  10 +-
 .../distributed/test/NativeProtocolTest.java       |  49 +--
 .../distributed/test/NetworkTopologyTest.java      |  40 ++-
 .../cassandra/distributed/test/NodeToolTest.java   |   2 +-
 .../distributed/test/ResourceLeakTest.java         |  13 +-
 .../SharedClusterTestBase.java}                    |  38 ++-
 .../distributed/test/SimpleReadWriteTest.java      | 276 +++++++++++++++
 .../cassandra/distributed/test/TestBaseImpl.java   |  47 +++
 .../upgrade/CompactStorage2to3UpgradeTest.java     |  33 +-
 .../upgrade/MixedModeReadRepairTest.java           |   8 +-
 .../cassandra/distributed/upgrade/UpgradeTest.java |  57 ++--
 .../distributed/upgrade/UpgradeTestBase.java       |  21 +-
 .../apache/cassandra/LogbackStatusListener.java    |   2 +-
 54 files changed, 1170 insertions(+), 2221 deletions(-)

diff --cc build.xml
index a40cb7d,ed9c1a2..2527883
--- a/build.xml
+++ b/build.xml
@@@ -520,7 -534,19 +524,8 @@@
                  artifactId="cassandra-parent"
                  version="${version}"/>
          <dependency groupId="junit" artifactId="junit"/>
+         <dependency groupId="org.mockito" artifactId="mockito-core" />
 -        <dependency groupId="org.apache.pig" artifactId="pig">
 -          <exclusion groupId="xmlenc" artifactId="xmlenc"/>
 -          <exclusion groupId="tomcat" artifactId="jasper-runtime"/>
 -          <exclusion groupId="tomcat" artifactId="jasper-compiler"/>
 -          <exclusion groupId="org.eclipse.jdt" artifactId="core"/>
 -          <exclusion groupId="net.sf.kosmosfs" artifactId="kfs"/>
 -          <exclusion groupId="hsqldb" artifactId="hsqldb"/>
 -          <exclusion groupId="antlr" artifactId="antlr"/>
 -        </dependency>
 -        <!-- TODO CASSANDRA-9543
          <dependency groupId="com.datastax.cassandra" artifactId="cassandra-driver-core" classifier="shaded"/>
 -        -->
          <dependency groupId="org.eclipse.jdt.core.compiler" artifactId="ecj"/>
          <dependency groupId="org.caffinitas.ohc" artifactId="ohc-core"/>
          <dependency groupId="org.openjdk.jmh" artifactId="jmh-core"/>
diff --cc src/java/org/apache/cassandra/net/MessageOut.java
index ce190cb,1e291c2..09ff63b
--- a/src/java/org/apache/cassandra/net/MessageOut.java
+++ b/src/java/org/apache/cassandra/net/MessageOut.java
@@@ -30,6 -30,6 +30,7 @@@ import org.apache.cassandra.concurrent.
  import org.apache.cassandra.config.DatabaseDescriptor;
  import org.apache.cassandra.db.TypeSizes;
  import org.apache.cassandra.io.IVersionedSerializer;
++import org.apache.cassandra.io.util.DataOutputBuffer;
  import org.apache.cassandra.io.util.DataOutputPlus;
  import org.apache.cassandra.tracing.Tracing;
  import org.apache.cassandra.utils.FBUtilities;
diff --cc test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
index 55dbee1,05c8af8..82c06da
--- a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
@@@ -527,257 -553,19 +553,19 @@@ public abstract class AbstractCluster<
          }
      }
  
-     protected interface Factory<I extends IInstance, C extends AbstractCluster<I>>
-     {
-         C newCluster(File root, Versions.Version version, List<InstanceConfig> configs, ClassLoader sharedClassLoader);
-     }
- 
-     public static class Builder<I extends IInstance, C extends AbstractCluster<I>>
+     private void uncaughtExceptions(Thread thread, Throwable error)
      {
-         private final Factory<I, C> factory;
-         private int nodeCount;
-         private int subnet;
-         private Map<Integer, NetworkTopology.DcAndRack> nodeIdTopology;
-         private TokenSupplier tokenSupplier;
-         private File root;
-         private Versions.Version version = Versions.CURRENT;
-         private Consumer<InstanceConfig> configUpdater;
- 
-         public Builder(Factory<I, C> factory)
-         {
-             this.factory = factory;
-         }
- 
-         public Builder<I, C> withTokenSupplier(TokenSupplier tokenSupplier)
-         {
-             this.tokenSupplier = tokenSupplier;
-             return this;
-         }
- 
-         public Builder<I, C> withSubnet(int subnet)
-         {
-             this.subnet = subnet;
-             return this;
-         }
- 
-         public Builder<I, C> withNodes(int nodeCount)
-         {
-             this.nodeCount = nodeCount;
-             return this;
-         }
- 
-         public Builder<I, C> withDCs(int dcCount)
-         {
-             return withRacks(dcCount, 1);
-         }
- 
-         public Builder<I, C> withRacks(int dcCount, int racksPerDC)
-         {
-             if (nodeCount == 0)
-                 throw new IllegalStateException("Node count will be calculated. Do not supply total node count in the builder");
- 
-             int totalRacks = dcCount * racksPerDC;
-             int nodesPerRack = (nodeCount + totalRacks - 1) / totalRacks; // round up to next integer
-             return withRacks(dcCount, racksPerDC, nodesPerRack);
-         }
- 
-         public Builder<I, C> withRacks(int dcCount, int racksPerDC, int nodesPerRack)
-         {
-             if (nodeIdTopology != null)
-                 throw new IllegalStateException("Network topology already created. Call withDCs/withRacks once or before withDC/withRack calls");
- 
-             nodeIdTopology = new HashMap<>();
-             int nodeId = 1;
-             for (int dc = 1; dc <= dcCount; dc++)
-             {
-                 for (int rack = 1; rack <= racksPerDC; rack++)
-                 {
-                     for (int rackNodeIdx = 0; rackNodeIdx < nodesPerRack; rackNodeIdx++)
-                         nodeIdTopology.put(nodeId++, NetworkTopology.dcAndRack(dcName(dc), rackName(rack)));
-                 }
-             }
-             // adjust the node count to match the allocatation
-             final int adjustedNodeCount = dcCount * racksPerDC * nodesPerRack;
-             if (adjustedNodeCount != nodeCount)
-             {
-                 assert adjustedNodeCount > nodeCount : "withRacks should only ever increase the node count";
-                 logger.info("Network topology of {} DCs with {} racks per DC and {} nodes per rack required increasing total nodes to {}",
-                             dcCount, racksPerDC, nodesPerRack, adjustedNodeCount);
-                 nodeCount = adjustedNodeCount;
-             }
-             return this;
-         }
- 
-         public Builder<I, C> withDC(String dcName, int nodeCount)
+         if (!(thread.getContextClassLoader() instanceof InstanceClassLoader))
          {
-             return withRack(dcName, rackName(1), nodeCount);
-         }
- 
-         public Builder<I, C> withRack(String dcName, String rackName, int nodesInRack)
-         {
-             if (nodeIdTopology == null)
-             {
-                 if (nodeCount > 0)
-                     throw new IllegalStateException("Node count must not be explicitly set, or allocated using withDCs/withRacks");
- 
-                 nodeIdTopology = new HashMap<>();
-             }
-             for (int nodeId = nodeCount + 1; nodeId <= nodeCount + nodesInRack; nodeId++)
-                 nodeIdTopology.put(nodeId, NetworkTopology.dcAndRack(dcName, rackName));
- 
-             nodeCount += nodesInRack;
-             return this;
-         }
- 
-         // Map of node ids to dc and rack - must be contiguous with an entry nodeId 1 to nodeCount
-         public Builder<I, C> withNodeIdTopology(Map<Integer, NetworkTopology.DcAndRack> nodeIdTopology)
-         {
-             if (nodeIdTopology.isEmpty())
-                 throw new IllegalStateException("Topology is empty. It must have an entry for every nodeId.");
- 
-             IntStream.rangeClosed(1, nodeIdTopology.size()).forEach(nodeId -> {
-                 if (nodeIdTopology.get(nodeId) == null)
-                     throw new IllegalStateException("Topology is missing entry for nodeId " + nodeId);
-             });
- 
-             if (nodeCount != nodeIdTopology.size())
-             {
-                 nodeCount = nodeIdTopology.size();
-                 logger.info("Adjusting node count to {} for supplied network topology", nodeCount);
-             }
- 
-             this.nodeIdTopology = new HashMap<>(nodeIdTopology);
- 
-             return this;
-         }
- 
-         public Builder<I, C> withRoot(File root)
-         {
-             this.root = root;
-             return this;
-         }
- 
-         public Builder<I, C> withVersion(Versions.Version version)
-         {
-             this.version = version;
-             return this;
-         }
- 
-         public Builder<I, C> withConfig(Consumer<InstanceConfig> updater)
-         {
-             this.configUpdater = updater;
-             return this;
-         }
- 
-         public C createWithoutStarting() throws IOException
-         {
-             if (root == null)
-                 root = Files.createTempDirectory("dtests").toFile();
- 
-             if (nodeCount <= 0)
-                 throw new IllegalStateException("Cluster must have at least one node");
- 
-             if (nodeIdTopology == null)
-             {
-                 nodeIdTopology = IntStream.rangeClosed(1, nodeCount).boxed()
-                                           .collect(Collectors.toMap(nodeId -> nodeId,
-                                                                     nodeId -> NetworkTopology.dcAndRack(dcName(0), rackName(0))));
-             }
- 
-             root.mkdirs();
-             setupLogging(root);
- 
-             ClassLoader sharedClassLoader = Thread.currentThread().getContextClassLoader();
- 
-             List<InstanceConfig> configs = new ArrayList<>();
- 
-             if (tokenSupplier == null)
-                 tokenSupplier = evenlyDistributedTokens(nodeCount);
- 
-             for (int i = 0; i < nodeCount; ++i)
-             {
-                 int nodeNum = i + 1;
-                 configs.add(createInstanceConfig(nodeNum));
-             }
- 
-             return factory.newCluster(root, version, configs, sharedClassLoader);
-         }
- 
-         public InstanceConfig newInstanceConfig(C cluster)
-         {
-             return createInstanceConfig(cluster.size() + 1);
-         }
- 
-         private InstanceConfig createInstanceConfig(int nodeNum)
-         {
-             String ipPrefix = "127.0." + subnet + ".";
-             String seedIp = ipPrefix + "1";
-             String ipAddress = ipPrefix + nodeNum;
-             long token = tokenSupplier.token(nodeNum);
- 
-             NetworkTopology topology = NetworkTopology.build(ipPrefix, 7012, nodeIdTopology);
- 
-             InstanceConfig config = InstanceConfig.generate(nodeNum, ipAddress, topology, root, String.valueOf(token), seedIp);
-             if (configUpdater != null)
-                 configUpdater.accept(config);
- 
-             return config;
-         }
- 
-         public C start() throws IOException
-         {
-             C cluster = createWithoutStarting();
-             cluster.startup();
-             return cluster;
-         }
-     }
- 
-     public static TokenSupplier evenlyDistributedTokens(int numNodes)
-     {
-         long increment = (Long.MAX_VALUE / numNodes) * 2;
-         return (int nodeId) -> {
-             assert nodeId <= numNodes : String.format("Can not allocate a token for a node %s, since only %s nodes are allowed by the token allocation strategy",
-                                                       nodeId, numNodes);
-             return Long.MIN_VALUE + 1 + nodeId * increment;
-         };
-     }
- 
-     public static interface TokenSupplier
-     {
-         public long token(int nodeId);
-     }
- 
-     static String dcName(int index)
-     {
-         return "datacenter" + index;
-     }
- 
-     static String rackName(int index)
-     {
-         return "rack" + index;
-     }
- 
-     private static void setupLogging(File root)
-     {
-         try
-         {
-             String testConfPath = "test/conf/logback-dtest.xml";
-             Path logConfPath = Paths.get(root.getPath(), "/logback-dtest.xml");
- 
-             if (!logConfPath.toFile().exists())
-             {
-                 Files.copy(new File(testConfPath).toPath(),
-                            logConfPath);
-             }
- 
-             System.setProperty("logback.configurationFile", "file://" + logConfPath);
-         }
-         catch (IOException e)
-         {
-             throw new RuntimeException(e);
+             Thread.UncaughtExceptionHandler handler = previousHandler;
+             if (null != handler)
+                 handler.uncaughtException(thread, error);
+             return;
          }
+         InstanceClassLoader cl = (InstanceClassLoader) thread.getContextClassLoader();
+         get(cl.getInstanceId()).uncaughtException(thread, error);
      }
--
++    
      @Override
      public void close()
      {
diff --cc test/distributed/org/apache/cassandra/distributed/impl/Coordinator.java
index 69c2e44,91a2aaf..e2ebef0
--- a/test/distributed/org/apache/cassandra/distributed/impl/Coordinator.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Coordinator.java
@@@ -31,11 -31,15 +31,13 @@@ import org.apache.cassandra.cql3.QueryO
  import org.apache.cassandra.cql3.QueryProcessor;
  import org.apache.cassandra.cql3.UntypedResultSet;
  import org.apache.cassandra.cql3.statements.SelectStatement;
- import org.apache.cassandra.db.ConsistencyLevel;
+ import org.apache.cassandra.distributed.api.ConsistencyLevel;
  import org.apache.cassandra.distributed.api.ICoordinator;
+ import org.apache.cassandra.distributed.api.IInstance;
+ import org.apache.cassandra.distributed.api.QueryResult;
  import org.apache.cassandra.service.ClientState;
  import org.apache.cassandra.service.QueryState;
 -import org.apache.cassandra.service.pager.Pageable;
  import org.apache.cassandra.service.pager.QueryPager;
 -import org.apache.cassandra.service.pager.QueryPagers;
  import org.apache.cassandra.transport.Server;
  import org.apache.cassandra.tracing.Tracing;
  import org.apache.cassandra.transport.messages.ResultMessage;
@@@ -71,19 -75,23 +73,23 @@@ public class Coordinator implements ICo
          }).call();
      }
  
-     private Object[][] executeInternal(String query, Enum<?> consistencyLevelOrigin, Object[] boundValues)
+     protected org.apache.cassandra.db.ConsistencyLevel toCassandraCL(ConsistencyLevel cl)
+     {
+         return org.apache.cassandra.db.ConsistencyLevel.fromCode(cl.ordinal());
+     }
+ 
+     private QueryResult executeInternal(String query, ConsistencyLevel consistencyLevelOrigin, Object[] boundValues)
      {
-         ConsistencyLevel consistencyLevel = ConsistencyLevel.valueOf(consistencyLevelOrigin.name());
 -        ClientState clientState = ClientState.forInternalCalls();
 +        ClientState clientState = makeFakeClientState();
          CQLStatement prepared = QueryProcessor.getStatement(query, clientState).statement;
          List<ByteBuffer> boundBBValues = new ArrayList<>();
+         ConsistencyLevel consistencyLevel = ConsistencyLevel.valueOf(consistencyLevelOrigin.name());
          for (Object boundValue : boundValues)
-         {
              boundBBValues.add(ByteBufferUtil.objectToBytes(boundValue));
-         }
  
+         prepared.validate(QueryState.forInternalCalls().getClientState());
          ResultMessage res = prepared.execute(QueryState.forInternalCalls(),
-                                              QueryOptions.create(consistencyLevel,
+                                              QueryOptions.create(toCassandraCL(consistencyLevel),
                                                                   boundBBValues,
                                                                   false,
                                                                   Integer.MAX_VALUE,
@@@ -109,38 -129,40 +127,38 @@@
              throw new IllegalArgumentException("Page size should be strictly positive but was " + pageSize);
  
          return instance.sync(() -> {
-             ConsistencyLevel consistencyLevel = ConsistencyLevel.valueOf(consistencyLevelOrigin.name());
 +            ClientState clientState = makeFakeClientState();
+             ConsistencyLevel consistencyLevel = ConsistencyLevel.valueOf(consistencyLevelOrigin.name());
 -            CQLStatement prepared = QueryProcessor.getStatement(query, ClientState.forInternalCalls()).statement;
 +            CQLStatement prepared = QueryProcessor.getStatement(query, clientState).statement;
              List<ByteBuffer> boundBBValues = new ArrayList<>();
              for (Object boundValue : boundValues)
              {
                  boundBBValues.add(ByteBufferUtil.objectToBytes(boundValue));
              }
  
--            prepared.validate(QueryState.forInternalCalls().getClientState());
++            prepared.validate(clientState);
              assert prepared instanceof SelectStatement : "Only SELECT statements can be executed with paging";
  
 -            ClientState clientState = QueryState.forInternalCalls().getClientState();
              SelectStatement selectStatement = (SelectStatement) prepared;
 -            QueryOptions queryOptions = QueryOptions.create(toCassandraCL(consistencyLevel),
 -                                                            boundBBValues,
 -                                                            false,
 -                                                            pageSize,
 -                                                            null,
 -                                                            null,
 -                                                            Server.CURRENT_VERSION);
 -            Pageable pageable = selectStatement.getPageableCommand(queryOptions);
 +
-             QueryPager pager = selectStatement.getQuery(QueryOptions.create(consistencyLevel,
++            QueryPager pager = selectStatement.getQuery(QueryOptions.create(toCassandraCL(consistencyLevel),
 +                                                                            boundBBValues,
 +                                                                            false,
 +                                                                            pageSize,
 +                                                                            null,
 +                                                                            null,
 +                                                                            Server.CURRENT_VERSION),
 +                                                        FBUtilities.nowInSeconds())
 +                                     .getPager(null, Server.CURRENT_VERSION);
  
              // Usually pager fetches a single page (see SelectStatement#execute). We need to iterate over all
              // of the results lazily.
 -            QueryPager pager = QueryPagers.pager(pageable, toCassandraCL(consistencyLevel), clientState, null);
 -            Iterator<Object[]> iter = RowUtil.toObjects(selectStatement.getResultMetadata().names,
 -                                                        UntypedResultSet.create(selectStatement,
 -                                                                                pager,
 -                                                                                pageSize).iterator());
 -
 -            // We have to make sure iterator is not running on main thread.
              return new Iterator<Object[]>() {
-                 Iterator<Object[]> iter = RowUtil.toObjects(UntypedResultSet.create(selectStatement, consistencyLevel, clientState, pager,  pageSize));
++                Iterator<Object[]> iter = RowUtil.toObjects(UntypedResultSet.create(selectStatement, toCassandraCL(consistencyLevel), clientState, pager,  pageSize));
 +
                  public boolean hasNext()
                  {
 +                    // We have to make sure iterator is not running on main thread.
                      return instance.sync(() -> iter.hasNext()).call();
                  }
  
diff --cc test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index 5a4dcf4,1c19bca..e8c45d8
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@@ -34,7 -39,10 +37,11 @@@ import java.util.concurrent.TimeoutExce
  import java.util.function.BiConsumer;
  import java.util.function.Function;
  
+ import javax.management.ListenerNotFoundException;
+ import javax.management.Notification;
+ import javax.management.NotificationListener;
+ 
 +import org.apache.cassandra.batchlog.BatchlogManager;
  import org.apache.cassandra.concurrent.ScheduledExecutors;
  import org.apache.cassandra.concurrent.SharedExecutorPool;
  import org.apache.cassandra.concurrent.StageManager;
@@@ -63,13 -77,11 +74,15 @@@ import org.apache.cassandra.distributed
  import org.apache.cassandra.gms.ApplicationState;
  import org.apache.cassandra.gms.Gossiper;
  import org.apache.cassandra.gms.VersionedValue;
 +import org.apache.cassandra.hints.HintsService;
 +import org.apache.cassandra.index.SecondaryIndexManager;
+ import org.apache.cassandra.io.IVersionedSerializer;
  import org.apache.cassandra.io.sstable.IndexSummaryManager;
  import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.io.util.DataInputBuffer;
  import org.apache.cassandra.io.util.DataOutputBuffer;
- import org.apache.cassandra.locator.InetAddressAndPort;
++import org.apache.cassandra.io.util.DataOutputPlus;
+ import org.apache.cassandra.net.CompactEndpointSerializationHelper;
  import org.apache.cassandra.net.IMessageSink;
  import org.apache.cassandra.net.MessageIn;
  import org.apache.cassandra.net.MessageOut;
@@@ -247,7 -283,57 +286,57 @@@ public class Instance extends IsolatedE
              long timestamp = System.currentTimeMillis();
              out.writeInt((int) timestamp);
              messageOut.serialize(out, version);
-             return new Message(messageOut.verb.ordinal(), out.toByteArray(), id, version, from);
+             return new MessageImpl(messageOut.verb.ordinal(), out.toByteArray(), id, version, from);
+         }
+         catch (IOException e)
+         {
+             throw new RuntimeException(e);
+         }
+     }
+ 
+     public static IMessage serializeMessage(MessageIn<?> messageIn, int id, InetSocketAddress from, InetSocketAddress to)
+     {
+         try (DataOutputBuffer out = new DataOutputBuffer(1024))
+         {
+             // Serialize header
+             int version = MessagingService.instance().getVersion(to.getAddress());
+ 
+             out.writeInt(MessagingService.PROTOCOL_MAGIC);
+             out.writeInt(id);
+             long timestamp = System.currentTimeMillis();
+             out.writeInt((int) timestamp);
+ 
+             // Serialize the message itself
+             IVersionedSerializer serializer = MessagingService.instance().verbSerializers.get(messageIn.verb);
+             CompactEndpointSerializationHelper.serialize(from.getAddress(), out);
+ 
 -            out.writeInt(messageIn.verb.ordinal());
++            out.writeInt(MessagingService.Verb.convertForMessagingServiceVersion(messageIn.verb, version).ordinal());
+             out.writeInt(messageIn.parameters.size());
+             for (Map.Entry<String, byte[]> entry : messageIn.parameters.entrySet())
+             {
+                 out.writeUTF(entry.getKey());
+                 out.writeInt(entry.getValue().length);
+                 out.write(entry.getValue());
+             }
+ 
+             if (messageIn.payload != null && serializer != MessagingService.CallbackDeterminedSerializer.instance)
+             {
+                 try (DataOutputBuffer dob = new DataOutputBuffer())
+                 {
+                     serializer.serialize(messageIn.payload, dob, version);
+ 
+                     int size = dob.getLength();
+                     out.writeInt(size);
+                     out.write(dob.getData(), 0, size);
+                 }
+             }
+             else
+             {
+                 out.writeInt(0);
+             }
+ 
+ 
+             return new MessageImpl(messageIn.verb.ordinal(), out.toByteArray(), id, version, from);
          }
          catch (IOException e)
          {
@@@ -332,6 -420,6 +423,8 @@@
              int partial = input.readInt();
  
              return Pair.create(MessageIn.read(input, version, id), partial);
++            //long currentTime = ApproximateTime.currentTimeMillis();
++            //return MessageIn.read(input, version, id, MessageIn.readConstructionTime(imessage.from().getAddress(), input, currentTime));
          }
          catch (IOException e)
          {
@@@ -536,19 -623,15 +630,19 @@@
  
              for (int i = 0; i < tokens.size(); i++)
              {
-                 InetAddressAndPort ep = hosts.get(i);
+                 InetSocketAddress ep = hosts.get(i);
 -                Gossiper.instance.initializeNodeUnsafe(ep.getAddress(), hostIds.get(i), 1);
 -                Gossiper.instance.injectApplicationState(ep.getAddress(),
 -                        ApplicationState.TOKENS,
 -                        new VersionedValue.VersionedValueFactory(partitioner).tokens(Collections.singleton(tokens.get(i))));
 -                storageService.onChange(ep.getAddress(),
 -                        ApplicationState.STATUS,
 -                        new VersionedValue.VersionedValueFactory(partitioner).normal(Collections.singleton(tokens.get(i))));
 -                Gossiper.instance.realMarkAlive(ep.getAddress(), Gossiper.instance.getEndpointStateForEndpoint(ep.getAddress()));
 +                UUID hostId = hostIds.get(i);
 +                Token token = tokens.get(i);
 +                Gossiper.runInGossipStageBlocking(() -> {
-                     Gossiper.instance.initializeNodeUnsafe(ep.address, hostId, 1);
-                     Gossiper.instance.injectApplicationState(ep.address,
++                    Gossiper.instance.initializeNodeUnsafe(ep.getAddress(), hostId, 1);
++                    Gossiper.instance.injectApplicationState(ep.getAddress(),
 +                                                             ApplicationState.TOKENS,
 +                                                             new VersionedValue.VersionedValueFactory(partitioner).tokens(Collections.singleton(token)));
-                     storageService.onChange(ep.address,
++                    storageService.onChange(ep.getAddress(),
 +                                            ApplicationState.STATUS,
 +                                            new VersionedValue.VersionedValueFactory(partitioner).normal(Collections.singleton(token)));
-                     Gossiper.instance.realMarkAlive(ep.address, Gossiper.instance.getEndpointStateForEndpoint(ep.address));
++                    Gossiper.instance.realMarkAlive(ep.getAddress(), Gossiper.instance.getEndpointStateForEndpoint(ep.getAddress()));
 +                });
                  int messagingVersion = cluster.get(ep).isShutdown()
                                         ? MessagingService.current_version
                                         : Math.min(MessagingService.current_version, cluster.get(ep).getMessagingVersion());
@@@ -570,11 -653,9 +664,12 @@@
          return shutdown(true);
      }
  
+     @Override
      public Future<Void> shutdown(boolean graceful)
      {
 +        if (!graceful)
 +            MessagingService.instance().shutdown(false);
 +
          Future<?> future = async((ExecutorService executor) -> {
              Throwable error = null;
  
diff --cc test/distributed/org/apache/cassandra/distributed/mock/nodetool/InternalNodeProbe.java
index b76c455,625b4aa..f3eb327
--- a/test/distributed/org/apache/cassandra/distributed/mock/nodetool/InternalNodeProbe.java
+++ b/test/distributed/org/apache/cassandra/distributed/mock/nodetool/InternalNodeProbe.java
@@@ -23,9 -23,10 +23,11 @@@ import java.lang.management.ManagementF
  import java.util.Iterator;
  import java.util.Map;
  
+ import javax.management.ListenerNotFoundException;
+ 
  import com.google.common.collect.Multimap;
  
 +import org.apache.cassandra.batchlog.BatchlogManager;
  import org.apache.cassandra.db.ColumnFamilyStoreMBean;
  import org.apache.cassandra.db.HintedHandOffManager;
  import org.apache.cassandra.db.Keyspace;
diff --cc test/distributed/org/apache/cassandra/distributed/test/GossipTest.java
index c2e9e4f,0000000..83c62c8
mode 100644,000000..100644
--- a/test/distributed/org/apache/cassandra/distributed/test/GossipTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/GossipTest.java
@@@ -1,115 -1,0 +1,115 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.distributed.test;
 +
 +import java.net.InetAddress;
 +import java.util.Collection;
 +import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.locks.LockSupport;
 +import java.util.stream.Collectors;
 +
 +import com.google.common.collect.Iterables;
 +import org.junit.Assert;
 +import org.junit.Test;
 +
 +import org.apache.cassandra.dht.Token;
 +import org.apache.cassandra.distributed.Cluster;
 +import org.apache.cassandra.gms.ApplicationState;
 +import org.apache.cassandra.gms.EndpointState;
 +import org.apache.cassandra.gms.Gossiper;
 +import org.apache.cassandra.service.StorageService;
 +import org.apache.cassandra.utils.FBUtilities;
 +
 +import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
 +import static org.apache.cassandra.distributed.api.Feature.NETWORK;
 +
- public class GossipTest extends DistributedTestBase
++public class GossipTest extends TestBaseImpl
 +{
 +
 +    @Test
 +    public void nodeDownDuringMove() throws Throwable
 +    {
 +        int liveCount = 1;
 +        System.setProperty("cassandra.ring_delay_ms", "5000"); // down from 30s default
 +        System.setProperty("cassandra.consistent.rangemovement", "false");
 +        System.setProperty("cassandra.consistent.simultaneousmoves.allow", "true");
 +        try (Cluster cluster = Cluster.build(2 + liveCount)
 +                                      .withConfig(config -> config.with(NETWORK).with(GOSSIP))
 +                                      .createWithoutStarting())
 +        {
 +            int fail = liveCount + 1;
 +            int late = fail + 1;
 +            for (int i = 1 ; i <= liveCount ; ++i)
 +                cluster.get(i).startup();
 +            cluster.get(fail).startup();
 +            Collection<String> expectTokens = cluster.get(fail).callsOnInstance(() ->
 +                StorageService.instance.getTokenMetadata().getTokens(FBUtilities.getBroadcastAddress())
 +                                       .stream().map(Object::toString).collect(Collectors.toList())
 +            ).call();
 +
-             InetAddress failAddress = cluster.get(fail).broadcastAddressAndPort().address;
++            InetAddress failAddress = cluster.get(fail).broadcastAddress().getAddress();
 +            // wait for NORMAL state
 +            for (int i = 1 ; i <= liveCount ; ++i)
 +            {
 +                cluster.get(i).acceptsOnInstance((InetAddress endpoint) -> {
 +                    EndpointState ep;
 +                    while (null == (ep = Gossiper.instance.getEndpointStateForEndpoint(endpoint))
 +                           || ep.getApplicationState(ApplicationState.STATUS) == null
 +                           || !ep.getApplicationState(ApplicationState.STATUS).value.startsWith("NORMAL"))
 +                        LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(10L));
 +                }).accept(failAddress);
 +            }
 +
 +            // set ourselves to MOVING, and wait for it to propagate
 +            cluster.get(fail).runOnInstance(() -> {
 +
 +                Token token = Iterables.getFirst(StorageService.instance.getTokenMetadata().getTokens(FBUtilities.getBroadcastAddress()), null);
 +                Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS, StorageService.instance.valueFactory.moving(token));
 +            });
 +
 +            for (int i = 1 ; i <= liveCount ; ++i)
 +            {
 +                cluster.get(i).acceptsOnInstance((InetAddress endpoint) -> {
 +                    EndpointState ep;
 +                    while (null == (ep = Gossiper.instance.getEndpointStateForEndpoint(endpoint))
 +                           || (ep.getApplicationState(ApplicationState.STATUS) == null
 +                               || !ep.getApplicationState(ApplicationState.STATUS).value.startsWith("MOVING")))
 +                        LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(10L));
 +                }).accept(failAddress);
 +            }
 +
 +            cluster.get(fail).shutdown(false).get();
 +            cluster.get(late).startup();
 +            cluster.get(late).acceptsOnInstance((InetAddress endpoint) -> {
 +                EndpointState ep;
 +                while (null == (ep = Gossiper.instance.getEndpointStateForEndpoint(endpoint))
 +                       || !ep.getApplicationState(ApplicationState.STATUS).value.startsWith("MOVING"))
 +                    LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(10L));
 +            }).accept(failAddress);
 +
 +            Collection<String> tokens = cluster.get(late).appliesOnInstance((InetAddress endpoint) ->
 +                StorageService.instance.getTokenMetadata().getTokens(failAddress)
 +                                       .stream().map(Object::toString).collect(Collectors.toList())
 +            ).apply(failAddress);
 +
 +            Assert.assertEquals(expectTokens, tokens);
 +        }
 +    }
 +    
 +}
diff --cc test/distributed/org/apache/cassandra/distributed/test/MessageFiltersTest.java
index 07e7428,062f401..f4398da
--- a/test/distributed/org/apache/cassandra/distributed/test/MessageFiltersTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/MessageFiltersTest.java
@@@ -37,10 -37,27 +37,26 @@@ import org.apache.cassandra.distributed
  import org.apache.cassandra.net.MessageIn;
  import org.apache.cassandra.net.MessagingService;
  
- public class MessageFiltersTest extends DistributedTestBase
+ public class MessageFiltersTest extends TestBaseImpl
  {
 -
      @Test
-     public void simpleFiltersTest() throws Throwable
+     public void simpleInboundFiltersTest()
+     {
+         simpleFiltersTest(true);
+     }
+ 
+     @Test
+     public void simpleOutboundFiltersTest()
+     {
+         simpleFiltersTest(false);
+     }
+ 
+     private interface Permit
+     {
+         boolean test(int from, int to, IMessage msg);
+     }
+ 
+     private static void simpleFiltersTest(boolean inbound)
      {
          int VERB1 = MessagingService.Verb.READ.ordinal();
          int VERB2 = MessagingService.Verb.REQUEST_RESPONSE.ordinal();
@@@ -52,21 -69,22 +68,22 @@@
          String MSG2 = "msg2";
  
          MessageFilters filters = new MessageFilters();
-         MessageFilters.Filter filter = filters.allVerbs().from(1).drop();
 -        Permit permit = inbound ? filters::permitInbound : filters::permitOutbound;
++        Permit permit = inbound ? (from, to, msg) -> filters.permitInbound(from, to, msg) : (from, to, msg) -> filters.permitOutbound(from, to, msg);
  
-         Assert.assertFalse(filters.permit(i1, i2, msg(VERB1, MSG1)));
-         Assert.assertFalse(filters.permit(i1, i2, msg(VERB2, MSG1)));
-         Assert.assertFalse(filters.permit(i1, i2, msg(VERB3, MSG1)));
-         Assert.assertTrue(filters.permit(i2, i1, msg(VERB1, MSG1)));
+         IMessageFilters.Filter filter = filters.allVerbs().inbound(inbound).from(1).drop();
+         Assert.assertFalse(permit.test(i1, i2, msg(VERB1, MSG1)));
+         Assert.assertFalse(permit.test(i1, i2, msg(VERB2, MSG1)));
+         Assert.assertFalse(permit.test(i1, i2, msg(VERB3, MSG1)));
+         Assert.assertTrue(permit.test(i2, i1, msg(VERB1, MSG1)));
          filter.off();
-         Assert.assertTrue(filters.permit(i1, i2, msg(VERB1, MSG1)));
+         Assert.assertTrue(permit.test(i1, i2, msg(VERB1, MSG1)));
          filters.reset();
  
-         filters.verbs(VERB1).from(1).to(2).drop();
-         Assert.assertFalse(filters.permit(i1, i2, msg(VERB1, MSG1)));
-         Assert.assertTrue(filters.permit(i1, i2, msg(VERB2, MSG1)));
-         Assert.assertTrue(filters.permit(i2, i1, msg(VERB1, MSG1)));
-         Assert.assertTrue(filters.permit(i2, i3, msg(VERB2, MSG1)));
+         filters.verbs(VERB1).inbound(inbound).from(1).to(2).drop();
+         Assert.assertFalse(permit.test(i1, i2, msg(VERB1, MSG1)));
+         Assert.assertTrue(permit.test(i1, i2, msg(VERB2, MSG1)));
+         Assert.assertTrue(permit.test(i2, i1, msg(VERB1, MSG1)));
+         Assert.assertTrue(permit.test(i2, i3, msg(VERB2, MSG1)));
  
          filters.reset();
          AtomicInteger counter = new AtomicInteger();
diff --cc test/distributed/org/apache/cassandra/distributed/test/SharedClusterTestBase.java
index 091e5f0,0000000..c502af2
mode 100644,000000..100644
--- a/test/distributed/org/apache/cassandra/distributed/test/SharedClusterTestBase.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/SharedClusterTestBase.java
@@@ -1,36 -1,0 +1,52 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
- package org.apache.cassandra.distributed.api;
++package org.apache.cassandra.distributed.test;
 +
- import org.apache.cassandra.locator.InetAddressAndPort;
++import java.io.IOException;
 +
- import java.util.stream.Stream;
++import org.junit.After;
++import org.junit.AfterClass;
++import org.junit.BeforeClass;
 +
- public interface ICluster
++import org.apache.cassandra.distributed.Cluster;
++import org.apache.cassandra.distributed.api.ICluster;
++
++public class SharedClusterTestBase extends TestBaseImpl
 +{
++    protected static ICluster cluster;
++
++    @BeforeClass
++    public static void before() throws IOException
++    {
++        cluster = init(Cluster.build().withNodes(3).start());
++    }
 +
-     IInstance get(int i);
-     IInstance get(InetAddressAndPort endpoint);
-     int size();
-     Stream<? extends IInstance> stream();
-     Stream<? extends IInstance> stream(String dcName);
-     Stream<? extends IInstance> stream(String dcName, String rackName);
-     IMessageFilters filters();
++    @AfterClass
++    public static void after() throws Exception
++    {
++        cluster.close();
++    }
 +
++    @After
++    public void afterEach()
++    {
++        cluster.schemaChange("DROP KEYSPACE IF EXISTS " + KEYSPACE);
++        init(cluster);
++    }
 +}
diff --cc test/distributed/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java
index 0000000,0000000..f1f8674
new file mode 100644
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java
@@@ -1,0 -1,0 +1,276 @@@
++package org.apache.cassandra.distributed.test;
++
++import org.junit.Assert;
++import org.junit.Test;
++
++import org.apache.cassandra.db.Keyspace;
++import org.apache.cassandra.distributed.api.ConsistencyLevel;
++import org.apache.cassandra.distributed.api.ICluster;
++import org.apache.cassandra.distributed.api.IInvokableInstance;
++
++import static org.junit.Assert.assertEquals;
++
++import static org.apache.cassandra.distributed.shared.AssertUtils.*;
++
++// TODO: this test should be removed after running in-jvm dtests is set up via the shared API repository
++public class SimpleReadWriteTest extends SharedClusterTestBase
++{
++    @Test
++    public void coordinatorReadTest() throws Throwable
++    {
++        cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
++
++        cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)");
++        cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 2, 2)");
++        cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 3, 3)");
++
++        assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = ?",
++                                                  ConsistencyLevel.ALL,
++                                                  1),
++                   row(1, 1, 1),
++                   row(1, 2, 2),
++                   row(1, 3, 3));
++    }
++
++    @Test
++    public void largeMessageTest() throws Throwable
++    {
++        int largeMessageThreshold = 1024 * 64;
++        cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v text, PRIMARY KEY (pk, ck))");
++        StringBuilder builder = new StringBuilder();
++        for (int i = 0; i < largeMessageThreshold; i++)
++            builder.append('a');
++        String s = builder.toString();
++        cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, ?)",
++                                       ConsistencyLevel.ALL,
++                                       s);
++        assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = ?",
++                                                  ConsistencyLevel.ALL,
++                                                  1),
++                   row(1, 1, s));
++    }
++
++    @Test
++    public void coordinatorWriteTest() throws Throwable
++    {
++        cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
++
++        cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)",
++                                       ConsistencyLevel.QUORUM);
++
++        for (int i = 0; i < 3; i++)
++        {
++            assertRows(cluster.get(1).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1"),
++                       row(1, 1, 1));
++        }
++
++        assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1",
++                                                  ConsistencyLevel.QUORUM),
++                   row(1, 1, 1));
++    }
++
++    @Test
++    public void readRepairTest() throws Throwable
++    {
++        cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
++
++        cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)");
++        cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)");
++
++        assertRows(cluster.get(3).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1"));
++
++        assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1",
++                                                  ConsistencyLevel.ALL), // ensure node3 in preflist
++                   row(1, 1, 1));
++
++        // Verify that data got repaired to the third node
++        assertRows(cluster.get(3).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1"),
++                   row(1, 1, 1));
++    }
++
++    @Test
++    public void writeWithSchemaDisagreement() throws Throwable
++    {
++        cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, PRIMARY KEY (pk, ck))");
++
++        cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
++        cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
++        cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
++
++        // Introduce schema disagreement
++        cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl ADD v2 int", 1);
++
++        Exception thrown = null;
++        try
++        {
++            cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1, v2) VALUES (2, 2, 2, 2)",
++                                           ConsistencyLevel.QUORUM);
++        }
++        catch (RuntimeException e)
++        {
++            thrown = e;
++        }
++
++        Assert.assertTrue(thrown.getMessage().contains("Exception occurred on node"));
++        Assert.assertTrue(thrown.getCause().getCause().getCause().getMessage().contains("Unknown column v2 during deserialization"));
++    }
++
++    @Test
++    public void readWithSchemaDisagreement() throws Throwable
++    {
++        cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, PRIMARY KEY (pk, ck))");
++
++        cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
++        cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
++        cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
++
++        // Introduce schema disagreement
++        cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl ADD v2 int", 1);
++
++        Exception thrown = null;
++        try
++        {
++            assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1",
++                                                      ConsistencyLevel.ALL),
++                       row(1, 1, 1, null));
++        }
++        catch (Exception e)
++        {
++            thrown = e;
++        }
++
++        Assert.assertTrue(thrown.getMessage().contains("Exception occurred on node"));
++        Assert.assertTrue(thrown.getCause().getCause().getCause().getMessage().contains("Unknown column v2 during deserialization"));
++    }
++
++    @Test
++    public void simplePagedReadsTest() throws Throwable
++    {
++
++        cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
++
++        int size = 100;
++        Object[][] results = new Object[size][];
++        for (int i = 0; i < size; i++)
++        {
++            cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, ?, ?)",
++                                           ConsistencyLevel.QUORUM,
++                                           i, i);
++            results[i] = new Object[]{ 1, i, i };
++        }
++
++        // Make sure paged read returns same results with different page sizes
++        for (int pageSize : new int[]{ 1, 2, 3, 5, 10, 20, 50 })
++        {
++            assertRows(cluster.coordinator(1).executeWithPaging("SELECT * FROM " + KEYSPACE + ".tbl",
++                                                                ConsistencyLevel.QUORUM,
++                                                                pageSize),
++                       results);
++        }
++    }
++
++    @Test
++    public void pagingWithRepairTest() throws Throwable
++    {
++        cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
++
++        int size = 100;
++        Object[][] results = new Object[size][];
++        for (int i = 0; i < size; i++)
++        {
++            // Make sure that data lands on different nodes and not coordinator
++            cluster.get(i % 2 == 0 ? 2 : 3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, ?, ?)",
++                                                            i, i);
++
++            results[i] = new Object[]{ 1, i, i };
++        }
++
++        // Make sure paged read returns same results with different page sizes
++        for (int pageSize : new int[]{ 1, 2, 3, 5, 10, 20, 50 })
++        {
++            assertRows(cluster.coordinator(1).executeWithPaging("SELECT * FROM " + KEYSPACE + ".tbl",
++                                                                ConsistencyLevel.ALL,
++                                                                pageSize),
++                       results);
++        }
++
++        assertRows(cluster.get(1).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl"),
++                   results);
++    }
++
++    @Test
++    public void pagingTests() throws Throwable
++    {
++        try (ICluster singleNode = init(builder().withNodes(1).withSubnet(1).start()))
++        {
++            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
++            singleNode.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
++
++            for (int i = 0; i < 10; i++)
++            {
++                for (int j = 0; j < 10; j++)
++                {
++                    cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, ?, ?)",
++                                                   ConsistencyLevel.QUORUM,
++                                                   i, j, i + i);
++                    singleNode.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, ?, ?)",
++                                                      ConsistencyLevel.QUORUM,
++                                                      i, j, i + i);
++                }
++            }
++
++            int[] pageSizes = new int[]{ 1, 2, 3, 5, 10, 20, 50 };
++            String[] statements = new String[]{ "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5",
++                                                "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck >= 5",
++                                                "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5 AND ck <= 10",
++                                                "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5 LIMIT 3",
++                                                "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck >= 5 LIMIT 2",
++                                                "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5 AND ck <= 10 LIMIT 2",
++                                                "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5 ORDER BY ck DESC",
++                                                "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck >= 5 ORDER BY ck DESC",
++                                                "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5 AND ck <= 10 ORDER BY ck DESC",
++                                                "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5 ORDER BY ck DESC LIMIT 3",
++                                                "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck >= 5 ORDER BY ck DESC LIMIT 2",
++                                                "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5 AND ck <= 10 ORDER BY ck DESC LIMIT 2",
++                                                "SELECT DISTINCT pk FROM " + KEYSPACE + ".tbl LIMIT 3",
++                                                "SELECT DISTINCT pk FROM " + KEYSPACE + ".tbl WHERE pk IN (3,5,8,10)",
++                                                "SELECT DISTINCT pk FROM " + KEYSPACE + ".tbl WHERE pk IN (3,5,8,10) LIMIT 2"
++            };
++            for (String statement : statements)
++            {
++                for (int pageSize : pageSizes)
++                {
++                    assertRows(cluster.coordinator(1)
++                                      .executeWithPaging(statement,
++                                                         ConsistencyLevel.QUORUM, pageSize),
++                               singleNode.coordinator(1)
++                                         .executeWithPaging(statement,
++                                                            ConsistencyLevel.QUORUM, Integer.MAX_VALUE));
++                }
++            }
++        }
++    }
++
++    @Test
++    public void metricsCountQueriesTest() throws Throwable
++    {
++        cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
++        for (int i = 0; i < 100; i++)
++            cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (?,?,?)", ConsistencyLevel.ALL, i, i, i);
++
++        long readCount1 = readCount((IInvokableInstance) cluster.get(1));
++        long readCount2 = readCount((IInvokableInstance) cluster.get(2));
++        for (int i = 0; i < 100; i++)
++            cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = ? and ck = ?", ConsistencyLevel.ALL, i, i);
++
++        readCount1 = readCount((IInvokableInstance) cluster.get(1)) - readCount1;
++        readCount2 = readCount((IInvokableInstance) cluster.get(2)) - readCount2;
++        assertEquals(readCount1, readCount2);
++        assertEquals(100, readCount1);
++    }
++
++    private long readCount(IInvokableInstance instance)
++    {
++        return instance.callOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").metric.readLatency.latency.getCount());
++    }
++}
diff --cc test/distributed/org/apache/cassandra/distributed/test/TestBaseImpl.java
index 0000000,a89a352..0e0561a
mode 000000,100644..100644
--- a/test/distributed/org/apache/cassandra/distributed/test/TestBaseImpl.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/TestBaseImpl.java
@@@ -1,0 -1,49 +1,47 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.cassandra.distributed.test;
+ 
+ import org.junit.After;
+ import org.junit.BeforeClass;
+ 
+ import org.apache.cassandra.distributed.api.ICluster;
+ import org.apache.cassandra.distributed.api.IInstance;
+ import org.apache.cassandra.distributed.shared.Builder;
+ import org.apache.cassandra.distributed.shared.DistributedTestBase;
+ 
+ public class TestBaseImpl extends DistributedTestBase
+ {
 -    protected static final TestBaseImpl impl = new TestBaseImpl();
 -
+     @After
+     public void afterEach() {
+         super.afterEach();
+     }
+ 
+     @BeforeClass
+     public static void beforeClass() throws Throwable {
+         ICluster.setup();
+     }
+ 
+     @Override
+     public <I extends IInstance, C extends ICluster> Builder<I, C> builder() {
+         // This is definitely not the smartest solution, but given the complexity of the alternatives and low risk, we can just rely on the
+         // fact that this code is going to work accross _all_ versions.
+         return (Builder<I, C>) org.apache.cassandra.distributed.Cluster.build();
+     }
+ }
diff --cc test/distributed/org/apache/cassandra/distributed/upgrade/CompactStorage2to3UpgradeTest.java
index 5c45d52,0000000..f138861
mode 100644,000000..100644
--- a/test/distributed/org/apache/cassandra/distributed/upgrade/CompactStorage2to3UpgradeTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/CompactStorage2to3UpgradeTest.java
@@@ -1,102 -1,0 +1,99 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.distributed.upgrade;
 +
 +import org.junit.Test;
 +
- import org.apache.cassandra.db.ConsistencyLevel;
++import org.apache.cassandra.distributed.api.ConsistencyLevel;
 +import org.apache.cassandra.distributed.api.ICoordinator;
- import org.apache.cassandra.distributed.impl.Versions;
- import org.apache.cassandra.distributed.test.DistributedTestBase;
++import org.apache.cassandra.distributed.shared.DistributedTestBase;
++import org.apache.cassandra.distributed.shared.Versions;
++import static org.apache.cassandra.distributed.shared.AssertUtils.*;
 +
 +public class CompactStorage2to3UpgradeTest extends UpgradeTestBase
 +{
 +    @Test
 +    public void multiColumn() throws Throwable
 +    {
 +        new TestCase()
 +        .upgrade(Versions.Major.v22, Versions.Major.v30)
 +        .setup(cluster -> {
 +            assert cluster.size() == 3;
 +            int rf = cluster.size() - 1;
 +            assert rf == 2;
 +            cluster.schemaChange("CREATE KEYSPACE ks WITH replication = {'class': 'SimpleStrategy', 'replication_factor': " + (cluster.size() - 1) + "};");
 +            cluster.schemaChange("CREATE TABLE ks.tbl (pk int, v1 int, v2 text, PRIMARY KEY (pk)) WITH COMPACT STORAGE");
 +            ICoordinator coordinator = cluster.coordinator(1);
 +            // these shouldn't be replicated by the 3rd node
 +            coordinator.execute("INSERT INTO ks.tbl (pk, v1, v2) VALUES (3, 3, '3')", ConsistencyLevel.ALL);
 +            coordinator.execute("INSERT INTO ks.tbl (pk, v1, v2) VALUES (9, 9, '9')", ConsistencyLevel.ALL);
-             for (int i=0; i<cluster.size(); i++)
++            for (int i = 0; i < cluster.size(); i++)
 +            {
-                 int nodeNum = i+1;
++                int nodeNum = i + 1;
 +                System.out.println(String.format("****** node %s: %s", nodeNum, cluster.get(nodeNum).config()));
 +            }
- 
 +        })
 +        .runAfterNodeUpgrade(((cluster, node) -> {
 +            if (node != 2)
 +                return;
 +
 +            Object[][] rows = cluster.coordinator(3).execute("SELECT * FROM ks.tbl LIMIT 2", ConsistencyLevel.ALL);
 +            Object[][] expected = {
-                 DistributedTestBase.row(9, 9, "9"),
-                 DistributedTestBase.row(3, 3, "3")
++            row(9, 9, "9"),
++            row(3, 3, "3")
 +            };
-             DistributedTestBase.assertRows(rows, expected);
- 
++            assertRows(rows, expected);
 +        })).run();
 +    }
 +
 +    @Test
 +    public void singleColumn() throws Throwable
 +    {
 +        new TestCase()
 +        .upgrade(Versions.Major.v22, Versions.Major.v30)
 +        .setup(cluster -> {
 +            assert cluster.size() == 3;
 +            int rf = cluster.size() - 1;
 +            assert rf == 2;
 +            cluster.schemaChange("CREATE KEYSPACE ks WITH replication = {'class': 'SimpleStrategy', 'replication_factor': " + (cluster.size() - 1) + "};");
 +            cluster.schemaChange("CREATE TABLE ks.tbl (pk int, v int, PRIMARY KEY (pk)) WITH COMPACT STORAGE");
 +            ICoordinator coordinator = cluster.coordinator(1);
 +            // these shouldn't be replicated by the 3rd node
 +            coordinator.execute("INSERT INTO ks.tbl (pk, v) VALUES (3, 3)", ConsistencyLevel.ALL);
 +            coordinator.execute("INSERT INTO ks.tbl (pk, v) VALUES (9, 9)", ConsistencyLevel.ALL);
-             for (int i=0; i<cluster.size(); i++)
++            for (int i = 0; i < cluster.size(); i++)
 +            {
-                 int nodeNum = i+1;
++                int nodeNum = i + 1;
 +                System.out.println(String.format("****** node %s: %s", nodeNum, cluster.get(nodeNum).config()));
 +            }
- 
 +        })
 +        .runAfterNodeUpgrade(((cluster, node) -> {
 +
 +            if (node < 2)
 +                return;
 +
 +            Object[][] rows = cluster.coordinator(3).execute("SELECT * FROM ks.tbl LIMIT 2", ConsistencyLevel.ALL);
 +            Object[][] expected = {
-                 DistributedTestBase.row(9, 9),
-                 DistributedTestBase.row(3, 3)
++            row(9, 9),
++            row(3, 3)
 +            };
-             DistributedTestBase.assertRows(rows, expected);
- 
++            assertRows(rows, expected);
 +        })).run();
 +    }
- }
++}
diff --cc test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeReadRepairTest.java
index 31f4b84,e69e38a..b98829d
--- a/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeReadRepairTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeReadRepairTest.java
@@@ -20,11 -20,10 +20,11 @@@ package org.apache.cassandra.distribute
  
  import org.junit.Test;
  
- import org.apache.cassandra.db.ConsistencyLevel;
- import org.apache.cassandra.distributed.impl.Versions;
- import org.apache.cassandra.distributed.test.DistributedTestBase;
+ import org.apache.cassandra.distributed.api.ConsistencyLevel;
++import org.apache.cassandra.distributed.shared.DistributedTestBase;
+ import org.apache.cassandra.distributed.shared.Versions;
  
- import static org.apache.cassandra.distributed.impl.Versions.find;
+ import static org.apache.cassandra.distributed.shared.Versions.find;
  
  public class MixedModeReadRepairTest extends UpgradeTestBase
  {
diff --cc test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTest.java
index 5a927fc,93ae78e..81e580d
--- a/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTest.java
@@@ -18,16 -18,13 +18,16 @@@
  
  package org.apache.cassandra.distributed.upgrade;
  
 +import java.util.Iterator;
 +
 +import com.google.common.collect.Iterators;
  import org.junit.Test;
  
- import org.apache.cassandra.db.ConsistencyLevel;
- import org.apache.cassandra.distributed.impl.Versions;
- import org.apache.cassandra.distributed.test.DistributedTestBase;
+ import org.apache.cassandra.distributed.api.ConsistencyLevel;
+ import org.apache.cassandra.distributed.shared.Versions;
  
- import static junit.framework.Assert.assertEquals;
+ import junit.framework.Assert;
+ import static org.apache.cassandra.distributed.shared.AssertUtils.*;
  
  public class UpgradeTest extends UpgradeTestBase
  {
@@@ -36,55 -33,22 +36,54 @@@
      public void upgradeTest() throws Throwable
      {
          new TestCase()
-             .upgrade(Versions.Major.v22, Versions.Major.v30)
-             .setup((cluster) -> {
-                 cluster.schemaChange("CREATE TABLE " + DistributedTestBase.KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
 -        .upgrade(Versions.Major.v22, Versions.Major.v30, Versions.Major.v3X)
++        .upgrade(Versions.Major.v22, Versions.Major.v30)
+         .setup((cluster) -> {
+             cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
  
-                 cluster.get(1).executeInternal("INSERT INTO " + DistributedTestBase.KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)");
-                 cluster.get(2).executeInternal("INSERT INTO " + DistributedTestBase.KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 2, 2)");
-                 cluster.get(3).executeInternal("INSERT INTO " + DistributedTestBase.KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 3, 3)");
-             })
-             .runAfterClusterUpgrade((cluster) -> {
-                 DistributedTestBase.assertRows(cluster.coordinator(1).execute("SELECT * FROM " + DistributedTestBase.KEYSPACE + ".tbl WHERE pk = ?",
-                                                                               ConsistencyLevel.ALL,
-                                                                               1),
-                                                DistributedTestBase.row(1, 1, 1),
-                                                DistributedTestBase.row(1, 2, 2),
-                                                DistributedTestBase.row(1, 3, 3));
-             }).run();
+             cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)");
+             cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 2, 2)");
+             cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 3, 3)");
+         })
+         .runAfterClusterUpgrade((cluster) -> {
+             assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = ?",
 -                                                      ConsistencyLevel.ALL,
 -                                                      1),
 -                       row(1, 1, 1),
 -                       row(1, 2, 2),
 -                       row(1, 3, 3));
++                                                                          ConsistencyLevel.ALL,
++                                                                          1),
++                                           row(1, 1, 1),
++                                           row(1, 2, 2),
++                                           row(1, 3, 3));
+         }).run();
      }
  
 +    @Test
 +    public void mixedModePagingTest() throws Throwable
 +    {
 +        new TestCase()
 +        .upgrade(Versions.Major.v22, Versions.Major.v30)
 +        .nodes(2)
 +        .nodesToUpgrade(2)
 +        .setup((cluster) -> {
-             cluster.schemaChange("ALTER KEYSPACE " + DistributedTestBase.KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}");
-             cluster.schemaChange("CREATE TABLE " + DistributedTestBase.KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck)) with compact storage");
++            cluster.schemaChange("ALTER KEYSPACE " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}");
++            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck)) with compact storage");
 +            for (int i = 0; i < 100; i++)
 +                for (int j = 0; j < 200; j++)
-                     cluster.coordinator(2).execute("INSERT INTO " + DistributedTestBase.KEYSPACE + ".tbl (pk, ck, v) VALUES (?, ?, 1)", ConsistencyLevel.ALL, i, j);
-             cluster.forEach((i) -> i.flush(DistributedTestBase.KEYSPACE));
++                    cluster.coordinator(2).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (?, ?, 1)", ConsistencyLevel.ALL, i, j);
++            cluster.forEach((i) -> i.flush(KEYSPACE));
 +            for (int i = 0; i < 100; i++)
 +                for (int j = 10; j < 30; j++)
-                     cluster.coordinator(2).execute("DELETE FROM " + DistributedTestBase.KEYSPACE + ".tbl where pk=? and ck=?", ConsistencyLevel.ALL, i, j);
-             cluster.forEach((i) -> i.flush(DistributedTestBase.KEYSPACE));
++                    cluster.coordinator(2).execute("DELETE FROM " + KEYSPACE + ".tbl where pk=? and ck=?", ConsistencyLevel.ALL, i, j);
++            cluster.forEach((i) -> i.flush(KEYSPACE));
 +        })
 +        .runAfterClusterUpgrade((cluster) -> {
 +            for (int i = 0; i < 100; i++)
 +            {
 +                for (int pageSize = 10; pageSize < 100; pageSize++)
 +                {
-                     Iterator<Object[]> res = cluster.coordinator(1).executeWithPaging("SELECT * FROM " + DistributedTestBase.KEYSPACE + ".tbl WHERE pk = ?",
++                    Iterator<Object[]> res = cluster.coordinator(1).executeWithPaging("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = ?",
 +                                                                                      ConsistencyLevel.ALL,
 +                                                                                      pageSize, i);
-                     assertEquals(180, Iterators.size(res));
++                    Assert.assertEquals(180, Iterators.size(res));
 +                }
 +            }
 +        }).run();
 +    }
- 
- }
+ }
diff --cc test/unit/org/apache/cassandra/LogbackStatusListener.java
index d16058b,0000000..1f95bd4
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/LogbackStatusListener.java
+++ b/test/unit/org/apache/cassandra/LogbackStatusListener.java
@@@ -1,538 -1,0 +1,538 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra;
 +
 +import java.io.ByteArrayOutputStream;
 +import java.io.IOException;
 +import java.io.OutputStream;
 +import java.io.PrintStream;
 +import java.io.UnsupportedEncodingException;
 +import java.util.Locale;
 +
 +import org.slf4j.ILoggerFactory;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import ch.qos.logback.classic.Level;
 +import ch.qos.logback.classic.LoggerContext;
 +import ch.qos.logback.classic.spi.LoggerContextListener;
 +import ch.qos.logback.core.status.Status;
 +import ch.qos.logback.core.status.StatusListener;
- import org.apache.cassandra.distributed.impl.InstanceClassLoader;
++import org.apache.cassandra.distributed.shared.InstanceClassLoader;
 +
 +/*
 + * Listen for logback readiness and then redirect stdout/stderr to logback
 + */
 +public class LogbackStatusListener implements StatusListener, LoggerContextListener
 +{
 +
 +    public static final PrintStream originalOut = System.out;
 +    public static final PrintStream originalErr = System.err;
 +
 +    private volatile boolean hadPreInstallError = false;
 +    private volatile boolean haveInstalled = false;
 +    private volatile boolean haveRegisteredListener = false;
 +
 +    private PrintStream replacementOut;
 +    private PrintStream replacementErr;
 +
 +    @Override
 +    public void addStatusEvent(Status s)
 +    {
 +        if (!haveInstalled && (s.getLevel() != 0 || s.getEffectiveLevel() != 0))
 +        {
 +            // if we encounter an error during setup, we're not sure what state we're in, so we just don't switch
 +            // we should log this fact, though, so that we know that we're not necessarily capturing stdout
 +            LoggerFactory.getLogger(LogbackStatusListener.class)
 +                         .warn("Encountered non-info status in logger setup; aborting stdout capture: '" + s.getMessage() + '\'');
 +            hadPreInstallError = true;
 +        }
 +
 +        if (hadPreInstallError)
 +            return;
 +
 +        if (s.getMessage().startsWith("Registering current configuration as safe fallback point"))
 +        {
 +            onStart(null);
 +        }
 +
 +        if (haveInstalled && !haveRegisteredListener)
 +        {
 +            // we register ourselves as a listener after the fact, because we enable ourselves before the LoggerFactory
 +            // is properly initialised, hence before it can accept any LoggerContextListener registrations
 +            tryRegisterListener();
 +        }
 +
 +        if (s.getMessage().equals("Logback context being closed via shutdown hook"))
 +        {
 +            onStop(null);
 +        }
 +    }
 +
 +    private static PrintStream wrapLogger(Logger logger, PrintStream original, String encodingProperty, boolean error) throws Exception
 +    {
 +        final String encoding = System.getProperty(encodingProperty);
 +        OutputStream os = new ToLoggerOutputStream(logger, encoding, error);
 +        return encoding != null ? new WrappedPrintStream(os, true, encoding, original)
 +                                : new WrappedPrintStream(os, true, original);
 +    }
 +
 +    private static class ToLoggerOutputStream extends ByteArrayOutputStream
 +    {
 +        final Logger logger;
 +        final String encoding;
 +        final boolean error;
 +
 +        private ToLoggerOutputStream(Logger logger, String encoding, boolean error)
 +        {
 +            this.logger = logger;
 +            this.encoding = encoding;
 +            this.error = error;
 +        }
 +
 +        @Override
 +        public void flush() throws IOException
 +        {
 +            try
 +            {
 +                //Filter out stupid PrintStream empty flushes
 +                if (size() == 0) return;
 +
 +                //Filter out newlines, log framework provides its own
 +                if (size() == 1)
 +                {
 +                    byte[] bytes = toByteArray();
 +                    if (bytes[0] == 0xA)
 +                        return;
 +                }
 +
 +                //Filter out Windows newline
 +                if (size() == 2)
 +                {
 +                    byte[] bytes = toByteArray();
 +                    if (bytes[0] == 0xD && bytes[1] == 0xA)
 +                        return;
 +                }
 +
 +                String statement;
 +                if (encoding != null)
 +                    statement = new String(toByteArray(), encoding);
 +                else
 +                    statement = new String(toByteArray());
 +
 +                if (error)
 +                    logger.error(statement);
 +                else
 +                    logger.info(statement);
 +            }
 +            finally
 +            {
 +                reset();
 +            }
 +        }
 +    };
 +
 +    private static class WrappedPrintStream extends PrintStream
 +    {
 +        private long asyncAppenderThreadId = Long.MIN_VALUE;
 +        private final PrintStream original;
 +
 +        public WrappedPrintStream(OutputStream out, boolean autoFlush, PrintStream original)
 +        {
 +            super(out, autoFlush);
 +            this.original = original;
 +        }
 +
 +        public WrappedPrintStream(OutputStream out, boolean autoFlush, String encoding, PrintStream original) throws UnsupportedEncodingException
 +        {
 +            super(out, autoFlush, encoding);
 +            this.original = original;
 +        }
 +
 +        /*
 +         * Long and the short of it is that we don't want to serve logback a fake System.out/err.
 +         * ConsoleAppender is replaced so it always goes to the real System.out/err, but logback itself
 +         * will at times try to log to System.out/err when it has issues.
 +         *
 +         * Now here is the problem. There is a deadlock if a thread logs to System.out, blocks on the async
 +         * appender queue, and the async appender thread tries to log to System.out directly as part of some
 +         * internal logback issue.
 +         *
 +         * So to prevent this we have to exhaustively check before locking in the PrintStream and forward
 +         * to real System.out/err if it is the async appender
 +         */
 +        private boolean isAsyncAppender()
 +        {
 +            //Set the thread id based on the name
 +            Thread currentThread = Thread.currentThread();
 +            long currentThreadId = currentThread.getId();
 +            if (asyncAppenderThreadId == Long.MIN_VALUE &&
 +                currentThread.getName().equals("AsyncAppender-Worker-ASYNC") &&
 +                !InstanceClassLoader.wasLoadedByAnInstanceClassLoader(currentThread.getClass()))
 +            {
 +                asyncAppenderThreadId = currentThreadId;
 +            }
 +            if (currentThreadId == asyncAppenderThreadId)
 +                original.println("Was in async appender");
 +            return currentThreadId == asyncAppenderThreadId;
 +        }
 +
 +        @Override
 +        public void flush()
 +        {
 +            if (isAsyncAppender())
 +                original.flush();
 +            else
 +                super.flush();
 +        }
 +
 +        @Override
 +        public void close()
 +        {
 +            if (isAsyncAppender())
 +                original.close();
 +            else
 +                super.flush();
 +        }
 +
 +        @Override
 +        public void write(int b)
 +        {
 +            if (isAsyncAppender())
 +                original.write(b);
 +            else
 +                super.write(b);
 +        }
 +
 +        @Override
 +        public void write(byte[] buf, int off, int len)
 +        {
 +            if (isAsyncAppender())
 +                original.write(buf, off, len);
 +            else
 +                super.write(buf, off, len);
 +        }
 +
 +        @Override
 +        public void print(boolean b)
 +        {
 +            if (isAsyncAppender())
 +                original.print(b);
 +            else
 +                super.print(b);
 +        }
 +
 +        @Override
 +        public void print(char c)
 +        {
 +            if (isAsyncAppender())
 +                original.print(c);
 +            else
 +                super.print(c);
 +        }
 +
 +        @Override
 +        public void print(int i)
 +        {
 +            if (isAsyncAppender())
 +                original.print(i);
 +            else
 +                super.print(i);
 +        }
 +
 +        @Override
 +        public void print(long l)
 +        {
 +            if (isAsyncAppender())
 +                original.print(l);
 +            else
 +                super.print(l);
 +        }
 +
 +        @Override
 +        public void print(float f)
 +        {
 +            if (isAsyncAppender())
 +                original.print(f);
 +            else
 +                super.print(f);
 +        }
 +
 +        @Override
 +        public void print(double d)
 +        {
 +            if (isAsyncAppender())
 +                original.print(d);
 +            else
 +                super.print(d);
 +        }
 +
 +        @Override
 +        public void print(char[] s)
 +        {
 +            if(isAsyncAppender())
 +                original.println(s);
 +            else
 +                super.print(s);
 +        }
 +
 +        @Override
 +        public void print(String s)
 +        {
 +            if (isAsyncAppender())
 +                original.print(s);
 +            else
 +                super.print(s);
 +        }
 +
 +        @Override
 +        public void print(Object obj)
 +        {
 +            if (isAsyncAppender())
 +                original.print(obj);
 +            else
 +                super.print(obj);
 +        }
 +
 +        @Override
 +        public void println()
 +        {
 +            if (isAsyncAppender())
 +                original.println();
 +            else
 +                super.println();
 +        }
 +
 +        @Override
 +        public void println(boolean v)
 +        {
 +            if (isAsyncAppender())
 +                original.println(v);
 +            else
 +                super.println(v);
 +        }
 +
 +        @Override
 +        public void println(char v)
 +        {
 +            if (isAsyncAppender())
 +                original.println(v);
 +            else
 +                super.println(v);
 +        }
 +
 +        @Override
 +        public void println(int v)
 +        {
 +            if (isAsyncAppender())
 +                original.println(v);
 +            else
 +                super.println(v);
 +        }
 +
 +        @Override
 +        public void println(long v)
 +        {
 +            if (isAsyncAppender())
 +                original.println(v);
 +            else
 +                super.println(v);
 +        }
 +
 +        @Override
 +        public void println(float v)
 +        {
 +            if (isAsyncAppender())
 +                original.println(v);
 +            else
 +                super.println(v);
 +        }
 +
 +        @Override
 +        public void println(double v)
 +        {
 +            if (isAsyncAppender())
 +                original.println(v);
 +            else
 +                super.println(v);
 +        }
 +
 +        @Override
 +        public void println(char[] v)
 +        {
 +            if (isAsyncAppender())
 +                original.println(v);
 +            else
 +                super.println(v);
 +        }
 +
 +        @Override
 +        public void println(String v)
 +        {
 +            if (isAsyncAppender())
 +                original.println(v);
 +            else
 +                super.println(v);
 +        }
 +
 +        @Override
 +        public void println(Object v)
 +        {
 +            if (isAsyncAppender())
 +                original.println(v);
 +            else
 +                super.println(v);
 +        }
 +
 +        @Override
 +        public PrintStream printf(String format, Object... args)
 +        {
 +            if (isAsyncAppender())
 +                return original.printf(format, args);
 +            else
 +                return super.printf(format, args);
 +        }
 +
 +        @Override
 +        public PrintStream printf(Locale l, String format, Object... args)
 +        {
 +            if (isAsyncAppender())
 +                return original.printf(l, format, args);
 +            else
 +                return super.printf(l, format, args);
 +        }
 +
 +        @Override
 +        public PrintStream format(String format, Object... args)
 +        {
 +            if (isAsyncAppender())
 +                return original.format(format, args);
 +            else
 +                return super.format(format, args);
 +        }
 +
 +        @Override
 +        public PrintStream format(Locale l, String format, Object... args)
 +        {
 +            if (isAsyncAppender())
 +                return original.format(l, format, args);
 +            else
 +                return super.format(l, format, args);
 +        }
 +
 +        @Override
 +        public PrintStream append(CharSequence csq)
 +        {
 +            if (isAsyncAppender())
 +                return original.append(csq);
 +            else
 +                return super.append(csq);
 +        }
 +
 +        @Override
 +        public PrintStream append(CharSequence csq, int start, int end)
 +        {
 +            if (isAsyncAppender())
 +                return original.append(csq, start, end);
 +            else
 +                return super.append(csq, start, end);
 +        }
 +
 +        @Override
 +        public PrintStream append(char c)
 +        {
 +            if (isAsyncAppender())
 +                return original.append(c);
 +            else
 +                return super.append(c);
 +        }    }
 +
 +    public boolean isResetResistant()
 +    {
 +        return false;
 +    }
 +
 +    public synchronized void onStart(LoggerContext loggerContext)
 +    {
 +        if (!hadPreInstallError && !haveInstalled)
 +        {
 +            if (InstanceClassLoader.wasLoadedByAnInstanceClassLoader(getClass())
 +                || System.out.getClass().getName().contains("LogbackStatusListener"))
 +            {
 +                // don't operate if we're a dtest node, or if we're not the first to swap System.out for some other reason
 +                hadPreInstallError = true;
 +                return;
 +            }
 +            try
 +            {
 +                Logger stdoutLogger = LoggerFactory.getLogger("stdout");
 +                Logger stderrLogger = LoggerFactory.getLogger("stderr");
 +
 +                replacementOut = wrapLogger(stdoutLogger, originalOut, "sun.stdout.encoding", false);
 +                System.setOut(replacementOut);
 +                replacementErr = wrapLogger(stderrLogger, originalErr, "sun.stderr.encoding", true);
 +                System.setErr(replacementErr);
 +            }
 +            catch (Exception e)
 +            {
 +                throw new RuntimeException(e);
 +            }
 +            haveInstalled = true;
 +        }
 +    }
 +
 +    public synchronized void onReset(LoggerContext loggerContext)
 +    {
 +        onStop(loggerContext);
 +    }
 +
 +    public synchronized void onStop(LoggerContext loggerContext)
 +    {
 +        if (haveInstalled)
 +        {
 +            if (replacementOut != null) replacementOut.flush();
 +            if (replacementErr != null) replacementErr.flush();
 +            System.setErr(originalErr);
 +            System.setOut(originalOut);
 +            hadPreInstallError = false;
 +            haveInstalled = false;
 +            haveRegisteredListener = false;
 +            if (haveRegisteredListener)
 +            {
 +                ((LoggerContext)LoggerFactory.getILoggerFactory()).removeListener(this);
 +            }
 +        }
 +    }
 +
 +    public void onLevelChange(ch.qos.logback.classic.Logger logger, Level level)
 +    {
 +    }
 +
 +    private synchronized void tryRegisterListener()
 +    {
 +        if (haveInstalled && !haveRegisteredListener)
 +        {
 +            ILoggerFactory factory = LoggerFactory.getILoggerFactory();
 +            if (factory instanceof LoggerContext)
 +            {
 +                ((LoggerContext) factory).addListener(this);
 +                haveRegisteredListener = true;
 +            }
 +        }
 +    }
 +}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org