You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by aw...@apache.org on 2018/09/01 01:36:00 UTC
[08/18] cassandra git commit: Transient Replication and Cheap Quorums
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java b/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
index 61b9948..031326e 100644
--- a/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
+++ b/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
@@ -17,11 +17,12 @@
*/
package org.apache.cassandra.service.reads;
-import java.util.List;
import java.util.concurrent.TimeUnit;
import com.google.common.base.Preconditions;
-import com.google.common.collect.Iterables;
+
+import org.apache.cassandra.locator.ReplicaLayout;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,15 +38,20 @@ import org.apache.cassandra.db.partitions.PartitionIterator;
import org.apache.cassandra.exceptions.ReadFailureException;
import org.apache.cassandra.exceptions.ReadTimeoutException;
import org.apache.cassandra.exceptions.UnavailableException;
+import org.apache.cassandra.locator.EndpointsForToken;
import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.locator.ReplicaCollection;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.service.reads.repair.ReadRepair;
import org.apache.cassandra.service.StorageProxy.LocalReadRunnable;
import org.apache.cassandra.tracing.TraceState;
import org.apache.cassandra.tracing.Tracing;
+import static com.google.common.collect.Iterables.all;
+import static com.google.common.collect.Iterables.tryFind;
+
/**
* Sends a read request to the replicas needed to satisfy a given ConsistencyLevel.
*
@@ -59,32 +65,27 @@ public abstract class AbstractReadExecutor
private static final Logger logger = LoggerFactory.getLogger(AbstractReadExecutor.class);
protected final ReadCommand command;
- protected final ConsistencyLevel consistency;
- protected final List<InetAddressAndPort> targetReplicas;
- protected final ReadRepair readRepair;
- protected final DigestResolver digestResolver;
- protected final ReadCallback handler;
+ private final ReplicaLayout.ForToken replicaLayout;
+ protected final ReadRepair<EndpointsForToken, ReplicaLayout.ForToken> readRepair;
+ protected final DigestResolver<EndpointsForToken, ReplicaLayout.ForToken> digestResolver;
+ protected final ReadCallback<EndpointsForToken, ReplicaLayout.ForToken> handler;
protected final TraceState traceState;
protected final ColumnFamilyStore cfs;
protected final long queryStartNanoTime;
+ private final int initialDataRequestCount;
protected volatile PartitionIterator result = null;
- protected final Keyspace keyspace;
- protected final int blockFor;
-
- AbstractReadExecutor(Keyspace keyspace, ColumnFamilyStore cfs, ReadCommand command, ConsistencyLevel consistency, List<InetAddressAndPort> targetReplicas, long queryStartNanoTime)
+ AbstractReadExecutor(ColumnFamilyStore cfs, ReadCommand command, ReplicaLayout.ForToken replicaLayout, int initialDataRequestCount, long queryStartNanoTime)
{
this.command = command;
- this.consistency = consistency;
- this.targetReplicas = targetReplicas;
- this.readRepair = ReadRepair.create(command, queryStartNanoTime, consistency);
- this.digestResolver = new DigestResolver(keyspace, command, consistency, readRepair, targetReplicas.size());
- this.handler = new ReadCallback(digestResolver, consistency, command, targetReplicas, queryStartNanoTime);
+ this.replicaLayout = replicaLayout;
+ this.initialDataRequestCount = initialDataRequestCount;
+ this.readRepair = ReadRepair.create(command, replicaLayout, queryStartNanoTime);
+ this.digestResolver = new DigestResolver<>(command, replicaLayout, readRepair, queryStartNanoTime);
+ this.handler = new ReadCallback<>(digestResolver, replicaLayout.consistencyLevel().blockFor(replicaLayout.keyspace()), command, replicaLayout, queryStartNanoTime);
this.cfs = cfs;
this.traceState = Tracing.instance.get();
this.queryStartNanoTime = queryStartNanoTime;
- this.keyspace = keyspace;
- this.blockFor = consistency.blockFor(keyspace);
// Set the digest version (if we request some digests). This is the smallest version amongst all our target replicas since new nodes
@@ -92,8 +93,8 @@ public abstract class AbstractReadExecutor
// TODO: we need this when talking with pre-3.0 nodes. So if we preserve the digest format moving forward, we can get rid of this once
// we stop being compatible with pre-3.0 nodes.
int digestVersion = MessagingService.current_version;
- for (InetAddressAndPort replica : targetReplicas)
- digestVersion = Math.min(digestVersion, MessagingService.instance().getVersion(replica));
+ for (Replica replica : replicaLayout.selected())
+ digestVersion = Math.min(digestVersion, MessagingService.instance().getVersion(replica.endpoint()));
command.setDigestVersion(digestVersion);
}
@@ -109,24 +110,34 @@ public abstract class AbstractReadExecutor
return readRepair;
}
- protected void makeDataRequests(Iterable<InetAddressAndPort> endpoints)
+ protected void makeFullDataRequests(ReplicaCollection<?> replicas)
{
- makeRequests(command, endpoints);
+ assert all(replicas, Replica::isFull);
+ makeRequests(command, replicas.filter(Replica::isFull));
+ }
+ protected void makeTransientDataRequests(ReplicaCollection<?> replicas)
+ {
+ makeRequests(command.copyAsTransientQuery(), replicas);
}
- protected void makeDigestRequests(Iterable<InetAddressAndPort> endpoints)
+ protected void makeDigestRequests(ReplicaCollection<?> replicas)
{
- makeRequests(command.copyAsDigestQuery(), endpoints);
+ assert all(replicas, Replica::isFull);
+ // only send digest requests to full replicas, send data requests instead to the transient replicas
+ makeRequests(command.copyAsDigestQuery(), replicas);
}
- private void makeRequests(ReadCommand readCommand, Iterable<InetAddressAndPort> endpoints)
+ private void makeRequests(ReadCommand readCommand, ReplicaCollection<?> replicas)
{
boolean hasLocalEndpoint = false;
- for (InetAddressAndPort endpoint : endpoints)
+ Preconditions.checkArgument(replicas.stream().allMatch(replica -> replica.isFull() || !readCommand.isDigestQuery()),
+ "Can not send digest requests to transient replicas");
+ for (Replica replica: replicas)
{
- if (StorageProxy.canDoLocalRequest(endpoint))
+ InetAddressAndPort endpoint = replica.endpoint();
+ if (replica.isLocal())
{
hasLocalEndpoint = true;
continue;
@@ -134,7 +145,6 @@ public abstract class AbstractReadExecutor
if (traceState != null)
traceState.trace("reading {} from {}", readCommand.isDigestQuery() ? "digest" : "data", endpoint);
- logger.trace("reading {} from {}", readCommand.isDigestQuery() ? "digest" : "data", endpoint);
MessageOut<ReadCommand> message = readCommand.createMessage();
MessagingService.instance().sendRRWithFailure(message, endpoint, handler);
}
@@ -154,16 +164,16 @@ public abstract class AbstractReadExecutor
public abstract void maybeTryAdditionalReplicas();
/**
- * Get the replicas involved in the [finished] request.
- *
- * @return target replicas + the extra replica, *IF* we speculated.
- */
- public abstract List<InetAddressAndPort> getContactedReplicas();
-
- /**
* send the initial set of requests
*/
- public abstract void executeAsync();
+ public void executeAsync()
+ {
+ EndpointsForToken selected = replicaLayout().selected();
+ EndpointsForToken fullDataRequests = selected.filter(Replica::isFull, initialDataRequestCount);
+ makeFullDataRequests(fullDataRequests);
+ makeTransientDataRequests(selected.filter(Replica::isTransient));
+ makeDigestRequests(selected.filter(r -> r.isFull() && !fullDataRequests.contains(r)));
+ }
/**
* @return an executor appropriate for the configured speculative read policy
@@ -171,34 +181,33 @@ public abstract class AbstractReadExecutor
public static AbstractReadExecutor getReadExecutor(SinglePartitionReadCommand command, ConsistencyLevel consistencyLevel, long queryStartNanoTime) throws UnavailableException
{
Keyspace keyspace = Keyspace.open(command.metadata().keyspace);
-
- List<InetAddressAndPort> allLiveReplicas = StorageProxy.getLiveSortedEndpoints(keyspace, command.partitionKey());
- List<InetAddressAndPort> selectedReplicas = consistencyLevel.filterForQuery(keyspace, allLiveReplicas);
-
- // Throw UAE early if we don't have enough replicas.
- consistencyLevel.assureSufficientLiveNodes(keyspace, selectedReplicas);
-
ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(command.metadata().id);
SpeculativeRetryPolicy retry = cfs.metadata().params.speculativeRetry;
+ // Compute the replica layout (endpoints for this token), honoring the consistency level and speculative retry policy
+ ReplicaLayout.ForToken replicaLayout = ReplicaLayout.forRead(keyspace, command.partitionKey().getToken(), consistencyLevel, retry);
+
// Speculative retry is disabled *OR*
// 11980: Disable speculative retry if using EACH_QUORUM in order to prevent miscounting DC responses
if (retry.equals(NeverSpeculativeRetryPolicy.INSTANCE) || consistencyLevel == ConsistencyLevel.EACH_QUORUM)
- return new NeverSpeculatingReadExecutor(keyspace, cfs, command, consistencyLevel, selectedReplicas, queryStartNanoTime, false);
+ // TODO Looks like we might want to move speculation into the replica layout, but that might be a story for post-4.0
+ return new NeverSpeculatingReadExecutor(cfs, command, replicaLayout, queryStartNanoTime, false);
// There are simply no extra replicas to speculate.
// Handle this separately so it can record failed attempts to speculate due to lack of replicas
- if (selectedReplicas.size() == allLiveReplicas.size())
+ if (replicaLayout.selected().size() == replicaLayout.all().size())
{
boolean recordFailedSpeculation = consistencyLevel != ConsistencyLevel.ALL;
- return new NeverSpeculatingReadExecutor(keyspace, cfs, command, consistencyLevel, selectedReplicas, queryStartNanoTime, recordFailedSpeculation);
+ return new NeverSpeculatingReadExecutor(cfs, command, replicaLayout, queryStartNanoTime, recordFailedSpeculation);
}
- selectedReplicas.add(allLiveReplicas.get(selectedReplicas.size()));
+ // If CL.ALL, upgrade to AlwaysSpeculating;
+ // If we are going to contact every node anyway, ask for 2 full data requests instead of 1, for redundancy
+ // (same amount of requests in total, but we turn 1 digest request into a full blown data request)
if (retry.equals(AlwaysSpeculativeRetryPolicy.INSTANCE))
- return new AlwaysSpeculatingReadExecutor(keyspace, cfs, command, consistencyLevel, selectedReplicas, queryStartNanoTime);
+ return new AlwaysSpeculatingReadExecutor(cfs, command, replicaLayout, queryStartNanoTime);
else // PERCENTILE or CUSTOM.
- return new SpeculatingReadExecutor(keyspace, cfs, command, consistencyLevel, selectedReplicas, queryStartNanoTime);
+ return new SpeculatingReadExecutor(cfs, command, replicaLayout, queryStartNanoTime);
}
/**
@@ -208,10 +217,15 @@ public abstract class AbstractReadExecutor
boolean shouldSpeculateAndMaybeWait()
{
// no latency information, or we're overloaded
- if (cfs.sampleLatencyNanos > TimeUnit.MILLISECONDS.toNanos(command.getTimeout()))
+ if (cfs.sampleReadLatencyNanos > TimeUnit.MILLISECONDS.toNanos(command.getTimeout()))
return false;
- return !handler.await(cfs.sampleLatencyNanos, TimeUnit.NANOSECONDS);
+ return !handler.await(cfs.sampleReadLatencyNanos, TimeUnit.NANOSECONDS);
+ }
+
+ ReplicaLayout.ForToken replicaLayout()
+ {
+ return replicaLayout;
}
void onReadTimeout() {}
@@ -223,78 +237,36 @@ public abstract class AbstractReadExecutor
* log it is as a failure if it should have happened
* but couldn't due to lack of replicas
*/
- private final boolean recordFailedSpeculation;
-
- NeverSpeculatingReadExecutor(Keyspace keyspace,
- ColumnFamilyStore cfs,
- ReadCommand command,
- ConsistencyLevel consistencyLevel,
- List<InetAddressAndPort> targetReplicas,
- long queryStartNanoTime,
- boolean recordFailedSpeculation)
- {
- super(keyspace, cfs, command, consistencyLevel, targetReplicas, queryStartNanoTime);
- this.recordFailedSpeculation = recordFailedSpeculation;
- }
+ private final boolean logFailedSpeculation;
- public void executeAsync()
+ public NeverSpeculatingReadExecutor(ColumnFamilyStore cfs, ReadCommand command, ReplicaLayout.ForToken replicaLayout, long queryStartNanoTime, boolean logFailedSpeculation)
{
- makeDataRequests(targetReplicas.subList(0, 1));
- if (targetReplicas.size() > 1)
- makeDigestRequests(targetReplicas.subList(1, targetReplicas.size()));
+ super(cfs, command, replicaLayout, 1, queryStartNanoTime);
+ this.logFailedSpeculation = logFailedSpeculation;
}
public void maybeTryAdditionalReplicas()
{
- if (shouldSpeculateAndMaybeWait() && recordFailedSpeculation)
+ if (shouldSpeculateAndMaybeWait() && logFailedSpeculation)
{
cfs.metric.speculativeInsufficientReplicas.inc();
}
}
-
- public List<InetAddressAndPort> getContactedReplicas()
- {
- return targetReplicas;
- }
}
static class SpeculatingReadExecutor extends AbstractReadExecutor
{
private volatile boolean speculated = false;
- public SpeculatingReadExecutor(Keyspace keyspace,
- ColumnFamilyStore cfs,
+ public SpeculatingReadExecutor(ColumnFamilyStore cfs,
ReadCommand command,
- ConsistencyLevel consistencyLevel,
- List<InetAddressAndPort> targetReplicas,
+ ReplicaLayout.ForToken replicaLayout,
long queryStartNanoTime)
{
- super(keyspace, cfs, command, consistencyLevel, targetReplicas, queryStartNanoTime);
- }
-
- public void executeAsync()
- {
- // if CL + RR result in covering all replicas, getReadExecutor forces AlwaysSpeculating. So we know
- // that the last replica in our list is "extra."
- List<InetAddressAndPort> initialReplicas = targetReplicas.subList(0, targetReplicas.size() - 1);
-
- if (handler.blockfor < initialReplicas.size())
- {
- // We're hitting additional targets for read repair. Since our "extra" replica is the least-
- // preferred by the snitch, we do an extra data read to start with against a replica more
- // likely to reply; better to let RR fail than the entire query.
- makeDataRequests(initialReplicas.subList(0, 2));
- if (initialReplicas.size() > 2)
- makeDigestRequests(initialReplicas.subList(2, initialReplicas.size()));
- }
- else
- {
- // not doing read repair; all replies are important, so it doesn't matter which nodes we
- // perform data reads against vs digest.
- makeDataRequests(initialReplicas.subList(0, 1));
- if (initialReplicas.size() > 1)
- makeDigestRequests(initialReplicas.subList(1, initialReplicas.size()));
- }
+ // We're hitting additional targets for read repair (??). Since our "extra" replica is the least-
+ // preferred by the snitch, we do an extra data read to start with against a replica more
+ // likely to reply; better to let RR fail than the entire query.
+ super(cfs, command, replicaLayout, replicaLayout.blockFor() < replicaLayout.selected().size() ? 2 : 1, queryStartNanoTime);
}
public void maybeTryAdditionalReplicas()
@@ -302,28 +274,43 @@ public abstract class AbstractReadExecutor
if (shouldSpeculateAndMaybeWait())
{
//Handle speculation stats first in case the callback fires immediately
- speculated = true;
cfs.metric.speculativeRetries.inc();
- // Could be waiting on the data, or on enough digests.
+ speculated = true;
+
ReadCommand retryCommand = command;
+ Replica extraReplica;
if (handler.resolver.isDataPresent())
- retryCommand = command.copyAsDigestQuery();
+ {
+ extraReplica = tryFind(replicaLayout().all(),
+ r -> !replicaLayout().selected().contains(r)).orNull();
+
+ // we should only use a SpeculatingReadExecutor if we have an extra replica to speculate against
+ assert extraReplica != null;
+
+ retryCommand = extraReplica.isTransient()
+ ? command.copyAsTransientQuery()
+ : command.copyAsDigestQuery();
+ }
+ else
+ {
+ extraReplica = tryFind(replicaLayout().all(),
+ r -> r.isFull() && !replicaLayout().selected().contains(r)).orNull();
+ if (extraReplica == null)
+ {
+ cfs.metric.speculativeInsufficientReplicas.inc();
+ // cannot safely speculate a new data request, without more work - requests assumed to be
+ // unique per endpoint, and we have no full nodes left to speculate against
+ return;
+ }
+ }
- InetAddressAndPort extraReplica = Iterables.getLast(targetReplicas);
if (traceState != null)
traceState.trace("speculating read retry on {}", extraReplica);
logger.trace("speculating read retry on {}", extraReplica);
- MessagingService.instance().sendRRWithFailure(retryCommand.createMessage(), extraReplica, handler);
+ MessagingService.instance().sendRRWithFailure(retryCommand.createMessage(), extraReplica.endpoint(), handler);
}
}
- public List<InetAddressAndPort> getContactedReplicas()
- {
- return speculated
- ? targetReplicas
- : targetReplicas.subList(0, targetReplicas.size() - 1);
- }
-
@Override
void onReadTimeout()
{
@@ -336,14 +323,12 @@ public abstract class AbstractReadExecutor
private static class AlwaysSpeculatingReadExecutor extends AbstractReadExecutor
{
- public AlwaysSpeculatingReadExecutor(Keyspace keyspace,
- ColumnFamilyStore cfs,
+ public AlwaysSpeculatingReadExecutor(ColumnFamilyStore cfs,
ReadCommand command,
- ConsistencyLevel consistencyLevel,
- List<InetAddressAndPort> targetReplicas,
+ ReplicaLayout.ForToken replicaLayout,
long queryStartNanoTime)
{
- super(keyspace, cfs, command, consistencyLevel, targetReplicas, queryStartNanoTime);
+ super(cfs, command, replicaLayout, replicaLayout.selected().size() > 1 ? 2 : 1, queryStartNanoTime);
}
public void maybeTryAdditionalReplicas()
@@ -351,17 +336,10 @@ public abstract class AbstractReadExecutor
// no-op
}
- public List<InetAddressAndPort> getContactedReplicas()
- {
- return targetReplicas;
- }
-
@Override
public void executeAsync()
{
- makeDataRequests(targetReplicas.subList(0, targetReplicas.size() > 1 ? 2 : 1));
- if (targetReplicas.size() > 2)
- makeDigestRequests(targetReplicas.subList(2, targetReplicas.size()));
+ super.executeAsync();
cfs.metric.speculativeRetries.inc();
}
@@ -407,7 +385,7 @@ public abstract class AbstractReadExecutor
else
{
Tracing.trace("Digest mismatch: Mismatch for key {}", getKey());
- readRepair.startRepair(digestResolver, handler.endpoints, getContactedReplicas(), this::setResult);
+ readRepair.startRepair(digestResolver, this::setResult);
}
}
@@ -425,8 +403,7 @@ public abstract class AbstractReadExecutor
logger.trace("Timed out waiting on digest mismatch repair requests");
// the caught exception here will have CL.ALL from the repair command,
// not whatever CL the initial command was at (CASSANDRA-7947)
- int blockFor = consistency.blockFor(Keyspace.open(command.metadata().keyspace));
- throw new ReadTimeoutException(consistency, blockFor-1, blockFor, true);
+ throw new ReadTimeoutException(replicaLayout().consistencyLevel(), handler.blockfor - 1, handler.blockfor, true);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/service/reads/DataResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/DataResolver.java b/src/java/org/apache/cassandra/service/reads/DataResolver.java
index c0bff7a..9043e87 100644
--- a/src/java/org/apache/cassandra/service/reads/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/reads/DataResolver.java
@@ -17,39 +17,55 @@
*/
package org.apache.cassandra.service.reads;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
import com.google.common.base.Joiner;
+import com.google.common.collect.Collections2;
import com.google.common.collect.Iterables;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.filter.*;
-import org.apache.cassandra.db.partitions.*;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.ReadCommand;
+import org.apache.cassandra.db.ReadResponse;
+import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.partitions.PartitionIterator;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
import org.apache.cassandra.db.rows.RangeTombstoneMarker;
import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.db.rows.UnfilteredRowIterators;
-import org.apache.cassandra.db.transform.*;
-import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.net.*;
+import org.apache.cassandra.db.transform.EmptyPartitionsDiscarder;
+import org.apache.cassandra.db.transform.Filter;
+import org.apache.cassandra.db.transform.FilteredPartitions;
+import org.apache.cassandra.db.transform.Transformation;
+import org.apache.cassandra.locator.Endpoints;
+import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.locator.ReplicaCollection;
+import org.apache.cassandra.locator.ReplicaLayout;
+import org.apache.cassandra.locator.Replicas;
+import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.service.reads.repair.ReadRepair;
-public class DataResolver extends ResponseResolver
+import static com.google.common.collect.Iterables.*;
+
+public class DataResolver<E extends Endpoints<E>, L extends ReplicaLayout<E, L>> extends ResponseResolver<E, L>
{
- private final long queryStartNanoTime;
private final boolean enforceStrictLiveness;
- public DataResolver(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistency, int maxResponseCount, long queryStartNanoTime, ReadRepair readRepair)
+ public DataResolver(ReadCommand command, L replicaLayout, ReadRepair<E, L> readRepair, long queryStartNanoTime)
{
- super(keyspace, command, consistency, readRepair, maxResponseCount);
- this.queryStartNanoTime = queryStartNanoTime;
+ super(command, replicaLayout, readRepair, queryStartNanoTime);
this.enforceStrictLiveness = command.metadata().enforceStrictLiveness();
}
public PartitionIterator getData()
{
- ReadResponse response = responses.iterator().next().payload;
+ ReadResponse response = responses.get(0).payload;
return UnfilteredPartitionIterators.filter(response.makeIterator(command), command.nowInSec());
}
@@ -63,15 +79,13 @@ public class DataResolver extends ResponseResolver
{
// We could get more responses while this method runs, which is ok (we're happy to ignore any response not here
// at the beginning of this method), so grab the response count once and use that through the method.
- int count = responses.size();
- List<UnfilteredPartitionIterator> iters = new ArrayList<>(count);
- InetAddressAndPort[] sources = new InetAddressAndPort[count];
- for (int i = 0; i < count; i++)
- {
- MessageIn<ReadResponse> msg = responses.get(i);
- iters.add(msg.payload.makeIterator(command));
- sources[i] = msg.from;
- }
+ Collection<MessageIn<ReadResponse>> messages = responses.snapshot();
+ assert !any(messages, msg -> msg.payload.isDigestResponse());
+
+ E replicas = replicaLayout.all().keep(transform(messages, msg -> msg.from));
+ List<UnfilteredPartitionIterator> iters = new ArrayList<>(
+ Collections2.transform(messages, msg -> msg.payload.makeIterator(command)));
+ assert replicas.size() == iters.size();
/*
* Even though every response, individually, will honor the limit, it is possible that we will, after the merge,
@@ -86,18 +100,19 @@ public class DataResolver extends ResponseResolver
*
* See CASSANDRA-13747 for more details.
*/
-
DataLimits.Counter mergedResultCounter =
command.limits().newCounter(command.nowInSec(), true, command.selectsFullPartition(), enforceStrictLiveness);
- UnfilteredPartitionIterator merged = mergeWithShortReadProtection(iters, sources, mergedResultCounter);
+ UnfilteredPartitionIterator merged = mergeWithShortReadProtection(iters,
+ replicaLayout.withSelected(replicas),
+ mergedResultCounter);
FilteredPartitions filtered = FilteredPartitions.filter(merged, new Filter(command.nowInSec(), command.metadata().enforceStrictLiveness()));
PartitionIterator counted = Transformation.apply(filtered, mergedResultCounter);
return Transformation.apply(counted, new EmptyPartitionsDiscarder());
}
private UnfilteredPartitionIterator mergeWithShortReadProtection(List<UnfilteredPartitionIterator> results,
- InetAddressAndPort[] sources,
+ L sources,
DataLimits.Counter mergedResultCounter)
{
// If we have only one results, there is no read repair to do and we can't get short reads
@@ -110,17 +125,17 @@ public class DataResolver extends ResponseResolver
*/
if (!command.limits().isUnlimited())
for (int i = 0; i < results.size(); i++)
- results.set(i, ShortReadProtection.extend(sources[i], results.get(i), command, mergedResultCounter, queryStartNanoTime, enforceStrictLiveness));
+ results.set(i, ShortReadProtection.extend(sources.selected().get(i), results.get(i), command, mergedResultCounter, queryStartNanoTime, enforceStrictLiveness));
return UnfilteredPartitionIterators.merge(results, wrapMergeListener(readRepair.getMergeListener(sources), sources));
}
private String makeResponsesDebugString(DecoratedKey partitionKey)
{
- return Joiner.on(",\n").join(Iterables.transform(getMessages(), m -> m.from + " => " + m.payload.toDebugString(command, partitionKey)));
+ return Joiner.on(",\n").join(transform(getMessages().snapshot(), m -> m.from + " => " + m.payload.toDebugString(command, partitionKey)));
}
- private UnfilteredPartitionIterators.MergeListener wrapMergeListener(UnfilteredPartitionIterators.MergeListener partitionListener, InetAddressAndPort[] sources)
+ private UnfilteredPartitionIterators.MergeListener wrapMergeListener(UnfilteredPartitionIterators.MergeListener partitionListener, L sources)
{
return new UnfilteredPartitionIterators.MergeListener()
{
@@ -144,8 +159,8 @@ public class DataResolver extends ResponseResolver
String details = String.format("Error merging partition level deletion on %s: merged=%s, versions=%s, sources={%s}, debug info:%n %s",
table,
mergedDeletion == null ? "null" : mergedDeletion.toString(),
- '[' + Joiner.on(", ").join(Iterables.transform(Arrays.asList(versions), rt -> rt == null ? "null" : rt.toString())) + ']',
- Arrays.toString(sources),
+ '[' + Joiner.on(", ").join(transform(Arrays.asList(versions), rt -> rt == null ? "null" : rt.toString())) + ']',
+ sources.selected(),
makeResponsesDebugString(partitionKey));
throw new AssertionError(details, e);
}
@@ -165,8 +180,8 @@ public class DataResolver extends ResponseResolver
String details = String.format("Error merging rows on %s: merged=%s, versions=%s, sources={%s}, debug info:%n %s",
table,
merged == null ? "null" : merged.toString(table),
- '[' + Joiner.on(", ").join(Iterables.transform(Arrays.asList(versions), rt -> rt == null ? "null" : rt.toString(table))) + ']',
- Arrays.toString(sources),
+ '[' + Joiner.on(", ").join(transform(Arrays.asList(versions), rt -> rt == null ? "null" : rt.toString(table))) + ']',
+ sources.selected(),
makeResponsesDebugString(partitionKey));
throw new AssertionError(details, e);
}
@@ -191,8 +206,8 @@ public class DataResolver extends ResponseResolver
String details = String.format("Error merging RTs on %s: merged=%s, versions=%s, sources={%s}, debug info:%n %s",
table,
merged == null ? "null" : merged.toString(table),
- '[' + Joiner.on(", ").join(Iterables.transform(Arrays.asList(versions), rt -> rt == null ? "null" : rt.toString(table))) + ']',
- Arrays.toString(sources),
+ '[' + Joiner.on(", ").join(transform(Arrays.asList(versions), rt -> rt == null ? "null" : rt.toString(table))) + ']',
+ sources.selected(),
makeResponsesDebugString(partitionKey));
throw new AssertionError(details, e);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/service/reads/DigestResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/DigestResolver.java b/src/java/org/apache/cassandra/service/reads/DigestResolver.java
index 897892f..c3eee43 100644
--- a/src/java/org/apache/cassandra/service/reads/DigestResolver.java
+++ b/src/java/org/apache/cassandra/service/reads/DigestResolver.java
@@ -18,25 +18,35 @@
package org.apache.cassandra.service.reads;
import java.nio.ByteBuffer;
+import java.util.Collection;
import java.util.concurrent.TimeUnit;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
-import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.ReadCommand;
+import org.apache.cassandra.db.ReadResponse;
+import org.apache.cassandra.db.SinglePartitionReadCommand;
import org.apache.cassandra.db.partitions.PartitionIterator;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
+import org.apache.cassandra.locator.ReplicaLayout;
+import org.apache.cassandra.locator.Endpoints;
+import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.service.reads.repair.NoopReadRepair;
import org.apache.cassandra.service.reads.repair.ReadRepair;
import org.apache.cassandra.utils.ByteBufferUtil;
-public class DigestResolver extends ResponseResolver
+import static com.google.common.collect.Iterables.any;
+
+public class DigestResolver<E extends Endpoints<E>, L extends ReplicaLayout<E, L>> extends ResponseResolver<E, L>
{
- private volatile ReadResponse dataResponse;
+ private volatile MessageIn<ReadResponse> dataResponse;
- public DigestResolver(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistency, ReadRepair readRepair, int maxResponseCount)
+ public DigestResolver(ReadCommand command, L replicas, ReadRepair<E, L> readRepair, long queryStartNanoTime)
{
- super(keyspace, command, consistency, readRepair, maxResponseCount);
+ super(command, replicas, readRepair, queryStartNanoTime);
Preconditions.checkArgument(command instanceof SinglePartitionReadCommand,
"DigestResolver can only be used with SinglePartitionReadCommand commands");
}
@@ -45,14 +55,60 @@ public class DigestResolver extends ResponseResolver
public void preprocess(MessageIn<ReadResponse> message)
{
super.preprocess(message);
- if (dataResponse == null && !message.payload.isDigestResponse())
- dataResponse = message.payload;
+ Replica replica = replicaLayout.getReplicaFor(message.from);
+ if (dataResponse == null && !message.payload.isDigestResponse() && replica.isFull())
+ {
+ dataResponse = message;
+ }
+ else if (replica.isTransient() && message.payload.isDigestResponse())
+ {
+ throw new IllegalStateException("digest response received from transient replica");
+ }
+ }
+
+ @VisibleForTesting
+ public boolean hasTransientResponse()
+ {
+ return hasTransientResponse(responses.snapshot());
+ }
+
+ private boolean hasTransientResponse(Collection<MessageIn<ReadResponse>> responses)
+ {
+ return any(responses,
+ msg -> !msg.payload.isDigestResponse()
+ && replicaLayout.getReplicaFor(msg.from).isTransient());
}
public PartitionIterator getData()
{
assert isDataPresent();
- return UnfilteredPartitionIterators.filter(dataResponse.makeIterator(command), command.nowInSec());
+
+ Collection<MessageIn<ReadResponse>> responses = this.responses.snapshot();
+
+ if (!hasTransientResponse(responses))
+ {
+ return UnfilteredPartitionIterators.filter(dataResponse.payload.makeIterator(command), command.nowInSec());
+ }
+ else
+ {
+ // This path can be triggered only if we've got responses from full replicas and they match, but
+ // transient replica response still contains data, which needs to be reconciled.
+ DataResolver<E, L> dataResolver = new DataResolver<>(command,
+ replicaLayout,
+ (ReadRepair<E, L>) NoopReadRepair.instance,
+ queryStartNanoTime);
+
+ dataResolver.preprocess(dataResponse);
+ // Forward differences to all full nodes
+ for (MessageIn<ReadResponse> response : responses)
+ {
+ Replica replica = replicaLayout.getReplicaFor(response.from);
+ if (replica.isTransient())
+ dataResolver.preprocess(response);
+ }
+
+ return dataResolver.resolve();
+ }
}
public boolean responsesMatch()
@@ -61,11 +117,12 @@ public class DigestResolver extends ResponseResolver
// validate digests against each other; return false immediately on mismatch.
ByteBuffer digest = null;
- for (MessageIn<ReadResponse> message : responses)
+ for (MessageIn<ReadResponse> message : responses.snapshot())
{
- ReadResponse response = message.payload;
+ if (replicaLayout.getReplicaFor(message.from).isTransient())
+ continue;
- ByteBuffer newDigest = response.digest(command);
+ ByteBuffer newDigest = message.payload.digest(command);
if (digest == null)
digest = newDigest;
else if (!digest.equals(newDigest))
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/service/reads/ReadCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/ReadCallback.java b/src/java/org/apache/cassandra/service/reads/ReadCallback.java
index 537e684..3d39377 100644
--- a/src/java/org/apache/cassandra/service/reads/ReadCallback.java
+++ b/src/java/org/apache/cassandra/service/reads/ReadCallback.java
@@ -18,42 +18,43 @@
package org.apache.cassandra.service.reads;
import java.util.Collections;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
-import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.exceptions.RequestFailureReason;
+import org.apache.cassandra.db.PartitionRangeReadCommand;
+import org.apache.cassandra.db.ReadCommand;
+import org.apache.cassandra.db.ReadResponse;
import org.apache.cassandra.exceptions.ReadFailureException;
import org.apache.cassandra.exceptions.ReadTimeoutException;
+import org.apache.cassandra.exceptions.RequestFailureReason;
import org.apache.cassandra.exceptions.UnavailableException;
+import org.apache.cassandra.locator.Endpoints;
+import org.apache.cassandra.locator.ReplicaLayout;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.IAsyncCallbackWithFailure;
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.concurrent.SimpleCondition;
-public class ReadCallback implements IAsyncCallbackWithFailure<ReadResponse>
+public class ReadCallback<E extends Endpoints<E>, L extends ReplicaLayout<E, L>> implements IAsyncCallbackWithFailure<ReadResponse>
{
protected static final Logger logger = LoggerFactory.getLogger( ReadCallback.class );
public final ResponseResolver resolver;
final SimpleCondition condition = new SimpleCondition();
private final long queryStartNanoTime;
+ // TODO: move to replica layout as well?
final int blockfor;
- final List<InetAddressAndPort> endpoints;
+ final L replicaLayout;
private final ReadCommand command;
- private final ConsistencyLevel consistencyLevel;
private static final AtomicIntegerFieldUpdater<ReadCallback> recievedUpdater
= AtomicIntegerFieldUpdater.newUpdater(ReadCallback.class, "received");
private volatile int received = 0;
@@ -62,37 +63,19 @@ public class ReadCallback implements IAsyncCallbackWithFailure<ReadResponse>
private volatile int failures = 0;
private final Map<InetAddressAndPort, RequestFailureReason> failureReasonByEndpoint;
- private final Keyspace keyspace; // TODO push this into ConsistencyLevel?
-
- /**
- * Constructor when response count has to be calculated and blocked for.
- */
- public ReadCallback(ResponseResolver resolver, ConsistencyLevel consistencyLevel, ReadCommand command, List<InetAddressAndPort> filteredEndpoints, long queryStartNanoTime)
- {
- this(resolver,
- consistencyLevel,
- consistencyLevel.blockFor(Keyspace.open(command.metadata().keyspace)),
- command,
- Keyspace.open(command.metadata().keyspace),
- filteredEndpoints,
- queryStartNanoTime);
- }
-
- public ReadCallback(ResponseResolver resolver, ConsistencyLevel consistencyLevel, int blockfor, ReadCommand command, Keyspace keyspace, List<InetAddressAndPort> endpoints, long queryStartNanoTime)
+ public ReadCallback(ResponseResolver resolver, int blockfor, ReadCommand command, L replicaLayout, long queryStartNanoTime)
{
this.command = command;
- this.keyspace = keyspace;
this.blockfor = blockfor;
- this.consistencyLevel = consistencyLevel;
this.resolver = resolver;
this.queryStartNanoTime = queryStartNanoTime;
- this.endpoints = endpoints;
+ this.replicaLayout = replicaLayout;
this.failureReasonByEndpoint = new ConcurrentHashMap<>();
// we don't support read repair (or rapid read protection) for range scans yet (CASSANDRA-6897)
- assert !(command instanceof PartitionRangeReadCommand) || blockfor >= endpoints.size();
+ assert !(command instanceof PartitionRangeReadCommand) || blockfor >= replicaLayout.selected().size();
if (logger.isTraceEnabled())
- logger.trace("Blockfor is {}; setting up requests to {}", blockfor, StringUtils.join(this.endpoints, ","));
+ logger.trace("Blockfor is {}; setting up requests to {}", blockfor, this.replicaLayout);
}
public boolean await(long timePastStart, TimeUnit unit)
@@ -111,7 +94,7 @@ public class ReadCallback implements IAsyncCallbackWithFailure<ReadResponse>
public void awaitResults() throws ReadFailureException, ReadTimeoutException
{
boolean signaled = await(command.getTimeout(), TimeUnit.MILLISECONDS);
- boolean failed = blockfor + failures > endpoints.size();
+ boolean failed = failures > 0 && blockfor + failures > replicaLayout.selected().size();
if (signaled && !failed)
return;
@@ -128,8 +111,8 @@ public class ReadCallback implements IAsyncCallbackWithFailure<ReadResponse>
// Same as for writes, see AbstractWriteResponseHandler
throw failed
- ? new ReadFailureException(consistencyLevel, received, blockfor, resolver.isDataPresent(), failureReasonByEndpoint)
- : new ReadTimeoutException(consistencyLevel, received, blockfor, resolver.isDataPresent());
+ ? new ReadFailureException(replicaLayout.consistencyLevel(), received, blockfor, resolver.isDataPresent(), failureReasonByEndpoint)
+ : new ReadTimeoutException(replicaLayout.consistencyLevel(), received, blockfor, resolver.isDataPresent());
}
public int blockFor()
@@ -153,9 +136,7 @@ public class ReadCallback implements IAsyncCallbackWithFailure<ReadResponse>
*/
private boolean waitingFor(InetAddressAndPort from)
{
- return consistencyLevel.isDatacenterLocal()
- ? DatabaseDescriptor.getLocalDataCenter().equals(DatabaseDescriptor.getEndpointSnitch().getDatacenter(from))
- : true;
+ return !replicaLayout.consistencyLevel().isDatacenterLocal() || DatabaseDescriptor.getLocalDataCenter().equals(DatabaseDescriptor.getEndpointSnitch().getDatacenter(from));
}
/**
@@ -178,7 +159,7 @@ public class ReadCallback implements IAsyncCallbackWithFailure<ReadResponse>
public void assureSufficientLiveNodes() throws UnavailableException
{
- consistencyLevel.assureSufficientLiveNodes(keyspace, endpoints);
+ replicaLayout.consistencyLevel().assureSufficientLiveNodesForRead(replicaLayout.keyspace(), replicaLayout.selected());
}
public boolean isLatencyForSnitch()
@@ -195,7 +176,7 @@ public class ReadCallback implements IAsyncCallbackWithFailure<ReadResponse>
failureReasonByEndpoint.put(from, failureReason);
- if (blockfor + n > endpoints.size())
+ if (blockfor + n > replicaLayout.selected().size())
condition.signalAll();
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/service/reads/ResponseResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/ResponseResolver.java b/src/java/org/apache/cassandra/service/reads/ResponseResolver.java
index f4f00a2..e306b4d 100644
--- a/src/java/org/apache/cassandra/service/reads/ResponseResolver.java
+++ b/src/java/org/apache/cassandra/service/reads/ResponseResolver.java
@@ -20,37 +20,49 @@ package org.apache.cassandra.service.reads;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.ReadCommand;
+import org.apache.cassandra.db.ReadResponse;
+import org.apache.cassandra.locator.Endpoints;
+import org.apache.cassandra.locator.ReplicaLayout;
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.service.reads.repair.ReadRepair;
import org.apache.cassandra.utils.concurrent.Accumulator;
-public abstract class ResponseResolver
+public abstract class ResponseResolver<E extends Endpoints<E>, L extends ReplicaLayout<E, L>>
{
protected static final Logger logger = LoggerFactory.getLogger(ResponseResolver.class);
- protected final Keyspace keyspace;
protected final ReadCommand command;
- protected final ConsistencyLevel consistency;
- protected final ReadRepair readRepair;
+ protected final L replicaLayout;
+ protected final ReadRepair<E, L> readRepair;
// Accumulator gives us non-blocking thread-safety with optimal algorithmic constraints
protected final Accumulator<MessageIn<ReadResponse>> responses;
+ protected final long queryStartNanoTime;
- public ResponseResolver(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistency, ReadRepair readRepair, int maxResponseCount)
+ public ResponseResolver(ReadCommand command, L replicaLayout, ReadRepair<E, L> readRepair, long queryStartNanoTime)
{
- this.keyspace = keyspace;
this.command = command;
- this.consistency = consistency;
+ this.replicaLayout = replicaLayout;
this.readRepair = readRepair;
- this.responses = new Accumulator<>(maxResponseCount);
+ // TODO: calculate max possible replicas for the query (e.g. local dc queries won't contact remotes)
+ this.responses = new Accumulator<>(replicaLayout.all().size());
+ this.queryStartNanoTime = queryStartNanoTime;
}
public abstract boolean isDataPresent();
public void preprocess(MessageIn<ReadResponse> message)
{
- responses.add(message);
+ try
+ {
+ responses.add(message);
+ }
+ catch (IllegalStateException e)
+ {
+ logger.error("Encountered error while trying to preprocess the message {} in command {} (read repair {}, consistency level {}), replicas: {}", message, command, readRepair, replicaLayout.consistencyLevel(), replicaLayout.selected());
+ throw e;
+ }
}
public Accumulator<MessageIn<ReadResponse>> getMessages()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/service/reads/ShortReadPartitionsProtection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/ShortReadPartitionsProtection.java b/src/java/org/apache/cassandra/service/reads/ShortReadPartitionsProtection.java
index d4e8957..580b790 100644
--- a/src/java/org/apache/cassandra/service/reads/ShortReadPartitionsProtection.java
+++ b/src/java/org/apache/cassandra/service/reads/ShortReadPartitionsProtection.java
@@ -18,12 +18,13 @@
package org.apache.cassandra.service.reads;
-import java.util.Collections;
+import org.apache.cassandra.locator.Endpoints;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.DataRange;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.Keyspace;
@@ -39,7 +40,8 @@ import org.apache.cassandra.db.transform.Transformation;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.dht.ExcludingBounds;
import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.ReplicaLayout;
+import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.reads.repair.NoopReadRepair;
import org.apache.cassandra.service.StorageProxy;
@@ -47,8 +49,9 @@ import org.apache.cassandra.tracing.Tracing;
public class ShortReadPartitionsProtection extends Transformation<UnfilteredRowIterator> implements MorePartitions<UnfilteredPartitionIterator>
{
+ private static final Logger logger = LoggerFactory.getLogger(ShortReadPartitionsProtection.class);
private final ReadCommand command;
- private final InetAddressAndPort source;
+ private final Replica source;
private final DataLimits.Counter singleResultCounter; // unmerged per-source counter
private final DataLimits.Counter mergedResultCounter; // merged end-result counter
@@ -59,7 +62,7 @@ public class ShortReadPartitionsProtection extends Transformation<UnfilteredRowI
private final long queryStartNanoTime;
- public ShortReadPartitionsProtection(ReadCommand command, InetAddressAndPort source,
+ public ShortReadPartitionsProtection(ReadCommand command, Replica source,
DataLimits.Counter singleResultCounter,
DataLimits.Counter mergedResultCounter,
long queryStartNanoTime)
@@ -84,9 +87,10 @@ public class ShortReadPartitionsProtection extends Transformation<UnfilteredRowI
* If we don't apply the transformation *after* extending the partition with MoreRows,
* applyToRow() method of protection will not be called on the first row of the new extension iterator.
*/
+ ReplicaLayout.ForToken replicaLayout = ReplicaLayout.forSingleReplica(Keyspace.open(command.metadata().keyspace), partition.partitionKey().getToken(), source);
ShortReadRowsProtection protection = new ShortReadRowsProtection(partition.partitionKey(),
command, source,
- this::executeReadCommand,
+ (cmd) -> executeReadCommand(cmd, replicaLayout),
singleResultCounter,
mergedResultCounter);
return Transformation.apply(MoreRows.extend(partition, protection), protection);
@@ -140,9 +144,9 @@ public class ShortReadPartitionsProtection extends Transformation<UnfilteredRowI
ColumnFamilyStore.metricsFor(command.metadata().id).shortReadProtectionRequests.mark();
Tracing.trace("Requesting {} extra rows from {} for short read protection", toQuery, source);
+ logger.info("Requesting {} extra rows from {} for short read protection", toQuery, source);
- PartitionRangeReadCommand cmd = makeFetchAdditionalPartitionReadCommand(toQuery);
- return executeReadCommand(cmd);
+ return makeAndExecuteFetchAdditionalPartitionReadCommand(toQuery);
}
// Counts the number of rows for regular queries and the number of groups for GROUP BY queries
@@ -153,7 +157,7 @@ public class ShortReadPartitionsProtection extends Transformation<UnfilteredRowI
: counter.counted();
}
- private PartitionRangeReadCommand makeFetchAdditionalPartitionReadCommand(int toQuery)
+ private UnfilteredPartitionIterator makeAndExecuteFetchAdditionalPartitionReadCommand(int toQuery)
{
PartitionRangeReadCommand cmd = (PartitionRangeReadCommand) command;
@@ -165,19 +169,19 @@ public class ShortReadPartitionsProtection extends Transformation<UnfilteredRowI
: new ExcludingBounds<>(lastPartitionKey, bounds.right);
DataRange newDataRange = cmd.dataRange().forSubRange(newBounds);
- return cmd.withUpdatedLimitsAndDataRange(newLimits, newDataRange);
+ ReplicaLayout.ForRange replicaLayout = ReplicaLayout.forSingleReplica(Keyspace.open(command.metadata().keyspace), cmd.dataRange().keyRange(), source);
+ return executeReadCommand(cmd.withUpdatedLimitsAndDataRange(newLimits, newDataRange), replicaLayout);
}
- private UnfilteredPartitionIterator executeReadCommand(ReadCommand cmd)
+ private <E extends Endpoints<E>, L extends ReplicaLayout<E, L>> UnfilteredPartitionIterator executeReadCommand(ReadCommand cmd, L replicaLayout)
{
- Keyspace keyspace = Keyspace.open(command.metadata().keyspace);
- DataResolver resolver = new DataResolver(keyspace, cmd, ConsistencyLevel.ONE, 1, queryStartNanoTime, NoopReadRepair.instance);
- ReadCallback handler = new ReadCallback(resolver, ConsistencyLevel.ONE, cmd, Collections.singletonList(source), queryStartNanoTime);
+ DataResolver<E, L> resolver = new DataResolver<>(cmd, replicaLayout, (NoopReadRepair<E, L>)NoopReadRepair.instance, queryStartNanoTime);
+ ReadCallback<E, L> handler = new ReadCallback<>(resolver, replicaLayout.consistencyLevel().blockFor(replicaLayout.keyspace()), cmd, replicaLayout, queryStartNanoTime);
- if (StorageProxy.canDoLocalRequest(source))
+ if (source.isLocal())
StageManager.getStage(Stage.READ).maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(cmd, handler));
else
- MessagingService.instance().sendRRWithFailure(cmd.createMessage(), source, handler);
+ MessagingService.instance().sendRRWithFailure(cmd.createMessage(), source.endpoint(), handler);
// We don't call handler.get() because we want to preserve tombstones since we're still in the middle of merging node results.
handler.awaitResults();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/service/reads/ShortReadProtection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/ShortReadProtection.java b/src/java/org/apache/cassandra/service/reads/ShortReadProtection.java
index f603e9b..ef1d45b 100644
--- a/src/java/org/apache/cassandra/service/reads/ShortReadProtection.java
+++ b/src/java/org/apache/cassandra/service/reads/ShortReadProtection.java
@@ -26,6 +26,7 @@ import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
import org.apache.cassandra.db.transform.MorePartitions;
import org.apache.cassandra.db.transform.Transformation;
import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.Replica;
/**
* We have a potential short read if the result from a given node contains the requested number of rows
@@ -40,7 +41,7 @@ import org.apache.cassandra.locator.InetAddressAndPort;
public class ShortReadProtection
{
@SuppressWarnings("resource")
- public static UnfilteredPartitionIterator extend(InetAddressAndPort source, UnfilteredPartitionIterator partitions,
+ public static UnfilteredPartitionIterator extend(Replica source, UnfilteredPartitionIterator partitions,
ReadCommand command, DataLimits.Counter mergedResultCounter,
long queryStartNanoTime, boolean enforceStrictLiveness)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/service/reads/ShortReadRowsProtection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/ShortReadRowsProtection.java b/src/java/org/apache/cassandra/service/reads/ShortReadRowsProtection.java
index 6b1da0b..8dc7fc7 100644
--- a/src/java/org/apache/cassandra/service/reads/ShortReadRowsProtection.java
+++ b/src/java/org/apache/cassandra/service/reads/ShortReadRowsProtection.java
@@ -33,14 +33,14 @@ import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.db.transform.MoreRows;
import org.apache.cassandra.db.transform.Transformation;
-import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.tracing.Tracing;
class ShortReadRowsProtection extends Transformation implements MoreRows<UnfilteredRowIterator>
{
private final ReadCommand command;
- private final InetAddressAndPort source;
+ private final Replica source;
private final DataLimits.Counter singleResultCounter; // unmerged per-source counter
private final DataLimits.Counter mergedResultCounter; // merged end-result counter
private final Function<ReadCommand, UnfilteredPartitionIterator> commandExecutor;
@@ -53,7 +53,7 @@ class ShortReadRowsProtection extends Transformation implements MoreRows<Unfilte
private int lastFetched = 0; // # rows returned by last attempt to get more (or by the original read command)
private int lastQueried = 0; // # extra rows requested from the replica last time
- ShortReadRowsProtection(DecoratedKey partitionKey, ReadCommand command, InetAddressAndPort source,
+ ShortReadRowsProtection(DecoratedKey partitionKey, ReadCommand command, Replica source,
Function<ReadCommand, UnfilteredPartitionIterator> commandExecutor,
DataLimits.Counter singleResultCounter, DataLimits.Counter mergedResultCounter)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java b/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java
index 7e3f0ae..30dea74 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java
@@ -18,29 +18,23 @@
package org.apache.cassandra.service.reads.repair;
-import java.util.List;
-import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
-import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Sets;
import com.codahale.metrics.Meter;
-import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.ReadCommand;
import org.apache.cassandra.db.SinglePartitionReadCommand;
import org.apache.cassandra.db.partitions.PartitionIterator;
-import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.ReadTimeoutException;
-import org.apache.cassandra.locator.AbstractReplicationStrategy;
+import org.apache.cassandra.locator.Endpoints;
import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.locator.NetworkTopologyStrategy;
+import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.locator.ReplicaLayout;
import org.apache.cassandra.metrics.ReadRepairMetrics;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.reads.DataResolver;
@@ -48,11 +42,11 @@ import org.apache.cassandra.service.reads.DigestResolver;
import org.apache.cassandra.service.reads.ReadCallback;
import org.apache.cassandra.tracing.Tracing;
-public abstract class AbstractReadRepair implements ReadRepair
+public abstract class AbstractReadRepair<E extends Endpoints<E>, L extends ReplicaLayout<E, L>> implements ReadRepair<E, L>
{
protected final ReadCommand command;
protected final long queryStartNanoTime;
- protected final ConsistencyLevel consistency;
+ protected final L replicaLayout;
protected final ColumnFamilyStore cfs;
private volatile DigestRepair digestRepair = null;
@@ -62,41 +56,25 @@ public abstract class AbstractReadRepair implements ReadRepair
private final DataResolver dataResolver;
private final ReadCallback readCallback;
private final Consumer<PartitionIterator> resultConsumer;
- private final List<InetAddressAndPort> initialContacts;
- public DigestRepair(DataResolver dataResolver, ReadCallback readCallback, Consumer<PartitionIterator> resultConsumer, List<InetAddressAndPort> initialContacts)
+ public DigestRepair(DataResolver dataResolver, ReadCallback readCallback, Consumer<PartitionIterator> resultConsumer)
{
this.dataResolver = dataResolver;
this.readCallback = readCallback;
this.resultConsumer = resultConsumer;
- this.initialContacts = initialContacts;
}
}
public AbstractReadRepair(ReadCommand command,
- long queryStartNanoTime,
- ConsistencyLevel consistency)
+ L replicaLayout,
+ long queryStartNanoTime)
{
this.command = command;
this.queryStartNanoTime = queryStartNanoTime;
- this.consistency = consistency;
+ this.replicaLayout = replicaLayout;
this.cfs = Keyspace.openAndGetStore(command.metadata());
}
- private int getMaxResponses()
- {
- AbstractReplicationStrategy strategy = cfs.keyspace.getReplicationStrategy();
- if (consistency.isDatacenterLocal() && strategy instanceof NetworkTopologyStrategy)
- {
- NetworkTopologyStrategy nts = (NetworkTopologyStrategy) strategy;
- return nts.getReplicationFactor(DatabaseDescriptor.getLocalDataCenter());
- }
- else
- {
- return strategy.getReplicationFactor();
- }
- }
-
void sendReadCommand(InetAddressAndPort to, ReadCallback readCallback)
{
MessagingService.instance().sendRRWithFailure(command.createMessage(), to, readCallback);
@@ -105,24 +83,23 @@ public abstract class AbstractReadRepair implements ReadRepair
abstract Meter getRepairMeter();
// digestResolver isn't used here because we resend read requests to all participants
- public void startRepair(DigestResolver digestResolver, List<InetAddressAndPort> allEndpoints, List<InetAddressAndPort> contactedEndpoints, Consumer<PartitionIterator> resultConsumer)
+ public void startRepair(DigestResolver<E, L> digestResolver, Consumer<PartitionIterator> resultConsumer)
{
getRepairMeter().mark();
// Do a full data read to resolve the correct response (and repair node that need be)
- Keyspace keyspace = Keyspace.open(command.metadata().keyspace);
- DataResolver resolver = new DataResolver(keyspace, command, ConsistencyLevel.ALL, getMaxResponses(), queryStartNanoTime, this);
- ReadCallback readCallback = new ReadCallback(resolver, ConsistencyLevel.ALL, consistency.blockFor(cfs.keyspace), command,
- keyspace, allEndpoints, queryStartNanoTime);
+ DataResolver<E, L> resolver = new DataResolver<>(command, replicaLayout, this, queryStartNanoTime);
+ ReadCallback<E, L> readCallback = new ReadCallback<>(resolver, replicaLayout.consistencyLevel().blockFor(cfs.keyspace),
+ command, replicaLayout, queryStartNanoTime);
- digestRepair = new DigestRepair(resolver, readCallback, resultConsumer, contactedEndpoints);
+ digestRepair = new DigestRepair(resolver, readCallback, resultConsumer);
- for (InetAddressAndPort endpoint : contactedEndpoints)
+ for (Replica replica : replicaLayout.selected())
{
- Tracing.trace("Enqueuing full data read to {}", endpoint);
- sendReadCommand(endpoint, readCallback);
+ Tracing.trace("Enqueuing full data read to {}", replica);
+ sendReadCommand(replica.endpoint(), readCallback);
}
- ReadRepairDiagnostics.startRepair(this, contactedEndpoints, digestResolver, allEndpoints);
+ ReadRepairDiagnostics.startRepair(this, replicaLayout.selected().endpoints(), digestResolver, replicaLayout.all().endpoints());
}
public void awaitReads() throws ReadTimeoutException
@@ -137,15 +114,11 @@ public abstract class AbstractReadRepair implements ReadRepair
private boolean shouldSpeculate()
{
+ ConsistencyLevel consistency = replicaLayout.consistencyLevel();
ConsistencyLevel speculativeCL = consistency.isDatacenterLocal() ? ConsistencyLevel.LOCAL_QUORUM : ConsistencyLevel.QUORUM;
return consistency != ConsistencyLevel.EACH_QUORUM
&& consistency.satisfies(speculativeCL, cfs.keyspace)
- && cfs.sampleLatencyNanos <= TimeUnit.MILLISECONDS.toNanos(command.getTimeout());
- }
-
- Iterable<InetAddressAndPort> getCandidatesForToken(Token token)
- {
- return BlockingReadRepairs.getCandidateEndpoints(cfs.keyspace, token, consistency);
+ && cfs.sampleReadLatencyNanos <= TimeUnit.MILLISECONDS.toNanos(command.getTimeout());
}
public void maybeSendAdditionalReads()
@@ -156,20 +129,17 @@ public abstract class AbstractReadRepair implements ReadRepair
if (repair == null)
return;
- if (shouldSpeculate() && !repair.readCallback.await(cfs.sampleLatencyNanos, TimeUnit.NANOSECONDS))
+ if (shouldSpeculate() && !repair.readCallback.await(cfs.sampleReadLatencyNanos, TimeUnit.NANOSECONDS))
{
- Set<InetAddressAndPort> contacted = Sets.newHashSet(repair.initialContacts);
- Token replicaToken = ((SinglePartitionReadCommand) command).partitionKey().getToken();
- Iterable<InetAddressAndPort> candidates = getCandidatesForToken(replicaToken);
-
- Optional<InetAddressAndPort> endpoint = Iterables.tryFind(candidates, e -> !contacted.contains(e));
- if (endpoint.isPresent())
- {
- Tracing.trace("Enqueuing speculative full data read to {}", endpoint);
- sendReadCommand(endpoint.get(), repair.readCallback);
- ReadRepairMetrics.speculatedRead.mark();
- ReadRepairDiagnostics.speculatedRead(this, endpoint.get(), candidates);
- }
+ L uncontacted = replicaLayout.forNaturalUncontacted();
+ if (uncontacted.selected().isEmpty())
+ return;
+
+ Replica replica = uncontacted.selected().iterator().next();
+ Tracing.trace("Enqueuing speculative full data read to {}", replica);
+ sendReadCommand(replica.endpoint(), repair.readCallback);
+ ReadRepairMetrics.speculatedRead.mark();
+ ReadRepairDiagnostics.speculatedRead(this, replica.endpoint(), uncontacted.all().endpoints());
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java b/src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java
index 8d69bef..54af2cf 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java
@@ -20,16 +20,14 @@ package org.apache.cassandra.service.reads.repair;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
import com.google.common.util.concurrent.AbstractFuture;
import org.apache.cassandra.db.ColumnFamilyStore;
@@ -38,7 +36,11 @@ import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.locator.Endpoints;
+import org.apache.cassandra.locator.ReplicaLayout;
import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.locator.Replicas;
import org.apache.cassandra.metrics.ReadRepairMetrics;
import org.apache.cassandra.net.IAsyncCallback;
import org.apache.cassandra.net.MessageIn;
@@ -47,33 +49,29 @@ import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.tracing.Tracing;
-public class BlockingPartitionRepair extends AbstractFuture<Object> implements IAsyncCallback<Object>
+public class BlockingPartitionRepair<E extends Endpoints<E>, L extends ReplicaLayout<E, L>> extends AbstractFuture<Object> implements IAsyncCallback<Object>
{
- private final Keyspace keyspace;
private final DecoratedKey key;
- private final ConsistencyLevel consistency;
- private final InetAddressAndPort[] participants;
- private final ConcurrentMap<InetAddressAndPort, Mutation> pendingRepairs;
+ private final L replicaLayout;
+ private final Map<Replica, Mutation> pendingRepairs;
private final CountDownLatch latch;
private volatile long mutationsSentTime;
- public BlockingPartitionRepair(Keyspace keyspace, DecoratedKey key, ConsistencyLevel consistency, Map<InetAddressAndPort, Mutation> repairs, int maxBlockFor, InetAddressAndPort[] participants)
+ public BlockingPartitionRepair(DecoratedKey key, Map<Replica, Mutation> repairs, int maxBlockFor, L replicaLayout)
{
- this.keyspace = keyspace;
this.key = key;
- this.consistency = consistency;
this.pendingRepairs = new ConcurrentHashMap<>(repairs);
- this.participants = participants;
+ this.replicaLayout = replicaLayout;
// here we remove empty repair mutations from the block for total, since
// we're not sending them mutations
int blockFor = maxBlockFor;
- for (InetAddressAndPort participant: participants)
+ for (Replica participant: replicaLayout.selected())
{
// remote dcs can sometimes get involved in dc-local reads. We want to repair
// them if they do, but they shouldn't interfere with blocking the client read.
- if (!repairs.containsKey(participant) && shouldBlockOn(participant))
+ if (!repairs.containsKey(participant) && shouldBlockOn(participant.endpoint()))
blockFor--;
}
@@ -99,7 +97,7 @@ public class BlockingPartitionRepair extends AbstractFuture<Object> implements I
private boolean shouldBlockOn(InetAddressAndPort endpoint)
{
- return !consistency.isDatacenterLocal() || isLocal(endpoint);
+ return !replicaLayout.consistencyLevel().isDatacenterLocal() || isLocal(endpoint);
}
@VisibleForTesting
@@ -107,7 +105,7 @@ public class BlockingPartitionRepair extends AbstractFuture<Object> implements I
{
if (shouldBlockOn(from))
{
- pendingRepairs.remove(from);
+ pendingRepairs.remove(replicaLayout.getReplicaFor(from));
latch.countDown();
}
}
@@ -148,20 +146,23 @@ public class BlockingPartitionRepair extends AbstractFuture<Object> implements I
public void sendInitialRepairs()
{
mutationsSentTime = System.nanoTime();
- for (Map.Entry<InetAddressAndPort, Mutation> entry: pendingRepairs.entrySet())
+ Replicas.assertFull(pendingRepairs.keySet());
+
+ for (Map.Entry<Replica, Mutation> entry: pendingRepairs.entrySet())
{
- InetAddressAndPort destination = entry.getKey();
+ Replica destination = entry.getKey();
+ Preconditions.checkArgument(destination.isFull(), "Can't send repairs to transient replicas: %s", destination);
Mutation mutation = entry.getValue();
TableId tableId = extractUpdate(mutation).metadata().id;
Tracing.trace("Sending read-repair-mutation to {}", destination);
// use a separate verb here to avoid writing hints on timeouts
- sendRR(mutation.createMessage(MessagingService.Verb.READ_REPAIR), destination);
+ sendRR(mutation.createMessage(MessagingService.Verb.READ_REPAIR), destination.endpoint());
ColumnFamilyStore.metricsFor(tableId).readRepairRequests.mark();
- if (!shouldBlockOn(destination))
+ if (!shouldBlockOn(destination.endpoint()))
pendingRepairs.remove(destination);
- ReadRepairDiagnostics.sendInitialRepair(this, destination, mutation);
+ ReadRepairDiagnostics.sendInitialRepair(this, destination.endpoint(), mutation);
}
}
@@ -197,9 +198,8 @@ public class BlockingPartitionRepair extends AbstractFuture<Object> implements I
if (awaitRepairs(timeout, timeoutUnit))
return;
- Set<InetAddressAndPort> exclude = Sets.newHashSet(participants);
- Iterable<InetAddressAndPort> candidates = Iterables.filter(getCandidateEndpoints(), e -> !exclude.contains(e));
- if (Iterables.isEmpty(candidates))
+ L newCandidates = replicaLayout.forNaturalUncontacted();
+ if (newCandidates.selected().isEmpty())
return;
PartitionUpdate update = mergeUnackedUpdates();
@@ -212,34 +212,34 @@ public class BlockingPartitionRepair extends AbstractFuture<Object> implements I
Mutation[] versionedMutations = new Mutation[msgVersionIdx(MessagingService.current_version) + 1];
- for (InetAddressAndPort endpoint: candidates)
+ for (Replica replica : newCandidates.selected())
{
- int versionIdx = msgVersionIdx(MessagingService.instance().getVersion(endpoint));
+ int versionIdx = msgVersionIdx(MessagingService.instance().getVersion(replica.endpoint()));
Mutation mutation = versionedMutations[versionIdx];
if (mutation == null)
{
- mutation = BlockingReadRepairs.createRepairMutation(update, consistency, endpoint, true);
+ mutation = BlockingReadRepairs.createRepairMutation(update, replicaLayout.consistencyLevel(), replica.endpoint(), true);
versionedMutations[versionIdx] = mutation;
}
if (mutation == null)
{
// the mutation is too large to send.
- ReadRepairDiagnostics.speculatedWriteOversized(this, endpoint);
+ ReadRepairDiagnostics.speculatedWriteOversized(this, replica.endpoint());
continue;
}
- Tracing.trace("Sending speculative read-repair-mutation to {}", endpoint);
- sendRR(mutation.createMessage(MessagingService.Verb.READ_REPAIR), endpoint);
- ReadRepairDiagnostics.speculatedWrite(this, endpoint, mutation);
+ Tracing.trace("Sending speculative read-repair-mutation to {}", replica);
+ sendRR(mutation.createMessage(MessagingService.Verb.READ_REPAIR), replica.endpoint());
+ ReadRepairDiagnostics.speculatedWrite(this, replica.endpoint(), mutation);
}
}
Keyspace getKeyspace()
{
- return keyspace;
+ return replicaLayout.keyspace();
}
DecoratedKey getKey()
@@ -249,13 +249,6 @@ public class BlockingPartitionRepair extends AbstractFuture<Object> implements I
ConsistencyLevel getConsistency()
{
- return consistency;
- }
-
- @VisibleForTesting
- protected Iterable<InetAddressAndPort> getCandidateEndpoints()
- {
- return BlockingReadRepairs.getCandidateEndpoints(keyspace, key.getToken(), consistency);
+ return replicaLayout.consistencyLevel();
}
-
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java b/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java
index e46372e..402aed0 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java
@@ -23,18 +23,19 @@ import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.locator.Endpoints;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.codahale.metrics.Meter;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.ConsistencyLevel;
-import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.ReadCommand;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
import org.apache.cassandra.exceptions.ReadTimeoutException;
-import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.locator.ReplicaLayout;
import org.apache.cassandra.metrics.ReadRepairMetrics;
import org.apache.cassandra.tracing.Tracing;
@@ -43,20 +44,22 @@ import org.apache.cassandra.tracing.Tracing;
* updates have been written to nodes needing correction. Breaks write
* atomicity in some situations
*/
-public class BlockingReadRepair extends AbstractReadRepair
+public class BlockingReadRepair<E extends Endpoints<E>, L extends ReplicaLayout<E, L>> extends AbstractReadRepair<E, L>
{
private static final Logger logger = LoggerFactory.getLogger(BlockingReadRepair.class);
protected final Queue<BlockingPartitionRepair> repairs = new ConcurrentLinkedQueue<>();
+ private final int blockFor;
- public BlockingReadRepair(ReadCommand command, long queryStartNanoTime, ConsistencyLevel consistency)
+ BlockingReadRepair(ReadCommand command, L replicaLayout, long queryStartNanoTime)
{
- super(command, queryStartNanoTime, consistency);
+ super(command, replicaLayout, queryStartNanoTime);
+ this.blockFor = replicaLayout.consistencyLevel().blockFor(cfs.keyspace);
}
- public UnfilteredPartitionIterators.MergeListener getMergeListener(InetAddressAndPort[] endpoints)
+ public UnfilteredPartitionIterators.MergeListener getMergeListener(L replicaLayout)
{
- return new PartitionIteratorMergeListener(endpoints, command, consistency, this);
+ return new PartitionIteratorMergeListener(replicaLayout, command, this.replicaLayout.consistencyLevel(), this);
}
@Override
@@ -70,7 +73,7 @@ public class BlockingReadRepair extends AbstractReadRepair
{
for (BlockingPartitionRepair repair: repairs)
{
- repair.maybeSendAdditionalWrites(cfs.sampleLatencyNanos, TimeUnit.NANOSECONDS);
+ repair.maybeSendAdditionalWrites(cfs.transientWriteLatencyNanos, TimeUnit.NANOSECONDS);
}
}
@@ -88,20 +91,20 @@ public class BlockingReadRepair extends AbstractReadRepair
if (timedOut)
{
// We got all responses, but timed out while repairing
- int blockFor = consistency.blockFor(cfs.keyspace);
+ int blockFor = replicaLayout.consistencyLevel().blockFor(cfs.keyspace);
if (Tracing.isTracing())
Tracing.trace("Timed out while read-repairing after receiving all {} data and digest responses", blockFor);
else
logger.debug("Timeout while read-repairing after receiving all {} data and digest responses", blockFor);
- throw new ReadTimeoutException(consistency, blockFor-1, blockFor, true);
+ throw new ReadTimeoutException(replicaLayout.consistencyLevel(), blockFor - 1, blockFor, true);
}
}
@Override
- public void repairPartition(DecoratedKey key, Map<InetAddressAndPort, Mutation> mutations, InetAddressAndPort[] destinations)
+ public void repairPartition(DecoratedKey partitionKey, Map<Replica, Mutation> mutations, L replicaLayout)
{
- BlockingPartitionRepair blockingRepair = new BlockingPartitionRepair(cfs.keyspace, key, consistency, mutations, consistency.blockFor(cfs.keyspace), destinations);
+ BlockingPartitionRepair<E, L> blockingRepair = new BlockingPartitionRepair<>(partitionKey, mutations, blockFor, replicaLayout);
blockingRepair.sendInitialRepairs();
repairs.add(blockingRepair);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepairs.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepairs.java b/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepairs.java
index e5f7179..ceb1765 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepairs.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepairs.java
@@ -18,10 +18,6 @@
package org.apache.cassandra.service.reads.repair;
-import java.util.List;
-
-import com.google.common.collect.Iterables;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -31,13 +27,10 @@ import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.partitions.PartitionUpdate;
-import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.ReadTimeoutException;
import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.locator.NetworkTopologyStrategy;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.schema.TableMetadata;
-import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.tracing.Tracing;
public class BlockingReadRepairs
@@ -48,18 +41,6 @@ public class BlockingReadRepairs
Boolean.getBoolean("cassandra.drop_oversized_readrepair_mutations");
/**
- * Returns all of the endpoints that are replicas for the given key. If the consistency level is datacenter
- * local, only the endpoints in the local dc will be returned.
- */
- static Iterable<InetAddressAndPort> getCandidateEndpoints(Keyspace keyspace, Token token, ConsistencyLevel consistency)
- {
- List<InetAddressAndPort> endpoints = StorageProxy.getLiveSortedEndpoints(keyspace, token);
- return consistency.isDatacenterLocal() && keyspace.getReplicationStrategy() instanceof NetworkTopologyStrategy
- ? Iterables.filter(endpoints, ConsistencyLevel::isLocal)
- : endpoints;
- }
-
- /**
* Create a read repair mutation from the given update, if the mutation is not larger than the maximum
* mutation size, otherwise return null. Or, if we're configured to be strict, throw an exception.
*/
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org