You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2018/09/14 09:14:52 UTC
[2/4] cassandra git commit: ReplicaPlan/Layout refactor follow-up/completion
http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java b/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
index 75885ae..c296cba 100644
--- a/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
+++ b/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
@@ -21,8 +21,9 @@ import java.util.concurrent.TimeUnit;
import com.google.common.base.Preconditions;
-import org.apache.cassandra.locator.ReplicaLayout;
-
+import com.google.common.base.Predicates;
+import org.apache.cassandra.locator.ReplicaPlan;
+import org.apache.cassandra.locator.ReplicaPlans;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,7 +51,6 @@ import org.apache.cassandra.tracing.TraceState;
import org.apache.cassandra.tracing.Tracing;
import static com.google.common.collect.Iterables.all;
-import static com.google.common.collect.Iterables.tryFind;
/**
* Sends a read request to the replicas needed to satisfy a given ConsistencyLevel.
@@ -65,24 +65,25 @@ public abstract class AbstractReadExecutor
private static final Logger logger = LoggerFactory.getLogger(AbstractReadExecutor.class);
protected final ReadCommand command;
- private final ReplicaLayout.ForToken replicaLayout;
- protected final ReadRepair<EndpointsForToken, ReplicaLayout.ForToken> readRepair;
- protected final DigestResolver<EndpointsForToken, ReplicaLayout.ForToken> digestResolver;
- protected final ReadCallback<EndpointsForToken, ReplicaLayout.ForToken> handler;
+ private final ReplicaPlan.SharedForTokenRead replicaPlan;
+ protected final ReadRepair<EndpointsForToken, ReplicaPlan.ForTokenRead> readRepair;
+ protected final DigestResolver<EndpointsForToken, ReplicaPlan.ForTokenRead> digestResolver;
+ protected final ReadCallback<EndpointsForToken, ReplicaPlan.ForTokenRead> handler;
protected final TraceState traceState;
protected final ColumnFamilyStore cfs;
protected final long queryStartNanoTime;
private final int initialDataRequestCount;
protected volatile PartitionIterator result = null;
- AbstractReadExecutor(ColumnFamilyStore cfs, ReadCommand command, ReplicaLayout.ForToken replicaLayout, int initialDataRequestCount, long queryStartNanoTime)
+ AbstractReadExecutor(ColumnFamilyStore cfs, ReadCommand command, ReplicaPlan.ForTokenRead replicaPlan, int initialDataRequestCount, long queryStartNanoTime)
{
this.command = command;
- this.replicaLayout = replicaLayout;
+ this.replicaPlan = ReplicaPlan.shared(replicaPlan);
this.initialDataRequestCount = initialDataRequestCount;
- this.readRepair = ReadRepair.create(command, replicaLayout, queryStartNanoTime);
- this.digestResolver = new DigestResolver<>(command, replicaLayout, queryStartNanoTime);
- this.handler = new ReadCallback<>(digestResolver, replicaLayout.consistencyLevel().blockFor(replicaLayout.keyspace()), command, replicaLayout, queryStartNanoTime);
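+ // the ReadRepair and DigestResolver both need to see our updated replica plan (e.g. after a speculated contact is added), so they are given the shared this.replicaPlan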
+ this.readRepair = ReadRepair.create(command, this.replicaPlan, queryStartNanoTime);
+ this.digestResolver = new DigestResolver<>(command, this.replicaPlan, queryStartNanoTime);
+ this.handler = new ReadCallback<>(digestResolver, command, this.replicaPlan, queryStartNanoTime);
this.cfs = cfs;
this.traceState = Tracing.instance.get();
this.queryStartNanoTime = queryStartNanoTime;
@@ -93,7 +94,7 @@ public abstract class AbstractReadExecutor
// TODO: we need this when talking with pre-3.0 nodes. So if we preserve the digest format moving forward, we can get rid of this once
// we stop being compatible with pre-3.0 nodes.
int digestVersion = MessagingService.current_version;
- for (Replica replica : replicaLayout.selected())
+ for (Replica replica : replicaPlan.contacts())
digestVersion = Math.min(digestVersion, MessagingService.instance().getVersion(replica.endpoint()));
command.setDigestVersion(digestVersion);
}
@@ -168,7 +169,7 @@ public abstract class AbstractReadExecutor
*/
public void executeAsync()
{
- EndpointsForToken selected = replicaLayout().selected();
+ EndpointsForToken selected = replicaPlan().contacts();
EndpointsForToken fullDataRequests = selected.filter(Replica::isFull, initialDataRequestCount);
makeFullDataRequests(fullDataRequests);
makeTransientDataRequests(selected.filter(Replica::isTransient));
@@ -184,30 +185,25 @@ public abstract class AbstractReadExecutor
ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(command.metadata().id);
SpeculativeRetryPolicy retry = cfs.metadata().params.speculativeRetry;
- // Endpoints for Token
- ReplicaLayout.ForToken replicaLayout = ReplicaLayout.forRead(keyspace, command.partitionKey().getToken(), consistencyLevel, retry);
+ ReplicaPlan.ForTokenRead replicaPlan = ReplicaPlans.forRead(keyspace, command.partitionKey().getToken(), consistencyLevel, retry);
// Speculative retry is disabled *OR*
// 11980: Disable speculative retry if using EACH_QUORUM in order to prevent miscounting DC responses
if (retry.equals(NeverSpeculativeRetryPolicy.INSTANCE) || consistencyLevel == ConsistencyLevel.EACH_QUORUM)
- // TODO Looks like we might want to move speculation into the replica layout, but that might be a story for post-4.0
- return new NeverSpeculatingReadExecutor(cfs, command, replicaLayout, queryStartNanoTime, false);
+ return new NeverSpeculatingReadExecutor(cfs, command, replicaPlan, queryStartNanoTime, false);
// There are simply no extra replicas to speculate.
// Handle this separately so it can record failed attempts to speculate due to lack of replicas
- if (replicaLayout.selected().size() == replicaLayout.all().size())
+ if (replicaPlan.contacts().size() == replicaPlan.candidates().size())
{
boolean recordFailedSpeculation = consistencyLevel != ConsistencyLevel.ALL;
- return new NeverSpeculatingReadExecutor(cfs, command, replicaLayout, queryStartNanoTime, recordFailedSpeculation);
+ return new NeverSpeculatingReadExecutor(cfs, command, replicaPlan, queryStartNanoTime, recordFailedSpeculation);
}
- // If CL.ALL, upgrade to AlwaysSpeculating;
- // If We are going to contact every node anyway, ask for 2 full data requests instead of 1, for redundancy
- // (same amount of requests in total, but we turn 1 digest request into a full blown data request)
if (retry.equals(AlwaysSpeculativeRetryPolicy.INSTANCE))
- return new AlwaysSpeculatingReadExecutor(cfs, command, replicaLayout, queryStartNanoTime);
+ return new AlwaysSpeculatingReadExecutor(cfs, command, replicaPlan, queryStartNanoTime);
else // PERCENTILE or CUSTOM.
- return new SpeculatingReadExecutor(cfs, command, replicaLayout, queryStartNanoTime);
+ return new SpeculatingReadExecutor(cfs, command, replicaPlan, queryStartNanoTime);
}
/**
@@ -223,9 +219,9 @@ public abstract class AbstractReadExecutor
return !handler.await(cfs.sampleReadLatencyNanos, TimeUnit.NANOSECONDS);
}
- ReplicaLayout.ForToken replicaLayout()
+ ReplicaPlan.ForTokenRead replicaPlan()
{
- return replicaLayout;
+ return replicaPlan.get();
}
void onReadTimeout() {}
@@ -239,9 +235,9 @@ public abstract class AbstractReadExecutor
*/
private final boolean logFailedSpeculation;
- public NeverSpeculatingReadExecutor(ColumnFamilyStore cfs, ReadCommand command, ReplicaLayout.ForToken replicaLayout, long queryStartNanoTime, boolean logFailedSpeculation)
+ public NeverSpeculatingReadExecutor(ColumnFamilyStore cfs, ReadCommand command, ReplicaPlan.ForTokenRead replicaPlan, long queryStartNanoTime, boolean logFailedSpeculation)
{
- super(cfs, command, replicaLayout, 1, queryStartNanoTime);
+ super(cfs, command, replicaPlan, 1, queryStartNanoTime);
this.logFailedSpeculation = logFailedSpeculation;
}
@@ -260,13 +256,13 @@ public abstract class AbstractReadExecutor
public SpeculatingReadExecutor(ColumnFamilyStore cfs,
ReadCommand command,
- ReplicaLayout.ForToken replicaLayout,
+ ReplicaPlan.ForTokenRead replicaPlan,
long queryStartNanoTime)
{
// We're hitting additional targets for read repair (??). Since our "extra" replica is the least-
// preferred by the snitch, we do an extra data read to start with against a replica more
// likely to reply; better to let RR fail than the entire query.
- super(cfs, command, replicaLayout, replicaLayout.blockFor() < replicaLayout.selected().size() ? 2 : 1, queryStartNanoTime);
+ super(cfs, command, replicaPlan, replicaPlan.blockFor() < replicaPlan.contacts().size() ? 2 : 1, queryStartNanoTime);
}
public void maybeTryAdditionalReplicas()
@@ -277,12 +273,12 @@ public abstract class AbstractReadExecutor
cfs.metric.speculativeRetries.inc();
speculated = true;
+ ReplicaPlan.ForTokenRead replicaPlan = replicaPlan();
ReadCommand retryCommand = command;
Replica extraReplica;
if (handler.resolver.isDataPresent())
{
- extraReplica = tryFind(replicaLayout().all(),
- r -> !replicaLayout().selected().contains(r)).orNull();
+ extraReplica = replicaPlan.firstUncontactedCandidate(Predicates.alwaysTrue());
// we should only use a SpeculatingReadExecutor if we have an extra replica to speculate against
assert extraReplica != null;
@@ -293,8 +289,7 @@ public abstract class AbstractReadExecutor
}
else
{
- extraReplica = tryFind(replicaLayout().all(),
- r -> r.isFull() && !replicaLayout().selected().contains(r)).orNull();
+ extraReplica = replicaPlan.firstUncontactedCandidate(Replica::isFull);
if (extraReplica == null)
{
cfs.metric.speculativeInsufficientReplicas.inc();
@@ -304,6 +299,11 @@ public abstract class AbstractReadExecutor
}
}
+ // we must update the plan to include this new node, else when we come to read-repair, we may not include this
+ // speculated response in the data requests we make again, and we will not be able to 'speculate' an extra repair read,
+ // nor would we be able to speculate a new 'write' if the repair writes are insufficient
+ super.replicaPlan.addToContacts(extraReplica);
+
if (traceState != null)
traceState.trace("speculating read retry on {}", extraReplica);
logger.trace("speculating read retry on {}", extraReplica);
@@ -325,10 +325,12 @@ public abstract class AbstractReadExecutor
{
public AlwaysSpeculatingReadExecutor(ColumnFamilyStore cfs,
ReadCommand command,
- ReplicaLayout.ForToken replicaLayout,
+ ReplicaPlan.ForTokenRead replicaPlan,
long queryStartNanoTime)
{
- super(cfs, command, replicaLayout, replicaLayout.selected().size() > 1 ? 2 : 1, queryStartNanoTime);
+ // presumably, we speculate an extra data request here in case it is our data request that fails to respond,
+ // and there are no more nodes to consult
+ super(cfs, command, replicaPlan, replicaPlan.contacts().size() > 1 ? 2 : 1, queryStartNanoTime);
}
public void maybeTryAdditionalReplicas()
@@ -403,7 +405,7 @@ public abstract class AbstractReadExecutor
logger.trace("Timed out waiting on digest mismatch repair requests");
// the caught exception here will have CL.ALL from the repair command,
// not whatever CL the initial command was at (CASSANDRA-7947)
- throw new ReadTimeoutException(replicaLayout().consistencyLevel(), handler.blockfor - 1, handler.blockfor, true);
+ throw new ReadTimeoutException(replicaPlan().consistencyLevel(), handler.blockFor - 1, handler.blockFor, true);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/reads/DataResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/DataResolver.java b/src/java/org/apache/cassandra/service/reads/DataResolver.java
index a6901b2..db5f3c8 100644
--- a/src/java/org/apache/cassandra/service/reads/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/reads/DataResolver.java
@@ -42,7 +42,7 @@ import org.apache.cassandra.db.transform.Filter;
import org.apache.cassandra.db.transform.FilteredPartitions;
import org.apache.cassandra.db.transform.Transformation;
import org.apache.cassandra.locator.Endpoints;
-import org.apache.cassandra.locator.ReplicaLayout;
+import org.apache.cassandra.locator.ReplicaPlan;
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.service.reads.repair.ReadRepair;
@@ -52,14 +52,14 @@ import org.apache.cassandra.service.reads.repair.RepairedDataVerifier;
import static com.google.common.collect.Iterables.*;
import static org.apache.cassandra.db.partitions.UnfilteredPartitionIterators.MergeListener;
-public class DataResolver<E extends Endpoints<E>, L extends ReplicaLayout<E, L>> extends ResponseResolver<E, L>
+public class DataResolver<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>> extends ResponseResolver<E, P>
{
private final boolean enforceStrictLiveness;
- private final ReadRepair<E, L> readRepair;
+ private final ReadRepair<E, P> readRepair;
- public DataResolver(ReadCommand command, L replicaLayout, ReadRepair<E, L> readRepair, long queryStartNanoTime)
+ public DataResolver(ReadCommand command, ReplicaPlan.Shared<E, P> replicaPlan, ReadRepair<E, P> readRepair, long queryStartNanoTime)
{
- super(command, replicaLayout, queryStartNanoTime);
+ super(command, replicaPlan, queryStartNanoTime);
this.enforceStrictLiveness = command.metadata().enforceStrictLiveness();
this.readRepair = readRepair;
}
@@ -83,7 +83,7 @@ public class DataResolver<E extends Endpoints<E>, L extends ReplicaLayout<E, L>>
Collection<MessageIn<ReadResponse>> messages = responses.snapshot();
assert !any(messages, msg -> msg.payload.isDigestResponse());
- E replicas = replicaLayout.all().keep(transform(messages, msg -> msg.from));
+ E replicas = replicaPlan().candidates().select(transform(messages, msg -> msg.from), false);
List<UnfilteredPartitionIterator> iters = new ArrayList<>(
Collections2.transform(messages, msg -> msg.payload.makeIterator(command)));
assert replicas.size() == iters.size();
@@ -121,7 +121,7 @@ public class DataResolver<E extends Endpoints<E>, L extends ReplicaLayout<E, L>>
command.limits().newCounter(command.nowInSec(), true, command.selectsFullPartition(), enforceStrictLiveness);
UnfilteredPartitionIterator merged = mergeWithShortReadProtection(iters,
- replicaLayout.withSelected(replicas),
+ replicaPlan.getWithContacts(replicas),
mergedResultCounter,
repairedDataTracker);
FilteredPartitions filtered = FilteredPartitions.filter(merged, new Filter(command.nowInSec(), command.metadata().enforceStrictLiveness()));
@@ -135,7 +135,7 @@ public class DataResolver<E extends Endpoints<E>, L extends ReplicaLayout<E, L>>
}
private UnfilteredPartitionIterator mergeWithShortReadProtection(List<UnfilteredPartitionIterator> results,
- L sources,
+ P sources,
DataLimits.Counter mergedResultCounter,
RepairedDataTracker repairedDataTracker)
{
@@ -150,7 +150,7 @@ public class DataResolver<E extends Endpoints<E>, L extends ReplicaLayout<E, L>>
*/
if (!command.limits().isUnlimited())
for (int i = 0; i < results.size(); i++)
- results.set(i, ShortReadProtection.extend(sources.selected().get(i), results.get(i), command, mergedResultCounter, queryStartNanoTime, enforceStrictLiveness));
+ results.set(i, ShortReadProtection.extend(sources.contacts().get(i), results.get(i), command, mergedResultCounter, queryStartNanoTime, enforceStrictLiveness));
return UnfilteredPartitionIterators.merge(results, wrapMergeListener(readRepair.getMergeListener(sources), sources, repairedDataTracker));
}
@@ -161,7 +161,7 @@ public class DataResolver<E extends Endpoints<E>, L extends ReplicaLayout<E, L>>
}
private UnfilteredPartitionIterators.MergeListener wrapMergeListener(UnfilteredPartitionIterators.MergeListener partitionListener,
- L sources,
+ P sources,
RepairedDataTracker repairedDataTracker)
{
// Avoid wrapping no-op listeners as it doesn't throw
@@ -191,7 +191,7 @@ public class DataResolver<E extends Endpoints<E>, L extends ReplicaLayout<E, L>>
table,
mergedDeletion == null ? "null" : mergedDeletion.toString(),
'[' + Joiner.on(", ").join(transform(Arrays.asList(versions), rt -> rt == null ? "null" : rt.toString())) + ']',
- sources.selected(),
+ sources.contacts(),
makeResponsesDebugString(partitionKey));
throw new AssertionError(details, e);
}
@@ -212,7 +212,7 @@ public class DataResolver<E extends Endpoints<E>, L extends ReplicaLayout<E, L>>
table,
merged == null ? "null" : merged.toString(table),
'[' + Joiner.on(", ").join(transform(Arrays.asList(versions), rt -> rt == null ? "null" : rt.toString(table))) + ']',
- sources.selected(),
+ sources.contacts(),
makeResponsesDebugString(partitionKey));
throw new AssertionError(details, e);
}
@@ -238,7 +238,7 @@ public class DataResolver<E extends Endpoints<E>, L extends ReplicaLayout<E, L>>
table,
merged == null ? "null" : merged.toString(table),
'[' + Joiner.on(", ").join(transform(Arrays.asList(versions), rt -> rt == null ? "null" : rt.toString(table))) + ']',
- sources.selected(),
+ sources.contacts(),
makeResponsesDebugString(partitionKey));
throw new AssertionError(details, e);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/reads/DigestResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/DigestResolver.java b/src/java/org/apache/cassandra/service/reads/DigestResolver.java
index 28c2117..0dcae95 100644
--- a/src/java/org/apache/cassandra/service/reads/DigestResolver.java
+++ b/src/java/org/apache/cassandra/service/reads/DigestResolver.java
@@ -29,10 +29,10 @@ import org.apache.cassandra.db.ReadResponse;
import org.apache.cassandra.db.SinglePartitionReadCommand;
import org.apache.cassandra.db.partitions.PartitionIterator;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
-import org.apache.cassandra.locator.ReplicaLayout;
import org.apache.cassandra.locator.Endpoints;
import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.ReplicaPlan;
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.service.reads.repair.NoopReadRepair;
import org.apache.cassandra.service.reads.repair.ReadRepair;
@@ -40,13 +40,13 @@ import org.apache.cassandra.utils.ByteBufferUtil;
import static com.google.common.collect.Iterables.any;
-public class DigestResolver<E extends Endpoints<E>, L extends ReplicaLayout<E, L>> extends ResponseResolver<E, L>
+public class DigestResolver<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>> extends ResponseResolver<E, P>
{
private volatile MessageIn<ReadResponse> dataResponse;
- public DigestResolver(ReadCommand command, L replicas, long queryStartNanoTime)
+ public DigestResolver(ReadCommand command, ReplicaPlan.Shared<E, P> replicaPlan, long queryStartNanoTime)
{
- super(command, replicas, queryStartNanoTime);
+ super(command, replicaPlan, queryStartNanoTime);
Preconditions.checkArgument(command instanceof SinglePartitionReadCommand,
"DigestResolver can only be used with SinglePartitionReadCommand commands");
}
@@ -55,7 +55,7 @@ public class DigestResolver<E extends Endpoints<E>, L extends ReplicaLayout<E, L
public void preprocess(MessageIn<ReadResponse> message)
{
super.preprocess(message);
- Replica replica = replicaLayout.getReplicaFor(message.from);
+ Replica replica = replicaPlan().getReplicaFor(message.from);
if (dataResponse == null && !message.payload.isDigestResponse() && replica.isFull())
{
dataResponse = message;
@@ -76,7 +76,7 @@ public class DigestResolver<E extends Endpoints<E>, L extends ReplicaLayout<E, L
{
return any(responses,
msg -> !msg.payload.isDigestResponse()
- && replicaLayout.getReplicaFor(msg.from).isTransient());
+ && replicaPlan().getReplicaFor(msg.from).isTransient());
}
public PartitionIterator getData()
@@ -93,16 +93,14 @@ public class DigestResolver<E extends Endpoints<E>, L extends ReplicaLayout<E, L
{
// This path can be triggered only if we've got responses from full replicas and they match, but
// transient replica response still contains data, which needs to be reconciled.
- DataResolver<E, L> dataResolver = new DataResolver<>(command,
- replicaLayout,
- NoopReadRepair.instance,
- queryStartNanoTime);
+ DataResolver<E, P> dataResolver
+ = new DataResolver<>(command, replicaPlan, NoopReadRepair.instance, queryStartNanoTime);
dataResolver.preprocess(dataResponse);
// Reconcile with transient replicas
for (MessageIn<ReadResponse> response : responses)
{
- Replica replica = replicaLayout.getReplicaFor(response.from);
+ Replica replica = replicaPlan().getReplicaFor(response.from);
if (replica.isTransient())
dataResolver.preprocess(response);
}
@@ -119,7 +117,7 @@ public class DigestResolver<E extends Endpoints<E>, L extends ReplicaLayout<E, L
ByteBuffer digest = null;
for (MessageIn<ReadResponse> message : responses.snapshot())
{
- if (replicaLayout.getReplicaFor(message.from).isTransient())
+ if (replicaPlan().getReplicaFor(message.from).isTransient())
continue;
ByteBuffer newDigest = message.payload.digest(command);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/reads/ReadCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/ReadCallback.java b/src/java/org/apache/cassandra/service/reads/ReadCallback.java
index 3d39377..7a2385c 100644
--- a/src/java/org/apache/cassandra/service/reads/ReadCallback.java
+++ b/src/java/org/apache/cassandra/service/reads/ReadCallback.java
@@ -23,19 +23,18 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.locator.ReplicaPlan;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.PartitionRangeReadCommand;
import org.apache.cassandra.db.ReadCommand;
import org.apache.cassandra.db.ReadResponse;
import org.apache.cassandra.exceptions.ReadFailureException;
import org.apache.cassandra.exceptions.ReadTimeoutException;
import org.apache.cassandra.exceptions.RequestFailureReason;
-import org.apache.cassandra.exceptions.UnavailableException;
import org.apache.cassandra.locator.Endpoints;
-import org.apache.cassandra.locator.ReplicaLayout;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.IAsyncCallbackWithFailure;
import org.apache.cassandra.net.MessageIn;
@@ -44,16 +43,17 @@ import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.concurrent.SimpleCondition;
-public class ReadCallback<E extends Endpoints<E>, L extends ReplicaLayout<E, L>> implements IAsyncCallbackWithFailure<ReadResponse>
+public class ReadCallback<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>> implements IAsyncCallbackWithFailure<ReadResponse>
{
protected static final Logger logger = LoggerFactory.getLogger( ReadCallback.class );
public final ResponseResolver resolver;
final SimpleCondition condition = new SimpleCondition();
private final long queryStartNanoTime;
- // TODO: move to replica layout as well?
- final int blockfor;
- final L replicaLayout;
+ final int blockFor; // TODO: move to replica plan as well?
+ // this uses a plain reference, but is initialised before handoff to any other threads; the later updates
+ // may not be visible to the threads immediately, but ReplicaPlan only contains final fields, so they will never see an uninitialised object
+ final ReplicaPlan.Shared<E, P> replicaPlan;
private final ReadCommand command;
private static final AtomicIntegerFieldUpdater<ReadCallback> recievedUpdater
= AtomicIntegerFieldUpdater.newUpdater(ReadCallback.class, "received");
@@ -63,19 +63,24 @@ public class ReadCallback<E extends Endpoints<E>, L extends ReplicaLayout<E, L>>
private volatile int failures = 0;
private final Map<InetAddressAndPort, RequestFailureReason> failureReasonByEndpoint;
- public ReadCallback(ResponseResolver resolver, int blockfor, ReadCommand command, L replicaLayout, long queryStartNanoTime)
+ public ReadCallback(ResponseResolver resolver, ReadCommand command, ReplicaPlan.Shared<E, P> replicaPlan, long queryStartNanoTime)
{
this.command = command;
- this.blockfor = blockfor;
this.resolver = resolver;
this.queryStartNanoTime = queryStartNanoTime;
- this.replicaLayout = replicaLayout;
+ this.replicaPlan = replicaPlan;
+ this.blockFor = replicaPlan.get().blockFor();
this.failureReasonByEndpoint = new ConcurrentHashMap<>();
// we don't support read repair (or rapid read protection) for range scans yet (CASSANDRA-6897)
- assert !(command instanceof PartitionRangeReadCommand) || blockfor >= replicaLayout.selected().size();
+ assert !(command instanceof PartitionRangeReadCommand) || blockFor >= replicaPlan().contacts().size();
if (logger.isTraceEnabled())
- logger.trace("Blockfor is {}; setting up requests to {}", blockfor, this.replicaLayout);
+ logger.trace("Blockfor is {}; setting up requests to {}", blockFor, this.replicaPlan);
+ }
+
+ protected P replicaPlan()
+ {
+ return replicaPlan.get();
}
public boolean await(long timePastStart, TimeUnit unit)
@@ -94,30 +99,30 @@ public class ReadCallback<E extends Endpoints<E>, L extends ReplicaLayout<E, L>>
public void awaitResults() throws ReadFailureException, ReadTimeoutException
{
boolean signaled = await(command.getTimeout(), TimeUnit.MILLISECONDS);
- boolean failed = failures > 0 && blockfor + failures > replicaLayout.selected().size();
+ boolean failed = failures > 0 && blockFor + failures > replicaPlan().contacts().size();
if (signaled && !failed)
return;
if (Tracing.isTracing())
{
String gotData = received > 0 ? (resolver.isDataPresent() ? " (including data)" : " (only digests)") : "";
- Tracing.trace("{}; received {} of {} responses{}", new Object[]{ (failed ? "Failed" : "Timed out"), received, blockfor, gotData });
+ Tracing.trace("{}; received {} of {} responses{}", new Object[]{ (failed ? "Failed" : "Timed out"), received, blockFor, gotData });
}
else if (logger.isDebugEnabled())
{
String gotData = received > 0 ? (resolver.isDataPresent() ? " (including data)" : " (only digests)") : "";
- logger.debug("{}; received {} of {} responses{}", new Object[]{ (failed ? "Failed" : "Timed out"), received, blockfor, gotData });
+ logger.debug("{}; received {} of {} responses{}", new Object[]{ (failed ? "Failed" : "Timed out"), received, blockFor, gotData });
}
// Same as for writes, see AbstractWriteResponseHandler
throw failed
- ? new ReadFailureException(replicaLayout.consistencyLevel(), received, blockfor, resolver.isDataPresent(), failureReasonByEndpoint)
- : new ReadTimeoutException(replicaLayout.consistencyLevel(), received, blockfor, resolver.isDataPresent());
+ ? new ReadFailureException(replicaPlan().consistencyLevel(), received, blockFor, resolver.isDataPresent(), failureReasonByEndpoint)
+ : new ReadTimeoutException(replicaPlan().consistencyLevel(), received, blockFor, resolver.isDataPresent());
}
public int blockFor()
{
- return blockfor;
+ return blockFor;
}
public void response(MessageIn<ReadResponse> message)
@@ -127,24 +132,16 @@ public class ReadCallback<E extends Endpoints<E>, L extends ReplicaLayout<E, L>>
? recievedUpdater.incrementAndGet(this)
: received;
- if (n >= blockfor && resolver.isDataPresent())
+ if (n >= blockFor && resolver.isDataPresent())
condition.signalAll();
}
/**
- * @return true if the message counts towards the blockfor threshold
+ * @return true if the message counts towards the blockFor threshold
*/
private boolean waitingFor(InetAddressAndPort from)
{
- return !replicaLayout.consistencyLevel().isDatacenterLocal() || DatabaseDescriptor.getLocalDataCenter().equals(DatabaseDescriptor.getEndpointSnitch().getDatacenter(from));
- }
-
- /**
- * @return the current number of received responses
- */
- public int getReceivedCount()
- {
- return received;
+ return !replicaPlan().consistencyLevel().isDatacenterLocal() || DatabaseDescriptor.getLocalDataCenter().equals(DatabaseDescriptor.getEndpointSnitch().getDatacenter(from));
}
public void response(ReadResponse result)
@@ -157,11 +154,6 @@ public class ReadCallback<E extends Endpoints<E>, L extends ReplicaLayout<E, L>>
response(message);
}
- public void assureSufficientLiveNodes() throws UnavailableException
- {
- replicaLayout.consistencyLevel().assureSufficientLiveNodesForRead(replicaLayout.keyspace(), replicaLayout.selected());
- }
-
public boolean isLatencyForSnitch()
{
return true;
@@ -176,7 +168,7 @@ public class ReadCallback<E extends Endpoints<E>, L extends ReplicaLayout<E, L>>
failureReasonByEndpoint.put(from, failureReason);
- if (blockfor + n > replicaLayout.selected().size())
+ if (blockFor + n > replicaPlan().contacts().size())
condition.signalAll();
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/reads/ResponseResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/ResponseResolver.java b/src/java/org/apache/cassandra/service/reads/ResponseResolver.java
index 298f843..0c1e1ba 100644
--- a/src/java/org/apache/cassandra/service/reads/ResponseResolver.java
+++ b/src/java/org/apache/cassandra/service/reads/ResponseResolver.java
@@ -23,30 +23,34 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.ReadCommand;
import org.apache.cassandra.db.ReadResponse;
import org.apache.cassandra.locator.Endpoints;
-import org.apache.cassandra.locator.ReplicaLayout;
+import org.apache.cassandra.locator.ReplicaPlan;
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.utils.concurrent.Accumulator;
-public abstract class ResponseResolver<E extends Endpoints<E>, L extends ReplicaLayout<E, L>>
+public abstract class ResponseResolver<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>>
{
protected static final Logger logger = LoggerFactory.getLogger(ResponseResolver.class);
protected final ReadCommand command;
- protected final L replicaLayout;
+ protected final ReplicaPlan.Shared<E, P> replicaPlan;
// Accumulator gives us non-blocking thread-safety with optimal algorithmic constraints
protected final Accumulator<MessageIn<ReadResponse>> responses;
protected final long queryStartNanoTime;
- public ResponseResolver(ReadCommand command, L replicaLayout, long queryStartNanoTime)
+ public ResponseResolver(ReadCommand command, ReplicaPlan.Shared<E, P> replicaPlan, long queryStartNanoTime)
{
this.command = command;
- this.replicaLayout = replicaLayout;
- // TODO: calculate max possible replicas for the query (e.g. local dc queries won't contact remotes)
- this.responses = new Accumulator<>(replicaLayout.all().size());
+ this.replicaPlan = replicaPlan;
+ this.responses = new Accumulator<>(replicaPlan.get().candidates().size());
this.queryStartNanoTime = queryStartNanoTime;
}
+ protected P replicaPlan()
+ {
+ return replicaPlan.get();
+ }
+
public abstract boolean isDataPresent();
public void preprocess(MessageIn<ReadResponse> message)
@@ -57,8 +61,8 @@ public abstract class ResponseResolver<E extends Endpoints<E>, L extends Replica
}
catch (IllegalStateException e)
{
- logger.error("Encountered error while trying to preprocess the message {}, in command {}, replica layout: {}",
- message, command, replicaLayout);
+ logger.error("Encountered error while trying to preprocess the message {}, in command {}, replica plan: {}",
+ message, command, replicaPlan);
throw e;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/reads/ShortReadPartitionsProtection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/ShortReadPartitionsProtection.java b/src/java/org/apache/cassandra/service/reads/ShortReadPartitionsProtection.java
index 580b790..b16d105 100644
--- a/src/java/org/apache/cassandra/service/reads/ShortReadPartitionsProtection.java
+++ b/src/java/org/apache/cassandra/service/reads/ShortReadPartitionsProtection.java
@@ -19,6 +19,8 @@
package org.apache.cassandra.service.reads;
import org.apache.cassandra.locator.Endpoints;
+import org.apache.cassandra.locator.ReplicaPlan;
+import org.apache.cassandra.locator.ReplicaPlans;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,7 +42,6 @@ import org.apache.cassandra.db.transform.Transformation;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.dht.ExcludingBounds;
import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.locator.ReplicaLayout;
import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.reads.repair.NoopReadRepair;
@@ -87,10 +88,11 @@ public class ShortReadPartitionsProtection extends Transformation<UnfilteredRowI
* If we don't apply the transformation *after* extending the partition with MoreRows,
* applyToRow() method of protection will not be called on the first row of the new extension iterator.
*/
- ReplicaLayout.ForToken replicaLayout = ReplicaLayout.forSingleReplica(Keyspace.open(command.metadata().keyspace), partition.partitionKey().getToken(), source);
+ ReplicaPlan.ForTokenRead replicaPlan = ReplicaPlans.forSingleReplicaRead(Keyspace.open(command.metadata().keyspace), partition.partitionKey().getToken(), source);
+ ReplicaPlan.SharedForTokenRead sharedReplicaPlan = ReplicaPlan.shared(replicaPlan);
ShortReadRowsProtection protection = new ShortReadRowsProtection(partition.partitionKey(),
command, source,
- (cmd) -> executeReadCommand(cmd, replicaLayout),
+ (cmd) -> executeReadCommand(cmd, sharedReplicaPlan),
singleResultCounter,
mergedResultCounter);
return Transformation.apply(MoreRows.extend(partition, protection), protection);
@@ -169,14 +171,15 @@ public class ShortReadPartitionsProtection extends Transformation<UnfilteredRowI
: new ExcludingBounds<>(lastPartitionKey, bounds.right);
DataRange newDataRange = cmd.dataRange().forSubRange(newBounds);
- ReplicaLayout.ForRange replicaLayout = ReplicaLayout.forSingleReplica(Keyspace.open(command.metadata().keyspace), cmd.dataRange().keyRange(), source);
- return executeReadCommand(cmd.withUpdatedLimitsAndDataRange(newLimits, newDataRange), replicaLayout);
+ ReplicaPlan.ForRangeRead replicaPlan = ReplicaPlans.forSingleReplicaRead(Keyspace.open(command.metadata().keyspace), cmd.dataRange().keyRange(), source);
+ return executeReadCommand(cmd.withUpdatedLimitsAndDataRange(newLimits, newDataRange), ReplicaPlan.shared(replicaPlan));
}
- private <E extends Endpoints<E>, L extends ReplicaLayout<E, L>> UnfilteredPartitionIterator executeReadCommand(ReadCommand cmd, L replicaLayout)
+ private <E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>>
+ UnfilteredPartitionIterator executeReadCommand(ReadCommand cmd, ReplicaPlan.Shared<E, P> replicaPlan)
{
- DataResolver<E, L> resolver = new DataResolver<>(cmd, replicaLayout, (NoopReadRepair<E, L>)NoopReadRepair.instance, queryStartNanoTime);
- ReadCallback<E, L> handler = new ReadCallback<>(resolver, replicaLayout.consistencyLevel().blockFor(replicaLayout.keyspace()), cmd, replicaLayout, queryStartNanoTime);
+ DataResolver<E, P> resolver = new DataResolver<>(cmd, replicaPlan, (NoopReadRepair<E, P>)NoopReadRepair.instance, queryStartNanoTime);
+ ReadCallback<E, P> handler = new ReadCallback<>(resolver, cmd, replicaPlan, queryStartNanoTime);
if (source.isLocal())
StageManager.getStage(Stage.READ).maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(cmd, handler));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java b/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java
index 528d31b..1b213ff 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java
@@ -24,6 +24,7 @@ import java.util.function.Consumer;
import com.google.common.base.Preconditions;
import com.codahale.metrics.Meter;
+import com.google.common.base.Predicates;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.ConsistencyLevel;
@@ -34,7 +35,7 @@ import org.apache.cassandra.db.partitions.PartitionIterator;
import org.apache.cassandra.exceptions.ReadTimeoutException;
import org.apache.cassandra.locator.Endpoints;
import org.apache.cassandra.locator.Replica;
-import org.apache.cassandra.locator.ReplicaLayout;
+import org.apache.cassandra.locator.ReplicaPlan;
import org.apache.cassandra.metrics.ReadRepairMetrics;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
@@ -44,11 +45,12 @@ import org.apache.cassandra.service.reads.DigestResolver;
import org.apache.cassandra.service.reads.ReadCallback;
import org.apache.cassandra.tracing.Tracing;
-public abstract class AbstractReadRepair<E extends Endpoints<E>, L extends ReplicaLayout<E, L>> implements ReadRepair<E, L>
+public abstract class AbstractReadRepair<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>>
+ implements ReadRepair<E, P>
{
protected final ReadCommand command;
protected final long queryStartNanoTime;
- protected final L replicaLayout;
+ protected final ReplicaPlan.Shared<E, P> replicaPlan;
protected final ColumnFamilyStore cfs;
private volatile DigestRepair digestRepair = null;
@@ -68,15 +70,20 @@ public abstract class AbstractReadRepair<E extends Endpoints<E>, L extends Repli
}
public AbstractReadRepair(ReadCommand command,
- L replicaLayout,
+ ReplicaPlan.Shared<E, P> replicaPlan,
long queryStartNanoTime)
{
this.command = command;
this.queryStartNanoTime = queryStartNanoTime;
- this.replicaLayout = replicaLayout;
+ this.replicaPlan = replicaPlan;
this.cfs = Keyspace.openAndGetStore(command.metadata());
}
+ protected P replicaPlan()
+ {
+ return replicaPlan.get();
+ }
+
void sendReadCommand(Replica to, ReadCallback readCallback)
{
MessageOut<ReadCommand> message = command.createMessage();
@@ -90,14 +97,13 @@ public abstract class AbstractReadRepair<E extends Endpoints<E>, L extends Repli
abstract Meter getRepairMeter();
// digestResolver isn't used here because we resend read requests to all participants
- public void startRepair(DigestResolver<E, L> digestResolver, Consumer<PartitionIterator> resultConsumer)
+ public void startRepair(DigestResolver<E, P> digestResolver, Consumer<PartitionIterator> resultConsumer)
{
getRepairMeter().mark();
// Do a full data read to resolve the correct response (and repair any nodes that need it)
- DataResolver<E, L> resolver = new DataResolver<>(command, replicaLayout, this, queryStartNanoTime);
- ReadCallback<E, L> readCallback = new ReadCallback<>(resolver, replicaLayout.consistencyLevel().blockFor(cfs.keyspace),
- command, replicaLayout, queryStartNanoTime);
+ DataResolver<E, P> resolver = new DataResolver<>(command, replicaPlan, this, queryStartNanoTime);
+ ReadCallback<E, P> readCallback = new ReadCallback<>(resolver, command, replicaPlan, queryStartNanoTime);
digestRepair = new DigestRepair(resolver, readCallback, resultConsumer);
@@ -105,12 +111,12 @@ public abstract class AbstractReadRepair<E extends Endpoints<E>, L extends Repli
if (DatabaseDescriptor.getRepairedDataTrackingForPartitionReadsEnabled())
command.trackRepairedStatus();
- for (Replica replica : replicaLayout.selected())
+ for (Replica replica : replicaPlan().contacts())
{
Tracing.trace("Enqueuing full data read to {}", replica);
sendReadCommand(replica, readCallback);
}
- ReadRepairDiagnostics.startRepair(this, replicaLayout, digestResolver);
+ ReadRepairDiagnostics.startRepair(this, replicaPlan(), digestResolver);
}
public void awaitReads() throws ReadTimeoutException
@@ -125,7 +131,7 @@ public abstract class AbstractReadRepair<E extends Endpoints<E>, L extends Repli
private boolean shouldSpeculate()
{
- ConsistencyLevel consistency = replicaLayout.consistencyLevel();
+ ConsistencyLevel consistency = replicaPlan().consistencyLevel();
ConsistencyLevel speculativeCL = consistency.isDatacenterLocal() ? ConsistencyLevel.LOCAL_QUORUM : ConsistencyLevel.QUORUM;
return consistency != ConsistencyLevel.EACH_QUORUM
&& consistency.satisfies(speculativeCL, cfs.keyspace)
@@ -142,15 +148,15 @@ public abstract class AbstractReadRepair<E extends Endpoints<E>, L extends Repli
if (shouldSpeculate() && !repair.readCallback.await(cfs.sampleReadLatencyNanos, TimeUnit.NANOSECONDS))
{
- L uncontacted = replicaLayout.forNaturalUncontacted();
- if (uncontacted.selected().isEmpty())
+ Replica uncontacted = replicaPlan().firstUncontactedCandidate(Predicates.alwaysTrue());
+ if (uncontacted == null)
return;
- Replica replica = uncontacted.selected().iterator().next();
- Tracing.trace("Enqueuing speculative full data read to {}", replica);
- sendReadCommand(replica, repair.readCallback);
+ replicaPlan.addToContacts(uncontacted);
+ Tracing.trace("Enqueuing speculative full data read to {}", uncontacted);
+ sendReadCommand(uncontacted, repair.readCallback);
ReadRepairMetrics.speculatedRead.mark();
- ReadRepairDiagnostics.speculatedRead(this, replica.endpoint(), uncontacted);
+ ReadRepairDiagnostics.speculatedRead(this, uncontacted.endpoint(), replicaPlan());
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java b/src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java
index 54af2cf..f536ea8 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java
@@ -37,9 +37,9 @@ import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.locator.Endpoints;
-import org.apache.cassandra.locator.ReplicaLayout;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.locator.ReplicaPlan;
import org.apache.cassandra.locator.Replicas;
import org.apache.cassandra.metrics.ReadRepairMetrics;
import org.apache.cassandra.net.IAsyncCallback;
@@ -49,25 +49,26 @@ import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.tracing.Tracing;
-public class BlockingPartitionRepair<E extends Endpoints<E>, L extends ReplicaLayout<E, L>> extends AbstractFuture<Object> implements IAsyncCallback<Object>
+public class BlockingPartitionRepair<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>>
+ extends AbstractFuture<Object> implements IAsyncCallback<Object>
{
private final DecoratedKey key;
- private final L replicaLayout;
+ private final P replicaPlan;
private final Map<Replica, Mutation> pendingRepairs;
private final CountDownLatch latch;
private volatile long mutationsSentTime;
- public BlockingPartitionRepair(DecoratedKey key, Map<Replica, Mutation> repairs, int maxBlockFor, L replicaLayout)
+ public BlockingPartitionRepair(DecoratedKey key, Map<Replica, Mutation> repairs, int maxBlockFor, P replicaPlan)
{
this.key = key;
this.pendingRepairs = new ConcurrentHashMap<>(repairs);
- this.replicaLayout = replicaLayout;
+ this.replicaPlan = replicaPlan;
// here we remove empty repair mutations from the block for total, since
// we're not sending them mutations
int blockFor = maxBlockFor;
- for (Replica participant: replicaLayout.selected())
+ for (Replica participant: replicaPlan.contacts())
{
// remote dcs can sometimes get involved in dc-local reads. We want to repair
// them if they do, but they shouldn't interfere with blocking the client read.
@@ -97,7 +98,7 @@ public class BlockingPartitionRepair<E extends Endpoints<E>, L extends ReplicaLa
private boolean shouldBlockOn(InetAddressAndPort endpoint)
{
- return !replicaLayout.consistencyLevel().isDatacenterLocal() || isLocal(endpoint);
+ return !replicaPlan.consistencyLevel().isDatacenterLocal() || isLocal(endpoint);
}
@VisibleForTesting
@@ -105,7 +106,7 @@ public class BlockingPartitionRepair<E extends Endpoints<E>, L extends ReplicaLa
{
if (shouldBlockOn(from))
{
- pendingRepairs.remove(replicaLayout.getReplicaFor(from));
+ pendingRepairs.remove(replicaPlan.getReplicaFor(from));
latch.countDown();
}
}
@@ -198,8 +199,8 @@ public class BlockingPartitionRepair<E extends Endpoints<E>, L extends ReplicaLa
if (awaitRepairs(timeout, timeoutUnit))
return;
- L newCandidates = replicaLayout.forNaturalUncontacted();
- if (newCandidates.selected().isEmpty())
+ E newCandidates = replicaPlan.uncontactedCandidates();
+ if (newCandidates.isEmpty())
return;
PartitionUpdate update = mergeUnackedUpdates();
@@ -212,7 +213,7 @@ public class BlockingPartitionRepair<E extends Endpoints<E>, L extends ReplicaLa
Mutation[] versionedMutations = new Mutation[msgVersionIdx(MessagingService.current_version) + 1];
- for (Replica replica : newCandidates.selected())
+ for (Replica replica : newCandidates)
{
int versionIdx = msgVersionIdx(MessagingService.instance().getVersion(replica.endpoint()));
@@ -220,7 +221,7 @@ public class BlockingPartitionRepair<E extends Endpoints<E>, L extends ReplicaLa
if (mutation == null)
{
- mutation = BlockingReadRepairs.createRepairMutation(update, replicaLayout.consistencyLevel(), replica.endpoint(), true);
+ mutation = BlockingReadRepairs.createRepairMutation(update, replicaPlan.consistencyLevel(), replica.endpoint(), true);
versionedMutations[versionIdx] = mutation;
}
@@ -239,7 +240,7 @@ public class BlockingPartitionRepair<E extends Endpoints<E>, L extends ReplicaLa
Keyspace getKeyspace()
{
- return replicaLayout.keyspace();
+ return replicaPlan.keyspace();
}
DecoratedKey getKey()
@@ -249,6 +250,6 @@ public class BlockingPartitionRepair<E extends Endpoints<E>, L extends ReplicaLa
ConsistencyLevel getConsistency()
{
- return replicaLayout.consistencyLevel();
+ return replicaPlan.consistencyLevel();
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java b/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java
index 402aed0..938abaf 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java
@@ -24,7 +24,6 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.locator.Endpoints;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,8 +33,9 @@ import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.ReadCommand;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
import org.apache.cassandra.exceptions.ReadTimeoutException;
+import org.apache.cassandra.locator.Endpoints;
import org.apache.cassandra.locator.Replica;
-import org.apache.cassandra.locator.ReplicaLayout;
+import org.apache.cassandra.locator.ReplicaPlan;
import org.apache.cassandra.metrics.ReadRepairMetrics;
import org.apache.cassandra.tracing.Tracing;
@@ -44,22 +44,23 @@ import org.apache.cassandra.tracing.Tracing;
* updates have been written to nodes needing correction. Breaks write
* atomicity in some situations
*/
-public class BlockingReadRepair<E extends Endpoints<E>, L extends ReplicaLayout<E, L>> extends AbstractReadRepair<E, L>
+public class BlockingReadRepair<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>>
+ extends AbstractReadRepair<E, P>
{
private static final Logger logger = LoggerFactory.getLogger(BlockingReadRepair.class);
protected final Queue<BlockingPartitionRepair> repairs = new ConcurrentLinkedQueue<>();
private final int blockFor;
- BlockingReadRepair(ReadCommand command, L replicaLayout, long queryStartNanoTime)
+ BlockingReadRepair(ReadCommand command, ReplicaPlan.Shared<E, P> replicaPlan, long queryStartNanoTime)
{
- super(command, replicaLayout, queryStartNanoTime);
- this.blockFor = replicaLayout.consistencyLevel().blockFor(cfs.keyspace);
+ super(command, replicaPlan, queryStartNanoTime);
+ this.blockFor = replicaPlan().consistencyLevel().blockFor(cfs.keyspace);
}
- public UnfilteredPartitionIterators.MergeListener getMergeListener(L replicaLayout)
+ public UnfilteredPartitionIterators.MergeListener getMergeListener(P replicaPlan)
{
- return new PartitionIteratorMergeListener(replicaLayout, command, this.replicaLayout.consistencyLevel(), this);
+ return new PartitionIteratorMergeListener<>(replicaPlan, command, this);
}
@Override
@@ -91,20 +92,20 @@ public class BlockingReadRepair<E extends Endpoints<E>, L extends ReplicaLayout<
if (timedOut)
{
// We got all responses, but timed out while repairing
- int blockFor = replicaLayout.consistencyLevel().blockFor(cfs.keyspace);
+ int blockFor = replicaPlan().blockFor();
if (Tracing.isTracing())
Tracing.trace("Timed out while read-repairing after receiving all {} data and digest responses", blockFor);
else
logger.debug("Timeout while read-repairing after receiving all {} data and digest responses", blockFor);
- throw new ReadTimeoutException(replicaLayout.consistencyLevel(), blockFor - 1, blockFor, true);
+ throw new ReadTimeoutException(replicaPlan().consistencyLevel(), blockFor - 1, blockFor, true);
}
}
@Override
- public void repairPartition(DecoratedKey partitionKey, Map<Replica, Mutation> mutations, L replicaLayout)
+ public void repairPartition(DecoratedKey partitionKey, Map<Replica, Mutation> mutations, P replicaPlan)
{
- BlockingPartitionRepair<E, L> blockingRepair = new BlockingPartitionRepair<>(partitionKey, mutations, blockFor, replicaLayout);
+ BlockingPartitionRepair<E, P> blockingRepair = new BlockingPartitionRepair<>(partitionKey, mutations, blockFor, replicaPlan);
blockingRepair.sendInitialRepairs();
repairs.add(blockingRepair);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java b/src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java
index 4af4a92..6aa6ece 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java
@@ -26,28 +26,29 @@ import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.partitions.PartitionIterator;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
import org.apache.cassandra.exceptions.ReadTimeoutException;
-import org.apache.cassandra.locator.ReplicaLayout;
import org.apache.cassandra.locator.Endpoints;
import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.locator.ReplicaPlan;
import org.apache.cassandra.service.reads.DigestResolver;
/**
* Bypasses the read repair path for short read protection and testing
*/
-public class NoopReadRepair<E extends Endpoints<E>, L extends ReplicaLayout<E, L>> implements ReadRepair<E, L>
+public class NoopReadRepair<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>>
+ implements ReadRepair<E, P>
{
public static final NoopReadRepair instance = new NoopReadRepair();
private NoopReadRepair() {}
@Override
- public UnfilteredPartitionIterators.MergeListener getMergeListener(L replicas)
+ public UnfilteredPartitionIterators.MergeListener getMergeListener(P replicas)
{
return UnfilteredPartitionIterators.MergeListener.NOOP;
}
@Override
- public void startRepair(DigestResolver<E, L> digestResolver, Consumer<PartitionIterator> resultConsumer)
+ public void startRepair(DigestResolver<E, P> digestResolver, Consumer<PartitionIterator> resultConsumer)
{
resultConsumer.accept(digestResolver.getData());
}
@@ -75,7 +76,7 @@ public class NoopReadRepair<E extends Endpoints<E>, L extends ReplicaLayout<E, L
}
@Override
- public void repairPartition(DecoratedKey partitionKey, Map<Replica, Mutation> mutations, L replicaLayout)
+ public void repairPartition(DecoratedKey partitionKey, Map<Replica, Mutation> mutations, P replicaPlan)
{
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/reads/repair/PartitionIteratorMergeListener.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/repair/PartitionIteratorMergeListener.java b/src/java/org/apache/cassandra/service/reads/repair/PartitionIteratorMergeListener.java
index 4cae3ae..7247704 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/PartitionIteratorMergeListener.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/PartitionIteratorMergeListener.java
@@ -28,26 +28,26 @@ import org.apache.cassandra.db.RegularAndStaticColumns;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.db.rows.UnfilteredRowIterators;
-import org.apache.cassandra.locator.ReplicaLayout;
+import org.apache.cassandra.locator.Endpoints;
+import org.apache.cassandra.locator.ReplicaPlan;
-public class PartitionIteratorMergeListener implements UnfilteredPartitionIterators.MergeListener
+public class PartitionIteratorMergeListener<E extends Endpoints<E>>
+ implements UnfilteredPartitionIterators.MergeListener
{
- private final ReplicaLayout replicaLayout;
+ private final ReplicaPlan.ForRead<E> replicaPlan;
private final ReadCommand command;
- private final ConsistencyLevel consistency;
private final ReadRepair readRepair;
- public PartitionIteratorMergeListener(ReplicaLayout replicaLayout, ReadCommand command, ConsistencyLevel consistency, ReadRepair readRepair)
+ public PartitionIteratorMergeListener(ReplicaPlan.ForRead<E> replicaPlan, ReadCommand command, ReadRepair readRepair)
{
- this.replicaLayout = replicaLayout;
+ this.replicaPlan = replicaPlan;
this.command = command;
- this.consistency = consistency;
this.readRepair = readRepair;
}
public UnfilteredRowIterators.MergeListener getRowMergeListener(DecoratedKey partitionKey, List<UnfilteredRowIterator> versions)
{
- return new RowIteratorMergeListener(partitionKey, columns(versions), isReversed(versions), replicaLayout, command, consistency, readRepair);
+ return new RowIteratorMergeListener<>(partitionKey, columns(versions), isReversed(versions), replicaPlan, command, readRepair);
}
protected RegularAndStaticColumns columns(List<UnfilteredRowIterator> versions)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepair.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepair.java b/src/java/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepair.java
index c13e2d6..64bfec2 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepair.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepair.java
@@ -27,22 +27,23 @@ import org.apache.cassandra.db.ReadCommand;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
import org.apache.cassandra.locator.Endpoints;
import org.apache.cassandra.locator.Replica;
-import org.apache.cassandra.locator.ReplicaLayout;
+import org.apache.cassandra.locator.ReplicaPlan;
import org.apache.cassandra.metrics.ReadRepairMetrics;
/**
* Only performs the collection of data responses and reconciliation of them, doesn't send repair mutations
* to replicas. This preserves write atomicity, but doesn't provide monotonic quorum reads
*/
-public class ReadOnlyReadRepair<E extends Endpoints<E>, L extends ReplicaLayout<E, L>> extends AbstractReadRepair<E, L>
+public class ReadOnlyReadRepair<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>>
+ extends AbstractReadRepair<E, P>
{
- ReadOnlyReadRepair(ReadCommand command, L replicaLayout, long queryStartNanoTime)
+ ReadOnlyReadRepair(ReadCommand command, ReplicaPlan.Shared<E, P> replicaPlan, long queryStartNanoTime)
{
- super(command, replicaLayout, queryStartNanoTime);
+ super(command, replicaPlan, queryStartNanoTime);
}
@Override
- public UnfilteredPartitionIterators.MergeListener getMergeListener(L replicaLayout)
+ public UnfilteredPartitionIterators.MergeListener getMergeListener(P replicaPlan)
{
return UnfilteredPartitionIterators.MergeListener.NOOP;
}
@@ -60,7 +61,7 @@ public class ReadOnlyReadRepair<E extends Endpoints<E>, L extends ReplicaLayout<
}
@Override
- public void repairPartition(DecoratedKey partitionKey, Map<Replica, Mutation> mutations, L replicaLayout)
+ public void repairPartition(DecoratedKey partitionKey, Map<Replica, Mutation> mutations, P replicaPlan)
{
throw new UnsupportedOperationException("ReadOnlyReadRepair shouldn't be trying to repair partitions");
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/reads/repair/ReadRepair.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/repair/ReadRepair.java b/src/java/org/apache/cassandra/service/reads/repair/ReadRepair.java
index 168f003..9441945 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/ReadRepair.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/ReadRepair.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.service.reads.repair;
import java.util.Map;
import java.util.function.Consumer;
+import java.util.function.Supplier;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.locator.Endpoints;
@@ -29,17 +30,19 @@ import org.apache.cassandra.db.partitions.PartitionIterator;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
import org.apache.cassandra.exceptions.ReadTimeoutException;
import org.apache.cassandra.locator.Replica;
-import org.apache.cassandra.locator.ReplicaLayout;
+import org.apache.cassandra.locator.ReplicaPlan;
import org.apache.cassandra.service.reads.DigestResolver;
-public interface ReadRepair<E extends Endpoints<E>, L extends ReplicaLayout<E, L>>
+public interface ReadRepair<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>>
{
public interface Factory
{
- <E extends Endpoints<E>, L extends ReplicaLayout<E, L>> ReadRepair<E, L> create(ReadCommand command, L replicaLayout, long queryStartNanoTime);
+ <E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>>
+ ReadRepair<E, P> create(ReadCommand command, ReplicaPlan.Shared<E, P> replicaPlan, long queryStartNanoTime);
}
- static <E extends Endpoints<E>, L extends ReplicaLayout<E, L>> ReadRepair<E, L> create(ReadCommand command, L replicaPlan, long queryStartNanoTime)
+ static <E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>>
+ ReadRepair<E, P> create(ReadCommand command, ReplicaPlan.Shared<E, P> replicaPlan, long queryStartNanoTime)
{
return command.metadata().params.readRepair.create(command, replicaPlan, queryStartNanoTime);
}
@@ -47,7 +50,7 @@ public interface ReadRepair<E extends Endpoints<E>, L extends ReplicaLayout<E, L
/**
* Used by DataResolver to generate corrections as the partition iterator is consumed
*/
- UnfilteredPartitionIterators.MergeListener getMergeListener(L replicaLayout);
+ UnfilteredPartitionIterators.MergeListener getMergeListener(P replicaPlan);
/**
* Called when the digests from the initial read don't match. Reads may block on the
@@ -55,7 +58,7 @@ public interface ReadRepair<E extends Endpoints<E>, L extends ReplicaLayout<E, L
* @param digestResolver supplied so we can get the original data response
* @param resultConsumer hook for the repair to set its result on completion
*/
- public void startRepair(DigestResolver<E, L> digestResolver, Consumer<PartitionIterator> resultConsumer);
+ public void startRepair(DigestResolver<E, P> digestResolver, Consumer<PartitionIterator> resultConsumer);
/**
* Block on the reads (or timeout) sent out in {@link ReadRepair#startRepair}
@@ -90,5 +93,5 @@ public interface ReadRepair<E extends Endpoints<E>, L extends ReplicaLayout<E, L
* Repairs a partition _after_ receiving data responses. This method receives the replica list, since
* we will block repair only on the replicas that have responded.
*/
- void repairPartition(DecoratedKey partitionKey, Map<Replica, Mutation> mutations, L replicaLayout);
+ void repairPartition(DecoratedKey partitionKey, Map<Replica, Mutation> mutations, P replicaPlan);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/reads/repair/ReadRepairDiagnostics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/repair/ReadRepairDiagnostics.java b/src/java/org/apache/cassandra/service/reads/repair/ReadRepairDiagnostics.java
index 4c74a89..b9167bd 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/ReadRepairDiagnostics.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/ReadRepairDiagnostics.java
@@ -26,6 +26,7 @@ import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.diag.DiagnosticEventService;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.ReplicaLayout;
+import org.apache.cassandra.locator.ReplicaPlan;
import org.apache.cassandra.service.reads.DigestResolver;
import org.apache.cassandra.service.reads.repair.PartitionRepairEvent.PartitionRepairEventType;
import org.apache.cassandra.service.reads.repair.ReadRepairEvent.ReadRepairEventType;
@@ -38,22 +39,22 @@ final class ReadRepairDiagnostics
{
}
- static void startRepair(AbstractReadRepair readRepair, ReplicaLayout<?, ?> layout, DigestResolver digestResolver)
+ static void startRepair(AbstractReadRepair readRepair, ReplicaPlan.ForRead<?> fullPlan, DigestResolver digestResolver)
{
if (service.isEnabled(ReadRepairEvent.class, ReadRepairEventType.START_REPAIR))
service.publish(new ReadRepairEvent(ReadRepairEventType.START_REPAIR,
readRepair,
- layout.selected().endpoints(),
- layout.all().endpoints(), digestResolver));
+ fullPlan.contacts().endpoints(),
+ fullPlan.candidates().endpoints(), digestResolver));
}
static void speculatedRead(AbstractReadRepair readRepair, InetAddressAndPort endpoint,
- ReplicaLayout<?, ?> replicaLayout)
+ ReplicaPlan.ForRead<?> fullPlan)
{
if (service.isEnabled(ReadRepairEvent.class, ReadRepairEventType.SPECULATED_READ))
service.publish(new ReadRepairEvent(ReadRepairEventType.SPECULATED_READ,
readRepair, Collections.singletonList(endpoint),
- Lists.newArrayList(replicaLayout.all().endpoints()), null));
+ Lists.newArrayList(fullPlan.candidates().endpoints()), null));
}
static void sendInitialRepair(BlockingPartitionRepair partitionRepair, InetAddressAndPort destination, Mutation mutation)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/reads/repair/ReadRepairEvent.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/repair/ReadRepairEvent.java b/src/java/org/apache/cassandra/service/reads/repair/ReadRepairEvent.java
index 9e14362..5cec802 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/ReadRepairEvent.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/ReadRepairEvent.java
@@ -67,7 +67,7 @@ final class ReadRepairEvent extends DiagnosticEvent
this.keyspace = readRepair.cfs.keyspace;
this.tableName = readRepair.cfs.getTableName();
this.cqlCommand = readRepair.command.toCQLString();
- this.consistency = readRepair.replicaLayout.consistencyLevel();
+ this.consistency = readRepair.replicaPlan().consistencyLevel();
this.speculativeRetry = readRepair.cfs.metadata().params.speculativeRetry.kind();
this.destinations = destinations;
this.allEndpoints = allEndpoints;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/reads/repair/ReadRepairStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/repair/ReadRepairStrategy.java b/src/java/org/apache/cassandra/service/reads/repair/ReadRepairStrategy.java
index 28c0e9e..7a4b795 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/ReadRepairStrategy.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/ReadRepairStrategy.java
@@ -21,22 +21,25 @@ package org.apache.cassandra.service.reads.repair;
import org.apache.cassandra.db.ReadCommand;
import org.apache.cassandra.locator.Endpoints;
import org.apache.cassandra.locator.ReplicaLayout;
+import org.apache.cassandra.locator.ReplicaPlan;
public enum ReadRepairStrategy implements ReadRepair.Factory
{
NONE
{
- public <E extends Endpoints<E>, L extends ReplicaLayout<E, L>> ReadRepair<E, L> create(ReadCommand command, L replicaLayout, long queryStartNanoTime)
+ public <E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>>
+ ReadRepair<E, P> create(ReadCommand command, ReplicaPlan.Shared<E, P> replicaPlan, long queryStartNanoTime)
{
- return new ReadOnlyReadRepair<>(command, replicaLayout, queryStartNanoTime);
+ return new ReadOnlyReadRepair<>(command, replicaPlan, queryStartNanoTime);
}
},
BLOCKING
{
- public <E extends Endpoints<E>, L extends ReplicaLayout<E, L>> ReadRepair<E, L> create(ReadCommand command, L replicaLayout, long queryStartNanoTime)
+ public <E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>>
+ ReadRepair<E, P> create(ReadCommand command, ReplicaPlan.Shared<E, P> replicaPlan, long queryStartNanoTime)
{
- return new BlockingReadRepair<>(command, replicaLayout, queryStartNanoTime);
+ return new BlockingReadRepair<>(command, replicaPlan, queryStartNanoTime);
}
};
http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java b/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java
index 7fe797a..60e0d41 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java
@@ -46,21 +46,21 @@ import org.apache.cassandra.db.rows.Rows;
import org.apache.cassandra.db.rows.UnfilteredRowIterators;
import org.apache.cassandra.locator.Endpoints;
import org.apache.cassandra.locator.Replica;
-import org.apache.cassandra.locator.ReplicaLayout;
+import org.apache.cassandra.locator.ReplicaPlan;
import org.apache.cassandra.schema.ColumnMetadata;
-public class RowIteratorMergeListener implements UnfilteredRowIterators.MergeListener
+public class RowIteratorMergeListener<E extends Endpoints<E>>
+ implements UnfilteredRowIterators.MergeListener
{
private final DecoratedKey partitionKey;
private final RegularAndStaticColumns columns;
private final boolean isReversed;
private final ReadCommand command;
- private final ConsistencyLevel consistency;
private final PartitionUpdate.Builder[] repairs;
private final Row.Builder[] currentRows;
private final RowDiffListener diffListener;
- private final ReplicaLayout layout;
+ private final ReplicaPlan.ForRead<E> replicaPlan;
// The partition level deletion for the merge row.
private DeletionTime partitionLevelDeletion;
@@ -73,19 +73,18 @@ public class RowIteratorMergeListener implements UnfilteredRowIterators.MergeLis
private final ReadRepair readRepair;
- public RowIteratorMergeListener(DecoratedKey partitionKey, RegularAndStaticColumns columns, boolean isReversed, ReplicaLayout layout, ReadCommand command, ConsistencyLevel consistency, ReadRepair readRepair)
+ public RowIteratorMergeListener(DecoratedKey partitionKey, RegularAndStaticColumns columns, boolean isReversed, ReplicaPlan.ForRead<E> replicaPlan, ReadCommand command, ReadRepair readRepair)
{
this.partitionKey = partitionKey;
this.columns = columns;
this.isReversed = isReversed;
- this.layout = layout;
- int size = layout.selected().size();
+ this.replicaPlan = replicaPlan;
+ int size = replicaPlan.contacts().size();
repairs = new PartitionUpdate.Builder[size];
currentRows = new Row.Builder[size];
sourceDeletionTime = new DeletionTime[size];
markerToRepair = new ClusteringBound[size];
this.command = command;
- this.consistency = consistency;
this.readRepair = readRepair;
this.diffListener = new RowDiffListener()
@@ -310,7 +309,7 @@ public class RowIteratorMergeListener implements UnfilteredRowIterators.MergeLis
public void close()
{
Map<Replica, Mutation> mutations = null;
- Endpoints<?> sources = layout.selected();
+ Endpoints<?> sources = replicaPlan.contacts();
for (int i = 0; i < repairs.length; i++)
{
if (repairs[i] == null)
@@ -318,7 +317,7 @@ public class RowIteratorMergeListener implements UnfilteredRowIterators.MergeLis
Replica source = sources.get(i);
- Mutation mutation = BlockingReadRepairs.createRepairMutation(repairs[i].build(), consistency, source.endpoint(), false);
+ Mutation mutation = BlockingReadRepairs.createRepairMutation(repairs[i].build(), replicaPlan.consistencyLevel(), source.endpoint(), false);
if (mutation == null)
continue;
@@ -330,7 +329,7 @@ public class RowIteratorMergeListener implements UnfilteredRowIterators.MergeLis
if (mutations != null)
{
- readRepair.repairPartition(partitionKey, mutations, layout);
+ readRepair.repairPartition(partitionKey, mutations, replicaPlan);
}
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/test/unit/org/apache/cassandra/locator/ReplicaCollectionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/locator/ReplicaCollectionTest.java b/test/unit/org/apache/cassandra/locator/ReplicaCollectionTest.java
index 66eff23..f937f96 100644
--- a/test/unit/org/apache/cassandra/locator/ReplicaCollectionTest.java
+++ b/test/unit/org/apache/cassandra/locator/ReplicaCollectionTest.java
@@ -22,7 +22,6 @@ import com.google.common.base.Predicates;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Multimap;
import org.apache.cassandra.dht.Murmur3Partitioner;
@@ -34,13 +33,14 @@ import org.junit.Assert;
import org.junit.Test;
import java.net.UnknownHostException;
-import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;
+import static com.google.common.collect.Iterables.*;
+import static com.google.common.collect.Iterables.filter;
import static org.apache.cassandra.locator.Replica.fullReplica;
import static org.apache.cassandra.locator.Replica.transientReplica;
@@ -111,7 +111,7 @@ public class ReplicaCollectionTest
void testEquals()
{
- Assert.assertTrue(Iterables.elementsEqual(canonicalList, test));
+ Assert.assertTrue(elementsEqual(canonicalList, test));
}
void testEndpoints()
@@ -144,28 +144,6 @@ public class ReplicaCollectionTest
Assert.assertEquals(new LinkedHashSet<>(Lists.transform(canonicalList, Replica::endpoint)), test.endpoints());
}
- void testSelect(int subListDepth, int filterDepth, int sortDepth, int selectDepth)
- {
- TestCase<C> allMatchZeroCapacity = new TestCase<>(test.select().add(Predicates.alwaysTrue(), 0).get(), Collections.emptyList());
- allMatchZeroCapacity.testAll(subListDepth, filterDepth, sortDepth, selectDepth - 1);
-
- TestCase<C> noMatchFullCapacity = new TestCase<>(test.select().add(Predicates.alwaysFalse(), canonicalList.size()).get(), Collections.emptyList());
- noMatchFullCapacity.testAll(subListDepth, filterDepth, sortDepth,selectDepth - 1);
-
- if (canonicalList.size() <= 2)
- return;
-
- List<Replica> newOrderList = ImmutableList.of(canonicalList.get(2), canonicalList.get(1), canonicalList.get(0));
- TestCase<C> newOrder = new TestCase<>(
- test.select()
- .add(r -> r == newOrderList.get(0), 3)
- .add(r -> r == newOrderList.get(1), 3)
- .add(r -> r == newOrderList.get(2), 3)
- .get(), newOrderList
- );
- newOrder.testAll(subListDepth, filterDepth, sortDepth,selectDepth - 1);
- }
-
private void assertSubList(C subCollection, int from, int to)
{
Assert.assertTrue(subCollection.isSnapshot);
@@ -182,7 +160,7 @@ public class ReplicaCollectionTest
}
}
- void testSubList(int subListDepth, int filterDepth, int sortDepth, int selectDepth)
+ void testSubList(int subListDepth, int filterDepth, int sortDepth)
{
if (test.isSnapshot)
Assert.assertSame(test, test.subList(0, test.size()));
@@ -192,34 +170,62 @@ public class ReplicaCollectionTest
TestCase<C> skipFront = new TestCase<>(test.subList(1, test.size()), canonicalList.subList(1, canonicalList.size()));
assertSubList(skipFront.test, 1, canonicalList.size());
- skipFront.testAll(subListDepth - 1, filterDepth, sortDepth, selectDepth);
+ skipFront.testAll(subListDepth - 1, filterDepth, sortDepth);
TestCase<C> skipBack = new TestCase<>(test.subList(0, test.size() - 1), canonicalList.subList(0, canonicalList.size() - 1));
assertSubList(skipBack.test, 0, canonicalList.size() - 1);
- skipBack.testAll(subListDepth - 1, filterDepth, sortDepth, selectDepth);
+ skipBack.testAll(subListDepth - 1, filterDepth, sortDepth);
}
- void testFilter(int subListDepth, int filterDepth, int sortDepth, int selectDepth)
+ void testFilter(int subListDepth, int filterDepth, int sortDepth)
{
if (test.isSnapshot)
Assert.assertSame(test, test.filter(Predicates.alwaysTrue()));
if (test.isEmpty())
return;
+
// remove start
// we recurse on the same subset in testSubList, so just corroborate we have the correct list here
- assertSubList(test.filter(r -> r != canonicalList.get(0)), 1, canonicalList.size());
+ {
+ Predicate<Replica> removeFirst = r -> r != canonicalList.get(0);
+ assertSubList(test.filter(removeFirst), 1, canonicalList.size());
+ assertSubList(test.filter(removeFirst, 1), 1, Math.min(canonicalList.size(), 2));
+ }
if (test.size() <= 1)
return;
+
// remove end
// we recurse on the same subset in testSubList, so just corroborate we have the correct list here
- assertSubList(test.filter(r -> r != canonicalList.get(canonicalList.size() - 1)), 0, canonicalList.size() - 1);
+ {
+ int last = canonicalList.size() - 1;
+ Predicate<Replica> removeLast = r -> r != canonicalList.get(last);
+ assertSubList(test.filter(removeLast), 0, last);
+ }
if (test.size() <= 2)
return;
+
Predicate<Replica> removeMiddle = r -> r != canonicalList.get(canonicalList.size() / 2);
- TestCase<C> filtered = new TestCase<>(test.filter(removeMiddle), ImmutableList.copyOf(Iterables.filter(canonicalList, removeMiddle::test)));
- filtered.testAll(subListDepth, filterDepth - 1, sortDepth, selectDepth);
+ TestCase<C> filtered = new TestCase<>(test.filter(removeMiddle), ImmutableList.copyOf(filter(canonicalList, removeMiddle::test)));
+ filtered.testAll(subListDepth, filterDepth - 1, sortDepth);
+ }
+
+ void testCount()
+ {
+ Assert.assertEquals(0, test.count(Predicates.alwaysFalse()));
+
+ if (test.isEmpty())
+ {
+ Assert.assertEquals(0, test.count(Predicates.alwaysTrue()));
+ return;
+ }
+
+ for (int i = 0 ; i < canonicalList.size() ; ++i)
+ {
+ Replica discount = canonicalList.get(i);
+ Assert.assertEquals(canonicalList.size() - 1, test.count(r -> r != discount));
+ }
}
void testContains()
@@ -235,7 +241,7 @@ public class ReplicaCollectionTest
Assert.assertEquals(canonicalList.get(i), test.get(i));
}
- void testSort(int subListDepth, int filterDepth, int sortDepth, int selectDepth)
+ void testSort(int subListDepth, int filterDepth, int sortDepth)
{
final Comparator<Replica> comparator = (o1, o2) ->
{
@@ -244,10 +250,10 @@ public class ReplicaCollectionTest
return f1 == f2 ? 0 : f1 ? 1 : -1;
};
TestCase<C> sorted = new TestCase<>(test.sorted(comparator), ImmutableList.sortedCopyOf(comparator, canonicalList));
- sorted.testAll(subListDepth, filterDepth, sortDepth - 1, selectDepth);
+ sorted.testAll(subListDepth, filterDepth, sortDepth - 1);
}
- private void testAll(int subListDepth, int filterDepth, int sortDepth, int selectDepth)
+ private void testAll(int subListDepth, int filterDepth, int sortDepth)
{
testEndpoints();
testOrderOfIteration();
@@ -255,19 +261,18 @@ public class ReplicaCollectionTest
testGet();
testEquals();
testSize();
+ testCount();
if (subListDepth > 0)
- testSubList(subListDepth, filterDepth, sortDepth, selectDepth);
+ testSubList(subListDepth, filterDepth, sortDepth);
if (filterDepth > 0)
- testFilter(subListDepth, filterDepth, sortDepth, selectDepth);
+ testFilter(subListDepth, filterDepth, sortDepth);
if (sortDepth > 0)
- testSort(subListDepth, filterDepth, sortDepth, selectDepth);
- if (selectDepth > 0)
- testSelect(subListDepth, filterDepth, sortDepth, selectDepth);
+ testSort(subListDepth, filterDepth, sortDepth);
}
public void testAll()
{
- testAll(2, 2, 2, 2);
+ testAll(2, 2, 2);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org