Posted to commits@cassandra.apache.org by sa...@apache.org on 2020/04/16 17:44:51 UTC

[cassandra] branch trunk updated (781e486 -> a8e7cfb)

This is an automated email from the ASF dual-hosted git repository.

samt pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git.


    from 781e486  Fix CQLSH to avoid arguments being evaluated
     new 092915a  Don't skip sstables with partition deletes
     new d6beb01  Merge branch 'cassandra-3.0' into cassandra-3.11
     new b3ecbf3  Merge branch 'cassandra-3.11' into trunk
     new a8e7cfb  Ensure repaired data tracking reads a consistent amount of data across replicas

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 CHANGES.txt                                        |   2 +
 src/java/org/apache/cassandra/config/Config.java   |   6 +
 .../cassandra/config/DatabaseDescriptor.java       |  10 +
 src/java/org/apache/cassandra/db/ReadCommand.java  | 310 +++------------
 .../org/apache/cassandra/db/RepairedDataInfo.java  | 336 ++++++++++++++++
 .../cassandra/db/SinglePartitionReadCommand.java   |  83 ++--
 .../apache/cassandra/metrics/KeyspaceMetrics.java  |  12 +
 .../org/apache/cassandra/metrics/TableMetrics.java |  18 +-
 .../apache/cassandra/repair/RepairRunnable.java    |  76 +++-
 .../cassandra/service/SnapshotVerbHandler.java     |  67 ++++
 .../org/apache/cassandra/service/StorageProxy.java |  18 +
 .../cassandra/service/StorageProxyMBean.java       |   4 +
 .../cassandra/service/reads/DataResolver.java      |   2 +-
 .../service/reads/repair/RepairedDataVerifier.java |  80 +++-
 .../distributed/test/PreviewRepairTest.java        | 106 ++++-
 .../distributed/test/RepairDigestTrackingTest.java | 440 +++++++++++++++++----
 .../distributed/test/SimpleReadWriteTest.java      | 101 +++++
 .../org/apache/cassandra/db/ReadCommandTest.java   | 120 +++++-
 .../apache/cassandra/db/RepairedDataInfoTest.java  | 303 ++++++++++++++
 .../reads/repair/RepairedDataVerifierTest.java     |   4 +-
 20 files changed, 1693 insertions(+), 405 deletions(-)
 create mode 100644 src/java/org/apache/cassandra/db/RepairedDataInfo.java
 create mode 100644 test/unit/org/apache/cassandra/db/RepairedDataInfoTest.java


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[cassandra] 02/02: Ensure repaired data tracking reads a consistent amount of data across replicas

Posted by sa...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

samt pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit a8e7cfbc0e146ea82154654ba43b613b058f99d1
Author: Sam Tunnicliffe <sa...@beobal.com>
AuthorDate: Tue Feb 11 09:59:31 2020 +0000

    Ensure repaired data tracking reads a consistent amount of data across replicas
    
    Patch by Sam Tunnicliffe; reviewed by Aleksey Yeschenko for CASSANDRA-15601
---
 CHANGES.txt                                        |   1 +
 src/java/org/apache/cassandra/db/ReadCommand.java  | 310 ++++---------------
 .../org/apache/cassandra/db/RepairedDataInfo.java  | 336 +++++++++++++++++++++
 .../apache/cassandra/metrics/KeyspaceMetrics.java  |  12 +
 .../org/apache/cassandra/metrics/TableMetrics.java |  18 +-
 .../distributed/test/RepairDigestTrackingTest.java | 169 ++++++++++-
 .../org/apache/cassandra/db/ReadCommandTest.java   | 120 +++++++-
 .../apache/cassandra/db/RepairedDataInfoTest.java  | 303 +++++++++++++++++++
 8 files changed, 1003 insertions(+), 266 deletions(-)
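
The core of the change: when repaired data tracking is enabled, each replica digests the repaired rows it reads. Because unrepaired data can satisfy the query limit earlier on one replica than another, replicas could otherwise digest different amounts of repaired data and report spurious mismatches. The patch has each replica silently keep reading ("overreading") repaired data, up to the command's DataLimits, once the caller-visible limit is reached. A minimal, self-contained sketch of that normalization loop, using hypothetical Counter and row types rather than Cassandra's real iterators (the actual implementation is RepairedDataInfo.extend, below):

    import java.util.Iterator;
    import java.util.List;

    final class OverreadSketch
    {
        static final class Counter
        {
            private final int max;
            private int counted;
            Counter(int max) { this.max = max; }
            boolean isDone() { return counted >= max; }
            int counted()    { return counted; }
            void count()     { counted++; }
        }

        // Once the caller-visible limit is reached, keep draining repaired
        // rows (which feed the digest as a side effect) until the repaired
        // counter -- built from the same DataLimits -- is also satisfied.
        static int overread(List<Iterator<Object>> repairedPartitions,
                            Counter queryLimit,
                            Counter repairedCounter)
        {
            if (!queryLimit.isDone() || repairedCounter.isDone())
                return 0; // still serving rows, or already read enough

            int before = repairedCounter.counted();
            for (Iterator<Object> partition : repairedPartitions)
            {
                while (partition.hasNext() && !repairedCounter.isDone())
                {
                    partition.next();        // row is digested, never returned
                    repairedCounter.count();
                }
                if (repairedCounter.isDone())
                    break;
            }
            return repairedCounter.counted() - before; // rows silently overread
        }
    }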

diff --git a/CHANGES.txt b/CHANGES.txt
index 96eeed4..4586c71 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0-alpha4
+ * Ensure repaired data tracking reads a consistent amount of data across replicas (CASSANDRA-15601)
  * Fix CQLSH to avoid arguments being evaluated (CASSANDRA-15660)
  * Correct Visibility and Improve Safety of Methods in LatencyMetrics (CASSANDRA-15597)
  * Allow cqlsh to run with Python2.7/Python3.6+ (CASSANDRA-15659,CASSANDRA-15573)
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java
index 4f8ea3e..4c4c833 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -23,6 +23,7 @@ import java.util.*;
 import java.util.concurrent.TimeUnit;
 import java.util.function.BiFunction;
 import java.util.function.LongPredicate;
+import java.util.function.Function;
 
 import javax.annotation.Nullable;
 
@@ -62,12 +63,12 @@ import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.service.ClientWarn;
 import org.apache.cassandra.tracing.Tracing;
-import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 
 import static com.google.common.collect.Iterables.any;
 import static com.google.common.collect.Iterables.filter;
 import static org.apache.cassandra.utils.MonotonicClock.approxTime;
+import static org.apache.cassandra.db.partitions.UnfilteredPartitionIterators.MergeListener.NOOP;
 
 /**
  * General interface for storage-engine read commands (common to both range and
@@ -91,17 +92,7 @@ public abstract class ReadCommand extends AbstractReadQuery
     // for data queries, coordinators may request information on the repaired data used in constructing the response
     private boolean trackRepairedStatus = false;
     // tracker for repaired data, initialized to singleton null object
-    private static final RepairedDataInfo NULL_REPAIRED_DATA_INFO = new RepairedDataInfo()
-    {
-        void trackPartitionKey(DecoratedKey key){}
-        void trackDeletion(DeletionTime deletion){}
-        void trackRangeTombstoneMarker(RangeTombstoneMarker marker){}
-        void trackRow(Row row){}
-        boolean isConclusive(){ return true; }
-        ByteBuffer getDigest(){ return ByteBufferUtil.EMPTY_BYTE_BUFFER; }
-    };
-
-    private RepairedDataInfo repairedDataInfo = NULL_REPAIRED_DATA_INFO;
+    private RepairedDataInfo repairedDataInfo = RepairedDataInfo.NULL_REPAIRED_DATA_INFO;
 
     int oldestUnrepairedTombstone = Integer.MAX_VALUE;
 
@@ -450,7 +441,13 @@ public abstract class ReadCommand extends AbstractReadQuery
         }
 
         if (isTrackingRepairedStatus())
-            repairedDataInfo = new RepairedDataInfo();
+        {
+            final DataLimits.Counter repairedReadCount = limits().newCounter(nowInSec(),
+                                                                             false,
+                                                                             selectsFullPartition(),
+                                                                             metadata().enforceStrictLiveness()).onlyCount();
+            repairedDataInfo = new RepairedDataInfo(repairedReadCount);
+        }
 
         UnfilteredPartitionIterator iterator = (null == searcher) ? queryStorage(cfs, executionController) : searcher.search(executionController);
         iterator = RTBoundValidator.validate(iterator, Stage.MERGED, false);
@@ -475,7 +472,22 @@ public abstract class ReadCommand extends AbstractReadQuery
 
             // apply the limits/row counter; this transformation is stopping and would close the iterator as soon
             // as the count is observed; if that happens in the middle of an open RT, its end bound will not be included.
-            iterator = limits().filter(iterator, nowInSec(), selectsFullPartition());
+            // If tracking repaired data, the counter is needed for overreading repaired data; otherwise we can
+            // optimise for the case where limits() == DataLimits.NONE, which skips an unnecessary transform
+            if (isTrackingRepairedStatus())
+            {
+                DataLimits.Counter limit =
+                    limits().newCounter(nowInSec(), false, selectsFullPartition(), metadata().enforceStrictLiveness());
+                iterator = limit.applyTo(iterator);
+                // ensure that a consistent amount of repaired data is read on each replica. This causes silent
+                // overreading from the repaired data set, up to limits(). The extra data is not visible to
+                // the caller, only iterated to produce the repaired data digest.
+                iterator = repairedDataInfo.extend(iterator, limit);
+            }
+            else
+            {
+                iterator = limits().filter(iterator, nowInSec(), selectsFullPartition());
+            }
 
             // because of the above, we need to append an artificial end bound if the source iterator was stopped short by a counter.
             return RTBoundCloser.close(iterator);
@@ -723,254 +735,37 @@ public abstract class ReadCommand extends AbstractReadQuery
         return toCQLString();
     }
 
-    private static UnfilteredPartitionIterator withRepairedDataInfo(final UnfilteredPartitionIterator iterator,
-                                                                    final RepairedDataInfo repairedDataInfo)
-    {
-        class WithRepairedDataTracking extends Transformation<UnfilteredRowIterator>
-        {
-            protected UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition)
-            {
-                return withRepairedDataInfo(partition, repairedDataInfo);
-            }
-        }
-
-        return Transformation.apply(iterator, new WithRepairedDataTracking());
-    }
-
-    private static UnfilteredRowIterator withRepairedDataInfo(final UnfilteredRowIterator iterator,
-                                                              final RepairedDataInfo repairedDataInfo)
-    {
-        class WithTracking extends Transformation
-        {
-            protected DecoratedKey applyToPartitionKey(DecoratedKey key)
-            {
-                repairedDataInfo.onNewPartition(iterator);
-                repairedDataInfo.trackPartitionKey(key);
-                return key;
-            }
-
-            protected DeletionTime applyToDeletion(DeletionTime deletionTime)
-            {
-                repairedDataInfo.trackDeletion(deletionTime);
-                return deletionTime;
-            }
-
-            protected RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker)
-            {
-                repairedDataInfo.trackRangeTombstoneMarker(marker);
-                return marker;
-            }
-
-            protected Row applyToStatic(Row row)
-            {
-                repairedDataInfo.trackStaticRow(row);
-                return row;
-            }
-
-            protected Row applyToRow(Row row)
-            {
-                repairedDataInfo.trackRow(row);
-                return row;
-            }
-
-            protected void onPartitionClose()
-            {
-                repairedDataInfo.onPartitionClose();
-            }
-        }
-        return Transformation.apply(iterator, new WithTracking());
-    }
-
-    private static class RepairedDataInfo
-    {
-        // Keeps a digest of the partition currently being processed. Since we won't know
-        // whether a partition will be fully purged from a read result until it's been
-        // consumed, we buffer this per-partition digest and add it to the final digest
-        // when the partition is closed (if it wasn't fully purged).
-        private Digest perPartitionDigest;
-        private Digest perCommandDigest;
-        private boolean isConclusive = true;
-
-        // Doesn't actually purge from the underlying iterators, but excludes from the digest
-        // the purger can't be initialized until we've iterated all the sstables for the query
-        // as it requires the oldest repaired tombstone
-        private RepairedDataPurger purger;
-        private boolean isFullyPurged = true;
-
-        ByteBuffer getDigest()
-        {
-            return perCommandDigest == null
-                   ? ByteBufferUtil.EMPTY_BYTE_BUFFER
-                   : ByteBuffer.wrap(perCommandDigest.digest());
-        }
-
-        protected void onNewPartition(UnfilteredRowIterator partition)
-        {
-            assert purger != null;
-            purger.setCurrentKey(partition.partitionKey());
-            purger.setIsReverseOrder(partition.isReverseOrder());
-        }
-
-        protected void setPurger(RepairedDataPurger purger)
-        {
-            this.purger = purger;
-        }
-
-        boolean isConclusive()
-        {
-            return isConclusive;
-        }
-
-        void markInconclusive()
-        {
-            isConclusive = false;
-        }
-
-        void trackPartitionKey(DecoratedKey key)
-        {
-            getPerPartitionDigest().update(key.getKey());
-        }
-
-        void trackDeletion(DeletionTime deletion)
-        {
-            assert purger != null;
-            DeletionTime purged = purger.applyToDeletion(deletion);
-            if (!purged.isLive())
-                isFullyPurged = false;
-
-            purged.digest(getPerPartitionDigest());
-        }
-
-        void trackRangeTombstoneMarker(RangeTombstoneMarker marker)
-        {
-            assert purger != null;
-            RangeTombstoneMarker purged = purger.applyToMarker(marker);
-            if (purged != null)
-            {
-                isFullyPurged = false;
-                purged.digest(getPerPartitionDigest());
-            }
-        }
-
-        void trackStaticRow(Row row)
-        {
-            assert purger != null;
-            Row purged = purger.applyToRow(row);
-            if (!purged.isEmpty())
-            {
-                isFullyPurged = false;
-                purged.digest(getPerPartitionDigest());
-            }
-        }
-
-        void trackRow(Row row)
-        {
-            assert purger != null;
-            Row purged = purger.applyToRow(row);
-            if (purged != null)
-            {
-                isFullyPurged = false;
-                purged.digest(getPerPartitionDigest());
-            }
-        }
-
-        private Digest getPerPartitionDigest()
-        {
-            if (perPartitionDigest == null)
-                perPartitionDigest = Digest.forRepairedDataTracking();
-
-            return perPartitionDigest;
-        }
-
-        private void onPartitionClose()
-        {
-            if (perPartitionDigest != null)
-            {
-                // If the partition wasn't completely emptied by the purger,
-                // calculate the digest for the partition and use it to
-                // update the overall digest
-                if (!isFullyPurged)
-                {
-                    if (perCommandDigest == null)
-                        perCommandDigest = Digest.forRepairedDataTracking();
-
-                    byte[] partitionDigest = perPartitionDigest.digest();
-                    perCommandDigest.update(partitionDigest, 0, partitionDigest.length);
-                    isFullyPurged = true;
-                }
-
-                perPartitionDigest = null;
-            }
-        }
-    }
-
-    /**
-     * Although PurgeFunction extends Transformation, this is never applied to an iterator.
-     * Instead, it is used by RepairedDataInfo during the generation of a repaired data
-     * digest to exclude data which will actually be purged later on in the read pipeline.
-     */
-    private static class RepairedDataPurger extends PurgeFunction
-    {
-        RepairedDataPurger(ColumnFamilyStore cfs,
-                           int nowInSec,
-                           int oldestUnrepairedTombstone)
-        {
-            super(nowInSec,
-                  cfs.gcBefore(nowInSec),
-                  oldestUnrepairedTombstone,
-                  cfs.getCompactionStrategyManager().onlyPurgeRepairedTombstones(),
-                  cfs.metadata.get().enforceStrictLiveness());
-        }
-
-        protected LongPredicate getPurgeEvaluator()
-        {
-            return (time) -> true;
-        }
-
-        void setCurrentKey(DecoratedKey key)
-        {
-            super.onNewPartition(key);
-        }
-
-        void setIsReverseOrder(boolean isReverseOrder)
-        {
-            super.setReverseOrder(isReverseOrder);
-        }
-
-        public DeletionTime applyToDeletion(DeletionTime deletionTime)
-        {
-            return super.applyToDeletion(deletionTime);
-        }
-
-        public Row applyToRow(Row row)
-        {
-            return super.applyToRow(row);
-        }
-
-        public RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker)
-        {
-            return super.applyToMarker(marker);
-        }
-    }
-
     @SuppressWarnings("resource") // resultant iterators are closed by their callers
     InputCollector<UnfilteredRowIterator> iteratorsForPartition(ColumnFamilyStore.ViewFragment view)
     {
-        BiFunction<List<UnfilteredRowIterator>, RepairedDataInfo, UnfilteredRowIterator> merge =
-            (unfilteredRowIterators, repairedDataInfo) ->
-                withRepairedDataInfo(UnfilteredRowIterators.merge(unfilteredRowIterators), repairedDataInfo);
+        final BiFunction<List<UnfilteredRowIterator>, RepairedDataInfo, UnfilteredRowIterator> merge =
+            (unfilteredRowIterators, repairedDataInfo) -> {
+                UnfilteredRowIterator repaired = UnfilteredRowIterators.merge(unfilteredRowIterators);
+                return repairedDataInfo.withRepairedDataInfo(repaired);
+            };
 
-        return new InputCollector<>(view, repairedDataInfo, merge, isTrackingRepairedStatus());
+        // For single partition reads, after reading up to the command's DataLimit nothing extra is required.
+        // The merged & repaired row iterator will be consumed until it's exhausted or the RepairedDataInfo's
+        // internal counter is satisfied
+        final Function<UnfilteredRowIterator, UnfilteredPartitionIterator> postLimitPartitions =
+            (rows) -> EmptyIterators.unfilteredPartition(metadata());
+        return new InputCollector<>(view, repairedDataInfo, merge, postLimitPartitions, isTrackingRepairedStatus());
     }
 
     @SuppressWarnings("resource") // resultant iterators are closed by their callers
     InputCollector<UnfilteredPartitionIterator> iteratorsForRange(ColumnFamilyStore.ViewFragment view)
     {
-        BiFunction<List<UnfilteredPartitionIterator>, RepairedDataInfo, UnfilteredPartitionIterator> merge =
-            (unfilteredPartitionIterators, repairedDataInfo) ->
-                withRepairedDataInfo(UnfilteredPartitionIterators.merge(unfilteredPartitionIterators, UnfilteredPartitionIterators.MergeListener.NOOP), repairedDataInfo);
+        final BiFunction<List<UnfilteredPartitionIterator>, RepairedDataInfo, UnfilteredPartitionIterator> merge =
+            (unfilteredPartitionIterators, repairedDataInfo) -> {
+                UnfilteredPartitionIterator repaired = UnfilteredPartitionIterators.merge(unfilteredPartitionIterators,
+                                                                                          NOOP);
+                return repairedDataInfo.withRepairedDataInfo(repaired);
+            };
 
-        return new InputCollector<>(view, repairedDataInfo, merge, isTrackingRepairedStatus());
+        // Uses the identity function to provide additional partitions to be consumed after the command's
+        // DataLimits are satisfied. The input to the function will be the iterator of merged, repaired partitions
+        // which we'll keep reading until the RepairedDataInfo's internal counter is satisfied.
+        return new InputCollector<>(view, repairedDataInfo, merge, Function.identity(), isTrackingRepairedStatus());
     }
 
     /**
@@ -988,12 +783,14 @@ public abstract class ReadCommand extends AbstractReadQuery
         private final boolean isTrackingRepairedStatus;
         Set<SSTableReader> repairedSSTables;
         BiFunction<List<T>, RepairedDataInfo, T> repairedMerger;
+        Function<T, UnfilteredPartitionIterator> postLimitAdditionalPartitions;
         List<T> repairedIters;
         List<T> unrepairedIters;
 
         InputCollector(ColumnFamilyStore.ViewFragment view,
                        RepairedDataInfo repairedDataInfo,
                        BiFunction<List<T>, RepairedDataInfo, T> repairedMerger,
+                       Function<T, UnfilteredPartitionIterator> postLimitAdditionalPartitions,
                        boolean isTrackingRepairedStatus)
         {
             this.repairedDataInfo = repairedDataInfo;
@@ -1023,6 +820,7 @@ public abstract class ReadCommand extends AbstractReadQuery
                 unrepairedIters = new ArrayList<>((view.sstables.size() - repairedSSTables.size()) + Iterables.size(view.memtables) + 1);
             }
             this.repairedMerger = repairedMerger;
+            this.postLimitAdditionalPartitions = postLimitAdditionalPartitions;
         }
 
         void addMemtableIterator(T iter)
@@ -1038,15 +836,17 @@ public abstract class ReadCommand extends AbstractReadQuery
                 unrepairedIters.add(iter);
         }
 
+        @SuppressWarnings("resource") // the returned iterators are closed by the caller
         List<T> finalizeIterators(ColumnFamilyStore cfs, int nowInSec, int oldestUnrepairedTombstone)
         {
             if (repairedIters.isEmpty())
                 return unrepairedIters;
 
             // merge the repaired data before returning, wrapping in a digest generator
-            RepairedDataPurger purger = new RepairedDataPurger(cfs, nowInSec, oldestUnrepairedTombstone);
-            repairedDataInfo.setPurger(purger);
-            unrepairedIters.add(repairedMerger.apply(repairedIters, repairedDataInfo));
+            repairedDataInfo.prepare(cfs, nowInSec, oldestUnrepairedTombstone);
+            T repairedIter = repairedMerger.apply(repairedIters, repairedDataInfo);
+            repairedDataInfo.finalize(postLimitAdditionalPartitions.apply(repairedIter));
+            unrepairedIters.add(repairedIter);
             return unrepairedIters;
         }
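
Condensed, the tracked read path above wires together as follows; a non-runnable walk-through paraphrased from the ReadCommand hunks in this file (method names are real, the glue is abbreviated):

    // executeLocally(), when isTrackingRepairedStatus():
    repairedDataInfo = new RepairedDataInfo(limits().newCounter(...).onlyCount());

    // InputCollector.finalizeIterators():
    repairedDataInfo.prepare(cfs, nowInSec, oldestUnrepairedTombstone);
    T repairedIter = repairedMerger.apply(repairedIters, repairedDataInfo);
    repairedDataInfo.finalize(postLimitAdditionalPartitions.apply(repairedIter));

    // executeLocally() again, once all inputs are merged:
    DataLimits.Counter limit = limits().newCounter(nowInSec(), false, selectsFullPartition(), metadata().enforceStrictLiveness());
    iterator = limit.applyTo(iterator);
    iterator = repairedDataInfo.extend(iterator, limit); // silent overread, digest only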
 
diff --git a/src/java/org/apache/cassandra/db/RepairedDataInfo.java b/src/java/org/apache/cassandra/db/RepairedDataInfo.java
new file mode 100644
index 0000000..be636d3
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/RepairedDataInfo.java
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.nio.ByteBuffer;
+import java.util.function.LongPredicate;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.partitions.PurgeFunction;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.db.rows.RangeTombstoneMarker;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.db.transform.MoreRows;
+import org.apache.cassandra.db.transform.Transformation;
+import org.apache.cassandra.metrics.TableMetrics;
+import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+class RepairedDataInfo
+{
+    public static final RepairedDataInfo NULL_REPAIRED_DATA_INFO = new RepairedDataInfo(null)
+    {
+        boolean isConclusive(){ return true; }
+        ByteBuffer getDigest(){ return ByteBufferUtil.EMPTY_BYTE_BUFFER; }
+    };
+
+    // Keeps a digest of the partition currently being processed. Since we won't know
+    // whether a partition will be fully purged from a read result until it's been
+    // consumed, we buffer this per-partition digest and add it to the final digest
+    // when the partition is closed (if it wasn't fully purged).
+    private Digest perPartitionDigest;
+    private Digest perCommandDigest;
+    private boolean isConclusive = true;
+    private ByteBuffer calculatedDigest = null;
+
+    // Doesn't actually purge from the underlying iterators, but excludes purged data from the digest.
+    // The purger can't be initialized until we've iterated all the sstables for the query,
+    // as it requires the oldest repaired tombstone.
+    private RepairedDataPurger purger;
+    private boolean isFullyPurged = true;
+
+    // Supplies additional partitions from the repaired data set to be consumed when the limit of the
+    // executing ReadCommand has been reached. This is to ensure that each replica attempts to
+    // read the same amount of repaired data, otherwise comparisons of the repaired data digests
+    // may be invalidated by varying amounts of repaired data being present on each replica.
+    // This can't be initialized until after the underlying repaired iterators have been merged.
+    private UnfilteredPartitionIterator postLimitPartitions = null;
+    private final DataLimits.Counter repairedCounter;
+    private UnfilteredRowIterator currentPartition;
+    private TableMetrics metrics;
+
+    public RepairedDataInfo(DataLimits.Counter repairedCounter)
+    {
+        this.repairedCounter = repairedCounter;
+    }
+
+    ByteBuffer getDigest()
+    {
+        if (calculatedDigest != null)
+            return calculatedDigest;
+
+        calculatedDigest = perCommandDigest == null
+                           ? ByteBufferUtil.EMPTY_BYTE_BUFFER
+                           : ByteBuffer.wrap(perCommandDigest.digest());
+
+        return calculatedDigest;
+    }
+
+    void prepare(ColumnFamilyStore cfs, int nowInSec, int oldestUnrepairedTombstone)
+    {
+        this.purger = new RepairedDataPurger(cfs, nowInSec, oldestUnrepairedTombstone);
+        this.metrics = cfs.metric;
+    }
+
+    void finalize(UnfilteredPartitionIterator postLimitPartitions)
+    {
+        this.postLimitPartitions = postLimitPartitions;
+    }
+
+    boolean isConclusive()
+    {
+        return isConclusive;
+    }
+
+    void markInconclusive()
+    {
+        isConclusive = false;
+    }
+
+    private void onNewPartition(UnfilteredRowIterator partition)
+    {
+        assert purger != null;
+        purger.setCurrentKey(partition.partitionKey());
+        purger.setIsReverseOrder(partition.isReverseOrder());
+        this.currentPartition = partition;
+    }
+
+    private Digest getPerPartitionDigest()
+    {
+        if (perPartitionDigest == null)
+            perPartitionDigest = Digest.forRepairedDataTracking();
+
+        return perPartitionDigest;
+    }
+
+    public UnfilteredPartitionIterator withRepairedDataInfo(final UnfilteredPartitionIterator iterator)
+    {
+        class WithTracking extends Transformation<UnfilteredRowIterator>
+        {
+            protected UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition)
+            {
+                return withRepairedDataInfo(partition);
+            }
+        }
+        return Transformation.apply(iterator, new WithTracking());
+    }
+
+    public UnfilteredRowIterator withRepairedDataInfo(final UnfilteredRowIterator iterator)
+    {
+        class WithTracking extends Transformation<UnfilteredRowIterator>
+        {
+            protected DecoratedKey applyToPartitionKey(DecoratedKey key)
+            {
+                getPerPartitionDigest().update(key.getKey());
+                return key;
+            }
+
+            protected DeletionTime applyToDeletion(DeletionTime deletionTime)
+            {
+                if (repairedCounter.isDone())
+                    return deletionTime;
+
+                assert purger != null;
+                DeletionTime purged = purger.applyToDeletion(deletionTime);
+                if (!purged.isLive())
+                    isFullyPurged = false;
+                purged.digest(getPerPartitionDigest());
+                return deletionTime;
+            }
+
+            protected RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker)
+            {
+                if (repairedCounter.isDone())
+                    return marker;
+
+                assert purger != null;
+                RangeTombstoneMarker purged = purger.applyToMarker(marker);
+                if (purged != null)
+                {
+                    isFullyPurged = false;
+                    purged.digest(getPerPartitionDigest());
+                }
+                return marker;
+            }
+
+            protected Row applyToStatic(Row row)
+            {
+                if (repairedCounter.isDone())
+                    return row;
+
+                assert purger != null;
+                Row purged = purger.applyToRow(row);
+                if (!purged.isEmpty())
+                {
+                    isFullyPurged = false;
+                    purged.digest(getPerPartitionDigest());
+                }
+                return row;
+            }
+
+            protected Row applyToRow(Row row)
+            {
+                if (repairedCounter.isDone())
+                    return row;
+
+                assert purger != null;
+                Row purged = purger.applyToRow(row);
+                if (purged != null)
+                {
+                    isFullyPurged = false;
+                    purged.digest(getPerPartitionDigest());
+                }
+                return row;
+            }
+
+            protected void onPartitionClose()
+            {
+                if (perPartitionDigest != null)
+                {
+                    // If the partition wasn't completely emptied by the purger,
+                    // calculate the digest for the partition and use it to
+                    // update the overall digest
+                    if (!isFullyPurged)
+                    {
+                        if (perCommandDigest == null)
+                            perCommandDigest = Digest.forRepairedDataTracking();
+
+                        byte[] partitionDigest = perPartitionDigest.digest();
+                        perCommandDigest.update(partitionDigest, 0, partitionDigest.length);
+                    }
+
+                    perPartitionDigest = null;
+                }
+                isFullyPurged = true;
+            }
+        }
+
+        if (repairedCounter.isDone())
+            return iterator;
+
+        UnfilteredRowIterator tracked = repairedCounter.applyTo(Transformation.apply(iterator, new WithTracking()));
+        onNewPartition(tracked);
+        return tracked;
+    }
+
+    public UnfilteredPartitionIterator extend(final UnfilteredPartitionIterator partitions,
+                                              final DataLimits.Counter limit)
+    {
+        class OverreadRepairedData extends Transformation<UnfilteredRowIterator> implements MoreRows<UnfilteredRowIterator>
+        {
+
+            protected UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition)
+            {
+                return MoreRows.extend(partition, this, partition.columns());
+            }
+
+            public UnfilteredRowIterator moreContents()
+            {
+                // We don't need to do anything until the DataLimits of the
+                // read have been reached
+                if (!limit.isDone() || repairedCounter.isDone())
+                    return null;
+
+                long countBeforeOverreads = repairedCounter.counted();
+                long overreadStartTime = System.nanoTime();
+                if (currentPartition != null)
+                    consumePartition(currentPartition, repairedCounter);
+
+                if (postLimitPartitions != null)
+                    while (postLimitPartitions.hasNext() && !repairedCounter.isDone())
+                        consumePartition(postLimitPartitions.next(), repairedCounter);
+
+                // we're not actually providing any more rows, just consuming the repaired data
+                long rows = repairedCounter.counted() - countBeforeOverreads;
+                long nanos = System.nanoTime() - overreadStartTime;
+                metrics.repairedDataTrackingOverreadRows.update(rows);
+                metrics.repairedDataTrackingOverreadTime.update(nanos, TimeUnit.NANOSECONDS);
+                Tracing.trace("Read {} additional rows of repaired data for tracking in {}µs", rows, TimeUnit.NANOSECONDS.toMicros(nanos));
+                return null;
+            }
+
+            private void consumePartition(UnfilteredRowIterator partition, DataLimits.Counter counter)
+            {
+                if (partition == null)
+                    return;
+
+                while (!counter.isDone() && partition.hasNext())
+                    partition.next();
+
+                partition.close();
+            }
+        }
+        // If the read didn't touch any sstables, prepare() hasn't been called
+        // and we can skip this transformation
+        if (metrics == null || repairedCounter.isDone())
+            return partitions;
+        return Transformation.apply(partitions, new OverreadRepairedData());
+    }
+
+    /**
+     * Although PurgeFunction extends Transformation, this is never applied to an iterator.
+     * Instead, it is used by RepairedDataInfo during the generation of a repaired data
+     * digest to exclude data which will actually be purged later on in the read pipeline.
+     */
+    private static class RepairedDataPurger extends PurgeFunction
+    {
+        RepairedDataPurger(ColumnFamilyStore cfs,
+                           int nowInSec,
+                           int oldestUnrepairedTombstone)
+        {
+            super(nowInSec,
+                  cfs.gcBefore(nowInSec),
+                  oldestUnrepairedTombstone,
+                  cfs.getCompactionStrategyManager().onlyPurgeRepairedTombstones(),
+                  cfs.metadata.get().enforceStrictLiveness());
+        }
+
+        protected LongPredicate getPurgeEvaluator()
+        {
+            return (time) -> true;
+        }
+
+        void setCurrentKey(DecoratedKey key)
+        {
+            super.onNewPartition(key);
+        }
+
+        void setIsReverseOrder(boolean isReverseOrder)
+        {
+            super.setReverseOrder(isReverseOrder);
+        }
+
+        public DeletionTime applyToDeletion(DeletionTime deletionTime)
+        {
+            return super.applyToDeletion(deletionTime);
+        }
+
+        public Row applyToRow(Row row)
+        {
+            return super.applyToRow(row);
+        }
+
+        public RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker)
+        {
+            return super.applyToMarker(marker);
+        }
+    }
+}
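
The per-partition buffering described in the comments above (a partition's digest is folded into the command digest only on partition close, and only if the purger left something behind) can be illustrated in miniature. In this toy sketch, MD5 via java.security.MessageDigest is purely a stand-in for Cassandra's Digest.forRepairedDataTracking():

    import java.security.MessageDigest;
    import java.security.NoSuchAlgorithmException;

    final class TwoLevelDigest
    {
        private MessageDigest perPartition;
        private MessageDigest perCommand;
        private boolean fullyPurged = true;

        // called for each row/tombstone that survives purging
        void trackSurviving(byte[] bytes) throws NoSuchAlgorithmException
        {
            if (perPartition == null)
                perPartition = MessageDigest.getInstance("MD5");
            perPartition.update(bytes);
            fullyPurged = false;
        }

        void onPartitionClose() throws NoSuchAlgorithmException
        {
            if (perPartition != null && !fullyPurged)
            {
                if (perCommand == null)
                    perCommand = MessageDigest.getInstance("MD5");
                // fold the buffered partition digest into the command digest
                perCommand.update(perPartition.digest());
            }
            perPartition = null;
            fullyPurged = true;
        }
    }
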
diff --git a/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java b/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
index ef3338e..9c45dc0 100644
--- a/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
@@ -142,6 +142,15 @@ public class KeyspaceMetrics
      */
     public final Meter unconfirmedRepairedInconsistencies;
 
+    /**
+     * Tracks the amount of overreading of repaired data that replicas perform in order to produce digests
+     * at query time. For each query, on a full data read following an initial digest mismatch, the replicas
+     * may read extra repaired data, up to the DataLimit of the command, so that the coordinator can compare
+     * the repaired data on each replica. These are tracked on each replica.
+     */
+    public final Histogram repairedDataTrackingOverreadRows;
+    public final Timer repairedDataTrackingOverreadTime;
+
     public final MetricNameFactory factory;
     private Keyspace keyspace;
 
@@ -305,6 +314,9 @@ public class KeyspaceMetrics
 
         confirmedRepairedInconsistencies = Metrics.meter(factory.createMetricName("RepairedDataInconsistenciesConfirmed"));
         unconfirmedRepairedInconsistencies = Metrics.meter(factory.createMetricName("RepairedDataInconsistenciesUnconfirmed"));
+
+        repairedDataTrackingOverreadRows = Metrics.histogram(factory.createMetricName("RepairedOverreadRows"), false);
+        repairedDataTrackingOverreadTime = Metrics.timer(factory.createMetricName("RepairedOverreadTime"));
     }
 
     /**
diff --git a/src/java/org/apache/cassandra/metrics/TableMetrics.java b/src/java/org/apache/cassandra/metrics/TableMetrics.java
index 6095f50..775d87c 100644
--- a/src/java/org/apache/cassandra/metrics/TableMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/TableMetrics.java
@@ -31,6 +31,11 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Predicate;
 
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+
+import com.codahale.metrics.Timer;
+
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.Memtable;
@@ -52,9 +57,6 @@ import com.codahale.metrics.Histogram;
 import com.codahale.metrics.Meter;
 import com.codahale.metrics.Metric;
 import com.codahale.metrics.RatioGauge;
-import com.codahale.metrics.Timer;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
 
 /**
  * Metrics for {@link ColumnFamilyStore}.
@@ -233,6 +235,13 @@ public class TableMetrics
     // be part of the repaired set whilst others do not.
     public final TableMeter unconfirmedRepairedInconsistencies;
 
+    // Tracks the amount of overreading of repaired data that replicas perform in order to produce digests
+    // at query time. For each query, on a full data read following an initial digest mismatch, the replicas
+    // may read extra repaired data, up to the DataLimit of the command, so that the coordinator can compare
+    // the repaired data on each replica. These are tracked on each replica.
+    public final TableHistogram repairedDataTrackingOverreadRows;
+    public final TableTimer repairedDataTrackingOverreadTime;
+
     public final static LatencyMetrics globalReadLatency = new LatencyMetrics(globalFactory, globalAliasFactory, "Read");
     public final static LatencyMetrics globalWriteLatency = new LatencyMetrics(globalFactory, globalAliasFactory, "Write");
     public final static LatencyMetrics globalRangeLatency = new LatencyMetrics(globalFactory, globalAliasFactory, "Range");
@@ -946,6 +955,9 @@ public class TableMetrics
         confirmedRepairedInconsistencies = createTableMeter("RepairedDataInconsistenciesConfirmed", cfs.keyspace.metric.confirmedRepairedInconsistencies);
         unconfirmedRepairedInconsistencies = createTableMeter("RepairedDataInconsistenciesUnconfirmed", cfs.keyspace.metric.unconfirmedRepairedInconsistencies);
 
+        repairedDataTrackingOverreadRows = createTableHistogram("RepairedDataTrackingOverreadRows", cfs.keyspace.metric.repairedDataTrackingOverreadRows, false);
+        repairedDataTrackingOverreadTime = createTableTimer("RepairedDataTrackingOverreadTime", cfs.keyspace.metric.repairedDataTrackingOverreadTime);
+
         unleveledSSTables = createTableGauge("UnleveledSSTables", cfs::getUnleveledSSTables, () -> {
             // global gauge
             int cnt = 0;
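
The new metrics are registered per keyspace as RepairedOverreadRows / RepairedOverreadTime and per table as RepairedDataTrackingOverreadRows / RepairedDataTrackingOverreadTime. A minimal sketch of inspecting the table-level histogram from an in-JVM test, mirroring what ReadCommandTest does further down (keyspace and table names are placeholders):

    ColumnFamilyStore cfs = Keyspace.open("ks").getColumnFamilyStore("tbl");
    // after a read executed with trackRepairedStatus(), the histogram records
    // how many repaired rows were consumed beyond the caller-visible limit
    long overreadRows = cfs.metric.repairedDataTrackingOverreadRows.cf.getSnapshot().getMax();
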
diff --git a/test/distributed/org/apache/cassandra/distributed/test/RepairDigestTrackingTest.java b/test/distributed/org/apache/cassandra/distributed/test/RepairDigestTrackingTest.java
index 664c99d..4b382a1 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/RepairDigestTrackingTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/RepairDigestTrackingTest.java
@@ -25,6 +25,8 @@ import java.util.EnumSet;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
 
 import com.google.common.util.concurrent.Uninterruptibles;
 import org.junit.Assert;
@@ -45,6 +47,9 @@ import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.service.SnapshotVerbHandler;
 import org.apache.cassandra.service.StorageProxy;
 
+import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;
+import static org.junit.Assert.fail;
+
 public class RepairDigestTrackingTest extends TestBaseImpl
 {
     private static final String TABLE = "tbl";
@@ -76,7 +81,7 @@ public class RepairDigestTrackingTest extends TestBaseImpl
                                                i, i, i);
             }
             cluster.forEach(i -> i.flush(KEYSPACE));
-            cluster.forEach(i -> assertNotRepaired());
+            cluster.forEach(i -> i.runOnInstance(assertNotRepaired()));
 
             // mark everything on node 2 repaired
             cluster.get(2).runOnInstance(markAllRepaired());
@@ -120,10 +125,10 @@ public class RepairDigestTrackingTest extends TestBaseImpl
             cluster.forEach(i -> i.flush(KEYSPACE));
 
             // nothing is repaired yet
-            cluster.forEach(i -> assertNotRepaired());
+            cluster.forEach(i -> i.runOnInstance(assertNotRepaired()));
             // mark everything repaired
-            cluster.forEach(i -> markAllRepaired());
-            cluster.forEach(i -> assertRepaired());
+            cluster.forEach(i -> i.runOnInstance(markAllRepaired()));
+            cluster.forEach(i -> i.runOnInstance(assertRepaired()));
 
             // now overwrite on node2 only to generate digest mismatches, but don't flush so the repaired dataset is not affected
             for (int i = 0; i < 10; i++)
@@ -198,6 +203,162 @@ public class RepairDigestTrackingTest extends TestBaseImpl
         }
     }
 
+    @Test
+    public void testRepairedReadCountNormalizationWithInitialUnderread() throws Throwable
+    {
+        // Asserts that the amount of repaired data read for digest generation is consistent
+        // across replicas where one has to read less repaired data to satisfy the original
+        // limits of the read request.
+        try (Cluster cluster = init(Cluster.create(2)))
+        {
+
+            cluster.get(1).runOnInstance(() -> {
+                StorageProxy.instance.enableRepairedDataTrackingForRangeReads();
+                StorageProxy.instance.enableRepairedDataTrackingForPartitionReads();
+            });
+
+            cluster.schemaChange("CREATE TABLE " + KS_TABLE + " (k INT, c INT, v1 INT, PRIMARY KEY (k,c)) " +
+                                 "WITH CLUSTERING ORDER BY (c DESC)");
+
+            // insert data on both nodes and flush
+            for (int i=0; i<20; i++)
+            {
+                cluster.coordinator(1).execute("INSERT INTO " + KS_TABLE + " (k, c, v1) VALUES (0, ?, ?) USING TIMESTAMP 0",
+                                               ConsistencyLevel.ALL, i, i);
+                cluster.coordinator(1).execute("INSERT INTO " + KS_TABLE + " (k, c, v1) VALUES (1, ?, ?) USING TIMESTAMP 1",
+                                               ConsistencyLevel.ALL, i, i);
+            }
+            cluster.forEach(c -> c.flush(KEYSPACE));
+            // nothing is repaired yet
+            cluster.forEach(i -> i.runOnInstance(assertNotRepaired()));
+            // mark everything repaired
+            cluster.forEach(i -> i.runOnInstance(markAllRepaired()));
+            cluster.forEach(i -> i.runOnInstance(assertRepaired()));
+
+            // Add some unrepaired data to both nodes
+            for (int i=20; i<30; i++)
+            {
+                cluster.coordinator(1).execute("INSERT INTO " + KS_TABLE + " (k, c, v1) VALUES (1, ?, ?) USING TIMESTAMP 1",
+                                               ConsistencyLevel.ALL, i, i);
+            }
+            // And some more unrepaired data to node2 only. This causes node2 to read less repaired data than node1
+            // when satisfying the limits of the read. So node2 needs to overread more repaired data than node1 when
+            // calculating the repaired data digest.
+            cluster.get(2).executeInternal("INSERT INTO "  + KS_TABLE + " (k, c, v1) VALUES (1, ?, ?) USING TIMESTAMP 1", 30, 30);
+
+            // Verify single partition read
+            long ccBefore = getConfirmedInconsistencies(cluster.get(1));
+            assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KS_TABLE + " WHERE k=1 LIMIT 20", ConsistencyLevel.ALL),
+                       rows(1, 30, 11));
+            long ccAfterPartitionRead = getConfirmedInconsistencies(cluster.get(1));
+
+            // Recreate a mismatch in unrepaired data and verify partition range read
+            cluster.get(2).executeInternal("INSERT INTO "  + KS_TABLE + " (k, c, v1) VALUES (1, ?, ?)", 31, 31);
+            assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KS_TABLE + " LIMIT 30", ConsistencyLevel.ALL),
+                       rows(1, 31, 2));
+            long ccAfterRangeRead = getConfirmedInconsistencies(cluster.get(1));
+
+            if (ccAfterPartitionRead != ccAfterRangeRead)
+                if (ccAfterPartitionRead != ccBefore)
+                    fail("Both range and partition reads reported data inconsistencies but none were expected");
+                else
+                    fail("Reported inconsistency during range read but none were expected");
+            else if (ccAfterPartitionRead != ccBefore)
+                fail("Reported inconsistency during partition read but none were expected");
+        }
+    }
+
+    @Test
+    public void testRepairedReadCountNormalizationWithInitialOverread() throws Throwable
+    {
+        // Asserts that the amount of repaired data read for digest generation is consistent
+        // across replicas where one has to read more repaired data to satisfy the original
+        // limits of the read request.
+        try (Cluster cluster = init(Cluster.create(2)))
+        {
+
+            cluster.get(1).runOnInstance(() -> {
+                StorageProxy.instance.enableRepairedDataTrackingForRangeReads();
+                StorageProxy.instance.enableRepairedDataTrackingForPartitionReads();
+            });
+
+            cluster.schemaChange("CREATE TABLE " + KS_TABLE + " (k INT, c INT, v1 INT, PRIMARY KEY (k,c)) " +
+                                 "WITH CLUSTERING ORDER BY (c DESC)");
+
+            // insert data on both nodes and flush
+            for (int i=0; i<10; i++)
+            {
+                cluster.coordinator(1).execute("INSERT INTO " + KS_TABLE + " (k, c, v1) VALUES (0, ?, ?) USING TIMESTAMP 0",
+                                               ConsistencyLevel.ALL, i, i);
+                cluster.coordinator(1).execute("INSERT INTO " + KS_TABLE + " (k, c, v1) VALUES (1, ?, ?) USING TIMESTAMP 1",
+                                               ConsistencyLevel.ALL, i, i);
+            }
+            cluster.forEach(c -> c.flush(KEYSPACE));
+            // nothing is repaired yet
+            cluster.forEach(i -> i.runOnInstance(assertNotRepaired()));
+            // mark everything repaired
+            cluster.forEach(i -> i.runOnInstance(markAllRepaired()));
+            cluster.forEach(i -> i.runOnInstance(assertRepaired()));
+
+            // Add some unrepaired data to both nodes
+            for (int i=10; i<13; i++)
+            {
+                cluster.coordinator(1).execute("INSERT INTO " + KS_TABLE + " (k, c, v1) VALUES (0, ?, ?) USING TIMESTAMP 1",
+                                               ConsistencyLevel.ALL, i, i);
+                cluster.coordinator(1).execute("INSERT INTO " + KS_TABLE + " (k, c, v1) VALUES (1, ?, ?) USING TIMESTAMP 1",
+                                               ConsistencyLevel.ALL, i, i);
+            }
+            cluster.forEach(c -> c.flush(KEYSPACE));
+            // And some row deletions on node2 only which cover data in the repaired set
+            // This will cause node2 to read more repaired data in satisfying the limit of the read request
+            // so it should overread less than node1 (in fact, it should not overread at all) in order to
+            // calculate the repaired data digest.
+            for (int i=7; i<10; i++)
+            {
+                cluster.get(2).executeInternal("DELETE FROM " + KS_TABLE + " USING TIMESTAMP 2 WHERE k = 0 AND c = ?", i);
+                cluster.get(2).executeInternal("DELETE FROM " + KS_TABLE + " USING TIMESTAMP 2 WHERE k = 1 AND c = ?", i);
+            }
+
+            // Verify single partition read
+            long ccBefore = getConfirmedInconsistencies(cluster.get(1));
+            assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KS_TABLE + " WHERE k=0 LIMIT 5", ConsistencyLevel.ALL),
+                       rows(rows(0, 12, 10), rows(0, 6, 5)));
+            long ccAfterPartitionRead = getConfirmedInconsistencies(cluster.get(1));
+
+            // Verify partition range read
+            assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KS_TABLE + " LIMIT 11", ConsistencyLevel.ALL),
+                       rows(rows(1, 12, 10), rows(1, 6, 0), rows(0, 12, 12)));
+            long ccAfterRangeRead = getConfirmedInconsistencies(cluster.get(1));
+
+            if (ccAfterPartitionRead != ccAfterRangeRead)
+                if (ccAfterPartitionRead != ccBefore)
+                    fail("Both range and partition reads reported data inconsistencies but none were expected");
+                else
+                    fail("Reported inconsistency during range read but none were expected");
+            else if (ccAfterPartitionRead != ccBefore)
+                fail("Reported inconsistency during partition read but none were expected");
+        }
+    }
+
+    private Object[][] rows(Object[][] head, Object[][]...tail)
+    {
+        return Stream.concat(Stream.of(head),
+                             Stream.of(tail).flatMap(Stream::of))
+                     .toArray(Object[][]::new);
+    }
+
+    private Object[][] rows(int partitionKey, int start, int end)
+    {
+        if (start == end)
+            return new Object[][] { new Object[] { partitionKey, start, end } };
+
+        IntStream clusterings = start > end
+                                ? IntStream.range(end -1, start).map(i -> start - i + end - 1)
+                                : IntStream.range(start, end);
+
+        return clusterings.mapToObj(i -> new Object[] {partitionKey, i, i}).toArray(Object[][]::new);
+    }
+
     private IIsolatedExecutor.SerializableRunnable assertNotRepaired()
     {
         return () ->
diff --git a/test/unit/org/apache/cassandra/db/ReadCommandTest.java b/test/unit/org/apache/cassandra/db/ReadCommandTest.java
index e0215b7..0824168 100644
--- a/test/unit/org/apache/cassandra/db/ReadCommandTest.java
+++ b/test/unit/org/apache/cassandra/db/ReadCommandTest.java
@@ -23,6 +23,8 @@ import java.io.OutputStream;
 import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 import java.util.*;
+import java.util.function.Supplier;
+import java.util.stream.IntStream;
 
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
@@ -41,6 +43,8 @@ import org.apache.cassandra.db.filter.RowFilter;
 import org.apache.cassandra.db.marshal.AsciiType;
 import org.apache.cassandra.db.marshal.BytesType;
 import org.apache.cassandra.db.marshal.CounterColumnType;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.db.marshal.ReversedType;
 import org.apache.cassandra.db.marshal.SetType;
 import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.db.rows.Row;
@@ -61,6 +65,7 @@ import org.apache.cassandra.locator.EndpointsForToken;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.locator.ReplicaUtils;
 import org.apache.cassandra.net.Message;
+import org.apache.cassandra.metrics.ClearableHistogram;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.net.Verb;
 import org.apache.cassandra.repair.consistent.LocalSessionAccessor;
@@ -94,6 +99,7 @@ public class ReadCommandTest
     private static final String CF6 = "Standard6";
     private static final String CF7 = "Counter7";
     private static final String CF8 = "Standard8";
+    private static final String CF9 = "Standard9";
 
     private static final InetAddressAndPort REPAIR_COORDINATOR;
     static {
@@ -178,6 +184,12 @@ public class ReadCommandTest
                      .addRegularColumn("b", AsciiType.instance)
                      .addRegularColumn("c", SetType.getInstance(AsciiType.instance, true));
 
+        TableMetadata.Builder metadata9 =
+        TableMetadata.builder(KEYSPACE, CF9)
+                     .addPartitionKeyColumn("key", Int32Type.instance)
+                     .addClusteringColumn("col", ReversedType.getInstance(Int32Type.instance))
+                     .addRegularColumn("a", AsciiType.instance);
+
         SchemaLoader.prepareServer();
         SchemaLoader.createKeyspace(KEYSPACE,
                                     KeyspaceParams.simple(1),
@@ -188,7 +200,8 @@ public class ReadCommandTest
                                     metadata5,
                                     metadata6,
                                     metadata7,
-                                    metadata8);
+                                    metadata8,
+                                    metadata9);
 
         LocalSessionAccessor.startup();
     }
@@ -823,14 +836,113 @@ public class ReadCommandTest
         }
     }
 
+    @Test
+    public void testRepairedDataOverreadMetrics()
+    {
+        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF9);
+        cfs.truncateBlocking();
+        cfs.disableAutoCompaction();
+        cfs.metadata().withSwapped(cfs.metadata().params.unbuild()
+                                                        .caching(CachingParams.CACHE_NOTHING)
+                                                        .build());
+        // Insert and repair
+        insert(cfs, IntStream.range(0, 10), () -> IntStream.range(0, 10));
+        cfs.forceBlockingFlush();
+        cfs.getLiveSSTables().forEach(sstable -> mutateRepaired(cfs, sstable, 111, null));
+        // Insert and leave unrepaired
+        insert(cfs, IntStream.range(0, 10), () -> IntStream.range(10, 20));
+
+        // Single partition reads
+        int limit = 5;
+        ReadCommand cmd = Util.cmd(cfs, ByteBufferUtil.bytes(0)).withLimit(limit).build();
+        assertEquals(0, getAndResetOverreadCount(cfs));
+
+        // No overreads if not tracking
+        readAndCheckRowCount(Collections.singletonList(Util.getOnlyPartition(cmd)), limit);
+        assertEquals(0, getAndResetOverreadCount(cfs));
+
+        // Overread up to (limit - 1) if tracking is enabled
+        cmd = cmd.copy();
+        cmd.trackRepairedStatus();
+        readAndCheckRowCount(Collections.singletonList(Util.getOnlyPartition(cmd)), limit);
+        // overread count is always < limit as the first read is counted during merging (and so is expected)
+        assertEquals(limit - 1, getAndResetOverreadCount(cfs));
+
+        // if limit already requires reading all repaired data, no overreads should be recorded
+        limit = 20;
+        cmd = Util.cmd(cfs, ByteBufferUtil.bytes(0)).withLimit(limit).build();
+        readAndCheckRowCount(Collections.singletonList(Util.getOnlyPartition(cmd)), limit);
+        assertEquals(0, getAndResetOverreadCount(cfs));
+
+        // Range reads
+        limit = 5;
+        cmd = Util.cmd(cfs).withLimit(limit).build();
+        assertEquals(0, getAndResetOverreadCount(cfs));
+        // No overreads if not tracking
+        readAndCheckRowCount(Util.getAll(cmd), limit);
+        assertEquals(0, getAndResetOverreadCount(cfs));
+
+        // Overread up to (limit - 1) if tracking is enabled
+        cmd = cmd.copy();
+        cmd.trackRepairedStatus();
+        readAndCheckRowCount(Util.getAll(cmd), limit);
+        assertEquals(limit - 1, getAndResetOverreadCount(cfs));
+
+        // if limit already requires reading all repaired data, no overreads should be recorded
+        limit = 100;
+        cmd = Util.cmd(cfs).withLimit(limit).build();
+        readAndCheckRowCount(Util.getAll(cmd), limit);
+        assertEquals(0, getAndResetOverreadCount(cfs));
+    }
+
     private void setGCGrace(ColumnFamilyStore cfs, int gcGrace)
     {
         TableParams newParams = cfs.metadata().params.unbuild().gcGraceSeconds(gcGrace).build();
         KeyspaceMetadata keyspaceMetadata = Schema.instance.getKeyspaceMetadata(cfs.metadata().keyspace);
         Schema.instance.load(
-            keyspaceMetadata.withSwapped(
-                keyspaceMetadata.tables.withSwapped(
-                    cfs.metadata().withSwapped(newParams))));
+        keyspaceMetadata.withSwapped(
+        keyspaceMetadata.tables.withSwapped(
+        cfs.metadata().withSwapped(newParams))));
+    }
+
+    private long getAndResetOverreadCount(ColumnFamilyStore cfs)
+    {
+        // always clear the histogram after reading to make comparisons & asserts easier
+        long rows = cfs.metric.repairedDataTrackingOverreadRows.cf.getSnapshot().getMax();
+        ((ClearableHistogram)cfs.metric.repairedDataTrackingOverreadRows.cf).clear();
+        return rows;
+    }
+
+    private void readAndCheckRowCount(Iterable<FilteredPartition> partitions, int expected)
+    {
+        int count = 0;
+        for (Partition partition : partitions)
+        {
+            assertFalse(partition.isEmpty());
+            try (UnfilteredRowIterator iter = partition.unfilteredIterator())
+            {
+                while (iter.hasNext())
+                {
+                    iter.next();
+                    count++;
+                }
+            }
+        }
+        assertEquals(expected, count);
+    }
+
+    private void insert(ColumnFamilyStore cfs, IntStream partitionIds, Supplier<IntStream> rowIds)
+    {
+        partitionIds.mapToObj(ByteBufferUtil::bytes)
+                    .forEach(pk ->
+                        rowIds.get().forEach(c ->
+                            new RowUpdateBuilder(cfs.metadata(), 0, pk)
+                                .clustering(c)
+                                .add("a", ByteBufferUtil.bytes("abcd"))
+                                .build()
+                                .apply()));
     }
 
     private void assertDigestsDiffer(ByteBuffer b0, ByteBuffer b1)
diff --git a/test/unit/org/apache/cassandra/db/RepairedDataInfoTest.java b/test/unit/org/apache/cassandra/db/RepairedDataInfoTest.java
new file mode 100644
index 0000000..00a1f56
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/RepairedDataInfoTest.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.stream.IntStream;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.commitlog.CommitLog;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.schema.MockSchema;
+import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.db.partitions.AbstractUnfilteredPartitionIterator;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.db.rows.AbstractUnfilteredRowIterator;
+import org.apache.cassandra.db.rows.BTreeRow;
+import org.apache.cassandra.db.rows.BufferCell;
+import org.apache.cassandra.db.rows.Cell;
+import org.apache.cassandra.db.rows.EncodingStats;
+import org.apache.cassandra.db.rows.RangeTombstoneBoundMarker;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.Rows;
+import org.apache.cassandra.db.rows.Unfiltered;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Hex;
+
+import static org.apache.cassandra.Util.clustering;
+import static org.apache.cassandra.Util.dk;
+import static org.apache.cassandra.utils.ByteBufferUtil.*;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+public class RepairedDataInfoTest
+{
+    private static ColumnFamilyStore cfs;
+    private static TableMetadata metadata;
+    private static ColumnMetadata valueMetadata;
+    private static ColumnMetadata staticMetadata;
+
+    private final int nowInSec = FBUtilities.nowInSeconds();
+
+    @BeforeClass
+    public static void setUp()
+    {
+        DatabaseDescriptor.daemonInitialization();
+        CommitLog.instance.start();
+        MockSchema.cleanup();
+        String ks = "repaired_data_info_test";
+        cfs = MockSchema.newCFS(ks, metadata -> metadata.addStaticColumn("s", UTF8Type.instance));
+        metadata = cfs.metadata();
+        valueMetadata = metadata.regularColumns().getSimple(0);
+        staticMetadata = metadata.staticColumns().getSimple(0);
+    }
+
+    @Test
+    public void withTrackingAppliesRepairedDataCounter()
+    {
+        DataLimits.Counter counter = DataLimits.cqlLimits(15).newCounter(nowInSec, false, false, false).onlyCount();
+        RepairedDataInfo info = new RepairedDataInfo(counter);
+        info.prepare(cfs, nowInSec, Integer.MAX_VALUE);
+        UnfilteredRowIterator[] partitions = new UnfilteredRowIterator[3];
+        for (int i=0; i<3; i++)
+            partitions[i] = partition(bytes(i), rows(0, 5, nowInSec));
+
+        UnfilteredPartitionIterator iter = partitions(partitions);
+        iter = info.withRepairedDataInfo(iter);
+        consume(iter);
+
+        assertEquals(15, counter.counted());
+        assertEquals(5, counter.countedInCurrentPartition());
+    }
+
+    @Test
+    public void digestOfSinglePartitionWithSingleRowAndEmptyStaticRow()
+    {
+        Digest manualDigest = Digest.forRepairedDataTracking();
+        Row[] rows = rows(0, 1, nowInSec);
+        UnfilteredRowIterator partition = partition(bytes(0), rows);
+        addToDigest(manualDigest,
+                    partition.partitionKey().getKey(),
+                    partition.partitionLevelDeletion(),
+                    Rows.EMPTY_STATIC_ROW,
+                    rows);
+        byte[] fromRepairedInfo = consume(partition);
+        assertArrayEquals(manualDigest.digest(), fromRepairedInfo);
+    }
+
+    @Test
+    public void digestOfSinglePartitionWithMultipleRowsAndEmptyStaticRow()
+    {
+        Digest manualDigest = Digest.forRepairedDataTracking();
+        Row[] rows = rows(0, 5, nowInSec);
+        UnfilteredRowIterator partition = partition(bytes(0), rows);
+        addToDigest(manualDigest,
+                    partition.partitionKey().getKey(),
+                    partition.partitionLevelDeletion(),
+                    Rows.EMPTY_STATIC_ROW,
+                    rows);
+        byte[] fromRepairedInfo = consume(partition);
+        assertArrayEquals(manualDigest.digest(), fromRepairedInfo);
+    }
+
+    @Test
+    public void digestOfSinglePartitionWithMultipleRowsAndTombstones()
+    {
+        Digest manualDigest = Digest.forRepairedDataTracking();
+        Unfiltered[] unfiltereds = new Unfiltered[]
+                                   {
+                                       open(0), close(0),
+                                       row(1, 1, nowInSec),
+                                       open(2), close(4),
+                                       row(5, 7, nowInSec)
+                                   };
+        UnfilteredRowIterator partition = partition(bytes(0), unfiltereds);
+        addToDigest(manualDigest,
+                    partition.partitionKey().getKey(),
+                    partition.partitionLevelDeletion(),
+                    Rows.EMPTY_STATIC_ROW,
+                    unfiltereds);
+        byte[] fromRepairedInfo = consume(partition);
+        assertArrayEquals(manualDigest.digest(), fromRepairedInfo);
+    }
+
+    @Test
+    public void digestOfMultiplePartitionsWithMultipleRowsAndNonEmptyStaticRows()
+    {
+        Digest manualDigest = Digest.forRepairedDataTracking();
+        Row staticRow = staticRow(nowInSec);
+        Row[] rows = rows(0, 5, nowInSec);
+        UnfilteredRowIterator[] partitionsArray = new UnfilteredRowIterator[5];
+        for (int i=0; i<5; i++)
+        {
+            UnfilteredRowIterator partition = partitionWithStaticRow(bytes(i), staticRow, rows);
+            partitionsArray[i] = partition;
+            addToDigest(manualDigest,
+                        partition.partitionKey().getKey(),
+                        partition.partitionLevelDeletion(),
+                        staticRow,
+                        rows);
+        }
+
+        UnfilteredPartitionIterator partitions = partitions(partitionsArray);
+        byte[] fromRepairedInfo = consume(partitions);
+        assertArrayEquals(manualDigest.digest(), fromRepairedInfo);
+    }
+
+    private RepairedDataInfo info()
+    {
+        return new RepairedDataInfo(DataLimits.NONE.newCounter(nowInSec, false, false, false));
+    }
+
+    private Digest addToDigest(Digest aggregate,
+                               ByteBuffer partitionKey,
+                               DeletionTime deletion,
+                               Row staticRow,
+                               Unfiltered...unfiltereds)
+    {
+        Digest perPartitionDigest = Digest.forRepairedDataTracking();
+        if (!staticRow.isEmpty())
+            staticRow.digest(perPartitionDigest);
+        perPartitionDigest.update(partitionKey);
+        deletion.digest(perPartitionDigest);
+        for (Unfiltered unfiltered : unfiltereds)
+            unfiltered.digest(perPartitionDigest);
+        byte[] rowDigestBytes = perPartitionDigest.digest();
+        aggregate.update(rowDigestBytes, 0, rowDigestBytes.length);
+        return aggregate;
+    }
+
+    private byte[] consume(UnfilteredPartitionIterator partitions)
+    {
+        RepairedDataInfo info = info();
+        info.prepare(cfs, nowInSec, Integer.MAX_VALUE);
+        partitions.forEachRemaining(partition ->
+        {
+            try (UnfilteredRowIterator iter = info.withRepairedDataInfo(partition))
+            {
+                iter.forEachRemaining(u -> {});
+            }
+        });
+        return getArray(info.getDigest());
+    }
+
+    private byte[] consume(UnfilteredRowIterator partition)
+    {
+        RepairedDataInfo info = info();
+        info.prepare(cfs, nowInSec, Integer.MAX_VALUE);
+        try (UnfilteredRowIterator iter = info.withRepairedDataInfo(partition))
+        {
+            iter.forEachRemaining(u -> {});
+        }
+        return getArray(info.getDigest());
+    }
+
+    public static Cell cell(ColumnMetadata def, Object value)
+    {
+        ByteBuffer bb = value instanceof ByteBuffer ? (ByteBuffer)value : ((AbstractType)def.type).decompose(value);
+        return new BufferCell(def, 1L, BufferCell.NO_TTL, BufferCell.NO_DELETION_TIME, bb, null);
+    }
+
+    private Row staticRow(int nowInSec)
+    {
+        Row.Builder builder = BTreeRow.unsortedBuilder();
+        builder.newRow(Clustering.STATIC_CLUSTERING);
+        builder.addCell(cell(staticMetadata, "static value"));
+        return builder.build();
+    }
+
+    private Row row(int clustering, int value, int nowInSec)
+    {
+        Row.Builder builder = BTreeRow.unsortedBuilder();
+        builder.newRow(clustering(metadata.comparator, Integer.toString(clustering)));
+        builder.addCell(cell(valueMetadata, Integer.toString(value)));
+        return builder.build();
+    }
+
+    private Row[] rows(int clusteringStart, int clusteringEnd, int nowInSec)
+    {
+        return IntStream.range(clusteringStart, clusteringEnd)
+                        .mapToObj(v -> row(v, v, nowInSec))
+                        .toArray(Row[]::new);
+    }
+
+    private RangeTombstoneBoundMarker open(int start)
+    {
+        return new RangeTombstoneBoundMarker(
+            ClusteringBound.create(ClusteringBound.boundKind(true, true),
+                                   new ByteBuffer[] { Clustering.make(Int32Type.instance.decompose(start)).get(0)}),
+            new DeletionTime(FBUtilities.timestampMicros(), FBUtilities.nowInSeconds()));
+    }
+
+    private RangeTombstoneBoundMarker close(int close)
+    {
+        return new RangeTombstoneBoundMarker(
+            ClusteringBound.create(ClusteringBound.boundKind(false, true),
+                                   new ByteBuffer[] { Clustering.make(Int32Type.instance.decompose(close)).get(0)}),
+            new DeletionTime(FBUtilities.timestampMicros(), FBUtilities.nowInSeconds()));
+    }
+
+    private UnfilteredRowIterator partition(ByteBuffer pk, Unfiltered... unfiltereds)
+    {
+        return partitionWithStaticRow(pk, Rows.EMPTY_STATIC_ROW, unfiltereds);
+    }
+
+    private UnfilteredRowIterator partitionWithStaticRow(ByteBuffer pk, Row staticRow, Unfiltered... unfiltereds)
+    {
+        Iterator<Unfiltered> unfilteredIterator = Arrays.asList(unfiltereds).iterator();
+        return new AbstractUnfilteredRowIterator(metadata, dk(pk), DeletionTime.LIVE, metadata.regularAndStaticColumns(), staticRow, false, EncodingStats.NO_STATS)
+        {
+            protected Unfiltered computeNext()
+            {
+                return unfilteredIterator.hasNext() ? unfilteredIterator.next() : endOfData();
+            }
+        };
+    }
+
+    private static UnfilteredPartitionIterator partitions(UnfilteredRowIterator...partitions)
+    {
+        Iterator<UnfilteredRowIterator> partitionsIter = Arrays.asList(partitions).iterator();
+        return new AbstractUnfilteredPartitionIterator()
+        {
+            public TableMetadata metadata()
+            {
+                return metadata;
+            }
+
+            public boolean hasNext()
+            {
+                return partitionsIter.hasNext();
+            }
+
+            public UnfilteredRowIterator next()
+            {
+                return partitionsIter.next();
+            }
+        };
+    }
+}
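
A condensed sketch of the tracked-read flow the tests above exercise. The API names
(RepairedDataInfo, prepare, withRepairedDataInfo, getDigest) are taken from this patch;
the cfs, nowInSec and partitions values are assumed to be set up as in the test class:

    RepairedDataInfo info = new RepairedDataInfo(DataLimits.NONE.newCounter(nowInSec, false, false, false));
    info.prepare(cfs, nowInSec, Integer.MAX_VALUE);

    // wrapping the merged iterator of repaired data feeds every consumed unfiltered into the digest
    try (UnfilteredPartitionIterator tracked = info.withRepairedDataInfo(partitions))
    {
        while (tracked.hasNext())
        {
            try (UnfilteredRowIterator rows = tracked.next())
            {
                while (rows.hasNext())
                    rows.next();
            }
        }
    }
    ByteBuffer digest = info.getDigest(); // compared across replicas by the coordinator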




[cassandra] 01/02: Merge branch 'cassandra-3.11' into trunk

Posted by sa...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

samt pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit b3ecbf38a3b9bd7dbcafb5caccacda4bc7d356a0
Merge: 781e486 d6beb01
Author: Sam Tunnicliffe <sa...@beobal.com>
AuthorDate: Thu Apr 16 18:29:05 2020 +0100

    Merge branch 'cassandra-3.11' into trunk

 CHANGES.txt                                        |   1 +
 src/java/org/apache/cassandra/config/Config.java   |   6 +
 .../cassandra/config/DatabaseDescriptor.java       |  10 +
 .../cassandra/db/SinglePartitionReadCommand.java   |  83 +++---
 .../apache/cassandra/repair/RepairRunnable.java    |  76 +++++-
 .../cassandra/service/SnapshotVerbHandler.java     |  67 +++++
 .../org/apache/cassandra/service/StorageProxy.java |  18 ++
 .../cassandra/service/StorageProxyMBean.java       |   4 +
 .../cassandra/service/reads/DataResolver.java      |   2 +-
 .../service/reads/repair/RepairedDataVerifier.java |  80 +++++-
 .../distributed/test/PreviewRepairTest.java        | 106 +++++++-
 .../distributed/test/RepairDigestTrackingTest.java | 285 ++++++++++++++-------
 .../distributed/test/SimpleReadWriteTest.java      | 101 ++++++++
 .../reads/repair/RepairedDataVerifierTest.java     |   4 +-
 14 files changed, 697 insertions(+), 146 deletions(-)

diff --cc CHANGES.txt
index 8f19c2a,dbdb779..96eeed4
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,53 -1,9 +1,54 @@@
 -3.11.7
 +4.0-alpha4
 + * Fix CQLSH to avoid arguments being evaluated (CASSANDRA-15660)
 + * Correct Visibility and Improve Safety of Methods in LatencyMetrics (CASSANDRA-15597)
 + * Allow cqlsh to run with Python2.7/Python3.6+ (CASSANDRA-15659,CASSANDRA-15573)
 + * Improve logging around incremental repair (CASSANDRA-15599)
 + * Do not check cdc_raw_directory filesystem space if CDC disabled (CASSANDRA-15688)
 + * Replace array iterators with get by index (CASSANDRA-15394)
 + * Minimize BTree iterator allocations (CASSANDRA-15389)
 + * Add client request size server metrics (CASSANDRA-15704)
 + * Add additional logging around FileUtils and compaction leftover cleanup (CASSANDRA-15705)
 + * Mark system_views/system_virtual_schema as non-alterable keyspaces in cqlsh (CASSANDRA-15711)
 + * Fail incremental repair if an old version sstable is involved (CASSANDRA-15612)
 + * Fix overflows on StreamingTombstoneHistogramBuilder produced by large deletion times (CASSANDRA-14773)
 + * Mark system_views/system_virtual_schema as system keyspaces in cqlsh (CASSANDRA-15706)
 + * Avoid unnecessary collection/iterator allocations during btree construction (CASSANDRA-15390)
 + * Repair history tables should have TTL and TWCS (CASSANDRA-12701)
 + * Fix cqlsh erroring out on Python 3.7 due to webbrowser module being absent (CASSANDRA-15572)
 + * Fix IMH#acquireCapacity() to return correct Outcome when endpoint reserve runs out (CASSANDRA-15607)
 + * Fix nodetool describering output (CASSANDRA-15682)
 + * Only track ideal CL failure when request CL met (CASSANDRA-15696)
 + * Fix flaky CoordinatorMessagingTest and docstring in OutboundSink and ConsistentSession (CASSANDRA-15672)
 + * Fix force compaction of wrapping ranges (CASSANDRA-15664)
 + * Expose repair streaming metrics (CASSANDRA-15656)
 + * Set now in seconds in the future for validation repairs (CASSANDRA-15655)
 + * Emit metric on preview repair failure (CASSANDRA-15654)
 + * Use more appropriate logging levels (CASSANDRA-15661)
 + * Fixed empty check in TrieMemIndex due to potential state inconsistency in ConcurrentSkipListMap (CASSANDRA-15526)
 + * Added UnleveledSSTables global and table level metric (CASSANDRA-15620)
 + * Added Virtual Table exposing Cassandra relevant system properties (CASSANDRA-15616, CASSANDRA-15643)
 + * Improve the algorithmic token allocation in case racks = RF (CASSANDRA-15600)
 + * Fix ConnectionTest.testAcquireReleaseOutbound (CASSANDRA-15308)
 + * Include finalized pending sstables in preview repair (CASSANDRA-15553)
 + * Reverted to the original behavior of CLUSTERING ORDER on CREATE TABLE (CASSANDRA-15271)
 + * Correct inaccurate logging message (CASSANDRA-15549)
 + * Unset GREP_OPTIONS (CASSANDRA-14487)
 + * Update to Python driver 3.21 for cqlsh (CASSANDRA-14872)
 + * Fix missing Keyspaces in cqlsh describe output (CASSANDRA-15576)
 + * Fix multi DC nodetool status output (CASSANDRA-15305)
 + * updateCoordinatorWriteLatencyTableMetric can produce misleading metrics (CASSANDRA-15569)
 + * Make cqlsh and cqlshlib Python 2 & 3 compatible (CASSANDRA-10190)
 + * Improve the description of nodetool listsnapshots command (CASSANDRA-14587)
 + * allow embedded cassandra launched from a one-jar or uno-jar (CASSANDRA-15494)
 + * Update hppc library to version 0.8.1 (CASSANDRA-12995)
 + * Limit the dependencies used by UDFs/UDAs (CASSANDRA-14737)
 + * Make native_transport_max_concurrent_requests_in_bytes updatable (CASSANDRA-15519)
 + * Cleanup and improvements to IndexInfo/ColumnIndex (CASSANDRA-15469)
 + * Potential Overflow in DatabaseDescriptor Functions That Convert Between KB/MB & Bytes (CASSANDRA-15470)
 +Merged from 3.11:
   * Allow sstableloader to use SSL on the native port (CASSANDRA-14904)
  Merged from 3.0:
 -=======
 -3.0.21
+  * Don't skip sstables in slice queries based only on local min/max/deletion timestamp (CASSANDRA-15690)
   * Memtable memory allocations may deadlock (CASSANDRA-15367)
   * Run evictFromMembership in GossipStage (CASSANDRA-15592)
  Merged from 2.2:
diff --cc src/java/org/apache/cassandra/config/Config.java
index 2792694,7f28546..957662a
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@@ -415,69 -387,6 +415,75 @@@ public class Confi
      public volatile boolean back_pressure_enabled = false;
      public volatile ParameterizedClass back_pressure_strategy;
  
 +    public RepairCommandPoolFullStrategy repair_command_pool_full_strategy = RepairCommandPoolFullStrategy.queue;
 +    public int repair_command_pool_size = concurrent_validations;
 +
 +    /**
 +     * When a node first starts up it initially considers all other peers as DOWN and is disconnected from all of them.
 +     * To be useful as a coordinator (and not introduce latency penalties on restart) this node must have successfully
 +     * opened all three internode TCP connections (gossip, small, and large messages) before advertising to clients.
 +     * Due to this, by default, Cassandra will prime these internode TCP connections and wait until at most a single
 +     * node in the local datacenter remains DOWN/disconnected before offering itself as a coordinator, subject to a
 +     * timeout. See CASSANDRA-13993 and CASSANDRA-14297 for more details.
 +     *
 +     * We provide two tunables to control this behavior as some users may want to block until all datacenters are
 +     * available (global QUORUM/EACH_QUORUM), some users may not want to block at all (clients that already work
 +     * around the problem), and some users may want to prime the connections but not delay startup.
 +     *
 +     * block_for_peers_timeout_in_secs: controls how long this node will wait to connect to peers. To completely disable
 +     * any startup connectivity checks set this to -1. To trigger the internode connections but immediately continue
 +     * startup, set this to 0. The default is 10 seconds.
 +     *
 +     * block_for_peers_in_remote_dcs: controls if this node will consider remote datacenters to wait for. The default
 +     * is to _not_ wait on remote datacenters.
 +     */
 +    public int block_for_peers_timeout_in_secs = 10;
 +    public boolean block_for_peers_in_remote_dcs = false;
 +
 +    public volatile boolean automatic_sstable_upgrade = false;
 +    public volatile int max_concurrent_automatic_sstable_upgrades = 1;
 +    public boolean stream_entire_sstables = true;
 +
 +    public volatile AuditLogOptions audit_logging_options = new AuditLogOptions();
 +    public volatile FullQueryLoggerOptions full_query_logging_options = new FullQueryLoggerOptions();
 +
 +    public CorruptedTombstoneStrategy corrupted_tombstone_strategy = CorruptedTombstoneStrategy.disabled;
 +
 +    public volatile boolean diagnostic_events_enabled = false;
 +
 +    /**
 +     * Flags for enabling tracking of the repaired state of data during reads.
 +     * There are separate flags for range and single partition reads, as single partition reads are only
 +     * tracked when CL > 1 and a digest mismatch occurs. Currently, range queries don't use digests, so if
 +     * enabled for range reads, all such reads will include repaired data tracking. As this adds
 +     * some overhead, operators may wish to disable it for range reads whilst still enabling it for partition reads.
 +     */
 +    public volatile boolean repaired_data_tracking_for_range_reads_enabled = false;
 +    public volatile boolean repaired_data_tracking_for_partition_reads_enabled = false;
 +    /* If true, unconfirmed mismatches (those which cannot be considered conclusive proof of out of
 +     * sync repaired data due to the presence of pending repair sessions, or unrepaired partition
 +     * deletes) will increment a metric, distinct from confirmed mismatches. If false, unconfirmed
 +     * mismatches are simply ignored by the coordinator.
 +     * This is purely to allow operators to avoid potential signal:noise issues as these types of
 +     * mismatches are considerably less actionable than their confirmed counterparts. Setting this
 +     * to false only disables the incrementing of the counters when an unconfirmed mismatch is found
 +     * and has no other effect on the collection or processing of the repaired data.
 +     */
 +    public volatile boolean report_unconfirmed_repaired_data_mismatches = false;
++    /*
++     * If true, when a repaired data mismatch is detected at read time or during a preview repair,
++     * a snapshot request will be issued to each participating replica. These are limited at the replica level
++     * so that only a single snapshot per-table per-day can be taken via this method.
++     */
++    public volatile boolean snapshot_on_repaired_data_mismatch = false;
 +
 +    /**
 +     * number of seconds to set nowInSec into the future when performing validation previews against repaired data
 +     * this (attempts) to prevent a race where validations on different machines are started on different sides of
 +     * a tombstone being compacted away
 +     */
 +    public volatile int validation_preview_purge_head_start_in_sec = 60 * 60;
 +
      /**
       * @deprecated migrate to {@link DatabaseDescriptor#isClientInitialized()}
       */
diff --cc src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 4821653,1e2e1a1..85bfa88
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@@ -2887,124 -2585,6 +2887,134 @@@ public class DatabaseDescripto
          return backPressureStrategy;
      }
  
 +    public static ConsistencyLevel getIdealConsistencyLevel()
 +    {
 +        return conf.ideal_consistency_level;
 +    }
 +
 +    public static void setIdealConsistencyLevel(ConsistencyLevel cl)
 +    {
 +        conf.ideal_consistency_level = cl;
 +    }
 +
 +    public static int getRepairCommandPoolSize()
 +    {
 +        return conf.repair_command_pool_size;
 +    }
 +
 +    public static Config.RepairCommandPoolFullStrategy getRepairCommandPoolFullStrategy()
 +    {
 +        return conf.repair_command_pool_full_strategy;
 +    }
 +
 +    public static FullQueryLoggerOptions getFullQueryLogOptions()
 +    {
 +        return  conf.full_query_logging_options;
 +    }
 +
 +    public static boolean getBlockForPeersInRemoteDatacenters()
 +    {
 +        return conf.block_for_peers_in_remote_dcs;
 +    }
 +
 +    public static int getBlockForPeersTimeoutInSeconds()
 +    {
 +        return conf.block_for_peers_timeout_in_secs;
 +    }
 +
 +    public static boolean automaticSSTableUpgrade()
 +    {
 +        return conf.automatic_sstable_upgrade;
 +    }
 +
 +    public static void setAutomaticSSTableUpgradeEnabled(boolean enabled)
 +    {
 +        if (conf.automatic_sstable_upgrade != enabled)
 +            logger.debug("Changing automatic_sstable_upgrade to {}", enabled);
 +        conf.automatic_sstable_upgrade = enabled;
 +    }
 +
 +    public static int maxConcurrentAutoUpgradeTasks()
 +    {
 +        return conf.max_concurrent_automatic_sstable_upgrades;
 +    }
 +
 +    public static void setMaxConcurrentAutoUpgradeTasks(int value)
 +    {
 +        if (conf.max_concurrent_automatic_sstable_upgrades != value)
 +            logger.debug("Changing max_concurrent_automatic_sstable_upgrades to {}", value);
 +        validateMaxConcurrentAutoUpgradeTasksConf(value);
 +        conf.max_concurrent_automatic_sstable_upgrades = value;
 +    }
 +
 +    private static void validateMaxConcurrentAutoUpgradeTasksConf(int value)
 +    {
 +        if (value < 0)
 +            throw new ConfigurationException("max_concurrent_automatic_sstable_upgrades can't be negative");
 +        if (value > getConcurrentCompactors())
 +            logger.warn("max_concurrent_automatic_sstable_upgrades ({}) is larger than concurrent_compactors ({})", value, getConcurrentCompactors());
 +    }
 +    
 +    public static AuditLogOptions getAuditLoggingOptions()
 +    {
 +        return conf.audit_logging_options;
 +    }
 +
 +    public static void setAuditLoggingOptions(AuditLogOptions auditLoggingOptions)
 +    {
 +        conf.audit_logging_options = auditLoggingOptions;
 +    }
 +
 +    public static Config.CorruptedTombstoneStrategy getCorruptedTombstoneStrategy()
 +    {
 +        return conf.corrupted_tombstone_strategy;
 +    }
 +
 +    public static void setCorruptedTombstoneStrategy(Config.CorruptedTombstoneStrategy strategy)
 +    {
 +        conf.corrupted_tombstone_strategy = strategy;
 +    }
 +
 +    public static boolean getRepairedDataTrackingForRangeReadsEnabled()
 +    {
 +        return conf.repaired_data_tracking_for_range_reads_enabled;
 +    }
 +
 +    public static void setRepairedDataTrackingForRangeReadsEnabled(boolean enabled)
 +    {
 +        conf.repaired_data_tracking_for_range_reads_enabled = enabled;
 +    }
 +
 +    public static boolean getRepairedDataTrackingForPartitionReadsEnabled()
 +    {
 +        return conf.repaired_data_tracking_for_partition_reads_enabled;
 +    }
 +
 +    public static void setRepairedDataTrackingForPartitionReadsEnabled(boolean enabled)
 +    {
 +        conf.repaired_data_tracking_for_partition_reads_enabled = enabled;
 +    }
 +
++    public static boolean snapshotOnRepairedDataMismatch()
++    {
++        return conf.snapshot_on_repaired_data_mismatch;
++    }
++
++    public static void setSnapshotOnRepairedDataMismatch(boolean enabled)
++    {
++        conf.snapshot_on_repaired_data_mismatch = enabled;
++    }
++
 +    public static boolean reportUnconfirmedRepairedDataMismatches()
 +    {
 +        return conf.report_unconfirmed_repaired_data_mismatches;
 +    }
 +
 +    public static void reportUnconfirmedRepairedDataMismatches(boolean enabled)
 +    {
 +        conf.report_unconfirmed_repaired_data_mismatches = enabled;
 +    }
 +
      public static boolean strictRuntimeChecks()
      {
          return strictRuntimeChecks;
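
Taken together with the Config flags above, these accessors let the tracking behaviour be
toggled at runtime (the StorageProxyMBean changes in the summary suggest they are also
reachable over JMX). An illustrative sketch, using only method names added in this diff:

    // enable tracking for single partition reads, where it only kicks in on a digest mismatch
    DatabaseDescriptor.setRepairedDataTrackingForPartitionReadsEnabled(true);
    // leave range reads disabled, as every tracked range read pays the overhead
    DatabaseDescriptor.setRepairedDataTrackingForRangeReadsEnabled(false);
    // count, but distinguish, mismatches that may be explained by pending repair sessions
    DatabaseDescriptor.reportUnconfirmedRepairedDataMismatches(true);
    // snapshot participating replicas on a mismatch, limited to one snapshot per table per day
    DatabaseDescriptor.setSnapshotOnRepairedDataMismatch(true);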
diff --cc src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index 4daad7d,3d6559b..fe440f4
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@@ -618,74 -725,64 +618,71 @@@ public class SinglePartitionReadComman
               *   timestamp(tombstone) > maxTimestamp_s0
               * since we necessarily have
               *   timestamp(tombstone) <= maxTimestamp_s1
-              * In other words, iterating in maxTimestamp order allow to do our mostRecentPartitionTombstone elimination
-              * in one pass, and minimize the number of sstables for which we read a partition tombstone.
-              */
+              * In other words, iterating in descending maxTimestamp order allows us to do our mostRecentPartitionTombstone
+              * elimination in one pass, and minimizes the number of sstables for which we read a partition tombstone.
+              */
+             Collections.sort(view.sstables, SSTableReader.maxTimestampDescending);
 -            long mostRecentPartitionTombstone = Long.MIN_VALUE;
              int nonIntersectingSSTables = 0;
-             List<SSTableReader> skippedSSTablesWithTombstones = null;
+             int includedDueToTombstones = 0;
+ 
              SSTableReadMetricsCollector metricsCollector = new SSTableReadMetricsCollector();
  
 +            if (isTrackingRepairedStatus())
 +                Tracing.trace("Collecting data from sstables and tracking repaired status");
 +
              for (SSTableReader sstable : view.sstables)
              {
                  // if we've already seen a partition tombstone with a timestamp greater
                  // than the most recent update to this sstable, we can skip it
 +                // if we're tracking repaired status, we mark the repaired digest inconclusive
 +                // as other replicas may not have seen this partition delete and so could include
 +                // data from this sstable (or others) in their digests
                  if (sstable.getMaxTimestamp() < mostRecentPartitionTombstone)
 +                {
 +                    inputCollector.markInconclusive();
                      break;
 +                }
  
-                 if (!shouldInclude(sstable))
-                 {
-                     nonIntersectingSSTables++;
-                     if (sstable.mayHaveTombstones())
-                     { // if sstable has tombstones we need to check after one pass if it can be safely skipped
-                         if (skippedSSTablesWithTombstones == null)
-                             skippedSSTablesWithTombstones = new ArrayList<>();
-                         skippedSSTablesWithTombstones.add(sstable);
- 
-                     }
-                     continue;
-                 }
- 
-                 minTimestamp = Math.min(minTimestamp, sstable.getMinTimestamp());
- 
-                 @SuppressWarnings("resource") // 'iter' is added to iterators which is closed on exception,
-                                               // or through the closing of the final merged iterator
-                 UnfilteredRowIteratorWithLowerBound iter = makeIterator(cfs, sstable, metricsCollector);
-                 if (!sstable.isRepaired())
-                     oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, sstable.getMinLocalDeletionTime());
- 
-                 inputCollector.addSSTableIterator(sstable, iter);
-                 mostRecentPartitionTombstone = Math.max(mostRecentPartitionTombstone,
-                                                         iter.partitionLevelDeletion().markedForDeleteAt());
-             }
- 
-             int includedDueToTombstones = 0;
-             // Check for sstables with tombstones that are not expired
-             if (skippedSSTablesWithTombstones != null)
-             {
-                 for (SSTableReader sstable : skippedSSTablesWithTombstones)
+                 if (shouldInclude(sstable))
                  {
-                     if (sstable.getMaxTimestamp() <= minTimestamp)
-                         continue;
- 
-                     @SuppressWarnings("resource") // 'iter' is added to iterators which is close on exception,
-                                                   // or through the closing of the final merged iterator
-                     UnfilteredRowIteratorWithLowerBound iter = makeIterator(cfs, sstable, metricsCollector);
                      if (!sstable.isRepaired())
                          oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, sstable.getMinLocalDeletionTime());
  
+                     // 'iter' is added to iterators which is closed on exception, or through the closing of the final merged iterator
+                     @SuppressWarnings("resource")
 -                    UnfilteredRowIterator iter = makeIterator(cfs, sstable, true, metricsCollector);
 -                    iterators.add(iter);
++                    UnfilteredRowIteratorWithLowerBound iter = makeIterator(cfs, sstable, metricsCollector);
 +                    inputCollector.addSSTableIterator(sstable, iter);
-                     includedDueToTombstones++;
+                     mostRecentPartitionTombstone = Math.max(mostRecentPartitionTombstone,
+                                                             iter.partitionLevelDeletion().markedForDeleteAt());
+                 }
+                 else
+                 {
 -
+                     nonIntersectingSSTables++;
+                     // sstable contains no tombstone if maxLocalDeletionTime == Integer.MAX_VALUE, so we can safely skip those entirely
+                     if (sstable.mayHaveTombstones())
+                     {
+                         // 'iter' is added to iterators which is closed on exception, or through the closing of the final merged iterator
+                         @SuppressWarnings("resource")
 -                        UnfilteredRowIterator iter = makeIterator(cfs, sstable, true, metricsCollector);
++                        UnfilteredRowIteratorWithLowerBound iter = makeIterator(cfs, sstable, metricsCollector);
+                         // if the sstable contains a partition delete, then we must include it regardless of whether it
+                         // shadows any other data seen locally as we can't guarantee that other replicas have seen it
+                         if (!iter.partitionLevelDeletion().isLive())
+                         {
+                             if (!sstable.isRepaired())
+                                 oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, sstable.getMinLocalDeletionTime());
 -                            iterators.add(iter);
++                            inputCollector.addSSTableIterator(sstable, iter);
+                             includedDueToTombstones++;
+                             mostRecentPartitionTombstone = Math.max(mostRecentPartitionTombstone,
+                                                                     iter.partitionLevelDeletion().markedForDeleteAt());
+                         }
+                         else
+                         {
+                             iter.close();
+                         }
+                     }
                  }
              }
+ 
              if (Tracing.isTracing())
                  Tracing.trace("Skipped {}/{} non-slice-intersecting sstables, included {} due to tombstones",
                                 nonIntersectingSSTables, view.sstables.size(), includedDueToTombstones);
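
Distilled, the rewritten loop above handles three cases. A simplified sketch follows
(identifiers are from the patch; control flow is condensed and the unrepaired-tombstone
bookkeeping is omitted, so this is not the literal code):

    Collections.sort(view.sstables, SSTableReader.maxTimestampDescending);
    for (SSTableReader sstable : view.sstables)
    {
        // 1. everything from here on is fully shadowed by a partition tombstone already seen;
        //    other replicas may lack that tombstone, so mark the repaired digest inconclusive
        if (sstable.getMaxTimestamp() < mostRecentPartitionTombstone)
        {
            inputCollector.markInconclusive();
            break;
        }
        if (shouldInclude(sstable))
        {
            // 2. the sstable intersects the queried slice, so it is always read
            UnfilteredRowIteratorWithLowerBound iter = makeIterator(cfs, sstable, metricsCollector);
            inputCollector.addSSTableIterator(sstable, iter);
            mostRecentPartitionTombstone = Math.max(mostRecentPartitionTombstone,
                                                    iter.partitionLevelDeletion().markedForDeleteAt());
        }
        else if (sstable.mayHaveTombstones())
        {
            // 3. non-intersecting, but a partition delete in it may be unseen by other replicas,
            //    so it must be included whenever such a delete is present
            UnfilteredRowIteratorWithLowerBound iter = makeIterator(cfs, sstable, metricsCollector);
            if (!iter.partitionLevelDeletion().isLive())
            {
                inputCollector.addSSTableIterator(sstable, iter);
                mostRecentPartitionTombstone = Math.max(mostRecentPartitionTombstone,
                                                        iter.partitionLevelDeletion().markedForDeleteAt());
            }
            else
                iter.close();
        }
    }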
diff --cc src/java/org/apache/cassandra/repair/RepairRunnable.java
index c7a6d71,35794e2..ae35aaf
--- a/src/java/org/apache/cassandra/repair/RepairRunnable.java
+++ b/src/java/org/apache/cassandra/repair/RepairRunnable.java
@@@ -17,15 -17,9 +17,16 @@@
   */
  package org.apache.cassandra.repair;
  
 -import java.net.InetAddress;
 +import java.io.IOException;
  import java.nio.ByteBuffer;
 -import java.util.*;
 +import java.util.ArrayList;
 +import java.util.Collection;
++import java.util.Collections;
 +import java.util.HashSet;
 +import java.util.List;
 +import java.util.Set;
 +import java.util.UUID;
 +import java.util.concurrent.ExecutorService;
  import java.util.concurrent.LinkedBlockingQueue;
  import java.util.concurrent.TimeUnit;
  import java.util.concurrent.atomic.AtomicBoolean;
@@@ -49,35 -33,22 +50,40 @@@ import org.apache.commons.lang3.time.Du
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
 -import org.apache.cassandra.concurrent.JMXConfigurableThreadPoolExecutor;
 +import com.codahale.metrics.Timer;
 +import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
  import org.apache.cassandra.concurrent.NamedThreadFactory;
 -import org.apache.cassandra.config.SchemaConstants;
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.metrics.RepairMetrics;
++import org.apache.cassandra.db.SnapshotCommand;
++import org.apache.cassandra.gms.FailureDetector;
++import org.apache.cassandra.net.Message;
++import org.apache.cassandra.net.MessagingService;
++import org.apache.cassandra.net.Verb;
++import org.apache.cassandra.repair.consistent.SyncStatSummary;
++import org.apache.cassandra.db.Keyspace;
  import org.apache.cassandra.cql3.QueryOptions;
  import org.apache.cassandra.cql3.QueryProcessor;
  import org.apache.cassandra.cql3.UntypedResultSet;
  import org.apache.cassandra.cql3.statements.SelectStatement;
  import org.apache.cassandra.db.ColumnFamilyStore;
  import org.apache.cassandra.db.ConsistencyLevel;
- import org.apache.cassandra.db.Keyspace;
  import org.apache.cassandra.dht.Range;
  import org.apache.cassandra.dht.Token;
- import org.apache.cassandra.gms.FailureDetector;
 +import org.apache.cassandra.locator.EndpointsForRange;
 +import org.apache.cassandra.locator.InetAddressAndPort;
 +import org.apache.cassandra.locator.Replica;
 +import org.apache.cassandra.metrics.StorageMetrics;
 +import org.apache.cassandra.repair.consistent.CoordinatorSession;
- import org.apache.cassandra.repair.consistent.SyncStatSummary;
  import org.apache.cassandra.repair.messages.RepairOption;
 +import org.apache.cassandra.schema.SchemaConstants;
  import org.apache.cassandra.service.ActiveRepairService;
 +import org.apache.cassandra.service.ActiveRepairService.ParentRepairStatus;
  import org.apache.cassandra.service.ClientState;
  import org.apache.cassandra.service.QueryState;
  import org.apache.cassandra.service.StorageService;
++import org.apache.cassandra.service.reads.repair.RepairedDataVerifier;
 +import org.apache.cassandra.streaming.PreviewKind;
  import org.apache.cassandra.tracing.TraceKeyspace;
  import org.apache.cassandra.tracing.TraceState;
  import org.apache.cassandra.tracing.Tracing;
@@@ -425,262 -306,68 +431,326 @@@ public class RepairRunnable implements 
                          hasFailure.compareAndSet(false, true);
                      }
                  }
 -                return ActiveRepairService.instance.finishParentSession(parentSession, allNeighbors, successfulRanges);
 +                return Futures.immediateFuture(null);
              }
 -        });
 -        Futures.addCallback(anticompactionResult, new FutureCallback<Object>()
 +        }, MoreExecutors.directExecutor());
 +        Futures.addCallback(repairResult, new RepairCompleteCallback(parentSession, successfulRanges, startTime, traceState, hasFailure, executor), MoreExecutors.directExecutor());
 +    }
 +
 +    /**
 +     * removes dead nodes from common ranges, and excludes ranges left without any participants
 +     */
 +    @VisibleForTesting
 +    static List<CommonRange> filterCommonRanges(List<CommonRange> commonRanges, Set<InetAddressAndPort> liveEndpoints, boolean force)
 +    {
 +        if (!force)
          {
 -            public void onSuccess(Object result)
 +            return commonRanges;
 +        }
 +        else
 +        {
 +            List<CommonRange> filtered = new ArrayList<>(commonRanges.size());
 +
 +            for (CommonRange commonRange : commonRanges)
              {
 -                SystemDistributedKeyspace.successfulParentRepair(parentSession, successfulRanges);
 -                if (hasFailure.get())
 +                Set<InetAddressAndPort> endpoints = ImmutableSet.copyOf(Iterables.filter(commonRange.endpoints, liveEndpoints::contains));
 +                Set<InetAddressAndPort> transEndpoints = ImmutableSet.copyOf(Iterables.filter(commonRange.transEndpoints, liveEndpoints::contains));
 +                Preconditions.checkState(endpoints.containsAll(transEndpoints), "transEndpoints must be a subset of endpoints");
 +
 +                // this node is implicitly a participant in this repair, so a single endpoint is ok here
 +                if (!endpoints.isEmpty())
                  {
 -                    fireProgressEvent(tag, new ProgressEvent(ProgressEventType.ERROR, progress.get(), totalProgress,
 -                                                             "Some repair failed"));
 +                    filtered.add(new CommonRange(endpoints, transEndpoints, commonRange.ranges));
                  }
 -                else
 +            }
 +            Preconditions.checkState(!filtered.isEmpty(), "Not enough live endpoints for a repair");
 +            return filtered;
 +        }
 +    }
 +
 +    private void incrementalRepair(UUID parentSession,
 +                                   long startTime,
 +                                   boolean forceRepair,
 +                                   TraceState traceState,
 +                                   Set<InetAddressAndPort> allNeighbors,
 +                                   List<CommonRange> commonRanges,
 +                                   String... cfnames)
 +    {
 +        // the local node also needs to be included in the set of participants, since coordinator sessions aren't persisted
 +        Set<InetAddressAndPort> allParticipants = ImmutableSet.<InetAddressAndPort>builder()
 +                                                  .addAll(allNeighbors)
 +                                                  .add(FBUtilities.getBroadcastAddressAndPort())
 +                                                  .build();
 +
 +        List<CommonRange> allRanges = filterCommonRanges(commonRanges, allParticipants, forceRepair);
 +
 +        CoordinatorSession coordinatorSession = ActiveRepairService.instance.consistent.coordinated.registerSession(parentSession, allParticipants, forceRepair);
 +        ListeningExecutorService executor = createExecutor();
 +        AtomicBoolean hasFailure = new AtomicBoolean(false);
 +        ListenableFuture repairResult = coordinatorSession.execute(() -> submitRepairSessions(parentSession, true, executor, allRanges, cfnames),
 +                                                                   hasFailure);
 +        Collection<Range<Token>> ranges = new HashSet<>();
 +        for (Collection<Range<Token>> range : Iterables.transform(allRanges, cr -> cr.ranges))
 +        {
 +            ranges.addAll(range);
 +        }
 +        Futures.addCallback(repairResult, new RepairCompleteCallback(parentSession, ranges, startTime, traceState, hasFailure, executor), MoreExecutors.directExecutor());
 +    }
 +
 +    private void previewRepair(UUID parentSession,
 +                               long startTime,
 +                               List<CommonRange> commonRanges,
 +                               String... cfnames)
 +    {
 +        logger.debug("Starting preview repair for {}", parentSession);
 +        // Set up RepairJob executor for this repair command.
 +        ListeningExecutorService executor = createExecutor();
 +
 +        final ListenableFuture<List<RepairSessionResult>> allSessions = submitRepairSessions(parentSession, false, executor, commonRanges, cfnames);
 +
 +        Futures.addCallback(allSessions, new FutureCallback<List<RepairSessionResult>>()
 +        {
 +            public void onSuccess(List<RepairSessionResult> results)
 +            {
 +                try
                  {
 -                    fireProgressEvent(tag, new ProgressEvent(ProgressEventType.SUCCESS, progress.get(), totalProgress,
 -                                                             "Repair completed successfully"));
 +                    if (results == null || results.stream().anyMatch(s -> s == null))
 +                    {
 +                        // something failed
 +                        fail(null);
 +                        return;
 +                    }
 +                    PreviewKind previewKind = options.getPreviewKind();
 +                    Preconditions.checkState(previewKind != PreviewKind.NONE, "Preview is NONE");
 +                    SyncStatSummary summary = new SyncStatSummary(true);
 +                    summary.consumeSessionResults(results);
 +
 +                    final String message;
 +                    if (summary.isEmpty())
 +                    {
 +                        message = previewKind == PreviewKind.REPAIRED ? "Repaired data is in sync" : "Previewed data was in sync";
 +                    }
 +                    else
 +                    {
 +                        message = (previewKind == PreviewKind.REPAIRED ? "Repaired data is inconsistent\n" : "Preview complete\n") + summary.toString();
 +                        RepairMetrics.previewFailures.inc();
++                        if (previewKind == PreviewKind.REPAIRED)
++                            maybeSnapshotReplicas(parentSession, keyspace, results);
 +                    }
 +                    notification(message);
 +
 +                    success("Repair preview completed successfully");
 +                }
 +                catch (Throwable t)
 +                {
 +                    logger.error("Error completing preview repair", t);
 +                    onFailure(t);
 +                }
 +                finally
 +                {
 +                    executor.shutdownNow();
                  }
 -                repairComplete();
              }
  
              public void onFailure(Throwable t)
              {
 -                fireProgressEvent(tag, new ProgressEvent(ProgressEventType.ERROR, progress.get(), totalProgress, t.getMessage()));
 -                SystemDistributedKeyspace.failParentRepair(parentSession, t);
 -                repairComplete();
 +                notifyError(t);
 +                fail("Error completing preview repair: " + t.getMessage());
 +                executor.shutdownNow();
              }
 +        }, MoreExecutors.directExecutor());
 +    }
 +
++    private void maybeSnapshotReplicas(UUID parentSession, String keyspace, List<RepairSessionResult> results)
++    {
++        if (!DatabaseDescriptor.snapshotOnRepairedDataMismatch())
++            return;
+ 
 -            private void repairComplete()
++        try
++        {
++            Set<String> mismatchingTables = new HashSet<>();
++            Set<InetAddressAndPort> nodes = new HashSet<>();
++            for (RepairSessionResult sessionResult : results)
+             {
 -                String duration = DurationFormatUtils.formatDurationWords(System.currentTimeMillis() - startTime,
 -                                                                          true, true);
 -                String message = String.format("Repair command #%d finished in %s", cmd, duration);
 -                fireProgressEvent(tag, new ProgressEvent(ProgressEventType.COMPLETE, progress.get(), totalProgress, message));
 -                logger.info(message);
 -                if (options.isTraced() && traceState != null)
++                for (RepairResult repairResult : emptyIfNull(sessionResult.repairJobResults))
+                 {
 -                    for (ProgressListener listener : listeners)
 -                        traceState.removeProgressListener(listener);
 -                    // Because DebuggableThreadPoolExecutor#afterExecute and this callback
 -                    // run in a nondeterministic order (within the same thread), the
 -                    // TraceState may have been nulled out at this point. The TraceState
 -                    // should be traceState, so just set it without bothering to check if it
 -                    // actually was nulled out.
 -                    Tracing.instance.set(traceState);
 -                    Tracing.traceRepair(message);
 -                    Tracing.instance.stopSession();
++                    for (SyncStat stat : emptyIfNull(repairResult.stats))
++                    {
++                        if (stat.numberOfDifferences > 0)
++                            mismatchingTables.add(repairResult.desc.columnFamily);
++                        // snapshot all replicas, even if they don't have any differences
++                        nodes.add(stat.nodes.coordinator);
++                        nodes.add(stat.nodes.peer);
++                    }
+                 }
 -                executor.shutdownNow();
+             }
 -        });
++
++            String snapshotName = RepairedDataVerifier.SnapshottingVerifier.getSnapshotName();
++            for (String table : mismatchingTables)
++            {
++                // we can just check snapshot existence locally since the repair coordinator is always a replica (unlike in the read case)
++                if (!Keyspace.open(keyspace).getColumnFamilyStore(table).snapshotExists(snapshotName))
++                {
++                    logger.info("{} Snapshotting {}.{} for preview repair mismatch with tag {} on instances {}",
++                                options.getPreviewKind().logPrefix(parentSession),
++                                keyspace, table, snapshotName, nodes);
++                    Message<SnapshotCommand> message = Message.out(Verb.SNAPSHOT_REQ, new SnapshotCommand(keyspace,
++                                                                                                          table,
++                                                                                                          snapshotName,
++                                                                                                          false));
++                    for (InetAddressAndPort target : nodes)
++                        MessagingService.instance().send(message, target);
++                }
++                else
++                {
++                    logger.info("{} Not snapshotting {}.{} - snapshot {} exists",
++                                options.getPreviewKind().logPrefix(parentSession),
++                                keyspace, table, snapshotName);
++                }
++            }
++        }
++        catch (Exception e)
++        {
++            logger.error("{} Failed snapshotting replicas", options.getPreviewKind().logPrefix(parentSession), e);
++        }
+     }
+ 
 -    private void addRangeToNeighbors(List<Pair<Set<InetAddress>, ? extends Collection<Range<Token>>>> neighborRangeList, Range<Token> range, Set<InetAddress> neighbors)
++    private static <T> Iterable<T> emptyIfNull(Iterable<T> iter)
+     {
 -        for (int i = 0; i < neighborRangeList.size(); i++)
++        if (iter == null)
++            return Collections.emptyList();
++        return iter;
++    }
++
 +    private ListenableFuture<List<RepairSessionResult>> submitRepairSessions(UUID parentSession,
 +                                                                             boolean isIncremental,
 +                                                                             ListeningExecutorService executor,
 +                                                                             List<CommonRange> commonRanges,
 +                                                                             String... cfnames)
 +    {
 +        List<ListenableFuture<RepairSessionResult>> futures = new ArrayList<>(options.getRanges().size());
 +
 +        // we do endpoint filtering at the start of an incremental repair,
 +        // so repair sessions shouldn't also be checking liveness
 +        boolean force = options.isForcedRepair() && !isIncremental;
 +        for (CommonRange commonRange : commonRanges)
 +        {
 +            logger.info("Starting RepairSession for {}", commonRange);
 +            RepairSession session = ActiveRepairService.instance.submitRepairSession(parentSession,
 +                                                                                     commonRange,
 +                                                                                     keyspace,
 +                                                                                     options.getParallelism(),
 +                                                                                     isIncremental,
 +                                                                                     options.isPullRepair(),
 +                                                                                     force,
 +                                                                                     options.getPreviewKind(),
 +                                                                                     options.optimiseStreams(),
 +                                                                                     executor,
 +                                                                                     cfnames);
 +            if (session == null)
 +                continue;
 +            Futures.addCallback(session, new RepairSessionCallback(session), MoreExecutors.directExecutor());
 +            futures.add(session);
 +        }
 +        return Futures.successfulAsList(futures);
 +    }
 +
 +    private ListeningExecutorService createExecutor()
 +    {
 +        return MoreExecutors.listeningDecorator(new JMXEnabledThreadPoolExecutor(options.getJobThreads(),
 +                                                                                 Integer.MAX_VALUE,
 +                                                                                 TimeUnit.SECONDS,
 +                                                                                 new LinkedBlockingQueue<>(),
 +                                                                                 new NamedThreadFactory("Repair#" + cmd),
 +                                                                                 "internal"));
 +    }
 +
 +    private class RepairSessionCallback implements FutureCallback<RepairSessionResult>
 +    {
 +        private final RepairSession session;
 +
 +        public RepairSessionCallback(RepairSession session)
 +        {
 +            this.session = session;
 +        }
 +
 +        public void onSuccess(RepairSessionResult result)
 +        {
 +            String message = String.format("Repair session %s for range %s finished", session.getId(),
 +                                           session.ranges().toString());
 +            logger.info(message);
 +            fireProgressEvent(new ProgressEvent(ProgressEventType.PROGRESS,
 +                                                progressCounter.incrementAndGet(),
 +                                                totalProgress,
 +                                                message));
 +        }
 +
 +        public void onFailure(Throwable t)
          {
 -            Pair<Set<InetAddress>, ? extends Collection<Range<Token>>> p = neighborRangeList.get(i);
 +            String message = String.format("Repair session %s for range %s failed with error %s",
 +                                           session.getId(), session.ranges().toString(), t.getMessage());
 +            notifyError(new RuntimeException(message, t));
 +        }
 +    }
  
 -            if (p.left.containsAll(neighbors))
 +    private class RepairCompleteCallback implements FutureCallback<Object>
 +    {
 +        final UUID parentSession;
 +        final Collection<Range<Token>> successfulRanges;
 +        final long startTime;
 +        final TraceState traceState;
 +        final AtomicBoolean hasFailure;
 +        final ExecutorService executor;
 +
 +        public RepairCompleteCallback(UUID parentSession,
 +                                      Collection<Range<Token>> successfulRanges,
 +                                      long startTime,
 +                                      TraceState traceState,
 +                                      AtomicBoolean hasFailure,
 +                                      ExecutorService executor)
 +        {
 +            this.parentSession = parentSession;
 +            this.successfulRanges = successfulRanges;
 +            this.startTime = startTime;
 +            this.traceState = traceState;
 +            this.hasFailure = hasFailure;
 +            this.executor = executor;
 +        }
 +
 +        public void onSuccess(Object result)
 +        {
 +            maybeStoreParentRepairSuccess(successfulRanges);
 +            if (hasFailure.get())
              {
 -                p.right.add(range);
 +                fail(null);
 +            }
 +            else
 +            {
 +                success("Repair completed successfully");
 +            }
 +            executor.shutdownNow();
 +        }
 +
 +        public void onFailure(Throwable t)
 +        {
 +            notifyError(t);
 +            fail(t.getMessage());
 +            executor.shutdownNow();
 +        }
 +    }
 +
 +    private static void addRangeToNeighbors(List<CommonRange> neighborRangeList, Range<Token> range, EndpointsForRange neighbors)
 +    {
 +        Set<InetAddressAndPort> endpoints = neighbors.endpoints();
 +        Set<InetAddressAndPort> transEndpoints = neighbors.filter(Replica::isTransient).endpoints();
 +
 +        for (CommonRange commonRange : neighborRangeList)
 +        {
 +            if (commonRange.matchesEndpoints(endpoints, transEndpoints))
 +            {
 +                commonRange.ranges.add(range);
                  return;
              }
          }
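
Side note on submitRepairSessions above: the per-CommonRange sessions are combined with Guava's Futures.successfulAsList, so the aggregate future completes successfully even when individual sessions fail, yielding null placeholders for the failures - which is why failure handling lives in RepairSessionCallback instead. A minimal, self-contained sketch of that behaviour, assuming only Guava on the classpath:

    import java.util.List;

    import com.google.common.util.concurrent.Futures;
    import com.google.common.util.concurrent.ListenableFuture;

    public class SuccessfulAsListSketch
    {
        public static void main(String[] args) throws Exception
        {
            ListenableFuture<String> ok = Futures.immediateFuture("done");
            ListenableFuture<String> failed = Futures.immediateFailedFuture(new RuntimeException("boom"));

            // successfulAsList never fails as a whole; failed inputs surface as null entries
            List<String> results = Futures.successfulAsList(ok, failed).get();
            System.out.println(results); // [done, null]
        }
    }
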
diff --cc src/java/org/apache/cassandra/service/SnapshotVerbHandler.java
index cf2872b,a997533..4d8c4df
--- a/src/java/org/apache/cassandra/service/SnapshotVerbHandler.java
+++ b/src/java/org/apache/cassandra/service/SnapshotVerbHandler.java
@@@ -17,30 -17,30 +17,97 @@@
   */
  package org.apache.cassandra.service;
  
++import java.util.concurrent.Executor;
++import java.util.concurrent.Executors;
++
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
++import org.apache.cassandra.db.ColumnFamilyStore;
  import org.apache.cassandra.db.SnapshotCommand;
  import org.apache.cassandra.db.Keyspace;
++import org.apache.cassandra.locator.InetAddressAndPort;
  import org.apache.cassandra.net.IVerbHandler;
 -import org.apache.cassandra.net.MessageIn;
 -import org.apache.cassandra.net.MessageOut;
 +import org.apache.cassandra.net.Message;
  import org.apache.cassandra.net.MessagingService;
  
  public class SnapshotVerbHandler implements IVerbHandler<SnapshotCommand>
  {
 +    public static final SnapshotVerbHandler instance = new SnapshotVerbHandler();
++    public static final String REPAIRED_DATA_MISMATCH_SNAPSHOT_PREFIX = "RepairedDataMismatch-";
++    private static final Executor REPAIRED_DATA_MISMATCH_SNAPSHOT_EXECUTOR = Executors.newSingleThreadExecutor();
 +
      private static final Logger logger = LoggerFactory.getLogger(SnapshotVerbHandler.class);
  
 -    public void doVerb(MessageIn<SnapshotCommand> message, int id)
 +    public void doVerb(Message<SnapshotCommand> message)
      {
          SnapshotCommand command = message.payload;
          if (command.clear_snapshot)
+         {
              Keyspace.clearSnapshot(command.snapshot_name, command.keyspace);
+         }
++        else if (command.snapshot_name.startsWith(REPAIRED_DATA_MISMATCH_SNAPSHOT_PREFIX))
++        {
++            REPAIRED_DATA_MISMATCH_SNAPSHOT_EXECUTOR.execute(new RepairedDataSnapshotTask(command, message.from()));
++        }
          else
++        {
              Keyspace.open(command.keyspace).getColumnFamilyStore(command.column_family).snapshot(command.snapshot_name);
 -        logger.debug("Enqueuing response to snapshot request {} to {}", command.snapshot_name, message.from);
 -        MessagingService.instance().sendReply(new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE), id, message.from);
++        }
 +
 +        logger.debug("Enqueuing response to snapshot request {} to {}", command.snapshot_name, message.from());
 +        MessagingService.instance().send(message.emptyResponse(), message.from());
 +    }
++
++    private static class RepairedDataSnapshotTask implements Runnable
++    {
++        final SnapshotCommand command;
++        final InetAddressAndPort from;
++
++        RepairedDataSnapshotTask(SnapshotCommand command, InetAddressAndPort from)
++        {
++            this.command = command;
++            this.from = from;
++        }
++
++        public void run()
++        {
++            try
++            {
++                Keyspace ks = Keyspace.open(command.keyspace);
++                if (ks == null)
++                {
++                    logger.info("Snapshot request received from {} for {}.{} but keyspace not found",
++                                from,
++                                command.keyspace,
++                                command.column_family);
++                    return;
++                }
++
++                ColumnFamilyStore cfs = ks.getColumnFamilyStore(command.column_family);
++                if (cfs.snapshotExists(command.snapshot_name))
++                {
++                    logger.info("Received snapshot request from {} for {}.{} following repaired data mismatch, " +
++                                "but snapshot with tag {} already exists",
++                                from,
++                                command.keyspace,
++                                command.column_family,
++                                command.snapshot_name);
++                    return;
++                }
++                logger.info("Creating snapshot requested by {} of {}.{} following repaired data mismatch",
++                            from,
++                            command.keyspace,
++                            command.column_family);
++                cfs.snapshot(command.snapshot_name);
++            }
++            catch (IllegalArgumentException e)
++            {
++                logger.warn("Snapshot request received from {} for {}.{} but table not found",
++                            from,
++                            command.keyspace,
++                            command.column_family);
++            }
++        }
+     }
  }
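
For illustration: the mismatch snapshots above are recognised purely by tag prefix and deduplicated by tag, so repeated mismatches on the same day collapse onto a single snapshot per table. A minimal sketch of that naming/dedupe interplay, with a plain Set standing in for cfs.snapshotExists (the prefix and the BASIC_ISO_DATE tag format follow the code in this patch):

    import java.time.LocalDate;
    import java.time.format.DateTimeFormatter;
    import java.util.HashSet;
    import java.util.Set;

    public class MismatchSnapshotTagSketch
    {
        static final String PREFIX = "RepairedDataMismatch-";
        static final Set<String> existing = new HashSet<>(); // stand-in for cfs.snapshotExists

        public static void main(String[] args)
        {
            // e.g. "RepairedDataMismatch-20200416": one tag per table per day
            String tag = PREFIX + DateTimeFormatter.BASIC_ISO_DATE.format(LocalDate.now());
            System.out.println(take(tag)); // true  - first request creates the snapshot
            System.out.println(take(tag)); // false - same-day duplicates are skipped
        }

        static boolean take(String tag)
        {
            // mirrors the handler: only mismatch-prefixed tags take this path,
            // and an existing snapshot with the same tag short-circuits
            return tag.startsWith(PREFIX) && existing.add(tag);
        }
    }
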
diff --cc src/java/org/apache/cassandra/service/StorageProxy.java
index 59e6de7,15fe938..38e9c44
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@@ -2764,107 -2813,11 +2764,125 @@@ public class StorageProxy implements St
          return Schema.instance.getNumberOfTables();
      }
  
 +    public String getIdealConsistencyLevel()
 +    {
 +        return DatabaseDescriptor.getIdealConsistencyLevel().toString();
 +    }
 +
 +    public String setIdealConsistencyLevel(String cl)
 +    {
 +        ConsistencyLevel original = DatabaseDescriptor.getIdealConsistencyLevel();
 +        ConsistencyLevel newCL = ConsistencyLevel.valueOf(cl.trim().toUpperCase());
 +        DatabaseDescriptor.setIdealConsistencyLevel(newCL);
 +        return String.format("Updating ideal consistency level new value: %s old value %s", newCL, original.toString());
 +    }
 +
 +    @Deprecated
      public int getOtcBacklogExpirationInterval() {
 -        return DatabaseDescriptor.getOtcBacklogExpirationInterval();
 +        return 0;
 +    }
 +
 +    @Deprecated
 +    public void setOtcBacklogExpirationInterval(int intervalInMillis) { }
 +
 +    @Override
 +    public void enableRepairedDataTrackingForRangeReads()
 +    {
 +        DatabaseDescriptor.setRepairedDataTrackingForRangeReadsEnabled(true);
 +    }
 +
 +    @Override
 +    public void disableRepairedDataTrackingForRangeReads()
 +    {
 +        DatabaseDescriptor.setRepairedDataTrackingForRangeReadsEnabled(false);
      }
  
 -    public void setOtcBacklogExpirationInterval(int intervalInMillis) {
 -        DatabaseDescriptor.setOtcBacklogExpirationInterval(intervalInMillis);
 +    @Override
 +    public boolean getRepairedDataTrackingEnabledForRangeReads()
 +    {
 +        return DatabaseDescriptor.getRepairedDataTrackingForRangeReadsEnabled();
 +    }
 +
 +    @Override
 +    public void enableRepairedDataTrackingForPartitionReads()
 +    {
 +        DatabaseDescriptor.setRepairedDataTrackingForPartitionReadsEnabled(true);
 +    }
 +
 +    @Override
 +    public void disableRepairedDataTrackingForPartitionReads()
 +    {
 +        DatabaseDescriptor.setRepairedDataTrackingForPartitionReadsEnabled(false);
 +    }
 +
 +    @Override
 +    public boolean getRepairedDataTrackingEnabledForPartitionReads()
 +    {
 +        return DatabaseDescriptor.getRepairedDataTrackingForPartitionReadsEnabled();
 +    }
 +
 +    @Override
 +    public void enableReportingUnconfirmedRepairedDataMismatches()
 +    {
 +        DatabaseDescriptor.reportUnconfirmedRepairedDataMismatches(true);
 +    }
 +
 +    @Override
 +    public void disableReportingUnconfirmedRepairedDataMismatches()
 +    {
 +       DatabaseDescriptor.reportUnconfirmedRepairedDataMismatches(false);
 +    }
 +
 +    @Override
 +    public boolean getReportingUnconfirmedRepairedDataMismatchesEnabled()
 +    {
 +        return DatabaseDescriptor.reportUnconfirmedRepairedDataMismatches();
 +    }
 +
++    @Override
++    public boolean getSnapshotOnRepairedDataMismatchEnabled()
++    {
++        return DatabaseDescriptor.snapshotOnRepairedDataMismatch();
++    }
++
++    @Override
++    public void enableSnapshotOnRepairedDataMismatch()
++    {
++        DatabaseDescriptor.setSnapshotOnRepairedDataMismatch(true);
++    }
++
++    @Override
++    public void disableSnapshotOnRepairedDataMismatch()
++    {
++        DatabaseDescriptor.setSnapshotOnRepairedDataMismatch(false);
++    }
++
 +    static class PaxosBallotAndContention
 +    {
 +        final UUID ballot;
 +        final int contentions;
 +
 +        PaxosBallotAndContention(UUID ballot, int contentions)
 +        {
 +            this.ballot = ballot;
 +            this.contentions = contentions;
 +        }
 +
 +        @Override
 +        public final int hashCode()
 +        {
 +            int hashCode = 31 + (ballot == null ? 0 : ballot.hashCode());
 +            return 31 * hashCode + this.contentions;
 +        }
 +
 +        @Override
 +        public final boolean equals(Object o)
 +        {
 +            if (!(o instanceof PaxosBallotAndContention))
 +                return false;
 +            PaxosBallotAndContention that = (PaxosBallotAndContention)o;
 +            // handles nulls properly
 +            return Objects.equals(ballot, that.ballot) && contentions == that.contentions;
 +        }
      }
  }
diff --cc src/java/org/apache/cassandra/service/StorageProxyMBean.java
index 514feb1,8678dde..e0d2c86
--- a/src/java/org/apache/cassandra/service/StorageProxyMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageProxyMBean.java
@@@ -66,26 -63,7 +66,30 @@@ public interface StorageProxyMBea
      public void setOtcBacklogExpirationInterval(int intervalInMillis);
  
      /** Returns each live node's schema version */
 -    public Map<String, List<String>> getSchemaVersions();
 +    @Deprecated public Map<String, List<String>> getSchemaVersions();
 +    public Map<String, List<String>> getSchemaVersionsWithPort();
  
      public int getNumberOfTables();
 +
 +    public String getIdealConsistencyLevel();
 +    public String setIdealConsistencyLevel(String cl);
 +
 +    /**
 +     * Tracking and reporting of variances in the repaired data set across replicas at read time
 +     */
 +    void enableRepairedDataTrackingForRangeReads();
 +    void disableRepairedDataTrackingForRangeReads();
 +    boolean getRepairedDataTrackingEnabledForRangeReads();
 +
 +    void enableRepairedDataTrackingForPartitionReads();
 +    void disableRepairedDataTrackingForPartitionReads();
 +    boolean getRepairedDataTrackingEnabledForPartitionReads();
 +
 +    void enableReportingUnconfirmedRepairedDataMismatches();
 +    void disableReportingUnconfirmedRepairedDataMismatches();
 +    boolean getReportingUnconfirmedRepairedDataMismatchesEnabled();
++
++    void enableSnapshotOnRepairedDataMismatch();
++    void disableSnapshotOnRepairedDataMismatch();
++    boolean getSnapshotOnRepairedDataMismatchEnabled();
  }
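
These parameterless enable/disable operations make the tracking switchable at runtime without a restart. A minimal sketch of flipping them remotely over JMX; the host, the default 7199 port, and the org.apache.cassandra.db:type=StorageProxy object name are assumptions based on the usual Cassandra JMX layout:

    import javax.management.MBeanServerConnection;
    import javax.management.ObjectName;
    import javax.management.remote.JMXConnector;
    import javax.management.remote.JMXConnectorFactory;
    import javax.management.remote.JMXServiceURL;

    public class ToggleRepairedDataTracking
    {
        public static void main(String[] args) throws Exception
        {
            JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://127.0.0.1:7199/jmxrmi");
            try (JMXConnector connector = JMXConnectorFactory.connect(url))
            {
                MBeanServerConnection mbs = connector.getMBeanServerConnection();
                ObjectName name = new ObjectName("org.apache.cassandra.db:type=StorageProxy");

                // no-arg MBean operations, hence null params and null signature
                mbs.invoke(name, "enableRepairedDataTrackingForPartitionReads", null, null);
                mbs.invoke(name, "enableSnapshotOnRepairedDataMismatch", null, null);
            }
        }
    }
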
diff --cc src/java/org/apache/cassandra/service/reads/DataResolver.java
index 45bf918,0000000..251ee28
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/service/reads/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/reads/DataResolver.java
@@@ -1,280 -1,0 +1,280 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.service.reads;
 +
 +import java.util.ArrayList;
 +import java.util.Arrays;
 +import java.util.Collection;
 +import java.util.List;
 +
 +import com.google.common.base.Joiner;
 +import com.google.common.collect.Collections2;
 +
 +import org.apache.cassandra.db.DecoratedKey;
 +import org.apache.cassandra.db.DeletionTime;
 +import org.apache.cassandra.db.ReadCommand;
 +import org.apache.cassandra.db.ReadResponse;
 +import org.apache.cassandra.db.filter.DataLimits;
 +import org.apache.cassandra.db.partitions.PartitionIterator;
 +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
 +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
 +import org.apache.cassandra.db.rows.RangeTombstoneMarker;
 +import org.apache.cassandra.db.rows.Row;
 +import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 +import org.apache.cassandra.db.rows.UnfilteredRowIterators;
 +import org.apache.cassandra.db.transform.EmptyPartitionsDiscarder;
 +import org.apache.cassandra.db.transform.Filter;
 +import org.apache.cassandra.db.transform.FilteredPartitions;
 +import org.apache.cassandra.db.transform.Transformation;
 +import org.apache.cassandra.locator.Endpoints;
 +import org.apache.cassandra.locator.ReplicaPlan;
 +import org.apache.cassandra.net.Message;
 +import org.apache.cassandra.schema.TableMetadata;
 +import org.apache.cassandra.service.reads.repair.ReadRepair;
 +import org.apache.cassandra.service.reads.repair.RepairedDataTracker;
 +import org.apache.cassandra.service.reads.repair.RepairedDataVerifier;
 +
 +import static com.google.common.collect.Iterables.*;
 +
 +public class DataResolver<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>> extends ResponseResolver<E, P>
 +{
 +    private final boolean enforceStrictLiveness;
 +    private final ReadRepair<E, P> readRepair;
 +
 +    public DataResolver(ReadCommand command, ReplicaPlan.Shared<E, P> replicaPlan, ReadRepair<E, P> readRepair, long queryStartNanoTime)
 +    {
 +        super(command, replicaPlan, queryStartNanoTime);
 +        this.enforceStrictLiveness = command.metadata().enforceStrictLiveness();
 +        this.readRepair = readRepair;
 +    }
 +
 +    public PartitionIterator getData()
 +    {
 +        ReadResponse response = responses.get(0).payload;
 +        return UnfilteredPartitionIterators.filter(response.makeIterator(command), command.nowInSec());
 +    }
 +
 +    public boolean isDataPresent()
 +    {
 +        return !responses.isEmpty();
 +    }
 +
 +    @SuppressWarnings("resource")
 +    public PartitionIterator resolve()
 +    {
 +        // We could get more responses while this method runs, which is ok (we're happy to ignore any response not
 +        // present at the start of this method), so take a snapshot of the responses once and use it throughout.
 +        Collection<Message<ReadResponse>> messages = responses.snapshot();
 +        assert !any(messages, msg -> msg.payload.isDigestResponse());
 +
 +        E replicas = replicaPlan().candidates().select(transform(messages, msg -> msg.from()), false);
 +        List<UnfilteredPartitionIterator> iters = new ArrayList<>(
 +            Collections2.transform(messages, msg -> msg.payload.makeIterator(command)));
 +        assert replicas.size() == iters.size();
 +
 +        // If requested, inspect each response for a digest of the replica's repaired data set
 +        RepairedDataTracker repairedDataTracker = command.isTrackingRepairedStatus()
 +                                                  ? new RepairedDataTracker(getRepairedDataVerifier(command))
 +                                                  : null;
 +        if (repairedDataTracker != null)
 +        {
 +            messages.forEach(msg -> {
 +                if (msg.payload.mayIncludeRepairedDigest() && replicas.byEndpoint().get(msg.from()).isFull())
 +                {
 +                    repairedDataTracker.recordDigest(msg.from(),
 +                                                     msg.payload.repairedDataDigest(),
 +                                                     msg.payload.isRepairedDigestConclusive());
 +                }
 +            });
 +        }
 +
 +        /*
 +         * Even though every response, individually, will honor the limit, it is possible that we will, after the merge,
 +         * have more rows than the client requested. To make sure that we still conform to the original limit,
 +         * we apply a top-level post-reconciliation counter to the merged partition iterator.
 +         *
 +         * Short read protection logic (ShortReadRowsProtection.moreContents()) relies on this counter being applied
 +         * to the current partition in order to work. For this reason we have to apply the counter transformation
 +         * before the empty partition discard logic kicks in - as the latter will eagerly consume the iterator.
 +         *
 +         * That's why the order here is: 1) merge; 2) filter rows; 3) count; 4) discard empty partitions
 +         *
 +         * See CASSANDRA-13747 for more details.
 +         */
 +        DataLimits.Counter mergedResultCounter =
 +            command.limits().newCounter(command.nowInSec(), true, command.selectsFullPartition(), enforceStrictLiveness);
 +
 +        UnfilteredPartitionIterator merged = mergeWithShortReadProtection(iters,
 +                                                                          replicaPlan.getWithContacts(replicas),
 +                                                                          mergedResultCounter,
 +                                                                          repairedDataTracker);
 +        FilteredPartitions filtered = FilteredPartitions.filter(merged, new Filter(command.nowInSec(), command.metadata().enforceStrictLiveness()));
 +        PartitionIterator counted = Transformation.apply(filtered, mergedResultCounter);
 +        return Transformation.apply(counted, new EmptyPartitionsDiscarder());
 +    }
 +
 +    protected RepairedDataVerifier getRepairedDataVerifier(ReadCommand command)
 +    {
-         return RepairedDataVerifier.simple(command);
++        return RepairedDataVerifier.verifier(command);
 +    }
 +
 +    private UnfilteredPartitionIterator mergeWithShortReadProtection(List<UnfilteredPartitionIterator> results,
 +                                                                     P sources,
 +                                                                     DataLimits.Counter mergedResultCounter,
 +                                                                     RepairedDataTracker repairedDataTracker)
 +    {
 +        // If we have only one result, there is no read repair to do, we can't get short
 +        // reads, and we can't make a comparison between repaired data sets
 +        if (results.size() == 1)
 +            return results.get(0);
 +
 +        /*
 +         * So-called short reads stem from nodes returning only a subset of the results they have due to the limit,
 +         * with that subset turning out to be insufficient post-reconciliation. So if we don't have a limit, don't bother.
 +         */
 +        if (!command.limits().isUnlimited())
 +            for (int i = 0; i < results.size(); i++)
 +                results.set(i, ShortReadProtection.extend(sources.contacts().get(i), results.get(i), command, mergedResultCounter, queryStartNanoTime, enforceStrictLiveness));
 +
 +        return UnfilteredPartitionIterators.merge(results, wrapMergeListener(readRepair.getMergeListener(sources), sources, repairedDataTracker));
 +    }
 +
 +    private String makeResponsesDebugString(DecoratedKey partitionKey)
 +    {
 +        return Joiner.on(",\n").join(transform(getMessages().snapshot(), m -> m.from() + " => " + m.payload.toDebugString(command, partitionKey)));
 +    }
 +
 +    private UnfilteredPartitionIterators.MergeListener wrapMergeListener(UnfilteredPartitionIterators.MergeListener partitionListener,
 +                                                                         P sources,
 +                                                                         RepairedDataTracker repairedDataTracker)
 +    {
 +        // Avoid wrapping no-op listener as it doesn't throw, unless we're tracking repaired status
 +        // in which case we need to inject the tracker & verify on close
 +        if (partitionListener == UnfilteredPartitionIterators.MergeListener.NOOP)
 +        {
 +            if (repairedDataTracker == null)
 +                return partitionListener;
 +
 +            return new UnfilteredPartitionIterators.MergeListener()
 +            {
 +                public UnfilteredRowIterators.MergeListener getRowMergeListener(DecoratedKey partitionKey, List<UnfilteredRowIterator> versions)
 +                {
 +                    return UnfilteredRowIterators.MergeListener.NOOP;
 +                }
 +
 +                public void close()
 +                {
 +                    repairedDataTracker.verify();
 +                }
 +            };
 +        }
 +
 +        return new UnfilteredPartitionIterators.MergeListener()
 +        {
 +            public UnfilteredRowIterators.MergeListener getRowMergeListener(DecoratedKey partitionKey, List<UnfilteredRowIterator> versions)
 +            {
 +                UnfilteredRowIterators.MergeListener rowListener = partitionListener.getRowMergeListener(partitionKey, versions);
 +
 +                return new UnfilteredRowIterators.MergeListener()
 +                {
 +                    public void onMergedPartitionLevelDeletion(DeletionTime mergedDeletion, DeletionTime[] versions)
 +                    {
 +                        try
 +                        {
 +                            rowListener.onMergedPartitionLevelDeletion(mergedDeletion, versions);
 +                        }
 +                        catch (AssertionError e)
 +                        {
 +                            // The following can be pretty verbose, but it's really only triggered if a bug happens, so we'd
 +                            // rather get more info to debug than not.
 +                            TableMetadata table = command.metadata();
 +                            String details = String.format("Error merging partition level deletion on %s: merged=%s, versions=%s, sources={%s}, debug info:%n %s",
 +                                                           table,
 +                                                           mergedDeletion == null ? "null" : mergedDeletion.toString(),
 +                                                           '[' + Joiner.on(", ").join(transform(Arrays.asList(versions), rt -> rt == null ? "null" : rt.toString())) + ']',
 +                                                           sources.contacts(),
 +                                                           makeResponsesDebugString(partitionKey));
 +                            throw new AssertionError(details, e);
 +                        }
 +                    }
 +
 +                    public void onMergedRows(Row merged, Row[] versions)
 +                    {
 +                        try
 +                        {
 +                            rowListener.onMergedRows(merged, versions);
 +                        }
 +                        catch (AssertionError e)
 +                        {
 +                            // The following can be pretty verbose, but it's really only triggered if a bug happens, so we'd
 +                            // rather get more info to debug than not.
 +                            TableMetadata table = command.metadata();
 +                            String details = String.format("Error merging rows on %s: merged=%s, versions=%s, sources={%s}, debug info:%n %s",
 +                                                           table,
 +                                                           merged == null ? "null" : merged.toString(table),
 +                                                           '[' + Joiner.on(", ").join(transform(Arrays.asList(versions), rt -> rt == null ? "null" : rt.toString(table))) + ']',
 +                                                           sources.contacts(),
 +                                                           makeResponsesDebugString(partitionKey));
 +                            throw new AssertionError(details, e);
 +                        }
 +                    }
 +
 +                    public void onMergedRangeTombstoneMarkers(RangeTombstoneMarker merged, RangeTombstoneMarker[] versions)
 +                    {
 +                        try
 +                        {
 +                            // The code for merging range tombstones is a tad complex and we have had the assertions there
 +                            // trigger unexpectedly on a few occasions (CASSANDRA-13237, CASSANDRA-13719). It's hard to get
 +                            // insight when that happens without more context than what the assertion errors give us, hence
 +                            // the catch here, which basically gathers as much context as is reasonable.
 +                            rowListener.onMergedRangeTombstoneMarkers(merged, versions);
 +                        }
 +                        catch (AssertionError e)
 +                        {
 +                            // The following can be pretty verbose, but it's really only triggered if a bug happens, so we'd
 +                            // rather get more info to debug than not.
 +                            TableMetadata table = command.metadata();
 +                            String details = String.format("Error merging RTs on %s: merged=%s, versions=%s, sources={%s}, debug info:%n %s",
 +                                                           table,
 +                                                           merged == null ? "null" : merged.toString(table),
 +                                                           '[' + Joiner.on(", ").join(transform(Arrays.asList(versions), rt -> rt == null ? "null" : rt.toString(table))) + ']',
 +                                                           sources.contacts(),
 +                                                           makeResponsesDebugString(partitionKey));
 +                            throw new AssertionError(details, e);
 +                        }
 +                    }
 +
 +                    public void close()
 +                    {
 +                        rowListener.close();
 +                    }
 +                };
 +            }
 +
 +            public void close()
 +            {
 +                partitionListener.close();
 +                if (repairedDataTracker != null)
 +                    repairedDataTracker.verify();
 +            }
 +        };
 +    }
 +}
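
The ordering comment in resolve() is the crux of the counting logic: each replica honours the limit individually, yet the reconciled union can still exceed it, which is why a single post-reconciliation counter is applied to the merged iterator. A toy illustration of the effect, with sorted integer sets standing in for per-replica partition iterators:

    import java.util.Arrays;
    import java.util.TreeSet;
    import java.util.stream.Collectors;

    public class PostReconciliationLimitSketch
    {
        public static void main(String[] args)
        {
            int limit = 3;

            // each replica returns at most 'limit' rows...
            TreeSet<Integer> replica1 = new TreeSet<>(Arrays.asList(1, 2, 3));
            TreeSet<Integer> replica2 = new TreeSet<>(Arrays.asList(1, 4, 5));

            // ...but their reconciled union exceeds the limit
            TreeSet<Integer> merged = new TreeSet<>(replica1);
            merged.addAll(replica2);
            System.out.println(merged); // [1, 2, 3, 4, 5]

            // hence the top-level counter re-applies the limit after the merge
            System.out.println(merged.stream().limit(limit).collect(Collectors.toList())); // [1, 2, 3]
        }
    }
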
diff --cc src/java/org/apache/cassandra/service/reads/repair/RepairedDataVerifier.java
index 816fe9f,0000000..bed240c
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/service/reads/repair/RepairedDataVerifier.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/RepairedDataVerifier.java
@@@ -1,95 -1,0 +1,157 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.service.reads.repair;
 +
++import java.time.LocalDate;
++import java.time.format.DateTimeFormatter;
++import java.util.concurrent.ConcurrentHashMap;
 +import java.util.concurrent.TimeUnit;
++import java.util.concurrent.atomic.AtomicLong;
 +
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.db.ColumnFamilyStore;
- import org.apache.cassandra.db.PartitionRangeReadCommand;
 +import org.apache.cassandra.db.ReadCommand;
- import org.apache.cassandra.db.SinglePartitionReadCommand;
++import org.apache.cassandra.db.SnapshotCommand;
++import org.apache.cassandra.locator.InetAddressAndPort;
 +import org.apache.cassandra.metrics.TableMetrics;
++import org.apache.cassandra.net.Message;
++import org.apache.cassandra.net.MessagingService;
++import org.apache.cassandra.net.Verb;
++import org.apache.cassandra.schema.TableId;
++import org.apache.cassandra.service.SnapshotVerbHandler;
 +import org.apache.cassandra.tracing.Tracing;
 +import org.apache.cassandra.utils.NoSpamLogger;
 +
 +public interface RepairedDataVerifier
 +{
 +    public void verify(RepairedDataTracker tracker);
 +
++    static RepairedDataVerifier verifier(ReadCommand command)
++    {
++        return DatabaseDescriptor.snapshotOnRepairedDataMismatch() ? snapshotting(command) : simple(command);
++    }
++
 +    static RepairedDataVerifier simple(ReadCommand command)
 +    {
 +        return new SimpleVerifier(command);
 +    }
 +
++    static RepairedDataVerifier snapshotting(ReadCommand command)
++    {
++        return new SnapshottingVerifier(command);
++    }
++
 +    static class SimpleVerifier implements RepairedDataVerifier
 +    {
 +        private static final Logger logger = LoggerFactory.getLogger(SimpleVerifier.class);
-         private final ReadCommand command;
++        protected final ReadCommand command;
 +
 +        private static final String INCONSISTENCY_WARNING = "Detected mismatch between repaired datasets for table {}.{} during read of {}. {}";
 +
 +        SimpleVerifier(ReadCommand command)
 +        {
 +            this.command = command;
 +        }
 +
 +        @Override
 +        public void verify(RepairedDataTracker tracker)
 +        {
 +            Tracing.trace("Verifying repaired data tracker {}", tracker);
 +
 +            // some mismatch occurred between the repaired datasets on the replicas
 +            if (tracker.digests.keySet().size() > 1)
 +            {
 +                // if all of the digests are conclusive, mark the inconsistency as confirmed;
 +                // a digest is inconclusive if there were pending repair sessions which had not
 +                // yet been committed, or unrepaired partition deletes which meant some sstables
 +                // were skipped during reads, in which case the mismatch is only unconfirmed
 +                if (tracker.inconclusiveDigests.isEmpty())
 +                {
 +                    TableMetrics metrics = ColumnFamilyStore.metricsFor(command.metadata().id);
 +                    metrics.confirmedRepairedInconsistencies.mark();
 +                    NoSpamLogger.log(logger, NoSpamLogger.Level.WARN, 1, TimeUnit.MINUTES,
 +                                     INCONSISTENCY_WARNING, command.metadata().keyspace,
-                                      command.metadata().name, getCommandString(), tracker);
++                                     command.metadata().name, command.toString(), tracker);
 +                }
 +                else if (DatabaseDescriptor.reportUnconfirmedRepairedDataMismatches())
 +                {
 +                    TableMetrics metrics = ColumnFamilyStore.metricsFor(command.metadata().id);
 +                    metrics.unconfirmedRepairedInconsistencies.mark();
 +                    NoSpamLogger.log(logger, NoSpamLogger.Level.WARN, 1, TimeUnit.MINUTES,
 +                                     INCONSISTENCY_WARNING, command.metadata().keyspace,
-                                      command.metadata().name, getCommandString(), tracker);
++                                     command.metadata().name, command.toString(), tracker);
 +                }
 +            }
 +        }
++    }
++
++    static class SnapshottingVerifier extends SimpleVerifier
++    {
++        private static final Logger logger = LoggerFactory.getLogger(SnapshottingVerifier.class);
++        private static final DateTimeFormatter DATE_FORMAT = DateTimeFormatter.BASIC_ISO_DATE;
++        private static final String SNAPSHOTTING_WARNING = "Issuing snapshot command for mismatch between repaired datasets for table {}.{} during read of {}. {}";
++
++        // Issue at most 1 snapshot request per minute for any given table.
++        // Replicas will only create one snapshot per day, but this stops us
++        // from swamping the network if we start seeing mismatches.
++        private static final long SNAPSHOT_INTERVAL_NANOS = TimeUnit.MINUTES.toNanos(1);
++        private static final ConcurrentHashMap<TableId, AtomicLong> LAST_SNAPSHOT_TIMES = new ConcurrentHashMap<>();
++
++        SnapshottingVerifier(ReadCommand command)
++        {
++            super(command);
++        }
 +
-         private String getCommandString()
++        public void verify(RepairedDataTracker tracker)
 +        {
-             return command instanceof SinglePartitionReadCommand
-                    ? ((SinglePartitionReadCommand)command).partitionKey().toString()
-                    : ((PartitionRangeReadCommand)command).dataRange().keyRange().getString(command.metadata().partitionKeyType);
++            super.verify(tracker);
++            if (tracker.digests.keySet().size() > 1)
++            {
++                if (tracker.inconclusiveDigests.isEmpty() || DatabaseDescriptor.reportUnconfirmedRepairedDataMismatches())
++                {
++                    long now = System.nanoTime();
++                    AtomicLong cached = LAST_SNAPSHOT_TIMES.computeIfAbsent(command.metadata().id, u -> new AtomicLong(0));
++                    long last = cached.get();
++                    if (now - last > SNAPSHOT_INTERVAL_NANOS && cached.compareAndSet(last, now))
++                    {
++                        logger.warn(SNAPSHOTTING_WARNING, command.metadata().keyspace, command.metadata().name, command.toString(), tracker);
++                        Message<SnapshotCommand> msg = Message.out(Verb.SNAPSHOT_REQ,
++                                                                   new SnapshotCommand(command.metadata().keyspace,
++                                                                                       command.metadata().name,
++                                                                                       getSnapshotName(),
++                                                                                       false));
++                        for (InetAddressAndPort replica : tracker.digests.values())
++                            MessagingService.instance().send(msg, replica);
++                    }
++                }
++            }
++        }
 +
++        public static String getSnapshotName()
++        {
++            return String.format("%s%s",
++                                 SnapshotVerbHandler.REPAIRED_DATA_MISMATCH_SNAPSHOT_PREFIX,
++                                 DATE_FORMAT.format(LocalDate.now()));
 +        }
 +    }
 +}
++
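
The throttle in SnapshottingVerifier is a small lock-free idiom worth noting: computeIfAbsent installs one shared AtomicLong per table, and the compareAndSet guarantees that of several reads racing past the interval check, exactly one wins the right to issue the snapshot request. The same shape extracted into a generic sketch (names here are illustrative, not part of the patch):

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicLong;

    public class PerKeyRateLimiter<K>
    {
        private final ConcurrentHashMap<K, AtomicLong> lastFired = new ConcurrentHashMap<>();
        private final long intervalNanos;

        public PerKeyRateLimiter(long interval, TimeUnit unit)
        {
            this.intervalNanos = unit.toNanos(interval);
        }

        // returns true for at most one caller per key per interval
        public boolean tryAcquire(K key)
        {
            long now = System.nanoTime();
            AtomicLong cached = lastFired.computeIfAbsent(key, k -> new AtomicLong(0));
            long last = cached.get();
            // the CAS ensures only one of several racing callers fires
            return now - last > intervalNanos && cached.compareAndSet(last, now);
        }
    }
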
diff --cc test/distributed/org/apache/cassandra/distributed/test/PreviewRepairTest.java
index 30c8f25,0000000..70b40bc
mode 100644,000000..100644
--- a/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairTest.java
@@@ -1,281 -1,0 +1,383 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.distributed.test;
 +
 +import java.io.IOException;
 +import java.util.ArrayList;
++import java.util.Arrays;
 +import java.util.HashMap;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.concurrent.ExecutionException;
 +import java.util.concurrent.ExecutorService;
 +import java.util.concurrent.Executors;
 +import java.util.concurrent.Future;
 +import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.atomic.AtomicBoolean;
++import java.util.concurrent.atomic.AtomicInteger;
 +
 +import com.google.common.collect.ImmutableList;
++import com.google.common.util.concurrent.Uninterruptibles;
 +import org.junit.Test;
 +
 +import org.apache.cassandra.db.ColumnFamilyStore;
 +import org.apache.cassandra.db.Keyspace;
 +import org.apache.cassandra.db.compaction.CompactionManager;
 +import org.apache.cassandra.dht.Range;
 +import org.apache.cassandra.dht.Token;
 +import org.apache.cassandra.distributed.Cluster;
 +import org.apache.cassandra.distributed.api.ConsistencyLevel;
 +import org.apache.cassandra.distributed.api.ICoordinator;
++import org.apache.cassandra.distributed.api.IInvokableInstance;
 +import org.apache.cassandra.distributed.api.IIsolatedExecutor;
 +import org.apache.cassandra.distributed.api.IMessage;
 +import org.apache.cassandra.distributed.api.IMessageFilters;
 +import org.apache.cassandra.distributed.shared.RepairResult;
 +import org.apache.cassandra.net.Verb;
++import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.repair.RepairParallelism;
 +import org.apache.cassandra.repair.messages.RepairOption;
++import org.apache.cassandra.service.ActiveRepairService;
 +import org.apache.cassandra.service.StorageService;
 +import org.apache.cassandra.streaming.PreviewKind;
 +import org.apache.cassandra.utils.FBUtilities;
 +import org.apache.cassandra.utils.concurrent.SimpleCondition;
 +import org.apache.cassandra.utils.progress.ProgressEventType;
 +
 +import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
 +import static org.apache.cassandra.distributed.api.Feature.NETWORK;
 +import static org.junit.Assert.assertEquals;
 +import static org.junit.Assert.assertFalse;
 +import static org.junit.Assert.assertTrue;
 +
 +public class PreviewRepairTest extends TestBaseImpl
 +{
 +    /**
 +     * Makes sure that the repaired sstables do not match on the two
 +     * nodes by disabling autocompaction on node2 and then running an
 +     * incremental repair.
 +     */
 +    @Test
 +    public void testWithMismatchingPending() throws Throwable
 +    {
 +        try(Cluster cluster = init(Cluster.build(2).withConfig(config -> config.with(GOSSIP).with(NETWORK)).start()))
 +        {
 +            cluster.schemaChange("create table " + KEYSPACE + ".tbl (id int primary key, t int)");
 +            insert(cluster.coordinator(1), 0, 100);
 +            cluster.forEach((node) -> node.flush(KEYSPACE));
 +            cluster.get(1).callOnInstance(repair(options(false)));
 +            insert(cluster.coordinator(1), 100, 100);
 +            cluster.forEach((node) -> node.flush(KEYSPACE));
 +
 +            // make sure that all sstables have moved to repaired by triggering a compaction
 +            // also disables autocompaction on the nodes
 +            cluster.forEach((node) -> node.runOnInstance(() -> {
 +                ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl");
 +                FBUtilities.waitOnFutures(CompactionManager.instance.submitBackground(cfs));
 +                cfs.disableAutoCompaction();
 +            }));
 +            cluster.get(1).callOnInstance(repair(options(false)));
 +            // now re-enable autocompaction on node1, this moves the sstables for the new repair to repaired
 +            cluster.get(1).runOnInstance(() -> {
 +                ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl");
 +                cfs.enableAutoCompaction();
 +                FBUtilities.waitOnFutures(CompactionManager.instance.submitBackground(cfs));
 +            });
 +            RepairResult rs = cluster.get(1).callOnInstance(repair(options(true)));
 +            assertTrue(rs.success); // preview repair should succeed
 +            assertFalse(rs.wasInconsistent); // and we should see no mismatches
 +        }
 +    }
 +
 +    /**
 +     * Another case where the repaired datasets could mismatch is if an incremental repair finishes just as the preview
 +     * repair is starting up.
 +     *
 +     * This tests this case:
 +     * 1. we start a preview repair
 +     * 2. pause the validation requests from node1 -> node2
 +     * 3. node1 starts its validation
 +     * 4. run an incremental repair which completes fine
 +     * 5. node2 resumes its validation
 +     *
 +     * Now we will include sstables from the second incremental repair on node2 but not on node1.
 +     * This should fail since we fail any preview repair which is ongoing when an incremental repair finishes (step 4 above).
 +     */
 +    @Test
 +    public void testFinishingIncRepairDuringPreview() throws IOException, InterruptedException, ExecutionException
 +    {
 +        ExecutorService es = Executors.newSingleThreadExecutor();
 +        try(Cluster cluster = init(Cluster.build(2).withConfig(config -> config.with(GOSSIP).with(NETWORK)).start()))
 +        {
 +            cluster.schemaChange("create table " + KEYSPACE + ".tbl (id int primary key, t int)");
 +
 +            insert(cluster.coordinator(1), 0, 100);
 +            cluster.forEach((node) -> node.flush(KEYSPACE));
 +            cluster.get(1).callOnInstance(repair(options(false)));
 +
 +            insert(cluster.coordinator(1), 100, 100);
 +            cluster.forEach((node) -> node.flush(KEYSPACE));
 +
 +            SimpleCondition continuePreviewRepair = new SimpleCondition();
 +            DelayMessageFilter filter = new DelayMessageFilter(continuePreviewRepair);
 +            // this pauses the validation request sent from node1 to node2 until we have run a full inc repair below
 +            cluster.filters().outbound().verbs(Verb.VALIDATION_REQ.id).from(1).to(2).messagesMatching(filter).drop();
 +
 +            Future<RepairResult> rsFuture = es.submit(() -> cluster.get(1).callOnInstance(repair(options(true))));
 +            Thread.sleep(1000);
 +            // this needs to finish before the preview repair is unpaused on node2
 +            cluster.get(1).callOnInstance(repair(options(false)));
 +            continuePreviewRepair.signalAll();
 +            RepairResult rs = rsFuture.get();
 +            assertFalse(rs.success); // preview repair should have failed
 +            assertFalse(rs.wasInconsistent); // and no mismatches should have been reported
 +        }
 +        finally
 +        {
 +            es.shutdown();
 +        }
 +    }
 +
 +    /**
 +     * Same as testFinishingIncRepairDuringPreview but the previewed range does not intersect the incremental repair
 +     * so both preview and incremental repair should finish fine (without any mismatches)
 +     */
 +    @Test
 +    public void testFinishingNonIntersectingIncRepairDuringPreview() throws IOException, InterruptedException, ExecutionException
 +    {
 +        ExecutorService es = Executors.newSingleThreadExecutor();
 +        try(Cluster cluster = init(Cluster.build(2).withConfig(config -> config.with(GOSSIP).with(NETWORK)).start()))
 +        {
 +            cluster.schemaChange("create table " + KEYSPACE + ".tbl (id int primary key, t int)");
 +
 +            insert(cluster.coordinator(1), 0, 100);
 +            cluster.forEach((node) -> node.flush(KEYSPACE));
 +            assertTrue(cluster.get(1).callOnInstance(repair(options(false))).success);
 +
 +            insert(cluster.coordinator(1), 100, 100);
 +            cluster.forEach((node) -> node.flush(KEYSPACE));
 +
 +            // pause preview repair validation messages on node2 until node1 has finished
 +            SimpleCondition continuePreviewRepair = new SimpleCondition();
 +            DelayMessageFilter filter = new DelayMessageFilter(continuePreviewRepair);
 +            cluster.filters().outbound().verbs(Verb.VALIDATION_REQ.id).from(1).to(2).messagesMatching(filter).drop();
 +
 +            // get local ranges to repair two separate ranges:
 +            List<String> localRanges = cluster.get(1).callOnInstance(() -> {
 +                List<String> res = new ArrayList<>();
 +                for (Range<Token> r : StorageService.instance.getLocalReplicas(KEYSPACE).ranges())
 +                    res.add(r.left.getTokenValue()+ ":"+ r.right.getTokenValue());
 +                return res;
 +            });
 +
 +            assertEquals(2, localRanges.size());
 +            Future<RepairResult> repairStatusFuture = es.submit(() -> cluster.get(1).callOnInstance(repair(options(true, localRanges.get(0)))));
 +            Thread.sleep(1000); // wait for node1 to start validation compaction
 +            // this needs to finish before the preview repair is unpaused on node2
 +            assertTrue(cluster.get(1).callOnInstance(repair(options(false, localRanges.get(1)))).success);
 +
 +            continuePreviewRepair.signalAll();
 +            RepairResult rs = repairStatusFuture.get();
 +            assertTrue(rs.success); // repair should succeed
 +            assertFalse(rs.wasInconsistent); // and no mismatches
 +        }
 +        finally
 +        {
 +            es.shutdown();
 +        }
 +    }
 +
-     private static class DelayMessageFilter implements IMessageFilters.Matcher
++    @Test
++    public void snapshotTest() throws IOException, InterruptedException
++    {
++        try(Cluster cluster = init(Cluster.build(3).withConfig(config ->
++                                                               config.set("snapshot_on_repaired_data_mismatch", true)
++                                                                     .with(GOSSIP)
++                                                                     .with(NETWORK))
++                                          .start()))
++        {
++            cluster.schemaChange("create table " + KEYSPACE + ".tbl (id int primary key, t int)");
++            cluster.schemaChange("create table " + KEYSPACE + ".tbl2 (id int primary key, t int)");
++            Thread.sleep(1000);
++
++            // populate 2 tables
++            insert(cluster.coordinator(1), 0, 100, "tbl");
++            insert(cluster.coordinator(1), 0, 100, "tbl2");
++            cluster.forEach((n) -> n.flush(KEYSPACE));
++
++            // make sure everything is marked repaired
++            cluster.get(1).callOnInstance(repair(options(false)));
++            waitMarkedRepaired(cluster);
++            // make node2 mismatch
++            unmarkRepaired(cluster.get(2), "tbl");
++            verifySnapshots(cluster, "tbl", true);
++            verifySnapshots(cluster, "tbl2", true);
++
++            AtomicInteger snapshotMessageCounter = new AtomicInteger();
++            cluster.filters().verbs(Verb.SNAPSHOT_REQ.id).messagesMatching((from, to, message) -> {
++                snapshotMessageCounter.incrementAndGet();
++                return false;
++            }).drop();
++            cluster.get(1).callOnInstance(repair(options(true)));
++            verifySnapshots(cluster, "tbl", false);
++            // tbl2 should not have a mismatch, so the snapshots should be empty here
++            verifySnapshots(cluster, "tbl2", true);
++            assertEquals(3, snapshotMessageCounter.get());
++
++            // and make sure that we don't try to snapshot again
++            snapshotMessageCounter.set(0);
++            cluster.get(3).callOnInstance(repair(options(true)));
++            assertEquals(0, snapshotMessageCounter.get());
++        }
++    }
++
++    private void waitMarkedRepaired(Cluster cluster)
++    {
++        cluster.forEach(node -> node.runOnInstance(() -> {
++            for (String table : Arrays.asList("tbl", "tbl2"))
++            {
++                ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(table);
++                while (true)
++                {
++                    if (cfs.getLiveSSTables().stream().allMatch(SSTableReader::isRepaired))
++                        return;
++                    Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
++                }
++            }
++        }));
++    }
++
++    private void unmarkRepaired(IInvokableInstance instance, String table)
++    {
++        instance.runOnInstance(() -> {
++            ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(table);
++            try
++            {
++                cfs.getCompactionStrategyManager().mutateRepaired(cfs.getLiveSSTables(), ActiveRepairService.UNREPAIRED_SSTABLE, null, false);
++            }
++            catch (IOException e)
++            {
++                throw new RuntimeException(e);
++            }
++        });
++    }
++
++    private void verifySnapshots(Cluster cluster, String table, boolean shouldBeEmpty)
++    {
++        cluster.forEach(node -> node.runOnInstance(() -> {
++            ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(table);
++            if (shouldBeEmpty)
++            {
++                assertTrue(cfs.getSnapshotDetails().isEmpty());
++            }
++            else
++            {
++                while (cfs.getSnapshotDetails().isEmpty())
++                    Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
++            }
++        }));
++    }
++
++    static class DelayMessageFilter implements IMessageFilters.Matcher
 +    {
 +        private final SimpleCondition condition;
 +        private final AtomicBoolean waitForRepair = new AtomicBoolean(true);
 +
 +        public DelayMessageFilter(SimpleCondition condition)
 +        {
 +            this.condition = condition;
 +        }
 +        public boolean matches(int from, int to, IMessage message)
 +        {
 +            try
 +            {
 +                // only the first validation req should be delayed:
 +                if (waitForRepair.compareAndSet(true, false))
 +                    condition.await();
 +            }
 +            catch (Exception e)
 +            {
 +                throw new RuntimeException(e);
 +            }
 +            return false; // don't drop the message
 +        }
 +    }
 +
 +    private static void insert(ICoordinator coordinator, int start, int count)
 +    {
++        insert(coordinator, start, count, "tbl");
++    }
++
++    static void insert(ICoordinator coordinator, int start, int count, String table)
++    {
 +        for (int i = start; i < start + count; i++)
-             coordinator.execute("insert into " + KEYSPACE + ".tbl (id, t) values (?, ?)", ConsistencyLevel.ALL, i, i);
++            coordinator.execute("insert into " + KEYSPACE + "." + table + " (id, t) values (?, ?)", ConsistencyLevel.ALL, i, i);
 +    }
 +
 +    /**
 +     * returns a pair with [repair success, was inconsistent]
 +     */
 +    private static IIsolatedExecutor.SerializableCallable<RepairResult> repair(Map<String, String> options)
 +    {
 +        return () -> {
 +            SimpleCondition await = new SimpleCondition();
 +            AtomicBoolean success = new AtomicBoolean(true);
 +            AtomicBoolean wasInconsistent = new AtomicBoolean(false);
 +            StorageService.instance.repair(KEYSPACE, options, ImmutableList.of((tag, event) -> {
 +                if (event.getType() == ProgressEventType.ERROR)
 +                {
 +                    success.set(false);
 +                    await.signalAll();
 +                }
 +                else if (event.getType() == ProgressEventType.NOTIFICATION && event.getMessage().contains("Repaired data is inconsistent"))
 +                {
 +                    wasInconsistent.set(true);
 +                }
 +                else if (event.getType() == ProgressEventType.COMPLETE)
 +                    await.signalAll();
 +            }));
 +            try
 +            {
 +                await.await(1, TimeUnit.MINUTES);
 +            }
 +            catch (InterruptedException e)
 +            {
 +                throw new RuntimeException(e);
 +            }
 +            return new RepairResult(success.get(), wasInconsistent.get());
 +        };
 +    }
 +
 +    private static Map<String, String> options(boolean preview)
 +    {
 +        Map<String, String> config = new HashMap<>();
 +        config.put(RepairOption.INCREMENTAL_KEY, "true");
 +        config.put(RepairOption.PARALLELISM_KEY, RepairParallelism.PARALLEL.toString());
 +        if (preview)
 +            config.put(RepairOption.PREVIEW, PreviewKind.REPAIRED.toString());
 +        return config;
 +    }
 +
 +    private static Map<String, String> options(boolean preview, String range)
 +    {
 +        Map<String, String> options = options(preview);
 +        options.put(RepairOption.RANGES_KEY, range);
 +        return options;
 +    }
 +}
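
For context, the helpers above compose roughly as follows in a test body. This is an
illustrative sketch only (the row count, token range, and range format are assumptions,
not code from this patch):

    // Hypothetical composition of the PreviewRepairTest helpers above:
    insert(cluster.coordinator(1), 0, 100, "tbl");            // write rows at CL.ALL
    cluster.forEach(i -> i.flush(KEYSPACE));                  // persist them to sstables
    RepairResult res = cluster.get(1)
                              .callOnInstance(repair(options(true, "0:1000"))); // preview repair; range format assumed
    // a preview repair over in-sync repaired data is expected to succeed with no mismatch
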
diff --cc test/distributed/org/apache/cassandra/distributed/test/RepairDigestTrackingTest.java
index 4e44543,0000000..664c99d
mode 100644,000000..100644
--- a/test/distributed/org/apache/cassandra/distributed/test/RepairDigestTrackingTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/RepairDigestTrackingTest.java
@@@ -1,205 -1,0 +1,316 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.distributed.test;
 +
 +import java.io.IOException;
- import java.io.Serializable;
++import java.time.LocalDate;
++import java.time.format.DateTimeFormatter;
 +import java.util.EnumSet;
++import java.util.Iterator;
 +import java.util.Map;
 +import java.util.concurrent.TimeUnit;
 +
++import com.google.common.util.concurrent.Uninterruptibles;
 +import org.junit.Assert;
 +import org.junit.Test;
 +
++import org.apache.cassandra.db.ColumnFamilyStore;
 +import org.apache.cassandra.db.Keyspace;
 +import org.apache.cassandra.distributed.Cluster;
 +import org.apache.cassandra.distributed.api.ConsistencyLevel;
++import org.apache.cassandra.distributed.api.IInvokableInstance;
++import org.apache.cassandra.distributed.api.IIsolatedExecutor;
 +import org.apache.cassandra.io.sstable.Descriptor;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.io.sstable.metadata.MetadataComponent;
 +import org.apache.cassandra.io.sstable.metadata.MetadataType;
 +import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
 +import org.apache.cassandra.service.ActiveRepairService;
++import org.apache.cassandra.service.SnapshotVerbHandler;
 +import org.apache.cassandra.service.StorageProxy;
 +
- public class RepairDigestTrackingTest extends TestBaseImpl implements Serializable// TODO: why serializable?
++public class RepairDigestTrackingTest extends TestBaseImpl
 +{
++    private static final String TABLE = "tbl";
++    private static final String KS_TABLE = KEYSPACE + "." + TABLE;
 +
 +    @Test
 +    public void testInconsistenciesFound() throws Throwable
 +    {
 +        try (Cluster cluster = (Cluster) init(builder().withNodes(2).start()))
 +        {
 +
 +            cluster.get(1).runOnInstance(() -> {
 +                StorageProxy.instance.enableRepairedDataTrackingForRangeReads();
 +            });
 +
-             cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (k INT, c INT, v INT, PRIMARY KEY (k,c)) with read_repair='NONE'");
++            cluster.schemaChange("CREATE TABLE " + KS_TABLE + " (k INT, c INT, v INT, PRIMARY KEY (k,c)) with read_repair='NONE'");
 +            for (int i = 0; i < 10; i++)
 +            {
-                 cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (k, c, v) VALUES (?, ?, ?)",
++                cluster.coordinator(1).execute("INSERT INTO " + KS_TABLE + " (k, c, v) VALUES (?, ?, ?)",
 +                                               ConsistencyLevel.ALL,
 +                                               i, i, i);
 +            }
- 
-             cluster.get(1).runOnInstance(() ->
-                Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").forceBlockingFlush()
-             );
-             cluster.get(2).runOnInstance(() ->
-                Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").forceBlockingFlush()
-             );
++            cluster.forEach(i -> i.flush(KEYSPACE));
 +
 +            for (int i = 10; i < 20; i++)
 +            {
-                 cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (k, c, v) VALUES (?, ?, ?)",
++                cluster.coordinator(1).execute("INSERT INTO " + KS_TABLE + " (k, c, v) VALUES (?, ?, ?)",
 +                                               ConsistencyLevel.ALL,
 +                                               i, i, i);
 +            }
++            cluster.forEach(i -> i.flush(KEYSPACE));
++            cluster.forEach(i -> i.runOnInstance(assertNotRepaired()));
++
++            // mark everything on node 2 repaired
++            cluster.get(2).runOnInstance(markAllRepaired());
++            cluster.get(2).runOnInstance(assertRepaired());
 +
-             cluster.get(1).runOnInstance(() ->
-                                          Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").forceBlockingFlush()
-             );
-             cluster.get(2).runOnInstance(() ->
-                                          Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").forceBlockingFlush()
-             );
- 
-             cluster.get(1).runOnInstance(() ->
-                 Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").getLiveSSTables().forEach(this::assertNotRepaired)
-             );
-             cluster.get(2).runOnInstance(() ->
-                 Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").getLiveSSTables().forEach(this::assertNotRepaired)
-             );
- 
-             cluster.get(2).runOnInstance(() ->
-                 Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").getLiveSSTables().forEach(this::markRepaired)
-             );
- 
- 
-             cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (k, c, v) VALUES (?, ?, ?)", 5, 5, 55);
-             cluster.get(1).runOnInstance(() ->
-               Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").getLiveSSTables().forEach(this::assertNotRepaired)
-             );
-             cluster.get(2).runOnInstance(() ->
-               Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").getLiveSSTables().forEach(this::assertRepaired)
-             );
- 
-             long ccBefore = cluster.get(1).callOnInstance(() ->
-                 Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").metric.confirmedRepairedInconsistencies.table.getCount()
-             );
- 
-             cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl", ConsistencyLevel.ALL);
-             long ccAfter = cluster.get(1).callOnInstance(() ->
-                 Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").metric.confirmedRepairedInconsistencies.table.getCount()
-             );
++            // insert more data on node1 to generate an initial mismatch
++            cluster.get(1).executeInternal("INSERT INTO " + KS_TABLE + " (k, c, v) VALUES (?, ?, ?)", 5, 5, 55);
++            cluster.get(1).runOnInstance(assertNotRepaired());
 +
++            long ccBefore = getConfirmedInconsistencies(cluster.get(1));
++            cluster.coordinator(1).execute("SELECT * FROM " + KS_TABLE, ConsistencyLevel.ALL);
++            long ccAfter = getConfirmedInconsistencies(cluster.get(1));
 +            Assert.assertEquals("confirmed count should differ by 1 after range read", ccBefore + 1, ccAfter);
 +        }
 +    }
 +
 +    @Test
 +    public void testPurgeableTombstonesAreIgnored() throws Throwable
 +    {
 +        try (Cluster cluster = (Cluster) init(builder().withNodes(2).start()))
 +        {
- 
 +            cluster.get(1).runOnInstance(() -> {
 +                StorageProxy.instance.enableRepairedDataTrackingForRangeReads();
 +            });
 +
-             cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl2 (k INT, c INT, v1 INT, v2 INT, PRIMARY KEY (k,c)) WITH gc_grace_seconds=0");
++            cluster.schemaChange("CREATE TABLE " + KS_TABLE + " (k INT, c INT, v1 INT, v2 INT, PRIMARY KEY (k,c)) WITH gc_grace_seconds=0");
 +            // on node1 only insert some tombstones, then flush
 +            for (int i = 0; i < 10; i++)
 +            {
-                 cluster.get(1).executeInternal("DELETE v1 FROM " + KEYSPACE + ".tbl2 USING TIMESTAMP 0 WHERE k=? and c=? ", i, i);
++                cluster.get(1).executeInternal("DELETE v1 FROM " + KS_TABLE + " USING TIMESTAMP 0 WHERE k=? and c=? ", i, i);
 +            }
-             cluster.get(1).runOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl2").forceBlockingFlush());
++            cluster.get(1).flush(KEYSPACE);
 +
 +            // insert data on both nodes and flush
 +            for (int i = 0; i < 10; i++)
 +            {
-                 cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl2 (k, c, v2) VALUES (?, ?, ?) USING TIMESTAMP 1",
++                cluster.coordinator(1).execute("INSERT INTO " + KS_TABLE + " (k, c, v2) VALUES (?, ?, ?) USING TIMESTAMP 1",
 +                                               ConsistencyLevel.ALL,
 +                                               i, i, i);
 +            }
-             cluster.get(1).runOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl2").forceBlockingFlush());
-             cluster.get(2).runOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl2").forceBlockingFlush());
++            cluster.forEach(i -> i.flush(KEYSPACE));
 +
 +            // nothing is repaired yet
-             cluster.get(1).runOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl2").getLiveSSTables().forEach(this::assertNotRepaired));
-             cluster.get(2).runOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl2").getLiveSSTables().forEach(this::assertNotRepaired));
++            cluster.forEach(i -> i.runOnInstance(assertNotRepaired()));
 +            // mark everything repaired
-             cluster.get(1).runOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl2").getLiveSSTables().forEach(this::markRepaired));
-             cluster.get(2).runOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl2").getLiveSSTables().forEach(this::markRepaired));
-             cluster.get(1).runOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl2").getLiveSSTables().forEach(this::assertRepaired));
-             cluster.get(2).runOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl2").getLiveSSTables().forEach(this::assertRepaired));
++            cluster.forEach(i -> i.runOnInstance(markAllRepaired()));
++            cluster.forEach(i -> i.runOnInstance(assertRepaired()));
 +
 +            // now overwrite on node2 only to generate digest mismatches, but don't flush so the repaired dataset is not affected
 +            for (int i = 0; i < 10; i++)
 +            {
-                 cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl2 (k, c, v2) VALUES (?, ?, ?) USING TIMESTAMP 2", i, i, i * 2);
++                cluster.get(2).executeInternal("INSERT INTO " + KS_TABLE + " (k, c, v2) VALUES (?, ?, ?) USING TIMESTAMP 2", i, i, i * 2);
 +            }
 +
-             long ccBefore = cluster.get(1).callOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl2").metric.confirmedRepairedInconsistencies.table.getCount());
++            long ccBefore = getConfirmedInconsistencies(cluster.get(1));
 +            // Unfortunately we need to sleep here to ensure that nowInSec > the local deletion time of the tombstones
 +            TimeUnit.SECONDS.sleep(2);
-             cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl2", ConsistencyLevel.ALL);
-             long ccAfter = cluster.get(1).callOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl2").metric.confirmedRepairedInconsistencies.table.getCount());
++            cluster.coordinator(1).execute("SELECT * FROM " + KS_TABLE, ConsistencyLevel.ALL);
++            long ccAfter = getConfirmedInconsistencies(cluster.get(1));
 +
 +            Assert.assertEquals("No repaired data inconsistencies should be detected", ccBefore, ccAfter);
 +        }
 +    }
 +
-     private void assertNotRepaired(SSTableReader reader) {
-         Assert.assertTrue("repaired at is set for sstable: " + reader.descriptor, getRepairedAt(reader) == ActiveRepairService.UNREPAIRED_SSTABLE);
-     }
++    @Test
++    public void testSnapshottingOnInconsistency() throws Throwable
++    {
++        try (Cluster cluster = init(Cluster.create(2)))
++        {
++            cluster.get(1).runOnInstance(() -> {
++                StorageProxy.instance.enableRepairedDataTrackingForPartitionReads();
++            });
++
++            cluster.schemaChange("CREATE TABLE " + KS_TABLE + " (k INT, c INT, v INT, PRIMARY KEY (k,c))");
++            for (int i = 0; i < 10; i++)
++            {
++                cluster.coordinator(1).execute("INSERT INTO " + KS_TABLE + " (k, c, v) VALUES (0, ?, ?)",
++                                               ConsistencyLevel.ALL, i, i);
++            }
++            cluster.forEach(c -> c.flush(KEYSPACE));
 +
-     private void assertRepaired(SSTableReader reader) {
-         Assert.assertTrue("repaired at is not set for sstable: " + reader.descriptor, getRepairedAt(reader) > 0);
++            for (int i = 10; i < 20; i++)
++            {
++                cluster.coordinator(1).execute("INSERT INTO " + KS_TABLE + " (k, c, v) VALUES (0, ?, ?)",
++                                               ConsistencyLevel.ALL, i, i);
++            }
++            cluster.forEach(c -> c.flush(KEYSPACE));
++            cluster.forEach(i -> i.runOnInstance(assertNotRepaired()));
++            // Mark everything repaired on node2
++            cluster.get(2).runOnInstance(markAllRepaired());
++            cluster.get(2).runOnInstance(assertRepaired());
++
++            // now overwrite on node1 only to generate digest mismatches
++            cluster.get(1).executeInternal("INSERT INTO " + KS_TABLE + " (k, c, v) VALUES (0, ?, ?)", 5, 55);
++            cluster.get(1).runOnInstance(assertNotRepaired());
++
++            // Execute a partition read and assert inconsistency is detected (as nothing is repaired on node1)
++            long ccBefore = getConfirmedInconsistencies(cluster.get(1));
++            cluster.coordinator(1).execute("SELECT * FROM " + KS_TABLE + " WHERE k=0", ConsistencyLevel.ALL);
++            long ccAfter = getConfirmedInconsistencies(cluster.get(1));
++            Assert.assertEquals("confirmed count should increment by 1 after each partition read", ccBefore + 1, ccAfter);
++
++            String snapshotName = SnapshotVerbHandler.REPAIRED_DATA_MISMATCH_SNAPSHOT_PREFIX
++                                  + DateTimeFormatter.BASIC_ISO_DATE.format(LocalDate.now());
++
++            cluster.forEach(i -> i.runOnInstance(assertSnapshotNotPresent(snapshotName)));
++
++            // re-introduce a mismatch, enable snapshotting and try again
++            cluster.get(1).executeInternal("INSERT INTO " + KS_TABLE + " (k, c, v) VALUES (0, ?, ?)", 5, 555);
++            cluster.get(1).runOnInstance(() -> {
++                StorageProxy.instance.enableSnapshotOnRepairedDataMismatch();
++            });
++
++            cluster.coordinator(1).execute("SELECT * FROM " + KS_TABLE + " WHERE k=0", ConsistencyLevel.ALL);
++            ccAfter = getConfirmedInconsistencies(cluster.get(1));
++            Assert.assertEquals("confirmed count should increment by 1 after each partition read", ccBefore + 2, ccAfter);
++
++            cluster.forEach(i -> i.runOnInstance(assertSnapshotPresent(snapshotName)));
++        }
 +    }
 +
-     private long getRepairedAt(SSTableReader reader)
++    private IIsolatedExecutor.SerializableRunnable assertNotRepaired()
 +    {
-         Descriptor descriptor = reader.descriptor;
-         try
++        return () ->
 +        {
-             Map<MetadataType, MetadataComponent> metadata = descriptor.getMetadataSerializer()
-                                                                       .deserialize(descriptor, EnumSet.of(MetadataType.STATS));
++            try
++            {
++                Iterator<SSTableReader> sstables = Keyspace.open(KEYSPACE)
++                                                           .getColumnFamilyStore(TABLE)
++                                                           .getLiveSSTables()
++                                                           .iterator();
++                while (sstables.hasNext())
++                {
++                    SSTableReader sstable = sstables.next();
++                    Descriptor descriptor = sstable.descriptor;
++                    Map<MetadataType, MetadataComponent> metadata = descriptor.getMetadataSerializer()
++                                                                              .deserialize(descriptor, EnumSet.of(MetadataType.STATS));
++
++                    StatsMetadata stats = (StatsMetadata) metadata.get(MetadataType.STATS);
++                    Assert.assertEquals("repaired at is set for sstable: " + descriptor,
++                                        stats.repairedAt,
++                                        ActiveRepairService.UNREPAIRED_SSTABLE);
++                }
++            }
++            catch (IOException e)
++            {
++                throw new RuntimeException(e);
++            }
++        };
++    }
 +
-             StatsMetadata stats = (StatsMetadata) metadata.get(MetadataType.STATS);
-             return stats.repairedAt;
-         } catch (IOException e) {
-             throw new RuntimeException(e);
-         }
++    private IIsolatedExecutor.SerializableRunnable markAllRepaired()
++    {
++        return () ->
++        {
++            try
++            {
++                Iterator<SSTableReader> sstables = Keyspace.open(KEYSPACE)
++                                                           .getColumnFamilyStore(TABLE)
++                                                           .getLiveSSTables()
++                                                           .iterator();
++                while (sstables.hasNext())
++                {
++                    SSTableReader sstable = sstables.next();
++                    Descriptor descriptor = sstable.descriptor;
++                    descriptor.getMetadataSerializer()
++                              .mutateRepairMetadata(descriptor, System.currentTimeMillis(), null, false);
++                    sstable.reloadSSTableMetadata();
++                }
++            }
++            catch (IOException e)
++            {
++                throw new RuntimeException(e);
++            }
++        };
++    }
 +
++    private IIsolatedExecutor.SerializableRunnable assertRepaired()
++    {
++        return () ->
++        {
++            try
++            {
++                Iterator<SSTableReader> sstables = Keyspace.open(KEYSPACE)
++                                                           .getColumnFamilyStore(TABLE)
++                                                           .getLiveSSTables()
++                                                           .iterator();
++                while (sstables.hasNext())
++                {
++                    SSTableReader sstable = sstables.next();
++                    Descriptor descriptor = sstable.descriptor;
++                    Map<MetadataType, MetadataComponent> metadata = descriptor.getMetadataSerializer()
++                                                                              .deserialize(descriptor, EnumSet.of(MetadataType.STATS));
++
++                    StatsMetadata stats = (StatsMetadata) metadata.get(MetadataType.STATS);
++                    Assert.assertTrue("repaired at is not set for sstable: " + descriptor, stats.repairedAt > 0);
++                }
++            }
++            catch (IOException e)
++            {
++                throw new RuntimeException(e);
++            }
++        };
 +    }
 +
-     private void markRepaired(SSTableReader reader) {
-         Descriptor descriptor = reader.descriptor;
-         try
++    private IIsolatedExecutor.SerializableRunnable assertSnapshotPresent(String snapshotName)
++    {
++        return () ->
 +        {
-             descriptor.getMetadataSerializer().mutateRepairMetadata(descriptor, System.currentTimeMillis(), null, false);
-             reader.reloadSSTableMetadata();
-         } catch (IOException e) {
-             throw new RuntimeException(e);
-         }
++            // snapshots are taken asynchronously; this is crude, but it gives the snapshot a chance to appear
++            int attempts = 100;
++            ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(TABLE);
++
++            while (cfs.getSnapshotDetails().isEmpty())
++            {
++                Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
++                if (attempts-- < 0)
++                    throw new AssertionError(String.format("Snapshot %s not found for %s", snapshotName, KS_TABLE));
++            }
++        };
++    }
 +
++    private IIsolatedExecutor.SerializableRunnable assertSnapshotNotPresent(String snapshotName)
++    {
++        return () ->
++        {
++            ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(TABLE);
++            Assert.assertFalse(cfs.snapshotExists(snapshotName));
++        };
 +    }
 +
++    private long getConfirmedInconsistencies(IInvokableInstance instance)
++    {
++        return instance.callOnInstance(() -> Keyspace.open(KEYSPACE)
++                                                     .getColumnFamilyStore(TABLE)
++                                                     .metric
++                                                     .confirmedRepairedInconsistencies
++                                                     .table
++                                                     .getCount());
++    }
 +}
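
Condensed, the detection flow these tests exercise looks like the sketch below. It uses
only names that appear in this patch, but the surrounding cluster setup and the presence
of a mismatch are assumed:

    // Sketch: enable tracking, read over mismatching repaired data, check the metric.
    cluster.get(1).runOnInstance(() -> {
        StorageProxy.instance.enableRepairedDataTrackingForPartitionReads();
        StorageProxy.instance.enableSnapshotOnRepairedDataMismatch(); // optionally snapshot on mismatch
    });
    long before = getConfirmedInconsistencies(cluster.get(1));
    cluster.coordinator(1).execute("SELECT * FROM " + KS_TABLE + " WHERE k=0", ConsistencyLevel.ALL);
    long after = getConfirmedInconsistencies(cluster.get(1));
    // each mismatching tracked read should bump the confirmed inconsistency count by one
    Assert.assertEquals(before + 1, after);
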
diff --cc test/distributed/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java
index 226331c,75e5ba9..a547c76
--- a/test/distributed/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java
@@@ -1,23 -1,7 +1,25 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
  package org.apache.cassandra.distributed.test;
  
+ import java.util.Set;
+ 
  import org.junit.Assert;
  import org.junit.Test;
  
@@@ -27,17 -10,14 +29,18 @@@ import org.apache.cassandra.distributed
  import org.apache.cassandra.distributed.api.ConsistencyLevel;
  import org.apache.cassandra.distributed.api.ICluster;
  import org.apache.cassandra.distributed.api.IInvokableInstance;
 +import org.apache.cassandra.metrics.ReadRepairMetrics;
+ import org.apache.cassandra.io.sstable.format.SSTableReader;
  
 -import static org.junit.Assert.assertEquals;
 -
 +import static org.apache.cassandra.distributed.api.Feature.NETWORK;
  import static org.apache.cassandra.distributed.shared.AssertUtils.*;
 +import static org.apache.cassandra.net.OutboundConnections.LARGE_MESSAGE_THRESHOLD;
 +import static org.apache.cassandra.net.Verb.READ_REPAIR_REQ;
 +import static org.apache.cassandra.net.Verb.READ_REPAIR_RSP;
 +import static org.junit.Assert.fail;
  
  // TODO: this test should be removed after running in-jvm dtests is set up via the shared API repository
 -public class SimpleReadWriteTest extends SharedClusterTestBase
 +public class SimpleReadWriteTest extends TestBaseImpl
  {
      @Test
      public void coordinatorReadTest() throws Throwable
@@@ -392,24 -258,118 +395,122 @@@
      @Test
      public void metricsCountQueriesTest() throws Throwable
      {
 -        cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
 -        for (int i = 0; i < 100; i++)
 -            cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (?,?,?)", ConsistencyLevel.ALL, i, i, i);
 -
 -        long readCount1 = readCount((IInvokableInstance) cluster.get(1));
 -        long readCount2 = readCount((IInvokableInstance) cluster.get(2));
 -        for (int i = 0; i < 100; i++)
 -            cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = ? and ck = ?", ConsistencyLevel.ALL, i, i);
 -
 -        readCount1 = readCount((IInvokableInstance) cluster.get(1)) - readCount1;
 -        readCount2 = readCount((IInvokableInstance) cluster.get(2)) - readCount2;
 -        assertEquals(readCount1, readCount2);
 -        assertEquals(100, readCount1);
 +        try (ICluster<IInvokableInstance> cluster = init(Cluster.create(2)))
 +        {
 +            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
 +            for (int i = 0; i < 100; i++)
 +                cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (?,?,?)", ConsistencyLevel.ALL, i, i, i);
 +
 +            long readCount1 = readCount(cluster.get(1));
 +            long readCount2 = readCount(cluster.get(2));
 +            for (int i = 0; i < 100; i++)
 +                cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = ? and ck = ?", ConsistencyLevel.ALL, i, i);
 +
 +            readCount1 = readCount(cluster.get(1)) - readCount1;
 +            readCount2 = readCount(cluster.get(2)) - readCount2;
 +            Assert.assertEquals(readCount1, readCount2);
 +            Assert.assertEquals(100, readCount1);
 +        }
      }
  
+ 
+     @Test
+     public void skippedSSTableWithPartitionDeletionTest() throws Throwable
+     {
+         try (Cluster cluster = init(Cluster.create(2)))
+         {
+             cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY(pk, ck))");
+             // insert a partition tombstone on node 1, the deletion timestamp should end up being the sstable's minTimestamp
+             cluster.get(1).executeInternal("DELETE FROM " + KEYSPACE + ".tbl USING TIMESTAMP 1 WHERE pk = 0");
+             // and a row from a different partition, to provide the sstable's min/max clustering
+             cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1) USING TIMESTAMP 2");
+             cluster.get(1).flush(KEYSPACE);
+             // expect a single sstable, where minTimestamp equals the timestamp of the partition delete
+             cluster.get(1).runOnInstance(() -> {
+                 Set<SSTableReader> sstables = Keyspace.open(KEYSPACE)
+                                                       .getColumnFamilyStore("tbl")
+                                                       .getLiveSSTables();
 -                assertEquals(1, sstables.size());
 -                assertEquals(1, sstables.iterator().next().getMinTimestamp());
++                assertEquals("Expected a single sstable, but found " + sstables.size(), 1, sstables.size());
++                long minTimestamp = sstables.iterator().next().getMinTimestamp();
++                assertEquals("Expected min timestamp of 1, but was " + minTimestamp, 1, minTimestamp);
+             });
+ 
+             // on node 2, add a row for the deleted partition with an older timestamp than the deletion so it should be shadowed
+             cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (0, 10, 10) USING TIMESTAMP 0");
+ 
+             Object[][] rows = cluster.coordinator(1)
+                                      .execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk=0 AND ck > 5",
+                                               ConsistencyLevel.ALL);
 -            assertEquals(0, rows.length);
++            assertEquals("Expected 0 rows, but found " + rows.length, 0, rows.length);
+         }
+     }
+ 
+     @Test
+     public void skippedSSTableWithPartitionDeletionShadowingDataOnAnotherNode() throws Throwable
+     {
+         try (Cluster cluster = init(Cluster.create(2)))
+         {
+             cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY(pk, ck))");
+             // insert a partition tombstone on node 1, the deletion timestamp should end up being the sstable's minTimestamp
+             cluster.get(1).executeInternal("DELETE FROM " + KEYSPACE + ".tbl USING TIMESTAMP 1 WHERE pk = 0");
+             // and a row from a different partition, to provide the sstable's min/max clustering
+             cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1) USING TIMESTAMP 1");
+             cluster.get(1).flush(KEYSPACE);
+             // sstable 1 has minTimestamp == maxTimestamp == 1 and is skipped due to its min/max clusterings. Now we
+             // insert a row which is not shadowed by the partition delete and flush to a second sstable. Importantly,
+             // this sstable's minTimestamp is greater than the maxTimestamp of the first sstable. That alone might seem
+             // to justify excluding the first sstable from the merge input, but we can't safely make that decision as
+             // we don't know what data and/or tombstones are present on other nodes
+             cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (0, 6, 6) USING TIMESTAMP 2");
+             cluster.get(1).flush(KEYSPACE);
+ 
+             // on node 2, add a row for the deleted partition with an older timestamp than the deletion so it should be shadowed
+             cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (0, 10, 10) USING TIMESTAMP 0");
+ 
+             Object[][] rows = cluster.coordinator(1)
+                                      .execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk=0 AND ck > 5",
+                                               ConsistencyLevel.ALL);
+             // we expect that the row from node 2 (0, 10, 10) was shadowed by the partition delete, but the row from
+             // node 1 (0, 6, 6) was not.
+             assertRows(rows, new Object[] {0, 6, 6});
+         }
+     }
+ 
+     @Test
+     public void skippedSSTableWithPartitionDeletionShadowingDataOnAnotherNode2() throws Throwable
+     {
+         // do not add skipped sstables back just because the partition delete ts is < the local min ts
+ 
+         try (Cluster cluster = init(Cluster.create(2)))
+         {
+             cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY(pk, ck))");
+             // insert a partition tombstone on node 1, the deletion timestamp should end up being the sstable's minTimestamp
+             cluster.get(1).executeInternal("DELETE FROM " + KEYSPACE + ".tbl USING TIMESTAMP 1 WHERE pk = 0");
+             // and a row from a different partition, to provide the sstable's min/max clustering
+             cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1) USING TIMESTAMP 3");
+             cluster.get(1).flush(KEYSPACE);
+             // sstable 1 has minTimestamp == maxTimestamp == 1 and is skipped due to its min/max clusterings. Now we
+             // insert a row which is not shadowed by the partition delete and flush to a second sstable. The first sstable
+             // has a maxTimestamp greater than the min timestamp of all sstables, so it is a candidate for reinclusion in
+             // the merge. However, the second sstable's minTimestamp is greater than the partition delete's. That alone
+             // might seem to justify excluding the first sstable from the merge input, but we can't safely make that
+             // decision as we don't know what data and/or tombstones are present on other nodes
+             cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (0, 6, 6) USING TIMESTAMP 2");
+             cluster.get(1).flush(KEYSPACE);
+ 
+             // on node 2, add a row for the deleted partition with an older timestamp than the deletion so it should be shadowed
+             cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (0, 10, 10) USING TIMESTAMP 0");
+ 
+             Object[][] rows = cluster.coordinator(1)
+                                      .execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk=0 AND ck > 5",
+                                               ConsistencyLevel.ALL);
+             // we expect that the row from node 2 (0, 10, 10) was shadowed by the partition delete, but the row from
+             // node 1 (0, 6, 6) was not.
+             assertRows(rows, new Object[] {0, 6, 6});
+         }
+     }
+ 
      private long readCount(IInvokableInstance instance)
      {
          return instance.callOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").metric.readLatency.latency.getCount());
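
The comments in the three tests above all describe the same hazard: a locally-skipped
sstable can still hold a partition deletion that shadows rows on other replicas. In rough
pseudocode (hypothetical helper names, not the actual SinglePartitionReadCommand logic):

    // Sketch of the rule these tests pin down (hypothetical names):
    // an sstable may be skipped when its min/max clusterings fall outside the queried
    // slice, but if it carries a partition-level deletion it must be re-included in the
    // merge unconditionally; local min/max timestamp comparisons cannot prove the
    // tombstone irrelevant, because it may shadow data held only by other replicas.
    if (!skipped.partitionLevelDeletion(key).isLive())  // hypothetical check
        mergeInputs.add(skipped);                       // always re-include
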
diff --cc test/unit/org/apache/cassandra/service/reads/repair/RepairedDataVerifierTest.java
index c916d13,0000000..169e09d
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/service/reads/repair/RepairedDataVerifierTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/repair/RepairedDataVerifierTest.java
@@@ -1,291 -1,0 +1,293 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.service.reads.repair;
 +
 +import java.net.UnknownHostException;
 +import java.util.Random;
 +
 +import org.junit.Before;
 +import org.junit.BeforeClass;
 +import org.junit.Test;
 +
 +import org.apache.cassandra.SchemaLoader;
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.db.ColumnFamilyStore;
 +import org.apache.cassandra.db.ReadCommand;
 +import org.apache.cassandra.db.SinglePartitionReadCommand;
++import org.apache.cassandra.db.Slices;
++import org.apache.cassandra.db.filter.ClusteringIndexSliceFilter;
 +import org.apache.cassandra.db.filter.ColumnFilter;
 +import org.apache.cassandra.db.filter.DataLimits;
 +import org.apache.cassandra.db.filter.RowFilter;
 +import org.apache.cassandra.locator.InetAddressAndPort;
 +import org.apache.cassandra.metrics.TableMetrics;
 +import org.apache.cassandra.schema.Schema;
 +import org.apache.cassandra.schema.TableMetadata;
 +import org.apache.cassandra.utils.ByteBufferUtil;
 +import org.apache.cassandra.utils.FBUtilities;
 +
 +import static org.junit.Assert.assertEquals;
 +
 +public class RepairedDataVerifierTest
 +{
 +    private static final String TEST_NAME = "read_command_vh_test_";
 +    private static final String KEYSPACE = TEST_NAME + "cql_keyspace";
 +    private static final String TABLE = "table1";
 +
 +    private final Random random = new Random();
 +    private TableMetadata metadata;
 +    private TableMetrics metrics;
 +
 +    // counter to generate the last byte of peer addresses
 +    private int addressSuffix = 10;
 +
 +    @BeforeClass
 +    public static void init()
 +    {
 +        SchemaLoader.loadSchema();
 +        SchemaLoader.schemaDefinition(TEST_NAME);
 +        DatabaseDescriptor.reportUnconfirmedRepairedDataMismatches(true);
 +    }
 +
 +    @Before
 +    public void setup()
 +    {
 +        metadata = Schema.instance.getTableMetadata(KEYSPACE, TABLE);
 +        metrics = ColumnFamilyStore.metricsFor(metadata.id);
 +    }
 +
 +    @Test
 +    public void repairedDataMismatchWithSomeConclusive()
 +    {
 +        long confirmedCount =  confirmedCount();
 +        long unconfirmedCount =  unconfirmedCount();
 +        InetAddressAndPort peer1 = peer();
 +        InetAddressAndPort peer2 = peer();
 +        RepairedDataVerifier.SimpleVerifier verifier = new RepairedDataVerifier.SimpleVerifier(command(key()));
 +        RepairedDataTracker tracker = new RepairedDataTracker(verifier);
 +        tracker.recordDigest(peer1, ByteBufferUtil.bytes("digest1"), false);
 +        tracker.recordDigest(peer2, ByteBufferUtil.bytes("digest2"), true);
 +
 +        tracker.verify();
 +        assertEquals(confirmedCount, confirmedCount());
 +        assertEquals(unconfirmedCount + 1 , unconfirmedCount());
 +    }
 +
 +    @Test
 +    public void repairedDataMismatchWithNoneConclusive()
 +    {
 +        long confirmedCount =  confirmedCount();
 +        long unconfirmedCount =  unconfirmedCount();
 +        InetAddressAndPort peer1 = peer();
 +        InetAddressAndPort peer2 = peer();
 +        RepairedDataVerifier.SimpleVerifier verifier = new RepairedDataVerifier.SimpleVerifier(command(key()));
 +        RepairedDataTracker tracker = new RepairedDataTracker(verifier);
 +        tracker.recordDigest(peer1, ByteBufferUtil.bytes("digest1"), false);
 +        tracker.recordDigest(peer2, ByteBufferUtil.bytes("digest2"), false);
 +
 +        tracker.verify();
 +        assertEquals(confirmedCount, confirmedCount());
 +        assertEquals(unconfirmedCount + 1 , unconfirmedCount());
 +    }
 +
 +    @Test
 +    public void repairedDataMismatchWithAllConclusive()
 +    {
 +        long confirmedCount =  confirmedCount();
 +        long unconfirmedCount =  unconfirmedCount();
 +        InetAddressAndPort peer1 = peer();
 +        InetAddressAndPort peer2 = peer();
 +        RepairedDataVerifier.SimpleVerifier verifier = new RepairedDataVerifier.SimpleVerifier(command(key()));
 +        RepairedDataTracker tracker = new RepairedDataTracker(verifier);
 +        tracker.recordDigest(peer1, ByteBufferUtil.bytes("digest1"), true);
 +        tracker.recordDigest(peer2, ByteBufferUtil.bytes("digest2"), true);
 +
 +        tracker.verify();
 +        assertEquals(confirmedCount + 1, confirmedCount());
 +        assertEquals(unconfirmedCount, unconfirmedCount());
 +    }
 +
 +    @Test
 +    public void repairedDataMatchesWithAllConclusive()
 +    {
 +        long confirmedCount =  confirmedCount();
 +        long unconfirmedCount =  unconfirmedCount();
 +        InetAddressAndPort peer1 = peer();
 +        InetAddressAndPort peer2 = peer();
 +        RepairedDataVerifier.SimpleVerifier verifier = new RepairedDataVerifier.SimpleVerifier(command(key()));
 +        RepairedDataTracker tracker = new RepairedDataTracker(verifier);
 +        tracker.recordDigest(peer1, ByteBufferUtil.bytes("digest1"), true);
 +        tracker.recordDigest(peer2, ByteBufferUtil.bytes("digest1"), true);
 +
 +        tracker.verify();
 +        assertEquals(confirmedCount, confirmedCount());
 +        assertEquals(unconfirmedCount, unconfirmedCount());
 +    }
 +
 +    @Test
 +    public void repairedDataMatchesWithSomeConclusive()
 +    {
 +        long confirmedCount =  confirmedCount();
 +        long unconfirmedCount =  unconfirmedCount();
 +        InetAddressAndPort peer1 = peer();
 +        InetAddressAndPort peer2 = peer();
 +        RepairedDataVerifier.SimpleVerifier verifier = new RepairedDataVerifier.SimpleVerifier(command(key()));
 +        RepairedDataTracker tracker = new RepairedDataTracker(verifier);
 +        tracker.recordDigest(peer1, ByteBufferUtil.bytes("digest1"), true);
 +        tracker.recordDigest(peer2, ByteBufferUtil.bytes("digest1"), false);
 +
 +        tracker.verify();
 +        assertEquals(confirmedCount, confirmedCount());
 +        assertEquals(unconfirmedCount, unconfirmedCount());
 +    }
 +
 +    @Test
 +    public void repairedDataMatchesWithNoneConclusive()
 +    {
 +        long confirmedCount =  confirmedCount();
 +        long unconfirmedCount =  unconfirmedCount();
 +        InetAddressAndPort peer1 = peer();
 +        InetAddressAndPort peer2 = peer();
 +        RepairedDataVerifier.SimpleVerifier verifier = new RepairedDataVerifier.SimpleVerifier(command(key()));
 +        RepairedDataTracker tracker = new RepairedDataTracker(verifier);
 +        tracker.recordDigest(peer1, ByteBufferUtil.bytes("digest1"), false);
 +        tracker.recordDigest(peer2, ByteBufferUtil.bytes("digest1"), false);
 +
 +        tracker.verify();
 +        assertEquals(confirmedCount, confirmedCount());
 +        assertEquals(unconfirmedCount, unconfirmedCount());
 +    }
 +
 +    @Test
 +    public void allEmptyDigestWithAllConclusive()
 +    {
 +        // if a read didn't touch any repaired sstables, digests will be empty
 +        long confirmedCount =  confirmedCount();
 +        long unconfirmedCount =  unconfirmedCount();
 +        InetAddressAndPort peer1 = peer();
 +        InetAddressAndPort peer2 = peer();
 +        RepairedDataVerifier.SimpleVerifier verifier = new RepairedDataVerifier.SimpleVerifier(command(key()));
 +        RepairedDataTracker tracker = new RepairedDataTracker(verifier);
 +        tracker.recordDigest(peer1, ByteBufferUtil.EMPTY_BYTE_BUFFER, true);
 +        tracker.recordDigest(peer2, ByteBufferUtil.EMPTY_BYTE_BUFFER, true);
 +
 +        tracker.verify();
 +        assertEquals(confirmedCount, confirmedCount());
 +        assertEquals(unconfirmedCount, unconfirmedCount());
 +    }
 +
 +    @Test
 +    public void allEmptyDigestsWithSomeConclusive()
 +    {
 +        // if a read didn't touch any repaired sstables, digests will be empty
 +        long confirmedCount =  confirmedCount();
 +        long unconfirmedCount =  unconfirmedCount();
 +        InetAddressAndPort peer1 = peer();
 +        InetAddressAndPort peer2 = peer();
 +        RepairedDataVerifier.SimpleVerifier verifier = new RepairedDataVerifier.SimpleVerifier(command(key()));
 +        RepairedDataTracker tracker = new RepairedDataTracker(verifier);
 +        tracker.recordDigest(peer1, ByteBufferUtil.EMPTY_BYTE_BUFFER, true);
 +        tracker.recordDigest(peer2, ByteBufferUtil.EMPTY_BYTE_BUFFER, false);
 +
 +        tracker.verify();
 +        assertEquals(confirmedCount, confirmedCount());
 +        assertEquals(unconfirmedCount, unconfirmedCount());
 +    }
 +
 +    @Test
 +    public void allEmptyDigestsWithNoneConclusive()
 +    {
 +        // if a read didn't touch any repaired sstables, digests will be empty
 +        long confirmedCount =  confirmedCount();
 +        long unconfirmedCount =  unconfirmedCount();
 +        InetAddressAndPort peer1 = peer();
 +        InetAddressAndPort peer2 = peer();
 +        RepairedDataVerifier.SimpleVerifier verifier = new RepairedDataVerifier.SimpleVerifier(command(key()));
 +        RepairedDataTracker tracker = new RepairedDataTracker(verifier);
 +        tracker.recordDigest(peer1, ByteBufferUtil.EMPTY_BYTE_BUFFER, false);
 +        tracker.recordDigest(peer2, ByteBufferUtil.EMPTY_BYTE_BUFFER, false);
 +
 +        tracker.verify();
 +        assertEquals(confirmedCount, confirmedCount());
 +        assertEquals(unconfirmedCount, unconfirmedCount());
 +    }
 +
 +    @Test
 +    public void noTrackingDataRecorded()
 +    {
 +        // if a read didn't land on any replicas which support repaired data tracking, nothing will be recorded
 +        long confirmedCount =  confirmedCount();
 +        long unconfirmedCount =  unconfirmedCount();
 +        RepairedDataVerifier.SimpleVerifier verifier = new RepairedDataVerifier.SimpleVerifier(command(key()));
 +        RepairedDataTracker tracker = new RepairedDataTracker(verifier);
 +        tracker.verify();
 +        assertEquals(confirmedCount, confirmedCount());
 +        assertEquals(unconfirmedCount, unconfirmedCount());
 +    }
 +
 +    private long confirmedCount()
 +    {
 +        return metrics.confirmedRepairedInconsistencies.table.getCount();
 +    }
 +
 +    private long unconfirmedCount()
 +    {
 +        return metrics.unconfirmedRepairedInconsistencies.table.getCount();
 +    }
 +
 +    private InetAddressAndPort peer()
 +    {
 +        try
 +        {
 +            return InetAddressAndPort.getByAddress(new byte[]{ 127, 0, 0, (byte) addressSuffix++ });
 +        }
 +        catch (UnknownHostException e)
 +        {
 +            throw new RuntimeException(e);
 +        }
 +    }
 +
 +    private int key()
 +    {
 +        return random.nextInt();
 +    }
 +
 +    private ReadCommand command(int key)
 +    {
 +        return new StubReadCommand(key, metadata, false);
 +    }
 +
 +    private static class StubReadCommand extends SinglePartitionReadCommand
 +    {
 +        StubReadCommand(int key, TableMetadata metadata, boolean isDigest)
 +        {
 +            super(isDigest,
 +                  0,
 +                  false,
 +                  metadata,
 +                  FBUtilities.nowInSeconds(),
 +                  ColumnFilter.all(metadata),
 +                  RowFilter.NONE,
 +                  DataLimits.NONE,
 +                  metadata.partitioner.decorateKey(ByteBufferUtil.bytes(key)),
-                   null,
++                  new ClusteringIndexSliceFilter(Slices.ALL, false),
 +                  null);
 +        }
 +    }
 +}
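
The functional change in this test file is the final hunk: StubReadCommand now supplies a
concrete clustering filter where it previously passed null, presumably because the new
repaired-data tracking path dereferences the command's filter. A minimal equivalent
construction, with the same arguments used above (false meaning the slice is not reversed):

    ClusteringIndexSliceFilter filter = new ClusteringIndexSliceFilter(Slices.ALL, false);
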

