Posted to commits@cassandra.apache.org by sa...@apache.org on 2020/04/16 17:44:53 UTC

[cassandra] 02/02: Ensure repaired data tracking reads a consistent amount of data across replicas

This is an automated email from the ASF dual-hosted git repository.

samt pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit a8e7cfbc0e146ea82154654ba43b613b058f99d1
Author: Sam Tunnicliffe <sa...@beobal.com>
AuthorDate: Tue Feb 11 09:59:31 2020 +0000

    Ensure repaired data tracking reads a consistent amount of data across replicas
    
    Patch by Sam Tunnicliffe; reviewed by Aleksey Yeschenko for CASSANDRA-15601
---
 CHANGES.txt                                        |   1 +
 src/java/org/apache/cassandra/db/ReadCommand.java  | 310 ++++---------------
 .../org/apache/cassandra/db/RepairedDataInfo.java  | 336 +++++++++++++++++++++
 .../apache/cassandra/metrics/KeyspaceMetrics.java  |  12 +
 .../org/apache/cassandra/metrics/TableMetrics.java |  18 +-
 .../distributed/test/RepairDigestTrackingTest.java | 169 ++++++++++-
 .../org/apache/cassandra/db/ReadCommandTest.java   | 120 +++++++-
 .../apache/cassandra/db/RepairedDataInfoTest.java  | 303 +++++++++++++++++++
 8 files changed, 1003 insertions(+), 266 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 96eeed4..4586c71 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0-alpha4
+ * Ensure repaired data tracking reads a consistent amount of data across replicas (CASSANDRA-15601)
  * Fix CQLSH to avoid arguments being evaluated (CASSANDRA-15660)
  * Correct Visibility and Improve Safety of Methods in LatencyMetrics (CASSANDRA-15597)
  * Allow cqlsh to run with Python2.7/Python3.6+ (CASSANDRA-15659,CASSANDRA-15573)
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java
index 4f8ea3e..4c4c833 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -23,6 +23,7 @@ import java.util.*;
 import java.util.concurrent.TimeUnit;
 import java.util.function.BiFunction;
 import java.util.function.LongPredicate;
+import java.util.function.Function;
 
 import javax.annotation.Nullable;
 
@@ -62,12 +63,12 @@ import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.service.ClientWarn;
 import org.apache.cassandra.tracing.Tracing;
-import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 
 import static com.google.common.collect.Iterables.any;
 import static com.google.common.collect.Iterables.filter;
 import static org.apache.cassandra.utils.MonotonicClock.approxTime;
+import static org.apache.cassandra.db.partitions.UnfilteredPartitionIterators.MergeListener.NOOP;
 
 /**
  * General interface for storage-engine read commands (common to both range and
@@ -91,17 +92,7 @@ public abstract class ReadCommand extends AbstractReadQuery
     // for data queries, coordinators may request information on the repaired data used in constructing the response
     private boolean trackRepairedStatus = false;
     // tracker for repaired data, initialized to singleton null object
-    private static final RepairedDataInfo NULL_REPAIRED_DATA_INFO = new RepairedDataInfo()
-    {
-        void trackPartitionKey(DecoratedKey key){}
-        void trackDeletion(DeletionTime deletion){}
-        void trackRangeTombstoneMarker(RangeTombstoneMarker marker){}
-        void trackRow(Row row){}
-        boolean isConclusive(){ return true; }
-        ByteBuffer getDigest(){ return ByteBufferUtil.EMPTY_BYTE_BUFFER; }
-    };
-
-    private RepairedDataInfo repairedDataInfo = NULL_REPAIRED_DATA_INFO;
+    private RepairedDataInfo repairedDataInfo = RepairedDataInfo.NULL_REPAIRED_DATA_INFO;
 
     int oldestUnrepairedTombstone = Integer.MAX_VALUE;
 
@@ -450,7 +441,13 @@ public abstract class ReadCommand extends AbstractReadQuery
         }
 
         if (isTrackingRepairedStatus())
-            repairedDataInfo = new RepairedDataInfo();
+        {
+            final DataLimits.Counter repairedReadCount = limits().newCounter(nowInSec(),
+                                                                             false,
+                                                                             selectsFullPartition(),
+                                                                             metadata().enforceStrictLiveness()).onlyCount();
+            repairedDataInfo = new RepairedDataInfo(repairedReadCount);
+        }
 
         UnfilteredPartitionIterator iterator = (null == searcher) ? queryStorage(cfs, executionController) : searcher.search(executionController);
         iterator = RTBoundValidator.validate(iterator, Stage.MERGED, false);
@@ -475,7 +472,22 @@ public abstract class ReadCommand extends AbstractReadQuery
 
             // apply the limits/row counter; this transformation is stopping and would close the iterator as soon
             // as the count is observed; if that happens in the middle of an open RT, its end bound will not be included.
-            iterator = limits().filter(iterator, nowInSec(), selectsFullPartition());
+            // If tracking repaired data, the counter is needed for overreading repaired data, otherwise we can
+            // optimise the case where this.limit = DataLimits.NONE which skips an unnecessary transform
+            if (isTrackingRepairedStatus())
+            {
+                DataLimits.Counter limit =
+                    limits().newCounter(nowInSec(), false, selectsFullPartition(), metadata().enforceStrictLiveness());
+                iterator = limit.applyTo(iterator);
+                // ensure that a consistent amount of repaired data is read on each replica. This causes silent
+                // overreading from the repaired data set, up to limits(). The extra data is not visible to
+                // the caller, only iterated to produce the repaired data digest.
+                iterator = repairedDataInfo.extend(iterator, limit);
+            }
+            else
+            {
+                iterator = limits().filter(iterator, nowInSec(), selectsFullPartition());
+            }
 
             // because of the above, we need to append an artificial end bound if the source iterator was stopped short by a counter.
             return RTBoundCloser.close(iterator);
@@ -723,254 +735,37 @@ public abstract class ReadCommand extends AbstractReadQuery
         return toCQLString();
     }
 
-    private static UnfilteredPartitionIterator withRepairedDataInfo(final UnfilteredPartitionIterator iterator,
-                                                                    final RepairedDataInfo repairedDataInfo)
-    {
-        class WithRepairedDataTracking extends Transformation<UnfilteredRowIterator>
-        {
-            protected UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition)
-            {
-                return withRepairedDataInfo(partition, repairedDataInfo);
-            }
-        }
-
-        return Transformation.apply(iterator, new WithRepairedDataTracking());
-    }
-
-    private static UnfilteredRowIterator withRepairedDataInfo(final UnfilteredRowIterator iterator,
-                                                              final RepairedDataInfo repairedDataInfo)
-    {
-        class WithTracking extends Transformation
-        {
-            protected DecoratedKey applyToPartitionKey(DecoratedKey key)
-            {
-                repairedDataInfo.onNewPartition(iterator);
-                repairedDataInfo.trackPartitionKey(key);
-                return key;
-            }
-
-            protected DeletionTime applyToDeletion(DeletionTime deletionTime)
-            {
-                repairedDataInfo.trackDeletion(deletionTime);
-                return deletionTime;
-            }
-
-            protected RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker)
-            {
-                repairedDataInfo.trackRangeTombstoneMarker(marker);
-                return marker;
-            }
-
-            protected Row applyToStatic(Row row)
-            {
-                repairedDataInfo.trackStaticRow(row);
-                return row;
-            }
-
-            protected Row applyToRow(Row row)
-            {
-                repairedDataInfo.trackRow(row);
-                return row;
-            }
-
-            protected void onPartitionClose()
-            {
-                repairedDataInfo.onPartitionClose();
-            }
-        }
-        return Transformation.apply(iterator, new WithTracking());
-    }
-
-    private static class RepairedDataInfo
-    {
-        // Keeps a digest of the partition currently being processed. Since we won't know
-        // whether a partition will be fully purged from a read result until it's been
-        // consumed, we buffer this per-partition digest and add it to the final digest
-        // when the partition is closed (if it wasn't fully purged).
-        private Digest perPartitionDigest;
-        private Digest perCommandDigest;
-        private boolean isConclusive = true;
-
-        // Doesn't actually purge from the underlying iterators, but excludes from the digest
-        // the purger can't be initialized until we've iterated all the sstables for the query
-        // as it requires the oldest repaired tombstone
-        private RepairedDataPurger purger;
-        private boolean isFullyPurged = true;
-
-        ByteBuffer getDigest()
-        {
-            return perCommandDigest == null
-                   ? ByteBufferUtil.EMPTY_BYTE_BUFFER
-                   : ByteBuffer.wrap(perCommandDigest.digest());
-        }
-
-        protected void onNewPartition(UnfilteredRowIterator partition)
-        {
-            assert purger != null;
-            purger.setCurrentKey(partition.partitionKey());
-            purger.setIsReverseOrder(partition.isReverseOrder());
-        }
-
-        protected void setPurger(RepairedDataPurger purger)
-        {
-            this.purger = purger;
-        }
-
-        boolean isConclusive()
-        {
-            return isConclusive;
-        }
-
-        void markInconclusive()
-        {
-            isConclusive = false;
-        }
-
-        void trackPartitionKey(DecoratedKey key)
-        {
-            getPerPartitionDigest().update(key.getKey());
-        }
-
-        void trackDeletion(DeletionTime deletion)
-        {
-            assert purger != null;
-            DeletionTime purged = purger.applyToDeletion(deletion);
-            if (!purged.isLive())
-                isFullyPurged = false;
-
-            purged.digest(getPerPartitionDigest());
-        }
-
-        void trackRangeTombstoneMarker(RangeTombstoneMarker marker)
-        {
-            assert purger != null;
-            RangeTombstoneMarker purged = purger.applyToMarker(marker);
-            if (purged != null)
-            {
-                isFullyPurged = false;
-                purged.digest(getPerPartitionDigest());
-            }
-        }
-
-        void trackStaticRow(Row row)
-        {
-            assert purger != null;
-            Row purged = purger.applyToRow(row);
-            if (!purged.isEmpty())
-            {
-                isFullyPurged = false;
-                purged.digest(getPerPartitionDigest());
-            }
-        }
-
-        void trackRow(Row row)
-        {
-            assert purger != null;
-            Row purged = purger.applyToRow(row);
-            if (purged != null)
-            {
-                isFullyPurged = false;
-                purged.digest(getPerPartitionDigest());
-            }
-        }
-
-        private Digest getPerPartitionDigest()
-        {
-            if (perPartitionDigest == null)
-                perPartitionDigest = Digest.forRepairedDataTracking();
-
-            return perPartitionDigest;
-        }
-
-        private void onPartitionClose()
-        {
-            if (perPartitionDigest != null)
-            {
-                // If the partition wasn't completely emptied by the purger,
-                // calculate the digest for the partition and use it to
-                // update the overall digest
-                if (!isFullyPurged)
-                {
-                    if (perCommandDigest == null)
-                        perCommandDigest = Digest.forRepairedDataTracking();
-
-                    byte[] partitionDigest = perPartitionDigest.digest();
-                    perCommandDigest.update(partitionDigest, 0, partitionDigest.length);
-                    isFullyPurged = true;
-                }
-
-                perPartitionDigest = null;
-            }
-        }
-    }
-
-    /**
-     * Although PurgeFunction extends Transformation, this is never applied to an iterator.
-     * Instead, it is used by RepairedDataInfo during the generation of a repaired data
-     * digest to exclude data which will actually be purged later on in the read pipeline.
-     */
-    private static class RepairedDataPurger extends PurgeFunction
-    {
-        RepairedDataPurger(ColumnFamilyStore cfs,
-                           int nowInSec,
-                           int oldestUnrepairedTombstone)
-        {
-            super(nowInSec,
-                  cfs.gcBefore(nowInSec),
-                  oldestUnrepairedTombstone,
-                  cfs.getCompactionStrategyManager().onlyPurgeRepairedTombstones(),
-                  cfs.metadata.get().enforceStrictLiveness());
-        }
-
-        protected LongPredicate getPurgeEvaluator()
-        {
-            return (time) -> true;
-        }
-
-        void setCurrentKey(DecoratedKey key)
-        {
-            super.onNewPartition(key);
-        }
-
-        void setIsReverseOrder(boolean isReverseOrder)
-        {
-            super.setReverseOrder(isReverseOrder);
-        }
-
-        public DeletionTime applyToDeletion(DeletionTime deletionTime)
-        {
-            return super.applyToDeletion(deletionTime);
-        }
-
-        public Row applyToRow(Row row)
-        {
-            return super.applyToRow(row);
-        }
-
-        public RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker)
-        {
-            return super.applyToMarker(marker);
-        }
-    }
-
     @SuppressWarnings("resource") // resultant iterators are closed by their callers
     InputCollector<UnfilteredRowIterator> iteratorsForPartition(ColumnFamilyStore.ViewFragment view)
     {
-        BiFunction<List<UnfilteredRowIterator>, RepairedDataInfo, UnfilteredRowIterator> merge =
-            (unfilteredRowIterators, repairedDataInfo) ->
-                withRepairedDataInfo(UnfilteredRowIterators.merge(unfilteredRowIterators), repairedDataInfo);
+        final BiFunction<List<UnfilteredRowIterator>, RepairedDataInfo, UnfilteredRowIterator> merge =
+            (unfilteredRowIterators, repairedDataInfo) -> {
+                UnfilteredRowIterator repaired = UnfilteredRowIterators.merge(unfilteredRowIterators);
+                return repairedDataInfo.withRepairedDataInfo(repaired);
+            };
 
-        return new InputCollector<>(view, repairedDataInfo, merge, isTrackingRepairedStatus());
+        // For single partition reads, after reading up to the command's DataLimit nothing extra is required.
+        // The merged & repaired row iterator will be consumed until it's exhausted or the RepairedDataInfo's
+        // internal counter is satisfied
+        final Function<UnfilteredRowIterator, UnfilteredPartitionIterator> postLimitPartitions =
+            (rows) -> EmptyIterators.unfilteredPartition(metadata());
+        return new InputCollector<>(view, repairedDataInfo, merge, postLimitPartitions, isTrackingRepairedStatus());
     }
 
     @SuppressWarnings("resource") // resultant iterators are closed by their callers
     InputCollector<UnfilteredPartitionIterator> iteratorsForRange(ColumnFamilyStore.ViewFragment view)
     {
-        BiFunction<List<UnfilteredPartitionIterator>, RepairedDataInfo, UnfilteredPartitionIterator> merge =
-            (unfilteredPartitionIterators, repairedDataInfo) ->
-                withRepairedDataInfo(UnfilteredPartitionIterators.merge(unfilteredPartitionIterators, UnfilteredPartitionIterators.MergeListener.NOOP), repairedDataInfo);
+        final BiFunction<List<UnfilteredPartitionIterator>, RepairedDataInfo, UnfilteredPartitionIterator> merge =
+            (unfilteredPartitionIterators, repairedDataInfo) -> {
+                UnfilteredPartitionIterator repaired = UnfilteredPartitionIterators.merge(unfilteredPartitionIterators,
+                                                                                          NOOP);
+                return repairedDataInfo.withRepairedDataInfo(repaired);
+            };
 
-        return new InputCollector<>(view, repairedDataInfo, merge, isTrackingRepairedStatus());
+        // Uses identity function to provide additional partitions to be consumed after the command's
+        // DataLimits are satisfied. The input to the function will be the iterator of merged, repaired partitions
+        // which we'll keep reading until the RepairedDataInfo's internal counter is satisfied.
+        return new InputCollector<>(view, repairedDataInfo, merge, Function.identity(), isTrackingRepairedStatus());
     }
 
     /**
@@ -988,12 +783,14 @@ public abstract class ReadCommand extends AbstractReadQuery
         private final boolean isTrackingRepairedStatus;
         Set<SSTableReader> repairedSSTables;
         BiFunction<List<T>, RepairedDataInfo, T> repairedMerger;
+        Function<T, UnfilteredPartitionIterator> postLimitAdditionalPartitions;
         List<T> repairedIters;
         List<T> unrepairedIters;
 
         InputCollector(ColumnFamilyStore.ViewFragment view,
                        RepairedDataInfo repairedDataInfo,
                        BiFunction<List<T>, RepairedDataInfo, T> repairedMerger,
+                       Function<T, UnfilteredPartitionIterator> postLimitAdditionalPartitions,
                        boolean isTrackingRepairedStatus)
         {
             this.repairedDataInfo = repairedDataInfo;
@@ -1023,6 +820,7 @@ public abstract class ReadCommand extends AbstractReadQuery
                 unrepairedIters = new ArrayList<>((view.sstables.size() - repairedSSTables.size()) + Iterables.size(view.memtables) + 1);
             }
             this.repairedMerger = repairedMerger;
+            this.postLimitAdditionalPartitions = postLimitAdditionalPartitions;
         }
 
         void addMemtableIterator(T iter)
@@ -1038,15 +836,17 @@ public abstract class ReadCommand extends AbstractReadQuery
                 unrepairedIters.add(iter);
         }
 
+        @SuppressWarnings("resource") // the returned iterators are closed by the caller
         List<T> finalizeIterators(ColumnFamilyStore cfs, int nowInSec, int oldestUnrepairedTombstone)
         {
             if (repairedIters.isEmpty())
                 return unrepairedIters;
 
             // merge the repaired data before returning, wrapping in a digest generator
-            RepairedDataPurger purger = new RepairedDataPurger(cfs, nowInSec, oldestUnrepairedTombstone);
-            repairedDataInfo.setPurger(purger);
-            unrepairedIters.add(repairedMerger.apply(repairedIters, repairedDataInfo));
+            repairedDataInfo.prepare(cfs, nowInSec, oldestUnrepairedTombstone);
+            T repairedIter = repairedMerger.apply(repairedIters, repairedDataInfo);
+            repairedDataInfo.finalize(postLimitAdditionalPartitions.apply(repairedIter));
+            unrepairedIters.add(repairedIter);
             return unrepairedIters;
         }
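
The essence of the ReadCommand change above is that two counters run over the merged repaired data: the command's own DataLimits counter, which stops iteration for the caller, and the RepairedDataInfo's internal counter, which keeps consuming repaired rows after that point so every replica digests a comparable amount of data. The following standalone sketch illustrates only that counting pattern; SimpleCounter and the hard-coded string partitions are made-up stand-ins for Cassandra's DataLimits.Counter and UnfilteredPartitionIterator, not code from this patch.

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class OverreadSketch
{
    // Hypothetical stand-in for DataLimits.Counter: counts consumed rows, reports when a limit is hit.
    static final class SimpleCounter
    {
        private final int limit;
        private int counted;
        SimpleCounter(int limit) { this.limit = limit; }
        void count() { counted++; }
        boolean isDone() { return counted >= limit; }
        int counted() { return counted; }
    }

    public static void main(String[] args)
    {
        // Two "partitions" of repaired rows, standing in for the merged repaired iterators.
        List<List<String>> repairedPartitions = Arrays.asList(
            Arrays.asList("p0/r0", "p0/r1", "p0/r2", "p0/r3"),
            Arrays.asList("p1/r0", "p1/r1"));

        SimpleCounter limit = new SimpleCounter(2);           // the command's DataLimits
        SimpleCounter repairedCounter = new SimpleCounter(5); // RepairedDataInfo's internal counter

        Iterator<List<String>> partitions = repairedPartitions.iterator();
        Iterator<String> current = partitions.next();

        // Normal read: rows returned to the caller are counted by both counters.
        while (!limit.isDone() && current.hasNext())
        {
            String row = current.next();
            limit.count();
            repairedCounter.count();
            System.out.println("returned to caller: " + row);
        }

        // Overread: once the caller-visible limit is reached, keep consuming repaired rows,
        // first from the current partition and then from any post-limit partitions, purely so
        // that the repaired data digest covers the same amount of data on every replica.
        while (!repairedCounter.isDone() && current.hasNext())
        {
            repairedCounter.count();
            System.out.println("overread for digest only: " + current.next());
        }
        while (!repairedCounter.isDone() && partitions.hasNext())
        {
            current = partitions.next();
            while (!repairedCounter.isDone() && current.hasNext())
            {
                repairedCounter.count();
                System.out.println("overread for digest only: " + current.next());
            }
        }
        System.out.println("rows consumed for the digest: " + repairedCounter.counted());
    }
}
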
 
diff --git a/src/java/org/apache/cassandra/db/RepairedDataInfo.java b/src/java/org/apache/cassandra/db/RepairedDataInfo.java
new file mode 100644
index 0000000..be636d3
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/RepairedDataInfo.java
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.nio.ByteBuffer;
+import java.util.function.LongPredicate;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.partitions.PurgeFunction;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.db.rows.RangeTombstoneMarker;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.db.transform.MoreRows;
+import org.apache.cassandra.db.transform.Transformation;
+import org.apache.cassandra.metrics.TableMetrics;
+import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+class RepairedDataInfo
+{
+    public static final RepairedDataInfo NULL_REPAIRED_DATA_INFO = new RepairedDataInfo(null)
+    {
+        boolean isConclusive(){ return true; }
+        ByteBuffer getDigest(){ return ByteBufferUtil.EMPTY_BYTE_BUFFER; }
+    };
+
+    // Keeps a digest of the partition currently being processed. Since we won't know
+    // whether a partition will be fully purged from a read result until it's been
+    // consumed, we buffer this per-partition digest and add it to the final digest
+    // when the partition is closed (if it wasn't fully purged).
+    private Digest perPartitionDigest;
+    private Digest perCommandDigest;
+    private boolean isConclusive = true;
+    private ByteBuffer calculatedDigest = null;
+
+    // Doesn't actually purge from the underlying iterators, but excludes purged data from the digest.
+    // The purger can't be initialized until we've iterated all the sstables for the query
+    // as it requires the oldest repaired tombstone
+    private RepairedDataPurger purger;
+    private boolean isFullyPurged = true;
+
+    // Supplies additional partitions from the repaired data set to be consumed when the limit of
+    // executing ReadCommand has been reached. This is to ensure that each replica attempts to
+    // read the same amount of repaired data, otherwise comparisons of the repaired data digests
+    // may be invalidated by varying amounts of repaired data being present on each replica.
+    // This can't be initialized until after the underlying repaired iterators have been merged.
+    private UnfilteredPartitionIterator postLimitPartitions = null;
+    private final DataLimits.Counter repairedCounter;
+    private UnfilteredRowIterator currentPartition;
+    private TableMetrics metrics;
+
+    public RepairedDataInfo(DataLimits.Counter repairedCounter)
+    {
+        this.repairedCounter = repairedCounter;
+    }
+
+    ByteBuffer getDigest()
+    {
+        if (calculatedDigest != null)
+            return calculatedDigest;
+
+        calculatedDigest = perCommandDigest == null
+                           ? ByteBufferUtil.EMPTY_BYTE_BUFFER
+                           : ByteBuffer.wrap(perCommandDigest.digest());
+
+        return calculatedDigest;
+    }
+
+    void prepare(ColumnFamilyStore cfs, int nowInSec, int oldestUnrepairedTombstone)
+    {
+        this.purger = new RepairedDataPurger(cfs, nowInSec, oldestUnrepairedTombstone);
+        this.metrics = cfs.metric;
+    }
+
+    void finalize(UnfilteredPartitionIterator postLimitPartitions)
+    {
+        this.postLimitPartitions = postLimitPartitions;
+    }
+
+    boolean isConclusive()
+    {
+        return isConclusive;
+    }
+
+    void markInconclusive()
+    {
+        isConclusive = false;
+    }
+
+    private void onNewPartition(UnfilteredRowIterator partition)
+    {
+        assert purger != null;
+        purger.setCurrentKey(partition.partitionKey());
+        purger.setIsReverseOrder(partition.isReverseOrder());
+        this.currentPartition = partition;
+    }
+
+    private Digest getPerPartitionDigest()
+    {
+        if (perPartitionDigest == null)
+            perPartitionDigest = Digest.forRepairedDataTracking();
+
+        return perPartitionDigest;
+    }
+
+    public UnfilteredPartitionIterator withRepairedDataInfo(final UnfilteredPartitionIterator iterator)
+    {
+        class WithTracking extends Transformation<UnfilteredRowIterator>
+        {
+            protected UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition)
+            {
+                return withRepairedDataInfo(partition);
+            }
+        }
+        return Transformation.apply(iterator, new WithTracking());
+    }
+
+    public UnfilteredRowIterator withRepairedDataInfo(final UnfilteredRowIterator iterator)
+    {
+        class WithTracking extends Transformation<UnfilteredRowIterator>
+        {
+            protected DecoratedKey applyToPartitionKey(DecoratedKey key)
+            {
+                getPerPartitionDigest().update(key.getKey());
+                return key;
+            }
+
+            protected DeletionTime applyToDeletion(DeletionTime deletionTime)
+            {
+                if (repairedCounter.isDone())
+                    return deletionTime;
+
+                assert purger != null;
+                DeletionTime purged = purger.applyToDeletion(deletionTime);
+                if (!purged.isLive())
+                    isFullyPurged = false;
+                purged.digest(getPerPartitionDigest());
+                return deletionTime;
+            }
+
+            protected RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker)
+            {
+                if (repairedCounter.isDone())
+                    return marker;
+
+                assert purger != null;
+                RangeTombstoneMarker purged = purger.applyToMarker(marker);
+                if (purged != null)
+                {
+                    isFullyPurged = false;
+                    purged.digest(getPerPartitionDigest());
+                }
+                return marker;
+            }
+
+            protected Row applyToStatic(Row row)
+            {
+                if (repairedCounter.isDone())
+                    return row;
+
+                assert purger != null;
+                Row purged = purger.applyToRow(row);
+                if (!purged.isEmpty())
+                {
+                    isFullyPurged = false;
+                    purged.digest(getPerPartitionDigest());
+                }
+                return row;
+            }
+
+            protected Row applyToRow(Row row)
+            {
+                if (repairedCounter.isDone())
+                    return row;
+
+                assert purger != null;
+                Row purged = purger.applyToRow(row);
+                if (purged != null)
+                {
+                    isFullyPurged = false;
+                    purged.digest(getPerPartitionDigest());
+                }
+                return row;
+            }
+
+            protected void onPartitionClose()
+            {
+                if (perPartitionDigest != null)
+                {
+                    // If the partition wasn't completely emptied by the purger,
+                    // calculate the digest for the partition and use it to
+                    // update the overall digest
+                    if (!isFullyPurged)
+                    {
+                        if (perCommandDigest == null)
+                            perCommandDigest = Digest.forRepairedDataTracking();
+
+                        byte[] partitionDigest = perPartitionDigest.digest();
+                        perCommandDigest.update(partitionDigest, 0, partitionDigest.length);
+                    }
+
+                    perPartitionDigest = null;
+                }
+                isFullyPurged = true;
+            }
+        }
+
+        if (repairedCounter.isDone())
+            return iterator;
+
+        UnfilteredRowIterator tracked = repairedCounter.applyTo(Transformation.apply(iterator, new WithTracking()));
+        onNewPartition(tracked);
+        return tracked;
+    }
+
+    public UnfilteredPartitionIterator extend(final UnfilteredPartitionIterator partitions,
+                                              final DataLimits.Counter limit)
+    {
+        class OverreadRepairedData extends Transformation<UnfilteredRowIterator> implements MoreRows<UnfilteredRowIterator>
+        {
+
+            protected UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition)
+            {
+                return MoreRows.extend(partition, this, partition.columns());
+            }
+
+            public UnfilteredRowIterator moreContents()
+            {
+                // We don't need to do anything until the DataLimits of the
+                // read have been reached
+                if (!limit.isDone() || repairedCounter.isDone())
+                    return null;
+
+                long countBeforeOverreads = repairedCounter.counted();
+                long overreadStartTime = System.nanoTime();
+                if (currentPartition != null)
+                    consumePartition(currentPartition, repairedCounter);
+
+                if (postLimitPartitions != null)
+                    while (postLimitPartitions.hasNext() && !repairedCounter.isDone())
+                        consumePartition(postLimitPartitions.next(), repairedCounter);
+
+                // we're not actually providing any more rows, just consuming the repaired data
+                long rows = repairedCounter.counted() - countBeforeOverreads;
+                long nanos = System.nanoTime() - overreadStartTime;
+                metrics.repairedDataTrackingOverreadRows.update(rows);
+                metrics.repairedDataTrackingOverreadTime.update(nanos, TimeUnit.NANOSECONDS);
+                Tracing.trace("Read {} additional rows of repaired data for tracking in {}µs", rows, TimeUnit.NANOSECONDS.toMicros(nanos));
+                return null;
+            }
+
+            private void consumePartition(UnfilteredRowIterator partition, DataLimits.Counter counter)
+            {
+                if (partition == null)
+                    return;
+
+                while (!counter.isDone() && partition.hasNext())
+                    partition.next();
+
+                partition.close();
+            }
+        }
+        // If the read didn't touch any sstables prepare() hasn't been called and
+        // we can skip this transformation
+        if (metrics == null || repairedCounter.isDone())
+            return partitions;
+        return Transformation.apply(partitions, new OverreadRepairedData());
+    }
+
+    /**
+     * Although PurgeFunction extends Transformation, this is never applied to an iterator.
+     * Instead, it is used by RepairedDataInfo during the generation of a repaired data
+     * digest to exclude data which will actually be purged later on in the read pipeline.
+     */
+    private static class RepairedDataPurger extends PurgeFunction
+    {
+        RepairedDataPurger(ColumnFamilyStore cfs,
+                           int nowInSec,
+                           int oldestUnrepairedTombstone)
+        {
+            super(nowInSec,
+                  cfs.gcBefore(nowInSec),
+                  oldestUnrepairedTombstone,
+                  cfs.getCompactionStrategyManager().onlyPurgeRepairedTombstones(),
+                  cfs.metadata.get().enforceStrictLiveness());
+        }
+
+        protected LongPredicate getPurgeEvaluator()
+        {
+            return (time) -> true;
+        }
+
+        void setCurrentKey(DecoratedKey key)
+        {
+            super.onNewPartition(key);
+        }
+
+        void setIsReverseOrder(boolean isReverseOrder)
+        {
+            super.setReverseOrder(isReverseOrder);
+        }
+
+        public DeletionTime applyToDeletion(DeletionTime deletionTime)
+        {
+            return super.applyToDeletion(deletionTime);
+        }
+
+        public Row applyToRow(Row row)
+        {
+            return super.applyToRow(row);
+        }
+
+        public RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker)
+        {
+            return super.applyToMarker(marker);
+        }
+    }
+}
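
A detail worth noting in RepairedDataInfo above is the per-partition digest buffering: each partition's repaired data is digested separately, and that partial digest is only folded into the per-command digest on partition close if the purger left some data behind. The standalone sketch below illustrates just that buffering rule, using java.security.MessageDigest in place of Cassandra's Digest.forRepairedDataTracking(); the partition contents and the "dead:" marker used to simulate purgeable data are invented for the example.

import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.List;

public class PerPartitionDigestSketch
{
    public static void main(String[] args) throws NoSuchAlgorithmException
    {
        // Each inner list is one partition: the first element stands for the partition key,
        // and rows prefixed "dead:" stand for data the purger would drop.
        List<List<String>> partitions = Arrays.asList(
            Arrays.asList("k0", "dead:a", "dead:b"),   // fully purged: contributes nothing
            Arrays.asList("k1", "live:c", "dead:d"));  // partially live: contributes its digest

        MessageDigest perCommandDigest = MessageDigest.getInstance("SHA-256");

        for (List<String> partition : partitions)
        {
            MessageDigest perPartitionDigest = MessageDigest.getInstance("SHA-256");
            boolean fullyPurged = true;

            // The partition key is always mixed into the per-partition digest (cf. applyToPartitionKey).
            perPartitionDigest.update(partition.get(0).getBytes(StandardCharsets.UTF_8));

            for (String row : partition.subList(1, partition.size()))
            {
                // The real purger returns null/empty for purgeable rows; here we just skip "dead:" rows.
                if (row.startsWith("dead:"))
                    continue;
                fullyPurged = false;
                perPartitionDigest.update(row.getBytes(StandardCharsets.UTF_8));
            }

            // Only partitions with live repaired data left after purging contribute to the
            // per-command digest (cf. onPartitionClose), so purgeable tombstones cannot make
            // otherwise-identical replicas disagree.
            if (!fullyPurged)
                perCommandDigest.update(perPartitionDigest.digest());
        }

        System.out.println("repaired data digest: " + Arrays.toString(perCommandDigest.digest()));
    }
}
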
diff --git a/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java b/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
index ef3338e..9c45dc0 100644
--- a/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
@@ -142,6 +142,15 @@ public class KeyspaceMetrics
      */
     public final Meter unconfirmedRepairedInconsistencies;
 
+    /**
+     * Tracks the amount of overreading of repaired data that replicas perform in order to produce digests
+     * at query time. For each query, on a full data read following an initial digest mismatch, the replicas
+     * may read extra repaired data, up to the DataLimit of the command, so that the coordinator can compare
+     * the repaired data on each replica. These are tracked on each replica.
+     */
+    public final Histogram repairedDataTrackingOverreadRows;
+    public final Timer repairedDataTrackingOverreadTime;
+
     public final MetricNameFactory factory;
     private Keyspace keyspace;
 
@@ -305,6 +314,9 @@ public class KeyspaceMetrics
 
         confirmedRepairedInconsistencies = Metrics.meter(factory.createMetricName("RepairedDataInconsistenciesConfirmed"));
         unconfirmedRepairedInconsistencies = Metrics.meter(factory.createMetricName("RepairedDataInconsistenciesUnconfirmed"));
+
+        repairedDataTrackingOverreadRows = Metrics.histogram(factory.createMetricName("RepairedOverreadRows"), false);
+        repairedDataTrackingOverreadTime = Metrics.timer(factory.createMetricName("RepairedOverreadTime"));
     }
 
     /**
diff --git a/src/java/org/apache/cassandra/metrics/TableMetrics.java b/src/java/org/apache/cassandra/metrics/TableMetrics.java
index 6095f50..775d87c 100644
--- a/src/java/org/apache/cassandra/metrics/TableMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/TableMetrics.java
@@ -31,6 +31,11 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Predicate;
 
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+
+import com.codahale.metrics.Timer;
+
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.Memtable;
@@ -52,9 +57,6 @@ import com.codahale.metrics.Histogram;
 import com.codahale.metrics.Meter;
 import com.codahale.metrics.Metric;
 import com.codahale.metrics.RatioGauge;
-import com.codahale.metrics.Timer;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
 
 /**
  * Metrics for {@link ColumnFamilyStore}.
@@ -233,6 +235,13 @@ public class TableMetrics
     // be part of the repaired set whilst others do not.
     public final TableMeter unconfirmedRepairedInconsistencies;
 
+    // Tracks the amount of overreading of repaired data that replicas perform in order to produce digests
+    // at query time. For each query, on a full data read following an initial digest mismatch, the replicas
+    // may read extra repaired data, up to the DataLimit of the command, so that the coordinator can compare
+    // the repaired data on each replica. These are tracked on each replica.
+    public final TableHistogram repairedDataTrackingOverreadRows;
+    public final TableTimer repairedDataTrackingOverreadTime;
+
     public final static LatencyMetrics globalReadLatency = new LatencyMetrics(globalFactory, globalAliasFactory, "Read");
     public final static LatencyMetrics globalWriteLatency = new LatencyMetrics(globalFactory, globalAliasFactory, "Write");
     public final static LatencyMetrics globalRangeLatency = new LatencyMetrics(globalFactory, globalAliasFactory, "Range");
@@ -946,6 +955,9 @@ public class TableMetrics
         confirmedRepairedInconsistencies = createTableMeter("RepairedDataInconsistenciesConfirmed", cfs.keyspace.metric.confirmedRepairedInconsistencies);
         unconfirmedRepairedInconsistencies = createTableMeter("RepairedDataInconsistenciesUnconfirmed", cfs.keyspace.metric.unconfirmedRepairedInconsistencies);
 
+        repairedDataTrackingOverreadRows = createTableHistogram("RepairedDataTrackingOverreadRows", cfs.keyspace.metric.repairedDataTrackingOverreadRows, false);
+        repairedDataTrackingOverreadTime = createTableTimer("RepairedDataTrackingOverreadTime", cfs.keyspace.metric.repairedDataTrackingOverreadTime);
+
         unleveledSSTables = createTableGauge("UnleveledSSTables", cfs::getUnleveledSSTables, () -> {
             // global gauge
             int cnt = 0;
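
The overread metrics above are registered like any other table and keyspace metric, so they should be visible over JMX under Cassandra's usual metric naming. The snippet below is a rough probe of the new table-level histogram; it assumes the standard object name pattern (type=Table, keyspace=..., scope=..., name=RepairedDataTrackingOverreadRows) and the default JMX port 7199, with "ks" and "tbl" as placeholder keyspace and table names.

import javax.management.MBeanAttributeInfo;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class OverreadMetricsProbe
{
    public static void main(String[] args) throws Exception
    {
        // Default local JMX endpoint for a Cassandra node; adjust host/port as needed.
        JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://127.0.0.1:7199/jmxrmi");
        try (JMXConnector connector = JMXConnectorFactory.connect(url))
        {
            MBeanServerConnection mbs = connector.getMBeanServerConnection();
            // Table-level metric added by this patch; "ks" and "tbl" are placeholder names.
            ObjectName name = new ObjectName(
                "org.apache.cassandra.metrics:type=Table,keyspace=ks,scope=tbl,name=RepairedDataTrackingOverreadRows");
            for (MBeanAttributeInfo attr : mbs.getMBeanInfo(name).getAttributes())
                System.out.println(attr.getName() + " = " + mbs.getAttribute(name, attr.getName()));
        }
    }
}
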
diff --git a/test/distributed/org/apache/cassandra/distributed/test/RepairDigestTrackingTest.java b/test/distributed/org/apache/cassandra/distributed/test/RepairDigestTrackingTest.java
index 664c99d..4b382a1 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/RepairDigestTrackingTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/RepairDigestTrackingTest.java
@@ -25,6 +25,8 @@ import java.util.EnumSet;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
 
 import com.google.common.util.concurrent.Uninterruptibles;
 import org.junit.Assert;
@@ -45,6 +47,9 @@ import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.service.SnapshotVerbHandler;
 import org.apache.cassandra.service.StorageProxy;
 
+import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;
+import static org.junit.Assert.fail;
+
 public class RepairDigestTrackingTest extends TestBaseImpl
 {
     private static final String TABLE = "tbl";
@@ -76,7 +81,7 @@ public class RepairDigestTrackingTest extends TestBaseImpl
                                                i, i, i);
             }
             cluster.forEach(i -> i.flush(KEYSPACE));
-            cluster.forEach(i -> assertNotRepaired());
+            cluster.forEach(i -> i.runOnInstance(assertNotRepaired()));
 
             // mark everything on node 2 repaired
             cluster.get(2).runOnInstance(markAllRepaired());
@@ -120,10 +125,10 @@ public class RepairDigestTrackingTest extends TestBaseImpl
             cluster.forEach(i -> i.flush(KEYSPACE));
 
             // nothing is repaired yet
-            cluster.forEach(i -> assertNotRepaired());
+            cluster.forEach(i -> i.runOnInstance(assertNotRepaired()));
             // mark everything repaired
-            cluster.forEach(i -> markAllRepaired());
-            cluster.forEach(i -> assertRepaired());
+            cluster.forEach(i -> i.runOnInstance(markAllRepaired()));
+            cluster.forEach(i -> i.runOnInstance(assertRepaired()));
 
             // now overwrite on node2 only to generate digest mismatches, but don't flush so the repaired dataset is not affected
             for (int i = 0; i < 10; i++)
@@ -198,6 +203,162 @@ public class RepairDigestTrackingTest extends TestBaseImpl
         }
     }
 
+    @Test
+    public void testRepairedReadCountNormalizationWithInitialUnderread() throws Throwable
+    {
+        // Asserts that the amount of repaired data read for digest generation is consistent
+        // across replicas where one has to read less repaired data to satisfy the original
+        // limits of the read request.
+        try (Cluster cluster = init(Cluster.create(2)))
+        {
+
+            cluster.get(1).runOnInstance(() -> {
+                StorageProxy.instance.enableRepairedDataTrackingForRangeReads();
+                StorageProxy.instance.enableRepairedDataTrackingForPartitionReads();
+            });
+
+            cluster.schemaChange("CREATE TABLE " + KS_TABLE + " (k INT, c INT, v1 INT, PRIMARY KEY (k,c)) " +
+                                 "WITH CLUSTERING ORDER BY (c DESC)");
+
+            // insert data on both nodes and flush
+            for (int i=0; i<20; i++)
+            {
+                cluster.coordinator(1).execute("INSERT INTO " + KS_TABLE + " (k, c, v1) VALUES (0, ?, ?) USING TIMESTAMP 0",
+                                               ConsistencyLevel.ALL, i, i);
+                cluster.coordinator(1).execute("INSERT INTO " + KS_TABLE + " (k, c, v1) VALUES (1, ?, ?) USING TIMESTAMP 1",
+                                               ConsistencyLevel.ALL, i, i);
+            }
+            cluster.forEach(c -> c.flush(KEYSPACE));
+            // nothing is repaired yet
+            cluster.forEach(i -> i.runOnInstance(assertNotRepaired()));
+            // mark everything repaired
+            cluster.forEach(i -> i.runOnInstance(markAllRepaired()));
+            cluster.forEach(i -> i.runOnInstance(assertRepaired()));
+
+            // Add some unrepaired data to both nodes
+            for (int i=20; i<30; i++)
+            {
+                cluster.coordinator(1).execute("INSERT INTO " + KS_TABLE + " (k, c, v1) VALUES (1, ?, ?) USING TIMESTAMP 1",
+                                               ConsistencyLevel.ALL, i, i);
+            }
+            // And some more unrepaired data to node2 only. This causes node2 to read less repaired data than node1
+            // when satisfying the limits of the read. So node2 needs to overread more repaired data than node1 when
+            // calculating the repaired data digest.
+            cluster.get(2).executeInternal("INSERT INTO "  + KS_TABLE + " (k, c, v1) VALUES (1, ?, ?) USING TIMESTAMP 1", 30, 30);
+
+            // Verify single partition read
+            long ccBefore = getConfirmedInconsistencies(cluster.get(1));
+            assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KS_TABLE + " WHERE k=1 LIMIT 20", ConsistencyLevel.ALL),
+                       rows(1, 30, 11));
+            long ccAfterPartitionRead = getConfirmedInconsistencies(cluster.get(1));
+
+            // Recreate a mismatch in unrepaired data and verify partition range read
+            cluster.get(2).executeInternal("INSERT INTO "  + KS_TABLE + " (k, c, v1) VALUES (1, ?, ?)", 31, 31);
+            assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KS_TABLE + " LIMIT 30", ConsistencyLevel.ALL),
+                       rows(1, 31, 2));
+            long ccAfterRangeRead = getConfirmedInconsistencies(cluster.get(1));
+
+            if (ccAfterPartitionRead != ccAfterRangeRead)
+                if (ccAfterPartitionRead != ccBefore)
+                    fail("Both range and partition reads reported data inconsistencies but none were expected");
+                else
+                    fail("Reported inconsistency during range read but none were expected");
+            else if (ccAfterPartitionRead != ccBefore)
+                fail("Reported inconsistency during partition read but none were expected");
+        }
+    }
+
+    @Test
+    public void testRepairedReadCountNormalizationWithInitialOverread() throws Throwable
+    {
+        // Asserts that the amount of repaired data read for digest generation is consistent
+        // across replicas where one has to read more repaired data to satisfy the original
+        // limits of the read request.
+        try (Cluster cluster = init(Cluster.create(2)))
+        {
+
+            cluster.get(1).runOnInstance(() -> {
+                StorageProxy.instance.enableRepairedDataTrackingForRangeReads();
+                StorageProxy.instance.enableRepairedDataTrackingForPartitionReads();
+            });
+
+            cluster.schemaChange("CREATE TABLE " + KS_TABLE + " (k INT, c INT, v1 INT, PRIMARY KEY (k,c)) " +
+                                 "WITH CLUSTERING ORDER BY (c DESC)");
+
+            // insert data on both nodes and flush
+            for (int i=0; i<10; i++)
+            {
+                cluster.coordinator(1).execute("INSERT INTO " + KS_TABLE + " (k, c, v1) VALUES (0, ?, ?) USING TIMESTAMP 0",
+                                               ConsistencyLevel.ALL, i, i);
+                cluster.coordinator(1).execute("INSERT INTO " + KS_TABLE + " (k, c, v1) VALUES (1, ?, ?) USING TIMESTAMP 1",
+                                               ConsistencyLevel.ALL, i, i);
+            }
+            cluster.forEach(c -> c.flush(KEYSPACE));
+            // nothing is repaired yet
+            cluster.forEach(i -> i.runOnInstance(assertNotRepaired()));
+            // mark everything repaired
+            cluster.forEach(i -> i.runOnInstance(markAllRepaired()));
+            cluster.forEach(i -> i.runOnInstance(assertRepaired()));
+
+            // Add some unrepaired data to both nodes
+            for (int i=10; i<13; i++)
+            {
+                cluster.coordinator(1).execute("INSERT INTO " + KS_TABLE + " (k, c, v1) VALUES (0, ?, ?) USING TIMESTAMP 1",
+                                               ConsistencyLevel.ALL, i, i);
+                cluster.coordinator(1).execute("INSERT INTO " + KS_TABLE + " (k, c, v1) VALUES (1, ?, ?) USING TIMESTAMP 1",
+                                               ConsistencyLevel.ALL, i, i);
+            }
+            cluster.forEach(c -> c.flush(KEYSPACE));
+            // And some row deletions on node2 only which cover data in the repaired set
+            // This will cause node2 to read more repaired data in satisfying the limit of the read request
+            // so it should overread less than node1 (in fact, it should not overread at all) in order to
+            // calculate the repaired data digest.
+            for (int i=7; i<10; i++)
+            {
+                cluster.get(2).executeInternal("DELETE FROM " + KS_TABLE + " USING TIMESTAMP 2 WHERE k = 0 AND c = ?", i);
+                cluster.get(2).executeInternal("DELETE FROM " + KS_TABLE + " USING TIMESTAMP 2 WHERE k = 1 AND c = ?", i);
+            }
+
+            // Verify single partition read
+            long ccBefore = getConfirmedInconsistencies(cluster.get(1));
+            assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KS_TABLE + " WHERE k=0 LIMIT 5", ConsistencyLevel.ALL),
+                       rows(rows(0, 12, 10), rows(0, 6, 5)));
+            long ccAfterPartitionRead = getConfirmedInconsistencies(cluster.get(1));
+
+            // Verify partition range read
+            assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KS_TABLE + " LIMIT 11", ConsistencyLevel.ALL),
+                       rows(rows(1, 12, 10), rows(1, 6, 0), rows(0, 12, 12)));
+            long ccAfterRangeRead = getConfirmedInconsistencies(cluster.get(1));
+
+            if (ccAfterPartitionRead != ccAfterRangeRead)
+                if (ccAfterPartitionRead != ccBefore)
+                    fail("Both range and partition reads reported data inconsistencies but none were expected");
+                else
+                    fail("Reported inconsistency during range read but none were expected");
+            else if (ccAfterPartitionRead != ccBefore)
+                fail("Reported inconsistency during partition read but none were expected");
+        }
+    }
+
+    private Object[][] rows(Object[][] head, Object[][]...tail)
+    {
+        return Stream.concat(Stream.of(head),
+                             Stream.of(tail).flatMap(Stream::of))
+                     .toArray(Object[][]::new);
+    }
+
+    private Object[][] rows(int partitionKey, int start, int end)
+    {
+        if (start == end)
+            return new Object[][] { new Object[] { partitionKey, start, end } };
+
+        IntStream clusterings = start > end
+                                ? IntStream.range(end -1, start).map(i -> start - i + end - 1)
+                                : IntStream.range(start, end);
+
+        return clusterings.mapToObj(i -> new Object[] {partitionKey, i, i}).toArray(Object[][]::new);
+    }
+
     private IIsolatedExecutor.SerializableRunnable assertNotRepaired()
     {
         return () ->
diff --git a/test/unit/org/apache/cassandra/db/ReadCommandTest.java b/test/unit/org/apache/cassandra/db/ReadCommandTest.java
index e0215b7..0824168 100644
--- a/test/unit/org/apache/cassandra/db/ReadCommandTest.java
+++ b/test/unit/org/apache/cassandra/db/ReadCommandTest.java
@@ -23,6 +23,8 @@ import java.io.OutputStream;
 import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 import java.util.*;
+import java.util.function.Supplier;
+import java.util.stream.IntStream;
 
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
@@ -41,6 +43,8 @@ import org.apache.cassandra.db.filter.RowFilter;
 import org.apache.cassandra.db.marshal.AsciiType;
 import org.apache.cassandra.db.marshal.BytesType;
 import org.apache.cassandra.db.marshal.CounterColumnType;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.db.marshal.ReversedType;
 import org.apache.cassandra.db.marshal.SetType;
 import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.db.rows.Row;
@@ -61,6 +65,7 @@ import org.apache.cassandra.locator.EndpointsForToken;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.locator.ReplicaUtils;
 import org.apache.cassandra.net.Message;
+import org.apache.cassandra.metrics.ClearableHistogram;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.net.Verb;
 import org.apache.cassandra.repair.consistent.LocalSessionAccessor;
@@ -94,6 +99,7 @@ public class ReadCommandTest
     private static final String CF6 = "Standard6";
     private static final String CF7 = "Counter7";
     private static final String CF8 = "Standard8";
+    private static final String CF9 = "Standard9";
 
     private static final InetAddressAndPort REPAIR_COORDINATOR;
     static {
@@ -178,6 +184,12 @@ public class ReadCommandTest
                      .addRegularColumn("b", AsciiType.instance)
                      .addRegularColumn("c", SetType.getInstance(AsciiType.instance, true));
 
+        TableMetadata.Builder metadata9 =
+        TableMetadata.builder(KEYSPACE, CF9)
+                     .addPartitionKeyColumn("key", Int32Type.instance)
+                     .addClusteringColumn("col", ReversedType.getInstance(Int32Type.instance))
+                     .addRegularColumn("a", AsciiType.instance);
+
         SchemaLoader.prepareServer();
         SchemaLoader.createKeyspace(KEYSPACE,
                                     KeyspaceParams.simple(1),
@@ -188,7 +200,8 @@ public class ReadCommandTest
                                     metadata5,
                                     metadata6,
                                     metadata7,
-                                    metadata8);
+                                    metadata8,
+                                    metadata9);
 
         LocalSessionAccessor.startup();
     }
@@ -823,14 +836,113 @@ public class ReadCommandTest
         }
     }
 
+    @Test
+    public void testRepairedDataOverreadMetrics()
+    {
+        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF9);
+        cfs.truncateBlocking();
+        cfs.disableAutoCompaction();
+        cfs.metadata().withSwapped(cfs.metadata().params.unbuild()
+                                                        .caching(CachingParams.CACHE_NOTHING)
+                                                        .build());
+        // Insert and repair
+        insert(cfs, IntStream.range(0, 10), () -> IntStream.range(0, 10));
+        cfs.forceBlockingFlush();
+        cfs.getLiveSSTables().forEach(sstable -> mutateRepaired(cfs, sstable, 111, null));
+        // Insert and leave unrepaired
+        insert(cfs, IntStream.range(0, 10), () -> IntStream.range(10, 20));
+
+        // Single partition reads
+        int limit = 5;
+        ReadCommand cmd = Util.cmd(cfs, ByteBufferUtil.bytes(0)).withLimit(limit).build();
+        assertEquals(0, getAndResetOverreadCount(cfs));
+
+        // No overreads if not tracking
+        readAndCheckRowCount(Collections.singletonList(Util.getOnlyPartition(cmd)), limit);
+        assertEquals(0, getAndResetOverreadCount(cfs));
+
+        // Overread up to (limit - 1) if tracking is enabled
+        cmd = cmd.copy();
+        cmd.trackRepairedStatus();
+        readAndCheckRowCount(Collections.singletonList(Util.getOnlyPartition(cmd)), limit);
+        // overread count is always < limit as the first read is counted during merging (and so is expected)
+        assertEquals(limit - 1, getAndResetOverreadCount(cfs));
+
+        // if limit already requires reading all repaired data, no overreads should be recorded
+        limit = 20;
+        cmd = Util.cmd(cfs, ByteBufferUtil.bytes(0)).withLimit(limit).build();
+        readAndCheckRowCount(Collections.singletonList(Util.getOnlyPartition(cmd)), limit);
+        assertEquals(0, getAndResetOverreadCount(cfs));
+
+        // Range reads
+        limit = 5;
+        cmd = Util.cmd(cfs).withLimit(limit).build();
+        assertEquals(0, getAndResetOverreadCount(cfs));
+        // No overreads if not tracking
+        readAndCheckRowCount(Util.getAll(cmd), limit);
+        assertEquals(0, getAndResetOverreadCount(cfs));
+
+        // Overread up to (limit - 1) if tracking is enabled
+        cmd = cmd.copy();
+        cmd.trackRepairedStatus();
+        readAndCheckRowCount(Util.getAll(cmd), limit);
+        assertEquals(limit - 1, getAndResetOverreadCount(cfs));
+
+        // if limit already requires reading all repaired data, no overreads should be recorded
+        limit = 100;
+        cmd = Util.cmd(cfs).withLimit(limit).build();
+        readAndCheckRowCount(Util.getAll(cmd), limit);
+        assertEquals(0, getAndResetOverreadCount(cfs));
+    }
+
     private void setGCGrace(ColumnFamilyStore cfs, int gcGrace)
     {
         TableParams newParams = cfs.metadata().params.unbuild().gcGraceSeconds(gcGrace).build();
         KeyspaceMetadata keyspaceMetadata = Schema.instance.getKeyspaceMetadata(cfs.metadata().keyspace);
         Schema.instance.load(
-            keyspaceMetadata.withSwapped(
-                keyspaceMetadata.tables.withSwapped(
-                    cfs.metadata().withSwapped(newParams))));
+        keyspaceMetadata.withSwapped(
+        keyspaceMetadata.tables.withSwapped(
+        cfs.metadata().withSwapped(newParams))));
+    }
+
+    private long getAndResetOverreadCount(ColumnFamilyStore cfs)
+    {
+        // always clear the histogram after reading to make comparisons & asserts easier
+        long rows = cfs.metric.repairedDataTrackingOverreadRows.cf.getSnapshot().getMax();
+        ((ClearableHistogram)cfs.metric.repairedDataTrackingOverreadRows.cf).clear();
+        return rows;
+    }
+
+    private void readAndCheckRowCount(Iterable<FilteredPartition> partitions, int expected)
+    {
+        int count = 0;
+        for (Partition partition : partitions)
+        {
+            assertFalse(partition.isEmpty());
+            try (UnfilteredRowIterator iter = partition.unfilteredIterator())
+            {
+                while (iter.hasNext())
+                {
+                    iter.next();
+                    count++;
+                }
+            }
+        }
+        assertEquals(expected, count);
+    }
+
+    private void insert(ColumnFamilyStore cfs, IntStream partitionIds, Supplier<IntStream> rowIds)
+    {
+        partitionIds.mapToObj(ByteBufferUtil::bytes)
+                    .forEach( pk ->
+                        rowIds.get().forEach( c ->
+                            new RowUpdateBuilder(cfs.metadata(), 0, pk)
+                                .clustering(c)
+                                .add("a", ByteBufferUtil.bytes("abcd"))
+                                .build()
+                                .apply()
+
+                    ));
     }
 
     private void assertDigestsDiffer(ByteBuffer b0, ByteBuffer b1)
diff --git a/test/unit/org/apache/cassandra/db/RepairedDataInfoTest.java b/test/unit/org/apache/cassandra/db/RepairedDataInfoTest.java
new file mode 100644
index 0000000..00a1f56
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/RepairedDataInfoTest.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.stream.IntStream;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.commitlog.CommitLog;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.schema.MockSchema;
+import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.db.partitions.AbstractUnfilteredPartitionIterator;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.db.rows.AbstractUnfilteredRowIterator;
+import org.apache.cassandra.db.rows.BTreeRow;
+import org.apache.cassandra.db.rows.BufferCell;
+import org.apache.cassandra.db.rows.Cell;
+import org.apache.cassandra.db.rows.EncodingStats;
+import org.apache.cassandra.db.rows.RangeTombstoneBoundMarker;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.Rows;
+import org.apache.cassandra.db.rows.Unfiltered;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Hex;
+
+import static org.apache.cassandra.Util.clustering;
+import static org.apache.cassandra.Util.dk;
+import static org.apache.cassandra.utils.ByteBufferUtil.*;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+public class RepairedDataInfoTest
+{
+    private static ColumnFamilyStore cfs;
+    private static TableMetadata metadata;
+    private static ColumnMetadata valueMetadata;
+    private static ColumnMetadata staticMetadata;
+
+    private final int nowInSec = FBUtilities.nowInSeconds();
+
+    @BeforeClass
+    public static void setUp()
+    {
+        DatabaseDescriptor.daemonInitialization();
+        CommitLog.instance.start();
+        MockSchema.cleanup();
+        String ks = "repaired_data_info_test";
+        cfs = MockSchema.newCFS(ks, metadata -> metadata.addStaticColumn("s", UTF8Type.instance));
+        metadata = cfs.metadata();
+        valueMetadata = metadata.regularColumns().getSimple(0);
+        staticMetadata = metadata.staticColumns().getSimple(0);
+    }
+
+    @Test
+    public void withTrackingAppliesRepairedDataCounter()
+    {
+        DataLimits.Counter counter = DataLimits.cqlLimits(15).newCounter(nowInSec, false, false, false).onlyCount();
+        RepairedDataInfo info = new RepairedDataInfo(counter);
+        info.prepare(cfs, nowInSec, Integer.MAX_VALUE);
+        UnfilteredRowIterator[] partitions = new UnfilteredRowIterator[3];
+        for (int i=0; i<3; i++)
+            partitions[i] = partition(bytes(i), rows(0, 5, nowInSec));
+
+        UnfilteredPartitionIterator iter = partitions(partitions);
+        iter = info.withRepairedDataInfo(iter);
+        consume(iter);
+
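+        // Three partitions of five rows each passed through the tracking counter: 15 rows in total, 5 in the last partition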
+        assertEquals(15, counter.counted());
+        assertEquals(5, counter.countedInCurrentPartition());
+    }
+
+    @Test
+    public void digestOfSinglePartitionWithSingleRowAndEmptyStaticRow()
+    {
+        Digest manualDigest = Digest.forRepairedDataTracking();
+        Row[] rows = rows(0, 1, nowInSec);
+        UnfilteredRowIterator partition = partition(bytes(0), rows);
+        addToDigest(manualDigest,
+                    partition.partitionKey().getKey(),
+                    partition.partitionLevelDeletion(),
+                    Rows.EMPTY_STATIC_ROW,
+                    rows);
+        byte[] fromRepairedInfo = consume(partition);
+        assertArrayEquals(manualDigest.digest(), fromRepairedInfo);
+    }
+
+    @Test
+    public void digestOfSinglePartitionWithMultipleRowsAndEmptyStaticRow()
+    {
+        Digest manualDigest = Digest.forRepairedDataTracking();
+        Row[] rows = rows(0, 5, nowInSec);
+        UnfilteredRowIterator partition = partition(bytes(0), rows);
+        addToDigest(manualDigest,
+                    partition.partitionKey().getKey(),
+                    partition.partitionLevelDeletion(),
+                    Rows.EMPTY_STATIC_ROW,
+                    rows);
+        byte[] fromRepairedInfo = consume(partition);
+        assertArrayEquals(manualDigest.digest(), fromRepairedInfo);
+    }
+
+    @Test
+    public void digestOfSinglePartitionWithMultipleRowsAndTombstones()
+    {
+        Digest manualDigest = Digest.forRepairedDataTracking();
+        Unfiltered[] unfiltereds = new Unfiltered[]
+                                   {
+                                       open(0), close(0),
+                                       row(1, 1, nowInSec),
+                                       open(2), close(4),
+                                       row(5, 7, nowInSec)
+                                   };
+        UnfilteredRowIterator partition = partition(bytes(0), unfiltereds);
+        addToDigest(manualDigest,
+                    partition.partitionKey().getKey(),
+                    partition.partitionLevelDeletion(),
+                    Rows.EMPTY_STATIC_ROW,
+                    unfiltereds);
+        byte[] fromRepairedInfo = consume(partition);
+        assertArrayEquals(manualDigest.digest(), fromRepairedInfo);
+    }
+
+    @Test
+    public void digestOfMultiplePartitionsWithMultipleRowsAndNonEmptyStaticRows()
+    {
+        Digest manualDigest = Digest.forRepairedDataTracking();
+        Row staticRow = staticRow(nowInSec);
+        Row[] rows = rows(0, 5, nowInSec);
+        UnfilteredRowIterator[] partitionsArray = new UnfilteredRowIterator[5];
+        for (int i=0; i<5; i++)
+        {
+            UnfilteredRowIterator partition = partitionWithStaticRow(bytes(i), staticRow, rows);
+            partitionsArray[i] = partition;
+            addToDigest(manualDigest,
+                        partition.partitionKey().getKey(),
+                        partition.partitionLevelDeletion(),
+                        staticRow,
+                        rows);
+        }
+
+        UnfilteredPartitionIterator partitions = partitions(partitionsArray);
+        byte[] fromRepairedInfo = consume(partitions);
+        assertArrayEquals(manualDigest.digest(), fromRepairedInfo);
+    }
+
+    private RepairedDataInfo info()
+    {
+        return new RepairedDataInfo(DataLimits.NONE.newCounter(nowInSec, false, false, false));
+    }
+
+    private Digest addToDigest(Digest aggregate,
+                               ByteBuffer partitionKey,
+                               DeletionTime deletion,
+                               Row staticRow,
+                               Unfiltered...unfiltereds)
+    {
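+        // Mirrors the per-partition digest the RepairedDataInfo under test is expected to produce:
+        // the static row (when present), then the partition key and partition-level deletion,
+        // then each unfiltered in order; the per-partition digest is then folded into the aggregate.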
+        Digest perPartitionDigest = Digest.forRepairedDataTracking();
+        if (!staticRow.isEmpty())
+            staticRow.digest(perPartitionDigest);
+        perPartitionDigest.update(partitionKey);
+        deletion.digest(perPartitionDigest);
+        for (Unfiltered unfiltered : unfiltereds)
+            unfiltered.digest(perPartitionDigest);
+        byte[] rowDigestBytes = perPartitionDigest.digest();
+        aggregate.update(rowDigestBytes, 0, rowDigestBytes.length);
+        return aggregate;
+    }
+
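+    // Drain every partition through RepairedDataInfo and return the digest bytes it accumulates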
+    private byte[] consume(UnfilteredPartitionIterator partitions)
+    {
+        RepairedDataInfo info = info();
+        info.prepare(cfs, nowInSec, Integer.MAX_VALUE);
+        partitions.forEachRemaining(partition ->
+        {
+            try (UnfilteredRowIterator iter = info.withRepairedDataInfo(partition))
+            {
+                iter.forEachRemaining(u -> {});
+            }
+        });
+        return getArray(info.getDigest());
+    }
+
+    private byte[] consume(UnfilteredRowIterator partition)
+    {
+        RepairedDataInfo info = info();
+        info.prepare(cfs, nowInSec, Integer.MAX_VALUE);
+        try (UnfilteredRowIterator iter = info.withRepairedDataInfo(partition))
+        {
+            iter.forEachRemaining(u -> {});
+        }
+        return getArray(info.getDigest());
+    }
+
+    public static Cell cell(ColumnMetadata def, Object value)
+    {
+        ByteBuffer bb = value instanceof ByteBuffer ? (ByteBuffer)value : ((AbstractType)def.type).decompose(value);
+        return new BufferCell(def, 1L, BufferCell.NO_TTL, BufferCell.NO_DELETION_TIME, bb, null);
+    }
+
+    private Row staticRow(int nowInSec)
+    {
+        Row.Builder builder = BTreeRow.unsortedBuilder();
+        builder.newRow(Clustering.STATIC_CLUSTERING);
+        builder.addCell(cell(staticMetadata, "static value"));
+        return builder.build();
+    }
+
+    private Row row(int clustering, int value, int nowInSec)
+    {
+        Row.Builder builder = BTreeRow.unsortedBuilder();
+        builder.newRow(clustering(metadata.comparator, Integer.toString(clustering)));
+        builder.addCell(cell(valueMetadata, Integer.toString(value)));
+        return builder.build();
+    }
+
+    private Row[] rows(int clusteringStart, int clusteringEnd, int nowInSec)
+    {
+        return IntStream.range(clusteringStart, clusteringEnd)
+                        .mapToObj(v -> row(v, v, nowInSec))
+                        .toArray(Row[]::new);
+    }
+
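+    // Inclusive open/close range tombstone bound markers at the given clustering value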
+    private RangeTombstoneBoundMarker open(int start)
+    {
+        return new RangeTombstoneBoundMarker(
+            ClusteringBound.create(ClusteringBound.boundKind(true, true),
+                                   new ByteBuffer[] { Clustering.make(Int32Type.instance.decompose(start)).get(0)}),
+            new DeletionTime(FBUtilities.timestampMicros(), FBUtilities.nowInSeconds()));
+    }
+
+    private RangeTombstoneBoundMarker close(int close)
+    {
+        return new RangeTombstoneBoundMarker(
+            ClusteringBound.create(ClusteringBound.boundKind(false, true),
+                                   new ByteBuffer[] { Clustering.make(Int32Type.instance.decompose(close)).get(0)}),
+            new DeletionTime(FBUtilities.timestampMicros(), FBUtilities.nowInSeconds()));
+    }
+
+    private UnfilteredRowIterator partition(ByteBuffer pk, Unfiltered... unfiltereds)
+    {
+        return partitionWithStaticRow(pk, Rows.EMPTY_STATIC_ROW, unfiltereds);
+    }
+
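+    // Build an in-memory partition over the supplied unfiltereds, with the given static row and no partition-level deletion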
+    private UnfilteredRowIterator partitionWithStaticRow(ByteBuffer pk, Row staticRow, Unfiltered... unfiltereds)
+    {
+        Iterator<Unfiltered> unfilteredIterator = Arrays.asList(unfiltereds).iterator();
+        return new AbstractUnfilteredRowIterator(metadata, dk(pk), DeletionTime.LIVE, metadata.regularAndStaticColumns(), staticRow, false, EncodingStats.NO_STATS)
+        {
+            protected Unfiltered computeNext()
+            {
+                return unfilteredIterator.hasNext() ? unfilteredIterator.next() : endOfData();
+            }
+        };
+    }
+
+    private static UnfilteredPartitionIterator partitions(UnfilteredRowIterator...partitions)
+    {
+        Iterator<UnfilteredRowIterator> partitionsIter = Arrays.asList(partitions).iterator();
+        return new AbstractUnfilteredPartitionIterator()
+        {
+            public TableMetadata metadata()
+            {
+                return metadata;
+            }
+
+            public boolean hasNext()
+            {
+                return partitionsIter.hasNext();
+            }
+
+            public UnfilteredRowIterator next()
+            {
+                return partitionsIter.next();
+            }
+        };
+    }
+}

