You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sa...@apache.org on 2018/09/05 18:13:34 UTC

[2/2] cassandra git commit: Detect inconsistencies in repaired data on the read path

Detect inconsistencies in repaired data on the read path

Patch by Sam Tunnicliffe; reviewed by Marcus Eriksson and Jordan West for CASSANDRA-14145


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5fbb938a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5fbb938a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5fbb938a

Branch: refs/heads/trunk
Commit: 5fbb938adaafd91e7bea1672f09a03c7ac5b9b9d
Parents: 744973e
Author: Sam Tunnicliffe <sa...@beobal.com>
Authored: Wed May 9 18:57:30 2018 +0100
Committer: Sam Tunnicliffe <sa...@beobal.com>
Committed: Wed Sep 5 19:07:12 2018 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 conf/cassandra.yaml                             |  18 +
 .../org/apache/cassandra/config/Config.java     |  20 +
 .../cassandra/config/DatabaseDescriptor.java    |  29 ++
 .../cassandra/db/PartitionRangeReadCommand.java |  33 +-
 .../org/apache/cassandra/db/ReadCommand.java    | 346 +++++++++++++++++-
 .../cassandra/db/ReadCommandVerbHandler.java    |   4 +
 .../org/apache/cassandra/db/ReadResponse.java   | 123 ++++++-
 .../db/SinglePartitionReadCommand.java          |  60 +--
 .../db/filter/ClusteringIndexNamesFilter.java   |  14 +
 .../db/rows/UnfilteredRowIterators.java         |   1 +
 .../cassandra/metrics/KeyspaceMetrics.java      |  22 ++
 .../apache/cassandra/metrics/TableMetrics.java  |  50 +++
 .../org/apache/cassandra/net/ParameterType.java |   3 +-
 .../repair/consistent/LocalSessions.java        |  17 +
 .../apache/cassandra/service/StorageProxy.java  |  76 +++-
 .../cassandra/service/StorageProxyMBean.java    |  15 +
 .../cassandra/service/reads/DataResolver.java   |  44 ++-
 .../service/reads/ShortReadProtection.java      |   1 +
 .../reads/repair/AbstractReadRepair.java        |  21 +-
 .../reads/repair/RepairedDataTracker.java       |  87 +++++
 .../reads/repair/RepairedDataVerifier.java      |  95 +++++
 .../apache/cassandra/db/ReadCommandTest.java    | 287 ++++++++++++++-
 .../db/ReadCommandVerbHandlerTest.java          | 171 +++++++++
 .../apache/cassandra/db/ReadResponseTest.java   | 261 +++++++++++++
 .../service/reads/AbstractReadResponseTest.java |  32 +-
 .../service/reads/DataResolverTest.java         | 363 +++++++++++++++++++
 .../reads/repair/BlockingReadRepairTest.java    |   4 +-
 .../DiagEventsBlockingReadRepairTest.java       |   2 +-
 .../reads/repair/ReadOnlyReadRepairTest.java    |   5 +-
 .../reads/repair/RepairedDataVerifierTest.java  | 291 +++++++++++++++
 31 files changed, 2399 insertions(+), 97 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 301f97f..5cfc7ab 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * Detect inconsistencies in repaired data on the read path (CASSANDRA-14145)
  * Add checksumming to the native protocol (CASSANDRA-13304)
  * Make AuthCache more easily extendable (CASSANDRA-14662)
  * Extend RolesCache to include detailed role info (CASSANDRA-14497)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 995a520..190ce77 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -1238,3 +1238,21 @@ diagnostic_events_enabled: false
 # Define use of legacy delayed flusher for replies to TCP connections. This will increase latency, but might be beneficial for
 # legacy use-cases where only a single connection is used for each Cassandra node. Default is false.
 #native_transport_flush_in_batches_legacy: false
+
+# Enable tracking of repaired state of data during reads and comparison between replicas
+# Mismatches between the repaired sets of replicas can be characterized as either confirmed
+# or unconfirmed. In this context, unconfirmed indicates that the presence of pending repair
+# sessions, unrepaired partition tombstones, or some other condition means that the disparity
+# cannot be considered conclusive. Confirmed mismatches should be a trigger for investigation
+# as they may be indicative of corruption or data loss.
+# There are separate flags for range vs partition reads as single partition reads are only tracked
+# when CL > 1 and a digest mismatch occurs. Currently, range queries don't use digests so if
+# enabled for range reads, all range reads will include repaired data tracking. As this adds
+# some overhead, operators may wish to disable it whilst still enabling it for partition reads
+repaired_data_tracking_for_range_reads_enabled: false
+repaired_data_tracking_for_partition_reads_enabled: false
+# If false, only confirmed mismatches will be reported. If true, a separate metric for unconfirmed
+# mismatches will also be recorded. This is to avoid potential signal:noise issues, as unconfirmed
+# mismatches are less actionable than confirmed ones.
+report_unconfirmed_repaired_data_mismatches: false
+

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index b04f9ec..782815e 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -395,6 +395,26 @@ public class Config
     public volatile boolean diagnostic_events_enabled = false;
 
     /**
+     * flags for enabling tracking repaired state of data during reads
+     * separate flags for range & single partition reads as single partition reads are only tracked
+     * when CL > 1 and a digest mismatch occurs. Currently, range queries don't use digests so if
+     * enabled for range reads, all such reads will include repaired data tracking. As this adds
+     * some overhead, operators may wish to disable it whilst still enabling it for partition reads
+     */
+    public volatile boolean repaired_data_tracking_for_range_reads_enabled = false;
+    public volatile boolean repaired_data_tracking_for_partition_reads_enabled = false;
+    /* If true, unconfirmed mismatches (those which cannot be considered conclusive proof of out of
+     * sync repaired data due to the presence of pending repair sessions, or unrepaired partition
+     * deletes) will increment a metric, distinct from confirmed mismatches. If false, unconfirmed
+     * mismatches are simply ignored by the coordinator.
+     * This is purely to allow operators to avoid potential signal:noise issues as these types of
+     * mismatches are considerably less actionable than their confirmed counterparts. Setting this
+     * to false only disables the incrementing of the counters when an unconfirmed mismatch is found
+     * and has no other effect on the collection or processing of the repaired data.
+     */
+    public volatile boolean report_unconfirmed_repaired_data_mismatches = false;
+
+    /**
      * @deprecated migrate to {@link DatabaseDescriptor#isClientInitialized()}
      */
     @Deprecated

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index ddea8f4..2ad9b18 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -2672,4 +2672,33 @@ public class DatabaseDescriptor
         conf.corrupted_tombstone_strategy = strategy;
     }
 
+    public static boolean getRepairedDataTrackingForRangeReadsEnabled()
+    {
+        return conf.repaired_data_tracking_for_range_reads_enabled;
+    }
+
+    public static void setRepairedDataTrackingForRangeReadsEnabled(boolean enabled)
+    {
+        conf.repaired_data_tracking_for_range_reads_enabled = enabled;
+    }
+
+    public static boolean getRepairedDataTrackingForPartitionReadsEnabled()
+    {
+        return conf.repaired_data_tracking_for_partition_reads_enabled;
+    }
+
+    public static void setRepairedDataTrackingForPartitionReadsEnabled(boolean enabled)
+    {
+        conf.repaired_data_tracking_for_partition_reads_enabled = enabled;
+    }
+
+    public static boolean reportUnconfirmedRepairedDataMismatches()
+    {
+        return conf.report_unconfirmed_repaired_data_mismatches;
+    }
+
+    public static void reportUnconfirmedRepairedDataMismatches(boolean enabled)
+    {
+        conf.report_unconfirmed_repaired_data_mismatches = enabled;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
index 7eab016..79db18a 100644
--- a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
+++ b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
@@ -18,12 +18,10 @@
 package org.apache.cassandra.db;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Iterables;
 
+import org.apache.cassandra.net.ParameterType;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.filter.*;
@@ -46,7 +44,6 @@ import org.apache.cassandra.schema.IndexMetadata;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.tracing.Tracing;
-import org.apache.cassandra.utils.FBUtilities;
 
 /**
  * A read command that selects a (part of a) range of partitions.
@@ -56,7 +53,6 @@ public class PartitionRangeReadCommand extends ReadCommand implements PartitionR
     protected static final SelectionDeserializer selectionDeserializer = new Deserializer();
 
     private final DataRange dataRange;
-    private int oldestUnrepairedTombstone = Integer.MAX_VALUE;
 
     private PartitionRangeReadCommand(boolean isDigest,
                                      int digestVersion,
@@ -257,8 +253,7 @@ public class PartitionRangeReadCommand extends ReadCommand implements PartitionR
         Tracing.trace("Executing seq scan across {} sstables for {}", view.sstables.size(), dataRange().keyRange().getString(metadata().partitionKeyType));
 
         // fetch data from current memtable, historical memtables, and SSTables in the correct order.
-        final List<UnfilteredPartitionIterator> iterators = new ArrayList<>(Iterables.size(view.memtables) + view.sstables.size());
-
+        InputCollector<UnfilteredPartitionIterator> inputCollector = iteratorsForRange(view);
         try
         {
             for (Memtable memtable : view.memtables)
@@ -266,7 +261,7 @@ public class PartitionRangeReadCommand extends ReadCommand implements PartitionR
                 @SuppressWarnings("resource") // We close on exception and on closing the result returned by this method
                 Memtable.MemtableUnfilteredPartitionIterator iter = memtable.makePartitionIterator(columnFilter(), dataRange());
                 oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, iter.getMinLocalDeletionTime());
-                iterators.add(iter);
+                inputCollector.addMemtableIterator(iter);
             }
 
             SSTableReadsListener readCountUpdater = newReadCountUpdater();
@@ -274,25 +269,27 @@ public class PartitionRangeReadCommand extends ReadCommand implements PartitionR
             {
                 @SuppressWarnings("resource") // We close on exception and on closing the result returned by this method
                 UnfilteredPartitionIterator iter = sstable.getScanner(columnFilter(), dataRange(), readCountUpdater);
-                iterators.add(iter);
+                inputCollector.addSSTableIterator(sstable, iter);
+
                 if (!sstable.isRepaired())
                     oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, sstable.getMinLocalDeletionTime());
             }
             // iterators can be empty for offline tools
-            return iterators.isEmpty() ? EmptyIterators.unfilteredPartition(metadata())
-                                       : checkCacheFilter(UnfilteredPartitionIterators.mergeLazily(iterators), cfs);
+            if (inputCollector.isEmpty())
+                return EmptyIterators.unfilteredPartition(metadata());
+
+            return checkCacheFilter(UnfilteredPartitionIterators.mergeLazily(inputCollector.finalizeIterators()), cfs);
         }
         catch (RuntimeException | Error e)
         {
             try
             {
-                FBUtilities.closeAll(iterators);
+                inputCollector.close();
             }
-            catch (Exception suppressed)
+            catch (Exception e1)
             {
-                e.addSuppressed(suppressed);
+                e.addSuppressed(e1);
             }
-
             throw e;
         }
     }
@@ -313,12 +310,6 @@ public class PartitionRangeReadCommand extends ReadCommand implements PartitionR
                 };
     }
 
-    @Override
-    protected int oldestUnrepairedTombstone()
-    {
-        return oldestUnrepairedTombstone;
-    }
-
     private UnfilteredPartitionIterator checkCacheFilter(UnfilteredPartitionIterator iter, final ColumnFamilyStore cfs)
     {
         class CacheFilter extends Transformation

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java
index 736e3a3..e146b8a 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -18,10 +18,17 @@
 package org.apache.cassandra.db;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.function.BiFunction;
 import java.util.function.LongPredicate;
 
 import javax.annotation.Nullable;
 
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,6 +46,7 @@ import org.apache.cassandra.index.Index;
 import org.apache.cassandra.index.IndexNotAvailableException;
 import org.apache.cassandra.index.IndexRegistry;
 import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.metrics.TableMetrics;
@@ -48,9 +56,12 @@ import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.service.ClientWarn;
 import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.HashingUtils;
 
 /**
  * General interface for storage-engine read commands (common to both range and
@@ -71,6 +82,23 @@ public abstract class ReadCommand extends AbstractReadQuery
     // if a digest query, the version for which the digest is expected. Ignored if not a digest.
     private int digestVersion;
 
+    // for data queries, coordinators may request information on the repaired data used in constructing the response
+    private boolean trackRepairedStatus = false;
+    // tracker for repaired data, initialized to singleton null object
+    private static final RepairedDataInfo NULL_REPAIRED_DATA_INFO = new RepairedDataInfo()
+    {
+        void trackPartitionKey(DecoratedKey key){}
+        void trackDeletion(DeletionTime deletion){}
+        void trackRangeTombstoneMarker(RangeTombstoneMarker marker){}
+        void trackRow(Row row){}
+        boolean isConclusive(){ return true; }
+        ByteBuffer getDigest(){ return ByteBufferUtil.EMPTY_BYTE_BUFFER; }
+    };
+
+    private RepairedDataInfo repairedDataInfo = NULL_REPAIRED_DATA_INFO;
+
+    int oldestUnrepairedTombstone = Integer.MAX_VALUE;
+
     @Nullable
     private final IndexMetadata index;
 
@@ -187,6 +215,68 @@ public abstract class ReadCommand extends AbstractReadQuery
     }
 
     /**
+     * Activates repaired data tracking for this command.
+     *
+     * When active, a digest will be created from data read from repaired SSTables. The digests
+     * from each replica can then be compared on the coordinator to detect any divergence in their
+     * repaired datasets. In this context, an sstable is considered repaired if it is marked
+     * repaired or has a pending repair session which has been committed.
+     * In addition to the digest, a set of ids for any pending but as yet uncommitted repair sessions
+     * is recorded and returned to the coordinator. This is to help reduce false positives caused
+     * by compaction lagging which can leave sstables from committed sessions in the pending state
+     * for a time.
+     */
+    public void trackRepairedStatus()
+    {
+        trackRepairedStatus = true;
+    }
+
+    /**
+     * Whether the repaired status of any data read is being tracked
+     *
+     * @return Whether repaired status tracking is active for this command
+     */
+    public boolean isTrackingRepairedStatus()
+    {
+        return trackRepairedStatus;
+    }
+
+    /**
+     * Returns a digest of the repaired data read in the execution of this command.
+     *
+     * If either repaired status tracking is not active or the command has not yet been
+     * executed, then this digest will be an empty buffer.
+     * Otherwise, it will contain a digest of the repaired data read, or empty buffer
+     * if no repaired data was read.
+     * @return digest of the repaired data read in the execution of the command
+     */
+    public ByteBuffer getRepairedDataDigest()
+    {
+        return repairedDataInfo.getDigest();
+    }
+
+    /**
+     * Returns a boolean indicating whether any relevant sstables were skipped during the read
+     * that produced the repaired data digest.
+     *
+     * If true, then no pending repair sessions or partition deletes have influenced the extent
+     * of the repaired sstables that went into generating the digest.
+     * This indicates whether or not the digest can reliably be used to infer consistency
+     * issues between the repaired sets across replicas.
+     *
+     * If either repaired status tracking is not active or the command has not yet been
+     * executed, then this will always return true.
+     *
+     * @return boolean indicating whether or not the digest of the repaired data can
+     * reliably be used to infer inconsistency issues between the repaired sets across
+     * replicas.
+     */
+    public boolean isRepairedDataDigestConclusive()
+    {
+        return repairedDataInfo.isConclusive();
+    }
+
+    /**
      * Index (metadata) chosen for this query. Can be null.
      *
      * @return index (metadata) chosen for this query
@@ -225,7 +315,11 @@ public abstract class ReadCommand extends AbstractReadQuery
 
     protected abstract UnfilteredPartitionIterator queryStorage(ColumnFamilyStore cfs, ReadExecutionController executionController);
 
-    protected abstract int oldestUnrepairedTombstone();
+    protected int oldestUnrepairedTombstone()
+    {
+        return oldestUnrepairedTombstone;
+    }
+
 
     @SuppressWarnings("resource")
     public ReadResponse createResponse(UnfilteredPartitionIterator iterator)
@@ -305,6 +399,9 @@ public abstract class ReadCommand extends AbstractReadQuery
             Tracing.trace("Executing read on {}.{} using index {}", cfs.metadata.keyspace, cfs.metadata.name, index.getIndexMetadata().name);
         }
 
+        if (isTrackingRepairedStatus())
+            repairedDataInfo = new RepairedDataInfo();
+
         UnfilteredPartitionIterator iterator = (null == searcher) ? queryStorage(cfs, executionController) : searcher.search(executionController);
 
         try
@@ -569,6 +666,253 @@ public abstract class ReadCommand extends AbstractReadQuery
         return toCQLString();
     }
 
+    private static UnfilteredPartitionIterator withRepairedDataInfo(final UnfilteredPartitionIterator iterator,
+                                                               final RepairedDataInfo repairedDataInfo)
+    {
+        class WithRepairedDataTracking extends Transformation<UnfilteredRowIterator>
+        {
+            protected UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition)
+            {
+                return withRepairedDataInfo(partition, repairedDataInfo);
+            }
+        }
+
+        return Transformation.apply(iterator, new WithRepairedDataTracking());
+    }
+
+    private static UnfilteredRowIterator withRepairedDataInfo(final UnfilteredRowIterator iterator,
+                                                              final RepairedDataInfo repairedDataInfo)
+    {
+        class WithTracking extends Transformation
+        {
+            protected DecoratedKey applyToPartitionKey(DecoratedKey key)
+            {
+                repairedDataInfo.trackPartitionKey(key);
+                return key;
+            }
+
+            protected DeletionTime applyToDeletion(DeletionTime deletionTime)
+            {
+                repairedDataInfo.trackDeletion(deletionTime);
+                return deletionTime;
+            }
+
+            protected RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker)
+            {
+                repairedDataInfo.trackRangeTombstoneMarker(marker);
+                return marker;
+            }
+
+            protected Row applyToStatic(Row row)
+            {
+                repairedDataInfo.trackRow(row);
+                return row;
+            }
+
+            protected Row applyToRow(Row row)
+            {
+                repairedDataInfo.trackRow(row);
+                return row;
+            }
+        }
+
+        return Transformation.apply(iterator, new WithTracking());
+    }
+
+    private static class RepairedDataInfo
+    {
+        private Hasher hasher;
+        private boolean isConclusive = true;
+
+        ByteBuffer getDigest()
+        {
+            return hasher == null
+                   ? ByteBufferUtil.EMPTY_BYTE_BUFFER
+                   : ByteBuffer.wrap(getHasher().hash().asBytes());
+        }
+
+        boolean isConclusive()
+        {
+            return isConclusive;
+        }
+
+        void markInconclusive()
+        {
+            isConclusive = false;
+        }
+
+        void trackPartitionKey(DecoratedKey key)
+        {
+            HashingUtils.updateBytes(getHasher(), key.getKey().duplicate());
+        }
+
+        void trackDeletion(DeletionTime deletion)
+        {
+            deletion.digest(getHasher());
+        }
+
+        void trackRangeTombstoneMarker(RangeTombstoneMarker marker)
+        {
+            marker.digest(getHasher());
+        }
+
+        void trackRow(Row row)
+        {
+            row.digest(getHasher());
+        }
+
+        private Hasher getHasher()
+        {
+            if (hasher == null)
+                hasher = Hashing.crc32c().newHasher();
+
+            return hasher;
+        }
+    }
+
+    @SuppressWarnings("resource") // resultant iterators are closed by their callers
+    InputCollector<UnfilteredRowIterator> iteratorsForPartition(ColumnFamilyStore.ViewFragment view)
+    {
+        BiFunction<List<UnfilteredRowIterator>, RepairedDataInfo, UnfilteredRowIterator> merge =
+            (unfilteredRowIterators, repairedDataInfo) ->
+                withRepairedDataInfo(UnfilteredRowIterators.merge(unfilteredRowIterators), repairedDataInfo);
+
+        return new InputCollector<>(view, repairedDataInfo, merge, isTrackingRepairedStatus());
+    }
+
+    @SuppressWarnings("resource") // resultant iterators are closed by their callers
+    InputCollector<UnfilteredPartitionIterator> iteratorsForRange(ColumnFamilyStore.ViewFragment view)
+    {
+        BiFunction<List<UnfilteredPartitionIterator>, RepairedDataInfo, UnfilteredPartitionIterator> merge =
+            (unfilteredPartitionIterators, repairedDataInfo) ->
+                withRepairedDataInfo(UnfilteredPartitionIterators.merge(unfilteredPartitionIterators, UnfilteredPartitionIterators.MergeListener.NOOP), repairedDataInfo);
+
+        return new InputCollector<>(view, repairedDataInfo, merge, isTrackingRepairedStatus());
+    }
+
+    /**
+     * Handles the collation of unfiltered row or partition iterators that comprise the
+     * input for a query. Separates them according to repaired status and, if repaired
+     * status is being tracked, handles the merge and wrapping in a digest generator of
+     * the repaired iterators.
+     *
+     * Intentionally not AutoCloseable so we don't mistakenly use this in ARM blocks
+     * as this prematurely closes the underlying iterators
+     */
+    static class InputCollector<T extends AutoCloseable>
+    {
+        final RepairedDataInfo repairedDataInfo;
+        private final boolean isTrackingRepairedStatus;
+        Set<SSTableReader> repairedSSTables;
+        BiFunction<List<T>, RepairedDataInfo, T> repairedMerger;
+        List<T> repairedIters;
+        List<T> unrepairedIters;
+
+        InputCollector(ColumnFamilyStore.ViewFragment view,
+                       RepairedDataInfo repairedDataInfo,
+                       BiFunction<List<T>, RepairedDataInfo, T> repairedMerger,
+                       boolean isTrackingRepairedStatus)
+        {
+            this.repairedDataInfo = repairedDataInfo;
+            this.isTrackingRepairedStatus = isTrackingRepairedStatus;
+            if (isTrackingRepairedStatus)
+            {
+                for (SSTableReader sstable : view.sstables)
+                {
+                    if (considerRepairedForTracking(sstable))
+                    {
+                        if (repairedSSTables == null)
+                            repairedSSTables = Sets.newHashSetWithExpectedSize(view.sstables.size());
+                        repairedSSTables.add(sstable);
+                    }
+                }
+            }
+            if (repairedSSTables == null)
+            {
+                repairedIters = Collections.emptyList();
+                unrepairedIters = new ArrayList<>(view.sstables.size());
+            }
+            else
+            {
+                repairedIters = new ArrayList<>(repairedSSTables.size());
+                // when we're done collating, we'll merge the repaired iters and add the
+                // result to the unrepaired list, so size that list accordingly
+                unrepairedIters = new ArrayList<>((view.sstables.size() - repairedSSTables.size()) + Iterables.size(view.memtables) + 1);
+            }
+            this.repairedMerger = repairedMerger;
+        }
+
+        void addMemtableIterator(T iter)
+        {
+            unrepairedIters.add(iter);
+        }
+
+        void addSSTableIterator(SSTableReader sstable, T iter)
+        {
+            if (repairedSSTables != null && repairedSSTables.contains(sstable))
+                repairedIters.add(iter);
+            else
+                unrepairedIters.add(iter);
+        }
+
+        List<T> finalizeIterators()
+        {
+            if (repairedIters.isEmpty())
+                return unrepairedIters;
+
+            // merge the repaired data before returning, wrapping in a digest generator
+            unrepairedIters.add(repairedMerger.apply(repairedIters, repairedDataInfo));
+            return unrepairedIters;
+        }
+
+        boolean isEmpty()
+        {
+            return repairedIters.isEmpty() && unrepairedIters.isEmpty();
+        }
+
+        // For tracking purposes we consider data repaired if the sstable is either:
+        // * marked repaired
+        // * marked pending, but the local session has been committed. This reduces the window
+        //   whereby the tracking is affected by compaction backlog causing repaired sstables to
+        //   remain in the pending state
+        // If an sstable is involved in a pending repair which is not yet committed, we mark the
+        // repaired data info inconclusive, as the same data on other replicas may be in a
+        // slightly different state.
+        private boolean considerRepairedForTracking(SSTableReader sstable)
+        {
+            if (!isTrackingRepairedStatus)
+                return false;
+
+            UUID pendingRepair = sstable.getPendingRepair();
+            if (pendingRepair != ActiveRepairService.NO_PENDING_REPAIR)
+            {
+                if (ActiveRepairService.instance.consistent.local.isSessionFinalized(pendingRepair))
+                    return true;
+
+                // In the edge case where compaction is backed up long enough for the session to
+                // timeout and be purged by LocalSessions::cleanup, consider the sstable unrepaired
+                // as it will be marked unrepaired when compaction catches up
+                if (!ActiveRepairService.instance.consistent.local.sessionExists(pendingRepair))
+                    return false;
+
+                repairedDataInfo.markInconclusive();
+            }
+
+            return sstable.isRepaired();
+        }
+
+        void markInconclusive()
+        {
+            repairedDataInfo.markInconclusive();
+        }
+
+        public void close() throws Exception
+        {
+            FBUtilities.closeAll(unrepairedIters);
+            FBUtilities.closeAll(repairedIters);
+        }
+    }
+
     private static class Serializer implements IVersionedSerializer<ReadCommand>
     {
         private static int digestFlag(boolean isDigest)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java b/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
index a71e92d..1b28c2c 100644
--- a/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
@@ -23,6 +23,7 @@ import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.ParameterType;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.tracing.Tracing;
 
@@ -43,6 +44,9 @@ public class ReadCommandVerbHandler implements IVerbHandler<ReadCommand>
         ReadCommand command = message.payload;
         command.setMonitoringTime(message.constructionTime, message.isCrossNode(), message.getTimeout(), message.getSlowQueryTimeout());
 
+        if (message.parameters.containsKey(ParameterType.TRACK_REPAIRED_DATA))
+            command.trackRepairedStatus();
+
         ReadResponse response;
         try (ReadExecutionController executionController = command.executionController();
              UnfilteredPartitionIterator iterator = command.executeLocally(executionController))

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/src/java/org/apache/cassandra/db/ReadResponse.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadResponse.java b/src/java/org/apache/cassandra/db/ReadResponse.java
index 486980d..2ddb6a7 100644
--- a/src/java/org/apache/cassandra/db/ReadResponse.java
+++ b/src/java/org/apache/cassandra/db/ReadResponse.java
@@ -51,11 +51,19 @@ public abstract class ReadResponse
     }
 
     @VisibleForTesting
-    public static ReadResponse createRemoteDataResponse(UnfilteredPartitionIterator data, ReadCommand command)
+    public static ReadResponse createRemoteDataResponse(UnfilteredPartitionIterator data,
+                                                        ByteBuffer repairedDataDigest,
+                                                        boolean isRepairedDigestConclusive,
+                                                        ReadCommand command,
+                                                        int version)
     {
-        return new RemoteDataResponse(LocalDataResponse.build(data, command.columnFilter()), MessagingService.current_version);
+        return new RemoteDataResponse(LocalDataResponse.build(data, command.columnFilter()),
+                                      repairedDataDigest,
+                                      isRepairedDigestConclusive,
+                                      version);
     }
 
+
     public static ReadResponse createDigestResponse(UnfilteredPartitionIterator data, ReadCommand command)
     {
         return new DigestResponse(makeDigest(data, command));
@@ -63,6 +71,9 @@ public abstract class ReadResponse
 
     public abstract UnfilteredPartitionIterator makeIterator(ReadCommand command);
     public abstract ByteBuffer digest(ReadCommand command);
+    public abstract ByteBuffer repairedDataDigest();
+    public abstract boolean isRepairedDigestConclusive();
+    public abstract boolean mayIncludeRepairedDigest();
 
     public abstract boolean isDigestResponse();
 
@@ -85,18 +96,22 @@ public abstract class ReadResponse
                 }
             }
         }
-        return "<key " + key + " not found>";
+        return String.format("<key %s not found (repaired_digest=%s repaired_digest_conclusive=%s)>",
+                             key, ByteBufferUtil.bytesToHex(repairedDataDigest()), isRepairedDigestConclusive());
     }
 
     private String toDebugString(UnfilteredRowIterator partition, TableMetadata metadata)
     {
         StringBuilder sb = new StringBuilder();
 
-        sb.append(String.format("[%s] key=%s partition_deletion=%s columns=%s",
+        sb.append(String.format("[%s] key=%s partition_deletion=%s columns=%s repaired_digest=%s repaired_digest_conclusive==%s",
                                 metadata,
                                 metadata.partitionKeyType.getString(partition.partitionKey().getKey()),
                                 partition.partitionLevelDeletion(),
-                                partition.columns()));
+                                partition.columns(),
+                                ByteBufferUtil.bytesToHex(repairedDataDigest()),
+                                isRepairedDigestConclusive()
+                                ));
 
         if (partition.staticRow() != Rows.EMPTY_STATIC_ROW)
             sb.append("\n    ").append(partition.staticRow().toString(metadata, true));
@@ -130,6 +145,21 @@ public abstract class ReadResponse
             throw new UnsupportedOperationException();
         }
 
+        public boolean mayIncludeRepairedDigest()
+        {
+            return false;
+        }
+
+        public ByteBuffer repairedDataDigest()
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        public boolean isRepairedDigestConclusive()
+        {
+            throw new UnsupportedOperationException();
+        }
+
         public ByteBuffer digest(ReadCommand command)
         {
             // We assume that the digest is in the proper version, which bug excluded should be true since this is called with
@@ -150,7 +180,11 @@ public abstract class ReadResponse
     {
         private LocalDataResponse(UnfilteredPartitionIterator iter, ReadCommand command)
         {
-            super(build(iter, command.columnFilter()), MessagingService.current_version, SerializationHelper.Flag.LOCAL);
+            super(build(iter, command.columnFilter()),
+                  command.getRepairedDataDigest(),
+                  command.isRepairedDataDigestConclusive(),
+                  MessagingService.current_version,
+                  SerializationHelper.Flag.LOCAL);
         }
 
         private static ByteBuffer build(UnfilteredPartitionIterator iter, ColumnFilter selection)
@@ -171,9 +205,12 @@ public abstract class ReadResponse
     // built on the coordinator node receiving a response
     private static class RemoteDataResponse extends DataResponse
     {
-        protected RemoteDataResponse(ByteBuffer data, int version)
+        protected RemoteDataResponse(ByteBuffer data,
+                                     ByteBuffer repairedDataDigest,
+                                     boolean isRepairedDigestConclusive,
+                                     int version)
         {
-            super(data, version, SerializationHelper.Flag.FROM_REMOTE);
+            super(data, repairedDataDigest, isRepairedDigestConclusive, version, SerializationHelper.Flag.FROM_REMOTE);
         }
     }
 
@@ -182,13 +219,21 @@ public abstract class ReadResponse
         // TODO: can the digest be calculated over the raw bytes now?
         // The response, serialized in the current messaging version
         private final ByteBuffer data;
+        private final ByteBuffer repairedDataDigest;
+        private final boolean isRepairedDigestConclusive;
         private final int dataSerializationVersion;
         private final SerializationHelper.Flag flag;
 
-        protected DataResponse(ByteBuffer data, int dataSerializationVersion, SerializationHelper.Flag flag)
+        protected DataResponse(ByteBuffer data,
+                               ByteBuffer repairedDataDigest,
+                               boolean isRepairedDigestConclusive,
+                               int dataSerializationVersion,
+                               SerializationHelper.Flag flag)
         {
             super();
             this.data = data;
+            this.repairedDataDigest = repairedDataDigest;
+            this.isRepairedDigestConclusive = isRepairedDigestConclusive;
             this.dataSerializationVersion = dataSerializationVersion;
             this.flag = flag;
         }
@@ -213,6 +258,21 @@ public abstract class ReadResponse
             }
         }
 
+        public boolean mayIncludeRepairedDigest()
+        {
+            return dataSerializationVersion >= MessagingService.VERSION_40;
+        }
+
+        public ByteBuffer repairedDataDigest()
+        {
+            return repairedDataDigest;
+        }
+
+        public boolean isRepairedDigestConclusive()
+        {
+            return isRepairedDigestConclusive;
+        }
+
         public ByteBuffer digest(ReadCommand command)
         {
             try (UnfilteredPartitionIterator iterator = makeIterator(command))
@@ -233,10 +293,25 @@ public abstract class ReadResponse
         {
             boolean isDigest = response instanceof DigestResponse;
             ByteBuffer digest = isDigest ? ((DigestResponse)response).digest : ByteBufferUtil.EMPTY_BYTE_BUFFER;
-
             ByteBufferUtil.writeWithVIntLength(digest, out);
             if (!isDigest)
             {
+                // From 4.0, a coordinator may request additional info about the repaired data that
+                // makes up the response, namely a digest generated from the repaired data and a
+                // flag indicating our level of confidence in that digest. The digest may be considered
+                // inconclusive if it may have been affected by some unrepaired data during read.
+                // e.g. some sstables read during this read were involved in pending but not yet
+                // committed repair sessions or an unrepaired partition tombstone meant that not all
+                // repaired sstables were read (but they might be on other replicas).
+                // If the coordinator did not request this info, the response contains an empty digest
+                // and the isConclusive flag set to true.
+                // If the messaging version is < 4.0, these are omitted altogether.
+                if (version >= MessagingService.VERSION_40)
+                {
+                    ByteBufferUtil.writeWithVIntLength(response.repairedDataDigest(), out);
+                    out.writeBoolean(response.isRepairedDigestConclusive());
+                }
+
                 ByteBuffer data = ((DataResponse)response).data;
                 ByteBufferUtil.writeWithVIntLength(data, out);
             }
@@ -248,18 +323,42 @@ public abstract class ReadResponse
             if (digest.hasRemaining())
                 return new DigestResponse(digest);
 
+            // A data response may also contain a digest of the portion of its payload
+            // that comes from the replica's repaired set, along with a flag indicating
+            // whether or not the digest may be influenced by unrepaired/pending
+            // repaired data
+            boolean repairedDigestConclusive;
+            if (version >= MessagingService.VERSION_40)
+            {
+                digest = ByteBufferUtil.readWithVIntLength(in);
+                repairedDigestConclusive = in.readBoolean();
+            }
+            else
+            {
+                digest = ByteBufferUtil.EMPTY_BYTE_BUFFER;
+                repairedDigestConclusive = true;
+            }
+
             ByteBuffer data = ByteBufferUtil.readWithVIntLength(in);
-            return new RemoteDataResponse(data, version);
+            return new RemoteDataResponse(data, digest, repairedDigestConclusive, version);
         }
 
         public long serializedSize(ReadResponse response, int version)
         {
             boolean isDigest = response instanceof DigestResponse;
             ByteBuffer digest = isDigest ? ((DigestResponse)response).digest : ByteBufferUtil.EMPTY_BYTE_BUFFER;
-
             long size = ByteBufferUtil.serializedSizeWithVIntLength(digest);
+
             if (!isDigest)
             {
+                // From 4.0, a coordinator may request additional info about the repaired data
+                // that makes up the response.
+                if (version >= MessagingService.VERSION_40)
+                {
+                    size += ByteBufferUtil.serializedSizeWithVIntLength(response.repairedDataDigest());
+                    size += 1;
+                }
+
                 // In theory, we should deserialize/re-serialize if the version asked is different from the current
                 // version as the content could have a different serialization format. So far though, we haven't made
                 // change to partition iterators serialization since 3.0 so we skip this.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index c81185e..e99a487 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@ -22,7 +22,6 @@ import java.nio.ByteBuffer;
 import java.util.*;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
 
 import org.apache.cassandra.cache.IRowCacheEntry;
@@ -44,12 +43,11 @@ import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.metrics.TableMetrics;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.ParameterType;
 import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.schema.IndexMetadata;
 import org.apache.cassandra.schema.TableMetadata;
-import org.apache.cassandra.service.CacheService;
-import org.apache.cassandra.service.ClientState;
-import org.apache.cassandra.service.StorageProxy;
+import org.apache.cassandra.service.*;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.SearchIterator;
@@ -65,8 +63,6 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
     private final DecoratedKey partitionKey;
     private final ClusteringIndexFilter clusteringIndexFilter;
 
-    private int oldestUnrepairedTombstone = Integer.MAX_VALUE;
-
     @VisibleForTesting
     protected SinglePartitionReadCommand(boolean isDigest,
                                          int digestVersion,
@@ -393,7 +389,9 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
     @SuppressWarnings("resource") // we close the created iterator through closing the result of this method (and SingletonUnfilteredPartitionIterator ctor cannot fail)
     protected UnfilteredPartitionIterator queryStorage(final ColumnFamilyStore cfs, ReadExecutionController executionController)
     {
-        UnfilteredRowIterator partition = cfs.isRowCacheEnabled()
+        // skip the row cache and go directly to sstables/memtable if repaired status of
+        // data is being tracked. This is only requested after an initial digest mismatch
+        UnfilteredRowIterator partition = cfs.isRowCacheEnabled() && !isTrackingRepairedStatus()
                                         ? getThroughCache(cfs, executionController)
                                         : queryMemtableAndDisk(cfs, executionController);
         return new SingletonUnfilteredPartitionIterator(partition);
@@ -564,12 +562,6 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
         return queryMemtableAndDiskInternal(cfs);
     }
 
-    @Override
-    protected int oldestUnrepairedTombstone()
-    {
-        return oldestUnrepairedTombstone;
-    }
-
     private UnfilteredRowIterator queryMemtableAndDiskInternal(ColumnFamilyStore cfs)
     {
         /*
@@ -582,16 +574,19 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
          *      and if we have neither non-frozen collections/UDTs nor counters (indeed, for a non-frozen collection or UDT,
          *      we can't guarantee an older sstable won't have some elements that weren't in the most recent sstables,
          *      and counters are intrinsically a collection of shards and so have the same problem).
+         *      Also, if tracking repaired data then we skip this optimization so we can collate the repaired sstables
+         *      and generate a digest over their merge, which precludes an early return.
          */
-        if (clusteringIndexFilter() instanceof ClusteringIndexNamesFilter && !queriesMulticellType())
+        if (clusteringIndexFilter() instanceof ClusteringIndexNamesFilter && !queriesMulticellType() && !isTrackingRepairedStatus())
             return queryMemtableAndSSTablesInTimestampOrder(cfs, (ClusteringIndexNamesFilter)clusteringIndexFilter());
 
         Tracing.trace("Acquiring sstable references");
         ColumnFamilyStore.ViewFragment view = cfs.select(View.select(SSTableSet.LIVE, partitionKey()));
-        List<UnfilteredRowIterator> iterators = new ArrayList<>(Iterables.size(view.memtables) + view.sstables.size());
+        Collections.sort(view.sstables, SSTableReader.maxTimestampComparator);
         ClusteringIndexFilter filter = clusteringIndexFilter();
         long minTimestamp = Long.MAX_VALUE;
-
+        long mostRecentPartitionTombstone = Long.MIN_VALUE;
+        InputCollector<UnfilteredRowIterator> inputCollector = iteratorsForPartition(view);
         try
         {
             for (Memtable memtable : view.memtables)
@@ -604,8 +599,13 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
 
                 @SuppressWarnings("resource") // 'iter' is added to iterators which is closed on exception, or through the closing of the final merged iterator
                 UnfilteredRowIterator iter = filter.getUnfilteredRowIterator(columnFilter(), partition);
+
+                // Memtable data is always considered unrepaired
                 oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, partition.stats().minLocalDeletionTime);
-                iterators.add(iter);
+                inputCollector.addMemtableIterator(iter);
+
+                mostRecentPartitionTombstone = Math.max(mostRecentPartitionTombstone,
+                                                        iter.partitionLevelDeletion().markedForDeleteAt());
             }
 
             /*
@@ -620,18 +620,25 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
              * In other words, iterating in maxTimestamp order allow to do our mostRecentPartitionTombstone elimination
              * in one pass, and minimize the number of sstables for which we read a partition tombstone.
              */
-            Collections.sort(view.sstables, SSTableReader.maxTimestampComparator);
-            long mostRecentPartitionTombstone = Long.MIN_VALUE;
             int nonIntersectingSSTables = 0;
             List<SSTableReader> skippedSSTablesWithTombstones = null;
             SSTableReadMetricsCollector metricsCollector = new SSTableReadMetricsCollector();
 
+            if (isTrackingRepairedStatus())
+                Tracing.trace("Collecting data from sstables and tracking repaired status");
+
             for (SSTableReader sstable : view.sstables)
             {
                 // if we've already seen a partition tombstone with a timestamp greater
                 // than the most recent update to this sstable, we can skip it
+                // if we're tracking repaired status, we mark the repaired digest inconclusive
+                // as other replicas may not have seen this partition delete and so could include
+                // data from this sstable (or others) in their digests
                 if (sstable.getMaxTimestamp() < mostRecentPartitionTombstone)
+                {
+                    inputCollector.markInconclusive();
                     break;
+                }
 
                 if (!shouldInclude(sstable))
                 {
@@ -654,7 +661,7 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
                 if (!sstable.isRepaired())
                     oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, sstable.getMinLocalDeletionTime());
 
-                iterators.add(iter);
+                inputCollector.addSSTableIterator(sstable, iter);
                 mostRecentPartitionTombstone = Math.max(mostRecentPartitionTombstone,
                                                         iter.partitionLevelDeletion().markedForDeleteAt());
             }
@@ -674,7 +681,7 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
                     if (!sstable.isRepaired())
                         oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, sstable.getMinLocalDeletionTime());
 
-                    iterators.add(iter);
+                    inputCollector.addSSTableIterator(sstable, iter);
                     includedDueToTombstones++;
                 }
             }
@@ -682,21 +689,22 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
                 Tracing.trace("Skipped {}/{} non-slice-intersecting sstables, included {} due to tombstones",
                                nonIntersectingSSTables, view.sstables.size(), includedDueToTombstones);
 
-            if (iterators.isEmpty())
+            if (inputCollector.isEmpty())
                 return EmptyIterators.unfilteredRow(cfs.metadata(), partitionKey(), filter.isReversed());
 
             StorageHook.instance.reportRead(cfs.metadata().id, partitionKey());
-            return withSSTablesIterated(iterators, cfs.metric, metricsCollector);
+
+            return withSSTablesIterated(inputCollector.finalizeIterators(), cfs.metric, metricsCollector);
         }
         catch (RuntimeException | Error e)
         {
             try
             {
-                FBUtilities.closeAll(iterators);
+                inputCollector.close();
             }
-            catch (Exception suppressed)
+            catch (Exception e1)
             {
-                e.addSuppressed(suppressed);
+                e.addSuppressed(e1);
             }
             throw e;
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/src/java/org/apache/cassandra/db/filter/ClusteringIndexNamesFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ClusteringIndexNamesFilter.java b/src/java/org/apache/cassandra/db/filter/ClusteringIndexNamesFilter.java
index 4850fd5..f25dc91 100644
--- a/src/java/org/apache/cassandra/db/filter/ClusteringIndexNamesFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/ClusteringIndexNamesFilter.java
@@ -206,6 +206,20 @@ public class ClusteringIndexNamesFilter extends AbstractClusteringIndexFilter
         return sb.toString();
     }
 
+    public boolean equals(Object o)
+    {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        ClusteringIndexNamesFilter that = (ClusteringIndexNamesFilter) o;
+        return Objects.equals(clusterings, that.clusterings) &&
+               Objects.equals(reversed, that.reversed);
+    }
+
+    public int hashCode()
+    {
+        return Objects.hash(clusterings, reversed);
+    }
+
     public Kind kind()
     {
         return Kind.NAMES;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
index 851e447..42807a2 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
@@ -599,4 +599,5 @@ public abstract class UnfilteredRowIterators
             }
         }
     }
+
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java b/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
index 5a90804..f1df026 100644
--- a/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
@@ -22,6 +22,7 @@ import java.util.Set;
 import com.codahale.metrics.Counter;
 import com.codahale.metrics.Gauge;
 import com.codahale.metrics.Histogram;
+import com.codahale.metrics.Meter;
 import com.codahale.metrics.Timer;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
@@ -123,6 +124,24 @@ public class KeyspaceMetrics
     /** histogram over the number of partitions we have validated */
     public final Histogram partitionsValidated;
 
+    /*
+     * Metrics for inconsistencies detected between repaired data sets across replicas. These
+     * are tracked on the coordinator.
+     */
+
+    /**
+     * Incremented where an inconsistency is detected and there are no pending repair sessions affecting
+     * the data being read, indicating a genuine mismatch between replicas' repaired data sets.
+     */
+    public final Meter confirmedRepairedInconsistencies;
+    /**
+     * Incremented where an inconsistency is detected, but there are pending & uncommitted repair sessions
+     * in play on at least one replica. This may indicate a false positive as the inconsistency could be due to
+     * replicas marking the repair session as committed at slightly different times and so some consider it to
+     * be part of the repaired set whilst others do not.
+     */
+    public final Meter unconfirmedRepairedInconsistencies;
+
     public final MetricNameFactory factory;
     private Keyspace keyspace;
 
@@ -283,6 +302,9 @@ public class KeyspaceMetrics
         repairSyncTime = Metrics.timer(factory.createMetricName("RepairSyncTime"));
         partitionsValidated = Metrics.histogram(factory.createMetricName("PartitionsValidated"), false);
         bytesValidated = Metrics.histogram(factory.createMetricName("BytesValidated"), false);
+
+        confirmedRepairedInconsistencies = Metrics.meter(factory.createMetricName("RepairedDataInconsistenciesConfirmed"));
+        unconfirmedRepairedInconsistencies = Metrics.meter(factory.createMetricName("RepairedDataInconsistenciesUnconfirmed"));
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/src/java/org/apache/cassandra/metrics/TableMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/TableMetrics.java b/src/java/org/apache/cassandra/metrics/TableMetrics.java
index 53ebcb0..52c50b8 100644
--- a/src/java/org/apache/cassandra/metrics/TableMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/TableMetrics.java
@@ -217,6 +217,19 @@ public class TableMetrics
     public final Counter speculativeWrites;
     public final Gauge<Long> speculativeWriteLatencyNanos;
 
+    /**
+     * Metrics for inconsistencies detected between repaired data sets across replicas. These
+     * are tracked on the coordinator.
+     */
+    // Incremented where an inconsistency is detected and there are no pending repair sessions affecting
+    // the data being read, indicating a genuine mismatch between replicas' repaired data sets.
+    public final TableMeter confirmedRepairedInconsistencies;
+    // Incremented where an inconsistency is detected, but there are pending & uncommitted repair sessions
+    // in play on at least one replica. This may indicate a false positive as the inconsistency could be due to
+    // replicas marking the repair session as committed at slightly different times and so some consider it to
+    // be part of the repaired set whilst others do not.
+    public final TableMeter unconfirmedRepairedInconsistencies;
+
     public final static LatencyMetrics globalReadLatency = new LatencyMetrics(globalFactory, globalAliasFactory, "Read");
     public final static LatencyMetrics globalWriteLatency = new LatencyMetrics(globalFactory, globalAliasFactory, "Write");
     public final static LatencyMetrics globalRangeLatency = new LatencyMetrics(globalFactory, globalAliasFactory, "Range");
@@ -922,6 +935,9 @@ public class TableMetrics
 
         readRepairRequests = Metrics.meter(factory.createMetricName("ReadRepairRequests"));
         shortReadProtectionRequests = Metrics.meter(factory.createMetricName("ShortReadProtectionRequests"));
+
+        confirmedRepairedInconsistencies = createTableMeter("RepairedDataInconsistenciesConfirmed", cfs.keyspace.metric.confirmedRepairedInconsistencies);
+        unconfirmedRepairedInconsistencies = createTableMeter("RepairedDataInconsistenciesUnconfirmed", cfs.keyspace.metric.unconfirmedRepairedInconsistencies);
     }
 
     public void updateSSTableIterated(int count)
@@ -1091,6 +1107,21 @@ public class TableMetrics
                                             globalAliasFactory.createMetricName(alias)));
     }
 
+    protected TableMeter createTableMeter(String name, Meter keyspaceMeter)
+    {
+        return createTableMeter(name, name, keyspaceMeter);
+    }
+
+    protected TableMeter createTableMeter(String name, String alias, Meter keyspaceMeter)
+    {
+        Meter meter = Metrics.meter(factory.createMetricName(name), aliasFactory.createMetricName(alias));
+        register(name, alias, meter);
+        return new TableMeter(meter,
+                              keyspaceMeter,
+                              Metrics.meter(globalFactory.createMetricName(name),
+                                            globalAliasFactory.createMetricName(alias)));
+    }
+
     /**
      * Registers a metric to be removed when unloading CF.
      * @return true if first time metric with that name has been registered
@@ -1103,6 +1134,25 @@ public class TableMetrics
         return ret;
     }
 
+    public static class TableMeter
+    {
+        public final Meter[] all;
+        public final Meter table;
+        private TableMeter(Meter table, Meter keyspace, Meter global)
+        {
+            this.table = table;
+            this.all = new Meter[]{table, keyspace, global};
+        }
+
+        public void mark()
+        {
+            for (Meter meter : all)
+            {
+                meter.mark();
+            }
+        }
+    }
+
     public static class TableHistogram
     {
         public final Histogram[] all;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/src/java/org/apache/cassandra/net/ParameterType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/ParameterType.java b/src/java/org/apache/cassandra/net/ParameterType.java
index 0a1f73f..b7a88a8 100644
--- a/src/java/org/apache/cassandra/net/ParameterType.java
+++ b/src/java/org/apache/cassandra/net/ParameterType.java
@@ -39,7 +39,8 @@ public enum ParameterType
     FAILURE_REASON("FAIL_REASON", ShortVersionedSerializer.instance),
     FAILURE_CALLBACK("CAL_BAC", DummyByteVersionedSerializer.instance),
     TRACE_SESSION("TraceSession", UUIDSerializer.serializer),
-    TRACE_TYPE("TraceType", Tracing.traceTypeSerializer);
+    TRACE_TYPE("TraceType", Tracing.traceTypeSerializer),
+    TRACK_REPAIRED_DATA("TrackRepaired", DummyByteVersionedSerializer.instance);
 
     public static final Map<String, ParameterType> byName;
     public final String key;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java b/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java
index 4089e77..eac1ea0 100644
--- a/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java
+++ b/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java
@@ -783,6 +783,23 @@ public class LocalSessions
         return session != null && session.getState() != FINALIZED && session.getState() != FAILED;
     }
 
+    /**
+     * Returns true only if a local session exists for {@code sessionID} and its state is FINALIZED.
+     */
+    public boolean isSessionFinalized(UUID sessionID)
+    {
+        LocalSession session = getSession(sessionID);
+        return session != null && session.getState() == FINALIZED;
+    }
+
+    /**
+     * Returns true if a local session is found for {@code sessionID}; the session's state is not checked.
+     */
+    public boolean sessionExists(UUID sessionID)
+    {
+        return getSession(sessionID) != null;
+    }
+
     @VisibleForTesting
     protected boolean sessionHasData(LocalSession session)
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index c23eb88..9d9c628 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -2122,12 +2122,21 @@ public class StorageProxy implements StorageProxyMBean
             Keyspace keyspace = Keyspace.open(command.metadata().keyspace);
 
             ReadCallback<EndpointsForRange, ReplicaLayout.ForRange> handler = new ReadCallback<>(resolver,
-                                                                              replicaLayout.consistencyLevel().blockFor(keyspace),
-                                                                              rangeCommand,
-                                                                              replicaLayout,
-                                                                              queryStartNanoTime);
+                                                                                                 replicaLayout.consistencyLevel().blockFor(keyspace),
+                                                                                                 rangeCommand,
+                                                                                                 replicaLayout,
+                                                                                                 queryStartNanoTime);
 
             handler.assureSufficientLiveNodes();
+
+            // If enabled for range reads (a separate switch from partition reads), request repaired data tracking
+            // info from full replicas, but only if there are multiple full replicas to compare results from
+            if (DatabaseDescriptor.getRepairedDataTrackingForRangeReadsEnabled()
+                && replicaLayout.selected().filter(Replica::isFull).size() > 1)
+            {
+                command.trackRepairedStatus();
+            }
+
             if (replicaLayout.selected().size() == 1 && replicaLayout.selected().get(0).isLocal())
             {
                 StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(rangeCommand, handler));
@@ -2137,7 +2146,10 @@ public class StorageProxy implements StorageProxyMBean
                 for (Replica replica : replicaLayout.selected())
                 {
                     Tracing.trace("Enqueuing request to {}", replica);
-                    MessagingService.instance().sendRRWithFailure(rangeCommand.createMessage(), replica.endpoint(), handler);
+                    MessageOut<ReadCommand> message = rangeCommand.createMessage();
+                    if (command.isTrackingRepairedStatus() && replica.isFull())
+                        message =  message.withParameter(ParameterType.TRACK_REPAIRED_DATA, MessagingService.ONE_BYTE);
+                    MessagingService.instance().sendRRWithFailure(message, replica.endpoint(), handler);
                 }
             }
 
@@ -2798,6 +2810,60 @@ public class StorageProxy implements StorageProxyMBean
         DatabaseDescriptor.setOtcBacklogExpirationInterval(intervalInMillis);
     }
 
+    @Override
+    public void enableRepairedDataTrackingForRangeReads()
+    {
+        DatabaseDescriptor.setRepairedDataTrackingForRangeReadsEnabled(true);
+    }
+
+    @Override
+    public void disableRepairedDataTrackingForRangeReads()
+    {
+        DatabaseDescriptor.setRepairedDataTrackingForRangeReadsEnabled(false);
+    }
+
+    @Override
+    public boolean getRepairedDataTrackingEnabledForRangeReads()
+    {
+        return DatabaseDescriptor.getRepairedDataTrackingForRangeReadsEnabled();
+    }
+
+    @Override
+    public void enableRepairedDataTrackingForPartitionReads()
+    {
+        DatabaseDescriptor.setRepairedDataTrackingForPartitionReadsEnabled(true);
+    }
+
+    @Override
+    public void disableRepairedDataTrackingForPartitionReads()
+    {
+        DatabaseDescriptor.setRepairedDataTrackingForPartitionReadsEnabled(false);
+    }
+
+    @Override
+    public boolean getRepairedDataTrackingEnabledForPartitionReads()
+    {
+        return DatabaseDescriptor.getRepairedDataTrackingForPartitionReadsEnabled();
+    }
+
+    @Override
+    public void enableReportingUnconfirmedRepairedDataMismatches()
+    {
+        DatabaseDescriptor.reportUnconfirmedRepairedDataMismatches(true);
+    }
+
+    @Override
+    public void disableReportingUnconfirmedRepairedDataMismatches()
+    {
+       DatabaseDescriptor.reportUnconfirmedRepairedDataMismatches(false);
+    }
+
+    @Override
+    public boolean getReportingUnconfirmedRepairedDataMismatchesEnabled()
+    {
+        return DatabaseDescriptor.reportUnconfirmedRepairedDataMismatches();
+    }
+
     static class PaxosBallotAndContention
     {
         final UUID ballot;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/src/java/org/apache/cassandra/service/StorageProxyMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxyMBean.java b/src/java/org/apache/cassandra/service/StorageProxyMBean.java
index 76a6617..efc163d 100644
--- a/src/java/org/apache/cassandra/service/StorageProxyMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageProxyMBean.java
@@ -93,4 +93,19 @@ public interface StorageProxyMBean
      * Stop logging queries but leave any generated files on disk.
      */
     public void stopFullQueryLogger();
+
+    /**
+     * Tracking and reporting of variances in the repaired data set across replicas at read time
+     */
+    void enableRepairedDataTrackingForRangeReads();
+    void disableRepairedDataTrackingForRangeReads();
+    boolean getRepairedDataTrackingEnabledForRangeReads();
+
+    void enableRepairedDataTrackingForPartitionReads();
+    void disableRepairedDataTrackingForPartitionReads();
+    boolean getRepairedDataTrackingEnabledForPartitionReads();
+
+    void enableReportingUnconfirmedRepairedDataMismatches();
+    void disableReportingUnconfirmedRepairedDataMismatches();
+    boolean getReportingUnconfirmedRepairedDataMismatchesEnabled();
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/src/java/org/apache/cassandra/service/reads/DataResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/DataResolver.java b/src/java/org/apache/cassandra/service/reads/DataResolver.java
index 9043e87..1f69d6a 100644
--- a/src/java/org/apache/cassandra/service/reads/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/reads/DataResolver.java
@@ -24,7 +24,6 @@ import java.util.List;
 
 import com.google.common.base.Joiner;
 import com.google.common.collect.Collections2;
-import com.google.common.collect.Iterables;
 
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.DeletionTime;
@@ -43,13 +42,12 @@ import org.apache.cassandra.db.transform.Filter;
 import org.apache.cassandra.db.transform.FilteredPartitions;
 import org.apache.cassandra.db.transform.Transformation;
 import org.apache.cassandra.locator.Endpoints;
-import org.apache.cassandra.locator.Replica;
-import org.apache.cassandra.locator.ReplicaCollection;
 import org.apache.cassandra.locator.ReplicaLayout;
-import org.apache.cassandra.locator.Replicas;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.service.reads.repair.ReadRepair;
+import org.apache.cassandra.service.reads.repair.RepairedDataTracker;
+import org.apache.cassandra.service.reads.repair.RepairedDataVerifier;
 
 import static com.google.common.collect.Iterables.*;
 
@@ -84,9 +82,25 @@ public class DataResolver<E extends Endpoints<E>, L extends ReplicaLayout<E, L>>
 
         E replicas = replicaLayout.all().keep(transform(messages, msg -> msg.from));
         List<UnfilteredPartitionIterator> iters = new ArrayList<>(
-                Collections2.transform(messages, msg -> msg.payload.makeIterator(command)));
+        Collections2.transform(messages, msg -> msg.payload.makeIterator(command)));
         assert replicas.size() == iters.size();
 
+        // If requested, inspect each response for a digest of the replica's repaired data set
+        RepairedDataTracker repairedDataTracker = command.isTrackingRepairedStatus()
+                                                  ? new RepairedDataTracker(getRepairedDataVerifier(command))
+                                                  : null;
+        if (repairedDataTracker != null)
+        {
+            messages.forEach(msg -> {
+                if (msg.payload.mayIncludeRepairedDigest() && replicas.byEndpoint().get(msg.from).isFull())
+                {
+                    repairedDataTracker.recordDigest(msg.from,
+                                                     msg.payload.repairedDataDigest(),
+                                                     msg.payload.isRepairedDigestConclusive());
+                }
+            });
+        }
+
         /*
          * Even though every response, individually, will honor the limit, it is possible that we will, after the merge,
          * have more rows than the client requested. To make sure that we still conform to the original limit,
@@ -105,17 +119,25 @@ public class DataResolver<E extends Endpoints<E>, L extends ReplicaLayout<E, L>>
 
         UnfilteredPartitionIterator merged = mergeWithShortReadProtection(iters,
                                                                           replicaLayout.withSelected(replicas),
-                                                                          mergedResultCounter);
+                                                                          mergedResultCounter,
+                                                                          repairedDataTracker);
         FilteredPartitions filtered = FilteredPartitions.filter(merged, new Filter(command.nowInSec(), command.metadata().enforceStrictLiveness()));
         PartitionIterator counted = Transformation.apply(filtered, mergedResultCounter);
         return Transformation.apply(counted, new EmptyPartitionsDiscarder());
     }
 
+    protected RepairedDataVerifier getRepairedDataVerifier(ReadCommand command)
+    {
+        return RepairedDataVerifier.simple(command);
+    }
+
     private UnfilteredPartitionIterator mergeWithShortReadProtection(List<UnfilteredPartitionIterator> results,
                                                                      L sources,
-                                                                     DataLimits.Counter mergedResultCounter)
+                                                                     DataLimits.Counter mergedResultCounter,
+                                                                     RepairedDataTracker repairedDataTracker)
     {
-        // If we have only one results, there is no read repair to do and we can't get short reads
+        // If we have only one result, there is no read repair to do, we can't get short
+        // reads and we can't make a comparison between repaired data sets
         if (results.size() == 1)
             return results.get(0);
 
@@ -127,7 +149,7 @@ public class DataResolver<E extends Endpoints<E>, L extends ReplicaLayout<E, L>>
             for (int i = 0; i < results.size(); i++)
                 results.set(i, ShortReadProtection.extend(sources.selected().get(i), results.get(i), command, mergedResultCounter, queryStartNanoTime, enforceStrictLiveness));
 
-        return UnfilteredPartitionIterators.merge(results, wrapMergeListener(readRepair.getMergeListener(sources), sources));
+        return UnfilteredPartitionIterators.merge(results, wrapMergeListener(readRepair.getMergeListener(sources), sources, repairedDataTracker));
     }
 
     private String makeResponsesDebugString(DecoratedKey partitionKey)
@@ -135,7 +157,7 @@ public class DataResolver<E extends Endpoints<E>, L extends ReplicaLayout<E, L>>
         return Joiner.on(",\n").join(transform(getMessages().snapshot(), m -> m.from + " => " + m.payload.toDebugString(command, partitionKey)));
     }
 
-    private UnfilteredPartitionIterators.MergeListener wrapMergeListener(UnfilteredPartitionIterators.MergeListener partitionListener, L sources)
+    private UnfilteredPartitionIterators.MergeListener wrapMergeListener(UnfilteredPartitionIterators.MergeListener partitionListener, L sources, RepairedDataTracker repairedDataTracker)
     {
         return new UnfilteredPartitionIterators.MergeListener()
         {
@@ -224,6 +246,8 @@ public class DataResolver<E extends Endpoints<E>, L extends ReplicaLayout<E, L>>
             public void close()
             {
                 partitionListener.close();
+                if (repairedDataTracker != null)
+                    repairedDataTracker.verify();
             }
         };
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/src/java/org/apache/cassandra/service/reads/ShortReadProtection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/ShortReadProtection.java b/src/java/org/apache/cassandra/service/reads/ShortReadProtection.java
index ef1d45b..1a454f9 100644
--- a/src/java/org/apache/cassandra/service/reads/ShortReadProtection.java
+++ b/src/java/org/apache/cassandra/service/reads/ShortReadProtection.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.service.reads;
 
 import java.net.InetAddress;
 
+
 import org.apache.cassandra.db.ReadCommand;
 import org.apache.cassandra.db.filter.DataLimits;
 import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java b/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java
index 30dea74..493b9d0 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java
@@ -24,6 +24,7 @@ import java.util.function.Consumer;
 import com.google.common.base.Preconditions;
 
 import com.codahale.metrics.Meter;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.Keyspace;
@@ -32,11 +33,12 @@ import org.apache.cassandra.db.SinglePartitionReadCommand;
 import org.apache.cassandra.db.partitions.PartitionIterator;
 import org.apache.cassandra.exceptions.ReadTimeoutException;
 import org.apache.cassandra.locator.Endpoints;
-import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.locator.Replica;
 import org.apache.cassandra.locator.ReplicaLayout;
 import org.apache.cassandra.metrics.ReadRepairMetrics;
+import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.ParameterType;
 import org.apache.cassandra.service.reads.DataResolver;
 import org.apache.cassandra.service.reads.DigestResolver;
 import org.apache.cassandra.service.reads.ReadCallback;
@@ -75,9 +77,14 @@ public abstract class AbstractReadRepair<E extends Endpoints<E>, L extends Repli
         this.cfs = Keyspace.openAndGetStore(command.metadata());
     }
 
-    void sendReadCommand(InetAddressAndPort to, ReadCallback readCallback)
+    void sendReadCommand(Replica to, ReadCallback readCallback)
     {
-        MessagingService.instance().sendRRWithFailure(command.createMessage(), to, readCallback);
+        MessageOut<ReadCommand> message = command.createMessage();
+        // if enabled, request additional info about repaired data from any full replicas
+        if (command.isTrackingRepairedStatus() && to.isFull())
+            message = message.withParameter(ParameterType.TRACK_REPAIRED_DATA, MessagingService.ONE_BYTE);
+
+        MessagingService.instance().sendRRWithFailure(message, to.endpoint(), readCallback);
     }
 
     abstract Meter getRepairMeter();
@@ -94,10 +101,14 @@ public abstract class AbstractReadRepair<E extends Endpoints<E>, L extends Repli
 
         digestRepair = new DigestRepair(resolver, readCallback, resultConsumer);
 
+        // if enabled, request additional info about repaired data from any full replicas
+        if (DatabaseDescriptor.getRepairedDataTrackingForPartitionReadsEnabled())
+            command.trackRepairedStatus();
+
         for (Replica replica : replicaLayout.selected())
         {
             Tracing.trace("Enqueuing full data read to {}", replica);
-            sendReadCommand(replica.endpoint(), readCallback);
+            sendReadCommand(replica, readCallback);
         }
         ReadRepairDiagnostics.startRepair(this, replicaLayout.selected().endpoints(), digestResolver, replicaLayout.all().endpoints());
     }
@@ -137,7 +148,7 @@ public abstract class AbstractReadRepair<E extends Endpoints<E>, L extends Repli
 
             Replica replica = uncontacted.selected().iterator().next();
             Tracing.trace("Enqueuing speculative full data read to {}", replica);
-            sendReadCommand(replica.endpoint(), repair.readCallback);
+            sendReadCommand(replica, repair.readCallback);
             ReadRepairMetrics.speculatedRead.mark();
             ReadRepairDiagnostics.speculatedRead(this, replica.endpoint(), uncontacted.all().endpoints());
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/src/java/org/apache/cassandra/service/reads/repair/RepairedDataTracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/repair/RepairedDataTracker.java b/src/java/org/apache/cassandra/service/reads/repair/RepairedDataTracker.java
new file mode 100644
index 0000000..5024e86
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/reads/repair/RepairedDataTracker.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.reads.repair;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+public class RepairedDataTracker
+{
+    private final RepairedDataVerifier verifier;
+
+    public final Multimap<ByteBuffer, InetAddressAndPort> digests = HashMultimap.create();
+    public final Set<InetAddressAndPort> inconclusiveDigests = new HashSet<>();
+
+    public RepairedDataTracker(RepairedDataVerifier verifier)
+    {
+        this.verifier = verifier;
+    }
+
+    public void recordDigest(InetAddressAndPort source, ByteBuffer digest, boolean isConclusive)
+    {
+        digests.put(digest, source);
+        if (!isConclusive)
+            inconclusiveDigests.add(source);
+    }
+
+    public void verify()
+    {
+        verifier.verify(this);
+    }
+
+    public String toString()
+    {
+        return MoreObjects.toStringHelper(this)
+                          .add("digests", hexDigests())
+                          .add("inconclusive", inconclusiveDigests).toString();
+    }
+
+    private Map<String, Collection<InetAddressAndPort>> hexDigests()
+    {
+        Map<String, Collection<InetAddressAndPort>> hexDigests = new HashMap<>();
+        digests.asMap().forEach((k, v) -> hexDigests.put(ByteBufferUtil.bytesToHex(k), v));
+        return hexDigests;
+    }
+
+    public boolean equals(Object o)
+    {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        RepairedDataTracker that = (RepairedDataTracker) o;
+        return Objects.equals(digests, that.digests) &&
+               Objects.equals(inconclusiveDigests, that.inconclusiveDigests);
+    }
+
+    public int hashCode()
+    {
+        return Objects.hash(digests, inconclusiveDigests);
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org