You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sa...@apache.org on 2020/04/16 17:44:53 UTC
[cassandra] 02/02: Ensure repaired data tracking reads a consistent
amount of data across replicas
This is an automated email from the ASF dual-hosted git repository.
samt pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit a8e7cfbc0e146ea82154654ba43b613b058f99d1
Author: Sam Tunnicliffe <sa...@beobal.com>
AuthorDate: Tue Feb 11 09:59:31 2020 +0000
Ensure repaired data tracking reads a consistent amount of data across replicas
Patch by Sam Tunnicliffe; reviewed by Aleksey Yeschenko for CASSANDRA-15601
---
CHANGES.txt | 1 +
src/java/org/apache/cassandra/db/ReadCommand.java | 310 ++++---------------
.../org/apache/cassandra/db/RepairedDataInfo.java | 336 +++++++++++++++++++++
.../apache/cassandra/metrics/KeyspaceMetrics.java | 12 +
.../org/apache/cassandra/metrics/TableMetrics.java | 18 +-
.../distributed/test/RepairDigestTrackingTest.java | 169 ++++++++++-
.../org/apache/cassandra/db/ReadCommandTest.java | 120 +++++++-
.../apache/cassandra/db/RepairedDataInfoTest.java | 303 +++++++++++++++++++
8 files changed, 1003 insertions(+), 266 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 96eeed4..4586c71 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0-alpha4
+ * Ensure repaired data tracking reads a consistent amount of data across replicas (CASSANDRA-15601)
* Fix CQLSH to avoid arguments being evaluated (CASSANDRA-15660)
* Correct Visibility and Improve Safety of Methods in LatencyMetrics (CASSANDRA-15597)
* Allow cqlsh to run with Python2.7/Python3.6+ (CASSANDRA-15659,CASSANDRA-15573)
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java
index 4f8ea3e..4c4c833 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -23,6 +23,7 @@ import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.LongPredicate;
+import java.util.function.Function;
import javax.annotation.Nullable;
@@ -62,12 +63,12 @@ import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.service.ClientWarn;
import org.apache.cassandra.tracing.Tracing;
-import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import static com.google.common.collect.Iterables.any;
import static com.google.common.collect.Iterables.filter;
import static org.apache.cassandra.utils.MonotonicClock.approxTime;
+import static org.apache.cassandra.db.partitions.UnfilteredPartitionIterators.MergeListener.NOOP;
/**
* General interface for storage-engine read commands (common to both range and
@@ -91,17 +92,7 @@ public abstract class ReadCommand extends AbstractReadQuery
// for data queries, coordinators may request information on the repaired data used in constructing the response
private boolean trackRepairedStatus = false;
// tracker for repaired data, initialized to singleton null object
- private static final RepairedDataInfo NULL_REPAIRED_DATA_INFO = new RepairedDataInfo()
- {
- void trackPartitionKey(DecoratedKey key){}
- void trackDeletion(DeletionTime deletion){}
- void trackRangeTombstoneMarker(RangeTombstoneMarker marker){}
- void trackRow(Row row){}
- boolean isConclusive(){ return true; }
- ByteBuffer getDigest(){ return ByteBufferUtil.EMPTY_BYTE_BUFFER; }
- };
-
- private RepairedDataInfo repairedDataInfo = NULL_REPAIRED_DATA_INFO;
+ private RepairedDataInfo repairedDataInfo = RepairedDataInfo.NULL_REPAIRED_DATA_INFO;
int oldestUnrepairedTombstone = Integer.MAX_VALUE;
@@ -450,7 +441,13 @@ public abstract class ReadCommand extends AbstractReadQuery
}
if (isTrackingRepairedStatus())
- repairedDataInfo = new RepairedDataInfo();
+ {
+ final DataLimits.Counter repairedReadCount = limits().newCounter(nowInSec(),
+ false,
+ selectsFullPartition(),
+ metadata().enforceStrictLiveness()).onlyCount();
+ repairedDataInfo = new RepairedDataInfo(repairedReadCount);
+ }
UnfilteredPartitionIterator iterator = (null == searcher) ? queryStorage(cfs, executionController) : searcher.search(executionController);
iterator = RTBoundValidator.validate(iterator, Stage.MERGED, false);
@@ -475,7 +472,22 @@ public abstract class ReadCommand extends AbstractReadQuery
// apply the limits/row counter; this transformation is stopping and would close the iterator as soon
// as the count is observed; if that happens in the middle of an open RT, its end bound will not be included.
- iterator = limits().filter(iterator, nowInSec(), selectsFullPartition());
+ // If tracking repaired data, the counter is needed for overreading repaired data, otherwise we can
+ // optimise the case where this.limit = DataLimits.NONE which skips an unnecessary transform
+ if (isTrackingRepairedStatus())
+ {
+ DataLimits.Counter limit =
+ limits().newCounter(nowInSec(), false, selectsFullPartition(), metadata().enforceStrictLiveness());
+ iterator = limit.applyTo(iterator);
+ // ensure that a consistent amount of repaired data is read on each replica. This causes silent
+ // overreading from the repaired data set, up to limits(). The extra data is not visible to
+ // the caller, only iterated to produce the repaired data digest.
+ iterator = repairedDataInfo.extend(iterator, limit);
+ }
+ else
+ {
+ iterator = limits().filter(iterator, nowInSec(), selectsFullPartition());
+ }
// because of the above, we need to append an aritifical end bound if the source iterator was stopped short by a counter.
return RTBoundCloser.close(iterator);
@@ -723,254 +735,37 @@ public abstract class ReadCommand extends AbstractReadQuery
return toCQLString();
}
- private static UnfilteredPartitionIterator withRepairedDataInfo(final UnfilteredPartitionIterator iterator,
- final RepairedDataInfo repairedDataInfo)
- {
- class WithRepairedDataTracking extends Transformation<UnfilteredRowIterator>
- {
- protected UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition)
- {
- return withRepairedDataInfo(partition, repairedDataInfo);
- }
- }
-
- return Transformation.apply(iterator, new WithRepairedDataTracking());
- }
-
- private static UnfilteredRowIterator withRepairedDataInfo(final UnfilteredRowIterator iterator,
- final RepairedDataInfo repairedDataInfo)
- {
- class WithTracking extends Transformation
- {
- protected DecoratedKey applyToPartitionKey(DecoratedKey key)
- {
- repairedDataInfo.onNewPartition(iterator);
- repairedDataInfo.trackPartitionKey(key);
- return key;
- }
-
- protected DeletionTime applyToDeletion(DeletionTime deletionTime)
- {
- repairedDataInfo.trackDeletion(deletionTime);
- return deletionTime;
- }
-
- protected RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker)
- {
- repairedDataInfo.trackRangeTombstoneMarker(marker);
- return marker;
- }
-
- protected Row applyToStatic(Row row)
- {
- repairedDataInfo.trackStaticRow(row);
- return row;
- }
-
- protected Row applyToRow(Row row)
- {
- repairedDataInfo.trackRow(row);
- return row;
- }
-
- protected void onPartitionClose()
- {
- repairedDataInfo.onPartitionClose();
- }
- }
- return Transformation.apply(iterator, new WithTracking());
- }
-
- private static class RepairedDataInfo
- {
- // Keeps a digest of the partition currently being processed. Since we won't know
- // whether a partition will be fully purged from a read result until it's been
- // consumed, we buffer this per-partition digest and add it to the final digest
- // when the partition is closed (if it wasn't fully purged).
- private Digest perPartitionDigest;
- private Digest perCommandDigest;
- private boolean isConclusive = true;
-
- // Doesn't actually purge from the underlying iterators, but excludes from the digest
- // the purger can't be initialized until we've iterated all the sstables for the query
- // as it requires the oldest repaired tombstone
- private RepairedDataPurger purger;
- private boolean isFullyPurged = true;
-
- ByteBuffer getDigest()
- {
- return perCommandDigest == null
- ? ByteBufferUtil.EMPTY_BYTE_BUFFER
- : ByteBuffer.wrap(perCommandDigest.digest());
- }
-
- protected void onNewPartition(UnfilteredRowIterator partition)
- {
- assert purger != null;
- purger.setCurrentKey(partition.partitionKey());
- purger.setIsReverseOrder(partition.isReverseOrder());
- }
-
- protected void setPurger(RepairedDataPurger purger)
- {
- this.purger = purger;
- }
-
- boolean isConclusive()
- {
- return isConclusive;
- }
-
- void markInconclusive()
- {
- isConclusive = false;
- }
-
- void trackPartitionKey(DecoratedKey key)
- {
- getPerPartitionDigest().update(key.getKey());
- }
-
- void trackDeletion(DeletionTime deletion)
- {
- assert purger != null;
- DeletionTime purged = purger.applyToDeletion(deletion);
- if (!purged.isLive())
- isFullyPurged = false;
-
- purged.digest(getPerPartitionDigest());
- }
-
- void trackRangeTombstoneMarker(RangeTombstoneMarker marker)
- {
- assert purger != null;
- RangeTombstoneMarker purged = purger.applyToMarker(marker);
- if (purged != null)
- {
- isFullyPurged = false;
- purged.digest(getPerPartitionDigest());
- }
- }
-
- void trackStaticRow(Row row)
- {
- assert purger != null;
- Row purged = purger.applyToRow(row);
- if (!purged.isEmpty())
- {
- isFullyPurged = false;
- purged.digest(getPerPartitionDigest());
- }
- }
-
- void trackRow(Row row)
- {
- assert purger != null;
- Row purged = purger.applyToRow(row);
- if (purged != null)
- {
- isFullyPurged = false;
- purged.digest(getPerPartitionDigest());
- }
- }
-
- private Digest getPerPartitionDigest()
- {
- if (perPartitionDigest == null)
- perPartitionDigest = Digest.forRepairedDataTracking();
-
- return perPartitionDigest;
- }
-
- private void onPartitionClose()
- {
- if (perPartitionDigest != null)
- {
- // If the partition wasn't completely emptied by the purger,
- // calculate the digest for the partition and use it to
- // update the overall digest
- if (!isFullyPurged)
- {
- if (perCommandDigest == null)
- perCommandDigest = Digest.forRepairedDataTracking();
-
- byte[] partitionDigest = perPartitionDigest.digest();
- perCommandDigest.update(partitionDigest, 0, partitionDigest.length);
- isFullyPurged = true;
- }
-
- perPartitionDigest = null;
- }
- }
- }
-
- /**
- * Although PurgeFunction extends Transformation, this is never applied to an iterator.
- * Instead, it is used by RepairedDataInfo during the generation of a repaired data
- * digest to exclude data which will actually be purged later on in the read pipeline.
- */
- private static class RepairedDataPurger extends PurgeFunction
- {
- RepairedDataPurger(ColumnFamilyStore cfs,
- int nowInSec,
- int oldestUnrepairedTombstone)
- {
- super(nowInSec,
- cfs.gcBefore(nowInSec),
- oldestUnrepairedTombstone,
- cfs.getCompactionStrategyManager().onlyPurgeRepairedTombstones(),
- cfs.metadata.get().enforceStrictLiveness());
- }
-
- protected LongPredicate getPurgeEvaluator()
- {
- return (time) -> true;
- }
-
- void setCurrentKey(DecoratedKey key)
- {
- super.onNewPartition(key);
- }
-
- void setIsReverseOrder(boolean isReverseOrder)
- {
- super.setReverseOrder(isReverseOrder);
- }
-
- public DeletionTime applyToDeletion(DeletionTime deletionTime)
- {
- return super.applyToDeletion(deletionTime);
- }
-
- public Row applyToRow(Row row)
- {
- return super.applyToRow(row);
- }
-
- public RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker)
- {
- return super.applyToMarker(marker);
- }
- }
-
@SuppressWarnings("resource") // resultant iterators are closed by their callers
InputCollector<UnfilteredRowIterator> iteratorsForPartition(ColumnFamilyStore.ViewFragment view)
{
- BiFunction<List<UnfilteredRowIterator>, RepairedDataInfo, UnfilteredRowIterator> merge =
- (unfilteredRowIterators, repairedDataInfo) ->
- withRepairedDataInfo(UnfilteredRowIterators.merge(unfilteredRowIterators), repairedDataInfo);
+ final BiFunction<List<UnfilteredRowIterator>, RepairedDataInfo, UnfilteredRowIterator> merge =
+ (unfilteredRowIterators, repairedDataInfo) -> {
+ UnfilteredRowIterator repaired = UnfilteredRowIterators.merge(unfilteredRowIterators);
+ return repairedDataInfo.withRepairedDataInfo(repaired);
+ };
- return new InputCollector<>(view, repairedDataInfo, merge, isTrackingRepairedStatus());
+ // For single partition reads, after reading up to the command's DataLimit nothing extra is required.
+ // The merged & repaired row iterator will be consumed until it's exhausted or the RepairedDataInfo's
+ // internal counter is satisfied
+ final Function<UnfilteredRowIterator, UnfilteredPartitionIterator> postLimitPartitions =
+ (rows) -> EmptyIterators.unfilteredPartition(metadata());
+ return new InputCollector<>(view, repairedDataInfo, merge, postLimitPartitions, isTrackingRepairedStatus());
}
@SuppressWarnings("resource") // resultant iterators are closed by their callers
InputCollector<UnfilteredPartitionIterator> iteratorsForRange(ColumnFamilyStore.ViewFragment view)
{
- BiFunction<List<UnfilteredPartitionIterator>, RepairedDataInfo, UnfilteredPartitionIterator> merge =
- (unfilteredPartitionIterators, repairedDataInfo) ->
- withRepairedDataInfo(UnfilteredPartitionIterators.merge(unfilteredPartitionIterators, UnfilteredPartitionIterators.MergeListener.NOOP), repairedDataInfo);
+ final BiFunction<List<UnfilteredPartitionIterator>, RepairedDataInfo, UnfilteredPartitionIterator> merge =
+ (unfilteredPartitionIterators, repairedDataInfo) -> {
+ UnfilteredPartitionIterator repaired = UnfilteredPartitionIterators.merge(unfilteredPartitionIterators,
+ NOOP);
+ return repairedDataInfo.withRepairedDataInfo(repaired);
+ };
- return new InputCollector<>(view, repairedDataInfo, merge, isTrackingRepairedStatus());
+ // Uses identity function to provide additional partitions to be consumed after the command's
+ // DataLimits are satisfied. The input to the function will be the iterator of merged, repaired partitions
+ // which we'll keep reading until the RepairedDataInfo's internal counter is satisfied.
+ return new InputCollector<>(view, repairedDataInfo, merge, Function.identity(), isTrackingRepairedStatus());
}
/**
@@ -988,12 +783,14 @@ public abstract class ReadCommand extends AbstractReadQuery
private final boolean isTrackingRepairedStatus;
Set<SSTableReader> repairedSSTables;
BiFunction<List<T>, RepairedDataInfo, T> repairedMerger;
+ Function<T, UnfilteredPartitionIterator> postLimitAdditionalPartitions;
List<T> repairedIters;
List<T> unrepairedIters;
InputCollector(ColumnFamilyStore.ViewFragment view,
RepairedDataInfo repairedDataInfo,
BiFunction<List<T>, RepairedDataInfo, T> repairedMerger,
+ Function<T, UnfilteredPartitionIterator> postLimitAdditionalPartitions,
boolean isTrackingRepairedStatus)
{
this.repairedDataInfo = repairedDataInfo;
@@ -1023,6 +820,7 @@ public abstract class ReadCommand extends AbstractReadQuery
unrepairedIters = new ArrayList<>((view.sstables.size() - repairedSSTables.size()) + Iterables.size(view.memtables) + 1);
}
this.repairedMerger = repairedMerger;
+ this.postLimitAdditionalPartitions = postLimitAdditionalPartitions;
}
void addMemtableIterator(T iter)
@@ -1038,15 +836,17 @@ public abstract class ReadCommand extends AbstractReadQuery
unrepairedIters.add(iter);
}
+ @SuppressWarnings("resource") // the returned iterators are closed by the caller
List<T> finalizeIterators(ColumnFamilyStore cfs, int nowInSec, int oldestUnrepairedTombstone)
{
if (repairedIters.isEmpty())
return unrepairedIters;
// merge the repaired data before returning, wrapping in a digest generator
- RepairedDataPurger purger = new RepairedDataPurger(cfs, nowInSec, oldestUnrepairedTombstone);
- repairedDataInfo.setPurger(purger);
- unrepairedIters.add(repairedMerger.apply(repairedIters, repairedDataInfo));
+ repairedDataInfo.prepare(cfs, nowInSec, oldestUnrepairedTombstone);
+ T repairedIter = repairedMerger.apply(repairedIters, repairedDataInfo);
+ repairedDataInfo.finalize(postLimitAdditionalPartitions.apply(repairedIter));
+ unrepairedIters.add(repairedIter);
return unrepairedIters;
}
diff --git a/src/java/org/apache/cassandra/db/RepairedDataInfo.java b/src/java/org/apache/cassandra/db/RepairedDataInfo.java
new file mode 100644
index 0000000..be636d3
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/RepairedDataInfo.java
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.nio.ByteBuffer;
+import java.util.function.LongPredicate;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.partitions.PurgeFunction;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.db.rows.RangeTombstoneMarker;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.db.transform.MoreRows;
+import org.apache.cassandra.db.transform.Transformation;
+import org.apache.cassandra.metrics.TableMetrics;
+import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+class RepairedDataInfo
+{
+ public static final RepairedDataInfo NULL_REPAIRED_DATA_INFO = new RepairedDataInfo(null)
+ {
+ boolean isConclusive(){ return true; }
+ ByteBuffer getDigest(){ return ByteBufferUtil.EMPTY_BYTE_BUFFER; }
+ };
+
+ // Keeps a digest of the partition currently being processed. Since we won't know
+ // whether a partition will be fully purged from a read result until it's been
+ // consumed, we buffer this per-partition digest and add it to the final digest
+ // when the partition is closed (if it wasn't fully purged).
+ private Digest perPartitionDigest;
+ private Digest perCommandDigest;
+ private boolean isConclusive = true;
+ private ByteBuffer calculatedDigest = null;
+
+ // Doesn't actually purge from the underlying iterators, but excludes purgeable data from the digest.
+ // The purger can't be initialized until we've iterated all the sstables for the query,
+ // as it requires the oldest unrepaired tombstone.
+ private RepairedDataPurger purger;
+ private boolean isFullyPurged = true;
+
+ // Supplies additional partitions from the repaired data set to be consumed when the limit of
+ // the executing ReadCommand has been reached. This is to ensure that each replica attempts to
+ // read the same amount of repaired data, otherwise comparisons of the repaired data digests
+ // may be invalidated by varying amounts of repaired data being present on each replica.
+ // This can't be initialized until after the underlying repaired iterators have been merged.
+ private UnfilteredPartitionIterator postLimitPartitions = null;
+ private final DataLimits.Counter repairedCounter;
+ private UnfilteredRowIterator currentPartition;
+ private TableMetrics metrics;
+
+ public RepairedDataInfo(DataLimits.Counter repairedCounter)
+ {
+ this.repairedCounter = repairedCounter;
+ }
+
+ ByteBuffer getDigest()
+ {
+ if (calculatedDigest != null)
+ return calculatedDigest;
+
+ calculatedDigest = perCommandDigest == null
+ ? ByteBufferUtil.EMPTY_BYTE_BUFFER
+ : ByteBuffer.wrap(perCommandDigest.digest());
+
+ return calculatedDigest;
+ }
+
+ void prepare(ColumnFamilyStore cfs, int nowInSec, int oldestUnrepairedTombstone)
+ {
+ this.purger = new RepairedDataPurger(cfs, nowInSec, oldestUnrepairedTombstone);
+ this.metrics = cfs.metric;
+ }
+
+ void finalize(UnfilteredPartitionIterator postLimitPartitions)
+ {
+ this.postLimitPartitions = postLimitPartitions;
+ }
+
+ boolean isConclusive()
+ {
+ return isConclusive;
+ }
+
+ void markInconclusive()
+ {
+ isConclusive = false;
+ }
+
+ private void onNewPartition(UnfilteredRowIterator partition)
+ {
+ assert purger != null;
+ purger.setCurrentKey(partition.partitionKey());
+ purger.setIsReverseOrder(partition.isReverseOrder());
+ this.currentPartition = partition;
+ }
+
+ private Digest getPerPartitionDigest()
+ {
+ if (perPartitionDigest == null)
+ perPartitionDigest = Digest.forRepairedDataTracking();
+
+ return perPartitionDigest;
+ }
+
+ public UnfilteredPartitionIterator withRepairedDataInfo(final UnfilteredPartitionIterator iterator)
+ {
+ class WithTracking extends Transformation<UnfilteredRowIterator>
+ {
+ protected UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition)
+ {
+ return withRepairedDataInfo(partition);
+ }
+ }
+ return Transformation.apply(iterator, new WithTracking());
+ }
+
+ public UnfilteredRowIterator withRepairedDataInfo(final UnfilteredRowIterator iterator)
+ {
+ class WithTracking extends Transformation<UnfilteredRowIterator>
+ {
+ protected DecoratedKey applyToPartitionKey(DecoratedKey key)
+ {
+ getPerPartitionDigest().update(key.getKey());
+ return key;
+ }
+
+ protected DeletionTime applyToDeletion(DeletionTime deletionTime)
+ {
+ if (repairedCounter.isDone())
+ return deletionTime;
+
+ assert purger != null;
+ DeletionTime purged = purger.applyToDeletion(deletionTime);
+ if (!purged.isLive())
+ isFullyPurged = false;
+ purged.digest(getPerPartitionDigest());
+ return deletionTime;
+ }
+
+ protected RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker)
+ {
+ if (repairedCounter.isDone())
+ return marker;
+
+ assert purger != null;
+ RangeTombstoneMarker purged = purger.applyToMarker(marker);
+ if (purged != null)
+ {
+ isFullyPurged = false;
+ purged.digest(getPerPartitionDigest());
+ }
+ return marker;
+ }
+
+ protected Row applyToStatic(Row row)
+ {
+ if (repairedCounter.isDone())
+ return row;
+
+ assert purger != null;
+ Row purged = purger.applyToRow(row);
+ if (!purged.isEmpty())
+ {
+ isFullyPurged = false;
+ purged.digest(getPerPartitionDigest());
+ }
+ return row;
+ }
+
+ protected Row applyToRow(Row row)
+ {
+ if (repairedCounter.isDone())
+ return row;
+
+ assert purger != null;
+ Row purged = purger.applyToRow(row);
+ if (purged != null)
+ {
+ isFullyPurged = false;
+ purged.digest(getPerPartitionDigest());
+ }
+ return row;
+ }
+
+ protected void onPartitionClose()
+ {
+ if (perPartitionDigest != null)
+ {
+ // If the partition wasn't completely emptied by the purger,
+ // calculate the digest for the partition and use it to
+ // update the overall digest
+ if (!isFullyPurged)
+ {
+ if (perCommandDigest == null)
+ perCommandDigest = Digest.forRepairedDataTracking();
+
+ byte[] partitionDigest = perPartitionDigest.digest();
+ perCommandDigest.update(partitionDigest, 0, partitionDigest.length);
+ }
+
+ perPartitionDigest = null;
+ }
+ isFullyPurged = true;
+ }
+ }
+
+ if (repairedCounter.isDone())
+ return iterator;
+
+ UnfilteredRowIterator tracked = repairedCounter.applyTo(Transformation.apply(iterator, new WithTracking()));
+ onNewPartition(tracked);
+ return tracked;
+ }
+
+ public UnfilteredPartitionIterator extend(final UnfilteredPartitionIterator partitions,
+ final DataLimits.Counter limit)
+ {
+ class OverreadRepairedData extends Transformation<UnfilteredRowIterator> implements MoreRows<UnfilteredRowIterator>
+ {
+
+ protected UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition)
+ {
+ return MoreRows.extend(partition, this, partition.columns());
+ }
+
+ public UnfilteredRowIterator moreContents()
+ {
+ // We don't need to do anything until the DataLimits of the
+ // read have been reached
+ if (!limit.isDone() || repairedCounter.isDone())
+ return null;
+
+ long countBeforeOverreads = repairedCounter.counted();
+ long overreadStartTime = System.nanoTime();
+ if (currentPartition != null)
+ consumePartition(currentPartition, repairedCounter);
+
+ if (postLimitPartitions != null)
+ while (postLimitPartitions.hasNext() && !repairedCounter.isDone())
+ consumePartition(postLimitPartitions.next(), repairedCounter);
+
+ // we're not actually providing any more rows, just consuming the repaired data
+ long rows = repairedCounter.counted() - countBeforeOverreads;
+ long nanos = System.nanoTime() - overreadStartTime;
+ metrics.repairedDataTrackingOverreadRows.update(rows);
+ metrics.repairedDataTrackingOverreadTime.update(nanos, TimeUnit.NANOSECONDS);
+ Tracing.trace("Read {} additional rows of repaired data for tracking in {}ps", rows, TimeUnit.NANOSECONDS.toMicros(nanos));
+ return null;
+ }
+
+ private void consumePartition(UnfilteredRowIterator partition, DataLimits.Counter counter)
+ {
+ if (partition == null)
+ return;
+
+ while (!counter.isDone() && partition.hasNext())
+ partition.next();
+
+ partition.close();
+ }
+ }
+ // If the read didn't touch any repaired sstables, prepare() hasn't been called and
+ // we can skip this transformation
+ if (metrics == null || repairedCounter.isDone())
+ return partitions;
+ return Transformation.apply(partitions, new OverreadRepairedData());
+ }
+
+ /**
+ * Although PurgeFunction extends Transformation, this is never applied to an iterator.
+ * Instead, it is used by RepairedDataInfo during the generation of a repaired data
+ * digest to exclude data which will actually be purged later on in the read pipeline.
+ */
+ private static class RepairedDataPurger extends PurgeFunction
+ {
+ RepairedDataPurger(ColumnFamilyStore cfs,
+ int nowInSec,
+ int oldestUnrepairedTombstone)
+ {
+ super(nowInSec,
+ cfs.gcBefore(nowInSec),
+ oldestUnrepairedTombstone,
+ cfs.getCompactionStrategyManager().onlyPurgeRepairedTombstones(),
+ cfs.metadata.get().enforceStrictLiveness());
+ }
+
+ protected LongPredicate getPurgeEvaluator()
+ {
+ return (time) -> true;
+ }
+
+ void setCurrentKey(DecoratedKey key)
+ {
+ super.onNewPartition(key);
+ }
+
+ void setIsReverseOrder(boolean isReverseOrder)
+ {
+ super.setReverseOrder(isReverseOrder);
+ }
+
+ public DeletionTime applyToDeletion(DeletionTime deletionTime)
+ {
+ return super.applyToDeletion(deletionTime);
+ }
+
+ public Row applyToRow(Row row)
+ {
+ return super.applyToRow(row);
+ }
+
+ public RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker)
+ {
+ return super.applyToMarker(marker);
+ }
+ }
+}
diff --git a/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java b/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
index ef3338e..9c45dc0 100644
--- a/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
@@ -142,6 +142,15 @@ public class KeyspaceMetrics
*/
public final Meter unconfirmedRepairedInconsistencies;
+ /**
+ * Tracks the amount of overreading of repaired data that replicas perform in order to produce digests
+ * at query time. For each query, on a full data read following an initial digest mismatch, the replicas
+ * may read extra repaired data, up to the DataLimit of the command, so that the coordinator can compare
+ * the repaired data on each replica. These are tracked on each replica.
+ */
+ public final Histogram repairedDataTrackingOverreadRows;
+ public final Timer repairedDataTrackingOverreadTime;
+
public final MetricNameFactory factory;
private Keyspace keyspace;
@@ -305,6 +314,9 @@ public class KeyspaceMetrics
confirmedRepairedInconsistencies = Metrics.meter(factory.createMetricName("RepairedDataInconsistenciesConfirmed"));
unconfirmedRepairedInconsistencies = Metrics.meter(factory.createMetricName("RepairedDataInconsistenciesUnconfirmed"));
+
+ repairedDataTrackingOverreadRows = Metrics.histogram(factory.createMetricName("RepairedOverreadRows"), false);
+ repairedDataTrackingOverreadTime = Metrics.timer(factory.createMetricName("RepairedOverreadTime"));
}
/**
diff --git a/src/java/org/apache/cassandra/metrics/TableMetrics.java b/src/java/org/apache/cassandra/metrics/TableMetrics.java
index 6095f50..775d87c 100644
--- a/src/java/org/apache/cassandra/metrics/TableMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/TableMetrics.java
@@ -31,6 +31,11 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+
+import com.codahale.metrics.Timer;
+
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.Memtable;
@@ -52,9 +57,6 @@ import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;
import com.codahale.metrics.Metric;
import com.codahale.metrics.RatioGauge;
-import com.codahale.metrics.Timer;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
/**
* Metrics for {@link ColumnFamilyStore}.
@@ -233,6 +235,13 @@ public class TableMetrics
// be part of the repaired set whilst others do not.
public final TableMeter unconfirmedRepairedInconsistencies;
+ // Tracks the amount of overreading of repaired data that replicas perform in order to produce digests
+ // at query time. For each query, on a full data read following an initial digest mismatch, the replicas
+ // may read extra repaired data, up to the DataLimit of the command, so that the coordinator can compare
+ // the repaired data on each replica. These are tracked on each replica.
+ public final TableHistogram repairedDataTrackingOverreadRows;
+ public final TableTimer repairedDataTrackingOverreadTime;
+
public final static LatencyMetrics globalReadLatency = new LatencyMetrics(globalFactory, globalAliasFactory, "Read");
public final static LatencyMetrics globalWriteLatency = new LatencyMetrics(globalFactory, globalAliasFactory, "Write");
public final static LatencyMetrics globalRangeLatency = new LatencyMetrics(globalFactory, globalAliasFactory, "Range");
@@ -946,6 +955,9 @@ public class TableMetrics
confirmedRepairedInconsistencies = createTableMeter("RepairedDataInconsistenciesConfirmed", cfs.keyspace.metric.confirmedRepairedInconsistencies);
unconfirmedRepairedInconsistencies = createTableMeter("RepairedDataInconsistenciesUnconfirmed", cfs.keyspace.metric.unconfirmedRepairedInconsistencies);
+ repairedDataTrackingOverreadRows = createTableHistogram("RepairedDataTrackingOverreadRows", cfs.keyspace.metric.repairedDataTrackingOverreadRows, false);
+ repairedDataTrackingOverreadTime = createTableTimer("RepairedDataTrackingOverreadTime", cfs.keyspace.metric.repairedDataTrackingOverreadTime);
+
unleveledSSTables = createTableGauge("UnleveledSSTables", cfs::getUnleveledSSTables, () -> {
// global gauge
int cnt = 0;
diff --git a/test/distributed/org/apache/cassandra/distributed/test/RepairDigestTrackingTest.java b/test/distributed/org/apache/cassandra/distributed/test/RepairDigestTrackingTest.java
index 664c99d..4b382a1 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/RepairDigestTrackingTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/RepairDigestTrackingTest.java
@@ -25,6 +25,8 @@ import java.util.EnumSet;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
import com.google.common.util.concurrent.Uninterruptibles;
import org.junit.Assert;
@@ -45,6 +47,9 @@ import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.service.SnapshotVerbHandler;
import org.apache.cassandra.service.StorageProxy;
+import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;
+import static org.junit.Assert.fail;
+
public class RepairDigestTrackingTest extends TestBaseImpl
{
private static final String TABLE = "tbl";
@@ -76,7 +81,7 @@ public class RepairDigestTrackingTest extends TestBaseImpl
i, i, i);
}
cluster.forEach(i -> i.flush(KEYSPACE));
- cluster.forEach(i -> assertNotRepaired());
+ cluster.forEach(i -> i.runOnInstance(assertNotRepaired()));
// mark everything on node 2 repaired
cluster.get(2).runOnInstance(markAllRepaired());
@@ -120,10 +125,10 @@ public class RepairDigestTrackingTest extends TestBaseImpl
cluster.forEach(i -> i.flush(KEYSPACE));
// nothing is repaired yet
- cluster.forEach(i -> assertNotRepaired());
+ cluster.forEach(i -> i.runOnInstance(assertNotRepaired()));
// mark everything repaired
- cluster.forEach(i -> markAllRepaired());
- cluster.forEach(i -> assertRepaired());
+ cluster.forEach(i -> i.runOnInstance(markAllRepaired()));
+ cluster.forEach(i -> i.runOnInstance(assertRepaired()));
// now overwrite on node2 only to generate digest mismatches, but don't flush so the repaired dataset is not affected
for (int i = 0; i < 10; i++)
@@ -198,6 +203,162 @@ public class RepairDigestTrackingTest extends TestBaseImpl
}
}
+ @Test
+ public void testRepairedReadCountNormalizationWithInitialUnderread() throws Throwable
+ {
+ // Asserts that the amount of repaired data read for digest generation is consistent
+ // across replicas where one has to read less repaired data to satisfy the original
+ // limits of the read request.
+ try (Cluster cluster = init(Cluster.create(2)))
+ {
+
+ cluster.get(1).runOnInstance(() -> {
+ StorageProxy.instance.enableRepairedDataTrackingForRangeReads();
+ StorageProxy.instance.enableRepairedDataTrackingForPartitionReads();
+ });
+
+ cluster.schemaChange("CREATE TABLE " + KS_TABLE + " (k INT, c INT, v1 INT, PRIMARY KEY (k,c)) " +
+ "WITH CLUSTERING ORDER BY (c DESC)");
+
+ // insert data on both nodes and flush
+ for (int i=0; i<20; i++)
+ {
+ cluster.coordinator(1).execute("INSERT INTO " + KS_TABLE + " (k, c, v1) VALUES (0, ?, ?) USING TIMESTAMP 0",
+ ConsistencyLevel.ALL, i, i);
+ cluster.coordinator(1).execute("INSERT INTO " + KS_TABLE + " (k, c, v1) VALUES (1, ?, ?) USING TIMESTAMP 1",
+ ConsistencyLevel.ALL, i, i);
+ }
+ cluster.forEach(c -> c.flush(KEYSPACE));
+ // nothing is repaired yet
+ cluster.forEach(i -> i.runOnInstance(assertNotRepaired()));
+ // mark everything repaired
+ cluster.forEach(i -> i.runOnInstance(markAllRepaired()));
+ cluster.forEach(i -> i.runOnInstance(assertRepaired()));
+
+ // Add some unrepaired data to both nodes
+ for (int i=20; i<30; i++)
+ {
+ cluster.coordinator(1).execute("INSERT INTO " + KS_TABLE + " (k, c, v1) VALUES (1, ?, ?) USING TIMESTAMP 1",
+ ConsistencyLevel.ALL, i, i);
+ }
+ // And some more unrepaired data to node2 only. This causes node2 to read less repaired data than node1
+ // when satisfying the limits of the read. So node2 needs to overread more repaired data than node1 when
+ // calculating the repaired data digest.
+ cluster.get(2).executeInternal("INSERT INTO " + KS_TABLE + " (k, c, v1) VALUES (1, ?, ?) USING TIMESTAMP 1", 30, 30);
+
+ // Verify single partition read
+ long ccBefore = getConfirmedInconsistencies(cluster.get(1));
+ assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KS_TABLE + " WHERE k=1 LIMIT 20", ConsistencyLevel.ALL),
+ rows(1, 30, 11));
+ long ccAfterPartitionRead = getConfirmedInconsistencies(cluster.get(1));
+
+ // Recreate a mismatch in unrepaired data and verify partition range read
+ cluster.get(2).executeInternal("INSERT INTO " + KS_TABLE + " (k, c, v1) VALUES (1, ?, ?)", 31, 31);
+ assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KS_TABLE + " LIMIT 30", ConsistencyLevel.ALL),
+ rows(1, 31, 2));
+ long ccAfterRangeRead = getConfirmedInconsistencies(cluster.get(1));
+
+ if (ccAfterPartitionRead != ccAfterRangeRead)
+ if (ccAfterPartitionRead != ccBefore)
+ fail("Both range and partition reads reported data inconsistencies but none were expected");
+ else
+ fail("Reported inconsistency during range read but none were expected");
+ else if (ccAfterPartitionRead != ccBefore)
+ fail("Reported inconsistency during partition read but none were expected");
+ }
+ }
+
+ @Test
+ public void testRepairedReadCountNormalizationWithInitialOverread() throws Throwable
+ {
+ // Asserts that the amount of repaired data read for digest generation is consistent
+ // across replicas where one has to read more repaired data to satisfy the original
+ // limits of the read request.
+ try (Cluster cluster = init(Cluster.create(2)))
+ {
+
+ cluster.get(1).runOnInstance(() -> {
+ StorageProxy.instance.enableRepairedDataTrackingForRangeReads();
+ StorageProxy.instance.enableRepairedDataTrackingForPartitionReads();
+ });
+
+ cluster.schemaChange("CREATE TABLE " + KS_TABLE + " (k INT, c INT, v1 INT, PRIMARY KEY (k,c)) " +
+ "WITH CLUSTERING ORDER BY (c DESC)");
+
+ // insert data on both nodes and flush
+ for (int i=0; i<10; i++)
+ {
+ cluster.coordinator(1).execute("INSERT INTO " + KS_TABLE + " (k, c, v1) VALUES (0, ?, ?) USING TIMESTAMP 0",
+ ConsistencyLevel.ALL, i, i);
+ cluster.coordinator(1).execute("INSERT INTO " + KS_TABLE + " (k, c, v1) VALUES (1, ?, ?) USING TIMESTAMP 1",
+ ConsistencyLevel.ALL, i, i);
+ }
+ cluster.forEach(c -> c.flush(KEYSPACE));
+ // nothing is repaired yet
+ cluster.forEach(i -> i.runOnInstance(assertNotRepaired()));
+ // mark everything repaired
+ cluster.forEach(i -> i.runOnInstance(markAllRepaired()));
+ cluster.forEach(i -> i.runOnInstance(assertRepaired()));
+
+ // Add some unrepaired data to both nodes
+ for (int i=10; i<13; i++)
+ {
+ cluster.coordinator(1).execute("INSERT INTO " + KS_TABLE + " (k, c, v1) VALUES (0, ?, ?) USING TIMESTAMP 1",
+ ConsistencyLevel.ALL, i, i);
+ cluster.coordinator(1).execute("INSERT INTO " + KS_TABLE + " (k, c, v1) VALUES (1, ?, ?) USING TIMESTAMP 1",
+ ConsistencyLevel.ALL, i, i);
+ }
+ cluster.forEach(c -> c.flush(KEYSPACE));
+ // And some row deletions on node2 only which cover data in the repaired set
+ // This will cause node2 to read more repaired data in satisfying the limit of the read request
+ // so it should overread less than node1 (in fact, it should not overread at all) in order to
+ // calculate the repaired data digest.
+ for (int i=7; i<10; i++)
+ {
+ cluster.get(2).executeInternal("DELETE FROM " + KS_TABLE + " USING TIMESTAMP 2 WHERE k = 0 AND c = ?", i);
+ cluster.get(2).executeInternal("DELETE FROM " + KS_TABLE + " USING TIMESTAMP 2 WHERE k = 1 AND c = ?", i);
+ }
+
+ // Verify single partition read
+ long ccBefore = getConfirmedInconsistencies(cluster.get(1));
+ assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KS_TABLE + " WHERE k=0 LIMIT 5", ConsistencyLevel.ALL),
+ rows(rows(0, 12, 10), rows(0, 6, 5)));
+ long ccAfterPartitionRead = getConfirmedInconsistencies(cluster.get(1));
+
+ // Verify partition range read
+ assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KS_TABLE + " LIMIT 11", ConsistencyLevel.ALL),
+ rows(rows(1, 12, 10), rows(1, 6, 0), rows(0, 12, 12)));
+ long ccAfterRangeRead = getConfirmedInconsistencies(cluster.get(1));
+
+ if (ccAfterPartitionRead != ccAfterRangeRead)
+ if (ccAfterPartitionRead != ccBefore)
+ fail("Both range and partition reads reported data inconsistencies but none were expected");
+ else
+ fail("Reported inconsistency during range read but none were expected");
+ else if (ccAfterPartitionRead != ccBefore)
+ fail("Reported inconsistency during partition read but none were expected");
+ }
+ }
+
+    // Concatenates several blocks of expected rows, in argument order, into one expected result set.
+    private Object[][] rows(Object[][] head, Object[][]...tail)
+    {
+        Stream<Object[]> leading = Stream.of(head);
+        Stream<Object[]> remaining = Stream.of(tail).flatMap(Stream::of);
+        return Stream.concat(leading, remaining).toArray(Object[][]::new);
+    }
+
+    // Builds expected rows of the form { partitionKey, c, c } for a run of clustering values.
+    // When start > end the values run downwards from start to end, both inclusive; when
+    // start < end they run upwards from start (inclusive) to end (exclusive). When the two are
+    // equal a single row { partitionKey, start, end } is produced.
+    private Object[][] rows(int partitionKey, int start, int end)
+    {
+        if (start == end)
+            return new Object[][] { new Object[] { partitionKey, start, end } };
+
+        IntStream clusterings;
+        if (start > end)
+        {
+            // walk the ascending interval [end - 1, start) and mirror each value so that the
+            // resulting sequence is start, start - 1, ..., end
+            clusterings = IntStream.range(end - 1, start).map(c -> start - c + end - 1);
+        }
+        else
+        {
+            clusterings = IntStream.range(start, end);
+        }
+
+        return clusterings.mapToObj(c -> new Object[] { partitionKey, c, c })
+                          .toArray(Object[][]::new);
+    }
+
private IIsolatedExecutor.SerializableRunnable assertNotRepaired()
{
return () ->
diff --git a/test/unit/org/apache/cassandra/db/ReadCommandTest.java b/test/unit/org/apache/cassandra/db/ReadCommandTest.java
index e0215b7..0824168 100644
--- a/test/unit/org/apache/cassandra/db/ReadCommandTest.java
+++ b/test/unit/org/apache/cassandra/db/ReadCommandTest.java
@@ -23,6 +23,8 @@ import java.io.OutputStream;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.*;
+import java.util.function.Supplier;
+import java.util.stream.IntStream;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
@@ -41,6 +43,8 @@ import org.apache.cassandra.db.filter.RowFilter;
import org.apache.cassandra.db.marshal.AsciiType;
import org.apache.cassandra.db.marshal.BytesType;
import org.apache.cassandra.db.marshal.CounterColumnType;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.db.marshal.ReversedType;
import org.apache.cassandra.db.marshal.SetType;
import org.apache.cassandra.db.partitions.*;
import org.apache.cassandra.db.rows.Row;
@@ -61,6 +65,7 @@ import org.apache.cassandra.locator.EndpointsForToken;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.ReplicaUtils;
import org.apache.cassandra.net.Message;
+import org.apache.cassandra.metrics.ClearableHistogram;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.Verb;
import org.apache.cassandra.repair.consistent.LocalSessionAccessor;
@@ -94,6 +99,7 @@ public class ReadCommandTest
private static final String CF6 = "Standard6";
private static final String CF7 = "Counter7";
private static final String CF8 = "Standard8";
+ private static final String CF9 = "Standard9";
private static final InetAddressAndPort REPAIR_COORDINATOR;
static {
@@ -178,6 +184,12 @@ public class ReadCommandTest
.addRegularColumn("b", AsciiType.instance)
.addRegularColumn("c", SetType.getInstance(AsciiType.instance, true));
+ TableMetadata.Builder metadata9 =
+ TableMetadata.builder(KEYSPACE, CF9)
+ .addPartitionKeyColumn("key", Int32Type.instance)
+ .addClusteringColumn("col", ReversedType.getInstance(Int32Type.instance))
+ .addRegularColumn("a", AsciiType.instance);
+
SchemaLoader.prepareServer();
SchemaLoader.createKeyspace(KEYSPACE,
KeyspaceParams.simple(1),
@@ -188,7 +200,8 @@ public class ReadCommandTest
metadata5,
metadata6,
metadata7,
- metadata8);
+ metadata8,
+ metadata9);
LocalSessionAccessor.startup();
}
@@ -823,14 +836,113 @@ public class ReadCommandTest
}
}
+ // Exercises the repairedDataTrackingOverreadRows table metric for single partition and range
+ // reads: each partition gets rows 0-9 flushed and marked repaired plus rows 10-19 left
+ // unrepaired, and the recorded overread is checked after reads with and without tracking.
+ @Test
+ public void testRepairedDataOverreadMetrics()
+ {
+ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF9);
+ cfs.truncateBlocking();
+ cfs.disableAutoCompaction();
+ // NOTE(review): the value returned by withSwapped(...) is discarded here, whereas setGCGrace
+ // in this class passes the swapped metadata to Schema.instance.load(...). Presumably this
+ // statement therefore does not actually change the caching params - confirm.
+ cfs.metadata().withSwapped(cfs.metadata().params.unbuild()
+ .caching(CachingParams.CACHE_NOTHING)
+ .build());
+ // Insert and repair
+ insert(cfs, IntStream.range(0, 10), () -> IntStream.range(0, 10));
+ cfs.forceBlockingFlush();
+ cfs.getLiveSSTables().forEach(sstable -> mutateRepaired(cfs, sstable, 111, null));
+ // Insert and leave unrepaired
+ insert(cfs, IntStream.range(0, 10), () -> IntStream.range(10, 20));
+
+ // Single partition reads
+ int limit = 5;
+ ReadCommand cmd = Util.cmd(cfs, ByteBufferUtil.bytes(0)).withLimit(limit).build();
+ assertEquals(0, getAndResetOverreadCount(cfs));
+
+ // No overreads if not tracking
+ readAndCheckRowCount(Collections.singletonList(Util.getOnlyPartition(cmd)), limit);
+ assertEquals(0, getAndResetOverreadCount(cfs));
+
+ // Overread up to (limit - 1) if tracking is enabled
+ cmd = cmd.copy();
+ cmd.trackRepairedStatus();
+ readAndCheckRowCount(Collections.singletonList(Util.getOnlyPartition(cmd)), limit);
+ // overread count is always < limit as the first read is counted during merging (and so is expected)
+ assertEquals(limit - 1, getAndResetOverreadCount(cfs));
+
+ // if limit already requires reading all repaired data, no overreads should be recorded
+ // NOTE(review): this freshly built command never has trackRepairedStatus() called on it, so
+ // presumably this check runs with tracking disabled - confirm whether that is intended.
+ limit = 20;
+ cmd = Util.cmd(cfs, ByteBufferUtil.bytes(0)).withLimit(limit).build();
+ readAndCheckRowCount(Collections.singletonList(Util.getOnlyPartition(cmd)), limit);
+ assertEquals(0, getAndResetOverreadCount(cfs));
+
+ // Range reads
+ limit = 5;
+ cmd = Util.cmd(cfs).withLimit(limit).build();
+ assertEquals(0, getAndResetOverreadCount(cfs));
+ // No overreads if not tracking
+ readAndCheckRowCount(Util.getAll(cmd), limit);
+ assertEquals(0, getAndResetOverreadCount(cfs));
+
+ // Overread up to (limit - 1) if tracking is enabled
+ cmd = cmd.copy();
+ cmd.trackRepairedStatus();
+ readAndCheckRowCount(Util.getAll(cmd), limit);
+ assertEquals(limit - 1, getAndResetOverreadCount(cfs));
+
+ // if limit already requires reading all repaired data, no overreads should be recorded
+ // NOTE(review): as with the single partition case, trackRepairedStatus() is not called on
+ // this command - confirm whether that is intended.
+ limit = 100;
+ cmd = Util.cmd(cfs).withLimit(limit).build();
+ readAndCheckRowCount(Util.getAll(cmd), limit);
+ assertEquals(0, getAndResetOverreadCount(cfs));
+ }
+
private void setGCGrace(ColumnFamilyStore cfs, int gcGrace)
{
TableParams newParams = cfs.metadata().params.unbuild().gcGraceSeconds(gcGrace).build();
KeyspaceMetadata keyspaceMetadata = Schema.instance.getKeyspaceMetadata(cfs.metadata().keyspace);
Schema.instance.load(
- keyspaceMetadata.withSwapped(
- keyspaceMetadata.tables.withSwapped(
- cfs.metadata().withSwapped(newParams))));
+ keyspaceMetadata.withSwapped(
+ keyspaceMetadata.tables.withSwapped(
+ cfs.metadata().withSwapped(newParams))));
+ }
+
+    // Returns the maximum value currently held by the table level overread rows histogram and
+    // then clears that histogram, so every assertion only sees what was recorded since the
+    // previous call.
+    private long getAndResetOverreadCount(ColumnFamilyStore cfs)
+    {
+        ClearableHistogram overreadRows = (ClearableHistogram) cfs.metric.repairedDataTrackingOverreadRows.cf;
+        long maxOverread = overreadRows.getSnapshot().getMax();
+        overreadRows.clear();
+        return maxOverread;
+    }
+
+    // Asserts that every supplied partition is non-empty and that the total number of
+    // unfiltereds across all of them equals the expected count.
+    private void readAndCheckRowCount(Iterable<FilteredPartition> partitions, int expected)
+    {
+        int seen = 0;
+        for (Partition current : partitions)
+        {
+            assertFalse(current.isEmpty());
+            try (UnfilteredRowIterator unfiltereds = current.unfilteredIterator())
+            {
+                // drain the iterator, counting one per element returned
+                for (; unfiltereds.hasNext(); seen++)
+                    unfiltereds.next();
+            }
+        }
+        assertEquals(expected, seen);
+    }
+
+    // Applies one row update per (partition id, row id) pair. The row id supplier is invoked
+    // once for each partition, so it must hand back a fresh stream every time.
+    private void insert(ColumnFamilyStore cfs, IntStream partitionIds, Supplier<IntStream> rowIds)
+    {
+        partitionIds.mapToObj(ByteBufferUtil::bytes).forEach(pk -> {
+            rowIds.get().forEach(c -> {
+                RowUpdateBuilder update = new RowUpdateBuilder(cfs.metadata(), 0, pk);
+                update.clustering(c)
+                      .add("a", ByteBufferUtil.bytes("abcd"))
+                      .build()
+                      .apply();
+            });
+        });
+    }
private void assertDigestsDiffer(ByteBuffer b0, ByteBuffer b1)
diff --git a/test/unit/org/apache/cassandra/db/RepairedDataInfoTest.java b/test/unit/org/apache/cassandra/db/RepairedDataInfoTest.java
new file mode 100644
index 0000000..00a1f56
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/RepairedDataInfoTest.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.stream.IntStream;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.commitlog.CommitLog;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.schema.MockSchema;
+import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.db.partitions.AbstractUnfilteredPartitionIterator;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.db.rows.AbstractUnfilteredRowIterator;
+import org.apache.cassandra.db.rows.BTreeRow;
+import org.apache.cassandra.db.rows.BufferCell;
+import org.apache.cassandra.db.rows.Cell;
+import org.apache.cassandra.db.rows.EncodingStats;
+import org.apache.cassandra.db.rows.RangeTombstoneBoundMarker;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.Rows;
+import org.apache.cassandra.db.rows.Unfiltered;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Hex;
+
+import static org.apache.cassandra.Util.clustering;
+import static org.apache.cassandra.Util.dk;
+import static org.apache.cassandra.utils.ByteBufferUtil.*;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+public class RepairedDataInfoTest
+{
+ private static ColumnFamilyStore cfs;
+ private static TableMetadata metadata;
+ private static ColumnMetadata valueMetadata;
+ private static ColumnMetadata staticMetadata;
+
+ private final int nowInSec = FBUtilities.nowInSeconds();
+
+    // One-off fixture: initialises the daemon config and commit log, creates a mock table that
+    // has an extra static column "s", and caches the metadata handles used by the tests.
+    @BeforeClass
+    public static void setUp()
+    {
+        DatabaseDescriptor.daemonInitialization();
+        CommitLog.instance.start();
+        MockSchema.cleanup();
+        cfs = MockSchema.newCFS("repaired_data_info_test",
+                                builder -> builder.addStaticColumn("s", UTF8Type.instance));
+        metadata = cfs.metadata();
+        valueMetadata = metadata.regularColumns().getSimple(0);
+        staticMetadata = metadata.staticColumns().getSimple(0);
+    }
+
+    @Test
+    public void withTrackingAppliesRepairedDataCounter()
+    {
+        // rows consumed through the tracking wrapper must be registered by the counter that
+        // was handed to the RepairedDataInfo
+        DataLimits.Counter counter = DataLimits.cqlLimits(15).newCounter(nowInSec, false, false, false).onlyCount();
+        RepairedDataInfo info = new RepairedDataInfo(counter);
+        info.prepare(cfs, nowInSec, Integer.MAX_VALUE);
+
+        // three partitions of five rows each
+        UnfilteredRowIterator[] perPartition =
+            IntStream.range(0, 3)
+                     .mapToObj(pk -> partition(bytes(pk), rows(0, 5, nowInSec)))
+                     .toArray(UnfilteredRowIterator[]::new);
+
+        UnfilteredPartitionIterator tracked = info.withRepairedDataInfo(partitions(perPartition));
+        consume(tracked);
+
+        assertEquals(15, counter.counted());
+        assertEquals(5, counter.countedInCurrentPartition());
+    }
+
+    @Test
+    public void digestOfSinglePartitionWithSingleRowAndEmptyStaticRow()
+    {
+        // a single partition holding exactly one row and an empty static row
+        Row[] content = rows(0, 1, nowInSec);
+        UnfilteredRowIterator partition = partition(bytes(0), content);
+
+        // digest assembled by hand from the partition's components
+        Digest expected = Digest.forRepairedDataTracking();
+        addToDigest(expected,
+                    partition.partitionKey().getKey(),
+                    partition.partitionLevelDeletion(),
+                    Rows.EMPTY_STATIC_ROW,
+                    content);
+
+        // digest produced by draining the partition through RepairedDataInfo
+        byte[] actual = consume(partition);
+        assertArrayEquals(expected.digest(), actual);
+    }
+
+    @Test
+    public void digestOfSinglePartitionWithMultipleRowsAndEmptyStaticRow()
+    {
+        // a single partition holding five rows and an empty static row
+        Row[] content = rows(0, 5, nowInSec);
+        UnfilteredRowIterator partition = partition(bytes(0), content);
+
+        // digest assembled by hand from the partition's components
+        Digest expected = Digest.forRepairedDataTracking();
+        addToDigest(expected,
+                    partition.partitionKey().getKey(),
+                    partition.partitionLevelDeletion(),
+                    Rows.EMPTY_STATIC_ROW,
+                    content);
+
+        // digest produced by draining the partition through RepairedDataInfo
+        byte[] actual = consume(partition);
+        assertArrayEquals(expected.digest(), actual);
+    }
+
+    @Test
+    public void digestOfSinglePartitionWithMultipleRowsAndTombstones()
+    {
+        // a single partition interleaving range tombstone markers with rows
+        Unfiltered[] content = {
+            open(0), close(0),
+            row(1, 1, nowInSec),
+            open(2), close(4),
+            row(5, 7, nowInSec)
+        };
+        UnfilteredRowIterator partition = partition(bytes(0), content);
+
+        // digest assembled by hand from the partition's components
+        Digest expected = Digest.forRepairedDataTracking();
+        addToDigest(expected,
+                    partition.partitionKey().getKey(),
+                    partition.partitionLevelDeletion(),
+                    Rows.EMPTY_STATIC_ROW,
+                    content);
+
+        // digest produced by draining the partition through RepairedDataInfo
+        byte[] actual = consume(partition);
+        assertArrayEquals(expected.digest(), actual);
+    }
+
+    @Test
+    public void digestOfMultiplePartitionsWithMultipleRowsAndNonEmptyStaticRows()
+    {
+        // five partitions, each with the same non-empty static row and the same five rows
+        Row staticRow = staticRow(nowInSec);
+        Row[] content = rows(0, 5, nowInSec);
+        Digest expected = Digest.forRepairedDataTracking();
+
+        UnfilteredRowIterator[] allPartitions = new UnfilteredRowIterator[5];
+        for (int pk = 0; pk < 5; pk++)
+        {
+            UnfilteredRowIterator current = partitionWithStaticRow(bytes(pk), staticRow, content);
+            allPartitions[pk] = current;
+            // fold this partition into the hand-built digest
+            addToDigest(expected,
+                        current.partitionKey().getKey(),
+                        current.partitionLevelDeletion(),
+                        staticRow,
+                        content);
+        }
+
+        // digest produced by draining every partition through RepairedDataInfo
+        byte[] actual = consume(partitions(allPartitions));
+        assertArrayEquals(expected.digest(), actual);
+    }
+
+    // Creates a RepairedDataInfo whose counter is derived from DataLimits.NONE.
+    private RepairedDataInfo info()
+    {
+        DataLimits.Counter counter = DataLimits.NONE.newCounter(nowInSec, false, false, false);
+        return new RepairedDataInfo(counter);
+    }
+
+    // Computes a digest for one partition by hand - static row (when not empty), partition
+    // key, partition level deletion, then each unfiltered in order - and feeds the resulting
+    // bytes into the supplied aggregate digest, which is also returned.
+    private Digest addToDigest(Digest aggregate,
+                               ByteBuffer partitionKey,
+                               DeletionTime deletion,
+                               Row staticRow,
+                               Unfiltered...unfiltereds)
+    {
+        Digest partitionDigest = Digest.forRepairedDataTracking();
+
+        boolean hasStaticContent = !staticRow.isEmpty();
+        if (hasStaticContent)
+            staticRow.digest(partitionDigest);
+
+        partitionDigest.update(partitionKey);
+        deletion.digest(partitionDigest);
+        Arrays.asList(unfiltereds).forEach(u -> u.digest(partitionDigest));
+
+        byte[] partitionBytes = partitionDigest.digest();
+        aggregate.update(partitionBytes, 0, partitionBytes.length);
+        return aggregate;
+    }
+
+    // Drains every partition through a fresh RepairedDataInfo and returns the bytes of the
+    // digest it reports afterwards.
+    private byte[] consume(UnfilteredPartitionIterator partitions)
+    {
+        RepairedDataInfo tracker = info();
+        tracker.prepare(cfs, nowInSec, Integer.MAX_VALUE);
+
+        partitions.forEachRemaining(rawPartition -> {
+            try (UnfilteredRowIterator tracked = tracker.withRepairedDataInfo(rawPartition))
+            {
+                // the contents are irrelevant here, the iterator only has to be exhausted
+                tracked.forEachRemaining(ignored -> {});
+            }
+        });
+
+        ByteBuffer digest = tracker.getDigest();
+        return getArray(digest);
+    }
+
+    // Drains a single partition through a fresh RepairedDataInfo and returns the bytes of the
+    // digest it reports afterwards.
+    private byte[] consume(UnfilteredRowIterator partition)
+    {
+        RepairedDataInfo tracker = info();
+        tracker.prepare(cfs, nowInSec, Integer.MAX_VALUE);
+
+        try (UnfilteredRowIterator tracked = tracker.withRepairedDataInfo(partition))
+        {
+            // the contents are irrelevant here, the iterator only has to be exhausted
+            tracked.forEachRemaining(ignored -> {});
+        }
+
+        ByteBuffer digest = tracker.getDigest();
+        return getArray(digest);
+    }
+
+    // Builds a cell for the given column with timestamp 1 and neither TTL nor deletion time.
+    // A ByteBuffer value is used as-is; anything else is serialised with the column's type.
+    public static Cell cell(ColumnMetadata def, Object value)
+    {
+        ByteBuffer serialized;
+        if (value instanceof ByteBuffer)
+            serialized = (ByteBuffer)value;
+        else
+            serialized = ((AbstractType)def.type).decompose(value);
+
+        return new BufferCell(def, 1L, BufferCell.NO_TTL, BufferCell.NO_DELETION_TIME, serialized, null);
+    }
+
+    // Builds a static row with a single cell in the static column. The nowInSec argument is
+    // not used by this helper.
+    private Row staticRow(int nowInSec)
+    {
+        Row.Builder staticBuilder = BTreeRow.unsortedBuilder();
+        staticBuilder.newRow(Clustering.STATIC_CLUSTERING);
+        Cell staticCell = cell(staticMetadata, "static value");
+        staticBuilder.addCell(staticCell);
+        return staticBuilder.build();
+    }
+
+    // Builds a row with one cell in the value column; both the clustering and the value are
+    // passed on in their decimal string form. The nowInSec argument is not used by this helper.
+    private Row row(int clustering, int value, int nowInSec)
+    {
+        String clusteringText = Integer.toString(clustering);
+        String valueText = Integer.toString(value);
+
+        Row.Builder rowBuilder = BTreeRow.unsortedBuilder();
+        rowBuilder.newRow(clustering(metadata.comparator, clusteringText));
+        rowBuilder.addCell(cell(valueMetadata, valueText));
+        return rowBuilder.build();
+    }
+
+    // Builds one row per clustering in [clusteringStart, clusteringEnd), each using its
+    // clustering as its value too.
+    private Row[] rows(int clusteringStart, int clusteringEnd, int nowInSec)
+    {
+        IntStream clusterings = IntStream.range(clusteringStart, clusteringEnd);
+        return clusterings.mapToObj(c -> row(c, c, nowInSec))
+                          .toArray(Row[]::new);
+    }
+
+    // Creates a range tombstone bound marker at the given clustering value, using the bound
+    // kind obtained from boundKind(true, true) and a deletion time stamped with the current time.
+    private RangeTombstoneBoundMarker open(int start)
+    {
+        ByteBuffer[] boundValues = { Clustering.make(Int32Type.instance.decompose(start)).get(0) };
+        ClusteringBound bound = ClusteringBound.create(ClusteringBound.boundKind(true, true), boundValues);
+        DeletionTime deletedAt = new DeletionTime(FBUtilities.timestampMicros(), FBUtilities.nowInSeconds());
+        return new RangeTombstoneBoundMarker(bound, deletedAt);
+    }
+
+    // Creates a range tombstone bound marker at the given clustering value, using the bound
+    // kind obtained from boundKind(false, true) and a deletion time stamped with the current time.
+    private RangeTombstoneBoundMarker close(int close)
+    {
+        ByteBuffer[] boundValues = { Clustering.make(Int32Type.instance.decompose(close)).get(0) };
+        ClusteringBound bound = ClusteringBound.create(ClusteringBound.boundKind(false, true), boundValues);
+        DeletionTime deletedAt = new DeletionTime(FBUtilities.timestampMicros(), FBUtilities.nowInSeconds());
+        return new RangeTombstoneBoundMarker(bound, deletedAt);
+    }
+
+    // Convenience overload: a partition over the given unfiltereds with an empty static row.
+    private UnfilteredRowIterator partition(ByteBuffer pk, Unfiltered... unfiltereds)
+    {
+        Row noStaticContent = Rows.EMPTY_STATIC_ROW;
+        return partitionWithStaticRow(pk, noStaticContent, unfiltereds);
+    }
+
+    // Wraps the given unfiltereds in a row iterator for the partition key pk, with a LIVE
+    // partition level deletion and the supplied static row.
+    private UnfilteredRowIterator partitionWithStaticRow(ByteBuffer pk, Row staticRow, Unfiltered... unfiltereds)
+    {
+        Iterator<Unfiltered> source = Arrays.asList(unfiltereds).iterator();
+        return new AbstractUnfilteredRowIterator(metadata, dk(pk), DeletionTime.LIVE, metadata.regularAndStaticColumns(), staticRow, false, EncodingStats.NO_STATS) {
+            protected Unfiltered computeNext()
+            {
+                // hand out the backing elements in order, then signal exhaustion
+                if (source.hasNext())
+                    return source.next();
+                return endOfData();
+            }
+        };
+    }
+
+    // Exposes the given row iterators, in argument order, as a partition iterator that
+    // reports the test table's metadata.
+    private static UnfilteredPartitionIterator partitions(UnfilteredRowIterator...partitions)
+    {
+        Iterator<UnfilteredRowIterator> source = Arrays.asList(partitions).iterator();
+        return new AbstractUnfilteredPartitionIterator()
+        {
+            public TableMetadata metadata()
+            {
+                return metadata;
+            }
+
+            public boolean hasNext()
+            {
+                return source.hasNext();
+            }
+
+            public UnfilteredRowIterator next()
+            {
+                return source.next();
+            }
+        };
+    }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org