Posted to commits@cassandra.apache.org by aw...@apache.org on 2018/09/01 01:36:00 UTC

[08/18] cassandra git commit: Transient Replication and Cheap Quorums

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java b/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
index 61b9948..031326e 100644
--- a/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
+++ b/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
@@ -17,11 +17,12 @@
  */
 package org.apache.cassandra.service.reads;
 
-import java.util.List;
 import java.util.concurrent.TimeUnit;
 
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Iterables;
+
+import org.apache.cassandra.locator.ReplicaLayout;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -37,15 +38,20 @@ import org.apache.cassandra.db.partitions.PartitionIterator;
 import org.apache.cassandra.exceptions.ReadFailureException;
 import org.apache.cassandra.exceptions.ReadTimeoutException;
 import org.apache.cassandra.exceptions.UnavailableException;
+import org.apache.cassandra.locator.EndpointsForToken;
 import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.locator.ReplicaCollection;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.service.reads.repair.ReadRepair;
 import org.apache.cassandra.service.StorageProxy.LocalReadRunnable;
 import org.apache.cassandra.tracing.TraceState;
 import org.apache.cassandra.tracing.Tracing;
 
+import static com.google.common.collect.Iterables.all;
+import static com.google.common.collect.Iterables.tryFind;
+
 /**
  * Sends a read request to the replicas needed to satisfy a given ConsistencyLevel.
  *
@@ -59,32 +65,27 @@ public abstract class AbstractReadExecutor
     private static final Logger logger = LoggerFactory.getLogger(AbstractReadExecutor.class);
 
     protected final ReadCommand command;
-    protected final ConsistencyLevel consistency;
-    protected final List<InetAddressAndPort> targetReplicas;
-    protected final ReadRepair readRepair;
-    protected final DigestResolver digestResolver;
-    protected final ReadCallback handler;
+    private   final ReplicaLayout.ForToken replicaLayout;
+    protected final ReadRepair<EndpointsForToken, ReplicaLayout.ForToken> readRepair;
+    protected final DigestResolver<EndpointsForToken, ReplicaLayout.ForToken> digestResolver;
+    protected final ReadCallback<EndpointsForToken, ReplicaLayout.ForToken> handler;
     protected final TraceState traceState;
     protected final ColumnFamilyStore cfs;
     protected final long queryStartNanoTime;
+    private   final int initialDataRequestCount;
     protected volatile PartitionIterator result = null;
 
-    protected final Keyspace keyspace;
-    protected final int blockFor;
-
-    AbstractReadExecutor(Keyspace keyspace, ColumnFamilyStore cfs, ReadCommand command, ConsistencyLevel consistency, List<InetAddressAndPort> targetReplicas, long queryStartNanoTime)
+    AbstractReadExecutor(ColumnFamilyStore cfs, ReadCommand command, ReplicaLayout.ForToken replicaLayout, int initialDataRequestCount, long queryStartNanoTime)
     {
         this.command = command;
-        this.consistency = consistency;
-        this.targetReplicas = targetReplicas;
-        this.readRepair = ReadRepair.create(command, queryStartNanoTime, consistency);
-        this.digestResolver = new DigestResolver(keyspace, command, consistency, readRepair, targetReplicas.size());
-        this.handler = new ReadCallback(digestResolver, consistency, command, targetReplicas, queryStartNanoTime);
+        this.replicaLayout = replicaLayout;
+        this.initialDataRequestCount = initialDataRequestCount;
+        this.readRepair = ReadRepair.create(command, replicaLayout, queryStartNanoTime);
+        this.digestResolver = new DigestResolver<>(command, replicaLayout, readRepair, queryStartNanoTime);
+        this.handler = new ReadCallback<>(digestResolver, replicaLayout.consistencyLevel().blockFor(replicaLayout.keyspace()), command, replicaLayout, queryStartNanoTime);
         this.cfs = cfs;
         this.traceState = Tracing.instance.get();
         this.queryStartNanoTime = queryStartNanoTime;
-        this.keyspace = keyspace;
-        this.blockFor = consistency.blockFor(keyspace);
 
 
         // Set the digest version (if we request some digests). This is the smallest version amongst all our target replicas since new nodes
@@ -92,8 +93,8 @@ public abstract class AbstractReadExecutor
         // TODO: we need this when talking with pre-3.0 nodes. So if we preserve the digest format moving forward, we can get rid of this once
         // we stop being compatible with pre-3.0 nodes.
         int digestVersion = MessagingService.current_version;
-        for (InetAddressAndPort replica : targetReplicas)
-            digestVersion = Math.min(digestVersion, MessagingService.instance().getVersion(replica));
+        for (Replica replica : replicaLayout.selected())
+            digestVersion = Math.min(digestVersion, MessagingService.instance().getVersion(replica.endpoint()));
         command.setDigestVersion(digestVersion);
     }
 
@@ -109,24 +110,34 @@ public abstract class AbstractReadExecutor
         return readRepair;
     }
 
-    protected void makeDataRequests(Iterable<InetAddressAndPort> endpoints)
+    protected void makeFullDataRequests(ReplicaCollection<?> replicas)
     {
-        makeRequests(command, endpoints);
+        assert all(replicas, Replica::isFull);
+        makeRequests(command, replicas.filter(Replica::isFull));
+    }
 
+    protected void makeTransientDataRequests(ReplicaCollection<?> replicas)
+    {
+        makeRequests(command.copyAsTransientQuery(), replicas);
     }
 
-    protected void makeDigestRequests(Iterable<InetAddressAndPort> endpoints)
+    protected void makeDigestRequests(ReplicaCollection<?> replicas)
     {
-        makeRequests(command.copyAsDigestQuery(), endpoints);
+        assert all(replicas, Replica::isFull);
+        // digest requests may only be sent to full replicas; transient replicas receive data requests instead
+        makeRequests(command.copyAsDigestQuery(), replicas);
     }
 
-    private void makeRequests(ReadCommand readCommand, Iterable<InetAddressAndPort> endpoints)
+    private void makeRequests(ReadCommand readCommand, ReplicaCollection<?> replicas)
     {
         boolean hasLocalEndpoint = false;
 
-        for (InetAddressAndPort endpoint : endpoints)
+        Preconditions.checkArgument(replicas.stream().allMatch(replica -> replica.isFull() || !readCommand.isDigestQuery()),
+                                    "Can not send digest requests to transient replicas");
+        for (Replica replica: replicas)
         {
-            if (StorageProxy.canDoLocalRequest(endpoint))
+            InetAddressAndPort endpoint = replica.endpoint();
+            if (replica.isLocal())
             {
                 hasLocalEndpoint = true;
                 continue;
@@ -134,7 +145,6 @@ public abstract class AbstractReadExecutor
 
             if (traceState != null)
                 traceState.trace("reading {} from {}", readCommand.isDigestQuery() ? "digest" : "data", endpoint);
-            logger.trace("reading {} from {}", readCommand.isDigestQuery() ? "digest" : "data", endpoint);
             MessageOut<ReadCommand> message = readCommand.createMessage();
             MessagingService.instance().sendRRWithFailure(message, endpoint, handler);
         }
@@ -154,16 +164,16 @@ public abstract class AbstractReadExecutor
     public abstract void maybeTryAdditionalReplicas();
 
     /**
-     * Get the replicas involved in the [finished] request.
-     *
-     * @return target replicas + the extra replica, *IF* we speculated.
-     */
-    public abstract List<InetAddressAndPort> getContactedReplicas();
-
-    /**
      * send the initial set of requests
      */
-    public abstract void executeAsync();
+    public void executeAsync()
+    {
+        EndpointsForToken selected = replicaLayout().selected();
+        EndpointsForToken fullDataRequests = selected.filter(Replica::isFull, initialDataRequestCount);
+        makeFullDataRequests(fullDataRequests);
+        makeTransientDataRequests(selected.filter(Replica::isTransient));
+        makeDigestRequests(selected.filter(r -> r.isFull() && !fullDataRequests.contains(r)));
+    }
 
     /**
      * @return an executor appropriate for the configured speculative read policy
@@ -171,34 +181,33 @@ public abstract class AbstractReadExecutor
     public static AbstractReadExecutor getReadExecutor(SinglePartitionReadCommand command, ConsistencyLevel consistencyLevel, long queryStartNanoTime) throws UnavailableException
     {
         Keyspace keyspace = Keyspace.open(command.metadata().keyspace);
-
-        List<InetAddressAndPort> allLiveReplicas = StorageProxy.getLiveSortedEndpoints(keyspace, command.partitionKey());
-        List<InetAddressAndPort> selectedReplicas = consistencyLevel.filterForQuery(keyspace, allLiveReplicas);
-
-        // Throw UAE early if we don't have enough replicas.
-        consistencyLevel.assureSufficientLiveNodes(keyspace, selectedReplicas);
-
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(command.metadata().id);
         SpeculativeRetryPolicy retry = cfs.metadata().params.speculativeRetry;
 
+        // Determine the live replica layout for this token, given the consistency level and speculative retry policy
+        ReplicaLayout.ForToken replicaLayout = ReplicaLayout.forRead(keyspace, command.partitionKey().getToken(), consistencyLevel, retry);
+
         // Speculative retry is disabled *OR*
         // 11980: Disable speculative retry if using EACH_QUORUM in order to prevent miscounting DC responses
         if (retry.equals(NeverSpeculativeRetryPolicy.INSTANCE) || consistencyLevel == ConsistencyLevel.EACH_QUORUM)
-            return new NeverSpeculatingReadExecutor(keyspace, cfs, command, consistencyLevel, selectedReplicas, queryStartNanoTime, false);
+            // TODO Looks like we might want to move speculation into the replica layout, but that might be a story for post-4.0
+            return new NeverSpeculatingReadExecutor(cfs, command, replicaLayout, queryStartNanoTime, false);
 
         // There are simply no extra replicas to speculate.
         // Handle this separately so it can record failed attempts to speculate due to lack of replicas
-        if (selectedReplicas.size() == allLiveReplicas.size())
+        if (replicaLayout.selected().size() == replicaLayout.all().size())
         {
             boolean recordFailedSpeculation = consistencyLevel != ConsistencyLevel.ALL;
-            return new NeverSpeculatingReadExecutor(keyspace, cfs, command, consistencyLevel, selectedReplicas, queryStartNanoTime, recordFailedSpeculation);
+            return new NeverSpeculatingReadExecutor(cfs, command, replicaLayout, queryStartNanoTime, recordFailedSpeculation);
         }
 
-        selectedReplicas.add(allLiveReplicas.get(selectedReplicas.size()));
+        // If CL.ALL, upgrade to AlwaysSpeculating;
+        // if we are going to contact every node anyway, ask for 2 full data requests instead of 1, for redundancy
+        // (same number of requests in total, but we turn 1 digest request into a full-blown data request)
         if (retry.equals(AlwaysSpeculativeRetryPolicy.INSTANCE))
-            return new AlwaysSpeculatingReadExecutor(keyspace, cfs, command, consistencyLevel, selectedReplicas, queryStartNanoTime);
+            return new AlwaysSpeculatingReadExecutor(cfs, command, replicaLayout, queryStartNanoTime);
         else // PERCENTILE or CUSTOM.
-            return new SpeculatingReadExecutor(keyspace, cfs, command, consistencyLevel, selectedReplicas, queryStartNanoTime);
+            return new SpeculatingReadExecutor(cfs, command, replicaLayout, queryStartNanoTime);
     }
 
     /**
@@ -208,10 +217,15 @@ public abstract class AbstractReadExecutor
     boolean shouldSpeculateAndMaybeWait()
     {
         // no latency information, or we're overloaded
-        if (cfs.sampleLatencyNanos > TimeUnit.MILLISECONDS.toNanos(command.getTimeout()))
+        if (cfs.sampleReadLatencyNanos > TimeUnit.MILLISECONDS.toNanos(command.getTimeout()))
             return false;
 
-        return !handler.await(cfs.sampleLatencyNanos, TimeUnit.NANOSECONDS);
+        return !handler.await(cfs.sampleReadLatencyNanos, TimeUnit.NANOSECONDS);
+    }
+
+    ReplicaLayout.ForToken replicaLayout()
+    {
+        return replicaLayout;
     }
 
     void onReadTimeout() {}
@@ -223,78 +237,36 @@ public abstract class AbstractReadExecutor
          * log it as a failure if it should have happened
          * but couldn't due to lack of replicas
          */
-        private final boolean recordFailedSpeculation;
-
-        NeverSpeculatingReadExecutor(Keyspace keyspace,
-                                     ColumnFamilyStore cfs,
-                                     ReadCommand command,
-                                     ConsistencyLevel consistencyLevel,
-                                     List<InetAddressAndPort> targetReplicas,
-                                     long queryStartNanoTime,
-                                     boolean recordFailedSpeculation)
-        {
-            super(keyspace, cfs, command, consistencyLevel, targetReplicas, queryStartNanoTime);
-            this.recordFailedSpeculation = recordFailedSpeculation;
-        }
+        private final boolean logFailedSpeculation;
 
-        public void executeAsync()
+        public NeverSpeculatingReadExecutor(ColumnFamilyStore cfs, ReadCommand command, ReplicaLayout.ForToken replicaLayout, long queryStartNanoTime, boolean logFailedSpeculation)
         {
-            makeDataRequests(targetReplicas.subList(0, 1));
-            if (targetReplicas.size() > 1)
-                makeDigestRequests(targetReplicas.subList(1, targetReplicas.size()));
+            super(cfs, command, replicaLayout, 1, queryStartNanoTime);
+            this.logFailedSpeculation = logFailedSpeculation;
         }
 
         public void maybeTryAdditionalReplicas()
         {
-            if (shouldSpeculateAndMaybeWait() && recordFailedSpeculation)
+            if (shouldSpeculateAndMaybeWait() && logFailedSpeculation)
             {
                 cfs.metric.speculativeInsufficientReplicas.inc();
             }
         }
-
-        public List<InetAddressAndPort> getContactedReplicas()
-        {
-            return targetReplicas;
-        }
     }
 
     static class SpeculatingReadExecutor extends AbstractReadExecutor
     {
         private volatile boolean speculated = false;
 
-        public SpeculatingReadExecutor(Keyspace keyspace,
-                                       ColumnFamilyStore cfs,
+        public SpeculatingReadExecutor(ColumnFamilyStore cfs,
                                        ReadCommand command,
-                                       ConsistencyLevel consistencyLevel,
-                                       List<InetAddressAndPort> targetReplicas,
+                                       ReplicaLayout.ForToken replicaLayout,
                                        long queryStartNanoTime)
         {
-            super(keyspace, cfs, command, consistencyLevel, targetReplicas, queryStartNanoTime);
-        }
-
-        public void executeAsync()
-        {
-            // if CL + RR result in covering all replicas, getReadExecutor forces AlwaysSpeculating.  So we know
-            // that the last replica in our list is "extra."
-            List<InetAddressAndPort> initialReplicas = targetReplicas.subList(0, targetReplicas.size() - 1);
-
-            if (handler.blockfor < initialReplicas.size())
-            {
-                // We're hitting additional targets for read repair.  Since our "extra" replica is the least-
-                // preferred by the snitch, we do an extra data read to start with against a replica more
-                // likely to reply; better to let RR fail than the entire query.
-                makeDataRequests(initialReplicas.subList(0, 2));
-                if (initialReplicas.size() > 2)
-                    makeDigestRequests(initialReplicas.subList(2, initialReplicas.size()));
-            }
-            else
-            {
-                // not doing read repair; all replies are important, so it doesn't matter which nodes we
-                // perform data reads against vs digest.
-                makeDataRequests(initialReplicas.subList(0, 1));
-                if (initialReplicas.size() > 1)
-                    makeDigestRequests(initialReplicas.subList(1, initialReplicas.size()));
-            }
+            // If we are blocking for fewer replicas than we are contacting, the extras are for read repair.
+            // Since our "extra" replica is the least-preferred by the snitch, we do an extra data read to
+            // start with against a replica more likely to reply; better to let RR fail than the entire query.
+            super(cfs, command, replicaLayout, replicaLayout.blockFor() < replicaLayout.selected().size() ? 2 : 1, queryStartNanoTime);
         }
 
         public void maybeTryAdditionalReplicas()
@@ -302,28 +274,43 @@ public abstract class AbstractReadExecutor
             if (shouldSpeculateAndMaybeWait())
             {
                 //Handle speculation stats first in case the callback fires immediately
-                speculated = true;
                 cfs.metric.speculativeRetries.inc();
-                // Could be waiting on the data, or on enough digests.
+                speculated = true;
+
                 ReadCommand retryCommand = command;
+                Replica extraReplica;
                 if (handler.resolver.isDataPresent())
-                    retryCommand = command.copyAsDigestQuery();
+                {
+                    extraReplica = tryFind(replicaLayout().all(),
+                            r -> !replicaLayout().selected().contains(r)).orNull();
+
+                    // we should only use a SpeculatingReadExecutor if we have an extra replica to speculate against
+                    assert extraReplica != null;
+
+                    retryCommand = extraReplica.isTransient()
+                            ? command.copyAsTransientQuery()
+                            : command.copyAsDigestQuery();
+                }
+                else
+                {
+                    extraReplica = tryFind(replicaLayout().all(),
+                            r -> r.isFull() && !replicaLayout().selected().contains(r)).orNull();
+                    if (extraReplica == null)
+                    {
+                        cfs.metric.speculativeInsufficientReplicas.inc();
+                        // we cannot safely speculate a new data request without more work: requests are assumed
+                        // to be unique per endpoint, and we have no full nodes left to speculate against
+                        return;
+                    }
+                }
 
-                InetAddressAndPort extraReplica = Iterables.getLast(targetReplicas);
                 if (traceState != null)
                     traceState.trace("speculating read retry on {}", extraReplica);
                 logger.trace("speculating read retry on {}", extraReplica);
-                MessagingService.instance().sendRRWithFailure(retryCommand.createMessage(), extraReplica, handler);
+                MessagingService.instance().sendRRWithFailure(retryCommand.createMessage(), extraReplica.endpoint(), handler);
             }
         }
 
-        public List<InetAddressAndPort> getContactedReplicas()
-        {
-            return speculated
-                 ? targetReplicas
-                 : targetReplicas.subList(0, targetReplicas.size() - 1);
-        }
-
         @Override
         void onReadTimeout()
         {
@@ -336,14 +323,12 @@ public abstract class AbstractReadExecutor
 
     private static class AlwaysSpeculatingReadExecutor extends AbstractReadExecutor
     {
-        public AlwaysSpeculatingReadExecutor(Keyspace keyspace,
-                                             ColumnFamilyStore cfs,
+        public AlwaysSpeculatingReadExecutor(ColumnFamilyStore cfs,
                                              ReadCommand command,
-                                             ConsistencyLevel consistencyLevel,
-                                             List<InetAddressAndPort> targetReplicas,
+                                             ReplicaLayout.ForToken replicaLayout,
                                              long queryStartNanoTime)
         {
-            super(keyspace, cfs, command, consistencyLevel, targetReplicas, queryStartNanoTime);
+            super(cfs, command, replicaLayout, replicaLayout.selected().size() > 1 ? 2 : 1, queryStartNanoTime);
         }
 
         public void maybeTryAdditionalReplicas()
@@ -351,17 +336,10 @@ public abstract class AbstractReadExecutor
             // no-op
         }
 
-        public List<InetAddressAndPort> getContactedReplicas()
-        {
-            return targetReplicas;
-        }
-
         @Override
         public void executeAsync()
         {
-            makeDataRequests(targetReplicas.subList(0, targetReplicas.size() > 1 ? 2 : 1));
-            if (targetReplicas.size() > 2)
-                makeDigestRequests(targetReplicas.subList(2, targetReplicas.size()));
+            super.executeAsync();
             cfs.metric.speculativeRetries.inc();
         }
 
@@ -407,7 +385,7 @@ public abstract class AbstractReadExecutor
         else
         {
             Tracing.trace("Digest mismatch: Mismatch for key {}", getKey());
-            readRepair.startRepair(digestResolver, handler.endpoints, getContactedReplicas(), this::setResult);
+            readRepair.startRepair(digestResolver, this::setResult);
         }
     }
 
@@ -425,8 +403,7 @@ public abstract class AbstractReadExecutor
                 logger.trace("Timed out waiting on digest mismatch repair requests");
             // the caught exception here will have CL.ALL from the repair command,
             // not whatever CL the initial command was at (CASSANDRA-7947)
-            int blockFor = consistency.blockFor(Keyspace.open(command.metadata().keyspace));
-            throw new ReadTimeoutException(consistency, blockFor-1, blockFor, true);
+            throw new ReadTimeoutException(replicaLayout().consistencyLevel(), handler.blockfor - 1, handler.blockfor, true);
         }
     }
 

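A note on the new shared executeAsync() above: it partitions replicaLayout().selected() three ways -- the first initialDataRequestCount full replicas get full data reads, every transient replica gets a data read (transient replicas cannot serve digests), and any remaining full replicas get digest reads. NeverSpeculatingReadExecutor passes initialDataRequestCount = 1, while the speculating executors pass 2 when extra targets are available. Below is a condensed, self-contained sketch of that dispatch rule; Replica, Kind and dispatch() are simplified stand-ins, not the real Cassandra types:

    import java.util.ArrayList;
    import java.util.List;

    public class ReadDispatchSketch
    {
        enum Kind { FULL_DATA, TRANSIENT_DATA, DIGEST }

        // Simplified stand-in for org.apache.cassandra.locator.Replica.
        static class Replica
        {
            final String endpoint;
            final boolean full;
            Replica(String endpoint, boolean full) { this.endpoint = endpoint; this.full = full; }
        }

        // The rule executeAsync() now encodes: the first initialDataRequestCount
        // full replicas get full data reads, every transient replica gets a data
        // read, and the remaining full replicas get digest reads.
        static List<String> dispatch(List<Replica> selected, int initialDataRequestCount)
        {
            List<String> requests = new ArrayList<>();
            int fullDataSent = 0;
            for (Replica r : selected)
            {
                Kind kind;
                if (!r.full)
                    kind = Kind.TRANSIENT_DATA;
                else if (fullDataSent++ < initialDataRequestCount)
                    kind = Kind.FULL_DATA;
                else
                    kind = Kind.DIGEST;
                requests.add(kind + " -> " + r.endpoint);
            }
            return requests;
        }

        public static void main(String[] args)
        {
            List<Replica> selected = List.of(new Replica("10.0.0.1", true),
                                             new Replica("10.0.0.2", true),
                                             new Replica("10.0.0.3", false));
            // FULL_DATA -> 10.0.0.1, DIGEST -> 10.0.0.2, TRANSIENT_DATA -> 10.0.0.3
            dispatch(selected, 1).forEach(System.out::println);
        }
    }

Keeping transient replicas on the data path is what lets a digest-based quorum still reconcile any rows that only a transient replica has seen.
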
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/service/reads/DataResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/DataResolver.java b/src/java/org/apache/cassandra/service/reads/DataResolver.java
index c0bff7a..9043e87 100644
--- a/src/java/org/apache/cassandra/service/reads/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/reads/DataResolver.java
@@ -17,39 +17,55 @@
  */
 package org.apache.cassandra.service.reads;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
 
 import com.google.common.base.Joiner;
+import com.google.common.collect.Collections2;
 import com.google.common.collect.Iterables;
 
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.filter.*;
-import org.apache.cassandra.db.partitions.*;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.ReadCommand;
+import org.apache.cassandra.db.ReadResponse;
+import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.partitions.PartitionIterator;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
 import org.apache.cassandra.db.rows.RangeTombstoneMarker;
 import org.apache.cassandra.db.rows.Row;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 import org.apache.cassandra.db.rows.UnfilteredRowIterators;
-import org.apache.cassandra.db.transform.*;
-import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.net.*;
+import org.apache.cassandra.db.transform.EmptyPartitionsDiscarder;
+import org.apache.cassandra.db.transform.Filter;
+import org.apache.cassandra.db.transform.FilteredPartitions;
+import org.apache.cassandra.db.transform.Transformation;
+import org.apache.cassandra.locator.Endpoints;
+import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.locator.ReplicaCollection;
+import org.apache.cassandra.locator.ReplicaLayout;
+import org.apache.cassandra.locator.Replicas;
+import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.service.reads.repair.ReadRepair;
 
-public class DataResolver extends ResponseResolver
+import static com.google.common.collect.Iterables.*;
+
+public class DataResolver<E extends Endpoints<E>, L extends ReplicaLayout<E, L>> extends ResponseResolver<E, L>
 {
-    private final long queryStartNanoTime;
     private final boolean enforceStrictLiveness;
 
-    public DataResolver(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistency, int maxResponseCount, long queryStartNanoTime, ReadRepair readRepair)
+    public DataResolver(ReadCommand command, L replicaLayout, ReadRepair<E, L> readRepair, long queryStartNanoTime)
     {
-        super(keyspace, command, consistency, readRepair, maxResponseCount);
-        this.queryStartNanoTime = queryStartNanoTime;
+        super(command, replicaLayout, readRepair, queryStartNanoTime);
         this.enforceStrictLiveness = command.metadata().enforceStrictLiveness();
     }
 
     public PartitionIterator getData()
     {
-        ReadResponse response = responses.iterator().next().payload;
+        ReadResponse response = responses.get(0).payload;
         return UnfilteredPartitionIterators.filter(response.makeIterator(command), command.nowInSec());
     }
 
@@ -63,15 +79,13 @@ public class DataResolver extends ResponseResolver
     {
         // We could get more responses while this method runs, which is ok (we're happy to ignore any response not here
         // at the beginning of this method), so take a snapshot of the responses once and use that through the method.
-        int count = responses.size();
-        List<UnfilteredPartitionIterator> iters = new ArrayList<>(count);
-        InetAddressAndPort[] sources = new InetAddressAndPort[count];
-        for (int i = 0; i < count; i++)
-        {
-            MessageIn<ReadResponse> msg = responses.get(i);
-            iters.add(msg.payload.makeIterator(command));
-            sources[i] = msg.from;
-        }
+        Collection<MessageIn<ReadResponse>> messages = responses.snapshot();
+        assert !any(messages, msg -> msg.payload.isDigestResponse());
+
+        E replicas = replicaLayout.all().keep(transform(messages, msg -> msg.from));
+        List<UnfilteredPartitionIterator> iters = new ArrayList<>(
+                Collections2.transform(messages, msg -> msg.payload.makeIterator(command)));
+        assert replicas.size() == iters.size();
 
         /*
          * Even though every response, individually, will honor the limit, it is possible that we will, after the merge,
@@ -86,18 +100,19 @@ public class DataResolver extends ResponseResolver
          *
          * See CASSANDRA-13747 for more details.
          */
-
         DataLimits.Counter mergedResultCounter =
             command.limits().newCounter(command.nowInSec(), true, command.selectsFullPartition(), enforceStrictLiveness);
 
-        UnfilteredPartitionIterator merged = mergeWithShortReadProtection(iters, sources, mergedResultCounter);
+        UnfilteredPartitionIterator merged = mergeWithShortReadProtection(iters,
+                                                                          replicaLayout.withSelected(replicas),
+                                                                          mergedResultCounter);
         FilteredPartitions filtered = FilteredPartitions.filter(merged, new Filter(command.nowInSec(), command.metadata().enforceStrictLiveness()));
         PartitionIterator counted = Transformation.apply(filtered, mergedResultCounter);
         return Transformation.apply(counted, new EmptyPartitionsDiscarder());
     }
 
     private UnfilteredPartitionIterator mergeWithShortReadProtection(List<UnfilteredPartitionIterator> results,
-                                                                     InetAddressAndPort[] sources,
+                                                                     L sources,
                                                                      DataLimits.Counter mergedResultCounter)
     {
         // If we have only one result, there is no read repair to do and we can't get short reads
@@ -110,17 +125,17 @@ public class DataResolver extends ResponseResolver
          */
         if (!command.limits().isUnlimited())
             for (int i = 0; i < results.size(); i++)
-                results.set(i, ShortReadProtection.extend(sources[i], results.get(i), command, mergedResultCounter, queryStartNanoTime, enforceStrictLiveness));
+                results.set(i, ShortReadProtection.extend(sources.selected().get(i), results.get(i), command, mergedResultCounter, queryStartNanoTime, enforceStrictLiveness));
 
         return UnfilteredPartitionIterators.merge(results, wrapMergeListener(readRepair.getMergeListener(sources), sources));
     }
 
     private String makeResponsesDebugString(DecoratedKey partitionKey)
     {
-        return Joiner.on(",\n").join(Iterables.transform(getMessages(), m -> m.from + " => " + m.payload.toDebugString(command, partitionKey)));
+        return Joiner.on(",\n").join(transform(getMessages().snapshot(), m -> m.from + " => " + m.payload.toDebugString(command, partitionKey)));
     }
 
-    private UnfilteredPartitionIterators.MergeListener wrapMergeListener(UnfilteredPartitionIterators.MergeListener partitionListener, InetAddressAndPort[] sources)
+    private UnfilteredPartitionIterators.MergeListener wrapMergeListener(UnfilteredPartitionIterators.MergeListener partitionListener, L sources)
     {
         return new UnfilteredPartitionIterators.MergeListener()
         {
@@ -144,8 +159,8 @@ public class DataResolver extends ResponseResolver
                             String details = String.format("Error merging partition level deletion on %s: merged=%s, versions=%s, sources={%s}, debug info:%n %s",
                                                            table,
                                                            mergedDeletion == null ? "null" : mergedDeletion.toString(),
-                                                           '[' + Joiner.on(", ").join(Iterables.transform(Arrays.asList(versions), rt -> rt == null ? "null" : rt.toString())) + ']',
-                                                           Arrays.toString(sources),
+                                                           '[' + Joiner.on(", ").join(transform(Arrays.asList(versions), rt -> rt == null ? "null" : rt.toString())) + ']',
+                                                           sources.selected(),
                                                            makeResponsesDebugString(partitionKey));
                             throw new AssertionError(details, e);
                         }
@@ -165,8 +180,8 @@ public class DataResolver extends ResponseResolver
                             String details = String.format("Error merging rows on %s: merged=%s, versions=%s, sources={%s}, debug info:%n %s",
                                                            table,
                                                            merged == null ? "null" : merged.toString(table),
-                                                           '[' + Joiner.on(", ").join(Iterables.transform(Arrays.asList(versions), rt -> rt == null ? "null" : rt.toString(table))) + ']',
-                                                           Arrays.toString(sources),
+                                                           '[' + Joiner.on(", ").join(transform(Arrays.asList(versions), rt -> rt == null ? "null" : rt.toString(table))) + ']',
+                                                           sources.selected(),
                                                            makeResponsesDebugString(partitionKey));
                             throw new AssertionError(details, e);
                         }
@@ -191,8 +206,8 @@ public class DataResolver extends ResponseResolver
                             String details = String.format("Error merging RTs on %s: merged=%s, versions=%s, sources={%s}, debug info:%n %s",
                                                            table,
                                                            merged == null ? "null" : merged.toString(table),
-                                                           '[' + Joiner.on(", ").join(Iterables.transform(Arrays.asList(versions), rt -> rt == null ? "null" : rt.toString(table))) + ']',
-                                                           Arrays.toString(sources),
+                                                           '[' + Joiner.on(", ").join(transform(Arrays.asList(versions), rt -> rt == null ? "null" : rt.toString(table))) + ']',
+                                                           sources.selected(),
                                                            makeResponsesDebugString(partitionKey));
                             throw new AssertionError(details, e);
                         }

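Throughout this patch the resolvers and callbacks gain recursively bounded type parameters (E extends Endpoints<E>, L extends ReplicaLayout<E, L>). The point of the F-bounded ("self") parameter is that operations such as filter() and keep() can be declared to return the concrete collection type rather than a widened base type. A minimal, self-contained sketch of the pattern, with made-up stand-in classes:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.function.Predicate;

    // C names the concrete subtype itself, so operations can return it directly.
    abstract class SelfTyped<C extends SelfTyped<C>>
    {
        final List<String> items;

        SelfTyped(List<String> items) { this.items = items; }

        abstract C copy(List<String> items);

        C filter(Predicate<String> p)
        {
            List<String> kept = new ArrayList<>();
            for (String s : items)
                if (p.test(s))
                    kept.add(s);
            return copy(kept); // statically typed as C, not as the base class
        }
    }

    class EndpointsSketch extends SelfTyped<EndpointsSketch>
    {
        EndpointsSketch(List<String> items) { super(items); }
        EndpointsSketch copy(List<String> items) { return new EndpointsSketch(items); }
    }

    class SelfTypedDemo
    {
        public static void main(String[] args)
        {
            EndpointsSketch eps = new EndpointsSketch(List.of("full-1", "full-2", "transient-1"));
            // filter() returns EndpointsSketch, so no downcast is needed at call sites.
            EndpointsSketch full = eps.filter(s -> s.startsWith("full"));
            System.out.println(full.items);
        }
    }
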
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/service/reads/DigestResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/DigestResolver.java b/src/java/org/apache/cassandra/service/reads/DigestResolver.java
index 897892f..c3eee43 100644
--- a/src/java/org/apache/cassandra/service/reads/DigestResolver.java
+++ b/src/java/org/apache/cassandra/service/reads/DigestResolver.java
@@ -18,25 +18,35 @@
 package org.apache.cassandra.service.reads;
 
 import java.nio.ByteBuffer;
+import java.util.Collection;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 
-import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.ReadCommand;
+import org.apache.cassandra.db.ReadResponse;
+import org.apache.cassandra.db.SinglePartitionReadCommand;
 import org.apache.cassandra.db.partitions.PartitionIterator;
 import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
+import org.apache.cassandra.locator.ReplicaLayout;
+import org.apache.cassandra.locator.Endpoints;
+import org.apache.cassandra.locator.Replica;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.service.reads.repair.NoopReadRepair;
 import org.apache.cassandra.service.reads.repair.ReadRepair;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
-public class DigestResolver extends ResponseResolver
+import static com.google.common.collect.Iterables.any;
+
+public class DigestResolver<E extends Endpoints<E>, L extends ReplicaLayout<E, L>> extends ResponseResolver<E, L>
 {
-    private volatile ReadResponse dataResponse;
+    private volatile MessageIn<ReadResponse> dataResponse;
 
-    public DigestResolver(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistency, ReadRepair readRepair, int maxResponseCount)
+    public DigestResolver(ReadCommand command, L replicas, ReadRepair<E, L> readRepair, long queryStartNanoTime)
     {
-        super(keyspace, command, consistency, readRepair, maxResponseCount);
+        super(command, replicas, readRepair, queryStartNanoTime);
         Preconditions.checkArgument(command instanceof SinglePartitionReadCommand,
                                     "DigestResolver can only be used with SinglePartitionReadCommand commands");
     }
@@ -45,14 +55,60 @@ public class DigestResolver extends ResponseResolver
     public void preprocess(MessageIn<ReadResponse> message)
     {
         super.preprocess(message);
-        if (dataResponse == null && !message.payload.isDigestResponse())
-            dataResponse = message.payload;
+        Replica replica = replicaLayout.getReplicaFor(message.from);
+        if (dataResponse == null && !message.payload.isDigestResponse() && replica.isFull())
+        {
+            dataResponse = message;
+        }
+        else if (replica.isTransient() && message.payload.isDigestResponse())
+        {
+            throw new IllegalStateException("digest response received from transient replica");
+        }
+    }
+
+    @VisibleForTesting
+    public boolean hasTransientResponse()
+    {
+        return hasTransientResponse(responses.snapshot());
+    }
+
+    private boolean hasTransientResponse(Collection<MessageIn<ReadResponse>> responses)
+    {
+        return any(responses,
+                msg -> !msg.payload.isDigestResponse()
+                        && replicaLayout.getReplicaFor(msg.from).isTransient());
     }
 
     public PartitionIterator getData()
     {
         assert isDataPresent();
-        return UnfilteredPartitionIterators.filter(dataResponse.makeIterator(command), command.nowInSec());
+
+        Collection<MessageIn<ReadResponse>> responses = this.responses.snapshot();
+
+        if (!hasTransientResponse(responses))
+        {
+            return UnfilteredPartitionIterators.filter(dataResponse.payload.makeIterator(command), command.nowInSec());
+        }
+        else
+        {
+            // This path can be triggered only if we've got responses from full replicas and they match, but
+            // a transient replica's response still contains data, which needs to be reconciled.
+            DataResolver<E, L> dataResolver = new DataResolver<>(command,
+                                                                 replicaLayout,
+                                                                 (ReadRepair<E, L>) NoopReadRepair.instance,
+                                                                 queryStartNanoTime);
+
+            dataResolver.preprocess(dataResponse);
+            // Feed the transient replicas' data responses into the resolver so their rows are reconciled
+            for (MessageIn<ReadResponse> response : responses)
+            {
+                Replica replica = replicaLayout.getReplicaFor(response.from);
+                if (replica.isTransient())
+                    dataResolver.preprocess(response);
+            }
+
+            return dataResolver.resolve();
+        }
     }
 
     public boolean responsesMatch()
@@ -61,11 +117,12 @@ public class DigestResolver extends ResponseResolver
 
         // validate digests against each other; return false immediately on mismatch.
         ByteBuffer digest = null;
-        for (MessageIn<ReadResponse> message : responses)
+        for (MessageIn<ReadResponse> message : responses.snapshot())
         {
-            ReadResponse response = message.payload;
+            if (replicaLayout.getReplicaFor(message.from).isTransient())
+                continue;
 
-            ByteBuffer newDigest = response.digest(command);
+            ByteBuffer newDigest = message.payload.digest(command);
             if (digest == null)
                 digest = newDigest;
             else if (!digest.equals(newDigest))

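The change to responsesMatch() above excludes transient replicas from the digest comparison, since they answer with data rather than digests. A self-contained sketch of that comparison loop, using a simplified stand-in Response type in place of MessageIn<ReadResponse> and the replica lookup:

    import java.nio.ByteBuffer;
    import java.util.List;

    public class DigestMatchSketch
    {
        static class Response
        {
            final ByteBuffer digest;     // digest of the data this replica returned
            final boolean fromTransient; // transient replicas answer with data, never digests
            Response(ByteBuffer digest, boolean fromTransient) { this.digest = digest; this.fromTransient = fromTransient; }
        }

        // Mirrors responsesMatch(): only full replicas take part in the digest
        // comparison; a transient replica's response carries data that is
        // reconciled separately in getData().
        static boolean responsesMatch(List<Response> responses)
        {
            ByteBuffer digest = null;
            for (Response r : responses)
            {
                if (r.fromTransient)
                    continue;
                if (digest == null)
                    digest = r.digest;
                else if (!digest.equals(r.digest))
                    return false;
            }
            return true;
        }

        public static void main(String[] args)
        {
            ByteBuffer a = ByteBuffer.wrap(new byte[]{ 1 });
            ByteBuffer b = ByteBuffer.wrap(new byte[]{ 2 });
            // The transient response is skipped, so the two full replicas agree.
            System.out.println(responsesMatch(List.of(new Response(a, false),
                                                      new Response(b, true),
                                                      new Response(a, false)))); // true
        }
    }
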
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/service/reads/ReadCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/ReadCallback.java b/src/java/org/apache/cassandra/service/reads/ReadCallback.java
index 537e684..3d39377 100644
--- a/src/java/org/apache/cassandra/service/reads/ReadCallback.java
+++ b/src/java/org/apache/cassandra/service/reads/ReadCallback.java
@@ -18,42 +18,43 @@
 package org.apache.cassandra.service.reads;
 
 import java.util.Collections;
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
-import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.exceptions.RequestFailureReason;
+import org.apache.cassandra.db.PartitionRangeReadCommand;
+import org.apache.cassandra.db.ReadCommand;
+import org.apache.cassandra.db.ReadResponse;
 import org.apache.cassandra.exceptions.ReadFailureException;
 import org.apache.cassandra.exceptions.ReadTimeoutException;
+import org.apache.cassandra.exceptions.RequestFailureReason;
 import org.apache.cassandra.exceptions.UnavailableException;
+import org.apache.cassandra.locator.Endpoints;
+import org.apache.cassandra.locator.ReplicaLayout;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.IAsyncCallbackWithFailure;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.concurrent.SimpleCondition;
 
-public class ReadCallback implements IAsyncCallbackWithFailure<ReadResponse>
+public class ReadCallback<E extends Endpoints<E>, L extends ReplicaLayout<E, L>> implements IAsyncCallbackWithFailure<ReadResponse>
 {
     protected static final Logger logger = LoggerFactory.getLogger( ReadCallback.class );
 
     public final ResponseResolver resolver;
     final SimpleCondition condition = new SimpleCondition();
     private final long queryStartNanoTime;
+    // TODO: move to replica layout as well?
     final int blockfor;
-    final List<InetAddressAndPort> endpoints;
+    final L replicaLayout;
     private final ReadCommand command;
-    private final ConsistencyLevel consistencyLevel;
     private static final AtomicIntegerFieldUpdater<ReadCallback> recievedUpdater
             = AtomicIntegerFieldUpdater.newUpdater(ReadCallback.class, "received");
     private volatile int received = 0;
@@ -62,37 +63,19 @@ public class ReadCallback implements IAsyncCallbackWithFailure<ReadResponse>
     private volatile int failures = 0;
     private final Map<InetAddressAndPort, RequestFailureReason> failureReasonByEndpoint;
 
-    private final Keyspace keyspace; // TODO push this into ConsistencyLevel?
-
-    /**
-     * Constructor when response count has to be calculated and blocked for.
-     */
-    public ReadCallback(ResponseResolver resolver, ConsistencyLevel consistencyLevel, ReadCommand command, List<InetAddressAndPort> filteredEndpoints, long queryStartNanoTime)
-    {
-        this(resolver,
-             consistencyLevel,
-             consistencyLevel.blockFor(Keyspace.open(command.metadata().keyspace)),
-             command,
-             Keyspace.open(command.metadata().keyspace),
-             filteredEndpoints,
-             queryStartNanoTime);
-    }
-
-    public ReadCallback(ResponseResolver resolver, ConsistencyLevel consistencyLevel, int blockfor, ReadCommand command, Keyspace keyspace, List<InetAddressAndPort> endpoints, long queryStartNanoTime)
+    public ReadCallback(ResponseResolver resolver, int blockfor, ReadCommand command, L replicaLayout, long queryStartNanoTime)
     {
         this.command = command;
-        this.keyspace = keyspace;
         this.blockfor = blockfor;
-        this.consistencyLevel = consistencyLevel;
         this.resolver = resolver;
         this.queryStartNanoTime = queryStartNanoTime;
-        this.endpoints = endpoints;
+        this.replicaLayout = replicaLayout;
         this.failureReasonByEndpoint = new ConcurrentHashMap<>();
         // we don't support read repair (or rapid read protection) for range scans yet (CASSANDRA-6897)
-        assert !(command instanceof PartitionRangeReadCommand) || blockfor >= endpoints.size();
+        assert !(command instanceof PartitionRangeReadCommand) || blockfor >= replicaLayout.selected().size();
 
         if (logger.isTraceEnabled())
-            logger.trace("Blockfor is {}; setting up requests to {}", blockfor, StringUtils.join(this.endpoints, ","));
+            logger.trace("Blockfor is {}; setting up requests to {}", blockfor, this.replicaLayout);
     }
 
     public boolean await(long timePastStart, TimeUnit unit)
@@ -111,7 +94,7 @@ public class ReadCallback implements IAsyncCallbackWithFailure<ReadResponse>
     public void awaitResults() throws ReadFailureException, ReadTimeoutException
     {
         boolean signaled = await(command.getTimeout(), TimeUnit.MILLISECONDS);
-        boolean failed = blockfor + failures > endpoints.size();
+        boolean failed = failures > 0 && blockfor + failures > replicaLayout.selected().size();
         if (signaled && !failed)
             return;
 
@@ -128,8 +111,8 @@ public class ReadCallback implements IAsyncCallbackWithFailure<ReadResponse>
 
         // Same as for writes, see AbstractWriteResponseHandler
         throw failed
-            ? new ReadFailureException(consistencyLevel, received, blockfor, resolver.isDataPresent(), failureReasonByEndpoint)
-            : new ReadTimeoutException(consistencyLevel, received, blockfor, resolver.isDataPresent());
+            ? new ReadFailureException(replicaLayout.consistencyLevel(), received, blockfor, resolver.isDataPresent(), failureReasonByEndpoint)
+            : new ReadTimeoutException(replicaLayout.consistencyLevel(), received, blockfor, resolver.isDataPresent());
     }
 
     public int blockFor()
@@ -153,9 +136,7 @@ public class ReadCallback implements IAsyncCallbackWithFailure<ReadResponse>
      */
     private boolean waitingFor(InetAddressAndPort from)
     {
-        return consistencyLevel.isDatacenterLocal()
-             ? DatabaseDescriptor.getLocalDataCenter().equals(DatabaseDescriptor.getEndpointSnitch().getDatacenter(from))
-             : true;
+        return !replicaLayout.consistencyLevel().isDatacenterLocal() || DatabaseDescriptor.getLocalDataCenter().equals(DatabaseDescriptor.getEndpointSnitch().getDatacenter(from));
     }
 
     /**
@@ -178,7 +159,7 @@ public class ReadCallback implements IAsyncCallbackWithFailure<ReadResponse>
 
     public void assureSufficientLiveNodes() throws UnavailableException
     {
-        consistencyLevel.assureSufficientLiveNodes(keyspace, endpoints);
+        replicaLayout.consistencyLevel().assureSufficientLiveNodesForRead(replicaLayout.keyspace(), replicaLayout.selected());
     }
 
     public boolean isLatencyForSnitch()
@@ -195,7 +176,7 @@ public class ReadCallback implements IAsyncCallbackWithFailure<ReadResponse>
 
         failureReasonByEndpoint.put(from, failureReason);
 
-        if (blockfor + n > endpoints.size())
+        if (blockfor + n > replicaLayout.selected().size())
             condition.signalAll();
     }
 }

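The awaitResults() change above tightens the failure condition: previously blockfor + failures > endpoints.size() could trip even with zero reported failures. A small sketch of the corrected predicate, with illustrative numbers only:

    public class ReadFailureSketch
    {
        // Mirrors the tightened check in awaitResults(): a read only fails when at
        // least one replica actually reported a failure AND the failures make it
        // impossible for blockFor responses to arrive from the contacted replicas.
        static boolean failed(int blockFor, int failures, int contacted)
        {
            return failures > 0 && blockFor + failures > contacted;
        }

        public static void main(String[] args)
        {
            System.out.println(failed(2, 1, 3)); // false: two candidates can still answer
            System.out.println(failed(2, 2, 3)); // true: only one candidate is left
            // The new "failures > 0" guard: with no failures we keep waiting, even in
            // layouts where blockFor can exceed the contacted count.
            System.out.println(failed(3, 0, 2)); // false under the new check
        }
    }
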
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/service/reads/ResponseResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/ResponseResolver.java b/src/java/org/apache/cassandra/service/reads/ResponseResolver.java
index f4f00a2..e306b4d 100644
--- a/src/java/org/apache/cassandra/service/reads/ResponseResolver.java
+++ b/src/java/org/apache/cassandra/service/reads/ResponseResolver.java
@@ -20,37 +20,49 @@ package org.apache.cassandra.service.reads;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.ReadCommand;
+import org.apache.cassandra.db.ReadResponse;
+import org.apache.cassandra.locator.Endpoints;
+import org.apache.cassandra.locator.ReplicaLayout;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.service.reads.repair.ReadRepair;
 import org.apache.cassandra.utils.concurrent.Accumulator;
 
-public abstract class ResponseResolver
+public abstract class ResponseResolver<E extends Endpoints<E>, L extends ReplicaLayout<E, L>>
 {
     protected static final Logger logger = LoggerFactory.getLogger(ResponseResolver.class);
 
-    protected final Keyspace keyspace;
     protected final ReadCommand command;
-    protected final ConsistencyLevel consistency;
-    protected final ReadRepair readRepair;
+    protected final L replicaLayout;
+    protected final ReadRepair<E, L> readRepair;
 
     // Accumulator gives us non-blocking thread-safety with optimal algorithmic constraints
     protected final Accumulator<MessageIn<ReadResponse>> responses;
+    protected final long queryStartNanoTime;
 
-    public ResponseResolver(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistency, ReadRepair readRepair, int maxResponseCount)
+    public ResponseResolver(ReadCommand command, L replicaLayout, ReadRepair<E, L> readRepair, long queryStartNanoTime)
     {
-        this.keyspace = keyspace;
         this.command = command;
-        this.consistency = consistency;
+        this.replicaLayout = replicaLayout;
         this.readRepair = readRepair;
-        this.responses = new Accumulator<>(maxResponseCount);
+        // TODO: calculate max possible replicas for the query (e.g. local dc queries won't contact remotes)
+        this.responses = new Accumulator<>(replicaLayout.all().size());
+        this.queryStartNanoTime = queryStartNanoTime;
     }
 
     public abstract boolean isDataPresent();
 
     public void preprocess(MessageIn<ReadResponse> message)
     {
-        responses.add(message);
+        try
+        {
+            responses.add(message);
+        }
+        catch (IllegalStateException e)
+        {
+            logger.error("Encountered error while trying to preprocess the message {}: %s in command {}, replicas: {}", message, command, readRepair, replicaLayout.consistencyLevel(), replicaLayout.selected());
+            throw e;
+        }
     }
 
     public Accumulator<MessageIn<ReadResponse>> getMessages()

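One fix worth noting in preprocess() above: the original log call mixed a printf-style %s into an SLF4J format string and supplied more arguments than placeholders. SLF4J substitutes one {} per trailing argument, in order, and never interprets %s. A minimal sketch of the convention (the literal argument values here are placeholders):

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class LoggingSketch
    {
        private static final Logger logger = LoggerFactory.getLogger(LoggingSketch.class);

        public static void main(String[] args)
        {
            // Five "{}" placeholders, five arguments, matched positionally.
            logger.error("failed to preprocess {} for {} (read repair {}, cl {}, replicas {})",
                         "message", "command", "readRepair", "QUORUM", "[replica-1, replica-2]");
        }
    }
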
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/service/reads/ShortReadPartitionsProtection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/ShortReadPartitionsProtection.java b/src/java/org/apache/cassandra/service/reads/ShortReadPartitionsProtection.java
index d4e8957..580b790 100644
--- a/src/java/org/apache/cassandra/service/reads/ShortReadPartitionsProtection.java
+++ b/src/java/org/apache/cassandra/service/reads/ShortReadPartitionsProtection.java
@@ -18,12 +18,13 @@
 
 package org.apache.cassandra.service.reads;
 
-import java.util.Collections;
+import org.apache.cassandra.locator.Endpoints;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.DataRange;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.Keyspace;
@@ -39,7 +40,8 @@ import org.apache.cassandra.db.transform.Transformation;
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.ExcludingBounds;
 import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.ReplicaLayout;
+import org.apache.cassandra.locator.Replica;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.reads.repair.NoopReadRepair;
 import org.apache.cassandra.service.StorageProxy;
@@ -47,8 +49,9 @@ import org.apache.cassandra.tracing.Tracing;
 
 public class ShortReadPartitionsProtection extends Transformation<UnfilteredRowIterator> implements MorePartitions<UnfilteredPartitionIterator>
 {
+    private static final Logger logger = LoggerFactory.getLogger(ShortReadPartitionsProtection.class);
     private final ReadCommand command;
-    private final InetAddressAndPort source;
+    private final Replica source;
 
     private final DataLimits.Counter singleResultCounter; // unmerged per-source counter
     private final DataLimits.Counter mergedResultCounter; // merged end-result counter
@@ -59,7 +62,7 @@ public class ShortReadPartitionsProtection extends Transformation<UnfilteredRowI
 
     private final long queryStartNanoTime;
 
-    public ShortReadPartitionsProtection(ReadCommand command, InetAddressAndPort source,
+    public ShortReadPartitionsProtection(ReadCommand command, Replica source,
                                          DataLimits.Counter singleResultCounter,
                                          DataLimits.Counter mergedResultCounter,
                                          long queryStartNanoTime)
@@ -84,9 +87,10 @@ public class ShortReadPartitionsProtection extends Transformation<UnfilteredRowI
          * If we don't apply the transformation *after* extending the partition with MoreRows,
          * applyToRow() method of protection will not be called on the first row of the new extension iterator.
          */
+        ReplicaLayout.ForToken replicaLayout = ReplicaLayout.forSingleReplica(Keyspace.open(command.metadata().keyspace), partition.partitionKey().getToken(), source);
         ShortReadRowsProtection protection = new ShortReadRowsProtection(partition.partitionKey(),
                                                                          command, source,
-                                                                         this::executeReadCommand,
+                                                                         (cmd) -> executeReadCommand(cmd, replicaLayout),
                                                                          singleResultCounter,
                                                                          mergedResultCounter);
         return Transformation.apply(MoreRows.extend(partition, protection), protection);
@@ -140,9 +144,9 @@ public class ShortReadPartitionsProtection extends Transformation<UnfilteredRowI
 
         ColumnFamilyStore.metricsFor(command.metadata().id).shortReadProtectionRequests.mark();
         Tracing.trace("Requesting {} extra rows from {} for short read protection", toQuery, source);
+        logger.info("Requesting {} extra rows from {} for short read protection", toQuery, source);
 
-        PartitionRangeReadCommand cmd = makeFetchAdditionalPartitionReadCommand(toQuery);
-        return executeReadCommand(cmd);
+        return makeAndExecuteFetchAdditionalPartitionReadCommand(toQuery);
     }
 
     // Counts the number of rows for regular queries and the number of groups for GROUP BY queries
@@ -153,7 +157,7 @@ public class ShortReadPartitionsProtection extends Transformation<UnfilteredRowI
                : counter.counted();
     }
 
-    private PartitionRangeReadCommand makeFetchAdditionalPartitionReadCommand(int toQuery)
+    private UnfilteredPartitionIterator makeAndExecuteFetchAdditionalPartitionReadCommand(int toQuery)
     {
         PartitionRangeReadCommand cmd = (PartitionRangeReadCommand) command;
 
@@ -165,19 +169,19 @@ public class ShortReadPartitionsProtection extends Transformation<UnfilteredRowI
                                                       : new ExcludingBounds<>(lastPartitionKey, bounds.right);
         DataRange newDataRange = cmd.dataRange().forSubRange(newBounds);
 
-        return cmd.withUpdatedLimitsAndDataRange(newLimits, newDataRange);
+        ReplicaLayout.ForRange replicaLayout = ReplicaLayout.forSingleReplica(Keyspace.open(command.metadata().keyspace), cmd.dataRange().keyRange(), source);
+        return executeReadCommand(cmd.withUpdatedLimitsAndDataRange(newLimits, newDataRange), replicaLayout);
     }
 
-    private UnfilteredPartitionIterator executeReadCommand(ReadCommand cmd)
+    private <E extends Endpoints<E>, L extends ReplicaLayout<E, L>> UnfilteredPartitionIterator executeReadCommand(ReadCommand cmd, L replicaLayout)
     {
-        Keyspace keyspace = Keyspace.open(command.metadata().keyspace);
-        DataResolver resolver = new DataResolver(keyspace, cmd, ConsistencyLevel.ONE, 1, queryStartNanoTime, NoopReadRepair.instance);
-        ReadCallback handler = new ReadCallback(resolver, ConsistencyLevel.ONE, cmd, Collections.singletonList(source), queryStartNanoTime);
+        DataResolver<E, L> resolver = new DataResolver<>(cmd, replicaLayout, (NoopReadRepair<E, L>)NoopReadRepair.instance, queryStartNanoTime);
+        ReadCallback<E, L> handler = new ReadCallback<>(resolver, replicaLayout.consistencyLevel().blockFor(replicaLayout.keyspace()), cmd, replicaLayout, queryStartNanoTime);
 
-        if (StorageProxy.canDoLocalRequest(source))
+        if (source.isLocal())
             StageManager.getStage(Stage.READ).maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(cmd, handler));
         else
-            MessagingService.instance().sendRRWithFailure(cmd.createMessage(), source, handler);
+            MessagingService.instance().sendRRWithFailure(cmd.createMessage(), source.endpoint(), handler);
 
         // We don't call handler.get() because we want to preserve tombstones since we're still in the middle of merging node results.
         handler.awaitResults();

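Read straight through, the single-replica read that this file's hunks assemble has the following shape (a minimal sketch: the generic signatures, the NoopReadRepair cast, and the dispatch logic are as shown above, while the method wrapper, the Token parameter, and the trailing resolver call are assumed context):

    private UnfilteredPartitionIterator readFromOneReplica(ReadCommand cmd, Token token, Replica source, long queryStartNanoTime)
    {
        // Pin the follow-up read to the one replica whose result looked short.
        ReplicaLayout.ForToken layout = ReplicaLayout.forSingleReplica(
            Keyspace.open(cmd.metadata().keyspace), token, source);

        // No read repair on this path: we only want this replica's extra rows.
        DataResolver<EndpointsForToken, ReplicaLayout.ForToken> resolver =
            new DataResolver<>(cmd, layout,
                               (NoopReadRepair<EndpointsForToken, ReplicaLayout.ForToken>) NoopReadRepair.instance,
                               queryStartNanoTime);
        ReadCallback<EndpointsForToken, ReplicaLayout.ForToken> handler =
            new ReadCallback<>(resolver, layout.consistencyLevel().blockFor(layout.keyspace()),
                               cmd, layout, queryStartNanoTime);

        // Local replicas short-circuit onto the READ stage; remote ones go over messaging.
        if (source.isLocal())
            StageManager.getStage(Stage.READ).maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(cmd, handler));
        else
            MessagingService.instance().sendRRWithFailure(cmd.createMessage(), source.endpoint(), handler);

        // Don't filter tombstones yet: this result still has to be merged with the others.
        handler.awaitResults();
        return resolver.resolve(); // assumed accessor for the resolved iterator
    }
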
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/service/reads/ShortReadProtection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/ShortReadProtection.java b/src/java/org/apache/cassandra/service/reads/ShortReadProtection.java
index f603e9b..ef1d45b 100644
--- a/src/java/org/apache/cassandra/service/reads/ShortReadProtection.java
+++ b/src/java/org/apache/cassandra/service/reads/ShortReadProtection.java
@@ -26,6 +26,7 @@ import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
 import org.apache.cassandra.db.transform.MorePartitions;
 import org.apache.cassandra.db.transform.Transformation;
 import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.Replica;
 
 /**
  * We have a potential short read if the result from a given node contains the requested number of rows
@@ -40,7 +41,7 @@ import org.apache.cassandra.locator.InetAddressAndPort;
 public class ShortReadProtection
 {
     @SuppressWarnings("resource")
-    public static UnfilteredPartitionIterator extend(InetAddressAndPort source, UnfilteredPartitionIterator partitions,
+    public static UnfilteredPartitionIterator extend(Replica source, UnfilteredPartitionIterator partitions,
                                                      ReadCommand command, DataLimits.Counter mergedResultCounter,
                                                      long queryStartNanoTime, boolean enforceStrictLiveness)
     {

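On the caller's side the change is mechanical: whoever wraps a node's response now passes the Replica rather than its address. A hedged sketch, assuming the surrounding variables come from the data resolution path:

    // Extend one replica's response so undercounted partitions and rows are
    // detected and transparently re-queried from that same replica.
    UnfilteredPartitionIterator extended = ShortReadProtection.extend(
        replica,              // the Replica that produced `partitions`
        partitions,           // that replica's unmerged UnfilteredPartitionIterator
        command,              // the original ReadCommand
        mergedResultCounter,  // DataLimits.Counter over the merged end result
        queryStartNanoTime,
        enforceStrictLiveness);
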
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/service/reads/ShortReadRowsProtection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/ShortReadRowsProtection.java b/src/java/org/apache/cassandra/service/reads/ShortReadRowsProtection.java
index 6b1da0b..8dc7fc7 100644
--- a/src/java/org/apache/cassandra/service/reads/ShortReadRowsProtection.java
+++ b/src/java/org/apache/cassandra/service/reads/ShortReadRowsProtection.java
@@ -33,14 +33,14 @@ import org.apache.cassandra.db.rows.Row;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 import org.apache.cassandra.db.transform.MoreRows;
 import org.apache.cassandra.db.transform.Transformation;
-import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.Replica;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.tracing.Tracing;
 
 class ShortReadRowsProtection extends Transformation implements MoreRows<UnfilteredRowIterator>
 {
     private final ReadCommand command;
-    private final InetAddressAndPort source;
+    private final Replica source;
     private final DataLimits.Counter singleResultCounter; // unmerged per-source counter
     private final DataLimits.Counter mergedResultCounter; // merged end-result counter
     private final Function<ReadCommand, UnfilteredPartitionIterator> commandExecutor;
@@ -53,7 +53,7 @@ class ShortReadRowsProtection extends Transformation implements MoreRows<Unfilte
     private int lastFetched = 0; // # rows returned by last attempt to get more (or by the original read command)
     private int lastQueried = 0; // # extra rows requested from the replica last time
 
-    ShortReadRowsProtection(DecoratedKey partitionKey, ReadCommand command, InetAddressAndPort source,
+    ShortReadRowsProtection(DecoratedKey partitionKey, ReadCommand command, Replica source,
                             Function<ReadCommand, UnfilteredPartitionIterator> commandExecutor,
                             DataLimits.Counter singleResultCounter, DataLimits.Counter mergedResultCounter)
     {

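The Function-typed commandExecutor is what lets this class stay agnostic of ReplicaLayout: the partitions-level protection (first file above) captures the layout in a lambda, so every re-read is pinned to the same replica. A condensed sketch of that wiring, assembled from the ShortReadPartitionsProtection hunk:

    // In ShortReadPartitionsProtection.applyToPartition (see the first hunk):
    ReplicaLayout.ForToken layout = ReplicaLayout.forSingleReplica(
        Keyspace.open(command.metadata().keyspace),
        partition.partitionKey().getToken(), source);

    ShortReadRowsProtection protection = new ShortReadRowsProtection(
        partition.partitionKey(), command, source,
        cmd -> executeReadCommand(cmd, layout), // pins each re-read to the same replica
        singleResultCounter, mergedResultCounter);
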
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java b/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java
index 7e3f0ae..30dea74 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java
@@ -18,29 +18,23 @@
 
 package org.apache.cassandra.service.reads.repair;
 
-import java.util.List;
-import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 
-import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Sets;
 
 import com.codahale.metrics.Meter;
-import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.ReadCommand;
 import org.apache.cassandra.db.SinglePartitionReadCommand;
 import org.apache.cassandra.db.partitions.PartitionIterator;
-import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.ReadTimeoutException;
-import org.apache.cassandra.locator.AbstractReplicationStrategy;
+import org.apache.cassandra.locator.Endpoints;
 import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.locator.NetworkTopologyStrategy;
+import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.locator.ReplicaLayout;
 import org.apache.cassandra.metrics.ReadRepairMetrics;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.reads.DataResolver;
@@ -48,11 +42,11 @@ import org.apache.cassandra.service.reads.DigestResolver;
 import org.apache.cassandra.service.reads.ReadCallback;
 import org.apache.cassandra.tracing.Tracing;
 
-public abstract class AbstractReadRepair implements ReadRepair
+public abstract class AbstractReadRepair<E extends Endpoints<E>, L extends ReplicaLayout<E, L>> implements ReadRepair<E, L>
 {
     protected final ReadCommand command;
     protected final long queryStartNanoTime;
-    protected final ConsistencyLevel consistency;
+    protected final L replicaLayout;
     protected final ColumnFamilyStore cfs;
 
     private volatile DigestRepair digestRepair = null;
@@ -62,41 +56,25 @@ public abstract class AbstractReadRepair implements ReadRepair
         private final DataResolver dataResolver;
         private final ReadCallback readCallback;
         private final Consumer<PartitionIterator> resultConsumer;
-        private final List<InetAddressAndPort> initialContacts;
 
-        public DigestRepair(DataResolver dataResolver, ReadCallback readCallback, Consumer<PartitionIterator> resultConsumer, List<InetAddressAndPort> initialContacts)
+        public DigestRepair(DataResolver dataResolver, ReadCallback readCallback, Consumer<PartitionIterator> resultConsumer)
         {
             this.dataResolver = dataResolver;
             this.readCallback = readCallback;
             this.resultConsumer = resultConsumer;
-            this.initialContacts = initialContacts;
         }
     }
 
     public AbstractReadRepair(ReadCommand command,
-                              long queryStartNanoTime,
-                              ConsistencyLevel consistency)
+                              L replicaLayout,
+                              long queryStartNanoTime)
     {
         this.command = command;
         this.queryStartNanoTime = queryStartNanoTime;
-        this.consistency = consistency;
+        this.replicaLayout = replicaLayout;
         this.cfs = Keyspace.openAndGetStore(command.metadata());
     }
 
-    private int getMaxResponses()
-    {
-        AbstractReplicationStrategy strategy = cfs.keyspace.getReplicationStrategy();
-        if (consistency.isDatacenterLocal() && strategy instanceof NetworkTopologyStrategy)
-        {
-            NetworkTopologyStrategy nts = (NetworkTopologyStrategy) strategy;
-            return nts.getReplicationFactor(DatabaseDescriptor.getLocalDataCenter());
-        }
-        else
-        {
-            return strategy.getReplicationFactor();
-        }
-    }
-
     void sendReadCommand(InetAddressAndPort to, ReadCallback readCallback)
     {
         MessagingService.instance().sendRRWithFailure(command.createMessage(), to, readCallback);
@@ -105,24 +83,23 @@ public abstract class AbstractReadRepair implements ReadRepair
     abstract Meter getRepairMeter();
 
     // digestResolver isn't used here because we resend read requests to all participants
-    public void startRepair(DigestResolver digestResolver, List<InetAddressAndPort> allEndpoints, List<InetAddressAndPort> contactedEndpoints, Consumer<PartitionIterator> resultConsumer)
+    public void startRepair(DigestResolver<E, L> digestResolver, Consumer<PartitionIterator> resultConsumer)
     {
         getRepairMeter().mark();
 
         // Do a full data read to resolve the correct response (and repair node that need be)
-        Keyspace keyspace = Keyspace.open(command.metadata().keyspace);
-        DataResolver resolver = new DataResolver(keyspace, command, ConsistencyLevel.ALL, getMaxResponses(), queryStartNanoTime, this);
-        ReadCallback readCallback = new ReadCallback(resolver, ConsistencyLevel.ALL, consistency.blockFor(cfs.keyspace), command,
-                                                     keyspace, allEndpoints, queryStartNanoTime);
+        DataResolver<E, L> resolver = new DataResolver<>(command, replicaLayout, this, queryStartNanoTime);
+        ReadCallback<E, L> readCallback = new ReadCallback<>(resolver, replicaLayout.consistencyLevel().blockFor(cfs.keyspace),
+                                                             command, replicaLayout, queryStartNanoTime);
 
-        digestRepair = new DigestRepair(resolver, readCallback, resultConsumer, contactedEndpoints);
+        digestRepair = new DigestRepair(resolver, readCallback, resultConsumer);
 
-        for (InetAddressAndPort endpoint : contactedEndpoints)
+        for (Replica replica : replicaLayout.selected())
         {
-            Tracing.trace("Enqueuing full data read to {}", endpoint);
-            sendReadCommand(endpoint, readCallback);
+            Tracing.trace("Enqueuing full data read to {}", replica);
+            sendReadCommand(replica.endpoint(), readCallback);
         }
-        ReadRepairDiagnostics.startRepair(this, contactedEndpoints, digestResolver, allEndpoints);
+        ReadRepairDiagnostics.startRepair(this, replicaLayout.selected().endpoints(), digestResolver, replicaLayout.all().endpoints());
     }
 
     public void awaitReads() throws ReadTimeoutException
@@ -137,15 +114,11 @@ public abstract class AbstractReadRepair implements ReadRepair
 
     private boolean shouldSpeculate()
     {
+        ConsistencyLevel consistency = replicaLayout.consistencyLevel();
         ConsistencyLevel speculativeCL = consistency.isDatacenterLocal() ? ConsistencyLevel.LOCAL_QUORUM : ConsistencyLevel.QUORUM;
         return  consistency != ConsistencyLevel.EACH_QUORUM
                 && consistency.satisfies(speculativeCL, cfs.keyspace)
-                && cfs.sampleLatencyNanos <= TimeUnit.MILLISECONDS.toNanos(command.getTimeout());
-    }
-
-    Iterable<InetAddressAndPort> getCandidatesForToken(Token token)
-    {
-        return BlockingReadRepairs.getCandidateEndpoints(cfs.keyspace, token, consistency);
+                && cfs.sampleReadLatencyNanos <= TimeUnit.MILLISECONDS.toNanos(command.getTimeout());
     }
 
     public void maybeSendAdditionalReads()
@@ -156,20 +129,17 @@ public abstract class AbstractReadRepair implements ReadRepair
         if (repair == null)
             return;
 
-        if (shouldSpeculate() && !repair.readCallback.await(cfs.sampleLatencyNanos, TimeUnit.NANOSECONDS))
+        if (shouldSpeculate() && !repair.readCallback.await(cfs.sampleReadLatencyNanos, TimeUnit.NANOSECONDS))
         {
-            Set<InetAddressAndPort> contacted = Sets.newHashSet(repair.initialContacts);
-            Token replicaToken = ((SinglePartitionReadCommand) command).partitionKey().getToken();
-            Iterable<InetAddressAndPort> candidates = getCandidatesForToken(replicaToken);
-
-            Optional<InetAddressAndPort> endpoint = Iterables.tryFind(candidates, e -> !contacted.contains(e));
-            if (endpoint.isPresent())
-            {
-                Tracing.trace("Enqueuing speculative full data read to {}", endpoint);
-                sendReadCommand(endpoint.get(), repair.readCallback);
-                ReadRepairMetrics.speculatedRead.mark();
-                ReadRepairDiagnostics.speculatedRead(this, endpoint.get(), candidates);
-            }
+            L uncontacted = replicaLayout.forNaturalUncontacted();
+            if (uncontacted.selected().isEmpty())
+                return;
+
+            Replica replica = uncontacted.selected().iterator().next();
+            Tracing.trace("Enqueuing speculative full data read to {}", replica);
+            sendReadCommand(replica.endpoint(), repair.readCallback);
+            ReadRepairMetrics.speculatedRead.mark();
+            ReadRepairDiagnostics.speculatedRead(this, replica.endpoint(), uncontacted.all().endpoints());
         }
     }
 }

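Because the hunks above interleave with removed code, it is worth seeing the rewritten speculation path in one piece (assembled from the added lines; the local read of the volatile digestRepair field is assumed from unshown context):

    public void maybeSendAdditionalReads()
    {
        DigestRepair repair = digestRepair; // assumed: read the volatile field once
        if (repair == null)
            return;

        // Speculate only when the CL permits it and the replicas have been
        // slower than this table's sampled read latency.
        if (shouldSpeculate() && !repair.readCallback.await(cfs.sampleReadLatencyNanos, TimeUnit.NANOSECONDS))
        {
            // The layout, not endpoint set arithmetic, decides who is still uncontacted.
            L uncontacted = replicaLayout.forNaturalUncontacted();
            if (uncontacted.selected().isEmpty())
                return;

            Replica replica = uncontacted.selected().iterator().next();
            Tracing.trace("Enqueuing speculative full data read to {}", replica);
            sendReadCommand(replica.endpoint(), repair.readCallback);
            ReadRepairMetrics.speculatedRead.mark();
            ReadRepairDiagnostics.speculatedRead(this, replica.endpoint(), uncontacted.all().endpoints());
        }
    }
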
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java b/src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java
index 8d69bef..54af2cf 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java
@@ -20,16 +20,14 @@ package org.apache.cassandra.service.reads.repair;
 
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.AbstractFuture;
 
 import org.apache.cassandra.db.ColumnFamilyStore;
@@ -38,7 +36,11 @@ import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.locator.Endpoints;
+import org.apache.cassandra.locator.ReplicaLayout;
 import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.locator.Replicas;
 import org.apache.cassandra.metrics.ReadRepairMetrics;
 import org.apache.cassandra.net.IAsyncCallback;
 import org.apache.cassandra.net.MessageIn;
@@ -47,33 +49,29 @@ import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.tracing.Tracing;
 
-public class BlockingPartitionRepair extends AbstractFuture<Object> implements IAsyncCallback<Object>
+public class BlockingPartitionRepair<E extends Endpoints<E>, L extends ReplicaLayout<E, L>> extends AbstractFuture<Object> implements IAsyncCallback<Object>
 {
-    private final Keyspace keyspace;
     private final DecoratedKey key;
-    private final ConsistencyLevel consistency;
-    private final InetAddressAndPort[] participants;
-    private final ConcurrentMap<InetAddressAndPort, Mutation> pendingRepairs;
+    private final L replicaLayout;
+    private final Map<Replica, Mutation> pendingRepairs;
     private final CountDownLatch latch;
 
     private volatile long mutationsSentTime;
 
-    public BlockingPartitionRepair(Keyspace keyspace, DecoratedKey key, ConsistencyLevel consistency, Map<InetAddressAndPort, Mutation> repairs, int maxBlockFor, InetAddressAndPort[] participants)
+    public BlockingPartitionRepair(DecoratedKey key, Map<Replica, Mutation> repairs, int maxBlockFor, L replicaLayout)
     {
-        this.keyspace = keyspace;
         this.key = key;
-        this.consistency = consistency;
         this.pendingRepairs = new ConcurrentHashMap<>(repairs);
-        this.participants = participants;
+        this.replicaLayout = replicaLayout;
 
         // here we remove empty repair mutations from the blockFor total, since
         // we're not sending them mutations
         int blockFor = maxBlockFor;
-        for (InetAddressAndPort participant: participants)
+        for (Replica participant: replicaLayout.selected())
         {
             // remote dcs can sometimes get involved in dc-local reads. We want to repair
             // them if they do, but they shouldn't interfere with blocking the client read.
-            if (!repairs.containsKey(participant) && shouldBlockOn(participant))
+            if (!repairs.containsKey(participant) && shouldBlockOn(participant.endpoint()))
                 blockFor--;
         }
 
@@ -99,7 +97,7 @@ public class BlockingPartitionRepair extends AbstractFuture<Object> implements I
 
     private boolean shouldBlockOn(InetAddressAndPort endpoint)
     {
-        return !consistency.isDatacenterLocal() || isLocal(endpoint);
+        return !replicaLayout.consistencyLevel().isDatacenterLocal() || isLocal(endpoint);
     }
 
     @VisibleForTesting
@@ -107,7 +105,7 @@ public class BlockingPartitionRepair extends AbstractFuture<Object> implements I
     {
         if (shouldBlockOn(from))
         {
-            pendingRepairs.remove(from);
+            pendingRepairs.remove(replicaLayout.getReplicaFor(from));
             latch.countDown();
         }
     }
@@ -148,20 +146,23 @@ public class BlockingPartitionRepair extends AbstractFuture<Object> implements I
     public void sendInitialRepairs()
     {
         mutationsSentTime = System.nanoTime();
-        for (Map.Entry<InetAddressAndPort, Mutation> entry: pendingRepairs.entrySet())
+        Replicas.assertFull(pendingRepairs.keySet());
+
+        for (Map.Entry<Replica, Mutation> entry: pendingRepairs.entrySet())
         {
-            InetAddressAndPort destination = entry.getKey();
+            Replica destination = entry.getKey();
+            Preconditions.checkArgument(destination.isFull(), "Can't send repairs to transient replicas: %s", destination);
             Mutation mutation = entry.getValue();
             TableId tableId = extractUpdate(mutation).metadata().id;
 
             Tracing.trace("Sending read-repair-mutation to {}", destination);
             // use a separate verb here to avoid writing hints on timeouts
-            sendRR(mutation.createMessage(MessagingService.Verb.READ_REPAIR), destination);
+            sendRR(mutation.createMessage(MessagingService.Verb.READ_REPAIR), destination.endpoint());
             ColumnFamilyStore.metricsFor(tableId).readRepairRequests.mark();
 
-            if (!shouldBlockOn(destination))
+            if (!shouldBlockOn(destination.endpoint()))
                 pendingRepairs.remove(destination);
-            ReadRepairDiagnostics.sendInitialRepair(this, destination, mutation);
+            ReadRepairDiagnostics.sendInitialRepair(this, destination.endpoint(), mutation);
         }
     }
 
@@ -197,9 +198,8 @@ public class BlockingPartitionRepair extends AbstractFuture<Object> implements I
         if (awaitRepairs(timeout, timeoutUnit))
             return;
 
-        Set<InetAddressAndPort> exclude = Sets.newHashSet(participants);
-        Iterable<InetAddressAndPort> candidates = Iterables.filter(getCandidateEndpoints(), e -> !exclude.contains(e));
-        if (Iterables.isEmpty(candidates))
+        L newCandidates = replicaLayout.forNaturalUncontacted();
+        if (newCandidates.selected().isEmpty())
             return;
 
         PartitionUpdate update = mergeUnackedUpdates();
@@ -212,34 +212,34 @@ public class BlockingPartitionRepair extends AbstractFuture<Object> implements I
 
         Mutation[] versionedMutations = new Mutation[msgVersionIdx(MessagingService.current_version) + 1];
 
-        for (InetAddressAndPort endpoint: candidates)
+        for (Replica replica : newCandidates.selected())
         {
-            int versionIdx = msgVersionIdx(MessagingService.instance().getVersion(endpoint));
+            int versionIdx = msgVersionIdx(MessagingService.instance().getVersion(replica.endpoint()));
 
             Mutation mutation = versionedMutations[versionIdx];
 
             if (mutation == null)
             {
-                mutation = BlockingReadRepairs.createRepairMutation(update, consistency, endpoint, true);
+                mutation = BlockingReadRepairs.createRepairMutation(update, replicaLayout.consistencyLevel(), replica.endpoint(), true);
                 versionedMutations[versionIdx] = mutation;
             }
 
             if (mutation == null)
             {
                 // the mutation is too large to send.
-                ReadRepairDiagnostics.speculatedWriteOversized(this, endpoint);
+                ReadRepairDiagnostics.speculatedWriteOversized(this, replica.endpoint());
                 continue;
             }
 
-            Tracing.trace("Sending speculative read-repair-mutation to {}", endpoint);
-            sendRR(mutation.createMessage(MessagingService.Verb.READ_REPAIR), endpoint);
-            ReadRepairDiagnostics.speculatedWrite(this, endpoint, mutation);
+            Tracing.trace("Sending speculative read-repair-mutation to {}", replica);
+            sendRR(mutation.createMessage(MessagingService.Verb.READ_REPAIR), replica.endpoint());
+            ReadRepairDiagnostics.speculatedWrite(this, replica.endpoint(), mutation);
         }
     }
 
     Keyspace getKeyspace()
     {
-        return keyspace;
+        return replicaLayout.keyspace();
     }
 
     DecoratedKey getKey()
@@ -249,13 +249,6 @@ public class BlockingPartitionRepair extends AbstractFuture<Object> implements I
 
     ConsistencyLevel getConsistency()
     {
-        return consistency;
-    }
-
-    @VisibleForTesting
-    protected Iterable<InetAddressAndPort> getCandidateEndpoints()
-    {
-        return BlockingReadRepairs.getCandidateEndpoints(keyspace, key.getToken(), consistency);
+        return replicaLayout.consistencyLevel();
     }
-
 }

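The latch accounting in this constructor deserves a second look: blockFor starts at the consistency level's requirement and is decremented for every selected replica that needs no mutation yet would otherwise count toward the blocking read, so acks from mutation-free replicas are never waited on. A sketch of that bookkeeping (the decrement loop is as shown above; the latch construction itself is an assumption):

    int blockFor = maxBlockFor;
    for (Replica participant : replicaLayout.selected())
    {
        // Remote DCs can get pulled into dc-local reads; repair them if so,
        // but never let them gate the client read.
        if (!repairs.containsKey(participant) && shouldBlockOn(participant.endpoint()))
            blockFor--;
    }
    latch = new CountDownLatch(Math.max(blockFor, 0)); // assumed: clamped to non-negative

For example, if a LOCAL_QUORUM read (maxBlockFor 2) contacted two local replicas and only one of them needs a mutation, blockFor drops to 1 and the client waits on that single ack; a repair additionally sent to a remote-dc replica never counts toward the latch, because shouldBlockOn is false for it.
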
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java b/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java
index e46372e..402aed0 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java
@@ -23,18 +23,19 @@ import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.locator.Endpoints;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.codahale.metrics.Meter;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.ConsistencyLevel;
-import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.db.ReadCommand;
 import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
 import org.apache.cassandra.exceptions.ReadTimeoutException;
-import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.locator.ReplicaLayout;
 import org.apache.cassandra.metrics.ReadRepairMetrics;
 import org.apache.cassandra.tracing.Tracing;
 
@@ -43,20 +44,22 @@ import org.apache.cassandra.tracing.Tracing;
  *  updates have been written to nodes needing correction. Breaks write
  *  atomicity in some situations
  */
-public class BlockingReadRepair extends AbstractReadRepair
+public class BlockingReadRepair<E extends Endpoints<E>, L extends ReplicaLayout<E, L>> extends AbstractReadRepair<E, L>
 {
     private static final Logger logger = LoggerFactory.getLogger(BlockingReadRepair.class);
 
     protected final Queue<BlockingPartitionRepair> repairs = new ConcurrentLinkedQueue<>();
+    private final int blockFor;
 
-    public BlockingReadRepair(ReadCommand command, long queryStartNanoTime, ConsistencyLevel consistency)
+    BlockingReadRepair(ReadCommand command, L replicaLayout, long queryStartNanoTime)
     {
-        super(command, queryStartNanoTime, consistency);
+        super(command, replicaLayout, queryStartNanoTime);
+        this.blockFor = replicaLayout.consistencyLevel().blockFor(cfs.keyspace);
     }
 
-    public UnfilteredPartitionIterators.MergeListener getMergeListener(InetAddressAndPort[] endpoints)
+    public UnfilteredPartitionIterators.MergeListener getMergeListener(L replicaLayout)
     {
-        return new PartitionIteratorMergeListener(endpoints, command, consistency, this);
+        return new PartitionIteratorMergeListener(replicaLayout, command, this.replicaLayout.consistencyLevel(), this);
     }
 
     @Override
@@ -70,7 +73,7 @@ public class BlockingReadRepair extends AbstractReadRepair
     {
         for (BlockingPartitionRepair repair: repairs)
         {
-            repair.maybeSendAdditionalWrites(cfs.sampleLatencyNanos, TimeUnit.NANOSECONDS);
+            repair.maybeSendAdditionalWrites(cfs.transientWriteLatencyNanos, TimeUnit.NANOSECONDS);
         }
     }
 
@@ -88,20 +91,20 @@ public class BlockingReadRepair extends AbstractReadRepair
         if (timedOut)
         {
             // We got all responses, but timed out while repairing
-            int blockFor = consistency.blockFor(cfs.keyspace);
+            int blockFor = replicaLayout.consistencyLevel().blockFor(cfs.keyspace);
             if (Tracing.isTracing())
                 Tracing.trace("Timed out while read-repairing after receiving all {} data and digest responses", blockFor);
             else
                 logger.debug("Timeout while read-repairing after receiving all {} data and digest responses", blockFor);
 
-            throw new ReadTimeoutException(consistency, blockFor-1, blockFor, true);
+            throw new ReadTimeoutException(replicaLayout.consistencyLevel(), blockFor - 1, blockFor, true);
         }
     }
 
     @Override
-    public void repairPartition(DecoratedKey key, Map<InetAddressAndPort, Mutation> mutations, InetAddressAndPort[] destinations)
+    public void repairPartition(DecoratedKey partitionKey, Map<Replica, Mutation> mutations, L replicaLayout)
     {
-        BlockingPartitionRepair blockingRepair = new BlockingPartitionRepair(cfs.keyspace, key, consistency, mutations, consistency.blockFor(cfs.keyspace), destinations);
+        BlockingPartitionRepair<E, L> blockingRepair = new BlockingPartitionRepair<>(partitionKey, mutations, blockFor, replicaLayout);
         blockingRepair.sendInitialRepairs();
         repairs.add(blockingRepair);
     }

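Seen from its caller, one repaired partition now flows through this class roughly as follows (a sketch; `awaitWrites` is an assumed name for the method containing the timeout branch above):

    // `mutations` maps each full replica to the mutation that brings it up
    // to date for `key`; the layout is the one the read executed against.
    readRepair.repairPartition(key, mutations, replicaLayout); // sends initial repairs, queues the handle
    readRepair.maybeSendAdditionalWrites(); // after transientWriteLatencyNanos, write merged unacked updates to uncontacted replicas
    readRepair.awaitWrites();               // assumed name; throws ReadTimeoutException if blockFor acks never arrive
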
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepairs.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepairs.java b/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepairs.java
index e5f7179..ceb1765 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepairs.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepairs.java
@@ -18,10 +18,6 @@
 
 package org.apache.cassandra.service.reads.repair;
 
-import java.util.List;
-
-import com.google.common.collect.Iterables;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -31,13 +27,10 @@ import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
-import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.ReadTimeoutException;
 import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.locator.NetworkTopologyStrategy;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.schema.TableMetadata;
-import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.tracing.Tracing;
 
 public class BlockingReadRepairs
@@ -48,18 +41,6 @@ public class BlockingReadRepairs
         Boolean.getBoolean("cassandra.drop_oversized_readrepair_mutations");
 
     /**
-     * Returns all of the endpoints that are replicas for the given key. If the consistency level is datacenter
-     * local, only the endpoints in the local dc will be returned.
-     */
-    static Iterable<InetAddressAndPort> getCandidateEndpoints(Keyspace keyspace, Token token, ConsistencyLevel consistency)
-    {
-        List<InetAddressAndPort> endpoints = StorageProxy.getLiveSortedEndpoints(keyspace, token);
-        return consistency.isDatacenterLocal() && keyspace.getReplicationStrategy() instanceof NetworkTopologyStrategy
-               ? Iterables.filter(endpoints, ConsistencyLevel::isLocal)
-               : endpoints;
-    }
-
-    /**
      * Create a read repair mutation from the given update, if the mutation is not larger than the maximum
      * mutation size, otherwise return null. Or, if we're configured to be strict, throw an exception.
      */


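With getCandidateEndpoints gone, this class is left as a home for createRepairMutation, whose contract (per the javadoc above and the call site in BlockingPartitionRepair) is to return null when the mutation is oversized. A hedged usage sketch; reading the boolean as suppress-rather-than-throw is an assumption from that javadoc:

    Mutation mutation = BlockingReadRepairs.createRepairMutation(
        update,                           // merged PartitionUpdate of unacked repairs
        replicaLayout.consistencyLevel(),
        replica.endpoint(),
        true);                            // assumed: suppress the oversize exception instead of throwing

    if (mutation == null)
        return; // too large to send; skip this replica (cf. speculatedWriteOversized above)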