You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by if...@apache.org on 2018/09/12 14:58:02 UTC
cassandra git commit: Remove mentions of transient replication from
repair path
Repository: cassandra
Updated Branches:
refs/heads/trunk f100024eb -> 8a73427c6
Remove mentions of transient replication from repair path
Patch by Alex Petrov; reviewed by Blake Eggleston and Ariel Weisberg for CASSANDRA-14698
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8a73427c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8a73427c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8a73427c
Branch: refs/heads/trunk
Commit: 8a73427c6543c94ce49da0ed1f833ec5b8ed4f18
Parents: f100024
Author: Alex Petrov <ol...@gmail.com>
Authored: Thu Sep 6 12:37:13 2018 +0200
Committer: Alex Petrov <ol...@gmail.com>
Committed: Wed Sep 12 16:56:57 2018 +0200
----------------------------------------------------------------------
CHANGES.txt | 3 +-
.../service/reads/AbstractReadExecutor.java | 2 +-
.../cassandra/service/reads/DataResolver.java | 13 +-
.../cassandra/service/reads/DigestResolver.java | 8 +-
.../service/reads/ResponseResolver.java | 8 +-
.../reads/repair/AbstractReadRepair.java | 4 +-
.../reads/repair/ReadRepairDiagnostics.java | 14 +-
.../reads/repair/RowIteratorMergeListener.java | 44 ++--
.../reads/DataResolverTransientTest.java | 226 -------------------
.../service/reads/DigestResolverTest.java | 35 ++-
10 files changed, 73 insertions(+), 284 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a73427c/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3cfdcff..264c80f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
4.0
- * fix handleRepairStatusChangedNotification to remove first then add (CASSANDRA-14720)
+ * Remove mentions of transient replication from repair path (CASSANDRA-14698)
+ * Fix handleRepairStatusChangedNotification to remove first then add (CASSANDRA-14720)
* Allow transient node to serve as a repair coordinator (CASSANDRA-14693)
* DecayingEstimatedHistogramReservoir.EstimatedHistogramReservoirSnapshot returns wrong value for size() and incorrectly calculates count (CASSANDRA-14696)
* AbstractReplicaCollection equals and hash code should throw due to conflict between order sensitive/insensitive uses (CASSANDRA-14700)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a73427c/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java b/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
index 5543fcc..75885ae 100644
--- a/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
+++ b/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
@@ -81,7 +81,7 @@ public abstract class AbstractReadExecutor
this.replicaLayout = replicaLayout;
this.initialDataRequestCount = initialDataRequestCount;
this.readRepair = ReadRepair.create(command, replicaLayout, queryStartNanoTime);
- this.digestResolver = new DigestResolver<>(command, replicaLayout, readRepair, queryStartNanoTime);
+ this.digestResolver = new DigestResolver<>(command, replicaLayout, queryStartNanoTime);
this.handler = new ReadCallback<>(digestResolver, replicaLayout.consistencyLevel().blockFor(replicaLayout.keyspace()), command, replicaLayout, queryStartNanoTime);
this.cfs = cfs;
this.traceState = Tracing.instance.get();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a73427c/src/java/org/apache/cassandra/service/reads/DataResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/DataResolver.java b/src/java/org/apache/cassandra/service/reads/DataResolver.java
index 1f69d6a..a6901b2 100644
--- a/src/java/org/apache/cassandra/service/reads/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/reads/DataResolver.java
@@ -50,15 +50,18 @@ import org.apache.cassandra.service.reads.repair.RepairedDataTracker;
import org.apache.cassandra.service.reads.repair.RepairedDataVerifier;
import static com.google.common.collect.Iterables.*;
+import static org.apache.cassandra.db.partitions.UnfilteredPartitionIterators.MergeListener;
public class DataResolver<E extends Endpoints<E>, L extends ReplicaLayout<E, L>> extends ResponseResolver<E, L>
{
private final boolean enforceStrictLiveness;
+ private final ReadRepair<E, L> readRepair;
public DataResolver(ReadCommand command, L replicaLayout, ReadRepair<E, L> readRepair, long queryStartNanoTime)
{
- super(command, replicaLayout, readRepair, queryStartNanoTime);
+ super(command, replicaLayout, queryStartNanoTime);
this.enforceStrictLiveness = command.metadata().enforceStrictLiveness();
+ this.readRepair = readRepair;
}
public PartitionIterator getData()
@@ -157,8 +160,14 @@ public class DataResolver<E extends Endpoints<E>, L extends ReplicaLayout<E, L>>
return Joiner.on(",\n").join(transform(getMessages().snapshot(), m -> m.from + " => " + m.payload.toDebugString(command, partitionKey)));
}
- private UnfilteredPartitionIterators.MergeListener wrapMergeListener(UnfilteredPartitionIterators.MergeListener partitionListener, L sources, RepairedDataTracker repairedDataTracker)
+ private UnfilteredPartitionIterators.MergeListener wrapMergeListener(UnfilteredPartitionIterators.MergeListener partitionListener,
+ L sources,
+ RepairedDataTracker repairedDataTracker)
{
+ // Avoid wrapping no-op listeners as it doesn't throw
+ if (partitionListener == UnfilteredPartitionIterators.MergeListener.NOOP)
+ return partitionListener;
+
return new UnfilteredPartitionIterators.MergeListener()
{
public UnfilteredRowIterators.MergeListener getRowMergeListener(DecoratedKey partitionKey, List<UnfilteredRowIterator> versions)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a73427c/src/java/org/apache/cassandra/service/reads/DigestResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/DigestResolver.java b/src/java/org/apache/cassandra/service/reads/DigestResolver.java
index c3eee43..28c2117 100644
--- a/src/java/org/apache/cassandra/service/reads/DigestResolver.java
+++ b/src/java/org/apache/cassandra/service/reads/DigestResolver.java
@@ -44,9 +44,9 @@ public class DigestResolver<E extends Endpoints<E>, L extends ReplicaLayout<E, L
{
private volatile MessageIn<ReadResponse> dataResponse;
- public DigestResolver(ReadCommand command, L replicas, ReadRepair<E, L> readRepair, long queryStartNanoTime)
+ public DigestResolver(ReadCommand command, L replicas, long queryStartNanoTime)
{
- super(command, replicas, readRepair, queryStartNanoTime);
+ super(command, replicas, queryStartNanoTime);
Preconditions.checkArgument(command instanceof SinglePartitionReadCommand,
"DigestResolver can only be used with SinglePartitionReadCommand commands");
}
@@ -95,11 +95,11 @@ public class DigestResolver<E extends Endpoints<E>, L extends ReplicaLayout<E, L
// transient replica response still contains data, which needs to be reconciled.
DataResolver<E, L> dataResolver = new DataResolver<>(command,
replicaLayout,
- (ReadRepair<E, L>) NoopReadRepair.instance,
+ NoopReadRepair.instance,
queryStartNanoTime);
dataResolver.preprocess(dataResponse);
- // Forward differences to all full nodes
+ // Reconcile with transient replicas
for (MessageIn<ReadResponse> response : responses)
{
Replica replica = replicaLayout.getReplicaFor(response.from);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a73427c/src/java/org/apache/cassandra/service/reads/ResponseResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/ResponseResolver.java b/src/java/org/apache/cassandra/service/reads/ResponseResolver.java
index e306b4d..298f843 100644
--- a/src/java/org/apache/cassandra/service/reads/ResponseResolver.java
+++ b/src/java/org/apache/cassandra/service/reads/ResponseResolver.java
@@ -25,7 +25,6 @@ import org.apache.cassandra.db.ReadResponse;
import org.apache.cassandra.locator.Endpoints;
import org.apache.cassandra.locator.ReplicaLayout;
import org.apache.cassandra.net.MessageIn;
-import org.apache.cassandra.service.reads.repair.ReadRepair;
import org.apache.cassandra.utils.concurrent.Accumulator;
public abstract class ResponseResolver<E extends Endpoints<E>, L extends ReplicaLayout<E, L>>
@@ -34,17 +33,15 @@ public abstract class ResponseResolver<E extends Endpoints<E>, L extends Replica
protected final ReadCommand command;
protected final L replicaLayout;
- protected final ReadRepair<E, L> readRepair;
// Accumulator gives us non-blocking thread-safety with optimal algorithmic constraints
protected final Accumulator<MessageIn<ReadResponse>> responses;
protected final long queryStartNanoTime;
- public ResponseResolver(ReadCommand command, L replicaLayout, ReadRepair<E, L> readRepair, long queryStartNanoTime)
+ public ResponseResolver(ReadCommand command, L replicaLayout, long queryStartNanoTime)
{
this.command = command;
this.replicaLayout = replicaLayout;
- this.readRepair = readRepair;
// TODO: calculate max possible replicas for the query (e.g. local dc queries won't contact remotes)
this.responses = new Accumulator<>(replicaLayout.all().size());
this.queryStartNanoTime = queryStartNanoTime;
@@ -60,7 +57,8 @@ public abstract class ResponseResolver<E extends Endpoints<E>, L extends Replica
}
catch (IllegalStateException e)
{
- logger.error("Encountered error while trying to preprocess the message {}: %s in command {}, replicas: {}", message, command, readRepair, replicaLayout.consistencyLevel(), replicaLayout.selected());
+ logger.error("Encountered error while trying to preprocess the message {}, in command {}, replica layout: {}",
+ message, command, replicaLayout);
throw e;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a73427c/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java b/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java
index 493b9d0..528d31b 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java
@@ -110,7 +110,7 @@ public abstract class AbstractReadRepair<E extends Endpoints<E>, L extends Repli
Tracing.trace("Enqueuing full data read to {}", replica);
sendReadCommand(replica, readCallback);
}
- ReadRepairDiagnostics.startRepair(this, replicaLayout.selected().endpoints(), digestResolver, replicaLayout.all().endpoints());
+ ReadRepairDiagnostics.startRepair(this, replicaLayout, digestResolver);
}
public void awaitReads() throws ReadTimeoutException
@@ -150,7 +150,7 @@ public abstract class AbstractReadRepair<E extends Endpoints<E>, L extends Repli
Tracing.trace("Enqueuing speculative full data read to {}", replica);
sendReadCommand(replica, repair.readCallback);
ReadRepairMetrics.speculatedRead.mark();
- ReadRepairDiagnostics.speculatedRead(this, replica.endpoint(), uncontacted.all().endpoints());
+ ReadRepairDiagnostics.speculatedRead(this, replica.endpoint(), uncontacted);
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a73427c/src/java/org/apache/cassandra/service/reads/repair/ReadRepairDiagnostics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/repair/ReadRepairDiagnostics.java b/src/java/org/apache/cassandra/service/reads/repair/ReadRepairDiagnostics.java
index 6eff395..4c74a89 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/ReadRepairDiagnostics.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/ReadRepairDiagnostics.java
@@ -18,15 +18,14 @@
package org.apache.cassandra.service.reads.repair;
-import java.util.Collection;
import java.util.Collections;
-import java.util.List;
import com.google.common.collect.Lists;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.diag.DiagnosticEventService;
import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.ReplicaLayout;
import org.apache.cassandra.service.reads.DigestResolver;
import org.apache.cassandra.service.reads.repair.PartitionRepairEvent.PartitionRepairEventType;
import org.apache.cassandra.service.reads.repair.ReadRepairEvent.ReadRepairEventType;
@@ -39,21 +38,22 @@ final class ReadRepairDiagnostics
{
}
- static void startRepair(AbstractReadRepair readRepair, Collection<InetAddressAndPort> endpointDestinations,
- DigestResolver digestResolver, Collection<InetAddressAndPort> allEndpoints)
+ static void startRepair(AbstractReadRepair readRepair, ReplicaLayout<?, ?> layout, DigestResolver digestResolver)
{
if (service.isEnabled(ReadRepairEvent.class, ReadRepairEventType.START_REPAIR))
service.publish(new ReadRepairEvent(ReadRepairEventType.START_REPAIR,
- readRepair, endpointDestinations, allEndpoints, digestResolver));
+ readRepair,
+ layout.selected().endpoints(),
+ layout.all().endpoints(), digestResolver));
}
static void speculatedRead(AbstractReadRepair readRepair, InetAddressAndPort endpoint,
- Iterable<InetAddressAndPort> allEndpoints)
+ ReplicaLayout<?, ?> replicaLayout)
{
if (service.isEnabled(ReadRepairEvent.class, ReadRepairEventType.SPECULATED_READ))
service.publish(new ReadRepairEvent(ReadRepairEventType.SPECULATED_READ,
readRepair, Collections.singletonList(endpoint),
- Lists.newArrayList(allEndpoints), null));
+ Lists.newArrayList(replicaLayout.all().endpoints()), null));
}
static void sendInitialRepair(BlockingPartitionRepair partitionRepair, InetAddressAndPort destination, Mutation mutation)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a73427c/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java b/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java
index b0c019a..7fe797a 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java
@@ -58,7 +58,6 @@ public class RowIteratorMergeListener implements UnfilteredRowIterators.MergeLis
private final ConsistencyLevel consistency;
private final PartitionUpdate.Builder[] repairs;
- private final Replica[] sources;
private final Row.Builder[] currentRows;
private final RowDiffListener diffListener;
private final ReplicaLayout layout;
@@ -79,16 +78,12 @@ public class RowIteratorMergeListener implements UnfilteredRowIterators.MergeLis
this.partitionKey = partitionKey;
this.columns = columns;
this.isReversed = isReversed;
- Endpoints<?> sources = layout.selected();
- this.sources = new Replica[sources.size()];
- for (int i = 0; i < sources.size(); i++)
- this.sources[i] = sources.get(i);
-
this.layout = layout;
- repairs = new PartitionUpdate.Builder[sources.size()];
- currentRows = new Row.Builder[sources.size()];
- sourceDeletionTime = new DeletionTime[sources.size()];
- markerToRepair = new ClusteringBound[sources.size()];
+ int size = layout.selected().size();
+ repairs = new PartitionUpdate.Builder[size];
+ currentRows = new Row.Builder[size];
+ sourceDeletionTime = new DeletionTime[size];
+ markerToRepair = new ClusteringBound[size];
this.command = command;
this.consistency = consistency;
this.readRepair = readRepair;
@@ -97,25 +92,25 @@ public class RowIteratorMergeListener implements UnfilteredRowIterators.MergeLis
{
public void onPrimaryKeyLivenessInfo(int i, Clustering clustering, LivenessInfo merged, LivenessInfo original)
{
- if (merged != null && !merged.equals(original) && !isTransient(i))
+ if (merged != null && !merged.equals(original))
currentRow(i, clustering).addPrimaryKeyLivenessInfo(merged);
}
public void onDeletion(int i, Clustering clustering, Row.Deletion merged, Row.Deletion original)
{
- if (merged != null && !merged.equals(original) && !isTransient(i))
+ if (merged != null && !merged.equals(original))
currentRow(i, clustering).addRowDeletion(merged);
}
public void onComplexDeletion(int i, Clustering clustering, ColumnMetadata column, DeletionTime merged, DeletionTime original)
{
- if (merged != null && !merged.equals(original) && !isTransient(i))
+ if (merged != null && !merged.equals(original))
currentRow(i, clustering).addComplexDeletion(column, merged);
}
public void onCell(int i, Clustering clustering, Cell merged, Cell original)
{
- if (merged != null && !merged.equals(original) && isQueried(merged) && !isTransient(i))
+ if (merged != null && !merged.equals(original) && isQueried(merged))
currentRow(i, clustering).addCell(merged);
}
@@ -134,11 +129,6 @@ public class RowIteratorMergeListener implements UnfilteredRowIterators.MergeLis
};
}
- private boolean isTransient(int i)
- {
- return sources[i].isTransient();
- }
-
private PartitionUpdate.Builder update(int i)
{
if (repairs[i] == null)
@@ -172,9 +162,6 @@ public class RowIteratorMergeListener implements UnfilteredRowIterators.MergeLis
this.partitionLevelDeletion = mergedDeletion;
for (int i = 0; i < versions.length; i++)
{
- if (isTransient(i))
- continue;
-
if (mergedDeletion.supersedes(versions[i]))
update(i).addPartitionDeletion(mergedDeletion);
}
@@ -209,9 +196,6 @@ public class RowIteratorMergeListener implements UnfilteredRowIterators.MergeLis
for (int i = 0; i < versions.length; i++)
{
- if (isTransient(i))
- continue;
-
RangeTombstoneMarker marker = versions[i];
// Update what the source now thinks is the current deletion
@@ -326,20 +310,22 @@ public class RowIteratorMergeListener implements UnfilteredRowIterators.MergeLis
public void close()
{
Map<Replica, Mutation> mutations = null;
+ Endpoints<?> sources = layout.selected();
for (int i = 0; i < repairs.length; i++)
{
if (repairs[i] == null)
continue;
- Preconditions.checkState(!isTransient(i), "cannot read repair transient replicas");
- Mutation mutation = BlockingReadRepairs.createRepairMutation(repairs[i].build(), consistency, sources[i].endpoint(), false);
+ Replica source = sources.get(i);
+
+ Mutation mutation = BlockingReadRepairs.createRepairMutation(repairs[i].build(), consistency, source.endpoint(), false);
if (mutation == null)
continue;
if (mutations == null)
- mutations = Maps.newHashMapWithExpectedSize(sources.length);
+ mutations = Maps.newHashMapWithExpectedSize(sources.size());
- mutations.put(sources[i], mutation);
+ mutations.put(source, mutation);
}
if (mutations != null)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a73427c/test/unit/org/apache/cassandra/service/reads/DataResolverTransientTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/reads/DataResolverTransientTest.java b/test/unit/org/apache/cassandra/service/reads/DataResolverTransientTest.java
deleted file mode 100644
index 8119400..0000000
--- a/test/unit/org/apache/cassandra/service/reads/DataResolverTransientTest.java
+++ /dev/null
@@ -1,226 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.service.reads;
-
-import java.util.concurrent.TimeUnit;
-
-import com.google.common.primitives.Ints;
-
-import org.apache.cassandra.Util;
-import org.apache.cassandra.db.DecoratedKey;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import org.apache.cassandra.db.Clustering;
-import org.apache.cassandra.db.ConsistencyLevel;
-import org.apache.cassandra.db.DeletionTime;
-import org.apache.cassandra.db.EmptyIterators;
-import org.apache.cassandra.db.RangeTombstone;
-import org.apache.cassandra.db.SimpleBuilders;
-import org.apache.cassandra.db.SinglePartitionReadCommand;
-import org.apache.cassandra.db.Slice;
-import org.apache.cassandra.db.partitions.PartitionUpdate;
-import org.apache.cassandra.db.rows.BTreeRow;
-import org.apache.cassandra.db.rows.Row;
-import org.apache.cassandra.locator.EndpointsForToken;
-import org.apache.cassandra.locator.ReplicaLayout;
-import org.apache.cassandra.schema.TableMetadata;
-import org.apache.cassandra.service.reads.repair.TestableReadRepair;
-import org.apache.cassandra.utils.ByteBufferUtil;
-
-import static org.apache.cassandra.db.ConsistencyLevel.QUORUM;
-import static org.apache.cassandra.locator.Replica.fullReplica;
-import static org.apache.cassandra.locator.Replica.transientReplica;
-import static org.apache.cassandra.locator.ReplicaUtils.full;
-import static org.apache.cassandra.locator.ReplicaUtils.trans;
-
-/**
- * Tests DataResolvers handing of transient replicas
- */
-public class DataResolverTransientTest extends AbstractReadResponseTest
-{
- private static DecoratedKey key;
-
- @Before
- public void setUp()
- {
- key = Util.dk("key1");
- }
-
- private static PartitionUpdate.Builder update(TableMetadata metadata, String key, Row... rows)
- {
- PartitionUpdate.Builder builder = new PartitionUpdate.Builder(metadata, dk(key), metadata.regularAndStaticColumns(), rows.length, false);
- for (Row row: rows)
- {
- builder.add(row);
- }
- return builder;
- }
-
- private static PartitionUpdate.Builder update(Row... rows)
- {
- return update(cfm, "key1", rows);
- }
-
- private static Row.SimpleBuilder rowBuilder(int clustering)
- {
- return new SimpleBuilders.RowBuilder(cfm, Integer.toString(clustering));
- }
-
- private static Row row(long timestamp, int clustering, int value)
- {
- return rowBuilder(clustering).timestamp(timestamp).add("c1", Integer.toString(value)).build();
- }
-
- private static DeletionTime deletion(long timeMillis)
- {
- TimeUnit MILLIS = TimeUnit.MILLISECONDS;
- return new DeletionTime(MILLIS.toMicros(timeMillis), Ints.checkedCast(MILLIS.toSeconds(timeMillis)));
- }
-
- /**
- * Tests that the given update doesn't cause data resolver to attempt to repair a transient replica
- */
- private void assertNoTransientRepairs(PartitionUpdate update)
- {
- SinglePartitionReadCommand command = SinglePartitionReadCommand.fullPartitionRead(update.metadata(), nowInSec, key);
- EndpointsForToken targetReplicas = EndpointsForToken.of(key.getToken(), full(EP1), full(EP2), trans(EP3));
- TestableReadRepair repair = new TestableReadRepair(command, QUORUM);
- DataResolver resolver = new DataResolver(command, plan(targetReplicas, ConsistencyLevel.QUORUM), repair, 0);
-
- Assert.assertFalse(resolver.isDataPresent());
- resolver.preprocess(response(command, EP1, iter(update), false));
- resolver.preprocess(response(command, EP2, iter(update), false));
- resolver.preprocess(response(command, EP3, EmptyIterators.unfilteredPartition(update.metadata()), false));
-
- Assert.assertFalse(repair.dataWasConsumed());
- assertPartitionsEqual(filter(iter(update)), resolver.resolve());
- Assert.assertTrue(repair.dataWasConsumed());
- Assert.assertTrue(repair.sent.toString(), repair.sent.isEmpty());
- }
-
- @Test
- public void emptyRowRepair()
- {
- assertNoTransientRepairs(update(row(1000, 4, 4), row(1000, 5, 5)).build());
- }
-
- @Test
- public void emptyPartitionDeletionRepairs()
- {
- PartitionUpdate.Builder builder = update();
- builder.addPartitionDeletion(deletion(1999));
- assertNoTransientRepairs(builder.build());
- }
-
- /**
- * Partition level deletion responses shouldn't send data to a transient replica
- */
- @Test
- public void emptyRowDeletionRepairs()
- {
- PartitionUpdate.Builder builder = update();
- builder.add(rowBuilder(1).timestamp(1999).delete().build());
- assertNoTransientRepairs(builder.build());
- }
-
- @Test
- public void emptyComplexDeletionRepair()
- {
-
- long[] ts = {1000, 2000};
-
- Row.Builder builder = BTreeRow.unsortedBuilder();
- builder.newRow(Clustering.EMPTY);
- builder.addComplexDeletion(m, new DeletionTime(ts[0] - 1, nowInSec));
- assertNoTransientRepairs(update(cfm2, "key", builder.build()).build());
-
- }
-
- @Test
- public void emptyRangeTombstoneRepairs()
- {
- Slice slice = Slice.make(Clustering.make(ByteBufferUtil.bytes("a")), Clustering.make(ByteBufferUtil.bytes("b")));
- PartitionUpdate.Builder builder = update();
- builder.add(new RangeTombstone(slice, deletion(2000)));
- assertNoTransientRepairs(builder.build());
- }
-
- /**
- * If the full replicas need to repair each other, repairs shouldn't be sent to transient replicas
- */
- @Test
- public void fullRepairsIgnoreTransientReplicas()
- {
- SinglePartitionReadCommand command = SinglePartitionReadCommand.fullPartitionRead(cfm, nowInSec, dk(5));
- EndpointsForToken targetReplicas = EndpointsForToken.of(key.getToken(), full(EP1), full(EP2), trans(EP3));
- TestableReadRepair repair = new TestableReadRepair(command, QUORUM);
- DataResolver resolver = new DataResolver(command, plan(targetReplicas, QUORUM), repair, 0);
-
- Assert.assertFalse(resolver.isDataPresent());
- resolver.preprocess(response(command, EP1, iter(update(row(1000, 5, 5)).build()), false));
- resolver.preprocess(response(command, EP2, iter(update(row(2000, 4, 4)).build()), false));
- resolver.preprocess(response(command, EP3, EmptyIterators.unfilteredPartition(cfm), false));
-
- Assert.assertFalse(repair.dataWasConsumed());
-
- consume(resolver.resolve());
-
- Assert.assertTrue(repair.dataWasConsumed());
-
- Assert.assertTrue(repair.sent.containsKey(EP1));
- Assert.assertTrue(repair.sent.containsKey(EP2));
- Assert.assertFalse(repair.sent.containsKey(EP3));
- }
-
- /**
- * If the transient replica has new data, the full replicas should be repaired, the transient one should not
- */
- @Test
- public void transientMismatchesRepairFullReplicas()
- {
- SinglePartitionReadCommand command = SinglePartitionReadCommand.fullPartitionRead(cfm, nowInSec, dk(5));
- EndpointsForToken targetReplicas = EndpointsForToken.of(key.getToken(), full(EP1), full(EP2), trans(EP3));
- TestableReadRepair<?, ?> repair = new TestableReadRepair(command, QUORUM);
- DataResolver resolver = new DataResolver(command, plan(targetReplicas, QUORUM), repair, 0);
-
- Assert.assertFalse(resolver.isDataPresent());
- PartitionUpdate transData = update(row(1000, 5, 5)).build();
- resolver.preprocess(response(command, EP1, EmptyIterators.unfilteredPartition(cfm), false));
- resolver.preprocess(response(command, EP2, EmptyIterators.unfilteredPartition(cfm), false));
- resolver.preprocess(response(command, EP3, iter(transData), false));
-
- Assert.assertFalse(repair.dataWasConsumed());
-
- assertPartitionsEqual(filter(iter(transData)), resolver.resolve());
-
- Assert.assertTrue(repair.dataWasConsumed());
-
- assertPartitionsEqual(filter(iter(transData)), filter(iter(repair.sent.get(EP1).getPartitionUpdate(cfm))));
- assertPartitionsEqual(filter(iter(transData)), filter(iter(repair.sent.get(EP2).getPartitionUpdate(cfm))));
- Assert.assertFalse(repair.sent.containsKey(EP3));
-
- }
-
- private ReplicaLayout.ForToken plan(EndpointsForToken replicas, ConsistencyLevel consistencyLevel)
- {
- return new ReplicaLayout.ForToken(ks, consistencyLevel, replicas.token(), replicas, null, replicas);
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a73427c/test/unit/org/apache/cassandra/service/reads/DigestResolverTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/reads/DigestResolverTest.java b/test/unit/org/apache/cassandra/service/reads/DigestResolverTest.java
index 5306a74..8454d6a 100644
--- a/test/unit/org/apache/cassandra/service/reads/DigestResolverTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/DigestResolverTest.java
@@ -64,8 +64,7 @@ public class DigestResolverTest extends AbstractReadResponseTest
{
SinglePartitionReadCommand command = SinglePartitionReadCommand.fullPartitionRead(cfm, nowInSec, dk);
EndpointsForToken targetReplicas = EndpointsForToken.of(dk.getToken(), full(EP1), full(EP2));
- TestableReadRepair readRepair = new TestableReadRepair(command, ConsistencyLevel.QUORUM);
- DigestResolver resolver = new DigestResolver(command, plan(ConsistencyLevel.QUORUM, targetReplicas), readRepair, 0);
+ DigestResolver resolver = new DigestResolver(command, plan(ConsistencyLevel.QUORUM, targetReplicas), 0);
PartitionUpdate response = update(row(1000, 4, 4), row(1000, 5, 5)).build();
@@ -83,7 +82,7 @@ public class DigestResolverTest extends AbstractReadResponseTest
{
SinglePartitionReadCommand command = SinglePartitionReadCommand.fullPartitionRead(cfm, nowInSec, dk);
EndpointsForToken targetReplicas = EndpointsForToken.of(dk.getToken(), full(EP1), full(EP2));
- DigestResolver resolver = new DigestResolver(command, plan(ConsistencyLevel.QUORUM, targetReplicas), NoopReadRepair.instance,0);
+ DigestResolver resolver = new DigestResolver(command, plan(ConsistencyLevel.QUORUM, targetReplicas), 0);
PartitionUpdate response1 = update(row(1000, 4, 4), row(1000, 5, 5)).build();
PartitionUpdate response2 = update(row(2000, 4, 5)).build();
@@ -104,8 +103,7 @@ public class DigestResolverTest extends AbstractReadResponseTest
{
SinglePartitionReadCommand command = SinglePartitionReadCommand.fullPartitionRead(cfm, nowInSec, dk);
EndpointsForToken targetReplicas = EndpointsForToken.of(dk.getToken(), full(EP1), trans(EP2));
- TestableReadRepair readRepair = new TestableReadRepair(command, ConsistencyLevel.QUORUM);
- DigestResolver resolver = new DigestResolver(command, plan(ConsistencyLevel.QUORUM, targetReplicas), readRepair, 0);
+ DigestResolver<?, ?> resolver = new DigestResolver<>(command, plan(ConsistencyLevel.QUORUM, targetReplicas), 0);
PartitionUpdate response1 = update(row(1000, 4, 4), row(1000, 5, 5)).build();
PartitionUpdate response2 = update(row(1000, 5, 5)).build();
@@ -116,7 +114,6 @@ public class DigestResolverTest extends AbstractReadResponseTest
Assert.assertTrue(resolver.isDataPresent());
Assert.assertTrue(resolver.responsesMatch());
Assert.assertTrue(resolver.hasTransientResponse());
- Assert.assertTrue(readRepair.sent.isEmpty());
}
/**
@@ -127,7 +124,7 @@ public class DigestResolverTest extends AbstractReadResponseTest
{
SinglePartitionReadCommand command = SinglePartitionReadCommand.fullPartitionRead(cfm, nowInSec, dk);
EndpointsForToken targetReplicas = EndpointsForToken.of(dk.getToken(), full(EP1), trans(EP2));
- DigestResolver resolver = new DigestResolver(command, plan(ConsistencyLevel.QUORUM, targetReplicas), NoopReadRepair.instance, 0);
+ DigestResolver<?, ?> resolver = new DigestResolver<>(command, plan(ConsistencyLevel.QUORUM, targetReplicas), 0);
PartitionUpdate response2 = update(row(1000, 5, 5)).build();
Assert.assertFalse(resolver.isDataPresent());
@@ -137,6 +134,30 @@ public class DigestResolverTest extends AbstractReadResponseTest
Assert.assertTrue(resolver.hasTransientResponse());
}
+ @Test
+ public void transientResponseData()
+ {
+ SinglePartitionReadCommand command = SinglePartitionReadCommand.fullPartitionRead(cfm, nowInSec, dk);
+ EndpointsForToken targetReplicas = EndpointsForToken.of(dk.getToken(), full(EP1), full(EP2), trans(EP3));
+ DigestResolver<?, ?> resolver = new DigestResolver<>(command, plan(ConsistencyLevel.QUORUM, targetReplicas), 0);
+
+ PartitionUpdate fullResponse = update(row(1000, 1, 1)).build();
+ PartitionUpdate digestResponse = update(row(1000, 1, 1)).build();
+ PartitionUpdate transientResponse = update(row(1000, 2, 2)).build();
+ Assert.assertFalse(resolver.isDataPresent());
+ Assert.assertFalse(resolver.hasTransientResponse());
+ resolver.preprocess(response(command, EP1, iter(fullResponse), false));
+ Assert.assertTrue(resolver.isDataPresent());
+ resolver.preprocess(response(command, EP2, iter(digestResponse), true));
+ resolver.preprocess(response(command, EP3, iter(transientResponse), false));
+ Assert.assertTrue(resolver.hasTransientResponse());
+
+ assertPartitionsEqual(filter(iter(dk,
+ row(1000, 1, 1),
+ row(1000, 2, 2))),
+ resolver.getData());
+ }
+
private ReplicaLayout.ForToken plan(ConsistencyLevel consistencyLevel, EndpointsForToken replicas)
{
return new ReplicaLayout.ForToken(ks, consistencyLevel, replicas.token(), replicas, null, replicas);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org