Posted to commits@cassandra.apache.org by sa...@apache.org on 2020/04/16 17:44:52 UTC

[cassandra] 01/02: Merge branch 'cassandra-3.11' into trunk

This is an automated email from the ASF dual-hosted git repository.

samt pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit b3ecbf38a3b9bd7dbcafb5caccacda4bc7d356a0
Merge: 781e486 d6beb01
Author: Sam Tunnicliffe <sa...@beobal.com>
AuthorDate: Thu Apr 16 18:29:05 2020 +0100

    Merge branch 'cassandra-3.11' into trunk

 CHANGES.txt                                        |   1 +
 src/java/org/apache/cassandra/config/Config.java   |   6 +
 .../cassandra/config/DatabaseDescriptor.java       |  10 +
 .../cassandra/db/SinglePartitionReadCommand.java   |  83 +++---
 .../apache/cassandra/repair/RepairRunnable.java    |  76 +++++-
 .../cassandra/service/SnapshotVerbHandler.java     |  67 +++++
 .../org/apache/cassandra/service/StorageProxy.java |  18 ++
 .../cassandra/service/StorageProxyMBean.java       |   4 +
 .../cassandra/service/reads/DataResolver.java      |   2 +-
 .../service/reads/repair/RepairedDataVerifier.java |  80 +++++-
 .../distributed/test/PreviewRepairTest.java        | 106 +++++++-
 .../distributed/test/RepairDigestTrackingTest.java | 285 ++++++++++++++-------
 .../distributed/test/SimpleReadWriteTest.java      | 101 ++++++++
 .../reads/repair/RepairedDataVerifierTest.java     |   4 +-
 14 files changed, 697 insertions(+), 146 deletions(-)

diff --cc CHANGES.txt
index 8f19c2a,dbdb779..96eeed4
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,53 -1,9 +1,54 @@@
 -3.11.7
 +4.0-alpha4
 + * Fix CQLSH to avoid arguments being evaluated (CASSANDRA-15660)
 + * Correct Visibility and Improve Safety of Methods in LatencyMetrics (CASSANDRA-15597)
 + * Allow cqlsh to run with Python2.7/Python3.6+ (CASSANDRA-15659,CASSANDRA-15573)
 + * Improve logging around incremental repair (CASSANDRA-15599)
 + * Do not check cdc_raw_directory filesystem space if CDC disabled (CASSANDRA-15688)
 + * Replace array iterators with get by index (CASSANDRA-15394)
 + * Minimize BTree iterator allocations (CASSANDRA-15389)
 + * Add client request size server metrics (CASSANDRA-15704)
 + * Add additional logging around FileUtils and compaction leftover cleanup (CASSANDRA-15705)
 + * Mark system_views/system_virtual_schema as non-alterable keyspaces in cqlsh (CASSANDRA-15711)
 + * Fail incremental repair if an old version sstable is involved (CASSANDRA-15612)
 + * Fix overflows on StreamingTombstoneHistogramBuilder produced by large deletion times (CASSANDRA-14773)
 + * Mark system_views/system_virtual_schema as system keyspaces in cqlsh (CASSANDRA-15706)
 + * Avoid unnecessary collection/iterator allocations during btree construction (CASSANDRA-15390)
 + * Repair history tables should have TTL and TWCS (CASSANDRA-12701)
 + * Fix cqlsh erroring out on Python 3.7 due to webbrowser module being absent (CASSANDRA-15572)
 + * Fix IMH#acquireCapacity() to return correct Outcome when endpoint reserve runs out (CASSANDRA-15607)
 + * Fix nodetool describering output (CASSANDRA-15682)
 + * Only track ideal CL failure when request CL met (CASSANDRA-15696)
 + * Fix flaky CoordinatorMessagingTest and docstring in OutboundSink and ConsistentSession (CASSANDRA-15672)
 + * Fix force compaction of wrapping ranges (CASSANDRA-15664)
 + * Expose repair streaming metrics (CASSANDRA-15656)
 + * Set now in seconds in the future for validation repairs (CASSANDRA-15655)
 + * Emit metric on preview repair failure (CASSANDRA-15654)
 + * Use more appropriate logging levels (CASSANDRA-15661)
 + * Fixed empty check in TrieMemIndex due to potential state inconsistency in ConcurrentSkipListMap (CASSANDRA-15526)
 + * Added UnleveledSSTables global and table level metric (CASSANDRA-15620)
 + * Added Virtual Table exposing Cassandra relevant system properties (CASSANDRA-15616, CASSANDRA-15643)
 + * Improve the algorithmic token allocation in case racks = RF (CASSANDRA-15600)
 + * Fix ConnectionTest.testAcquireReleaseOutbound (CASSANDRA-15308)
 + * Include finalized pending sstables in preview repair (CASSANDRA-15553)
 + * Reverted to the original behavior of CLUSTERING ORDER on CREATE TABLE (CASSANDRA-15271)
 + * Correct inaccurate logging message (CASSANDRA-15549)
 + * Unset GREP_OPTIONS (CASSANDRA-14487)
 + * Update to Python driver 3.21 for cqlsh (CASSANDRA-14872)
 + * Fix missing Keyspaces in cqlsh describe output (CASSANDRA-15576)
 + * Fix multi DC nodetool status output (CASSANDRA-15305)
 + * updateCoordinatorWriteLatencyTableMetric can produce misleading metrics (CASSANDRA-15569)
 + * Make cqlsh and cqlshlib Python 2 & 3 compatible (CASSANDRA-10190)
 + * Improve the description of nodetool listsnapshots command (CASSANDRA-14587)
 + * Allow embedded Cassandra to be launched from a one-jar or uno-jar (CASSANDRA-15494)
 + * Update hppc library to version 0.8.1 (CASSANDRA-12995)
 + * Limit the dependencies used by UDFs/UDAs (CASSANDRA-14737)
 + * Make native_transport_max_concurrent_requests_in_bytes updatable (CASSANDRA-15519)
 + * Cleanup and improvements to IndexInfo/ColumnIndex (CASSANDRA-15469)
 + * Potential Overflow in DatabaseDescriptor Functions That Convert Between KB/MB & Bytes (CASSANDRA-15470)
 +Merged from 3.11:
   * Allow sstableloader to use SSL on the native port (CASSANDRA-14904)
  Merged from 3.0:
 -=======
 -3.0.21
+  * Don't skip sstables in slice queries based only on local min/max/deletion timestamp (CASSANDRA-15690)
   * Memtable memory allocations may deadlock (CASSANDRA-15367)
   * Run evictFromMembership in GossipStage (CASSANDRA-15592)
  Merged from 2.2:
diff --cc src/java/org/apache/cassandra/config/Config.java
index 2792694,7f28546..957662a
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@@ -415,69 -387,6 +415,75 @@@ public class Config
      public volatile boolean back_pressure_enabled = false;
      public volatile ParameterizedClass back_pressure_strategy;
  
 +    public RepairCommandPoolFullStrategy repair_command_pool_full_strategy = RepairCommandPoolFullStrategy.queue;
 +    public int repair_command_pool_size = concurrent_validations;
 +
 +    /**
 +     * When a node first starts up it initially considers all other peers as DOWN and is disconnected from all of them.
 +     * To be useful as a coordinator (and not introduce latency penalties on restart) this node must have successfully
 +     * opened all three internode TCP connections (gossip, small, and large messages) before advertising to clients.
 +     * Due to this, by default, Cassandra will prime these internode TCP connections and wait for all but a single
 +     * node in the local datacenter to be UP/connected before offering itself as a coordinator, subject to a
 +     * timeout. See CASSANDRA-13993 and CASSANDRA-14297 for more details.
 +     *
 +     * We provide two tunables to control this behavior as some users may want to block until all datacenters are
 +     * available (global QUORUM/EACH_QUORUM), some users may not want to block at all (clients that already work
 +     * around the problem), and some users may want to prime the connections but not delay startup.
 +     *
 +     * block_for_peers_timeout_in_secs: controls how long this node will wait to connect to peers. To completely disable
 +     * any startup connectivity checks set this to -1. To trigger the internode connections but immediately continue
 +     * startup, set this to 0. The default is 10 seconds.
 +     *
 +     * block_for_peers_in_remote_dcs: controls whether this node will also wait for peers in remote datacenters
 +     * before continuing. The default is to _not_ wait on remote datacenters.
 +     */
 +    public int block_for_peers_timeout_in_secs = 10;
 +    public boolean block_for_peers_in_remote_dcs = false;
 +
 +    public volatile boolean automatic_sstable_upgrade = false;
 +    public volatile int max_concurrent_automatic_sstable_upgrades = 1;
 +    public boolean stream_entire_sstables = true;
 +
 +    public volatile AuditLogOptions audit_logging_options = new AuditLogOptions();
 +    public volatile FullQueryLoggerOptions full_query_logging_options = new FullQueryLoggerOptions();
 +
 +    public CorruptedTombstoneStrategy corrupted_tombstone_strategy = CorruptedTombstoneStrategy.disabled;
 +
 +    public volatile boolean diagnostic_events_enabled = false;
 +
 +    /**
 +     * Flags for enabling tracking of the repaired state of data during reads.
 +     * There are separate flags for range and single partition reads, as single partition reads are only
 +     * tracked when CL > 1 and a digest mismatch occurs. Currently, range queries don't use digests, so if
 +     * enabled for range reads, all such reads will include repaired data tracking. As this adds some
 +     * overhead, operators may wish to disable it for range reads whilst still enabling it for partition reads.
 +     */
 +    public volatile boolean repaired_data_tracking_for_range_reads_enabled = false;
 +    public volatile boolean repaired_data_tracking_for_partition_reads_enabled = false;
 +    /* If true, unconfirmed mismatches (those which cannot be considered conclusive proof of out of
 +     * sync repaired data due to the presence of pending repair sessions, or unrepaired partition
 +     * deletes) will increment a metric, distinct from confirmed mismatches. If false, unconfirmed
 +     * mismatches are simply ignored by the coordinator.
 +     * This is purely to allow operators to avoid potential signal:noise issues as these types of
 +     * mismatches are considerably less actionable than their confirmed counterparts. Setting this
 +     * to false only disables the incrementing of the counters when an unconfirmed mismatch is found
 +     * and has no other effect on the collection or processing of the repaired data.
 +     */
 +    public volatile boolean report_unconfirmed_repaired_data_mismatches = false;
++    /*
++     * If true, when a repaired data mismatch is detected at read time or during a preview repair,
++     * a snapshot request will be issued to each participating replica. These are limited at the replica level
++     * so that only a single snapshot per-table per-day can be taken via this method.
++     */
++    public volatile boolean snapshot_on_repaired_data_mismatch = false;
 +
 +    /**
 +     * Number of seconds to set nowInSec into the future when performing validation previews against repaired data.
 +     * This attempts to prevent a race where validations on different machines are started on different sides of
 +     * a tombstone being compacted away.
 +     */
 +    public volatile int validation_preview_purge_head_start_in_sec = 60 * 60;
 +
      /**
       * @deprecated migrate to {@link DatabaseDescriptor#isClientInitialized()}
       */
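
[The new Config fields above are plain, mostly volatile, fields surfaced through static accessors on DatabaseDescriptor in the next hunk, so they can be flipped at runtime. For illustration only, a minimal sketch — the class name is hypothetical, and it assumes DatabaseDescriptor has already been initialized by the running daemon — that enables repaired data tracking for both read paths together with the new snapshot-on-mismatch behaviour:

    import org.apache.cassandra.config.DatabaseDescriptor;

    public class RepairedDataTrackingToggle
    {
        // All three flags are volatile in Config, so these changes take
        // effect immediately, without a node restart.
        public static void enableFullTracking()
        {
            DatabaseDescriptor.setRepairedDataTrackingForPartitionReadsEnabled(true);
            DatabaseDescriptor.setRepairedDataTrackingForRangeReadsEnabled(true);
            DatabaseDescriptor.setSnapshotOnRepairedDataMismatch(true);
        }
    }
]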
diff --cc src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 4821653,1e2e1a1..85bfa88
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@@ -2887,124 -2585,6 +2887,134 @@@ public class DatabaseDescriptor
          return backPressureStrategy;
      }
  
 +    public static ConsistencyLevel getIdealConsistencyLevel()
 +    {
 +        return conf.ideal_consistency_level;
 +    }
 +
 +    public static void setIdealConsistencyLevel(ConsistencyLevel cl)
 +    {
 +        conf.ideal_consistency_level = cl;
 +    }
 +
 +    public static int getRepairCommandPoolSize()
 +    {
 +        return conf.repair_command_pool_size;
 +    }
 +
 +    public static Config.RepairCommandPoolFullStrategy getRepairCommandPoolFullStrategy()
 +    {
 +        return conf.repair_command_pool_full_strategy;
 +    }
 +
 +    public static FullQueryLoggerOptions getFullQueryLogOptions()
 +    {
 +        return conf.full_query_logging_options;
 +    }
 +
 +    public static boolean getBlockForPeersInRemoteDatacenters()
 +    {
 +        return conf.block_for_peers_in_remote_dcs;
 +    }
 +
 +    public static int getBlockForPeersTimeoutInSeconds()
 +    {
 +        return conf.block_for_peers_timeout_in_secs;
 +    }
 +
 +    public static boolean automaticSSTableUpgrade()
 +    {
 +        return conf.automatic_sstable_upgrade;
 +    }
 +
 +    public static void setAutomaticSSTableUpgradeEnabled(boolean enabled)
 +    {
 +        if (conf.automatic_sstable_upgrade != enabled)
 +            logger.debug("Changing automatic_sstable_upgrade to {}", enabled);
 +        conf.automatic_sstable_upgrade = enabled;
 +    }
 +
 +    public static int maxConcurrentAutoUpgradeTasks()
 +    {
 +        return conf.max_concurrent_automatic_sstable_upgrades;
 +    }
 +
 +    public static void setMaxConcurrentAutoUpgradeTasks(int value)
 +    {
 +        if (conf.max_concurrent_automatic_sstable_upgrades != value)
 +            logger.debug("Changing max_concurrent_automatic_sstable_upgrades to {}", value);
 +        validateMaxConcurrentAutoUpgradeTasksConf(value);
 +        conf.max_concurrent_automatic_sstable_upgrades = value;
 +    }
 +
 +    private static void validateMaxConcurrentAutoUpgradeTasksConf(int value)
 +    {
 +        if (value < 0)
 +            throw new ConfigurationException("max_concurrent_automatic_sstable_upgrades can't be negative");
 +        if (value > getConcurrentCompactors())
 +            logger.warn("max_concurrent_automatic_sstable_upgrades ({}) is larger than concurrent_compactors ({})", value, getConcurrentCompactors());
 +    }
 +    
 +    public static AuditLogOptions getAuditLoggingOptions()
 +    {
 +        return conf.audit_logging_options;
 +    }
 +
 +    public static void setAuditLoggingOptions(AuditLogOptions auditLoggingOptions)
 +    {
 +        conf.audit_logging_options = auditLoggingOptions;
 +    }
 +
 +    public static Config.CorruptedTombstoneStrategy getCorruptedTombstoneStrategy()
 +    {
 +        return conf.corrupted_tombstone_strategy;
 +    }
 +
 +    public static void setCorruptedTombstoneStrategy(Config.CorruptedTombstoneStrategy strategy)
 +    {
 +        conf.corrupted_tombstone_strategy = strategy;
 +    }
 +
 +    public static boolean getRepairedDataTrackingForRangeReadsEnabled()
 +    {
 +        return conf.repaired_data_tracking_for_range_reads_enabled;
 +    }
 +
 +    public static void setRepairedDataTrackingForRangeReadsEnabled(boolean enabled)
 +    {
 +        conf.repaired_data_tracking_for_range_reads_enabled = enabled;
 +    }
 +
 +    public static boolean getRepairedDataTrackingForPartitionReadsEnabled()
 +    {
 +        return conf.repaired_data_tracking_for_partition_reads_enabled;
 +    }
 +
 +    public static void setRepairedDataTrackingForPartitionReadsEnabled(boolean enabled)
 +    {
 +        conf.repaired_data_tracking_for_partition_reads_enabled = enabled;
 +    }
 +
++    public static boolean snapshotOnRepairedDataMismatch()
++    {
++        return conf.snapshot_on_repaired_data_mismatch;
++    }
++
++    public static void setSnapshotOnRepairedDataMismatch(boolean enabled)
++    {
++        conf.snapshot_on_repaired_data_mismatch = enabled;
++    }
++
 +    public static boolean reportUnconfirmedRepairedDataMismatches()
 +    {
 +        return conf.report_unconfirmed_repaired_data_mismatches;
 +    }
 +
 +    public static void reportUnconfirmedRepairedDataMismatches(boolean enabled)
 +    {
 +        conf.report_unconfirmed_repaired_data_mismatches = enabled;
 +    }
 +
      public static boolean strictRuntimeChecks()
      {
          return strictRuntimeChecks;
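
[Note how setMaxConcurrentAutoUpgradeTasks above validates before assigning: negative values throw a ConfigurationException, while values above concurrent_compactors are merely logged as a warning. A sketch of the expected behaviour — the class name is hypothetical, and it assumes a standalone context where daemonInitialization() can load a usable cassandra.yaml:

    import org.apache.cassandra.config.DatabaseDescriptor;
    import org.apache.cassandra.exceptions.ConfigurationException;

    public class AutoUpgradeConfigCheck
    {
        public static void main(String[] args)
        {
            DatabaseDescriptor.daemonInitialization();
            DatabaseDescriptor.setMaxConcurrentAutoUpgradeTasks(1);       // accepted, warning only if > concurrent_compactors
            try
            {
                DatabaseDescriptor.setMaxConcurrentAutoUpgradeTasks(-1);  // rejected by validation
            }
            catch (ConfigurationException e)
            {
                System.out.println(e.getMessage()); // "max_concurrent_automatic_sstable_upgrades can't be negative"
            }
        }
    }
]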
diff --cc src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index 4daad7d,3d6559b..fe440f4
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@@ -618,74 -725,64 +618,71 @@@ public class SinglePartitionReadCommand
               *   timestamp(tombstone) > maxTimestamp_s0
               * since we necessarily have
               *   timestamp(tombstone) <= maxTimestamp_s1
-              * In other words, iterating in maxTimestamp order allow to do our mostRecentPartitionTombstone elimination
-              * in one pass, and minimize the number of sstables for which we read a partition tombstone.
-              */
 +              * In other words, iterating in descending maxTimestamp order allows us to do our mostRecentPartitionTombstone
 +              * elimination in one pass, and minimizes the number of sstables for which we read a partition tombstone.
+             */
+             Collections.sort(view.sstables, SSTableReader.maxTimestampDescending);
 -            long mostRecentPartitionTombstone = Long.MIN_VALUE;
              int nonIntersectingSSTables = 0;
-             List<SSTableReader> skippedSSTablesWithTombstones = null;
+             int includedDueToTombstones = 0;
+ 
              SSTableReadMetricsCollector metricsCollector = new SSTableReadMetricsCollector();
  
 +            if (isTrackingRepairedStatus())
 +                Tracing.trace("Collecting data from sstables and tracking repaired status");
 +
              for (SSTableReader sstable : view.sstables)
              {
                  // if we've already seen a partition tombstone with a timestamp greater
                  // than the most recent update to this sstable, we can skip it
 +                // if we're tracking repaired status, we mark the repaired digest inconclusive
 +                // as other replicas may not have seen this partition delete and so could include
 +                // data from this sstable (or others) in their digests
                  if (sstable.getMaxTimestamp() < mostRecentPartitionTombstone)
 +                {
 +                    inputCollector.markInconclusive();
                      break;
 +                }
  
-                 if (!shouldInclude(sstable))
-                 {
-                     nonIntersectingSSTables++;
-                     if (sstable.mayHaveTombstones())
-                     { // if sstable has tombstones we need to check after one pass if it can be safely skipped
-                         if (skippedSSTablesWithTombstones == null)
-                             skippedSSTablesWithTombstones = new ArrayList<>();
-                         skippedSSTablesWithTombstones.add(sstable);
- 
-                     }
-                     continue;
-                 }
- 
-                 minTimestamp = Math.min(minTimestamp, sstable.getMinTimestamp());
- 
-                 @SuppressWarnings("resource") // 'iter' is added to iterators which is closed on exception,
-                                               // or through the closing of the final merged iterator
-                 UnfilteredRowIteratorWithLowerBound iter = makeIterator(cfs, sstable, metricsCollector);
-                 if (!sstable.isRepaired())
-                     oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, sstable.getMinLocalDeletionTime());
- 
-                 inputCollector.addSSTableIterator(sstable, iter);
-                 mostRecentPartitionTombstone = Math.max(mostRecentPartitionTombstone,
-                                                         iter.partitionLevelDeletion().markedForDeleteAt());
-             }
- 
-             int includedDueToTombstones = 0;
-             // Check for sstables with tombstones that are not expired
-             if (skippedSSTablesWithTombstones != null)
-             {
-                 for (SSTableReader sstable : skippedSSTablesWithTombstones)
+                 if (shouldInclude(sstable))
                  {
-                     if (sstable.getMaxTimestamp() <= minTimestamp)
-                         continue;
- 
-                     @SuppressWarnings("resource") // 'iter' is added to iterators which is close on exception,
-                                                   // or through the closing of the final merged iterator
-                     UnfilteredRowIteratorWithLowerBound iter = makeIterator(cfs, sstable, metricsCollector);
                      if (!sstable.isRepaired())
                          oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, sstable.getMinLocalDeletionTime());
  
+                     // 'iter' is added to iterators which is closed on exception, or through the closing of the final merged iterator
+                     @SuppressWarnings("resource")
 -                    UnfilteredRowIterator iter = makeIterator(cfs, sstable, true, metricsCollector);
 -                    iterators.add(iter);
++                    UnfilteredRowIteratorWithLowerBound iter = makeIterator(cfs, sstable, metricsCollector);
 +                    inputCollector.addSSTableIterator(sstable, iter);
-                     includedDueToTombstones++;
+                     mostRecentPartitionTombstone = Math.max(mostRecentPartitionTombstone,
+                                                             iter.partitionLevelDeletion().markedForDeleteAt());
+                 }
+                 else
+                 {
 -
+                     nonIntersectingSSTables++;
+                     // sstable contains no tombstone if maxLocalDeletionTime == Integer.MAX_VALUE, so we can safely skip those entirely
+                     if (sstable.mayHaveTombstones())
+                     {
+                         // 'iter' is added to iterators which is closed on exception, or through the closing of the final merged iterator
+                         @SuppressWarnings("resource")
 -                        UnfilteredRowIterator iter = makeIterator(cfs, sstable, true, metricsCollector);
++                        UnfilteredRowIteratorWithLowerBound iter = makeIterator(cfs, sstable, metricsCollector);
+                         // if the sstable contains a partition delete, then we must include it regardless of whether it
+                         // shadows any other data seen locally as we can't guarantee that other replicas have seen it
+                         if (!iter.partitionLevelDeletion().isLive())
+                         {
+                             if (!sstable.isRepaired())
+                                 oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, sstable.getMinLocalDeletionTime());
 -                            iterators.add(iter);
++                            inputCollector.addSSTableIterator(sstable, iter);
+                             includedDueToTombstones++;
+                             mostRecentPartitionTombstone = Math.max(mostRecentPartitionTombstone,
+                                                                     iter.partitionLevelDeletion().markedForDeleteAt());
+                         }
+                         else
+                         {
+                             iter.close();
+                         }
+                     }
                  }
              }
+ 
              if (Tracing.isTracing())
                  Tracing.trace("Skipped {}/{} non-slice-intersecting sstables, included {} due to tombstones",
                                 nonIntersectingSSTables, view.sstables.size(), includedDueToTombstones);
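
[Stripped of the iterator plumbing, the reworked loop above is a one-pass elimination: sstables are visited in descending maxTimestamp order, and iteration stops at the first sstable whose entire contents are shadowed by a partition tombstone already seen (with the repaired digest marked inconclusive when tracking is on). A simplified, self-contained sketch of that ordering argument — the types are hypothetical stand-ins, not Cassandra classes:

    import java.util.Comparator;
    import java.util.List;

    public class TombstoneElimination
    {
        // Minimal stand-in for the sstable metadata the loop consults.
        static class SSTableInfo
        {
            long maxTimestamp;                 // newest cell timestamp in the sstable
            long partitionTombstoneTimestamp;  // markedForDeleteAt of its partition deletion, or Long.MIN_VALUE
        }

        static int countSSTablesRead(List<SSTableInfo> sstables)
        {
            // Mirrors Collections.sort(view.sstables, SSTableReader.maxTimestampDescending)
            sstables.sort(Comparator.comparingLong((SSTableInfo s) -> s.maxTimestamp).reversed());
            long mostRecentPartitionTombstone = Long.MIN_VALUE;
            int read = 0;
            for (SSTableInfo sstable : sstables)
            {
                // Everything in this (and every later) sstable is older than the
                // tombstone, so it is shadowed; the real code also marks the
                // repaired data digest inconclusive before breaking.
                if (sstable.maxTimestamp < mostRecentPartitionTombstone)
                    break;
                read++;
                mostRecentPartitionTombstone = Math.max(mostRecentPartitionTombstone,
                                                        sstable.partitionTombstoneTimestamp);
            }
            return read;
        }
    }
]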
diff --cc src/java/org/apache/cassandra/repair/RepairRunnable.java
index c7a6d71,35794e2..ae35aaf
--- a/src/java/org/apache/cassandra/repair/RepairRunnable.java
+++ b/src/java/org/apache/cassandra/repair/RepairRunnable.java
@@@ -17,15 -17,9 +17,16 @@@
   */
  package org.apache.cassandra.repair;
  
 -import java.net.InetAddress;
 +import java.io.IOException;
  import java.nio.ByteBuffer;
 -import java.util.*;
 +import java.util.ArrayList;
 +import java.util.Collection;
++import java.util.Collections;
 +import java.util.HashSet;
 +import java.util.List;
 +import java.util.Set;
 +import java.util.UUID;
 +import java.util.concurrent.ExecutorService;
  import java.util.concurrent.LinkedBlockingQueue;
  import java.util.concurrent.TimeUnit;
  import java.util.concurrent.atomic.AtomicBoolean;
@@@ -49,35 -33,22 +50,40 @@@ import org.apache.commons.lang3.time.DurationFormatUtils
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
 -import org.apache.cassandra.concurrent.JMXConfigurableThreadPoolExecutor;
 +import com.codahale.metrics.Timer;
 +import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
  import org.apache.cassandra.concurrent.NamedThreadFactory;
 -import org.apache.cassandra.config.SchemaConstants;
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.metrics.RepairMetrics;
++import org.apache.cassandra.db.SnapshotCommand;
++import org.apache.cassandra.gms.FailureDetector;
++import org.apache.cassandra.net.Message;
++import org.apache.cassandra.net.MessagingService;
++import org.apache.cassandra.net.Verb;
++import org.apache.cassandra.repair.consistent.SyncStatSummary;
++import org.apache.cassandra.db.Keyspace;
  import org.apache.cassandra.cql3.QueryOptions;
  import org.apache.cassandra.cql3.QueryProcessor;
  import org.apache.cassandra.cql3.UntypedResultSet;
  import org.apache.cassandra.cql3.statements.SelectStatement;
  import org.apache.cassandra.db.ColumnFamilyStore;
  import org.apache.cassandra.db.ConsistencyLevel;
- import org.apache.cassandra.db.Keyspace;
  import org.apache.cassandra.dht.Range;
  import org.apache.cassandra.dht.Token;
- import org.apache.cassandra.gms.FailureDetector;
 +import org.apache.cassandra.locator.EndpointsForRange;
 +import org.apache.cassandra.locator.InetAddressAndPort;
 +import org.apache.cassandra.locator.Replica;
 +import org.apache.cassandra.metrics.StorageMetrics;
 +import org.apache.cassandra.repair.consistent.CoordinatorSession;
- import org.apache.cassandra.repair.consistent.SyncStatSummary;
  import org.apache.cassandra.repair.messages.RepairOption;
 +import org.apache.cassandra.schema.SchemaConstants;
  import org.apache.cassandra.service.ActiveRepairService;
 +import org.apache.cassandra.service.ActiveRepairService.ParentRepairStatus;
  import org.apache.cassandra.service.ClientState;
  import org.apache.cassandra.service.QueryState;
  import org.apache.cassandra.service.StorageService;
++import org.apache.cassandra.service.reads.repair.RepairedDataVerifier;
 +import org.apache.cassandra.streaming.PreviewKind;
  import org.apache.cassandra.tracing.TraceKeyspace;
  import org.apache.cassandra.tracing.TraceState;
  import org.apache.cassandra.tracing.Tracing;
@@@ -425,262 -306,68 +431,326 @@@ public class RepairRunnable implements 
                          hasFailure.compareAndSet(false, true);
                      }
                  }
 -                return ActiveRepairService.instance.finishParentSession(parentSession, allNeighbors, successfulRanges);
 +                return Futures.immediateFuture(null);
              }
 -        });
 -        Futures.addCallback(anticompactionResult, new FutureCallback<Object>()
 +        }, MoreExecutors.directExecutor());
 +        Futures.addCallback(repairResult, new RepairCompleteCallback(parentSession, successfulRanges, startTime, traceState, hasFailure, executor), MoreExecutors.directExecutor());
 +    }
 +
 +    /**
 +     * Removes dead nodes from common ranges, and excludes ranges left without any participants.
 +     */
 +    @VisibleForTesting
 +    static List<CommonRange> filterCommonRanges(List<CommonRange> commonRanges, Set<InetAddressAndPort> liveEndpoints, boolean force)
 +    {
 +        if (!force)
          {
 -            public void onSuccess(Object result)
 +            return commonRanges;
 +        }
 +        else
 +        {
 +            List<CommonRange> filtered = new ArrayList<>(commonRanges.size());
 +
 +            for (CommonRange commonRange : commonRanges)
              {
 -                SystemDistributedKeyspace.successfulParentRepair(parentSession, successfulRanges);
 -                if (hasFailure.get())
 +                Set<InetAddressAndPort> endpoints = ImmutableSet.copyOf(Iterables.filter(commonRange.endpoints, liveEndpoints::contains));
 +                Set<InetAddressAndPort> transEndpoints = ImmutableSet.copyOf(Iterables.filter(commonRange.transEndpoints, liveEndpoints::contains));
 +                Preconditions.checkState(endpoints.containsAll(transEndpoints), "transEndpoints must be a subset of endpoints");
 +
 +                // this node is implicitly a participant in this repair, so a single endpoint is ok here
 +                if (!endpoints.isEmpty())
                  {
 -                    fireProgressEvent(tag, new ProgressEvent(ProgressEventType.ERROR, progress.get(), totalProgress,
 -                                                             "Some repair failed"));
 +                    filtered.add(new CommonRange(endpoints, transEndpoints, commonRange.ranges));
                  }
 -                else
 +            }
 +            Preconditions.checkState(!filtered.isEmpty(), "Not enough live endpoints for a repair");
 +            return filtered;
 +        }
 +    }
 +
 +    private void incrementalRepair(UUID parentSession,
 +                                   long startTime,
 +                                   boolean forceRepair,
 +                                   TraceState traceState,
 +                                   Set<InetAddressAndPort> allNeighbors,
 +                                   List<CommonRange> commonRanges,
 +                                   String... cfnames)
 +    {
 +        // the local node also needs to be included in the set of participants, since coordinator sessions aren't persisted
 +        Set<InetAddressAndPort> allParticipants = ImmutableSet.<InetAddressAndPort>builder()
 +                                                  .addAll(allNeighbors)
 +                                                  .add(FBUtilities.getBroadcastAddressAndPort())
 +                                                  .build();
 +
 +        List<CommonRange> allRanges = filterCommonRanges(commonRanges, allParticipants, forceRepair);
 +
 +        CoordinatorSession coordinatorSession = ActiveRepairService.instance.consistent.coordinated.registerSession(parentSession, allParticipants, forceRepair);
 +        ListeningExecutorService executor = createExecutor();
 +        AtomicBoolean hasFailure = new AtomicBoolean(false);
 +        ListenableFuture repairResult = coordinatorSession.execute(() -> submitRepairSessions(parentSession, true, executor, allRanges, cfnames),
 +                                                                   hasFailure);
 +        Collection<Range<Token>> ranges = new HashSet<>();
 +        for (Collection<Range<Token>> range : Iterables.transform(allRanges, cr -> cr.ranges))
 +        {
 +            ranges.addAll(range);
 +        }
 +        Futures.addCallback(repairResult, new RepairCompleteCallback(parentSession, ranges, startTime, traceState, hasFailure, executor), MoreExecutors.directExecutor());
 +    }
 +
 +    private void previewRepair(UUID parentSession,
 +                               long startTime,
 +                               List<CommonRange> commonRanges,
 +                               String... cfnames)
 +    {
 +
 +        logger.debug("Starting preview repair for {}", parentSession);
 +        // Set up RepairJob executor for this repair command.
 +        ListeningExecutorService executor = createExecutor();
 +
 +        final ListenableFuture<List<RepairSessionResult>> allSessions = submitRepairSessions(parentSession, false, executor, commonRanges, cfnames);
 +
 +        Futures.addCallback(allSessions, new FutureCallback<List<RepairSessionResult>>()
 +        {
 +            public void onSuccess(List<RepairSessionResult> results)
 +            {
 +                try
                  {
 -                    fireProgressEvent(tag, new ProgressEvent(ProgressEventType.SUCCESS, progress.get(), totalProgress,
 -                                                             "Repair completed successfully"));
 +                    if (results == null || results.stream().anyMatch(s -> s == null))
 +                    {
 +                        // something failed
 +                        fail(null);
 +                        return;
 +                    }
 +                    PreviewKind previewKind = options.getPreviewKind();
 +                    Preconditions.checkState(previewKind != PreviewKind.NONE, "Preview is NONE");
 +                    SyncStatSummary summary = new SyncStatSummary(true);
 +                    summary.consumeSessionResults(results);
 +
 +                    final String message;
 +                    if (summary.isEmpty())
 +                    {
 +                        message = previewKind == PreviewKind.REPAIRED ? "Repaired data is in sync" : "Previewed data was in sync";
 +                    }
 +                    else
 +                    {
 +                        message = (previewKind == PreviewKind.REPAIRED ? "Repaired data is inconsistent\n" : "Preview complete\n") + summary.toString();
 +                        RepairMetrics.previewFailures.inc();
++                        if (previewKind == PreviewKind.REPAIRED)
++                            maybeSnapshotReplicas(parentSession, keyspace, results);
 +                    }
 +                    notification(message);
 +
 +                    success("Repair preview completed successfully");
 +                }
 +                catch (Throwable t)
 +                {
 +                    logger.error("Error completing preview repair", t);
 +                    onFailure(t);
 +                }
 +                finally
 +                {
 +                    executor.shutdownNow();
                  }
 -                repairComplete();
              }
  
              public void onFailure(Throwable t)
              {
 -                fireProgressEvent(tag, new ProgressEvent(ProgressEventType.ERROR, progress.get(), totalProgress, t.getMessage()));
 -                SystemDistributedKeyspace.failParentRepair(parentSession, t);
 -                repairComplete();
 +                notifyError(t);
 +                fail("Error completing preview repair: " + t.getMessage());
 +                executor.shutdownNow();
              }
 +        }, MoreExecutors.directExecutor());
 +    }
 +
++    private void maybeSnapshotReplicas(UUID parentSession, String keyspace, List<RepairSessionResult> results)
++    {
++        if (!DatabaseDescriptor.snapshotOnRepairedDataMismatch())
++            return;
+ 
 -            private void repairComplete()
++        try
++        {
++            Set<String> mismatchingTables = new HashSet<>();
++            Set<InetAddressAndPort> nodes = new HashSet<>();
++            for (RepairSessionResult sessionResult : results)
+             {
 -                String duration = DurationFormatUtils.formatDurationWords(System.currentTimeMillis() - startTime,
 -                                                                          true, true);
 -                String message = String.format("Repair command #%d finished in %s", cmd, duration);
 -                fireProgressEvent(tag, new ProgressEvent(ProgressEventType.COMPLETE, progress.get(), totalProgress, message));
 -                logger.info(message);
 -                if (options.isTraced() && traceState != null)
++                for (RepairResult repairResult : emptyIfNull(sessionResult.repairJobResults))
+                 {
 -                    for (ProgressListener listener : listeners)
 -                        traceState.removeProgressListener(listener);
 -                    // Because DebuggableThreadPoolExecutor#afterExecute and this callback
 -                    // run in a nondeterministic order (within the same thread), the
 -                    // TraceState may have been nulled out at this point. The TraceState
 -                    // should be traceState, so just set it without bothering to check if it
 -                    // actually was nulled out.
 -                    Tracing.instance.set(traceState);
 -                    Tracing.traceRepair(message);
 -                    Tracing.instance.stopSession();
++                    for (SyncStat stat : emptyIfNull(repairResult.stats))
++                    {
++                        if (stat.numberOfDifferences > 0)
++                            mismatchingTables.add(repairResult.desc.columnFamily);
++                        // snapshot all replicas, even if they don't have any differences
++                        nodes.add(stat.nodes.coordinator);
++                        nodes.add(stat.nodes.peer);
++                    }
+                 }
 -                executor.shutdownNow();
+             }
 -        });
++
++            String snapshotName = RepairedDataVerifier.SnapshottingVerifier.getSnapshotName();
++            for (String table : mismatchingTables)
++            {
++                // we can just check snapshot existence locally since the repair coordinator is always a replica (unlike in the read case)
++                if (!Keyspace.open(keyspace).getColumnFamilyStore(table).snapshotExists(snapshotName))
++                {
++                    logger.info("{} Snapshotting {}.{} for preview repair mismatch with tag {} on instances {}",
++                                options.getPreviewKind().logPrefix(parentSession),
++                                keyspace, table, snapshotName, nodes);
++                    Message<SnapshotCommand> message = Message.out(Verb.SNAPSHOT_REQ, new SnapshotCommand(keyspace,
++                                                                                                          table,
++                                                                                                          snapshotName,
++                                                                                                          false));
++                    for (InetAddressAndPort target : nodes)
++                        MessagingService.instance().send(message, target);
++                }
++                else
++                {
++                    logger.info("{} Not snapshotting {}.{} - snapshot {} exists",
++                                options.getPreviewKind().logPrefix(parentSession),
++                                keyspace, table, snapshotName);
++                }
++            }
++        }
++        catch (Exception e)
++        {
++            logger.error("{} Failed snapshotting replicas", options.getPreviewKind().logPrefix(parentSession), e);
++        }
+     }
+ 
 -    private void addRangeToNeighbors(List<Pair<Set<InetAddress>, ? extends Collection<Range<Token>>>> neighborRangeList, Range<Token> range, Set<InetAddress> neighbors)
++    private static <T> Iterable<T> emptyIfNull(Iterable<T> iter)
+     {
 -        for (int i = 0; i < neighborRangeList.size(); i++)
++        if (iter == null)
++            return Collections.emptyList();
++        return iter;
++    }
++
 +    private ListenableFuture<List<RepairSessionResult>> submitRepairSessions(UUID parentSession,
 +                                                                             boolean isIncremental,
 +                                                                             ListeningExecutorService executor,
 +                                                                             List<CommonRange> commonRanges,
 +                                                                             String... cfnames)
 +    {
 +        List<ListenableFuture<RepairSessionResult>> futures = new ArrayList<>(options.getRanges().size());
 +
 +        // we do endpoint filtering at the start of an incremental repair,
 +        // so repair sessions shouldn't also be checking liveness
 +        boolean force = options.isForcedRepair() && !isIncremental;
 +        for (CommonRange commonRange : commonRanges)
 +        {
 +            logger.info("Starting RepairSession for {}", commonRange);
 +            RepairSession session = ActiveRepairService.instance.submitRepairSession(parentSession,
 +                                                                                     commonRange,
 +                                                                                     keyspace,
 +                                                                                     options.getParallelism(),
 +                                                                                     isIncremental,
 +                                                                                     options.isPullRepair(),
 +                                                                                     force,
 +                                                                                     options.getPreviewKind(),
 +                                                                                     options.optimiseStreams(),
 +                                                                                     executor,
 +                                                                                     cfnames);
 +            if (session == null)
 +                continue;
 +            Futures.addCallback(session, new RepairSessionCallback(session), MoreExecutors.directExecutor());
 +            futures.add(session);
 +        }
 +        return Futures.successfulAsList(futures);
 +    }
 +
 +    private ListeningExecutorService createExecutor()
 +    {
 +        return MoreExecutors.listeningDecorator(new JMXEnabledThreadPoolExecutor(options.getJobThreads(),
 +                                                                                 Integer.MAX_VALUE,
 +                                                                                 TimeUnit.SECONDS,
 +                                                                                 new LinkedBlockingQueue<>(),
 +                                                                                 new NamedThreadFactory("Repair#" + cmd),
 +                                                                                 "internal"));
 +    }
 +
 +    private class RepairSessionCallback implements FutureCallback<RepairSessionResult>
 +    {
 +        private final RepairSession session;
 +
 +        public RepairSessionCallback(RepairSession session)
 +        {
 +            this.session = session;
 +        }
 +
 +        public void onSuccess(RepairSessionResult result)
 +        {
 +            String message = String.format("Repair session %s for range %s finished", session.getId(),
 +                                           session.ranges().toString());
 +            logger.info(message);
 +            fireProgressEvent(new ProgressEvent(ProgressEventType.PROGRESS,
 +                                                progressCounter.incrementAndGet(),
 +                                                totalProgress,
 +                                                message));
 +        }
 +
 +        public void onFailure(Throwable t)
          {
 -            Pair<Set<InetAddress>, ? extends Collection<Range<Token>>> p = neighborRangeList.get(i);
 +            String message = String.format("Repair session %s for range %s failed with error %s",
 +                                           session.getId(), session.ranges().toString(), t.getMessage());
 +            notifyError(new RuntimeException(message, t));
 +        }
 +    }
  
 -            if (p.left.containsAll(neighbors))
 +    private class RepairCompleteCallback implements FutureCallback<Object>
 +    {
 +        final UUID parentSession;
 +        final Collection<Range<Token>> successfulRanges;
 +        final long startTime;
 +        final TraceState traceState;
 +        final AtomicBoolean hasFailure;
 +        final ExecutorService executor;
 +
 +        public RepairCompleteCallback(UUID parentSession,
 +                                      Collection<Range<Token>> successfulRanges,
 +                                      long startTime,
 +                                      TraceState traceState,
 +                                      AtomicBoolean hasFailure,
 +                                      ExecutorService executor)
 +        {
 +            this.parentSession = parentSession;
 +            this.successfulRanges = successfulRanges;
 +            this.startTime = startTime;
 +            this.traceState = traceState;
 +            this.hasFailure = hasFailure;
 +            this.executor = executor;
 +        }
 +
 +        public void onSuccess(Object result)
 +        {
 +            maybeStoreParentRepairSuccess(successfulRanges);
 +            if (hasFailure.get())
              {
 -                p.right.add(range);
 +                fail(null);
 +            }
 +            else
 +            {
 +                success("Repair completed successfully");
 +            }
 +            executor.shutdownNow();
 +        }
 +
 +        public void onFailure(Throwable t)
 +        {
 +            notifyError(t);
 +            fail(t.getMessage());
 +            executor.shutdownNow();
 +        }
 +    }
 +
 +    private static void addRangeToNeighbors(List<CommonRange> neighborRangeList, Range<Token> range, EndpointsForRange neighbors)
 +    {
 +        Set<InetAddressAndPort> endpoints = neighbors.endpoints();
 +        Set<InetAddressAndPort> transEndpoints = neighbors.filter(Replica::isTransient).endpoints();
 +
 +        for (CommonRange commonRange : neighborRangeList)
 +        {
 +            if (commonRange.matchesEndpoints(endpoints, transEndpoints))
 +            {
 +                commonRange.ranges.add(range);
                  return;
              }
          }
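
[Note that filterCommonRanges above only prunes endpoints when the repair is forced; otherwise a down participant still fails the session as before. A rough sketch of those semantics on plain collections — the types are hypothetical, and transient endpoints are omitted:

    import java.util.ArrayList;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    public class ForcedRepairFiltering
    {
        // Stand-in for CommonRange: a group of endpoints sharing token ranges.
        static class RangeGroup
        {
            final Set<String> endpoints;
            RangeGroup(Set<String> endpoints) { this.endpoints = endpoints; }
        }

        static List<RangeGroup> filter(List<RangeGroup> groups, Set<String> live, boolean force)
        {
            if (!force)
                return groups; // non-forced repairs keep dead endpoints and fail later

            List<RangeGroup> filtered = new ArrayList<>();
            for (RangeGroup group : groups)
            {
                Set<String> alive = new HashSet<>(group.endpoints);
                alive.retainAll(live);
                if (!alive.isEmpty()) // drop ranges left without any participants
                    filtered.add(new RangeGroup(alive));
            }
            if (filtered.isEmpty())
                throw new IllegalStateException("Not enough live endpoints for a repair");
            return filtered;
        }
    }
]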
diff --cc src/java/org/apache/cassandra/service/SnapshotVerbHandler.java
index cf2872b,a997533..4d8c4df
--- a/src/java/org/apache/cassandra/service/SnapshotVerbHandler.java
+++ b/src/java/org/apache/cassandra/service/SnapshotVerbHandler.java
@@@ -17,30 -17,30 +17,97 @@@
   */
  package org.apache.cassandra.service;
  
++import java.util.concurrent.Executor;
++import java.util.concurrent.Executors;
++
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
++import org.apache.cassandra.db.ColumnFamilyStore;
  import org.apache.cassandra.db.SnapshotCommand;
  import org.apache.cassandra.db.Keyspace;
++import org.apache.cassandra.locator.InetAddressAndPort;
  import org.apache.cassandra.net.IVerbHandler;
 -import org.apache.cassandra.net.MessageIn;
 -import org.apache.cassandra.net.MessageOut;
 +import org.apache.cassandra.net.Message;
  import org.apache.cassandra.net.MessagingService;
  
  public class SnapshotVerbHandler implements IVerbHandler<SnapshotCommand>
  {
 +    public static final SnapshotVerbHandler instance = new SnapshotVerbHandler();
++    public static final String REPAIRED_DATA_MISMATCH_SNAPSHOT_PREFIX = "RepairedDataMismatch-";
++    private static final Executor REPAIRED_DATA_MISMATCH_SNAPSHOT_EXECUTOR = Executors.newSingleThreadExecutor();
 +
      private static final Logger logger = LoggerFactory.getLogger(SnapshotVerbHandler.class);
  
 -    public void doVerb(MessageIn<SnapshotCommand> message, int id)
 +    public void doVerb(Message<SnapshotCommand> message)
      {
          SnapshotCommand command = message.payload;
          if (command.clear_snapshot)
+         {
              Keyspace.clearSnapshot(command.snapshot_name, command.keyspace);
+         }
++        else if (command.snapshot_name.startsWith(REPAIRED_DATA_MISMATCH_SNAPSHOT_PREFIX))
++        {
++            REPAIRED_DATA_MISMATCH_SNAPSHOT_EXECUTOR.execute(new RepairedDataSnapshotTask(command, message.from()));
++        }
          else
++        {
              Keyspace.open(command.keyspace).getColumnFamilyStore(command.column_family).snapshot(command.snapshot_name);
 -        logger.debug("Enqueuing response to snapshot request {} to {}", command.snapshot_name, message.from);
 -        MessagingService.instance().sendReply(new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE), id, message.from);
++        }
 +
 +        logger.debug("Enqueuing response to snapshot request {} to {}", command.snapshot_name, message.from());
 +        MessagingService.instance().send(message.emptyResponse(), message.from());
 +    }
++
++    private static class RepairedDataSnapshotTask implements Runnable
++    {
++        final SnapshotCommand command;
++        final InetAddressAndPort from;
++
++        RepairedDataSnapshotTask(SnapshotCommand command, InetAddressAndPort from)
++        {
++            this.command = command;
++            this.from = from;
++        }
++
++        public void run()
++        {
++            try
++            {
++                Keyspace ks = Keyspace.open(command.keyspace);
++                if (ks == null)
++                {
++                    logger.info("Snapshot request received from {} for {}.{} but keyspace not found",
++                                from,
++                                command.keyspace,
++                                command.column_family);
++                    return;
++                }
++
++                ColumnFamilyStore cfs = ks.getColumnFamilyStore(command.column_family);
++                if (cfs.snapshotExists(command.snapshot_name))
++                {
++                    logger.info("Received snapshot request from {} for {}.{} following repaired data mismatch, " +
++                                "but snapshot with tag {} already exists",
++                                from,
++                                command.keyspace,
++                                command.column_family,
++                                command.snapshot_name);
++                    return;
++                }
++                logger.info("Creating snapshot requested by {} of {}.{} following repaired data mismatch",
++                            from,
++                            command.keyspace,
++                            command.column_family);
++                cfs.snapshot(command.snapshot_name);
++            }
++            catch (IllegalArgumentException e)
++            {
++                logger.warn("Snapshot request received from {} for {}.{} but table not found",
++                            from,
++                            command.keyspace,
++                            command.column_family);
++            }
++        }
+     }
  }
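
[The snapshot_name prefix is what routes a request onto the dedicated single-threaded executor above, and the snapshotExists() check is what enforces the one-snapshot-per-table-per-day limit described in Config.java: the tag comes from RepairedDataVerifier.SnapshottingVerifier.getSnapshotName(), so repeat mismatches on the same day resolve to an existing tag and are skipped. A hedged sketch of such a date-based tag — the exact format used by getSnapshotName() may differ:

    import java.time.LocalDate;

    public class MismatchSnapshotTag
    {
        // Same prefix as SnapshotVerbHandler.REPAIRED_DATA_MISMATCH_SNAPSHOT_PREFIX
        static final String PREFIX = "RepairedDataMismatch-";

        // Hypothetical: one tag per calendar day, so a second mismatch on the
        // same day hits the snapshotExists() check and is skipped.
        static String dailyTag()
        {
            return PREFIX + LocalDate.now(); // e.g. "RepairedDataMismatch-2020-04-16"
        }
    }
]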
diff --cc src/java/org/apache/cassandra/service/StorageProxy.java
index 59e6de7,15fe938..38e9c44
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@@ -2764,107 -2813,11 +2764,125 @@@ public class StorageProxy implements StorageProxyMBean
          return Schema.instance.getNumberOfTables();
      }
  
 +    public String getIdealConsistencyLevel()
 +    {
 +        return DatabaseDescriptor.getIdealConsistencyLevel().toString();
 +    }
 +
 +    public String setIdealConsistencyLevel(String cl)
 +    {
 +        ConsistencyLevel original = DatabaseDescriptor.getIdealConsistencyLevel();
 +        ConsistencyLevel newCL = ConsistencyLevel.valueOf(cl.trim().toUpperCase());
 +        DatabaseDescriptor.setIdealConsistencyLevel(newCL);
 +        return String.format("Updating ideal consistency level new value: %s old value %s", newCL, original.toString());
 +    }
 +
 +    @Deprecated
      public int getOtcBacklogExpirationInterval() {
 -        return DatabaseDescriptor.getOtcBacklogExpirationInterval();
 +        return 0;
 +    }
 +
 +    @Deprecated
 +    public void setOtcBacklogExpirationInterval(int intervalInMillis) { }
 +
 +    @Override
 +    public void enableRepairedDataTrackingForRangeReads()
 +    {
 +        DatabaseDescriptor.setRepairedDataTrackingForRangeReadsEnabled(true);
 +    }
 +
 +    @Override
 +    public void disableRepairedDataTrackingForRangeReads()
 +    {
 +        DatabaseDescriptor.setRepairedDataTrackingForRangeReadsEnabled(false);
      }
  
 -    public void setOtcBacklogExpirationInterval(int intervalInMillis) {
 -        DatabaseDescriptor.setOtcBacklogExpirationInterval(intervalInMillis);
 +    @Override
 +    public boolean getRepairedDataTrackingEnabledForRangeReads()
 +    {
 +        return DatabaseDescriptor.getRepairedDataTrackingForRangeReadsEnabled();
 +    }
 +
 +    @Override
 +    public void enableRepairedDataTrackingForPartitionReads()
 +    {
 +        DatabaseDescriptor.setRepairedDataTrackingForPartitionReadsEnabled(true);
 +    }
 +
 +    @Override
 +    public void disableRepairedDataTrackingForPartitionReads()
 +    {
 +        DatabaseDescriptor.setRepairedDataTrackingForPartitionReadsEnabled(false);
 +    }
 +
 +    @Override
 +    public boolean getRepairedDataTrackingEnabledForPartitionReads()
 +    {
 +        return DatabaseDescriptor.getRepairedDataTrackingForPartitionReadsEnabled();
 +    }
 +
 +    @Override
 +    public void enableReportingUnconfirmedRepairedDataMismatches()
 +    {
 +        DatabaseDescriptor.reportUnconfirmedRepairedDataMismatches(true);
 +    }
 +
 +    @Override
 +    public void disableReportingUnconfirmedRepairedDataMismatches()
 +    {
 +        DatabaseDescriptor.reportUnconfirmedRepairedDataMismatches(false);
 +    }
 +
 +    @Override
 +    public boolean getReportingUnconfirmedRepairedDataMismatchesEnabled()
 +    {
 +        return DatabaseDescriptor.reportUnconfirmedRepairedDataMismatches();
 +    }
 +
++    @Override
++    public boolean getSnapshotOnRepairedDataMismatchEnabled()
++    {
++        return DatabaseDescriptor.snapshotOnRepairedDataMismatch();
++    }
++
++    @Override
++    public void enableSnapshotOnRepairedDataMismatch()
++    {
++        DatabaseDescriptor.setSnapshotOnRepairedDataMismatch(true);
++    }
++
++    @Override
++    public void disableSnapshotOnRepairedDataMismatch()
++    {
++        DatabaseDescriptor.setSnapshotOnRepairedDataMismatch(false);
++    }
++
 +    static class PaxosBallotAndContention
 +    {
 +        final UUID ballot;
 +        final int contentions;
 +
 +        PaxosBallotAndContention(UUID ballot, int contentions)
 +        {
 +            this.ballot = ballot;
 +            this.contentions = contentions;
 +        }
 +
 +        @Override
 +        public final int hashCode()
 +        {
 +            int hashCode = 31 + (ballot == null ? 0 : ballot.hashCode());
 +            return 31 * hashCode * this.contentions;
 +        }
 +
 +        @Override
 +        public final boolean equals(Object o)
 +        {
 +            if(!(o instanceof PaxosBallotAndContention))
 +                return false;
 +            PaxosBallotAndContention that = (PaxosBallotAndContention)o;
 +            // handles nulls properly
 +            return Objects.equals(ballot, that.ballot) && contentions == that.contentions;
 +        }
      }
  }
diff --cc src/java/org/apache/cassandra/service/StorageProxyMBean.java
index 514feb1,8678dde..e0d2c86
--- a/src/java/org/apache/cassandra/service/StorageProxyMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageProxyMBean.java
@@@ -66,26 -63,7 +66,30 @@@ public interface StorageProxyMBean
      public void setOtcBacklogExpirationInterval(int intervalInMillis);
  
      /** Returns each live node's schema version */
 -    public Map<String, List<String>> getSchemaVersions();
 +    @Deprecated public Map<String, List<String>> getSchemaVersions();
 +    public Map<String, List<String>> getSchemaVersionsWithPort();
  
      public int getNumberOfTables();
 +
 +    public String getIdealConsistencyLevel();
 +    public String setIdealConsistencyLevel(String cl);
 +
 +    /**
 +     * Tracking and reporting of variances in the repaired data set across replicas at read time
 +     */
 +    void enableRepairedDataTrackingForRangeReads();
 +    void disableRepairedDataTrackingForRangeReads();
 +    boolean getRepairedDataTrackingEnabledForRangeReads();
 +
 +    void enableRepairedDataTrackingForPartitionReads();
 +    void disableRepairedDataTrackingForPartitionReads();
 +    boolean getRepairedDataTrackingEnabledForPartitionReads();
 +
 +    void enableReportingUnconfirmedRepairedDataMismatches();
 +    void disableReportingUnconfirmedRepairedDataMismatches();
 +    boolean getReportingUnconfirmedRepairedDataMismatchesEnabled();
++
++    void enableSnapshotOnRepairedDataMismatch();
++    void disableSnapshotOnRepairedDataMismatch();
++    boolean getSnapshotOnRepairedDataMismatchEnabled();
  }
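
[These MBean additions make all of the repaired-data-tracking knobs switchable on a live node over JMX. A sketch of a remote client — the host, port and unauthenticated connection are assumptions, while org.apache.cassandra.db:type=StorageProxy is the standard name StorageProxy registers under:

    import javax.management.JMX;
    import javax.management.MBeanServerConnection;
    import javax.management.ObjectName;
    import javax.management.remote.JMXConnector;
    import javax.management.remote.JMXConnectorFactory;
    import javax.management.remote.JMXServiceURL;

    import org.apache.cassandra.service.StorageProxyMBean;

    public class EnableMismatchSnapshots
    {
        public static void main(String[] args) throws Exception
        {
            JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://127.0.0.1:7199/jmxrmi");
            try (JMXConnector connector = JMXConnectorFactory.connect(url))
            {
                MBeanServerConnection mbs = connector.getMBeanServerConnection();
                StorageProxyMBean proxy = JMX.newMBeanProxy(mbs,
                                                            new ObjectName("org.apache.cassandra.db:type=StorageProxy"),
                                                            StorageProxyMBean.class);
                proxy.enableRepairedDataTrackingForPartitionReads();
                proxy.enableSnapshotOnRepairedDataMismatch();
                System.out.println("snapshot on mismatch enabled: " + proxy.getSnapshotOnRepairedDataMismatchEnabled());
            }
        }
    }
]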
diff --cc src/java/org/apache/cassandra/service/reads/DataResolver.java
index 45bf918,0000000..251ee28
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/service/reads/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/reads/DataResolver.java
@@@ -1,280 -1,0 +1,280 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.service.reads;
 +
 +import java.util.ArrayList;
 +import java.util.Arrays;
 +import java.util.Collection;
 +import java.util.List;
 +
 +import com.google.common.base.Joiner;
 +import com.google.common.collect.Collections2;
 +
 +import org.apache.cassandra.db.DecoratedKey;
 +import org.apache.cassandra.db.DeletionTime;
 +import org.apache.cassandra.db.ReadCommand;
 +import org.apache.cassandra.db.ReadResponse;
 +import org.apache.cassandra.db.filter.DataLimits;
 +import org.apache.cassandra.db.partitions.PartitionIterator;
 +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
 +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
 +import org.apache.cassandra.db.rows.RangeTombstoneMarker;
 +import org.apache.cassandra.db.rows.Row;
 +import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 +import org.apache.cassandra.db.rows.UnfilteredRowIterators;
 +import org.apache.cassandra.db.transform.EmptyPartitionsDiscarder;
 +import org.apache.cassandra.db.transform.Filter;
 +import org.apache.cassandra.db.transform.FilteredPartitions;
 +import org.apache.cassandra.db.transform.Transformation;
 +import org.apache.cassandra.locator.Endpoints;
 +import org.apache.cassandra.locator.ReplicaPlan;
 +import org.apache.cassandra.net.Message;
 +import org.apache.cassandra.schema.TableMetadata;
 +import org.apache.cassandra.service.reads.repair.ReadRepair;
 +import org.apache.cassandra.service.reads.repair.RepairedDataTracker;
 +import org.apache.cassandra.service.reads.repair.RepairedDataVerifier;
 +
 +import static com.google.common.collect.Iterables.*;
 +
 +public class DataResolver<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>> extends ResponseResolver<E, P>
 +{
 +    private final boolean enforceStrictLiveness;
 +    private final ReadRepair<E, P> readRepair;
 +
 +    public DataResolver(ReadCommand command, ReplicaPlan.Shared<E, P> replicaPlan, ReadRepair<E, P> readRepair, long queryStartNanoTime)
 +    {
 +        super(command, replicaPlan, queryStartNanoTime);
 +        this.enforceStrictLiveness = command.metadata().enforceStrictLiveness();
 +        this.readRepair = readRepair;
 +    }
 +
 +    public PartitionIterator getData()
 +    {
 +        ReadResponse response = responses.get(0).payload;
 +        return UnfilteredPartitionIterators.filter(response.makeIterator(command), command.nowInSec());
 +    }
 +
 +    public boolean isDataPresent()
 +    {
 +        return !responses.isEmpty();
 +    }
 +
 +    @SuppressWarnings("resource")
 +    public PartitionIterator resolve()
 +    {
 +        // We could get more responses while this method runs, which is ok (we're happy to ignore any response not here
 +        // at the beginning of this method), so snapshot the responses once and use that set throughout the method.
 +        Collection<Message<ReadResponse>> messages = responses.snapshot();
 +        assert !any(messages, msg -> msg.payload.isDigestResponse());
 +
 +        E replicas = replicaPlan().candidates().select(transform(messages, msg -> msg.from()), false);
 +        List<UnfilteredPartitionIterator> iters = new ArrayList<>(
 +        Collections2.transform(messages, msg -> msg.payload.makeIterator(command)));
 +        assert replicas.size() == iters.size();
 +
 +        // If requested, inspect each response for a digest of the replica's repaired data set
 +        RepairedDataTracker repairedDataTracker = command.isTrackingRepairedStatus()
 +                                                  ? new RepairedDataTracker(getRepairedDataVerifier(command))
 +                                                  : null;
 +        if (repairedDataTracker != null)
 +        {
 +            messages.forEach(msg -> {
 +                if (msg.payload.mayIncludeRepairedDigest() && replicas.byEndpoint().get(msg.from()).isFull())
 +                {
 +                    repairedDataTracker.recordDigest(msg.from(),
 +                                                     msg.payload.repairedDataDigest(),
 +                                                     msg.payload.isRepairedDigestConclusive());
 +                }
 +            });
 +        }
 +
 +        /*
 +         * Even though every response, individually, will honor the limit, it is possible that we will, after the merge,
 +         * have more rows than the client requested. To make sure that we still conform to the original limit,
 +         * we apply a top-level post-reconciliation counter to the merged partition iterator.
 +         *
 +         * Short read protection logic (ShortReadRowsProtection.moreContents()) relies on this counter being applied
 +         * to the current partition in order to work. For this reason we have to apply the counter transformation before
 +         * the empty partition discard logic kicks in, as the latter will eagerly consume the iterator.
 +         *
 +         * That's why the order here is: 1) merge; 2) filter rows; 3) count; 4) discard empty partitions
 +         *
 +         * See CASSANDRA-13747 for more details.
 +         */
 +        DataLimits.Counter mergedResultCounter =
 +            command.limits().newCounter(command.nowInSec(), true, command.selectsFullPartition(), enforceStrictLiveness);
 +
 +        UnfilteredPartitionIterator merged = mergeWithShortReadProtection(iters,
 +                                                                          replicaPlan.getWithContacts(replicas),
 +                                                                          mergedResultCounter,
 +                                                                          repairedDataTracker);
 +        FilteredPartitions filtered = FilteredPartitions.filter(merged, new Filter(command.nowInSec(), command.metadata().enforceStrictLiveness()));
 +        PartitionIterator counted = Transformation.apply(filtered, mergedResultCounter);
 +        return Transformation.apply(counted, new EmptyPartitionsDiscarder());
 +    }
 +
 +    protected RepairedDataVerifier getRepairedDataVerifier(ReadCommand command)
 +    {
-         return RepairedDataVerifier.simple(command);
++        return RepairedDataVerifier.verifier(command);
 +    }
 +
 +    private UnfilteredPartitionIterator mergeWithShortReadProtection(List<UnfilteredPartitionIterator> results,
 +                                                                     P sources,
 +                                                                     DataLimits.Counter mergedResultCounter,
 +                                                                     RepairedDataTracker repairedDataTracker)
 +    {
 +        // If we have only one result, there is no read repair to do, we can't get short
 +        // reads, and we can't make a comparison between repaired data sets
 +        if (results.size() == 1)
 +            return results.get(0);
 +
 +        /*
 +         * So-called short reads stem from nodes returning only a subset of the results they have due to the limit,
 +         * with that subset turning out not to be enough post-reconciliation. So if we don't have a limit, don't bother.
 +         */
 +        if (!command.limits().isUnlimited())
 +            for (int i = 0; i < results.size(); i++)
 +                results.set(i, ShortReadProtection.extend(sources.contacts().get(i), results.get(i), command, mergedResultCounter, queryStartNanoTime, enforceStrictLiveness));
 +
 +        return UnfilteredPartitionIterators.merge(results, wrapMergeListener(readRepair.getMergeListener(sources), sources, repairedDataTracker));
 +    }
 +
 +    private String makeResponsesDebugString(DecoratedKey partitionKey)
 +    {
 +        return Joiner.on(",\n").join(transform(getMessages().snapshot(), m -> m.from() + " => " + m.payload.toDebugString(command, partitionKey)));
 +    }
 +
 +    private UnfilteredPartitionIterators.MergeListener wrapMergeListener(UnfilteredPartitionIterators.MergeListener partitionListener,
 +                                                                         P sources,
 +                                                                         RepairedDataTracker repairedDataTracker)
 +    {
 +        // Avoid wrapping the no-op listener, as it doesn't throw, unless we're tracking repaired status,
 +        // in which case we need to inject the tracker and verify on close
 +        if (partitionListener == UnfilteredPartitionIterators.MergeListener.NOOP)
 +        {
 +            if (repairedDataTracker == null)
 +                return partitionListener;
 +
 +            return new UnfilteredPartitionIterators.MergeListener()
 +            {
 +
 +                public UnfilteredRowIterators.MergeListener getRowMergeListener(DecoratedKey partitionKey, List<UnfilteredRowIterator> versions)
 +                {
 +                    return UnfilteredRowIterators.MergeListener.NOOP;
 +                }
 +
 +                public void close()
 +                {
 +                    repairedDataTracker.verify();
 +                }
 +            };
 +        }
 +
 +        return new UnfilteredPartitionIterators.MergeListener()
 +        {
 +            public UnfilteredRowIterators.MergeListener getRowMergeListener(DecoratedKey partitionKey, List<UnfilteredRowIterator> versions)
 +            {
 +                UnfilteredRowIterators.MergeListener rowListener = partitionListener.getRowMergeListener(partitionKey, versions);
 +
 +                return new UnfilteredRowIterators.MergeListener()
 +                {
 +                    public void onMergedPartitionLevelDeletion(DeletionTime mergedDeletion, DeletionTime[] versions)
 +                    {
 +                        try
 +                        {
 +                            rowListener.onMergedPartitionLevelDeletion(mergedDeletion, versions);
 +                        }
 +                        catch (AssertionError e)
 +                        {
 +                            // The following can be pretty verbose, but it's really only triggered if a bug happens, so we'd
 +                            // rather get more info to debug than not.
 +                            TableMetadata table = command.metadata();
 +                            String details = String.format("Error merging partition level deletion on %s: merged=%s, versions=%s, sources={%s}, debug info:%n %s",
 +                                                           table,
 +                                                           mergedDeletion == null ? "null" : mergedDeletion.toString(),
 +                                                           '[' + Joiner.on(", ").join(transform(Arrays.asList(versions), rt -> rt == null ? "null" : rt.toString())) + ']',
 +                                                           sources.contacts(),
 +                                                           makeResponsesDebugString(partitionKey));
 +                            throw new AssertionError(details, e);
 +                        }
 +                    }
 +
 +                    public void onMergedRows(Row merged, Row[] versions)
 +                    {
 +                        try
 +                        {
 +                            rowListener.onMergedRows(merged, versions);
 +                        }
 +                        catch (AssertionError e)
 +                        {
 +                            // The following can be pretty verbose, but it's really only triggered if a bug happens, so we'd
 +                            // rather get more info to debug than not.
 +                            TableMetadata table = command.metadata();
 +                            String details = String.format("Error merging rows on %s: merged=%s, versions=%s, sources={%s}, debug info:%n %s",
 +                                                           table,
 +                                                           merged == null ? "null" : merged.toString(table),
 +                                                           '[' + Joiner.on(", ").join(transform(Arrays.asList(versions), rt -> rt == null ? "null" : rt.toString(table))) + ']',
 +                                                           sources.contacts(),
 +                                                           makeResponsesDebugString(partitionKey));
 +                            throw new AssertionError(details, e);
 +                        }
 +                    }
 +
 +                    public void onMergedRangeTombstoneMarkers(RangeTombstoneMarker merged, RangeTombstoneMarker[] versions)
 +                    {
 +                        try
 +                        {
 +                            // The code for merging range tombstones is a tad complex and we have had the assertions there
 +                            // triggered unexpectedly on a few occasions (CASSANDRA-13237, CASSANDRA-13719). It's hard to get
 +                            // insight when that happens without more context than what the assertion errors give us, hence
 +                            // the catch here, which gathers as much context as is reasonable.
 +                            rowListener.onMergedRangeTombstoneMarkers(merged, versions);
 +                        }
 +                        catch (AssertionError e)
 +                        {
 +
 +                            // The following can be pretty verbose, but it's really only triggered if a bug happens, so we'd
 +                            // rather get more info to debug than not.
 +                            TableMetadata table = command.metadata();
 +                            String details = String.format("Error merging RTs on %s: merged=%s, versions=%s, sources={%s}, debug info:%n %s",
 +                                                           table,
 +                                                           merged == null ? "null" : merged.toString(table),
 +                                                           '[' + Joiner.on(", ").join(transform(Arrays.asList(versions), rt -> rt == null ? "null" : rt.toString(table))) + ']',
 +                                                           sources.contacts(),
 +                                                           makeResponsesDebugString(partitionKey));
 +                            throw new AssertionError(details, e);
 +                        }
 +
 +                    }
 +
 +                    public void close()
 +                    {
 +                        rowListener.close();
 +                    }
 +                };
 +            }
 +
 +            public void close()
 +            {
 +                partitionListener.close();
 +                if (repairedDataTracker != null)
 +                    repairedDataTracker.verify();
 +            }
 +        };
 +    }
 +}
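
The tracker that DataResolver feeds above keeps two pieces of state: which replicas produced which
repaired-data digest, and which responses were flagged inconclusive. A self-contained model of that
bookkeeping, using hypothetical names in place of RepairedDataTracker's internals:

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    // Illustrative model only; the real tracker keys on InetAddressAndPort and ByteBuffer digests.
    class DigestBuckets
    {
        private final Map<String, Set<String>> replicasByDigest = new HashMap<>();
        private final Set<String> inconclusive = new HashSet<>();

        // Mirrors recordDigest(from, digest, isConclusive) in the resolve() loop above.
        void record(String replica, String digest, boolean conclusive)
        {
            replicasByDigest.computeIfAbsent(digest, d -> new HashSet<>()).add(replica);
            if (!conclusive)
                inconclusive.add(replica);
        }

        // More than one distinct digest means a mismatch; it is only "confirmed"
        // when every response was conclusive, matching SimpleVerifier.verify() below.
        boolean mismatch()          { return replicasByDigest.size() > 1; }
        boolean confirmedMismatch() { return mismatch() && inconclusive.isEmpty(); }
    }
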
diff --cc src/java/org/apache/cassandra/service/reads/repair/RepairedDataVerifier.java
index 816fe9f,0000000..bed240c
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/service/reads/repair/RepairedDataVerifier.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/RepairedDataVerifier.java
@@@ -1,95 -1,0 +1,157 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.service.reads.repair;
 +
++import java.time.LocalDate;
++import java.time.format.DateTimeFormatter;
++import java.util.concurrent.ConcurrentHashMap;
 +import java.util.concurrent.TimeUnit;
++import java.util.concurrent.atomic.AtomicLong;
 +
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.db.ColumnFamilyStore;
- import org.apache.cassandra.db.PartitionRangeReadCommand;
 +import org.apache.cassandra.db.ReadCommand;
- import org.apache.cassandra.db.SinglePartitionReadCommand;
++import org.apache.cassandra.db.SnapshotCommand;
++import org.apache.cassandra.locator.InetAddressAndPort;
 +import org.apache.cassandra.metrics.TableMetrics;
++import org.apache.cassandra.net.Message;
++import org.apache.cassandra.net.MessagingService;
++import org.apache.cassandra.net.Verb;
++import org.apache.cassandra.schema.TableId;
++import org.apache.cassandra.service.SnapshotVerbHandler;
 +import org.apache.cassandra.tracing.Tracing;
 +import org.apache.cassandra.utils.NoSpamLogger;
 +
 +public interface RepairedDataVerifier
 +{
 +    public void verify(RepairedDataTracker tracker);
 +
++    static RepairedDataVerifier verifier(ReadCommand command)
++    {
++        return DatabaseDescriptor.snapshotOnRepairedDataMismatch() ? snapshotting(command) : simple(command);
++    }
++
 +    static RepairedDataVerifier simple(ReadCommand command)
 +    {
 +        return new SimpleVerifier(command);
 +    }
 +
++    static RepairedDataVerifier snapshotting(ReadCommand command)
++    {
++        return new SnapshottingVerifier(command);
++    }
++
 +    static class SimpleVerifier implements RepairedDataVerifier
 +    {
 +        private static final Logger logger = LoggerFactory.getLogger(SimpleVerifier.class);
-         private final ReadCommand command;
++        protected final ReadCommand command;
 +
 +        private static final String INCONSISTENCY_WARNING = "Detected mismatch between repaired datasets for table {}.{} during read of {}. {}";
 +
 +        SimpleVerifier(ReadCommand command)
 +        {
 +            this.command = command;
 +        }
 +
 +        @Override
 +        public void verify(RepairedDataTracker tracker)
 +        {
 +            Tracing.trace("Verifying repaired data tracker {}", tracker);
 +
 +            // some mismatch occurred between the repaired datasets on the replicas
 +            if (tracker.digests.keySet().size() > 1)
 +            {
 +                // unless any of the digests should be considered inconclusive (because there were
 +                // pending repair sessions which had not yet been committed, or unrepaired partition
 +                // deletes which meant some sstables were skipped during reads), mark the
 +                // inconsistency as confirmed
 +                if (tracker.inconclusiveDigests.isEmpty())
 +                {
 +                    TableMetrics metrics = ColumnFamilyStore.metricsFor(command.metadata().id);
 +                    metrics.confirmedRepairedInconsistencies.mark();
 +                    NoSpamLogger.log(logger, NoSpamLogger.Level.WARN, 1, TimeUnit.MINUTES,
 +                                     INCONSISTENCY_WARNING, command.metadata().keyspace,
-                                      command.metadata().name, getCommandString(), tracker);
++                                     command.metadata().name, command.toString(), tracker);
 +                }
 +                else if (DatabaseDescriptor.reportUnconfirmedRepairedDataMismatches())
 +                {
 +                    TableMetrics metrics = ColumnFamilyStore.metricsFor(command.metadata().id);
 +                    metrics.unconfirmedRepairedInconsistencies.mark();
 +                    NoSpamLogger.log(logger, NoSpamLogger.Level.WARN, 1, TimeUnit.MINUTES,
 +                                     INCONSISTENCY_WARNING, command.metadata().keyspace,
-                                      command.metadata().name, getCommandString(), tracker);
++                                     command.metadata().name, command.toString(), tracker);
 +                }
 +            }
 +        }
++    }
++
++    static class SnapshottingVerifier extends SimpleVerifier
++    {
++        private static final Logger logger = LoggerFactory.getLogger(SnapshottingVerifier.class);
++        private static final DateTimeFormatter DATE_FORMAT = DateTimeFormatter.BASIC_ISO_DATE;
++        private static final String SNAPSHOTTING_WARNING = "Issuing snapshot command for mismatch between repaired datasets for table {}.{} during read of {}. {}";
++
++        // Issue at most 1 snapshot request per minute for any given table.
++        // Replicas will only create one snapshot per day, but this stops us
++        // from swamping the network if we start seeing mismatches.
++        private static final long SNAPSHOT_INTERVAL_NANOS = TimeUnit.MINUTES.toNanos(1);
++        private static final ConcurrentHashMap<TableId, AtomicLong> LAST_SNAPSHOT_TIMES = new ConcurrentHashMap<>();
++
++        SnapshottingVerifier(ReadCommand command)
++        {
++            super(command);
++        }
 +
-         private String getCommandString()
++        public void verify(RepairedDataTracker tracker)
 +        {
-             return command instanceof SinglePartitionReadCommand
-                    ? ((SinglePartitionReadCommand)command).partitionKey().toString()
-                    : ((PartitionRangeReadCommand)command).dataRange().keyRange().getString(command.metadata().partitionKeyType);
++            super.verify(tracker);
++            if (tracker.digests.keySet().size() > 1)
++            {
++                if (tracker.inconclusiveDigests.isEmpty() || DatabaseDescriptor.reportUnconfirmedRepairedDataMismatches())
++                {
++                    long now = System.nanoTime();
++                    AtomicLong cached = LAST_SNAPSHOT_TIMES.computeIfAbsent(command.metadata().id, u -> new AtomicLong(0));
++                    long last = cached.get();
++                    if (now - last > SNAPSHOT_INTERVAL_NANOS && cached.compareAndSet(last, now))
++                    {
++                        logger.warn(SNAPSHOTTING_WARNING, command.metadata().keyspace, command.metadata().name, command.toString(), tracker);
++                        Message<SnapshotCommand> msg = Message.out(Verb.SNAPSHOT_REQ,
++                                                                   new SnapshotCommand(command.metadata().keyspace,
++                                                                                       command.metadata().name,
++                                                                                       getSnapshotName(),
++                                                                                       false));
++                        for (InetAddressAndPort replica : tracker.digests.values())
++                            MessagingService.instance().send(msg, replica);
++                    }
++                }
++            }
++        }
 +
++        public static String getSnapshotName()
++        {
++            return String.format("%s%s",
++                                 SnapshotVerbHandler.REPAIRED_DATA_MISMATCH_SNAPSHOT_PREFIX,
++                                 DATE_FORMAT.format(LocalDate.now()));
 +        }
 +    }
 +}
++
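
The throttle in SnapshottingVerifier is a small reusable idiom: fire at most one action per key per
interval, coordinated lock-free so racing readers never block each other. A standalone sketch of the
same idiom (the class name is hypothetical; the in-tree code inlines the check in verify()):

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicLong;

    class PerKeyRateLimiter<K>
    {
        private final long intervalNanos;
        private final ConcurrentHashMap<K, AtomicLong> lastFired = new ConcurrentHashMap<>();

        PerKeyRateLimiter(long interval, TimeUnit unit)
        {
            this.intervalNanos = unit.toNanos(interval);
        }

        // Returns true for at most one caller per key per interval; a caller that
        // loses the compareAndSet race simply skips the action, just as a losing
        // read skips the snapshot request in SnapshottingVerifier.verify().
        boolean tryAcquire(K key)
        {
            long now = System.nanoTime();
            AtomicLong cached = lastFired.computeIfAbsent(key, k -> new AtomicLong(0));
            long last = cached.get();
            return now - last > intervalNanos && cached.compareAndSet(last, now);
        }
    }
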
diff --cc test/distributed/org/apache/cassandra/distributed/test/PreviewRepairTest.java
index 30c8f25,0000000..70b40bc
mode 100644,000000..100644
--- a/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairTest.java
@@@ -1,281 -1,0 +1,383 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.distributed.test;
 +
 +import java.io.IOException;
 +import java.util.ArrayList;
++import java.util.Arrays;
 +import java.util.HashMap;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.concurrent.ExecutionException;
 +import java.util.concurrent.ExecutorService;
 +import java.util.concurrent.Executors;
 +import java.util.concurrent.Future;
 +import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.atomic.AtomicBoolean;
++import java.util.concurrent.atomic.AtomicInteger;
 +
 +import com.google.common.collect.ImmutableList;
++import com.google.common.util.concurrent.Uninterruptibles;
 +import org.junit.Test;
 +
 +import org.apache.cassandra.db.ColumnFamilyStore;
 +import org.apache.cassandra.db.Keyspace;
 +import org.apache.cassandra.db.compaction.CompactionManager;
 +import org.apache.cassandra.dht.Range;
 +import org.apache.cassandra.dht.Token;
 +import org.apache.cassandra.distributed.Cluster;
 +import org.apache.cassandra.distributed.api.ConsistencyLevel;
 +import org.apache.cassandra.distributed.api.ICoordinator;
++import org.apache.cassandra.distributed.api.IInvokableInstance;
 +import org.apache.cassandra.distributed.api.IIsolatedExecutor;
 +import org.apache.cassandra.distributed.api.IMessage;
 +import org.apache.cassandra.distributed.api.IMessageFilters;
 +import org.apache.cassandra.distributed.shared.RepairResult;
 +import org.apache.cassandra.net.Verb;
++import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.repair.RepairParallelism;
 +import org.apache.cassandra.repair.messages.RepairOption;
++import org.apache.cassandra.service.ActiveRepairService;
 +import org.apache.cassandra.service.StorageService;
 +import org.apache.cassandra.streaming.PreviewKind;
 +import org.apache.cassandra.utils.FBUtilities;
 +import org.apache.cassandra.utils.concurrent.SimpleCondition;
 +import org.apache.cassandra.utils.progress.ProgressEventType;
 +
 +import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
 +import static org.apache.cassandra.distributed.api.Feature.NETWORK;
 +import static org.junit.Assert.assertEquals;
 +import static org.junit.Assert.assertFalse;
 +import static org.junit.Assert.assertTrue;
 +
 +public class PreviewRepairTest extends TestBaseImpl
 +{
 +    /**
 +     * Makes sure that the repaired sstables do not match on the two
 +     * nodes, by disabling autocompaction on node2 and then running an
 +     * incremental repair.
 +     */
 +    @Test
 +    public void testWithMismatchingPending() throws Throwable
 +    {
 +        try(Cluster cluster = init(Cluster.build(2).withConfig(config -> config.with(GOSSIP).with(NETWORK)).start()))
 +        {
 +            cluster.schemaChange("create table " + KEYSPACE + ".tbl (id int primary key, t int)");
 +            insert(cluster.coordinator(1), 0, 100);
 +            cluster.forEach((node) -> node.flush(KEYSPACE));
 +            cluster.get(1).callOnInstance(repair(options(false)));
 +            insert(cluster.coordinator(1), 100, 100);
 +            cluster.forEach((node) -> node.flush(KEYSPACE));
 +
 +            // make sure that all sstables have moved to repaired by triggering a compaction
 +            // also disables autocompaction on the nodes
 +            cluster.forEach((node) -> node.runOnInstance(() -> {
 +                ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl");
 +                FBUtilities.waitOnFutures(CompactionManager.instance.submitBackground(cfs));
 +                cfs.disableAutoCompaction();
 +            }));
 +            cluster.get(1).callOnInstance(repair(options(false)));
 +            // now re-enable autocompaction on node1, this moves the sstables for the new repair to repaired
 +            cluster.get(1).runOnInstance(() -> {
 +                ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl");
 +                cfs.enableAutoCompaction();
 +                FBUtilities.waitOnFutures(CompactionManager.instance.submitBackground(cfs));
 +            });
 +            RepairResult rs = cluster.get(1).callOnInstance(repair(options(true)));
 +            assertTrue(rs.success); // preview repair should succeed
 +            assertFalse(rs.wasInconsistent); // and we should see no mismatches
 +        }
 +    }
 +
 +    /**
 +     * Another case where the repaired datasets could mismatch is if an incremental repair finishes just as the preview
 +     * repair is starting up.
 +     *
 +     * This tests this case:
 +     * 1. we start a preview repair
 +     * 2. pause the validation requests from node1 -> node2
 +     * 3. node1 starts its validation
 +     * 4. run an incremental repair which completes fine
 +     * 5. node2 resumes its validation
 +     *
 +     * Now we will include sstables from the second incremental repair on node2 but not on node1.
 +     * This should fail, since we fail any preview repair that is ongoing when an incremental repair finishes (step 4 above).
 +     */
 +    @Test
 +    public void testFinishingIncRepairDuringPreview() throws IOException, InterruptedException, ExecutionException
 +    {
 +        ExecutorService es = Executors.newSingleThreadExecutor();
 +        try(Cluster cluster = init(Cluster.build(2).withConfig(config -> config.with(GOSSIP).with(NETWORK)).start()))
 +        {
 +            cluster.schemaChange("create table " + KEYSPACE + ".tbl (id int primary key, t int)");
 +
 +            insert(cluster.coordinator(1), 0, 100);
 +            cluster.forEach((node) -> node.flush(KEYSPACE));
 +            cluster.get(1).callOnInstance(repair(options(false)));
 +
 +            insert(cluster.coordinator(1), 100, 100);
 +            cluster.forEach((node) -> node.flush(KEYSPACE));
 +
 +            SimpleCondition continuePreviewRepair = new SimpleCondition();
 +            DelayMessageFilter filter = new DelayMessageFilter(continuePreviewRepair);
 +            // this pauses the validation request sent from node1 to node2 until we have run a full inc repair below
 +            cluster.filters().outbound().verbs(Verb.VALIDATION_REQ.id).from(1).to(2).messagesMatching(filter).drop();
 +
 +            Future<RepairResult> rsFuture = es.submit(() -> cluster.get(1).callOnInstance(repair(options(true))));
 +            Thread.sleep(1000);
 +            // this needs to finish before the preview repair is unpaused on node2
 +            cluster.get(1).callOnInstance(repair(options(false)));
 +            continuePreviewRepair.signalAll();
 +            RepairResult rs = rsFuture.get();
 +            assertFalse(rs.success); // preview repair should have failed
 +            assertFalse(rs.wasInconsistent); // and no mismatches should have been reported
 +        }
 +        finally
 +        {
 +            es.shutdown();
 +        }
 +    }
 +
 +    /**
 +     * Same as testFinishingIncRepairDuringPreview, but the previewed range does not intersect the incremental repair,
 +     * so both preview and incremental repair should finish fine (without any mismatches).
 +     */
 +    @Test
 +    public void testFinishingNonIntersectingIncRepairDuringPreview() throws IOException, InterruptedException, ExecutionException
 +    {
 +        ExecutorService es = Executors.newSingleThreadExecutor();
 +        try(Cluster cluster = init(Cluster.build(2).withConfig(config -> config.with(GOSSIP).with(NETWORK)).start()))
 +        {
 +            cluster.schemaChange("create table " + KEYSPACE + ".tbl (id int primary key, t int)");
 +
 +            insert(cluster.coordinator(1), 0, 100);
 +            cluster.forEach((node) -> node.flush(KEYSPACE));
 +            assertTrue(cluster.get(1).callOnInstance(repair(options(false))).success);
 +
 +            insert(cluster.coordinator(1), 100, 100);
 +            cluster.forEach((node) -> node.flush(KEYSPACE));
 +
 +            // pause preview repair validation messages on node2 until node1 has finished
 +            SimpleCondition continuePreviewRepair = new SimpleCondition();
 +            DelayMessageFilter filter = new DelayMessageFilter(continuePreviewRepair);
 +            cluster.filters().outbound().verbs(Verb.VALIDATION_REQ.id).from(1).to(2).messagesMatching(filter).drop();
 +
 +            // get local ranges to repair two separate ranges:
 +            List<String> localRanges = cluster.get(1).callOnInstance(() -> {
 +                List<String> res = new ArrayList<>();
 +                for (Range<Token> r : StorageService.instance.getLocalReplicas(KEYSPACE).ranges())
 +                    res.add(r.left.getTokenValue()+ ":"+ r.right.getTokenValue());
 +                return res;
 +            });
 +
 +            assertEquals(2, localRanges.size());
 +            Future<RepairResult> repairStatusFuture = es.submit(() -> cluster.get(1).callOnInstance(repair(options(true, localRanges.get(0)))));
 +            Thread.sleep(1000); // wait for node1 to start validation compaction
 +            // this needs to finish before the preview repair is unpaused on node2
 +            assertTrue(cluster.get(1).callOnInstance(repair(options(false, localRanges.get(1)))).success);
 +
 +            continuePreviewRepair.signalAll();
 +            RepairResult rs = repairStatusFuture.get();
 +            assertTrue(rs.success); // repair should succeed
 +            assertFalse(rs.wasInconsistent); // and no mismatches
 +        }
 +        finally
 +        {
 +            es.shutdown();
 +        }
 +    }
 +
-     private static class DelayMessageFilter implements IMessageFilters.Matcher
++    @Test
++    public void snapshotTest() throws IOException, InterruptedException
++    {
++        try(Cluster cluster = init(Cluster.build(3).withConfig(config ->
++                                                               config.set("snapshot_on_repaired_data_mismatch", true)
++                                                                     .with(GOSSIP)
++                                                                     .with(NETWORK))
++                                          .start()))
++        {
++            cluster.schemaChange("create table " + KEYSPACE + ".tbl (id int primary key, t int)");
++            cluster.schemaChange("create table " + KEYSPACE + ".tbl2 (id int primary key, t int)");
++            Thread.sleep(1000);
++
++            // populate 2 tables
++            insert(cluster.coordinator(1), 0, 100, "tbl");
++            insert(cluster.coordinator(1), 0, 100, "tbl2");
++            cluster.forEach((n) -> n.flush(KEYSPACE));
++
++            // make sure everything is marked repaired
++            cluster.get(1).callOnInstance(repair(options(false)));
++            waitMarkedRepaired(cluster);
++            // make node2 mismatch
++            unmarkRepaired(cluster.get(2), "tbl");
++            verifySnapshots(cluster, "tbl", true);
++            verifySnapshots(cluster, "tbl2", true);
++
++            AtomicInteger snapshotMessageCounter = new AtomicInteger();
++            cluster.filters().verbs(Verb.SNAPSHOT_REQ.id).messagesMatching((from, to, message) -> {
++                snapshotMessageCounter.incrementAndGet();
++                return false;
++            }).drop();
++            cluster.get(1).callOnInstance(repair(options(true)));
++            verifySnapshots(cluster, "tbl", false);
++            // tbl2 should not have a mismatch, so the snapshots should be empty here
++            verifySnapshots(cluster, "tbl2", true);
++            assertEquals(3, snapshotMessageCounter.get());
++
++            // and make sure that we don't try to snapshot again
++            snapshotMessageCounter.set(0);
++            cluster.get(3).callOnInstance(repair(options(true)));
++            assertEquals(0, snapshotMessageCounter.get());
++        }
++    }
++
++    private void waitMarkedRepaired(Cluster cluster)
++    {
++        cluster.forEach(node -> node.runOnInstance(() -> {
++            for (String table : Arrays.asList("tbl", "tbl2"))
++            {
++                ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(table);
++                while (true)
++                {
++                    if (cfs.getLiveSSTables().stream().allMatch(SSTableReader::isRepaired))
++                        return;
++                    Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
++                }
++            }
++        }));
++    }
++
++    private void unmarkRepaired(IInvokableInstance instance, String table)
++    {
++        instance.runOnInstance(() -> {
++            ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(table);
++            try
++            {
++                cfs.getCompactionStrategyManager().mutateRepaired(cfs.getLiveSSTables(), ActiveRepairService.UNREPAIRED_SSTABLE, null, false);
++            }
++            catch (IOException e)
++            {
++                throw new RuntimeException(e);
++            }
++        });
++    }
++
++    private void verifySnapshots(Cluster cluster, String table, boolean shouldBeEmpty)
++    {
++        cluster.forEach(node -> node.runOnInstance(() -> {
++            ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(table);
++            if (shouldBeEmpty)
++            {
++                assertTrue(cfs.getSnapshotDetails().isEmpty());
++            }
++            else
++            {
++                while (cfs.getSnapshotDetails().isEmpty())
++                    Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
++            }
++        }));
++    }
++
++    static class DelayMessageFilter implements IMessageFilters.Matcher
 +    {
 +        private final SimpleCondition condition;
 +        private final AtomicBoolean waitForRepair = new AtomicBoolean(true);
 +
 +        public DelayMessageFilter(SimpleCondition condition)
 +        {
 +            this.condition = condition;
 +        }
 +        public boolean matches(int from, int to, IMessage message)
 +        {
 +            try
 +            {
 +                // only the first validation req should be delayed:
 +                if (waitForRepair.compareAndSet(true, false))
 +                    condition.await();
 +            }
 +            catch (Exception e)
 +            {
 +                throw new RuntimeException(e);
 +            }
 +            return false; // don't drop the message
 +        }
 +    }
 +
 +    private static void insert(ICoordinator coordinator, int start, int count)
 +    {
++        insert(coordinator, start, count, "tbl");
++    }
++
++    static void insert(ICoordinator coordinator, int start, int count, String table)
++    {
 +        for (int i = start; i < start + count; i++)
-             coordinator.execute("insert into " + KEYSPACE + ".tbl (id, t) values (?, ?)", ConsistencyLevel.ALL, i, i);
++            coordinator.execute("insert into " + KEYSPACE + "." + table + " (id, t) values (?, ?)", ConsistencyLevel.ALL, i, i);
 +    }
 +
 +    /**
 +     * returns a pair with [repair success, was inconsistent]
 +     */
 +    private static IIsolatedExecutor.SerializableCallable<RepairResult> repair(Map<String, String> options)
 +    {
 +        return () -> {
 +            SimpleCondition await = new SimpleCondition();
 +            AtomicBoolean success = new AtomicBoolean(true);
 +            AtomicBoolean wasInconsistent = new AtomicBoolean(false);
 +            StorageService.instance.repair(KEYSPACE, options, ImmutableList.of((tag, event) -> {
 +                if (event.getType() == ProgressEventType.ERROR)
 +                {
 +                    success.set(false);
 +                    await.signalAll();
 +                }
 +                else if (event.getType() == ProgressEventType.NOTIFICATION && event.getMessage().contains("Repaired data is inconsistent"))
 +                {
 +                    wasInconsistent.set(true);
 +                }
 +                else if (event.getType() == ProgressEventType.COMPLETE)
 +                    await.signalAll();
 +            }));
 +            try
 +            {
 +                await.await(1, TimeUnit.MINUTES);
 +            }
 +            catch (InterruptedException e)
 +            {
 +                throw new RuntimeException(e);
 +            }
 +            return new RepairResult(success.get(), wasInconsistent.get());
 +        };
 +    }
 +
 +    private static Map<String, String> options(boolean preview)
 +    {
 +        Map<String, String> config = new HashMap<>();
 +        config.put(RepairOption.INCREMENTAL_KEY, "true");
 +        config.put(RepairOption.PARALLELISM_KEY, RepairParallelism.PARALLEL.toString());
 +        if (preview)
 +            config.put(RepairOption.PREVIEW, PreviewKind.REPAIRED.toString());
 +        return config;
 +    }
 +
 +    private static Map<String, String> options(boolean preview, String range)
 +    {
 +        Map<String, String> options = options(preview);
 +        options.put(RepairOption.RANGES_KEY, range);
 +        return options;
 +    }
 +}
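
Aside on the options helpers: options(false) drives an ordinary parallel incremental repair, while
options(true) adds PreviewKind.REPAIRED, i.e. a preview over the repaired dataset. From the command
line that preview corresponds, as far as I can tell, to

    nodetool repair --validate <keyspace>

in 4.0 (treat the exact flag spelling as an assumption here; the test bypasses nodetool and calls
StorageService.instance.repair() directly).
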
diff --cc test/distributed/org/apache/cassandra/distributed/test/RepairDigestTrackingTest.java
index 4e44543,0000000..664c99d
mode 100644,000000..100644
--- a/test/distributed/org/apache/cassandra/distributed/test/RepairDigestTrackingTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/RepairDigestTrackingTest.java
@@@ -1,205 -1,0 +1,316 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.distributed.test;
 +
 +import java.io.IOException;
- import java.io.Serializable;
++import java.time.LocalDate;
++import java.time.format.DateTimeFormatter;
 +import java.util.EnumSet;
++import java.util.Iterator;
 +import java.util.Map;
 +import java.util.concurrent.TimeUnit;
 +
++import com.google.common.util.concurrent.Uninterruptibles;
 +import org.junit.Assert;
 +import org.junit.Test;
 +
++import org.apache.cassandra.db.ColumnFamilyStore;
 +import org.apache.cassandra.db.Keyspace;
 +import org.apache.cassandra.distributed.Cluster;
 +import org.apache.cassandra.distributed.api.ConsistencyLevel;
++import org.apache.cassandra.distributed.api.IInvokableInstance;
++import org.apache.cassandra.distributed.api.IIsolatedExecutor;
 +import org.apache.cassandra.io.sstable.Descriptor;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.io.sstable.metadata.MetadataComponent;
 +import org.apache.cassandra.io.sstable.metadata.MetadataType;
 +import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
 +import org.apache.cassandra.service.ActiveRepairService;
++import org.apache.cassandra.service.SnapshotVerbHandler;
 +import org.apache.cassandra.service.StorageProxy;
 +
- public class RepairDigestTrackingTest extends TestBaseImpl implements Serializable// TODO: why serializable?
++public class RepairDigestTrackingTest extends TestBaseImpl
 +{
++    private static final String TABLE = "tbl";
++    private static final String KS_TABLE = KEYSPACE + "." + TABLE;
 +
 +    @Test
 +    public void testInconsistenciesFound() throws Throwable
 +    {
 +        try (Cluster cluster = (Cluster) init(builder().withNodes(2).start()))
 +        {
 +
 +            cluster.get(1).runOnInstance(() -> {
 +                StorageProxy.instance.enableRepairedDataTrackingForRangeReads();
 +            });
 +
-             cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (k INT, c INT, v INT, PRIMARY KEY (k,c)) with read_repair='NONE'");
++            cluster.schemaChange("CREATE TABLE " + KS_TABLE+ " (k INT, c INT, v INT, PRIMARY KEY (k,c)) with read_repair='NONE'");
 +            for (int i = 0; i < 10; i++)
 +            {
-                 cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (k, c, v) VALUES (?, ?, ?)",
++                cluster.coordinator(1).execute("INSERT INTO " + KS_TABLE + " (k, c, v) VALUES (?, ?, ?)",
 +                                               ConsistencyLevel.ALL,
 +                                               i, i, i);
 +            }
- 
-             cluster.get(1).runOnInstance(() ->
-                Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").forceBlockingFlush()
-             );
-             cluster.get(2).runOnInstance(() ->
-                Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").forceBlockingFlush()
-             );
++            cluster.forEach(i -> i.flush(KEYSPACE));
 +
 +            for (int i = 10; i < 20; i++)
 +            {
-                 cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (k, c, v) VALUES (?, ?, ?)",
++                cluster.coordinator(1).execute("INSERT INTO " + KS_TABLE + " (k, c, v) VALUES (?, ?, ?)",
 +                                               ConsistencyLevel.ALL,
 +                                               i, i, i);
 +            }
++            cluster.forEach(i -> i.flush(KEYSPACE));
++            cluster.forEach(i -> i.runOnInstance(assertNotRepaired()));
++
++            // mark everything on node 2 repaired
++            cluster.get(2).runOnInstance(markAllRepaired());
++            cluster.get(2).runOnInstance(assertRepaired());
 +
-             cluster.get(1).runOnInstance(() ->
-                                          Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").forceBlockingFlush()
-             );
-             cluster.get(2).runOnInstance(() ->
-                                          Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").forceBlockingFlush()
-             );
- 
-             cluster.get(1).runOnInstance(() ->
-                 Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").getLiveSSTables().forEach(this::assertNotRepaired)
-             );
-             cluster.get(2).runOnInstance(() ->
-                 Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").getLiveSSTables().forEach(this::assertNotRepaired)
-             );
- 
-             cluster.get(2).runOnInstance(() ->
-                 Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").getLiveSSTables().forEach(this::markRepaired)
-             );
- 
- 
-             cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (k, c, v) VALUES (?, ?, ?)", 5, 5, 55);
-             cluster.get(1).runOnInstance(() ->
-               Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").getLiveSSTables().forEach(this::assertNotRepaired)
-             );
-             cluster.get(2).runOnInstance(() ->
-               Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").getLiveSSTables().forEach(this::assertRepaired)
-             );
- 
-             long ccBefore = cluster.get(1).callOnInstance(() ->
-                 Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").metric.confirmedRepairedInconsistencies.table.getCount()
-             );
- 
-             cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl", ConsistencyLevel.ALL);
-             long ccAfter = cluster.get(1).callOnInstance(() ->
-                 Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").metric.confirmedRepairedInconsistencies.table.getCount()
-             );
++            // insert more data on node1 to generate an initial mismatch
++            cluster.get(1).executeInternal("INSERT INTO " + KS_TABLE + " (k, c, v) VALUES (?, ?, ?)", 5, 5, 55);
++            cluster.get(1).runOnInstance(assertNotRepaired());
 +
++            long ccBefore = getConfirmedInconsistencies(cluster.get(1));
++            cluster.coordinator(1).execute("SELECT * FROM " + KS_TABLE, ConsistencyLevel.ALL);
++            long ccAfter = getConfirmedInconsistencies(cluster.get(1));
 +            Assert.assertEquals("confirmed count should differ by 1 after range read", ccBefore + 1, ccAfter);
 +        }
 +    }
 +
 +    @Test
 +    public void testPurgeableTombstonesAreIgnored() throws Throwable
 +    {
 +        try (Cluster cluster = (Cluster) init(builder().withNodes(2).start()))
 +        {
- 
 +            cluster.get(1).runOnInstance(() -> {
 +                StorageProxy.instance.enableRepairedDataTrackingForRangeReads();
 +            });
 +
-             cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl2 (k INT, c INT, v1 INT, v2 INT, PRIMARY KEY (k,c)) WITH gc_grace_seconds=0");
++            cluster.schemaChange("CREATE TABLE " + KS_TABLE + " (k INT, c INT, v1 INT, v2 INT, PRIMARY KEY (k,c)) WITH gc_grace_seconds=0");
 +            // on node1 only insert some tombstones, then flush
 +            for (int i = 0; i < 10; i++)
 +            {
-                 cluster.get(1).executeInternal("DELETE v1 FROM " + KEYSPACE + ".tbl2 USING TIMESTAMP 0 WHERE k=? and c=? ", i, i);
++                cluster.get(1).executeInternal("DELETE v1 FROM " + KS_TABLE + " USING TIMESTAMP 0 WHERE k=? and c=? ", i, i);
 +            }
-             cluster.get(1).runOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl2").forceBlockingFlush());
++            cluster.get(1).flush(KEYSPACE);
 +
 +            // insert data on both nodes and flush
 +            for (int i = 0; i < 10; i++)
 +            {
-                 cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl2 (k, c, v2) VALUES (?, ?, ?) USING TIMESTAMP 1",
++                cluster.coordinator(1).execute("INSERT INTO " + KS_TABLE + " (k, c, v2) VALUES (?, ?, ?) USING TIMESTAMP 1",
 +                                               ConsistencyLevel.ALL,
 +                                               i, i, i);
 +            }
-             cluster.get(1).runOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl2").forceBlockingFlush());
-             cluster.get(2).runOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl2").forceBlockingFlush());
++            cluster.forEach(i -> i.flush(KEYSPACE));
 +
 +            // nothing is repaired yet
-             cluster.get(1).runOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl2").getLiveSSTables().forEach(this::assertNotRepaired));
-             cluster.get(2).runOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl2").getLiveSSTables().forEach(this::assertNotRepaired));
++            cluster.forEach(i -> i.runOnInstance(assertNotRepaired()));
 +            // mark everything repaired
-             cluster.get(1).runOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl2").getLiveSSTables().forEach(this::markRepaired));
-             cluster.get(2).runOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl2").getLiveSSTables().forEach(this::markRepaired));
-             cluster.get(1).runOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl2").getLiveSSTables().forEach(this::assertRepaired));
-             cluster.get(2).runOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl2").getLiveSSTables().forEach(this::assertRepaired));
++            cluster.forEach(i -> i.runOnInstance(markAllRepaired()));
++            cluster.forEach(i -> i.runOnInstance(assertRepaired()));
 +
 +            // now overwrite on node2 only to generate digest mismatches, but don't flush so the repaired dataset is not affected
 +            for (int i = 0; i < 10; i++)
 +            {
-                 cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl2 (k, c, v2) VALUES (?, ?, ?) USING TIMESTAMP 2", i, i, i * 2);
++                cluster.get(2).executeInternal("INSERT INTO " + KS_TABLE + " (k, c, v2) VALUES (?, ?, ?) USING TIMESTAMP 2", i, i, i * 2);
 +            }
 +
-             long ccBefore = cluster.get(1).callOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl2").metric.confirmedRepairedInconsistencies.table.getCount());
++            long ccBefore = getConfirmedInconsistencies(cluster.get(1));
 +            // Unfortunately we need to sleep here to ensure that nowInSec > the local deletion time of the tombstones
 +            TimeUnit.SECONDS.sleep(2);
-             cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl2", ConsistencyLevel.ALL);
-             long ccAfter = cluster.get(1).callOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl2").metric.confirmedRepairedInconsistencies.table.getCount());
++            cluster.coordinator(1).execute("SELECT * FROM " + KS_TABLE, ConsistencyLevel.ALL);
++            long ccAfter = getConfirmedInconsistencies(cluster.get(1));
 +
 +            Assert.assertEquals("No repaired data inconsistencies should be detected", ccBefore, ccAfter);
 +        }
 +    }
 +
-     private void assertNotRepaired(SSTableReader reader) {
-         Assert.assertTrue("repaired at is set for sstable: " + reader.descriptor, getRepairedAt(reader) == ActiveRepairService.UNREPAIRED_SSTABLE);
-     }
++    @Test
++    public void testSnapshottingOnInconsistency() throws Throwable
++    {
++        try (Cluster cluster = init(Cluster.create(2)))
++        {
++            cluster.get(1).runOnInstance(() -> {
++                StorageProxy.instance.enableRepairedDataTrackingForPartitionReads();
++            });
++
++            cluster.schemaChange("CREATE TABLE " + KS_TABLE + " (k INT, c INT, v INT, PRIMARY KEY (k,c))");
++            for (int i = 0; i < 10; i++)
++            {
++                cluster.coordinator(1).execute("INSERT INTO " + KS_TABLE + " (k, c, v) VALUES (0, ?, ?)",
++                                               ConsistencyLevel.ALL, i, i);
++            }
++            cluster.forEach(c -> c.flush(KEYSPACE));
 +
-     private void assertRepaired(SSTableReader reader) {
-         Assert.assertTrue("repaired at is not set for sstable: " + reader.descriptor, getRepairedAt(reader) > 0);
++            for (int i = 10; i < 20; i++)
++            {
++                cluster.coordinator(1).execute("INSERT INTO " + KS_TABLE + " (k, c, v) VALUES (0, ?, ?)",
++                                               ConsistencyLevel.ALL, i, i);
++            }
++            cluster.forEach(c -> c.flush(KEYSPACE));
++            cluster.forEach(i -> i.runOnInstance(assertNotRepaired()));
++            // Mark everything repaired on node2
++            cluster.get(2).runOnInstance(markAllRepaired());
++            cluster.get(2).runOnInstance(assertRepaired());
++
++            // now overwrite on node1 only to generate digest mismatches
++            cluster.get(1).executeInternal("INSERT INTO " + KS_TABLE + " (k, c, v) VALUES (0, ?, ?)", 5, 55);
++            cluster.get(1).runOnInstance(assertNotRepaired());
++
++            // Execute a partition read and assert inconsistency is detected (as nothing is repaired on node1)
++            long ccBefore = getConfirmedInconsistencies(cluster.get(1));
++            cluster.coordinator(1).execute("SELECT * FROM " + KS_TABLE + " WHERE k=0", ConsistencyLevel.ALL);
++            long ccAfter = getConfirmedInconsistencies(cluster.get(1));
++            Assert.assertEquals("confirmed count should increment by 1 after each partition read", ccBefore + 1, ccAfter);
++
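++            // mismatch snapshots are named with the well-known prefix plus the current date (BASIC_ISO_DATE, i.e. yyyyMMdd)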
++            String snapshotName = SnapshotVerbHandler.REPAIRED_DATA_MISMATCH_SNAPSHOT_PREFIX
++                                  + DateTimeFormatter.BASIC_ISO_DATE.format(LocalDate.now());
++
++            cluster.forEach(i -> i.runOnInstance(assertSnapshotNotPresent(snapshotName)));
++
++            // re-introduce a mismatch, enable snapshotting and try again
++            cluster.get(1).executeInternal("INSERT INTO " + KS_TABLE + " (k, c, v) VALUES (0, ?, ?)", 5, 555);
++            cluster.get(1).runOnInstance(() -> {
++                StorageProxy.instance.enableSnapshotOnRepairedDataMismatch();
++            });
++
++            cluster.coordinator(1).execute("SELECT * FROM " + KS_TABLE + " WHERE k=0", ConsistencyLevel.ALL);
++            ccAfter = getConfirmedInconsistencies(cluster.get(1));
++            Assert.assertEquals("confirmed count should increment by 1 after each partition read", ccBefore + 2, ccAfter);
++
++            cluster.forEach(i -> i.runOnInstance(assertSnapshotPresent(snapshotName)));
++        }
 +    }
 +
-     private long getRepairedAt(SSTableReader reader)
++    private IIsolatedExecutor.SerializableRunnable assertNotRepaired()
 +    {
-         Descriptor descriptor = reader.descriptor;
-         try
++        return () ->
 +        {
-             Map<MetadataType, MetadataComponent> metadata = descriptor.getMetadataSerializer()
-                                                                       .deserialize(descriptor, EnumSet.of(MetadataType.STATS));
++            try
++            {
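++                // inspect the STATS metadata of every live sstable; repairedAt should still be unset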
++                Iterator<SSTableReader> sstables = Keyspace.open(KEYSPACE)
++                                                           .getColumnFamilyStore(TABLE)
++                                                           .getLiveSSTables()
++                                                           .iterator();
++                while (sstables.hasNext())
++                {
++                    SSTableReader sstable = sstables.next();
++                    Descriptor descriptor = sstable.descriptor;
++                    Map<MetadataType, MetadataComponent> metadata = descriptor.getMetadataSerializer()
++                                                                              .deserialize(descriptor, EnumSet.of(MetadataType.STATS));
++
++                    StatsMetadata stats = (StatsMetadata) metadata.get(MetadataType.STATS);
++                    Assert.assertEquals("repaired at is set for sstable: " + descriptor,
++                                        stats.repairedAt,
++                                        ActiveRepairService.UNREPAIRED_SSTABLE);
++                }
++            }
++            catch (IOException e)
++            {
++                throw new RuntimeException(e);
++            }
++        };
++    }
 +
-             StatsMetadata stats = (StatsMetadata) metadata.get(MetadataType.STATS);
-             return stats.repairedAt;
-         } catch (IOException e) {
-             throw new RuntimeException(e);
-         }
++    private IIsolatedExecutor.SerializableRunnable markAllRepaired()
++    {
++        return () ->
++        {
++            try
++            {
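++                // rewrite repairedAt in each sstable's metadata, then reload so the change is visible in-memory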
++                Iterator<SSTableReader> sstables = Keyspace.open(KEYSPACE)
++                                                           .getColumnFamilyStore(TABLE)
++                                                           .getLiveSSTables()
++                                                           .iterator();
++                while (sstables.hasNext())
++                {
++                    SSTableReader sstable = sstables.next();
++                    Descriptor descriptor = sstable.descriptor;
++                    descriptor.getMetadataSerializer()
++                              .mutateRepairMetadata(descriptor, System.currentTimeMillis(), null, false);
++                    sstable.reloadSSTableMetadata();
++                }
++            }
++            catch (IOException e)
++            {
++                throw new RuntimeException(e);
++            }
++        };
++    }
 +
++    private IIsolatedExecutor.SerializableRunnable assertRepaired()
++    {
++        return () ->
++        {
++            try
++            {
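++                // every live sstable should now carry a non-zero repairedAt timestamp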
++                Iterator<SSTableReader> sstables = Keyspace.open(KEYSPACE)
++                                                           .getColumnFamilyStore(TABLE)
++                                                           .getLiveSSTables()
++                                                           .iterator();
++                while (sstables.hasNext())
++                {
++                    SSTableReader sstable = sstables.next();
++                    Descriptor descriptor = sstable.descriptor;
++                    Map<MetadataType, MetadataComponent> metadata = descriptor.getMetadataSerializer()
++                                                                              .deserialize(descriptor, EnumSet.of(MetadataType.STATS));
++
++                    StatsMetadata stats = (StatsMetadata) metadata.get(MetadataType.STATS);
++                    Assert.assertTrue("repaired at is not set for sstable: " + descriptor, stats.repairedAt > 0);
++                }
++            }
++            catch (IOException e)
++            {
++                throw new RuntimeException(e);
++            }
++        };
 +    }
 +
-     private void markRepaired(SSTableReader reader) {
-         Descriptor descriptor = reader.descriptor;
-         try
++    private IInvokableInstance.SerializableRunnable assertSnapshotPresent(String snapshotName)
++    {
++        return () ->
 +        {
-             descriptor.getMetadataSerializer().mutateRepairMetadata(descriptor, System.currentTimeMillis(), null, false);
-             reader.reloadSSTableMetadata();
-         } catch (IOException e) {
-             throw new RuntimeException(e);
-         }
++            // snapshots are taken asynchronously; polling like this is crude, but it gives the snapshot a chance to appear
++            int attempts = 100;
++            ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(TABLE);
++
++            while (!cfs.snapshotExists(snapshotName))
++            {
++                Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
++                if (attempts-- < 0)
++                    throw new AssertionError(String.format("Snapshot %s not found for %s", snapshotName, KS_TABLE));
++            }
++        };
++    }
 +
++    private IInvokableInstance.SerializableRunnable assertSnapshotNotPresent(String snapshotName)
++    {
++        return () ->
++        {
++            ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(TABLE);
++            Assert.assertFalse(cfs.snapshotExists(snapshotName));
++        };
 +    }
 +
++    private long getConfirmedInconsistencies(IInvokableInstance instance)
++    {
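++        // reads the table-level count of confirmed repaired-data inconsistencies on the given instance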
++        return instance.callOnInstance(() -> Keyspace.open(KEYSPACE)
++                                                     .getColumnFamilyStore(TABLE)
++                                                     .metric
++                                                     .confirmedRepairedInconsistencies
++                                                     .table
++                                                     .getCount());
++    }
 +}
diff --cc test/distributed/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java
index 226331c,75e5ba9..a547c76
--- a/test/distributed/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java
@@@ -1,23 -1,7 +1,25 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
  package org.apache.cassandra.distributed.test;
  
+ import java.util.Set;
+ 
  import org.junit.Assert;
  import org.junit.Test;
  
@@@ -27,17 -10,14 +29,18 @@@ import org.apache.cassandra.distributed
  import org.apache.cassandra.distributed.api.ConsistencyLevel;
  import org.apache.cassandra.distributed.api.ICluster;
  import org.apache.cassandra.distributed.api.IInvokableInstance;
 +import org.apache.cassandra.metrics.ReadRepairMetrics;
+ import org.apache.cassandra.io.sstable.format.SSTableReader;
  
 -import static org.junit.Assert.assertEquals;
 -
 +import static org.apache.cassandra.distributed.api.Feature.NETWORK;
  import static org.apache.cassandra.distributed.shared.AssertUtils.*;
 +import static org.apache.cassandra.net.OutboundConnections.LARGE_MESSAGE_THRESHOLD;
 +import static org.apache.cassandra.net.Verb.READ_REPAIR_REQ;
 +import static org.apache.cassandra.net.Verb.READ_REPAIR_RSP;
 +import static org.junit.Assert.fail;
  
  // TODO: this test should be removed after running in-jvm dtests is set up via the shared API repository
 -public class SimpleReadWriteTest extends SharedClusterTestBase
 +public class SimpleReadWriteTest extends TestBaseImpl
  {
      @Test
      public void coordinatorReadTest() throws Throwable
@@@ -392,24 -258,118 +395,122 @@@
      @Test
      public void metricsCountQueriesTest() throws Throwable
      {
 -        cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
 -        for (int i = 0; i < 100; i++)
 -            cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (?,?,?)", ConsistencyLevel.ALL, i, i, i);
 -
 -        long readCount1 = readCount((IInvokableInstance) cluster.get(1));
 -        long readCount2 = readCount((IInvokableInstance) cluster.get(2));
 -        for (int i = 0; i < 100; i++)
 -            cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = ? and ck = ?", ConsistencyLevel.ALL, i, i);
 -
 -        readCount1 = readCount((IInvokableInstance) cluster.get(1)) - readCount1;
 -        readCount2 = readCount((IInvokableInstance) cluster.get(2)) - readCount2;
 -        assertEquals(readCount1, readCount2);
 -        assertEquals(100, readCount1);
 +        try (ICluster<IInvokableInstance> cluster = init(Cluster.create(2)))
 +        {
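 +            // each partition read at CL.ALL touches both replicas, so both read counters should advance equally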
 +            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
 +            for (int i = 0; i < 100; i++)
 +                cluster.coordinator(1).execute("INSERT INTO "+KEYSPACE+".tbl (pk, ck, v) VALUES (?,?,?)", ConsistencyLevel.ALL, i, i, i);
 +
 +            long readCount1 = readCount(cluster.get(1));
 +            long readCount2 = readCount(cluster.get(2));
 +            for (int i = 0; i < 100; i++)
 +                cluster.coordinator(1).execute("SELECT * FROM "+KEYSPACE+".tbl WHERE pk = ? and ck = ?", ConsistencyLevel.ALL, i, i);
 +
 +            readCount1 = readCount(cluster.get(1)) - readCount1;
 +            readCount2 = readCount(cluster.get(2)) - readCount2;
 +            Assert.assertEquals(readCount1, readCount2);
 +            Assert.assertEquals(100, readCount1);
 +        }
      }
  
+     @Test
+     public void skippedSSTableWithPartitionDeletionTest() throws Throwable
+     {
+         try (Cluster cluster = init(Cluster.create(2)))
+         {
+             cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY(pk, ck))");
+             // insert a partition tombstone on node 1; the deletion timestamp should end up being the sstable's minTimestamp
+             cluster.get(1).executeInternal("DELETE FROM " + KEYSPACE + ".tbl USING TIMESTAMP 1 WHERE pk = 0");
+             // and a row from a different partition, to provide the sstable's min/max clustering
+             cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1) USING TIMESTAMP 2");
+             cluster.get(1).flush(KEYSPACE);
+             // expect a single sstable, where minTimestamp equals the timestamp of the partition delete
+             cluster.get(1).runOnInstance(() -> {
+                 Set<SSTableReader> sstables = Keyspace.open(KEYSPACE)
+                                                       .getColumnFamilyStore("tbl")
+                                                       .getLiveSSTables();
 -                assertEquals(1, sstables.size());
 -                assertEquals(1, sstables.iterator().next().getMinTimestamp());
++                assertEquals("Expected a single sstable, but found " + sstables.size(), 1, sstables.size());
++                long minTimestamp = sstables.iterator().next().getMinTimestamp();
++                assertEquals("Expected min timestamp of 1, but was " + minTimestamp, 1, minTimestamp);
+             });
+ 
+             // on node 2, add a row for the deleted partition with an older timestamp than the deletion so it should be shadowed
+             cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (0, 10, 10) USING TIMESTAMP 0");
+ 
+             Object[][] rows = cluster.coordinator(1)
+                                      .execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk=0 AND ck > 5",
+                                               ConsistencyLevel.ALL);
 -            assertEquals(0, rows.length);
++            assertEquals("Expected 0 rows, but found " + rows.length, 0, rows.length);
+         }
+     }
+ 
+     @Test
+     public void skippedSSTableWithPartitionDeletionShadowingDataOnAnotherNode() throws Throwable
+     {
+         try (Cluster cluster = init(Cluster.create(2)))
+         {
+             cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY(pk, ck))");
+             // insert a partition tombstone on node 1; the deletion timestamp should end up being the sstable's minTimestamp
+             cluster.get(1).executeInternal("DELETE FROM " + KEYSPACE + ".tbl USING TIMESTAMP 1 WHERE pk = 0");
+             // and a row from a different partition, to provide the sstable's min/max clustering
+             cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1) USING TIMESTAMP 1");
+             cluster.get(1).flush(KEYSPACE);
+             // sstable 1 has minTimestamp == maxTimestamp == 1 and is skipped due to its min/max clusterings. Now we
+             // insert a row which is not shadowed by the partition delete and flush to a second sstable. Importantly,
+             // this sstable's minTimestamp is > the maxTimestamp of the first sstable. This would cause the first
+             // sstable not to be reincluded in the merge input, but we can't really make that decision as we don't
+             // know what data and/or tombstones are present on other nodes
+             cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (0, 6, 6) USING TIMESTAMP 2");
+             cluster.get(1).flush(KEYSPACE);
+ 
+             // on node 2, add a row for the deleted partition with an older timestamp than the deletion so it should be shadowed
+             cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (0, 10, 10) USING TIMESTAMP 0");
+ 
+             Object[][] rows = cluster.coordinator(1)
+                                      .execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk=0 AND ck > 5",
+                                               ConsistencyLevel.ALL);
+             // we expect that the row from node 2 (0, 10, 10) was shadowed by the partition delete, but the row from
+             // node 1 (0, 6, 6) was not.
+             assertRows(rows, new Object[] {0, 6, 6});
+         }
+     }
+ 
+     @Test
+     public void skippedSSTableWithPartitionDeletionShadowingDataOnAnotherNode2() throws Throwable
+     {
+         // ensure skipped sstables are still added back even when the partition delete ts is < the local min ts
+ 
+         try (Cluster cluster = init(Cluster.create(2)))
+         {
+             cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY(pk, ck))");
+             // insert a partition tombstone on node 1; the deletion timestamp should end up being the sstable's minTimestamp
+             cluster.get(1).executeInternal("DELETE FROM " + KEYSPACE + ".tbl USING TIMESTAMP 1 WHERE pk = 0");
+             // and a row from a different partition, to provide the sstable's min/max clustering
+             cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1) USING TIMESTAMP 3");
+             cluster.get(1).flush(KEYSPACE);
+             // sstable 1 has minTimestamp == maxTimestamp == 1 and is skipped due to its min/max clusterings. Now we
+             // insert a row which is not shadowed by the partition delete and flush to a second sstable. The first sstable
+             // has a maxTimestamp > the min timestamp of all sstables, so it is a candidate for reinclusion in the
+             // merge. However, the second sstable's minTimestamp is > the partition delete's timestamp. This would cause the
+             // first sstable not to be reincluded in the merge input, but we can't really make that decision as we don't
+             // know what data and/or tombstones are present on other nodes
+             cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (0, 6, 6) USING TIMESTAMP 2");
+             cluster.get(1).flush(KEYSPACE);
+ 
+             // on node 2, add a row for the deleted partition with an older timestamp than the deletion so it should be shadowed
+             cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (0, 10, 10) USING TIMESTAMP 0");
+ 
+             Object[][] rows = cluster.coordinator(1)
+                                      .execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk=0 AND ck > 5",
+                                               ConsistencyLevel.ALL);
+             // we expect that the row from node 2 (0, 10, 10) was shadowed by the partition delete, but the row from
+             // node 1 (0, 6, 6) was not.
+             assertRows(rows, new Object[] {0, 6, 6});
+         }
+     }
+ 
      private long readCount(IInvokableInstance instance)
      {
          return instance.callOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").metric.readLatency.latency.getCount());
diff --cc test/unit/org/apache/cassandra/service/reads/repair/RepairedDataVerifierTest.java
index c916d13,0000000..169e09d
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/service/reads/repair/RepairedDataVerifierTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/repair/RepairedDataVerifierTest.java
@@@ -1,291 -1,0 +1,293 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.service.reads.repair;
 +
 +import java.net.UnknownHostException;
 +import java.util.Random;
 +
 +import org.junit.Before;
 +import org.junit.BeforeClass;
 +import org.junit.Test;
 +
 +import org.apache.cassandra.SchemaLoader;
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.db.ColumnFamilyStore;
 +import org.apache.cassandra.db.ReadCommand;
 +import org.apache.cassandra.db.SinglePartitionReadCommand;
++import org.apache.cassandra.db.Slices;
++import org.apache.cassandra.db.filter.ClusteringIndexSliceFilter;
 +import org.apache.cassandra.db.filter.ColumnFilter;
 +import org.apache.cassandra.db.filter.DataLimits;
 +import org.apache.cassandra.db.filter.RowFilter;
 +import org.apache.cassandra.locator.InetAddressAndPort;
 +import org.apache.cassandra.metrics.TableMetrics;
 +import org.apache.cassandra.schema.Schema;
 +import org.apache.cassandra.schema.TableMetadata;
 +import org.apache.cassandra.utils.ByteBufferUtil;
 +import org.apache.cassandra.utils.FBUtilities;
 +
 +import static org.junit.Assert.assertEquals;
 +
 +public class RepairedDataVerifierTest
 +{
 +    private static final String TEST_NAME = "read_command_vh_test_";
 +    private static final String KEYSPACE = TEST_NAME + "cql_keyspace";
 +    private static final String TABLE = "table1";
 +
 +    private final Random random = new Random();
 +    private TableMetadata metadata;
 +    private TableMetrics metrics;
 +
 +    // counter to generate the last byte of peer addresses
 +    private int addressSuffix = 10;
 +
 +    @BeforeClass
 +    public static void init()
 +    {
 +        SchemaLoader.loadSchema();
 +        SchemaLoader.schemaDefinition(TEST_NAME);
 +        DatabaseDescriptor.reportUnconfirmedRepairedDataMismatches(true);
 +    }
 +
 +    @Before
 +    public void setup()
 +    {
 +        metadata = Schema.instance.getTableMetadata(KEYSPACE, TABLE);
 +        metrics = ColumnFamilyStore.metricsFor(metadata.id);
 +    }
 +
 +    @Test
 +    public void repairedDataMismatchWithSomeConclusive()
 +    {
 +        long confirmedCount =  confirmedCount();
 +        long unconfirmedCount =  unconfirmedCount();
 +        InetAddressAndPort peer1 = peer();
 +        InetAddressAndPort peer2 = peer();
 +        RepairedDataVerifier.SimpleVerifier verifier = new RepairedDataVerifier.SimpleVerifier(command(key()));
 +        RepairedDataTracker tracker = new RepairedDataTracker(verifier);
 +        tracker.recordDigest(peer1, ByteBufferUtil.bytes("digest1"), false);
 +        tracker.recordDigest(peer2, ByteBufferUtil.bytes("digest2"), true);
 +
 +        tracker.verify();
 +        assertEquals(confirmedCount, confirmedCount());
 +        assertEquals(unconfirmedCount + 1 , unconfirmedCount());
 +    }
 +
 +    @Test
 +    public void repairedDataMismatchWithNoneConclusive()
 +    {
 +        long confirmedCount =  confirmedCount();
 +        long unconfirmedCount =  unconfirmedCount();
 +        InetAddressAndPort peer1 = peer();
 +        InetAddressAndPort peer2 = peer();
 +        RepairedDataVerifier.SimpleVerifier verifier = new RepairedDataVerifier.SimpleVerifier(command(key()));
 +        RepairedDataTracker tracker = new RepairedDataTracker(verifier);
 +        tracker.recordDigest(peer1, ByteBufferUtil.bytes("digest1"), false);
 +        tracker.recordDigest(peer2, ByteBufferUtil.bytes("digest2"), false);
 +
 +        tracker.verify();
 +        assertEquals(confirmedCount, confirmedCount());
 +        assertEquals(unconfirmedCount + 1 , unconfirmedCount());
 +    }
 +
 +    @Test
 +    public void repairedDataMismatchWithAllConclusive()
 +    {
 +        long confirmedCount =  confirmedCount();
 +        long unconfirmedCount =  unconfirmedCount();
 +        InetAddressAndPort peer1 = peer();
 +        InetAddressAndPort peer2 = peer();
 +        RepairedDataVerifier.SimpleVerifier verifier = new RepairedDataVerifier.SimpleVerifier(command(key()));
 +        RepairedDataTracker tracker = new RepairedDataTracker(verifier);
 +        tracker.recordDigest(peer1, ByteBufferUtil.bytes("digest1"), true);
 +        tracker.recordDigest(peer2, ByteBufferUtil.bytes("digest2"), true);
 +
 +        tracker.verify();
 +        assertEquals(confirmedCount + 1, confirmedCount());
 +        assertEquals(unconfirmedCount, unconfirmedCount());
 +    }
 +
 +    @Test
 +    public void repairedDataMatchesWithAllConclusive()
 +    {
 +        long confirmedCount =  confirmedCount();
 +        long unconfirmedCount =  unconfirmedCount();
 +        InetAddressAndPort peer1 = peer();
 +        InetAddressAndPort peer2 = peer();
 +        RepairedDataVerifier.SimpleVerifier verifier = new RepairedDataVerifier.SimpleVerifier(command(key()));
 +        RepairedDataTracker tracker = new RepairedDataTracker(verifier);
 +        tracker.recordDigest(peer1, ByteBufferUtil.bytes("digest1"), true);
 +        tracker.recordDigest(peer2, ByteBufferUtil.bytes("digest1"), true);
 +
 +        tracker.verify();
 +        assertEquals(confirmedCount, confirmedCount());
 +        assertEquals(unconfirmedCount, unconfirmedCount());
 +    }
 +
 +    @Test
 +    public void repairedDataMatchesWithSomeConclusive()
 +    {
 +        long confirmedCount =  confirmedCount();
 +        long unconfirmedCount =  unconfirmedCount();
 +        InetAddressAndPort peer1 = peer();
 +        InetAddressAndPort peer2 = peer();
 +        RepairedDataVerifier.SimpleVerifier verifier = new RepairedDataVerifier.SimpleVerifier(command(key()));
 +        RepairedDataTracker tracker = new RepairedDataTracker(verifier);
 +        tracker.recordDigest(peer1, ByteBufferUtil.bytes("digest1"), true);
 +        tracker.recordDigest(peer2, ByteBufferUtil.bytes("digest1"), false);
 +
 +        tracker.verify();
 +        assertEquals(confirmedCount, confirmedCount());
 +        assertEquals(unconfirmedCount, unconfirmedCount());
 +    }
 +
 +    @Test
 +    public void repairedDataMatchesWithNoneConclusive()
 +    {
 +        long confirmedCount =  confirmedCount();
 +        long unconfirmedCount =  unconfirmedCount();
 +        InetAddressAndPort peer1 = peer();
 +        InetAddressAndPort peer2 = peer();
 +        RepairedDataVerifier.SimpleVerifier verifier = new RepairedDataVerifier.SimpleVerifier(command(key()));
 +        RepairedDataTracker tracker = new RepairedDataTracker(verifier);
 +        tracker.recordDigest(peer1, ByteBufferUtil.bytes("digest1"), false);
 +        tracker.recordDigest(peer2, ByteBufferUtil.bytes("digest1"), false);
 +
 +        tracker.verify();
 +        assertEquals(confirmedCount, confirmedCount());
 +        assertEquals(unconfirmedCount, unconfirmedCount());
 +    }
 +
 +    @Test
 +    public void allEmptyDigestWithAllConclusive()
 +    {
 +        // if a read didn't touch any repaired sstables, digests will be empty
 +        long confirmedCount =  confirmedCount();
 +        long unconfirmedCount =  unconfirmedCount();
 +        InetAddressAndPort peer1 = peer();
 +        InetAddressAndPort peer2 = peer();
 +        RepairedDataVerifier.SimpleVerifier verifier = new RepairedDataVerifier.SimpleVerifier(command(key()));
 +        RepairedDataTracker tracker = new RepairedDataTracker(verifier);
 +        tracker.recordDigest(peer1, ByteBufferUtil.EMPTY_BYTE_BUFFER, true);
 +        tracker.recordDigest(peer2, ByteBufferUtil.EMPTY_BYTE_BUFFER, true);
 +
 +        tracker.verify();
 +        assertEquals(confirmedCount, confirmedCount());
 +        assertEquals(unconfirmedCount, unconfirmedCount());
 +    }
 +
 +    @Test
 +    public void allEmptyDigestsWithSomeConclusive()
 +    {
 +        // if a read didn't touch any repaired sstables, digests will be empty
 +        long confirmedCount =  confirmedCount();
 +        long unconfirmedCount =  unconfirmedCount();
 +        InetAddressAndPort peer1 = peer();
 +        InetAddressAndPort peer2 = peer();
 +        RepairedDataVerifier.SimpleVerifier verifier = new RepairedDataVerifier.SimpleVerifier(command(key()));
 +        RepairedDataTracker tracker = new RepairedDataTracker(verifier);
 +        tracker.recordDigest(peer1, ByteBufferUtil.EMPTY_BYTE_BUFFER, true);
 +        tracker.recordDigest(peer2, ByteBufferUtil.EMPTY_BYTE_BUFFER, false);
 +
 +        tracker.verify();
 +        assertEquals(confirmedCount, confirmedCount());
 +        assertEquals(unconfirmedCount, unconfirmedCount());
 +    }
 +
 +    @Test
 +    public void allEmptyDigestsWithNoneConclusive()
 +    {
 +        // if a read didn't touch any repaired sstables, digests will be empty
 +        long confirmedCount =  confirmedCount();
 +        long unconfirmedCount =  unconfirmedCount();
 +        InetAddressAndPort peer1 = peer();
 +        InetAddressAndPort peer2 = peer();
 +        RepairedDataVerifier.SimpleVerifier verifier = new RepairedDataVerifier.SimpleVerifier(command(key()));
 +        RepairedDataTracker tracker = new RepairedDataTracker(verifier);
 +        tracker.recordDigest(peer1, ByteBufferUtil.EMPTY_BYTE_BUFFER, false);
 +        tracker.recordDigest(peer2, ByteBufferUtil.EMPTY_BYTE_BUFFER, false);
 +
 +        tracker.verify();
 +        assertEquals(confirmedCount, confirmedCount());
 +        assertEquals(unconfirmedCount, unconfirmedCount());
 +    }
 +
 +    @Test
 +    public void noTrackingDataRecorded()
 +    {
 +        // if a read didn't land on any replicas which support repaired data tracking, nothing will be recorded
 +        long confirmedCount =  confirmedCount();
 +        long unconfirmedCount =  unconfirmedCount();
 +        RepairedDataVerifier.SimpleVerifier verifier = new RepairedDataVerifier.SimpleVerifier(command(key()));
 +        RepairedDataTracker tracker = new RepairedDataTracker(verifier);
 +        tracker.verify();
 +        assertEquals(confirmedCount, confirmedCount());
 +        assertEquals(unconfirmedCount, unconfirmedCount());
 +    }
 +
 +    private long confirmedCount()
 +    {
 +        return metrics.confirmedRepairedInconsistencies.table.getCount();
 +    }
 +
 +    private long unconfirmedCount()
 +    {
 +        return metrics.unconfirmedRepairedInconsistencies.table.getCount();
 +    }
 +
 +    private InetAddressAndPort peer()
 +    {
 +        try
 +        {
 +            return InetAddressAndPort.getByAddress(new byte[]{ 127, 0, 0, (byte) addressSuffix++ });
 +        }
 +        catch (UnknownHostException e)
 +        {
 +            throw new RuntimeException(e);
 +        }
 +    }
 +
 +    private int key()
 +    {
 +        return random.nextInt();
 +    }
 +
 +    private ReadCommand command(int key)
 +    {
 +        return new StubReadCommand(key, metadata, false);
 +    }
 +
 +    private static class StubReadCommand extends SinglePartitionReadCommand
 +    {
 +        StubReadCommand(int key, TableMetadata metadata, boolean isDigest)
 +        {
 +            super(isDigest,
 +                  0,
 +                  false,
 +                  metadata,
 +                  FBUtilities.nowInSeconds(),
 +                  ColumnFilter.all(metadata),
 +                  RowFilter.NONE,
 +                  DataLimits.NONE,
 +                  metadata.partitioner.decorateKey(ByteBufferUtil.bytes(key)),
-                   null,
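++                  // supply a concrete clustering filter (all rows, unreversed) rather than null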
++                  new ClusteringIndexSliceFilter(Slices.ALL, false),
 +                  null);
 +        }
 +    }
 +}

