Posted to commits@cassandra.apache.org by ad...@apache.org on 2021/02/16 15:28:44 UTC

[cassandra] branch trunk updated: Improve coordination tests

This is an automated email from the ASF dual-hosted git repository.

adelapena pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6665fc2  Improve coordination tests
6665fc2 is described below

commit 6665fc29b33abcc26aad4cecbfee88225b0a7225
Author: Andrés de la Peña <a....@gmail.com>
AuthorDate: Sat Oct 31 11:13:56 2020 +0000

    Improve coordination tests
    
    patch by Andrés de la Peña; reviewed by Caleb Rackliffe, Alex Petrov and Blake Eggleston for CASSANDRA-16180
---
 src/java/org/apache/cassandra/dht/Token.java       |   9 -
 .../org/apache/cassandra/service/StorageProxy.java | 470 +---------------
 .../service/reads/AbstractReadExecutor.java        |  17 +-
 .../service/reads/range/RangeCommandIterator.java  | 291 ++++++++++
 .../service/reads/range/RangeCommands.java         | 117 ++++
 .../service/reads/range/ReplicaPlanIterator.java   | 123 +++++
 .../service/reads/range/ReplicaPlanMerger.java     |  76 +++
 .../service/reads/range/SingleRangeResponse.java   |  75 +++
 .../distributed/impl/AbstractCluster.java          |   6 +-
 .../distributed/test/LargeMessageTest.java         |  47 ++
 .../distributed/test/MetricsCountQueriesTest.java  |  57 ++
 .../cassandra/distributed/test/PagingTest.java     |  81 +++
 .../test/ReadRepairEmptyRangeTombstonesTest.java   |   1 -
 .../distributed/test/ReadRepairQueryTypesTest.java |   6 -
 .../distributed/test/ReadRepairTester.java         |   3 -
 .../distributed/test/SSTableSkippingReadTest.java  | 127 +++++
 .../distributed/test/SchemaDisagreementTest.java   | 144 +++++
 .../distributed/test/ShortReadProtectionTest.java  | 553 ++++++++++++++++---
 .../distributed/test/SimpleReadWriteTest.java      | 592 ++++++++-------------
 .../cassandra/distributed/test/TestBaseImpl.java   |  10 +
 .../upgrade/MixedModeAvailabilityTestBase.java     | 170 ++++++
 .../upgrade/MixedModeAvailabilityV22Test.java      |  41 ++
 .../upgrade/MixedModeAvailabilityV30Test.java      |  41 ++
 .../upgrade/MixedModeAvailabilityV3XTest.java      |  35 ++
 .../upgrade/MixedModeConsistencyTestBase.java      | 123 +++++
 .../upgrade/MixedModeConsistencyV22Test.java       |  41 ++
 .../upgrade/MixedModeConsistencyV30Test.java       |  41 ++
 .../upgrade/MixedModeConsistencyV3XTest.java       |  35 ++
 .../cassandra/distributed/upgrade/UpgradeTest.java |  27 -
 test/unit/org/apache/cassandra/Util.java           |   5 +
 .../cassandra/db/PartitionRangeReadTest.java       | 196 +------
 .../apache/cassandra/service/StorageProxyTest.java | 239 ---------
 .../reads/range/RangeCommandIteratorTest.java      | 180 +++++++
 .../service/reads/range/RangeCommandsTest.java     | 281 ++++++++++
 .../reads/range/ReplicaPlanIteratorTest.java       | 211 ++++++++
 .../service/reads/range/ReplicaPlanMergerTest.java | 412 ++++++++++++++
 .../service/reads/range/TokenUpdater.java          | 133 +++++
 37 files changed, 3666 insertions(+), 1350 deletions(-)

diff --git a/src/java/org/apache/cassandra/dht/Token.java b/src/java/org/apache/cassandra/dht/Token.java
index ccb66fd..d8e82f8 100644
--- a/src/java/org/apache/cassandra/dht/Token.java
+++ b/src/java/org/apache/cassandra/dht/Token.java
@@ -159,15 +159,6 @@ public abstract class Token implements RingPosition<Token>, Serializable
         return new KeyBound(this, false);
     }
 
-    @SuppressWarnings("unchecked")
-    public <R extends RingPosition<R>> R upperBound(Class<R> klass)
-    {
-        if (klass.equals(getClass()))
-            return (R)this;
-        else
-            return (R)maxKeyBound();
-    }
-
     public static class KeyBound implements PartitionPosition
     {
         private final Token token;
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index df3a6f5..b7b9f2c 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -21,11 +21,9 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.EnumMap;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -42,12 +40,9 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Supplier;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.cache.CacheLoader;
 import com.google.common.collect.Iterables;
-import com.google.common.collect.Iterators;
-import com.google.common.collect.PeekingIterator;
 import com.google.common.primitives.Ints;
 import com.google.common.util.concurrent.Uninterruptibles;
 import org.apache.commons.lang3.StringUtils;
@@ -58,14 +53,12 @@ import org.apache.cassandra.batchlog.Batch;
 import org.apache.cassandra.batchlog.BatchlogManager;
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.CounterMutation;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.IMutation;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.Mutation;
-import org.apache.cassandra.db.PartitionPosition;
 import org.apache.cassandra.db.PartitionRangeReadCommand;
 import org.apache.cassandra.db.ReadCommand;
 import org.apache.cassandra.db.ReadExecutionController;
@@ -73,7 +66,6 @@ import org.apache.cassandra.db.ReadResponse;
 import org.apache.cassandra.db.SinglePartitionReadCommand;
 import org.apache.cassandra.db.TruncateRequest;
 import org.apache.cassandra.db.WriteType;
-import org.apache.cassandra.db.filter.DataLimits;
 import org.apache.cassandra.db.filter.TombstoneOverwhelmingException;
 import org.apache.cassandra.db.partitions.FilteredPartition;
 import org.apache.cassandra.db.partitions.PartitionIterator;
@@ -82,9 +74,6 @@ import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
 import org.apache.cassandra.db.rows.RowIterator;
 import org.apache.cassandra.db.view.ViewUtils;
-import org.apache.cassandra.dht.AbstractBounds;
-import org.apache.cassandra.dht.Bounds;
-import org.apache.cassandra.dht.RingPosition;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.CasWriteTimeoutException;
 import org.apache.cassandra.exceptions.CasWriteUnknownResultException;
@@ -102,19 +91,15 @@ import org.apache.cassandra.exceptions.WriteTimeoutException;
 import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.hints.Hint;
 import org.apache.cassandra.hints.HintsService;
-import org.apache.cassandra.index.Index;
 import org.apache.cassandra.locator.AbstractReplicationStrategy;
-import org.apache.cassandra.locator.EndpointsForRange;
 import org.apache.cassandra.locator.EndpointsForToken;
 import org.apache.cassandra.locator.IEndpointSnitch;
 import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.locator.LocalStrategy;
 import org.apache.cassandra.locator.Replica;
 import org.apache.cassandra.locator.ReplicaLayout;
 import org.apache.cassandra.locator.ReplicaPlan;
 import org.apache.cassandra.locator.ReplicaPlans;
 import org.apache.cassandra.locator.Replicas;
-import org.apache.cassandra.locator.TokenMetadata;
 import org.apache.cassandra.metrics.CASClientRequestMetrics;
 import org.apache.cassandra.metrics.CASClientWriteRequestMetrics;
 import org.apache.cassandra.metrics.ClientRequestMetrics;
@@ -136,12 +121,11 @@ import org.apache.cassandra.service.paxos.PaxosState;
 import org.apache.cassandra.service.paxos.PrepareCallback;
 import org.apache.cassandra.service.paxos.ProposeCallback;
 import org.apache.cassandra.service.reads.AbstractReadExecutor;
-import org.apache.cassandra.service.reads.DataResolver;
 import org.apache.cassandra.service.reads.ReadCallback;
+import org.apache.cassandra.service.reads.range.RangeCommands;
 import org.apache.cassandra.service.reads.repair.ReadRepair;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.triggers.TriggerExecutor;
-import org.apache.cassandra.utils.AbstractIterator;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.MBeanWrapper;
 import org.apache.cassandra.utils.MonotonicClock;
@@ -183,7 +167,6 @@ public class StorageProxy implements StorageProxyMBean
         }
     };
     private static final ClientRequestMetrics readMetrics = new ClientRequestMetrics("Read");
-    private static final ClientRequestMetrics rangeMetrics = new ClientRequestMetrics("RangeSlice");
     private static final ClientWriteRequestMetrics writeMetrics = new ClientWriteRequestMetrics("Write");
     private static final CASClientWriteRequestMetrics casWriteMetrics = new CASClientWriteRequestMetrics("CASWrite");
     private static final CASClientRequestMetrics casReadMetrics = new CASClientRequestMetrics("CASRead");
@@ -191,17 +174,6 @@ public class StorageProxy implements StorageProxyMBean
     private static final Map<ConsistencyLevel, ClientRequestMetrics> readMetricsMap = new EnumMap<>(ConsistencyLevel.class);
     private static final Map<ConsistencyLevel, ClientWriteRequestMetrics> writeMetricsMap = new EnumMap<>(ConsistencyLevel.class);
 
-    private static final double CONCURRENT_SUBREQUESTS_MARGIN = 0.10;
-
-    /**
-     * Introduce a maximum number of sub-ranges that the coordinator can request in parallel for range queries. Previously
-     * we would request up to the maximum number of ranges but this causes problems if the number of vnodes is large.
-     * By default we pick 10 requests per core, assuming all replicas have the same number of cores. The idea is that we
-     * don't want a burst of range requests that will back up, hurting all other queries. At the same time,
-     * we want to give range queries a chance to run if resources are available.
-     */
-    private static final int MAX_CONCURRENT_RANGE_REQUESTS = Math.max(1, Integer.getInteger("cassandra.max_concurrent_range_requests", FBUtilities.getAvailableProcessors() * 10));
-
     private static final String DISABLE_SERIAL_READ_LINEARIZABILITY_KEY = "cassandra.unsafe.disable-serial-reads-linearizability";
     private static final boolean disableSerialReadLinearizability =
         Boolean.parseBoolean(System.getProperty(DISABLE_SERIAL_READ_LINEARIZABILITY_KEY, "false"));
@@ -1105,7 +1077,7 @@ public class StorageProxy implements StorageProxyMBean
         Tracing.trace("Determining replicas for atomic batch");
         long startTime = System.nanoTime();
 
-        List<WriteResponseHandlerWrapper> wrappers = new ArrayList<WriteResponseHandlerWrapper>(mutations.size());
+        List<WriteResponseHandlerWrapper> wrappers = new ArrayList<>(mutations.size());
 
         if (mutations.stream().anyMatch(mutation -> Keyspace.open(mutation.getKeyspaceName()).getReplicationStrategy().hasTransientReplicas()))
             throw new AssertionError("Logged batches are unsupported with transient replication");
@@ -1887,7 +1859,7 @@ public class StorageProxy implements StorageProxyMBean
         }
     }
 
-    private static PartitionIterator concatAndBlockOnRepair(List<PartitionIterator> iterators, List<ReadRepair> repairs)
+    public static PartitionIterator concatAndBlockOnRepair(List<PartitionIterator> iterators, List<ReadRepair<?, ?>> repairs)
     {
         PartitionIterator concatenated = PartitionIterators.concat(iterators);
 
@@ -1977,7 +1949,7 @@ public class StorageProxy implements StorageProxyMBean
         // if we didn't do a read repair, return the contents of the data response, if we did do a read
         // repair, merge the full data reads
         List<PartitionIterator> results = new ArrayList<>(cmdCount);
-        List<ReadRepair> repairs = new ArrayList<>(cmdCount);
+        List<ReadRepair<?, ?>> repairs = new ArrayList<>(cmdCount);
         for (int i=0; i<cmdCount; i++)
         {
             results.add(reads[i].getResult());
@@ -2041,390 +2013,11 @@ public class StorageProxy implements StorageProxyMBean
         }
     }
 
-    /**
-     * Estimate the number of result rows per range in the ring based on our local data.
-     * <p>
-     * This assumes that ranges are uniformly distributed across the cluster and
-     * that the queried data is also uniformly distributed.
-     */
-    private static float estimateResultsPerRange(PartitionRangeReadCommand command, Keyspace keyspace)
+    public static PartitionIterator getRangeSlice(PartitionRangeReadCommand command,
+                                                  ConsistencyLevel consistencyLevel,
+                                                  long queryStartNanoTime)
     {
-        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(command.metadata().id);
-        Index index = command.getIndex(cfs);
-        float maxExpectedResults = index == null
-                                 ? command.limits().estimateTotalResults(cfs)
-                                 : index.getEstimatedResultRows();
-
-        // adjust maxExpectedResults by the number of tokens this node has and the replication factor for this ks
-        return (maxExpectedResults / DatabaseDescriptor.getNumTokens()) / keyspace.getReplicationStrategy().getReplicationFactor().allReplicas;
-    }
-
-    @VisibleForTesting
-    public static class RangeIterator extends AbstractIterator<ReplicaPlan.ForRangeRead>
-    {
-        private final Keyspace keyspace;
-        private final ConsistencyLevel consistency;
-        private final Iterator<? extends AbstractBounds<PartitionPosition>> ranges;
-        private final int rangeCount;
-
-        public RangeIterator(PartitionRangeReadCommand command, Keyspace keyspace, ConsistencyLevel consistency)
-        {
-            this.keyspace = keyspace;
-            this.consistency = consistency;
-
-            List<? extends AbstractBounds<PartitionPosition>> l = keyspace.getReplicationStrategy() instanceof LocalStrategy
-                                                          ? command.dataRange().keyRange().unwrap()
-                                                          : getRestrictedRanges(command.dataRange().keyRange());
-            this.ranges = l.iterator();
-            this.rangeCount = l.size();
-        }
-
-        public int rangeCount()
-        {
-            return rangeCount;
-        }
-
-        protected ReplicaPlan.ForRangeRead computeNext()
-        {
-            if (!ranges.hasNext())
-                return endOfData();
-
-            return ReplicaPlans.forRangeRead(keyspace, consistency, ranges.next(), 1);
-        }
-    }
-
-    public static class RangeMerger extends AbstractIterator<ReplicaPlan.ForRangeRead>
-    {
-        private final Keyspace keyspace;
-        private final ConsistencyLevel consistency;
-        private final PeekingIterator<ReplicaPlan.ForRangeRead> ranges;
-
-        public RangeMerger(Iterator<ReplicaPlan.ForRangeRead> iterator, Keyspace keyspace, ConsistencyLevel consistency)
-        {
-            this.keyspace = keyspace;
-            this.consistency = consistency;
-            this.ranges = Iterators.peekingIterator(iterator);
-        }
-
-        protected ReplicaPlan.ForRangeRead computeNext()
-        {
-            if (!ranges.hasNext())
-                return endOfData();
-
-            ReplicaPlan.ForRangeRead current = ranges.next();
-
-            // getRestrictedRange has broken the queried range into per-[vnode] token ranges, but this doesn't take
-            // the replication factor into account. If the intersection of live endpoints for 2 consecutive ranges
-            // still meets the CL requirements, then we can merge both ranges into the same RangeSliceCommand.
-            while (ranges.hasNext())
-            {
-                // If the current range right is the min token, we should stop merging because CFS.getRangeSlice
-                // don't know how to deal with a wrapping range.
-                // Note: it would be slightly more efficient to have CFS.getRangeSlice on the destination nodes unwraps
-                // the range if necessary and deal with it. However, we can't start sending wrapped range without breaking
-                // wire compatibility, so It's likely easier not to bother;
-                if (current.range().right.isMinimum())
-                    break;
-
-                ReplicaPlan.ForRangeRead next = ranges.peek();
-                ReplicaPlan.ForRangeRead merged = ReplicaPlans.maybeMerge(keyspace, consistency, current, next);
-                if (merged == null)
-                    break;
-
-                current = merged;
-                ranges.next(); // consume the range we just merged since we've only peeked so far
-            }
-            return current;
-        }
-    }
-
-    private static class SingleRangeResponse extends AbstractIterator<RowIterator> implements PartitionIterator
-    {
-        private final DataResolver resolver;
-        private final ReadCallback handler;
-        private final ReadRepair readRepair;
-        private PartitionIterator result;
-
-        private SingleRangeResponse(DataResolver resolver, ReadCallback handler, ReadRepair readRepair)
-        {
-            this.resolver = resolver;
-            this.handler = handler;
-            this.readRepair = readRepair;
-        }
-
-        private void waitForResponse() throws ReadTimeoutException
-        {
-            if (result != null)
-                return;
-
-            handler.awaitResults();
-            result = resolver.resolve();
-        }
-
-        protected RowIterator computeNext()
-        {
-            waitForResponse();
-            return result.hasNext() ? result.next() : endOfData();
-        }
-
-        public void close()
-        {
-            if (result != null)
-                result.close();
-        }
-    }
-
-    public static class RangeCommandIterator extends AbstractIterator<RowIterator> implements PartitionIterator
-    {
-        private final Iterator<ReplicaPlan.ForRangeRead> ranges;
-        private final int totalRangeCount;
-        private final PartitionRangeReadCommand command;
-        private final boolean enforceStrictLiveness;
-
-        private final long startTime;
-        private final long queryStartNanoTime;
-        private DataLimits.Counter counter;
-        private PartitionIterator sentQueryIterator;
-
-        private final int maxConcurrencyFactor;
-        private int concurrencyFactor;
-        // The two following "metric" are maintained to improve the concurrencyFactor
-        // when it was not good enough initially.
-        private int liveReturned;
-        private int rangesQueried;
-        private int batchesRequested = 0;
-
-        public RangeCommandIterator(Iterator<ReplicaPlan.ForRangeRead> ranges,
-                                    PartitionRangeReadCommand command,
-                                    int concurrencyFactor,
-                                    int maxConcurrencyFactor,
-                                    int totalRangeCount,
-                                    long queryStartNanoTime)
-        {
-            this.command = command;
-            this.concurrencyFactor = concurrencyFactor;
-            this.maxConcurrencyFactor = maxConcurrencyFactor;
-            this.startTime = System.nanoTime();
-            this.ranges = ranges;
-            this.totalRangeCount = totalRangeCount;
-            this.queryStartNanoTime = queryStartNanoTime;
-            this.enforceStrictLiveness = command.metadata().enforceStrictLiveness();
-        }
-
-        public RowIterator computeNext()
-        {
-            try
-            {
-                while (sentQueryIterator == null || !sentQueryIterator.hasNext())
-                {
-                    // If we don't have more range to handle, we're done
-                    if (!ranges.hasNext())
-                        return endOfData();
-
-                    // else, sends the next batch of concurrent queries (after having close the previous iterator)
-                    if (sentQueryIterator != null)
-                    {
-                        liveReturned += counter.counted();
-                        sentQueryIterator.close();
-
-                        // It's not the first batch of queries and we're not done, so we we can use what has been
-                        // returned so far to improve our rows-per-range estimate and update the concurrency accordingly
-                        updateConcurrencyFactor();
-                    }
-                    sentQueryIterator = sendNextRequests();
-                }
-
-                return sentQueryIterator.next();
-            }
-            catch (UnavailableException e)
-            {
-                rangeMetrics.unavailables.mark();
-                throw e;
-            }
-            catch (ReadTimeoutException e)
-            {
-                rangeMetrics.timeouts.mark();
-                throw e;
-            }
-            catch (ReadFailureException e)
-            {
-                rangeMetrics.failures.mark();
-                throw e;
-            }
-        }
-
-        private void updateConcurrencyFactor()
-        {
-            liveReturned += counter.counted();
-
-            concurrencyFactor = computeConcurrencyFactor(totalRangeCount, rangesQueried, maxConcurrencyFactor, command.limits().count(), liveReturned);
-        }
-
-        @VisibleForTesting
-        public static int computeConcurrencyFactor(int totalRangeCount, int rangesQueried, int maxConcurrencyFactor, int limit, int liveReturned)
-        {
-            maxConcurrencyFactor = Math.max(1, Math.min(maxConcurrencyFactor, totalRangeCount - rangesQueried));
-            if (liveReturned == 0)
-            {
-                // we haven't actually gotten any results, so query up to the limit if not results so far
-                Tracing.trace("Didn't get any response rows; new concurrent requests: {}", maxConcurrencyFactor);
-                return maxConcurrencyFactor;
-            }
-
-            // Otherwise, compute how many rows per range we got on average and pick a concurrency factor
-            // that should allow us to fetch all remaining rows with the next batch of (concurrent) queries.
-            int remainingRows = limit - liveReturned;
-            float rowsPerRange = (float)liveReturned / (float)rangesQueried;
-            int concurrencyFactor = Math.max(1, Math.min(maxConcurrencyFactor, Math.round(remainingRows / rowsPerRange)));
-            logger.trace("Didn't get enough response rows; actual rows per range: {}; remaining rows: {}, new concurrent requests: {}",
-                         rowsPerRange, remainingRows, concurrencyFactor);
-            return concurrencyFactor;
-        }
-
-        /**
-         * Queries the provided sub-range.
-         *
-         * @param replicaPlan the subRange to query.
-         * @param isFirst in the case where multiple queries are sent in parallel, whether that's the first query on
-         * that batch or not. The reason it matters is that whe paging queries, the command (more specifically the
-         * {@code DataLimits}) may have "state" information and that state may only be valid for the first query (in
-         * that it's the query that "continues" whatever we're previously queried).
-         */
-        private SingleRangeResponse query(ReplicaPlan.ForRangeRead replicaPlan, boolean isFirst)
-        {
-            PartitionRangeReadCommand rangeCommand = command.forSubRange(replicaPlan.range(), isFirst);
-            // If enabled, request repaired data tracking info from full replicas but
-            // only if there are multiple full replicas to compare results from
-            if (DatabaseDescriptor.getRepairedDataTrackingForRangeReadsEnabled()
-                && replicaPlan.contacts().filter(Replica::isFull).size() > 1)
-            {
-                command.trackRepairedStatus();
-                rangeCommand.trackRepairedStatus();
-            }
-
-            ReplicaPlan.SharedForRangeRead sharedReplicaPlan = ReplicaPlan.shared(replicaPlan);
-            ReadRepair<EndpointsForRange, ReplicaPlan.ForRangeRead> readRepair
-                    = ReadRepair.create(command, sharedReplicaPlan, queryStartNanoTime);
-            DataResolver<EndpointsForRange, ReplicaPlan.ForRangeRead> resolver
-                    = new DataResolver<>(rangeCommand, sharedReplicaPlan, readRepair, queryStartNanoTime);
-            ReadCallback<EndpointsForRange, ReplicaPlan.ForRangeRead> handler
-                    = new ReadCallback<>(resolver, rangeCommand, sharedReplicaPlan, queryStartNanoTime);
-
-
-            if (replicaPlan.contacts().size() == 1 && replicaPlan.contacts().get(0).isSelf())
-            {
-                Stage.READ.execute(new LocalReadRunnable(rangeCommand, handler));
-            }
-            else
-            {
-                for (Replica replica : replicaPlan.contacts())
-                {
-                    Tracing.trace("Enqueuing request to {}", replica);
-                    ReadCommand command = replica.isFull() ? rangeCommand : rangeCommand.copyAsTransientQuery(replica);
-                    Message<ReadCommand> message = command.createMessage(command.isTrackingRepairedStatus() && replica.isFull());
-                    MessagingService.instance().sendWithCallback(message, replica.endpoint(), handler);
-                }
-            }
-
-            return new SingleRangeResponse(resolver, handler, readRepair);
-        }
-
-        private PartitionIterator sendNextRequests()
-        {
-            List<PartitionIterator> concurrentQueries = new ArrayList<>(concurrencyFactor);
-            List<ReadRepair> readRepairs = new ArrayList<>(concurrencyFactor);
-
-            try
-            {
-                for (int i = 0; i < concurrencyFactor && ranges.hasNext();)
-                {
-                    ReplicaPlan.ForRangeRead range = ranges.next();
-
-                    @SuppressWarnings("resource") // response will be closed by concatAndBlockOnRepair, or in the catch block below
-                    SingleRangeResponse response = query(range, i == 0);
-                    concurrentQueries.add(response);
-                    readRepairs.add(response.readRepair);
-                    // due to RangeMerger, coordinator may fetch more ranges than required by concurrency factor.
-                    rangesQueried += range.vnodeCount();
-                    i += range.vnodeCount();
-                }
-                batchesRequested++;
-            }
-            catch (Throwable t)
-            {
-                for (PartitionIterator response: concurrentQueries)
-                    response.close();
-                throw t;
-            }
-
-            Tracing.trace("Submitted {} concurrent range requests", concurrentQueries.size());
-            // We want to count the results for the sake of updating the concurrency factor (see updateConcurrencyFactor) but we don't want to
-            // enforce any particular limit at this point (this could break code than rely on postReconciliationProcessing), hence the DataLimits.NONE.
-            counter = DataLimits.NONE.newCounter(command.nowInSec(), true, command.selectsFullPartition(), enforceStrictLiveness);
-            return counter.applyTo(concatAndBlockOnRepair(concurrentQueries, readRepairs));
-        }
-
-        public void close()
-        {
-            try
-            {
-                if (sentQueryIterator != null)
-                    sentQueryIterator.close();
-            }
-            finally
-            {
-                long latency = System.nanoTime() - startTime;
-                rangeMetrics.addNano(latency);
-                Keyspace.openAndGetStore(command.metadata()).metric.coordinatorScanLatency.update(latency, TimeUnit.NANOSECONDS);
-            }
-        }
-
-        @VisibleForTesting
-        public int rangesQueried()
-        {
-            return rangesQueried;
-        }
-
-        @VisibleForTesting
-        public int batchesRequested()
-        {
-            return batchesRequested;
-        }
-    }
-
-    @SuppressWarnings("resource")
-    public static PartitionIterator getRangeSlice(PartitionRangeReadCommand command, ConsistencyLevel consistencyLevel, long queryStartNanoTime)
-    {
-        Tracing.trace("Computing ranges to query");
-
-        Keyspace keyspace = Keyspace.open(command.metadata().keyspace);
-        RangeIterator ranges = new RangeIterator(command, keyspace, consistencyLevel);
-
-        // our estimate of how many result rows there will be per-range
-        float resultsPerRange = estimateResultsPerRange(command, keyspace);
-        // underestimate how many rows we will get per-range in order to increase the likelihood that we'll
-        // fetch enough rows in the first round
-        resultsPerRange -= resultsPerRange * CONCURRENT_SUBREQUESTS_MARGIN;
-        int maxConcurrencyFactor = Math.min(ranges.rangeCount(), MAX_CONCURRENT_RANGE_REQUESTS);
-        int concurrencyFactor = resultsPerRange == 0.0
-                                ? 1
-                                : Math.max(1, Math.min(maxConcurrencyFactor, (int) Math.ceil(command.limits().count() / resultsPerRange)));
-        logger.trace("Estimated result rows per range: {}; requested rows: {}, ranges.size(): {}; concurrent range requests: {}",
-                     resultsPerRange, command.limits().count(), ranges.rangeCount(), concurrencyFactor);
-        Tracing.trace("Submitting range requests on {} ranges with a concurrency of {} ({} rows per range expected)", ranges.rangeCount(), concurrencyFactor, resultsPerRange);
-
-        // Note that in general, a RangeCommandIterator will honor the command limit for each range, but will not enforce it globally.
-        RangeMerger mergedRanges = new RangeMerger(ranges, keyspace, consistencyLevel);
-        RangeCommandIterator rangeCommandIterator = new RangeCommandIterator(mergedRanges,
-                                                                             command,
-                                                                             concurrencyFactor,
-                                                                             maxConcurrencyFactor,
-                                                                             ranges.rangeCount(),
-                                                                             queryStartNanoTime);
-        return command.limits().filter(command.postReconciliationProcessing(rangeCommandIterator),
-                                       command.nowInSec(),
-                                       command.selectsFullPartition(),
-                                       command.metadata().enforceStrictLiveness());
+        return RangeCommands.partitions(command, consistencyLevel, queryStartNanoTime);
     }
 
     public Map<String, List<String>> getSchemaVersions()
@@ -2503,53 +2096,6 @@ public class StorageProxy implements StorageProxyMBean
         return results;
     }
 
-    /**
-     * Compute all ranges we're going to query, in sorted order. Nodes can be replica destinations for many ranges,
-     * so we need to restrict each scan to the specific range we want, or else we'd get duplicate results.
-     */
-    static <T extends RingPosition<T>> List<AbstractBounds<T>> getRestrictedRanges(final AbstractBounds<T> queryRange)
-    {
-        // special case for bounds containing exactly 1 (non-minimum) token
-        if (queryRange instanceof Bounds && queryRange.left.equals(queryRange.right) && !queryRange.left.isMinimum())
-        {
-            return Collections.singletonList(queryRange);
-        }
-
-        TokenMetadata tokenMetadata = StorageService.instance.getTokenMetadata();
-
-        List<AbstractBounds<T>> ranges = new ArrayList<AbstractBounds<T>>();
-        // divide the queryRange into pieces delimited by the ring and minimum tokens
-        Iterator<Token> ringIter = TokenMetadata.ringIterator(tokenMetadata.sortedTokens(), queryRange.left.getToken(), true);
-        AbstractBounds<T> remainder = queryRange;
-        while (ringIter.hasNext())
-        {
-            /*
-             * remainder can be a range/bounds of token _or_ keys and we want to split it with a token:
-             *   - if remainder is tokens, then we'll just split using the provided token.
-             *   - if remainder is keys, we want to split using token.upperBoundKey. For instance, if remainder
-             *     is [DK(10, 'foo'), DK(20, 'bar')], and we have 3 nodes with tokens 0, 15, 30. We want to
-             *     split remainder to A=[DK(10, 'foo'), 15] and B=(15, DK(20, 'bar')]. But since we can't mix
-             *     tokens and keys at the same time in a range, we uses 15.upperBoundKey() to have A include all
-             *     keys having 15 as token and B include none of those (since that is what our node owns).
-             * asSplitValue() abstracts that choice.
-             */
-            Token upperBoundToken = ringIter.next();
-            T upperBound = (T)upperBoundToken.upperBound(queryRange.left.getClass());
-            if (!remainder.left.equals(upperBound) && !remainder.contains(upperBound))
-                // no more splits
-                break;
-            Pair<AbstractBounds<T>,AbstractBounds<T>> splits = remainder.split(upperBound);
-            if (splits == null)
-                continue;
-
-            ranges.add(splits.left);
-            remainder = splits.right;
-        }
-        ranges.add(remainder);
-
-        return ranges;
-    }
-
     public boolean getHintedHandoffEnabled()
     {
         return DatabaseDescriptor.hintedHandoffEnabled();
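Note: the splitting logic that getRestrictedRanges used to implement here (and that now lives in ReplicaPlanIterator) breaks the queried key range on every ring token it crosses. A toy sketch, not part of this patch and using plain integers in place of Cassandra tokens and keys, illustrates the loop for the example from the removed comment above (range (10, 20] with ring tokens 0, 15, 30):

    import java.util.ArrayList;
    import java.util.List;

    // Toy illustration only: integer "tokens" stand in for ring positions and keys.
    public class RestrictedRangesSketch
    {
        public static void main(String[] args)
        {
            int[] ringTokens = {0, 15, 30};   // hypothetical node tokens
            int left = 10, right = 20;        // queried range (10, 20]

            List<int[]> pieces = new ArrayList<>();
            int remainderLeft = left;
            for (int token : ringTokens)
            {
                // a token only splits the remainder if it falls strictly inside it
                if (token <= remainderLeft || token >= right)
                    continue;
                pieces.add(new int[]{ remainderLeft, token });
                remainderLeft = token;
            }
            pieces.add(new int[]{ remainderLeft, right });

            // prints (10, 15] and (15, 20]: each piece is served by a single replica set
            for (int[] piece : pieces)
                System.out.println("(" + piece[0] + ", " + piece[1] + "]");
        }
    }
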
diff --git a/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java b/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
index ae09417..8a83d3e 100644
--- a/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
+++ b/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
@@ -18,12 +18,6 @@
 package org.apache.cassandra.service.reads;
 
 import com.google.common.base.Preconditions;
-
-import com.google.common.base.Predicates;
-
-import org.apache.cassandra.db.transform.DuplicateRowChecker;
-import org.apache.cassandra.locator.ReplicaPlan;
-import org.apache.cassandra.locator.ReplicaPlans;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -31,10 +25,11 @@ import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.ReadCommand;
 import org.apache.cassandra.db.SinglePartitionReadCommand;
-import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.partitions.PartitionIterator;
+import org.apache.cassandra.db.transform.DuplicateRowChecker;
 import org.apache.cassandra.exceptions.ReadFailureException;
 import org.apache.cassandra.exceptions.ReadTimeoutException;
 import org.apache.cassandra.exceptions.UnavailableException;
@@ -42,10 +37,12 @@ import org.apache.cassandra.locator.EndpointsForToken;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.locator.Replica;
 import org.apache.cassandra.locator.ReplicaCollection;
+import org.apache.cassandra.locator.ReplicaPlan;
+import org.apache.cassandra.locator.ReplicaPlans;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.service.reads.repair.ReadRepair;
 import org.apache.cassandra.service.StorageProxy.LocalReadRunnable;
+import org.apache.cassandra.service.reads.repair.ReadRepair;
 import org.apache.cassandra.tracing.TraceState;
 import org.apache.cassandra.tracing.Tracing;
 
@@ -106,7 +103,7 @@ public abstract class AbstractReadExecutor
         return ((SinglePartitionReadCommand) command).partitionKey();
     }
 
-    public ReadRepair getReadRepair()
+    public ReadRepair<EndpointsForToken, ReplicaPlan.ForTokenRead> getReadRepair()
     {
         return readRepair;
     }
@@ -282,7 +279,7 @@ public abstract class AbstractReadExecutor
                 Replica extraReplica;
                 if (handler.resolver.isDataPresent())
                 {
-                    extraReplica = replicaPlan.firstUncontactedCandidate(Predicates.alwaysTrue());
+                    extraReplica = replicaPlan.firstUncontactedCandidate(replica -> true);
 
                     // we should only use a SpeculatingReadExecutor if we have an extra replica to speculate against
                     assert extraReplica != null;
diff --git a/src/java/org/apache/cassandra/service/reads/range/RangeCommandIterator.java b/src/java/org/apache/cassandra/service/reads/range/RangeCommandIterator.java
new file mode 100644
index 0000000..ae7ee60
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/reads/range/RangeCommandIterator.java
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.reads.range;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.concurrent.Stage;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.PartitionRangeReadCommand;
+import org.apache.cassandra.db.ReadCommand;
+import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.partitions.PartitionIterator;
+import org.apache.cassandra.db.rows.RowIterator;
+import org.apache.cassandra.exceptions.ReadFailureException;
+import org.apache.cassandra.exceptions.ReadTimeoutException;
+import org.apache.cassandra.exceptions.UnavailableException;
+import org.apache.cassandra.locator.EndpointsForRange;
+import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.locator.ReplicaPlan;
+import org.apache.cassandra.metrics.ClientRequestMetrics;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.StorageProxy;
+import org.apache.cassandra.service.reads.DataResolver;
+import org.apache.cassandra.service.reads.ReadCallback;
+import org.apache.cassandra.service.reads.repair.ReadRepair;
+import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.utils.AbstractIterator;
+import org.apache.cassandra.utils.CloseableIterator;
+
+class RangeCommandIterator extends AbstractIterator<RowIterator> implements PartitionIterator
+{
+    private static final Logger logger = LoggerFactory.getLogger(RangeCommandIterator.class);
+
+    private static final ClientRequestMetrics rangeMetrics = new ClientRequestMetrics("RangeSlice");
+
+    private final CloseableIterator<ReplicaPlan.ForRangeRead> replicaPlans;
+    private final int totalRangeCount;
+    private final PartitionRangeReadCommand command;
+    private final boolean enforceStrictLiveness;
+
+    private final long startTime;
+    private final long queryStartNanoTime;
+    private DataLimits.Counter counter;
+    private PartitionIterator sentQueryIterator;
+
+    private final int maxConcurrencyFactor;
+    private int concurrencyFactor;
+    // The following two "metrics" are maintained to improve the concurrencyFactor
+    // when its initial value was not good enough.
+    private int liveReturned;
+    private int rangesQueried;
+    private int batchesRequested = 0;
+
+    RangeCommandIterator(CloseableIterator<ReplicaPlan.ForRangeRead> replicaPlans,
+                         PartitionRangeReadCommand command,
+                         int concurrencyFactor,
+                         int maxConcurrencyFactor,
+                         int totalRangeCount,
+                         long queryStartNanoTime)
+    {
+        this.replicaPlans = replicaPlans;
+        this.command = command;
+        this.concurrencyFactor = concurrencyFactor;
+        this.maxConcurrencyFactor = maxConcurrencyFactor;
+        this.totalRangeCount = totalRangeCount;
+        this.queryStartNanoTime = queryStartNanoTime;
+
+        startTime = System.nanoTime();
+        enforceStrictLiveness = command.metadata().enforceStrictLiveness();
+    }
+
+    @Override
+    protected RowIterator computeNext()
+    {
+        try
+        {
+            while (sentQueryIterator == null || !sentQueryIterator.hasNext())
+            {
+                // If we don't have any more ranges to handle, we're done
+                if (!replicaPlans.hasNext())
+                    return endOfData();
+
+                // otherwise, send the next batch of concurrent queries (after having closed the previous iterator)
+                if (sentQueryIterator != null)
+                {
+                    liveReturned += counter.counted();
+                    sentQueryIterator.close();
+
+                    // It's not the first batch of queries and we're not done, so we can use what has been
+                    // returned so far to improve our rows-per-range estimate and update the concurrency accordingly
+                    updateConcurrencyFactor();
+                }
+                sentQueryIterator = sendNextRequests();
+            }
+
+            return sentQueryIterator.next();
+        }
+        catch (UnavailableException e)
+        {
+            rangeMetrics.unavailables.mark();
+            throw e;
+        }
+        catch (ReadTimeoutException e)
+        {
+            rangeMetrics.timeouts.mark();
+            throw e;
+        }
+        catch (ReadFailureException e)
+        {
+            rangeMetrics.failures.mark();
+            throw e;
+        }
+    }
+
+    private void updateConcurrencyFactor()
+    {
+        liveReturned += counter.counted();
+
+        concurrencyFactor = computeConcurrencyFactor(totalRangeCount, rangesQueried, maxConcurrencyFactor, command.limits().count(), liveReturned);
+    }
+
+    @VisibleForTesting
+    static int computeConcurrencyFactor(int totalRangeCount, int rangesQueried, int maxConcurrencyFactor, int limit, int liveReturned)
+    {
+        maxConcurrencyFactor = Math.max(1, Math.min(maxConcurrencyFactor, totalRangeCount - rangesQueried));
+        if (liveReturned == 0)
+        {
+            // we haven't actually gotten any results so far, so query up to the limit
+            Tracing.trace("Didn't get any response rows; new concurrent requests: {}", maxConcurrencyFactor);
+            return maxConcurrencyFactor;
+        }
+
+        // Otherwise, compute how many rows per range we got on average and pick a concurrency factor
+        // that should allow us to fetch all remaining rows with the next batch of (concurrent) queries.
+        int remainingRows = limit - liveReturned;
+        float rowsPerRange = (float) liveReturned / (float) rangesQueried;
+        int concurrencyFactor = Math.max(1, Math.min(maxConcurrencyFactor, Math.round(remainingRows / rowsPerRange)));
+        logger.trace("Didn't get enough response rows; actual rows per range: {}; remaining rows: {}, new concurrent requests: {}",
+                     rowsPerRange, remainingRows, concurrencyFactor);
+        return concurrencyFactor;
+    }
+
+    /**
+     * Queries the provided sub-range.
+     *
+     * @param replicaPlan the subRange to query.
+     * @param isFirst in the case where multiple queries are sent in parallel, whether this is the first query of
+     * that batch. The reason it matters is that when paging queries, the command (more specifically the
+     * {@code DataLimits}) may have "state" information and that state may only be valid for the first query (in
+     * that it's the query that "continues" whatever we previously queried).
+     */
+    private SingleRangeResponse query(ReplicaPlan.ForRangeRead replicaPlan, boolean isFirst)
+    {
+        PartitionRangeReadCommand rangeCommand = command.forSubRange(replicaPlan.range(), isFirst);
+        // If enabled, request repaired data tracking info from full replicas but
+        // only if there are multiple full replicas to compare results from
+        if (DatabaseDescriptor.getRepairedDataTrackingForRangeReadsEnabled()
+            && replicaPlan.contacts().filter(Replica::isFull).size() > 1)
+        {
+            command.trackRepairedStatus();
+            rangeCommand.trackRepairedStatus();
+        }
+
+        ReplicaPlan.SharedForRangeRead sharedReplicaPlan = ReplicaPlan.shared(replicaPlan);
+        ReadRepair<EndpointsForRange, ReplicaPlan.ForRangeRead> readRepair =
+                ReadRepair.create(command, sharedReplicaPlan, queryStartNanoTime);
+        DataResolver<EndpointsForRange, ReplicaPlan.ForRangeRead> resolver =
+                new DataResolver<>(rangeCommand, sharedReplicaPlan, readRepair, queryStartNanoTime);
+        ReadCallback<EndpointsForRange, ReplicaPlan.ForRangeRead> handler =
+                new ReadCallback<>(resolver, rangeCommand, sharedReplicaPlan, queryStartNanoTime);
+
+        if (replicaPlan.contacts().size() == 1 && replicaPlan.contacts().get(0).isSelf())
+        {
+            Stage.READ.execute(new StorageProxy.LocalReadRunnable(rangeCommand, handler));
+        }
+        else
+        {
+            for (Replica replica : replicaPlan.contacts())
+            {
+                Tracing.trace("Enqueuing request to {}", replica);
+                ReadCommand command = replica.isFull() ? rangeCommand : rangeCommand.copyAsTransientQuery(replica);
+                Message<ReadCommand> message = command.createMessage(command.isTrackingRepairedStatus() && replica.isFull());
+                MessagingService.instance().sendWithCallback(message, replica.endpoint(), handler);
+            }
+        }
+
+        return new SingleRangeResponse(resolver, handler, readRepair);
+    }
+
+    private PartitionIterator sendNextRequests()
+    {
+        List<PartitionIterator> concurrentQueries = new ArrayList<>(concurrencyFactor);
+        List<ReadRepair<?, ?>> readRepairs = new ArrayList<>(concurrencyFactor);
+
+        try
+        {
+            for (int i = 0; i < concurrencyFactor && replicaPlans.hasNext(); )
+            {
+                ReplicaPlan.ForRangeRead replicaPlan = replicaPlans.next();
+
+                @SuppressWarnings("resource") // response will be closed by concatAndBlockOnRepair, or in the catch block below
+                SingleRangeResponse response = query(replicaPlan, i == 0);
+                concurrentQueries.add(response);
+                readRepairs.add(response.getReadRepair());
+                // due to ReplicaPlanMerger, the coordinator may fetch more ranges than required by the concurrency factor.
+                rangesQueried += replicaPlan.vnodeCount();
+                i += replicaPlan.vnodeCount();
+            }
+            batchesRequested++;
+        }
+        catch (Throwable t)
+        {
+            for (PartitionIterator response : concurrentQueries)
+                response.close();
+            throw t;
+        }
+
+        Tracing.trace("Submitted {} concurrent range requests", concurrentQueries.size());
+        // We want to count the results for the sake of updating the concurrency factor (see updateConcurrencyFactor)
+        // but we don't want to enforce any particular limit at this point (this could break code that relies on
+        // postReconciliationProcessing), hence the DataLimits.NONE.
+        counter = DataLimits.NONE.newCounter(command.nowInSec(), true, command.selectsFullPartition(), enforceStrictLiveness);
+        return counter.applyTo(StorageProxy.concatAndBlockOnRepair(concurrentQueries, readRepairs));
+    }
+
+    @Override
+    public void close()
+    {
+        try
+        {
+            if (sentQueryIterator != null)
+                sentQueryIterator.close();
+
+            replicaPlans.close();
+        }
+        finally
+        {
+            long latency = System.nanoTime() - startTime;
+            rangeMetrics.addNano(latency);
+            Keyspace.openAndGetStore(command.metadata()).metric.coordinatorScanLatency.update(latency, TimeUnit.NANOSECONDS);
+        }
+    }
+
+    @VisibleForTesting
+    int rangesQueried()
+    {
+        return rangesQueried;
+    }
+
+    @VisibleForTesting
+    int batchesRequested()
+    {
+        return batchesRequested;
+    }
+
+    @VisibleForTesting
+    int maxConcurrencyFactor()
+    {
+        return maxConcurrencyFactor;
+    }
+
+    @VisibleForTesting
+    int concurrencyFactor()
+    {
+        return concurrencyFactor;
+    }
+}
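For reference, a minimal sketch (not part of this patch) of how the adaptive factor computed by computeConcurrencyFactor behaves. The method and class are package-private, so such a snippet would have to live in the same package; the figures are purely illustrative:

    package org.apache.cassandra.service.reads.range;

    // Hypothetical example class; the expected values in the comments follow the
    // arithmetic of computeConcurrencyFactor above.
    public class ConcurrencyFactorExample
    {
        public static void main(String[] args)
        {
            // No live rows returned yet: fall back to the maximum, capped by the
            // ranges still unqueried, i.e. min(32, 100 - 10) -> 32.
            System.out.println(RangeCommandIterator.computeConcurrencyFactor(100, 10, 32, 100, 0));   // 32

            // 10 ranges produced 20 live rows (2 rows/range); 80 rows are still needed,
            // so ~40 more ranges are wanted, capped at 32.
            System.out.println(RangeCommandIterator.computeConcurrencyFactor(100, 10, 32, 100, 20));  // 32

            // 10 ranges produced 90 live rows (9 rows/range); only 10 rows remain,
            // so round(10 / 9.0) = 1 range is queried next.
            System.out.println(RangeCommandIterator.computeConcurrencyFactor(100, 10, 32, 100, 90));  // 1
        }
    }
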
diff --git a/src/java/org/apache/cassandra/service/reads/range/RangeCommands.java b/src/java/org/apache/cassandra/service/reads/range/RangeCommands.java
new file mode 100644
index 0000000..3452a35
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/reads/range/RangeCommands.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.reads.range;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.PartitionRangeReadCommand;
+import org.apache.cassandra.db.partitions.PartitionIterator;
+import org.apache.cassandra.index.Index;
+import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.utils.FBUtilities;
+import com.google.common.annotations.VisibleForTesting;
+
+public class RangeCommands
+{
+    private static final Logger logger = LoggerFactory.getLogger(RangeCommands.class);
+
+    private static final double CONCURRENT_SUBREQUESTS_MARGIN = 0.10;
+
+    /**
+     * Introduce a maximum number of sub-ranges that the coordinator can request in parallel for range queries. Previously
+     * we would request up to the maximum number of ranges but this causes problems if the number of vnodes is large.
+     * By default we pick 10 requests per core, assuming all replicas have the same number of cores. The idea is that we
+     * don't want a burst of range requests that will back up, hurting all other queries. At the same time,
+     * we want to give range queries a chance to run if resources are available.
+     */
+    private static final int MAX_CONCURRENT_RANGE_REQUESTS = Math.max(1, Integer.getInteger("cassandra.max_concurrent_range_requests",
+                                                                                            FBUtilities.getAvailableProcessors() * 10));
+
+    @SuppressWarnings("resource") // created iterators will be closed in CQL layer through the chain of transformations
+    public static PartitionIterator partitions(PartitionRangeReadCommand command,
+                                               ConsistencyLevel consistencyLevel,
+                                               long queryStartNanoTime)
+    {
+        // Note that in general, a RangeCommandIterator will honor the command limit for each range, but will not enforce it globally.
+        RangeCommandIterator rangeCommands = rangeCommandIterator(command, consistencyLevel, queryStartNanoTime);
+        return command.limits().filter(command.postReconciliationProcessing(rangeCommands),
+                                       command.nowInSec(),
+                                       command.selectsFullPartition(),
+                                       command.metadata().enforceStrictLiveness());
+    }
+
+    @VisibleForTesting
+    @SuppressWarnings("resource") // created iterators will be closed in CQL layer through the chain of transformations
+    static RangeCommandIterator rangeCommandIterator(PartitionRangeReadCommand command,
+                                                     ConsistencyLevel consistencyLevel,
+                                                     long queryStartNanoTime)
+    {
+        Tracing.trace("Computing ranges to query");
+
+        Keyspace keyspace = Keyspace.open(command.metadata().keyspace);
+        ReplicaPlanIterator replicaPlans = new ReplicaPlanIterator(command.dataRange().keyRange(), keyspace, consistencyLevel);
+
+        // our estimate of how many result rows there will be per-range
+        float resultsPerRange = estimateResultsPerRange(command, keyspace);
+        // underestimate how many rows we will get per-range in order to increase the likelihood that we'll
+        // fetch enough rows in the first round
+        resultsPerRange -= resultsPerRange * CONCURRENT_SUBREQUESTS_MARGIN;
+        int maxConcurrencyFactor = Math.min(replicaPlans.size(), MAX_CONCURRENT_RANGE_REQUESTS);
+        int concurrencyFactor = resultsPerRange == 0.0
+                                ? 1
+                                : Math.max(1, Math.min(maxConcurrencyFactor, (int) Math.ceil(command.limits().count() / resultsPerRange)));
+        logger.trace("Estimated result rows per range: {}; requested rows: {}, ranges.size(): {}; concurrent range requests: {}",
+                     resultsPerRange, command.limits().count(), replicaPlans.size(), concurrencyFactor);
+        Tracing.trace("Submitting range requests on {} ranges with a concurrency of {} ({} rows per range expected)",
+                      replicaPlans.size(), concurrencyFactor, resultsPerRange);
+
+        ReplicaPlanMerger mergedReplicaPlans = new ReplicaPlanMerger(replicaPlans, keyspace, consistencyLevel);
+        return new RangeCommandIterator(mergedReplicaPlans,
+                                        command,
+                                        concurrencyFactor,
+                                        maxConcurrencyFactor,
+                                        replicaPlans.size(),
+                                        queryStartNanoTime);
+    }
+
+    /**
+     * Estimate the number of result rows per range in the ring based on our local data.
+     * <p>
+     * This assumes that ranges are uniformly distributed across the cluster and
+     * that the queried data is also uniformly distributed.
+     */
+    @VisibleForTesting
+    static float estimateResultsPerRange(PartitionRangeReadCommand command, Keyspace keyspace)
+    {
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(command.metadata().id);
+        Index index = command.getIndex(cfs);
+        float maxExpectedResults = index == null
+                                   ? command.limits().estimateTotalResults(cfs)
+                                   : index.getEstimatedResultRows();
+
+        // adjust maxExpectedResults by the number of tokens this node has and the replication factor for this ks
+        return (maxExpectedResults / DatabaseDescriptor.getNumTokens())
+               / keyspace.getReplicationStrategy().getReplicationFactor().allReplicas;
+    }
+}
diff --git a/src/java/org/apache/cassandra/service/reads/range/ReplicaPlanIterator.java b/src/java/org/apache/cassandra/service/reads/range/ReplicaPlanIterator.java
new file mode 100644
index 0000000..ef88c9d
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/reads/range/ReplicaPlanIterator.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.reads.range;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.PartitionPosition;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.Bounds;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.LocalStrategy;
+import org.apache.cassandra.locator.ReplicaPlan;
+import org.apache.cassandra.locator.ReplicaPlans;
+import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.AbstractIterator;
+import org.apache.cassandra.utils.Pair;
+
+class ReplicaPlanIterator extends AbstractIterator<ReplicaPlan.ForRangeRead>
+{
+    private final Keyspace keyspace;
+    private final ConsistencyLevel consistency;
+    @VisibleForTesting
+    final Iterator<? extends AbstractBounds<PartitionPosition>> ranges;
+    private final int rangeCount;
+
+    ReplicaPlanIterator(AbstractBounds<PartitionPosition> keyRange, Keyspace keyspace, ConsistencyLevel consistency)
+    {
+        this.keyspace = keyspace;
+        this.consistency = consistency;
+
+        List<? extends AbstractBounds<PartitionPosition>> l = keyspace.getReplicationStrategy() instanceof LocalStrategy
+                                                              ? keyRange.unwrap()
+                                                              : getRestrictedRanges(keyRange);
+        this.ranges = l.iterator();
+        this.rangeCount = l.size();
+    }
+
+    /**
+     * @return the number of {@link ReplicaPlan.ForRangeRead}s in this iterator
+     */
+    int size()
+    {
+        return rangeCount;
+    }
+
+    @Override
+    protected ReplicaPlan.ForRangeRead computeNext()
+    {
+        if (!ranges.hasNext())
+            return endOfData();
+
+        return ReplicaPlans.forRangeRead(keyspace, consistency, ranges.next(), 1);
+    }
+
+    /**
+     * Compute all ranges we're going to query, in sorted order. Nodes can be replica destinations for many ranges,
+     * so we need to restrict each scan to the specific range we want, or else we'd get duplicate results.
+     */
+    private static List<AbstractBounds<PartitionPosition>> getRestrictedRanges(final AbstractBounds<PartitionPosition> queryRange)
+    {
+        // special case for bounds containing exactly 1 (non-minimum) token
+        if (queryRange instanceof Bounds && queryRange.left.equals(queryRange.right) && !queryRange.left.isMinimum())
+        {
+            return Collections.singletonList(queryRange);
+        }
+
+        TokenMetadata tokenMetadata = StorageService.instance.getTokenMetadata();
+
+        List<AbstractBounds<PartitionPosition>> ranges = new ArrayList<>();
+        // divide the queryRange into pieces delimited by the ring and minimum tokens
+        Iterator<Token> ringIter = TokenMetadata.ringIterator(tokenMetadata.sortedTokens(), queryRange.left.getToken(), true);
+        AbstractBounds<PartitionPosition> remainder = queryRange;
+        while (ringIter.hasNext())
+        {
+            /*
+             * remainder is a range/bounds of partition positions and we want to split it with a token. We want to split
+             * using the key returned by token.maxKeyBound. For instance, if remainder is [DK(10, 'foo'), DK(20, 'bar')],
+             * and we have 3 nodes with tokens 0, 15, 30, we want to split remainder to A=[DK(10, 'foo'), 15] and
+             * B=(15, DK(20, 'bar')]. But since we can't mix tokens and keys at the same time in a range, we use
+             * 15.maxKeyBound() to have A include all keys having 15 as token and B include none of those (since that is
+             * what our node owns).
+             */
+            Token upperBoundToken = ringIter.next();
+            PartitionPosition upperBound = upperBoundToken.maxKeyBound();
+            if (!remainder.left.equals(upperBound) && !remainder.contains(upperBound))
+                // no more splits
+                break;
+            Pair<AbstractBounds<PartitionPosition>, AbstractBounds<PartitionPosition>> splits = remainder.split(upperBound);
+            if (splits == null)
+                continue;
+
+            ranges.add(splits.left);
+            remainder = splits.right;
+        }
+        ranges.add(remainder);
+
+        return ranges;
+    }
+}
diff --git a/src/java/org/apache/cassandra/service/reads/range/ReplicaPlanMerger.java b/src/java/org/apache/cassandra/service/reads/range/ReplicaPlanMerger.java
new file mode 100644
index 0000000..f2dacd0
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/reads/range/ReplicaPlanMerger.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.reads.range;
+
+import java.util.Iterator;
+
+import com.google.common.collect.Iterators;
+import com.google.common.collect.PeekingIterator;
+
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.locator.ReplicaPlan;
+import org.apache.cassandra.locator.ReplicaPlans;
+import org.apache.cassandra.utils.AbstractIterator;
+
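+/**
+ * Iterator that merges consecutive {@link ReplicaPlan.ForRangeRead}s when the replicas shared by the merged ranges
+ * are still enough to satisfy the consistency level, so fewer (larger) sub-range requests are sent.
+ */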
+class ReplicaPlanMerger extends AbstractIterator<ReplicaPlan.ForRangeRead>
+{
+    private final Keyspace keyspace;
+    private final ConsistencyLevel consistency;
+    private final PeekingIterator<ReplicaPlan.ForRangeRead> ranges;
+
+    ReplicaPlanMerger(Iterator<ReplicaPlan.ForRangeRead> iterator, Keyspace keyspace, ConsistencyLevel consistency)
+    {
+        this.keyspace = keyspace;
+        this.consistency = consistency;
+        this.ranges = Iterators.peekingIterator(iterator);
+    }
+
+    @Override
+    protected ReplicaPlan.ForRangeRead computeNext()
+    {
+        if (!ranges.hasNext())
+            return endOfData();
+
+        ReplicaPlan.ForRangeRead current = ranges.next();
+
+        // ReplicaPlanIterator has already broken the queried range into per-[vnode] token ranges, but this doesn't
+        // take the replication factor into account. If the intersection of live endpoints for 2 consecutive ranges
+        // still meets the CL requirements, then we can merge both ranges into a single range command.
+        while (ranges.hasNext())
+        {
+            // If the current range's right bound is the minimum token, we should stop merging because CFS.getRangeSlice
+            // doesn't know how to deal with a wrapping range.
+            // Note: it would be slightly more efficient to have CFS.getRangeSlice on the destination nodes unwrap
+            // the range if necessary and deal with it. However, we can't start sending wrapped ranges without breaking
+            // wire compatibility, so it's likely easier not to bother.
+            if (current.range().right.isMinimum())
+                break;
+
+            ReplicaPlan.ForRangeRead next = ranges.peek();
+            ReplicaPlan.ForRangeRead merged = ReplicaPlans.maybeMerge(keyspace, consistency, current, next);
+            if (merged == null)
+                break;
+
+            current = merged;
+            ranges.next(); // consume the range we just merged since we've only peeked so far
+        }
+        return current;
+    }
+}
diff --git a/src/java/org/apache/cassandra/service/reads/range/SingleRangeResponse.java b/src/java/org/apache/cassandra/service/reads/range/SingleRangeResponse.java
new file mode 100644
index 0000000..d318b41
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/reads/range/SingleRangeResponse.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.reads.range;
+
+import org.apache.cassandra.db.partitions.PartitionIterator;
+import org.apache.cassandra.db.rows.RowIterator;
+import org.apache.cassandra.exceptions.ReadTimeoutException;
+import org.apache.cassandra.locator.EndpointsForRange;
+import org.apache.cassandra.locator.ReplicaPlan;
+import org.apache.cassandra.service.reads.DataResolver;
+import org.apache.cassandra.service.reads.ReadCallback;
+import org.apache.cassandra.service.reads.repair.ReadRepair;
+import org.apache.cassandra.utils.AbstractIterator;
+
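+/**
+ * {@link PartitionIterator} over the response to a single range request, blocking for the replica responses and
+ * resolving them lazily, on the first call to {@link #computeNext()}.
+ */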
+class SingleRangeResponse extends AbstractIterator<RowIterator> implements PartitionIterator
+{
+    private final DataResolver<EndpointsForRange, ReplicaPlan.ForRangeRead> resolver;
+    private final ReadCallback<EndpointsForRange, ReplicaPlan.ForRangeRead> handler;
+    private final ReadRepair<EndpointsForRange, ReplicaPlan.ForRangeRead> readRepair;
+
+    private PartitionIterator result;
+
+    SingleRangeResponse(DataResolver<EndpointsForRange, ReplicaPlan.ForRangeRead> resolver,
+                        ReadCallback<EndpointsForRange, ReplicaPlan.ForRangeRead> handler,
+                        ReadRepair<EndpointsForRange, ReplicaPlan.ForRangeRead> readRepair)
+    {
+        this.resolver = resolver;
+        this.handler = handler;
+        this.readRepair = readRepair;
+    }
+
+    ReadRepair<EndpointsForRange, ReplicaPlan.ForRangeRead> getReadRepair()
+    {
+        return readRepair;
+    }
+
+    private void waitForResponse() throws ReadTimeoutException
+    {
+        if (result != null)
+            return;
+
+        handler.awaitResults();
+        result = resolver.resolve();
+    }
+
+    @Override
+    protected RowIterator computeNext()
+    {
+        waitForResponse();
+        return result.hasNext() ? result.next() : endOfData();
+    }
+
+    @Override
+    public void close()
+    {
+        if (result != null)
+            result.close();
+    }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
index 09a98df..babc2c3 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
@@ -27,7 +27,6 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
@@ -477,6 +476,11 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster<I
         return instances.get(node - 1).coordinator();
     }
 
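+    /**
+     * @return a stream with the {@link ICoordinator}s of every instance in the cluster
+     */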
+    public Stream<ICoordinator> coordinators()
+    {
+        return stream().map(IInstance::coordinator);
+    }
+
     /**
      * WARNING: we index from 1 here, for consistency with inet address!
      */
diff --git a/test/distributed/org/apache/cassandra/distributed/test/LargeMessageTest.java b/test/distributed/org/apache/cassandra/distributed/test/LargeMessageTest.java
new file mode 100644
index 0000000..392f325
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/LargeMessageTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.Cluster;
+
+import static org.apache.cassandra.distributed.api.ConsistencyLevel.ALL;
+import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;
+import static org.apache.cassandra.distributed.shared.AssertUtils.row;
+import static org.apache.cassandra.net.OutboundConnections.LARGE_MESSAGE_THRESHOLD;
+
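+/**
+ * Tests that a mutation whose serialized size exceeds {@code LARGE_MESSAGE_THRESHOLD} can be written and read back
+ * at CL=ALL.
+ */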
+public class LargeMessageTest extends TestBaseImpl
+{
+    @Test
+    public void testLargeMessage() throws Throwable
+    {
+        try (Cluster cluster = init(builder().withNodes(2).start()))
+        {
+            cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (pk int, ck int, v text, PRIMARY KEY (pk, ck))"));
+            StringBuilder builder = new StringBuilder();
+            for (int i = 0; i < LARGE_MESSAGE_THRESHOLD; i++)
+                builder.append('a');
+            String s = builder.toString();
+            cluster.coordinator(1).execute(withKeyspace("INSERT INTO %s.tbl (pk, ck, v) VALUES (1, 1, ?)"), ALL, s);
+            assertRows(cluster.coordinator(1).execute(withKeyspace("SELECT * FROM %s.tbl WHERE pk = ?"), ALL, 1),
+                       row(1, 1, s));
+        }
+    }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/MetricsCountQueriesTest.java b/test/distributed/org/apache/cassandra/distributed/test/MetricsCountQueriesTest.java
new file mode 100644
index 0000000..a742e48
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/MetricsCountQueriesTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+
+import static org.apache.cassandra.distributed.api.ConsistencyLevel.ALL;
+
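+/**
+ * Tests that single-partition reads at CL=ALL increment the table read latency metric the same number of times on
+ * every replica.
+ */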
+public class MetricsCountQueriesTest extends TestBaseImpl
+{
+    @Test
+    public void testMetricsCountQueries() throws Throwable
+    {
+        try (Cluster cluster = init(Cluster.create(2)))
+        {
+            cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))"));
+            for (int i = 0; i < 100; i++)
+                cluster.coordinator(1).execute(withKeyspace("INSERT INTO %s.tbl (pk, ck, v) VALUES (?,?,?)"), ALL, i, i, i);
+
+            long readCount1 = readCount(cluster.get(1));
+            long readCount2 = readCount(cluster.get(2));
+            for (int i = 0; i < 100; i++)
+                cluster.coordinator(1).execute(withKeyspace("SELECT * FROM %s.tbl WHERE pk = ? and ck = ?"), ALL, i, i);
+
+            readCount1 = readCount(cluster.get(1)) - readCount1;
+            readCount2 = readCount(cluster.get(2)) - readCount2;
+            Assert.assertEquals(readCount1, readCount2);
+            Assert.assertEquals(100, readCount1);
+        }
+    }
+
+    private static long readCount(IInvokableInstance instance)
+    {
+        return instance.callOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").metric.readLatency.latency.getCount());
+    }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/PagingTest.java b/test/distributed/org/apache/cassandra/distributed/test/PagingTest.java
new file mode 100644
index 0000000..c0d8fe3
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/PagingTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.util.Iterator;
+
+import com.google.common.collect.Iterators;
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.Cluster;
+
+import static org.apache.cassandra.distributed.api.ConsistencyLevel.QUORUM;
+import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;
+
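+/**
+ * Tests that paged queries against a multi-node cluster return the same rows, for several page sizes, as the
+ * equivalent unpaged queries against a single-node cluster.
+ */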
+public class PagingTest extends TestBaseImpl
+{
+    @Test
+    public void testPaging() throws Throwable
+    {
+        try (Cluster cluster = init(builder().withNodes(3).start());
+             Cluster singleNode = init(builder().withNodes(1).withSubnet(1).start()))
+        {
+            cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))"));
+            singleNode.schemaChange(withKeyspace("CREATE TABLE %s.tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))"));
+
+            for (int i = 0; i < 10; i++)
+            {
+                for (int j = 0; j < 10; j++)
+                {
+                    cluster.coordinator(1)
+                           .execute(withKeyspace("INSERT INTO %s.tbl (pk, ck, v) VALUES (1, ?, ?)"), QUORUM, i, j, i + i);
+                    singleNode.coordinator(1)
+                              .execute(withKeyspace("INSERT INTO %s.tbl (pk, ck, v) VALUES (1, ?, ?)"), QUORUM, i, j, i + i);
+                }
+            }
+
+            int[] pageSizes = new int[]{ 1, 2, 3, 5, 10, 20, 50, Integer.MAX_VALUE };
+            String[] statements = new String[]{ withKeyspace("SELECT * FROM %s.tbl WHERE pk = 1 AND ck > 5"),
+                                                withKeyspace("SELECT * FROM %s.tbl WHERE pk = 1 AND ck >= 5"),
+                                                withKeyspace("SELECT * FROM %s.tbl WHERE pk = 1 AND ck > 5 AND ck <= 10"),
+                                                withKeyspace("SELECT * FROM %s.tbl WHERE pk = 1 AND ck > 5 LIMIT 3"),
+                                                withKeyspace("SELECT * FROM %s.tbl WHERE pk = 1 AND ck >= 5 LIMIT 2"),
+                                                withKeyspace("SELECT * FROM %s.tbl WHERE pk = 1 AND ck > 5 AND ck <= 10 LIMIT 2"),
+                                                withKeyspace("SELECT * FROM %s.tbl WHERE pk = 1 AND ck > 5 ORDER BY ck DESC"),
+                                                withKeyspace("SELECT * FROM %s.tbl WHERE pk = 1 AND ck >= 5 ORDER BY ck DESC"),
+                                                withKeyspace("SELECT * FROM %s.tbl WHERE pk = 1 AND ck > 5 AND ck <= 10 ORDER BY ck DESC"),
+                                                withKeyspace("SELECT * FROM %s.tbl WHERE pk = 1 AND ck > 5 ORDER BY ck DESC LIMIT 3"),
+                                                withKeyspace("SELECT * FROM %s.tbl WHERE pk = 1 AND ck >= 5 ORDER BY ck DESC LIMIT 2"),
+                                                withKeyspace("SELECT * FROM %s.tbl WHERE pk = 1 AND ck > 5 AND ck <= 10 ORDER BY ck DESC LIMIT 2"),
+                                                withKeyspace("SELECT DISTINCT pk FROM %s.tbl LIMIT 3"),
+                                                withKeyspace("SELECT DISTINCT pk FROM %s.tbl WHERE pk IN (3,5,8,10)"),
+                                                withKeyspace("SELECT DISTINCT pk FROM %s.tbl WHERE pk IN (3,5,8,10) LIMIT 2")
+            };
+            for (String statement : statements)
+            {
+                Object[][] noPagingRows = singleNode.coordinator(1).execute(statement, QUORUM);
+                for (int pageSize : pageSizes)
+                {
+                    Iterator<Object[]> pagingRows = cluster.coordinator(1).executeWithPaging(statement, QUORUM, pageSize);
+                    assertRows(Iterators.toArray(pagingRows, Object[].class), noPagingRows);
+                }
+            }
+        }
+    }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/ReadRepairEmptyRangeTombstonesTest.java b/test/distributed/org/apache/cassandra/distributed/test/ReadRepairEmptyRangeTombstonesTest.java
index a8fb978..bcc77e0 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/ReadRepairEmptyRangeTombstonesTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/ReadRepairEmptyRangeTombstonesTest.java
@@ -35,7 +35,6 @@ import org.apache.cassandra.service.reads.repair.ReadRepairStrategy;
 
 import static java.util.concurrent.TimeUnit.MINUTES;
 import static org.apache.cassandra.distributed.shared.AssertUtils.row;
-import static org.apache.cassandra.distributed.test.ReadRepairTester.BOOLEANS;
 
 
 /**
diff --git a/test/distributed/org/apache/cassandra/distributed/test/ReadRepairQueryTypesTest.java b/test/distributed/org/apache/cassandra/distributed/test/ReadRepairQueryTypesTest.java
index dfdb6c2..b79e0c4 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/ReadRepairQueryTypesTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/ReadRepairQueryTypesTest.java
@@ -36,7 +36,6 @@ import static java.util.concurrent.TimeUnit.MINUTES;
 import static org.apache.cassandra.distributed.shared.AssertUtils.assertEquals;
 import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;
 import static org.apache.cassandra.distributed.shared.AssertUtils.row;
-import static org.apache.cassandra.distributed.test.ReadRepairTester.BOOLEANS;
 import static org.apache.cassandra.service.reads.repair.ReadRepairStrategy.NONE;
 
 /**
@@ -1190,9 +1189,4 @@ public class ReadRepairQueryTypesTest extends TestBaseImpl
             schemaChange("DROP TABLE " + qualifiedTableName);
         }
     }
-
-    private static Object[][] rows(Object[]... rows)
-    {
-        return rows;
-    }
 }
\ No newline at end of file
diff --git a/test/distributed/org/apache/cassandra/distributed/test/ReadRepairTester.java b/test/distributed/org/apache/cassandra/distributed/test/ReadRepairTester.java
index 9af884e..c3a36cb 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/ReadRepairTester.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/ReadRepairTester.java
@@ -41,9 +41,6 @@ import static org.apache.cassandra.distributed.test.TestBaseImpl.KEYSPACE;
  */
 public abstract class ReadRepairTester<T extends ReadRepairTester<T>>
 {
-    static final Object[][] EMPTY_ROWS = new Object[0][];
-    static final boolean[] BOOLEANS = new boolean[]{ false, true };
-
     private static final AtomicInteger seqNumber = new AtomicInteger();
 
     private final String tableName = "t_" + seqNumber.getAndIncrement();
diff --git a/test/distributed/org/apache/cassandra/distributed/test/SSTableSkippingReadTest.java b/test/distributed/org/apache/cassandra/distributed/test/SSTableSkippingReadTest.java
new file mode 100644
index 0000000..84b90c7
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/SSTableSkippingReadTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.util.Set;
+
+import org.junit.Test;
+
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+
+import static org.apache.cassandra.distributed.shared.AssertUtils.assertEquals;
+import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;
+import static org.apache.cassandra.distributed.api.ConsistencyLevel.ALL;
+
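+/**
+ * Tests that sstables skipped on one replica because of their timestamps or clustering bounds don't prevent the
+ * partition deletions they contain from shadowing data returned by other replicas.
+ */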
+public class SSTableSkippingReadTest extends TestBaseImpl
+{
+    @Test
+    public void skippedSSTableWithPartitionDeletionTest() throws Throwable
+    {
+        try (Cluster cluster = init(Cluster.create(2)))
+        {
+            cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (pk int, ck int, v int, PRIMARY KEY(pk, ck))"));
+            // insert a partition tombstone on node 1, the deletion timestamp should end up being the sstable's minTimestamp
+            cluster.get(1).executeInternal(withKeyspace("DELETE FROM %s.tbl USING TIMESTAMP 1 WHERE pk = 0"));
+            // and a row from a different partition, to provide the sstable's min/max clustering
+            cluster.get(1).executeInternal(withKeyspace("INSERT INTO %s.tbl (pk, ck, v) VALUES (1, 1, 1) USING TIMESTAMP 2"));
+            cluster.get(1).flush(KEYSPACE);
+            // expect a single sstable, where minTimestamp equals the timestamp of the partition delete
+            cluster.get(1).runOnInstance(() -> {
+                Set<SSTableReader> sstables = Keyspace.open(KEYSPACE)
+                                                      .getColumnFamilyStore("tbl")
+                                                      .getLiveSSTables();
+                assertEquals("Expected a single sstable, but found " + sstables.size(), 1, sstables.size());
+                long minTimestamp = sstables.iterator().next().getMinTimestamp();
+                assertEquals("Expected min timestamp of 1, but was " + minTimestamp, 1, minTimestamp);
+            });
+
+            // on node 2, add a row for the deleted partition with an older timestamp than the deletion so it should be shadowed
+            cluster.get(2).executeInternal(withKeyspace("INSERT INTO %s.tbl (pk, ck, v) VALUES (0, 10, 10) USING TIMESTAMP 0"));
+
+            Object[][] rows = cluster.coordinator(1).execute(withKeyspace("SELECT * FROM %s.tbl WHERE pk=0 AND ck > 5"), ALL);
+            assertEquals("Expected 0 rows, but found " + rows.length, 0, rows.length);
+        }
+    }
+
+    @Test
+    public void skippedSSTableWithPartitionDeletionShadowingDataOnAnotherNode() throws Throwable
+    {
+        try (Cluster cluster = init(Cluster.create(2)))
+        {
+            cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (pk int, ck int, v int, PRIMARY KEY(pk, ck))"));
+            // insert a partition tombstone on node 1, the deletion timestamp should end up being the sstable's minTimestamp
+            cluster.get(1).executeInternal(withKeyspace("DELETE FROM %s.tbl USING TIMESTAMP 1 WHERE pk = 0"));
+            // and a row from a different partition, to provide the sstable's min/max clustering
+            cluster.get(1).executeInternal(withKeyspace("INSERT INTO %s.tbl (pk, ck, v) VALUES (1, 1, 1) USING TIMESTAMP 1"));
+            cluster.get(1).flush(KEYSPACE);
+            // sstable 1 has minTimestamp == maxTimestamp == 1 and is skipped due to its min/max clusterings. Now we
+            // insert a row which is not shadowed by the partition delete and flush to a second sstable. Importantly,
+            // this sstable's minTimestamp is greater than the maxTimestamp of the first sstable. This would cause the
+            // first sstable not to be reincluded in the merge input, but we can't really make that decision as we
+            // don't know what data and/or tombstones are present on other nodes
+            cluster.get(1).executeInternal(withKeyspace("INSERT INTO %s.tbl (pk, ck, v) VALUES (0, 6, 6) USING TIMESTAMP 2"));
+            cluster.get(1).flush(KEYSPACE);
+
+            // on node 2, add a row for the deleted partition with an older timestamp than the deletion so it should be shadowed
+            cluster.get(2).executeInternal(withKeyspace("INSERT INTO %s.tbl (pk, ck, v) VALUES (0, 10, 10) USING TIMESTAMP 0"));
+
+            Object[][] rows = cluster.coordinator(1)
+                                     .execute(withKeyspace("SELECT * FROM %s.tbl WHERE pk=0 AND ck > 5"), ALL);
+            // we expect that the row from node 2 (0, 10, 10) was shadowed by the partition delete, but the row from
+            // node 1 (0, 6, 6) was not.
+            assertRows(rows, new Object[]{ 0, 6, 6 });
+        }
+    }
+
+    @Test
+    public void skippedSSTableWithPartitionDeletionShadowingDataOnAnotherNode2() throws Throwable
+    {
+        // make sure skipped sstables are still added back to the merge even when the partition delete ts is < the local min ts
+
+        try (Cluster cluster = init(Cluster.create(2)))
+        {
+            cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (pk int, ck int, v int, PRIMARY KEY(pk, ck))"));
+            // insert a partition tombstone on node 1, the deletion timestamp should end up being the sstable's minTimestamp
+            cluster.get(1).executeInternal(withKeyspace("DELETE FROM %s.tbl USING TIMESTAMP 1 WHERE pk = 0"));
+            // and a row from a different partition, to provide the sstable's min/max clustering
+            cluster.get(1).executeInternal(withKeyspace("INSERT INTO %s.tbl (pk, ck, v) VALUES (1, 1, 1) USING TIMESTAMP 3"));
+            cluster.get(1).flush(KEYSPACE);
+            // sstable 1 has minTimestamp == 1 and maxTimestamp == 3 and is skipped due to its min/max clusterings. Now
+            // we insert a row which is not shadowed by the partition delete and flush to a second sstable. The first
+            // sstable has a maxTimestamp greater than the min timestamp of all sstables, so it is a candidate for
+            // reinclusion to the merge. However, the second sstable's minTimestamp is greater than the partition
+            // delete. This would cause the first sstable not to be reincluded in the merge input, but we can't really
+            // make that decision as we don't know what data and/or tombstones are present on other nodes
+            cluster.get(1).executeInternal(withKeyspace("INSERT INTO %s.tbl (pk, ck, v) VALUES (0, 6, 6) USING TIMESTAMP 2"));
+            cluster.get(1).flush(KEYSPACE);
+
+            // on node 2, add a row for the deleted partition with an older timestamp than the deletion so it should be shadowed
+            cluster.get(2).executeInternal(withKeyspace("INSERT INTO %s.tbl (pk, ck, v) VALUES (0, 10, 10) USING TIMESTAMP 0"));
+
+            Object[][] rows = cluster.coordinator(1)
+                                     .execute(withKeyspace("SELECT * FROM %s.tbl WHERE pk=0 AND ck > 5"), ALL);
+            // we expect that the row from node 2 (0, 10, 10) was shadowed by the partition delete, but the row from
+            // node 1 (0, 6, 6) was not.
+            assertRows(rows, new Object[]{ 0, 6, 6 });
+        }
+    }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/SchemaDisagreementTest.java b/test/distributed/org/apache/cassandra/distributed/test/SchemaDisagreementTest.java
new file mode 100644
index 0000000..b84ef73
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/SchemaDisagreementTest.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.Cluster;
+
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;
+import static org.apache.cassandra.distributed.shared.AssertUtils.row;
+import static org.junit.Assert.fail;
+import static org.apache.cassandra.distributed.api.ConsistencyLevel.ALL;
+
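+/**
+ * Tests the behaviour of reads and writes when the nodes disagree on the table schema, e.g. because a column has
+ * been added or dropped on only some of them.
+ */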
+public class SchemaDisagreementTest extends TestBaseImpl
+{
+    /**
+     * If a node receives a mutation for a column it's not aware of, it should fail, since it can't write the data.
+     */
+    @Test
+    public void writeWithSchemaDisagreement() throws Throwable
+    {
+        try (Cluster cluster = init(builder().withNodes(3).withConfig(config -> config.with(NETWORK)).start()))
+        {
+            cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (pk int, ck int, v1 int, PRIMARY KEY (pk, ck))"));
+
+            cluster.get(1).executeInternal(withKeyspace("INSERT INTO %s.tbl (pk, ck, v1) VALUES (1, 1, 1)"));
+            cluster.get(2).executeInternal(withKeyspace("INSERT INTO %s.tbl (pk, ck, v1) VALUES (1, 1, 1)"));
+            cluster.get(3).executeInternal(withKeyspace("INSERT INTO %s.tbl (pk, ck, v1) VALUES (1, 1, 1)"));
+
+            // Introduce schema disagreement
+            cluster.schemaChange(withKeyspace("ALTER TABLE %s.tbl ADD v2 int"), 1);
+
+            try
+            {
+                cluster.coordinator(1).execute(withKeyspace("INSERT INTO %s.tbl (pk, ck, v1, v2) VALUES (2, 2, 2, 2)"), ALL);
+                fail("Should have failed because of schema disagreement.");
+            }
+            catch (Exception e)
+            {
+                // we can't check the exception class directly, presumably because each instance loads its classes
+                // with its own classloader, so we compare by class name instead
+                Assert.assertTrue(e.getClass().toString().contains("WriteFailureException"));
+                // we may see 1 or 2 failures here, because of the fail-fast behavior of AbstractWriteResponseHandler
+                Assert.assertTrue(e.getMessage().contains("INCOMPATIBLE_SCHEMA from ") &&
+                                  (e.getMessage().contains("/127.0.0.2") || e.getMessage().contains("/127.0.0.3")));
+            }
+        }
+    }
+
+    /**
+     * If a node receives a mutation for a column it knows has been dropped, the write should succeed.
+     */
+    @Test
+    public void writeWithSchemaDisagreement2() throws Throwable
+    {
+        try (Cluster cluster = init(builder().withNodes(3).withConfig(config -> config.with(NETWORK)).start()))
+        {
+            cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (pk int, ck int, v1 int, v2 int, PRIMARY KEY (pk, ck))"));
+
+            cluster.get(1).executeInternal(withKeyspace("INSERT INTO %s.tbl (pk, ck, v1, v2) VALUES (1, 1, 1, 1)"));
+            cluster.get(2).executeInternal(withKeyspace("INSERT INTO %s.tbl (pk, ck, v1, v2) VALUES (1, 1, 1, 1)"));
+            cluster.get(3).executeInternal(withKeyspace("INSERT INTO %s.tbl (pk, ck, v1, v2) VALUES (1, 1, 1, 1)"));
+            cluster.forEach((instance) -> instance.flush(KEYSPACE));
+
+            // Introduce schema disagreement
+            cluster.schemaChange(withKeyspace("ALTER TABLE %s.tbl DROP v2"), 1);
+
+            // execute a write including the dropped column where the coordinator is not yet aware of the drop
+            // all nodes should process this without error
+            cluster.coordinator(2).execute(withKeyspace("INSERT INTO %s.tbl (pk, ck, v1, v2) VALUES (2, 2, 2, 2)"), ALL);
+            // and flushing should also be fine
+            cluster.forEach((instance) -> instance.flush(KEYSPACE));
+            // the results of reads will vary depending on whether the coordinator has seen the schema change
+            // note: read repairs will propagate the v2 value to node1, but this is safe and handled correctly
+            assertRows(cluster.coordinator(2).execute(withKeyspace("SELECT * FROM %s.tbl"), ALL),
+                       rows(row(1, 1, 1, 1), row(2, 2, 2, 2)));
+            assertRows(cluster.coordinator(1).execute(withKeyspace("SELECT * FROM %s.tbl"), ALL),
+                       rows(row(1, 1, 1), row(2, 2, 2)));
+        }
+    }
+
+    /**
+     * If a node isn't aware of a column, but receives a mutation without that column, the write should succeed.
+     */
+    @Test
+    public void writeWithInconsequentialSchemaDisagreement() throws Throwable
+    {
+        try (Cluster cluster = init(builder().withNodes(3).withConfig(config -> config.with(NETWORK)).start()))
+        {
+            cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (pk int, ck int, v1 int, PRIMARY KEY (pk, ck))"));
+
+            cluster.get(1).executeInternal(withKeyspace("INSERT INTO %s.tbl (pk, ck, v1) VALUES (1, 1, 1)"));
+            cluster.get(2).executeInternal(withKeyspace("INSERT INTO %s.tbl (pk, ck, v1) VALUES (1, 1, 1)"));
+            cluster.get(3).executeInternal(withKeyspace("INSERT INTO %s.tbl (pk, ck, v1) VALUES (1, 1, 1)"));
+
+            // Introduce schema disagreement
+            cluster.schemaChange(withKeyspace("ALTER TABLE %s.tbl ADD v2 int"), 1);
+
+            // this write shouldn't cause any problems because it doesn't write to the new column
+            cluster.coordinator(1).execute(withKeyspace("INSERT INTO %s.tbl (pk, ck, v1) VALUES (2, 2, 2)"), ALL);
+        }
+    }
+
+    /**
+     * If a node receives a read for a column it's not aware of, it shouldn't complain, since it won't have any data for
+     * that column.
+     */
+    @Test
+    public void readWithSchemaDisagreement() throws Throwable
+    {
+        try (Cluster cluster = init(builder().withNodes(3).withConfig(config -> config.with(NETWORK)).start()))
+        {
+            cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (pk int, ck int, v1 int, PRIMARY KEY (pk, ck))"));
+
+            cluster.get(1).executeInternal(withKeyspace("INSERT INTO %s.tbl (pk, ck, v1) VALUES (1, 1, 1)"));
+            cluster.get(2).executeInternal(withKeyspace("INSERT INTO %s.tbl (pk, ck, v1) VALUES (1, 1, 1)"));
+            cluster.get(3).executeInternal(withKeyspace("INSERT INTO %s.tbl (pk, ck, v1) VALUES (1, 1, 1)"));
+
+            // Introduce schema disagreement
+            cluster.schemaChange(withKeyspace("ALTER TABLE %s.tbl ADD v2 int"), 1);
+
+            assertRows(cluster.coordinator(1).execute(withKeyspace("SELECT * FROM %s.tbl WHERE pk = 1"), ALL),
+                       new Object[][]{ new Object[]{ 1, 1, 1, null } });
+        }
+    }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/ShortReadProtectionTest.java b/test/distributed/org/apache/cassandra/distributed/test/ShortReadProtectionTest.java
index 23b5c14..69b074f 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/ShortReadProtectionTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/ShortReadProtectionTest.java
@@ -18,46 +18,305 @@
 
 package org.apache.cassandra.distributed.test;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.stream.IntStream;
 
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
+import org.apache.cassandra.dht.Murmur3Partitioner;
 import org.apache.cassandra.distributed.Cluster;
-import org.apache.cassandra.distributed.api.ICoordinator;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
 import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.shared.AssertUtils;
+import org.apache.cassandra.utils.ByteBufferUtil;
 
+import static com.google.common.collect.Iterators.toArray;
 import static java.lang.String.format;
-import static java.util.Arrays.asList;
 import static org.apache.cassandra.distributed.api.ConsistencyLevel.ALL;
-import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;
+import static org.apache.cassandra.distributed.api.ConsistencyLevel.QUORUM;
+import static org.apache.cassandra.distributed.shared.AssertUtils.row;
 
 /**
  * Tests short read protection, the mechanism that ensures distributed queries at read consistency levels > ONE/LOCAL_ONE
  * avoid short reads that might happen when a limit is used and reconciliation accepts fewer rows than that limit.
  */
+@RunWith(Parameterized.class)
 public class ShortReadProtectionTest extends TestBaseImpl
 {
+    private static final int NUM_NODES = 3;
+    private static final int[] PAGE_SIZES = new int[]{ 1, 10 };
+
+    private static Cluster cluster;
+    private Tester tester;
+
+    /**
+     * The consistency level to be used in reads. Internal writes will hit the minimum number of replicas needed to
+     * satisfy this consistency level. With RF=3, this means one written replica for CL=ALL, and two for CL=QUORUM.
+     */
+    @Parameterized.Parameter
+    public ConsistencyLevel readConsistencyLevel;
+
+    /**
+     * Whether to flush data after mutations.
+     */
+    @Parameterized.Parameter(1)
+    public boolean flush;
+
+    /**
+     * Whether paging is used for the distributed queries.
+     */
+    @Parameterized.Parameter(2)
+    public boolean paging;
+
+    @Parameterized.Parameters(name = "{index}: read_cl={0} flush={1} paging={2}")
+    public static Collection<Object[]> data()
+    {
+        List<Object[]> result = new ArrayList<>();
+        for (ConsistencyLevel readConsistencyLevel : Arrays.asList(ALL, QUORUM))
+            for (boolean flush : BOOLEANS)
+                for (boolean paging : BOOLEANS)
+                    result.add(new Object[]{ readConsistencyLevel, flush, paging });
+        return result;
+    }
+
+    @BeforeClass
+    public static void setupCluster() throws IOException
+    {
+        cluster = init(Cluster.build()
+                              .withNodes(NUM_NODES)
+                              .withConfig(config -> config.set("hinted_handoff_enabled", false))
+                              .start());
+    }
+
+    @AfterClass
+    public static void teardownCluster()
+    {
+        if (cluster != null)
+            cluster.close();
+    }
+
+    @Before
+    public void setupTester()
+    {
+        tester = new Tester(readConsistencyLevel, flush, paging);
+    }
+
+    @After
+    public void teardownTester()
+    {
+        tester.dropTable();
+    }
+
+    /**
+     * Tests SRP for tables with no clustering columns and with a deleted row.
+     * <p>
+     * See CASSANDRA-13880.
+     * <p>
+     * Replaces Python dtest {@code consistency_test.py:TestConsistency.test_13880()}.
+     */
+    @Test
+    public void testSkinnyTableWithoutLiveRows()
+    {
+        tester.createTable("CREATE TABLE %s (id int PRIMARY KEY)")
+              .allNodes("INSERT INTO %s (id) VALUES (0) USING TIMESTAMP 0")
+              .toNode1("DELETE FROM %s WHERE id = 0")
+              .assertRows("SELECT DISTINCT id FROM %s WHERE id = 0")
+              .assertRows("SELECT id FROM %s WHERE id = 0 LIMIT 1");
+    }
+
+    /**
+     * Tests SRP for tables with no clustering columns and with alternating live and deleted rows.
+     * <p>
+     * See CASSANDRA-13747.
+     * <p>
+     * Replaces Python dtest {@code consistency_test.py:TestConsistency.test_13747()}.
+     */
+    @Test
+    public void testSkinnyTableWithLiveRows()
+    {
+        tester.createTable("CREATE TABLE %s (id int PRIMARY KEY)")
+              .allNodes(0, 10, i -> format("INSERT INTO %%s (id) VALUES (%d) USING TIMESTAMP 0", i)) // order is 5,1,8,0,2,4,7,6,9,3
+              .toNode1("DELETE FROM %s WHERE id IN (1, 0, 4, 6, 3)") // delete every other row
+              .assertRows("SELECT DISTINCT token(id), id FROM %s",
+                          row(token(5), 5), row(token(8), 8), row(token(2), 2), row(token(7), 7), row(token(9), 9));
+    }
+
+    /**
+     * Tests SRP for tables with no clustering columns and with complementary deleted rows.
+     * <p>
+     * See CASSANDRA-13595.
+     * <p>
+     * Replaces Python dtest {@code consistency_test.py:TestConsistency.test_13595()}.
+     */
+    @Test
+    public void testSkinnyTableWithComplementaryDeletions()
+    {
+        tester.createTable("CREATE TABLE %s (id int PRIMARY KEY)")
+              .allNodes(0, 10, i -> format("INSERT INTO %%s (id) VALUES (%d) USING TIMESTAMP 0", i)) // order is 5,1,8,0,2,4,7,6,9,3
+              .toNode1("DELETE FROM %s WHERE id IN (5, 8, 2, 7, 9)") // delete every other row
+              .toNode2("DELETE FROM %s WHERE id IN (1, 0, 4, 6)") // delete every other row but the last one
+              .assertRows("SELECT id FROM %s LIMIT 1", row(3))
+              .assertRows("SELECT DISTINCT id FROM %s LIMIT 1", row(3));
+    }
+
+    /**
+     * Tests SRP when more than one row is missing.
+     * <p>
+     * See CASSANDRA-12872.
+     * <p>
+     * Replaces Python dtest {@code consistency_test.py:TestConsistency.test_12872()}.
+     */
+    @Test
+    public void testMultipleMissedRows()
+    {
+        tester.createTable("CREATE TABLE %s (pk int, ck int, PRIMARY KEY (pk, ck))")
+              .allNodes(0, 4, i -> format("INSERT INTO %%s (pk, ck) VALUES (0, %d) USING TIMESTAMP 0", i))
+              .toNode1("DELETE FROM %s WHERE pk = 0 AND ck IN (1, 2, 3)",
+                       "INSERT INTO %s (pk, ck) VALUES (0, 5)")
+              .toNode2("INSERT INTO %s (pk, ck) VALUES (0, 4)")
+              .assertRows("SELECT ck FROM %s WHERE pk = 0 LIMIT 2", row(0), row(4));
+    }
+
+    /**
+     * Tests SRP with deleted rows at the beginning of the partition and ascending order.
+     * <p>
+     * See CASSANDRA-9460.
+     * <p>
+     * Replaces Python dtest {@code consistency_test.py:TestConsistency.test_short_read()} together with
+     * {@link #testDescendingOrder()}.
+     */
+    @Test
+    public void testAscendingOrder()
+    {
+        tester.createTable("CREATE TABLE %s (k int, c int, v int, PRIMARY KEY(k, c))")
+              .allNodes(1, 10, i -> format("INSERT INTO %%s (k, c, v) VALUES (0, %d, %d) USING TIMESTAMP 0", i, i * 10))
+              .toNode1("DELETE FROM %s WHERE k=0 AND c=1")
+              .toNode2("DELETE FROM %s WHERE k=0 AND c=2")
+              .toNode3("DELETE FROM %s WHERE k=0 AND c=3")
+              .assertRows("SELECT c, v FROM %s WHERE k=0 ORDER BY c ASC LIMIT 1", row(4, 40))
+              .assertRows("SELECT c, v FROM %s WHERE k=0 ORDER BY c ASC LIMIT 2", row(4, 40), row(5, 50))
+              .assertRows("SELECT c, v FROM %s WHERE k=0 ORDER BY c ASC LIMIT 3", row(4, 40), row(5, 50), row(6, 60))
+              .assertRows("SELECT c, v FROM %s WHERE k=0 ORDER BY c ASC LIMIT 4", row(4, 40), row(5, 50), row(6, 60), row(7, 70));
+    }
+
+    /**
+     * Tests SRP with deleted rows at the end of the partition and descending order.
+     * <p>
+     * See CASSANDRA-9460.
+     * <p>
+     * Replaces Python dtest {@code consistency_test.py:TestConsistency.test_short_read()} together with
+     * {@link #testAscendingOrder()}.
+     */
+    @Test
+    public void testDescendingOrder()
+    {
+        tester.createTable("CREATE TABLE %s (k int, c int, v int, PRIMARY KEY(k, c))")
+              .allNodes(1, 10, i -> format("INSERT INTO %%s (k, c, v) VALUES (0, %d, %d) USING TIMESTAMP 0", i, i * 10))
+              .toNode1("DELETE FROM %s WHERE k=0 AND c=7")
+              .toNode2("DELETE FROM %s WHERE k=0 AND c=8")
+              .toNode3("DELETE FROM %s WHERE k=0 AND c=9")
+              .assertRows("SELECT c, v FROM %s WHERE k=0 ORDER BY c DESC LIMIT 1", row(6, 60))
+              .assertRows("SELECT c, v FROM %s WHERE k=0 ORDER BY c DESC LIMIT 2", row(6, 60), row(5, 50))
+              .assertRows("SELECT c, v FROM %s WHERE k=0 ORDER BY c DESC LIMIT 3", row(6, 60), row(5, 50), row(4, 40))
+              .assertRows("SELECT c, v FROM %s WHERE k=0 ORDER BY c DESC LIMIT 4", row(6, 60), row(5, 50), row(4, 40), row(3, 30));
+    }
+
+    /**
+     * Test short reads ultimately leaving no rows alive after a partition deletion.
+     * <p>
+     * See CASSANDRA-4000 and CASSANDRA-8933.
+     * <p>
+     * Replaces Python dtest {@code consistency_test.py:TestConsistency.test_short_read_delete()} and
+     * {@code consistency_test.py:TestConsistency.test_short_read_quorum_delete()}. Note that the
+     * {@link #readConsistencyLevel} test parameter ensures that both tests are covered.
+     */
+    @Test
+    public void testDeletePartition()
+    {
+        tester.createTable("CREATE TABLE %s (k int, c int, v int, PRIMARY KEY(k, c))")
+              .allNodes("INSERT INTO %s (k, c, v) VALUES (0, 1, 10) USING TIMESTAMP 0",
+                        "INSERT INTO %s (k, c, v) VALUES (0, 2, 20) USING TIMESTAMP 0")
+              .toNode2("DELETE FROM %s WHERE k=0")
+              .assertRows("SELECT c, v FROM %s WHERE k=0 LIMIT 1");
+    }
+
+    /**
+     * Test short reads ultimately leaving no rows alive after a partition deletion when there is a static row.
+     */
+    @Test
+    public void testDeletePartitionWithStatic()
+    {
+        tester.createTable("CREATE TABLE %s (k int, c int, v int, s int STATIC, PRIMARY KEY(k, c))")
+              .allNodes("INSERT INTO %s (k, c, v, s) VALUES (0, 1, 10, 100) USING TIMESTAMP 0",
+                        "INSERT INTO %s (k, c, v) VALUES (0, 2, 20) USING TIMESTAMP 0")
+              .toNode2("DELETE FROM %s WHERE k=0")
+              .assertRows("SELECT c, v FROM %s WHERE k=0 LIMIT 1");
+    }
+
+    /**
+     * Test short reads ultimately leaving no rows alive after a clustering deletion.
+     */
+    @Test
+    public void testDeleteClustering()
+    {
+        tester.createTable("CREATE TABLE %s (k int, c int, v int, PRIMARY KEY(k, c))")
+              .allNodes("INSERT INTO %s (k, c, v) VALUES (0, 1, 10) USING TIMESTAMP 0",
+                        "INSERT INTO %s (k, c, v) VALUES (0, 2, 20) USING TIMESTAMP 0")
+              .toNode2("DELETE FROM %s WHERE k=0 AND c=1")
+              .assertRows("SELECT * FROM %s WHERE k=0 LIMIT 1", row(0, 2, 20))
+              .toNode2("DELETE FROM %s WHERE k=0 AND c=2")
+              .assertRows("SELECT * FROM %s WHERE k=0 LIMIT 1");
+    }
+
+    /**
+     * Test short reads ultimately leaving no rows alive after a clustering deletion when there is a static row.
+     */
+    @Test
+    public void testDeleteClusteringWithStatic()
+    {
+        tester.createTable("CREATE TABLE %s (k int, c int, v int, s int STATIC, PRIMARY KEY(k, c))")
+              .allNodes("INSERT INTO %s (k, c, v, s) VALUES (0, 1, 10, 100) USING TIMESTAMP 0",
+                        "INSERT INTO %s (k, c, v) VALUES (0, 2, 20) USING TIMESTAMP 0")
+              .toNode2("DELETE FROM %s WHERE k=0 AND c=1")
+              .assertRows("SELECT k, c, v, s FROM %s WHERE k=0 LIMIT 1", row(0, 2, 20, 100))
+              .toNode2("DELETE FROM %s WHERE k=0 AND c=2")
+              .assertRows("SELECT k, c, v, s FROM %s WHERE k=0 LIMIT 1", row(0, null, null, 100));
+    }
+
     /**
      * Test GROUP BY with short read protection, particularly when there is a limit and regular row deletions.
      * <p>
      * See CASSANDRA-15459
      */
     @Test
-    public void testGroupBySRPRegularRow() throws Throwable
+    public void testGroupByRegularRow()
     {
-        testGroupBySRP("CREATE TABLE %s (pk int, ck int, PRIMARY KEY (pk, ck))",
-                       asList("INSERT INTO %s (pk, ck) VALUES (1, 1) USING TIMESTAMP 0",
-                              "DELETE FROM %s WHERE pk=0 AND ck=0",
-                              "INSERT INTO %s (pk, ck) VALUES (2, 2) USING TIMESTAMP 0"),
-                       asList("DELETE FROM %s WHERE pk=1 AND ck=1",
-                              "INSERT INTO %s (pk, ck) VALUES (0, 0) USING TIMESTAMP 0",
-                              "DELETE FROM %s WHERE pk=2 AND ck=2"),
-                       asList("SELECT * FROM %s LIMIT 1",
-                              "SELECT * FROM %s LIMIT 10",
-                              "SELECT * FROM %s GROUP BY pk LIMIT 1",
-                              "SELECT * FROM %s GROUP BY pk LIMIT 10",
-                              "SELECT * FROM %s GROUP BY pk, ck LIMIT 1",
-                              "SELECT * FROM %s GROUP BY pk, ck LIMIT 10"));
+        tester.createTable("CREATE TABLE %s (pk int, ck int, PRIMARY KEY (pk, ck))")
+              .toNode1("INSERT INTO %s (pk, ck) VALUES (1, 1) USING TIMESTAMP 0",
+                       "DELETE FROM %s WHERE pk=0 AND ck=0",
+                       "INSERT INTO %s (pk, ck) VALUES (2, 2) USING TIMESTAMP 0")
+              .toNode2("DELETE FROM %s WHERE pk=1 AND ck=1",
+                       "INSERT INTO %s (pk, ck) VALUES (0, 0) USING TIMESTAMP 0",
+                       "DELETE FROM %s WHERE pk=2 AND ck=2")
+              .assertRows("SELECT * FROM %s LIMIT 1")
+              .assertRows("SELECT * FROM %s LIMIT 10")
+              .assertRows("SELECT * FROM %s GROUP BY pk LIMIT 1")
+              .assertRows("SELECT * FROM %s GROUP BY pk LIMIT 10")
+              .assertRows("SELECT * FROM %s GROUP BY pk, ck LIMIT 1")
+              .assertRows("SELECT * FROM %s GROUP BY pk, ck LIMIT 10");
     }
 
     /**
@@ -66,51 +325,223 @@ public class ShortReadProtectionTest extends TestBaseImpl
      * See CASSANDRA-15459
      */
     @Test
-    public void testGroupBySRPStaticRow() throws Throwable
-    {
-        testGroupBySRP("CREATE TABLE %s (pk int, ck int, s int static, PRIMARY KEY (pk, ck))",
-                       asList("INSERT INTO %s (pk, s) VALUES (1, 1) USING TIMESTAMP 0",
-                              "INSERT INTO %s (pk, s) VALUES (0, null)",
-                              "INSERT INTO %s (pk, s) VALUES (2, 2) USING TIMESTAMP 0"),
-                       asList("INSERT INTO %s (pk, s) VALUES (1, null)",
-                              "INSERT INTO %s (pk, s) VALUES (0, 0) USING TIMESTAMP 0",
-                              "INSERT INTO %s (pk, s) VALUES (2, null)"),
-                       asList("SELECT * FROM %s LIMIT 1",
-                              "SELECT * FROM %s LIMIT 10",
-                              "SELECT * FROM %s GROUP BY pk LIMIT 1",
-                              "SELECT * FROM %s GROUP BY pk LIMIT 10",
-                              "SELECT * FROM %s GROUP BY pk, ck LIMIT 1",
-                              "SELECT * FROM %s GROUP BY pk, ck LIMIT 10"));
-    }
-
-    private void testGroupBySRP(String createTable,
-                                List<String> node1Queries,
-                                List<String> node2Queries,
-                                List<String> coordinatorQueries) throws Throwable
-    {
-        try (Cluster cluster = init(Cluster.build()
-                                           .withNodes(2)
-                                           .withConfig(config -> config.set("hinted_handoff_enabled", false))
-                                           .start()))
+    public void testGroupByStaticRow()
+    {
+        tester.createTable("CREATE TABLE %s (pk int, ck int, s int static, PRIMARY KEY (pk, ck))")
+              .toNode1("INSERT INTO %s (pk, s) VALUES (1, 1) USING TIMESTAMP 0",
+                       "INSERT INTO %s (pk, s) VALUES (0, null)",
+                       "INSERT INTO %s (pk, s) VALUES (2, 2) USING TIMESTAMP 0")
+              .toNode2("INSERT INTO %s (pk, s) VALUES (1, null)",
+                       "INSERT INTO %s (pk, s) VALUES (0, 0) USING TIMESTAMP 0",
+                       "INSERT INTO %s (pk, s) VALUES (2, null)")
+              .assertRows("SELECT * FROM %s LIMIT 1")
+              .assertRows("SELECT * FROM %s LIMIT 10")
+              .assertRows("SELECT * FROM %s GROUP BY pk LIMIT 1")
+              .assertRows("SELECT * FROM %s GROUP BY pk LIMIT 10")
+              .assertRows("SELECT * FROM %s GROUP BY pk, ck LIMIT 1")
+              .assertRows("SELECT * FROM %s GROUP BY pk, ck LIMIT 10");
+    }
+
+    /**
+     * See CASSANDRA-13911.
+     * <p>
+     * Replaces Python dtest {@code consistency_test.py:TestConsistency.test_13911()}.
+     */
+    @Test
+    public void testSkipEarlyTermination()
+    {
+        tester.createTable("CREATE TABLE %s (pk int, ck int, PRIMARY KEY (pk, ck))")
+              .toNode1("INSERT INTO %s (pk, ck) VALUES (0, 0)")
+              .toNode2("DELETE FROM %s WHERE pk = 0 AND ck IN (1, 2)")
+              .assertRows("SELECT DISTINCT pk FROM %s", row(0));
+    }
+
+    /**
+     * A regression test to prove that we can no longer rely on {@code !singleResultCounter.isDoneForPartition()} to
+     * abort single partition SRP early if a per partition limit is set.
+     * <p>
+     * See CASSANDRA-13911.
+     * <p>
+     * Replaces Python dtest {@code consistency_test.py:TestConsistency.test_13911_rows_srp()}.
+     */
+    @Test
+    public void testSkipEarlyTerminationRows()
+    {
+        tester.createTable("CREATE TABLE %s (pk int, ck int, PRIMARY KEY (pk, ck))")
+              .toNode1("INSERT INTO %s (pk, ck) VALUES (0, 0) USING TIMESTAMP 0",
+                       "INSERT INTO %s (pk, ck) VALUES (0, 1) USING TIMESTAMP 0",
+                       "INSERT INTO %s (pk, ck) VALUES (2, 0) USING TIMESTAMP 0",
+                       "DELETE FROM %s USING TIMESTAMP 42 WHERE pk = 2 AND ck = 1")
+              .toNode2("INSERT INTO %s (pk, ck) VALUES (0, 2) USING TIMESTAMP 0",
+                       "INSERT INTO %s (pk, ck) VALUES (0, 3) USING TIMESTAMP 0",
+                       "DELETE FROM %s USING TIMESTAMP 42 WHERE pk = 2 AND ck = 0",
+                       "INSERT INTO %s (pk, ck) VALUES (2, 1) USING TIMESTAMP 0",
+                       "INSERT INTO %s (pk, ck) VALUES (2, 2) USING TIMESTAMP 0")
+              .assertRows("SELECT pk, ck FROM %s PER PARTITION LIMIT 2 LIMIT 3", row(0, 0), row(0, 1), row(2, 2));
+    }
+
+    /**
+     * A regression test to prove that we can no longer rely on {@code !singleResultCounter.isDone()} to abort ranged
+     * partition SRP early if a per partition limit is set.
+     * <p>
+     * See CASSANDRA-13911.
+     * <p>
+     * Replaces Python dtest {@code consistency_test.py:TestConsistency.test_13911_partitions_srp()}.
+     */
+    @Test
+    public void testSkipEarlyTerminationPartitions()
+    {
+        tester.createTable("CREATE TABLE %s (pk int, ck int, PRIMARY KEY (pk, ck))")
+              .toNode1("INSERT INTO %s (pk, ck) VALUES (0, 0) USING TIMESTAMP 0",
+                       "INSERT INTO %s (pk, ck) VALUES (0, 1) USING TIMESTAMP 0",
+                       "DELETE FROM %s USING TIMESTAMP 42 WHERE pk = 2 AND ck IN (0, 1)")
+              .toNode2("INSERT INTO %s (pk, ck) VALUES (0, 2) USING TIMESTAMP 0",
+                       "INSERT INTO %s (pk, ck) VALUES (0, 3) USING TIMESTAMP 0",
+                       "INSERT INTO %s (pk, ck) VALUES (2, 0) USING TIMESTAMP 0",
+                       "INSERT INTO %s (pk, ck) VALUES (2, 1) USING TIMESTAMP 0",
+                       "INSERT INTO %s (pk, ck) VALUES (4, 0) USING TIMESTAMP 0",
+                       "INSERT INTO %s (pk, ck) VALUES (4, 1) USING TIMESTAMP 0")
+              .assertRows("SELECT pk, ck FROM %s PER PARTITION LIMIT 2 LIMIT 4",
+                          row(0, 0), row(0, 1), row(4, 0), row(4, 1));
+    }
+
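+    /**
+     * Returns the {@link Murmur3Partitioner} token of the given int key, as a long.
+     */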
+    private static long token(int key)
+    {
+        return (long) Murmur3Partitioner.instance.getToken(ByteBufferUtil.bytes(key)).getTokenValue();
+    }
+
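+    /**
+     * Helper that writes directly to individual replicas with {@code executeInternal}, and then reads through every
+     * coordinator with the configured read consistency level, optionally flushing before the first read and paging.
+     */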
+    private static class Tester
+    {
+        private static final AtomicInteger seqNumber = new AtomicInteger();
+
+        private final ConsistencyLevel readConsistencyLevel;
+        private final boolean flush, paging;
+        private final String qualifiedTableName;
+
+        private boolean flushed = false;
+
+        private Tester(ConsistencyLevel readConsistencyLevel, boolean flush, boolean paging)
+        {
+            this.readConsistencyLevel = readConsistencyLevel;
+            this.flush = flush;
+            this.paging = paging;
+            qualifiedTableName = KEYSPACE + ".t_" + seqNumber.getAndIncrement();
+
+            assert readConsistencyLevel == ALL || readConsistencyLevel == QUORUM
+            : "Only ALL and QUORUM consistency levels are supported";
+        }
+
+        private Tester createTable(String query)
+        {
+            cluster.schemaChange(format(query) + " WITH read_repair='NONE'");
+            return this;
+        }
+
+        private Tester allNodes(int startInclusive, int endExclusive, Function<Integer, String> querySupplier)
+        {
+            IntStream.range(startInclusive, endExclusive).mapToObj(querySupplier::apply).forEach(this::allNodes);
+            return this;
+        }
+
+        private Tester allNodes(String... queries)
+        {
+            for (String query : queries)
+                allNodes(query);
+            return this;
+        }
+
+        private Tester allNodes(String query)
+        {
+            cluster.coordinator(1).execute(format(query), ALL);
+            return this;
+        }
+
+        /**
+         * Internally runs the specified write queries in the first node. If the {@link #readConsistencyLevel} is
+         * QUORUM, then the write will also be internally done in the second replica, to simulate a QUORUM write.
+         */
+        private Tester toNode1(String... queries)
+        {
+            return toNode(1, queries);
+        }
+
+        /**
+         * Internally runs the specified write queries in the second node. If the {@link #readConsistencyLevel} is
+         * QUORUM, then the write will also be internally done in the third replica, to simulate a QUORUM write.
+         */
+        private Tester toNode2(String... queries)
+        {
+            return toNode(2, queries);
+        }
+
+        /**
+         * Internally runs the specified write queries in the third node. If the {@link #readConsistencyLevel} is
+         * QUORUM, then the write will also be internally done in the first replica, to simulate a QUORUM write.
+         */
+        private Tester toNode3(String... queries)
+        {
+            return toNode(3, queries);
+        }
+
+        /**
+         * Internally runs the specified write queries in the specified node. If the {@link #readConsistencyLevel} is
+         * QUORUM, then the write will also be internally done in the next replica in the ring, to simulate a QUORUM write.
+         */
+        private Tester toNode(int node, String... queries)
+        {
+            IInvokableInstance replica = cluster.get(node);
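+            // for QUORUM reads the writes are also applied internally to the next replica in the ring, wrapping
+            // around the last node, to simulate a QUORUM write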
+            IInvokableInstance nextReplica = readConsistencyLevel == QUORUM
+                                             ? cluster.get(node == NUM_NODES ? 1 : node + 1)
+                                             : null;
+
+            for (String query : queries)
+            {
+                String formattedQuery = format(query);
+                replica.executeInternal(formattedQuery);
+
+                if (nextReplica != null)
+                    nextReplica.executeInternal(formattedQuery);
+            }
+
+            return this;
+        }
+
+        private Tester assertRows(String query, Object[]... expectedRows)
+        {
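+            // lazily flush all nodes right before the first read, if this tester is parameterized with flush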
+            if (flush && !flushed)
+            {
+                cluster.stream().forEach(n -> n.flush(KEYSPACE));
+                flushed = true;
+            }
+
+            String formattedQuery = format(query);
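+            // read through every coordinator; when paging is enabled the query is repeated with each page size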
+            cluster.coordinators().forEach(coordinator -> {
+                if (paging)
+                {
+                    for (int fetchSize : PAGE_SIZES)
+                    {
+                        Iterator<Object[]> actualRows = coordinator.executeWithPaging(formattedQuery, readConsistencyLevel, fetchSize);
+                        AssertUtils.assertRows(toArray(actualRows, Object[].class), expectedRows);
+                    }
+                }
+                else
+                {
+                    Object[][] actualRows = coordinator.execute(formattedQuery, readConsistencyLevel);
+                    AssertUtils.assertRows(actualRows, expectedRows);
+                }
+            });
+
+            return this;
+        }
+
+        private String format(String query)
+        {
+            return String.format(query, qualifiedTableName);
+        }
+
+        private void dropTable()
         {
-            // create the table with read repair disabled
-            String table = withKeyspace("%s.t");
-            cluster.schemaChange(format(createTable + " WITH read_repair='NONE'", table));
-
-            // populate data on node1
-            IInvokableInstance node1 = cluster.get(1);
-            for (String query : node1Queries)
-                node1.executeInternal(format(query, table));
-
-            // populate data on node2
-            IInvokableInstance node2 = cluster.get(2);
-            for (String query : node2Queries)
-                node2.executeInternal(format(query, table));
-
-            // verify the behaviour of SRP with GROUP BY queries
-            ICoordinator coordinator = cluster.coordinator(1);
-            for (String query : coordinatorQueries)
-                assertRows(coordinator.execute(format(query, table), ALL));
+            cluster.schemaChange(format("DROP TABLE IF EXISTS %s"));
         }
     }
 }
\ No newline at end of file
diff --git a/test/distributed/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java b/test/distributed/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java
index fab3477..7278121 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java
@@ -18,445 +18,315 @@
 
 package org.apache.cassandra.distributed.test;
 
-import java.util.Set;
-
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.collect.Iterators;
+import org.apache.commons.lang3.ArrayUtils;
+import org.junit.After;
+import org.junit.AfterClass;
 import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
-import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.distributed.Cluster;
-import org.apache.cassandra.distributed.api.ConsistencyLevel;
-import org.apache.cassandra.distributed.api.IInvokableInstance;
-import org.apache.cassandra.io.sstable.format.SSTableReader;
-
-import static org.apache.cassandra.distributed.api.Feature.NETWORK;
-import static org.apache.cassandra.distributed.shared.AssertUtils.*;
-import static org.apache.cassandra.net.OutboundConnections.LARGE_MESSAGE_THRESHOLD;
-import static org.junit.Assert.fail;
-
-// TODO: this test should be removed after running in-jvm dtests is set up via the shared API repository
+import org.apache.cassandra.distributed.api.ICoordinator;
+import org.apache.cassandra.io.compress.DeflateCompressor;
+import org.apache.cassandra.io.compress.LZ4Compressor;
+import org.apache.cassandra.io.compress.NoopCompressor;
+import org.apache.cassandra.io.compress.SnappyCompressor;
+import org.apache.cassandra.io.compress.ZstdCompressor;
+
+import static org.apache.cassandra.distributed.api.ConsistencyLevel.QUORUM;
+import static org.apache.cassandra.distributed.shared.AssertUtils.assertRow;
+import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;
+import static org.apache.cassandra.distributed.shared.AssertUtils.fail;
+import static org.apache.cassandra.distributed.shared.AssertUtils.row;
+
+/**
+ * Simple read/write tests using different types of query, particularly when the data is spread across the memtable and
+ * multiple sstables and when different compressors are used. All available compressors are tested, as are both
+ * ascending and descending clustering orders. The read queries are run using every node as a coordinator, with and
+ * without paging.
+ */
+@RunWith(Parameterized.class)
 public class SimpleReadWriteTest extends TestBaseImpl
 {
-    @Test
-    public void coordinatorReadTest() throws Throwable
+    private static final int NUM_NODES = 4;
+    private static final int REPLICATION_FACTOR = 3;
+    private static final String CREATE_TABLE = "CREATE TABLE %s(k int, c int, v int, PRIMARY KEY (k, c)) " +
+                                               "WITH CLUSTERING ORDER BY (c %s) " +
+                                               "AND COMPRESSION = { 'class': '%s' } " +
+                                               "AND READ_REPAIR = 'none'";
+    private static final String[] COMPRESSORS = new String[]{ NoopCompressor.class.getSimpleName(),
+                                                              LZ4Compressor.class.getSimpleName(),
+                                                              DeflateCompressor.class.getSimpleName(),
+                                                              SnappyCompressor.class.getSimpleName(),
+                                                              ZstdCompressor.class.getSimpleName() };
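+    // intervals at which rows are overridden again, either in a second sstable or only in the memtable (see writeRows)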
+    private static final int SECOND_SSTABLE_INTERVAL = 2;
+    private static final int MEMTABLE_INTERVAL = 5;
+
+    private static final AtomicInteger seq = new AtomicInteger();
+
+    /**
+     * The sstable compressor to be used.
+     */
+    @Parameterized.Parameter
+    public String compressor;
+
+    /**
+     * Whether the clustering order is reverse.
+     */
+    @Parameterized.Parameter(1)
+    public boolean reverse;
+
+    private String tableName;
+
+    @Parameterized.Parameters(name = "{index}: compressor={0} reverse={1}")
+    public static Collection<Object[]> data()
     {
-        try (Cluster cluster = init(builder().withNodes(3).start()))
-        {
-            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH read_repair='none'");
-
-            cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)");
-            cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 2, 2)");
-            cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 3, 3)");
-
-            assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = ?",
-                                                     ConsistencyLevel.ALL,
-                                                     1),
-                       row(1, 1, 1),
-                       row(1, 2, 2),
-                       row(1, 3, 3));
-        }
+        List<Object[]> result = new ArrayList<>();
+        for (String compressor : COMPRESSORS)
+            for (boolean reverse : BOOLEANS)
+                result.add(new Object[]{ compressor, reverse });
+        return result;
     }
 
-    @Test
-    public void largeMessageTest() throws Throwable
+    private static Cluster cluster;
+
+    @BeforeClass
+    public static void setupCluster() throws IOException
     {
-        try (Cluster cluster = init(builder().withNodes(2).start()))
-        {
-            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v text, PRIMARY KEY (pk, ck))");
-            StringBuilder builder = new StringBuilder();
-            for (int i = 0; i < LARGE_MESSAGE_THRESHOLD ; i++)
-                builder.append('a');
-            String s = builder.toString();
-            cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, ?)",
-                                           ConsistencyLevel.ALL,
-                                           s);
-            assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = ?",
-                                                      ConsistencyLevel.ALL,
-                                                      1),
-                       row(1, 1, s));
-        }
+        cluster = init(Cluster.build(NUM_NODES).start(), REPLICATION_FACTOR);
     }
 
-    @Test
-    public void coordinatorWriteTest() throws Throwable
+    @AfterClass
+    public static void teardownCluster()
     {
-        try (Cluster cluster = init(builder().withNodes(3).start()))
-        {
-            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH read_repair='none'");
-
-            cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)",
-                                          ConsistencyLevel.QUORUM);
+        if (cluster != null)
+            cluster.close();
+    }
 
-            for (int i = 0; i < 3; i++)
-            {
-                assertRows(cluster.get(1).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1"),
-                           row(1, 1, 1));
-            }
+    @Before
+    public void before()
+    {
+        // create the table
+        tableName = String.format("%s.t_%d", KEYSPACE, seq.getAndIncrement());
+        cluster.schemaChange(String.format(CREATE_TABLE, tableName, reverse ? "DESC" : "ASC", compressor));
+    }
 
-            assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1",
-                                                     ConsistencyLevel.QUORUM),
-                       row(1, 1, 1));
-        }
+    @After
+    public void after()
+    {
+        cluster.schemaChange(withTable("DROP TABLE %s"));
     }
 
     /**
-     * If a node receives a mutation for a column it's not aware of, it should fail, since it can't write the data.
+     * Simple put/get on a single partition with a few rows, reading with a single partition query.
+     * <p>
+     * Migrated from Python dtests putget_test.py:TestPutGet.test_putget[_snappy|_deflate]().
      */
     @Test
-    public void writeWithSchemaDisagreement() throws Throwable
+    public void testPartitionQuery()
     {
-        try (Cluster cluster = init(builder().withNodes(3).withConfig(config -> config.with(NETWORK)).start()))
-        {
-            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, PRIMARY KEY (pk, ck))");
-
-            cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
-            cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
-            cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
+        int numRows = 10;
 
-            // Introduce schema disagreement
-            cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl ADD v2 int", 1);
+        writeRows(1, numRows);
 
-            try
-            {
-                cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1, v2) VALUES (2, 2, 2, 2)",
-                                              ConsistencyLevel.ALL);
-                fail("Should have failed because of schema disagreement.");
-            }
-            catch (Exception e)
-            {
-                // for some reason, we get weird errors when trying to check class directly
-                // I suppose it has to do with some classloader manipulation going on
-                Assert.assertTrue(e.getClass().toString().contains("WriteFailureException"));
-                // we may see 1 or 2 failures in here, because of the fail-fast behavior of AbstractWriteResponseHandler
-                Assert.assertTrue(e.getMessage().contains("INCOMPATIBLE_SCHEMA from ") &&
-                                  (e.getMessage().contains("/127.0.0.2") || e.getMessage().contains("/127.0.0.3")));
-
-            }
+        Object[][] rows = readRows("SELECT * FROM %s WHERE k=?", 0);
+        Assert.assertEquals(numRows, rows.length);
+        for (int c = 0; c < numRows; c++)
+        {
+            validateRow(rows[c], numRows, 0, c);
         }
     }
 
     /**
-     * If a node receives a mutation for a column it knows has been dropped, the write should succeed
+     * Simple put/get on multiple partitions with multiple rows, reading with a range query.
+     * <p>
+     * Migrated from Python dtests putget_test.py:TestPutGet.test_rangeputget().
      */
     @Test
-    public void writeWithSchemaDisagreement2() throws Throwable
+    public void testRangeQuery()
     {
-        try (Cluster cluster = init(builder().withNodes(3).withConfig(config -> config.with(NETWORK)).start()))
+        int numPartitions = 10;
+        int rowsPerPartition = 10;
+
+        writeRows(numPartitions, rowsPerPartition);
+
+        Object[][] rows = readRows("SELECT * FROM %s");
+        Assert.assertEquals(numPartitions * rowsPerPartition, rows.length);
+        for (int k = 0; k < numPartitions; k++)
         {
-            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, v2 int, PRIMARY KEY (pk, ck))");
-
-            cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1, v2) VALUES (1, 1, 1, 1)");
-            cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1, v2) VALUES (1, 1, 1, 1)");
-            cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1, v2) VALUES (1, 1, 1, 1)");
-            cluster.forEach((instance) -> instance.flush(KEYSPACE));
-
-            // Introduce schema disagreement
-            cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl DROP v2", 1);
-
-            // execute a write including the dropped column where the coordinator is not yet aware of the drop
-            // all nodes should process this without error
-            cluster.coordinator(2).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1, v2) VALUES (2, 2, 2, 2)",
-                                               ConsistencyLevel.ALL);
-            // and flushing should also be fine
-            cluster.forEach((instance) -> instance.flush(KEYSPACE));
-            // the results of reads will vary depending on whether the coordinator has seen the schema change
-            // note: read repairs will propagate the v2 value to node1, but this is safe and handled correctly
-            assertRows(cluster.coordinator(2).execute("SELECT * FROM " + KEYSPACE + ".tbl", ConsistencyLevel.ALL),
-                       rows(row(1,1,1,1), row(2,2,2,2)));
-            assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl", ConsistencyLevel.ALL),
-                       rows(row(1,1,1), row(2,2,2)));
+            for (int c = 0; c < rowsPerPartition; c++)
+            {
+                Object[] row = rows[k * rowsPerPartition + c];
+                validateRow(row, rowsPerPartition, k, c);
+            }
         }
     }
 
     /**
-     * If a node isn't aware of a column, but receives a mutation without that column, the write should succeed
+     * Simple put/get on a single partition with multiple rows, reading with slice queries.
+     * <p>
+     * Migrated from Python dtests putget_test.py:TestPutGet.test_wide_row().
      */
     @Test
-    public void writeWithInconsequentialSchemaDisagreement() throws Throwable
+    public void testSliceQuery()
     {
-        try (Cluster cluster = init(builder().withNodes(3).withConfig(config -> config.with(NETWORK)).start()))
-        {
-            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, PRIMARY KEY (pk, ck))");
+        int numRows = 100;
 
-            cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
-            cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
-            cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
+        writeRows(1, numRows);
 
-            // Introduce schema disagreement
-            cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl ADD v2 int", 1);
+        String query = "SELECT * FROM %s WHERE k=? AND c>=? AND c<?";
+        for (int sliceSize : Arrays.asList(10, 20, 100))
+        {
+            for (int c = 0; c < numRows; c = c + sliceSize)
+            {
+                Object[][] rows = readRows(query, 0, c, c + sliceSize);
+                Assert.assertEquals(sliceSize, rows.length);
 
-            // this write shouldn't cause any problems because it doesn't write to the new column
-            cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (2, 2, 2)",
-                                           ConsistencyLevel.ALL);
+                for (int i = 0; i < sliceSize; i++)
+                {
+                    Object[] row = rows[i];
+                    validateRow(row, numRows, 0, c + i);
+                }
+            }
         }
     }
 
     /**
-     * If a node receives a read for a column it's not aware of, it shouldn't complain, since it won't have any data for that column
+     * Simple put/get on multiple partitions with multiple rows, reading with IN queries.
      */
     @Test
-    public void readWithSchemaDisagreement() throws Throwable
+    public void testInQuery()
     {
-        try (Cluster cluster = init(builder().withNodes(3).withConfig(config -> config.with(NETWORK)).start()))
-        {
-            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, PRIMARY KEY (pk, ck))");
-
-            cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
-            cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
-            cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
-
-            // Introduce schema disagreement
-            cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl ADD v2 int", 1);
+        int numPartitions = 10;
+        int rowsPerPartition = 10;
 
-            Object[][] expected = new Object[][]{new Object[]{1, 1, 1, null}};
-            assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", ConsistencyLevel.ALL), expected);
-        }
-    }
+        writeRows(numPartitions, rowsPerPartition);
 
-    @Test
-    public void simplePagedReadsTest() throws Throwable
-    {
-        try (Cluster cluster = init(builder().withNodes(3).start()))
+        String query = "SELECT * FROM %s WHERE k IN (?, ?)";
+        for (int k = 0; k < numPartitions; k += 2)
         {
-            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
-
-            int size = 100;
-            Object[][] results = new Object[size][];
-            for (int i = 0; i < size; i++)
-            {
-                cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, ?, ?)",
-                                               ConsistencyLevel.QUORUM,
-                                               i, i);
-                results[i] = new Object[] { 1, i, i};
-            }
+            Object[][] rows = readRows(query, k, k + 1);
+            Assert.assertEquals(rowsPerPartition * 2, rows.length);
 
-            // Make sure paged read returns same results with different page sizes
-            for (int pageSize : new int[] { 1, 2, 3, 5, 10, 20, 50})
+            for (int i = 0; i < 2; i++)
             {
-                assertRows(cluster.coordinator(1).executeWithPaging("SELECT * FROM " + KEYSPACE + ".tbl",
-                                                                    ConsistencyLevel.QUORUM,
-                                                                    pageSize),
-                           results);
+                for (int c = 0; c < rowsPerPartition; c++)
+                {
+                    Object[] row = rows[i * rowsPerPartition + c];
+                    validateRow(row, rowsPerPartition, k + i, c);
+                }
             }
         }
     }
 
-    @Test
-    public void pagingWithRepairTest() throws Throwable
+    /**
+     * Writes {@code numPartitions} partitions with {@code rowsPerPartition} rows each, overriding some of the rows in
+     * a second sstable and some others only in the memtable.
+     */
+    private void writeRows(int numPartitions, int rowsPerPartition)
     {
-        try (Cluster cluster = init(builder().withNodes(3).start()))
-        {
-            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
-
-            int size = 100;
-            Object[][] results = new Object[size][];
-            for (int i = 0; i < size; i++)
-            {
-                // Make sure that data lands on different nodes and not coordinator
-                cluster.get(i % 2 == 0 ? 2 : 3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, ?, ?)",
-                                                                i, i);
-
-                results[i] = new Object[] { 1, i, i};
-            }
-
-            // Make sure paged read returns same results with different page sizes
-            for (int pageSize : new int[] { 1, 2, 3, 5, 10, 20, 50})
-            {
-                assertRows(cluster.coordinator(1).executeWithPaging("SELECT * FROM " + KEYSPACE + ".tbl",
-                                                                    ConsistencyLevel.ALL,
-                                                                    pageSize),
-                           results);
-            }
-
-            assertRows(cluster.get(1).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl"),
-                       results);
-        }
+        String update = withTable("UPDATE %s SET v=? WHERE k=? AND c=?");
+        ICoordinator coordinator = cluster.coordinator(1);
+
+        // insert all the partition rows in a single sstable
+        for (int c = 0; c < rowsPerPartition; c++)
+            for (int k = 0; k < numPartitions; k++)
+                coordinator.execute(update, QUORUM, c, k, c);
+        cluster.forEach(i -> i.flush(KEYSPACE));
+
+        // override some rows in a second sstable
+        for (int c = 0; c < rowsPerPartition; c += SECOND_SSTABLE_INTERVAL)
+            for (int k = 0; k < numPartitions; k++)
+                coordinator.execute(update, QUORUM, c + rowsPerPartition, k, c);
+        cluster.forEach(i -> i.flush(KEYSPACE));
+
+        // override some rows only in memtable
+        for (int c = 0; c < rowsPerPartition; c += MEMTABLE_INTERVAL)
+            for (int k = 0; k < numPartitions; k++)
+                coordinator.execute(update, QUORUM, c + rowsPerPartition * 2, k, c);
     }
 
-    @Test
-    public void pagingTests() throws Throwable
+    /**
+     * Runs the specified query in all coordinators, with and without paging.
+     */
+    private Object[][] readRows(String query, Object... boundValues)
     {
-        try (Cluster cluster = init(builder().withNodes(3).start());
-             Cluster singleNode = init(builder().withNodes(1).withSubnet(1).start()))
+        query = withTable(query);
+
+        // verify that all coordinators return the same results for the query, regardless of paging
+        Object[][] lastRows = null;
+        int lastNode = 1;
+        boolean lastPaging = false;
+        for (int node = 1; node <= NUM_NODES; node++)
         {
-            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
-            singleNode.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
+            ICoordinator coordinator = cluster.coordinator(node);
 
-            for (int i = 0; i < 10; i++)
+            for (boolean paging : BOOLEANS)
             {
-                for (int j = 0; j < 10; j++)
-                {
-                    cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, ?, ?)",
-                                                   ConsistencyLevel.QUORUM,
-                                                   i, j, i + i);
-                    singleNode.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, ?, ?)",
-                                                      ConsistencyLevel.QUORUM,
-                                                      i, j, i + i);
-                }
-            }
+                Object[][] rows = paging
+                                  ? Iterators.toArray(coordinator.executeWithPaging(query, QUORUM, 1, boundValues),
+                                                      Object[].class)
+                                  : coordinator.execute(query, QUORUM, boundValues);
 
-            int[] pageSizes = new int[] { 1, 2, 3, 5, 10, 20, 50};
-            String[] statements = new String [] {"SELECT * FROM " + KEYSPACE  + ".tbl WHERE pk = 1 AND ck > 5",
-                                                 "SELECT * FROM " + KEYSPACE  + ".tbl WHERE pk = 1 AND ck >= 5",
-                                                 "SELECT * FROM " + KEYSPACE  + ".tbl WHERE pk = 1 AND ck > 5 AND ck <= 10",
-                                                 "SELECT * FROM " + KEYSPACE  + ".tbl WHERE pk = 1 AND ck > 5 LIMIT 3",
-                                                 "SELECT * FROM " + KEYSPACE  + ".tbl WHERE pk = 1 AND ck >= 5 LIMIT 2",
-                                                 "SELECT * FROM " + KEYSPACE  + ".tbl WHERE pk = 1 AND ck > 5 AND ck <= 10 LIMIT 2",
-                                                 "SELECT * FROM " + KEYSPACE  + ".tbl WHERE pk = 1 AND ck > 5 ORDER BY ck DESC",
-                                                 "SELECT * FROM " + KEYSPACE  + ".tbl WHERE pk = 1 AND ck >= 5 ORDER BY ck DESC",
-                                                 "SELECT * FROM " + KEYSPACE  + ".tbl WHERE pk = 1 AND ck > 5 AND ck <= 10 ORDER BY ck DESC",
-                                                 "SELECT * FROM " + KEYSPACE  + ".tbl WHERE pk = 1 AND ck > 5 ORDER BY ck DESC LIMIT 3",
-                                                 "SELECT * FROM " + KEYSPACE  + ".tbl WHERE pk = 1 AND ck >= 5 ORDER BY ck DESC LIMIT 2",
-                                                 "SELECT * FROM " + KEYSPACE  + ".tbl WHERE pk = 1 AND ck > 5 AND ck <= 10 ORDER BY ck DESC LIMIT 2",
-                                                 "SELECT DISTINCT pk FROM " + KEYSPACE  + ".tbl LIMIT 3",
-                                                 "SELECT DISTINCT pk FROM " + KEYSPACE  + ".tbl WHERE pk IN (3,5,8,10)",
-                                                 "SELECT DISTINCT pk FROM " + KEYSPACE  + ".tbl WHERE pk IN (3,5,8,10) LIMIT 2"
-            };
-            for (String statement : statements)
-            {
-                for (int pageSize : pageSizes)
+                if (lastRows != null)
                 {
-                    assertRows(cluster.coordinator(1)
-                                      .executeWithPaging(statement,
-                                                         ConsistencyLevel.QUORUM,  pageSize),
-                               singleNode.coordinator(1)
-                                         .executeWithPaging(statement,
-                                                            ConsistencyLevel.QUORUM,  Integer.MAX_VALUE));
+                    try
+                    {
+                        assertRows(lastRows, rows);
+                    }
+                    catch (AssertionError e)
+                    {
+                        fail(String.format("Node %d %s paging has returned different results " +
+                                           "for the same query than node %d %s paging:\n%s",
+                                           node, paging ? "with" : "without",
+                                           lastNode, lastPaging ? "with" : "without",
+                                           e.getMessage()));
+                    }
                 }
-            }
-
-        }
-    }
-
-    @Test
-    public void metricsCountQueriesTest() throws Throwable
-    {
-        try (Cluster cluster = init(Cluster.create(2)))
-        {
-            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
-            for (int i = 0; i < 100; i++)
-                cluster.coordinator(1).execute("INSERT INTO "+KEYSPACE+".tbl (pk, ck, v) VALUES (?,?,?)", ConsistencyLevel.ALL, i, i, i);
-
-            long readCount1 = readCount(cluster.get(1));
-            long readCount2 = readCount(cluster.get(2));
-            for (int i = 0; i < 100; i++)
-                cluster.coordinator(1).execute("SELECT * FROM "+KEYSPACE+".tbl WHERE pk = ? and ck = ?", ConsistencyLevel.ALL, i, i);
-
-            readCount1 = readCount(cluster.get(1)) - readCount1;
-            readCount2 = readCount(cluster.get(2)) - readCount2;
-            Assert.assertEquals(readCount1, readCount2);
-            Assert.assertEquals(100, readCount1);
-        }
-    }
 
+                lastRows = rows;
+                lastPaging = paging;
+            }
 
-    @Test
-    public void skippedSSTableWithPartitionDeletionTest() throws Throwable
-    {
-        try (Cluster cluster = init(Cluster.create(2)))
-        {
-            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY(pk, ck))");
-            // insert a partition tombstone on node 1, the deletion timestamp should end up being the sstable's minTimestamp
-            cluster.get(1).executeInternal("DELETE FROM " + KEYSPACE + ".tbl USING TIMESTAMP 1 WHERE pk = 0");
-            // and a row from a different partition, to provide the sstable's min/max clustering
-            cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1) USING TIMESTAMP 2");
-            cluster.get(1).flush(KEYSPACE);
-            // expect a single sstable, where minTimestamp equals the timestamp of the partition delete
-            cluster.get(1).runOnInstance(() -> {
-                Set<SSTableReader> sstables = Keyspace.open(KEYSPACE)
-                                                      .getColumnFamilyStore("tbl")
-                                                      .getLiveSSTables();
-                assertEquals("Expected a single sstable, but found " + sstables.size(), 1, sstables.size());
-                long minTimestamp = sstables.iterator().next().getMinTimestamp();
-                assertEquals("Expected min timestamp of 1, but was " + minTimestamp, 1, minTimestamp);
-            });
-
-            // on node 2, add a row for the deleted partition with an older timestamp than the deletion so it should be shadowed
-            cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (0, 10, 10) USING TIMESTAMP 0");
-
-
-            Object[][] rows = cluster.coordinator(1)
-                                     .execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk=0 AND ck > 5",
-                                              ConsistencyLevel.ALL);
-            assertEquals("Expected 0 rows, but found " + rows.length, 0, rows.length);
+            lastNode = node;
         }
-    }
+        Assert.assertNotNull(lastRows);
 
-    @Test
-    public void skippedSSTableWithPartitionDeletionShadowingDataOnAnotherNode() throws Throwable
-    {
-        try (Cluster cluster = init(Cluster.create(2)))
-        {
-            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY(pk, ck))");
-            // insert a partition tombstone on node 1, the deletion timestamp should end up being the sstable's minTimestamp
-            cluster.get(1).executeInternal("DELETE FROM " + KEYSPACE + ".tbl USING TIMESTAMP 1 WHERE pk = 0");
-            // and a row from a different partition, to provide the sstable's min/max clustering
-            cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1) USING TIMESTAMP 1");
-            cluster.get(1).flush(KEYSPACE);
-            // sstable 1 has minTimestamp == maxTimestamp == 1 and is skipped due to its min/max clusterings. Now we
-            // insert a row which is not shadowed by the partition delete and flush to a second sstable. Importantly,
-            // this sstable's minTimestamp is > than the maxTimestamp of the first sstable. This would cause the first
-            // sstable not to be reincluded in the merge input, but we can't really make that decision as we don't
-            // know what data and/or tombstones are present on other nodes
-            cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (0, 6, 6) USING TIMESTAMP 2");
-            cluster.get(1).flush(KEYSPACE);
-
-            // on node 2, add a row for the deleted partition with an older timestamp than the deletion so it should be shadowed
-            cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (0, 10, 10) USING TIMESTAMP 0");
-
-            Object[][] rows = cluster.coordinator(1)
-                                     .execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk=0 AND ck > 5",
-                                              ConsistencyLevel.ALL);
-            // we expect that the row from node 2 (0, 10, 10) was shadowed by the partition delete, but the row from
-            // node 1 (0, 6, 6) was not.
-            assertRows(rows, new Object[] {0, 6 ,6});
-        }
-    }
+        // undo the clustering reverse sorting to ease validation
+        if (reverse)
+            ArrayUtils.reverse(lastRows);
 
-    @Test
-    public void skippedSSTableWithPartitionDeletionShadowingDataOnAnotherNode2() throws Throwable
-    {
-        // don't not add skipped sstables back just because the partition delete ts is < the local min ts
+        // sort by partition key to ease validation
+        Arrays.sort(lastRows, Comparator.comparing(row -> (int) row[0]));
 
-        try (Cluster cluster = init(Cluster.create(2)))
-        {
-            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY(pk, ck))");
-            // insert a partition tombstone on node 1, the deletion timestamp should end up being the sstable's minTimestamp
-            cluster.get(1).executeInternal("DELETE FROM " + KEYSPACE + ".tbl USING TIMESTAMP 1 WHERE pk = 0");
-            // and a row from a different partition, to provide the sstable's min/max clustering
-            cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1) USING TIMESTAMP 3");
-            cluster.get(1).flush(KEYSPACE);
-            // sstable 1 has minTimestamp == maxTimestamp == 1 and is skipped due to its min/max clusterings. Now we
-            // insert a row which is not shadowed by the partition delete and flush to a second sstable. The first sstable
-            // has a maxTimestamp > than the min timestamp of all sstables, so it is a candidate for reinclusion to the
-            // merge. Hoever, the second sstable's minTimestamp is > than the partition delete. This would  cause the
-            // first sstable not to be reincluded in the merge input, but we can't really make that decision as we don't
-            // know what data and/or tombstones are present on other nodes
-            cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (0, 6, 6) USING TIMESTAMP 2");
-            cluster.get(1).flush(KEYSPACE);
-
-            // on node 2, add a row for the deleted partition with an older timestamp than the deletion so it should be shadowed
-            cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (0, 10, 10) USING TIMESTAMP 0");
-
-            Object[][] rows = cluster.coordinator(1)
-                                     .execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk=0 AND ck > 5",
-                                              ConsistencyLevel.ALL);
-            // we expect that the row from node 2 (0, 10, 10) was shadowed by the partition delete, but the row from
-            // node 1 (0, 6, 6) was not.
-            assertRows(rows, new Object[] {0, 6 ,6});
-        }
+        return lastRows;
     }
 
-    private long readCount(IInvokableInstance instance)
+    private void validateRow(Object[] row, int rowsPerPartition, int k, int c)
     {
-        return instance.callOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").metric.readLatency.latency.getCount());
+        Assert.assertNotNull(row);
+
+        if (c % MEMTABLE_INTERVAL == 0)
+            assertRow(row, row(k, c, c + rowsPerPartition * 2));
+        else if (c % SECOND_SSTABLE_INTERVAL == 0)
+            assertRow(row, row(k, c, c + rowsPerPartition));
+        else
+            assertRow(row, row(k, c, c));
     }
 
-    private static Object[][] rows(Object[]...rows)
+    private String withTable(String query)
     {
-        Object[][] r = new Object[rows.length][];
-        System.arraycopy(rows, 0, r, 0, rows.length);
-        return r;
+        return String.format(query, tableName);
     }
-
 }
diff --git a/test/distributed/org/apache/cassandra/distributed/test/TestBaseImpl.java b/test/distributed/org/apache/cassandra/distributed/test/TestBaseImpl.java
index 801df7d..37f43bd 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/TestBaseImpl.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/TestBaseImpl.java
@@ -42,6 +42,9 @@ import org.apache.cassandra.distributed.shared.DistributedTestBase;
 
 public class TestBaseImpl extends DistributedTestBase
 {
+    public static final Object[][] EMPTY_ROWS = new Object[0][];
+    public static final boolean[] BOOLEANS = new boolean[]{ false, true };
+
     @After
     public void afterEach() {
         super.afterEach();
@@ -60,6 +63,13 @@ public class TestBaseImpl extends DistributedTestBase
         return Cluster.build();
     }
 
+    public static Object[][] rows(Object[]...rows)
+    {
+        Object[][] r = new Object[rows.length][];
+        System.arraycopy(rows, 0, r, 0, rows.length);
+        return r;
+    }
+
     public static Object list(Object...values)
     {
         return Arrays.asList(values);
diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeAvailabilityTestBase.java b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeAvailabilityTestBase.java
new file mode 100644
index 0000000..8968ede
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeAvailabilityTestBase.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.upgrade;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.ICoordinator;
+import org.apache.cassandra.distributed.shared.Versions;
+import org.apache.cassandra.exceptions.ReadTimeoutException;
+import org.apache.cassandra.exceptions.WriteTimeoutException;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.cassandra.distributed.api.ConsistencyLevel.ALL;
+import static org.apache.cassandra.distributed.api.ConsistencyLevel.ONE;
+import static org.apache.cassandra.distributed.api.ConsistencyLevel.QUORUM;
+import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;
+import static org.apache.cassandra.distributed.shared.AssertUtils.row;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static java.lang.String.format;
+
+public class MixedModeAvailabilityTestBase extends UpgradeTestBase
+{
+    private static final int NUM_NODES = 3;
+    private static final int COORDINATOR = 1;
+    private static final List<Tester> TESTERS = Arrays.asList(new Tester(ONE, ALL),
+                                                              new Tester(QUORUM, QUORUM),
+                                                              new Tester(ALL, ONE));
+
+    protected static void testAvailability(Versions.Major initial, Versions.Major... upgrade) throws Throwable
+    {
+        testAvailability(true, initial, upgrade);
+        testAvailability(false, initial, upgrade);
+    }
+
+    private static void testAvailability(boolean upgradedCoordinator,
+                                         Versions.Major initial,
+                                         Versions.Major... upgrade) throws Throwable
+    {
+        new TestCase()
+        .nodes(NUM_NODES)
+        .nodesToUpgrade(upgradedCoordinator ? 1 : 2)
+        .upgrade(initial, upgrade)
+        .withConfig(config -> config.set("read_request_timeout_in_ms", SECONDS.toMillis(1))
+                                    .set("write_request_timeout_in_ms", SECONDS.toMillis(1)))
+        .setup(c -> c.schemaChange(withKeyspace("CREATE TABLE %s.t (k uuid, c int, v int, PRIMARY KEY (k, c))")))
+        .runAfterNodeUpgrade((cluster, n) -> {
+
+            // using 0 to 2 down nodes...
+            for (int numNodesDown = 0; numNodesDown < NUM_NODES; numNodesDown++)
+            {
+                // disable communications to the down nodes
+                if (numNodesDown > 0)
+                    cluster.get(replica(COORDINATOR, numNodesDown)).shutdown();
+
+                // run the test cases that are compatible with the number of down nodes
+                ICoordinator coordinator = cluster.coordinator(COORDINATOR);
+                for (Tester tester : TESTERS)
+                    tester.test(coordinator, numNodesDown, upgradedCoordinator);
+            }
+        }).run();
+    }
+
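+    /**
+     * Returns the node that sits {@code depth} positions after {@code node} in the ring, wrapping around the last node.
+     */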
+    private static int replica(int node, int depth)
+    {
+        assert depth >= 0;
+        return depth == 0 ? node : replica(node == NUM_NODES ? 1 : node + 1, depth - 1);
+    }
+
+    private static class Tester
+    {
+        private static final String INSERT = withKeyspace("INSERT INTO %s.t (k, c, v) VALUES (?, ?, ?)");
+        private static final String SELECT = withKeyspace("SELECT * FROM %s.t WHERE k = ?");
+
+        private final ConsistencyLevel writeConsistencyLevel;
+        private final ConsistencyLevel readConsistencyLevel;
+
+        private Tester(ConsistencyLevel writeConsistencyLevel, ConsistencyLevel readConsistencyLevel)
+        {
+            this.writeConsistencyLevel = writeConsistencyLevel;
+            this.readConsistencyLevel = readConsistencyLevel;
+        }
+
+        public void test(ICoordinator coordinator, int numNodesDown, boolean upgradedCoordinator)
+        {
+            UUID key = UUID.randomUUID();
+            Object[] row1 = row(key, 1, 10);
+            Object[] row2 = row(key, 2, 20);
+
+            try
+            {
+                // test write
+                maybeFail(WriteTimeoutException.class, numNodesDown > maxNodesDown(writeConsistencyLevel), () -> {
+                    coordinator.execute(INSERT, writeConsistencyLevel, row1);
+                    coordinator.execute(INSERT, writeConsistencyLevel, row2);
+                });
+
+                // test read
+                maybeFail(ReadTimeoutException.class, numNodesDown > maxNodesDown(readConsistencyLevel), () -> {
+                    Object[][] rows = coordinator.execute(SELECT, readConsistencyLevel, key);
+                    if (numNodesDown <= maxNodesDown(writeConsistencyLevel))
+                        assertRows(rows, row1, row2);
+                });
+            }
+            catch (Throwable t)
+            {
+                throw new AssertionError(format("Unexpected error in case %s-%s with %s coordinator and %d nodes down",
+                                                writeConsistencyLevel,
+                                                readConsistencyLevel,
+                                                upgradedCoordinator ? "upgraded" : "not upgraded",
+                                                numNodesDown), t);
+            }
+        }
+
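+        /**
+         * Runs the given action and, depending on {@code shouldFail}, asserts that it either succeeds
+         * or fails with the expected exception type.
+         */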
+        private static <E extends Exception> void maybeFail(Class<E> exceptionClass, boolean shouldFail, Runnable test)
+        {
+            try
+            {
+                test.run();
+                assertFalse(shouldFail);
+            }
+            catch (Exception e)
+            {
+                // compare exception class names rather than classes because each instance has its own classpath
+                String className = e.getClass().getCanonicalName();
+                if (e instanceof RuntimeException && e.getCause() != null)
+                    className = e.getCause().getClass().getCanonicalName();
+
+                if (shouldFail)
+                    assertEquals(exceptionClass.getCanonicalName(), className);
+                else
+                    throw e;
+            }
+        }
+
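+        /**
+         * Returns the maximum number of down nodes tolerated by the given consistency level,
+         * assuming three replicas per row.
+         */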
+        private static int maxNodesDown(ConsistencyLevel cl)
+        {
+            if (cl == ONE)
+                return 2;
+
+            if (cl == QUORUM)
+                return 1;
+
+            if (cl == ALL)
+                return 0;
+
+            throw new IllegalArgumentException("Usupported consistency level: " + cl);
+        }
+    }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeAvailabilityV22Test.java b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeAvailabilityV22Test.java
new file mode 100644
index 0000000..367ef5f
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeAvailabilityV22Test.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.upgrade;
+
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.shared.Versions;
+
+/**
+ * {@link MixedModeAvailabilityTestBase} for upgrades from v22.
+ */
+public class MixedModeAvailabilityV22Test extends MixedModeAvailabilityTestBase
+{
+    @Test
+    public void testAvailabilityV22ToV30() throws Throwable
+    {
+        testAvailability(Versions.Major.v22, Versions.Major.v30);
+    }
+
+    @Test
+    public void testAvailabilityV22ToV3X() throws Throwable
+    {
+        testAvailability(Versions.Major.v22, Versions.Major.v3X);
+    }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeAvailabilityV30Test.java b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeAvailabilityV30Test.java
new file mode 100644
index 0000000..4e25a64
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeAvailabilityV30Test.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.upgrade;
+
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.shared.Versions;
+
+/**
+ * {@link MixedModeAvailabilityTestBase} for upgrades from v30.
+ */
+public class MixedModeAvailabilityV30Test extends MixedModeAvailabilityTestBase
+{
+    @Test
+    public void testAvailabilityV30ToV3X() throws Throwable
+    {
+        testAvailability(Versions.Major.v30, Versions.Major.v3X);
+    }
+
+    @Test
+    public void testAvailabilityV30ToV4() throws Throwable
+    {
+        testAvailability(Versions.Major.v30, Versions.Major.v4);
+    }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeAvailabilityV3XTest.java b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeAvailabilityV3XTest.java
new file mode 100644
index 0000000..622d6c6
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeAvailabilityV3XTest.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.upgrade;
+
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.shared.Versions;
+
+/**
+ * {@link MixedModeAvailabilityTestBase} for upgrades from v3X.
+ */
+public class MixedModeAvailabilityV3XTest extends MixedModeAvailabilityTestBase
+{
+    @Test
+    public void testAvailabilityV3XToV4() throws Throwable
+    {
+        testAvailability(Versions.Major.v3X, Versions.Major.v4);
+    }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeConsistencyTestBase.java b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeConsistencyTestBase.java
new file mode 100644
index 0000000..ef54d61
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeConsistencyTestBase.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.upgrade;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.cassandra.distributed.UpgradeableCluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.IUpgradeableInstance;
+import org.apache.cassandra.distributed.shared.Versions;
+
+import static java.lang.String.format;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.cassandra.distributed.api.ConsistencyLevel.ALL;
+import static org.apache.cassandra.distributed.api.ConsistencyLevel.ONE;
+import static org.apache.cassandra.distributed.api.ConsistencyLevel.QUORUM;
+import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;
+import static org.apache.cassandra.distributed.shared.AssertUtils.row;
+
+public class MixedModeConsistencyTestBase extends UpgradeTestBase
+{
+    protected static void testConsistency(Versions.Major initial, Versions.Major... upgrade) throws Throwable
+    {
+        List<Tester> testers = new ArrayList<>();
+        testers.addAll(Tester.create(1, ALL));
+        testers.addAll(Tester.create(2, ALL, QUORUM));
+        testers.addAll(Tester.create(3, ALL, QUORUM, ONE));
+
+        new TestCase()
+        .nodes(3)
+        .nodesToUpgrade(1)
+        .upgrade(initial, upgrade)
+        .withConfig(config -> config.set("read_request_timeout_in_ms", SECONDS.toMillis(30))
+                                    .set("write_request_timeout_in_ms", SECONDS.toMillis(30)))
+        .setup(cluster -> {
+            Tester.createTable(cluster);
+            for (Tester tester : testers)
+                tester.writeRows(cluster);
+        }).runAfterNodeUpgrade((cluster, node) -> {
+            for (Tester tester : testers)
+                tester.readRows(cluster);
+        }).run();
+    }
+
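+    /**
+     * Writes a partition directly to a fixed number of replicas and then reads it back through every
+     * coordinator at the given consistency level, expecting all the written rows to be returned.
+     */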
+    private static class Tester
+    {
+        private final int numWrittenReplicas;
+        private final ConsistencyLevel readConsistencyLevel;
+        private final UUID partitionKey;
+
+        private Tester(int numWrittenReplicas, ConsistencyLevel readConsistencyLevel)
+        {
+            this.numWrittenReplicas = numWrittenReplicas;
+            this.readConsistencyLevel = readConsistencyLevel;
+            partitionKey = UUID.randomUUID();
+        }
+
+        private static List<Tester> create(int numWrittenReplicas, ConsistencyLevel... readConsistencyLevels)
+        {
+            return Stream.of(readConsistencyLevels)
+                         .map(readConsistencyLevel -> new Tester(numWrittenReplicas, readConsistencyLevel))
+                         .collect(Collectors.toList());
+        }
+
+        private static void createTable(UpgradeableCluster cluster)
+        {
+            cluster.schemaChange(withKeyspace("CREATE TABLE %s.t (k uuid, c int, v int, PRIMARY KEY (k, c))"));
+        }
+
+        private void writeRows(UpgradeableCluster cluster)
+        {
+            String query = withKeyspace("INSERT INTO %s.t (k, c, v) VALUES (?, ?, ?)");
+            for (int i = 1; i <= numWrittenReplicas; i++)
+            {
+                IUpgradeableInstance node = cluster.get(i);
+                node.executeInternal(query, partitionKey, 1, 10);
+                node.executeInternal(query, partitionKey, 2, 20);
+                node.executeInternal(query, partitionKey, 3, 30);
+            }
+        }
+
+        private void readRows(UpgradeableCluster cluster)
+        {
+            String query = withKeyspace("SELECT * FROM %s.t WHERE k = ?");
+            int coordinator = 1;
+            try
+            {
+                for (coordinator = 1; coordinator <= cluster.size(); coordinator++)
+                {
+                    assertRows(cluster.coordinator(coordinator).execute(query, readConsistencyLevel, partitionKey),
+                               row(partitionKey, 1, 10),
+                               row(partitionKey, 2, 20),
+                               row(partitionKey, 3, 30));
+                }
+            }
+            catch (Throwable t)
+            {
+                String format = "Unexpected error reading rows with %d written replicas, CL=%s and coordinator=%s";
+                throw new AssertionError(format(format, numWrittenReplicas, readConsistencyLevel, coordinator), t);
+            }
+        }
+    }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeConsistencyV22Test.java b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeConsistencyV22Test.java
new file mode 100644
index 0000000..ef9f766
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeConsistencyV22Test.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.upgrade;
+
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.shared.Versions;
+
+/**
+ * {@link MixedModeConsistencyTestBase} for upgrades from v22.
+ */
+public class MixedModeConsistencyV22Test extends MixedModeConsistencyTestBase
+{
+    @Test
+    public void testConsistencyV22ToV30() throws Throwable
+    {
+        testConsistency(Versions.Major.v22, Versions.Major.v30);
+    }
+
+    @Test
+    public void testConsistencyV22ToV3X() throws Throwable
+    {
+        testConsistency(Versions.Major.v22, Versions.Major.v3X);
+    }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeConsistencyV30Test.java b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeConsistencyV30Test.java
new file mode 100644
index 0000000..efe94ba
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeConsistencyV30Test.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.upgrade;
+
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.shared.Versions;
+
+/**
+ * {@link MixedModeConsistencyTestBase} for upgrades from v30.
+ */
+public class MixedModeConsistencyV30Test extends MixedModeConsistencyTestBase
+{
+    @Test
+    public void testConsistencyV30ToV3X() throws Throwable
+    {
+        testConsistency(Versions.Major.v30, Versions.Major.v3X);
+    }
+
+    @Test
+    public void testConsistencyV30ToV4() throws Throwable
+    {
+        testConsistency(Versions.Major.v30, Versions.Major.v4);
+    }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeConsistencyV3XTest.java b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeConsistencyV3XTest.java
new file mode 100644
index 0000000..0405716
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeConsistencyV3XTest.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.upgrade;
+
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.shared.Versions;
+
+/**
+ * {@link MixedModeConsistencyTestBase} for upgrades from v3X.
+ */
+public class MixedModeConsistencyV3XTest extends MixedModeConsistencyTestBase
+{
+    @Test
+    public void testConsistencyV3XToV4() throws Throwable
+    {
+        testConsistency(Versions.Major.v3X, Versions.Major.v4);
+    }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTest.java b/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTest.java
index 9ce7867..01ac40d 100644
--- a/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTest.java
@@ -27,33 +27,6 @@ import static org.apache.cassandra.distributed.shared.AssertUtils.*;
 
 public class UpgradeTest extends UpgradeTestBase
 {
-
-    @Test
-    public void upgradeTest() throws Throwable
-    {
-        new TestCase()
-        .upgrade(Versions.Major.v22, Versions.Major.v30, Versions.Major.v3X)
-        .upgrade(Versions.Major.v30, Versions.Major.v3X, Versions.Major.v4)
-        .setup((cluster) -> {
-            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
-
-            cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)");
-            cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 2, 2)");
-            cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 3, 3)");
-        })
-        .runAfterNodeUpgrade((cluster, node) -> {
-            for (int i = 1; i <= cluster.size(); i++)
-            {
-                assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = ?",
-                                                          ConsistencyLevel.ALL,
-                                                          1),
-                           row(1, 1, 1),
-                           row(1, 2, 2),
-                           row(1, 3, 3));
-            }
-        }).run();
-    }
-    
     @Test
     public void simpleUpgradeWithNetworkAndGossipTest() throws Throwable
     {
diff --git a/test/unit/org/apache/cassandra/Util.java b/test/unit/org/apache/cassandra/Util.java
index eba9a7c..ca5ec63 100644
--- a/test/unit/org/apache/cassandra/Util.java
+++ b/test/unit/org/apache/cassandra/Util.java
@@ -130,6 +130,11 @@ public class Util
         return comparator.make(o);
     }
 
+    public static Token token(int key)
+    {
+        return testPartitioner().getToken(ByteBufferUtil.bytes(key));
+    }
+
     public static Token token(String key)
     {
         return testPartitioner().getToken(ByteBufferUtil.bytes(key));
diff --git a/test/unit/org/apache/cassandra/db/PartitionRangeReadTest.java b/test/unit/org/apache/cassandra/db/PartitionRangeReadTest.java
index 56d7ae1..986a125 100644
--- a/test/unit/org/apache/cassandra/db/PartitionRangeReadTest.java
+++ b/test/unit/org/apache/cassandra/db/PartitionRangeReadTest.java
@@ -20,47 +20,32 @@ package org.apache.cassandra.db;
 
 import java.math.BigInteger;
 import java.nio.ByteBuffer;
-import java.nio.charset.CharacterCodingException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Iterator;
 import java.util.List;
 
-import com.google.common.collect.Iterators;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import org.apache.cassandra.*;
-import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
 import org.apache.cassandra.db.marshal.AsciiType;
-import org.apache.cassandra.db.rows.Row;
 import org.apache.cassandra.db.marshal.IntegerType;
-import org.apache.cassandra.db.partitions.*;
-import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.db.partitions.FilteredPartition;
+import org.apache.cassandra.db.rows.Row;
 import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.locator.ReplicaPlan;
-import org.apache.cassandra.locator.ReplicaPlans;
-import org.apache.cassandra.locator.TokenMetadata;
 import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.schema.TableMetadata;
-import org.apache.cassandra.service.StorageProxy;
-import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.FBUtilities;
+
+import static org.junit.Assert.assertEquals;
 
 public class PartitionRangeReadTest
 {
-    public static final String KEYSPACE1 = "PartitionRangeReadTest1";
-    public static final String KEYSPACE2 = "PartitionRangeReadTest2";
-    public static final String CF_STANDARD1 = "Standard1";
-    public static final String CF_STANDARDINT = "StandardInteger1";
-    public static final String CF_COMPACT1 = "Compact1";
+    private static final String KEYSPACE1 = "PartitionRangeReadTest1";
+    private static final String KEYSPACE2 = "PartitionRangeReadTest2";
+    private static final String CF_STANDARD1 = "Standard1";
+    private static final String CF_STANDARDINT = "StandardInteger1";
+    private static final String CF_COMPACT1 = "Compact1";
 
     @BeforeClass
     public static void defineSchema() throws ConfigurationException
@@ -99,11 +84,10 @@ public class PartitionRangeReadTest
     }
 
     @Test
-    public void testCassandra6778() throws CharacterCodingException
+    public void testCassandra6778()
     {
-        String cfname = CF_STANDARDINT;
         Keyspace keyspace = Keyspace.open(KEYSPACE1);
-        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfname);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_STANDARDINT);
         cfs.truncateBlocking();
 
         ByteBuffer col = ByteBufferUtil.bytes("val");
@@ -127,11 +111,11 @@ public class PartitionRangeReadTest
 
         // fetch by the first column name; we should get the second version of the column value
         Row row = Util.getOnlyRow(Util.cmd(cfs, "k1").includeRow(new BigInteger(new byte[]{1})).build());
-        assertTrue(row.getCell(cDef).buffer().equals(ByteBufferUtil.bytes("val2")));
+        assertEquals(ByteBufferUtil.bytes("val2"), row.getCell(cDef).buffer());
 
         // fetch by the second column name; we should get the second version of the column value
         row = Util.getOnlyRow(Util.cmd(cfs, "k1").includeRow(new BigInteger(new byte[]{0, 0, 1})).build());
-        assertTrue(row.getCell(cDef).buffer().equals(ByteBufferUtil.bytes("val2")));
+        assertEquals(ByteBufferUtil.bytes("val2"), row.getCell(cDef).buffer());
     }
 
     @Test
@@ -159,12 +143,10 @@ public class PartitionRangeReadTest
     }
 
     @Test
-    public void testRangeSliceInclusionExclusion() throws Throwable
+    public void testRangeSliceInclusionExclusion()
     {
-        String keyspaceName = KEYSPACE1;
-        String cfName = CF_STANDARD1;
-        Keyspace keyspace = Keyspace.open(keyspaceName);
-        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfName);
+        Keyspace keyspace = Keyspace.open(KEYSPACE1);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_STANDARD1);
         cfs.clearUnsafe();
 
         for (int i = 0; i < 10; ++i)
@@ -184,152 +166,26 @@ public class PartitionRangeReadTest
         // Start and end inclusive
         partitions = Util.getAll(Util.cmd(cfs).fromKeyIncl("2").toKeyIncl("7").build());
         assertEquals(6, partitions.size());
-        assertTrue(partitions.get(0).iterator().next().getCell(cDef).buffer().equals(ByteBufferUtil.bytes("2")));
-        assertTrue(partitions.get(partitions.size() - 1).iterator().next().getCell(cDef).buffer().equals(ByteBufferUtil.bytes("7")));
+        assertEquals(ByteBufferUtil.bytes("2"), partitions.get(0).iterator().next().getCell(cDef).buffer());
+        assertEquals(ByteBufferUtil.bytes("7"), partitions.get(partitions.size() - 1).iterator().next().getCell(cDef).buffer());
 
         // Start and end excluded
         partitions = Util.getAll(Util.cmd(cfs).fromKeyExcl("2").toKeyExcl("7").build());
         assertEquals(4, partitions.size());
-        assertTrue(partitions.get(0).iterator().next().getCell(cDef).buffer().equals(ByteBufferUtil.bytes("3")));
-        assertTrue(partitions.get(partitions.size() - 1).iterator().next().getCell(cDef).buffer().equals(ByteBufferUtil.bytes("6")));
+        assertEquals(ByteBufferUtil.bytes("3"), partitions.get(0).iterator().next().getCell(cDef).buffer());
+        assertEquals(ByteBufferUtil.bytes("6"), partitions.get(partitions.size() - 1).iterator().next().getCell(cDef).buffer());
 
         // Start excluded, end included
         partitions = Util.getAll(Util.cmd(cfs).fromKeyExcl("2").toKeyIncl("7").build());
         assertEquals(5, partitions.size());
-        assertTrue(partitions.get(0).iterator().next().getCell(cDef).buffer().equals(ByteBufferUtil.bytes("3")));
-        assertTrue(partitions.get(partitions.size() - 1).iterator().next().getCell(cDef).buffer().equals(ByteBufferUtil.bytes("7")));
+        assertEquals(ByteBufferUtil.bytes("3"), partitions.get(0).iterator().next().getCell(cDef).buffer());
+        assertEquals(ByteBufferUtil.bytes("7"), partitions.get(partitions.size() - 1).iterator().next().getCell(cDef).buffer());
 
         // Start included, end excluded
         partitions = Util.getAll(Util.cmd(cfs).fromKeyIncl("2").toKeyExcl("7").build());
         assertEquals(5, partitions.size());
-        assertTrue(partitions.get(0).iterator().next().getCell(cDef).buffer().equals(ByteBufferUtil.bytes("2")));
-        assertTrue(partitions.get(partitions.size() - 1).iterator().next().getCell(cDef).buffer().equals(ByteBufferUtil.bytes("6")));
-    }
-
-    @Test
-    public void testComputeConcurrencyFactor()
-    {
-        int maxConcurrentRangeRequest = 32;
-
-        // no live row returned, fetch all remaining ranges but hit the max instead
-        int cf = StorageProxy.RangeCommandIterator.computeConcurrencyFactor(100, 30, maxConcurrentRangeRequest, 500, 0);
-        assertEquals(maxConcurrentRangeRequest, cf); // because 100 - 30 = 70 > maxConccurrentRangeRequest
-
-        // no live row returned, fetch all remaining ranges
-        cf = StorageProxy.RangeCommandIterator.computeConcurrencyFactor(100, 80, maxConcurrentRangeRequest, 500, 0);
-        assertEquals(20, cf); // because 100-80 = 20 < maxConccurrentRangeRequest
-
-        // returned half rows, fetch rangesQueried again but hit the max instead
-        cf = StorageProxy.RangeCommandIterator.computeConcurrencyFactor(100, 60, maxConcurrentRangeRequest, 480, 240);
-        assertEquals(maxConcurrentRangeRequest, cf); // because 60 > maxConccurrentRangeRequest
-
-        // returned half rows, fetch rangesQueried again
-        cf = StorageProxy.RangeCommandIterator.computeConcurrencyFactor(100, 30, maxConcurrentRangeRequest, 480, 240);
-        assertEquals(30, cf); // because 30 < maxConccurrentRangeRequest
-
-        // returned most of rows, 1 more range to fetch
-        cf = StorageProxy.RangeCommandIterator.computeConcurrencyFactor(100, 1, maxConcurrentRangeRequest, 480, 479);
-        assertEquals(1, cf); // because 1 < maxConccurrentRangeRequest
-    }
-
-    @Test
-    public void testRangeCountWithRangeMerge()
-    {
-        List<Token> tokens = setTokens(Arrays.asList(100, 200, 300, 400));
-        int vnodeCount = 0;
-
-        Keyspace keyspace = Keyspace.open(KEYSPACE1);
-        List<ReplicaPlan.ForRangeRead> ranges = new ArrayList<>();
-        for (int i = 0; i + 1 < tokens.size(); i++)
-        {
-            Range<PartitionPosition> range = Range.makeRowRange(tokens.get(i), tokens.get(i + 1));
-            ranges.add(ReplicaPlans.forRangeRead(keyspace, ConsistencyLevel.ONE, range, 1));
-            vnodeCount++;
-        }
-
-        StorageProxy.RangeMerger merge = new StorageProxy.RangeMerger(ranges.iterator(), keyspace, ConsistencyLevel.ONE);
-        ReplicaPlan.ForRangeRead mergedRange = Iterators.getOnlyElement(merge);
-        // all ranges are merged as test has only one node.
-        assertEquals(vnodeCount, mergedRange.vnodeCount());
-    }
-
-    @Test
-    public void testRangeQueried()
-    {
-        List<Token> tokens = setTokens(Arrays.asList(100, 200, 300, 400));
-        int vnodeCount = tokens.size() + 1; // n tokens divide token ring into n+1 ranges
-
-        Keyspace keyspace = Keyspace.open(KEYSPACE1);
-        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_STANDARD1);
-        cfs.clearUnsafe();
-
-        int rows = 100;
-        for (int i = 0; i < rows; ++i)
-        {
-            RowUpdateBuilder builder = new RowUpdateBuilder(cfs.metadata(), 10, String.valueOf(i));
-            builder.clustering("c");
-            builder.add("val", String.valueOf(i));
-            builder.build().applyUnsafe();
-        }
-        cfs.forceBlockingFlush();
-
-        PartitionRangeReadCommand command = (PartitionRangeReadCommand) Util.cmd(cfs).build();
-
-        // without range merger, there will be 2 batches requested: 1st batch with 1 range and 2nd batch with remaining ranges
-        Iterator<ReplicaPlan.ForRangeRead> ranges = rangeIterator(command, keyspace, false);
-        StorageProxy.RangeCommandIterator data = new StorageProxy.RangeCommandIterator(ranges, command, 1, 1000, vnodeCount, System.nanoTime());
-        verifyRangeCommandIterator(data, rows, 2, vnodeCount);
-
-        // without range merger and initial cf=5, there will be 1 batches requested: 5 vnode ranges for 1st batch
-        ranges = rangeIterator(command, keyspace, false);
-        data = new StorageProxy.RangeCommandIterator(ranges, command, vnodeCount, 1000, vnodeCount, System.nanoTime());
-        verifyRangeCommandIterator(data, rows, 1, vnodeCount);
-
-        // without range merger and max cf=1, there will be 5 batches requested: 1 vnode range per batch
-        ranges = rangeIterator(command, keyspace, false);
-        data = new StorageProxy.RangeCommandIterator(ranges, command, 1, 1, vnodeCount, System.nanoTime());
-        verifyRangeCommandIterator(data, rows, vnodeCount, vnodeCount);
-
-        // with range merger, there will be only 1 batch requested, as all ranges share the same replica - localhost
-        ranges = rangeIterator(command, keyspace, true);
-        data = new StorageProxy.RangeCommandIterator(ranges, command, 1, 1000, vnodeCount, System.nanoTime());
-        verifyRangeCommandIterator(data, rows, 1, vnodeCount);
-
-        // with range merger and max cf=1, there will be only 1 batch requested, as all ranges share the same replica - localhost
-        ranges = rangeIterator(command, keyspace, true);
-        data = new StorageProxy.RangeCommandIterator(ranges, command, 1, 1, vnodeCount, System.nanoTime());
-        verifyRangeCommandIterator(data, rows, 1, vnodeCount);
-    }
-
-    private Iterator<ReplicaPlan.ForRangeRead> rangeIterator(PartitionRangeReadCommand command, Keyspace keyspace, boolean withRangeMerger)
-    {
-        Iterator<ReplicaPlan.ForRangeRead> ranges = new StorageProxy.RangeIterator(command, keyspace, ConsistencyLevel.ONE);
-        if (withRangeMerger)
-            ranges = new StorageProxy.RangeMerger(ranges, keyspace, ConsistencyLevel.ONE);
-
-        return  ranges;
-    }
-
-    private void verifyRangeCommandIterator(StorageProxy.RangeCommandIterator data, int rows, int batches, int vnodeCount)
-    {
-        int num = Util.size(data);
-        assertEquals(rows, num);
-        assertEquals(batches, data.batchesRequested());
-        assertEquals(vnodeCount, data.rangesQueried());
-    }
-
-    private List<Token> setTokens(List<Integer> values)
-    {
-        IPartitioner partitioner = DatabaseDescriptor.getPartitioner();
-        List<Token> tokens = new ArrayList<>(values.size());
-        for (Integer val : values)
-            tokens.add(partitioner.getToken(ByteBufferUtil.bytes(val)));
-
-        TokenMetadata tmd = StorageService.instance.getTokenMetadata();
-        tmd.clearUnsafe();
-        tmd.updateNormalTokens(tokens, FBUtilities.getBroadcastAddressAndPort());
-
-        return tokens;
+        assertEquals(ByteBufferUtil.bytes("2"), partitions.get(0).iterator().next().getCell(cDef).buffer());
+        assertEquals(ByteBufferUtil.bytes("6"), partitions.get(partitions.size() - 1).iterator().next().getCell(cDef).buffer());
     }
 }
 
diff --git a/test/unit/org/apache/cassandra/service/StorageProxyTest.java b/test/unit/org/apache/cassandra/service/StorageProxyTest.java
deleted file mode 100644
index 590cfeb..0000000
--- a/test/unit/org/apache/cassandra/service/StorageProxyTest.java
+++ /dev/null
@@ -1,239 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*    http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-package org.apache.cassandra.service;
-
-import java.util.List;
-
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import org.apache.cassandra.db.PartitionPosition;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.dht.*;
-import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.locator.TokenMetadata;
-
-import static org.apache.cassandra.Util.rp;
-import static org.apache.cassandra.Util.token;
-import static org.junit.Assert.assertEquals;
-
-public class StorageProxyTest
-{
-    private static Range<PartitionPosition> range(PartitionPosition left, PartitionPosition right)
-    {
-        return new Range<PartitionPosition>(left, right);
-    }
-
-    private static Bounds<PartitionPosition> bounds(PartitionPosition left, PartitionPosition right)
-    {
-        return new Bounds<PartitionPosition>(left, right);
-    }
-
-    private static ExcludingBounds<PartitionPosition> exBounds(PartitionPosition left, PartitionPosition right)
-    {
-        return new ExcludingBounds<PartitionPosition>(left, right);
-    }
-
-    private static IncludingExcludingBounds<PartitionPosition> incExBounds(PartitionPosition left, PartitionPosition right)
-    {
-        return new IncludingExcludingBounds<PartitionPosition>(left, right);
-    }
-
-    private static PartitionPosition startOf(String key)
-    {
-        return token(key).minKeyBound();
-    }
-
-    private static PartitionPosition endOf(String key)
-    {
-        return token(key).maxKeyBound();
-    }
-
-    private static Range<Token> tokenRange(String left, String right)
-    {
-        return new Range<Token>(token(left), token(right));
-    }
-
-    private static Bounds<Token> tokenBounds(String left, String right)
-    {
-        return new Bounds<Token>(token(left), token(right));
-    }
-
-    @BeforeClass
-    public static void beforeClass() throws Throwable
-    {
-        DatabaseDescriptor.daemonInitialization();
-        DatabaseDescriptor.getHintsDirectory().mkdir();
-        TokenMetadata tmd = StorageService.instance.getTokenMetadata();
-        tmd.updateNormalToken(token("1"), InetAddressAndPort.getByName("127.0.0.1"));
-        tmd.updateNormalToken(token("6"), InetAddressAndPort.getByName("127.0.0.6"));
-    }
-
-    // test getRestrictedRanges for token
-    private void testGRR(AbstractBounds<Token> queryRange, AbstractBounds<Token>... expected)
-    {
-        // Testing for tokens
-        List<AbstractBounds<Token>> restricted = StorageProxy.getRestrictedRanges(queryRange);
-        assertEquals(restricted.toString(), expected.length, restricted.size());
-        for (int i = 0; i < expected.length; i++)
-            assertEquals("Mismatch for index " + i + ": " + restricted, expected[i], restricted.get(i));
-    }
-
-    // test getRestrictedRanges for keys
-    private void testGRRKeys(AbstractBounds<PartitionPosition> queryRange, AbstractBounds<PartitionPosition>... expected)
-    {
-        // Testing for keys
-        List<AbstractBounds<PartitionPosition>> restrictedKeys = StorageProxy.getRestrictedRanges(queryRange);
-        assertEquals(restrictedKeys.toString(), expected.length, restrictedKeys.size());
-        for (int i = 0; i < expected.length; i++)
-            assertEquals("Mismatch for index " + i + ": " + restrictedKeys, expected[i], restrictedKeys.get(i));
-
-    }
-
-    @Test
-    public void testGRR() throws Throwable
-    {
-        // no splits
-        testGRR(tokenRange("2", "5"), tokenRange("2", "5"));
-        testGRR(tokenBounds("2", "5"), tokenBounds("2", "5"));
-        // single split
-        testGRR(tokenRange("2", "7"), tokenRange("2", "6"), tokenRange("6", "7"));
-        testGRR(tokenBounds("2", "7"), tokenBounds("2", "6"), tokenRange("6", "7"));
-        // single split starting from min
-        testGRR(tokenRange("", "2"), tokenRange("", "1"), tokenRange("1", "2"));
-        testGRR(tokenBounds("", "2"), tokenBounds("", "1"), tokenRange("1", "2"));
-        // single split ending with max
-        testGRR(tokenRange("5", ""), tokenRange("5", "6"), tokenRange("6", ""));
-        testGRR(tokenBounds("5", ""), tokenBounds("5", "6"), tokenRange("6", ""));
-        // two splits
-        testGRR(tokenRange("0", "7"), tokenRange("0", "1"), tokenRange("1", "6"), tokenRange("6", "7"));
-        testGRR(tokenBounds("0", "7"), tokenBounds("0", "1"), tokenRange("1", "6"), tokenRange("6", "7"));
-
-
-        // Keys
-        // no splits
-        testGRRKeys(range(rp("2"), rp("5")), range(rp("2"), rp("5")));
-        testGRRKeys(bounds(rp("2"), rp("5")), bounds(rp("2"), rp("5")));
-        testGRRKeys(exBounds(rp("2"), rp("5")), exBounds(rp("2"), rp("5")));
-        // single split testGRRKeys(range("2", "7"), range(rp("2"), endOf("6")), range(endOf("6"), rp("7")));
-        testGRRKeys(bounds(rp("2"), rp("7")), bounds(rp("2"), endOf("6")), range(endOf("6"), rp("7")));
-        testGRRKeys(exBounds(rp("2"), rp("7")), range(rp("2"), endOf("6")), exBounds(endOf("6"), rp("7")));
-        testGRRKeys(incExBounds(rp("2"), rp("7")), bounds(rp("2"), endOf("6")), exBounds(endOf("6"), rp("7")));
-        // single split starting from min
-        testGRRKeys(range(rp(""), rp("2")), range(rp(""), endOf("1")), range(endOf("1"), rp("2")));
-        testGRRKeys(bounds(rp(""), rp("2")), bounds(rp(""), endOf("1")), range(endOf("1"), rp("2")));
-        testGRRKeys(exBounds(rp(""), rp("2")), range(rp(""), endOf("1")), exBounds(endOf("1"), rp("2")));
-        testGRRKeys(incExBounds(rp(""), rp("2")), bounds(rp(""), endOf("1")), exBounds(endOf("1"), rp("2")));
-        // single split ending with max
-        testGRRKeys(range(rp("5"), rp("")), range(rp("5"), endOf("6")), range(endOf("6"), rp("")));
-        testGRRKeys(bounds(rp("5"), rp("")), bounds(rp("5"), endOf("6")), range(endOf("6"), rp("")));
-        testGRRKeys(exBounds(rp("5"), rp("")), range(rp("5"), endOf("6")), exBounds(endOf("6"), rp("")));
-        testGRRKeys(incExBounds(rp("5"), rp("")), bounds(rp("5"), endOf("6")), exBounds(endOf("6"), rp("")));
-        // two splits
-        testGRRKeys(range(rp("0"), rp("7")), range(rp("0"), endOf("1")), range(endOf("1"), endOf("6")), range(endOf("6"), rp("7")));
-        testGRRKeys(bounds(rp("0"), rp("7")), bounds(rp("0"), endOf("1")), range(endOf("1"), endOf("6")), range(endOf("6"), rp("7")));
-        testGRRKeys(exBounds(rp("0"), rp("7")), range(rp("0"), endOf("1")), range(endOf("1"), endOf("6")), exBounds(endOf("6"), rp("7")));
-        testGRRKeys(incExBounds(rp("0"), rp("7")), bounds(rp("0"), endOf("1")), range(endOf("1"), endOf("6")), exBounds(endOf("6"), rp("7")));
-    }
-
-    @Test
-    public void testGRRExact() throws Throwable
-    {
-        // min
-        testGRR(tokenRange("1", "5"), tokenRange("1", "5"));
-        testGRR(tokenBounds("1", "5"), tokenBounds("1", "1"), tokenRange("1", "5"));
-        // max
-        testGRR(tokenRange("2", "6"), tokenRange("2", "6"));
-        testGRR(tokenBounds("2", "6"), tokenBounds("2", "6"));
-        // both
-        testGRR(tokenRange("1", "6"), tokenRange("1", "6"));
-        testGRR(tokenBounds("1", "6"), tokenBounds("1", "1"), tokenRange("1", "6"));
-
-
-        // Keys
-        // min
-        testGRRKeys(range(endOf("1"), endOf("5")), range(endOf("1"), endOf("5")));
-        testGRRKeys(range(rp("1"), endOf("5")), range(rp("1"), endOf("1")), range(endOf("1"), endOf("5")));
-        testGRRKeys(bounds(startOf("1"), endOf("5")), bounds(startOf("1"), endOf("1")), range(endOf("1"), endOf("5")));
-        testGRRKeys(exBounds(endOf("1"), rp("5")), exBounds(endOf("1"), rp("5")));
-        testGRRKeys(exBounds(rp("1"), rp("5")), range(rp("1"), endOf("1")), exBounds(endOf("1"), rp("5")));
-        testGRRKeys(exBounds(startOf("1"), endOf("5")), range(startOf("1"), endOf("1")), exBounds(endOf("1"), endOf("5")));
-        testGRRKeys(incExBounds(rp("1"), rp("5")), bounds(rp("1"), endOf("1")), exBounds(endOf("1"), rp("5")));
-        // max
-        testGRRKeys(range(endOf("2"), endOf("6")), range(endOf("2"), endOf("6")));
-        testGRRKeys(bounds(startOf("2"), endOf("6")), bounds(startOf("2"), endOf("6")));
-        testGRRKeys(exBounds(rp("2"), rp("6")), exBounds(rp("2"), rp("6")));
-        testGRRKeys(incExBounds(rp("2"), rp("6")), incExBounds(rp("2"), rp("6")));
-        // bothKeys
-        testGRRKeys(range(rp("1"), rp("6")), range(rp("1"), endOf("1")), range(endOf("1"), rp("6")));
-        testGRRKeys(bounds(rp("1"), rp("6")), bounds(rp("1"), endOf("1")), range(endOf("1"), rp("6")));
-        testGRRKeys(exBounds(rp("1"), rp("6")), range(rp("1"), endOf("1")), exBounds(endOf("1"), rp("6")));
-        testGRRKeys(incExBounds(rp("1"), rp("6")), bounds(rp("1"), endOf("1")), exBounds(endOf("1"), rp("6")));
-    }
-
-    @Test
-    public void testGRRWrapped() throws Throwable
-    {
-        // one token in wrapped range
-        testGRR(tokenRange("7", "0"), tokenRange("7", ""), tokenRange("", "0"));
-        // two tokens in wrapped range
-        testGRR(tokenRange("5", "0"), tokenRange("5", "6"), tokenRange("6", ""), tokenRange("", "0"));
-        testGRR(tokenRange("7", "2"), tokenRange("7", ""), tokenRange("", "1"), tokenRange("1", "2"));
-        // full wraps
-        testGRR(tokenRange("0", "0"), tokenRange("0", "1"), tokenRange("1", "6"), tokenRange("6", ""), tokenRange("", "0"));
-        testGRR(tokenRange("", ""), tokenRange("", "1"), tokenRange("1", "6"), tokenRange("6", ""));
-        // wrap on member tokens
-        testGRR(tokenRange("6", "6"), tokenRange("6", ""), tokenRange("", "1"), tokenRange("1", "6"));
-        testGRR(tokenRange("6", "1"), tokenRange("6", ""), tokenRange("", "1"));
-        // end wrapped
-        testGRR(tokenRange("5", ""), tokenRange("5", "6"), tokenRange("6", ""));
-
-        // Keys
-        // one token in wrapped range
-        testGRRKeys(range(rp("7"), rp("0")), range(rp("7"), rp("")), range(rp(""), rp("0")));
-        // two tokens in wrapped range
-        testGRRKeys(range(rp("5"), rp("0")), range(rp("5"), endOf("6")), range(endOf("6"), rp("")), range(rp(""), rp("0")));
-        testGRRKeys(range(rp("7"), rp("2")), range(rp("7"), rp("")), range(rp(""), endOf("1")), range(endOf("1"), rp("2")));
-        // full wraps
-        testGRRKeys(range(rp("0"), rp("0")), range(rp("0"), endOf("1")), range(endOf("1"), endOf("6")), range(endOf("6"), rp("")), range(rp(""), rp("0")));
-        testGRRKeys(range(rp(""), rp("")), range(rp(""), endOf("1")), range(endOf("1"), endOf("6")), range(endOf("6"), rp("")));
-        // wrap on member tokens
-        testGRRKeys(range(rp("6"), rp("6")), range(rp("6"), endOf("6")), range(endOf("6"), rp("")), range(rp(""), endOf("1")), range(endOf("1"), rp("6")));
-        testGRRKeys(range(rp("6"), rp("1")), range(rp("6"), endOf("6")), range(endOf("6"), rp("")), range(rp(""), rp("1")));
-        // end wrapped
-        testGRRKeys(range(rp("5"), rp("")), range(rp("5"), endOf("6")), range(endOf("6"), rp("")));
-    }
-
-    @Test
-    public void testGRRExactBounds() throws Throwable
-    {
-        // equal tokens are special cased as non-wrapping for bounds
-        testGRR(tokenBounds("0", "0"), tokenBounds("0", "0"));
-        // completely empty bounds match everything
-        testGRR(tokenBounds("", ""), tokenBounds("", "1"), tokenRange("1", "6"), tokenRange("6", ""));
-
-        // Keys
-        // equal tokens are special cased as non-wrapping for bounds
-        testGRRKeys(bounds(rp("0"), rp("0")), bounds(rp("0"), rp("0")));
-        // completely empty bounds match everything
-        testGRRKeys(bounds(rp(""), rp("")), bounds(rp(""), endOf("1")), range(endOf("1"), endOf("6")), range(endOf("6"), rp("")));
-        testGRRKeys(exBounds(rp(""), rp("")), range(rp(""), endOf("1")), range(endOf("1"), endOf("6")), exBounds(endOf("6"), rp("")));
-        testGRRKeys(incExBounds(rp(""), rp("")), bounds(rp(""), endOf("1")), range(endOf("1"), endOf("6")), exBounds(endOf("6"), rp("")));
-    }
-}
diff --git a/test/unit/org/apache/cassandra/service/reads/range/RangeCommandIteratorTest.java b/test/unit/org/apache/cassandra/service/reads/range/RangeCommandIteratorTest.java
new file mode 100644
index 0000000..d82a503
--- /dev/null
+++ b/test/unit/org/apache/cassandra/service/reads/range/RangeCommandIteratorTest.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.reads.range;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.common.collect.Iterators;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.PartitionPosition;
+import org.apache.cassandra.db.PartitionRangeReadCommand;
+import org.apache.cassandra.db.RowUpdateBuilder;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.locator.ReplicaPlan;
+import org.apache.cassandra.locator.ReplicaPlans;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.utils.CloseableIterator;
+
+import static org.junit.Assert.assertEquals;
+
+public class RangeCommandIteratorTest
+{
+    private static final String KEYSPACE1 = "RangeCommandIteratorTest";
+    private static final String CF_STANDARD1 = "Standard1";
+
+    @BeforeClass
+    public static void defineSchema() throws ConfigurationException
+    {
+        SchemaLoader.prepareServer();
+        SchemaLoader.createKeyspace(KEYSPACE1,
+                                    KeyspaceParams.simple(1),
+                                    SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD1));
+    }
+
+    @Test
+    public void testRangeCountWithRangeMerge()
+    {
+        List<Token> tokens = setTokens(100, 200, 300, 400);
+        int vnodeCount = 0;
+
+        Keyspace keyspace = Keyspace.open(KEYSPACE1);
+        List<ReplicaPlan.ForRangeRead> ranges = new ArrayList<>();
+        for (int i = 0; i + 1 < tokens.size(); i++)
+        {
+            Range<PartitionPosition> range = Range.makeRowRange(tokens.get(i), tokens.get(i + 1));
+            ranges.add(ReplicaPlans.forRangeRead(keyspace, ConsistencyLevel.ONE, range, 1));
+            vnodeCount++;
+        }
+
+        ReplicaPlanMerger merge = new ReplicaPlanMerger(ranges.iterator(), keyspace, ConsistencyLevel.ONE);
+        ReplicaPlan.ForRangeRead mergedRange = Iterators.getOnlyElement(merge);
+        // all ranges are merged as test has only one node.
+        assertEquals(vnodeCount, mergedRange.vnodeCount());
+    }
+
+    @Test
+    public void testRangeQueried()
+    {
+        List<Token> tokens = setTokens(100, 200, 300, 400);
+        int vnodeCount = tokens.size() + 1; // n tokens divide token ring into n+1 ranges
+
+        Keyspace keyspace = Keyspace.open(KEYSPACE1);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_STANDARD1);
+        cfs.clearUnsafe();
+
+        int rows = 100;
+        for (int i = 0; i < rows; ++i)
+        {
+            RowUpdateBuilder builder = new RowUpdateBuilder(cfs.metadata(), 10, String.valueOf(i));
+            builder.clustering("c");
+            builder.add("val", String.valueOf(i));
+            builder.build().applyUnsafe();
+        }
+        cfs.forceBlockingFlush();
+
+        PartitionRangeReadCommand command = (PartitionRangeReadCommand) Util.cmd(cfs).build();
+        AbstractBounds<PartitionPosition> keyRange = command.dataRange().keyRange();
+
+        // without range merger, there will be 2 batches requested: 1st batch with 1 range and 2nd batch with remaining ranges
+        CloseableIterator<ReplicaPlan.ForRangeRead> replicaPlans = replicaPlanIterator(keyRange, keyspace, false);
+        RangeCommandIterator data = new RangeCommandIterator(replicaPlans, command, 1, 1000, vnodeCount, System.nanoTime());
+        verifyRangeCommandIterator(data, rows, 2, vnodeCount);
+
+        // without range merger and initial cf=5, there will be 1 batch requested: 5 vnode ranges for the 1st batch
+        replicaPlans = replicaPlanIterator(keyRange, keyspace, false);
+        data = new RangeCommandIterator(replicaPlans, command, vnodeCount, 1000, vnodeCount, System.nanoTime());
+        verifyRangeCommandIterator(data, rows, 1, vnodeCount);
+
+        // without range merger and max cf=1, there will be 5 batches requested: 1 vnode range per batch
+        replicaPlans = replicaPlanIterator(keyRange, keyspace, false);
+        data = new RangeCommandIterator(replicaPlans, command, 1, 1, vnodeCount, System.nanoTime());
+        verifyRangeCommandIterator(data, rows, vnodeCount, vnodeCount);
+
+        // with range merger, there will be only 1 batch requested, as all ranges share the same replica - localhost
+        replicaPlans = replicaPlanIterator(keyRange, keyspace, true);
+        data = new RangeCommandIterator(replicaPlans, command, 1, 1000, vnodeCount, System.nanoTime());
+        verifyRangeCommandIterator(data, rows, 1, vnodeCount);
+
+        // with range merger and max cf=1, there will be only 1 batch requested, as all ranges share the same replica - localhost
+        replicaPlans = replicaPlanIterator(keyRange, keyspace, true);
+        data = new RangeCommandIterator(replicaPlans, command, 1, 1, vnodeCount, System.nanoTime());
+        verifyRangeCommandIterator(data, rows, 1, vnodeCount);
+    }
+
+    @Test
+    public void testComputeConcurrencyFactor()
+    {
+        int maxConcurrentRangeRequest = 32;
+
+        // no live row returned, fetch all remaining ranges but hit the max instead
+        int cf = RangeCommandIterator.computeConcurrencyFactor(100, 30, maxConcurrentRangeRequest, 500, 0);
+        assertEquals(maxConcurrentRangeRequest, cf); // because 100 - 30 = 70 > maxConcurrentRangeRequest
+
+        // no live row returned, fetch all remaining ranges
+        cf = RangeCommandIterator.computeConcurrencyFactor(100, 80, maxConcurrentRangeRequest, 500, 0);
+        assertEquals(20, cf); // because 100 - 80 = 20 < maxConcurrentRangeRequest
+
+        // returned half rows, fetch rangesQueried again but hit the max instead
+        cf = RangeCommandIterator.computeConcurrencyFactor(100, 60, maxConcurrentRangeRequest, 480, 240);
+        assertEquals(maxConcurrentRangeRequest, cf); // because 60 > maxConcurrentRangeRequest
+
+        // returned half rows, fetch rangesQueried again
+        cf = RangeCommandIterator.computeConcurrencyFactor(100, 30, maxConcurrentRangeRequest, 480, 240);
+        assertEquals(30, cf); // because 30 < maxConcurrentRangeRequest
+
+        // returned most of rows, 1 more range to fetch
+        cf = RangeCommandIterator.computeConcurrencyFactor(100, 1, maxConcurrentRangeRequest, 480, 479);
+        assertEquals(1, cf); // because 1 < maxConcurrentRangeRequest
+    }
+
+    private static List<Token> setTokens(int... values)
+    {
+        return new TokenUpdater().withKeys(values).update().getTokens();
+    }
+
+    private static CloseableIterator<ReplicaPlan.ForRangeRead> replicaPlanIterator(AbstractBounds<PartitionPosition> keyRange,
+                                                                                   Keyspace keyspace,
+                                                                                   boolean withRangeMerger)
+    {
+        CloseableIterator<ReplicaPlan.ForRangeRead> replicaPlans = new ReplicaPlanIterator(keyRange, keyspace, ConsistencyLevel.ONE);
+        if (withRangeMerger)
+            replicaPlans = new ReplicaPlanMerger(replicaPlans, keyspace, ConsistencyLevel.ONE);
+
+        return replicaPlans;
+    }
+
+    private static void verifyRangeCommandIterator(RangeCommandIterator data, int rows, int batches, int vnodeCount)
+    {
+        int num = Util.size(data);
+        assertEquals(rows, num);
+        assertEquals(batches, data.batchesRequested());
+        assertEquals(vnodeCount, data.rangesQueried());
+    }
+}
diff --git a/test/unit/org/apache/cassandra/service/reads/range/RangeCommandsTest.java b/test/unit/org/apache/cassandra/service/reads/range/RangeCommandsTest.java
new file mode 100644
index 0000000..294be2a
--- /dev/null
+++ b/test/unit/org/apache/cassandra/service/reads/range/RangeCommandsTest.java
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.reads.range;
+
+import java.nio.ByteBuffer;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.Util;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.cql3.Operator;
+import org.apache.cassandra.db.AbstractReadCommandBuilder;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.PartitionRangeReadCommand;
+import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.partitions.CachedPartition;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.index.StubIndex;
+import org.apache.cassandra.schema.IndexMetadata;
+
+import static org.apache.cassandra.db.ConsistencyLevel.ONE;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link RangeCommands}.
+ */
+public class RangeCommandsTest extends CQLTester
+{
+    private static final int MAX_CONCURRENCY_FACTOR = 4;
+
+    @BeforeClass
+    public static void defineSchema() throws ConfigurationException
+    {
+        System.setProperty("cassandra.max_concurrent_range_requests", String.valueOf(MAX_CONCURRENCY_FACTOR));
+    }
+
+    @AfterClass
+    public static void cleanup()
+    {
+        System.clearProperty("cassandra.max_concurrent_range_requests");
+    }
+
+    @Test
+    public void testConcurrencyFactor()
+    {
+        new TokenUpdater().withTokens("127.0.0.1", 1, 2)
+                          .withTokens("127.0.0.2", 3, 4)
+                          .update();
+
+        String table = createTable("CREATE TABLE %s (k int PRIMARY KEY, v int)");
+        Keyspace keyspace = Keyspace.open(KEYSPACE);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(table);
+
+        // verify that a low concurrency factor is not capped by the max concurrency factor
+        PartitionRangeReadCommand command = command(cfs, 50, 50);
+        try (RangeCommandIterator partitions = RangeCommands.rangeCommandIterator(command, ONE, System.nanoTime());
+             ReplicaPlanIterator ranges = new ReplicaPlanIterator(command.dataRange().keyRange(), keyspace, ONE))
+        {
+            assertEquals(2, partitions.concurrencyFactor());
+            assertEquals(MAX_CONCURRENCY_FACTOR, partitions.maxConcurrencyFactor());
+            assertEquals(5, ranges.size());
+        }
+
+        // verify that a high concurrency factor is capped by the max concurrency factor
+        command = command(cfs, 1000, 50);
+        try (RangeCommandIterator partitions = RangeCommands.rangeCommandIterator(command, ONE, System.nanoTime());
+             ReplicaPlanIterator ranges = new ReplicaPlanIterator(command.dataRange().keyRange(), keyspace, ONE))
+        {
+            assertEquals(MAX_CONCURRENCY_FACTOR, partitions.concurrencyFactor());
+            assertEquals(MAX_CONCURRENCY_FACTOR, partitions.maxConcurrencyFactor());
+            assertEquals(5, ranges.size());
+        }
+
+        // with 0 estimated results per range the concurrency factor should be 1
+        command = command(cfs, 1000, 0);
+        try (RangeCommandIterator partitions = RangeCommands.rangeCommandIterator(command, ONE, System.nanoTime());
+             ReplicaPlanIterator ranges = new ReplicaPlanIterator(command.dataRange().keyRange(), keyspace, ONE))
+        {
+            assertEquals(1, partitions.concurrencyFactor());
+            assertEquals(MAX_CONCURRENCY_FACTOR, partitions.maxConcurrencyFactor());
+            assertEquals(5, ranges.size());
+        }
+    }
+
+    @Test
+    public void testEstimateResultsPerRange()
+    {
+        testEstimateResultsPerRange(1);
+        testEstimateResultsPerRange(2);
+    }
+
+    private void testEstimateResultsPerRange(int rf)
+    {
+        String ks = createKeyspace(String.format("CREATE KEYSPACE %%s WITH replication={'class':'SimpleStrategy', 'replication_factor':%s}", rf));
+        String table = createTable(ks, "CREATE TABLE %s (k int PRIMARY KEY, v int)");
+        createIndex(String.format("CREATE CUSTOM INDEX ON %s.%s(v) USING '%s'", ks, table, MockedIndex.class.getName()));
+        Keyspace keyspace = Keyspace.open(ks);
+        ColumnFamilyStore cfs = Keyspace.open(ks).getColumnFamilyStore(table);
+
+        setNumTokens(1);
+        testEstimateResultsPerRange(keyspace, cfs, rf, 0, null, 0);
+        testEstimateResultsPerRange(keyspace, cfs, rf, 1, null, 1);
+        testEstimateResultsPerRange(keyspace, cfs, rf, 10, null, 10);
+        testEstimateResultsPerRange(keyspace, cfs, rf, 100, null, 100);
+        testEstimateResultsPerRange(keyspace, cfs, rf, 0, 0, 0);
+        testEstimateResultsPerRange(keyspace, cfs, rf, 1, 0, 0);
+        testEstimateResultsPerRange(keyspace, cfs, rf, 10, 0, 0);
+        testEstimateResultsPerRange(keyspace, cfs, rf, 100, 0, 0);
+        testEstimateResultsPerRange(keyspace, cfs, rf, 0, 1000, 1000);
+        testEstimateResultsPerRange(keyspace, cfs, rf, 1, 1000, 1000);
+        testEstimateResultsPerRange(keyspace, cfs, rf, 10, 1000, 1000);
+        testEstimateResultsPerRange(keyspace, cfs, rf, 100, 1000, 1000);
+
+        setNumTokens(5);
+        testEstimateResultsPerRange(keyspace, cfs, rf, 0, null, 0);
+        testEstimateResultsPerRange(keyspace, cfs, rf, 1, null, 0.2f);
+        testEstimateResultsPerRange(keyspace, cfs, rf, 10, null, 2);
+        testEstimateResultsPerRange(keyspace, cfs, rf, 100, null, 20);
+        testEstimateResultsPerRange(keyspace, cfs, rf, 0, 0, 0);
+        testEstimateResultsPerRange(keyspace, cfs, rf, 1, 0, 0);
+        testEstimateResultsPerRange(keyspace, cfs, rf, 10, 0, 0);
+        testEstimateResultsPerRange(keyspace, cfs, rf, 100, 0, 0);
+        testEstimateResultsPerRange(keyspace, cfs, rf, 0, 1000, 200);
+        testEstimateResultsPerRange(keyspace, cfs, rf, 1, 1000, 200);
+        testEstimateResultsPerRange(keyspace, cfs, rf, 10, 1000, 200);
+        testEstimateResultsPerRange(keyspace, cfs, rf, 100, 1000, 200);
+    }
+
+    private static void testEstimateResultsPerRange(Keyspace keyspace,
+                                                    ColumnFamilyStore cfs,
+                                                    int rf,
+                                                    int commandEstimate,
+                                                    Integer indexEstimate,
+                                                    float expectedEstimate)
+    {
+        PartitionRangeReadCommand command = command(cfs, Integer.MAX_VALUE, commandEstimate, indexEstimate);
+        assertEquals(expectedEstimate / rf, RangeCommands.estimateResultsPerRange(command, keyspace), 0);
+    }
+
+    private static PartitionRangeReadCommand command(ColumnFamilyStore cfs, int limit, int commandEstimate)
+    {
+        return command(cfs, limit, commandEstimate, null);
+    }
+
+    private static PartitionRangeReadCommand command(ColumnFamilyStore cfs, int limit, int commandEstimate, Integer indexEstimate)
+    {
+        AbstractReadCommandBuilder.PartitionRangeBuilder commandBuilder = Util.cmd(cfs);
+        if (indexEstimate != null)
+        {
+            commandBuilder.filterOn("v", Operator.EQ, 0);
+            MockedIndex.estimatedResultRows = indexEstimate;
+        }
+        PartitionRangeReadCommand command = (PartitionRangeReadCommand) commandBuilder.build();
+        return command.withUpdatedLimit(new MockedDataLimits(DataLimits.cqlLimits(limit), commandEstimate));
+    }
+
+    private static void setNumTokens(int numTokens)
+    {
+        DatabaseDescriptor.getRawConfig().num_tokens = numTokens;
+    }
+
+    private static class MockedDataLimits extends DataLimits
+    {
+        private final DataLimits wrapped;
+        private final int estimateTotalResults;
+
+        public MockedDataLimits(DataLimits wrapped, int estimateTotalResults)
+        {
+            this.wrapped = wrapped;
+            this.estimateTotalResults = estimateTotalResults;
+        }
+
+        @Override
+        public float estimateTotalResults(ColumnFamilyStore cfs)
+        {
+            return estimateTotalResults;
+        }
+
+        @Override
+        public Kind kind()
+        {
+            return wrapped.kind();
+        }
+
+        @Override
+        public boolean isUnlimited()
+        {
+            return wrapped.isUnlimited();
+        }
+
+        @Override
+        public boolean isDistinct()
+        {
+            return wrapped.isDistinct();
+        }
+
+        @Override
+        public DataLimits forPaging(int pageSize)
+        {
+            return wrapped.forPaging(pageSize);
+        }
+
+        @Override
+        public DataLimits forPaging(int pageSize, ByteBuffer lastReturnedKey, int lastReturnedKeyRemaining)
+        {
+            return wrapped.forPaging(pageSize, lastReturnedKey, lastReturnedKeyRemaining);
+        }
+
+        @Override
+        public DataLimits forShortReadRetry(int toFetch)
+        {
+            return wrapped.forShortReadRetry(toFetch);
+        }
+
+        @Override
+        public boolean hasEnoughLiveData(CachedPartition cached, int nowInSec, boolean countPartitionsWithOnlyStaticData, boolean enforceStrictLiveness)
+        {
+            return wrapped.hasEnoughLiveData(cached, nowInSec, countPartitionsWithOnlyStaticData, enforceStrictLiveness);
+        }
+
+        @Override
+        public Counter newCounter(int nowInSec, boolean assumeLiveData, boolean countPartitionsWithOnlyStaticData, boolean enforceStrictLiveness)
+        {
+            return wrapped.newCounter(nowInSec, assumeLiveData, countPartitionsWithOnlyStaticData, enforceStrictLiveness);
+        }
+
+        @Override
+        public int count()
+        {
+            return wrapped.count();
+        }
+
+        @Override
+        public int perPartitionCount()
+        {
+            return wrapped.perPartitionCount();
+        }
+
+        @Override
+        public DataLimits withoutState()
+        {
+            return wrapped.withoutState();
+        }
+    }
+
+    public static final class MockedIndex extends StubIndex
+    {
+        private static long estimatedResultRows = 0;
+
+        public MockedIndex(ColumnFamilyStore baseCfs, IndexMetadata metadata)
+        {
+            super(baseCfs, metadata);
+        }
+
+        @Override
+        public long getEstimatedResultRows()
+        {
+            return estimatedResultRows;
+        }
+    }
+}
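
The estimates asserted in testEstimateResultsPerRange above reduce to a small formula: the expected
row count comes from the index when one is used and from the command's DataLimits otherwise, and is
then divided by the node's num_tokens and the keyspace's replication factor. A minimal sketch, with
hypothetical names, assuming exactly those inputs:

    static float estimateResultsPerRangeSketch(float commandEstimate, Float indexEstimate, int numTokens, int rf)
    {
        // an index, when present, overrides the command's own estimate
        float maxExpectedResults = indexEstimate != null ? indexEstimate : commandEstimate;
        // spread the estimate over the node's token ranges and the keyspace's replicas
        return maxExpectedResults / numTokens / rf;
    }

For example, with five tokens and an index estimate of 1000 rows the per-range estimate is
1000 / 5 / rf = 200 / rf, which is what the assertions above check.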
diff --git a/test/unit/org/apache/cassandra/service/reads/range/ReplicaPlanIteratorTest.java b/test/unit/org/apache/cassandra/service/reads/range/ReplicaPlanIteratorTest.java
new file mode 100644
index 0000000..745ad4e
--- /dev/null
+++ b/test/unit/org/apache/cassandra/service/reads/range/ReplicaPlanIteratorTest.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.reads.range;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.PartitionPosition;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.Bounds;
+import org.apache.cassandra.dht.ExcludingBounds;
+import org.apache.cassandra.dht.IncludingExcludingBounds;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.schema.SchemaConstants;
+
+import static org.apache.cassandra.Util.rp;
+import static org.apache.cassandra.Util.token;
+import static org.junit.Assert.assertEquals;
+
+public class ReplicaPlanIteratorTest
+{
+    private static final String KEYSPACE = "ReplicaPlanIteratorTest";
+    private static Keyspace keyspace;
+
+    @BeforeClass
+    public static void beforeClass() throws Throwable
+    {
+        SchemaLoader.prepareServer();
+        SchemaLoader.createKeyspace(KEYSPACE, KeyspaceParams.simple(1));
+        keyspace = Keyspace.open(KEYSPACE);
+
+        new TokenUpdater().withKeys(InetAddressAndPort.getByName("127.0.0.1"), "1")
+                          .withKeys(InetAddressAndPort.getByName("127.0.0.6"), "6")
+                          .update();
+    }
+
+    @Test
+    public void testRanges()
+    {
+        // no splits
+        testRanges(range(rp("2"), rp("5")), range(rp("2"), rp("5")));
+        testRanges(bounds(rp("2"), rp("5")), bounds(rp("2"), rp("5")));
+        testRanges(exBounds(rp("2"), rp("5")), exBounds(rp("2"), rp("5")));
+        // single split
+        testRanges(range(rp("2"), rp("7")), range(rp("2"), endOf("6")), range(endOf("6"), rp("7")));
+        testRanges(bounds(rp("2"), rp("7")), bounds(rp("2"), endOf("6")), range(endOf("6"), rp("7")));
+        testRanges(exBounds(rp("2"), rp("7")), range(rp("2"), endOf("6")), exBounds(endOf("6"), rp("7")));
+        testRanges(incExBounds(rp("2"), rp("7")), bounds(rp("2"), endOf("6")), exBounds(endOf("6"), rp("7")));
+        // single split starting from min
+        testRanges(range(rp(""), rp("2")), range(rp(""), endOf("1")), range(endOf("1"), rp("2")));
+        testRanges(bounds(rp(""), rp("2")), bounds(rp(""), endOf("1")), range(endOf("1"), rp("2")));
+        testRanges(exBounds(rp(""), rp("2")), range(rp(""), endOf("1")), exBounds(endOf("1"), rp("2")));
+        testRanges(incExBounds(rp(""), rp("2")), bounds(rp(""), endOf("1")), exBounds(endOf("1"), rp("2")));
+        // single split ending with max
+        testRanges(range(rp("5"), rp("")), range(rp("5"), endOf("6")), range(endOf("6"), rp("")));
+        testRanges(bounds(rp("5"), rp("")), bounds(rp("5"), endOf("6")), range(endOf("6"), rp("")));
+        testRanges(exBounds(rp("5"), rp("")), range(rp("5"), endOf("6")), exBounds(endOf("6"), rp("")));
+        testRanges(incExBounds(rp("5"), rp("")), bounds(rp("5"), endOf("6")), exBounds(endOf("6"), rp("")));
+        // two splits
+        testRanges(range(rp("0"), rp("7")), range(rp("0"), endOf("1")), range(endOf("1"), endOf("6")), range(endOf("6"), rp("7")));
+        testRanges(bounds(rp("0"), rp("7")), bounds(rp("0"), endOf("1")), range(endOf("1"), endOf("6")), range(endOf("6"), rp("7")));
+        testRanges(exBounds(rp("0"), rp("7")), range(rp("0"), endOf("1")), range(endOf("1"), endOf("6")), exBounds(endOf("6"), rp("7")));
+        testRanges(incExBounds(rp("0"), rp("7")), bounds(rp("0"), endOf("1")), range(endOf("1"), endOf("6")), exBounds(endOf("6"), rp("7")));
+    }
+
+    @Test
+    public void testExact()
+    {
+        // min
+        testRanges(range(endOf("1"), endOf("5")), range(endOf("1"), endOf("5")));
+        testRanges(range(rp("1"), endOf("5")), range(rp("1"), endOf("1")), range(endOf("1"), endOf("5")));
+        testRanges(bounds(startOf("1"), endOf("5")), bounds(startOf("1"), endOf("1")), range(endOf("1"), endOf("5")));
+        testRanges(exBounds(endOf("1"), rp("5")), exBounds(endOf("1"), rp("5")));
+        testRanges(exBounds(rp("1"), rp("5")), range(rp("1"), endOf("1")), exBounds(endOf("1"), rp("5")));
+        testRanges(exBounds(startOf("1"), endOf("5")), range(startOf("1"), endOf("1")), exBounds(endOf("1"), endOf("5")));
+        testRanges(incExBounds(rp("1"), rp("5")), bounds(rp("1"), endOf("1")), exBounds(endOf("1"), rp("5")));
+        // max
+        testRanges(range(endOf("2"), endOf("6")), range(endOf("2"), endOf("6")));
+        testRanges(bounds(startOf("2"), endOf("6")), bounds(startOf("2"), endOf("6")));
+        testRanges(exBounds(rp("2"), rp("6")), exBounds(rp("2"), rp("6")));
+        testRanges(incExBounds(rp("2"), rp("6")), incExBounds(rp("2"), rp("6")));
+        // bothKeys
+        testRanges(range(rp("1"), rp("6")), range(rp("1"), endOf("1")), range(endOf("1"), rp("6")));
+        testRanges(bounds(rp("1"), rp("6")), bounds(rp("1"), endOf("1")), range(endOf("1"), rp("6")));
+        testRanges(exBounds(rp("1"), rp("6")), range(rp("1"), endOf("1")), exBounds(endOf("1"), rp("6")));
+        testRanges(incExBounds(rp("1"), rp("6")), bounds(rp("1"), endOf("1")), exBounds(endOf("1"), rp("6")));
+    }
+
+    @Test
+    public void testWrapped()
+    {
+        // one token in wrapped range
+        testRanges(range(rp("7"), rp("0")), range(rp("7"), rp("")), range(rp(""), rp("0")));
+        // two tokens in wrapped range
+        testRanges(range(rp("5"), rp("0")), range(rp("5"), endOf("6")), range(endOf("6"), rp("")), range(rp(""), rp("0")));
+        testRanges(range(rp("7"), rp("2")), range(rp("7"), rp("")), range(rp(""), endOf("1")), range(endOf("1"), rp("2")));
+        // full wraps
+        testRanges(range(rp("0"), rp("0")), range(rp("0"), endOf("1")), range(endOf("1"), endOf("6")), range(endOf("6"), rp("")), range(rp(""), rp("0")));
+        testRanges(range(rp(""), rp("")), range(rp(""), endOf("1")), range(endOf("1"), endOf("6")), range(endOf("6"), rp("")));
+        // wrap on member tokens
+        testRanges(range(rp("6"), rp("6")), range(rp("6"), endOf("6")), range(endOf("6"), rp("")), range(rp(""), endOf("1")), range(endOf("1"), rp("6")));
+        testRanges(range(rp("6"), rp("1")), range(rp("6"), endOf("6")), range(endOf("6"), rp("")), range(rp(""), rp("1")));
+        // end wrapped
+        testRanges(range(rp("5"), rp("")), range(rp("5"), endOf("6")), range(endOf("6"), rp("")));
+    }
+
+    @Test
+    public void testExactBounds()
+    {
+        // equal tokens are special cased as non-wrapping for bounds
+        testRanges(bounds(rp("0"), rp("0")), bounds(rp("0"), rp("0")));
+        // completely empty bounds match everything
+        testRanges(bounds(rp(""), rp("")), bounds(rp(""), endOf("1")), range(endOf("1"), endOf("6")), range(endOf("6"), rp("")));
+        testRanges(exBounds(rp(""), rp("")), range(rp(""), endOf("1")), range(endOf("1"), endOf("6")), exBounds(endOf("6"), rp("")));
+        testRanges(incExBounds(rp(""), rp("")), bounds(rp(""), endOf("1")), range(endOf("1"), endOf("6")), exBounds(endOf("6"), rp("")));
+    }
+
+    @Test
+    public void testLocalReplicationStrategy()
+    {
+        Keyspace systemKeyspace = Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME);
+
+        // ranges that would produce multiple splits with a non-local strategy, but not with the local strategy
+        testRanges(systemKeyspace, range(rp("0"), rp("7")), range(rp("0"), rp("7")));
+        testRanges(systemKeyspace, bounds(rp("0"), rp("7")), bounds(rp("0"), rp("7")));
+        testRanges(systemKeyspace, exBounds(rp("0"), rp("7")), exBounds(rp("0"), rp("7")));
+        testRanges(systemKeyspace, incExBounds(rp("0"), rp("7")), incExBounds(rp("0"), rp("7")));
+
+        // wrapping ranges that should be unwrapped but not split further
+        testRanges(systemKeyspace, range(rp("7"), rp("0")), range(rp("7"), rp("")), range(rp(""), rp("0")));
+        testRanges(systemKeyspace, range(rp("7"), rp("2")), range(rp("7"), rp("")), range(rp(""), rp("2")));
+    }
+
+    @SafeVarargs
+    private final void testRanges(AbstractBounds<PartitionPosition> queryRange, AbstractBounds<PartitionPosition>... expected)
+    {
+        testRanges(keyspace, queryRange, expected);
+    }
+
+    @SafeVarargs
+    private final void testRanges(Keyspace keyspace, AbstractBounds<PartitionPosition> queryRange, AbstractBounds<PartitionPosition>... expected)
+    {
+        try (ReplicaPlanIterator iterator = new ReplicaPlanIterator(queryRange, keyspace, ConsistencyLevel.ANY))
+        {
+            List<AbstractBounds<PartitionPosition>> restrictedRanges = new ArrayList<>(expected.length);
+            while (iterator.hasNext())
+                restrictedRanges.add(iterator.next().range());
+
+            // verify range counts
+            assertEquals(expected.length, restrictedRanges.size());
+            assertEquals(expected.length, iterator.size());
+
+            // verify the ranges
+            for (int i = 0; i < expected.length; i++)
+                assertEquals("Mismatch for index " + i + ": " + restrictedRanges, expected[i], restrictedRanges.get(i));
+        }
+    }
+
+    private static Range<PartitionPosition> range(PartitionPosition left, PartitionPosition right)
+    {
+        return new Range<>(left, right);
+    }
+
+    private static Bounds<PartitionPosition> bounds(PartitionPosition left, PartitionPosition right)
+    {
+        return new Bounds<>(left, right);
+    }
+
+    private static ExcludingBounds<PartitionPosition> exBounds(PartitionPosition left, PartitionPosition right)
+    {
+        return new ExcludingBounds<>(left, right);
+    }
+
+    private static IncludingExcludingBounds<PartitionPosition> incExBounds(PartitionPosition left, PartitionPosition right)
+    {
+        return new IncludingExcludingBounds<>(left, right);
+    }
+
+    private static PartitionPosition startOf(String key)
+    {
+        return token(key).minKeyBound();
+    }
+
+    private static PartitionPosition endOf(String key)
+    {
+        return token(key).maxKeyBound();
+    }
+}
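
The splits asserted above follow one rule: a (possibly wrapping) query range is first unwrapped at the
ring's minimum token, and is then cut at the upper key bound of every ring token falling strictly inside
it; keyspaces using a local replication strategy skip the extra splitting, as testLocalReplicationStrategy
shows. A rough, simplified sketch of the cutting step (plain longs stand in for tokens and key bounds,
non-wrapping input only; this is not the committed code):

    import java.util.ArrayList;
    import java.util.List;

    static List<long[]> splitAtRingTokens(long left, long right, long[] ringTokens)
    {
        List<long[]> pieces = new ArrayList<>();
        long start = left;
        for (long token : ringTokens)               // ring tokens sorted ascending
        {
            if (token > start && token < right)     // cut points strictly inside the range
            {
                pieces.add(new long[]{ start, token });
                start = token;
            }
        }
        pieces.add(new long[]{ start, right });
        return pieces;
    }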
diff --git a/test/unit/org/apache/cassandra/service/reads/range/ReplicaPlanMergerTest.java b/test/unit/org/apache/cassandra/service/reads/range/ReplicaPlanMergerTest.java
new file mode 100644
index 0000000..721551d
--- /dev/null
+++ b/test/unit/org/apache/cassandra/service/reads/range/ReplicaPlanMergerTest.java
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.reads.range;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.PartitionPosition;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.service.StorageService;
+
+import static org.apache.cassandra.Util.testPartitioner;
+import static org.apache.cassandra.db.ConsistencyLevel.ALL;
+import static org.apache.cassandra.db.ConsistencyLevel.ANY;
+import static org.apache.cassandra.db.ConsistencyLevel.EACH_QUORUM;
+import static org.apache.cassandra.db.ConsistencyLevel.LOCAL_ONE;
+import static org.apache.cassandra.db.ConsistencyLevel.LOCAL_QUORUM;
+import static org.apache.cassandra.db.ConsistencyLevel.LOCAL_SERIAL;
+import static org.apache.cassandra.db.ConsistencyLevel.ONE;
+import static org.apache.cassandra.db.ConsistencyLevel.QUORUM;
+import static org.apache.cassandra.db.ConsistencyLevel.SERIAL;
+import static org.apache.cassandra.db.ConsistencyLevel.THREE;
+import static org.apache.cassandra.db.ConsistencyLevel.TWO;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+/**
+ * Tests for {@link ReplicaPlanMerger}.
+ */
+public class ReplicaPlanMergerTest
+{
+    private static final String KEYSPACE = "ReplicaPlanMergerTest";
+    private static Keyspace keyspace;
+
+    @BeforeClass
+    public static void defineSchema() throws ConfigurationException
+    {
+        SchemaLoader.prepareServer();
+        StorageService.instance.setPartitionerUnsafe(Murmur3Partitioner.instance);
+        SchemaLoader.createKeyspace(KEYSPACE, KeyspaceParams.simple(2));
+        keyspace = Keyspace.open(KEYSPACE);
+    }
+
+    /**
+     * Tests range merging with a single-node cluster and a read consistency level that allows ranges to be merged.
+     */
+    @Test
+    public void testSingleNode()
+    {
+        new TokenUpdater().withTokens(10, 20, 30, 40).update();
+
+        // with CLs requiring a single node all ranges are merged (unless they are wrapping)
+        for (ConsistencyLevel cl : Arrays.asList(ONE, LOCAL_ONE, ANY))
+        {
+            testRanges(cl, range(min(), min()));
+            testRanges(cl, range(min(), max(25)));
+            testRanges(cl, range(min(), max(40)));
+            testRanges(cl, range(min(), max(50)));
+            testRanges(cl, range(max(20), max(30)));
+            testRanges(cl, range(max(25), min()));
+            testRanges(cl, range(max(25), max(35)));
+            testRanges(cl, range(max(50), min()));
+            testRanges(cl, range(max(40), max(10)), range(max(40), min()), range(min(), max(10))); // wrapped is split
+            testRanges(cl, range(max(25), max(15)), range(max(25), min()), range(min(), max(15))); // wrapped is split
+        }
+
+        // with CLs requiring more than a single node, ranges are not merged because the keyspace has RF=2
+        for (ConsistencyLevel cl : Arrays.asList(ALL, QUORUM, LOCAL_QUORUM, EACH_QUORUM, TWO, THREE, SERIAL, LOCAL_SERIAL))
+        {
+            testRanges(cl,
+                       range(min(), min()),
+                       range(min(), max(10)),
+                       range(max(10), max(20)),
+                       range(max(20), max(30)),
+                       range(max(30), max(40)),
+                       range(max(40), min()));
+            testRanges(cl,
+                       range(min(), max(25)),
+                       range(min(), max(10)), range(max(10), max(20)), range(max(20), max(25)));
+            testRanges(cl,
+                       range(min(), max(40)),
+                       range(min(), max(10)), range(max(10), max(20)), range(max(20), max(30)), range(max(30), max(40)));
+            testRanges(cl,
+                       range(min(), max(50)),
+                       range(min(), max(10)),
+                       range(max(10), max(20)),
+                       range(max(20), max(30)),
+                       range(max(30), max(40)),
+                       range(max(40), max(50)));
+            testRanges(cl,
+                       range(max(20), max(30)));
+            testRanges(cl,
+                       range(max(25), min()),
+                       range(max(25), max(30)), range(max(30), max(40)), range(max(40), min()));
+            testRanges(cl,
+                       range(max(50), min()));
+            testRanges(cl,
+                       range(max(30), max(10)), // wrapped
+                       range(max(30), max(40)), range(max(40), min()), range(min(), max(10)));
+            testRanges(cl,
+                       range(max(25), max(15)), // wrapped
+                       range(max(25), max(30)),
+                       range(max(30), max(40)),
+                       range(max(40), min()),
+                       range(min(), max(10)),
+                       range(max(10), max(15)));
+        }
+    }
+
+    /**
+     * Tests range merging with a multinode cluster when the token ranges don't overlap between replicas.
+     */
+    @Test
+    public void testMultiNodeWithContinuousRanges()
+    {
+        new TokenUpdater().withTokens("127.0.0.1", 10, 20, 30)
+                          .withTokens("127.0.0.2", 40, 50, 60)
+                          .withTokens("127.0.0.3", 70, 80, 90)
+                          .update();
+
+        // with CL=ANY the ranges are fully merged (unless they are wrapping)
+        testMultiNodeFullMerge(ANY);
+
+        // with CL=THREE the ranges are not merged at all
+        testMultiNodeNoMerge(THREE);
+
+        // with CLs requiring a single node the ranges are merged on a per-node basis
+        for (ConsistencyLevel cl : Arrays.asList(ONE, LOCAL_ONE))
+        {
+            testRanges(cl,
+                       range(min(), min()),
+                       range(min(), max(60)), range(max(60), min()));
+            testRanges(cl,
+                       range(min(), max(25)));
+            testRanges(cl,
+                       range(min(), max(40)));
+            testRanges(cl,
+                       range(min(), max(50)));
+            testRanges(cl,
+                       range(max(20), max(30)));
+            testRanges(cl,
+                       range(max(25), min()),
+                       range(max(25), max(60)), range(max(60), min()));
+            testRanges(cl,
+                       range(max(25), max(35)),
+                       range(max(25), max(35)));
+            testRanges(cl,
+                       range(max(50), min()),
+                       range(max(50), max(90)), range(max(90), min()));
+            testRanges(cl,
+                       range(max(50), max(10)), // wrapping range
+                       range(max(50), max(90)), range(max(90), min()), range(min(), max(10)));
+            testRanges(cl,
+                       range(max(25), max(15)), // wrapping range
+                       range(max(25), max(60)), range(max(60), min()), range(min(), max(15)));
+        }
+
+        // with other CLs the ranges are merged on a similar per-node basis
+        for (ConsistencyLevel cl : Arrays.asList(ALL, QUORUM, LOCAL_QUORUM, EACH_QUORUM, TWO, SERIAL, LOCAL_SERIAL))
+        {
+            testRanges(cl,
+                       range(min(), min()),
+                       range(min(), max(30)), range(max(30), max(60)), range(max(60), max(90)), range(max(90), min()));
+            testRanges(cl,
+                       range(min(), max(25)));
+            testRanges(cl,
+                       range(min(), max(40)),
+                       range(min(), max(30)), range(max(30), max(40)));
+            testRanges(cl,
+                       range(min(), max(50)),
+                       range(min(), max(30)), range(max(30), max(50)));
+            testRanges(cl,
+                       range(max(20), max(30)));
+            testRanges(cl,
+                       range(max(25), min()),
+                       range(max(25), max(30)), range(max(30), max(60)), range(max(60), max(90)), range(max(90), min()));
+            testRanges(cl,
+                       range(max(25), max(35)),
+                       range(max(25), max(30)), range(max(30), max(35)));
+            testRanges(cl,
+                       range(max(50), min()),
+                       range(max(50), max(60)), range(max(60), max(90)), range(max(90), min()));
+            testRanges(cl,
+                       range(max(50), max(10)), // wrapping range
+                       range(max(50), max(60)), range(max(60), max(90)), range(max(90), min()), range(min(), max(10)));
+            testRanges(cl,
+                       range(max(25), max(15)), // wrapping range
+                       range(max(25), max(30)),
+                       range(max(30), max(60)),
+                       range(max(60), max(90)),
+                       range(max(90), min()),
+                       range(min(), max(15)));
+        }
+    }
+
+    /**
+     * Tests range merging with a multinode cluster when the token ranges overlap between replicas.
+     */
+    @Test
+    public void testMultiNodeWithDiscontinuousRanges()
+    {
+        new TokenUpdater().withTokens("127.0.0.1", 10, 40, 70)
+                          .withTokens("127.0.0.2", 20, 50, 80)
+                          .withTokens("127.0.0.3", 30, 60, 90)
+                          .update();
+
+        // with CL=ANY the ranges are fully merged (unless they are wrapping)
+        testMultiNodeFullMerge(ANY);
+
+        // with CLs requiring a single node the ranges are merged on a per-node basis
+        for (ConsistencyLevel cl : Arrays.asList(ONE, LOCAL_ONE))
+        {
+            testRanges(cl,
+                       range(min(), min()), // full range
+                       range(min(), max(20)),
+                       range(max(20), max(40)),
+                       range(max(40), max(60)),
+                       range(max(60), max(80)),
+                       range(max(80), min()));
+            testRanges(cl,
+                       range(min(), max(25)),
+                       range(min(), max(20)), range(max(20), max(25)));
+            testRanges(cl,
+                       range(min(), max(40)),
+                       range(min(), max(20)), range(max(20), max(40)));
+            testRanges(cl,
+                       range(min(), max(50)),
+                       range(min(), max(20)), range(max(20), max(40)), range(max(40), max(50)));
+            testRanges(cl,
+                       range(max(20), max(30)));
+            testRanges(cl,
+                       range(max(25), min()),
+                       range(max(25), max(40)), range(max(40), max(60)), range(max(60), max(80)), range(max(80), min()));
+            testRanges(cl,
+                       range(max(25), max(35)));
+            testRanges(cl,
+                       range(max(50), min()),
+                       range(max(50), max(70)), range(max(70), max(90)), range(max(90), min()));
+            testRanges(cl,
+                       range(max(50), max(10)), // wrapping range
+                       range(max(50), max(70)), range(max(70), max(90)), range(max(90), min()), range(min(), max(10)));
+            testRanges(cl,
+                       range(max(25), max(15)), // wrapping range
+                       range(max(25), max(40)),
+                       range(max(40), max(60)),
+                       range(max(60), max(80)),
+                       range(max(80), min()),
+                       range(min(), max(15)));
+        }
+
+        // with other CLs the ranges are not merged at all
+        for (ConsistencyLevel cl : Arrays.asList(ALL, QUORUM, LOCAL_QUORUM, EACH_QUORUM, TWO, THREE, SERIAL, LOCAL_SERIAL))
+        {
+            testMultiNodeNoMerge(cl);
+        }
+    }
+
+    private void testMultiNodeFullMerge(ConsistencyLevel cl)
+    {
+        testRanges(cl, range(min(), min()));
+        testRanges(cl, range(min(), max(25)));
+        testRanges(cl, range(min(), max(40)));
+        testRanges(cl, range(min(), max(50)));
+        testRanges(cl, range(max(20), max(30)));
+        testRanges(cl, range(max(25), min()));
+        testRanges(cl, range(max(25), max(35)));
+        testRanges(cl, range(max(50), min()));
+        testRanges(cl, range(max(50), max(10)), range(max(50), min()), range(min(), max(10))); // wrapping range
+        testRanges(cl, range(max(25), max(15)), range(max(25), min()), range(min(), max(15))); // wrapping range
+    }
+
+    private void testMultiNodeNoMerge(ConsistencyLevel cl)
+    {
+        testRanges(cl,
+                   range(min(), min()),
+                   range(min(), max(10)),
+                   range(max(10), max(20)),
+                   range(max(20), max(30)),
+                   range(max(30), max(40)),
+                   range(max(40), max(50)),
+                   range(max(50), max(60)),
+                   range(max(60), max(70)),
+                   range(max(70), max(80)),
+                   range(max(80), max(90)),
+                   range(max(90), min()));
+        testRanges(cl,
+                   range(min(), max(25)),
+                   range(min(), max(10)), range(max(10), max(20)), range(max(20), max(25)));
+        testRanges(cl,
+                   range(min(), max(40)),
+                   range(min(), max(10)), range(max(10), max(20)), range(max(20), max(30)), range(max(30), max(40)));
+        testRanges(cl,
+                   range(min(), max(50)),
+                   range(min(), max(10)),
+                   range(max(10), max(20)),
+                   range(max(20), max(30)),
+                   range(max(30), max(40)),
+                   range(max(40), max(50)));
+        testRanges(cl,
+                   range(max(20), max(30)));
+        testRanges(cl,
+                   range(max(25), min()),
+                   range(max(25), max(30)),
+                   range(max(30), max(40)),
+                   range(max(40), max(50)),
+                   range(max(50), max(60)),
+                   range(max(60), max(70)),
+                   range(max(70), max(80)),
+                   range(max(80), max(90)),
+                   range(max(90), min()));
+        testRanges(cl,
+                   range(max(25), max(35)),
+                   range(max(25), max(30)), range(max(30), max(35)));
+        testRanges(cl,
+                   range(max(50), min()),
+                   range(max(50), max(60)),
+                   range(max(60), max(70)),
+                   range(max(70), max(80)),
+                   range(max(80), max(90)),
+                   range(max(90), min()));
+        testRanges(cl,
+                   range(max(50), max(10)), // wrapping range
+                   range(max(50), max(60)),
+                   range(max(60), max(70)),
+                   range(max(70), max(80)),
+                   range(max(80), max(90)),
+                   range(max(90), min()),
+                   range(min(), max(10)));
+        testRanges(cl,
+                   range(max(25), max(15)), // wrapping range
+                   range(max(25), max(30)),
+                   range(max(30), max(40)),
+                   range(max(40), max(50)),
+                   range(max(50), max(60)),
+                   range(max(60), max(70)),
+                   range(max(70), max(80)),
+                   range(max(80), max(90)),
+                   range(max(90), min()),
+                   range(min(), max(10)),
+                   range(max(10), max(15)));
+    }
+
+    private static PartitionPosition min()
+    {
+        return testPartitioner().getMinimumToken().minKeyBound();
+    }
+
+    private static PartitionPosition max(int key)
+    {
+        return new Murmur3Partitioner.LongToken(key).maxKeyBound();
+    }
+
+    private static Range<PartitionPosition> range(PartitionPosition left, PartitionPosition right)
+    {
+        return new Range<>(left, right);
+    }
+
+    private void testRanges(ConsistencyLevel consistencyLevel, AbstractBounds<PartitionPosition> queryRange)
+    {
+        testRanges(consistencyLevel, queryRange, queryRange);
+    }
+
+    @SafeVarargs
+    private final void testRanges(ConsistencyLevel consistencyLevel,
+                                  AbstractBounds<PartitionPosition> queryRange,
+                                  AbstractBounds<PartitionPosition>... expected)
+    {
+        try (ReplicaPlanIterator originals = new ReplicaPlanIterator(queryRange, keyspace, ANY); // ANY avoids endpoint errors
+             ReplicaPlanMerger merger = new ReplicaPlanMerger(originals, keyspace, consistencyLevel))
+        {
+            // collect the merged ranges
+            List<AbstractBounds<PartitionPosition>> mergedRanges = new ArrayList<>(expected.length);
+            while (merger.hasNext())
+                mergedRanges.add(merger.next().range());
+
+            assertFalse("The number of merged ranges should never be greater than the number of original ranges",
+                        mergedRanges.size() > originals.size());
+
+            // verify the merged ranges
+            assertEquals(expected.length, mergedRanges.size());
+            for (int i = 0; i < expected.length; i++)
+                assertEquals("Mismatch for index " + i + ": " + mergedRanges, expected[i], mergedRanges.get(i));
+        }
+    }
+}
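
The merge behaviour exercised above is a greedy, left-to-right pass: each replica plan is tentatively
merged with the next one, and the merge is kept only while the combined range can still be served at the
requested consistency level; that is why ONE, LOCAL_ONE and ANY collapse long runs of ranges while stricter
levels only merge ranges whose replica sets coincide. A hedged pseudocode sketch of that pass (tryMerge and
emit are hypothetical helpers, not the committed API):

    ReplicaPlan.ForRangeRead current = ranges.next();
    while (ranges.hasNext())
    {
        ReplicaPlan.ForRangeRead next = ranges.next();
        // assumed helper: returns null when the union of the two ranges can no longer satisfy the CL
        ReplicaPlan.ForRangeRead merged = tryMerge(current, next);
        if (merged == null)
        {
            emit(current);   // flush the run built so far
            current = next;
        }
        else
        {
            current = merged;
        }
    }
    emit(current);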
diff --git a/test/unit/org/apache/cassandra/service/reads/range/TokenUpdater.java b/test/unit/org/apache/cassandra/service/reads/range/TokenUpdater.java
new file mode 100644
index 0000000..7419fbb
--- /dev/null
+++ b/test/unit/org/apache/cassandra/service/reads/range/TokenUpdater.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.reads.range;
+
+import java.net.UnknownHostException;
+import java.util.List;
+import java.util.UUID;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Multimap;
+
+import org.apache.cassandra.Util;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * Test utility class to set the partitioning tokens in the cluster.
+ *
+ * <p>The per-endpoint tokens to be set can be specified with the {@code withTokens} and {@code withKeys} methods.
+ * The {@link #update()} method applies the changes, clearing any previous token metadata. It also initializes
+ * gossip on every endpoint except the local node.
+ */
+public class TokenUpdater
+{
+    private final Multimap<InetAddressAndPort, Token> endpointTokens = HashMultimap.create();
+
+    public TokenUpdater withTokens(long... values)
+    {
+        return withTokens(localEndpoint(), values);
+    }
+
+    public TokenUpdater withTokens(String endpoint, long... values)
+    {
+        return withTokens(endpointByName(endpoint), values);
+    }
+
+    public TokenUpdater withTokens(InetAddressAndPort endpoint, long... values)
+    {
+        for (long val : values)
+            endpointTokens.put(endpoint, new Murmur3Partitioner.LongToken(val));
+        return this;
+    }
+
+    public TokenUpdater withKeys(int... keys)
+    {
+        return withKeys(localEndpoint(), keys);
+    }
+
+    public TokenUpdater withKeys(String endpoint, int... values)
+    {
+        return withKeys(endpointByName(endpoint), values);
+    }
+
+    public TokenUpdater withKeys(InetAddressAndPort endpoint, int... keys)
+    {
+        for (int key : keys)
+            endpointTokens.put(endpoint, Util.token(key));
+        return this;
+    }
+
+    public TokenUpdater withKeys(String... keys)
+    {
+        return withKeys(localEndpoint(), keys);
+    }
+
+    public TokenUpdater withKeys(InetAddressAndPort endpoint, String... keys)
+    {
+        for (String key : keys)
+            endpointTokens.put(endpoint, Util.token(key));
+        return this;
+    }
+
+    public TokenUpdater update()
+    {
+        TokenMetadata tmd = StorageService.instance.getTokenMetadata();
+        tmd.clearUnsafe();
+        tmd.updateNormalTokens(endpointTokens);
+        endpointTokens.keySet()
+                      .stream()
+                      .filter(e -> !e.equals(localEndpoint()))
+                      .forEach(e -> Gossiper.instance.initializeNodeUnsafe(e, UUID.randomUUID(), 1));
+        return this;
+    }
+
+    public List<Token> getTokens()
+    {
+        return getTokens(localEndpoint());
+    }
+
+    public List<Token> getTokens(InetAddressAndPort endpoint)
+    {
+        return ImmutableList.copyOf(endpointTokens.get(endpoint));
+    }
+
+    private static InetAddressAndPort localEndpoint()
+    {
+        return FBUtilities.getBroadcastAddressAndPort();
+    }
+
+    private static InetAddressAndPort endpointByName(String name)
+    {
+        try
+        {
+            return InetAddressAndPort.getByName(name);
+        }
+        catch (UnknownHostException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+}
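
TokenUpdater is used by the new range read tests to lay out the ring before exercising the iterators;
for instance, ReplicaPlanMergerTest builds its three-node ring like this:

    new TokenUpdater().withTokens("127.0.0.1", 10, 20, 30)
                      .withTokens("127.0.0.2", 40, 50, 60)
                      .withTokens("127.0.0.3", 70, 80, 90)
                      .update();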


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org