Posted to commits@cassandra.apache.org by jb...@apache.org on 2013/09/26 22:38:55 UTC

[1/3] git commit: Fixes for speculative retry; patch by ayeschenko and jbellis for CASSANDRA-5932

Updated Branches:
  refs/heads/cassandra-2.0 006eec4a5 -> 20c419b94
  refs/heads/trunk 246fefabf -> 84f6c26aa


Fixes for speculative retry
patch by ayeschenko and jbellis for CASSANDRA-5932


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/20c419b9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/20c419b9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/20c419b9

Branch: refs/heads/cassandra-2.0
Commit: 20c419b9480e0e5b3c1da53a106b2a6760be35b9
Parents: 006eec4
Author: Jonathan Ellis <jb...@apache.org>
Authored: Thu Sep 26 15:38:13 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Thu Sep 26 15:38:36 2013 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cassandra/service/AbstractReadExecutor.java | 338 ++++++++++++-------
 .../apache/cassandra/service/ReadCallback.java  |   2 +-
 .../apache/cassandra/service/StorageProxy.java  |  30 +-
 4 files changed, 226 insertions(+), 145 deletions(-)
----------------------------------------------------------------------
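
For context: speculative retry (added for 2.0 under CASSANDRA-4705) lets the read
coordinator send a redundant request to an extra replica when the original replica
looks likely to miss the timeout. It is configured per table, e.g.
ALTER TABLE users WITH speculative_retry = '99percentile'; in CQL3, with NONE,
ALWAYS, a latency percentile, or a fixed delay as the accepted policies. Below is a
minimal, self-contained sketch of the PERCENTILE idea only -- wait roughly as long
as the sampled read latency, then fire one extra request. It is not the Cassandra
code; every name in it is hypothetical.

    import java.util.concurrent.*;

    // Sketch of "speculate after the sampled latency". The real logic lives in
    // SpeculatingReadExecutor.maybeTryAdditionalReplicas() in the diff below.
    public class SpeculateSketch
    {
        public static void main(String[] args) throws Exception
        {
            ExecutorService pool = Executors.newCachedThreadPool();
            CountDownLatch enoughResponses = new CountDownLatch(1);

            // The initial data request: pretend the preferred replica is slow today.
            pool.execute(() -> { sleep(500); enoughResponses.countDown(); });

            long sampledLatencyMillis = 100; // stand-in for cfs.sampleLatency
            if (!enoughResponses.await(sampledLatencyMillis, TimeUnit.MILLISECONDS))
            {
                System.out.println("original read in danger of timing out; speculating");
                // One extra request against the reserved replica, which answers fast.
                pool.execute(() -> { sleep(20); enoughResponses.countDown(); });
            }

            enoughResponses.await(); // the handler.get() equivalent
            System.out.println("read satisfied");
            pool.shutdown();
        }

        private static void sleep(long millis)
        {
            try { Thread.sleep(millis); }
            catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        }
    }

The patch itself replaces the latch with ReadCallback.await() and uses
cfs.sampleLatency (in nanoseconds) as the wait budget.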


http://git-wip-us.apache.org/repos/asf/cassandra/blob/20c419b9/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index cc3daf6..3d4d19c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.0.2
+ * Fixes for speculative retry (CASSANDRA-5932)
  * Improve memory usage of metadata min/max column names (CASSANDRA-6077)
  * Fix thrift validation refusing row markers on CQL3 tables (CASSANDRA-6081)
  * Fix insertion of collections with CAS (CASSANDRA-6069)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/20c419b9/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
index 2ebc0b3..83368c2 100644
--- a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
+++ b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
@@ -18,12 +18,17 @@
 package org.apache.cassandra.service;
 
 import java.net.InetAddress;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.collect.Iterables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
-import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.CFMetaData.SpeculativeRetry.RetryType;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.config.ReadRepairDecision;
 import org.apache.cassandra.db.ColumnFamilyStore;
@@ -39,142 +44,225 @@ import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.StorageProxy.LocalReadRunnable;
 import org.apache.cassandra.utils.FBUtilities;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
+/**
+ * Sends a read request to the replicas needed to satisfy a given ConsistencyLevel.
+ *
+ * Optionally, may perform additional requests to provide redundancy against replica failure:
+ * AlwaysSpeculatingReadExecutor will always send a request to one extra replica, while
+ * SpeculatingReadExecutor will wait until it looks like the original request is in danger
+ * of timing out before performing extra reads.
+ */
 public abstract class AbstractReadExecutor
 {
     private static final Logger logger = LoggerFactory.getLogger(AbstractReadExecutor.class);
-    protected final ReadCallback<ReadResponse, Row> handler;
+
     protected final ReadCommand command;
+    protected final List<InetAddress> targetReplicas;
     protected final RowDigestResolver resolver;
-    protected final List<InetAddress> unfiltered;
-    protected final List<InetAddress> endpoints;
-    protected final ColumnFamilyStore cfs;
-
-    AbstractReadExecutor(ColumnFamilyStore cfs,
-                         ReadCommand command,
-                         ConsistencyLevel consistency_level,
-                         List<InetAddress> allReplicas,
-                         List<InetAddress> queryTargets)
-    throws UnavailableException
+    protected final ReadCallback<ReadResponse, Row> handler;
+
+    AbstractReadExecutor(ReadCommand command, ConsistencyLevel consistencyLevel, List<InetAddress> targetReplicas)
     {
-        unfiltered = allReplicas;
-        this.endpoints = queryTargets;
-        this.resolver = new RowDigestResolver(command.ksName, command.key);
-        this.handler = new ReadCallback<ReadResponse, Row>(resolver, consistency_level, command, this.endpoints);
         this.command = command;
-        this.cfs = cfs;
+        this.targetReplicas = targetReplicas;
+        resolver = new RowDigestResolver(command.ksName, command.key);
+        handler = new ReadCallback<>(resolver, consistencyLevel, command, targetReplicas);
+    }
 
-        handler.assureSufficientLiveNodes();
-        assert !handler.endpoints.isEmpty();
+    private static boolean isLocalRequest(InetAddress replica)
+    {
+        return replica.equals(FBUtilities.getBroadcastAddress()) && StorageProxy.OPTIMIZE_LOCAL_REQUESTS;
     }
 
-    void executeAsync()
+    protected void makeDataRequests(Iterable<InetAddress> endpoints)
     {
-        // The data-request message is sent to dataPoint, the node that will actually get the data for us
-        InetAddress dataPoint = handler.endpoints.get(0);
-        if (dataPoint.equals(FBUtilities.getBroadcastAddress()) && StorageProxy.OPTIMIZE_LOCAL_REQUESTS)
-        {
-            logger.trace("reading data locally");
-            StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(command, handler));
-        }
-        else
+        for (InetAddress endpoint : endpoints)
         {
-            logger.trace("reading data from {}", dataPoint);
-            MessagingService.instance().sendRR(command.createMessage(), dataPoint, handler);
+            if (isLocalRequest(endpoint))
+            {
+                logger.trace("reading data locally");
+                StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(command, handler));
+            }
+            else
+            {
+                logger.trace("reading data from {}", endpoint);
+                MessagingService.instance().sendRR(command.createMessage(), endpoint, handler);
+            }
         }
+    }
 
-        if (handler.endpoints.size() == 1)
-            return;
-
-        // send the other endpoints a digest request
+    protected void makeDigestRequests(Iterable<InetAddress> endpoints)
+    {
         ReadCommand digestCommand = command.copy();
         digestCommand.setDigestQuery(true);
-        MessageOut<?> message = null;
-        for (int i = 1; i < handler.endpoints.size(); i++)
+        MessageOut<?> message = digestCommand.createMessage();
+        for (InetAddress endpoint : endpoints)
         {
-            InetAddress digestPoint = handler.endpoints.get(i);
-            if (digestPoint.equals(FBUtilities.getBroadcastAddress()))
+            if (isLocalRequest(endpoint))
             {
                 logger.trace("reading digest locally");
                 StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(digestCommand, handler));
             }
             else
             {
-                logger.trace("reading digest from {}", digestPoint);
-                // (We lazy-construct the digest Message object since it may not be necessary if we
-                // are doing a local digest read, or no digest reads at all.)
-                if (message == null)
-                    message = digestCommand.createMessage();
-                MessagingService.instance().sendRR(message, digestPoint, handler);
+                logger.trace("reading digest from {}", endpoint);
+                MessagingService.instance().sendRR(message, endpoint, handler);
             }
         }
     }
 
-    void speculate()
-    {
-        // noop by default.
-    }
+    /**
+     * Perform additional requests if it looks like the original will time out.  May block while it waits
+     * to see if the original requests are answered first.
+     */
+    public abstract void maybeTryAdditionalReplicas();
 
-    Row get() throws ReadTimeoutException, DigestMismatchException
+    /**
+     * Get the replicas involved in the [finished] request.
+     *
+     * @return target replicas + the extra replica, *IF* we speculated.
+     */
+    public abstract Iterable<InetAddress> getContactedReplicas();
+
+    /**
+     * send the initial set of requests
+     */
+    public abstract void executeAsync();
+
+    /**
+     * wait for an answer.  Blocks until success or timeout, so it is caller's
+     * responsibility to call maybeTryAdditionalReplicas first.
+     */
+    public Row get() throws ReadTimeoutException, DigestMismatchException
     {
         return handler.get();
     }
 
-    public static AbstractReadExecutor getReadExecutor(ReadCommand command, ConsistencyLevel consistency_level) throws UnavailableException
+    /**
+     * @return an executor appropriate for the configured speculative read policy
+     */
+    public static AbstractReadExecutor getReadExecutor(ReadCommand command, ConsistencyLevel consistencyLevel) throws UnavailableException
     {
         Keyspace keyspace = Keyspace.open(command.ksName);
         List<InetAddress> allReplicas = StorageProxy.getLiveSortedEndpoints(keyspace, command.key);
-        CFMetaData metaData = Schema.instance.getCFMetaData(command.ksName, command.cfName);
+        ReadRepairDecision repairDecision = Schema.instance.getCFMetaData(command.ksName, command.cfName).newReadRepairDecision();
+        List<InetAddress> targetReplicas = consistencyLevel.filterForQuery(keyspace, allReplicas, repairDecision);
+
+        // Throw UAE early if we don't have enough replicas.
+        consistencyLevel.assureSufficientLiveNodes(keyspace, targetReplicas);
 
-        ReadRepairDecision rrDecision = metaData.newReadRepairDecision();
-         
-        if (rrDecision != ReadRepairDecision.NONE) {
+        // Fat client. Speculating read executors need access to cfs metrics and sampled latency, and fat clients
+        // can't provide that. So, for now, fat clients will always use NeverSpeculatingReadExecutor.
+        if (StorageService.instance.isClientMode())
+            return new NeverSpeculatingReadExecutor(command, consistencyLevel, targetReplicas);
+
+        if (repairDecision != ReadRepairDecision.NONE)
             ReadRepairMetrics.attempted.mark();
-        }
 
-        List<InetAddress> queryTargets = consistency_level.filterForQuery(keyspace, allReplicas, rrDecision);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(command.cfName);
+        RetryType retryType = cfs.metadata.getSpeculativeRetry().type;
 
-        if (StorageService.instance.isClientMode())
+        // Speculative retry is disabled *OR* there are simply no extra replicas to speculate.
+        if (retryType == RetryType.NONE || consistencyLevel.blockFor(keyspace) == allReplicas.size())
+            return new NeverSpeculatingReadExecutor(command, consistencyLevel, targetReplicas);
+
+        if (targetReplicas.size() == allReplicas.size())
         {
-            return new DefaultReadExecutor(null, command, consistency_level, allReplicas, queryTargets);
+            // CL.ALL, RRD.GLOBAL or RRD.DC_LOCAL and a single-DC.
+            // We are going to contact every node anyway, so ask for 2 full data requests instead of 1, for redundancy
+            // (same amount of requests in total, but we turn 1 digest request into a full blown data request).
+            return new AlwaysSpeculatingReadExecutor(cfs, command, consistencyLevel, targetReplicas);
         }
 
-        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(command.cfName);
-
-        switch (metaData.getSpeculativeRetry().type)
+        // RRD.NONE or RRD.DC_LOCAL w/ multiple DCs.
+        InetAddress extraReplica = allReplicas.get(targetReplicas.size());
+        // With repair decision DC_LOCAL all replicas/target replicas may be in different order, so
+        // we might have to find a replacement that's not already in targetReplicas.
+        if (repairDecision == ReadRepairDecision.DC_LOCAL && targetReplicas.contains(extraReplica))
         {
-            case ALWAYS:
-                return new SpeculateAlwaysExecutor(cfs, command, consistency_level, allReplicas, queryTargets);
-            case PERCENTILE:
-            case CUSTOM:
-                return queryTargets.size() < allReplicas.size()
-                       ? new SpeculativeReadExecutor(cfs, command, consistency_level, allReplicas, queryTargets)
-                       : new DefaultReadExecutor(cfs, command, consistency_level, allReplicas, queryTargets);
-            default:
-                return new DefaultReadExecutor(cfs, command, consistency_level, allReplicas, queryTargets);
+            for (InetAddress address : allReplicas)
+            {
+                if (!targetReplicas.contains(address))
+                {
+                    extraReplica = address;
+                    break;
+                }
+            }
         }
+        targetReplicas.add(extraReplica);
+
+        if (retryType == RetryType.ALWAYS)
+            return new AlwaysSpeculatingReadExecutor(cfs, command, consistencyLevel, targetReplicas);
+        else // PERCENTILE or CUSTOM.
+            return new SpeculatingReadExecutor(cfs, command, consistencyLevel, targetReplicas);
     }
 
-    private static class DefaultReadExecutor extends AbstractReadExecutor
+    private static class NeverSpeculatingReadExecutor extends AbstractReadExecutor
     {
-        public DefaultReadExecutor(ColumnFamilyStore cfs, ReadCommand command, ConsistencyLevel consistency_level, List<InetAddress> allReplicas, List<InetAddress> queryTargets) throws UnavailableException
+        public NeverSpeculatingReadExecutor(ReadCommand command, ConsistencyLevel consistencyLevel, List<InetAddress> targetReplicas)
+        {
+            super(command, consistencyLevel, targetReplicas);
+        }
+
+        public void executeAsync()
         {
-            super(cfs, command, consistency_level, allReplicas, queryTargets);
+            makeDataRequests(targetReplicas.subList(0, 1));
+            if (targetReplicas.size() > 1)
+                makeDigestRequests(targetReplicas.subList(1, targetReplicas.size()));
+        }
+
+        public void maybeTryAdditionalReplicas()
+        {
+            // no-op
+        }
+
+        public Iterable<InetAddress> getContactedReplicas()
+        {
+            return targetReplicas;
         }
     }
 
-    private static class SpeculativeReadExecutor extends AbstractReadExecutor
+    private static class SpeculatingReadExecutor extends AbstractReadExecutor
     {
-        public SpeculativeReadExecutor(ColumnFamilyStore cfs, ReadCommand command, ConsistencyLevel consistency_level, List<InetAddress> allReplicas, List<InetAddress> queryTargets) throws UnavailableException
+        private final ColumnFamilyStore cfs;
+        private volatile boolean speculated = false;
+
+        public SpeculatingReadExecutor(ColumnFamilyStore cfs,
+                                       ReadCommand command,
+                                       ConsistencyLevel consistencyLevel,
+                                       List<InetAddress> targetReplicas)
         {
-            super(cfs, command, consistency_level, allReplicas, queryTargets);
-            assert handler.endpoints.size() < unfiltered.size();
+            super(command, consistencyLevel, targetReplicas);
+            this.cfs = cfs;
         }
 
-        @Override
-        void speculate()
+        public void executeAsync()
+        {
+            // if CL + RR result in covering all replicas, getReadExecutor forces AlwaysSpeculating.  So we know
+            // that the last replica in our list is "extra."
+            List<InetAddress> initialReplicas = targetReplicas.subList(0, targetReplicas.size() - 1);
+
+            if (handler.blockfor < initialReplicas.size())
+            {
+                // We're hitting additional targets for read repair.  Since our "extra" replica is the least-
+                // preferred by the snitch, we do an extra data read to start with against a replica more
+                // likely to reply; better to let RR fail than the entire query.
+                makeDataRequests(initialReplicas.subList(0, 2));
+                if (initialReplicas.size() > 2)
+                    makeDigestRequests(initialReplicas.subList(2, initialReplicas.size()));
+            }
+            else
+            {
+                // not doing read repair; all replies are important, so it doesn't matter which nodes we
+                // perform data reads against vs digest.
+                makeDataRequests(initialReplicas.subList(0, 1));
+                if (initialReplicas.size() > 1)
+                    makeDigestRequests(initialReplicas.subList(1, initialReplicas.size()));
+            }
+        }
+
+        public void maybeTryAdditionalReplicas()
         {
             // no latency information, or we're overloaded
             if (cfs.sampleLatency > TimeUnit.MILLISECONDS.toNanos(command.getTimeout()))
@@ -182,69 +270,61 @@ public abstract class AbstractReadExecutor
 
             if (!handler.await(cfs.sampleLatency, TimeUnit.NANOSECONDS))
             {
-                InetAddress endpoint = unfiltered.get(handler.endpoints.size());
-
-                // could be waiting on the data, or on enough digests
-                ReadCommand scommand = command;
+                // Could be waiting on the data, or on enough digests.
+                ReadCommand retryCommand = command;
                 if (resolver.getData() != null)
                 {
-                    scommand = command.copy();
-                    scommand.setDigestQuery(true);
+                    retryCommand = command.copy();
+                    retryCommand.setDigestQuery(true);
                 }
 
-                logger.trace("Speculating read retry on {}", endpoint);
-                MessagingService.instance().sendRR(scommand.createMessage(), endpoint, handler);
+                InetAddress extraReplica = Iterables.getLast(targetReplicas);
+                logger.trace("speculating read retry on {}", extraReplica);
+                MessagingService.instance().sendRR(retryCommand.createMessage(), extraReplica, handler);
+                speculated = true;
+
                 cfs.metric.speculativeRetry.inc();
             }
         }
+
+        public Iterable<InetAddress> getContactedReplicas()
+        {
+            return speculated
+                 ? targetReplicas
+                 : targetReplicas.subList(0, targetReplicas.size() - 1);
+        }
     }
 
-    private static class SpeculateAlwaysExecutor extends AbstractReadExecutor
+    private static class AlwaysSpeculatingReadExecutor extends AbstractReadExecutor
     {
-        public SpeculateAlwaysExecutor(ColumnFamilyStore cfs, ReadCommand command, ConsistencyLevel consistency_level, List<InetAddress> allReplicas, List<InetAddress> queryTargets) throws UnavailableException
+        private final ColumnFamilyStore cfs;
+
+        public AlwaysSpeculatingReadExecutor(ColumnFamilyStore cfs,
+                                             ReadCommand command,
+                                             ConsistencyLevel consistencyLevel,
+                                             List<InetAddress> targetReplicas)
         {
-            super(cfs, command, consistency_level, allReplicas, queryTargets);
+            super(command, consistencyLevel, targetReplicas);
+            this.cfs = cfs;
         }
 
-        @Override
-        void executeAsync()
+        public void maybeTryAdditionalReplicas()
         {
-            int limit = unfiltered.size() >= 2 ? 2 : 1;
-            for (int i = 0; i < limit; i++)
-            {
-                InetAddress endpoint = unfiltered.get(i);
-                if (endpoint.equals(FBUtilities.getBroadcastAddress()))
-                {
-                    logger.trace("reading full data locally");
-                    StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(command, handler));
-                }
-                else
-                {
-                    logger.trace("reading full data from {}", endpoint);
-                    MessagingService.instance().sendRR(command.createMessage(), endpoint, handler);
-                }
-            }
-            if (handler.endpoints.size() <= limit)
-                return;
+            // no-op
+        }
 
-            ReadCommand digestCommand = command.copy();
-            digestCommand.setDigestQuery(true);
-            MessageOut<?> message = digestCommand.createMessage();
-            for (int i = limit; i < handler.endpoints.size(); i++)
-            {
-                // Send the message
-                InetAddress endpoint = handler.endpoints.get(i);
-                if (endpoint.equals(FBUtilities.getBroadcastAddress()))
-                {
-                    logger.trace("reading data locally, isDigest: {}", command.isDigestQuery());
-                    StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(digestCommand, handler));
-                }
-                else
-                {
-                    logger.trace("reading full data from {}, isDigest: {}", endpoint, command.isDigestQuery());
-                    MessagingService.instance().sendRR(message, endpoint, handler);
-                }
-            }
+        public Iterable<InetAddress> getContactedReplicas()
+        {
+            return targetReplicas;
+        }
+
+        @Override
+        public void executeAsync()
+        {
+            makeDataRequests(targetReplicas.subList(0, 2));
+            if (targetReplicas.size() > 2)
+                makeDigestRequests(targetReplicas.subList(2, targetReplicas.size()));
+            cfs.metric.speculativeRetry.inc();
         }
     }
 }
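
Taken together, the new getReadExecutor() reduces to a small decision tree (fat
clients aside, which always get NeverSpeculatingReadExecutor). A hedged
restatement as standalone Java -- the enums and method below are illustrative
stand-ins, not Cassandra classes:

    // Illustrative restatement of the selection rules in getReadExecutor().
    public class ExecutorChoice
    {
        enum RetryType { NONE, ALWAYS, PERCENTILE, CUSTOM }
        enum Choice { NEVER_SPECULATING, SPECULATING, ALWAYS_SPECULATING }

        static Choice choose(RetryType retryType,
                             int blockFor,       // replicas the CL must hear from
                             int liveReplicas,   // all live replicas for the key
                             int targetReplicas) // CL + read-repair targets
        {
            // Retry disabled, or no spare replica left to speculate against.
            if (retryType == RetryType.NONE || blockFor == liveReplicas)
                return Choice.NEVER_SPECULATING;

            // Already contacting everyone: speculation costs no extra message,
            // it just turns one digest request into a second full data request.
            if (targetReplicas == liveReplicas)
                return Choice.ALWAYS_SPECULATING;

            // Otherwise one extra replica is reserved for speculation.
            return retryType == RetryType.ALWAYS
                 ? Choice.ALWAYS_SPECULATING
                 : Choice.SPECULATING; // PERCENTILE or CUSTOM
        }

        public static void main(String[] args)
        {
            // QUORUM read, RF=3, all replicas live, no read repair:
            System.out.println(choose(RetryType.PERCENTILE, 2, 3, 2)); // SPECULATING
        }
    }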

http://git-wip-us.apache.org/repos/asf/cassandra/blob/20c419b9/src/java/org/apache/cassandra/service/ReadCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ReadCallback.java b/src/java/org/apache/cassandra/service/ReadCallback.java
index 58847ba..7f9c192 100644
--- a/src/java/org/apache/cassandra/service/ReadCallback.java
+++ b/src/java/org/apache/cassandra/service/ReadCallback.java
@@ -50,7 +50,7 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallback<TMessag
     public final IResponseResolver<TMessage, TResolved> resolver;
     private final SimpleCondition condition = new SimpleCondition();
     final long start;
-    private final int blockfor;
+    final int blockfor;
     final List<InetAddress> endpoints;
     private final IReadCommand command;
     private final ConsistencyLevel consistencyLevel;
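
The single ReadCallback change widens blockfor from private to package-private.
SpeculatingReadExecutor.executeAsync() uses it to detect read repair: if the
callback only has to block for fewer replies than there are initial targets, the
surplus targets exist solely for read repair, so the executor sends a second data
request up front. A toy illustration of that comparison (values hypothetical):

    import java.util.Arrays;
    import java.util.List;

    public class ReadRepairCheck
    {
        public static void main(String[] args)
        {
            int blockfor = 2; // e.g. QUORUM against RF=3
            List<String> initialReplicas = Arrays.asList("r1", "r2", "r3"); // r3 added by RR
            boolean hittingExtraTargetsForRR = blockfor < initialReplicas.size();
            System.out.println("read repair targets in play: " + hittingExtraTargetsForRR);
        }
    }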

http://git-wip-us.apache.org/repos/asf/cassandra/blob/20c419b9/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index e0d5dff..ffc65b9 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -1197,10 +1197,11 @@ public class StorageProxy implements StorageProxyMBean
      * 4. If the digests (if any) match the data return the data
      * 5. else carry out read repair by getting data from all the nodes.
      */
-    private static List<Row> fetchRows(List<ReadCommand> initialCommands, ConsistencyLevel consistency_level)
+    private static List<Row> fetchRows(List<ReadCommand> initialCommands, ConsistencyLevel consistencyLevel)
     throws UnavailableException, ReadTimeoutException
     {
-        List<Row> rows = new ArrayList<Row>(initialCommands.size());
+        List<Row> rows = new ArrayList<>(initialCommands.size());
+        // (avoid allocating a new list in the common case of nothing-to-retry)
         List<ReadCommand> commandsToRetry = Collections.emptyList();
 
         do
@@ -1217,13 +1218,13 @@ public class StorageProxy implements StorageProxyMBean
                 ReadCommand command = commands.get(i);
                 assert !command.isDigestQuery();
 
-                AbstractReadExecutor exec = AbstractReadExecutor.getReadExecutor(command, consistency_level);
+                AbstractReadExecutor exec = AbstractReadExecutor.getReadExecutor(command, consistencyLevel);
                 exec.executeAsync();
                 readExecutors[i] = exec;
             }
 
-            for (AbstractReadExecutor exec: readExecutors)
-                exec.speculate();
+            for (AbstractReadExecutor exec : readExecutors)
+                exec.maybeTryAdditionalReplicas();
 
             // read results and make a second pass for any digest mismatches
             List<ReadCommand> repairCommands = null;
@@ -1238,13 +1239,13 @@ public class StorageProxy implements StorageProxyMBean
                         exec.command.maybeTrim(row);
                         rows.add(row);
                     }
+
                     if (logger.isDebugEnabled())
                         logger.debug("Read: {} ms.", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - exec.handler.start));
-
                 }
                 catch (ReadTimeoutException ex)
                 {
-                    int blockFor = consistency_level.blockFor(Keyspace.open(exec.command.getKeyspace()));
+                    int blockFor = consistencyLevel.blockFor(Keyspace.open(exec.command.getKeyspace()));
                     int responseCount = exec.handler.getReceivedCount();
                     String gotData = responseCount > 0
                                    ? exec.resolver.isDataPresent() ? " (including data)" : " (only digests)"
@@ -1273,14 +1274,14 @@ public class StorageProxy implements StorageProxyMBean
 
                     if (repairCommands == null)
                     {
-                        repairCommands = new ArrayList<ReadCommand>();
-                        repairResponseHandlers = new ArrayList<ReadCallback<ReadResponse, Row>>();
+                        repairCommands = new ArrayList<>();
+                        repairResponseHandlers = new ArrayList<>();
                     }
                     repairCommands.add(exec.command);
                     repairResponseHandlers.add(repairHandler);
 
                     MessageOut<ReadCommand> message = exec.command.createMessage();
-                    for (InetAddress endpoint : exec.handler.endpoints)
+                    for (InetAddress endpoint : exec.getContactedReplicas())
                     {
                         Tracing.trace("Enqueuing full data read to {}", endpoint);
                         MessagingService.instance().sendRR(message, endpoint, repairHandler);
@@ -1288,8 +1289,7 @@ public class StorageProxy implements StorageProxyMBean
                 }
             }
 
-            if (commandsToRetry != Collections.EMPTY_LIST)
-                commandsToRetry.clear();
+            commandsToRetry.clear();
 
             // read the results for the digest mismatch retries
             if (repairResponseHandlers != null)
@@ -1319,8 +1319,8 @@ public class StorageProxy implements StorageProxyMBean
                     catch (TimeoutException e)
                     {
                         Tracing.trace("Timed out on digest mismatch retries");
-                        int blockFor = consistency_level.blockFor(Keyspace.open(command.getKeyspace()));
-                        throw new ReadTimeoutException(consistency_level, blockFor, blockFor, true);
+                        int blockFor = consistencyLevel.blockFor(Keyspace.open(command.getKeyspace()));
+                        throw new ReadTimeoutException(consistencyLevel, blockFor, blockFor, true);
                     }
 
                     // retry any potential short reads
@@ -1329,7 +1329,7 @@ public class StorageProxy implements StorageProxyMBean
                     {
                         Tracing.trace("Issuing retry for read command");
                         if (commandsToRetry == Collections.EMPTY_LIST)
-                            commandsToRetry = new ArrayList<ReadCommand>();
+                            commandsToRetry = new ArrayList<>();
                         commandsToRetry.add(retryCommand);
                         continue;
                     }
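
On the StorageProxy side, note the calling pattern in fetchRows(): executeAsync()
is issued for every command first, then a second pass runs
maybeTryAdditionalReplicas() -- the only potentially blocking step -- and only
then are results collected via get(). Dispatching everything before any wait keeps
one slow read from delaying the others' initial requests. A minimal sketch of that
shape (all names hypothetical, not the Cassandra API):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.*;

    // Sketch of fetchRows()'s phased dispatch: send all, speculate, then collect.
    public class TwoPhaseDispatch
    {
        interface ReadExec
        {
            void executeAsync();
            void maybeTryAdditionalReplicas();
            String get() throws Exception;
        }

        public static void main(String[] args) throws Exception
        {
            ExecutorService pool = Executors.newCachedThreadPool();
            List<ReadExec> execs = new ArrayList<>();
            for (int i = 0; i < 3; i++)
            {
                CompletableFuture<String> f = new CompletableFuture<>();
                int id = i;
                execs.add(new ReadExec()
                {
                    public void executeAsync() { pool.execute(() -> f.complete("row-" + id)); }
                    public void maybeTryAdditionalReplicas() { /* would await sampled latency here */ }
                    public String get() throws Exception { return f.get(1, TimeUnit.SECONDS); }
                });
            }

            for (ReadExec e : execs) e.executeAsync();               // phase 1: dispatch all
            for (ReadExec e : execs) e.maybeTryAdditionalReplicas(); // phase 2: speculate
            for (ReadExec e : execs) System.out.println(e.get());    // phase 3: collect
            pool.shutdown();
        }
    }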


[3/3] git commit: Merge branch 'cassandra-2.0' into trunk

Posted by jb...@apache.org.
Merge branch 'cassandra-2.0' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/84f6c26a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/84f6c26a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/84f6c26a

Branch: refs/heads/trunk
Commit: 84f6c26aaf7cba286f8b98a02cf408fa5bb2131a
Parents: 246fefa 20c419b
Author: Jonathan Ellis <jb...@apache.org>
Authored: Thu Sep 26 15:38:44 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Thu Sep 26 15:38:44 2013 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cassandra/service/AbstractReadExecutor.java | 338 ++++++++++++-------
 .../apache/cassandra/service/ReadCallback.java  |   2 +-
 .../apache/cassandra/service/StorageProxy.java  |  30 +-
 4 files changed, 226 insertions(+), 145 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/84f6c26a/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index d2b1310,3d4d19c..d475170
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,12 -1,5 +1,13 @@@
 +2.1
 + * change logging from log4j to logback (CASSANDRA-5883)
 + * switch to LZ4 compression for internode communication (CASSANDRA-5887)
 + * Stop using Thrift-generated Index* classes internally (CASSANDRA-5971)
 + * Remove 1.2 network compatibility code (CASSANDRA-5960)
 + * Remove leveled json manifest migration code (CASSANDRA-5996)
 +
 +
  2.0.2
+  * Fixes for speculative retry (CASSANDRA-5932)
   * Improve memory usage of metadata min/max column names (CASSANDRA-6077)
   * Fix thrift validation refusing row markers on CQL3 tables (CASSANDRA-6081)
   * Fix insertion of collections with CAS (CASSANDRA-6069)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/84f6c26a/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------


[2/3] git commit: Fixes for speculative retry; patch by ayeschenko and jbellis for CASSANDRA-5932

Posted by jb...@apache.org.
Fixes for speculative retry
patch by ayeschenko and jbellis for CASSANDRA-5932


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/20c419b9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/20c419b9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/20c419b9

Branch: refs/heads/trunk
Commit: 20c419b9480e0e5b3c1da53a106b2a6760be35b9
Parents: 006eec4
Author: Jonathan Ellis <jb...@apache.org>
Authored: Thu Sep 26 15:38:13 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Thu Sep 26 15:38:36 2013 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cassandra/service/AbstractReadExecutor.java | 338 ++++++++++++-------
 .../apache/cassandra/service/ReadCallback.java  |   2 +-
 .../apache/cassandra/service/StorageProxy.java  |  30 +-
 4 files changed, 226 insertions(+), 145 deletions(-)
----------------------------------------------------------------------

