Posted to commits@cassandra.apache.org by jb...@apache.org on 2013/09/26 22:38:55 UTC
[1/3] git commit: Fixes for speculative retry patch by ayeschenko and jbellis for CASSANDRA-5932
Updated Branches:
refs/heads/cassandra-2.0 006eec4a5 -> 20c419b94
refs/heads/trunk 246fefabf -> 84f6c26aa
Fixes for speculative retry
patch by ayeschenko and jbellis for CASSANDRA-5932
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/20c419b9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/20c419b9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/20c419b9
Branch: refs/heads/cassandra-2.0
Commit: 20c419b9480e0e5b3c1da53a106b2a6760be35b9
Parents: 006eec4
Author: Jonathan Ellis <jb...@apache.org>
Authored: Thu Sep 26 15:38:13 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Thu Sep 26 15:38:36 2013 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/service/AbstractReadExecutor.java | 338 ++++++++++++-------
.../apache/cassandra/service/ReadCallback.java | 2 +-
.../apache/cassandra/service/StorageProxy.java | 30 +-
4 files changed, 226 insertions(+), 145 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/20c419b9/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index cc3daf6..3d4d19c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.0.2
+ * Fixes for speculative retry (CASSANDRA-5932)
* Improve memory usage of metadata min/max column names (CASSANDRA-6077)
* Fix thrift validation refusing row markers on CQL3 tables (CASSANDRA-6081)
* Fix insertion of collections with CAS (CASSANDRA-6069)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/20c419b9/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
index 2ebc0b3..83368c2 100644
--- a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
+++ b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
@@ -18,12 +18,17 @@
package org.apache.cassandra.service;
import java.net.InetAddress;
+import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
+import com.google.common.collect.Iterables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
-import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.CFMetaData.SpeculativeRetry.RetryType;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.config.ReadRepairDecision;
import org.apache.cassandra.db.ColumnFamilyStore;
@@ -39,142 +44,225 @@ import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageProxy.LocalReadRunnable;
import org.apache.cassandra.utils.FBUtilities;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+/**
+ * Sends a read request to the replicas needed to satisfy a given ConsistencyLevel.
+ *
+ * Optionally, may perform additional requests to provide redundancy against replica failure:
+ * AlwaysSpeculatingReadExecutor will always send a request to one extra replica, while
+ * SpeculatingReadExecutor will wait until it looks like the original request is in danger
+ * of timing out before performing extra reads.
+ */
public abstract class AbstractReadExecutor
{
private static final Logger logger = LoggerFactory.getLogger(AbstractReadExecutor.class);
- protected final ReadCallback<ReadResponse, Row> handler;
+
protected final ReadCommand command;
+ protected final List<InetAddress> targetReplicas;
protected final RowDigestResolver resolver;
- protected final List<InetAddress> unfiltered;
- protected final List<InetAddress> endpoints;
- protected final ColumnFamilyStore cfs;
-
- AbstractReadExecutor(ColumnFamilyStore cfs,
- ReadCommand command,
- ConsistencyLevel consistency_level,
- List<InetAddress> allReplicas,
- List<InetAddress> queryTargets)
- throws UnavailableException
+ protected final ReadCallback<ReadResponse, Row> handler;
+
+ AbstractReadExecutor(ReadCommand command, ConsistencyLevel consistencyLevel, List<InetAddress> targetReplicas)
{
- unfiltered = allReplicas;
- this.endpoints = queryTargets;
- this.resolver = new RowDigestResolver(command.ksName, command.key);
- this.handler = new ReadCallback<ReadResponse, Row>(resolver, consistency_level, command, this.endpoints);
this.command = command;
- this.cfs = cfs;
+ this.targetReplicas = targetReplicas;
+ resolver = new RowDigestResolver(command.ksName, command.key);
+ handler = new ReadCallback<>(resolver, consistencyLevel, command, targetReplicas);
+ }
- handler.assureSufficientLiveNodes();
- assert !handler.endpoints.isEmpty();
+ private static boolean isLocalRequest(InetAddress replica)
+ {
+ return replica.equals(FBUtilities.getBroadcastAddress()) && StorageProxy.OPTIMIZE_LOCAL_REQUESTS;
}
- void executeAsync()
+ protected void makeDataRequests(Iterable<InetAddress> endpoints)
{
- // The data-request message is sent to dataPoint, the node that will actually get the data for us
- InetAddress dataPoint = handler.endpoints.get(0);
- if (dataPoint.equals(FBUtilities.getBroadcastAddress()) && StorageProxy.OPTIMIZE_LOCAL_REQUESTS)
- {
- logger.trace("reading data locally");
- StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(command, handler));
- }
- else
+ for (InetAddress endpoint : endpoints)
{
- logger.trace("reading data from {}", dataPoint);
- MessagingService.instance().sendRR(command.createMessage(), dataPoint, handler);
+ if (isLocalRequest(endpoint))
+ {
+ logger.trace("reading data locally");
+ StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(command, handler));
+ }
+ else
+ {
+ logger.trace("reading data from {}", endpoint);
+ MessagingService.instance().sendRR(command.createMessage(), endpoint, handler);
+ }
}
+ }
- if (handler.endpoints.size() == 1)
- return;
-
- // send the other endpoints a digest request
+ protected void makeDigestRequests(Iterable<InetAddress> endpoints)
+ {
ReadCommand digestCommand = command.copy();
digestCommand.setDigestQuery(true);
- MessageOut<?> message = null;
- for (int i = 1; i < handler.endpoints.size(); i++)
+ MessageOut<?> message = digestCommand.createMessage();
+ for (InetAddress endpoint : endpoints)
{
- InetAddress digestPoint = handler.endpoints.get(i);
- if (digestPoint.equals(FBUtilities.getBroadcastAddress()))
+ if (isLocalRequest(endpoint))
{
logger.trace("reading digest locally");
StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(digestCommand, handler));
}
else
{
- logger.trace("reading digest from {}", digestPoint);
- // (We lazy-construct the digest Message object since it may not be necessary if we
- // are doing a local digest read, or no digest reads at all.)
- if (message == null)
- message = digestCommand.createMessage();
- MessagingService.instance().sendRR(message, digestPoint, handler);
+ logger.trace("reading digest from {}", endpoint);
+ MessagingService.instance().sendRR(message, endpoint, handler);
}
}
}
- void speculate()
- {
- // noop by default.
- }
+ /**
+ * Perform additional requests if it looks like the original will time out. May block while it waits
+ * to see if the original requests are answered first.
+ */
+ public abstract void maybeTryAdditionalReplicas();
- Row get() throws ReadTimeoutException, DigestMismatchException
+ /**
+ * Get the replicas involved in the [finished] request.
+ *
+ * @return target replicas + the extra replica, *IF* we speculated.
+ */
+ public abstract Iterable<InetAddress> getContactedReplicas();
+
+ /**
+ * send the initial set of requests
+ */
+ public abstract void executeAsync();
+
+ /**
+ * wait for an answer. Blocks until success or timeout, so it is caller's
+ * responsibility to call maybeTryAdditionalReplicas first.
+ */
+ public Row get() throws ReadTimeoutException, DigestMismatchException
{
return handler.get();
}
- public static AbstractReadExecutor getReadExecutor(ReadCommand command, ConsistencyLevel consistency_level) throws UnavailableException
+ /**
+ * @return an executor appropriate for the configured speculative read policy
+ */
+ public static AbstractReadExecutor getReadExecutor(ReadCommand command, ConsistencyLevel consistencyLevel) throws UnavailableException
{
Keyspace keyspace = Keyspace.open(command.ksName);
List<InetAddress> allReplicas = StorageProxy.getLiveSortedEndpoints(keyspace, command.key);
- CFMetaData metaData = Schema.instance.getCFMetaData(command.ksName, command.cfName);
+ ReadRepairDecision repairDecision = Schema.instance.getCFMetaData(command.ksName, command.cfName).newReadRepairDecision();
+ List<InetAddress> targetReplicas = consistencyLevel.filterForQuery(keyspace, allReplicas, repairDecision);
+
+ // Throw UAE early if we don't have enough replicas.
+ consistencyLevel.assureSufficientLiveNodes(keyspace, targetReplicas);
- ReadRepairDecision rrDecision = metaData.newReadRepairDecision();
-
- if (rrDecision != ReadRepairDecision.NONE) {
+ // Fat client. Speculating read executors need access to cfs metrics and sampled latency, and fat clients
+ // can't provide that. So, for now, fat clients will always use NeverSpeculatingReadExecutor.
+ if (StorageService.instance.isClientMode())
+ return new NeverSpeculatingReadExecutor(command, consistencyLevel, targetReplicas);
+
+ if (repairDecision != ReadRepairDecision.NONE)
ReadRepairMetrics.attempted.mark();
- }
- List<InetAddress> queryTargets = consistency_level.filterForQuery(keyspace, allReplicas, rrDecision);
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(command.cfName);
+ RetryType retryType = cfs.metadata.getSpeculativeRetry().type;
- if (StorageService.instance.isClientMode())
+ // Speculative retry is disabled *OR* there are simply no extra replicas to speculate.
+ if (retryType == RetryType.NONE || consistencyLevel.blockFor(keyspace) == allReplicas.size())
+ return new NeverSpeculatingReadExecutor(command, consistencyLevel, targetReplicas);
+
+ if (targetReplicas.size() == allReplicas.size())
{
- return new DefaultReadExecutor(null, command, consistency_level, allReplicas, queryTargets);
+ // CL.ALL, RRD.GLOBAL or RRD.DC_LOCAL and a single-DC.
+ // We are going to contact every node anyway, so ask for 2 full data requests instead of 1, for redundancy
+ // (same amount of requests in total, but we turn 1 digest request into a full blown data request).
+ return new AlwaysSpeculatingReadExecutor(cfs, command, consistencyLevel, targetReplicas);
}
- ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(command.cfName);
-
- switch (metaData.getSpeculativeRetry().type)
+ // RRD.NONE or RRD.DC_LOCAL w/ multiple DCs.
+ InetAddress extraReplica = allReplicas.get(targetReplicas.size());
+ // With repair decision DC_LOCAL all replicas/target replicas may be in different order, so
+ // we might have to find a replacement that's not already in targetReplicas.
+ if (repairDecision == ReadRepairDecision.DC_LOCAL && targetReplicas.contains(extraReplica))
{
- case ALWAYS:
- return new SpeculateAlwaysExecutor(cfs, command, consistency_level, allReplicas, queryTargets);
- case PERCENTILE:
- case CUSTOM:
- return queryTargets.size() < allReplicas.size()
- ? new SpeculativeReadExecutor(cfs, command, consistency_level, allReplicas, queryTargets)
- : new DefaultReadExecutor(cfs, command, consistency_level, allReplicas, queryTargets);
- default:
- return new DefaultReadExecutor(cfs, command, consistency_level, allReplicas, queryTargets);
+ for (InetAddress address : allReplicas)
+ {
+ if (!targetReplicas.contains(address))
+ {
+ extraReplica = address;
+ break;
+ }
+ }
}
+ targetReplicas.add(extraReplica);
+
+ if (retryType == RetryType.ALWAYS)
+ return new AlwaysSpeculatingReadExecutor(cfs, command, consistencyLevel, targetReplicas);
+ else // PERCENTILE or CUSTOM.
+ return new SpeculatingReadExecutor(cfs, command, consistencyLevel, targetReplicas);
}
- private static class DefaultReadExecutor extends AbstractReadExecutor
+ private static class NeverSpeculatingReadExecutor extends AbstractReadExecutor
{
- public DefaultReadExecutor(ColumnFamilyStore cfs, ReadCommand command, ConsistencyLevel consistency_level, List<InetAddress> allReplicas, List<InetAddress> queryTargets) throws UnavailableException
+ public NeverSpeculatingReadExecutor(ReadCommand command, ConsistencyLevel consistencyLevel, List<InetAddress> targetReplicas)
+ {
+ super(command, consistencyLevel, targetReplicas);
+ }
+
+ public void executeAsync()
{
- super(cfs, command, consistency_level, allReplicas, queryTargets);
+ makeDataRequests(targetReplicas.subList(0, 1));
+ if (targetReplicas.size() > 1)
+ makeDigestRequests(targetReplicas.subList(1, targetReplicas.size()));
+ }
+
+ public void maybeTryAdditionalReplicas()
+ {
+ // no-op
+ }
+
+ public Iterable<InetAddress> getContactedReplicas()
+ {
+ return targetReplicas;
}
}
- private static class SpeculativeReadExecutor extends AbstractReadExecutor
+ private static class SpeculatingReadExecutor extends AbstractReadExecutor
{
- public SpeculativeReadExecutor(ColumnFamilyStore cfs, ReadCommand command, ConsistencyLevel consistency_level, List<InetAddress> allReplicas, List<InetAddress> queryTargets) throws UnavailableException
+ private final ColumnFamilyStore cfs;
+ private volatile boolean speculated = false;
+
+ public SpeculatingReadExecutor(ColumnFamilyStore cfs,
+ ReadCommand command,
+ ConsistencyLevel consistencyLevel,
+ List<InetAddress> targetReplicas)
{
- super(cfs, command, consistency_level, allReplicas, queryTargets);
- assert handler.endpoints.size() < unfiltered.size();
+ super(command, consistencyLevel, targetReplicas);
+ this.cfs = cfs;
}
- @Override
- void speculate()
+ public void executeAsync()
+ {
+ // if CL + RR result in covering all replicas, getReadExecutor forces AlwaysSpeculating. So we know
+ // that the last replica in our list is "extra."
+ List<InetAddress> initialReplicas = targetReplicas.subList(0, targetReplicas.size() - 1);
+
+ if (handler.blockfor < initialReplicas.size())
+ {
+ // We're hitting additional targets for read repair. Since our "extra" replica is the least-
+ // preferred by the snitch, we do an extra data read to start with against a replica more
+ // likely to reply; better to let RR fail than the entire query.
+ makeDataRequests(initialReplicas.subList(0, 2));
+ if (initialReplicas.size() > 2)
+ makeDigestRequests(initialReplicas.subList(2, initialReplicas.size()));
+ }
+ else
+ {
+ // not doing read repair; all replies are important, so it doesn't matter which nodes we
+ // perform data reads against vs digest.
+ makeDataRequests(initialReplicas.subList(0, 1));
+ if (initialReplicas.size() > 1)
+ makeDigestRequests(initialReplicas.subList(1, initialReplicas.size()));
+ }
+ }
+
+ public void maybeTryAdditionalReplicas()
{
// no latency information, or we're overloaded
if (cfs.sampleLatency > TimeUnit.MILLISECONDS.toNanos(command.getTimeout()))
@@ -182,69 +270,61 @@ public abstract class AbstractReadExecutor
if (!handler.await(cfs.sampleLatency, TimeUnit.NANOSECONDS))
{
- InetAddress endpoint = unfiltered.get(handler.endpoints.size());
-
- // could be waiting on the data, or on enough digests
- ReadCommand scommand = command;
+ // Could be waiting on the data, or on enough digests.
+ ReadCommand retryCommand = command;
if (resolver.getData() != null)
{
- scommand = command.copy();
- scommand.setDigestQuery(true);
+ retryCommand = command.copy();
+ retryCommand.setDigestQuery(true);
}
- logger.trace("Speculating read retry on {}", endpoint);
- MessagingService.instance().sendRR(scommand.createMessage(), endpoint, handler);
+ InetAddress extraReplica = Iterables.getLast(targetReplicas);
+ logger.trace("speculating read retry on {}", extraReplica);
+ MessagingService.instance().sendRR(retryCommand.createMessage(), extraReplica, handler);
+ speculated = true;
+
cfs.metric.speculativeRetry.inc();
}
}
+
+ public Iterable<InetAddress> getContactedReplicas()
+ {
+ return speculated
+ ? targetReplicas
+ : targetReplicas.subList(0, targetReplicas.size() - 1);
+ }
}
- private static class SpeculateAlwaysExecutor extends AbstractReadExecutor
+ private static class AlwaysSpeculatingReadExecutor extends AbstractReadExecutor
{
- public SpeculateAlwaysExecutor(ColumnFamilyStore cfs, ReadCommand command, ConsistencyLevel consistency_level, List<InetAddress> allReplicas, List<InetAddress> queryTargets) throws UnavailableException
+ private final ColumnFamilyStore cfs;
+
+ public AlwaysSpeculatingReadExecutor(ColumnFamilyStore cfs,
+ ReadCommand command,
+ ConsistencyLevel consistencyLevel,
+ List<InetAddress> targetReplicas)
{
- super(cfs, command, consistency_level, allReplicas, queryTargets);
+ super(command, consistencyLevel, targetReplicas);
+ this.cfs = cfs;
}
- @Override
- void executeAsync()
+ public void maybeTryAdditionalReplicas()
{
- int limit = unfiltered.size() >= 2 ? 2 : 1;
- for (int i = 0; i < limit; i++)
- {
- InetAddress endpoint = unfiltered.get(i);
- if (endpoint.equals(FBUtilities.getBroadcastAddress()))
- {
- logger.trace("reading full data locally");
- StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(command, handler));
- }
- else
- {
- logger.trace("reading full data from {}", endpoint);
- MessagingService.instance().sendRR(command.createMessage(), endpoint, handler);
- }
- }
- if (handler.endpoints.size() <= limit)
- return;
+ // no-op
+ }
- ReadCommand digestCommand = command.copy();
- digestCommand.setDigestQuery(true);
- MessageOut<?> message = digestCommand.createMessage();
- for (int i = limit; i < handler.endpoints.size(); i++)
- {
- // Send the message
- InetAddress endpoint = handler.endpoints.get(i);
- if (endpoint.equals(FBUtilities.getBroadcastAddress()))
- {
- logger.trace("reading data locally, isDigest: {}", command.isDigestQuery());
- StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(digestCommand, handler));
- }
- else
- {
- logger.trace("reading full data from {}, isDigest: {}", endpoint, command.isDigestQuery());
- MessagingService.instance().sendRR(message, endpoint, handler);
- }
- }
+ public Iterable<InetAddress> getContactedReplicas()
+ {
+ return targetReplicas;
+ }
+
+ @Override
+ public void executeAsync()
+ {
+ makeDataRequests(targetReplicas.subList(0, 2));
+ if (targetReplicas.size() > 2)
+ makeDigestRequests(targetReplicas.subList(2, targetReplicas.size()));
+ cfs.metric.speculativeRetry.inc();
}
}
}
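For readers following the refactoring above: the old speculate() hook is gone, and callers now drive an explicit executor lifecycle. A minimal sketch of that sequence, mirroring the StorageProxy.fetchRows changes further below (the readOne wrapper is hypothetical and for illustration only, not part of the patch):

    static Row readOne(ReadCommand command, ConsistencyLevel consistencyLevel)
            throws UnavailableException, ReadTimeoutException, DigestMismatchException
    {
        AbstractReadExecutor exec = AbstractReadExecutor.getReadExecutor(command, consistencyLevel);
        exec.executeAsync();                // send the initial data + digest requests
        exec.maybeTryAdditionalReplicas();  // may block briefly, then send the extra read if needed
        // On a digest mismatch, repair reads go to exec.getContactedReplicas(),
        // which includes the extra replica only if we actually speculated.
        return exec.get();                  // block until the CL is satisfied or we time out
    }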
http://git-wip-us.apache.org/repos/asf/cassandra/blob/20c419b9/src/java/org/apache/cassandra/service/ReadCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ReadCallback.java b/src/java/org/apache/cassandra/service/ReadCallback.java
index 58847ba..7f9c192 100644
--- a/src/java/org/apache/cassandra/service/ReadCallback.java
+++ b/src/java/org/apache/cassandra/service/ReadCallback.java
@@ -50,7 +50,7 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallback<TMessag
public final IResponseResolver<TMessage, TResolved> resolver;
private final SimpleCondition condition = new SimpleCondition();
final long start;
- private final int blockfor;
+ final int blockfor;
final List<InetAddress> endpoints;
private final IReadCommand command;
private final ConsistencyLevel consistencyLevel;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/20c419b9/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index e0d5dff..ffc65b9 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -1197,10 +1197,11 @@ public class StorageProxy implements StorageProxyMBean
* 4. If the digests (if any) match the data return the data
* 5. else carry out read repair by getting data from all the nodes.
*/
- private static List<Row> fetchRows(List<ReadCommand> initialCommands, ConsistencyLevel consistency_level)
+ private static List<Row> fetchRows(List<ReadCommand> initialCommands, ConsistencyLevel consistencyLevel)
throws UnavailableException, ReadTimeoutException
{
- List<Row> rows = new ArrayList<Row>(initialCommands.size());
+ List<Row> rows = new ArrayList<>(initialCommands.size());
+ // (avoid allocating a new list in the common case of nothing-to-retry)
List<ReadCommand> commandsToRetry = Collections.emptyList();
do
@@ -1217,13 +1218,13 @@ public class StorageProxy implements StorageProxyMBean
ReadCommand command = commands.get(i);
assert !command.isDigestQuery();
- AbstractReadExecutor exec = AbstractReadExecutor.getReadExecutor(command, consistency_level);
+ AbstractReadExecutor exec = AbstractReadExecutor.getReadExecutor(command, consistencyLevel);
exec.executeAsync();
readExecutors[i] = exec;
}
- for (AbstractReadExecutor exec: readExecutors)
- exec.speculate();
+ for (AbstractReadExecutor exec : readExecutors)
+ exec.maybeTryAdditionalReplicas();
// read results and make a second pass for any digest mismatches
List<ReadCommand> repairCommands = null;
@@ -1238,13 +1239,13 @@ public class StorageProxy implements StorageProxyMBean
exec.command.maybeTrim(row);
rows.add(row);
}
+
if (logger.isDebugEnabled())
logger.debug("Read: {} ms.", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - exec.handler.start));
-
}
catch (ReadTimeoutException ex)
{
- int blockFor = consistency_level.blockFor(Keyspace.open(exec.command.getKeyspace()));
+ int blockFor = consistencyLevel.blockFor(Keyspace.open(exec.command.getKeyspace()));
int responseCount = exec.handler.getReceivedCount();
String gotData = responseCount > 0
? exec.resolver.isDataPresent() ? " (including data)" : " (only digests)"
@@ -1273,14 +1274,14 @@ public class StorageProxy implements StorageProxyMBean
if (repairCommands == null)
{
- repairCommands = new ArrayList<ReadCommand>();
- repairResponseHandlers = new ArrayList<ReadCallback<ReadResponse, Row>>();
+ repairCommands = new ArrayList<>();
+ repairResponseHandlers = new ArrayList<>();
}
repairCommands.add(exec.command);
repairResponseHandlers.add(repairHandler);
MessageOut<ReadCommand> message = exec.command.createMessage();
- for (InetAddress endpoint : exec.handler.endpoints)
+ for (InetAddress endpoint : exec.getContactedReplicas())
{
Tracing.trace("Enqueuing full data read to {}", endpoint);
MessagingService.instance().sendRR(message, endpoint, repairHandler);
@@ -1288,8 +1289,7 @@ public class StorageProxy implements StorageProxyMBean
}
}
- if (commandsToRetry != Collections.EMPTY_LIST)
- commandsToRetry.clear();
+ commandsToRetry.clear();
// read the results for the digest mismatch retries
if (repairResponseHandlers != null)
@@ -1319,8 +1319,8 @@ public class StorageProxy implements StorageProxyMBean
catch (TimeoutException e)
{
Tracing.trace("Timed out on digest mismatch retries");
- int blockFor = consistency_level.blockFor(Keyspace.open(command.getKeyspace()));
- throw new ReadTimeoutException(consistency_level, blockFor, blockFor, true);
+ int blockFor = consistencyLevel.blockFor(Keyspace.open(command.getKeyspace()));
+ throw new ReadTimeoutException(consistencyLevel, blockFor, blockFor, true);
}
// retry any potential short reads
@@ -1329,7 +1329,7 @@ public class StorageProxy implements StorageProxyMBean
{
Tracing.trace("Issuing retry for read command");
if (commandsToRetry == Collections.EMPTY_LIST)
- commandsToRetry = new ArrayList<ReadCommand>();
+ commandsToRetry = new ArrayList<>();
commandsToRetry.add(retryCommand);
continue;
}
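Which executor a read gets follows the table's speculative_retry option (e.g. 'ALWAYS', '99percentile', a fixed 'Xms', or 'NONE'), which parses to the RetryType consulted above. A hedged restatement of the selection logic in getReadExecutor, collapsed to the counts it compares (the factory calls are placeholder names, not real API; fat clients additionally always get the non-speculating executor):

    static AbstractReadExecutor select(RetryType retryType, int blockFor,
                                       int targetReplicas, int allReplicas)
    {
        if (retryType == RetryType.NONE || blockFor == allReplicas)
            return neverSpeculating();   // speculation disabled, or no spare replicas to use
        if (targetReplicas == allReplicas || retryType == RetryType.ALWAYS)
            return alwaysSpeculating();  // contacting everyone anyway, or configured to always speculate
        return speculating();            // PERCENTILE or CUSTOM: wait ~cfs.sampleLatency, then retry
    }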
[3/3] git commit: Merge branch 'cassandra-2.0' into trunk
Posted by jb...@apache.org.
Merge branch 'cassandra-2.0' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/84f6c26a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/84f6c26a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/84f6c26a
Branch: refs/heads/trunk
Commit: 84f6c26aaf7cba286f8b98a02cf408fa5bb2131a
Parents: 246fefa 20c419b
Author: Jonathan Ellis <jb...@apache.org>
Authored: Thu Sep 26 15:38:44 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Thu Sep 26 15:38:44 2013 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/service/AbstractReadExecutor.java | 338 ++++++++++++-------
.../apache/cassandra/service/ReadCallback.java | 2 +-
.../apache/cassandra/service/StorageProxy.java | 30 +-
4 files changed, 226 insertions(+), 145 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/84f6c26a/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index d2b1310,3d4d19c..d475170
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,12 -1,5 +1,13 @@@
+2.1
+ * change logging from log4j to logback (CASSANDRA-5883)
+ * switch to LZ4 compression for internode communication (CASSANDRA-5887)
+ * Stop using Thrift-generated Index* classes internally (CASSANDRA-5971)
+ * Remove 1.2 network compatibility code (CASSANDRA-5960)
+ * Remove leveled json manifest migration code (CASSANDRA-5996)
+
+
2.0.2
+ * Fixes for speculative retry (CASSANDRA-5932)
* Improve memory usage of metadata min/max column names (CASSANDRA-6077)
* Fix thrift validation refusing row markers on CQL3 tables (CASSANDRA-6081)
* Fix insertion of collections with CAS (CASSANDRA-6069)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/84f6c26a/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
[2/3] git commit: Fixes for speculative retry patch by ayeschenko and jbellis for CASSANDRA-5932
Posted by jb...@apache.org.
Fixes for speculative retry
patch by ayeschenko and jbellis for CASSANDRA-5932
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/20c419b9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/20c419b9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/20c419b9
Branch: refs/heads/trunk
Commit: 20c419b9480e0e5b3c1da53a106b2a6760be35b9
Parents: 006eec4
Author: Jonathan Ellis <jb...@apache.org>
Authored: Thu Sep 26 15:38:13 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Thu Sep 26 15:38:36 2013 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/service/AbstractReadExecutor.java | 338 ++++++++++++-------
.../apache/cassandra/service/ReadCallback.java | 2 +-
.../apache/cassandra/service/StorageProxy.java | 30 +-
4 files changed, 226 insertions(+), 145 deletions(-)
----------------------------------------------------------------------