Posted to commits@cassandra.apache.org by ma...@apache.org on 2020/05/20 07:08:21 UTC

[cassandra] branch trunk updated (fdcd0df -> 97aeff6)

This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git.


    from fdcd0df  Fixed non-deterministic test in CasWriteTest
     new 4d42c18  Avoid creating duplicate rows during major upgrades
     new e1a0db7  Merge branch 'cassandra-3.0' into cassandra-3.11
     new 97aeff6  Merge branch 'cassandra-3.11' into trunk

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 CHANGES.txt                                        |   1 +
 src/java/org/apache/cassandra/config/Config.java   |  18 ++
 .../cassandra/config/DatabaseDescriptor.java       |  30 +++
 .../db/compaction/CompactionIterator.java          |   3 +
 .../db/transform/DuplicateRowChecker.java          | 139 ++++++++++++
 .../apache/cassandra/repair/RepairRunnable.java    |  13 +-
 .../cassandra/service/SnapshotVerbHandler.java     |  60 +-----
 .../org/apache/cassandra/service/StorageProxy.java |  54 +++++
 .../cassandra/service/StorageProxyMBean.java       |  11 +
 .../service/reads/AbstractReadExecutor.java        |   3 +-
 .../service/reads/repair/RepairedDataVerifier.java |  31 +--
 .../cassandra/utils/DiagnosticSnapshotService.java | 199 ++++++++++++++++++
 .../cassandra/distributed/impl/Instance.java       |   2 +
 .../distributed/test/RepairDigestTrackingTest.java |   4 +-
 .../upgrade/MixedModeReadRepairTest.java           |   2 +-
 .../db/compaction/CompactionIteratorTest.java      |  64 +++++-
 .../db/transform/DuplicateRowCheckerTest.java      | 234 +++++++++++++++++++++
 17 files changed, 769 insertions(+), 99 deletions(-)
 create mode 100644 src/java/org/apache/cassandra/db/transform/DuplicateRowChecker.java
 create mode 100644 src/java/org/apache/cassandra/utils/DiagnosticSnapshotService.java
 create mode 100644 test/unit/org/apache/cassandra/db/transform/DuplicateRowCheckerTest.java


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[cassandra] 01/01: Merge branch 'cassandra-3.11' into trunk

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 97aeff6bd66c03d1304cab8334c14e7a19049889
Merge: fdcd0df e1a0db7
Author: Marcus Eriksson <ma...@apache.org>
AuthorDate: Wed May 20 08:50:15 2020 +0200

    Merge branch 'cassandra-3.11' into trunk

 CHANGES.txt                                        |   1 +
 src/java/org/apache/cassandra/config/Config.java   |  18 ++
 .../cassandra/config/DatabaseDescriptor.java       |  30 +++
 .../db/compaction/CompactionIterator.java          |   3 +
 .../db/transform/DuplicateRowChecker.java          | 139 ++++++++++++
 .../apache/cassandra/repair/RepairRunnable.java    |  13 +-
 .../cassandra/service/SnapshotVerbHandler.java     |  60 +-----
 .../org/apache/cassandra/service/StorageProxy.java |  54 +++++
 .../cassandra/service/StorageProxyMBean.java       |  11 +
 .../service/reads/AbstractReadExecutor.java        |   3 +-
 .../service/reads/repair/RepairedDataVerifier.java |  31 +--
 .../cassandra/utils/DiagnosticSnapshotService.java | 199 ++++++++++++++++++
 .../cassandra/distributed/impl/Instance.java       |   2 +
 .../distributed/test/RepairDigestTrackingTest.java |   4 +-
 .../upgrade/MixedModeReadRepairTest.java           |   2 +-
 .../db/compaction/CompactionIteratorTest.java      |  64 +++++-
 .../db/transform/DuplicateRowCheckerTest.java      | 234 +++++++++++++++++++++
 17 files changed, 769 insertions(+), 99 deletions(-)

diff --cc CHANGES.txt
index 56212f9,3506589..f181c32
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,35 -1,8 +1,36 @@@
 -3.11.7
 +4.0-alpha5
 + * Add isTransient to SSTableMetadataView (CASSANDRA-15806)
 + * Fix tools/bin/fqltool for all shells (CASSANDRA-15820)
 + * Fix clearing of legacy size_estimates (CASSANDRA-15776)
 + * Update port when reconnecting to pre-4.0 SSL storage (CASSANDRA-15727)
 + * Only calculate dynamicBadnessThreshold once per loop in DynamicEndpointSnitch (CASSANDRA-15798)
 + * Cleanup redundant nodetool commands added in 4.0 (CASSANDRA-15256)
 + * Update to Python driver 3.23 for cqlsh (CASSANDRA-15793)
 + * Add tunable initial size and growth factor to RangeTombstoneList (CASSANDRA-15763)
 + * Improve debug logging in SSTableReader for index summary (CASSANDRA-15755)
 + * bin/sstableverify should support user provided token ranges (CASSANDRA-15753)
 + * Improve logging when mutation passed to commit log is too large (CASSANDRA-14781)
 + * replace LZ4FastDecompressor with LZ4SafeDecompressor (CASSANDRA-15560)
 + * Fix buffer pool NPE with concurrent release due to in-progress tiny pool eviction (CASSANDRA-15726)
 + * Avoid race condition when completing stream sessions (CASSANDRA-15666)
 + * Flush with fast compressors by default (CASSANDRA-15379)
 + * Fix CqlInputFormat regression from the switch to system.size_estimates (CASSANDRA-15637)
 + * Allow sending Entire SSTables over SSL (CASSANDRA-15740)
 + * Fix CQLSH UTF-8 encoding issue for Python 2/3 compatibility (CASSANDRA-15739)
 + * Fix batch statement preparation when multiple tables and parameters are used (CASSANDRA-15730)
 + * Fix regression with traceOutgoingMessage printing message size (CASSANDRA-15687)
 + * Ensure repaired data tracking reads a consistent amount of data across replicas (CASSANDRA-15601)
 + * Fix CQLSH to avoid arguments being evaluated (CASSANDRA-15660)
 + * Correct Visibility and Improve Safety of Methods in LatencyMetrics (CASSANDRA-15597)
 + * Allow cqlsh to run with Python2.7/Python3.6+ (CASSANDRA-15659,CASSANDRA-15573)
 + * Improve logging around incremental repair (CASSANDRA-15599)
 + * Do not check cdc_raw_directory filesystem space if CDC disabled (CASSANDRA-15688)
 + * Replace array iterators with get by index (CASSANDRA-15394)
 + * Minimize BTree iterator allocations (CASSANDRA-15389)
 +Merged from 3.11:
   * Fix CQL formatting of read command restrictions for slow query log (CASSANDRA-15503)
 - * Allow sstableloader to use SSL on the native port (CASSANDRA-14904)
  Merged from 3.0:
+  * Avoid creating duplicate rows during major upgrades (CASSANDRA-15789)
   * liveDiskSpaceUsed and totalDiskSpaceUsed get corrupted if IndexSummaryRedistribution gets interrupted (CASSANDRA-15674)
   * Fix Debian init start/stop (CASSANDRA-15770)
   * Fix infinite loop on index query paging in tables with clustering (CASSANDRA-14242)
diff --cc src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 85a107f,86d2287..5c753e0
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@@ -2888,225 -2585,39 +2888,255 @@@ public class DatabaseDescripto
          return backPressureStrategy;
      }
  
 -    public static boolean strictRuntimeChecks()
 +    public static ConsistencyLevel getIdealConsistencyLevel()
      {
 -        return strictRuntimeChecks;
 +        return conf.ideal_consistency_level;
 +    }
 +
 +    public static void setIdealConsistencyLevel(ConsistencyLevel cl)
 +    {
 +        conf.ideal_consistency_level = cl;
 +    }
 +
 +    public static int getRepairCommandPoolSize()
 +    {
 +        return conf.repair_command_pool_size;
 +    }
 +
 +    public static Config.RepairCommandPoolFullStrategy getRepairCommandPoolFullStrategy()
 +    {
 +        return conf.repair_command_pool_full_strategy;
 +    }
 +
 +    public static FullQueryLoggerOptions getFullQueryLogOptions()
 +    {
 +        return  conf.full_query_logging_options;
 +    }
 +
 +    public static boolean getBlockForPeersInRemoteDatacenters()
 +    {
 +        return conf.block_for_peers_in_remote_dcs;
 +    }
 +
 +    public static int getBlockForPeersTimeoutInSeconds()
 +    {
 +        return conf.block_for_peers_timeout_in_secs;
 +    }
 +
 +    public static boolean automaticSSTableUpgrade()
 +    {
 +        return conf.automatic_sstable_upgrade;
 +    }
 +
 +    public static void setAutomaticSSTableUpgradeEnabled(boolean enabled)
 +    {
 +        if (conf.automatic_sstable_upgrade != enabled)
 +            logger.debug("Changing automatic_sstable_upgrade to {}", enabled);
 +        conf.automatic_sstable_upgrade = enabled;
 +    }
 +
 +    public static int maxConcurrentAutoUpgradeTasks()
 +    {
 +        return conf.max_concurrent_automatic_sstable_upgrades;
 +    }
 +
 +    public static void setMaxConcurrentAutoUpgradeTasks(int value)
 +    {
 +        if (conf.max_concurrent_automatic_sstable_upgrades != value)
 +            logger.debug("Changing max_concurrent_automatic_sstable_upgrades to {}", value);
 +        validateMaxConcurrentAutoUpgradeTasksConf(value);
 +        conf.max_concurrent_automatic_sstable_upgrades = value;
 +    }
 +
 +    private static void validateMaxConcurrentAutoUpgradeTasksConf(int value)
 +    {
 +        if (value < 0)
 +            throw new ConfigurationException("max_concurrent_automatic_sstable_upgrades can't be negative");
 +        if (value > getConcurrentCompactors())
 +            logger.warn("max_concurrent_automatic_sstable_upgrades ({}) is larger than concurrent_compactors ({})", value, getConcurrentCompactors());
 +    }
 +    
 +    public static AuditLogOptions getAuditLoggingOptions()
 +    {
 +        return conf.audit_logging_options;
 +    }
 +
 +    public static void setAuditLoggingOptions(AuditLogOptions auditLoggingOptions)
 +    {
 +        conf.audit_logging_options = auditLoggingOptions;
 +    }
 +
 +    public static Config.CorruptedTombstoneStrategy getCorruptedTombstoneStrategy()
 +    {
 +        return conf.corrupted_tombstone_strategy;
 +    }
 +
 +    public static void setCorruptedTombstoneStrategy(Config.CorruptedTombstoneStrategy strategy)
 +    {
 +        conf.corrupted_tombstone_strategy = strategy;
 +    }
 +
 +    public static boolean getRepairedDataTrackingForRangeReadsEnabled()
 +    {
 +        return conf.repaired_data_tracking_for_range_reads_enabled;
 +    }
 +
 +    public static void setRepairedDataTrackingForRangeReadsEnabled(boolean enabled)
 +    {
 +        conf.repaired_data_tracking_for_range_reads_enabled = enabled;
 +    }
 +
 +    public static boolean getRepairedDataTrackingForPartitionReadsEnabled()
 +    {
 +        return conf.repaired_data_tracking_for_partition_reads_enabled;
 +    }
 +
 +    public static void setRepairedDataTrackingForPartitionReadsEnabled(boolean enabled)
 +    {
 +        conf.repaired_data_tracking_for_partition_reads_enabled = enabled;
 +    }
 +
 +    public static boolean snapshotOnRepairedDataMismatch()
 +    {
 +        return conf.snapshot_on_repaired_data_mismatch;
 +    }
 +
 +    public static void setSnapshotOnRepairedDataMismatch(boolean enabled)
 +    {
 +        conf.snapshot_on_repaired_data_mismatch = enabled;
      }
  
+     public static boolean snapshotOnDuplicateRowDetection()
+     {
+         return conf.snapshot_on_duplicate_row_detection;
+     }
+ 
+     public static void setSnapshotOnDuplicateRowDetection(boolean enabled)
+     {
+         conf.snapshot_on_duplicate_row_detection = enabled;
+     }
+ 
 +    public static boolean reportUnconfirmedRepairedDataMismatches()
 +    {
 +        return conf.report_unconfirmed_repaired_data_mismatches;
 +    }
 +
 +    public static void reportUnconfirmedRepairedDataMismatches(boolean enabled)
 +    {
 +        conf.report_unconfirmed_repaired_data_mismatches = enabled;
 +    }
 +
 +    public static boolean strictRuntimeChecks()
 +    {
 +        return strictRuntimeChecks;
 +    }
 +
 +    public static boolean useOffheapMerkleTrees()
 +    {
 +        return conf.use_offheap_merkle_trees;
 +    }
 +
 +    public static void useOffheapMerkleTrees(boolean value)
 +    {
 +        logger.info("Setting use_offheap_merkle_trees to {}", value);
 +        conf.use_offheap_merkle_trees = value;
 +    }
 +
 +    public static Function<CommitLog, AbstractCommitLogSegmentManager> getCommitLogSegmentMgrProvider()
 +    {
 +        return commitLogSegmentMgrProvider;
 +    }
 +
 +    public static void setCommitLogSegmentMgrProvider(Function<CommitLog, AbstractCommitLogSegmentManager> provider)
 +    {
 +        commitLogSegmentMgrProvider = provider;
 +    }
 +
 +    /**
 +     * Class that primarily tracks overflow thresholds during conversions
 +     */
 +    private enum ByteUnit {
 +        KIBI_BYTES(2048 * 1024, 1024),
 +        MEBI_BYTES(2048, 1024 * 1024);
 +
 +        private final int overflowThreshold;
 +        private final int multiplier;
 +
 +        ByteUnit(int t, int m)
 +        {
 +            this.overflowThreshold = t;
 +            this.multiplier = m;
 +        }
 +
 +        public int overflowThreshold()
 +        {
 +            return overflowThreshold;
 +        }
 +
 +        public boolean willOverflowInBytes(int val)
 +        {
 +            return val >= overflowThreshold;
 +        }
 +
 +        public long toBytes(int val)
 +        {
 +            return val * multiplier;
 +        }
 +    }
 +
 +    /**
 +     * Ensures passed in configuration value is positive and will not overflow when converted to Bytes
 +     */
 +    private static void checkValidForByteConversion(int val, final String name, final ByteUnit unit)
 +    {
 +        if (val < 0 || unit.willOverflowInBytes(val))
 +            throw new ConfigurationException(String.format("%s must be positive value < %d, but was %d",
 +                                                           name, unit.overflowThreshold(), val), false);
 +    }
 +
 +    public static int getValidationPreviewPurgeHeadStartInSec()
 +    {
 +        int seconds = conf.validation_preview_purge_head_start_in_sec;
 +        return Math.max(seconds, 0);
 +    }
 +
+     public static boolean checkForDuplicateRowsDuringReads()
+     {
+         return conf.check_for_duplicate_rows_during_reads;
+     }
+ 
+     public static void setCheckForDuplicateRowsDuringReads(boolean enabled)
+     {
+         conf.check_for_duplicate_rows_during_reads = enabled;
+     }
+ 
+     public static boolean checkForDuplicateRowsDuringCompaction()
+     {
+         return conf.check_for_duplicate_rows_during_compaction;
+     }
+ 
+     public static void setCheckForDuplicateRowsDuringCompaction(boolean enabled)
+     {
+         conf.check_for_duplicate_rows_during_compaction = enabled;
+     }
+ 
 +    public static int getInitialRangeTombstoneListAllocationSize()
 +    {
 +        return conf.initial_range_tombstone_list_allocation_size;
 +    }
 +
 +    public static void setInitialRangeTombstoneListAllocationSize(int size)
 +    {
 +        conf.initial_range_tombstone_list_allocation_size = size;
 +    }
 +
 +    public static double getRangeTombstoneListGrowthFactor()
 +    {
 +        return conf.range_tombstone_list_growth_factor;
 +    }
 +
 +    public static void setRangeTombstoneListGrowthFactor(double resizeFactor)
 +    {
 +        conf.range_tombstone_list_growth_factor = resizeFactor;
 +    }
  }
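
For reference, a minimal sketch (not part of the committed patch) of exercising the new DatabaseDescriptor toggles from code on the Cassandra classpath. The daemonInitialization() call is assumed to load defaults the way Cassandra's unit tests do; everything else uses only the getters and setters added above.

    import org.apache.cassandra.config.DatabaseDescriptor;

    public class DuplicateRowCheckConfigExample
    {
        public static void main(String[] args)
        {
            // Assumed initialisation path, mirroring Cassandra unit tests.
            DatabaseDescriptor.daemonInitialization();

            // Toggles introduced by this change (see the getters/setters above).
            DatabaseDescriptor.setCheckForDuplicateRowsDuringReads(true);
            DatabaseDescriptor.setCheckForDuplicateRowsDuringCompaction(true);
            DatabaseDescriptor.setSnapshotOnDuplicateRowDetection(false);

            System.out.println("duplicate row checks on reads: "
                               + DatabaseDescriptor.checkForDuplicateRowsDuringReads());
            System.out.println("duplicate row checks on compaction: "
                               + DatabaseDescriptor.checkForDuplicateRowsDuringCompaction());
        }
    }
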
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
index ce3bb6c,4460d4d..d83d0bb
--- a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
@@@ -18,13 -18,13 +18,15 @@@
  package org.apache.cassandra.db.compaction;
  
  import java.util.*;
 -import java.util.function.Predicate;
 +import java.util.function.LongPredicate;
  
 +import com.google.common.collect.ImmutableSet;
  import com.google.common.collect.Ordering;
  
 -import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.schema.TableMetadata;
+ 
+ import org.apache.cassandra.db.transform.DuplicateRowChecker;
  import org.apache.cassandra.db.*;
  import org.apache.cassandra.db.filter.ColumnFilter;
  import org.apache.cassandra.db.partitions.PurgeFunction;
@@@ -97,21 -97,28 +99,22 @@@ public class CompactionIterator extend
              bytes += scanner.getLengthInBytes();
          this.totalBytes = bytes;
          this.mergeCounters = new long[scanners.size()];
 -        this.metrics = metrics;
 -
 -        if (metrics != null)
 -            metrics.beginCompaction(this);
 +        this.activeCompactions = activeCompactions == null ? ActiveCompactionsTracker.NOOP : activeCompactions;
 +        this.activeCompactions.beginCompaction(this); // note that CompactionTask also calls this, but CT only creates CompactionIterator with a NOOP ActiveCompactions
  
          UnfilteredPartitionIterator merged = scanners.isEmpty()
 -                                             ? EmptyIterators.unfilteredPartition(controller.cfs.metadata, false)
 -                                             : UnfilteredPartitionIterators.merge(scanners, nowInSec, listener());
 -        boolean isForThrift = merged.isForThrift(); // to stop capture of iterator in Purger, which is confusing for debug
 -        merged = Transformation.apply(merged, new GarbageSkipper(controller, nowInSec));
 -        merged = Transformation.apply(merged, new Purger(isForThrift, controller, nowInSec));
 -        this.compacted = DuplicateRowChecker.duringCompaction(merged, type);
 +                                           ? EmptyIterators.unfilteredPartition(controller.cfs.metadata())
 +                                           : UnfilteredPartitionIterators.merge(scanners, listener());
 +        merged = Transformation.apply(merged, new GarbageSkipper(controller));
 +        merged = Transformation.apply(merged, new Purger(controller, nowInSec));
++        merged = DuplicateRowChecker.duringCompaction(merged, type);
 +        compacted = Transformation.apply(merged, new AbortableUnfilteredPartitionTransformation(this));
 +        sstables = scanners.stream().map(ISSTableScanner::getBackingSSTables).flatMap(Collection::stream).collect(ImmutableSet.toImmutableSet());
      }
  
 -    public boolean isForThrift()
 +    public TableMetadata metadata()
      {
 -        return false;
 -    }
 -
 -    public CFMetaData metadata()
 -    {
 -        return controller.cfs.metadata;
 +        return controller.cfs.metadata();
      }
  
      public CompactionInfo getCompactionInfo()
diff --cc src/java/org/apache/cassandra/db/transform/DuplicateRowChecker.java
index 0000000,7a6f7f9..aa1305a
mode 000000,100644..100644
--- a/src/java/org/apache/cassandra/db/transform/DuplicateRowChecker.java
+++ b/src/java/org/apache/cassandra/db/transform/DuplicateRowChecker.java
@@@ -1,0 -1,139 +1,139 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.cassandra.db.transform;
+ 
 -import java.net.InetAddress;
+ import java.util.Collections;
+ import java.util.List;
+ 
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ 
 -import org.apache.cassandra.config.CFMetaData;
+ import org.apache.cassandra.config.DatabaseDescriptor;
+ import org.apache.cassandra.db.*;
+ import org.apache.cassandra.db.compaction.OperationType;
+ import org.apache.cassandra.db.partitions.PartitionIterator;
+ import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+ import org.apache.cassandra.db.rows.*;
++import org.apache.cassandra.locator.InetAddressAndPort;
++import org.apache.cassandra.schema.TableMetadata;
+ import org.apache.cassandra.utils.DiagnosticSnapshotService;
+ import org.apache.cassandra.utils.FBUtilities;
+ 
+ public class DuplicateRowChecker extends Transformation<BaseRowIterator<?>>
+ {
+     private static final Logger logger = LoggerFactory.getLogger(DuplicateRowChecker.class);
+ 
+     Clustering previous = null;
+     int duplicatesDetected = 0;
+ 
+     final String stage;
 -    final List<InetAddress> replicas;
 -    final CFMetaData metadata;
++    final List<InetAddressAndPort> replicas;
++    final TableMetadata metadata;
+     final DecoratedKey key;
+     final boolean snapshotOnDuplicate;
+ 
+     DuplicateRowChecker(final DecoratedKey key,
 -                        final CFMetaData metadata,
++                        final TableMetadata metadata,
+                         final String stage,
+                         final boolean snapshotOnDuplicate,
 -                        final List<InetAddress> replicas)
++                        final List<InetAddressAndPort> replicas)
+     {
+         this.key = key;
+         this.metadata = metadata;
+         this.stage = stage;
+         this.snapshotOnDuplicate = snapshotOnDuplicate;
+         this.replicas = replicas;
+     }
+ 
+     protected DeletionTime applyToDeletion(DeletionTime deletionTime)
+     {
+         return deletionTime;
+     }
+ 
+     protected RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker)
+     {
+         return marker;
+     }
+ 
+     protected Row applyToStatic(Row row)
+     {
+         return row;
+     }
+ 
+     protected Row applyToRow(Row row)
+     {
+         if (null != previous && row.clustering().equals(previous))
+             duplicatesDetected++;
+         previous = row.clustering();
+         return row;
+     }
+ 
+     protected void onPartitionClose()
+     {
+         if (duplicatesDetected > 0)
+         {
+             logger.warn("Detected {} duplicate rows for {} during {}",
+                         duplicatesDetected,
 -                        metadata.getKeyValidator().getString(key.getKey()),
++                        metadata.partitionKeyType.getString(key.getKey()),
+                         stage);
+             if (snapshotOnDuplicate)
+                 DiagnosticSnapshotService.duplicateRows(metadata, replicas);
+         }
+         duplicatesDetected = 0;
+         previous = null;
+         super.onPartitionClose();
+     }
+ 
+     public static UnfilteredPartitionIterator duringCompaction(final UnfilteredPartitionIterator iterator, OperationType type)
+     {
+         if (!DatabaseDescriptor.checkForDuplicateRowsDuringCompaction())
+             return iterator;
 -        final List<InetAddress> address = Collections.singletonList(FBUtilities.getBroadcastAddress());
++        final List<InetAddressAndPort> address = Collections.singletonList(FBUtilities.getBroadcastAddressAndPort());
+         final boolean snapshot = DatabaseDescriptor.snapshotOnDuplicateRowDetection();
+         return Transformation.apply(iterator, new Transformation<UnfilteredRowIterator>()
+         {
+             protected UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition)
+             {
+                 return Transformation.apply(partition, new DuplicateRowChecker(partition.partitionKey(),
+                                                                                partition.metadata(),
+                                                                                type.toString(),
+                                                                                snapshot,
+                                                                                address));
+             }
+         });
+     }
+ 
 -    public static PartitionIterator duringRead(final PartitionIterator iterator, final List<InetAddress> replicas)
++    public static PartitionIterator duringRead(final PartitionIterator iterator, final List<InetAddressAndPort> replicas)
+     {
+         if (!DatabaseDescriptor.checkForDuplicateRowsDuringReads())
+             return iterator;
+         final boolean snapshot = DatabaseDescriptor.snapshotOnDuplicateRowDetection();
+         return Transformation.apply(iterator, new Transformation<RowIterator>()
+         {
+             protected RowIterator applyToPartition(RowIterator partition)
+             {
+                 return Transformation.apply(partition, new DuplicateRowChecker(partition.partitionKey(),
+                                                                                partition.metadata(),
+                                                                                "Read",
+                                                                                snapshot,
+                                                                                replicas));
+             }
+         });
+     }
+ }
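
As a quick orientation for the new class above, a hedged usage sketch: duringCompaction() wraps the merged unfiltered iterator (as CompactionIterator now does) and duringRead() wraps the resolved PartitionIterator together with the replicas that served the read. Both are no-ops unless the corresponding check_for_duplicate_rows_* option is enabled. The wrapper class and method names below are illustrative assumptions, not code from this patch.

    import java.util.List;

    import org.apache.cassandra.db.compaction.OperationType;
    import org.apache.cassandra.db.partitions.PartitionIterator;
    import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
    import org.apache.cassandra.db.transform.DuplicateRowChecker;
    import org.apache.cassandra.locator.InetAddressAndPort;

    final class DuplicateRowCheckerUsageSketch
    {
        // Compaction path: checks consecutive rows as partitions are merged.
        static UnfilteredPartitionIterator checkDuringCompaction(UnfilteredPartitionIterator merged,
                                                                 OperationType type)
        {
            return DuplicateRowChecker.duringCompaction(merged, type);
        }

        // Read path: the replicas are passed along so a diagnostic snapshot
        // can be requested from them if duplicates are detected.
        static PartitionIterator checkDuringRead(PartitionIterator resolved,
                                                 List<InetAddressAndPort> replicas)
        {
            return DuplicateRowChecker.duringRead(resolved, replicas);
        }
    }
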
diff --cc src/java/org/apache/cassandra/repair/RepairRunnable.java
index ae35aaf,35794e2..e5e8e50
--- a/src/java/org/apache/cassandra/repair/RepairRunnable.java
+++ b/src/java/org/apache/cassandra/repair/RepairRunnable.java
@@@ -89,7 -54,8 +89,8 @@@ import org.apache.cassandra.tracing.Tra
  import org.apache.cassandra.tracing.Tracing;
  import org.apache.cassandra.transport.messages.ResultMessage;
  import org.apache.cassandra.utils.ByteBufferUtil;
++import org.apache.cassandra.utils.DiagnosticSnapshotService;
  import org.apache.cassandra.utils.FBUtilities;
 -import org.apache.cassandra.utils.Pair;
  import org.apache.cassandra.utils.UUIDGen;
  import org.apache.cassandra.utils.WrappedRunnable;
  import org.apache.cassandra.utils.progress.ProgressEvent;
@@@ -555,202 -329,45 +556,196 @@@ public class RepairRunnable implements 
  
              public void onFailure(Throwable t)
              {
 -                fireProgressEvent(tag, new ProgressEvent(ProgressEventType.ERROR, progress.get(), totalProgress, t.getMessage()));
 -                SystemDistributedKeyspace.failParentRepair(parentSession, t);
 -                repairComplete();
 +                notifyError(t);
 +                fail("Error completing preview repair: " + t.getMessage());
 +                executor.shutdownNow();
              }
 +        }, MoreExecutors.directExecutor());
 +    }
 +
 +    private void maybeSnapshotReplicas(UUID parentSession, String keyspace, List<RepairSessionResult> results)
 +    {
 +        if (!DatabaseDescriptor.snapshotOnRepairedDataMismatch())
 +            return;
  
 -            private void repairComplete()
 +        try
 +        {
 +            Set<String> mismatchingTables = new HashSet<>();
 +            Set<InetAddressAndPort> nodes = new HashSet<>();
 +            for (RepairSessionResult sessionResult : results)
              {
 -                String duration = DurationFormatUtils.formatDurationWords(System.currentTimeMillis() - startTime,
 -                                                                          true, true);
 -                String message = String.format("Repair command #%d finished in %s", cmd, duration);
 -                fireProgressEvent(tag, new ProgressEvent(ProgressEventType.COMPLETE, progress.get(), totalProgress, message));
 -                logger.info(message);
 -                if (options.isTraced() && traceState != null)
 +                for (RepairResult repairResult : emptyIfNull(sessionResult.repairJobResults))
                  {
 -                    for (ProgressListener listener : listeners)
 -                        traceState.removeProgressListener(listener);
 -                    // Because DebuggableThreadPoolExecutor#afterExecute and this callback
 -                    // run in a nondeterministic order (within the same thread), the
 -                    // TraceState may have been nulled out at this point. The TraceState
 -                    // should be traceState, so just set it without bothering to check if it
 -                    // actually was nulled out.
 -                    Tracing.instance.set(traceState);
 -                    Tracing.traceRepair(message);
 -                    Tracing.instance.stopSession();
 +                    for (SyncStat stat : emptyIfNull(repairResult.stats))
 +                    {
 +                        if (stat.numberOfDifferences > 0)
 +                            mismatchingTables.add(repairResult.desc.columnFamily);
 +                        // snapshot all replicas, even if they don't have any differences
 +                        nodes.add(stat.nodes.coordinator);
 +                        nodes.add(stat.nodes.peer);
 +                    }
                  }
 -                executor.shutdownNow();
              }
 -        });
 +
-             String snapshotName = RepairedDataVerifier.SnapshottingVerifier.getSnapshotName();
++            String snapshotName = DiagnosticSnapshotService.getSnapshotName(DiagnosticSnapshotService.REPAIRED_DATA_MISMATCH_SNAPSHOT_PREFIX);
 +            for (String table : mismatchingTables)
 +            {
 +                // we can just check snapshot existence locally since the repair coordinator is always a replica (unlike in the read case)
 +                if (!Keyspace.open(keyspace).getColumnFamilyStore(table).snapshotExists(snapshotName))
 +                {
 +                    logger.info("{} Snapshotting {}.{} for preview repair mismatch with tag {} on instances {}",
 +                                options.getPreviewKind().logPrefix(parentSession),
-                                 keyspace, table, snapshotName, nodes)
-                     ;
-                     Message<SnapshotCommand> message = Message.out(Verb.SNAPSHOT_REQ, new SnapshotCommand(keyspace,
-                                                                                                           table,
-                                                                                                           snapshotName,
-                                                                                                           false));
-                     for (InetAddressAndPort target : nodes)
-                         MessagingService.instance().send(message, target);
++                                keyspace, table, snapshotName, nodes);
++                    DiagnosticSnapshotService.repairedDataMismatch(Keyspace.open(keyspace).getColumnFamilyStore(table).metadata(), nodes);
 +                }
 +                else
 +                {
 +                    logger.info("{} Not snapshotting {}.{} - snapshot {} exists",
 +                                options.getPreviewKind().logPrefix(parentSession),
 +                                keyspace, table, snapshotName);
 +                }
 +            }
 +        }
 +        catch (Exception e)
 +        {
 +            logger.error("{} Failed snapshotting replicas", options.getPreviewKind().logPrefix(parentSession), e);
 +        }
      }
  
 -    private void addRangeToNeighbors(List<Pair<Set<InetAddress>, ? extends Collection<Range<Token>>>> neighborRangeList, Range<Token> range, Set<InetAddress> neighbors)
 +    private static <T> Iterable<T> emptyIfNull(Iterable<T> iter)
      {
 -        for (int i = 0; i < neighborRangeList.size(); i++)
 +        if (iter == null)
 +            return Collections.emptyList();
 +        return iter;
 +    }
 +
 +    private ListenableFuture<List<RepairSessionResult>> submitRepairSessions(UUID parentSession,
 +                                                                             boolean isIncremental,
 +                                                                             ListeningExecutorService executor,
 +                                                                             List<CommonRange> commonRanges,
 +                                                                             String... cfnames)
 +    {
 +        List<ListenableFuture<RepairSessionResult>> futures = new ArrayList<>(options.getRanges().size());
 +
 +        // we do endpoint filtering at the start of an incremental repair,
 +        // so repair sessions shouldn't also be checking liveness
 +        boolean force = options.isForcedRepair() && !isIncremental;
 +        for (CommonRange commonRange : commonRanges)
 +        {
 +            logger.info("Starting RepairSession for {}", commonRange);
 +            RepairSession session = ActiveRepairService.instance.submitRepairSession(parentSession,
 +                                                                                     commonRange,
 +                                                                                     keyspace,
 +                                                                                     options.getParallelism(),
 +                                                                                     isIncremental,
 +                                                                                     options.isPullRepair(),
 +                                                                                     force,
 +                                                                                     options.getPreviewKind(),
 +                                                                                     options.optimiseStreams(),
 +                                                                                     executor,
 +                                                                                     cfnames);
 +            if (session == null)
 +                continue;
 +            Futures.addCallback(session, new RepairSessionCallback(session), MoreExecutors.directExecutor());
 +            futures.add(session);
 +        }
 +        return Futures.successfulAsList(futures);
 +    }
 +
 +    private ListeningExecutorService createExecutor()
 +    {
 +        return MoreExecutors.listeningDecorator(new JMXEnabledThreadPoolExecutor(options.getJobThreads(),
 +                                                                                 Integer.MAX_VALUE,
 +                                                                                 TimeUnit.SECONDS,
 +                                                                                 new LinkedBlockingQueue<>(),
 +                                                                                 new NamedThreadFactory("Repair#" + cmd),
 +                                                                                 "internal"));
 +    }
 +
 +    private class RepairSessionCallback implements FutureCallback<RepairSessionResult>
 +    {
 +        private final RepairSession session;
 +
 +        public RepairSessionCallback(RepairSession session)
 +        {
 +            this.session = session;
 +        }
 +
 +        public void onSuccess(RepairSessionResult result)
 +        {
 +            String message = String.format("Repair session %s for range %s finished", session.getId(),
 +                                           session.ranges().toString());
 +            logger.info(message);
 +            fireProgressEvent(new ProgressEvent(ProgressEventType.PROGRESS,
 +                                                progressCounter.incrementAndGet(),
 +                                                totalProgress,
 +                                                message));
 +        }
 +
 +        public void onFailure(Throwable t)
          {
 -            Pair<Set<InetAddress>, ? extends Collection<Range<Token>>> p = neighborRangeList.get(i);
 +            String message = String.format("Repair session %s for range %s failed with error %s",
 +                                           session.getId(), session.ranges().toString(), t.getMessage());
 +            notifyError(new RuntimeException(message, t));
 +        }
 +    }
  
 -            if (p.left.containsAll(neighbors))
 +    private class RepairCompleteCallback implements FutureCallback<Object>
 +    {
 +        final UUID parentSession;
 +        final Collection<Range<Token>> successfulRanges;
 +        final long startTime;
 +        final TraceState traceState;
 +        final AtomicBoolean hasFailure;
 +        final ExecutorService executor;
 +
 +        public RepairCompleteCallback(UUID parentSession,
 +                                      Collection<Range<Token>> successfulRanges,
 +                                      long startTime,
 +                                      TraceState traceState,
 +                                      AtomicBoolean hasFailure,
 +                                      ExecutorService executor)
 +        {
 +            this.parentSession = parentSession;
 +            this.successfulRanges = successfulRanges;
 +            this.startTime = startTime;
 +            this.traceState = traceState;
 +            this.hasFailure = hasFailure;
 +            this.executor = executor;
 +        }
 +
 +        public void onSuccess(Object result)
 +        {
 +            maybeStoreParentRepairSuccess(successfulRanges);
 +            if (hasFailure.get())
              {
 -                p.right.add(range);
 +                fail(null);
 +            }
 +            else
 +            {
 +                success("Repair completed successfully");
 +            }
 +            executor.shutdownNow();
 +        }
 +
 +        public void onFailure(Throwable t)
 +        {
 +            notifyError(t);
 +            fail(t.getMessage());
 +            executor.shutdownNow();
 +        }
 +    }
 +
 +    private static void addRangeToNeighbors(List<CommonRange> neighborRangeList, Range<Token> range, EndpointsForRange neighbors)
 +    {
 +        Set<InetAddressAndPort> endpoints = neighbors.endpoints();
 +        Set<InetAddressAndPort> transEndpoints = neighbors.filter(Replica::isTransient).endpoints();
 +
 +        for (CommonRange commonRange : neighborRangeList)
 +        {
 +            if (commonRange.matchesEndpoints(endpoints, transEndpoints))
 +            {
 +                commonRange.ranges.add(range);
                  return;
              }
          }
diff --cc src/java/org/apache/cassandra/service/SnapshotVerbHandler.java
index 4d8c4df,179abeb..1309d6e
--- a/src/java/org/apache/cassandra/service/SnapshotVerbHandler.java
+++ b/src/java/org/apache/cassandra/service/SnapshotVerbHandler.java
@@@ -23,91 -20,32 +23,37 @@@ import java.util.concurrent.Executors
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
 +import org.apache.cassandra.db.ColumnFamilyStore;
  import org.apache.cassandra.db.SnapshotCommand;
  import org.apache.cassandra.db.Keyspace;
 +import org.apache.cassandra.locator.InetAddressAndPort;
  import org.apache.cassandra.net.IVerbHandler;
 -import org.apache.cassandra.net.MessageIn;
 -import org.apache.cassandra.net.MessageOut;
 +import org.apache.cassandra.net.Message;
  import org.apache.cassandra.net.MessagingService;
+ import org.apache.cassandra.utils.DiagnosticSnapshotService;
  
  public class SnapshotVerbHandler implements IVerbHandler<SnapshotCommand>
  {
 +    public static final SnapshotVerbHandler instance = new SnapshotVerbHandler();
-     public static final String REPAIRED_DATA_MISMATCH_SNAPSHOT_PREFIX = "RepairedDataMismatch-";
-     private static final Executor REPAIRED_DATA_MISMATCH_SNAPSHOT_EXECUTOR = Executors.newSingleThreadExecutor();
- 
      private static final Logger logger = LoggerFactory.getLogger(SnapshotVerbHandler.class);
  
 -    public void doVerb(MessageIn<SnapshotCommand> message, int id)
 +    public void doVerb(Message<SnapshotCommand> message)
      {
          SnapshotCommand command = message.payload;
          if (command.clear_snapshot)
          {
              Keyspace.clearSnapshot(command.snapshot_name, command.keyspace);
          }
-         else if (command.snapshot_name.startsWith(REPAIRED_DATA_MISMATCH_SNAPSHOT_PREFIX))
+         else if (DiagnosticSnapshotService.isDiagnosticSnapshotRequest(command))
          {
-             REPAIRED_DATA_MISMATCH_SNAPSHOT_EXECUTOR.execute(new RepairedDataSnapshotTask(command, message.from()));
 -            DiagnosticSnapshotService.snapshot(command, message.from);
++            DiagnosticSnapshotService.snapshot(command, message.from());
          }
          else
 +        {
              Keyspace.open(command.keyspace).getColumnFamilyStore(command.column_family).snapshot(command.snapshot_name);
 -        logger.debug("Enqueuing response to snapshot request {} to {}", command.snapshot_name, message.from);
 -        MessagingService.instance().sendReply(new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE), id, message.from);
 +        }
 +
 +        logger.debug("Enqueuing response to snapshot request {} to {}", command.snapshot_name, message.from());
 +        MessagingService.instance().send(message.emptyResponse(), message.from());
      }
- 
-     private static class RepairedDataSnapshotTask implements Runnable
-     {
-         final SnapshotCommand command;
-         final InetAddressAndPort from;
- 
-         RepairedDataSnapshotTask(SnapshotCommand command, InetAddressAndPort from)
-         {
-             this.command = command;
-             this.from = from;
-         }
- 
-         public void run()
-         {
-             try
-             {
-                 Keyspace ks = Keyspace.open(command.keyspace);
-                 if (ks == null)
-                 {
-                     logger.info("Snapshot request received from {} for {}.{} but keyspace not found",
-                                 from,
-                                 command.keyspace,
-                                 command.column_family);
-                     return;
-                 }
- 
-                 ColumnFamilyStore cfs = ks.getColumnFamilyStore(command.column_family);
-                 if (cfs.snapshotExists(command.snapshot_name))
-                 {
-                     logger.info("Received snapshot request from {} for {}.{} following repaired data mismatch, " +
-                                 "but snapshot with tag {} already exists",
-                                 from,
-                                 command.keyspace,
-                                 command.column_family,
-                                 command.snapshot_name);
-                     return;
-                 }
-                 logger.info("Creating snapshot requested by {} of {}.{} following repaired data mismatch",
-                             from,
-                             command.keyspace,
-                             command.column_family);
-                 cfs.snapshot(command.snapshot_name);
-             }
-             catch (IllegalArgumentException e)
-             {
-                 logger.warn("Snapshot request received from {} for {}.{} but table not found",
-                             from,
-                             command.keyspace,
-                             command.column_family);
-             }
-         }
-     }
  }
diff --cc src/java/org/apache/cassandra/service/StorageProxy.java
index 38e9c44,2ca2c85..89a0d44
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@@ -2764,125 -2813,65 +2764,179 @@@ public class StorageProxy implements St
          return Schema.instance.getNumberOfTables();
      }
  
 +    public String getIdealConsistencyLevel()
 +    {
 +        return DatabaseDescriptor.getIdealConsistencyLevel().toString();
 +    }
 +
 +    public String setIdealConsistencyLevel(String cl)
 +    {
 +        ConsistencyLevel original = DatabaseDescriptor.getIdealConsistencyLevel();
 +        ConsistencyLevel newCL = ConsistencyLevel.valueOf(cl.trim().toUpperCase());
 +        DatabaseDescriptor.setIdealConsistencyLevel(newCL);
 +        return String.format("Updating ideal consistency level new value: %s old value %s", newCL, original.toString());
 +    }
 +
 +    @Deprecated
      public int getOtcBacklogExpirationInterval() {
 -        return DatabaseDescriptor.getOtcBacklogExpirationInterval();
 +        return 0;
 +    }
 +
 +    @Deprecated
 +    public void setOtcBacklogExpirationInterval(int intervalInMillis) { }
 +
 +    @Override
 +    public void enableRepairedDataTrackingForRangeReads()
 +    {
 +        DatabaseDescriptor.setRepairedDataTrackingForRangeReadsEnabled(true);
 +    }
 +
 +    @Override
 +    public void disableRepairedDataTrackingForRangeReads()
 +    {
 +        DatabaseDescriptor.setRepairedDataTrackingForRangeReadsEnabled(false);
 +    }
 +
 +    @Override
 +    public boolean getRepairedDataTrackingEnabledForRangeReads()
 +    {
 +        return DatabaseDescriptor.getRepairedDataTrackingForRangeReadsEnabled();
 +    }
 +
 +    @Override
 +    public void enableRepairedDataTrackingForPartitionReads()
 +    {
 +        DatabaseDescriptor.setRepairedDataTrackingForPartitionReadsEnabled(true);
      }
  
 -    public void setOtcBacklogExpirationInterval(int intervalInMillis) {
 -        DatabaseDescriptor.setOtcBacklogExpirationInterval(intervalInMillis);
 +    @Override
 +    public void disableRepairedDataTrackingForPartitionReads()
 +    {
 +        DatabaseDescriptor.setRepairedDataTrackingForPartitionReadsEnabled(false);
 +    }
 +
 +    @Override
 +    public boolean getRepairedDataTrackingEnabledForPartitionReads()
 +    {
 +        return DatabaseDescriptor.getRepairedDataTrackingForPartitionReadsEnabled();
 +    }
 +
 +    @Override
 +    public void enableReportingUnconfirmedRepairedDataMismatches()
 +    {
 +        DatabaseDescriptor.reportUnconfirmedRepairedDataMismatches(true);
 +    }
 +
 +    @Override
 +    public void disableReportingUnconfirmedRepairedDataMismatches()
 +    {
 +       DatabaseDescriptor.reportUnconfirmedRepairedDataMismatches(false);
 +    }
 +
 +    @Override
 +    public boolean getReportingUnconfirmedRepairedDataMismatchesEnabled()
 +    {
 +        return DatabaseDescriptor.reportUnconfirmedRepairedDataMismatches();
 +    }
 +
 +    @Override
 +    public boolean getSnapshotOnRepairedDataMismatchEnabled()
 +    {
 +        return DatabaseDescriptor.snapshotOnRepairedDataMismatch();
 +    }
 +
 +    @Override
 +    public void enableSnapshotOnRepairedDataMismatch()
 +    {
 +        DatabaseDescriptor.setSnapshotOnRepairedDataMismatch(true);
 +    }
 +
 +    @Override
 +    public void disableSnapshotOnRepairedDataMismatch()
 +    {
 +        DatabaseDescriptor.setSnapshotOnRepairedDataMismatch(false);
 +    }
 +
 +    static class PaxosBallotAndContention
 +    {
 +        final UUID ballot;
 +        final int contentions;
 +
 +        PaxosBallotAndContention(UUID ballot, int contentions)
 +        {
 +            this.ballot = ballot;
 +            this.contentions = contentions;
 +        }
 +
 +        @Override
 +        public final int hashCode()
 +        {
 +            int hashCode = 31 + (ballot == null ? 0 : ballot.hashCode());
 +            return 31 * hashCode * this.contentions;
 +        }
 +
 +        @Override
 +        public final boolean equals(Object o)
 +        {
 +            if(!(o instanceof PaxosBallotAndContention))
 +                return false;
 +            PaxosBallotAndContention that = (PaxosBallotAndContention)o;
 +            // handles nulls properly
 +            return Objects.equals(ballot, that.ballot) && contentions == that.contentions;
 +        }
      }
+ 
+     @Override
+     public boolean getSnapshotOnDuplicateRowDetectionEnabled()
+     {
+         return DatabaseDescriptor.snapshotOnDuplicateRowDetection();
+     }
+ 
+     @Override
+     public void enableSnapshotOnDuplicateRowDetection()
+     {
+         DatabaseDescriptor.setSnapshotOnDuplicateRowDetection(true);
+     }
+ 
+     @Override
+     public void disableSnapshotOnDuplicateRowDetection()
+     {
+         DatabaseDescriptor.setSnapshotOnDuplicateRowDetection(false);
+     }
+ 
+     @Override
+     public boolean getCheckForDuplicateRowsDuringReads()
+     {
+         return DatabaseDescriptor.checkForDuplicateRowsDuringReads();
+     }
+ 
+     @Override
+     public void enableCheckForDuplicateRowsDuringReads()
+     {
+         DatabaseDescriptor.setCheckForDuplicateRowsDuringReads(true);
+     }
+ 
+     @Override
+     public void disableCheckForDuplicateRowsDuringReads()
+     {
+         DatabaseDescriptor.setCheckForDuplicateRowsDuringReads(false);
+     }
+ 
+     @Override
+     public boolean getCheckForDuplicateRowsDuringCompaction()
+     {
+         return DatabaseDescriptor.checkForDuplicateRowsDuringCompaction();
+     }
+ 
+     @Override
+     public void enableCheckForDuplicateRowsDuringCompaction()
+     {
+         DatabaseDescriptor.setCheckForDuplicateRowsDuringCompaction(true);
+     }
+ 
+     @Override
+     public void disableCheckForDuplicateRowsDuringCompaction()
+     {
+         DatabaseDescriptor.setCheckForDuplicateRowsDuringCompaction(false);
+     }
  }
diff --cc src/java/org/apache/cassandra/service/StorageProxyMBean.java
index e0d2c86,cdf07f4..e3cde4b
--- a/src/java/org/apache/cassandra/service/StorageProxyMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageProxyMBean.java
@@@ -71,25 -69,14 +71,36 @@@ public interface StorageProxyMBea
  
      public int getNumberOfTables();
  
 +    public String getIdealConsistencyLevel();
 +    public String setIdealConsistencyLevel(String cl);
 +
 +    /**
 +     * Tracking and reporting of variances in the repaired data set across replicas at read time
 +     */
 +    void enableRepairedDataTrackingForRangeReads();
 +    void disableRepairedDataTrackingForRangeReads();
 +    boolean getRepairedDataTrackingEnabledForRangeReads();
 +
 +    void enableRepairedDataTrackingForPartitionReads();
 +    void disableRepairedDataTrackingForPartitionReads();
 +    boolean getRepairedDataTrackingEnabledForPartitionReads();
 +
 +    void enableReportingUnconfirmedRepairedDataMismatches();
 +    void disableReportingUnconfirmedRepairedDataMismatches();
 +    boolean getReportingUnconfirmedRepairedDataMismatchesEnabled();
 +
 +    void enableSnapshotOnRepairedDataMismatch();
 +    void disableSnapshotOnRepairedDataMismatch();
 +    boolean getSnapshotOnRepairedDataMismatchEnabled();
++
+     void enableSnapshotOnDuplicateRowDetection();
+     void disableSnapshotOnDuplicateRowDetection();
+     boolean getSnapshotOnDuplicateRowDetectionEnabled();
+ 
+     boolean getCheckForDuplicateRowsDuringReads();
+     void enableCheckForDuplicateRowsDuringReads();
+     void disableCheckForDuplicateRowsDuringReads();
+     boolean getCheckForDuplicateRowsDuringCompaction();
+     void enableCheckForDuplicateRowsDuringCompaction();
+     void disableCheckForDuplicateRowsDuringCompaction();
  }
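
The MBean additions above can be flipped at runtime over JMX. Below is a hedged, self-contained example; the service URL, port 7199 and the object name org.apache.cassandra.db:type=StorageProxy are assumptions based on Cassandra's usual defaults rather than anything introduced by this commit.

    import javax.management.JMX;
    import javax.management.MBeanServerConnection;
    import javax.management.ObjectName;
    import javax.management.remote.JMXConnector;
    import javax.management.remote.JMXConnectorFactory;
    import javax.management.remote.JMXServiceURL;

    import org.apache.cassandra.service.StorageProxyMBean;

    public class ToggleDuplicateRowChecks
    {
        public static void main(String[] args) throws Exception
        {
            // Assumed defaults: local node, JMX on 7199.
            JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://127.0.0.1:7199/jmxrmi");
            try (JMXConnector connector = JMXConnectorFactory.connect(url))
            {
                MBeanServerConnection connection = connector.getMBeanServerConnection();
                // Assumed MBean name, following Cassandra's usual convention.
                ObjectName name = new ObjectName("org.apache.cassandra.db:type=StorageProxy");
                StorageProxyMBean proxy = JMX.newMBeanProxy(connection, name, StorageProxyMBean.class);

                proxy.enableCheckForDuplicateRowsDuringCompaction();
                proxy.enableSnapshotOnDuplicateRowDetection();
                System.out.println("compaction duplicate checks: "
                                   + proxy.getCheckForDuplicateRowsDuringCompaction());
            }
        }
    }
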
diff --cc src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
index 33abb15,0000000..8907e74
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
+++ b/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
@@@ -1,434 -1,0 +1,435 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.service.reads;
 +
 +import com.google.common.base.Preconditions;
 +
 +import com.google.common.base.Predicates;
 +
++import org.apache.cassandra.db.transform.DuplicateRowChecker;
 +import org.apache.cassandra.locator.ReplicaPlan;
 +import org.apache.cassandra.locator.ReplicaPlans;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.concurrent.Stage;
 +import org.apache.cassandra.db.ColumnFamilyStore;
 +import org.apache.cassandra.db.ConsistencyLevel;
 +import org.apache.cassandra.db.DecoratedKey;
 +import org.apache.cassandra.db.ReadCommand;
 +import org.apache.cassandra.db.SinglePartitionReadCommand;
 +import org.apache.cassandra.db.Keyspace;
 +import org.apache.cassandra.db.partitions.PartitionIterator;
 +import org.apache.cassandra.exceptions.ReadFailureException;
 +import org.apache.cassandra.exceptions.ReadTimeoutException;
 +import org.apache.cassandra.exceptions.UnavailableException;
 +import org.apache.cassandra.locator.EndpointsForToken;
 +import org.apache.cassandra.locator.InetAddressAndPort;
 +import org.apache.cassandra.locator.Replica;
 +import org.apache.cassandra.locator.ReplicaCollection;
 +import org.apache.cassandra.net.Message;
 +import org.apache.cassandra.net.MessagingService;
 +import org.apache.cassandra.service.reads.repair.ReadRepair;
 +import org.apache.cassandra.service.StorageProxy.LocalReadRunnable;
 +import org.apache.cassandra.tracing.TraceState;
 +import org.apache.cassandra.tracing.Tracing;
 +
 +import static com.google.common.collect.Iterables.all;
 +import static java.util.concurrent.TimeUnit.NANOSECONDS;
 +
 +/**
 + * Sends a read request to the replicas needed to satisfy a given ConsistencyLevel.
 + *
 + * Optionally, may perform additional requests to provide redundancy against replica failure:
 + * AlwaysSpeculatingReadExecutor will always send a request to one extra replica, while
 + * SpeculatingReadExecutor will wait until it looks like the original request is in danger
 + * of timing out before performing extra reads.
 + */
 +public abstract class AbstractReadExecutor
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(AbstractReadExecutor.class);
 +
 +    protected final ReadCommand command;
 +    private   final ReplicaPlan.SharedForTokenRead replicaPlan;
 +    protected final ReadRepair<EndpointsForToken, ReplicaPlan.ForTokenRead> readRepair;
 +    protected final DigestResolver<EndpointsForToken, ReplicaPlan.ForTokenRead> digestResolver;
 +    protected final ReadCallback<EndpointsForToken, ReplicaPlan.ForTokenRead> handler;
 +    protected final TraceState traceState;
 +    protected final ColumnFamilyStore cfs;
 +    protected final long queryStartNanoTime;
 +    private   final int initialDataRequestCount;
 +    protected volatile PartitionIterator result = null;
 +
 +    AbstractReadExecutor(ColumnFamilyStore cfs, ReadCommand command, ReplicaPlan.ForTokenRead replicaPlan, int initialDataRequestCount, long queryStartNanoTime)
 +    {
 +        this.command = command;
 +        this.replicaPlan = ReplicaPlan.shared(replicaPlan);
 +        this.initialDataRequestCount = initialDataRequestCount;
 +        // the ReadRepair and DigestResolver both need to see our updated
 +        this.readRepair = ReadRepair.create(command, this.replicaPlan, queryStartNanoTime);
 +        this.digestResolver = new DigestResolver<>(command, this.replicaPlan, queryStartNanoTime);
 +        this.handler = new ReadCallback<>(digestResolver, command, this.replicaPlan, queryStartNanoTime);
 +        this.cfs = cfs;
 +        this.traceState = Tracing.instance.get();
 +        this.queryStartNanoTime = queryStartNanoTime;
 +
 +
 +        // Set the digest version (if we request some digests). This is the smallest version amongst all our target replicas since new nodes
 +        // knows how to produce older digest but the reverse is not true.
 +        // TODO: we need this when talking with pre-3.0 nodes. So if we preserve the digest format moving forward, we can get rid of this once
 +        // we stop being compatible with pre-3.0 nodes.
 +        int digestVersion = MessagingService.current_version;
 +        for (Replica replica : replicaPlan.contacts())
 +            digestVersion = Math.min(digestVersion, MessagingService.instance().versions.get(replica.endpoint()));
 +        command.setDigestVersion(digestVersion);
 +    }
 +
 +    public DecoratedKey getKey()
 +    {
 +        Preconditions.checkState(command instanceof SinglePartitionReadCommand,
 +                                 "Can only get keys for SinglePartitionReadCommand");
 +        return ((SinglePartitionReadCommand) command).partitionKey();
 +    }
 +
 +    public ReadRepair getReadRepair()
 +    {
 +        return readRepair;
 +    }
 +
 +    protected void makeFullDataRequests(ReplicaCollection<?> replicas)
 +    {
 +        assert all(replicas, Replica::isFull);
 +        makeRequests(command, replicas);
 +    }
 +
 +    protected void makeTransientDataRequests(Iterable<Replica> replicas)
 +    {
 +        makeRequests(command.copyAsTransientQuery(replicas), replicas);
 +    }
 +
 +    protected void makeDigestRequests(Iterable<Replica> replicas)
 +    {
 +        assert all(replicas, Replica::isFull);
 +        // only send digest requests to full replicas, send data requests instead to the transient replicas
 +        makeRequests(command.copyAsDigestQuery(replicas), replicas);
 +    }
 +
 +    private void makeRequests(ReadCommand readCommand, Iterable<Replica> replicas)
 +    {
 +        boolean hasLocalEndpoint = false;
 +        Message<ReadCommand> message = null;
 +
 +        for (Replica replica: replicas)
 +        {
 +            assert replica.isFull() || readCommand.acceptsTransient();
 +
 +            InetAddressAndPort endpoint = replica.endpoint();
 +            if (replica.isSelf())
 +            {
 +                hasLocalEndpoint = true;
 +                continue;
 +            }
 +
 +            if (traceState != null)
 +                traceState.trace("reading {} from {}", readCommand.isDigestQuery() ? "digest" : "data", endpoint);
 +
 +            if (null == message)
 +                message = readCommand.createMessage(false);
 +
 +            MessagingService.instance().sendWithCallback(message, endpoint, handler);
 +        }
 +
 +        // We delay the local (potentially blocking) read till the end to avoid stalling remote requests.
 +        if (hasLocalEndpoint)
 +        {
 +            logger.trace("reading {} locally", readCommand.isDigestQuery() ? "digest" : "data");
 +            Stage.READ.maybeExecuteImmediately(new LocalReadRunnable(command, handler));
 +        }
 +    }
 +
 +    /**
 +     * Perform additional requests if it looks like the original will time out.  May block while it waits
 +     * to see if the original requests are answered first.
 +     */
 +    public abstract void maybeTryAdditionalReplicas();
 +
 +    /**
 +     * send the initial set of requests
 +     */
 +    public void executeAsync()
 +    {
 +        EndpointsForToken selected = replicaPlan().contacts();
 +        EndpointsForToken fullDataRequests = selected.filter(Replica::isFull, initialDataRequestCount);
 +        makeFullDataRequests(fullDataRequests);
 +        makeTransientDataRequests(selected.filterLazily(Replica::isTransient));
 +        makeDigestRequests(selected.filterLazily(r -> r.isFull() && !fullDataRequests.contains(r)));
 +    }
 +
 +    /**
 +     * @return an executor appropriate for the configured speculative read policy
 +     */
 +    public static AbstractReadExecutor getReadExecutor(SinglePartitionReadCommand command, ConsistencyLevel consistencyLevel, long queryStartNanoTime) throws UnavailableException
 +    {
 +        Keyspace keyspace = Keyspace.open(command.metadata().keyspace);
 +        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(command.metadata().id);
 +        SpeculativeRetryPolicy retry = cfs.metadata().params.speculativeRetry;
 +
 +        ReplicaPlan.ForTokenRead replicaPlan = ReplicaPlans.forRead(keyspace, command.partitionKey().getToken(), consistencyLevel, retry);
 +
 +        // Speculative retry is disabled *OR*
 +        // 11980: Disable speculative retry if using EACH_QUORUM in order to prevent miscounting DC responses
 +        if (retry.equals(NeverSpeculativeRetryPolicy.INSTANCE) || consistencyLevel == ConsistencyLevel.EACH_QUORUM)
 +            return new NeverSpeculatingReadExecutor(cfs, command, replicaPlan, queryStartNanoTime, false);
 +
 +        // There are simply no extra replicas to speculate.
 +        // Handle this separately so it can record failed attempts to speculate due to lack of replicas
 +        if (replicaPlan.contacts().size() == replicaPlan.candidates().size())
 +        {
 +            boolean recordFailedSpeculation = consistencyLevel != ConsistencyLevel.ALL;
 +            return new NeverSpeculatingReadExecutor(cfs, command, replicaPlan, queryStartNanoTime, recordFailedSpeculation);
 +        }
 +
 +        if (retry.equals(AlwaysSpeculativeRetryPolicy.INSTANCE))
 +            return new AlwaysSpeculatingReadExecutor(cfs, command, replicaPlan, queryStartNanoTime);
 +        else // PERCENTILE or CUSTOM.
 +            return new SpeculatingReadExecutor(cfs, command, replicaPlan, queryStartNanoTime);
 +    }
 +
 +    /**
 +     * Returns true if speculation should occur; if so, blocks until it is time to
 +     * send the speculative reads.
 +     */
 +    boolean shouldSpeculateAndMaybeWait()
 +    {
 +        // no latency information, or we're overloaded
 +        if (cfs.sampleReadLatencyNanos > command.getTimeout(NANOSECONDS))
 +            return false;
 +
 +        return !handler.await(cfs.sampleReadLatencyNanos, NANOSECONDS);
 +    }
 +
 +    ReplicaPlan.ForTokenRead replicaPlan()
 +    {
 +        return replicaPlan.get();
 +    }
 +
 +    void onReadTimeout() {}
 +
 +    public static class NeverSpeculatingReadExecutor extends AbstractReadExecutor
 +    {
 +        /**
 +         * If we are only not speculating because there are no extra replicas available,
 +         * record each read that would otherwise have speculated as a failed speculation.
 +         */
 +        private final boolean logFailedSpeculation;
 +
 +        public NeverSpeculatingReadExecutor(ColumnFamilyStore cfs, ReadCommand command, ReplicaPlan.ForTokenRead replicaPlan, long queryStartNanoTime, boolean logFailedSpeculation)
 +        {
 +            super(cfs, command, replicaPlan, 1, queryStartNanoTime);
 +            this.logFailedSpeculation = logFailedSpeculation;
 +        }
 +
 +        public void maybeTryAdditionalReplicas()
 +        {
 +            if (shouldSpeculateAndMaybeWait() && logFailedSpeculation)
 +            {
 +                cfs.metric.speculativeInsufficientReplicas.inc();
 +            }
 +        }
 +    }
 +
 +    static class SpeculatingReadExecutor extends AbstractReadExecutor
 +    {
 +        private volatile boolean speculated = false;
 +
 +        public SpeculatingReadExecutor(ColumnFamilyStore cfs,
 +                                       ReadCommand command,
 +                                       ReplicaPlan.ForTokenRead replicaPlan,
 +                                       long queryStartNanoTime)
 +        {
 +            // We're hitting additional targets for read repair (??).  Since our "extra" replica is the least-
 +            // preferred by the snitch, we do an extra data read to start with against a replica more
 +            // likely to respond; better to let RR fail than the entire query.
 +            super(cfs, command, replicaPlan, replicaPlan.blockFor() < replicaPlan.contacts().size() ? 2 : 1, queryStartNanoTime);
 +        }
 +
 +        public void maybeTryAdditionalReplicas()
 +        {
 +            if (shouldSpeculateAndMaybeWait())
 +            {
 +                //Handle speculation stats first in case the callback fires immediately
 +                cfs.metric.speculativeRetries.inc();
 +                speculated = true;
 +
 +                ReplicaPlan.ForTokenRead replicaPlan = replicaPlan();
 +                ReadCommand retryCommand;
 +                Replica extraReplica;
 +                if (handler.resolver.isDataPresent())
 +                {
 +                    extraReplica = replicaPlan.firstUncontactedCandidate(Predicates.alwaysTrue());
 +
 +                    // we should only use a SpeculatingReadExecutor if we have an extra replica to speculate against
 +                    assert extraReplica != null;
 +
 +                    retryCommand = extraReplica.isTransient()
 +                            ? command.copyAsTransientQuery(extraReplica)
 +                            : command.copyAsDigestQuery(extraReplica);
 +                }
 +                else
 +                {
 +                    extraReplica = replicaPlan.firstUncontactedCandidate(Replica::isFull);
 +                    retryCommand = command;
 +                    if (extraReplica == null)
 +                    {
 +                        cfs.metric.speculativeInsufficientReplicas.inc();
 +                        // cannot safely speculate a new data request, without more work - requests assumed to be
 +                        // unique per endpoint, and we have no full nodes left to speculate against
 +                        return;
 +                    }
 +                }
 +
 +                // we must update the plan to include this new node, else when we come to read-repair, we may not include this
 +                // speculated response in the data requests we make again, and we will not be able to 'speculate' an extra repair read,
 +                // nor would we be able to speculate a new 'write' if the repair writes are insufficient
 +                super.replicaPlan.addToContacts(extraReplica);
 +
 +                if (traceState != null)
 +                    traceState.trace("speculating read retry on {}", extraReplica);
 +                logger.trace("speculating read retry on {}", extraReplica);
 +                MessagingService.instance().sendWithCallback(retryCommand.createMessage(false), extraReplica.endpoint(), handler);
 +            }
 +        }
 +
 +        @Override
 +        void onReadTimeout()
 +        {
 +            //Shouldn't be possible to get here without first attempting to speculate even if the
 +            //timing is bad
 +            assert speculated;
 +            cfs.metric.speculativeFailedRetries.inc();
 +        }
 +    }
 +
 +    private static class AlwaysSpeculatingReadExecutor extends AbstractReadExecutor
 +    {
 +        public AlwaysSpeculatingReadExecutor(ColumnFamilyStore cfs,
 +                                             ReadCommand command,
 +                                             ReplicaPlan.ForTokenRead replicaPlan,
 +                                             long queryStartNanoTime)
 +        {
 +            // presumably, we speculate an extra data request here in case it is our data request that fails to respond,
 +            // and there are no more nodes to consult
 +            super(cfs, command, replicaPlan, replicaPlan.contacts().size() > 1 ? 2 : 1, queryStartNanoTime);
 +        }
 +
 +        public void maybeTryAdditionalReplicas()
 +        {
 +            // no-op
 +        }
 +
 +        @Override
 +        public void executeAsync()
 +        {
 +            super.executeAsync();
 +            cfs.metric.speculativeRetries.inc();
 +        }
 +
 +        @Override
 +        void onReadTimeout()
 +        {
 +            cfs.metric.speculativeFailedRetries.inc();
 +        }
 +    }
 +
 +    public void setResult(PartitionIterator result)
 +    {
 +        Preconditions.checkState(this.result == null, "Result can only be set once");
-         this.result = result;
++        this.result = DuplicateRowChecker.duringRead(result, this.replicaPlan.get().candidates().endpointList());
 +    }
 +
 +    /**
 +     * Wait for the CL to be satisfied by responses
 +     */
 +    public void awaitResponses() throws ReadTimeoutException
 +    {
 +        try
 +        {
 +            handler.awaitResults();
 +        }
 +        catch (ReadTimeoutException e)
 +        {
 +            try
 +            {
 +                onReadTimeout();
 +            }
 +            finally
 +            {
 +                throw e;
 +            }
 +        }
 +
 +        // return immediately, or begin a read repair
 +        if (digestResolver.responsesMatch())
 +        {
 +            setResult(digestResolver.getData());
 +        }
 +        else
 +        {
 +            Tracing.trace("Digest mismatch: Mismatch for key {}", getKey());
 +            readRepair.startRepair(digestResolver, this::setResult);
 +        }
 +    }
 +
 +    public void awaitReadRepair() throws ReadTimeoutException
 +    {
 +        try
 +        {
 +            readRepair.awaitReads();
 +        }
 +        catch (ReadTimeoutException e)
 +        {
 +            if (Tracing.isTracing())
 +                Tracing.trace("Timed out waiting on digest mismatch repair requests");
 +            else
 +                logger.trace("Timed out waiting on digest mismatch repair requests");
 +            // the caught exception here will have CL.ALL from the repair command,
 +            // not whatever CL the initial command was at (CASSANDRA-7947)
 +            throw new ReadTimeoutException(replicaPlan().consistencyLevel(), handler.blockFor - 1, handler.blockFor, true);
 +        }
 +    }
 +
 +    boolean isDone()
 +    {
 +        return result != null;
 +    }
 +
 +    public void maybeSendAdditionalDataRequests()
 +    {
 +        if (isDone())
 +            return;
 +
 +        readRepair.maybeSendAdditionalReads();
 +    }
 +
 +    public PartitionIterator getResult() throws ReadFailureException, ReadTimeoutException
 +    {
 +        Preconditions.checkState(result != null, "Result must be set first");
 +        return result;
 +    }
 +}
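
The only functional change to this file is the DuplicateRowChecker.duringRead(...) wrapping in setResult above, which inspects the merged read result for rows with duplicate clusterings before it is returned. The following is a minimal, self-contained sketch of that idea only; it is not the DuplicateRowChecker implementation from this patch, and the class and method names are invented for illustration.

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Illustrative sketch only: count adjacent duplicate "clustering" values in a sorted stream,
// which is the invariant violation (CASSANDRA-15789) that the real checker guards against.
public class DuplicateRowSketch
{
    static <T> int countAdjacentDuplicates(Iterator<T> sorted)
    {
        int duplicates = 0;
        T previous = null;
        while (sorted.hasNext())
        {
            T next = sorted.next();
            if (previous != null && previous.equals(next))
                duplicates++; // the real checker would log and request diagnostic snapshots here
            previous = next;
        }
        return duplicates;
    }

    public static void main(String[] args)
    {
        List<Integer> clusterings = Arrays.asList(1, 2, 2, 3);
        System.out.println(countAdjacentDuplicates(clusterings.iterator())); // prints 1
    }
}

In the patch itself the equivalent check runs over the PartitionIterator during reads and, per the DiagnosticSnapshotService javadoc further down, a detection during compaction only snapshots the local replica.
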
diff --cc src/java/org/apache/cassandra/service/reads/repair/RepairedDataVerifier.java
index bed240c,0000000..d1cff11
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/service/reads/repair/RepairedDataVerifier.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/RepairedDataVerifier.java
@@@ -1,157 -1,0 +1,132 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.service.reads.repair;
 +
 +import java.time.LocalDate;
 +import java.time.format.DateTimeFormatter;
 +import java.util.concurrent.ConcurrentHashMap;
 +import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.atomic.AtomicLong;
 +
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.db.ColumnFamilyStore;
 +import org.apache.cassandra.db.ReadCommand;
 +import org.apache.cassandra.db.SnapshotCommand;
 +import org.apache.cassandra.locator.InetAddressAndPort;
 +import org.apache.cassandra.metrics.TableMetrics;
 +import org.apache.cassandra.net.Message;
 +import org.apache.cassandra.net.MessagingService;
 +import org.apache.cassandra.net.Verb;
 +import org.apache.cassandra.schema.TableId;
 +import org.apache.cassandra.service.SnapshotVerbHandler;
 +import org.apache.cassandra.tracing.Tracing;
++import org.apache.cassandra.utils.DiagnosticSnapshotService;
 +import org.apache.cassandra.utils.NoSpamLogger;
 +
 +public interface RepairedDataVerifier
 +{
 +    public void verify(RepairedDataTracker tracker);
 +
 +    static RepairedDataVerifier verifier(ReadCommand command)
 +    {
 +        return DatabaseDescriptor.snapshotOnRepairedDataMismatch() ? snapshotting(command) : simple(command);
 +    }
 +
 +    static RepairedDataVerifier simple(ReadCommand command)
 +    {
 +        return new SimpleVerifier(command);
 +    }
 +
 +    static RepairedDataVerifier snapshotting(ReadCommand command)
 +    {
 +        return new SnapshottingVerifier(command);
 +    }
 +
 +    static class SimpleVerifier implements RepairedDataVerifier
 +    {
 +        private static final Logger logger = LoggerFactory.getLogger(SimpleVerifier.class);
 +        protected final ReadCommand command;
 +
 +        private static final String INCONSISTENCY_WARNING = "Detected mismatch between repaired datasets for table {}.{} during read of {}. {}";
 +
 +        SimpleVerifier(ReadCommand command)
 +        {
 +            this.command = command;
 +        }
 +
 +        @Override
 +        public void verify(RepairedDataTracker tracker)
 +        {
 +            Tracing.trace("Verifying repaired data tracker {}", tracker);
 +
 +            // some mismatch occurred between the repaired datasets on the replicas
 +            if (tracker.digests.keySet().size() > 1)
 +            {
 +                // if none of the digests are considered inconclusive (i.e. there were no pending
 +                // repair sessions awaiting commit and no unrepaired partition deletes that caused
 +                // sstables to be skipped during the read), mark the inconsistency as confirmed;
 +                // otherwise it is only reported as unconfirmed, if that reporting is enabled
 +                if (tracker.inconclusiveDigests.isEmpty())
 +                {
 +                    TableMetrics metrics = ColumnFamilyStore.metricsFor(command.metadata().id);
 +                    metrics.confirmedRepairedInconsistencies.mark();
 +                    NoSpamLogger.log(logger, NoSpamLogger.Level.WARN, 1, TimeUnit.MINUTES,
 +                                     INCONSISTENCY_WARNING, command.metadata().keyspace,
 +                                     command.metadata().name, command.toString(), tracker);
 +                }
 +                else if (DatabaseDescriptor.reportUnconfirmedRepairedDataMismatches())
 +                {
 +                    TableMetrics metrics = ColumnFamilyStore.metricsFor(command.metadata().id);
 +                    metrics.unconfirmedRepairedInconsistencies.mark();
 +                    NoSpamLogger.log(logger, NoSpamLogger.Level.WARN, 1, TimeUnit.MINUTES,
 +                                     INCONSISTENCY_WARNING, command.metadata().keyspace,
 +                                     command.metadata().name, command.toString(), tracker);
 +                }
 +            }
 +        }
 +    }
 +
 +    static class SnapshottingVerifier extends SimpleVerifier
 +    {
 +        private static final Logger logger = LoggerFactory.getLogger(SnapshottingVerifier.class);
-         private static final DateTimeFormatter DATE_FORMAT = DateTimeFormatter.BASIC_ISO_DATE;
 +        private static final String SNAPSHOTTING_WARNING = "Issuing snapshot command for mismatch between repaired datasets for table {}.{} during read of {}. {}";
 +
-         // Issue at most 1 snapshot request per minute for any given table.
-         // Replicas will only create one snapshot per day, but this stops us
-         // from swamping the network if we start seeing mismatches.
-         private static final long SNAPSHOT_INTERVAL_NANOS = TimeUnit.MINUTES.toNanos(1);
-         private static final ConcurrentHashMap<TableId, AtomicLong> LAST_SNAPSHOT_TIMES = new ConcurrentHashMap<>();
- 
 +        SnapshottingVerifier(ReadCommand command)
 +        {
 +            super(command);
 +        }
 +
 +        public void verify(RepairedDataTracker tracker)
 +        {
 +            super.verify(tracker);
 +            if (tracker.digests.keySet().size() > 1)
 +            {
 +                if (tracker.inconclusiveDigests.isEmpty() ||  DatabaseDescriptor.reportUnconfirmedRepairedDataMismatches())
 +                {
-                     long now = System.nanoTime();
-                     AtomicLong cached = LAST_SNAPSHOT_TIMES.computeIfAbsent(command.metadata().id, u -> new AtomicLong(0));
-                     long last = cached.get();
-                     if (now - last > SNAPSHOT_INTERVAL_NANOS && cached.compareAndSet(last, now))
-                     {
-                         logger.warn(SNAPSHOTTING_WARNING, command.metadata().keyspace, command.metadata().name, command.toString(), tracker);
-                         Message<SnapshotCommand> msg = Message.out(Verb.SNAPSHOT_REQ,
-                                                                    new SnapshotCommand(command.metadata().keyspace,
-                                                                                        command.metadata().name,
-                                                                                        getSnapshotName(),
-                                                                                        false));
-                         for (InetAddressAndPort replica : tracker.digests.values())
-                             MessagingService.instance().send(msg, replica);
-                     }
++                    logger.warn(SNAPSHOTTING_WARNING, command.metadata().keyspace, command.metadata().name, command.toString(), tracker);
++                    DiagnosticSnapshotService.repairedDataMismatch(command.metadata(), tracker.digests.values());
 +                }
 +            }
 +        }
- 
-         public static String getSnapshotName()
-         {
-             return String.format("%s%s",
-                                  SnapshotVerbHandler.REPAIRED_DATA_MISMATCH_SNAPSHOT_PREFIX,
-                                  DATE_FORMAT.format(LocalDate.now()));
-         }
 +    }
 +}
 +
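
The net effect of this change is that SnapshottingVerifier no longer does its own per-table rate limiting and snapshot message sending; both move into DiagnosticSnapshotService (next file). Below is a standalone sketch of the lock-free, per-key throttle pattern used there, assuming a one-minute window; the class name and the seeding of the initial timestamp are invented for the sketch and differ from the patch.

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Sketch of a per-key throttle: at most one "permit" per key per interval,
// decided with a compare-and-set on the last-fired timestamp.
public class PerKeyThrottleSketch<K>
{
    private static final long INTERVAL_NANOS = TimeUnit.MINUTES.toNanos(1);
    private final ConcurrentHashMap<K, AtomicLong> lastFired = new ConcurrentHashMap<>();

    public boolean tryAcquire(K key)
    {
        long now = System.nanoTime();
        // seed the first entry far enough in the past that the first request always passes
        AtomicLong cached = lastFired.computeIfAbsent(key, k -> new AtomicLong(now - INTERVAL_NANOS - 1));
        long last = cached.get();
        // only one caller per interval wins the CAS; everyone else is throttled
        return now - last > INTERVAL_NANOS && cached.compareAndSet(last, now);
    }

    public static void main(String[] args)
    {
        PerKeyThrottleSketch<String> throttle = new PerKeyThrottleSketch<>();
        System.out.println(throttle.tryAcquire("ks.tbl")); // true: first request in the window
        System.out.println(throttle.tryAcquire("ks.tbl")); // false: still within the window
    }
}
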
diff --cc src/java/org/apache/cassandra/utils/DiagnosticSnapshotService.java
index 0000000,5c48412..d1f33ed
mode 000000,100644..100644
--- a/src/java/org/apache/cassandra/utils/DiagnosticSnapshotService.java
+++ b/src/java/org/apache/cassandra/utils/DiagnosticSnapshotService.java
@@@ -1,0 -1,188 +1,199 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.cassandra.utils;
+ 
+ import java.net.InetAddress;
+ import java.time.LocalDate;
+ import java.time.format.DateTimeFormatter;
 -import java.util.UUID;
+ import java.util.concurrent.*;
+ import java.util.concurrent.atomic.AtomicLong;
+ 
+ import com.google.common.annotations.VisibleForTesting;
+ import com.google.common.base.Preconditions;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ 
+ import org.apache.cassandra.concurrent.NamedThreadFactory;
 -import org.apache.cassandra.config.CFMetaData;
+ import org.apache.cassandra.db.*;
 -import org.apache.cassandra.net.MessageOut;
++import org.apache.cassandra.locator.InetAddressAndPort;
++import org.apache.cassandra.net.Message;
+ import org.apache.cassandra.net.MessagingService;
++import org.apache.cassandra.net.Verb;
++import org.apache.cassandra.schema.TableId;
++import org.apache.cassandra.schema.TableMetadata;
+ 
+ /**
+  * Provides a means to take snapshots when triggered by anomalous events or when the breaking of invariants is
+  * detected. When debugging certain classes of problems, having access to the relevant set of sstables when the problem
+  * is detected (or as close to then as possible) can be invaluable.
+  *
+  * This class performs two functions; on a replica where an anomaly is detected, it provides methods to issue snapshot
+  * requests to a provided set of replicas. For instance, if rows with duplicate clusterings are detected
+  * (CASSANDRA-15789) during a read, a snapshot request will be issued to all participating replicas. If detected during
+  * compaction, only the replica itself will receive the request. Requests are issued at a maximum rate of 1 per minute
+  * for any given table. Any additional triggers for the same table during the 60 second window are dropped, regardless
+  * of the replica set. This window is configurable via a system property (cassandra.diagnostic_snapshot_interval_nanos),
+  * but this is intended for use in testing only and operators are not expected to override the default.
+  *
+  * The second function performed is to handle snapshot requests on replicas. Snapshot names are prefixed with strings
+  * specific to the reason which triggered them. To manage consumption of disk space, replicas are restricted to taking
+  * a single snapshot for each prefix in a single calendar day. So if duplicate rows are detected by multiple
+  * coordinators during reads with the same replica set (or overlapping sets) on the same table, the coordinators may
+  * each issue snapshot requests, but the replicas will only accept the first one they receive. Further requests will
+  * be dropped on the replica side.
+  */
+ public class DiagnosticSnapshotService
+ {
+     private static final Logger logger = LoggerFactory.getLogger(DiagnosticSnapshotService.class);
+ 
+     public static final DiagnosticSnapshotService instance =
+         new DiagnosticSnapshotService(Executors.newSingleThreadExecutor(new NamedThreadFactory("DiagnosticSnapshot")));
+ 
++    public static final String REPAIRED_DATA_MISMATCH_SNAPSHOT_PREFIX = "RepairedDataMismatch-";
+     public static final String DUPLICATE_ROWS_DETECTED_SNAPSHOT_PREFIX = "DuplicateRows-";
+ 
+     private final Executor executor;
+ 
+     private DiagnosticSnapshotService(Executor executor)
+     {
+         this.executor = executor;
+     }
+ 
+     // Issue at most 1 snapshot request per minute for any given table.
+     // Replicas will only create one snapshot per day, but this stops us
+     // from swamping the network.
+     // Overridable via system property for testing.
+     private static final long SNAPSHOT_INTERVAL_NANOS = TimeUnit.MINUTES.toNanos(1);
+     private static final DateTimeFormatter DATE_FORMAT = DateTimeFormatter.BASIC_ISO_DATE;
 -    private final ConcurrentHashMap<UUID, AtomicLong> lastSnapshotTimes = new ConcurrentHashMap<>();
++    private final ConcurrentHashMap<TableId, AtomicLong> lastSnapshotTimes = new ConcurrentHashMap<>();
+ 
 -    public static void duplicateRows(CFMetaData metadata, Iterable<InetAddress> replicas)
++    public static void duplicateRows(TableMetadata metadata, Iterable<InetAddressAndPort> replicas)
+     {
+         instance.maybeTriggerSnapshot(metadata, DUPLICATE_ROWS_DETECTED_SNAPSHOT_PREFIX, replicas);
+     }
+ 
++    public static void repairedDataMismatch(TableMetadata metadata, Iterable<InetAddressAndPort> replicas)
++    {
++        instance.maybeTriggerSnapshot(metadata, REPAIRED_DATA_MISMATCH_SNAPSHOT_PREFIX, replicas);
++    }
++
+     public static boolean isDiagnosticSnapshotRequest(SnapshotCommand command)
+     {
 -        return command.snapshot_name.startsWith(DUPLICATE_ROWS_DETECTED_SNAPSHOT_PREFIX);
++        return command.snapshot_name.startsWith(REPAIRED_DATA_MISMATCH_SNAPSHOT_PREFIX)
++            || command.snapshot_name.startsWith(DUPLICATE_ROWS_DETECTED_SNAPSHOT_PREFIX);
+     }
+ 
 -    public static void snapshot(SnapshotCommand command, InetAddress initiator)
++    public static void snapshot(SnapshotCommand command, InetAddressAndPort initiator)
+     {
+         Preconditions.checkArgument(isDiagnosticSnapshotRequest(command));
+         instance.maybeSnapshot(command, initiator);
+     }
+ 
+     public static String getSnapshotName(String prefix)
+     {
+         return String.format("%s%s", prefix, DATE_FORMAT.format(LocalDate.now()));
+     }
+ 
+     @VisibleForTesting
+     public void shutdownAndWait(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
+     {
+         ExecutorUtils.shutdownNowAndWait(timeout, unit, executor);
+     }
+ 
 -    private void maybeTriggerSnapshot(CFMetaData metadata, String prefix, Iterable<InetAddress> endpoints)
++    private void maybeTriggerSnapshot(TableMetadata metadata, String prefix, Iterable<InetAddressAndPort> endpoints)
+     {
+         long now = System.nanoTime();
 -        AtomicLong cached = lastSnapshotTimes.computeIfAbsent(metadata.cfId, u -> new AtomicLong(0));
++        AtomicLong cached = lastSnapshotTimes.computeIfAbsent(metadata.id, u -> new AtomicLong(0));
+         long last = cached.get();
+         long interval = Long.getLong("cassandra.diagnostic_snapshot_interval_nanos", SNAPSHOT_INTERVAL_NANOS);
+         if (now - last > interval && cached.compareAndSet(last, now))
+         {
 -            MessageOut<?> msg = new SnapshotCommand(metadata.ksName,
 -                                                    metadata.cfName,
 -                                                    getSnapshotName(prefix),
 -                                                    false).createMessage();
 -            for (InetAddress replica : endpoints)
 -                MessagingService.instance().sendOneWay(msg, replica);
++            Message<SnapshotCommand> msg = Message.out(Verb.SNAPSHOT_REQ,
++                                                       new SnapshotCommand(metadata.keyspace,
++                                                                           metadata.name,
++                                                                           getSnapshotName(prefix),
++                                                                           false));
++            for (InetAddressAndPort replica : endpoints)
++                MessagingService.instance().send(msg, replica);
+         }
+         else
+         {
+             logger.debug("Diagnostic snapshot request dropped due to throttling");
+         }
+     }
+ 
 -    private void maybeSnapshot(SnapshotCommand command, InetAddress initiator)
++    private void maybeSnapshot(SnapshotCommand command, InetAddressAndPort initiator)
+     {
+         executor.execute(new DiagnosticSnapshotTask(command, initiator));
+     }
+ 
+     private static class DiagnosticSnapshotTask implements Runnable
+     {
+         final SnapshotCommand command;
 -        final InetAddress from;
++        final InetAddressAndPort from;
+ 
 -        DiagnosticSnapshotTask(SnapshotCommand command, InetAddress from)
++        DiagnosticSnapshotTask(SnapshotCommand command, InetAddressAndPort from)
+         {
+             this.command = command;
+             this.from = from;
+         }
+ 
+         public void run()
+         {
+             try
+             {
+                 Keyspace ks = Keyspace.open(command.keyspace);
+                 if (ks == null)
+                 {
+                     logger.info("Snapshot request received from {} for {}.{} but keyspace not found",
+                                 from,
+                                 command.keyspace,
+                                 command.column_family);
+                     return;
+                 }
+ 
+                 ColumnFamilyStore cfs = ks.getColumnFamilyStore(command.column_family);
+                 if (cfs.snapshotExists(command.snapshot_name))
+                 {
+                     logger.info("Received diagnostic snapshot request from {} for {}.{}, " +
+                                 "but snapshot with tag {} already exists",
+                                 from,
+                                 command.keyspace,
+                                 command.column_family,
+                                 command.snapshot_name);
+                     return;
+                 }
+                 logger.info("Creating snapshot requested by {} of {}.{} tag: {}",
+                             from,
+                             command.keyspace,
+                             command.column_family,
+                             command.snapshot_name);
+                 cfs.snapshot(command.snapshot_name);
+             }
+             catch (IllegalArgumentException e)
+             {
+                 logger.warn("Snapshot request received from {} for {}.{} but CFS not found",
+                             from,
+                             command.keyspace,
+                             command.column_family);
+             }
+         }
+     }
+ }
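
On the replica side, deduplication is by snapshot tag: the tag is the trigger-specific prefix plus the current date in BASIC_ISO_DATE form, so all requests for the same trigger on the same calendar day resolve to the same name and later ones are dropped by the snapshotExists check above. A small sketch of the resulting name format follows; the date shown is an example only.

import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

// Sketch: how a diagnostic snapshot tag is composed (prefix + BASIC_ISO_DATE).
public class SnapshotNameSketch
{
    public static void main(String[] args)
    {
        String prefix = "RepairedDataMismatch-";    // same prefix constant as in the patch
        LocalDate day = LocalDate.of(2020, 5, 20);  // example date only
        String name = prefix + DateTimeFormatter.BASIC_ISO_DATE.format(day);
        System.out.println(name);                   // prints RepairedDataMismatch-20200520
    }
}
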
diff --cc test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index e79a182,5bb449e..6f9577a
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@@ -102,10 -99,11 +102,11 @@@ import org.apache.cassandra.tools.NodeT
  import org.apache.cassandra.tracing.TraceState;
  import org.apache.cassandra.tracing.Tracing;
  import org.apache.cassandra.transport.messages.ResultMessage;
+ import org.apache.cassandra.utils.DiagnosticSnapshotService;
  import org.apache.cassandra.utils.ExecutorUtils;
  import org.apache.cassandra.utils.FBUtilities;
 +import org.apache.cassandra.utils.JVMStabilityInspector;
  import org.apache.cassandra.utils.Throwables;
 -import org.apache.cassandra.utils.UUIDGen;
  import org.apache.cassandra.utils.concurrent.Ref;
  import org.apache.cassandra.utils.memory.BufferPool;
  
@@@ -554,21 -652,20 +555,22 @@@ public class Instance extends IsolatedE
                                  () -> BufferPool.shutdownLocalCleaner(1L, MINUTES),
                                  () -> Ref.shutdownReferenceReaper(1L, MINUTES),
                                  () -> Memtable.MEMORY_POOL.shutdownAndWait(1L, MINUTES),
++                                () -> DiagnosticSnapshotService.instance.shutdownAndWait(1L, MINUTES),
 +                                () -> ScheduledExecutors.shutdownAndWait(1L, MINUTES),
                                  () -> SSTableReader.shutdownBlocking(1L, MINUTES),
 -                                () -> DiagnosticSnapshotService.instance.shutdownAndWait(1L, MINUTES)
 +                                () -> shutdownAndWait(Collections.singletonList(ActiveRepairService.repairCommandExecutor)),
 +                                () -> ScheduledExecutors.shutdownAndWait(1L, MINUTES)
              );
 +
              error = parallelRun(error, executor,
 -                                () -> ScheduledExecutors.shutdownAndWait(1L, MINUTES),
 -                                MessagingService.instance()::shutdown
 +                                CommitLog.instance::shutdownBlocking,
 +                                () -> MessagingService.instance().shutdown(1L, MINUTES, false, true)
              );
              error = parallelRun(error, executor,
 -                                () -> StageManager.shutdownAndWait(1L, MINUTES),
 +                                () -> GlobalEventExecutor.INSTANCE.awaitInactivity(1l, MINUTES),
 +                                () -> Stage.shutdownAndWait(1L, MINUTES),
                                  () -> SharedExecutorPool.SHARED.shutdownAndWait(1L, MINUTES)
              );
 -            error = parallelRun(error, executor,
 -                                CommitLog.instance::shutdownBlocking
 -            );
  
              Throwables.maybeFail(error);
          }).apply(isolatedExecutor);
diff --cc test/distributed/org/apache/cassandra/distributed/test/RepairDigestTrackingTest.java
index 4b382a1,0000000..308702a
mode 100644,000000..100644
--- a/test/distributed/org/apache/cassandra/distributed/test/RepairDigestTrackingTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/RepairDigestTrackingTest.java
@@@ -1,477 -1,0 +1,477 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.distributed.test;
 +
 +import java.io.IOException;
 +import java.time.LocalDate;
 +import java.time.format.DateTimeFormatter;
 +import java.util.EnumSet;
 +import java.util.Iterator;
 +import java.util.Map;
 +import java.util.concurrent.TimeUnit;
 +import java.util.stream.IntStream;
 +import java.util.stream.Stream;
 +
 +import com.google.common.util.concurrent.Uninterruptibles;
 +import org.junit.Assert;
 +import org.junit.Test;
 +
 +import org.apache.cassandra.db.ColumnFamilyStore;
 +import org.apache.cassandra.db.Keyspace;
 +import org.apache.cassandra.distributed.Cluster;
 +import org.apache.cassandra.distributed.api.ConsistencyLevel;
 +import org.apache.cassandra.distributed.api.IInvokableInstance;
 +import org.apache.cassandra.distributed.api.IIsolatedExecutor;
 +import org.apache.cassandra.io.sstable.Descriptor;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.io.sstable.metadata.MetadataComponent;
 +import org.apache.cassandra.io.sstable.metadata.MetadataType;
 +import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
 +import org.apache.cassandra.service.ActiveRepairService;
 +import org.apache.cassandra.service.SnapshotVerbHandler;
 +import org.apache.cassandra.service.StorageProxy;
++import org.apache.cassandra.utils.DiagnosticSnapshotService;
 +
 +import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;
 +import static org.junit.Assert.fail;
 +
 +public class RepairDigestTrackingTest extends TestBaseImpl
 +{
 +    private static final String TABLE = "tbl";
 +    private static final String KS_TABLE = KEYSPACE + "." + TABLE;
 +
 +    @Test
 +    public void testInconsistenciesFound() throws Throwable
 +    {
 +        try (Cluster cluster = (Cluster) init(builder().withNodes(2).start()))
 +        {
 +
 +            cluster.get(1).runOnInstance(() -> {
 +                StorageProxy.instance.enableRepairedDataTrackingForRangeReads();
 +            });
 +
 +            cluster.schemaChange("CREATE TABLE " + KS_TABLE + " (k INT, c INT, v INT, PRIMARY KEY (k,c)) with read_repair='NONE'");
 +            for (int i = 0; i < 10; i++)
 +            {
 +                cluster.coordinator(1).execute("INSERT INTO " + KS_TABLE + " (k, c, v) VALUES (?, ?, ?)",
 +                                               ConsistencyLevel.ALL,
 +                                               i, i, i);
 +            }
 +            cluster.forEach(i -> i.flush(KEYSPACE));
 +
 +            for (int i = 10; i < 20; i++)
 +            {
 +                cluster.coordinator(1).execute("INSERT INTO " + KS_TABLE + " (k, c, v) VALUES (?, ?, ?)",
 +                                               ConsistencyLevel.ALL,
 +                                               i, i, i);
 +            }
 +            cluster.forEach(i -> i.flush(KEYSPACE));
 +            cluster.forEach(i -> i.runOnInstance(assertNotRepaired()));
 +
 +            // mark everything on node 2 repaired
 +            cluster.get(2).runOnInstance(markAllRepaired());
 +            cluster.get(2).runOnInstance(assertRepaired());
 +
 +            // insert more data on node1 to generate an initial mismatch
 +            cluster.get(1).executeInternal("INSERT INTO " + KS_TABLE + " (k, c, v) VALUES (?, ?, ?)", 5, 5, 55);
 +            cluster.get(1).runOnInstance(assertNotRepaired());
 +
 +            long ccBefore = getConfirmedInconsistencies(cluster.get(1));
 +            cluster.coordinator(1).execute("SELECT * FROM " + KS_TABLE, ConsistencyLevel.ALL);
 +            long ccAfter = getConfirmedInconsistencies(cluster.get(1));
 +            Assert.assertEquals("confirmed count should differ by 1 after range read", ccBefore + 1, ccAfter);
 +        }
 +    }
 +
 +    @Test
 +    public void testPurgeableTombstonesAreIgnored() throws Throwable
 +    {
 +        try (Cluster cluster = (Cluster) init(builder().withNodes(2).start()))
 +        {
 +            cluster.get(1).runOnInstance(() -> {
 +                StorageProxy.instance.enableRepairedDataTrackingForRangeReads();
 +            });
 +
 +            cluster.schemaChange("CREATE TABLE " + KS_TABLE + " (k INT, c INT, v1 INT, v2 INT, PRIMARY KEY (k,c)) WITH gc_grace_seconds=0");
 +            // on node1 only insert some tombstones, then flush
 +            for (int i = 0; i < 10; i++)
 +            {
 +                cluster.get(1).executeInternal("DELETE v1 FROM " + KS_TABLE + " USING TIMESTAMP 0 WHERE k=? and c=? ", i, i);
 +            }
 +            cluster.get(1).flush(KEYSPACE);
 +
 +            // insert data on both nodes and flush
 +            for (int i = 0; i < 10; i++)
 +            {
 +                cluster.coordinator(1).execute("INSERT INTO " + KS_TABLE + " (k, c, v2) VALUES (?, ?, ?) USING TIMESTAMP 1",
 +                                               ConsistencyLevel.ALL,
 +                                               i, i, i);
 +            }
 +            cluster.forEach(i -> i.flush(KEYSPACE));
 +
 +            // nothing is repaired yet
 +            cluster.forEach(i -> i.runOnInstance(assertNotRepaired()));
 +            // mark everything repaired
 +            cluster.forEach(i -> i.runOnInstance(markAllRepaired()));
 +            cluster.forEach(i -> i.runOnInstance(assertRepaired()));
 +
 +            // now overwrite on node2 only to generate digest mismatches, but don't flush so the repaired dataset is not affected
 +            for (int i = 0; i < 10; i++)
 +            {
 +                cluster.get(2).executeInternal("INSERT INTO " + KS_TABLE + " (k, c, v2) VALUES (?, ?, ?) USING TIMESTAMP 2", i, i, i * 2);
 +            }
 +
 +            long ccBefore = getConfirmedInconsistencies(cluster.get(1));
 +            // Unfortunately we need to sleep here to ensure that nowInSec > the local deletion time of the tombstones
 +            TimeUnit.SECONDS.sleep(2);
 +            cluster.coordinator(1).execute("SELECT * FROM " + KS_TABLE, ConsistencyLevel.ALL);
 +            long ccAfter = getConfirmedInconsistencies(cluster.get(1));
 +
 +            Assert.assertEquals("No repaired data inconsistencies should be detected", ccBefore, ccAfter);
 +        }
 +    }
 +
 +    @Test
 +    public void testSnapshottingOnInconsistency() throws Throwable
 +    {
 +        try (Cluster cluster = init(Cluster.create(2)))
 +        {
 +            cluster.get(1).runOnInstance(() -> {
 +                StorageProxy.instance.enableRepairedDataTrackingForPartitionReads();
 +            });
 +
 +            cluster.schemaChange("CREATE TABLE " + KS_TABLE + " (k INT, c INT, v INT, PRIMARY KEY (k,c))");
 +            for (int i = 0; i < 10; i++)
 +            {
 +                cluster.coordinator(1).execute("INSERT INTO " + KS_TABLE + " (k, c, v) VALUES (0, ?, ?)",
 +                                               ConsistencyLevel.ALL, i, i);
 +            }
 +            cluster.forEach(c -> c.flush(KEYSPACE));
 +
 +            for (int i = 10; i < 20; i++)
 +            {
 +                cluster.coordinator(1).execute("INSERT INTO " + KS_TABLE + " (k, c, v) VALUES (0, ?, ?)",
 +                                               ConsistencyLevel.ALL, i, i);
 +            }
 +            cluster.forEach(c -> c.flush(KEYSPACE));
 +            cluster.forEach(i -> i.runOnInstance(assertNotRepaired()));
 +            // Mark everything repaired on node2
 +            cluster.get(2).runOnInstance(markAllRepaired());
 +            cluster.get(2).runOnInstance(assertRepaired());
 +
 +            // now overwrite on node1 only to generate digest mismatches
 +            cluster.get(1).executeInternal("INSERT INTO " + KS_TABLE + " (k, c, v) VALUES (0, ?, ?)", 5, 55);
 +            cluster.get(1).runOnInstance(assertNotRepaired());
 +
 +            // Execute a partition read and assert inconsistency is detected (as nothing is repaired on node1)
 +            long ccBefore = getConfirmedInconsistencies(cluster.get(1));
 +            cluster.coordinator(1).execute("SELECT * FROM " + KS_TABLE + " WHERE k=0", ConsistencyLevel.ALL);
 +            long ccAfter = getConfirmedInconsistencies(cluster.get(1));
 +            Assert.assertEquals("confirmed count should increment by 1 after each partition read", ccBefore + 1, ccAfter);
 +
-             String snapshotName = SnapshotVerbHandler.REPAIRED_DATA_MISMATCH_SNAPSHOT_PREFIX
-                                   + DateTimeFormatter.BASIC_ISO_DATE.format(LocalDate.now());
++            String snapshotName = DiagnosticSnapshotService.getSnapshotName(DiagnosticSnapshotService.REPAIRED_DATA_MISMATCH_SNAPSHOT_PREFIX);
 +
 +            cluster.forEach(i -> i.runOnInstance(assertSnapshotNotPresent(snapshotName)));
 +
 +            // re-introduce a mismatch, enable snapshotting and try again
 +            cluster.get(1).executeInternal("INSERT INTO " + KS_TABLE + " (k, c, v) VALUES (0, ?, ?)", 5, 555);
 +            cluster.get(1).runOnInstance(() -> {
 +                StorageProxy.instance.enableSnapshotOnRepairedDataMismatch();
 +            });
 +
 +            cluster.coordinator(1).execute("SELECT * FROM " + KS_TABLE + " WHERE k=0", ConsistencyLevel.ALL);
 +            ccAfter = getConfirmedInconsistencies(cluster.get(1));
 +            Assert.assertEquals("confirmed count should increment by 1 after each partition read", ccBefore + 2, ccAfter);
 +
 +            cluster.forEach(i -> i.runOnInstance(assertSnapshotPresent(snapshotName)));
 +        }
 +    }
 +
 +    @Test
 +    public void testRepairedReadCountNormalizationWithInitialUnderread() throws Throwable
 +    {
 +        // Asserts that the amount of repaired data read for digest generation is consistent
 +        // across replicas where one has to read less repaired data to satisfy the original
 +        // limits of the read request.
 +        try (Cluster cluster = init(Cluster.create(2)))
 +        {
 +
 +            cluster.get(1).runOnInstance(() -> {
 +                StorageProxy.instance.enableRepairedDataTrackingForRangeReads();
 +                StorageProxy.instance.enableRepairedDataTrackingForPartitionReads();
 +            });
 +
 +            cluster.schemaChange("CREATE TABLE " + KS_TABLE + " (k INT, c INT, v1 INT, PRIMARY KEY (k,c)) " +
 +                                 "WITH CLUSTERING ORDER BY (c DESC)");
 +
 +            // insert data on both nodes and flush
 +            for (int i=0; i<20; i++)
 +            {
 +                cluster.coordinator(1).execute("INSERT INTO " + KS_TABLE + " (k, c, v1) VALUES (0, ?, ?) USING TIMESTAMP 0",
 +                                               ConsistencyLevel.ALL, i, i);
 +                cluster.coordinator(1).execute("INSERT INTO " + KS_TABLE + " (k, c, v1) VALUES (1, ?, ?) USING TIMESTAMP 1",
 +                                               ConsistencyLevel.ALL, i, i);
 +            }
 +            cluster.forEach(c -> c.flush(KEYSPACE));
 +            // nothing is repaired yet
 +            cluster.forEach(i -> i.runOnInstance(assertNotRepaired()));
 +            // mark everything repaired
 +            cluster.forEach(i -> i.runOnInstance(markAllRepaired()));
 +            cluster.forEach(i -> i.runOnInstance(assertRepaired()));
 +
 +            // Add some unrepaired data to both nodes
 +            for (int i=20; i<30; i++)
 +            {
 +                cluster.coordinator(1).execute("INSERT INTO " + KS_TABLE + " (k, c, v1) VALUES (1, ?, ?) USING TIMESTAMP 1",
 +                                               ConsistencyLevel.ALL, i, i);
 +            }
 +            // And some more unrepaired data to node2 only. This causes node2 to read less repaired data than node1
 +            // when satisfying the limits of the read. So node2 needs to overread more repaired data than node1 when
 +            // calculating the repaired data digest.
 +            cluster.get(2).executeInternal("INSERT INTO "  + KS_TABLE + " (k, c, v1) VALUES (1, ?, ?) USING TIMESTAMP 1", 30, 30);
 +
 +            // Verify single partition read
 +            long ccBefore = getConfirmedInconsistencies(cluster.get(1));
 +            assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KS_TABLE + " WHERE k=1 LIMIT 20", ConsistencyLevel.ALL),
 +                       rows(1, 30, 11));
 +            long ccAfterPartitionRead = getConfirmedInconsistencies(cluster.get(1));
 +
 +            // Recreate a mismatch in unrepaired data and verify partition range read
 +            cluster.get(2).executeInternal("INSERT INTO "  + KS_TABLE + " (k, c, v1) VALUES (1, ?, ?)", 31, 31);
 +            assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KS_TABLE + " LIMIT 30", ConsistencyLevel.ALL),
 +                       rows(1, 31, 2));
 +            long ccAfterRangeRead = getConfirmedInconsistencies(cluster.get(1));
 +
 +            if (ccAfterPartitionRead != ccAfterRangeRead)
 +                if (ccAfterPartitionRead != ccBefore)
 +                    fail("Both range and partition reads reported data inconsistencies but none were expected");
 +                else
 +                    fail("Reported inconsistency during range read but none were expected");
 +            else if (ccAfterPartitionRead != ccBefore)
 +                fail("Reported inconsistency during partition read but none were expected");
 +        }
 +    }
 +
 +    @Test
 +    public void testRepairedReadCountNormalizationWithInitialOverread() throws Throwable
 +    {
 +        // Asserts that the amount of repaired data read for digest generation is consistent
 +        // across replicas where one has to read more repaired data to satisfy the original
 +        // limits of the read request.
 +        try (Cluster cluster = init(Cluster.create(2)))
 +        {
 +
 +            cluster.get(1).runOnInstance(() -> {
 +                StorageProxy.instance.enableRepairedDataTrackingForRangeReads();
 +                StorageProxy.instance.enableRepairedDataTrackingForPartitionReads();
 +            });
 +
 +            cluster.schemaChange("CREATE TABLE " + KS_TABLE + " (k INT, c INT, v1 INT, PRIMARY KEY (k,c)) " +
 +                                 "WITH CLUSTERING ORDER BY (c DESC)");
 +
 +            // insert data on both nodes and flush
 +            for (int i=0; i<10; i++)
 +            {
 +                cluster.coordinator(1).execute("INSERT INTO " + KS_TABLE + " (k, c, v1) VALUES (0, ?, ?) USING TIMESTAMP 0",
 +                                               ConsistencyLevel.ALL, i, i);
 +                cluster.coordinator(1).execute("INSERT INTO " + KS_TABLE + " (k, c, v1) VALUES (1, ?, ?) USING TIMESTAMP 1",
 +                                               ConsistencyLevel.ALL, i, i);
 +            }
 +            cluster.forEach(c -> c.flush(KEYSPACE));
 +            // nothing is repaired yet
 +            cluster.forEach(i -> i.runOnInstance(assertNotRepaired()));
 +            // mark everything repaired
 +            cluster.forEach(i -> i.runOnInstance(markAllRepaired()));
 +            cluster.forEach(i -> i.runOnInstance(assertRepaired()));
 +
 +            // Add some unrepaired data to both nodes
 +            for (int i=10; i<13; i++)
 +            {
 +                cluster.coordinator(1).execute("INSERT INTO " + KS_TABLE + " (k, c, v1) VALUES (0, ?, ?) USING TIMESTAMP 1",
 +                                               ConsistencyLevel.ALL, i, i);
 +                cluster.coordinator(1).execute("INSERT INTO " + KS_TABLE + " (k, c, v1) VALUES (1, ?, ?) USING TIMESTAMP 1",
 +                                               ConsistencyLevel.ALL, i, i);
 +            }
 +            cluster.forEach(c -> c.flush(KEYSPACE));
 +            // And some row deletions on node2 only which cover data in the repaired set
 +            // This will cause node2 to read more repaired data in satisfying the limit of the read request
 +            // so it should overread less than node1 (in fact, it should not overread at all) in order to
 +            // calculate the repaired data digest.
 +            for (int i=7; i<10; i++)
 +            {
 +                cluster.get(2).executeInternal("DELETE FROM " + KS_TABLE + " USING TIMESTAMP 2 WHERE k = 0 AND c = ?", i);
 +                cluster.get(2).executeInternal("DELETE FROM " + KS_TABLE + " USING TIMESTAMP 2 WHERE k = 1 AND c = ?", i);
 +            }
 +
 +            // Verify single partition read
 +            long ccBefore = getConfirmedInconsistencies(cluster.get(1));
 +            assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KS_TABLE + " WHERE k=0 LIMIT 5", ConsistencyLevel.ALL),
 +                       rows(rows(0, 12, 10), rows(0, 6, 5)));
 +            long ccAfterPartitionRead = getConfirmedInconsistencies(cluster.get(1));
 +
 +            // Verify partition range read
 +            assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KS_TABLE + " LIMIT 11", ConsistencyLevel.ALL),
 +                       rows(rows(1, 12, 10), rows(1, 6, 0), rows(0, 12, 12)));
 +            long ccAfterRangeRead = getConfirmedInconsistencies(cluster.get(1));
 +
 +            if (ccAfterPartitionRead != ccAfterRangeRead)
 +                if (ccAfterPartitionRead != ccBefore)
 +                    fail("Both range and partition reads reported data inconsistencies but none were expected");
 +                else
 +                    fail("Reported inconsistency during range read but none were expected");
 +            else if (ccAfterPartitionRead != ccBefore)
 +                fail("Reported inconsistency during partition read but none were expected");
 +        }
 +    }
 +
 +    private Object[][] rows(Object[][] head, Object[][]...tail)
 +    {
 +        return Stream.concat(Stream.of(head),
 +                             Stream.of(tail).flatMap(Stream::of))
 +                     .toArray(Object[][]::new);
 +    }
 +
 +    private Object[][] rows(int partitionKey, int start, int end)
 +    {
 +        if (start == end)
 +            return new Object[][] { new Object[] { partitionKey, start, end } };
 +
 +        IntStream clusterings = start > end
 +                                ? IntStream.range(end -1, start).map(i -> start - i + end - 1)
 +                                : IntStream.range(start, end);
 +
 +        return clusterings.mapToObj(i -> new Object[] {partitionKey, i, i}).toArray(Object[][]::new);
 +    }
 +
 +    private IIsolatedExecutor.SerializableRunnable assertNotRepaired()
 +    {
 +        return () ->
 +        {
 +            try
 +            {
 +                Iterator<SSTableReader> sstables = Keyspace.open(KEYSPACE)
 +                                                           .getColumnFamilyStore(TABLE)
 +                                                           .getLiveSSTables()
 +                                                           .iterator();
 +                while (sstables.hasNext())
 +                {
 +                    SSTableReader sstable = sstables.next();
 +                    Descriptor descriptor = sstable.descriptor;
 +                    Map<MetadataType, MetadataComponent> metadata = descriptor.getMetadataSerializer()
 +                                                                              .deserialize(descriptor, EnumSet.of(MetadataType.STATS));
 +
 +                    StatsMetadata stats = (StatsMetadata) metadata.get(MetadataType.STATS);
 +                    Assert.assertEquals("repaired at is set for sstable: " + descriptor,
 +                                        stats.repairedAt,
 +                                        ActiveRepairService.UNREPAIRED_SSTABLE);
 +                }
 +            } catch (IOException e) {
 +                throw new RuntimeException(e);
 +            }
 +        };
 +    }
 +
 +    private IIsolatedExecutor.SerializableRunnable markAllRepaired()
 +    {
 +        return () ->
 +        {
 +            try
 +            {
 +                Iterator<SSTableReader> sstables = Keyspace.open(KEYSPACE)
 +                                                           .getColumnFamilyStore(TABLE)
 +                                                           .getLiveSSTables()
 +                                                           .iterator();
 +                while (sstables.hasNext())
 +                {
 +                    SSTableReader sstable = sstables.next();
 +                    Descriptor descriptor = sstable.descriptor;
 +                    descriptor.getMetadataSerializer()
 +                              .mutateRepairMetadata(descriptor, System.currentTimeMillis(), null, false);
 +                    sstable.reloadSSTableMetadata();
 +                }
 +            } catch (IOException e) {
 +                throw new RuntimeException(e);
 +            }
 +        };
 +    }
 +
 +    private IIsolatedExecutor.SerializableRunnable assertRepaired()
 +    {
 +        return () ->
 +        {
 +            try
 +            {
 +                Iterator<SSTableReader> sstables = Keyspace.open(KEYSPACE)
 +                                                           .getColumnFamilyStore(TABLE)
 +                                                           .getLiveSSTables()
 +                                                           .iterator();
 +                while (sstables.hasNext())
 +                {
 +                    SSTableReader sstable = sstables.next();
 +                    Descriptor descriptor = sstable.descriptor;
 +                    Map<MetadataType, MetadataComponent> metadata = descriptor.getMetadataSerializer()
 +                                                                              .deserialize(descriptor, EnumSet.of(MetadataType.STATS));
 +
 +                    StatsMetadata stats = (StatsMetadata) metadata.get(MetadataType.STATS);
 +                    Assert.assertTrue("repaired at is not set for sstable: " + descriptor, stats.repairedAt > 0);
 +                }
 +            }
 +            catch (IOException e)
 +            {
 +                throw new RuntimeException(e);
 +            }
 +        };
 +    }
 +
 +    private IInvokableInstance.SerializableRunnable assertSnapshotPresent(String snapshotName)
 +    {
 +        return () ->
 +        {
 +            // snapshots are taken asynchronously, this is crude but it gives it a chance to happen
 +            int attempts = 100;
 +            ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(TABLE);
 +
 +            while (cfs.getSnapshotDetails().isEmpty())
 +            {
 +                Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
 +                if (attempts-- < 0)
 +                    throw new AssertionError(String.format("Snapshot %s not found for %s", snapshotName, KS_TABLE));
 +            }
 +        };
 +    }
 +
 +    private IInvokableInstance.SerializableRunnable assertSnapshotNotPresent(String snapshotName)
 +    {
 +        return () ->
 +        {
 +            ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(TABLE);
 +            Assert.assertFalse(cfs.snapshotExists(snapshotName));
 +        };
 +    }
 +
 +    private long getConfirmedInconsistencies(IInvokableInstance instance)
 +    {
 +        return instance.callOnInstance(() -> Keyspace.open(KEYSPACE)
 +                                                     .getColumnFamilyStore(TABLE)
 +                                                     .metric
 +                                                     .confirmedRepairedInconsistencies
 +                                                     .table
 +                                                     .getCount());
 +    }
 +}
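
The rows(partitionKey, start, end) helper above encodes reversed clustering ranges by mapping an ascending IntStream back onto a descending sequence, which is easy to misread when checking expectations such as rows(0, 12, 10). The standalone sketch below (class and method names invented for illustration) reproduces that expansion so it can be verified in isolation; it assumes the (pk, clustering, clustering) row shape used by the assertions above.

import java.util.Arrays;
import java.util.stream.IntStream;

public class RowsHelperSketch
{
    // Mirrors the test helper: start > end yields clusterings start, start-1, ..., end;
    // start < end yields start, start+1, ..., end-1; start == end yields a single row.
    static Object[][] rows(int partitionKey, int start, int end)
    {
        if (start == end)
            return new Object[][] { new Object[] { partitionKey, start, end } };

        IntStream clusterings = start > end
                                ? IntStream.range(end - 1, start).map(i -> start - i + end - 1)
                                : IntStream.range(start, end);

        return clusterings.mapToObj(i -> new Object[] { partitionKey, i, i }).toArray(Object[][]::new);
    }

    public static void main(String[] args)
    {
        // Prints [0, 12, 12], [0, 11, 11], [0, 10, 10] - the descending expansion of rows(0, 12, 10).
        for (Object[] row : rows(0, 12, 10))
            System.out.println(Arrays.toString(row));
    }
}
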
diff --cc test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeReadRepairTest.java
index babfdbc,e9391e0..cc50053
--- a/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeReadRepairTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeReadRepairTest.java
@@@ -49,4 -56,82 +49,4 @@@ public class MixedModeReadRepairTest ex
          .runAfterClusterUpgrade((cluster) -> cluster.get(2).forceCompact(DistributedTestBase.KEYSPACE, "tbl"))
          .run();
      }
- }
 -
 -    @Test
 -    public void mixedModeReadRepairDuplicateRows() throws Throwable
 -    {
 -        final String[] workload1 = new String[]
 -        {
 -            "DELETE FROM " + DistributedTestBase.KEYSPACE + ".tbl USING TIMESTAMP 1 WHERE pk = 1 AND ck = 2;",
 -            "INSERT INTO " + DistributedTestBase.KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, {'a':'b'}) USING TIMESTAMP 3;",
 -            "INSERT INTO " + DistributedTestBase.KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 2, {'c':'d'}) USING TIMESTAMP 3;",
 -            "INSERT INTO " + DistributedTestBase.KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 3, {'e':'f'}) USING TIMESTAMP 3;",
 -        };
 -
 -        final String[] workload2 = new String[]
 -        {
 -            "INSERT INTO " + DistributedTestBase.KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 2, {'g':'h'}) USING TIMESTAMP 5;",
 -        };
 -
 -        new TestCase()
 -        .nodes(2)
 -        .upgrade(Versions.Major.v22, Versions.Major.v30)
 -        .setup((cluster) ->
 -        {
 -            cluster.schemaChange("CREATE TABLE " + DistributedTestBase.KEYSPACE + ".tbl (pk int, ck int, v map<text, text>, PRIMARY KEY (pk, ck));");
 -        })
 -        .runAfterNodeUpgrade((cluster, node) ->
 -        {
 -            if (node == 2)
 -                return;
 -
 -            // now node1 is 3.0 and node2 is 2.2
 -            for (int i = 0; i < workload1.length; i++ )
 -                cluster.coordinator(2).execute(workload1[i], ConsistencyLevel.QUORUM);
 -
 -            cluster.get(1).flush(KEYSPACE);
 -            cluster.get(2).flush(KEYSPACE);
 -
 -            validate(cluster, 2, false);
 -
 -            for (int i = 0; i < workload2.length; i++ )
 -                cluster.coordinator(2).execute(workload2[i], ConsistencyLevel.QUORUM);
 -
 -            cluster.get(1).flush(KEYSPACE);
 -            cluster.get(2).flush(KEYSPACE);
 -
 -            validate(cluster, 1, true);
 -        })
 -        .run();
 -    }
 -
 -    private void validate(UpgradeableCluster cluster, int nodeid, boolean local)
 -    {
 -        String query = "SELECT * FROM " + KEYSPACE + ".tbl";
 -
 -        Iterator<Object[]> iter = local
 -                                ? Iterators.forArray(cluster.get(nodeid).executeInternal(query))
 -                                : cluster.coordinator(nodeid).executeWithPaging(query, ConsistencyLevel.ALL, 2);
 -
 -        Object[] prevRow = null;
 -        Object prevClustering = null;
 -
 -        while (iter.hasNext())
 -        {
 -            Object[] row = iter.next();
 -            Object clustering = row[1];
 -
 -            if (clustering.equals(prevClustering))
 -            {
 -                fail(String.format("Duplicate rows on node %d in %s mode: \n%s\n%s",
 -                                   nodeid,
 -                                   local ? "local" : "distributed",
 -                                   Arrays.toString(prevRow),
 -                                   Arrays.toString(row)));
 -            }
 -
 -            prevRow = row;
 -            prevClustering = clustering;
 -        }
 -    }
+ }
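
The removed validate(...) above relies on results arriving in clustering order, so a duplicate row can only show up as two consecutive rows with the same clustering value. A minimal standalone version of that check is sketched below; the class name, method name and fixed clustering index are illustrative only (the removed code read the clustering from row[1]).

import java.util.Arrays;
import java.util.Iterator;
import java.util.Objects;

final class AdjacentDuplicateCheck
{
    // Walks sorted rows and fails on the first pair of neighbours sharing a clustering value.
    static void assertNoAdjacentDuplicates(Iterator<Object[]> rows, int clusteringIndex)
    {
        Object[] prevRow = null;
        while (rows.hasNext())
        {
            Object[] row = rows.next();
            if (prevRow != null && Objects.equals(row[clusteringIndex], prevRow[clusteringIndex]))
                throw new AssertionError("Duplicate rows: " + Arrays.toString(prevRow) + " / " + Arrays.toString(row));
            prevRow = row;
        }
    }

    public static void main(String[] args)
    {
        // Passes: clusterings 1, 2, 3 are distinct.
        assertNoAdjacentDuplicates(Arrays.asList(new Object[] { 1, 1, "a" },
                                                 new Object[] { 1, 2, "b" },
                                                 new Object[] { 1, 3, "c" }).iterator(), 1);
    }
}
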
diff --cc test/unit/org/apache/cassandra/db/compaction/CompactionIteratorTest.java
index d5b066b,58c5a00..719cd7d
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionIteratorTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionIteratorTest.java
@@@ -17,8 -17,12 +17,11 @@@
   */
  package org.apache.cassandra.db.compaction;
  
+ import static org.apache.cassandra.db.transform.DuplicateRowCheckerTest.assertCommandIssued;
+ import static org.apache.cassandra.db.transform.DuplicateRowCheckerTest.makeRow;
+ import static org.apache.cassandra.db.transform.DuplicateRowCheckerTest.rows;
  import static org.junit.Assert.*;
  
 -import java.net.InetAddress;
  import java.util.*;
  import java.util.regex.Matcher;
  import java.util.regex.Pattern;
@@@ -29,7 -33,9 +32,8 @@@ import org.junit.Test
  
  import org.apache.cassandra.SchemaLoader;
  import org.apache.cassandra.Util;
 -import org.apache.cassandra.config.CFMetaData;
  import org.apache.cassandra.config.DatabaseDescriptor;
+ import org.apache.cassandra.cql3.CQLTester;
  import org.apache.cassandra.db.ColumnFamilyStore;
  import org.apache.cassandra.db.DecoratedKey;
  import org.apache.cassandra.db.DeletionTime;
@@@ -39,11 -45,15 +43,16 @@@ import org.apache.cassandra.db.marshal.
  import org.apache.cassandra.db.partitions.AbstractUnfilteredPartitionIterator;
  import org.apache.cassandra.db.rows.*;
  import org.apache.cassandra.io.sstable.ISSTableScanner;
 -import org.apache.cassandra.net.IMessageSink;
 -import org.apache.cassandra.net.MessageIn;
 -import org.apache.cassandra.net.MessageOut;
 -import org.apache.cassandra.net.MessagingService;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
++import org.apache.cassandra.locator.InetAddressAndPort;
++import org.apache.cassandra.net.Message;
  import org.apache.cassandra.schema.KeyspaceParams;
 +import org.apache.cassandra.schema.TableMetadata;
++import org.apache.cassandra.net.MessagingService;
+ import org.apache.cassandra.utils.ByteBufferUtil;
+ import org.apache.cassandra.utils.FBUtilities;
  
- public class CompactionIteratorTest
+ public class CompactionIteratorTest extends CQLTester
  {
  
      private static final int NOW = 1000;
@@@ -442,9 -397,75 +451,62 @@@
          }
  
          @Override
 -        public String getBackingFiles()
 +        public Set<SSTableReader> getBackingSSTables()
          {
 -            return null;
 +            return ImmutableSet.of();
          }
      }
+ 
+     @Test
+     public void duplicateRowsTest() throws Throwable
+     {
+         System.setProperty("cassandra.diagnostic_snapshot_interval_nanos", "0");
+         // Create a table and insert some data. The actual rows read in the test will be synthetic
+         // but this creates an sstable on disk to be snapshotted.
+         createTable("CREATE TABLE %s (pk text, ck1 int, ck2 int, v int, PRIMARY KEY (pk, ck1, ck2))");
+         for (int i = 0; i < 10; i++)
+             execute("insert into %s (pk, ck1, ck2, v) values (?, ?, ?, ?)", "key", i, i, i);
 -        flush();
++        getCurrentColumnFamilyStore().forceBlockingFlush();
+ 
+         DatabaseDescriptor.setSnapshotOnDuplicateRowDetection(true);
 -        CFMetaData metadata = getCurrentColumnFamilyStore().metadata;
++        TableMetadata metadata = getCurrentColumnFamilyStore().metadata();
+ 
 -        final HashMap<InetAddress, MessageOut> sentMessages = new HashMap<>();
 -        IMessageSink sink = new IMessageSink()
 -        {
 -            public boolean allowOutgoingMessage(MessageOut message, int id, InetAddress to)
 -            {
 -                sentMessages.put(to, message);
 -                return false;
 -            }
 -
 -            public boolean allowIncomingMessage(MessageIn message, int id)
 -            {
 -                return false;
 -            }
 -        };
 -        MessagingService.instance().addMessageSink(sink);
++        final HashMap<InetAddressAndPort, Message<?>> sentMessages = new HashMap<>();
++        MessagingService.instance().outboundSink.add((message, to) -> { sentMessages.put(to, message); return false;});
+ 
+         // no duplicates
+         sentMessages.clear();
+         iterate(makeRow(metadata,0, 0),
+                 makeRow(metadata,0, 1),
+                 makeRow(metadata,0, 2));
+         assertCommandIssued(sentMessages, false);
+ 
+         // now test with a duplicate row and see that we issue a snapshot command
+         sentMessages.clear();
+         iterate(makeRow(metadata, 0, 0),
+                 makeRow(metadata, 0, 1),
+                 makeRow(metadata, 0, 1));
+         assertCommandIssued(sentMessages, true);
+     }
+ 
+     private void iterate(Unfiltered...unfiltereds)
+     {
+         ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
 -        DecoratedKey key = cfs.metadata.partitioner.decorateKey(ByteBufferUtil.bytes("key"));
++        DecoratedKey key = cfs.getPartitioner().decorateKey(ByteBufferUtil.bytes("key"));
+         try (CompactionController controller = new CompactionController(cfs, Integer.MAX_VALUE);
+              UnfilteredRowIterator rows = rows(metadata, key, false, unfiltereds);
+              ISSTableScanner scanner = new Scanner(Collections.singletonList(rows));
+              CompactionIterator iter = new CompactionIterator(OperationType.COMPACTION,
+                                                               Collections.singletonList(scanner),
+                                                               controller, FBUtilities.nowInSeconds(), null))
+         {
+             while (iter.hasNext())
+             {
+                 try (UnfilteredRowIterator partition = iter.next())
+                 {
+                     partition.forEachRemaining(u -> {});
+                 }
+             }
+         }
+     }
  }
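
Both updated test classes capture outgoing messages the same way: a filter added to MessagingService.instance().outboundSink records each message keyed by destination and returns false so nothing is actually delivered. The generic sketch below shows that sink pattern in isolation; it is not Cassandra's MessagingService, and all names in it are invented for illustration.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiPredicate;

final class OutboundSinkSketch<M, A>
{
    private final List<BiPredicate<M, A>> filters = new ArrayList<>();

    void add(BiPredicate<M, A> filter)
    {
        filters.add(filter);
    }

    // A message is delivered only if every registered filter returns true.
    boolean allowOutgoing(M message, A to)
    {
        for (BiPredicate<M, A> filter : filters)
            if (!filter.test(message, to))
                return false;
        return true;
    }

    public static void main(String[] args)
    {
        OutboundSinkSketch<String, String> sink = new OutboundSinkSketch<>();
        Map<String, String> sent = new HashMap<>();
        // Same shape as the lambdas in the tests: record the message, return false to suppress delivery.
        sink.add((message, to) -> { sent.put(to, message); return false; });

        boolean delivered = sink.allowOutgoing("snapshot-request", "node1");
        System.out.println("delivered=" + delivered + ", captured=" + sent); // delivered=false, captured={node1=snapshot-request}
    }
}
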
diff --cc test/unit/org/apache/cassandra/db/transform/DuplicateRowCheckerTest.java
index 0000000,432bce3..6c3a5c0
mode 000000,100644..100644
--- a/test/unit/org/apache/cassandra/db/transform/DuplicateRowCheckerTest.java
+++ b/test/unit/org/apache/cassandra/db/transform/DuplicateRowCheckerTest.java
@@@ -1,0 -1,246 +1,234 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.cassandra.db.transform;
+ 
+ import java.net.InetAddress;
+ import java.nio.ByteBuffer;
+ import java.util.Collections;
+ import java.util.HashMap;
+ import java.util.Iterator;
+ 
+ import com.google.common.collect.Iterators;
+ import org.junit.*;
+ 
 -import org.apache.cassandra.config.CFMetaData;
+ import org.apache.cassandra.config.DatabaseDescriptor;
+ import org.apache.cassandra.cql3.CQLTester;
+ import org.apache.cassandra.db.*;
+ import org.apache.cassandra.db.marshal.AbstractType;
+ import org.apache.cassandra.db.partitions.*;
+ import org.apache.cassandra.db.rows.*;
++import org.apache.cassandra.locator.InetAddressAndPort;
+ import org.apache.cassandra.net.*;
++import org.apache.cassandra.schema.TableMetadata;
+ import org.apache.cassandra.utils.DiagnosticSnapshotService;
+ import org.apache.cassandra.utils.FBUtilities;
+ 
+ import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
+ import static org.junit.Assert.assertEquals;
+ import static org.junit.Assert.assertTrue;
+ 
+ public class DuplicateRowCheckerTest extends CQLTester
+ {
+     ColumnFamilyStore cfs;
 -    CFMetaData metadata;
 -    static HashMap<InetAddress, MessageOut> sentMessages;
++    TableMetadata metadata;
++    static HashMap<InetAddressAndPort, Message<?>> sentMessages;
+ 
+     @BeforeClass
+     public static void setupMessaging()
+     {
+         sentMessages = new HashMap<>();
 -        IMessageSink sink = new IMessageSink()
 -        {
 -            public boolean allowOutgoingMessage(MessageOut message, int id, InetAddress to)
 -            {
 -                sentMessages.put(to, message);
 -                return false;
 -            }
 -
 -            public boolean allowIncomingMessage(MessageIn message, int id)
 -            {
 -                return false;
 -            }
 -        };
 -        MessagingService.instance().addMessageSink(sink);
++        MessagingService.instance().outboundSink.add((message, to) -> { sentMessages.put(to, message); return false;});
+     }
+ 
+     @Before
+     public void setup() throws Throwable
+     {
+         DatabaseDescriptor.setSnapshotOnDuplicateRowDetection(true);
+         System.setProperty("cassandra.diagnostic_snapshot_interval_nanos", "0");
+         // Create a table and insert some data. The actual rows read in the test will be synthetic
+         // but this creates an sstable on disk to be snapshotted.
+         createTable("CREATE TABLE %s (pk text, ck1 int, ck2 int, v int, PRIMARY KEY (pk, ck1, ck2))");
+         for (int i = 0; i < 10; i++)
+             execute("insert into %s (pk, ck1, ck2, v) values (?, ?, ?, ?)", "key", i, i, i);
+         getCurrentColumnFamilyStore().forceBlockingFlush();
+ 
 -        metadata = getCurrentColumnFamilyStore().metadata;
++        metadata = getCurrentColumnFamilyStore().metadata();
+         cfs = getCurrentColumnFamilyStore();
+         sentMessages.clear();
+     }
+ 
+     @Test
+     public void noDuplicates()
+     {
+         // no duplicates
+         iterate(iter(metadata,
+                      false,
+                      makeRow(metadata, 0, 0),
+                      makeRow(metadata, 0, 1),
+                      makeRow(metadata, 0, 2)));
+         assertCommandIssued(sentMessages, false);
+     }
+ 
+     @Test
+     public void singleDuplicateForward()
+     {
+ 
+         iterate(iter(metadata,
+                      false,
+                      makeRow(metadata, 0, 0),
+                      makeRow(metadata, 0, 1),
+                      makeRow(metadata, 0, 1)));
+         assertCommandIssued(sentMessages, true);
+     }
+ 
+     @Test
+     public void singleDuplicateReverse()
+     {
+         iterate(iter(metadata,
+                      true,
+                      makeRow(metadata, 0, 0),
+                      makeRow(metadata, 0, 1),
+                      makeRow(metadata, 0, 1)));
+         assertCommandIssued(sentMessages, true);
+     }
+ 
+     @Test
+     public void multipleContiguousForward()
+     {
+         iterate(iter(metadata,
+                      false,
+                      makeRow(metadata, 0, 1),
+                      makeRow(metadata, 0, 1),
+                      makeRow(metadata, 0, 1)));
+         assertCommandIssued(sentMessages, true);
+     }
+ 
+     @Test
+     public void multipleContiguousReverse()
+     {
+         iterate(iter(metadata,
+                      true,
+                      makeRow(metadata, 0, 1),
+                      makeRow(metadata, 0, 1),
+                      makeRow(metadata, 0, 1)));
+         assertCommandIssued(sentMessages, true);
+     }
+ 
+     @Test
+     public void multipleDisjointForward()
+     {
+         iterate(iter(metadata,
+                      false,
+                      makeRow(metadata, 0, 0),
+                      makeRow(metadata, 0, 0),
+                      makeRow(metadata, 0, 1),
+                      makeRow(metadata, 0, 2),
+                      makeRow(metadata, 0, 2)));
+         assertCommandIssued(sentMessages, true);
+     }
+ 
+     @Test
+     public void multipleDisjointReverse()
+     {
+         iterate(iter(metadata,
+                      true,
+                      makeRow(metadata, 0, 0),
+                      makeRow(metadata, 0, 0),
+                      makeRow(metadata, 0, 1),
+                      makeRow(metadata, 0, 2),
+                      makeRow(metadata, 0, 2)));
+         assertCommandIssued(sentMessages, true);
+     }
+ 
 -    public static void assertCommandIssued(HashMap<InetAddress, MessageOut> sent, boolean isExpected)
++    public static void assertCommandIssued(HashMap<InetAddressAndPort, Message<?>> sent, boolean isExpected)
+     {
+         assertEquals(isExpected, !sent.isEmpty());
+         if (isExpected)
+         {
+             assertEquals(1, sent.size());
 -            assertTrue(sent.containsKey(FBUtilities.getBroadcastAddress()));
 -            SnapshotCommand command = (SnapshotCommand) sent.get(FBUtilities.getBroadcastAddress()).payload;
++            assertTrue(sent.containsKey(FBUtilities.getBroadcastAddressAndPort()));
++            SnapshotCommand command = (SnapshotCommand) sent.get(FBUtilities.getBroadcastAddressAndPort()).payload;
+             assertTrue(command.snapshot_name.startsWith(DiagnosticSnapshotService.DUPLICATE_ROWS_DETECTED_SNAPSHOT_PREFIX));
+         }
+     }
+ 
+     private void iterate(UnfilteredPartitionIterator iter)
+     {
+         try (PartitionIterator partitions = applyChecker(iter))
+         {
+             while (partitions.hasNext())
+             {
+                 try (RowIterator partition = partitions.next())
+                 {
+                     partition.forEachRemaining(u -> {});
+                 }
+             }
+         }
+     }
+ 
+     @SuppressWarnings("unchecked")
+     private static <T> ByteBuffer decompose(AbstractType<?> type, T value)
+     {
+         return ((AbstractType<T>) type).decompose(value);
+     }
+ 
 -    public static Row makeRow(CFMetaData metadata, Object... clusteringValues)
++    public static Row makeRow(TableMetadata metadata, Object... clusteringValues)
+     {
+         ByteBuffer[] clusteringByteBuffers = new ByteBuffer[clusteringValues.length];
+         for (int i = 0; i < clusteringValues.length; i++)
+             clusteringByteBuffers[i] = decompose(metadata.clusteringColumns().get(i).type, clusteringValues[i]);
+ 
+         return BTreeRow.noCellLiveRow(Clustering.make(clusteringByteBuffers), LivenessInfo.create(0, 0));
+     }
+ 
 -    public static UnfilteredRowIterator rows(CFMetaData metadata,
++    public static UnfilteredRowIterator rows(TableMetadata metadata,
+                                              DecoratedKey key,
+                                              boolean isReversedOrder,
+                                              Unfiltered... unfiltereds)
+     {
+         Iterator<Unfiltered> iterator = Iterators.forArray(unfiltereds);
+         return new AbstractUnfilteredRowIterator(metadata,
+                                                  key,
+                                                  DeletionTime.LIVE,
 -                                                 metadata.partitionColumns(),
++                                                 metadata.regularAndStaticColumns(),
+                                                  Rows.EMPTY_STATIC_ROW,
+                                                  isReversedOrder,
+                                                  EncodingStats.NO_STATS)
+         {
+             protected Unfiltered computeNext()
+             {
+                 return iterator.hasNext() ? iterator.next() : endOfData();
+             }
+         };
+     }
+ 
+     private static PartitionIterator applyChecker(UnfilteredPartitionIterator unfiltered)
+     {
+         int nowInSecs = 0;
+         return DuplicateRowChecker.duringRead(FilteredPartitions.filter(unfiltered, nowInSecs),
 -                                              Collections.singletonList(FBUtilities.getBroadcastAddress()));
++                                              Collections.singletonList(FBUtilities.getBroadcastAddressAndPort()));
+     }
+ 
 -    public static UnfilteredPartitionIterator iter(CFMetaData metadata, boolean isReversedOrder, Unfiltered... unfiltereds)
++    public static UnfilteredPartitionIterator iter(TableMetadata metadata, boolean isReversedOrder, Unfiltered... unfiltereds)
+     {
+         DecoratedKey key = metadata.partitioner.decorateKey(bytes("key"));
+         UnfilteredRowIterator rowIter = rows(metadata, key, isReversedOrder, unfiltereds);
 -        return new SingletonUnfilteredPartitionIterator(rowIter, false);
++        return new SingletonUnfilteredPartitionIterator(rowIter);
+     }
+ }
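
All of the scenarios above, forward and reverse, contiguous and disjoint, reduce to spotting equal clusterings in consecutive positions of an ordered row stream. As a complement to the assertion-style check sketched earlier, the wrapper below expresses the same idea as a streaming transformation with a callback; it only illustrates the technique, is not the DuplicateRowChecker class exercised above, and every name in it is made up.

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.BiPredicate;
import java.util.function.Consumer;

final class ConsecutiveDuplicateDetector<T> implements Iterator<T>
{
    private final Iterator<T> delegate;
    private final BiPredicate<T, T> sameKey;   // e.g. "same clustering"
    private final Consumer<T> onDuplicate;     // e.g. hand off to something that requests a snapshot
    private T previous;

    ConsecutiveDuplicateDetector(Iterator<T> delegate, BiPredicate<T, T> sameKey, Consumer<T> onDuplicate)
    {
        this.delegate = delegate;
        this.sameKey = sameKey;
        this.onDuplicate = onDuplicate;
    }

    public boolean hasNext()
    {
        return delegate.hasNext();
    }

    public T next()
    {
        T current = delegate.next();
        // In sorted output duplicates can only be adjacent, so one element of look-behind is enough.
        if (previous != null && sameKey.test(previous, current))
            onDuplicate.accept(current);
        previous = current;
        return current;
    }

    public static void main(String[] args)
    {
        List<int[]> rows = Arrays.asList(new int[] { 0, 1 }, new int[] { 0, 1 }, new int[] { 0, 2 });
        Iterator<int[]> checked = new ConsecutiveDuplicateDetector<>(rows.iterator(),
                                                                     (a, b) -> a[1] == b[1],
                                                                     dup -> System.out.println("duplicate clustering " + dup[1]));
        checked.forEachRemaining(r -> {});  // prints: duplicate clustering 1
    }
}
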

