Posted to commits@cassandra.apache.org by ad...@apache.org on 2020/06/22 15:23:20 UTC

[cassandra] branch trunk updated (de33321 -> 9629c16)

This is an automated email from the ASF dual-hosted git repository.

adelapena pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git.


    from de33321  switch from www.apache.org to downloads.apache.org
     new abdf508  Count vnode ranges towards concurrency factor instead of merged ranges and cap max concurrency factor at cores * 10
     new 61ecfda  Merge branch 'cassandra-3.0' into cassandra-3.11
     new 9629c16  Merge branch 'cassandra-3.11' into trunk

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 CHANGES.txt                                        |   1 +
 .../org/apache/cassandra/locator/ReplicaPlan.java  |  18 ++-
 .../org/apache/cassandra/locator/ReplicaPlans.java |  10 +-
 .../org/apache/cassandra/service/StorageProxy.java |  93 +++++++++++---
 .../reads/ShortReadPartitionsProtection.java       |   2 +-
 .../cassandra/db/PartitionRangeReadTest.java       | 140 +++++++++++++++++++++
 .../cassandra/service/reads/DataResolverTest.java  |   4 +-
 .../reads/repair/AbstractReadRepairTest.java       |   3 +-
 8 files changed, 237 insertions(+), 34 deletions(-)
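
For orientation, here is a minimal, self-contained sketch of the two ideas in the commit subject (it is not code from the patch, and every name in it is illustrative): the coordinator's range-read concurrency factor is now derived from the number of vnode ranges a query covers rather than from the merged ranges it actually sends, and it is capped at ten times the core count.

    // Illustrative sketch only, not the patch. Assume a query spanning 8 vnode ranges
    // that the coordinator managed to merge into 2 replica plans.
    public class ConcurrencyFactorSketch
    {
        public static void main(String[] args)
        {
            int vnodeRanges = 8;    // ranges before merging (what is now counted)
            int mergedPlans = 2;    // ranges after merging (what used to be counted)

            int cores = Runtime.getRuntime().availableProcessors();
            int maxConcurrencyFactor = cores * 10;                // new upper bound

            int concurrencyFactor = Math.min(vnodeRanges, maxConcurrencyFactor);
            System.out.println("merged plans: " + mergedPlans
                               + ", concurrency factor (vnode-based, capped): " + concurrencyFactor);
        }
    }

Counting vnode ranges keeps the estimate independent of how aggressively adjacent ranges happen to be merged, and the cap bounds how far a large estimate can fan out concurrent range requests.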


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[cassandra] 01/01: Merge branch 'cassandra-3.11' into trunk

Posted by ad...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

adelapena pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 9629c1650d53af6e624d9317c83e4bc1998b04bd
Merge: de33321 61ecfda
Author: Andrés de la Peña <a....@gmail.com>
AuthorDate: Mon Jun 22 16:22:14 2020 +0100

    Merge branch 'cassandra-3.11' into trunk
    
    # Conflicts:
    #	src/java/org/apache/cassandra/service/StorageProxy.java
    #	test/unit/org/apache/cassandra/db/PartitionRangeReadTest.java

 CHANGES.txt                                        |   1 +
 .../org/apache/cassandra/locator/ReplicaPlan.java  |  18 ++-
 .../org/apache/cassandra/locator/ReplicaPlans.java |  10 +-
 .../org/apache/cassandra/service/StorageProxy.java |  93 +++++++++++---
 .../reads/ShortReadPartitionsProtection.java       |   2 +-
 .../cassandra/db/PartitionRangeReadTest.java       | 140 +++++++++++++++++++++
 .../cassandra/service/reads/DataResolverTest.java  |   4 +-
 .../reads/repair/AbstractReadRepairTest.java       |   3 +-
 8 files changed, 237 insertions(+), 34 deletions(-)

diff --cc CHANGES.txt
index 569d74b,aa4be97..1212030
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,54 -1,12 +1,55 @@@
 -3.11.7
 +4.0-alpha5
 + * Update defaults for server and client TLS settings (CASSANDRA-15262)
 + * Differentiate follower/initator in StreamMessageHeader (CASSANDRA-15665)
 + * Add a startup check to detect if LZ4 uses java rather than native implementation (CASSANDRA-15884)
 + * Fix missing topology events when running multiple nodes on the same network interface (CASSANDRA-15677)
 + * Create config.yml.MIDRES (CASSANDRA-15712)
 + * Fix handling of fully purged static rows in repaired data tracking (CASSANDRA-15848)
 + * Prevent validation request submission from blocking ANTI_ENTROPY stage (CASSANDRA-15812)
 + * Add fqltool and auditlogviewer to rpm and deb packages (CASSANDRA-14712)
 + * Include DROPPED_COLUMNS in schema digest computation (CASSANDRA-15843)
 + * Fix Cassandra restart from rpm install (CASSANDRA-15830)
 + * Improve handling of 2i initialization failures (CASSANDRA-13606)
 + * Add completion_ratio column to sstable_tasks virtual table (CASSANDRA-15759)
 + * Add support for adding custom Verbs (CASSANDRA-15725)
 + * Speed up entire-file-streaming file containment check and allow entire-file-streaming for all compaction strategies (CASSANDRA-15657,CASSANDRA-15783)
 + * Provide ability to configure IAuditLogger (CASSANDRA-15748)
 + * Fix nodetool enablefullquerylog blocking param parsing (CASSANDRA-15819)
 + * Add isTransient to SSTableMetadataView (CASSANDRA-15806)
 + * Fix tools/bin/fqltool for all shells (CASSANDRA-15820)
 + * Fix clearing of legacy size_estimates (CASSANDRA-15776)
 + * Update port when reconnecting to pre-4.0 SSL storage (CASSANDRA-15727)
 + * Only calculate dynamicBadnessThreshold once per loop in DynamicEndpointSnitch (CASSANDRA-15798)
 + * Cleanup redundant nodetool commands added in 4.0 (CASSANDRA-15256)
 + * Update to Python driver 3.23 for cqlsh (CASSANDRA-15793)
 + * Add tunable initial size and growth factor to RangeTombstoneList (CASSANDRA-15763)
 + * Improve debug logging in SSTableReader for index summary (CASSANDRA-15755)
 + * bin/sstableverify should support user provided token ranges (CASSANDRA-15753)
 + * Improve logging when mutation passed to commit log is too large (CASSANDRA-14781)
 + * replace LZ4FastDecompressor with LZ4SafeDecompressor (CASSANDRA-15560)
 + * Fix buffer pool NPE with concurrent release due to in-progress tiny pool eviction (CASSANDRA-15726)
 + * Avoid race condition when completing stream sessions (CASSANDRA-15666)
 + * Flush with fast compressors by default (CASSANDRA-15379)
 + * Fix CqlInputFormat regression from the switch to system.size_estimates (CASSANDRA-15637)
 + * Allow sending Entire SSTables over SSL (CASSANDRA-15740)
 + * Fix CQLSH UTF-8 encoding issue for Python 2/3 compatibility (CASSANDRA-15739)
 + * Fix batch statement preparation when multiple tables and parameters are used (CASSANDRA-15730)
 + * Fix regression with traceOutgoingMessage printing message size (CASSANDRA-15687)
 + * Ensure repaired data tracking reads a consistent amount of data across replicas (CASSANDRA-15601)
 + * Fix CQLSH to avoid arguments being evaluated (CASSANDRA-15660)
 + * Correct Visibility and Improve Safety of Methods in LatencyMetrics (CASSANDRA-15597)
 + * Allow cqlsh to run with Python2.7/Python3.6+ (CASSANDRA-15659,CASSANDRA-15573)
 + * Improve logging around incremental repair (CASSANDRA-15599)
 + * Do not check cdc_raw_directory filesystem space if CDC disabled (CASSANDRA-15688)
 + * Replace array iterators with get by index (CASSANDRA-15394)
 + * Minimize BTree iterator allocations (CASSANDRA-15389)
 +Merged from 3.11:
   * Upgrade Jackson to 2.9.10 (CASSANDRA-15867)
   * Fix CQL formatting of read command restrictions for slow query log (CASSANDRA-15503)
 - * Allow sstableloader to use SSL on the native port (CASSANDRA-14904)
  Merged from 3.0:
+  * Fixed range read concurrency factor computation and capped at 10 times tpc cores (CASSANDRA-15752)
   * Catch exception on bootstrap resume and init native transport (CASSANDRA-15863)
   * Fix replica-side filtering returning stale data with CL > ONE (CASSANDRA-8272, CASSANDRA-8273)
 - * Fix duplicated row on 2.x upgrades when multi-rows range tombstones interact with collection ones (CASSANDRA-15805)
   * Rely on snapshotted session infos on StreamResultFuture.maybeComplete to avoid race conditions (CASSANDRA-15667)
   * EmptyType doesn't override writeValue so could attempt to write bytes when expected not to (CASSANDRA-15790)
   * Fix index queries on partition key columns when some partitions contains only static data (CASSANDRA-13666)
diff --cc src/java/org/apache/cassandra/locator/ReplicaPlan.java
index 16af58a,0000000..407db5b
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/locator/ReplicaPlan.java
+++ b/src/java/org/apache/cassandra/locator/ReplicaPlan.java
@@@ -1,238 -1,0 +1,250 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.locator;
 +
 +import com.google.common.collect.Iterables;
 +import org.apache.cassandra.db.ConsistencyLevel;
 +import org.apache.cassandra.db.Keyspace;
 +import org.apache.cassandra.db.PartitionPosition;
 +import org.apache.cassandra.dht.AbstractBounds;
 +
 +import java.util.function.Predicate;
 +
 +public abstract class ReplicaPlan<E extends Endpoints<E>>
 +{
 +    protected final Keyspace keyspace;
 +    protected final ConsistencyLevel consistencyLevel;
 +
 +    // all nodes we will contact via any mechanism, including hints
 +    // i.e., for:
 +    //  - reads, only live natural replicas
 +    //      ==> live.natural().subList(0, blockFor + initial speculate)
 +    //  - writes, includes all full, and any pending replicas, (and only any necessary transient ones to make up the difference)
 +    //      ==> liveAndDown.natural().filter(isFull) ++ liveAndDown.pending() ++ live.natural.filter(isTransient, req)
 +    //  - paxos, includes all live replicas (natural+pending), for this DC if SERIAL_LOCAL
 +    //      ==> live.all()  (if consistencyLevel.isDCLocal(), then .filter(consistencyLevel.isLocal))
 +    private final E contacts;
 +
 +    ReplicaPlan(Keyspace keyspace, ConsistencyLevel consistencyLevel, E contacts)
 +    {
 +        assert contacts != null;
 +        this.keyspace = keyspace;
 +        this.consistencyLevel = consistencyLevel;
 +        this.contacts = contacts;
 +    }
 +
 +    public abstract int blockFor();
 +
 +    public E contacts() { return contacts; }
 +
 +    // TODO: should this semantically return true if we contain the endpoint, not the exact replica?
 +    public boolean contacts(Replica replica) { return contacts.contains(replica); }
 +    public Keyspace keyspace() { return keyspace; }
 +    public ConsistencyLevel consistencyLevel() { return consistencyLevel; }
 +
 +    public static abstract class ForRead<E extends Endpoints<E>> extends ReplicaPlan<E>
 +    {
 +        // all nodes we *could* contact; typically all natural replicas that are believed to be alive
 +        // we will consult this collection to find uncontacted nodes we might contact if we doubt we will meet the consistency level
 +        private final E candidates;
 +
 +        ForRead(Keyspace keyspace, ConsistencyLevel consistencyLevel, E candidates, E contact)
 +        {
 +            super(keyspace, consistencyLevel, contact);
 +            this.candidates = candidates;
 +        }
 +
 +        public int blockFor() { return consistencyLevel.blockFor(keyspace); }
 +
 +        public E candidates() { return candidates; }
 +
 +        public Replica firstUncontactedCandidate(Predicate<Replica> extraPredicate)
 +        {
 +            return Iterables.tryFind(candidates(), r -> extraPredicate.test(r) && !contacts(r)).orNull();
 +        }
 +
 +        public Replica lookup(InetAddressAndPort endpoint)
 +        {
 +            return candidates().byEndpoint().get(endpoint);
 +        }
 +
 +        public String toString()
 +        {
 +            return "ReplicaPlan.ForRead [ CL: " + consistencyLevel + " keyspace: " + keyspace + " candidates: " + candidates + " contacts: " + contacts() + " ]";
 +        }
 +    }
 +
 +    public static class ForTokenRead extends ForRead<EndpointsForToken>
 +    {
 +        public ForTokenRead(Keyspace keyspace, ConsistencyLevel consistencyLevel, EndpointsForToken candidates, EndpointsForToken contact)
 +        {
 +            super(keyspace, consistencyLevel, candidates, contact);
 +        }
 +
 +        ForTokenRead withContact(EndpointsForToken newContact)
 +        {
 +            return new ForTokenRead(keyspace, consistencyLevel, candidates(), newContact);
 +        }
 +    }
 +
 +    public static class ForRangeRead extends ForRead<EndpointsForRange>
 +    {
 +        final AbstractBounds<PartitionPosition> range;
- 
-         public ForRangeRead(Keyspace keyspace, ConsistencyLevel consistencyLevel, AbstractBounds<PartitionPosition> range, EndpointsForRange candidates, EndpointsForRange contact)
++        final int vnodeCount;
++
++        public ForRangeRead(Keyspace keyspace,
++                            ConsistencyLevel consistencyLevel,
++                            AbstractBounds<PartitionPosition> range,
++                            EndpointsForRange candidates,
++                            EndpointsForRange contact,
++                            int vnodeCount)
 +        {
 +            super(keyspace, consistencyLevel, candidates, contact);
 +            this.range = range;
++            this.vnodeCount = vnodeCount;
 +        }
 +
 +        public AbstractBounds<PartitionPosition> range() { return range; }
 +
++        /**
++         * @return number of vnode ranges covered by the range
++         */
++        public int vnodeCount() { return vnodeCount; }
++
 +        ForRangeRead withContact(EndpointsForRange newContact)
 +        {
-             return new ForRangeRead(keyspace, consistencyLevel, range, candidates(), newContact);
++            return new ForRangeRead(keyspace, consistencyLevel, range, candidates(), newContact, vnodeCount);
 +        }
 +    }
 +
 +    public static abstract class ForWrite<E extends Endpoints<E>> extends ReplicaPlan<E>
 +    {
 +        // TODO: this is only needed because of poor isolation of concerns elsewhere - we can remove it soon, and will do so in a follow-up patch
 +        final E pending;
 +        final E liveAndDown;
 +        final E live;
 +
 +        ForWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, E pending, E liveAndDown, E live, E contact)
 +        {
 +            super(keyspace, consistencyLevel, contact);
 +            this.pending = pending;
 +            this.liveAndDown = liveAndDown;
 +            this.live = live;
 +        }
 +
 +        public int blockFor() { return consistencyLevel.blockForWrite(keyspace, pending()); }
 +
 +        /** Replicas that a region of the ring is moving to; not yet ready to serve reads, but should receive writes */
 +        public E pending() { return pending; }
 +        /** Replicas that can participate in the write - this always includes all nodes (pending and natural) in all DCs, except for paxos LOCAL_QUORUM (which is local DC only) */
 +        public E liveAndDown() { return liveAndDown; }
 +        /** The live replicas present in liveAndDown, usually derived from FailureDetector.isReplicaAlive */
 +        public E live() { return live; }
 +        /** Calculate which live endpoints we could have contacted, but chose not to */
 +        public E liveUncontacted() { return live().filter(r -> !contacts(r)); }
 +        /** Test liveness, consistent with the upfront analysis done for this operation (i.e. test membership of live()) */
 +        public boolean isAlive(Replica replica) { return live.endpoints().contains(replica.endpoint()); }
 +        public Replica lookup(InetAddressAndPort endpoint)
 +        {
 +            return liveAndDown().byEndpoint().get(endpoint);
 +        }
 +
 +        public String toString()
 +        {
 +            return "ReplicaPlan.ForWrite [ CL: " + consistencyLevel + " keyspace: " + keyspace + " liveAndDown: " + liveAndDown + " live: " + live + " contacts: " + contacts() +  " ]";
 +        }
 +    }
 +
 +    public static class ForTokenWrite extends ForWrite<EndpointsForToken>
 +    {
 +        public ForTokenWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, EndpointsForToken pending, EndpointsForToken liveAndDown, EndpointsForToken live, EndpointsForToken contact)
 +        {
 +            super(keyspace, consistencyLevel, pending, liveAndDown, live, contact);
 +        }
 +
 +        private ReplicaPlan.ForTokenWrite copy(ConsistencyLevel newConsistencyLevel, EndpointsForToken newContact)
 +        {
 +            return new ReplicaPlan.ForTokenWrite(keyspace, newConsistencyLevel, pending(), liveAndDown(), live(), newContact);
 +        }
 +
 +        ForTokenWrite withConsistencyLevel(ConsistencyLevel newConsistencylevel) { return copy(newConsistencylevel, contacts()); }
 +        public ForTokenWrite withContact(EndpointsForToken newContact) { return copy(consistencyLevel, newContact); }
 +    }
 +
 +    public static class ForPaxosWrite extends ForWrite<EndpointsForToken>
 +    {
 +        final int requiredParticipants;
 +
 +        ForPaxosWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, EndpointsForToken pending, EndpointsForToken liveAndDown, EndpointsForToken live, EndpointsForToken contact, int requiredParticipants)
 +        {
 +            super(keyspace, consistencyLevel, pending, liveAndDown, live, contact);
 +            this.requiredParticipants = requiredParticipants;
 +        }
 +
 +        public int requiredParticipants() { return requiredParticipants; }
 +    }
 +
 +    /**
 +     * Used by AbstractReadExecutor, {Data,Digest}Resolver and ReadRepair to share a ReplicaPlan whose 'contacts' replicas
 +     * we progressively modify via various forms of speculation (initial speculation, rr-read and rr-write)
 +     *
 +     * The internal reference is not volatile, despite being shared between threads.  The initial reference provided to
 +     * the constructor should be visible by the normal process of sharing data between threads (i.e. executors, etc)
 +     * and any updates will either be seen or not seen, perhaps not promptly, but certainly not incompletely.
 +     * The contained ReplicaPlan has only final member properties, so it cannot be seen partially initialised.
 +     */
 +    public interface Shared<E extends Endpoints<E>, P extends ReplicaPlan<E>>
 +    {
 +        /**
 +         * add the provided replica to this shared plan, by updating the internal reference
 +         */
 +        public void addToContacts(Replica replica);
 +        /**
 +         * get the shared replica plan; the reference is non-volatile (so it may be stale) but cannot be seen partially initialised
 +         */
 +        public P get();
 +        /**
 +         * get the shared replica plan as above (non-volatile, so it may be stale, but never partially initialised),
 +         * with its 'contacts' replaced by those provided
 +         */
 +        public abstract P getWithContacts(E endpoints);
 +    }
 +
 +    public static class SharedForTokenRead implements Shared<EndpointsForToken, ForTokenRead>
 +    {
 +        private ForTokenRead replicaPlan;
 +        SharedForTokenRead(ForTokenRead replicaPlan) { this.replicaPlan = replicaPlan; }
 +        public void addToContacts(Replica replica) { replicaPlan = replicaPlan.withContact(Endpoints.append(replicaPlan.contacts(), replica)); }
 +        public ForTokenRead get() { return replicaPlan; }
 +        public ForTokenRead getWithContacts(EndpointsForToken newContact) { return replicaPlan.withContact(newContact); }
 +    }
 +
 +    public static class SharedForRangeRead implements Shared<EndpointsForRange, ForRangeRead>
 +    {
 +        private ForRangeRead replicaPlan;
 +        SharedForRangeRead(ForRangeRead replicaPlan) { this.replicaPlan = replicaPlan; }
 +        public void addToContacts(Replica replica) { replicaPlan = replicaPlan.withContact(Endpoints.append(replicaPlan.contacts(), replica)); }
 +        public ForRangeRead get() { return replicaPlan; }
 +        public ForRangeRead getWithContacts(EndpointsForRange newContact) { return replicaPlan.withContact(newContact); }
 +    }
 +
 +    public static SharedForTokenRead shared(ForTokenRead replicaPlan) { return new SharedForTokenRead(replicaPlan); }
 +    public static SharedForRangeRead shared(ForRangeRead replicaPlan) { return new SharedForRangeRead(replicaPlan); }
 +
 +}
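
As a simplified sketch of the pattern the hunk above introduces (stand-in types, not the Cassandra classes themselves): ForRangeRead is immutable, so the new vnodeCount field has to be carried through every derived copy, such as withContact.

    import java.util.List;

    // Stand-in for ReplicaPlan.ForRangeRead: an immutable plan that remembers how many
    // vnode ranges it covers and preserves that count in derived copies.
    final class RangePlanSketch
    {
        final String range;          // stand-in for AbstractBounds<PartitionPosition>
        final List<String> contacts; // stand-in for EndpointsForRange
        final int vnodeCount;        // number of vnode ranges covered by 'range'

        RangePlanSketch(String range, List<String> contacts, int vnodeCount)
        {
            this.range = range;
            this.contacts = contacts;
            this.vnodeCount = vnodeCount;
        }

        // mirrors ForRangeRead.withContact(...): new contacts, same range and vnode count
        RangePlanSketch withContacts(List<String> newContacts)
        {
            return new RangePlanSketch(range, newContacts, vnodeCount);
        }
    }

Because every field is final, the Shared wrappers above can pass such a plan between threads through a non-volatile reference without a reader ever seeing it partially initialised, which is the guarantee the class comment spells out.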
diff --cc src/java/org/apache/cassandra/locator/ReplicaPlans.java
index 436a9ed,0000000..083da7a
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/locator/ReplicaPlans.java
+++ b/src/java/org/apache/cassandra/locator/ReplicaPlans.java
@@@ -1,635 -1,0 +1,635 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.locator;
 +
 +import com.carrotsearch.hppc.ObjectIntHashMap;
 +import com.carrotsearch.hppc.cursors.ObjectIntCursor;
 +import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
 +import com.google.common.annotations.VisibleForTesting;
 +import com.google.common.collect.ArrayListMultimap;
 +import com.google.common.collect.HashMultimap;
 +import com.google.common.collect.Iterables;
 +import com.google.common.collect.ListMultimap;
 +import com.google.common.collect.Lists;
 +import com.google.common.collect.Multimap;
 +
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.db.ConsistencyLevel;
 +import org.apache.cassandra.db.DecoratedKey;
 +import org.apache.cassandra.db.Keyspace;
 +import org.apache.cassandra.db.PartitionPosition;
 +import org.apache.cassandra.dht.AbstractBounds;
 +import org.apache.cassandra.dht.Token;
 +import org.apache.cassandra.exceptions.UnavailableException;
 +import org.apache.cassandra.gms.FailureDetector;
 +import org.apache.cassandra.schema.SchemaConstants;
 +import org.apache.cassandra.service.StorageService;
 +import org.apache.cassandra.service.reads.AlwaysSpeculativeRetryPolicy;
 +import org.apache.cassandra.service.reads.SpeculativeRetryPolicy;
 +
 +import org.apache.cassandra.utils.FBUtilities;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import java.util.ArrayList;
 +import java.util.Collection;
 +import java.util.Collections;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.concurrent.ThreadLocalRandom;
 +import java.util.function.Consumer;
 +import java.util.function.Function;
 +import java.util.function.Predicate;
 +
 +import static com.google.common.collect.Iterables.any;
 +import static com.google.common.collect.Iterables.filter;
 +import static org.apache.cassandra.db.ConsistencyLevel.EACH_QUORUM;
 +import static org.apache.cassandra.db.ConsistencyLevel.eachQuorumForRead;
 +import static org.apache.cassandra.db.ConsistencyLevel.eachQuorumForWrite;
 +import static org.apache.cassandra.db.ConsistencyLevel.localQuorumFor;
 +import static org.apache.cassandra.db.ConsistencyLevel.localQuorumForOurDc;
 +import static org.apache.cassandra.locator.Replicas.addToCountPerDc;
 +import static org.apache.cassandra.locator.Replicas.countInOurDc;
 +import static org.apache.cassandra.locator.Replicas.countPerDc;
 +
 +public class ReplicaPlans
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(ReplicaPlans.class);
 +
 +    public static boolean isSufficientLiveReplicasForRead(Keyspace keyspace, ConsistencyLevel consistencyLevel, Endpoints<?> liveReplicas)
 +    {
 +        switch (consistencyLevel)
 +        {
 +            case ANY:
 +                // local hint is acceptable, and local node is always live
 +                return true;
 +            case LOCAL_ONE:
 +                return countInOurDc(liveReplicas).hasAtleast(1, 1);
 +            case LOCAL_QUORUM:
 +                return countInOurDc(liveReplicas).hasAtleast(localQuorumForOurDc(keyspace), 1);
 +            case EACH_QUORUM:
 +                if (keyspace.getReplicationStrategy() instanceof NetworkTopologyStrategy)
 +                {
 +                    int fullCount = 0;
 +                    Collection<String> dcs = ((NetworkTopologyStrategy) keyspace.getReplicationStrategy()).getDatacenters();
 +                    for (ObjectObjectCursor<String, Replicas.ReplicaCount> entry : countPerDc(dcs, liveReplicas))
 +                    {
 +                        Replicas.ReplicaCount count = entry.value;
 +                        if (!count.hasAtleast(localQuorumFor(keyspace, entry.key), 0))
 +                            return false;
 +                        fullCount += count.fullReplicas();
 +                    }
 +                    return fullCount > 0;
 +                }
 +                // Fallthrough on purpose for SimpleStrategy
 +            default:
 +                return liveReplicas.size() >= consistencyLevel.blockFor(keyspace)
 +                        && Replicas.countFull(liveReplicas) > 0;
 +        }
 +    }
 +
 +    static void assureSufficientLiveReplicasForRead(Keyspace keyspace, ConsistencyLevel consistencyLevel, Endpoints<?> liveReplicas) throws UnavailableException
 +    {
 +        assureSufficientLiveReplicas(keyspace, consistencyLevel, liveReplicas, consistencyLevel.blockFor(keyspace), 1);
 +    }
 +    static void assureSufficientLiveReplicasForWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, Endpoints<?> allLive, Endpoints<?> pendingWithDown) throws UnavailableException
 +    {
 +        assureSufficientLiveReplicas(keyspace, consistencyLevel, allLive, consistencyLevel.blockForWrite(keyspace, pendingWithDown), 0);
 +    }
 +    static void assureSufficientLiveReplicas(Keyspace keyspace, ConsistencyLevel consistencyLevel, Endpoints<?> allLive, int blockFor, int blockForFullReplicas) throws UnavailableException
 +    {
 +        switch (consistencyLevel)
 +        {
 +            case ANY:
 +                // local hint is acceptable, and local node is always live
 +                break;
 +            case LOCAL_ONE:
 +            {
 +                Replicas.ReplicaCount localLive = countInOurDc(allLive);
 +                if (!localLive.hasAtleast(blockFor, blockForFullReplicas))
 +                    throw UnavailableException.create(consistencyLevel, 1, blockForFullReplicas, localLive.allReplicas(), localLive.fullReplicas());
 +                break;
 +            }
 +            case LOCAL_QUORUM:
 +            {
 +                Replicas.ReplicaCount localLive = countInOurDc(allLive);
 +                if (!localLive.hasAtleast(blockFor, blockForFullReplicas))
 +                {
 +                    if (logger.isTraceEnabled())
 +                    {
 +                        logger.trace(String.format("Local replicas %s are insufficient to satisfy LOCAL_QUORUM requirement of %d live replicas and %d full replicas in '%s'",
 +                                allLive.filter(InOurDcTester.replicas()), blockFor, blockForFullReplicas, DatabaseDescriptor.getLocalDataCenter()));
 +                    }
 +                    throw UnavailableException.create(consistencyLevel, blockFor, blockForFullReplicas, localLive.allReplicas(), localLive.fullReplicas());
 +                }
 +                break;
 +            }
 +            case EACH_QUORUM:
 +                if (keyspace.getReplicationStrategy() instanceof NetworkTopologyStrategy)
 +                {
 +                    int total = 0;
 +                    int totalFull = 0;
 +                    Collection<String> dcs = ((NetworkTopologyStrategy) keyspace.getReplicationStrategy()).getDatacenters();
 +                    for (ObjectObjectCursor<String, Replicas.ReplicaCount> entry : countPerDc(dcs, allLive))
 +                    {
 +                        int dcBlockFor = localQuorumFor(keyspace, entry.key);
 +                        Replicas.ReplicaCount dcCount = entry.value;
 +                        if (!dcCount.hasAtleast(dcBlockFor, 0))
 +                            throw UnavailableException.create(consistencyLevel, entry.key, dcBlockFor, dcCount.allReplicas(), 0, dcCount.fullReplicas());
 +                        totalFull += dcCount.fullReplicas();
 +                        total += dcCount.allReplicas();
 +                    }
 +                    if (totalFull < blockForFullReplicas)
 +                        throw UnavailableException.create(consistencyLevel, blockFor, total, blockForFullReplicas, totalFull);
 +                    break;
 +                }
 +                // Fallthrough on purpose for SimpleStrategy
 +            default:
 +                int live = allLive.size();
 +                int full = Replicas.countFull(allLive);
 +                if (live < blockFor || full < blockForFullReplicas)
 +                {
 +                    if (logger.isTraceEnabled())
 +                        logger.trace("Live nodes {} do not satisfy ConsistencyLevel ({} required)", Iterables.toString(allLive), blockFor);
 +                    throw UnavailableException.create(consistencyLevel, blockFor, blockForFullReplicas, live, full);
 +                }
 +                break;
 +        }
 +    }
 +
 +    /**
 +     * Construct a ReplicaPlan for writing to exactly one node, with CL.ONE. This node is *assumed* to be alive.
 +     */
 +    public static ReplicaPlan.ForTokenWrite forSingleReplicaWrite(Keyspace keyspace, Token token, Replica replica)
 +    {
 +        EndpointsForToken one = EndpointsForToken.of(token, replica);
 +        EndpointsForToken empty = EndpointsForToken.empty(token);
 +        return new ReplicaPlan.ForTokenWrite(keyspace, ConsistencyLevel.ONE, empty, one, one, one);
 +    }
 +
 +    /**
 +     * A forwarding counter write is always sent to a single owning coordinator for the range, by the original coordinator
 +     * (if it is not itself an owner)
 +     */
 +    public static ReplicaPlan.ForTokenWrite forForwardingCounterWrite(Keyspace keyspace, Token token, Replica replica)
 +    {
 +        return forSingleReplicaWrite(keyspace, token, replica);
 +    }
 +
 +    public static ReplicaPlan.ForTokenWrite forLocalBatchlogWrite()
 +    {
 +        Token token = DatabaseDescriptor.getPartitioner().getMinimumToken();
 +        Keyspace systemKeyspace = Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME);
 +        Replica localSystemReplica = SystemReplicas.getSystemReplica(FBUtilities.getBroadcastAddressAndPort());
 +
 +        ReplicaLayout.ForTokenWrite liveAndDown = ReplicaLayout.forTokenWrite(
 +                EndpointsForToken.of(token, localSystemReplica),
 +                EndpointsForToken.empty(token)
 +        );
 +
 +        return forWrite(systemKeyspace, ConsistencyLevel.ONE, liveAndDown, liveAndDown, writeAll);
 +    }
 +
 +    /**
 +     * Requires that the provided endpoints are alive.  Converts them to their relevant system replicas.
 +     * Note that the liveAndDown collection and live are equal to the provided endpoints.
 +     *
 +     * @param isAny if batch consistency level is ANY, in which case a local node will be picked
 +     */
 +    public static ReplicaPlan.ForTokenWrite forBatchlogWrite(boolean isAny) throws UnavailableException
 +    {
 +        // This is the single case where we write not for a range or token, but multiple mutations to many tokens
 +        Token token = DatabaseDescriptor.getPartitioner().getMinimumToken();
 +
 +        TokenMetadata.Topology topology = StorageService.instance.getTokenMetadata().cachedOnlyTokenMap().getTopology();
 +        IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
 +        Multimap<String, InetAddressAndPort> localEndpoints = HashMultimap.create(topology.getDatacenterRacks()
 +                                                                                          .get(snitch.getLocalDatacenter()));
 +        // Replicas are picked manually:
 +        //  - replicas should be alive according to the failure detector
 +        //  - replicas should be in the local datacenter
 +        //  - choose min(2, number of qualifying candidates above)
 +        //  - allow the local node to be the only replica only if it's a single-node DC
 +        Collection<InetAddressAndPort> chosenEndpoints = filterBatchlogEndpoints(snitch.getLocalRack(), localEndpoints);
 +
 +        if (chosenEndpoints.isEmpty() && isAny)
 +            chosenEndpoints = Collections.singleton(FBUtilities.getBroadcastAddressAndPort());
 +
 +        ReplicaLayout.ForTokenWrite liveAndDown = ReplicaLayout.forTokenWrite(
 +                SystemReplicas.getSystemReplicas(chosenEndpoints).forToken(token),
 +                EndpointsForToken.empty(token)
 +        );
 +
 +        // Batchlog is hosted by either one node or two nodes from different racks.
 +        ConsistencyLevel consistencyLevel = liveAndDown.all().size() == 1 ? ConsistencyLevel.ONE : ConsistencyLevel.TWO;
 +
 +        Keyspace systemKeyspace = Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME);
 +
 +        // assume that we have already been given live endpoints, and skip applying the failure detector
 +        return forWrite(systemKeyspace, consistencyLevel, liveAndDown, liveAndDown, writeAll);
 +    }
 +
 +    private static Collection<InetAddressAndPort> filterBatchlogEndpoints(String localRack,
 +                                                                          Multimap<String, InetAddressAndPort> endpoints)
 +    {
 +        return filterBatchlogEndpoints(localRack,
 +                                       endpoints,
 +                                       Collections::shuffle,
 +                                       FailureDetector.isEndpointAlive,
 +                                       ThreadLocalRandom.current()::nextInt);
 +    }
 +
 +    // Collect a list of candidates for batchlog hosting. If possible these will be two nodes from different racks.
 +    @VisibleForTesting
 +    public static Collection<InetAddressAndPort> filterBatchlogEndpoints(String localRack,
 +                                                                         Multimap<String, InetAddressAndPort> endpoints,
 +                                                                         Consumer<List<?>> shuffle,
 +                                                                         Predicate<InetAddressAndPort> isAlive,
 +                                                                         Function<Integer, Integer> indexPicker)
 +    {
 +        // special case for single-node data centers
 +        if (endpoints.values().size() == 1)
 +            return endpoints.values();
 +
 +        // strip out dead endpoints and localhost
 +        ListMultimap<String, InetAddressAndPort> validated = ArrayListMultimap.create();
 +        for (Map.Entry<String, InetAddressAndPort> entry : endpoints.entries())
 +        {
 +            InetAddressAndPort addr = entry.getValue();
 +            if (!addr.equals(FBUtilities.getBroadcastAddressAndPort()) && isAlive.test(addr))
 +                validated.put(entry.getKey(), entry.getValue());
 +        }
 +
 +        if (validated.size() <= 2)
 +            return validated.values();
 +
 +        if (validated.size() - validated.get(localRack).size() >= 2)
 +        {
 +            // we have enough endpoints in other racks
 +            validated.removeAll(localRack);
 +        }
 +
 +        if (validated.keySet().size() == 1)
 +        {
 +            /*
 +             * we have only 1 `other` rack to select replicas from (whether it be the local rack or a single non-local rack)
 +             * pick two random nodes from there; we are guaranteed to have at least two nodes in the single remaining rack
 +             * because of the preceding if block.
 +             */
 +            List<InetAddressAndPort> otherRack = Lists.newArrayList(validated.values());
 +            shuffle.accept(otherRack);
 +            return otherRack.subList(0, 2);
 +        }
 +
 +        // randomize which racks we pick from if more than 2 remaining
 +        Collection<String> racks;
 +        if (validated.keySet().size() == 2)
 +        {
 +            racks = validated.keySet();
 +        }
 +        else
 +        {
 +            racks = Lists.newArrayList(validated.keySet());
 +            shuffle.accept((List<?>) racks);
 +        }
 +
 +        // grab a random member of up to two racks
 +        List<InetAddressAndPort> result = new ArrayList<>(2);
 +        for (String rack : Iterables.limit(racks, 2))
 +        {
 +            List<InetAddressAndPort> rackMembers = validated.get(rack);
 +            result.add(rackMembers.get(indexPicker.apply(rackMembers.size())));
 +        }
 +
 +        return result;
 +    }
 +
 +    public static ReplicaPlan.ForTokenWrite forReadRepair(Token token, ReplicaPlan.ForRead<?> readPlan) throws UnavailableException
 +    {
 +        return forWrite(readPlan.keyspace, readPlan.consistencyLevel, token, writeReadRepair(readPlan));
 +    }
 +
 +    public static ReplicaPlan.ForTokenWrite forWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, Token token, Selector selector) throws UnavailableException
 +    {
 +        return forWrite(keyspace, consistencyLevel, ReplicaLayout.forTokenWriteLiveAndDown(keyspace, token), selector);
 +    }
 +
 +    @VisibleForTesting
 +    public static ReplicaPlan.ForTokenWrite forWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, EndpointsForToken natural, EndpointsForToken pending, Predicate<Replica> isAlive, Selector selector) throws UnavailableException
 +    {
 +        return forWrite(keyspace, consistencyLevel, ReplicaLayout.forTokenWrite(natural, pending), isAlive, selector);
 +    }
 +
 +    public static ReplicaPlan.ForTokenWrite forWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, ReplicaLayout.ForTokenWrite liveAndDown, Selector selector) throws UnavailableException
 +    {
 +        return forWrite(keyspace, consistencyLevel, liveAndDown, FailureDetector.isReplicaAlive, selector);
 +    }
 +
 +    @VisibleForTesting
 +    public static ReplicaPlan.ForTokenWrite forWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, ReplicaLayout.ForTokenWrite liveAndDown, Predicate<Replica> isAlive, Selector selector) throws UnavailableException
 +    {
 +        ReplicaLayout.ForTokenWrite live = liveAndDown.filter(isAlive);
 +        return forWrite(keyspace, consistencyLevel, liveAndDown, live, selector);
 +    }
 +
 +    public static ReplicaPlan.ForTokenWrite forWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, ReplicaLayout.ForTokenWrite liveAndDown, ReplicaLayout.ForTokenWrite live, Selector selector) throws UnavailableException
 +    {
 +        EndpointsForToken contacts = selector.select(keyspace, consistencyLevel, liveAndDown, live);
 +        assureSufficientLiveReplicasForWrite(keyspace, consistencyLevel, live.all(), liveAndDown.pending());
 +        return new ReplicaPlan.ForTokenWrite(keyspace, consistencyLevel, liveAndDown.pending(), liveAndDown.all(), live.all(), contacts);
 +    }
 +
 +    public interface Selector
 +    {
 +        <E extends Endpoints<E>, L extends ReplicaLayout.ForWrite<E>>
 +        E select(Keyspace keyspace, ConsistencyLevel consistencyLevel, L liveAndDown, L live);
 +    }
 +
 +    /**
 +     * Select all nodes, transient or otherwise, as targets for the operation.
 +     *
 +     * This may no longer be useful once we finish implementing transient replication support; however,
 +     * it can be of value to stipulate that a location writes to all nodes without regard to transient status.
 +     */
 +    public static final Selector writeAll = new Selector()
 +    {
 +        @Override
 +        public <E extends Endpoints<E>, L extends ReplicaLayout.ForWrite<E>>
 +        E select(Keyspace keyspace, ConsistencyLevel consistencyLevel, L liveAndDown, L live)
 +        {
 +            return liveAndDown.all();
 +        }
 +    };
 +
 +    /**
 +     * Select all full nodes, live or down, as write targets.  If there are insufficient nodes to complete the write,
 +     * but there are live transient nodes, select a sufficient number of these to reach our consistency level.
 +     *
 +     * Pending nodes are always contacted, whether or not they are full.  When a transient replica is undergoing
 +     * a pending move to a new node, if we write (transiently) to it, this write would not be replicated to the
 +     * pending transient node, and so when completing the move, the write could effectively have not reached the
 +     * promised consistency level.
 +     */
 +    public static final Selector writeNormal = new Selector()
 +    {
 +        @Override
 +        public <E extends Endpoints<E>, L extends ReplicaLayout.ForWrite<E>>
 +        E select(Keyspace keyspace, ConsistencyLevel consistencyLevel, L liveAndDown, L live)
 +        {
 +            if (!any(liveAndDown.all(), Replica::isTransient))
 +                return liveAndDown.all();
 +
 +            ReplicaCollection.Builder<E> contacts = liveAndDown.all().newBuilder(liveAndDown.all().size());
 +            contacts.addAll(filter(liveAndDown.natural(), Replica::isFull));
 +            contacts.addAll(liveAndDown.pending());
 +
 +            /**
 +             * Per CASSANDRA-14768, we ensure we write to at least a QUORUM of nodes in every DC,
 +             * regardless of how many responses we need to wait for and our requested consistencyLevel.
 +             * This is to minimally surprise users with transient replication; with normal writes, we
 +             * soft-ensure that we reach QUORUM in all DCs we are able to, by writing to every node;
 +             * even if we don't wait for ACK, we have in both cases sent sufficient messages.
 +              */
 +            ObjectIntHashMap<String> requiredPerDc = eachQuorumForWrite(keyspace, liveAndDown.pending());
 +            addToCountPerDc(requiredPerDc, live.natural().filter(Replica::isFull), -1);
 +            addToCountPerDc(requiredPerDc, live.pending(), -1);
 +
 +            IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
 +            for (Replica replica : filter(live.natural(), Replica::isTransient))
 +            {
 +                String dc = snitch.getDatacenter(replica);
 +                if (requiredPerDc.addTo(dc, -1) >= 0)
 +                    contacts.add(replica);
 +            }
 +            return contacts.build();
 +        }
 +    };
 +
 +    /**
 +     * TODO: Transient Replication C-14404/C-14665
 +     * TODO: We employ this even when there is no monotonicity to guarantee,
 +     *          e.g. in case of CL.TWO, CL.ONE with speculation, etc.
 +     *
 +     * Construct a read-repair write plan to provide monotonicity guarantees on any data we return as part of a read.
 +     *
 +     * Since this is not a regular write (it is only there to guarantee that future reads will read this data), we select
 +     * only the minimal number of nodes to meet the consistency level, and prefer nodes we contacted on read to minimise
 +     * data transfer.
 +     */
 +    public static Selector writeReadRepair(ReplicaPlan.ForRead<?> readPlan)
 +    {
 +        return new Selector()
 +        {
 +            @Override
 +            public <E extends Endpoints<E>, L extends ReplicaLayout.ForWrite<E>>
 +            E select(Keyspace keyspace, ConsistencyLevel consistencyLevel, L liveAndDown, L live)
 +            {
 +                assert !any(liveAndDown.all(), Replica::isTransient);
 +
 +                ReplicaCollection.Builder<E> contacts = live.all().newBuilder(live.all().size());
 +                // add all live nodes we might write to that we have already contacted on read
 +                contacts.addAll(filter(live.all(), r -> readPlan.contacts().endpoints().contains(r.endpoint())));
 +
 +                // finally, add sufficient nodes to achieve our consistency level
 +                if (consistencyLevel != EACH_QUORUM)
 +                {
 +                    int add = consistencyLevel.blockForWrite(keyspace, liveAndDown.pending()) - contacts.size();
 +                    if (add > 0)
 +                    {
 +                        for (Replica replica : filter(live.all(), r -> !contacts.contains(r)))
 +                        {
 +                            contacts.add(replica);
 +                            if (--add == 0)
 +                                break;
 +                        }
 +                    }
 +                }
 +                else
 +                {
 +                    ObjectIntHashMap<String> requiredPerDc = eachQuorumForWrite(keyspace, liveAndDown.pending());
 +                    addToCountPerDc(requiredPerDc, contacts.snapshot(), -1);
 +                    IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
 +                    for (Replica replica : filter(live.all(), r -> !contacts.contains(r)))
 +                    {
 +                        String dc = snitch.getDatacenter(replica);
 +                        if (requiredPerDc.addTo(dc, -1) >= 0)
 +                            contacts.add(replica);
 +                    }
 +                }
 +                return contacts.build();
 +            }
 +        };
 +    }
 +
 +    /**
 +     * Construct the plan for a paxos round - NOT the write or read consistency level for either the write or comparison,
 +     * but for the paxos linearisation agreement.
 +     *
 +     * This will select all live nodes as the candidates for the operation.  Only the required number of participants (a majority of all natural and pending replicas) need to be live for it to proceed.
 +     */
 +    public static ReplicaPlan.ForPaxosWrite forPaxos(Keyspace keyspace, DecoratedKey key, ConsistencyLevel consistencyForPaxos) throws UnavailableException
 +    {
 +        Token tk = key.getToken();
 +        ReplicaLayout.ForTokenWrite liveAndDown = ReplicaLayout.forTokenWriteLiveAndDown(keyspace, tk);
 +
 +        Replicas.temporaryAssertFull(liveAndDown.all()); // TODO CASSANDRA-14547
 +
 +        if (consistencyForPaxos == ConsistencyLevel.LOCAL_SERIAL)
 +        {
 +            // TODO: we should cleanup our semantics here, as we're filtering ALL nodes to localDC which is unexpected for ReplicaPlan
 +            // Restrict natural and pending to node in the local DC only
 +            liveAndDown = liveAndDown.filter(InOurDcTester.replicas());
 +        }
 +
 +        ReplicaLayout.ForTokenWrite live = liveAndDown.filter(FailureDetector.isReplicaAlive);
 +
 +        // TODO: this should use assureSufficientReplicas
 +        int participants = liveAndDown.all().size();
 +        int requiredParticipants = participants / 2 + 1; // See CASSANDRA-8346, CASSANDRA-833
 +
 +        EndpointsForToken contacts = live.all();
 +        if (contacts.size() < requiredParticipants)
 +            throw UnavailableException.create(consistencyForPaxos, requiredParticipants, contacts.size());
 +
 +        // We cannot allow CAS operations with 2 or more pending endpoints, see #8346.
 +        // Note that we fake an impossible number of required nodes in the unavailable exception
 +        // to nail home the point that it's an impossible operation no matter how many nodes are live.
 +        if (liveAndDown.pending().size() > 1)
 +            throw new UnavailableException(String.format("Cannot perform LWT operation as there is more than one (%d) pending range movement", liveAndDown.all().size()),
 +                    consistencyForPaxos,
 +                    participants + 1,
 +                    contacts.size());
 +
 +        return new ReplicaPlan.ForPaxosWrite(keyspace, consistencyForPaxos, liveAndDown.pending(), liveAndDown.all(), live.all(), contacts, requiredParticipants);
 +    }
 +
 +
 +    private static <E extends Endpoints<E>> E candidatesForRead(ConsistencyLevel consistencyLevel, E liveNaturalReplicas)
 +    {
 +        return consistencyLevel.isDatacenterLocal()
 +                ? liveNaturalReplicas.filter(InOurDcTester.replicas())
 +                : liveNaturalReplicas;
 +    }
 +
 +    private static <E extends Endpoints<E>> E contactForEachQuorumRead(Keyspace keyspace, E candidates)
 +    {
 +        assert keyspace.getReplicationStrategy() instanceof NetworkTopologyStrategy;
 +        ObjectIntHashMap<String> perDc = eachQuorumForRead(keyspace);
 +
 +        final IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
 +        return candidates.filter(replica -> {
 +            String dc = snitch.getDatacenter(replica);
 +            return perDc.addTo(dc, -1) >= 0;
 +        });
 +    }
 +
 +    private static <E extends Endpoints<E>> E contactForRead(Keyspace keyspace, ConsistencyLevel consistencyLevel, boolean alwaysSpeculate, E candidates)
 +    {
 +        /*
 +         * If we are doing an each quorum query, we have to make sure that the endpoints we select
 +         * provide a quorum for each data center. If we are not using a NetworkTopologyStrategy,
 +         * we should fall through and grab a quorum in the replication strategy.
 +         *
 +         * We do not speculate for EACH_QUORUM.
 +         *
 +         * TODO: this is still very inconsistently managed between {LOCAL,EACH}_QUORUM and other consistency levels - should address this in a follow-up
 +         */
 +        if (consistencyLevel == EACH_QUORUM && keyspace.getReplicationStrategy() instanceof NetworkTopologyStrategy)
 +            return contactForEachQuorumRead(keyspace, candidates);
 +
 +        int count = consistencyLevel.blockFor(keyspace) + (alwaysSpeculate ? 1 : 0);
 +        return candidates.subList(0, Math.min(count, candidates.size()));
 +    }
 +
 +
 +    /**
 +     * Construct a plan for reading from a single node - this permits no speculation or read-repair
 +     */
 +    public static ReplicaPlan.ForTokenRead forSingleReplicaRead(Keyspace keyspace, Token token, Replica replica)
 +    {
 +        EndpointsForToken one = EndpointsForToken.of(token, replica);
 +        return new ReplicaPlan.ForTokenRead(keyspace, ConsistencyLevel.ONE, one, one);
 +    }
 +
 +    /**
 +     * Construct a plan for reading from a single node - this permits no speculation or read-repair
 +     */
-     public static ReplicaPlan.ForRangeRead forSingleReplicaRead(Keyspace keyspace, AbstractBounds<PartitionPosition> range, Replica replica)
++    public static ReplicaPlan.ForRangeRead forSingleReplicaRead(Keyspace keyspace, AbstractBounds<PartitionPosition> range, Replica replica, int vnodeCount)
 +    {
 +        // TODO: this is unsafe, as one.range() may be inconsistent with our supplied range; should refactor Range/AbstractBounds to single class
 +        EndpointsForRange one = EndpointsForRange.of(replica);
-         return new ReplicaPlan.ForRangeRead(keyspace, ConsistencyLevel.ONE, range, one, one);
++        return new ReplicaPlan.ForRangeRead(keyspace, ConsistencyLevel.ONE, range, one, one, vnodeCount);
 +    }
 +
 +    /**
 +     * Construct a plan for reading the provided token at the provided consistency level.  This translates to a collection of
 +     *   - candidates who are: alive, replicate the token, and are sorted by their snitch scores
 +     *   - contacts who are: the first blockFor + (retry == ALWAYS ? 1 : 0) candidates
 +     *
 +     * The candidate collection can be used for speculation, although at present
 +     * it would break EACH_QUORUM to do so without further filtering
 +     */
 +    public static ReplicaPlan.ForTokenRead forRead(Keyspace keyspace, Token token, ConsistencyLevel consistencyLevel, SpeculativeRetryPolicy retry)
 +    {
 +        EndpointsForToken candidates = candidatesForRead(consistencyLevel, ReplicaLayout.forTokenReadLiveSorted(keyspace, token).natural());
 +        EndpointsForToken contacts = contactForRead(keyspace, consistencyLevel, retry.equals(AlwaysSpeculativeRetryPolicy.INSTANCE), candidates);
 +
 +        assureSufficientLiveReplicasForRead(keyspace, consistencyLevel, contacts);
 +        return new ReplicaPlan.ForTokenRead(keyspace, consistencyLevel, candidates, contacts);
 +    }
 +
 +    /**
 +     * Construct a plan for reading the provided range at the provided consistency level.  This translates to a collection of
 +     *   - candidates who are: alive, replicate the range, and are sorted by their snitch scores
 +     *   - contacts who are: the first blockFor candidates
 +     *
 +     * There is no speculation for range read queries at present, so we never 'always speculate' here, and a failed response fails the query.
 +     */
-     public static ReplicaPlan.ForRangeRead forRangeRead(Keyspace keyspace, ConsistencyLevel consistencyLevel, AbstractBounds<PartitionPosition> range)
++    public static ReplicaPlan.ForRangeRead forRangeRead(Keyspace keyspace, ConsistencyLevel consistencyLevel, AbstractBounds<PartitionPosition> range, int vnodeCount)
 +    {
 +        EndpointsForRange candidates = candidatesForRead(consistencyLevel, ReplicaLayout.forRangeReadLiveSorted(keyspace, range).natural());
 +        EndpointsForRange contacts = contactForRead(keyspace, consistencyLevel, false, candidates);
 +
 +        assureSufficientLiveReplicasForRead(keyspace, consistencyLevel, contacts);
-         return new ReplicaPlan.ForRangeRead(keyspace, consistencyLevel, range, candidates, contacts);
++        return new ReplicaPlan.ForRangeRead(keyspace, consistencyLevel, range, candidates, contacts, vnodeCount);
 +    }
 +
 +    /**
 +     * Take two range read plans for adjacent ranges, and check if it is OK (and worthwhile) to combine them into a single plan
 +     */
 +    public static ReplicaPlan.ForRangeRead maybeMerge(Keyspace keyspace, ConsistencyLevel consistencyLevel, ReplicaPlan.ForRangeRead left, ReplicaPlan.ForRangeRead right)
 +    {
 +        // TODO: should we be asserting that the ranges are adjacent?
 +        AbstractBounds<PartitionPosition> newRange = left.range().withNewRight(right.range().right);
 +        EndpointsForRange mergedCandidates = left.candidates().keep(right.candidates().endpoints());
 +
 +        // Check if there are enough shared endpoints for the merge to be possible.
 +        if (!isSufficientLiveReplicasForRead(keyspace, consistencyLevel, mergedCandidates))
 +            return null;
 +
 +        EndpointsForRange contacts = contactForRead(keyspace, consistencyLevel, false, mergedCandidates);
 +
 +        // Estimate whether merging will be a win or not
 +        if (!DatabaseDescriptor.getEndpointSnitch().isWorthMergingForRangeQuery(contacts, left.contacts(), right.contacts()))
 +            return null;
 +
 +        // If we get there, merge this range and the next one
-         return new ReplicaPlan.ForRangeRead(keyspace, consistencyLevel, newRange, mergedCandidates, contacts);
++        return new ReplicaPlan.ForRangeRead(keyspace, consistencyLevel, newRange, mergedCandidates, contacts, left.vnodeCount() + right.vnodeCount());
 +    }
 +}
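
As a quick hypothetical walk-through (not code from the patch) of why maybeMerge sums the two vnode counts: RangeIterator hands ReplicaPlans.forRangeRead a vnode count of 1 per range, so after merging, the counts on the surviving plans must still add up to the number of vnode ranges the query covers.

    import java.util.ArrayList;
    import java.util.List;

    public class MergeCountSketch
    {
        // stand-in for ReplicaPlan.ForRangeRead, carrying only the vnode count
        static final class Plan
        {
            final int vnodeCount;
            Plan(int vnodeCount) { this.vnodeCount = vnodeCount; }
        }

        public static void main(String[] args)
        {
            List<Plan> perVnode = new ArrayList<>();
            for (int i = 0; i < 8; i++)
                perVnode.add(new Plan(1));   // one plan per vnode range, as RangeIterator emits

            // pretend every pair of adjacent plans shares enough live replicas to merge
            List<Plan> merged = new ArrayList<>();
            for (int i = 0; i < perVnode.size(); i += 2)
                merged.add(new Plan(perVnode.get(i).vnodeCount + perVnode.get(i + 1).vnodeCount));

            int totalVnodes = merged.stream().mapToInt(p -> p.vnodeCount).sum();
            System.out.println(merged.size() + " merged plans covering " + totalVnodes + " vnode ranges");
            // prints: 4 merged plans covering 8 vnode ranges
        }
    }

However the plans get merged, the coordinator's bookkeeping stays expressed in vnode ranges, which is the unit the new concurrency factor is counted in.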
diff --cc src/java/org/apache/cassandra/service/StorageProxy.java
index 89a0d44,b1e0696..0aedd3d
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@@ -42,14 -25,15 +42,15 @@@ import java.util.concurrent.TimeoutExce
  import java.util.concurrent.atomic.AtomicInteger;
  import java.util.concurrent.atomic.AtomicLong;
  
 -import com.google.common.base.Predicate;
+ import com.google.common.annotations.VisibleForTesting;
 +import com.google.common.base.Preconditions;
  import com.google.common.cache.CacheLoader;
 -import com.google.common.collect.*;
 +import com.google.common.collect.Iterables;
 +import com.google.common.collect.Iterators;
 +import com.google.common.collect.PeekingIterator;
  import com.google.common.primitives.Ints;
  import com.google.common.util.concurrent.Uninterruptibles;
 -
  import org.apache.commons.lang3.StringUtils;
 -
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
@@@ -1912,10 -1952,36 +1922,11 @@@ public class StorageProxy implements St
                                   : index.getEstimatedResultRows();
  
          // adjust maxExpectedResults by the number of tokens this node has and the replication factor for this ks
 -        return (maxExpectedResults / DatabaseDescriptor.getNumTokens()) / keyspace.getReplicationStrategy().getReplicationFactor();
 -    }
 -
 -    @VisibleForTesting
 -    public static class RangeForQuery
 -    {
 -        public final AbstractBounds<PartitionPosition> range;
 -        public final List<InetAddress> liveEndpoints;
 -        public final List<InetAddress> filteredEndpoints;
 -        public final int vnodeCount;
 -
 -        public RangeForQuery(AbstractBounds<PartitionPosition> range,
 -                             List<InetAddress> liveEndpoints,
 -                             List<InetAddress> filteredEndpoints,
 -                             int vnodeCount)
 -        {
 -            this.range = range;
 -            this.liveEndpoints = liveEndpoints;
 -            this.filteredEndpoints = filteredEndpoints;
 -            this.vnodeCount = vnodeCount;
 -        }
 -
 -        public int vnodeCount()
 -        {
 -            return vnodeCount;
 -        }
 +        return (maxExpectedResults / DatabaseDescriptor.getNumTokens()) / keyspace.getReplicationStrategy().getReplicationFactor().allReplicas;
      }
  
-     private static class RangeIterator extends AbstractIterator<ReplicaPlan.ForRangeRead>
+     @VisibleForTesting
 -    public static class RangeIterator extends AbstractIterator<RangeForQuery>
++    public static class RangeIterator extends AbstractIterator<ReplicaPlan.ForRangeRead>
      {
          private final Keyspace keyspace;
          private final ConsistencyLevel consistency;
@@@ -1944,17 -2010,23 +1955,17 @@@
              if (!ranges.hasNext())
                  return endOfData();
  
-             return ReplicaPlans.forRangeRead(keyspace, consistency, ranges.next());
 -            AbstractBounds<PartitionPosition> range = ranges.next();
 -            List<InetAddress> liveEndpoints = getLiveSortedEndpoints(keyspace, range.right);
 -            return new RangeForQuery(range,
 -                                     liveEndpoints,
 -                                     consistency.filterForQuery(keyspace, liveEndpoints),
 -                                     1);
++            return ReplicaPlans.forRangeRead(keyspace, consistency, ranges.next(), 1);
          }
      }
  
-     private static class RangeMerger extends AbstractIterator<ReplicaPlan.ForRangeRead>
 -    @VisibleForTesting
 -    public static class RangeMerger extends AbstractIterator<RangeForQuery>
++    public static class RangeMerger extends AbstractIterator<ReplicaPlan.ForRangeRead>
      {
          private final Keyspace keyspace;
          private final ConsistencyLevel consistency;
 -        private final PeekingIterator<RangeForQuery> ranges;
 +        private final PeekingIterator<ReplicaPlan.ForRangeRead> ranges;
  
-         private RangeMerger(Iterator<ReplicaPlan.ForRangeRead> iterator, Keyspace keyspace, ConsistencyLevel consistency)
 -        public RangeMerger(Iterator<RangeForQuery> iterator, Keyspace keyspace, ConsistencyLevel consistency)
++        public RangeMerger(Iterator<ReplicaPlan.ForRangeRead> iterator, Keyspace keyspace, ConsistencyLevel consistency)
          {
              this.keyspace = keyspace;
              this.consistency = consistency;
@@@ -2029,11 -2114,13 +2040,11 @@@
          }
      }
  
-     private static class RangeCommandIterator extends AbstractIterator<RowIterator> implements PartitionIterator
+     public static class RangeCommandIterator extends AbstractIterator<RowIterator> implements PartitionIterator
      {
 -        private final Iterator<RangeForQuery> ranges;
 +        private final Iterator<ReplicaPlan.ForRangeRead> ranges;
          private final int totalRangeCount;
          private final PartitionRangeReadCommand command;
 -        private final Keyspace keyspace;
 -        private final ConsistencyLevel consistency;
          private final boolean enforceStrictLiveness;
  
          private final long startTime;
@@@ -2046,14 -2134,25 +2058,21 @@@
          // when it was not good enough initially.
          private int liveReturned;
          private int rangesQueried;
+         private int batchesRequested = 0;
  
-         public RangeCommandIterator(RangeIterator ranges, PartitionRangeReadCommand command, int concurrencyFactor, Keyspace keyspace, ConsistencyLevel consistency, long queryStartNanoTime)
 -        public RangeCommandIterator(Iterator<RangeForQuery> ranges,
++        public RangeCommandIterator(Iterator<ReplicaPlan.ForRangeRead> ranges,
+                                     PartitionRangeReadCommand command,
+                                     int concurrencyFactor,
+                                     int maxConcurrencyFactor,
+                                     int totalRangeCount,
 -                                    Keyspace keyspace,
 -                                    ConsistencyLevel consistency,
+                                     long queryStartNanoTime)
          {
              this.command = command;
              this.concurrencyFactor = concurrencyFactor;
+             this.maxConcurrencyFactor = maxConcurrencyFactor;
              this.startTime = System.nanoTime();
-             this.ranges = new RangeMerger(ranges, keyspace, consistency);
-             this.totalRangeCount = ranges.rangeCount();
+             this.ranges = ranges;
+             this.totalRangeCount = totalRangeCount;
 -            this.consistency = consistency;
 -            this.keyspace = keyspace;
              this.queryStartNanoTime = queryStartNanoTime;
              this.enforceStrictLiveness = command.metadata().enforceStrictLiveness();
          }
@@@ -2169,25 -2268,14 +2198,30 @@@
          private PartitionIterator sendNextRequests()
          {
              List<PartitionIterator> concurrentQueries = new ArrayList<>(concurrencyFactor);
 -            for (int i = 0; i < concurrencyFactor && ranges.hasNext();)
 +            List<ReadRepair> readRepairs = new ArrayList<>(concurrencyFactor);
 +
 +            try
              {
-                 for (int i = 0; i < concurrencyFactor && ranges.hasNext(); i++)
 -                RangeForQuery range = ranges.next();
 -                concurrentQueries.add(query(range, i == 0));
 -                rangesQueried += range.vnodeCount();
 -                i += range.vnodeCount();
++                for (int i = 0; i < concurrencyFactor && ranges.hasNext();)
 +                {
++                    ReplicaPlan.ForRangeRead range = ranges.next();
++
 +                    @SuppressWarnings("resource") // response will be closed by concatAndBlockOnRepair, or in the catch block below
-                     SingleRangeResponse response = query(ranges.next(), i == 0);
++                    SingleRangeResponse response = query(range, i == 0);
 +                    concurrentQueries.add(response);
 +                    readRepairs.add(response.readRepair);
-                     ++rangesQueried;
++                    // due to RangeMerger, coordinator may fetch more ranges than required by concurrency factor.
++                    rangesQueried += range.vnodeCount();
++                    i += range.vnodeCount();
 +                }
++                batchesRequested++;
 +            }
 +            catch (Throwable t)
 +            {
 +                for (PartitionIterator response: concurrentQueries)
 +                    response.close();
 +                throw t;
              }
 -            batchesRequested++;
  
              Tracing.trace("Submitted {} concurrent range requests", concurrentQueries.size());
              // We want to count the results for the sake of updating the concurrency factor (see updateConcurrencyFactor) but we don't want to
@@@ -2225,16 -2325,25 +2271,23 @@@
          // underestimate how many rows we will get per-range in order to increase the likelihood that we'll
          // fetch enough rows in the first round
          resultsPerRange -= resultsPerRange * CONCURRENT_SUBREQUESTS_MARGIN;
+         int maxConcurrencyFactor = Math.min(ranges.rangeCount(), MAX_CONCURRENT_RANGE_REQUESTS);
          int concurrencyFactor = resultsPerRange == 0.0
--                              ? 1
-                               : Math.max(1, Math.min(ranges.rangeCount(), (int) Math.ceil(command.limits().count() / resultsPerRange)));
 -                              : Math.max(1, Math.min(maxConcurrencyFactor, (int) Math.ceil(command.limits().count() / resultsPerRange)));
++                                ? 1
++                                : Math.max(1, Math.min(maxConcurrencyFactor, (int) Math.ceil(command.limits().count() / resultsPerRange)));
          logger.trace("Estimated result rows per range: {}; requested rows: {}, ranges.size(): {}; concurrent range requests: {}",
                       resultsPerRange, command.limits().count(), ranges.rangeCount(), concurrencyFactor);
          Tracing.trace("Submitting range requests on {} ranges with a concurrency of {} ({} rows per range expected)", ranges.rangeCount(), concurrencyFactor, resultsPerRange);
  
          // Note that in general, a RangeCommandIterator will honor the command limit for each range, but will not enforce it globally.
- 
-         return command.limits().filter(command.postReconciliationProcessing(new RangeCommandIterator(ranges, command, concurrencyFactor, keyspace, consistencyLevel, queryStartNanoTime)),
+         RangeMerger mergedRanges = new RangeMerger(ranges, keyspace, consistencyLevel);
+         RangeCommandIterator rangeCommandIterator = new RangeCommandIterator(mergedRanges,
+                                                                              command,
+                                                                              concurrencyFactor,
+                                                                              maxConcurrencyFactor,
+                                                                              ranges.rangeCount(),
 -                                                                             keyspace,
 -                                                                             consistencyLevel,
+                                                                              queryStartNanoTime);
+         return command.limits().filter(command.postReconciliationProcessing(rangeCommandIterator),
                                         command.nowInSec(),
                                         command.selectsFullPartition(),
                                         command.metadata().enforceStrictLiveness());
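
For reference, the initial concurrency factor chosen above is just the estimated number of vnode ranges needed to satisfy the limit, capped by both the total range count and MAX_CONCURRENT_RANGE_REQUESTS (cores * 10 in this patch). Below is a self-contained sketch of that arithmetic with made-up numbers; the constant value and the inputs in main() are illustrative assumptions, not values read from a running node.

    // Illustrative sketch of the initial concurrency-factor computation shown in the diff above.
    public class ConcurrencyFactorSketch
    {
        // Stand-in for StorageProxy.MAX_CONCURRENT_RANGE_REQUESTS (cores * 10); 80 is an assumed value.
        static final int MAX_CONCURRENT_RANGE_REQUESTS = 80;

        static int initialConcurrencyFactor(int totalRangeCount, double resultsPerRange, int limit)
        {
            // Never exceed the number of ranges to query, nor the global cap.
            int maxConcurrencyFactor = Math.min(totalRangeCount, MAX_CONCURRENT_RANGE_REQUESTS);
            return resultsPerRange == 0.0
                   ? 1
                   : Math.max(1, Math.min(maxConcurrencyFactor, (int) Math.ceil(limit / resultsPerRange)));
        }

        public static void main(String[] args)
        {
            // e.g. 256 vnode ranges, ~3.5 rows expected per range, LIMIT 100:
            // ceil(100 / 3.5) = 29 ranges needed, which is below both caps.
            System.out.println(initialConcurrencyFactor(256, 3.5, 100)); // 29
        }
    }
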
diff --cc src/java/org/apache/cassandra/service/reads/ShortReadPartitionsProtection.java
index eda2b33,0000000..59edc5a
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/service/reads/ShortReadPartitionsProtection.java
+++ b/src/java/org/apache/cassandra/service/reads/ShortReadPartitionsProtection.java
@@@ -1,199 -1,0 +1,199 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.service.reads;
 +
 +import org.apache.cassandra.locator.Endpoints;
 +import org.apache.cassandra.locator.ReplicaPlan;
 +import org.apache.cassandra.locator.ReplicaPlans;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.concurrent.Stage;
 +import org.apache.cassandra.db.ColumnFamilyStore;
 +import org.apache.cassandra.db.DataRange;
 +import org.apache.cassandra.db.DecoratedKey;
 +import org.apache.cassandra.db.Keyspace;
 +import org.apache.cassandra.db.PartitionPosition;
 +import org.apache.cassandra.db.PartitionRangeReadCommand;
 +import org.apache.cassandra.db.ReadCommand;
 +import org.apache.cassandra.db.filter.DataLimits;
 +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
 +import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 +import org.apache.cassandra.db.transform.MorePartitions;
 +import org.apache.cassandra.db.transform.MoreRows;
 +import org.apache.cassandra.db.transform.Transformation;
 +import org.apache.cassandra.dht.AbstractBounds;
 +import org.apache.cassandra.dht.ExcludingBounds;
 +import org.apache.cassandra.dht.Range;
 +import org.apache.cassandra.locator.Replica;
 +import org.apache.cassandra.net.MessagingService;
 +import org.apache.cassandra.service.reads.repair.NoopReadRepair;
 +import org.apache.cassandra.service.StorageProxy;
 +import org.apache.cassandra.tracing.Tracing;
 +
 +public class ShortReadPartitionsProtection extends Transformation<UnfilteredRowIterator> implements MorePartitions<UnfilteredPartitionIterator>
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(ShortReadPartitionsProtection.class);
 +    private final ReadCommand command;
 +    private final Replica source;
 +
 +    private final DataLimits.Counter singleResultCounter; // unmerged per-source counter
 +    private final DataLimits.Counter mergedResultCounter; // merged end-result counter
 +
 +    private DecoratedKey lastPartitionKey; // key of the last observed partition
 +
 +    private boolean partitionsFetched; // whether we've seen any new partitions since iteration start or last moreContents() call
 +
 +    private final long queryStartNanoTime;
 +
 +    public ShortReadPartitionsProtection(ReadCommand command, Replica source,
 +                                         DataLimits.Counter singleResultCounter,
 +                                         DataLimits.Counter mergedResultCounter,
 +                                         long queryStartNanoTime)
 +    {
 +        this.command = command;
 +        this.source = source;
 +        this.singleResultCounter = singleResultCounter;
 +        this.mergedResultCounter = mergedResultCounter;
 +        this.queryStartNanoTime = queryStartNanoTime;
 +    }
 +
 +    @Override
 +    public UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition)
 +    {
 +        partitionsFetched = true;
 +
 +        lastPartitionKey = partition.partitionKey();
 +
 +        /*
 +         * Extend for moreContents() then apply protection to track lastClustering by applyToRow().
 +         *
 +         * If we don't apply the transformation *after* extending the partition with MoreRows,
 +         * applyToRow() method of protection will not be called on the first row of the new extension iterator.
 +         */
 +        ReplicaPlan.ForTokenRead replicaPlan = ReplicaPlans.forSingleReplicaRead(Keyspace.open(command.metadata().keyspace), partition.partitionKey().getToken(), source);
 +        ReplicaPlan.SharedForTokenRead sharedReplicaPlan = ReplicaPlan.shared(replicaPlan);
 +        ShortReadRowsProtection protection = new ShortReadRowsProtection(partition.partitionKey(),
 +                                                                         command, source,
 +                                                                         (cmd) -> executeReadCommand(cmd, sharedReplicaPlan),
 +                                                                         singleResultCounter,
 +                                                                         mergedResultCounter);
 +        return Transformation.apply(MoreRows.extend(partition, protection), protection);
 +    }
 +
 +    /*
 +     * We only get here once all the rows and partitions in this iterator have been iterated over, and so
 +     * if the node had returned the requested number of rows but we still get here, then some results were
 +     * skipped during reconciliation.
 +     */
 +    public UnfilteredPartitionIterator moreContents()
 +    {
 +        // never try to request additional partitions from replicas if our reconciled partitions are already filled to the limit
 +        assert !mergedResultCounter.isDone();
 +
 +        // we do not apply short read protection when we have no limits at all
 +        assert !command.limits().isUnlimited();
 +
 +        /*
 +         * If this is a single partition read command or an (indexed) partition range read command with
 +         * a partition key specified, then we can't and shouldn't try fetch more partitions.
 +         */
 +        assert !command.isLimitedToOnePartition();
 +
 +        /*
 +         * If the returned result doesn't have enough rows/partitions to satisfy even the original limit, don't ask for more.
 +         *
 +         * Can only take the short cut if there is no per partition limit set. Otherwise it's possible to hit false
 +         * positives due to some rows being uncounted for in certain scenarios (see CASSANDRA-13911).
 +         */
 +        if (!singleResultCounter.isDone() && command.limits().perPartitionCount() == DataLimits.NO_LIMIT)
 +            return null;
 +
 +        /*
 +         * Either we had an empty iterator as the initial response, or our moreContents() call got us an empty iterator.
 +         * There is no point to ask the replica for more rows - it has no more in the requested range.
 +         */
 +        if (!partitionsFetched)
 +            return null;
 +        partitionsFetched = false;
 +
 +        /*
 +         * We are going to fetch one partition at a time for thrift and potentially more for CQL.
 +         * The row limit will either be set to the per partition limit - if the command has no total row limit set, or
 +         * the total # of rows remaining - if it has some. If we don't grab enough rows in some of the partitions,
 +         * then future ShortReadRowsProtection.moreContents() calls will fetch the missing ones.
 +         */
 +        int toQuery = command.limits().count() != DataLimits.NO_LIMIT
 +                      ? command.limits().count() - counted(mergedResultCounter)
 +                      : command.limits().perPartitionCount();
 +
 +        ColumnFamilyStore.metricsFor(command.metadata().id).shortReadProtectionRequests.mark();
 +        Tracing.trace("Requesting {} extra rows from {} for short read protection", toQuery, source);
 +        logger.info("Requesting {} extra rows from {} for short read protection", toQuery, source);
 +
 +        return makeAndExecuteFetchAdditionalPartitionReadCommand(toQuery);
 +    }
 +
 +    // Counts the number of rows for regular queries and the number of groups for GROUP BY queries
 +    private int counted(DataLimits.Counter counter)
 +    {
 +        return command.limits().isGroupByLimit()
 +               ? counter.rowCounted()
 +               : counter.counted();
 +    }
 +
 +    private UnfilteredPartitionIterator makeAndExecuteFetchAdditionalPartitionReadCommand(int toQuery)
 +    {
 +        PartitionRangeReadCommand cmd = (PartitionRangeReadCommand) command;
 +
 +        DataLimits newLimits = cmd.limits().forShortReadRetry(toQuery);
 +
 +        AbstractBounds<PartitionPosition> bounds = cmd.dataRange().keyRange();
 +        AbstractBounds<PartitionPosition> newBounds = bounds.inclusiveRight()
 +                                                      ? new Range<>(lastPartitionKey, bounds.right)
 +                                                      : new ExcludingBounds<>(lastPartitionKey, bounds.right);
 +        DataRange newDataRange = cmd.dataRange().forSubRange(newBounds);
 +
-         ReplicaPlan.ForRangeRead replicaPlan = ReplicaPlans.forSingleReplicaRead(Keyspace.open(command.metadata().keyspace), cmd.dataRange().keyRange(), source);
++        ReplicaPlan.ForRangeRead replicaPlan = ReplicaPlans.forSingleReplicaRead(Keyspace.open(command.metadata().keyspace), cmd.dataRange().keyRange(), source, 1);
 +        return executeReadCommand(cmd.withUpdatedLimitsAndDataRange(newLimits, newDataRange), ReplicaPlan.shared(replicaPlan));
 +    }
 +
 +    private <E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>>
 +    UnfilteredPartitionIterator executeReadCommand(ReadCommand cmd, ReplicaPlan.Shared<E, P> replicaPlan)
 +    {
 +        DataResolver<E, P> resolver = new DataResolver<>(cmd, replicaPlan, (NoopReadRepair<E, P>)NoopReadRepair.instance, queryStartNanoTime);
 +        ReadCallback<E, P> handler = new ReadCallback<>(resolver, cmd, replicaPlan, queryStartNanoTime);
 +
 +        if (source.isSelf())
 +        {
 +            Stage.READ.maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(cmd, handler));
 +        }
 +        else
 +        {
 +            if (source.isTransient())
 +                cmd = cmd.copyAsTransientQuery(source);
 +            MessagingService.instance().sendWithCallback(cmd.createMessage(false), source.endpoint(), handler);
 +        }
 +
 +        // We don't call handler.get() because we want to preserve tombstones since we're still in the middle of merging node results.
 +        handler.awaitResults();
 +        assert resolver.getMessages().size() == 1;
 +        return resolver.getMessages().get(0).payload.makeIterator(command);
 +    }
 +}
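
The toQuery value computed in moreContents() above is simply the remainder of the global row limit, or the per-partition limit when no global limit is set. Below is a small sketch of that decision with hypothetical counter values; the NO_LIMIT constant and the numbers used are assumptions for illustration, not taken from DataLimits.

    // Illustrative sketch of the toQuery computation in ShortReadPartitionsProtection.moreContents().
    public class ShortReadBudgetSketch
    {
        // Assumed stand-in for DataLimits.NO_LIMIT.
        static final int NO_LIMIT = Integer.MAX_VALUE;

        static int rowsToQuery(int rowLimit, int perPartitionLimit, int alreadyCounted)
        {
            return rowLimit != NO_LIMIT
                   ? rowLimit - alreadyCounted // remaining part of the global limit
                   : perPartitionLimit;        // otherwise fall back to the per-partition limit
        }

        public static void main(String[] args)
        {
            // LIMIT 100 with 80 rows already reconciled -> ask the replica for 20 more rows.
            System.out.println(rowsToQuery(100, NO_LIMIT, 80)); // 20
        }
    }
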
diff --cc test/unit/org/apache/cassandra/db/PartitionRangeReadTest.java
index 9ae6c75,c418afc..8a666fc
--- a/test/unit/org/apache/cassandra/db/PartitionRangeReadTest.java
+++ b/test/unit/org/apache/cassandra/db/PartitionRangeReadTest.java
@@@ -19,10 -19,16 +19,14 @@@
  package org.apache.cassandra.db;
  
  import java.math.BigInteger;
 -import java.net.InetAddress;
  import java.nio.ByteBuffer;
  import java.nio.charset.CharacterCodingException;
+ import java.util.ArrayList;
+ import java.util.Arrays;
 -import java.util.Collections;
+ import java.util.Iterator;
  import java.util.List;
  
+ import com.google.common.collect.Iterators;
  import org.junit.BeforeClass;
  import org.junit.Test;
  
@@@ -30,15 -37,23 +34,25 @@@ import static org.junit.Assert.assertEq
  import static org.junit.Assert.assertTrue;
  
  import org.apache.cassandra.*;
 -import org.apache.cassandra.config.CFMetaData;
 -import org.apache.cassandra.config.ColumnDefinition;
+ import org.apache.cassandra.config.DatabaseDescriptor;
  import org.apache.cassandra.db.marshal.AsciiType;
  import org.apache.cassandra.db.rows.Row;
  import org.apache.cassandra.db.marshal.IntegerType;
  import org.apache.cassandra.db.partitions.*;
+ import org.apache.cassandra.dht.IPartitioner;
+ import org.apache.cassandra.dht.Range;
+ import org.apache.cassandra.dht.Token;
  import org.apache.cassandra.exceptions.ConfigurationException;
++import org.apache.cassandra.locator.ReplicaPlan;
++import org.apache.cassandra.locator.ReplicaPlans;
+ import org.apache.cassandra.locator.TokenMetadata;
 +import org.apache.cassandra.schema.ColumnMetadata;
  import org.apache.cassandra.schema.KeyspaceParams;
 +import org.apache.cassandra.schema.TableMetadata;
+ import org.apache.cassandra.service.StorageProxy;
+ import org.apache.cassandra.service.StorageService;
  import org.apache.cassandra.utils.ByteBufferUtil;
+ import org.apache.cassandra.utils.FBUtilities;
  
  public class PartitionRangeReadTest
  {
@@@ -188,5 -199,337 +202,131 @@@
          assertTrue(partitions.get(0).iterator().next().getCell(cDef).value().equals(ByteBufferUtil.bytes("2")));
          assertTrue(partitions.get(partitions.size() - 1).iterator().next().getCell(cDef).value().equals(ByteBufferUtil.bytes("6")));
      }
+ 
 -        // TODO: Port or remove, depending on what DataLimits.thriftLimits (per cell) looks like
 -//    @Test
 -//    public void testRangeSliceColumnsLimit() throws Throwable
 -//    {
 -//        String keyspaceName = KEYSPACE1;
 -//        String cfName = CF_STANDARD1;
 -//        Keyspace keyspace = Keyspace.open(keyspaceName);
 -//        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfName);
 -//        cfs.clearUnsafe();
 -//
 -//        Cell[] cols = new Cell[5];
 -//        for (int i = 0; i < 5; i++)
 -//            cols[i] = column("c" + i, "value", 1);
 -//
 -//        putColsStandard(cfs, Util.dk("a"), cols[0], cols[1], cols[2], cols[3], cols[4]);
 -//        putColsStandard(cfs, Util.dk("b"), cols[0], cols[1]);
 -//        putColsStandard(cfs, Util.dk("c"), cols[0], cols[1], cols[2], cols[3]);
 -//        cfs.forceBlockingFlush();
 -//
 -//        SlicePredicate sp = new SlicePredicate();
 -//        sp.setSlice_range(new SliceRange());
 -//        sp.getSlice_range().setCount(1);
 -//        sp.getSlice_range().setStart(ArrayUtils.EMPTY_BYTE_ARRAY);
 -//        sp.getSlice_range().setFinish(ArrayUtils.EMPTY_BYTE_ARRAY);
 -//
 -//        assertTotalColCount(cfs.getRangeSlice(Util.range("", ""),
 -//                                              null,
 -//                                              ThriftValidation.asIFilter(sp, cfs.metadata, null),
 -//                                              3,
 -//                                              System.currentTimeMillis(),
 -//                                              true,
 -//                                              false),
 -//                            3);
 -//        assertTotalColCount(cfs.getRangeSlice(Util.range("", ""),
 -//                                              null,
 -//                                              ThriftValidation.asIFilter(sp, cfs.metadata, null),
 -//                                              5,
 -//                                              System.currentTimeMillis(),
 -//                                              true,
 -//                                              false),
 -//                            5);
 -//        assertTotalColCount(cfs.getRangeSlice(Util.range("", ""),
 -//                                              null,
 -//                                              ThriftValidation.asIFilter(sp, cfs.metadata, null),
 -//                                              8,
 -//                                              System.currentTimeMillis(),
 -//                                              true,
 -//                                              false),
 -//                            8);
 -//        assertTotalColCount(cfs.getRangeSlice(Util.range("", ""),
 -//                                              null,
 -//                                              ThriftValidation.asIFilter(sp, cfs.metadata, null),
 -//                                              10,
 -//                                              System.currentTimeMillis(),
 -//                                              true,
 -//                                              false),
 -//                            10);
 -//        assertTotalColCount(cfs.getRangeSlice(Util.range("", ""),
 -//                                              null,
 -//                                              ThriftValidation.asIFilter(sp, cfs.metadata, null),
 -//                                              100,
 -//                                              System.currentTimeMillis(),
 -//                                              true,
 -//                                              false),
 -//                            11);
 -//
 -//        // Check that when querying by name, we always include all names for a
 -//        // gien row even if it means returning more columns than requested (this is necesseray for CQL)
 -//        sp = new SlicePredicate();
 -//        sp.setColumn_names(Arrays.asList(
 -//            ByteBufferUtil.bytes("c0"),
 -//            ByteBufferUtil.bytes("c1"),
 -//            ByteBufferUtil.bytes("c2")
 -//        ));
 -//
 -//        assertTotalColCount(cfs.getRangeSlice(Util.range("", ""),
 -//                                              null,
 -//                                              ThriftValidation.asIFilter(sp, cfs.metadata, null),
 -//                                              1,
 -//                                              System.currentTimeMillis(),
 -//                                              true,
 -//                                              false),
 -//                            3);
 -//        assertTotalColCount(cfs.getRangeSlice(Util.range("", ""),
 -//                                              null,
 -//                                              ThriftValidation.asIFilter(sp, cfs.metadata, null),
 -//                                              4,
 -//                                              System.currentTimeMillis(),
 -//                                              true,
 -//                                              false),
 -//                            5);
 -//        assertTotalColCount(cfs.getRangeSlice(Util.range("", ""),
 -//                                              null,
 -//                                              ThriftValidation.asIFilter(sp, cfs.metadata, null),
 -//                                              5,
 -//                                              System.currentTimeMillis(),
 -//                                              true,
 -//                                              false),
 -//                            5);
 -//        assertTotalColCount(cfs.getRangeSlice(Util.range("", ""),
 -//                                              null,
 -//                                              ThriftValidation.asIFilter(sp, cfs.metadata, null),
 -//                                              6,
 -//                                              System.currentTimeMillis(),
 -//                                              true,
 -//                                              false),
 -//                            8);
 -//        assertTotalColCount(cfs.getRangeSlice(Util.range("", ""),
 -//                                              null,
 -//                                              ThriftValidation.asIFilter(sp, cfs.metadata, null),
 -//                                              100,
 -//                                              System.currentTimeMillis(),
 -//                                              true,
 -//                                              false),
 -//                            8);
 -//    }
 -
 -    // TODO: Port or remove, depending on what DataLimits.thriftLimits (per cell) looks like
 -//    @Test
 -//    public void testRangeSlicePaging() throws Throwable
 -//    {
 -//        String keyspaceName = KEYSPACE1;
 -//        String cfName = CF_STANDARD1;
 -//        Keyspace keyspace = Keyspace.open(keyspaceName);
 -//        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfName);
 -//        cfs.clearUnsafe();
 -//
 -//        Cell[] cols = new Cell[4];
 -//        for (int i = 0; i < 4; i++)
 -//            cols[i] = column("c" + i, "value", 1);
 -//
 -//        DecoratedKey ka = Util.dk("a");
 -//        DecoratedKey kb = Util.dk("b");
 -//        DecoratedKey kc = Util.dk("c");
 -//
 -//        PartitionPosition min = Util.rp("");
 -//
 -//        putColsStandard(cfs, ka, cols[0], cols[1], cols[2], cols[3]);
 -//        putColsStandard(cfs, kb, cols[0], cols[1], cols[2]);
 -//        putColsStandard(cfs, kc, cols[0], cols[1], cols[2], cols[3]);
 -//        cfs.forceBlockingFlush();
 -//
 -//        SlicePredicate sp = new SlicePredicate();
 -//        sp.setSlice_range(new SliceRange());
 -//        sp.getSlice_range().setCount(1);
 -//        sp.getSlice_range().setStart(ArrayUtils.EMPTY_BYTE_ARRAY);
 -//        sp.getSlice_range().setFinish(ArrayUtils.EMPTY_BYTE_ARRAY);
 -//
 -//        Collection<Row> rows;
 -//        Row row, row1, row2;
 -//        IDiskAtomFilter filter = ThriftValidation.asIFilter(sp, cfs.metadata, null);
 -//
 -//        rows = cfs.getRangeSlice(cfs.makeExtendedFilter(Util.range("", ""), filter, null, 3, true, true, System.currentTimeMillis()));
 -//        assert rows.size() == 1 : "Expected 1 row, got " + toString(rows);
 -//        row = rows.iterator().next();
 -//        assertColumnNames(row, "c0", "c1", "c2");
 -//
 -//        sp.getSlice_range().setStart(ByteBufferUtil.getArray(ByteBufferUtil.bytes("c2")));
 -//        filter = ThriftValidation.asIFilter(sp, cfs.metadata, null);
 -//        rows = cfs.getRangeSlice(cfs.makeExtendedFilter(new Bounds<PartitionPosition>(ka, min), filter, null, 3, true, true, System.currentTimeMillis()));
 -//        assert rows.size() == 2 : "Expected 2 rows, got " + toString(rows);
 -//        Iterator<Row> iter = rows.iterator();
 -//        row1 = iter.next();
 -//        row2 = iter.next();
 -//        assertColumnNames(row1, "c2", "c3");
 -//        assertColumnNames(row2, "c0");
 -//
 -//        sp.getSlice_range().setStart(ByteBufferUtil.getArray(ByteBufferUtil.bytes("c0")));
 -//        filter = ThriftValidation.asIFilter(sp, cfs.metadata, null);
 -//        rows = cfs.getRangeSlice(cfs.makeExtendedFilter(new Bounds<PartitionPosition>(row2.key, min), filter, null, 3, true, true, System.currentTimeMillis()));
 -//        assert rows.size() == 1 : "Expected 1 row, got " + toString(rows);
 -//        row = rows.iterator().next();
 -//        assertColumnNames(row, "c0", "c1", "c2");
 -//
 -//        sp.getSlice_range().setStart(ByteBufferUtil.getArray(ByteBufferUtil.bytes("c2")));
 -//        filter = ThriftValidation.asIFilter(sp, cfs.metadata, null);
 -//        rows = cfs.getRangeSlice(cfs.makeExtendedFilter(new Bounds<PartitionPosition>(row.key, min), filter, null, 3, true, true, System.currentTimeMillis()));
 -//        assert rows.size() == 2 : "Expected 2 rows, got " + toString(rows);
 -//        iter = rows.iterator();
 -//        row1 = iter.next();
 -//        row2 = iter.next();
 -//        assertColumnNames(row1, "c2");
 -//        assertColumnNames(row2, "c0", "c1");
 -//
 -//        // Paging within bounds
 -//        SliceQueryFilter sf = new SliceQueryFilter(cellname("c1"),
 -//                                                   cellname("c2"),
 -//                                                   false,
 -//                                                   0);
 -//        rows = cfs.getRangeSlice(cfs.makeExtendedFilter(new Bounds<PartitionPosition>(ka, kc), sf, cellname("c2"), cellname("c1"), null, 2, true, System.currentTimeMillis()));
 -//        assert rows.size() == 2 : "Expected 2 rows, got " + toString(rows);
 -//        iter = rows.iterator();
 -//        row1 = iter.next();
 -//        row2 = iter.next();
 -//        assertColumnNames(row1, "c2");
 -//        assertColumnNames(row2, "c1");
 -//
 -//        rows = cfs.getRangeSlice(cfs.makeExtendedFilter(new Bounds<PartitionPosition>(kb, kc), sf, cellname("c1"), cellname("c1"), null, 10, true, System.currentTimeMillis()));
 -//        assert rows.size() == 2 : "Expected 2 rows, got " + toString(rows);
 -//        iter = rows.iterator();
 -//        row1 = iter.next();
 -//        row2 = iter.next();
 -//        assertColumnNames(row1, "c1", "c2");
 -//        assertColumnNames(row2, "c1");
 -//    }
 -
+     @Test
+     public void testComputeConcurrencyFactor()
+     {
+         int maxConcurrentRangeRequest = 32;
+ 
+         // no live row returned, fetch all remaining ranges but hit the max instead
+         int cf = StorageProxy.RangeCommandIterator.computeConcurrencyFactor(100, 30, maxConcurrentRangeRequest, 500, 0);
+         assertEquals(maxConcurrentRangeRequest, cf); // because 100 - 30 = 70 > maxConcurrentRangeRequest
+ 
+         // no live row returned, fetch all remaining ranges
+         cf = StorageProxy.RangeCommandIterator.computeConcurrencyFactor(100, 80, maxConcurrentRangeRequest, 500, 0);
+         assertEquals(20, cf); // because 100 - 80 = 20 < maxConcurrentRangeRequest
+ 
+         // returned half of the rows, fetch rangesQueried again but hit the max instead
+         cf = StorageProxy.RangeCommandIterator.computeConcurrencyFactor(100, 60, maxConcurrentRangeRequest, 480, 240);
+         assertEquals(maxConcurrentRangeRequest, cf); // because 60 > maxConcurrentRangeRequest
+ 
+         // returned half of the rows, fetch rangesQueried again
+         cf = StorageProxy.RangeCommandIterator.computeConcurrencyFactor(100, 30, maxConcurrentRangeRequest, 480, 240);
+         assertEquals(30, cf); // because 30 < maxConcurrentRangeRequest
+ 
+         // returned most of the rows, 1 more range to fetch
+         cf = StorageProxy.RangeCommandIterator.computeConcurrencyFactor(100, 1, maxConcurrentRangeRequest, 480, 479);
+         assertEquals(1, cf); // because 1 < maxConcurrentRangeRequest
+     }
+ 
+     @Test
+     public void testRangeCountWithRangeMerge()
+     {
+         List<Token> tokens = setTokens(Arrays.asList(100, 200, 300, 400));
+         int vnodeCount = 0;
+ 
+         Keyspace keyspace = Keyspace.open(KEYSPACE1);
 -        List<StorageProxy.RangeForQuery> ranges = new ArrayList<>();
++        List<ReplicaPlan.ForRangeRead> ranges = new ArrayList<>();
+         for (int i = 0; i + 1 < tokens.size(); i++)
+         {
+             Range<PartitionPosition> range = Range.makeRowRange(tokens.get(i), tokens.get(i + 1));
 -            ranges.add(new StorageProxy.RangeForQuery(range, LOCAL, LOCAL, 1));
++            ranges.add(ReplicaPlans.forRangeRead(keyspace, ConsistencyLevel.ONE, range, 1));
+             vnodeCount++;
+         }
+ 
 -        StorageProxy.RangeMerger merge = new StorageProxy.RangeMerger(ranges.iterator(), keyspace, ONE);
 -        StorageProxy.RangeForQuery mergedRange = Iterators.getOnlyElement(merge);
++        StorageProxy.RangeMerger merge = new StorageProxy.RangeMerger(ranges.iterator(), keyspace, ConsistencyLevel.ONE);
++        ReplicaPlan.ForRangeRead mergedRange = Iterators.getOnlyElement(merge);
+         // all ranges are merged as the test has only one node.
+         assertEquals(vnodeCount, mergedRange.vnodeCount());
+     }
+ 
+     @Test
+     public void testRangeQueried()
+     {
+         List<Token> tokens = setTokens(Arrays.asList(100, 200, 300, 400));
+         int vnodeCount = tokens.size() + 1; // n tokens divide token ring into n+1 ranges
+ 
+         Keyspace keyspace = Keyspace.open(KEYSPACE1);
+         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_STANDARD1);
+         cfs.clearUnsafe();
+ 
+         int rows = 100;
+         for (int i = 0; i < rows; ++i)
+         {
 -            RowUpdateBuilder builder = new RowUpdateBuilder(cfs.metadata, 10, String.valueOf(i));
++            RowUpdateBuilder builder = new RowUpdateBuilder(cfs.metadata(), 10, String.valueOf(i));
+             builder.clustering("c");
+             builder.add("val", String.valueOf(i));
+             builder.build().applyUnsafe();
+         }
+         cfs.forceBlockingFlush();
+ 
+         PartitionRangeReadCommand command = (PartitionRangeReadCommand) Util.cmd(cfs).build();
+ 
+         // without range merger, there will be 2 batches requested: 1st batch with 1 range and 2nd batch with remaining ranges
 -        Iterator<StorageProxy.RangeForQuery> ranges = rangeIterator(command, keyspace, false);
 -        StorageProxy.RangeCommandIterator data = new StorageProxy.RangeCommandIterator(ranges, command, 1, 1000, vnodeCount, keyspace, ONE, System.nanoTime());
++        Iterator<ReplicaPlan.ForRangeRead> ranges = rangeIterator(command, keyspace, false);
++        StorageProxy.RangeCommandIterator data = new StorageProxy.RangeCommandIterator(ranges, command, 1, 1000, vnodeCount, System.nanoTime());
+         verifyRangeCommandIterator(data, rows, 2, vnodeCount);
+ 
+         // without range merger and initial cf=5, there will be 1 batch requested: 5 vnode ranges in the 1st batch
+         ranges = rangeIterator(command, keyspace, false);
 -        data = new StorageProxy.RangeCommandIterator(ranges, command, vnodeCount, 1000, vnodeCount, keyspace, ONE, System.nanoTime());
++        data = new StorageProxy.RangeCommandIterator(ranges, command, vnodeCount, 1000, vnodeCount, System.nanoTime());
+         verifyRangeCommandIterator(data, rows, 1, vnodeCount);
+ 
+         // without range merger and max cf=1, there will be 5 batches requested: 1 vnode range per batch
+         ranges = rangeIterator(command, keyspace, false);
 -        data = new StorageProxy.RangeCommandIterator(ranges, command, 1, 1, vnodeCount, keyspace, ONE, System.nanoTime());
++        data = new StorageProxy.RangeCommandIterator(ranges, command, 1, 1, vnodeCount, System.nanoTime());
+         verifyRangeCommandIterator(data, rows, vnodeCount, vnodeCount);
+ 
+         // with range merger, there will be only 1 batch requested, as all ranges share the same replica - localhost
+         ranges = rangeIterator(command, keyspace, true);
 -        data = new StorageProxy.RangeCommandIterator(ranges, command, 1, 1000, vnodeCount, keyspace, ONE, System.nanoTime());
++        data = new StorageProxy.RangeCommandIterator(ranges, command, 1, 1000, vnodeCount, System.nanoTime());
+         verifyRangeCommandIterator(data, rows, 1, vnodeCount);
+ 
+         // with range merger and max cf=1, there will be only 1 batch requested, as all ranges share the same replica - localhost
+         ranges = rangeIterator(command, keyspace, true);
 -        data = new StorageProxy.RangeCommandIterator(ranges, command, 1, 1, vnodeCount, keyspace, ONE, System.nanoTime());
++        data = new StorageProxy.RangeCommandIterator(ranges, command, 1, 1, vnodeCount, System.nanoTime());
+         verifyRangeCommandIterator(data, rows, 1, vnodeCount);
+     }
+ 
 -    private Iterator<StorageProxy.RangeForQuery> rangeIterator(PartitionRangeReadCommand command, Keyspace keyspace, boolean withRangeMerger)
++    private Iterator<ReplicaPlan.ForRangeRead> rangeIterator(PartitionRangeReadCommand command, Keyspace keyspace, boolean withRangeMerger)
+     {
 -        Iterator<StorageProxy.RangeForQuery> ranges = new StorageProxy.RangeIterator(command, keyspace, ONE);
++        Iterator<ReplicaPlan.ForRangeRead> ranges = new StorageProxy.RangeIterator(command, keyspace, ConsistencyLevel.ONE);
+         if (withRangeMerger)
 -            ranges = new StorageProxy.RangeMerger(ranges, keyspace, ONE);
++            ranges = new StorageProxy.RangeMerger(ranges, keyspace, ConsistencyLevel.ONE);
+ 
+         return ranges;
+     }
+ 
+     private void verifyRangeCommandIterator(StorageProxy.RangeCommandIterator data, int rows, int batches, int vnodeCount)
+     {
+         int num = Util.size(data);
+         assertEquals(rows, num);
+         assertEquals(batches, data.batchesRequested());
+         assertEquals(vnodeCount, data.rangesQueried());
+     }
+ 
+     private List<Token> setTokens(List<Integer> values)
+     {
+         IPartitioner partitioner = DatabaseDescriptor.getPartitioner();
+         List<Token> tokens = new ArrayList<>(values.size());
+         for (Integer val : values)
+             tokens.add(partitioner.getToken(ByteBufferUtil.bytes(val)));
+ 
+         TokenMetadata tmd = StorageService.instance.getTokenMetadata();
+         tmd.clearUnsafe();
 -        tmd.updateNormalTokens(tokens, FBUtilities.getBroadcastAddress());
++        tmd.updateNormalTokens(tokens, FBUtilities.getBroadcastAddressAndPort());
+ 
+         return tokens;
+     }
  }
  
diff --cc test/unit/org/apache/cassandra/service/reads/DataResolverTest.java
index 50ed09d,0000000..5d71f4d
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java
@@@ -1,1343 -1,0 +1,1341 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.service.reads;
 +
 +import java.net.UnknownHostException;
 +import java.nio.ByteBuffer;
 +import java.util.Collections;
 +import java.util.Iterator;
 +import java.util.UUID;
 +
 +import com.google.common.collect.Iterators;
 +import com.google.common.collect.Sets;
 +
 +import org.apache.cassandra.db.ColumnFamilyStore;
 +import org.apache.cassandra.db.Keyspace;
- import org.apache.cassandra.dht.ByteOrderedPartitioner;
 +import org.apache.cassandra.dht.Murmur3Partitioner;
 +import org.apache.cassandra.dht.Token;
 +import org.apache.cassandra.gms.Gossiper;
 +import org.apache.cassandra.locator.ReplicaPlan;
 +import org.junit.Assert;
- import org.junit.Before;
 +import org.junit.Test;
 +
 +import org.apache.cassandra.Util;
 +import org.apache.cassandra.db.Clustering;
 +import org.apache.cassandra.db.ClusteringBound;
 +import org.apache.cassandra.db.ConsistencyLevel;
 +import org.apache.cassandra.db.DeletionInfo;
 +import org.apache.cassandra.db.DeletionTime;
 +import org.apache.cassandra.db.EmptyIterators;
 +import org.apache.cassandra.db.MutableDeletionInfo;
 +import org.apache.cassandra.db.Mutation;
 +import org.apache.cassandra.db.RangeTombstone;
 +import org.apache.cassandra.db.ReadCommand;
 +import org.apache.cassandra.db.RowUpdateBuilder;
 +import org.apache.cassandra.db.Slice;
 +import org.apache.cassandra.db.partitions.PartitionIterator;
 +import org.apache.cassandra.db.partitions.PartitionUpdate;
 +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
 +import org.apache.cassandra.db.rows.BTreeRow;
 +import org.apache.cassandra.db.rows.BufferCell;
 +import org.apache.cassandra.db.rows.Cell;
 +import org.apache.cassandra.db.rows.CellPath;
 +import org.apache.cassandra.db.rows.ComplexColumnData;
 +import org.apache.cassandra.db.rows.RangeTombstoneBoundMarker;
 +import org.apache.cassandra.db.rows.RangeTombstoneBoundaryMarker;
 +import org.apache.cassandra.db.rows.Row;
 +import org.apache.cassandra.db.rows.RowIterator;
 +import org.apache.cassandra.locator.EndpointsForRange;
 +import org.apache.cassandra.locator.InetAddressAndPort;
 +import org.apache.cassandra.locator.Replica;
 +import org.apache.cassandra.locator.ReplicaUtils;
 +import org.apache.cassandra.net.*;
 +import org.apache.cassandra.service.StorageService;
 +import org.apache.cassandra.service.reads.repair.ReadRepair;
 +import org.apache.cassandra.service.reads.repair.RepairedDataTracker;
 +import org.apache.cassandra.service.reads.repair.RepairedDataVerifier;
 +import org.apache.cassandra.service.reads.repair.TestableReadRepair;
 +import org.apache.cassandra.utils.ByteBufferUtil;
 +
 +import static org.apache.cassandra.Util.assertClustering;
 +import static org.apache.cassandra.Util.assertColumn;
 +import static org.apache.cassandra.Util.assertColumns;
 +import static org.apache.cassandra.db.ClusteringBound.Kind;
 +import static org.junit.Assert.assertEquals;
 +import static org.junit.Assert.assertFalse;
 +import static org.junit.Assert.assertNotNull;
 +import static org.junit.Assert.assertTrue;
 +
 +public class DataResolverTest extends AbstractReadResponseTest
 +{
 +    private ReadCommand command;
 +    private TestableReadRepair readRepair;
 +    private Keyspace ks;
 +    private ColumnFamilyStore cfs;
 +
 +    private EndpointsForRange makeReplicas(int num)
 +    {
 +        StorageService.instance.getTokenMetadata().clearUnsafe();
 +
 +        switch (num)
 +        {
 +            case 2:
 +                ks = AbstractReadResponseTest.ks;
 +                cfs = AbstractReadResponseTest.cfs;
 +                break;
 +            case 4:
 +                ks = AbstractReadResponseTest.ks3;
 +                cfs = AbstractReadResponseTest.cfs3;
 +                break;
 +            default:
 +                throw new IllegalStateException("This test needs refactoring to cleanly support different replication factors");
 +        }
 +
 +        command = Util.cmd(cfs, dk).withNowInSeconds(nowInSec).build();
 +        command.trackRepairedStatus();
 +        readRepair = new TestableReadRepair(command);
 +        Token token = Murmur3Partitioner.instance.getMinimumToken();
 +        EndpointsForRange.Builder replicas = EndpointsForRange.builder(ReplicaUtils.FULL_RANGE, num);
 +        for (int i = 0; i < num; i++)
 +        {
 +            try
 +            {
 +                InetAddressAndPort endpoint = InetAddressAndPort.getByAddress(new byte[]{ 127, 0, 0, (byte) (i + 1) });
 +                replicas.add(ReplicaUtils.full(endpoint));
 +                StorageService.instance.getTokenMetadata().updateNormalToken(token = token.increaseSlightly(), endpoint);
 +                Gossiper.instance.initializeNodeUnsafe(endpoint, UUID.randomUUID(), 1);
 +            }
 +            catch (UnknownHostException e)
 +            {
 +                throw new AssertionError(e);
 +            }
 +        }
 +        return replicas.build();
 +    }
 +
 +    @Test
 +    public void testResolveNewerSingleRow()
 +    {
 +        EndpointsForRange replicas = makeReplicas(2);
 +        DataResolver resolver = new DataResolver(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime());
 +        InetAddressAndPort peer1 = replicas.get(0).endpoint();
 +        resolver.preprocess(response(command, peer1, iter(new RowUpdateBuilder(cfm, nowInSec, 0L, dk).clustering("1")
 +                                                                                                     .add("c1", "v1")
 +                                                                                                     .buildUpdate()), false));
 +        InetAddressAndPort peer2 = replicas.get(1).endpoint();
 +        resolver.preprocess(response(command, peer2, iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk).clustering("1")
 +                                                                                                     .add("c1", "v2")
 +                                                                                                     .buildUpdate()), false));
 +
 +        try(PartitionIterator data = resolver.resolve())
 +        {
 +            try (RowIterator rows = Iterators.getOnlyElement(data))
 +            {
 +                Row row = Iterators.getOnlyElement(rows);
 +                assertColumns(row, "c1");
 +                assertColumn(cfm, row, "c1", "v2", 1);
 +            }
 +        }
 +
 +        assertEquals(1, readRepair.sent.size());
 +        // peer 1 just needs to repair with the row from peer 2
 +        Mutation mutation = readRepair.getForEndpoint(peer1);
 +        assertRepairMetadata(mutation);
 +        assertRepairContainsNoDeletions(mutation);
 +        assertRepairContainsColumn(mutation, "1", "c1", "v2", 1);
 +    }
 +
 +    @Test
 +    public void testResolveDisjointSingleRow()
 +    {
 +        EndpointsForRange replicas = makeReplicas(2);
 +        DataResolver resolver = new DataResolver(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime());
 +        InetAddressAndPort peer1 = replicas.get(0).endpoint();
 +        resolver.preprocess(response(command, peer1, iter(new RowUpdateBuilder(cfm, nowInSec, 0L, dk).clustering("1")
 +                                                                                                     .add("c1", "v1")
 +                                                                                                     .buildUpdate())));
 +
 +        InetAddressAndPort peer2 = replicas.get(1).endpoint();
 +        resolver.preprocess(response(command, peer2, iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk).clustering("1")
 +                                                                                                     .add("c2", "v2")
 +                                                                                                     .buildUpdate())));
 +
 +        try(PartitionIterator data = resolver.resolve())
 +        {
 +            try (RowIterator rows = Iterators.getOnlyElement(data))
 +            {
 +                Row row = Iterators.getOnlyElement(rows);
 +                assertColumns(row, "c1", "c2");
 +                assertColumn(cfm, row, "c1", "v1", 0);
 +                assertColumn(cfm, row, "c2", "v2", 1);
 +            }
 +        }
 +
 +        assertEquals(2, readRepair.sent.size());
 +        // each peer needs to repair with each other's column
 +        Mutation mutation = readRepair.getForEndpoint(peer1);
 +        assertRepairMetadata(mutation);
 +        assertRepairContainsColumn(mutation, "1", "c2", "v2", 1);
 +
 +        mutation = readRepair.getForEndpoint(peer2);
 +        assertRepairMetadata(mutation);
 +        assertRepairContainsColumn(mutation, "1", "c1", "v1", 0);
 +    }
 +
 +    @Test
 +    public void testResolveDisjointMultipleRows() throws UnknownHostException
 +    {
 +        EndpointsForRange replicas = makeReplicas(2);
 +        DataResolver resolver = new DataResolver(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime());
 +        InetAddressAndPort peer1 = replicas.get(0).endpoint();
 +        resolver.preprocess(response(command, peer1, iter(new RowUpdateBuilder(cfm, nowInSec, 0L, dk).clustering("1")
 +                                                                                                     .add("c1", "v1")
 +                                                                                                     .buildUpdate())));
 +        InetAddressAndPort peer2 = replicas.get(1).endpoint();
 +        resolver.preprocess(response(command, peer2, iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk).clustering("2")
 +                                                                                                     .add("c2", "v2")
 +                                                                                                     .buildUpdate())));
 +
 +        try (PartitionIterator data = resolver.resolve())
 +        {
 +            try (RowIterator rows = data.next())
 +            {
 +                // We expect the resolved superset to contain both rows
 +                Row row = rows.next();
 +                assertClustering(cfm, row, "1");
 +                assertColumns(row, "c1");
 +                assertColumn(cfm, row, "c1", "v1", 0);
 +
 +                row = rows.next();
 +                assertClustering(cfm, row, "2");
 +                assertColumns(row, "c2");
 +                assertColumn(cfm, row, "c2", "v2", 1);
 +
 +                assertFalse(rows.hasNext());
 +                assertFalse(data.hasNext());
 +            }
 +        }
 +
 +        assertEquals(2, readRepair.sent.size());
 +        // each peer needs to repair the row from the other
 +        Mutation mutation = readRepair.getForEndpoint(peer1);
 +        assertRepairMetadata(mutation);
 +        assertRepairContainsNoDeletions(mutation);
 +        assertRepairContainsColumn(mutation, "2", "c2", "v2", 1);
 +
 +        mutation = readRepair.getForEndpoint(peer2);
 +        assertRepairMetadata(mutation);
 +        assertRepairContainsNoDeletions(mutation);
 +        assertRepairContainsColumn(mutation, "1", "c1", "v1", 0);
 +    }
 +
 +    @Test
 +    public void testResolveDisjointMultipleRowsWithRangeTombstones()
 +    {
 +        EndpointsForRange replicas = makeReplicas(4);
 +        DataResolver resolver = new DataResolver(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime());
 +
 +        RangeTombstone tombstone1 = tombstone("1", "11", 1, nowInSec);
 +        RangeTombstone tombstone2 = tombstone("3", "31", 1, nowInSec);
 +        PartitionUpdate update = new RowUpdateBuilder(cfm3, nowInSec, 1L, dk).addRangeTombstone(tombstone1)
 +                                                                            .addRangeTombstone(tombstone2)
 +                                                                            .buildUpdate();
 +
 +        InetAddressAndPort peer1 = replicas.get(0).endpoint();
 +        UnfilteredPartitionIterator iter1 = iter(new RowUpdateBuilder(cfm3, nowInSec, 1L, dk).addRangeTombstone(tombstone1)
 +                                                                                            .addRangeTombstone(tombstone2)
 +                                                                                            .buildUpdate());
 +        resolver.preprocess(response(command, peer1, iter1));
 +        // not covered by any range tombstone
 +        InetAddressAndPort peer2 = replicas.get(1).endpoint();
 +        UnfilteredPartitionIterator iter2 = iter(new RowUpdateBuilder(cfm3, nowInSec, 0L, dk).clustering("0")
 +                                                                                            .add("c1", "v0")
 +                                                                                            .buildUpdate());
 +        resolver.preprocess(response(command, peer2, iter2));
 +        // covered by a range tombstone
 +        InetAddressAndPort peer3 = replicas.get(2).endpoint();
 +        UnfilteredPartitionIterator iter3 = iter(new RowUpdateBuilder(cfm3, nowInSec, 0L, dk).clustering("10")
 +                                                                                            .add("c2", "v1")
 +                                                                                            .buildUpdate());
 +        resolver.preprocess(response(command, peer3, iter3));
 +        // covered by a range tombstone, but with a newer timestamp
 +        InetAddressAndPort peer4 = replicas.get(3).endpoint();
 +        UnfilteredPartitionIterator iter4 = iter(new RowUpdateBuilder(cfm3, nowInSec, 2L, dk).clustering("3")
 +                                                                                            .add("one", "A")
 +                                                                                            .buildUpdate());
 +        resolver.preprocess(response(command, peer4, iter4));
 +        try (PartitionIterator data = resolver.resolve())
 +        {
 +            try (RowIterator rows = data.next())
 +            {
 +                Row row = rows.next();
 +                assertClustering(cfm, row, "0");
 +                assertColumns(row, "c1");
 +                assertColumn(cfm, row, "c1", "v0", 0);
 +
 +                row = rows.next();
 +                assertClustering(cfm, row, "3");
 +                assertColumns(row, "one");
 +                assertColumn(cfm, row, "one", "A", 2);
 +
 +                assertFalse(rows.hasNext());
 +            }
 +        }
 +
 +        assertEquals(4, readRepair.sent.size());
 +        // peer1 needs the rows from peers 2 and 4
 +        Mutation mutation = readRepair.getForEndpoint(peer1);
 +        assertRepairMetadata(mutation);
 +        assertRepairContainsNoDeletions(mutation);
 +        assertRepairContainsColumn(mutation, "0", "c1", "v0", 0);
 +        assertRepairContainsColumn(mutation, "3", "one", "A", 2);
 +
 +        // peer2 needs to get the row from peer4 and the RTs
 +        mutation = readRepair.getForEndpoint(peer2);
 +        assertRepairMetadata(mutation);
 +        assertRepairContainsDeletions(mutation, null, tombstone1, tombstone2);
 +        assertRepairContainsColumn(mutation, "3", "one", "A", 2);
 +
 +        // peer 3 needs both rows and the RTs
 +        mutation = readRepair.getForEndpoint(peer3);
 +        assertRepairMetadata(mutation);
 +        assertRepairContainsDeletions(mutation, null, tombstone1, tombstone2);
 +        assertRepairContainsColumn(mutation, "0", "c1", "v0", 0);
 +        assertRepairContainsColumn(mutation, "3", "one", "A", 2);
 +
 +        // peer4 needs the row from peer2 and the RTs
 +        mutation = readRepair.getForEndpoint(peer4);
 +        assertRepairMetadata(mutation);
 +        assertRepairContainsDeletions(mutation, null, tombstone1, tombstone2);
 +        assertRepairContainsColumn(mutation, "0", "c1", "v0", 0);
 +    }
 +
 +    @Test
 +    public void testResolveWithOneEmpty()
 +    {
 +        EndpointsForRange replicas = makeReplicas(2);
 +        DataResolver resolver = new DataResolver(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime());
 +        InetAddressAndPort peer1 = replicas.get(0).endpoint();
 +        resolver.preprocess(response(command, peer1, iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk).clustering("1")
 +                                                                                                     .add("c2", "v2")
 +                                                                                                     .buildUpdate())));
 +        InetAddressAndPort peer2 = replicas.get(1).endpoint();
 +        resolver.preprocess(response(command, peer2, EmptyIterators.unfilteredPartition(cfm)));
 +
 +        try(PartitionIterator data = resolver.resolve())
 +        {
 +            try (RowIterator rows = Iterators.getOnlyElement(data))
 +            {
 +                Row row = Iterators.getOnlyElement(rows);
 +                assertColumns(row, "c2");
 +                assertColumn(cfm, row, "c2", "v2", 1);
 +            }
 +        }
 +
 +        assertEquals(1, readRepair.sent.size());
 +        // peer 2 needs the row from peer 1
 +        Mutation mutation = readRepair.getForEndpoint(peer2);
 +        assertRepairMetadata(mutation);
 +        assertRepairContainsNoDeletions(mutation);
 +        assertRepairContainsColumn(mutation, "1", "c2", "v2", 1);
 +    }
 +
 +    @Test
 +    public void testResolveWithBothEmpty()
 +    {
 +        EndpointsForRange replicas = makeReplicas(2);
 +        TestableReadRepair readRepair = new TestableReadRepair(command);
 +        DataResolver resolver = new DataResolver(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime());
 +        resolver.preprocess(response(command, replicas.get(0).endpoint(), EmptyIterators.unfilteredPartition(cfm)));
 +        resolver.preprocess(response(command, replicas.get(1).endpoint(), EmptyIterators.unfilteredPartition(cfm)));
 +
 +        try(PartitionIterator data = resolver.resolve())
 +        {
 +            assertFalse(data.hasNext());
 +        }
 +
 +        assertTrue(readRepair.sent.isEmpty());
 +    }
 +
 +    @Test
 +    public void testResolveDeleted()
 +    {
 +        EndpointsForRange replicas = makeReplicas(2);
 +        DataResolver resolver = new DataResolver(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime());
 +        // one response with columns timestamped before a delete in another response
 +        InetAddressAndPort peer1 = replicas.get(0).endpoint();
 +        resolver.preprocess(response(command, peer1, iter(new RowUpdateBuilder(cfm, nowInSec, 0L, dk).clustering("1")
 +                                                                                                     .add("one", "A")
 +                                                                                                     .buildUpdate())));
 +        InetAddressAndPort peer2 = replicas.get(1).endpoint();
 +        resolver.preprocess(response(command, peer2, fullPartitionDelete(cfm, dk, 1, nowInSec)));
 +
 +        try (PartitionIterator data = resolver.resolve())
 +        {
 +            assertFalse(data.hasNext());
 +        }
 +
 +        // peer1 should get the deletion from peer2
 +        assertEquals(1, readRepair.sent.size());
 +        Mutation mutation = readRepair.getForEndpoint(peer1);
 +        assertRepairMetadata(mutation);
 +        assertRepairContainsDeletions(mutation, new DeletionTime(1, nowInSec));
 +        assertRepairContainsNoColumns(mutation);
 +    }
 +
 +    @Test
 +    public void testResolveMultipleDeleted()
 +    {
 +        EndpointsForRange replicas = makeReplicas(4);
 +        DataResolver resolver = new DataResolver(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime());
 +        // deletes and columns with interleaved timestamps, returned out of order
 +        InetAddressAndPort peer1 = replicas.get(0).endpoint();
 +        resolver.preprocess(response(command, peer1, fullPartitionDelete(cfm, dk, 0, nowInSec)));
 +        // these columns were created after the previous deletion
 +        InetAddressAndPort peer2 = replicas.get(1).endpoint();
 +        resolver.preprocess(response(command, peer2, iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk).clustering("1")
 +                                                                                                     .add("one", "A")
 +                                                                                                     .add("two", "A")
 +                                                                                                     .buildUpdate())));
 +        // this column was created after the next delete
 +        InetAddressAndPort peer3 = replicas.get(2).endpoint();
 +        resolver.preprocess(response(command, peer3, iter(new RowUpdateBuilder(cfm, nowInSec, 3L, dk).clustering("1")
 +                                                                                                     .add("two", "B")
 +                                                                                                     .buildUpdate())));
 +        InetAddressAndPort peer4 = replicas.get(3).endpoint();
 +        resolver.preprocess(response(command, peer4, fullPartitionDelete(cfm, dk, 2, nowInSec)));
 +
 +        try(PartitionIterator data = resolver.resolve())
 +        {
 +            try (RowIterator rows = Iterators.getOnlyElement(data))
 +            {
 +                Row row = Iterators.getOnlyElement(rows);
 +                assertColumns(row, "two");
 +                assertColumn(cfm, row, "two", "B", 3);
 +            }
 +        }
 +
 +        // peer 1 needs to get the partition delete from peer 4 and the row from peer 3
 +        assertEquals(4, readRepair.sent.size());
 +        Mutation mutation = readRepair.getForEndpoint(peer1);
 +        assertRepairMetadata(mutation);
 +        assertRepairContainsDeletions(mutation, new DeletionTime(2, nowInSec));
 +        assertRepairContainsColumn(mutation, "1", "two", "B", 3);
 +
 +        // peer 2 needs the deletion from peer 4 and the row from peer 3
 +        mutation = readRepair.getForEndpoint(peer2);
 +        assertRepairMetadata(mutation);
 +        assertRepairContainsDeletions(mutation, new DeletionTime(2, nowInSec));
 +        assertRepairContainsColumn(mutation, "1", "two", "B", 3);
 +
 +        // peer 3 needs just the deletion from peer 4
 +        mutation = readRepair.getForEndpoint(peer3);
 +        assertRepairMetadata(mutation);
 +        assertRepairContainsDeletions(mutation, new DeletionTime(2, nowInSec));
 +        assertRepairContainsNoColumns(mutation);
 +
 +        // peer 4 needs just the row from peer 3
 +        mutation = readRepair.getForEndpoint(peer4);
 +        assertRepairMetadata(mutation);
 +        assertRepairContainsNoDeletions(mutation);
 +        assertRepairContainsColumn(mutation, "1", "two", "B", 3);
 +    }
 +
 +    @Test
 +    public void testResolveRangeTombstonesOnBoundaryRightWins() throws UnknownHostException
 +    {
 +        resolveRangeTombstonesOnBoundary(1, 2);
 +    }
 +
 +    @Test
 +    public void testResolveRangeTombstonesOnBoundaryLeftWins() throws UnknownHostException
 +    {
 +        resolveRangeTombstonesOnBoundary(2, 1);
 +    }
 +
 +    @Test
 +    public void testResolveRangeTombstonesOnBoundarySameTimestamp() throws UnknownHostException
 +    {
 +        resolveRangeTombstonesOnBoundary(1, 1);
 +    }
 +
 +    /*
 +     * We want responses to merge on tombstone boundary. So we'll merge 2 "streams":
 +     *   1: [1, 2)(3, 4](5, 6]  2
 +     *   2:    [2, 3][4, 5)     1
 +     * which tests all combinations of open/close boundaries (open/close, close/open, open/open, close/close).
 +     *
 +     * Note that, because DataResolver returns a "filtered" iterator, it should resolve into an empty iterator.
 +     * However, what should be sent to each source depends on the exact timestamps of each tombstone, and we
 +     * test a few combinations.
 +     */
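 +    // For example, with timestamp1 = 1 and timestamp2 = 2 (the "right wins" case), the repair sent to peer1
 +    // contains peer2's ranges unchanged ([2, 3] and [4, 5)), because peer2's tombstones have a strictly higher
 +    // timestamp, while the repair sent to peer2 contains peer1's ranges with the end of (3, 4] made exclusive,
 +    // because peer2 already covers the value "4" with a timestamp that is not lower.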
 +    private void resolveRangeTombstonesOnBoundary(long timestamp1, long timestamp2)
 +    {
 +        EndpointsForRange replicas = makeReplicas(2);
 +        DataResolver resolver = new DataResolver(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime());
 +        InetAddressAndPort peer1 = replicas.get(0).endpoint();
 +        InetAddressAndPort peer2 = replicas.get(1).endpoint();
 +
 +        // 1st "stream"
 +        RangeTombstone one_two    = tombstone("1", true , "2", false, timestamp1, nowInSec);
 +        RangeTombstone three_four = tombstone("3", false, "4", true , timestamp1, nowInSec);
 +        RangeTombstone five_six   = tombstone("5", false, "6", true , timestamp1, nowInSec);
 +        UnfilteredPartitionIterator iter1 = iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk).addRangeTombstone(one_two)
 +                                                                                            .addRangeTombstone(three_four)
 +                                                                                            .addRangeTombstone(five_six)
 +                                                                                            .buildUpdate());
 +
 +        // 2nd "stream"
 +        RangeTombstone two_three = tombstone("2", true, "3", true , timestamp2, nowInSec);
 +        RangeTombstone four_five = tombstone("4", true, "5", false, timestamp2, nowInSec);
 +        UnfilteredPartitionIterator iter2 = iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk).addRangeTombstone(two_three)
 +                                                                                            .addRangeTombstone(four_five)
 +                                                                                            .buildUpdate());
 +
 +        resolver.preprocess(response(command, peer1, iter1));
 +        resolver.preprocess(response(command, peer2, iter2));
 +
 +        // No results, we've only reconciled tombstones.
 +        try (PartitionIterator data = resolver.resolve())
 +        {
 +            assertFalse(data.hasNext());
 +        }
 +
 +        assertEquals(2, readRepair.sent.size());
 +
 +        Mutation msg1 = readRepair.getForEndpoint(peer1);
 +        assertRepairMetadata(msg1);
 +        assertRepairContainsNoColumns(msg1);
 +
 +        Mutation msg2 = readRepair.getForEndpoint(peer2);
 +        assertRepairMetadata(msg2);
 +        assertRepairContainsNoColumns(msg2);
 +
 +        // Both streams are mostly complementary, so they will roughly get the ranges of the other stream. One subtlety is
 +        // around the value "4" however, as it's included by both streams.
 +        // So for a given stream, unless the other stream has a strictly higher timestamp, the value "4" will be excluded
 +        // from whatever range it receives as repair, since the stream already covers it.
 +
 +        // Message to peer1 contains peer2 ranges
 +        assertRepairContainsDeletions(msg1, null, two_three, withExclusiveStartIf(four_five, timestamp1 >= timestamp2));
 +
 +        // Message to peer2 contains peer1 ranges
 +        assertRepairContainsDeletions(msg2, null, one_two, withExclusiveEndIf(three_four, timestamp2 >= timestamp1), five_six);
 +    }
 +
 +    /**
 +     * Test cases where a boundary of one source is covered by another source's deletion and the timestamp on one
 +     * or both sides of the boundary is equal to the "merged" deletion.
 +     * This is a test for CASSANDRA-13237 to make sure we handle this case properly.
 +     */
 +    @Test
 +    public void testRepairRangeTombstoneBoundary() throws UnknownHostException
 +    {
 +        testRepairRangeTombstoneBoundary(1, 0, 1);
 +        readRepair.sent.clear();
 +        testRepairRangeTombstoneBoundary(1, 1, 0);
 +        readRepair.sent.clear();
 +        testRepairRangeTombstoneBoundary(1, 1, 1);
 +    }
 +
 +    /**
 +     * Test for CASSANDRA-13237, checking we don't fail (and handle correctly) the case where a RT boundary has the
 +     * same deletion on both sides (which is useless, but could be created by legacy code pre-CASSANDRA-13237 and could
 +     * thus still be sent).
 +     */
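 +    // For example, with (timestamp1, timestamp2, timestamp3) = (1, 0, 1) only the [0, 5) side of peer2's boundary
 +    // is stale, so the expected repair to peer2 is [0, 5) at timestamp 1; with (1, 1, 0) the side after the
 +    // boundary is stale, so the expected repair is [5, 9] at timestamp 1; with (1, 1, 1) both sides already match
 +    // and no repair is sent.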
 +    private void testRepairRangeTombstoneBoundary(int timestamp1, int timestamp2, int timestamp3) throws UnknownHostException
 +    {
 +        EndpointsForRange replicas = makeReplicas(2);
 +        DataResolver resolver = new DataResolver(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime());
 +        InetAddressAndPort peer1 = replicas.get(0).endpoint();
 +        InetAddressAndPort peer2 = replicas.get(1).endpoint();
 +
 +        // 1st "stream"
 +        RangeTombstone one_nine = tombstone("0", true , "9", true, timestamp1, nowInSec);
 +        UnfilteredPartitionIterator iter1 = iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk)
 +                                                 .addRangeTombstone(one_nine)
 +                                                 .buildUpdate());
 +
 +        // 2nd "stream" (build more manually to ensure we have the boundary we want)
 +        RangeTombstoneBoundMarker open_one = marker("0", true, true, timestamp2, nowInSec);
 +        RangeTombstoneBoundaryMarker boundary_five = boundary("5", false, timestamp2, nowInSec, timestamp3, nowInSec);
 +        RangeTombstoneBoundMarker close_nine = marker("9", false, true, timestamp3, nowInSec);
 +        UnfilteredPartitionIterator iter2 = iter(dk, open_one, boundary_five, close_nine);
 +
 +        resolver.preprocess(response(command, peer1, iter1));
 +        resolver.preprocess(response(command, peer2, iter2));
 +
 +        boolean shouldHaveRepair = timestamp1 != timestamp2 || timestamp1 != timestamp3;
 +
 +        // No results, we've only reconciled tombstones.
 +        try (PartitionIterator data = resolver.resolve())
 +        {
 +            assertFalse(data.hasNext());
 +        }
 +
 +        assertEquals(shouldHaveRepair? 1 : 0, readRepair.sent.size());
 +
 +        if (!shouldHaveRepair)
 +            return;
 +
 +        Mutation mutation = readRepair.getForEndpoint(peer2);
 +        assertRepairMetadata(mutation);
 +        assertRepairContainsNoColumns(mutation);
 +
 +        RangeTombstone expected = timestamp1 != timestamp2
 +                                  // We've repaired the 1st part
 +                                  ? tombstone("0", true, "5", false, timestamp1, nowInSec)
 +                                  // We've repaired the 2nd part
 +                                  : tombstone("5", true, "9", true, timestamp1, nowInSec);
 +        assertRepairContainsDeletions(mutation, null, expected);
 +    }
 +
 +    /**
 +     * Test for CASSANDRA-13719: tests that having a partition deletion shadow a range tombstone on another source
 +     * doesn't trigger an assertion error.
 +     */
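 +    // Here peer2's range tombstone (timestamp 5) is entirely shadowed by peer1's partition deletion (timestamp 10),
 +    // so the only repair expected is the partition deletion sent to peer2.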
 +    @Test
 +    public void testRepairRangeTombstoneWithPartitionDeletion()
 +    {
 +        EndpointsForRange replicas = makeReplicas(2);
 +
 +        DataResolver resolver = new DataResolver(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime());
 +        InetAddressAndPort peer1 = replicas.get(0).endpoint();
 +        InetAddressAndPort peer2 = replicas.get(1).endpoint();
 +
 +        // 1st "stream": just a partition deletion
 +        UnfilteredPartitionIterator iter1 = iter(PartitionUpdate.fullPartitionDelete(cfm, dk, 10, nowInSec));
 +
 +        // 2nd "stream": a range tombstone that is covered by the 1st stream
 +        RangeTombstone rt = tombstone("0", true , "10", true, 5, nowInSec);
 +        UnfilteredPartitionIterator iter2 = iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk)
 +                                                 .addRangeTombstone(rt)
 +                                                 .buildUpdate());
 +
 +        resolver.preprocess(response(command, peer1, iter1));
 +        resolver.preprocess(response(command, peer2, iter2));
 +
 +        // No results, we've only reconciled tombstones.
 +        try (PartitionIterator data = resolver.resolve())
 +        {
 +            assertFalse(data.hasNext());
 +            // 2nd stream should get repaired
 +        }
 +
 +        assertEquals(1, readRepair.sent.size());
 +
 +        Mutation mutation = readRepair.getForEndpoint(peer2);
 +        assertRepairMetadata(mutation);
 +        assertRepairContainsNoColumns(mutation);
 +
 +        assertRepairContainsDeletions(mutation, new DeletionTime(10, nowInSec));
 +    }
 +
 +    /**
 +     * Additional test for CASSANDRA-13719: tests the case where a partition deletion doesn't shadow a range tombstone.
 +     */
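 +    // Here the 1st stream has a partition deletion at timestamp 10 plus [0, 9] at timestamp 11, while the 2nd
 +    // stream only has [2, 3] at timestamp 11 and [4, 5] at timestamp 10, so the repair to peer2 is expected to
 +    // carry the partition deletion together with the missing parts of the 1st stream's tombstone: [0, 2) and (3, 9].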
 +    @Test
 +    public void testRepairRangeTombstoneWithPartitionDeletion2()
 +    {
 +        EndpointsForRange replicas = makeReplicas(2);
 +        DataResolver resolver = new DataResolver(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime());
 +        InetAddressAndPort peer1 = replicas.get(0).endpoint();
 +        InetAddressAndPort peer2 = replicas.get(1).endpoint();
 +
 +        // 1st "stream": a partition deletion and a range tombstone
 +        RangeTombstone rt1 = tombstone("0", true , "9", true, 11, nowInSec);
 +        PartitionUpdate upd1 = new RowUpdateBuilder(cfm, nowInSec, 1L, dk)
 +                               .addRangeTombstone(rt1)
 +                               .buildUpdate();
 +        ((MutableDeletionInfo)upd1.deletionInfo()).add(new DeletionTime(10, nowInSec));
 +        UnfilteredPartitionIterator iter1 = iter(upd1);
 +
 +        // 2nd "stream": a range tombstone that is covered by the other stream rt
 +        RangeTombstone rt2 = tombstone("2", true , "3", true, 11, nowInSec);
 +        RangeTombstone rt3 = tombstone("4", true , "5", true, 10, nowInSec);
 +        UnfilteredPartitionIterator iter2 = iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk)
 +                                                 .addRangeTombstone(rt2)
 +                                                 .addRangeTombstone(rt3)
 +                                                 .buildUpdate());
 +
 +        resolver.preprocess(response(command, peer1, iter1));
 +        resolver.preprocess(response(command, peer2, iter2));
 +
 +        // No results, we've only reconciled tombstones.
 +        try (PartitionIterator data = resolver.resolve())
 +        {
 +            assertFalse(data.hasNext());
 +            // 2nd stream should get repaired
 +        }
 +
 +        assertEquals(1, readRepair.sent.size());
 +
 +        Mutation mutation = readRepair.getForEndpoint(peer2);
 +        assertRepairMetadata(mutation);
 +        assertRepairContainsNoColumns(mutation);
 +
 +        // 2nd stream should get the partition deletion, as well as the parts of the 1st stream's RT that it misses
 +        assertRepairContainsDeletions(mutation, new DeletionTime(10, nowInSec),
 +                                      tombstone("0", true, "2", false, 11, nowInSec),
 +                                      tombstone("3", false, "9", true, 11, nowInSec));
 +    }
 +
 +    // Forces the start to be exclusive if the condition holds
 +    private static RangeTombstone withExclusiveStartIf(RangeTombstone rt, boolean condition)
 +    {
 +        if (!condition)
 +            return rt;
 +
 +        Slice slice = rt.deletedSlice();
 +        ClusteringBound newStart = ClusteringBound.create(Kind.EXCL_START_BOUND, slice.start().getRawValues());
 +        return new RangeTombstone(Slice.make(newStart, slice.end()), rt.deletionTime());
 +    }
 +
 +    // Forces the end to be exclusive if the condition holds
 +    private static RangeTombstone withExclusiveEndIf(RangeTombstone rt, boolean condition)
 +    {
 +        if (!condition)
 +            return rt;
 +
 +        Slice slice = rt.deletedSlice();
 +        ClusteringBound newEnd = ClusteringBound.create(Kind.EXCL_END_BOUND, slice.end().getRawValues());
 +        return new RangeTombstone(Slice.make(slice.start(), newEnd), rt.deletionTime());
 +    }
 +
 +    private static ByteBuffer bb(int b)
 +    {
 +        return ByteBufferUtil.bytes(b);
 +    }
 +
 +    private Cell mapCell(int k, int v, long ts)
 +    {
 +        return BufferCell.live(m, ts, bb(v), CellPath.create(bb(k)));
 +    }
 +
 +    @Test
 +    public void testResolveComplexDelete()
 +    {
 +        EndpointsForRange replicas = makeReplicas(2);
 +        ReadCommand cmd = Util.cmd(cfs2, dk).withNowInSeconds(nowInSec).build();
 +        TestableReadRepair readRepair = new TestableReadRepair(cmd);
 +        DataResolver resolver = new DataResolver(cmd, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime());
 +
 +        long[] ts = {100, 200};
 +
 +        Row.Builder builder = BTreeRow.unsortedBuilder();
 +        builder.newRow(Clustering.EMPTY);
 +        builder.addComplexDeletion(m, new DeletionTime(ts[0] - 1, nowInSec));
 +        builder.addCell(mapCell(0, 0, ts[0]));
 +
 +        InetAddressAndPort peer1 = replicas.get(0).endpoint();
 +        resolver.preprocess(response(cmd, peer1, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build()))));
 +
 +        builder.newRow(Clustering.EMPTY);
 +        DeletionTime expectedCmplxDelete = new DeletionTime(ts[1] - 1, nowInSec);
 +        builder.addComplexDeletion(m, expectedCmplxDelete);
 +        Cell expectedCell = mapCell(1, 1, ts[1]);
 +        builder.addCell(expectedCell);
 +
 +        InetAddressAndPort peer2 = replicas.get(1).endpoint();
 +        resolver.preprocess(response(cmd, peer2, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build()))));
 +
 +        try(PartitionIterator data = resolver.resolve())
 +        {
 +            try (RowIterator rows = Iterators.getOnlyElement(data))
 +            {
 +                Row row = Iterators.getOnlyElement(rows);
 +                assertColumns(row, "m");
 +                Assert.assertNull(row.getCell(m, CellPath.create(bb(0))));
 +                Assert.assertNotNull(row.getCell(m, CellPath.create(bb(1))));
 +            }
 +        }
 +
 +        Mutation mutation = readRepair.getForEndpoint(peer1);
 +        Iterator<Row> rowIter = mutation.getPartitionUpdate(cfm2).iterator();
 +        assertTrue(rowIter.hasNext());
 +        Row row = rowIter.next();
 +        assertFalse(rowIter.hasNext());
 +
 +        ComplexColumnData cd = row.getComplexColumnData(m);
 +
 +        assertEquals(Collections.singleton(expectedCell), Sets.newHashSet(cd));
 +        assertEquals(expectedCmplxDelete, cd.complexDeletion());
 +
 +        Assert.assertNull(readRepair.sent.get(peer2));
 +    }
 +
 +    @Test
 +    public void testResolveDeletedCollection()
 +    {
 +        EndpointsForRange replicas = makeReplicas(2);
 +        ReadCommand cmd = Util.cmd(cfs2, dk).withNowInSeconds(nowInSec).build();
 +        TestableReadRepair readRepair = new TestableReadRepair(cmd);
 +        DataResolver resolver = new DataResolver(cmd, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime());
 +
 +        long[] ts = {100, 200};
 +
 +        Row.Builder builder = BTreeRow.unsortedBuilder();
 +        builder.newRow(Clustering.EMPTY);
 +        builder.addComplexDeletion(m, new DeletionTime(ts[0] - 1, nowInSec));
 +        builder.addCell(mapCell(0, 0, ts[0]));
 +
 +        InetAddressAndPort peer1 = replicas.get(0).endpoint();
 +        resolver.preprocess(response(cmd, peer1, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build()))));
 +
 +        builder.newRow(Clustering.EMPTY);
 +        DeletionTime expectedCmplxDelete = new DeletionTime(ts[1] - 1, nowInSec);
 +        builder.addComplexDeletion(m, expectedCmplxDelete);
 +
 +        InetAddressAndPort peer2 = replicas.get(1).endpoint();
 +        resolver.preprocess(response(cmd, peer2, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build()))));
 +
 +        try(PartitionIterator data = resolver.resolve())
 +        {
 +            assertFalse(data.hasNext());
 +        }
 +
 +        Mutation mutation = readRepair.getForEndpoint(peer1);
 +        Iterator<Row> rowIter = mutation.getPartitionUpdate(cfm2).iterator();
 +        assertTrue(rowIter.hasNext());
 +        Row row = rowIter.next();
 +        assertFalse(rowIter.hasNext());
 +
 +        ComplexColumnData cd = row.getComplexColumnData(m);
 +
 +        assertEquals(Collections.emptySet(), Sets.newHashSet(cd));
 +        assertEquals(expectedCmplxDelete, cd.complexDeletion());
 +
 +        Assert.assertNull(readRepair.sent.get(peer2));
 +    }
 +
 +    @Test
 +    public void testResolveNewCollection()
 +    {
 +        EndpointsForRange replicas = makeReplicas(2);
 +        ReadCommand cmd = Util.cmd(cfs2, dk).withNowInSeconds(nowInSec).build();
 +        TestableReadRepair readRepair = new TestableReadRepair(cmd);
 +        DataResolver resolver = new DataResolver(cmd, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime());
 +
 +        long[] ts = {100, 200};
 +
 +        // map column
 +        Row.Builder builder = BTreeRow.unsortedBuilder();
 +        builder.newRow(Clustering.EMPTY);
 +        DeletionTime expectedCmplxDelete = new DeletionTime(ts[0] - 1, nowInSec);
 +        builder.addComplexDeletion(m, expectedCmplxDelete);
 +        Cell expectedCell = mapCell(0, 0, ts[0]);
 +        builder.addCell(expectedCell);
 +
 +        // peer1 has the map column; peer2's response is empty
 +        InetAddressAndPort peer1 = replicas.get(0).endpoint();
 +        resolver.preprocess(response(cmd, peer1, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build()))));
 +
 +        InetAddressAndPort peer2 = replicas.get(1).endpoint();
 +        resolver.preprocess(response(cmd, peer2, iter(PartitionUpdate.emptyUpdate(cfm2, dk))));
 +
 +        try(PartitionIterator data = resolver.resolve())
 +        {
 +            try (RowIterator rows = Iterators.getOnlyElement(data))
 +            {
 +                Row row = Iterators.getOnlyElement(rows);
 +                assertColumns(row, "m");
 +                ComplexColumnData cd = row.getComplexColumnData(m);
 +                assertEquals(Collections.singleton(expectedCell), Sets.newHashSet(cd));
 +            }
 +        }
 +
 +        Assert.assertNull(readRepair.sent.get(peer1));
 +
 +        Mutation mutation = readRepair.getForEndpoint(peer2);
 +        Iterator<Row> rowIter = mutation.getPartitionUpdate(cfm2).iterator();
 +        assertTrue(rowIter.hasNext());
 +        Row row = rowIter.next();
 +        assertFalse(rowIter.hasNext());
 +
 +        ComplexColumnData cd = row.getComplexColumnData(m);
 +
 +        assertEquals(Sets.newHashSet(expectedCell), Sets.newHashSet(cd));
 +        assertEquals(expectedCmplxDelete, cd.complexDeletion());
 +    }
 +
 +    @Test
 +    public void testResolveNewCollectionOverwritingDeleted()
 +    {
 +        EndpointsForRange replicas = makeReplicas(2);
 +        ReadCommand cmd = Util.cmd(cfs2, dk).withNowInSeconds(nowInSec).build();
 +        TestableReadRepair readRepair = new TestableReadRepair(cmd);
 +        DataResolver resolver = new DataResolver(cmd, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime());
 +
 +        long[] ts = {100, 200};
 +
 +        // cleared map column
 +        Row.Builder builder = BTreeRow.unsortedBuilder();
 +        builder.newRow(Clustering.EMPTY);
 +        builder.addComplexDeletion(m, new DeletionTime(ts[0] - 1, nowInSec));
 +
 +        InetAddressAndPort peer1 = replicas.get(0).endpoint();
 +        resolver.preprocess(response(cmd, peer1, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build()))));
 +
 +        // newer, overwritten map column
 +        builder.newRow(Clustering.EMPTY);
 +        DeletionTime expectedCmplxDelete = new DeletionTime(ts[1] - 1, nowInSec);
 +        builder.addComplexDeletion(m, expectedCmplxDelete);
 +        Cell expectedCell = mapCell(1, 1, ts[1]);
 +        builder.addCell(expectedCell);
 +
 +        InetAddressAndPort peer2 = replicas.get(1).endpoint();
 +        resolver.preprocess(response(cmd, peer2, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build()))));
 +
 +        try(PartitionIterator data = resolver.resolve())
 +        {
 +            try (RowIterator rows = Iterators.getOnlyElement(data))
 +            {
 +                Row row = Iterators.getOnlyElement(rows);
 +                assertColumns(row, "m");
 +                ComplexColumnData cd = row.getComplexColumnData(m);
 +                assertEquals(Collections.singleton(expectedCell), Sets.newHashSet(cd));
 +            }
 +        }
 +
 +        Row row = Iterators.getOnlyElement(readRepair.getForEndpoint(peer1).getPartitionUpdate(cfm2).iterator());
 +
 +        ComplexColumnData cd = row.getComplexColumnData(m);
 +
 +        assertEquals(Collections.singleton(expectedCell), Sets.newHashSet(cd));
 +        assertEquals(expectedCmplxDelete, cd.complexDeletion());
 +
 +        Assert.assertNull(readRepair.sent.get(peer2));
 +    }
 +
 +    /** Tests for repaired data tracking */
 +
 +    @Test
 +    public void trackMatchingEmptyDigestsWithAllConclusive()
 +    {
 +        EndpointsForRange replicas = makeReplicas(2);
 +        ByteBuffer digest1 = ByteBufferUtil.EMPTY_BYTE_BUFFER;
 +        InetAddressAndPort peer1 = replicas.get(0).endpoint();
 +        InetAddressAndPort peer2 = replicas.get(1).endpoint();
 +        TestRepairedDataVerifier verifier = new TestRepairedDataVerifier();
 +        verifier.expectDigest(peer1, digest1, true);
 +        verifier.expectDigest(peer2, digest1, true);
 +
 +        DataResolver resolver = resolverWithVerifier(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime(),  verifier);
 +
 +        resolver.preprocess(response(peer1, iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest1, true, command));
 +        resolver.preprocess(response(peer2, iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest1, true, command));
 +
 +        resolveAndConsume(resolver);
 +        assertTrue(verifier.verified);
 +    }
 +
 +    @Test
 +    public void trackMatchingEmptyDigestsWithSomeConclusive()
 +    {
 +        ByteBuffer digest1 = ByteBufferUtil.EMPTY_BYTE_BUFFER;
 +        EndpointsForRange replicas = makeReplicas(2);
 +        InetAddressAndPort peer1 = replicas.get(0).endpoint();
 +        InetAddressAndPort peer2 = replicas.get(1).endpoint();
 +        TestRepairedDataVerifier verifier = new TestRepairedDataVerifier();
 +        verifier.expectDigest(peer1, digest1, false);
 +        verifier.expectDigest(peer2, digest1, true);
 +
 +        DataResolver resolver = resolverWithVerifier(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime(),  verifier);
 +
 +        resolver.preprocess(response(peer1, iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest1, false, command));
 +        resolver.preprocess(response(peer2, iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest1, true, command));
 +
 +        resolveAndConsume(resolver);
 +        assertTrue(verifier.verified);
 +    }
 +
 +    @Test
 +    public void trackMatchingEmptyDigestsWithNoneConclusive()
 +    {
 +        ByteBuffer digest1 = ByteBufferUtil.EMPTY_BYTE_BUFFER;
 +        EndpointsForRange replicas = makeReplicas(2);
 +        InetAddressAndPort peer1 = replicas.get(0).endpoint();
 +        InetAddressAndPort peer2 = replicas.get(1).endpoint();
 +        TestRepairedDataVerifier verifier = new TestRepairedDataVerifier();
 +        verifier.expectDigest(peer1, digest1, false);
 +        verifier.expectDigest(peer2, digest1, false);
 +
 +        DataResolver resolver = resolverWithVerifier(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime(), verifier);
 +
 +        resolver.preprocess(response(peer1, iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest1, false, command));
 +        resolver.preprocess(response(peer2, iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest1, false, command));
 +
 +        resolveAndConsume(resolver);
 +        assertTrue(verifier.verified);
 +    }
 +
 +    @Test
 +    public void trackMatchingDigestsWithAllConclusive()
 +    {
 +        ByteBuffer digest1 = ByteBufferUtil.bytes("digest1");
 +        EndpointsForRange replicas = makeReplicas(2);
 +        InetAddressAndPort peer1 = replicas.get(0).endpoint();
 +        InetAddressAndPort peer2 = replicas.get(1).endpoint();
 +        TestRepairedDataVerifier verifier = new TestRepairedDataVerifier();
 +        verifier.expectDigest(peer1, digest1, true);
 +        verifier.expectDigest(peer2, digest1, true);
 +
 +        DataResolver resolver = resolverWithVerifier(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime(), verifier);
 +
 +        resolver.preprocess(response(peer1, iter(PartitionUpdate.emptyUpdate(cfm,dk)), digest1, true, command));
 +        resolver.preprocess(response(peer2, iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest1, true, command));
 +
 +        resolveAndConsume(resolver);
 +        assertTrue(verifier.verified);
 +    }
 +
 +    @Test
 +    public void trackMatchingDigestsWithSomeConclusive()
 +    {
 +        ByteBuffer digest1 = ByteBufferUtil.bytes("digest1");
 +        EndpointsForRange replicas = makeReplicas(2);
 +        InetAddressAndPort peer1 = replicas.get(0).endpoint();
 +        InetAddressAndPort peer2 = replicas.get(1).endpoint();
 +        TestRepairedDataVerifier verifier = new TestRepairedDataVerifier();
 +        verifier.expectDigest(peer1, digest1, true);
 +        verifier.expectDigest(peer2, digest1, false);
 +
 +        DataResolver resolver = resolverWithVerifier(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime(), verifier);
 +
 +        resolver.preprocess(response(peer1, iter(PartitionUpdate.emptyUpdate(cfm,dk)), digest1, true, command));
 +        resolver.preprocess(response(peer2, iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest1, false, command));
 +
 +        resolveAndConsume(resolver);
 +        assertTrue(verifier.verified);
 +    }
 +
 +    @Test
 +    public void trackMatchingDigestsWithNoneConclusive()
 +    {
 +        EndpointsForRange replicas = makeReplicas(2);
 +        ByteBuffer digest1 = ByteBufferUtil.bytes("digest1");
 +        InetAddressAndPort peer1 = replicas.get(0).endpoint();
 +        InetAddressAndPort peer2 = replicas.get(1).endpoint();
 +        TestRepairedDataVerifier verifier = new TestRepairedDataVerifier();
 +        verifier.expectDigest(peer1, digest1, false);
 +        verifier.expectDigest(peer2, digest1, false);
 +
 +        DataResolver resolver = resolverWithVerifier(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime(),  verifier);
 +
 +        resolver.preprocess(response(peer1, iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest1, false, command));
 +        resolver.preprocess(response(peer2, iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest1, false, command));
 +
 +        resolveAndConsume(resolver);
 +        assertTrue(verifier.verified);
 +    }
 +
 +    @Test
 +    public void trackMatchingRepairedDigestsWithDifferentData()
 +    {
 +        // As far as repaired data tracking is concerned, the actual data in the response is not relevant
 +        EndpointsForRange replicas = makeReplicas(2);
 +        ByteBuffer digest1 = ByteBufferUtil.bytes("digest1");
 +        InetAddressAndPort peer1 = replicas.get(0).endpoint();
 +        InetAddressAndPort peer2 = replicas.get(1).endpoint();
 +        TestRepairedDataVerifier verifier = new TestRepairedDataVerifier();
 +        verifier.expectDigest(peer1, digest1, true);
 +        verifier.expectDigest(peer2, digest1, true);
 +
 +        DataResolver resolver = resolverWithVerifier(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime(),  verifier);
 +
 +        resolver.preprocess(response(peer1, iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk).clustering("1").buildUpdate()), digest1, true, command));
 +        resolver.preprocess(response(peer2, iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest1, true, command));
 +
 +        resolveAndConsume(resolver);
 +        assertTrue(verifier.verified);
 +    }
 +
 +    @Test
 +    public void trackMismatchingRepairedDigestsWithAllConclusive()
 +    {
 +        EndpointsForRange replicas = makeReplicas(2);
 +        ByteBuffer digest1 = ByteBufferUtil.bytes("digest1");
 +        ByteBuffer digest2 = ByteBufferUtil.bytes("digest2");
 +        InetAddressAndPort peer1 = replicas.get(0).endpoint();
 +        InetAddressAndPort peer2 = replicas.get(1).endpoint();
 +        TestRepairedDataVerifier verifier = new TestRepairedDataVerifier();
 +        verifier.expectDigest(peer1, digest1, true);
 +        verifier.expectDigest(peer2, digest2, true);
 +
 +        DataResolver resolver = resolverWithVerifier(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime(),  verifier);
 +
 +        resolver.preprocess(response(peer1, iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest1, true, command));
 +        resolver.preprocess(response(peer2, iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest2, true, command));
 +
 +        resolveAndConsume(resolver);
 +        assertTrue(verifier.verified);
 +    }
 +
 +    @Test
 +    public void trackMismatchingRepairedDigestsWithSomeConclusive()
 +    {
 +        ByteBuffer digest1 = ByteBufferUtil.bytes("digest1");
 +        ByteBuffer digest2 = ByteBufferUtil.bytes("digest2");
 +        EndpointsForRange replicas = makeReplicas(2);
 +        InetAddressAndPort peer1 = replicas.get(0).endpoint();
 +        InetAddressAndPort peer2 = replicas.get(1).endpoint();
 +        TestRepairedDataVerifier verifier = new TestRepairedDataVerifier();
 +        verifier.expectDigest(peer1, digest1, false);
 +        verifier.expectDigest(peer2, digest2, true);
 +
 +        DataResolver resolver = resolverWithVerifier(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime(),  verifier);
 +
 +        resolver.preprocess(response(peer1, iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest1, false, command));
 +        resolver.preprocess(response(peer2, iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest2, true, command));
 +
 +        resolveAndConsume(resolver);
 +        assertTrue(verifier.verified);
 +    }
 +
 +    @Test
 +    public void trackMismatchingRepairedDigestsWithNoneConclusive()
 +    {
 +        ByteBuffer digest1 = ByteBufferUtil.bytes("digest1");
 +        ByteBuffer digest2 = ByteBufferUtil.bytes("digest2");
 +        EndpointsForRange replicas = makeReplicas(2);
 +        InetAddressAndPort peer1 = replicas.get(0).endpoint();
 +        InetAddressAndPort peer2 = replicas.get(1).endpoint();
 +        TestRepairedDataVerifier verifier = new TestRepairedDataVerifier();
 +        verifier.expectDigest(peer1, digest1, false);
 +        verifier.expectDigest(peer2, digest2, false);
 +
 +        DataResolver resolver = resolverWithVerifier(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime(),  verifier);
 +
 +        resolver.preprocess(response(peer1, iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest1, false, command));
 +        resolver.preprocess(response(peer2, iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest2, false, command));
 +
 +        resolveAndConsume(resolver);
 +        assertTrue(verifier.verified);
 +    }
 +
 +    @Test
 +    public void trackMismatchingRepairedDigestsWithDifferentData()
 +    {
 +        ByteBuffer digest1 = ByteBufferUtil.bytes("digest1");
 +        ByteBuffer digest2 = ByteBufferUtil.bytes("digest2");
 +        EndpointsForRange replicas = makeReplicas(2);
 +        InetAddressAndPort peer1 = replicas.get(0).endpoint();
 +        InetAddressAndPort peer2 = replicas.get(1).endpoint();
 +        TestRepairedDataVerifier verifier = new TestRepairedDataVerifier();
 +        verifier.expectDigest(peer1, digest1, true);
 +        verifier.expectDigest(peer2, digest2, true);
 +
 +        DataResolver resolver = resolverWithVerifier(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime(),  verifier);
 +
 +        resolver.preprocess(response(peer1, iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk).clustering("1").buildUpdate()), digest1, true, command));
 +        resolver.preprocess(response(peer2, iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest2, true, command));
 +
 +        resolveAndConsume(resolver);
 +        assertTrue(verifier.verified);
 +    }
 +
 +    @Test
 +    public void noVerificationForSingletonResponse()
 +    {
 +        // for CL <= 1 a coordinator shouldn't request repaired data tracking but we
 +        // can easily assert that the verification isn't attempted even if it did
 +        EndpointsForRange replicas = makeReplicas(2);
 +        ByteBuffer digest1 = ByteBufferUtil.bytes("digest1");
 +        InetAddressAndPort peer1 = replicas.get(0).endpoint();
 +        TestRepairedDataVerifier verifier = new TestRepairedDataVerifier();
 +        verifier.expectDigest(peer1, digest1, true);
 +
 +        DataResolver resolver = resolverWithVerifier(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime(),  verifier);
 +
 +        resolver.preprocess(response(peer1, iter(PartitionUpdate.emptyUpdate(cfm,dk)), digest1, true, command));
 +
 +        resolveAndConsume(resolver);
 +        assertFalse(verifier.verified);
 +    }
 +
 +    @Test
 +    public void responsesFromOlderVersionsAreNotTracked()
 +    {
 +        // In a mixed version cluster, responses from replicas running older versions won't include
 +        // tracking info, so the digest and pending session status are defaulted. To make sure these
 +        // default values don't result in false positives we make sure not to consider them when
 +        // processing in DataResolver
 +        EndpointsForRange replicas = makeReplicas(2);
 +        TestRepairedDataVerifier verifier = new TestRepairedDataVerifier();
 +        ByteBuffer digest1 = ByteBufferUtil.bytes("digest1");
 +        InetAddressAndPort peer1 = replicas.get(0).endpoint();
 +        InetAddressAndPort peer2 = replicas.get(1).endpoint();
 +        verifier.expectDigest(peer1, digest1, true);
 +
 +        DataResolver resolver = resolverWithVerifier(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime(),  verifier);
 +
 +        resolver.preprocess(response(peer1, iter(PartitionUpdate.emptyUpdate(cfm,dk)), digest1, true, command));
 +        // peer2 is advertising an older version, so when we deserialize its response there are two things to note:
 +        // i) the actual serialized response cannot contain any tracking info so deserialization will use defaults of
 +        //    an empty digest and pending sessions = false
 +        // ii) under normal circumstances, this would cause a mismatch with peer1, but because of the older version,
 +        //     here it will not
 +        resolver.preprocess(response(command, peer2, iter(PartitionUpdate.emptyUpdate(cfm,dk)),
 +                                     false, MessagingService.VERSION_30,
 +                                     ByteBufferUtil.EMPTY_BYTE_BUFFER, false));
 +
 +        resolveAndConsume(resolver);
 +        assertTrue(verifier.verified);
 +    }
 +
 +    @Test
 +    public void responsesFromTransientReplicasAreNotTracked()
 +    {
 +        EndpointsForRange replicas = makeReplicas(2);
 +        EndpointsForRange.Builder mutable = replicas.newBuilder(2);
 +        mutable.add(replicas.get(0));
 +        mutable.add(Replica.transientReplica(replicas.get(1).endpoint(), replicas.range()));
 +        replicas = mutable.build();
 +
 +        TestRepairedDataVerifier verifier = new TestRepairedDataVerifier();
 +        ByteBuffer digest1 = ByteBufferUtil.bytes("digest1");
 +        ByteBuffer digest2 = ByteBufferUtil.bytes("digest2");
 +        InetAddressAndPort peer1 = replicas.get(0).endpoint();
 +        InetAddressAndPort peer2 = replicas.get(1).endpoint();
 +        verifier.expectDigest(peer1, digest1, true);
 +
 +        DataResolver resolver = resolverWithVerifier(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime(),  verifier);
 +
 +        resolver.preprocess(response(peer1, iter(PartitionUpdate.emptyUpdate(cfm,dk)), digest1, true, command));
 +        resolver.preprocess(response(peer2, iter(PartitionUpdate.emptyUpdate(cfm,dk)), digest2, true, command));
 +
 +        resolveAndConsume(resolver);
 +        assertTrue(verifier.verified);
 +    }
 +
 +    private static class TestRepairedDataVerifier implements RepairedDataVerifier
 +    {
 +        private final RepairedDataTracker expected = new RepairedDataTracker(null);
 +        private boolean verified = false;
 +
 +        private void expectDigest(InetAddressAndPort from, ByteBuffer digest, boolean conclusive)
 +        {
 +            expected.recordDigest(from, digest, conclusive);
 +        }
 +
 +        @Override
 +        public void verify(RepairedDataTracker tracker)
 +        {
 +            verified = expected.equals(tracker);
 +        }
 +    }
 +
 +    private DataResolver resolverWithVerifier(final ReadCommand command,
 +                                              final ReplicaPlan.SharedForRangeRead plan,
 +                                              final ReadRepair readRepair,
 +                                              final long queryStartNanoTime,
 +                                              final RepairedDataVerifier verifier)
 +    {
 +        class TestableDataResolver extends DataResolver
 +        {
 +
 +            public TestableDataResolver(ReadCommand command, ReplicaPlan.SharedForRangeRead plan, ReadRepair readRepair, long queryStartNanoTime)
 +            {
 +                super(command, plan, readRepair, queryStartNanoTime);
 +            }
 +
 +            protected RepairedDataVerifier getRepairedDataVerifier(ReadCommand command)
 +            {
 +                return verifier;
 +            }
 +        }
 +
 +        return new TestableDataResolver(command, plan, readRepair, queryStartNanoTime);
 +    }
 +
 +    private void assertRepairContainsDeletions(Mutation mutation,
 +                                               DeletionTime deletionTime,
 +                                               RangeTombstone...rangeTombstones)
 +    {
 +        PartitionUpdate update = mutation.getPartitionUpdates().iterator().next();
 +        DeletionInfo deletionInfo = update.deletionInfo();
 +        if (deletionTime != null)
 +            assertEquals(deletionTime, deletionInfo.getPartitionDeletion());
 +
 +        assertEquals(rangeTombstones.length, deletionInfo.rangeCount());
 +        Iterator<RangeTombstone> ranges = deletionInfo.rangeIterator(false);
 +        int i = 0;
 +        while (ranges.hasNext())
 +        {
 +            RangeTombstone expected = rangeTombstones[i++];
 +            RangeTombstone actual = ranges.next();
 +            String msg = String.format("Expected %s, but got %s", expected.toString(cfm.comparator), actual.toString(cfm.comparator));
 +            assertEquals(msg, expected, actual);
 +        }
 +    }
 +
 +    private void assertRepairContainsNoDeletions(Mutation mutation)
 +    {
 +        PartitionUpdate update = mutation.getPartitionUpdates().iterator().next();
 +        assertTrue(update.deletionInfo().isLive());
 +    }
 +
 +    private void assertRepairContainsColumn(Mutation mutation,
 +                                            String clustering,
 +                                            String columnName,
 +                                            String value,
 +                                            long timestamp)
 +    {
 +        PartitionUpdate update = mutation.getPartitionUpdates().iterator().next();
 +        Row row = update.getRow(update.metadata().comparator.make(clustering));
 +        assertNotNull(row);
 +        assertColumn(cfm, row, columnName, value, timestamp);
 +    }
 +
 +    private void assertRepairContainsNoColumns(Mutation mutation)
 +    {
 +        PartitionUpdate update = mutation.getPartitionUpdates().iterator().next();
 +        assertFalse(update.iterator().hasNext());
 +    }
 +
 +    private void assertRepairMetadata(Mutation mutation)
 +    {
 +        PartitionUpdate update = mutation.getPartitionUpdates().iterator().next();
 +        assertEquals(update.metadata().keyspace, ks.getName());
 +        assertEquals(update.metadata().name, cfm.name);
 +    }
 +
 +    private ReplicaPlan.SharedForRangeRead plan(EndpointsForRange replicas, ConsistencyLevel consistencyLevel)
 +    {
-         return ReplicaPlan.shared(new ReplicaPlan.ForRangeRead(ks, consistencyLevel, ReplicaUtils.FULL_BOUNDS, replicas, replicas));
++        return ReplicaPlan.shared(new ReplicaPlan.ForRangeRead(ks, consistencyLevel, ReplicaUtils.FULL_BOUNDS, replicas, replicas, 1));
 +    }
 +
 +    private static void resolveAndConsume(DataResolver resolver)
 +    {
 +        try (PartitionIterator iterator = resolver.resolve())
 +        {
 +            while (iterator.hasNext())
 +            {
 +                try (RowIterator partition = iterator.next())
 +                {
 +                    while (partition.hasNext())
 +                        partition.next();
 +                }
 +            }
 +        }
 +    }
 +}
diff --cc test/unit/org/apache/cassandra/service/reads/repair/AbstractReadRepairTest.java
index ed4ef3f,0000000..14074ed
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/service/reads/repair/AbstractReadRepairTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/repair/AbstractReadRepairTest.java
@@@ -1,351 -1,0 +1,350 @@@
 +package org.apache.cassandra.service.reads.repair;
 +
 +import java.nio.ByteBuffer;
 +import java.util.List;
 +import java.util.Set;
 +import java.util.UUID;
 +import java.util.concurrent.TimeUnit;
 +import java.util.function.Consumer;
 +
 +import com.google.common.base.Predicates;
 +import com.google.common.collect.ImmutableList;
 +import com.google.common.collect.Iterables;
 +import com.google.common.collect.Lists;
 +import com.google.common.collect.Sets;
 +import com.google.common.primitives.Ints;
 +
 +import org.apache.cassandra.dht.ByteOrderedPartitioner;
 +import org.apache.cassandra.dht.Murmur3Partitioner;
 +import org.apache.cassandra.dht.Token;
 +import org.apache.cassandra.gms.Gossiper;
 +import org.apache.cassandra.locator.EndpointsForToken;
 +import org.apache.cassandra.locator.ReplicaPlan;
 +import org.junit.Assert;
 +import org.junit.Before;
 +import org.junit.Ignore;
 +import org.junit.Test;
 +
 +import org.apache.cassandra.SchemaLoader;
 +import org.apache.cassandra.Util;
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.cql3.ColumnIdentifier;
 +import org.apache.cassandra.cql3.statements.schema.CreateTableStatement;
 +import org.apache.cassandra.db.Clustering;
 +import org.apache.cassandra.db.ColumnFamilyStore;
 +import org.apache.cassandra.db.ConsistencyLevel;
 +import org.apache.cassandra.db.DecoratedKey;
 +import org.apache.cassandra.db.Keyspace;
 +import org.apache.cassandra.db.Mutation;
 +import org.apache.cassandra.db.ReadCommand;
 +import org.apache.cassandra.db.ReadResponse;
 +import org.apache.cassandra.db.partitions.PartitionIterator;
 +import org.apache.cassandra.db.partitions.PartitionUpdate;
 +import org.apache.cassandra.db.partitions.SingletonUnfilteredPartitionIterator;
 +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
 +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
 +import org.apache.cassandra.db.rows.BTreeRow;
 +import org.apache.cassandra.db.rows.BufferCell;
 +import org.apache.cassandra.db.rows.Cell;
 +import org.apache.cassandra.db.rows.Row;
 +import org.apache.cassandra.db.rows.RowIterator;
 +import org.apache.cassandra.locator.EndpointsForRange;
 +import org.apache.cassandra.locator.InetAddressAndPort;
 +import org.apache.cassandra.locator.Replica;
 +import org.apache.cassandra.locator.ReplicaPlans;
 +import org.apache.cassandra.locator.ReplicaUtils;
 +import org.apache.cassandra.net.Message;
 +import org.apache.cassandra.schema.KeyspaceMetadata;
 +import org.apache.cassandra.schema.KeyspaceParams;
 +import org.apache.cassandra.schema.MigrationManager;
 +import org.apache.cassandra.schema.TableMetadata;
 +import org.apache.cassandra.schema.Tables;
 +import org.apache.cassandra.service.StorageService;
 +import org.apache.cassandra.utils.ByteBufferUtil;
 +
 +import static org.apache.cassandra.locator.Replica.fullReplica;
 +import static org.apache.cassandra.locator.ReplicaUtils.FULL_RANGE;
 +import static org.apache.cassandra.net.Verb.INTERNAL_RSP;
 +
 +@Ignore
 +public abstract class AbstractReadRepairTest
 +{
 +    static Keyspace ks;
 +    static ColumnFamilyStore cfs;
 +    static TableMetadata cfm;
 +    static InetAddressAndPort target1;
 +    static InetAddressAndPort target2;
 +    static InetAddressAndPort target3;
 +    static List<InetAddressAndPort> targets;
 +
 +    static Replica replica1;
 +    static Replica replica2;
 +    static Replica replica3;
 +    static EndpointsForRange replicas;
 +    static ReplicaPlan.ForRead<?> replicaPlan;
 +
 +    static long now = TimeUnit.NANOSECONDS.toMicros(System.nanoTime());
 +    static DecoratedKey key;
 +    static Cell cell1;
 +    static Cell cell2;
 +    static Cell cell3;
 +    static Mutation resolved;
 +
 +    static ReadCommand command;
 +
 +    static void assertRowsEqual(Row expected, Row actual)
 +    {
 +        try
 +        {
 +            Assert.assertEquals(expected == null, actual == null);
 +            if (expected == null)
 +                return;
 +            Assert.assertEquals(expected.clustering(), actual.clustering());
 +            Assert.assertEquals(expected.deletion(), actual.deletion());
 +            Assert.assertArrayEquals(Iterables.toArray(expected.cells(), Cell.class), Iterables.toArray(actual.cells(), Cell.class));
 +        }
 +        catch (Throwable t)
 +        {
 +            throw new AssertionError(String.format("Row comparison failed, expected %s got %s", expected, actual), t);
 +        }
 +    }
 +
 +    static void assertRowsEqual(RowIterator expected, RowIterator actual)
 +    {
 +        assertRowsEqual(expected.staticRow(), actual.staticRow());
 +        while (expected.hasNext())
 +        {
 +            assert actual.hasNext();
 +            assertRowsEqual(expected.next(), actual.next());
 +        }
 +        assert !actual.hasNext();
 +    }
 +
 +    static void assertPartitionsEqual(PartitionIterator expected, PartitionIterator actual)
 +    {
 +        while (expected.hasNext())
 +        {
 +            assert actual.hasNext();
 +            assertRowsEqual(expected.next(), actual.next());
 +        }
 +
 +        assert !actual.hasNext();
 +    }
 +
 +    static void assertMutationEqual(Mutation expected, Mutation actual)
 +    {
 +        Assert.assertEquals(expected.getKeyspaceName(), actual.getKeyspaceName());
 +        Assert.assertEquals(expected.key(), actual.key());
 +        PartitionUpdate expectedUpdate = Iterables.getOnlyElement(expected.getPartitionUpdates());
 +        PartitionUpdate actualUpdate = Iterables.getOnlyElement(actual.getPartitionUpdates());
 +        assertRowsEqual(Iterables.getOnlyElement(expectedUpdate), Iterables.getOnlyElement(actualUpdate));
 +    }
 +
 +    static DecoratedKey dk(int v)
 +    {
 +        return DatabaseDescriptor.getPartitioner().decorateKey(ByteBufferUtil.bytes(v));
 +    }
 +
 +    static Cell cell(String name, String value, long timestamp)
 +    {
 +        return BufferCell.live(cfm.getColumn(ColumnIdentifier.getInterned(name, false)), timestamp, ByteBufferUtil.bytes(value));
 +    }
 +
 +    static PartitionUpdate update(Cell... cells)
 +    {
 +        Row.Builder builder = BTreeRow.unsortedBuilder();
 +        builder.newRow(Clustering.EMPTY);
 +        for (Cell cell: cells)
 +        {
 +            builder.addCell(cell);
 +        }
 +        return PartitionUpdate.singleRowUpdate(cfm, key, builder.build());
 +    }
 +
 +    static PartitionIterator partition(Cell... cells)
 +    {
 +        UnfilteredPartitionIterator iter = new SingletonUnfilteredPartitionIterator(update(cells).unfilteredIterator());
 +        return UnfilteredPartitionIterators.filter(iter, Ints.checkedCast(TimeUnit.MICROSECONDS.toSeconds(now)));
 +    }
 +
 +    static Mutation mutation(Cell... cells)
 +    {
 +        return new Mutation(update(cells));
 +    }
 +
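 +    // Builds an INTERNAL_RSP data-response message containing the given cells, as if
 +    // it had been received from the given replica endpoint.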
 +    @SuppressWarnings("resource")
 +    static Message<ReadResponse> msg(InetAddressAndPort from, Cell... cells)
 +    {
 +        UnfilteredPartitionIterator iter = new SingletonUnfilteredPartitionIterator(update(cells).unfilteredIterator());
 +        return Message.builder(INTERNAL_RSP, ReadResponse.createDataResponse(iter, command))
 +                      .from(from)
 +                      .build();
 +    }
 +
 +    static class ResultConsumer implements Consumer<PartitionIterator>
 +    {
 +
 +        PartitionIterator result = null;
 +
 +        @Override
 +        public void accept(PartitionIterator partitionIterator)
 +        {
 +            Assert.assertNotNull(partitionIterator);
 +            result = partitionIterator;
 +        }
 +    }
 +
 +    private static boolean configured = false;
 +
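 +    // Creates the test schema and a three-replica (RF=3) token ring, and initialises the
 +    // default key, cells and read command; subclasses must call this from a @BeforeClass
 +    // method (the assertion in setUp enforces it).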
 +    static void configureClass(ReadRepairStrategy repairStrategy) throws Throwable
 +    {
 +        SchemaLoader.loadSchema();
 +        String ksName = "ks";
 +
 +        String ddl = String.format("CREATE TABLE tbl (k int primary key, v text) WITH read_repair='%s'",
 +                                   repairStrategy.toString().toLowerCase());
 +
 +        cfm = CreateTableStatement.parse(ddl, ksName).build();
 +        assert cfm.params.readRepair == repairStrategy;
 +        KeyspaceMetadata ksm = KeyspaceMetadata.create(ksName, KeyspaceParams.simple(3), Tables.of(cfm));
 +        MigrationManager.announceNewKeyspace(ksm, false);
 +
 +        ks = Keyspace.open(ksName);
 +        cfs = ks.getColumnFamilyStore("tbl");
 +
 +        cfs.sampleReadLatencyNanos = 0;
 +        cfs.additionalWriteLatencyNanos = 0;
 +
 +        target1 = InetAddressAndPort.getByName("127.0.0.255");
 +        target2 = InetAddressAndPort.getByName("127.0.0.254");
 +        target3 = InetAddressAndPort.getByName("127.0.0.253");
 +
 +        targets = ImmutableList.of(target1, target2, target3);
 +
 +        replica1 = fullReplica(target1, FULL_RANGE);
 +        replica2 = fullReplica(target2, FULL_RANGE);
 +        replica3 = fullReplica(target3, FULL_RANGE);
 +        replicas = EndpointsForRange.of(replica1, replica2, replica3);
 +
 +        replicaPlan = replicaPlan(ConsistencyLevel.QUORUM, replicas);
 +
 +        StorageService.instance.getTokenMetadata().clearUnsafe();
 +        StorageService.instance.getTokenMetadata().updateNormalToken(ByteOrderedPartitioner.instance.getToken(ByteBuffer.wrap(new byte[] { 0 })), replica1.endpoint());
 +        StorageService.instance.getTokenMetadata().updateNormalToken(ByteOrderedPartitioner.instance.getToken(ByteBuffer.wrap(new byte[] { 1 })), replica2.endpoint());
 +        StorageService.instance.getTokenMetadata().updateNormalToken(ByteOrderedPartitioner.instance.getToken(ByteBuffer.wrap(new byte[] { 2 })), replica3.endpoint());
 +        Gossiper.instance.initializeNodeUnsafe(replica1.endpoint(), UUID.randomUUID(), 1);
 +        Gossiper.instance.initializeNodeUnsafe(replica2.endpoint(), UUID.randomUUID(), 1);
 +        Gossiper.instance.initializeNodeUnsafe(replica3.endpoint(), UUID.randomUUID(), 1);
 +
 +        // default test values
 +        key  = dk(5);
 +        cell1 = cell("v", "val1", now);
 +        cell2 = cell("v", "val2", now);
 +        cell3 = cell("v", "val3", now);
 +        resolved = mutation(cell1, cell2);
 +
 +        command = Util.cmd(cfs, 1).build();
 +
 +        configured = true;
 +    }
 +
 +    static Set<InetAddressAndPort> epSet(InetAddressAndPort... eps)
 +    {
 +        return Sets.newHashSet(eps);
 +    }
 +
 +    @Before
 +    public void setUp()
 +    {
 +        assert configured : "configureClass must be called in a @BeforeClass method";
 +
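 +        // Zero the latency thresholds so speculative reads and additional repair writes
 +        // are considered overdue immediately in these tests.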
 +        cfs.sampleReadLatencyNanos = 0;
 +        cfs.additionalWriteLatencyNanos = 0;
 +    }
 +
 +    static ReplicaPlan.ForRangeRead replicaPlan(ConsistencyLevel consistencyLevel, EndpointsForRange replicas)
 +    {
 +        return replicaPlan(ks, consistencyLevel, replicas, replicas);
 +    }
 +
 +    static ReplicaPlan.ForTokenWrite repairPlan(ReplicaPlan.ForRangeRead readPlan)
 +    {
 +        return repairPlan(readPlan, readPlan.candidates());
 +    }
 +
 +    static ReplicaPlan.ForTokenWrite repairPlan(EndpointsForRange liveAndDown, EndpointsForRange targets)
 +    {
 +        return repairPlan(replicaPlan(liveAndDown, targets), liveAndDown);
 +    }
 +
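 +    // Builds the write plan used to apply read-repair mutations: no pending endpoints,
 +    // with write targets derived from the originating read plan via ReplicaPlans.writeReadRepair.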
 +    static ReplicaPlan.ForTokenWrite repairPlan(ReplicaPlan.ForRangeRead readPlan, EndpointsForRange liveAndDown)
 +    {
 +        Token token = readPlan.range().left.getToken();
 +        EndpointsForToken pending = EndpointsForToken.empty(token);
 +        return ReplicaPlans.forWrite(ks, ConsistencyLevel.TWO, liveAndDown.forToken(token), pending, Predicates.alwaysTrue(), ReplicaPlans.writeReadRepair(readPlan));
 +    }
 +
 +    static ReplicaPlan.ForRangeRead replicaPlan(EndpointsForRange replicas, EndpointsForRange targets)
 +    {
 +        return replicaPlan(ks, ConsistencyLevel.QUORUM, replicas, targets);
 +    }
 +
 +    static ReplicaPlan.ForRangeRead replicaPlan(Keyspace keyspace, ConsistencyLevel consistencyLevel, EndpointsForRange replicas)
 +    {
 +        return replicaPlan(keyspace, consistencyLevel, replicas, replicas);
 +    }
 +
 +    static ReplicaPlan.ForRangeRead replicaPlan(Keyspace keyspace, ConsistencyLevel consistencyLevel, EndpointsForRange replicas, EndpointsForRange targets)
 +    {
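 +        // The trailing argument is the plan's vnode range count; these tests query a
 +        // single full range.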
-         return new ReplicaPlan.ForRangeRead(keyspace, consistencyLevel,
-                 ReplicaUtils.FULL_BOUNDS, replicas, targets);
++        return new ReplicaPlan.ForRangeRead(keyspace, consistencyLevel, ReplicaUtils.FULL_BOUNDS, replicas, targets, 1);
 +    }
 +
 +    public abstract InstrumentedReadRepair createInstrumentedReadRepair(ReadCommand command, ReplicaPlan.Shared<?, ?> replicaPlan, long queryStartNanoTime);
 +
 +    public InstrumentedReadRepair createInstrumentedReadRepair(ReplicaPlan.Shared<?, ?> replicaPlan)
 +    {
 +        return createInstrumentedReadRepair(command, replicaPlan, System.nanoTime());
 +    }
 +
 +    /**
 +     * If we haven't received enough full data responses by the time the speculation
 +     * timeout occurs, we should send read requests to additional replicas
 +     */
 +    @Test
 +    public void readSpeculationCycle()
 +    {
 +        InstrumentedReadRepair repair = createInstrumentedReadRepair(ReplicaPlan.shared(replicaPlan(replicas, EndpointsForRange.of(replica1, replica2))));
 +        ResultConsumer consumer = new ResultConsumer();
 +
 +        Assert.assertEquals(epSet(), repair.getReadRecipients());
 +        repair.startRepair(null, consumer);
 +
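 +        // no responses have arrived, so maybeSendAdditionalReads speculates to the remaining replica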
 +        Assert.assertEquals(epSet(target1, target2), repair.getReadRecipients());
 +        repair.maybeSendAdditionalReads();
 +        Assert.assertEquals(epSet(target1, target2, target3), repair.getReadRecipients());
 +        Assert.assertNull(consumer.result);
 +    }
 +
 +    /**
 +     * If we receive enough data responses before the speculation timeout
 +     * passes, we shouldn't send additional read requests
 +     */
 +    @Test
 +    public void noSpeculationRequired()
 +    {
 +        InstrumentedReadRepair repair = createInstrumentedReadRepair(ReplicaPlan.shared(replicaPlan(replicas, EndpointsForRange.of(replica1, replica2))));
 +        ResultConsumer consumer = new ResultConsumer();
 +
 +        Assert.assertEquals(epSet(), repair.getReadRecipients());
 +        repair.startRepair(null, consumer);
 +
 +        Assert.assertEquals(epSet(target1, target2), repair.getReadRecipients());
 +        repair.getReadCallback().onResponse(msg(target1, cell1));
 +        repair.getReadCallback().onResponse(msg(target2, cell1));
 +
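 +        // both contacted replicas have already responded, so no speculative read is sent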
 +        repair.maybeSendAdditionalReads();
 +        Assert.assertEquals(epSet(target1, target2), repair.getReadRecipients());
 +
 +        repair.awaitReads();
 +
 +        assertPartitionsEqual(partition(cell1), consumer.result);
 +    }
 +}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org