You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sa...@apache.org on 2020/02/04 14:07:14 UTC
[cassandra] 02/02: Restore monotonic read consistency guarantees
This is an automated email from the ASF dual-hosted git repository.
samt pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 0f22dab1a015cb84d9857f940de5a256bfbee083
Author: Benedict Elliott Smith <be...@apache.org>
AuthorDate: Thu Jul 18 14:46:06 2019 +0100
Restore monotonic read consistency guarantees
patch by Benedict; reviewed by Sam Tunnicliffe for CASSANDRA-14740
---
CHANGES.txt | 1 +
.../org/apache/cassandra/db/ConsistencyLevel.java | 2 +-
.../cassandra/locator/ReplicaCollection.java | 5 +
.../org/apache/cassandra/locator/ReplicaPlan.java | 13 ++-
.../org/apache/cassandra/locator/ReplicaPlans.java | 69 +++++++++++-
.../cassandra/service/reads/DigestResolver.java | 8 +-
.../cassandra/service/reads/ResponseResolver.java | 2 +-
.../reads/repair/BlockingPartitionRepair.java | 38 ++++---
.../service/reads/repair/BlockingReadRepair.java | 24 ++--
.../service/reads/repair/NoopReadRepair.java | 2 +-
.../service/reads/repair/ReadOnlyReadRepair.java | 2 +-
.../cassandra/service/reads/repair/ReadRepair.java | 2 +-
.../reads/repair/RowIteratorMergeListener.java | 123 ++++++++++++++------
.../distributed/impl/AbstractCluster.java | 20 ++++
.../distributed/test/DistributedTestBase.java | 7 +-
.../cassandra/distributed/test/ReadRepairTest.java | 125 +++++++++++++++++++++
...WritePathTest.java => SimpleReadWriteTest.java} | 4 +-
.../service/reads/AbstractReadResponseTest.java | 19 +++-
.../cassandra/service/reads/DataResolverTest.java | 55 ++++++---
.../reads/repair/AbstractReadRepairTest.java | 36 ++++++
.../reads/repair/BlockingReadRepairTest.java | 34 +++---
.../repair/DiagEventsBlockingReadRepairTest.java | 21 ++--
.../reads/repair/ReadOnlyReadRepairTest.java | 7 +-
.../service/reads/repair/ReadRepairTest.java | 30 +++--
.../service/reads/repair/TestableReadRepair.java | 2 +-
25 files changed, 495 insertions(+), 156 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 75fae01..6efa148 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0-alpha3
+ * Restore monotonic read consistency guarantees for blocking read repair (CASSANDRA-14740)
* Separate exceptions for CAS write timeout exceptions caused by contention and unknown result (CASSANDRA-15350)
* Fix in-jvm dtest java 11 compatibility (CASSANDRA-15463)
* Remove joda time dependency (CASSANDRA-15257)
diff --git a/src/java/org/apache/cassandra/db/ConsistencyLevel.java b/src/java/org/apache/cassandra/db/ConsistencyLevel.java
index 4973915..e685618 100644
--- a/src/java/org/apache/cassandra/db/ConsistencyLevel.java
+++ b/src/java/org/apache/cassandra/db/ConsistencyLevel.java
@@ -101,7 +101,7 @@ public enum ConsistencyLevel
public static ObjectIntOpenHashMap<String> eachQuorumForRead(Keyspace keyspace)
{
NetworkTopologyStrategy strategy = (NetworkTopologyStrategy) keyspace.getReplicationStrategy();
- ObjectIntOpenHashMap<String> perDc = new ObjectIntOpenHashMap<>(strategy.getDatacenters().size());
+ ObjectIntOpenHashMap<String> perDc = new ObjectIntOpenHashMap<>(((strategy.getDatacenters().size() + 1) * 4) / 3);
for (String dc : strategy.getDatacenters())
perDc.put(dc, ConsistencyLevel.localQuorumFor(keyspace, dc));
return perDc;
diff --git a/src/java/org/apache/cassandra/locator/ReplicaCollection.java b/src/java/org/apache/cassandra/locator/ReplicaCollection.java
index d870316..ec671d5 100644
--- a/src/java/org/apache/cassandra/locator/ReplicaCollection.java
+++ b/src/java/org/apache/cassandra/locator/ReplicaCollection.java
@@ -125,6 +125,11 @@ public interface ReplicaCollection<C extends ReplicaCollection<C>> extends Itera
public C build();
/**
+ * @return an Immutable clone that assumes this Builder will be modified again
+ */
+ public C snapshot();
+
+ /**
* Passed to add() and addAll() as ignoreConflicts parameter. The meaning of conflict varies by collection type
* (for Endpoints, it is a duplicate InetAddressAndPort; for RangesAtEndpoint it is a duplicate Range).
*/
diff --git a/src/java/org/apache/cassandra/locator/ReplicaPlan.java b/src/java/org/apache/cassandra/locator/ReplicaPlan.java
index 861c912..16af58a 100644
--- a/src/java/org/apache/cassandra/locator/ReplicaPlan.java
+++ b/src/java/org/apache/cassandra/locator/ReplicaPlan.java
@@ -52,6 +52,8 @@ public abstract class ReplicaPlan<E extends Endpoints<E>>
public abstract int blockFor();
public E contacts() { return contacts; }
+
+ // TODO: should this semantically return true if we contain the endpoint, not the exact replica?
public boolean contacts(Replica replica) { return contacts.contains(replica); }
public Keyspace keyspace() { return keyspace; }
public ConsistencyLevel consistencyLevel() { return consistencyLevel; }
@@ -72,17 +74,12 @@ public abstract class ReplicaPlan<E extends Endpoints<E>>
public E candidates() { return candidates; }
- public E uncontactedCandidates()
- {
- return candidates().filter(r -> !contacts(r));
- }
-
public Replica firstUncontactedCandidate(Predicate<Replica> extraPredicate)
{
return Iterables.tryFind(candidates(), r -> extraPredicate.test(r) && !contacts(r)).orNull();
}
- public Replica getReplicaFor(InetAddressAndPort endpoint)
+ public Replica lookup(InetAddressAndPort endpoint)
{
return candidates().byEndpoint().get(endpoint);
}
@@ -151,6 +148,10 @@ public abstract class ReplicaPlan<E extends Endpoints<E>>
public E liveUncontacted() { return live().filter(r -> !contacts(r)); }
/** Test liveness, consistent with the upfront analysis done for this operation (i.e. test membership of live()) */
public boolean isAlive(Replica replica) { return live.endpoints().contains(replica.endpoint()); }
+ public Replica lookup(InetAddressAndPort endpoint)
+ {
+ return liveAndDown().byEndpoint().get(endpoint);
+ }
public String toString()
{
diff --git a/src/java/org/apache/cassandra/locator/ReplicaPlans.java b/src/java/org/apache/cassandra/locator/ReplicaPlans.java
index a6fe53f..236706a 100644
--- a/src/java/org/apache/cassandra/locator/ReplicaPlans.java
+++ b/src/java/org/apache/cassandra/locator/ReplicaPlans.java
@@ -320,6 +320,11 @@ public class ReplicaPlans
return result;
}
+ public static ReplicaPlan.ForTokenWrite forReadRepair(Token token, ReplicaPlan.ForRead<?> readPlan) throws UnavailableException
+ {
+ return forWrite(readPlan.keyspace, readPlan.consistencyLevel, token, writeReadRepair(readPlan));
+ }
+
public static ReplicaPlan.ForTokenWrite forWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, Token token, Selector selector) throws UnavailableException
{
return forWrite(keyspace, consistencyLevel, ReplicaLayout.forTokenWriteLiveAndDown(keyspace, token), selector);
@@ -345,7 +350,7 @@ public class ReplicaPlans
public static ReplicaPlan.ForTokenWrite forWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, ReplicaLayout.ForTokenWrite liveAndDown, ReplicaLayout.ForTokenWrite live, Selector selector) throws UnavailableException
{
- EndpointsForToken contacts = selector.select(keyspace, liveAndDown, live);
+ EndpointsForToken contacts = selector.select(keyspace, consistencyLevel, liveAndDown, live);
assureSufficientLiveReplicasForWrite(keyspace, consistencyLevel, live.all(), liveAndDown.pending());
return new ReplicaPlan.ForTokenWrite(keyspace, consistencyLevel, liveAndDown.pending(), liveAndDown.all(), live.all(), contacts);
}
@@ -353,7 +358,7 @@ public class ReplicaPlans
public interface Selector
{
<E extends Endpoints<E>, L extends ReplicaLayout.ForWrite<E>>
- E select(Keyspace keyspace, L liveAndDown, L live);
+ E select(Keyspace keyspace, ConsistencyLevel consistencyLevel, L liveAndDown, L live);
}
/**
@@ -366,7 +371,7 @@ public class ReplicaPlans
{
@Override
public <E extends Endpoints<E>, L extends ReplicaLayout.ForWrite<E>>
- E select(Keyspace keyspace, L liveAndDown, L live)
+ E select(Keyspace keyspace, ConsistencyLevel consistencyLevel, L liveAndDown, L live)
{
return liveAndDown.all();
}
@@ -385,7 +390,7 @@ public class ReplicaPlans
{
@Override
public <E extends Endpoints<E>, L extends ReplicaLayout.ForWrite<E>>
- E select(Keyspace keyspace, L liveAndDown, L live)
+ E select(Keyspace keyspace, ConsistencyLevel consistencyLevel, L liveAndDown, L live)
{
if (!any(liveAndDown.all(), Replica::isTransient))
return liveAndDown.all();
@@ -417,6 +422,62 @@ public class ReplicaPlans
};
/**
+ * TODO: Transient Replication C-14404/C-14665
+ * TODO: We employ this even when there is no monotonicity to guarantee,
+ * e.g. in case of CL.TWO, CL.ONE with speculation, etc.
+ *
+ * Construct a read-repair write plan to provide monotonicity guarantees on any data we return as part of a read.
+ *
+ * Since this is not a regular write, but only a means of guaranteeing that future reads will see this data, we
+ * select only the minimal number of nodes needed to meet the consistency level, and prefer nodes we contacted on
+ * read to minimise data transfer.
+ */
+ public static Selector writeReadRepair(ReplicaPlan.ForRead<?> readPlan)
+ {
+ return new Selector()
+ {
+ @Override
+ public <E extends Endpoints<E>, L extends ReplicaLayout.ForWrite<E>>
+ E select(Keyspace keyspace, ConsistencyLevel consistencyLevel, L liveAndDown, L live)
+ {
+ assert !any(liveAndDown.all(), Replica::isTransient);
+
+ ReplicaCollection.Builder<E> contacts = live.all().newBuilder(live.all().size());
+ // add all live nodes we might write to that we have already contacted on read
+ contacts.addAll(filter(live.all(), r -> readPlan.contacts().endpoints().contains(r.endpoint())));
+
+ // finally, add sufficient nodes to achieve our consistency level
+ if (consistencyLevel != EACH_QUORUM)
+ {
+ int add = consistencyLevel.blockForWrite(keyspace, liveAndDown.pending()) - contacts.size();
+ if (add > 0)
+ {
+ for (Replica replica : filter(live.all(), r -> !contacts.contains(r)))
+ {
+ contacts.add(replica);
+ if (--add == 0)
+ break;
+ }
+ }
+ }
+ else
+ {
+ ObjectIntOpenHashMap<String> requiredPerDc = eachQuorumForWrite(keyspace, liveAndDown.pending());
+ addToCountPerDc(requiredPerDc, contacts.snapshot(), -1);
+ IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
+ for (Replica replica : filter(live.all(), r -> !contacts.contains(r)))
+ {
+ String dc = snitch.getDatacenter(replica);
+ if (requiredPerDc.addTo(dc, -1) >= 0)
+ contacts.add(replica);
+ }
+ }
+ return contacts.build();
+ }
+ };
+ }
+
+ /**
* Construct the plan for a paxos round - NOT the write or read consistency level for either the write or comparison,
* but for the paxos linearisation agreement.
*
diff --git a/src/java/org/apache/cassandra/service/reads/DigestResolver.java b/src/java/org/apache/cassandra/service/reads/DigestResolver.java
index cf7ec31..dbb761b 100644
--- a/src/java/org/apache/cassandra/service/reads/DigestResolver.java
+++ b/src/java/org/apache/cassandra/service/reads/DigestResolver.java
@@ -54,7 +54,7 @@ public class DigestResolver<E extends Endpoints<E>, P extends ReplicaPlan.ForRea
public void preprocess(Message<ReadResponse> message)
{
super.preprocess(message);
- Replica replica = replicaPlan().getReplicaFor(message.from());
+ Replica replica = replicaPlan().lookup(message.from());
if (dataResponse == null && !message.payload.isDigestResponse() && replica.isFull())
dataResponse = message;
}
@@ -69,7 +69,7 @@ public class DigestResolver<E extends Endpoints<E>, P extends ReplicaPlan.ForRea
{
return any(responses,
msg -> !msg.payload.isDigestResponse()
- && replicaPlan().getReplicaFor(msg.from()).isTransient());
+ && replicaPlan().lookup(msg.from()).isTransient());
}
public PartitionIterator getData()
@@ -93,7 +93,7 @@ public class DigestResolver<E extends Endpoints<E>, P extends ReplicaPlan.ForRea
// Reconcile with transient replicas
for (Message<ReadResponse> response : responses)
{
- Replica replica = replicaPlan().getReplicaFor(response.from());
+ Replica replica = replicaPlan().lookup(response.from());
if (replica.isTransient())
dataResolver.preprocess(response);
}
@@ -115,7 +115,7 @@ public class DigestResolver<E extends Endpoints<E>, P extends ReplicaPlan.ForRea
// TODO: should also not calculate if only one full node
for (Message<ReadResponse> message : snapshot)
{
- if (replicaPlan().getReplicaFor(message.from()).isTransient())
+ if (replicaPlan().lookup(message.from()).isTransient())
continue;
ByteBuffer newDigest = message.payload.digest(command);
diff --git a/src/java/org/apache/cassandra/service/reads/ResponseResolver.java b/src/java/org/apache/cassandra/service/reads/ResponseResolver.java
index 8e15c1a..6ae19ac 100644
--- a/src/java/org/apache/cassandra/service/reads/ResponseResolver.java
+++ b/src/java/org/apache/cassandra/service/reads/ResponseResolver.java
@@ -55,7 +55,7 @@ public abstract class ResponseResolver<E extends Endpoints<E>, P extends Replica
public void preprocess(Message<ReadResponse> message)
{
- if (replicaPlan().getReplicaFor(message.from()).isTransient() &&
+ if (replicaPlan().lookup(message.from()).isTransient() &&
message.payload.isDigestResponse())
throw new IllegalArgumentException("Digest response received from transient replica");
diff --git a/src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java b/src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java
index 01fd7f0..edcf14d 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java
@@ -39,6 +39,7 @@ import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.locator.Endpoints;
+import org.apache.cassandra.locator.EndpointsForToken;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.locator.ReplicaPlan;
@@ -53,33 +54,33 @@ import org.apache.cassandra.tracing.Tracing;
import static org.apache.cassandra.net.Verb.*;
-public class BlockingPartitionRepair<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>>
+public class BlockingPartitionRepair
extends AbstractFuture<Object> implements RequestCallback<Object>
{
private final DecoratedKey key;
- private final P replicaPlan;
+ private final ReplicaPlan.ForTokenWrite writePlan;
private final Map<Replica, Mutation> pendingRepairs;
private final CountDownLatch latch;
private final Predicate<InetAddressAndPort> shouldBlockOn;
private volatile long mutationsSentTime;
- public BlockingPartitionRepair(DecoratedKey key, Map<Replica, Mutation> repairs, int maxBlockFor, P replicaPlan)
+ public BlockingPartitionRepair(DecoratedKey key, Map<Replica, Mutation> repairs, ReplicaPlan.ForTokenWrite writePlan)
{
- this(key, repairs, maxBlockFor, replicaPlan,
- replicaPlan.consistencyLevel().isDatacenterLocal() ? InOurDcTester.endpoints() : Predicates.alwaysTrue());
+ this(key, repairs, writePlan,
+ writePlan.consistencyLevel().isDatacenterLocal() ? InOurDcTester.endpoints() : Predicates.alwaysTrue());
}
- public BlockingPartitionRepair(DecoratedKey key, Map<Replica, Mutation> repairs, int maxBlockFor, P replicaPlan, Predicate<InetAddressAndPort> shouldBlockOn)
+ public BlockingPartitionRepair(DecoratedKey key, Map<Replica, Mutation> repairs, ReplicaPlan.ForTokenWrite writePlan, Predicate<InetAddressAndPort> shouldBlockOn)
{
this.key = key;
this.pendingRepairs = new ConcurrentHashMap<>(repairs);
- this.replicaPlan = replicaPlan;
+ this.writePlan = writePlan;
this.shouldBlockOn = shouldBlockOn;
+ int blockFor = writePlan.blockFor();
// here we remove empty repair mutations from the block for total, since
// we're not sending them mutations
- int blockFor = maxBlockFor;
- for (Replica participant: replicaPlan.contacts())
+ for (Replica participant : writePlan.contacts())
{
// remote dcs can sometimes get involved in dc-local reads. We want to repair
// them if they do, but they shouldn't interfere with blocking the client read.
@@ -95,10 +96,15 @@ public class BlockingPartitionRepair<E extends Endpoints<E>, P extends ReplicaPl
latch = new CountDownLatch(Math.max(blockFor, 0));
}
+ int blockFor()
+ {
+ return writePlan.blockFor();
+ }
+
@VisibleForTesting
- long waitingOn()
+ int waitingOn()
{
- return latch.getCount();
+ return (int) latch.getCount();
}
@VisibleForTesting
@@ -106,7 +112,7 @@ public class BlockingPartitionRepair<E extends Endpoints<E>, P extends ReplicaPl
{
if (shouldBlockOn.test(from))
{
- pendingRepairs.remove(replicaPlan.getReplicaFor(from));
+ pendingRepairs.remove(writePlan.lookup(from));
latch.countDown();
}
}
@@ -199,7 +205,7 @@ public class BlockingPartitionRepair<E extends Endpoints<E>, P extends ReplicaPl
if (awaitRepairsUntil(timeout + timeoutUnit.convert(mutationsSentTime, TimeUnit.NANOSECONDS), timeoutUnit))
return;
- E newCandidates = replicaPlan.uncontactedCandidates();
+ EndpointsForToken newCandidates = writePlan.liveUncontacted();
if (newCandidates.isEmpty())
return;
@@ -221,7 +227,7 @@ public class BlockingPartitionRepair<E extends Endpoints<E>, P extends ReplicaPl
if (mutation == null)
{
- mutation = BlockingReadRepairs.createRepairMutation(update, replicaPlan.consistencyLevel(), replica.endpoint(), true);
+ mutation = BlockingReadRepairs.createRepairMutation(update, writePlan.consistencyLevel(), replica.endpoint(), true);
versionedMutations[versionIdx] = mutation;
}
@@ -240,7 +246,7 @@ public class BlockingPartitionRepair<E extends Endpoints<E>, P extends ReplicaPl
Keyspace getKeyspace()
{
- return replicaPlan.keyspace();
+ return writePlan.keyspace();
}
DecoratedKey getKey()
@@ -250,6 +256,6 @@ public class BlockingPartitionRepair<E extends Endpoints<E>, P extends ReplicaPl
ConsistencyLevel getConsistency()
{
- return replicaPlan.consistencyLevel();
+ return writePlan.consistencyLevel();
}
}
diff --git a/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java b/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java
index 764765e..fdc8b50 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java
@@ -39,7 +39,6 @@ import org.apache.cassandra.locator.ReplicaPlan;
import org.apache.cassandra.metrics.ReadRepairMetrics;
import org.apache.cassandra.tracing.Tracing;
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
/**
@@ -53,12 +52,10 @@ public class BlockingReadRepair<E extends Endpoints<E>, P extends ReplicaPlan.Fo
private static final Logger logger = LoggerFactory.getLogger(BlockingReadRepair.class);
protected final Queue<BlockingPartitionRepair> repairs = new ConcurrentLinkedQueue<>();
- private final int blockFor;
BlockingReadRepair(ReadCommand command, ReplicaPlan.Shared<E, P> replicaPlan, long queryStartNanoTime)
{
super(command, replicaPlan, queryStartNanoTime);
- this.blockFor = replicaPlan().consistencyLevel().blockFor(cfs.keyspace);
}
public UnfilteredPartitionIterators.MergeListener getMergeListener(P replicaPlan)
@@ -84,31 +81,34 @@ public class BlockingReadRepair<E extends Endpoints<E>, P extends ReplicaPlan.Fo
@Override
public void awaitWrites()
{
- boolean timedOut = false;
- for (BlockingPartitionRepair repair: repairs)
+ BlockingPartitionRepair timedOut = null;
+ for (BlockingPartitionRepair repair : repairs)
{
if (!repair.awaitRepairsUntil(DatabaseDescriptor.getReadRpcTimeout(NANOSECONDS) + queryStartNanoTime, NANOSECONDS))
{
- timedOut = true;
+ timedOut = repair;
+ break;
}
}
- if (timedOut)
+ if (timedOut != null)
{
- // We got all responses, but timed out while repairing
- int blockFor = replicaPlan().blockFor();
+ // We got all responses, but timed out while repairing;
+ // report the first repair that timed out, as this is better than completely manufacturing the error message
+ int blockFor = timedOut.blockFor();
+ int received = Math.min(blockFor - timedOut.waitingOn(), blockFor - 1);
if (Tracing.isTracing())
Tracing.trace("Timed out while read-repairing after receiving all {} data and digest responses", blockFor);
else
logger.debug("Timeout while read-repairing after receiving all {} data and digest responses", blockFor);
- throw new ReadTimeoutException(replicaPlan().consistencyLevel(), blockFor - 1, blockFor, true);
+ throw new ReadTimeoutException(replicaPlan().consistencyLevel(), received, blockFor, true);
}
}
@Override
- public void repairPartition(DecoratedKey partitionKey, Map<Replica, Mutation> mutations, P replicaPlan)
+ public void repairPartition(DecoratedKey partitionKey, Map<Replica, Mutation> mutations, ReplicaPlan.ForTokenWrite writePlan)
{
- BlockingPartitionRepair<E, P> blockingRepair = new BlockingPartitionRepair<>(partitionKey, mutations, blockFor, replicaPlan);
+ BlockingPartitionRepair blockingRepair = new BlockingPartitionRepair(partitionKey, mutations, writePlan);
blockingRepair.sendInitialRepairs();
repairs.add(blockingRepair);
}
diff --git a/src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java b/src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java
index 6aa6ece..2f82c22 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java
@@ -76,7 +76,7 @@ public class NoopReadRepair<E extends Endpoints<E>, P extends ReplicaPlan.ForRea
}
@Override
- public void repairPartition(DecoratedKey partitionKey, Map<Replica, Mutation> mutations, P replicaPlan)
+ public void repairPartition(DecoratedKey partitionKey, Map<Replica, Mutation> mutations, ReplicaPlan.ForTokenWrite writePlan)
{
}
diff --git a/src/java/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepair.java b/src/java/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepair.java
index 64bfec2..d9293fb 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepair.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepair.java
@@ -61,7 +61,7 @@ public class ReadOnlyReadRepair<E extends Endpoints<E>, P extends ReplicaPlan.Fo
}
@Override
- public void repairPartition(DecoratedKey partitionKey, Map<Replica, Mutation> mutations, P replicaPlan)
+ public void repairPartition(DecoratedKey partitionKey, Map<Replica, Mutation> mutations, ReplicaPlan.ForTokenWrite writePlan)
{
throw new UnsupportedOperationException("ReadOnlyReadRepair shouldn't be trying to repair partitions");
}
diff --git a/src/java/org/apache/cassandra/service/reads/repair/ReadRepair.java b/src/java/org/apache/cassandra/service/reads/repair/ReadRepair.java
index 9441945..4747651 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/ReadRepair.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/ReadRepair.java
@@ -93,5 +93,5 @@ public interface ReadRepair<E extends Endpoints<E>, P extends ReplicaPlan.ForRea
* Repairs a partition _after_ receiving data responses. This method receives replica list, since
* we will block repair only on the replicas that have responded.
*/
- void repairPartition(DecoratedKey partitionKey, Map<Replica, Mutation> mutations, P replicaPlan);
+ void repairPartition(DecoratedKey partitionKey, Map<Replica, Mutation> mutations, ReplicaPlan.ForTokenWrite writePlan);
}
diff --git a/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java b/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java
index 60e0d41..fc4c351 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java
@@ -19,14 +19,17 @@
package org.apache.cassandra.service.reads.repair;
import java.util.Arrays;
+import java.util.BitSet;
import java.util.Map;
+import java.util.function.Consumer;
-import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
+import com.carrotsearch.hppc.ObjectIntOpenHashMap;
+import net.nicoulaj.compilecommand.annotations.Inline;
import org.apache.cassandra.db.Clustering;
import org.apache.cassandra.db.ClusteringBound;
-import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.DeletionTime;
import org.apache.cassandra.db.LivenessInfo;
@@ -45,8 +48,10 @@ import org.apache.cassandra.db.rows.RowDiffListener;
import org.apache.cassandra.db.rows.Rows;
import org.apache.cassandra.db.rows.UnfilteredRowIterators;
import org.apache.cassandra.locator.Endpoints;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.locator.ReplicaPlan;
+import org.apache.cassandra.locator.ReplicaPlans;
import org.apache.cassandra.schema.ColumnMetadata;
public class RowIteratorMergeListener<E extends Endpoints<E>>
@@ -57,10 +62,14 @@ public class RowIteratorMergeListener<E extends Endpoints<E>>
private final boolean isReversed;
private final ReadCommand command;
+ private final BitSet writeBackTo;
+ private final boolean buildFullDiff;
+ /** the repairs we will send to each source, suffixed by a complete repair of all differences, if {@link #buildFullDiff} */
private final PartitionUpdate.Builder[] repairs;
private final Row.Builder[] currentRows;
private final RowDiffListener diffListener;
- private final ReplicaPlan.ForRead<E> replicaPlan;
+ private final ReplicaPlan.ForRead<E> readPlan;
+ private final ReplicaPlan.ForTokenWrite writePlan;
// The partition level deletion for the merge row.
private DeletionTime partitionLevelDeletion;
@@ -73,17 +82,36 @@ public class RowIteratorMergeListener<E extends Endpoints<E>>
private final ReadRepair readRepair;
- public RowIteratorMergeListener(DecoratedKey partitionKey, RegularAndStaticColumns columns, boolean isReversed, ReplicaPlan.ForRead<E> replicaPlan, ReadCommand command, ReadRepair readRepair)
+ public RowIteratorMergeListener(DecoratedKey partitionKey, RegularAndStaticColumns columns, boolean isReversed, ReplicaPlan.ForRead<E> readPlan, ReadCommand command, ReadRepair readRepair)
{
this.partitionKey = partitionKey;
this.columns = columns;
this.isReversed = isReversed;
- this.replicaPlan = replicaPlan;
- int size = replicaPlan.contacts().size();
- repairs = new PartitionUpdate.Builder[size];
- currentRows = new Row.Builder[size];
- sourceDeletionTime = new DeletionTime[size];
- markerToRepair = new ClusteringBound[size];
+ this.readPlan = readPlan;
+ this.writePlan = ReplicaPlans.forReadRepair(partitionKey.getToken(), readPlan);
+
+ int size = readPlan.contacts().size();
+ this.writeBackTo = new BitSet(size);
+ {
+ int i = 0;
+ for (Replica replica : readPlan.contacts())
+ {
+ if (writePlan.contacts().endpoints().contains(replica.endpoint()))
+ writeBackTo.set(i);
+ ++i;
+ }
+ }
+ // If we are contacting any nodes we didn't read from, we are likely handling a range movement.
+ // In this case we need to send all differences to these nodes, as we do not (with present design) know which
+ // node they bootstrapped from, and so which data we need to duplicate.
+ // In reality, there will be situations where we are simply sending the same number of writes to different nodes
+ // and in this case we could probably avoid building a full difference, and only ensure each write makes it to
+ // some other node, but it is probably not worth special casing this scenario.
+ this.buildFullDiff = Iterables.any(writePlan.contacts().endpoints(), e -> !readPlan.contacts().endpoints().contains(e));
+ this.repairs = new PartitionUpdate.Builder[size + (buildFullDiff ? 1 : 0)];
+ this.currentRows = new Row.Builder[size];
+ this.sourceDeletionTime = new DeletionTime[size];
+ this.markerToRepair = new ClusteringBound[size];
this.command = command;
this.readRepair = readRepair;
@@ -128,13 +156,6 @@ public class RowIteratorMergeListener<E extends Endpoints<E>>
};
}
- private PartitionUpdate.Builder update(int i)
- {
- if (repairs[i] == null)
- repairs[i] = new PartitionUpdate.Builder(command.metadata(), partitionKey, columns, 1);
- return repairs[i];
- }
-
/**
* The partition level deletion with which source {@code i} is currently repaired, or
* {@code DeletionTime.LIVE} if the source is not repaired on the partition level deletion (meaning it was
@@ -156,13 +177,30 @@ public class RowIteratorMergeListener<E extends Endpoints<E>>
return currentRows[i];
}
+ @Inline
+ private void applyToPartition(int i, Consumer<PartitionUpdate.Builder> f)
+ {
+ if (writeBackTo.get(i))
+ {
+ if (repairs[i] == null)
+ repairs[i] = new PartitionUpdate.Builder(command.metadata(), partitionKey, columns, 1);
+ f.accept(repairs[i]);
+ }
+ if (buildFullDiff)
+ {
+ if (repairs[repairs.length - 1] == null)
+ repairs[repairs.length - 1] = new PartitionUpdate.Builder(command.metadata(), partitionKey, columns, 1);
+ f.accept(repairs[repairs.length - 1]);
+ }
+ }
+
public void onMergedPartitionLevelDeletion(DeletionTime mergedDeletion, DeletionTime[] versions)
{
this.partitionLevelDeletion = mergedDeletion;
for (int i = 0; i < versions.length; i++)
{
if (mergedDeletion.supersedes(versions[i]))
- update(i).addPartitionDeletion(mergedDeletion);
+ applyToPartition(i, p -> p.addPartitionDeletion(mergedDeletion));
}
}
@@ -178,7 +216,10 @@ public class RowIteratorMergeListener<E extends Endpoints<E>>
for (int i = 0; i < currentRows.length; i++)
{
if (currentRows[i] != null)
- update(i).add(currentRows[i].build());
+ {
+ Row row = currentRows[i].build();
+ applyToPartition(i, p -> p.add(row));
+ }
}
Arrays.fill(currentRows, null);
}
@@ -302,34 +343,44 @@ public class RowIteratorMergeListener<E extends Endpoints<E>>
private void closeOpenMarker(int i, ClusteringBound close)
{
ClusteringBound open = markerToRepair[i];
- update(i).add(new RangeTombstone(Slice.make(isReversed ? close : open, isReversed ? open : close), currentDeletion()));
+ RangeTombstone rt = new RangeTombstone(Slice.make(isReversed ? close : open, isReversed ? open : close), currentDeletion());
+ applyToPartition(i, p -> p.add(rt));
markerToRepair[i] = null;
}
public void close()
{
- Map<Replica, Mutation> mutations = null;
- Endpoints<?> sources = replicaPlan.contacts();
- for (int i = 0; i < repairs.length; i++)
- {
- if (repairs[i] == null)
- continue;
+ boolean hasRepairs = false;
+ for (int i = 0 ; !hasRepairs && i < repairs.length ; ++i)
+ hasRepairs = repairs[i] != null;
+ if (!hasRepairs)
+ return;
+
+ PartitionUpdate fullDiffRepair = null;
+ if (buildFullDiff && repairs[repairs.length - 1] != null)
+ fullDiffRepair = repairs[repairs.length - 1].build();
- Replica source = sources.get(i);
+ Map<Replica, Mutation> mutations = Maps.newHashMapWithExpectedSize(writePlan.contacts().size());
+ ObjectIntOpenHashMap<InetAddressAndPort> sourceIds = new ObjectIntOpenHashMap<>(((repairs.length + 1) * 4) / 3);
+ for (int i = 0 ; i < readPlan.contacts().size() ; ++i)
+ sourceIds.put(readPlan.contacts().get(i).endpoint(), 1 + i);
- Mutation mutation = BlockingReadRepairs.createRepairMutation(repairs[i].build(), replicaPlan.consistencyLevel(), source.endpoint(), false);
+ for (Replica replica : writePlan.contacts())
+ {
+ PartitionUpdate update = null;
+ int i = -1 + sourceIds.get(replica.endpoint());
+ if (i < 0)
+ update = fullDiffRepair;
+ else if (repairs[i] != null)
+ update = repairs[i].build();
+
+ Mutation mutation = BlockingReadRepairs.createRepairMutation(update, readPlan.consistencyLevel(), replica.endpoint(), false);
if (mutation == null)
continue;
- if (mutations == null)
- mutations = Maps.newHashMapWithExpectedSize(sources.size());
-
- mutations.put(source, mutation);
+ mutations.put(replica, mutation);
}
- if (mutations != null)
- {
- readRepair.repairPartition(partitionKey, mutations, replicaPlan);
- }
+ readRepair.repairPartition(partitionKey, mutations, writePlan);
}
}
\ No newline at end of file
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
index 68cc6a2..d487c14 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
@@ -47,6 +47,8 @@ import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.distributed.api.Feature;
import org.apache.cassandra.distributed.api.ICluster;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Token;
import org.apache.cassandra.distributed.api.ICoordinator;
import org.apache.cassandra.distributed.api.IInstance;
import org.apache.cassandra.distributed.api.IInstanceConfig;
@@ -766,5 +768,23 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster,
}
}
+ public List<Token> tokens()
+ {
+ return stream()
+ .map(i ->
+ {
+ try
+ {
+ IPartitioner partitioner = ((IPartitioner)Class.forName(i.config().getString("partitioner")).newInstance());
+ return partitioner.getTokenFactory().fromString(i.config().getString("initial_token"));
+ }
+ catch (Throwable t)
+ {
+ throw new RuntimeException(t);
+ }
+ })
+ .collect(Collectors.toList());
+ }
+
}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/DistributedTestBase.java b/test/distributed/org/apache/cassandra/distributed/test/DistributedTestBase.java
index 64dee64..65224ea 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/DistributedTestBase.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/DistributedTestBase.java
@@ -78,7 +78,12 @@ public class DistributedTestBase
protected static <C extends AbstractCluster<?>> C init(C cluster)
{
- cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': " + cluster.size() + "};");
+ return init(cluster, Math.min(3, cluster.size()));
+ }
+
+ protected static <C extends AbstractCluster<?>> C init(C cluster, int replicationFactor)
+ {
+ cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': " + replicationFactor + "};");
return cluster;
}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/ReadRepairTest.java b/test/distributed/org/apache/cassandra/distributed/test/ReadRepairTest.java
new file mode 100644
index 0000000..a8c5fa1
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/ReadRepairTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.util.List;
+
+import org.junit.Test;
+
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.service.PendingRangeCalculatorService;
+import org.apache.cassandra.service.StorageService;
+
+import static org.apache.cassandra.net.Verb.READ_REPAIR_REQ;
+import static org.apache.cassandra.net.Verb.READ_REQ;
+
+public class ReadRepairTest extends DistributedTestBase
+{
+
+ @Test
+ public void readRepairTest() throws Throwable
+ {
+ try (Cluster cluster = init(Cluster.create(3)))
+ {
+ cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH read_repair='blocking'");
+
+ cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)");
+ cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)");
+
+ assertRows(cluster.get(3).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1"));
+
+ assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1",
+ ConsistencyLevel.ALL),
+ row(1, 1, 1));
+
+ // Verify that data got repaired to the third node
+ assertRows(cluster.get(3).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1"),
+ row(1, 1, 1));
+ }
+ }
+
+ @Test
+ public void failingReadRepairTest() throws Throwable
+ {
+ try (Cluster cluster = init(Cluster.create(3)))
+ {
+ cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH read_repair='blocking'");
+
+ for (int i = 1 ; i <= 2 ; ++i)
+ cluster.get(i).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)");
+
+ assertRows(cluster.get(3).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1"));
+
+ cluster.verbs(READ_REPAIR_REQ).to(3).drop();
+ assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1",
+ ConsistencyLevel.QUORUM),
+ row(1, 1, 1));
+
+ // Data was not repaired
+ assertRows(cluster.get(3).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1"));
+ }
+ }
+
+ @Test
+ public void movingTokenReadRepairTest() throws Throwable
+ {
+ try (Cluster cluster = init(Cluster.create(4), 3))
+ {
+ List<Token> tokens = cluster.tokens();
+
+ cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH read_repair='blocking'");
+
+ int i = 0;
+ while (true)
+ {
+ Token t = Murmur3Partitioner.instance.getToken(Int32Type.instance.decompose(i));
+ if (t.compareTo(tokens.get(2 - 1)) < 0 && t.compareTo(tokens.get(1 - 1)) > 0)
+ break;
+ ++i;
+ }
+
+ // write only to #4
+ cluster.get(4).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (?, 1, 1)", i);
+ // mark #2 as leaving in #4
+ cluster.get(4).acceptsOnInstance((InetAddressAndPort endpoint) -> {
+ StorageService.instance.getTokenMetadata().addLeavingEndpoint(endpoint);
+ PendingRangeCalculatorService.instance.update();
+ PendingRangeCalculatorService.instance.blockUntilFinished();
+ }).accept(cluster.get(2).broadcastAddressAndPort());
+
+ // prevent #4 from reading or writing to #3, so our QUORUM must contain #2 and #4
+ // since #1 is taking over the range, this means any read-repair must make it to #1 as well
+ cluster.filters().verbs(READ_REQ.ordinal()).from(4).to(3).drop();
+ cluster.filters().verbs(READ_REPAIR_REQ.ordinal()).from(4).to(3).drop();
+ assertRows(cluster.coordinator(4).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = ?",
+ ConsistencyLevel.ALL, i),
+ row(i, 1, 1));
+
+ // verify that #1 receives the write
+ assertRows(cluster.get(1).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = ?", i),
+ row(i, 1, 1));
+ }
+ }
+
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/DistributedReadWritePathTest.java b/test/distributed/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java
similarity index 99%
rename from test/distributed/org/apache/cassandra/distributed/test/DistributedReadWritePathTest.java
rename to test/distributed/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java
index 679a90f..4292598 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/DistributedReadWritePathTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java
@@ -18,8 +18,6 @@
package org.apache.cassandra.distributed.test;
-import java.util.concurrent.TimeUnit;
-
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -38,7 +36,7 @@ import static org.junit.Assert.assertEquals;
import static org.apache.cassandra.net.Verb.READ_REPAIR_REQ;
import static org.apache.cassandra.net.OutboundConnections.LARGE_MESSAGE_THRESHOLD;
-public class DistributedReadWritePathTest extends DistributedTestBase
+public class SimpleReadWriteTest extends DistributedTestBase
{
@BeforeClass
public static void before()
diff --git a/test/unit/org/apache/cassandra/service/reads/AbstractReadResponseTest.java b/test/unit/org/apache/cassandra/service/reads/AbstractReadResponseTest.java
index 545731b..884baa1 100644
--- a/test/unit/org/apache/cassandra/service/reads/AbstractReadResponseTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/AbstractReadResponseTest.java
@@ -82,14 +82,18 @@ import static org.apache.cassandra.net.Verb.READ_REQ;
public abstract class AbstractReadResponseTest
{
public static final String KEYSPACE1 = "DataResolverTest";
+ public static final String KEYSPACE3 = "DataResolverTest3";
public static final String CF_STANDARD = "Standard1";
public static final String CF_COLLECTION = "Collection1";
public static Keyspace ks;
+ public static Keyspace ks3;
public static ColumnFamilyStore cfs;
public static ColumnFamilyStore cfs2;
+ public static ColumnFamilyStore cfs3;
public static TableMetadata cfm;
public static TableMetadata cfm2;
+ public static TableMetadata cfm3;
public static ColumnMetadata m;
public static DecoratedKey dk;
@@ -128,19 +132,32 @@ public abstract class AbstractReadResponseTest
.addRegularColumn("one", AsciiType.instance)
.addRegularColumn("two", AsciiType.instance);
+ TableMetadata.Builder builder3 =
+ TableMetadata.builder(KEYSPACE3, CF_STANDARD)
+ .addPartitionKeyColumn("key", BytesType.instance)
+ .addClusteringColumn("col1", AsciiType.instance)
+ .addRegularColumn("c1", AsciiType.instance)
+ .addRegularColumn("c2", AsciiType.instance)
+ .addRegularColumn("one", AsciiType.instance)
+ .addRegularColumn("two", AsciiType.instance);
+
TableMetadata.Builder builder2 =
TableMetadata.builder(KEYSPACE1, CF_COLLECTION)
.addPartitionKeyColumn("k", ByteType.instance)
.addRegularColumn("m", MapType.getInstance(IntegerType.instance, IntegerType.instance, true));
SchemaLoader.prepareServer();
- SchemaLoader.createKeyspace(KEYSPACE1, KeyspaceParams.simple(1), builder1, builder2);
+ SchemaLoader.createKeyspace(KEYSPACE1, KeyspaceParams.simple(2), builder1, builder2);
+ SchemaLoader.createKeyspace(KEYSPACE3, KeyspaceParams.simple(4), builder3);
ks = Keyspace.open(KEYSPACE1);
cfs = ks.getColumnFamilyStore(CF_STANDARD);
cfm = cfs.metadata();
cfs2 = ks.getColumnFamilyStore(CF_COLLECTION);
cfm2 = cfs2.metadata();
+ ks3 = Keyspace.open(KEYSPACE3);
+ cfs3 = ks3.getColumnFamilyStore(CF_STANDARD);
+ cfm3 = cfs3.metadata();
m = cfm2.getColumn(new ColumnIdentifier("m", false));
}
diff --git a/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java b/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java
index 17c6e41..50ed09d 100644
--- a/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java
@@ -21,9 +21,17 @@ import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Iterator;
+import java.util.UUID;
import com.google.common.collect.Iterators;
import com.google.common.collect.Sets;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.dht.ByteOrderedPartitioner;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.locator.ReplicaPlan;
import org.junit.Assert;
import org.junit.Before;
@@ -59,6 +67,7 @@ import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.locator.ReplicaUtils;
import org.apache.cassandra.net.*;
+import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.service.reads.repair.ReadRepair;
import org.apache.cassandra.service.reads.repair.RepairedDataTracker;
import org.apache.cassandra.service.reads.repair.RepairedDataVerifier;
@@ -76,28 +85,42 @@ import static org.junit.Assert.assertTrue;
public class DataResolverTest extends AbstractReadResponseTest
{
- public static final String KEYSPACE1 = "DataResolverTest";
- public static final String CF_STANDARD = "Standard1";
-
private ReadCommand command;
private TestableReadRepair readRepair;
+ private Keyspace ks;
+ private ColumnFamilyStore cfs;
- @Before
- public void setup()
+ private EndpointsForRange makeReplicas(int num)
{
+ StorageService.instance.getTokenMetadata().clearUnsafe();
+
+ switch (num)
+ {
+ case 2:
+ ks = AbstractReadResponseTest.ks;
+ cfs = AbstractReadResponseTest.cfs;
+ break;
+ case 4:
+ ks = AbstractReadResponseTest.ks3;
+ cfs = AbstractReadResponseTest.cfs3;
+ break;
+ default:
+ throw new IllegalStateException("This test needs refactoring to cleanly support different replication factors");
+ }
+
command = Util.cmd(cfs, dk).withNowInSeconds(nowInSec).build();
command.trackRepairedStatus();
readRepair = new TestableReadRepair(command);
- }
-
- private static EndpointsForRange makeReplicas(int num)
- {
+ Token token = Murmur3Partitioner.instance.getMinimumToken();
EndpointsForRange.Builder replicas = EndpointsForRange.builder(ReplicaUtils.FULL_RANGE, num);
for (int i = 0; i < num; i++)
{
try
{
- replicas.add(ReplicaUtils.full(InetAddressAndPort.getByAddress(new byte[]{ 127, 0, 0, (byte) (i + 1) })));
+ InetAddressAndPort endpoint = InetAddressAndPort.getByAddress(new byte[]{ 127, 0, 0, (byte) (i + 1) });
+ replicas.add(ReplicaUtils.full(endpoint));
+ StorageService.instance.getTokenMetadata().updateNormalToken(token = token.increaseSlightly(), endpoint);
+ Gossiper.instance.initializeNodeUnsafe(endpoint, UUID.randomUUID(), 1);
}
catch (UnknownHostException e)
{
@@ -231,30 +254,30 @@ public class DataResolverTest extends AbstractReadResponseTest
RangeTombstone tombstone1 = tombstone("1", "11", 1, nowInSec);
RangeTombstone tombstone2 = tombstone("3", "31", 1, nowInSec);
- PartitionUpdate update = new RowUpdateBuilder(cfm, nowInSec, 1L, dk).addRangeTombstone(tombstone1)
+ PartitionUpdate update = new RowUpdateBuilder(cfm3, nowInSec, 1L, dk).addRangeTombstone(tombstone1)
.addRangeTombstone(tombstone2)
.buildUpdate();
InetAddressAndPort peer1 = replicas.get(0).endpoint();
- UnfilteredPartitionIterator iter1 = iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk).addRangeTombstone(tombstone1)
+ UnfilteredPartitionIterator iter1 = iter(new RowUpdateBuilder(cfm3, nowInSec, 1L, dk).addRangeTombstone(tombstone1)
.addRangeTombstone(tombstone2)
.buildUpdate());
resolver.preprocess(response(command, peer1, iter1));
// not covered by any range tombstone
InetAddressAndPort peer2 = replicas.get(1).endpoint();
- UnfilteredPartitionIterator iter2 = iter(new RowUpdateBuilder(cfm, nowInSec, 0L, dk).clustering("0")
+ UnfilteredPartitionIterator iter2 = iter(new RowUpdateBuilder(cfm3, nowInSec, 0L, dk).clustering("0")
.add("c1", "v0")
.buildUpdate());
resolver.preprocess(response(command, peer2, iter2));
// covered by a range tombstone
InetAddressAndPort peer3 = replicas.get(2).endpoint();
- UnfilteredPartitionIterator iter3 = iter(new RowUpdateBuilder(cfm, nowInSec, 0L, dk).clustering("10")
+ UnfilteredPartitionIterator iter3 = iter(new RowUpdateBuilder(cfm3, nowInSec, 0L, dk).clustering("10")
.add("c2", "v1")
.buildUpdate());
resolver.preprocess(response(command, peer3, iter3));
// range covered by rt, but newer
InetAddressAndPort peer4 = replicas.get(3).endpoint();
- UnfilteredPartitionIterator iter4 = iter(new RowUpdateBuilder(cfm, nowInSec, 2L, dk).clustering("3")
+ UnfilteredPartitionIterator iter4 = iter(new RowUpdateBuilder(cfm3, nowInSec, 2L, dk).clustering("3")
.add("one", "A")
.buildUpdate());
resolver.preprocess(response(command, peer4, iter4));
@@ -1294,7 +1317,7 @@ public class DataResolverTest extends AbstractReadResponseTest
private void assertRepairMetadata(Mutation mutation)
{
PartitionUpdate update = mutation.getPartitionUpdates().iterator().next();
- assertEquals(update.metadata().keyspace, cfm.keyspace);
+ assertEquals(update.metadata().keyspace, ks.getName());
assertEquals(update.metadata().name, cfm.name);
}
diff --git a/test/unit/org/apache/cassandra/service/reads/repair/AbstractReadRepairTest.java b/test/unit/org/apache/cassandra/service/reads/repair/AbstractReadRepairTest.java
index 3d39732..ed4ef3f 100644
--- a/test/unit/org/apache/cassandra/service/reads/repair/AbstractReadRepairTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/repair/AbstractReadRepairTest.java
@@ -1,14 +1,24 @@
package org.apache.cassandra.service.reads.repair;
+import java.nio.ByteBuffer;
import java.util.List;
import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
+import com.google.common.base.Predicates;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.primitives.Ints;
+
+import org.apache.cassandra.dht.ByteOrderedPartitioner;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.locator.EndpointsForToken;
import org.apache.cassandra.locator.ReplicaPlan;
import org.junit.Assert;
import org.junit.Before;
@@ -41,6 +51,7 @@ import org.apache.cassandra.db.rows.RowIterator;
import org.apache.cassandra.locator.EndpointsForRange;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.locator.ReplicaPlans;
import org.apache.cassandra.locator.ReplicaUtils;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.schema.KeyspaceMetadata;
@@ -48,6 +59,7 @@ import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.schema.MigrationManager;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.schema.Tables;
+import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;
import static org.apache.cassandra.locator.Replica.fullReplica;
@@ -216,6 +228,14 @@ public abstract class AbstractReadRepairTest
replicaPlan = replicaPlan(ConsistencyLevel.QUORUM, replicas);
+ StorageService.instance.getTokenMetadata().clearUnsafe();
+ StorageService.instance.getTokenMetadata().updateNormalToken(ByteOrderedPartitioner.instance.getToken(ByteBuffer.wrap(new byte[] { 0 })), replica1.endpoint());
+ StorageService.instance.getTokenMetadata().updateNormalToken(ByteOrderedPartitioner.instance.getToken(ByteBuffer.wrap(new byte[] { 1 })), replica2.endpoint());
+ StorageService.instance.getTokenMetadata().updateNormalToken(ByteOrderedPartitioner.instance.getToken(ByteBuffer.wrap(new byte[] { 2 })), replica3.endpoint());
+ Gossiper.instance.initializeNodeUnsafe(replica1.endpoint(), UUID.randomUUID(), 1);
+ Gossiper.instance.initializeNodeUnsafe(replica2.endpoint(), UUID.randomUUID(), 1);
+ Gossiper.instance.initializeNodeUnsafe(replica3.endpoint(), UUID.randomUUID(), 1);
+
// default test values
key = dk(5);
cell1 = cell("v", "val1", now);
@@ -247,6 +267,22 @@ public abstract class AbstractReadRepairTest
return replicaPlan(ks, consistencyLevel, replicas, replicas);
}
+ static ReplicaPlan.ForTokenWrite repairPlan(ReplicaPlan.ForRangeRead readPlan)
+ {
+ return repairPlan(readPlan, readPlan.candidates());
+ }
+
+ static ReplicaPlan.ForTokenWrite repairPlan(EndpointsForRange liveAndDown, EndpointsForRange targets)
+ {
+ return repairPlan(replicaPlan(liveAndDown, targets), liveAndDown);
+ }
+
+ static ReplicaPlan.ForTokenWrite repairPlan(ReplicaPlan.ForRangeRead readPlan, EndpointsForRange liveAndDown)
+ {
+ Token token = readPlan.range().left.getToken();
+ EndpointsForToken pending = EndpointsForToken.empty(token);
+ return ReplicaPlans.forWrite(ks, ConsistencyLevel.TWO, liveAndDown.forToken(token), pending, Predicates.alwaysTrue(), ReplicaPlans.writeReadRepair(readPlan));
+ }
static ReplicaPlan.ForRangeRead replicaPlan(EndpointsForRange replicas, EndpointsForRange targets)
{
return replicaPlan(ks, ConsistencyLevel.QUORUM, replicas, targets);
diff --git a/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java b/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java
index 3cc1a63..e4b3a71 100644
--- a/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java
@@ -44,13 +44,12 @@ import org.apache.cassandra.service.reads.ReadCallback;
public class BlockingReadRepairTest extends AbstractReadRepairTest
{
- private static class InstrumentedReadRepairHandler<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>>
- extends BlockingPartitionRepair<E, P>
+ private static class InstrumentedReadRepairHandler
+ extends BlockingPartitionRepair
{
- public InstrumentedReadRepairHandler(Map<Replica, Mutation> repairs, int maxBlockFor, P replicaPlan)
+ public InstrumentedReadRepairHandler(Map<Replica, Mutation> repairs, ReplicaPlan.ForTokenWrite writePlan)
{
- super(Util.dk("not a real usable value"), repairs, maxBlockFor, replicaPlan,
- e -> targets.contains(e));
+ super(Util.dk("not a real usable value"), repairs, writePlan, e -> targets.contains(e));
}
Map<InetAddressAndPort, Mutation> mutationsSent = new HashMap<>();
@@ -67,16 +66,15 @@ public class BlockingReadRepairTest extends AbstractReadRepairTest
configureClass(ReadRepairStrategy.BLOCKING);
}
- private static <E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>>
- InstrumentedReadRepairHandler<E, P> createRepairHandler(Map<Replica, Mutation> repairs, int maxBlockFor, P replicaPlan)
+ private static InstrumentedReadRepairHandler createRepairHandler(Map<Replica, Mutation> repairs, ReplicaPlan.ForTokenWrite writePlan)
{
- return new InstrumentedReadRepairHandler<>(repairs, maxBlockFor, replicaPlan);
+ return new InstrumentedReadRepairHandler(repairs, writePlan);
}
- private static InstrumentedReadRepairHandler createRepairHandler(Map<Replica, Mutation> repairs, int maxBlockFor)
+ private static InstrumentedReadRepairHandler createRepairHandler(Map<Replica, Mutation> repairs)
{
EndpointsForRange replicas = EndpointsForRange.copyOf(Lists.newArrayList(repairs.keySet()));
- return createRepairHandler(repairs, maxBlockFor, replicaPlan(replicas, replicas));
+ return createRepairHandler(repairs, repairPlan(replicas, replicas));
}
private static class InstrumentedBlockingReadRepair<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>>
@@ -140,8 +138,8 @@ public class BlockingReadRepairTest extends AbstractReadRepairTest
repairs.put(replica1, repair1);
repairs.put(replica2, repair2);
- ReplicaPlan.ForRangeRead replicaPlan = replicaPlan(replicas, EndpointsForRange.copyOf(Lists.newArrayList(repairs.keySet())));
- InstrumentedReadRepairHandler<?, ?> handler = createRepairHandler(repairs, 2, replicaPlan);
+ ReplicaPlan.ForTokenWrite writePlan = repairPlan(replicas, EndpointsForRange.copyOf(Lists.newArrayList(repairs.keySet())));
+ InstrumentedReadRepairHandler handler = createRepairHandler(repairs, writePlan);
Assert.assertTrue(handler.mutationsSent.isEmpty());
@@ -176,7 +174,7 @@ public class BlockingReadRepairTest extends AbstractReadRepairTest
repairs.put(replica1, mutation(cell2));
repairs.put(replica2, mutation(cell1));
- InstrumentedReadRepairHandler handler = createRepairHandler(repairs, 2);
+ InstrumentedReadRepairHandler handler = createRepairHandler(repairs);
handler.sendInitialRepairs();
handler.ack(target1);
handler.ack(target2);
@@ -197,7 +195,7 @@ public class BlockingReadRepairTest extends AbstractReadRepairTest
repairs.put(replica1, mutation(cell2));
repairs.put(replica2, mutation(cell1));
- InstrumentedReadRepairHandler handler = createRepairHandler(repairs, 2);
+ InstrumentedReadRepairHandler handler = createRepairHandler(repairs);
handler.sendInitialRepairs();
// we've already sent mutations to all candidates, so we shouldn't send any more
@@ -219,7 +217,7 @@ public class BlockingReadRepairTest extends AbstractReadRepairTest
repairs.put(replica1, repair1);
// check that the correct initial mutations are sent out
- InstrumentedReadRepairHandler handler = createRepairHandler(repairs, 2, replicaPlan(replicas, EndpointsForRange.of(replica1, replica2)));
+ InstrumentedReadRepairHandler handler = createRepairHandler(repairs, repairPlan(replicas, EndpointsForRange.of(replica1, replica2)));
handler.sendInitialRepairs();
Assert.assertEquals(1, handler.mutationsSent.size());
Assert.assertTrue(handler.mutationsSent.containsKey(target1));
@@ -240,7 +238,7 @@ public class BlockingReadRepairTest extends AbstractReadRepairTest
repairs.put(replica3, mutation(cell3));
Assert.assertEquals(3, repairs.size());
- InstrumentedReadRepairHandler handler = createRepairHandler(repairs, 2);
+ InstrumentedReadRepairHandler handler = createRepairHandler(repairs);
handler.sendInitialRepairs();
Assert.assertFalse(getCurrentRepairStatus(handler));
@@ -266,8 +264,8 @@ public class BlockingReadRepairTest extends AbstractReadRepairTest
repairs.put(remote1, mutation(cell1));
EndpointsForRange participants = EndpointsForRange.of(replica1, replica2, remote1, remote2);
- ReplicaPlan.ForRangeRead replicaPlan = replicaPlan(ks, ConsistencyLevel.LOCAL_QUORUM, participants);
- InstrumentedReadRepairHandler handler = createRepairHandler(repairs, 2, replicaPlan);
+ ReplicaPlan.ForTokenWrite writePlan = repairPlan(replicaPlan(ks, ConsistencyLevel.LOCAL_QUORUM, participants));
+ InstrumentedReadRepairHandler handler = createRepairHandler(repairs, writePlan);
handler.sendInitialRepairs();
Assert.assertEquals(2, handler.mutationsSent.size());
Assert.assertTrue(handler.mutationsSent.containsKey(replica1.endpoint()));
diff --git a/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java b/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java
index c15d7f4..7806a3f 100644
--- a/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java
@@ -28,6 +28,7 @@ import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import com.google.common.collect.Lists;
+
import org.apache.cassandra.locator.ReplicaPlan;
import org.junit.After;
import org.junit.Assert;
@@ -81,8 +82,8 @@ public class DiagEventsBlockingReadRepairTest extends AbstractReadRepairTest
repairs.put(replica2, repair2);
- ReplicaPlan.ForRangeRead replicaPlan = replicaPlan(replicas, EndpointsForRange.copyOf(Lists.newArrayList(repairs.keySet())));
- DiagnosticPartitionReadRepairHandler handler = createRepairHandler(repairs, 2, replicaPlan);
+ ReplicaPlan.ForTokenWrite writePlan = repairPlan(replicas, EndpointsForRange.copyOf(Lists.newArrayList(repairs.keySet())));
+ DiagnosticPartitionReadRepairHandler handler = createRepairHandler(repairs, writePlan);
Assert.assertTrue(handler.updatesByEp.isEmpty());
@@ -117,15 +118,9 @@ public class DiagEventsBlockingReadRepairTest extends AbstractReadRepairTest
return new DiagnosticBlockingRepairHandler(command, replicaPlan, queryStartNanoTime);
}
- private static DiagnosticPartitionReadRepairHandler createRepairHandler(Map<Replica, Mutation> repairs, int maxBlockFor, ReplicaPlan.ForRead<?> replicaPlan)
- {
- return new DiagnosticPartitionReadRepairHandler<>(key, repairs, maxBlockFor, replicaPlan);
- }
-
- private static DiagnosticPartitionReadRepairHandler createRepairHandler(Map<Replica, Mutation> repairs, int maxBlockFor)
+ private static DiagnosticPartitionReadRepairHandler createRepairHandler(Map<Replica, Mutation> repairs, ReplicaPlan.ForTokenWrite writePlan)
{
- EndpointsForRange replicas = EndpointsForRange.copyOf(Lists.newArrayList(repairs.keySet()));
- return createRepairHandler(repairs, maxBlockFor, replicaPlan(replicas, replicas));
+ return new DiagnosticPartitionReadRepairHandler(key, repairs, writePlan);
}
private static class DiagnosticBlockingRepairHandler extends BlockingReadRepair implements InstrumentedReadRepair
@@ -170,7 +165,7 @@ public class DiagEventsBlockingReadRepairTest extends AbstractReadRepairTest
}
private static class DiagnosticPartitionReadRepairHandler<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>>
- extends BlockingPartitionRepair<E, P>
+ extends BlockingPartitionRepair
{
private final Map<InetAddressAndPort, String> updatesByEp = new HashMap<>();
@@ -180,9 +175,9 @@ public class DiagEventsBlockingReadRepairTest extends AbstractReadRepairTest
return e -> candidates.contains(e);
}
- DiagnosticPartitionReadRepairHandler(DecoratedKey key, Map<Replica, Mutation> repairs, int maxBlockFor, P replicaPlan)
+ DiagnosticPartitionReadRepairHandler(DecoratedKey key, Map<Replica, Mutation> repairs, ReplicaPlan.ForTokenWrite writePlan)
{
- super(key, repairs, maxBlockFor, replicaPlan, isLocal());
+ super(key, repairs, writePlan, isLocal());
DiagnosticEventService.instance().subscribe(PartitionRepairEvent.class, this::onRepairEvent);
}
diff --git a/test/unit/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepairTest.java b/test/unit/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepairTest.java
index cc7fdf1..c0af493 100644
--- a/test/unit/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepairTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepairTest.java
@@ -92,8 +92,9 @@ public class ReadOnlyReadRepairTest extends AbstractReadRepairTest
@Test(expected = UnsupportedOperationException.class)
public void repairPartitionFailure()
{
- ReplicaPlan.SharedForRangeRead replicaPlan = ReplicaPlan.shared(replicaPlan(replicas, replicas));
- InstrumentedReadRepair repair = createInstrumentedReadRepair(replicaPlan);
- repair.repairPartition(null, Collections.emptyMap(), replicaPlan.get());
+ ReplicaPlan.SharedForRangeRead readPlan = ReplicaPlan.shared(replicaPlan(replicas, replicas));
+ ReplicaPlan.ForTokenWrite writePlan = repairPlan(replicas, replicas);
+ InstrumentedReadRepair repair = createInstrumentedReadRepair(readPlan);
+ repair.repairPartition(null, Collections.emptyMap(), writePlan);
}
}
diff --git a/test/unit/org/apache/cassandra/service/reads/repair/ReadRepairTest.java b/test/unit/org/apache/cassandra/service/reads/repair/ReadRepairTest.java
index 5ae9dd8..7458e9b 100644
--- a/test/unit/org/apache/cassandra/service/reads/repair/ReadRepairTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/repair/ReadRepairTest.java
@@ -49,8 +49,6 @@ import org.apache.cassandra.db.rows.Cell;
import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.Replica;
-import org.apache.cassandra.locator.ReplicaLayout;
-import org.apache.cassandra.locator.ReplicaUtils;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.schema.KeyspaceMetadata;
import org.apache.cassandra.schema.KeyspaceParams;
@@ -72,12 +70,11 @@ public class ReadRepairTest
static EndpointsForRange targets;
private static class InstrumentedReadRepairHandler<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>>
- extends BlockingPartitionRepair<E, P>
+ extends BlockingPartitionRepair
{
- public InstrumentedReadRepairHandler(Map<Replica, Mutation> repairs, int maxBlockFor, P replicaPlan)
+ public InstrumentedReadRepairHandler(Map<Replica, Mutation> repairs, ReplicaPlan.ForTokenWrite writePlan)
{
- super(Util.dk("not a valid key"), repairs, maxBlockFor, replicaPlan,
- e -> replicaPlan.consistencyLevel().isDatacenterLocal() && targets.endpoints().contains(e));
+ super(Util.dk("not a valid key"), repairs, writePlan, e -> targets.endpoints().contains(e));
}
Map<InetAddressAndPort, Mutation> mutationsSent = new HashMap<>();
@@ -161,10 +158,11 @@ public class ReadRepairTest
return new Mutation(PartitionUpdate.singleRowUpdate(cfm, key, builder.build()));
}
- private static InstrumentedReadRepairHandler createRepairHandler(Map<Replica, Mutation> repairs, int maxBlockFor, EndpointsForRange all, EndpointsForRange targets)
+ private static InstrumentedReadRepairHandler createRepairHandler(Map<Replica, Mutation> repairs, EndpointsForRange all, EndpointsForRange targets)
{
- ReplicaPlan.ForRangeRead replicaPlan = AbstractReadRepairTest.replicaPlan(ks, ConsistencyLevel.LOCAL_QUORUM, all, targets);
- return new InstrumentedReadRepairHandler<>(repairs, maxBlockFor, replicaPlan);
+ ReplicaPlan.ForRangeRead readPlan = AbstractReadRepairTest.replicaPlan(ks, ConsistencyLevel.LOCAL_QUORUM, all, targets);
+ ReplicaPlan.ForTokenWrite writePlan = AbstractReadRepairTest.repairPlan(readPlan);
+ return new InstrumentedReadRepairHandler(repairs, writePlan);
}
@Test
@@ -198,8 +196,7 @@ public class ReadRepairTest
repairs.put(target1, repair1);
repairs.put(target2, repair2);
- InstrumentedReadRepairHandler<?, ?> handler = createRepairHandler(repairs, 2,
- targets, EndpointsForRange.of(target1, target2));
+ InstrumentedReadRepairHandler<?, ?> handler = createRepairHandler(repairs, targets, EndpointsForRange.of(target1, target2));
Assert.assertTrue(handler.mutationsSent.isEmpty());
@@ -234,7 +231,7 @@ public class ReadRepairTest
repairs.put(target2, mutation(cell1));
EndpointsForRange replicas = EndpointsForRange.of(target1, target2);
- InstrumentedReadRepairHandler handler = createRepairHandler(repairs, 2, replicas, targets);
+ InstrumentedReadRepairHandler handler = createRepairHandler(repairs, replicas, targets);
handler.sendInitialRepairs();
handler.ack(target1.endpoint());
handler.ack(target2.endpoint());
@@ -255,7 +252,7 @@ public class ReadRepairTest
repairs.put(target1, mutation(cell2));
repairs.put(target2, mutation(cell1));
- InstrumentedReadRepairHandler handler = createRepairHandler(repairs, 2, EndpointsForRange.of(target1, target2),
+ InstrumentedReadRepairHandler handler = createRepairHandler(repairs, EndpointsForRange.of(target1, target2),
EndpointsForRange.of(target1, target2));
handler.sendInitialRepairs();
@@ -278,7 +275,7 @@ public class ReadRepairTest
repairs.put(target1, repair1);
// check that the correct initial mutations are sent out
- InstrumentedReadRepairHandler handler = createRepairHandler(repairs, 2, targets, EndpointsForRange.of(target1, target2));
+ InstrumentedReadRepairHandler handler = createRepairHandler(repairs, targets, EndpointsForRange.of(target1, target2));
handler.sendInitialRepairs();
Assert.assertEquals(1, handler.mutationsSent.size());
Assert.assertTrue(handler.mutationsSent.containsKey(target1.endpoint()));
@@ -301,7 +298,7 @@ public class ReadRepairTest
Assert.assertEquals(3, repairs.size());
EndpointsForRange replicas = EndpointsForRange.of(target1, target2, target3);
- InstrumentedReadRepairHandler handler = createRepairHandler(repairs, 2, replicas, replicas);
+ InstrumentedReadRepairHandler handler = createRepairHandler(repairs, replicas, replicas);
handler.sendInitialRepairs();
Assert.assertFalse(getCurrentRepairStatus(handler));
@@ -311,7 +308,6 @@ public class ReadRepairTest
// here we should stop blocking, even though we've sent 3 repairs
handler.ack(target2.endpoint());
Assert.assertTrue(getCurrentRepairStatus(handler));
-
}
/**
@@ -330,7 +326,7 @@ public class ReadRepairTest
EndpointsForRange participants = EndpointsForRange.of(target1, target2, remote1, remote2);
EndpointsForRange targets = EndpointsForRange.of(target1, target2);
- InstrumentedReadRepairHandler handler = createRepairHandler(repairs, 2, participants, targets);
+ InstrumentedReadRepairHandler handler = createRepairHandler(repairs, participants, targets);
handler.sendInitialRepairs();
Assert.assertEquals(2, handler.mutationsSent.size());
Assert.assertTrue(handler.mutationsSent.containsKey(target1.endpoint()));
diff --git a/test/unit/org/apache/cassandra/service/reads/repair/TestableReadRepair.java b/test/unit/org/apache/cassandra/service/reads/repair/TestableReadRepair.java
index 53964cb..84276d5 100644
--- a/test/unit/org/apache/cassandra/service/reads/repair/TestableReadRepair.java
+++ b/test/unit/org/apache/cassandra/service/reads/repair/TestableReadRepair.java
@@ -113,7 +113,7 @@ public class TestableReadRepair<E extends Endpoints<E>, P extends ReplicaPlan.Fo
}
@Override
- public void repairPartition(DecoratedKey partitionKey, Map<Replica, Mutation> mutations, P replicaPlan)
+ public void repairPartition(DecoratedKey partitionKey, Map<Replica, Mutation> mutations, ReplicaPlan.ForTokenWrite writePlan)
{
for (Map.Entry<Replica, Mutation> entry: mutations.entrySet())
sent.put(entry.getKey().endpoint(), entry.getValue());
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org