Posted to commits@cassandra.apache.org by sa...@apache.org on 2018/09/05 18:13:33 UTC

[1/2] cassandra git commit: Detect inconsistencies in repaired data on the read path

Repository: cassandra
Updated Branches:
  refs/heads/trunk 744973e4c -> 5fbb938ad


http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/src/java/org/apache/cassandra/service/reads/repair/RepairedDataVerifier.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/repair/RepairedDataVerifier.java b/src/java/org/apache/cassandra/service/reads/repair/RepairedDataVerifier.java
new file mode 100644
index 0000000..816fe9f
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/reads/repair/RepairedDataVerifier.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.reads.repair;
+
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.PartitionRangeReadCommand;
+import org.apache.cassandra.db.ReadCommand;
+import org.apache.cassandra.db.SinglePartitionReadCommand;
+import org.apache.cassandra.metrics.TableMetrics;
+import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.utils.NoSpamLogger;
+
+public interface RepairedDataVerifier
+{
+    public void verify(RepairedDataTracker tracker);
+
+    static RepairedDataVerifier simple(ReadCommand command)
+    {
+        return new SimpleVerifier(command);
+    }
+
+    static class SimpleVerifier implements RepairedDataVerifier
+    {
+        private static final Logger logger = LoggerFactory.getLogger(SimpleVerifier.class);
+        private final ReadCommand command;
+
+        private static final String INCONSISTENCY_WARNING = "Detected mismatch between repaired datasets for table {}.{} during read of {}. {}";
+
+        SimpleVerifier(ReadCommand command)
+        {
+            this.command = command;
+        }
+
+        @Override
+        public void verify(RepairedDataTracker tracker)
+        {
+            Tracing.trace("Verifying repaired data tracker {}", tracker);
+
+            // some mismatch occurred between the repaired datasets on the replicas
+            if (tracker.digests.keySet().size() > 1)
+            {
+                // if none of the digests are considered inconclusive (i.e. there were no
+                // pending repair sessions which had not yet been committed, and no unrepaired
+                // partition deletes which caused sstables to be skipped during reads), mark
+                // the inconsistency as confirmed
+                if (tracker.inconclusiveDigests.isEmpty())
+                {
+                    TableMetrics metrics = ColumnFamilyStore.metricsFor(command.metadata().id);
+                    metrics.confirmedRepairedInconsistencies.mark();
+                    NoSpamLogger.log(logger, NoSpamLogger.Level.WARN, 1, TimeUnit.MINUTES,
+                                     INCONSISTENCY_WARNING, command.metadata().keyspace,
+                                     command.metadata().name, getCommandString(), tracker);
+                }
+                else if (DatabaseDescriptor.reportUnconfirmedRepairedDataMismatches())
+                {
+                    TableMetrics metrics = ColumnFamilyStore.metricsFor(command.metadata().id);
+                    metrics.unconfirmedRepairedInconsistencies.mark();
+                    NoSpamLogger.log(logger, NoSpamLogger.Level.WARN, 1, TimeUnit.MINUTES,
+                                     INCONSISTENCY_WARNING, command.metadata().keyspace,
+                                     command.metadata().name, getCommandString(), tracker);
+                }
+            }
+        }
+
+        private String getCommandString()
+        {
+            return command instanceof SinglePartitionReadCommand
+                   ? ((SinglePartitionReadCommand)command).partitionKey().toString()
+                   : ((PartitionRangeReadCommand)command).dataRange().keyRange().getString(command.metadata().partitionKeyType);
+        }
+    }
+}
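
For reference, a minimal sketch of how this verifier is driven from the read path, using only API that appears in this patch (RepairedDataVerifier.simple, verify, and the RepairedDataTracker constructor and recordDigest call exercised in DataResolverTest below). The peers and digests are illustrative, and the assumption that the tracker's constructor argument is the verifier is not taken from this excerpt:

    // Illustrative only, not part of the patch: collect per-replica repaired
    // data digests, then hand the tracker to the verifier.
    RepairedDataVerifier verifier = RepairedDataVerifier.simple(command);
    RepairedDataTracker tracker = new RepairedDataTracker(verifier);
    tracker.recordDigest(peer1, digestFromPeer1, true);   // conclusive digest
    tracker.recordDigest(peer2, digestFromPeer2, false);  // inconclusive: pending repair session
    // If the recorded digests differ and none is inconclusive, the mismatch is
    // counted as confirmed; with peer2's inconclusive digest it is only counted
    // when DatabaseDescriptor.reportUnconfirmedRepairedDataMismatches() is true.
    verifier.verify(tracker);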

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/test/unit/org/apache/cassandra/db/ReadCommandTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ReadCommandTest.java b/test/unit/org/apache/cassandra/db/ReadCommandTest.java
index 5020b95..8df7651 100644
--- a/test/unit/org/apache/cassandra/db/ReadCommandTest.java
+++ b/test/unit/org/apache/cassandra/db/ReadCommandTest.java
@@ -20,10 +20,12 @@ package org.apache.cassandra.db;
 
 import java.io.IOException;
 import java.io.OutputStream;
+import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.*;
 
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -37,28 +39,36 @@ import org.apache.cassandra.db.filter.DataLimits;
 import org.apache.cassandra.db.filter.RowFilter;
 import org.apache.cassandra.db.marshal.AsciiType;
 import org.apache.cassandra.db.marshal.BytesType;
-import org.apache.cassandra.db.partitions.FilteredPartition;
-import org.apache.cassandra.db.partitions.PartitionIterator;
-import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
-import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
+import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.db.rows.Row;
 import org.apache.cassandra.db.rows.RowIterator;
 import org.apache.cassandra.db.rows.SerializationHelper;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 import org.apache.cassandra.db.rows.UnfilteredRowIterators;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.DataInputBuffer;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.io.util.WrappedDataOutputStreamPlus;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.repair.consistent.LocalSessionAccessor;
+import org.apache.cassandra.schema.CachingParams;
 import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.streaming.PreviewKind;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.UUIDGen;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 public class ReadCommandTest
 {
@@ -68,6 +78,20 @@ public class ReadCommandTest
     private static final String CF3 = "Standard3";
     private static final String CF4 = "Standard4";
     private static final String CF5 = "Standard5";
+    private static final String CF6 = "Standard6";
+
+    private static final InetAddressAndPort REPAIR_COORDINATOR;
+    static {
+        try
+        {
+            REPAIR_COORDINATOR = InetAddressAndPort.getByName("10.0.0.1");
+        }
+        catch (UnknownHostException e)
+        {
+            throw new AssertionError(e);
+        }
+    }
 
     @BeforeClass
     public static void defineSchema() throws ConfigurationException
@@ -116,6 +140,14 @@ public class ReadCommandTest
                      .addRegularColumn("e", AsciiType.instance)
                      .addRegularColumn("f", AsciiType.instance);
 
+        TableMetadata.Builder metadata6 =
+        TableMetadata.builder(KEYSPACE, CF6)
+                     .addPartitionKeyColumn("key", BytesType.instance)
+                     .addClusteringColumn("col", AsciiType.instance)
+                     .addRegularColumn("a", AsciiType.instance)
+                     .addRegularColumn("b", AsciiType.instance)
+                     .caching(CachingParams.CACHE_EVERYTHING);
+
         SchemaLoader.prepareServer();
         SchemaLoader.createKeyspace(KEYSPACE,
                                     KeyspaceParams.simple(1),
@@ -123,7 +155,10 @@ public class ReadCommandTest
                                     metadata2,
                                     metadata3,
                                     metadata4,
-                                    metadata5);
+                                    metadata5,
+                                    metadata6);
+
+        LocalSessionAccessor.startup();
     }
 
     @Test
@@ -540,4 +575,242 @@ public class ReadCommandTest
         assertEquals(1, cfs.metric.tombstoneScannedHistogram.cf.getSnapshot().getMax());
     }
 
+    @Test
+    public void testSinglePartitionSliceRepairedDataTracking() throws Exception
+    {
+        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF2);
+        ReadCommand readCommand = Util.cmd(cfs, Util.dk("key")).build();
+        testRepairedDataTracking(cfs, readCommand);
+    }
+
+    @Test
+    public void testPartitionRangeRepairedDataTracking() throws Exception
+    {
+        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF2);
+        ReadCommand readCommand = Util.cmd(cfs).build();
+        testRepairedDataTracking(cfs, readCommand);
+    }
+
+    @Test
+    public void testSinglePartitionNamesRepairedDataTracking() throws Exception
+    {
+        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF2);
+        ReadCommand readCommand = Util.cmd(cfs, Util.dk("key")).includeRow("cc").includeRow("dd").build();
+        testRepairedDataTracking(cfs, readCommand);
+    }
+
+    @Test
+    public void testSinglePartitionNamesSkipsOptimisationsIfTrackingRepairedData()
+    {
+        // when tracking repaired status, the optimizations of querying sstables in
+        // timestamp order and returning early once all requested columns have been
+        // found are disabled, so just assert that all sstables are read when
+        // performing such queries
+        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF2);
+        cfs.truncateBlocking();
+        cfs.disableAutoCompaction();
+
+        new RowUpdateBuilder(cfs.metadata(), 0, ByteBufferUtil.bytes("key"))
+            .clustering("dd")
+            .add("a", ByteBufferUtil.bytes("abcd"))
+            .build()
+            .apply();
+
+        cfs.forceBlockingFlush();
+
+        new RowUpdateBuilder(cfs.metadata(), 1, ByteBufferUtil.bytes("key"))
+            .clustering("dd")
+            .add("a", ByteBufferUtil.bytes("wxyz"))
+            .build()
+            .apply();
+
+        cfs.forceBlockingFlush();
+        List<SSTableReader> sstables = new ArrayList<>(cfs.getLiveSSTables());
+        assertEquals(2, sstables.size());
+        Collections.sort(sstables, SSTableReader.maxTimestampComparator);
+
+        ReadCommand readCommand = Util.cmd(cfs, Util.dk("key")).includeRow("dd").columns("a").build();
+
+        assertEquals(0, readCount(sstables.get(0)));
+        assertEquals(0, readCount(sstables.get(1)));
+        ReadCommand withTracking = readCommand.copy();
+        withTracking.trackRepairedStatus();
+        Util.getAll(withTracking);
+        assertEquals(1, readCount(sstables.get(0)));
+        assertEquals(1, readCount(sstables.get(1)));
+
+        // the same command without tracking touches only the sstable with the higher timestamp
+        Util.getAll(readCommand.copy());
+        assertEquals(2, readCount(sstables.get(0)));
+        assertEquals(1, readCount(sstables.get(1)));
+    }
+
+    private long readCount(SSTableReader sstable)
+    {
+        return sstable.getReadMeter().count();
+    }
+
+    @Test
+    public void skipRowCacheIfTrackingRepairedData()
+    {
+        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF6);
+
+        cfs.truncateBlocking();
+        cfs.disableAutoCompaction();
+
+        new RowUpdateBuilder(cfs.metadata(), 0, ByteBufferUtil.bytes("key"))
+                .clustering("cc")
+                .add("a", ByteBufferUtil.bytes("abcd"))
+                .build()
+                .apply();
+
+        cfs.forceBlockingFlush();
+
+        ReadCommand readCommand = Util.cmd(cfs, Util.dk("key")).build();
+        assertTrue(cfs.isRowCacheEnabled());
+        // warm the cache
+        assertFalse(Util.getAll(readCommand).isEmpty());
+        long cacheHits = cfs.metric.rowCacheHit.getCount();
+
+        Util.getAll(readCommand);
+        assertTrue(cfs.metric.rowCacheHit.getCount() > cacheHits);
+        cacheHits = cfs.metric.rowCacheHit.getCount();
+
+        ReadCommand withRepairedInfo = readCommand.copy();
+        withRepairedInfo.trackRepairedStatus();
+        Util.getAll(withRepairedInfo);
+        assertEquals(cacheHits, cfs.metric.rowCacheHit.getCount());
+    }
+
+    private void testRepairedDataTracking(ColumnFamilyStore cfs, ReadCommand readCommand) throws IOException
+    {
+        cfs.truncateBlocking();
+        cfs.disableAutoCompaction();
+
+        new RowUpdateBuilder(cfs.metadata(), 0, ByteBufferUtil.bytes("key"))
+                .clustering("cc")
+                .add("a", ByteBufferUtil.bytes("abcd"))
+                .build()
+                .apply();
+
+        cfs.forceBlockingFlush();
+
+        new RowUpdateBuilder(cfs.metadata(), 1, ByteBufferUtil.bytes("key"))
+                .clustering("dd")
+                .add("a", ByteBufferUtil.bytes("abcd"))
+                .build()
+                .apply();
+
+        cfs.forceBlockingFlush();
+        List<SSTableReader> sstables = new ArrayList<>(cfs.getLiveSSTables());
+        assertEquals(2, sstables.size());
+        sstables.forEach(sstable -> assertFalse(sstable.isRepaired() || sstable.isPendingRepair()));
+        SSTableReader sstable1 = sstables.get(0);
+        SSTableReader sstable2 = sstables.get(1);
+
+        int numPartitions = 1;
+        int rowsPerPartition = 2;
+
+        // Capture all the digest versions as we mutate the table's repaired status. Each time
+        // we make a change, we expect a different digest.
+        Set<ByteBuffer> digests = new HashSet<>();
+        // the first time round, nothing has been marked repaired, so we expect the digest to be an empty buffer and to be marked conclusive
+        ByteBuffer digest = performReadAndVerifyRepairedInfo(readCommand, numPartitions, rowsPerPartition, true);
+        assertEquals(ByteBufferUtil.EMPTY_BYTE_BUFFER, digest);
+        digests.add(digest);
+
+        // add a pending repair session to sstable1; the digest should remain the same but now we expect it to be marked inconclusive
+        UUID session1 = UUIDGen.getTimeUUID();
+        mutateRepaired(cfs, sstable1, ActiveRepairService.UNREPAIRED_SSTABLE, session1);
+        digests.add(performReadAndVerifyRepairedInfo(readCommand, numPartitions, rowsPerPartition, false));
+        assertEquals(1, digests.size());
+
+        // add a different pending session to sstable2; the digest should remain the same and still be considered inconclusive
+        UUID session2 = UUIDGen.getTimeUUID();
+        mutateRepaired(cfs, sstable2, ActiveRepairService.UNREPAIRED_SSTABLE, session2);
+        digests.add(performReadAndVerifyRepairedInfo(readCommand, numPartitions, rowsPerPartition, false));
+        assertEquals(1, digests.size());
+
+        // mark one sstable repaired
+        mutateRepaired(cfs, sstable1, 111, null);
+        // this time the digest should not be empty; session2 still means that the result is inconclusive
+        digests.add(performReadAndVerifyRepairedInfo(readCommand, numPartitions, rowsPerPartition, false));
+        assertEquals(2, digests.size());
+
+        // mark the second sstable repaired
+        mutateRepaired(cfs, sstable2, 222, null);
+        // the digest should be updated again and, as there are no longer any pending sessions, it should be considered conclusive
+        digests.add(performReadAndVerifyRepairedInfo(readCommand, numPartitions, rowsPerPartition, true));
+        assertEquals(3, digests.size());
+
+        // insert a partition tombstone into the memtable, then re-check the repaired info.
+        // This is to ensure that the optimisations which skip reading from sstables when
+        // a newer partition tombstone has already been encountered cause the digest to be
+        // marked as inconclusive.
+        // The exception to this is partition range reads, where we always read from, and
+        // generate digests for, all sstables, so we only test this path for single partition reads
+        if (readCommand.isLimitedToOnePartition())
+        {
+            new Mutation(PartitionUpdate.simpleBuilder(cfs.metadata(), ByteBufferUtil.bytes("key"))
+                                        .delete()
+                                        .build()).apply();
+            digest = performReadAndVerifyRepairedInfo(readCommand, 0, rowsPerPartition, false);
+            assertEquals(ByteBufferUtil.EMPTY_BYTE_BUFFER, digest);
+
+            // now flush so we have an unrepaired table with the deletion and repeat the check
+            cfs.forceBlockingFlush();
+            digest = performReadAndVerifyRepairedInfo(readCommand, 0, rowsPerPartition, false);
+            assertEquals(ByteBufferUtil.EMPTY_BYTE_BUFFER, digest);
+        }
+    }
+
+    private void mutateRepaired(ColumnFamilyStore cfs, SSTableReader sstable, long repairedAt, UUID pendingSession) throws IOException
+    {
+        sstable.descriptor.getMetadataSerializer().mutateRepairMetadata(sstable.descriptor, repairedAt, pendingSession, false);
+        sstable.reloadSSTableMetadata();
+        if (pendingSession != null)
+        {
+            // set up a minimal repair session. This is necessary because we
+            // check for sessions which have exceeded timeout and been purged
+            Range<Token> range = new Range<>(cfs.metadata().partitioner.getMinimumToken(),
+                                             cfs.metadata().partitioner.getRandomToken());
+            ActiveRepairService.instance.registerParentRepairSession(pendingSession,
+                                                                     REPAIR_COORDINATOR,
+                                                                     Lists.newArrayList(cfs),
+                                                                     Sets.newHashSet(range),
+                                                                     true,
+                                                                     repairedAt,
+                                                                     true,
+                                                                     PreviewKind.NONE);
+
+            LocalSessionAccessor.prepareUnsafe(pendingSession, null, Sets.newHashSet(REPAIR_COORDINATOR));
+        }
+    }
+
+    private ByteBuffer performReadAndVerifyRepairedInfo(ReadCommand command,
+                                                        int expectedPartitions,
+                                                        int expectedRowsPerPartition,
+                                                        boolean expectConclusive)
+    {
+        // perform the equivalent read command multiple times and assert that
+        // the repaired data info is always consistent. Return the digest
+        // so we can verify that it changes when the repaired status of
+        // the queried sstables does.
+        Set<ByteBuffer> digests = new HashSet<>();
+        for (int i = 0; i < 10; i++)
+        {
+            ReadCommand withRepairedInfo = command.copy();
+            withRepairedInfo.trackRepairedStatus();
+
+            List<FilteredPartition> partitions = Util.getAll(withRepairedInfo);
+            assertEquals(expectedPartitions, partitions.size());
+            partitions.forEach(p -> assertEquals(expectedRowsPerPartition, p.rowCount()));
+
+            ByteBuffer digest = withRepairedInfo.getRepairedDataDigest();
+            digests.add(digest);
+            assertEquals(1, digests.size());
+            assertEquals(expectConclusive, withRepairedInfo.isRepairedDataDigestConclusive());
+        }
+        return digests.iterator().next();
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/test/unit/org/apache/cassandra/db/ReadCommandVerbHandlerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ReadCommandVerbHandlerTest.java b/test/unit/org/apache/cassandra/db/ReadCommandVerbHandlerTest.java
new file mode 100644
index 0000000..0c43661
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/ReadCommandVerbHandlerTest.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.net.UnknownHostException;
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+
+import com.google.common.collect.ImmutableMap;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.db.filter.ClusteringIndexSliceFilter;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.filter.RowFilter;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.net.IMessageSink;
+import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.net.MessageOut;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.ParameterType;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class ReadCommandVerbHandlerTest
+{
+    private static final String TEST_NAME = "read_command_vh_test_";
+    private static final String KEYSPACE = TEST_NAME + "cql_keyspace";
+    private static final String TABLE = "table1";
+
+    private final Random random = new Random();
+    private ReadCommandVerbHandler handler;
+    private TableMetadata metadata;
+
+    @BeforeClass
+    public static void init()
+    {
+        SchemaLoader.loadSchema();
+        SchemaLoader.schemaDefinition(TEST_NAME);
+    }
+
+    @Before
+    public void setup()
+    {
+        MessagingService.instance().clearMessageSinks();
+        MessagingService.instance().addMessageSink(new IMessageSink()
+        {
+            public boolean allowOutgoingMessage(MessageOut message, int id, InetAddressAndPort to)
+            {
+                return false;
+            }
+
+            public boolean allowIncomingMessage(MessageIn message, int id)
+            {
+                return false;
+            }
+        });
+
+        metadata = Schema.instance.getTableMetadata(KEYSPACE, TABLE);
+        handler = new ReadCommandVerbHandler();
+    }
+
+    @Test
+    public void setRepairedDataTrackingFlagIfHeaderPresent()
+    {
+        ReadCommand command = command(key());
+        assertFalse(command.isTrackingRepairedStatus());
+        Map<ParameterType, Object> params = ImmutableMap.of(ParameterType.TRACK_REPAIRED_DATA,
+                                                            MessagingService.ONE_BYTE);
+        handler.doVerb(MessageIn.create(peer(),
+                                        command,
+                                        params,
+                                        MessagingService.Verb.READ,
+                                        MessagingService.current_version),
+                       messageId());
+        assertTrue(command.isTrackingRepairedStatus());
+    }
+
+    @Test
+    public void dontSetRepairedDataTrackingFlagUnlessHeaderPresent()
+    {
+        ReadCommand command = command(key());
+        assertFalse(command.isTrackingRepairedStatus());
+        Map<ParameterType, Object> params = ImmutableMap.of(ParameterType.TRACE_SESSION,
+                                                            UUID.randomUUID());
+        handler.doVerb(MessageIn.create(peer(),
+                                        command,
+                                        params,
+                                        MessagingService.Verb.READ,
+                                        MessagingService.current_version),
+                       messageId());
+        assertFalse(command.isTrackingRepairedStatus());
+    }
+
+    @Test
+    public void dontSetRepairedDataTrackingFlagIfHeadersEmpty()
+    {
+        ReadCommand command = command(key());
+        assertFalse(command.isTrackingRepairedStatus());
+        handler.doVerb(MessageIn.create(peer(),
+                                        command,
+                                        ImmutableMap.of(),
+                                        MessagingService.Verb.READ,
+                                        MessagingService.current_version),
+                       messageId());
+        assertFalse(command.isTrackingRepairedStatus());
+    }
+
+    private int key()
+    {
+        return random.nextInt();
+    }
+
+    private int messageId()
+    {
+        return random.nextInt();
+    }
+
+    private InetAddressAndPort peer()
+    {
+        try
+        {
+            return InetAddressAndPort.getByAddress(new byte[]{ 127, 0, 0, 9});
+        }
+        catch (UnknownHostException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private ReadCommand command(int key)
+    {
+        return new SinglePartitionReadCommand(false,
+              0,
+              false,
+              metadata,
+              FBUtilities.nowInSeconds(),
+              ColumnFilter.all(metadata),
+              RowFilter.NONE,
+              DataLimits.NONE,
+              metadata.partitioner.decorateKey(ByteBufferUtil.bytes(key)),
+              new ClusteringIndexSliceFilter(Slices.ALL, false),
+              null);
+    }
+
+}
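
The corresponding change to ReadCommandVerbHandler itself is not shown in this excerpt. A sketch of the check these tests exercise, assuming the parameters map exposed via MessageIn.create above (the real handler may differ in detail):

    // Hypothetical handler-side check: enable repaired data tracking only when
    // the coordinator sent the TRACK_REPAIRED_DATA message parameter.
    public void doVerb(MessageIn<ReadCommand> message, int id)
    {
        ReadCommand command = message.payload;
        if (message.parameters.containsKey(ParameterType.TRACK_REPAIRED_DATA))
            command.trackRepairedStatus();
        // ... execute the command and reply as before
    }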

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/test/unit/org/apache/cassandra/db/ReadResponseTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ReadResponseTest.java b/test/unit/org/apache/cassandra/db/ReadResponseTest.java
new file mode 100644
index 0000000..6e1a804
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/ReadResponseTest.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Random;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.filter.RowFilter;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class ReadResponseTest
+{
+
+    private final Random random = new Random();
+    private TableMetadata metadata;
+
+    @Before
+    public void setup()
+    {
+        metadata = TableMetadata.builder("ks", "t1")
+                                .addPartitionKeyColumn("p", Int32Type.instance)
+                                .addRegularColumn("v", Int32Type.instance)
+                                .partitioner(Murmur3Partitioner.instance)
+                                .build();
+    }
+
+    @Test
+    public void fromCommandWithConclusiveRepairedDigest()
+    {
+        ByteBuffer digest = digest();
+        ReadCommand command = command(key(), metadata, digest, true);
+        ReadResponse response = command.createResponse(EmptyIterators.unfilteredPartition(metadata));
+        assertTrue(response.isRepairedDigestConclusive());
+        assertEquals(digest, response.repairedDataDigest());
+        verifySerDe(response);
+    }
+
+    @Test
+    public void fromCommandWithInconclusiveRepairedDigest()
+    {
+        ByteBuffer digest = digest();
+        ReadCommand command = command(key(), metadata, digest, false);
+        ReadResponse response = command.createResponse(EmptyIterators.unfilteredPartition(metadata));
+        assertFalse(response.isRepairedDigestConclusive());
+        assertEquals(digest, response.repairedDataDigest());
+        verifySerDe(response);
+    }
+
+    @Test
+    public void fromCommandWithConclusiveEmptyRepairedDigest()
+    {
+        ReadCommand command = command(key(), metadata, ByteBufferUtil.EMPTY_BYTE_BUFFER, true);
+        ReadResponse response = command.createResponse(EmptyIterators.unfilteredPartition(metadata));
+        assertTrue(response.isRepairedDigestConclusive());
+        assertEquals(ByteBufferUtil.EMPTY_BYTE_BUFFER, response.repairedDataDigest());
+        verifySerDe(response);
+    }
+
+    @Test
+    public void fromCommandWithInconclusiveEmptyRepairedDigest()
+    {
+        ReadCommand command = command(key(), metadata, ByteBufferUtil.EMPTY_BYTE_BUFFER, false);
+        ReadResponse response = command.createResponse(EmptyIterators.unfilteredPartition(metadata));
+        assertFalse(response.isRepairedDigestConclusive());
+        assertEquals(ByteBufferUtil.EMPTY_BYTE_BUFFER, response.repairedDataDigest());
+        verifySerDe(response);
+    }
+
+    /*
+     * Digest responses should never include repaired data tracking as we only request
+     * it in read repair or for range queries
+     */
+    @Test (expected = UnsupportedOperationException.class)
+    public void digestResponseErrorsIfRepairedDataDigestRequested()
+    {
+        ReadCommand command = digestCommand(key(), metadata);
+        ReadResponse response = command.createResponse(EmptyIterators.unfilteredPartition(metadata));
+        assertTrue(response.isDigestResponse());
+        assertFalse(response.mayIncludeRepairedDigest());
+        response.repairedDataDigest();
+    }
+
+    @Test (expected = UnsupportedOperationException.class)
+    public void digestResponseErrorsIfIsConclusiveRequested()
+    {
+        ReadCommand command = digestCommand(key(), metadata);
+        ReadResponse response = command.createResponse(EmptyIterators.unfilteredPartition(metadata));
+        assertTrue(response.isDigestResponse());
+        assertFalse(response.mayIncludeRepairedDigest());
+        response.isRepairedDigestConclusive();
+    }
+
+    @Test (expected = UnsupportedOperationException.class)
+    public void digestResponseErrorsIfIteratorRequested()
+    {
+        ReadCommand command = digestCommand(key(), metadata);
+        ReadResponse response = command.createResponse(EmptyIterators.unfilteredPartition(metadata));
+        assertTrue(response.isDigestResponse());
+        assertFalse(response.mayIncludeRepairedDigest());
+        response.makeIterator(command);
+    }
+
+    @Test
+    public void makeDigestDoesntConsiderRepairedDataInfo()
+    {
+        // It shouldn't be possible to get false positive DigestMismatchExceptions based
+        // on differing repaired data tracking info because it isn't requested on initial
+        // requests, only following a digest mismatch. Having a test doesn't hurt though
+        int key = key();
+        ByteBuffer digest1 = digest();
+        ReadCommand command1 = command(key, metadata, digest1, true);
+        ReadResponse response1 = command1.createResponse(EmptyIterators.unfilteredPartition(metadata));
+
+        ByteBuffer digest2 = digest();
+        ReadCommand command2 = command(key, metadata, digest2, false);
+        ReadResponse response2 = command2.createResponse(EmptyIterators.unfilteredPartition(metadata));
+
+        assertEquals(response1.digest(command1), response2.digest(command2));
+    }
+
+    private void verifySerDe(ReadResponse response)
+    {
+        // check that roundtripping through ReadResponse.serializer behaves as expected.
+        // ReadResponses from pre-4.0 nodes will never contain repaired data digest
+        // or pending session info, but we run all messages through both pre/post 4.0
+        // serde to check that the defaults are correctly applied
+        roundTripSerialization(response, MessagingService.current_version);
+        roundTripSerialization(response, MessagingService.VERSION_30);
+    }
+
+    private void roundTripSerialization(ReadResponse response, int version)
+    {
+        try
+        {
+            DataOutputBuffer out = new DataOutputBuffer();
+            ReadResponse.serializer.serialize(response, out, version);
+
+            DataInputBuffer in = new DataInputBuffer(out.buffer(), false);
+            ReadResponse deser = ReadResponse.serializer.deserialize(in, version);
+            if (version < MessagingService.VERSION_40)
+            {
+                assertFalse(deser.mayIncludeRepairedDigest());
+                // even though that means they should never be used, verify that the default values are present
+                assertEquals(ByteBufferUtil.EMPTY_BYTE_BUFFER, deser.repairedDataDigest());
+                assertTrue(deser.isRepairedDigestConclusive());
+            }
+            else
+            {
+                assertTrue(deser.mayIncludeRepairedDigest());
+                assertEquals(response.repairedDataDigest(), deser.repairedDataDigest());
+                assertEquals(response.isRepairedDigestConclusive(), deser.isRepairedDigestConclusive());
+            }
+        }
+        catch (IOException e)
+        {
+            fail("Caught unexpected IOException during SerDe: " + e.getMessage());
+        }
+    }
+
+    private int key()
+    {
+        return random.nextInt();
+    }
+
+    private ByteBuffer digest()
+    {
+        byte[] bytes = new byte[4];
+        random.nextBytes(bytes);
+        return ByteBuffer.wrap(bytes);
+    }
+
+    private ReadCommand digestCommand(int key, TableMetadata metadata)
+    {
+        return new StubReadCommand(key, metadata, true, ByteBufferUtil.EMPTY_BYTE_BUFFER, true);
+    }
+
+    private ReadCommand command(int key, TableMetadata metadata, ByteBuffer repairedDigest, boolean conclusive)
+    {
+        return new StubReadCommand(key, metadata, false, repairedDigest, conclusive);
+    }
+
+    private static class StubReadCommand extends SinglePartitionReadCommand
+    {
+
+        private final ByteBuffer repairedDigest;
+        private final boolean conclusive;
+
+        StubReadCommand(int key, TableMetadata metadata,
+                        boolean isDigest,
+                        final ByteBuffer repairedDigest,
+                        final boolean conclusive)
+        {
+            super(isDigest,
+                  0,
+                  false,
+                  metadata,
+                  FBUtilities.nowInSeconds(),
+                  ColumnFilter.all(metadata),
+                  RowFilter.NONE,
+                  DataLimits.NONE,
+                  metadata.partitioner.decorateKey(ByteBufferUtil.bytes(key)),
+                  null,
+                  null);
+            this.repairedDigest = repairedDigest;
+            this.conclusive = conclusive;
+        }
+
+        @Override
+        public ByteBuffer getRepairedDataDigest()
+        {
+            return repairedDigest;
+        }
+
+        @Override
+        public boolean isRepairedDataDigestConclusive()
+        {
+            return conclusive;
+        }
+
+        public UnfilteredPartitionIterator executeLocally(ReadExecutionController controller)
+        {
+            return EmptyIterators.unfilteredPartition(this.metadata());
+        }
+    }
+}
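
The ReadResponse.serializer change that these round-trip tests exercise is elsewhere in this commit. A sketch of the version gate they imply, inside serialize(response, out, version) and assuming the usual vint-length encoding helper (not verbatim from the patch):

    // Hypothetical serializer fragment: repaired data tracking info is only
    // written to 4.0+ peers; deserialization from older versions falls back to
    // an empty digest and a conclusive flag of true, as asserted above.
    if (version >= MessagingService.VERSION_40)
    {
        ByteBufferUtil.writeWithVIntLength(response.repairedDataDigest(), out);
        out.writeBoolean(response.isRepairedDigestConclusive());
    }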

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/test/unit/org/apache/cassandra/service/reads/AbstractReadResponseTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/reads/AbstractReadResponseTest.java b/test/unit/org/apache/cassandra/service/reads/AbstractReadResponseTest.java
index c6f2232..582aff8 100644
--- a/test/unit/org/apache/cassandra/service/reads/AbstractReadResponseTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/AbstractReadResponseTest.java
@@ -19,6 +19,7 @@
 package org.apache.cassandra.service.reads;
 
 import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.SortedSet;
@@ -214,15 +215,38 @@ public abstract class AbstractReadResponseTest
         return dk(Integer.toString(k));
     }
 
-    static MessageIn<ReadResponse> response(ReadCommand command, InetAddressAndPort from, UnfilteredPartitionIterator data, boolean digest)
+
+    static MessageIn<ReadResponse> response(ReadCommand command,
+                                            InetAddressAndPort from,
+                                            UnfilteredPartitionIterator data,
+                                            boolean isDigestResponse,
+                                            int fromVersion,
+                                            ByteBuffer repairedDataDigest,
+                                            boolean hasPendingRepair)
+    {
+        ReadResponse response = isDigestResponse
+                                ? ReadResponse.createDigestResponse(data, command)
+                                : ReadResponse.createRemoteDataResponse(data, repairedDataDigest, hasPendingRepair, command, fromVersion);
+        return MessageIn.create(from, response, Collections.emptyMap(), MessagingService.Verb.READ, fromVersion);
+    }
+
+    static MessageIn<ReadResponse> response(InetAddressAndPort from,
+                                            UnfilteredPartitionIterator partitionIterator,
+                                            ByteBuffer repairedDigest,
+                                            boolean hasPendingRepair,
+                                            ReadCommand cmd)
+    {
+        return response(cmd, from, partitionIterator, false, MessagingService.current_version, repairedDigest, hasPendingRepair);
+    }
+
+    static MessageIn<ReadResponse> response(ReadCommand command, InetAddressAndPort from, UnfilteredPartitionIterator data, boolean isDigestResponse)
     {
-        ReadResponse response = digest ? ReadResponse.createDigestResponse(data, command) : ReadResponse.createDataResponse(data, command);
-        return MessageIn.create(from, response, Collections.emptyMap(), MessagingService.Verb.READ, MessagingService.current_version);
+        return response(command, from, data, isDigestResponse, MessagingService.current_version, ByteBufferUtil.EMPTY_BYTE_BUFFER, false);
     }
 
     static MessageIn<ReadResponse> response(ReadCommand command, InetAddressAndPort from, UnfilteredPartitionIterator data)
     {
-        return response(command, from, data, false);
+        return response(command, from, data, false, MessagingService.current_version, ByteBufferUtil.EMPTY_BYTE_BUFFER, false);
     }
 
     public RangeTombstone tombstone(Object start, Object end, long markedForDeleteAt, int localDeletionTime)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java b/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java
index abec25d..968ef16 100644
--- a/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java
@@ -55,8 +55,13 @@ import org.apache.cassandra.db.rows.Row;
 import org.apache.cassandra.db.rows.RowIterator;
 import org.apache.cassandra.locator.EndpointsForRange;
 import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.Replica;
 import org.apache.cassandra.locator.ReplicaLayout;
 import org.apache.cassandra.locator.ReplicaUtils;
+import org.apache.cassandra.net.*;
+import org.apache.cassandra.service.reads.repair.ReadRepair;
+import org.apache.cassandra.service.reads.repair.RepairedDataTracker;
+import org.apache.cassandra.service.reads.repair.RepairedDataVerifier;
 import org.apache.cassandra.service.reads.repair.TestableReadRepair;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
@@ -81,6 +86,7 @@ public class DataResolverTest extends AbstractReadResponseTest
     public void setup()
     {
         command = Util.cmd(cfs, dk).withNowInSeconds(nowInSec).build();
+        command.trackRepairedStatus();
         readRepair = new TestableReadRepair(command, ConsistencyLevel.QUORUM);
     }
 
@@ -586,6 +592,7 @@ public class DataResolverTest extends AbstractReadResponseTest
     public void testRepairRangeTombstoneWithPartitionDeletion()
     {
         EndpointsForRange replicas = makeReplicas(2);
+
         DataResolver resolver = new DataResolver(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime());
         InetAddressAndPort peer1 = replicas.get(0).endpoint();
         InetAddressAndPort peer2 = replicas.get(1).endpoint();
@@ -898,6 +905,347 @@ public class DataResolverTest extends AbstractReadResponseTest
         Assert.assertNull(readRepair.sent.get(peer2));
     }
 
+    /** Tests for repaired data tracking */
+
+    @Test
+    public void trackMatchingEmptyDigestsWithAllConclusive()
+    {
+        EndpointsForRange replicas = makeReplicas(2);
+        ByteBuffer digest1 = ByteBufferUtil.EMPTY_BYTE_BUFFER;
+        InetAddressAndPort peer1 = replicas.get(0).endpoint();
+        InetAddressAndPort peer2 = replicas.get(1).endpoint();
+        TestRepairedDataVerifier verifier = new TestRepairedDataVerifier();
+        verifier.expectDigest(peer1, digest1, true);
+        verifier.expectDigest(peer2, digest1, true);
+
+        DataResolver resolver = resolverWithVerifier(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime(),  verifier);
+
+        resolver.preprocess(response(peer1, iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest1, true, command));
+        resolver.preprocess(response(peer2, iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest1, true, command));
+
+        resolveAndConsume(resolver);
+        assertTrue(verifier.verified);
+    }
+
+    @Test
+    public void trackMatchingEmptyDigestsWithSomeConclusive()
+    {
+        ByteBuffer digest1 = ByteBufferUtil.EMPTY_BYTE_BUFFER;
+        EndpointsForRange replicas = makeReplicas(2);
+        InetAddressAndPort peer1 = replicas.get(0).endpoint();
+        InetAddressAndPort peer2 = replicas.get(1).endpoint();
+        TestRepairedDataVerifier verifier = new TestRepairedDataVerifier();
+        verifier.expectDigest(peer1, digest1, false);
+        verifier.expectDigest(peer2, digest1, true);
+
+        DataResolver resolver = resolverWithVerifier(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime(),  verifier);
+
+        resolver.preprocess(response(peer1, iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest1, false, command));
+        resolver.preprocess(response(peer2, iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest1, true, command));
+
+        resolveAndConsume(resolver);
+        assertTrue(verifier.verified);
+    }
+
+    @Test
+    public void trackMatchingEmptyDigestsWithNoneConclusive()
+    {
+        ByteBuffer digest1 = ByteBufferUtil.EMPTY_BYTE_BUFFER;
+        EndpointsForRange replicas = makeReplicas(2);
+        InetAddressAndPort peer1 = replicas.get(0).endpoint();
+        InetAddressAndPort peer2 = replicas.get(1).endpoint();
+        TestRepairedDataVerifier verifier = new TestRepairedDataVerifier();
+        verifier.expectDigest(peer1, digest1, false);
+        verifier.expectDigest(peer2, digest1, false);
+
+        DataResolver resolver = resolverWithVerifier(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime(), verifier);
+
+        resolver.preprocess(response(peer1, iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest1, false, command));
+        resolver.preprocess(response(peer2, iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest1, false, command));
+
+        resolveAndConsume(resolver);
+        assertTrue(verifier.verified);
+    }
+
+    @Test
+    public void trackMatchingDigestsWithAllConclusive()
+    {
+        ByteBuffer digest1 = ByteBufferUtil.bytes("digest1");
+        EndpointsForRange replicas = makeReplicas(2);
+        InetAddressAndPort peer1 = replicas.get(0).endpoint();
+        InetAddressAndPort peer2 = replicas.get(1).endpoint();
+        TestRepairedDataVerifier verifier = new TestRepairedDataVerifier();
+        verifier.expectDigest(peer1, digest1, true);
+        verifier.expectDigest(peer2, digest1, true);
+
+        DataResolver resolver = resolverWithVerifier(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime(), verifier);
+
+        resolver.preprocess(response(peer1, iter(PartitionUpdate.emptyUpdate(cfm,dk)), digest1, true, command));
+        resolver.preprocess(response(peer2, iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest1, true, command));
+
+        resolveAndConsume(resolver);
+        assertTrue(verifier.verified);
+    }
+
+    @Test
+    public void trackMatchingDigestsWithSomeConclusive()
+    {
+        ByteBuffer digest1 = ByteBufferUtil.bytes("digest1");
+        EndpointsForRange replicas = makeReplicas(2);
+        InetAddressAndPort peer1 = replicas.get(0).endpoint();
+        InetAddressAndPort peer2 = replicas.get(1).endpoint();
+        TestRepairedDataVerifier verifier = new TestRepairedDataVerifier();
+        verifier.expectDigest(peer1, digest1, true);
+        verifier.expectDigest(peer2, digest1, false);
+
+        DataResolver resolver = resolverWithVerifier(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime(), verifier);
+
+        resolver.preprocess(response(peer1, iter(PartitionUpdate.emptyUpdate(cfm,dk)), digest1, true, command));
+        resolver.preprocess(response(peer2, iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest1, false, command));
+
+        resolveAndConsume(resolver);
+        assertTrue(verifier.verified);
+    }
+
+    @Test
+    public void trackMatchingDigestsWithNoneConclusive()
+    {
+        EndpointsForRange replicas = makeReplicas(2);
+        ByteBuffer digest1 = ByteBufferUtil.bytes("digest1");
+        InetAddressAndPort peer1 = replicas.get(0).endpoint();
+        InetAddressAndPort peer2 = replicas.get(1).endpoint();
+        TestRepairedDataVerifier verifier = new TestRepairedDataVerifier();
+        verifier.expectDigest(peer1, digest1, false);
+        verifier.expectDigest(peer2, digest1, false);
+
+        DataResolver resolver = resolverWithVerifier(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime(),  verifier);
+
+        resolver.preprocess(response(peer1, iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest1, false, command));
+        resolver.preprocess(response(peer2, iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest1, false, command));
+
+        resolveAndConsume(resolver);
+        assertTrue(verifier.verified);
+    }
+
+    @Test
+    public void trackMatchingRepairedDigestsWithDifferentData()
+    {
+        // As far as repaired data tracking is concerned, the actual data in the response is not relevant
+        EndpointsForRange replicas = makeReplicas(2);
+        ByteBuffer digest1 = ByteBufferUtil.bytes("digest1");
+        InetAddressAndPort peer1 = replicas.get(0).endpoint();
+        InetAddressAndPort peer2 = replicas.get(1).endpoint();
+        TestRepairedDataVerifier verifier = new TestRepairedDataVerifier();
+        verifier.expectDigest(peer1, digest1, true);
+        verifier.expectDigest(peer2, digest1, true);
+
+        DataResolver resolver = resolverWithVerifier(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime(),  verifier);
+
+        resolver.preprocess(response(peer1, iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk).clustering("1") .buildUpdate()), digest1, true, command));
+        resolver.preprocess(response(peer2, iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest1, true, command));
+
+        resolveAndConsume(resolver);
+        assertTrue(verifier.verified);
+    }
+
+    @Test
+    public void trackMismatchingRepairedDigestsWithAllConclusive()
+    {
+        EndpointsForRange replicas = makeReplicas(2);
+        ByteBuffer digest1 = ByteBufferUtil.bytes("digest1");
+        ByteBuffer digest2 = ByteBufferUtil.bytes("digest2");
+        InetAddressAndPort peer1 = replicas.get(0).endpoint();
+        InetAddressAndPort peer2 = replicas.get(1).endpoint();
+        TestRepairedDataVerifier verifier = new TestRepairedDataVerifier();
+        verifier.expectDigest(peer1, digest1, true);
+        verifier.expectDigest(peer2, digest2, true);
+
+        DataResolver resolver = resolverWithVerifier(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime(),  verifier);
+
+        resolver.preprocess(response(peer1, iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest1, true, command));
+        resolver.preprocess(response(peer2, iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest2, true, command));
+
+        resolveAndConsume(resolver);
+        assertTrue(verifier.verified);
+    }
+
+    @Test
+    public void trackMismatchingRepairedDigestsWithSomeConclusive()
+    {
+        ByteBuffer digest1 = ByteBufferUtil.bytes("digest1");
+        ByteBuffer digest2 = ByteBufferUtil.bytes("digest2");
+        EndpointsForRange replicas = makeReplicas(2);
+        InetAddressAndPort peer1 = replicas.get(0).endpoint();
+        InetAddressAndPort peer2 = replicas.get(1).endpoint();
+        TestRepairedDataVerifier verifier = new TestRepairedDataVerifier();
+        verifier.expectDigest(peer1, digest1, false);
+        verifier.expectDigest(peer2, digest2, true);
+
+        DataResolver resolver = resolverWithVerifier(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime(),  verifier);
+
+        resolver.preprocess(response(peer1, iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest1, false, command));
+        resolver.preprocess(response(peer2, iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest2, true, command));
+
+        resolveAndConsume(resolver);
+        assertTrue(verifier.verified);
+    }
+
+    @Test
+    public void trackMismatchingRepairedDigestsWithNoneConclusive()
+    {
+        ByteBuffer digest1 = ByteBufferUtil.bytes("digest1");
+        ByteBuffer digest2 = ByteBufferUtil.bytes("digest2");
+        EndpointsForRange replicas = makeReplicas(2);
+        InetAddressAndPort peer1 = replicas.get(0).endpoint();
+        InetAddressAndPort peer2 = replicas.get(1).endpoint();
+        TestRepairedDataVerifier verifier = new TestRepairedDataVerifier();
+        verifier.expectDigest(peer1, digest1, false);
+        verifier.expectDigest(peer2, digest2, false);
+
+        DataResolver resolver = resolverWithVerifier(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime(),  verifier);
+
+        resolver.preprocess(response(peer1, iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest1, false, command));
+        resolver.preprocess(response(peer2, iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest2, false, command));
+
+        resolveAndConsume(resolver);
+        assertTrue(verifier.verified);
+    }
+
+    @Test
+    public void trackMismatchingRepairedDigestsWithDifferentData()
+    {
+        ByteBuffer digest1 = ByteBufferUtil.bytes("digest1");
+        ByteBuffer digest2 = ByteBufferUtil.bytes("digest2");
+        EndpointsForRange replicas = makeReplicas(2);
+        InetAddressAndPort peer1 = replicas.get(0).endpoint();
+        InetAddressAndPort peer2 = replicas.get(1).endpoint();
+        TestRepairedDataVerifier verifier = new TestRepairedDataVerifier();
+        verifier.expectDigest(peer1, digest1, true);
+        verifier.expectDigest(peer2, digest2, true);
+
+        DataResolver resolver = resolverWithVerifier(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime(),  verifier);
+
+        resolver.preprocess(response(peer1, iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk).clustering("1") .buildUpdate()), digest1, true, command));
+        resolver.preprocess(response(peer2, iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest2, true, command));
+
+        resolveAndConsume(resolver);
+        assertTrue(verifier.verified);
+    }
+
+    @Test
+    public void noVerificationForSingletonResponse()
+    {
+        // for CL <= 1 a coordinator shouldn't request repaired data tracking, but we can
+        // easily assert that the verification isn't attempted even if it were requested
+        EndpointsForRange replicas = makeReplicas(2);
+        ByteBuffer digest1 = ByteBufferUtil.bytes("digest1");
+        InetAddressAndPort peer1 = replicas.get(0).endpoint();
+        TestRepairedDataVerifier verifier = new TestRepairedDataVerifier();
+        verifier.expectDigest(peer1, digest1, true);
+
+        DataResolver resolver = resolverWithVerifier(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime(),  verifier);
+
+        resolver.preprocess(response(peer1, iter(PartitionUpdate.emptyUpdate(cfm,dk)), digest1, true, command));
+
+        resolveAndConsume(resolver);
+        assertFalse(verifier.verified);
+    }
+
+    @Test
+    public void responsesFromOlderVersionsAreNotTracked()
+    {
+        // In a mixed version cluster, responses from replicas running older versions won't include
+        // tracking info, so the digest and pending session status are defaulted. To make sure these
+        // default values don't result in false positives we make sure not to consider them when
+        // processing in DataResolver
+        EndpointsForRange replicas = makeReplicas(2);
+        TestRepairedDataVerifier verifier = new TestRepairedDataVerifier();
+        ByteBuffer digest1 = ByteBufferUtil.bytes("digest1");
+        InetAddressAndPort peer1 = replicas.get(0).endpoint();
+        InetAddressAndPort peer2 = replicas.get(1).endpoint();
+        verifier.expectDigest(peer1, digest1, true);
+
+        DataResolver resolver = resolverWithVerifier(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime(), verifier);
+
+        resolver.preprocess(response(peer1, iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest1, true, command));
+        // peer2 is advertising an older version, so when we deserialize its response there are two things to note:
+        // i) the actual serialized response cannot contain any tracking info so deserialization will use defaults of
+        //    an empty digest and pending sessions = false
+        // ii) under normal circumstances, this would cause a mismatch with peer1, but because of the older version,
+        //     here it will not
+        resolver.preprocess(response(command, peer2, iter(PartitionUpdate.emptyUpdate(cfm, dk)),
+                                     false, MessagingService.VERSION_30,
+                                     ByteBufferUtil.EMPTY_BYTE_BUFFER, false));
+
+        resolveAndConsume(resolver);
+        assertTrue(verifier.verified);
+    }
+
+    @Test
+    public void responsesFromTransientReplicasAreNotTracked()
+    {
+        EndpointsForRange replicas = makeReplicas(2);
+        EndpointsForRange.Mutable mutable = replicas.newMutable(2);
+        mutable.add(replicas.get(0));
+        mutable.add(Replica.transientReplica(replicas.get(1).endpoint(), replicas.range()));
+        replicas = mutable.asSnapshot();
+
+        TestRepairedDataVerifier verifier = new TestRepairedDataVerifier();
+        ByteBuffer digest1 = ByteBufferUtil.bytes("digest1");
+        ByteBuffer digest2 = ByteBufferUtil.bytes("digest2");
+        InetAddressAndPort peer1 = replicas.get(0).endpoint();
+        InetAddressAndPort peer2 = replicas.get(1).endpoint();
+        verifier.expectDigest(peer1, digest1, true);
+
+        DataResolver resolver = resolverWithVerifier(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime(), verifier);
+
+        resolver.preprocess(response(peer1, iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest1, true, command));
+        resolver.preprocess(response(peer2, iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest2, true, command));
+
+        resolveAndConsume(resolver);
+        assertTrue(verifier.verified);
+    }
+
+    private static class TestRepairedDataVerifier implements RepairedDataVerifier
+    {
+        private final RepairedDataTracker expected = new RepairedDataTracker(null);
+        private boolean verified = false;
+
+        private void expectDigest(InetAddressAndPort from, ByteBuffer digest, boolean conclusive)
+        {
+            expected.recordDigest(from, digest, conclusive);
+        }
+
+        @Override
+        public void verify(RepairedDataTracker tracker)
+        {
+            verified = expected.equals(tracker);
+        }
+    }
+
+    private DataResolver resolverWithVerifier(final ReadCommand command,
+                                              final ReplicaLayout.ForRange plan,
+                                              final ReadRepair readRepair,
+                                              final long queryStartNanoTime,
+                                              final RepairedDataVerifier verifier)
+    {
+        class TestableDataResolver extends DataResolver
+        {
+            public TestableDataResolver(ReadCommand command, ReplicaLayout.ForRange plan, ReadRepair readRepair, long queryStartNanoTime)
+            {
+                super(command, plan, readRepair, queryStartNanoTime);
+            }
+
+            protected RepairedDataVerifier getRepairedDataVerifier(ReadCommand command)
+            {
+                return verifier;
+            }
+        }
+
+        return new TestableDataResolver(command, plan, readRepair, queryStartNanoTime);
+    }
+
     private void assertRepairContainsDeletions(Mutation mutation,
                                                DeletionTime deletionTime,
                                                RangeTombstone...rangeTombstones)
@@ -954,4 +1302,19 @@ public class DataResolverTest extends AbstractReadResponseTest
     {
         return new ReplicaLayout.ForRange(ks, consistencyLevel, ReplicaUtils.FULL_BOUNDS, replicas, replicas);
     }
+
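+    // fully drains the resolved partitions so that repaired data verification
+    // has been triggered before the tests make their assertions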
+    private static void resolveAndConsume(DataResolver resolver)
+    {
+        try (PartitionIterator iterator = resolver.resolve())
+        {
+            while (iterator.hasNext())
+            {
+                try (RowIterator partition = iterator.next())
+                {
+                    while (partition.hasNext())
+                        partition.next();
+                }
+            }
+        }
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java b/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java
index a574d02..a4b7615 100644
--- a/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java
@@ -93,10 +93,10 @@ public class BlockingReadRepairTest extends AbstractReadRepairTest
         ReadCallback readCallback = null;
 
         @Override
-        void sendReadCommand(InetAddressAndPort to, ReadCallback callback)
+        void sendReadCommand(Replica to, ReadCallback callback)
         {
             assert readCallback == null || readCallback == callback;
-            readCommandRecipients.add(to);
+            readCommandRecipients.add(to.endpoint());
             readCallback = callback;
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java b/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java
index c345ee6..a5efe27 100644
--- a/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java
@@ -141,7 +141,7 @@ public class DiagEventsBlockingReadRepairTest extends AbstractReadRepairTest
             Assert.assertNotNull(e.toMap());
         }
 
-        void sendReadCommand(InetAddressAndPort to, ReadCallback callback)
+        void sendReadCommand(Replica to, ReadCallback callback)
         {
             assert readCallback == null || readCallback == callback;
             readCallback = callback;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/test/unit/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepairTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepairTest.java b/test/unit/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepairTest.java
index 9bb7eed..bee5ddd 100644
--- a/test/unit/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepairTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepairTest.java
@@ -30,6 +30,7 @@ import org.apache.cassandra.db.ReadCommand;
 import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
 import org.apache.cassandra.locator.Endpoints;
 import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.Replica;
 import org.apache.cassandra.locator.ReplicaLayout;
 import org.apache.cassandra.service.reads.ReadCallback;
 
@@ -46,10 +47,10 @@ public class ReadOnlyReadRepairTest extends AbstractReadRepairTest
         ReadCallback readCallback = null;
 
         @Override
-        void sendReadCommand(InetAddressAndPort to, ReadCallback callback)
+        void sendReadCommand(Replica to, ReadCallback callback)
         {
             assert readCallback == null || readCallback == callback;
-            readCommandRecipients.add(to);
+            readCommandRecipients.add(to.endpoint());
             readCallback = callback;
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/test/unit/org/apache/cassandra/service/reads/repair/RepairedDataVerifierTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/reads/repair/RepairedDataVerifierTest.java b/test/unit/org/apache/cassandra/service/reads/repair/RepairedDataVerifierTest.java
new file mode 100644
index 0000000..c916d13
--- /dev/null
+++ b/test/unit/org/apache/cassandra/service/reads/repair/RepairedDataVerifierTest.java
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.reads.repair;
+
+import java.net.UnknownHostException;
+import java.util.Random;
+
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.ReadCommand;
+import org.apache.cassandra.db.SinglePartitionReadCommand;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.filter.RowFilter;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.metrics.TableMetrics;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.junit.Assert.assertEquals;
+
+public class RepairedDataVerifierTest
+{
+    private static final String TEST_NAME = "read_command_vh_test_";
+    private static final String KEYSPACE = TEST_NAME + "cql_keyspace";
+    private static final String TABLE = "table1";
+
+    private final Random random = new Random();
+    private TableMetadata metadata;
+    private TableMetrics metrics;
+
+    // counter to generate the last byte of peer addresses
+    private int addressSuffix = 10;
+
+    @BeforeClass
+    public static void init()
+    {
+        SchemaLoader.loadSchema();
+        SchemaLoader.schemaDefinition(TEST_NAME);
+        DatabaseDescriptor.reportUnconfirmedRepairedDataMismatches(true);
+    }
+
+    @Before
+    public void setup()
+    {
+        metadata = Schema.instance.getTableMetadata(KEYSPACE, TABLE);
+        metrics = ColumnFamilyStore.metricsFor(metadata.id);
+    }
+
+    @Test
+    public void repairedDataMismatchWithSomeConclusive()
+    {
+        long confirmedCount = confirmedCount();
+        long unconfirmedCount = unconfirmedCount();
+        InetAddressAndPort peer1 = peer();
+        InetAddressAndPort peer2 = peer();
+        RepairedDataVerifier.SimpleVerifier verifier = new RepairedDataVerifier.SimpleVerifier(command(key()));
+        RepairedDataTracker tracker = new RepairedDataTracker(verifier);
+        tracker.recordDigest(peer1, ByteBufferUtil.bytes("digest1"), false);
+        tracker.recordDigest(peer2, ByteBufferUtil.bytes("digest2"), true);
+
+        tracker.verify();
+        assertEquals(confirmedCount, confirmedCount());
+        assertEquals(unconfirmedCount + 1, unconfirmedCount());
+    }
+
+    @Test
+    public void repairedDataMismatchWithNoneConclusive()
+    {
+        long confirmedCount = confirmedCount();
+        long unconfirmedCount = unconfirmedCount();
+        InetAddressAndPort peer1 = peer();
+        InetAddressAndPort peer2 = peer();
+        RepairedDataVerifier.SimpleVerifier verifier = new RepairedDataVerifier.SimpleVerifier(command(key()));
+        RepairedDataTracker tracker = new RepairedDataTracker(verifier);
+        tracker.recordDigest(peer1, ByteBufferUtil.bytes("digest1"), false);
+        tracker.recordDigest(peer2, ByteBufferUtil.bytes("digest2"), false);
+
+        tracker.verify();
+        assertEquals(confirmedCount, confirmedCount());
+        assertEquals(unconfirmedCount + 1, unconfirmedCount());
+    }
+
+    @Test
+    public void repairedDataMismatchWithAllConclusive()
+    {
+        long confirmedCount = confirmedCount();
+        long unconfirmedCount = unconfirmedCount();
+        InetAddressAndPort peer1 = peer();
+        InetAddressAndPort peer2 = peer();
+        RepairedDataVerifier.SimpleVerifier verifier = new RepairedDataVerifier.SimpleVerifier(command(key()));
+        RepairedDataTracker tracker = new RepairedDataTracker(verifier);
+        tracker.recordDigest(peer1, ByteBufferUtil.bytes("digest1"), true);
+        tracker.recordDigest(peer2, ByteBufferUtil.bytes("digest2"), true);
+
+        tracker.verify();
+        assertEquals(confirmedCount + 1, confirmedCount());
+        assertEquals(unconfirmedCount, unconfirmedCount());
+    }
+
+    @Test
+    public void repairedDataMatchesWithAllConclusive()
+    {
+        long confirmedCount = confirmedCount();
+        long unconfirmedCount = unconfirmedCount();
+        InetAddressAndPort peer1 = peer();
+        InetAddressAndPort peer2 = peer();
+        RepairedDataVerifier.SimpleVerifier verifier = new RepairedDataVerifier.SimpleVerifier(command(key()));
+        RepairedDataTracker tracker = new RepairedDataTracker(verifier);
+        tracker.recordDigest(peer1, ByteBufferUtil.bytes("digest1"), true);
+        tracker.recordDigest(peer2, ByteBufferUtil.bytes("digest1"), true);
+
+        tracker.verify();
+        assertEquals(confirmedCount, confirmedCount());
+        assertEquals(unconfirmedCount, unconfirmedCount());
+    }
+
+    @Test
+    public void repairedDataMatchesWithSomeConclusive()
+    {
+        long confirmedCount = confirmedCount();
+        long unconfirmedCount = unconfirmedCount();
+        InetAddressAndPort peer1 = peer();
+        InetAddressAndPort peer2 = peer();
+        RepairedDataVerifier.SimpleVerifier verifier = new RepairedDataVerifier.SimpleVerifier(command(key()));
+        RepairedDataTracker tracker = new RepairedDataTracker(verifier);
+        tracker.recordDigest(peer1, ByteBufferUtil.bytes("digest1"), true);
+        tracker.recordDigest(peer2, ByteBufferUtil.bytes("digest1"), false);
+
+        tracker.verify();
+        assertEquals(confirmedCount, confirmedCount());
+        assertEquals(unconfirmedCount, unconfirmedCount());
+    }
+
+    @Test
+    public void repairedDataMatchesWithNoneConclusive()
+    {
+        long confirmedCount = confirmedCount();
+        long unconfirmedCount = unconfirmedCount();
+        InetAddressAndPort peer1 = peer();
+        InetAddressAndPort peer2 = peer();
+        RepairedDataVerifier.SimpleVerifier verifier = new RepairedDataVerifier.SimpleVerifier(command(key()));
+        RepairedDataTracker tracker = new RepairedDataTracker(verifier);
+        tracker.recordDigest(peer1, ByteBufferUtil.bytes("digest1"), false);
+        tracker.recordDigest(peer2, ByteBufferUtil.bytes("digest1"), false);
+
+        tracker.verify();
+        assertEquals(confirmedCount, confirmedCount());
+        assertEquals(unconfirmedCount, unconfirmedCount());
+    }
+
+    @Test
+    public void allEmptyDigestWithAllConclusive()
+    {
+        // if a read didn't touch any repaired sstables, digests will be empty
+        long confirmedCount = confirmedCount();
+        long unconfirmedCount = unconfirmedCount();
+        InetAddressAndPort peer1 = peer();
+        InetAddressAndPort peer2 = peer();
+        RepairedDataVerifier.SimpleVerifier verifier = new RepairedDataVerifier.SimpleVerifier(command(key()));
+        RepairedDataTracker tracker = new RepairedDataTracker(verifier);
+        tracker.recordDigest(peer1, ByteBufferUtil.EMPTY_BYTE_BUFFER, true);
+        tracker.recordDigest(peer2, ByteBufferUtil.EMPTY_BYTE_BUFFER, true);
+
+        tracker.verify();
+        assertEquals(confirmedCount, confirmedCount());
+        assertEquals(unconfirmedCount, unconfirmedCount());
+    }
+
+    @Test
+    public void allEmptyDigestsWithSomeConclusive()
+    {
+        // if a read didn't touch any repaired sstables, digests will be empty
+        long confirmedCount = confirmedCount();
+        long unconfirmedCount = unconfirmedCount();
+        InetAddressAndPort peer1 = peer();
+        InetAddressAndPort peer2 = peer();
+        RepairedDataVerifier.SimpleVerifier verifier = new RepairedDataVerifier.SimpleVerifier(command(key()));
+        RepairedDataTracker tracker = new RepairedDataTracker(verifier);
+        tracker.recordDigest(peer1, ByteBufferUtil.EMPTY_BYTE_BUFFER, true);
+        tracker.recordDigest(peer2, ByteBufferUtil.EMPTY_BYTE_BUFFER, false);
+
+        tracker.verify();
+        assertEquals(confirmedCount, confirmedCount());
+        assertEquals(unconfirmedCount, unconfirmedCount());
+    }
+
+    @Test
+    public void allEmptyDigestsWithNoneConclusive()
+    {
+        // if a read didn't touch any repaired sstables, digests will be empty
+        long confirmedCount = confirmedCount();
+        long unconfirmedCount = unconfirmedCount();
+        InetAddressAndPort peer1 = peer();
+        InetAddressAndPort peer2 = peer();
+        RepairedDataVerifier.SimpleVerifier verifier = new RepairedDataVerifier.SimpleVerifier(command(key()));
+        RepairedDataTracker tracker = new RepairedDataTracker(verifier);
+        tracker.recordDigest(peer1, ByteBufferUtil.EMPTY_BYTE_BUFFER, false);
+        tracker.recordDigest(peer2, ByteBufferUtil.EMPTY_BYTE_BUFFER, false);
+
+        tracker.verify();
+        assertEquals(confirmedCount, confirmedCount());
+        assertEquals(unconfirmedCount, unconfirmedCount());
+    }
+
+    @Test
+    public void noTrackingDataRecorded()
+    {
+        // if a read didn't land on any replicas which support repaired data tracking, nothing will be recorded
+        long confirmedCount = confirmedCount();
+        long unconfirmedCount = unconfirmedCount();
+        RepairedDataVerifier.SimpleVerifier verifier = new RepairedDataVerifier.SimpleVerifier(command(key()));
+        RepairedDataTracker tracker = new RepairedDataTracker(verifier);
+        tracker.verify();
+        assertEquals(confirmedCount, confirmedCount());
+        assertEquals(unconfirmedCount, unconfirmedCount());
+    }
+
+    private long confirmedCount()
+    {
+        return metrics.confirmedRepairedInconsistencies.table.getCount();
+    }
+
+    private long unconfirmedCount()
+    {
+        return metrics.unconfirmedRepairedInconsistencies.table.getCount();
+    }
+
+    private InetAddressAndPort peer()
+    {
+        try
+        {
+            return InetAddressAndPort.getByAddress(new byte[]{ 127, 0, 0, (byte) addressSuffix++ });
+        }
+        catch (UnknownHostException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private int key()
+    {
+        return random.nextInt();
+    }
+
+    private ReadCommand command(int key)
+    {
+        return new StubReadCommand(key, metadata, false);
+    }
+
+    private static class StubReadCommand extends SinglePartitionReadCommand
+    {
+        StubReadCommand(int key, TableMetadata metadata, boolean isDigest)
+        {
+            super(isDigest,
+                  0,
+                  false,
+                  metadata,
+                  FBUtilities.nowInSeconds(),
+                  ColumnFilter.all(metadata),
+                  RowFilter.NONE,
+                  DataLimits.NONE,
+                  metadata.partitioner.decorateKey(ByteBufferUtil.bytes(key)),
+                  null,
+                  null);
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[2/2] cassandra git commit: Detect inconsistencies in repaired data on the read path

Posted by sa...@apache.org.
Detect inconsistencies in repaired data on the read path

Patch by Sam Tunnicliffe; reviewed by Marcus Eriksson and Jordan West for CASSANDRA-14145


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5fbb938a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5fbb938a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5fbb938a

Branch: refs/heads/trunk
Commit: 5fbb938adaafd91e7bea1672f09a03c7ac5b9b9d
Parents: 744973e
Author: Sam Tunnicliffe <sa...@beobal.com>
Authored: Wed May 9 18:57:30 2018 +0100
Committer: Sam Tunnicliffe <sa...@beobal.com>
Committed: Wed Sep 5 19:07:12 2018 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 conf/cassandra.yaml                             |  18 +
 .../org/apache/cassandra/config/Config.java     |  20 +
 .../cassandra/config/DatabaseDescriptor.java    |  29 ++
 .../cassandra/db/PartitionRangeReadCommand.java |  33 +-
 .../org/apache/cassandra/db/ReadCommand.java    | 346 +++++++++++++++++-
 .../cassandra/db/ReadCommandVerbHandler.java    |   4 +
 .../org/apache/cassandra/db/ReadResponse.java   | 123 ++++++-
 .../db/SinglePartitionReadCommand.java          |  60 +--
 .../db/filter/ClusteringIndexNamesFilter.java   |  14 +
 .../db/rows/UnfilteredRowIterators.java         |   1 +
 .../cassandra/metrics/KeyspaceMetrics.java      |  22 ++
 .../apache/cassandra/metrics/TableMetrics.java  |  50 +++
 .../org/apache/cassandra/net/ParameterType.java |   3 +-
 .../repair/consistent/LocalSessions.java        |  17 +
 .../apache/cassandra/service/StorageProxy.java  |  76 +++-
 .../cassandra/service/StorageProxyMBean.java    |  15 +
 .../cassandra/service/reads/DataResolver.java   |  44 ++-
 .../service/reads/ShortReadProtection.java      |   1 +
 .../reads/repair/AbstractReadRepair.java        |  21 +-
 .../reads/repair/RepairedDataTracker.java       |  87 +++++
 .../reads/repair/RepairedDataVerifier.java      |  95 +++++
 .../apache/cassandra/db/ReadCommandTest.java    | 287 ++++++++++++++-
 .../db/ReadCommandVerbHandlerTest.java          | 171 +++++++++
 .../apache/cassandra/db/ReadResponseTest.java   | 261 +++++++++++++
 .../service/reads/AbstractReadResponseTest.java |  32 +-
 .../service/reads/DataResolverTest.java         | 363 +++++++++++++++++++
 .../reads/repair/BlockingReadRepairTest.java    |   4 +-
 .../DiagEventsBlockingReadRepairTest.java       |   2 +-
 .../reads/repair/ReadOnlyReadRepairTest.java    |   5 +-
 .../reads/repair/RepairedDataVerifierTest.java  | 291 +++++++++++++++
 31 files changed, 2399 insertions(+), 97 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 301f97f..5cfc7ab 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * Detect inconsistencies in repaired data on the read path (CASSANDRA-14145)
  * Add checksumming to the native protocol (CASSANDRA-13304)
  * Make AuthCache more easily extendable (CASSANDRA-14662)
  * Extend RolesCache to include detailed role info (CASSANDRA-14497)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 995a520..190ce77 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -1238,3 +1238,21 @@ diagnostic_events_enabled: false
 # Define use of legacy delayed flusher for replies to TCP connections. This will increase latency, but might be beneficial for
 # legacy use-cases where only a single connection is used for each Cassandra node. Default is false.
 #native_transport_flush_in_batches_legacy: false
+
+# Enable tracking of repaired state of data during reads and comparison between replicas.
+# Mismatches between the repaired sets of replicas can be characterized as either confirmed
+# or unconfirmed. In this context, unconfirmed indicates that the presence of pending repair
+# sessions, unrepaired partition tombstones, or some other condition means that the disparity
+# cannot be considered conclusive. Confirmed mismatches should be a trigger for investigation
+# as they may be indicative of corruption or data loss.
+# There are separate flags for range vs partition reads, as single partition reads are only tracked
+# when CL > 1 and a digest mismatch occurs. Currently, range queries don't use digests, so if
+# enabled for range reads, all range reads will include repaired data tracking. As this adds
+# some overhead, operators may wish to disable it whilst still enabling it for partition reads.
+repaired_data_tracking_for_range_reads_enabled: false
+repaired_data_tracking_for_partition_reads_enabled: false
+# If false, only confirmed mismatches will be reported. If true, a separate metric for unconfirmed
+# mismatches will also be recorded. This is to avoid potential signal:noise issues, as unconfirmed
+# mismatches are less actionable than confirmed ones.
+report_unconfirmed_repaired_data_mismatches: false
+
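As a quick illustration of the new toggles, here is a minimal sketch of flipping
them at runtime from in-process code such as a test. The setters are the ones this
patch adds to DatabaseDescriptor; the wrapping class is hypothetical:

    import org.apache.cassandra.config.DatabaseDescriptor;

    public class RepairedDataTrackingToggles
    {
        public static void enableAll()
        {
            // track repaired data for both read types
            DatabaseDescriptor.setRepairedDataTrackingForPartitionReadsEnabled(true);
            DatabaseDescriptor.setRepairedDataTrackingForRangeReadsEnabled(true);
            // additionally record unconfirmed mismatches in their own metric
            DatabaseDescriptor.reportUnconfirmedRepairedDataMismatches(true);
        }
    }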

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index b04f9ec..782815e 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -395,6 +395,26 @@ public class Config
     public volatile boolean diagnostic_events_enabled = false;
 
     /**
+     * Flags for enabling tracking of the repaired state of data during reads.
+     * There are separate flags for range and single partition reads, as single partition reads are only tracked
+     * when CL > 1 and a digest mismatch occurs. Currently, range queries don't use digests, so if
+     * enabled for range reads, all such reads will include repaired data tracking. As this adds
+     * some overhead, operators may wish to disable it whilst still enabling it for partition reads.
+     */
+    public volatile boolean repaired_data_tracking_for_range_reads_enabled = false;
+    public volatile boolean repaired_data_tracking_for_partition_reads_enabled = false;
+    /* If true, unconfirmed mismatches (those which cannot be considered conclusive proof of out of
+     * sync repaired data due to the presence of pending repair sessions, or unrepaired partition
+     * deletes) will increment a metric, distinct from confirmed mismatches. If false, unconfirmed
+     * mismatches are simply ignored by the coordinator.
+     * This is purely to allow operators to avoid potential signal:noise issues as these types of
+     * mismatches are considerably less actionable than their confirmed counterparts. Setting this
+     * to true only disables the incrementing of the counters when an unconfirmed mismatch is found
+     * and has no other effect on the collection or processing of the repaired data.
+     */
+    public volatile boolean report_unconfirmed_repaired_data_mismatches = false;
+
+    /**
      * @deprecated migrate to {@link DatabaseDescriptor#isClientInitialized()}
      */
     @Deprecated

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index ddea8f4..2ad9b18 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -2672,4 +2672,33 @@ public class DatabaseDescriptor
         conf.corrupted_tombstone_strategy = strategy;
     }
 
+    public static boolean getRepairedDataTrackingForRangeReadsEnabled()
+    {
+        return conf.repaired_data_tracking_for_range_reads_enabled;
+    }
+
+    public static void setRepairedDataTrackingForRangeReadsEnabled(boolean enabled)
+    {
+        conf.repaired_data_tracking_for_range_reads_enabled = enabled;
+    }
+
+    public static boolean getRepairedDataTrackingForPartitionReadsEnabled()
+    {
+        return conf.repaired_data_tracking_for_partition_reads_enabled;
+    }
+
+    public static void setRepairedDataTrackingForPartitionReadsEnabled(boolean enabled)
+    {
+        conf.repaired_data_tracking_for_partition_reads_enabled = enabled;
+    }
+
+    public static boolean reportUnconfirmedRepairedDataMismatches()
+    {
+        return conf.report_unconfirmed_repaired_data_mismatches;
+    }
+
+    public static void reportUnconfirmedRepairedDataMismatches(boolean enabled)
+    {
+        conf.report_unconfirmed_repaired_data_mismatches = enabled;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
index 7eab016..79db18a 100644
--- a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
+++ b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
@@ -18,12 +18,10 @@
 package org.apache.cassandra.db;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Iterables;
 
+import org.apache.cassandra.net.ParameterType;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.filter.*;
@@ -46,7 +44,6 @@ import org.apache.cassandra.schema.IndexMetadata;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.tracing.Tracing;
-import org.apache.cassandra.utils.FBUtilities;
 
 /**
  * A read command that selects a (part of a) range of partitions.
@@ -56,7 +53,6 @@ public class PartitionRangeReadCommand extends ReadCommand implements PartitionR
     protected static final SelectionDeserializer selectionDeserializer = new Deserializer();
 
     private final DataRange dataRange;
-    private int oldestUnrepairedTombstone = Integer.MAX_VALUE;
 
     private PartitionRangeReadCommand(boolean isDigest,
                                      int digestVersion,
@@ -257,8 +253,7 @@ public class PartitionRangeReadCommand extends ReadCommand implements PartitionR
         Tracing.trace("Executing seq scan across {} sstables for {}", view.sstables.size(), dataRange().keyRange().getString(metadata().partitionKeyType));
 
         // fetch data from current memtable, historical memtables, and SSTables in the correct order.
-        final List<UnfilteredPartitionIterator> iterators = new ArrayList<>(Iterables.size(view.memtables) + view.sstables.size());
-
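+        // collect inputs via InputCollector, which separates repaired from
+        // unrepaired iterators when repaired data tracking is enabled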
+        InputCollector<UnfilteredPartitionIterator> inputCollector = iteratorsForRange(view);
         try
         {
             for (Memtable memtable : view.memtables)
@@ -266,7 +261,7 @@ public class PartitionRangeReadCommand extends ReadCommand implements PartitionR
                 @SuppressWarnings("resource") // We close on exception and on closing the result returned by this method
                 Memtable.MemtableUnfilteredPartitionIterator iter = memtable.makePartitionIterator(columnFilter(), dataRange());
                 oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, iter.getMinLocalDeletionTime());
-                iterators.add(iter);
+                inputCollector.addMemtableIterator(iter);
             }
 
             SSTableReadsListener readCountUpdater = newReadCountUpdater();
@@ -274,25 +269,27 @@ public class PartitionRangeReadCommand extends ReadCommand implements PartitionR
             {
                 @SuppressWarnings("resource") // We close on exception and on closing the result returned by this method
                 UnfilteredPartitionIterator iter = sstable.getScanner(columnFilter(), dataRange(), readCountUpdater);
-                iterators.add(iter);
+                inputCollector.addSSTableIterator(sstable, iter);
+
                 if (!sstable.isRepaired())
                     oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, sstable.getMinLocalDeletionTime());
             }
             // iterators can be empty for offline tools
-            return iterators.isEmpty() ? EmptyIterators.unfilteredPartition(metadata())
-                                       : checkCacheFilter(UnfilteredPartitionIterators.mergeLazily(iterators), cfs);
+            if (inputCollector.isEmpty())
+                return EmptyIterators.unfilteredPartition(metadata());
+
+            return checkCacheFilter(UnfilteredPartitionIterators.mergeLazily(inputCollector.finalizeIterators()), cfs);
         }
         catch (RuntimeException | Error e)
         {
             try
             {
-                FBUtilities.closeAll(iterators);
+                inputCollector.close();
             }
-            catch (Exception suppressed)
+            catch (Exception e1)
             {
-                e.addSuppressed(suppressed);
+                e.addSuppressed(e1);
             }
-
             throw e;
         }
     }
@@ -313,12 +310,6 @@ public class PartitionRangeReadCommand extends ReadCommand implements PartitionR
                 };
     }
 
-    @Override
-    protected int oldestUnrepairedTombstone()
-    {
-        return oldestUnrepairedTombstone;
-    }
-
     private UnfilteredPartitionIterator checkCacheFilter(UnfilteredPartitionIterator iter, final ColumnFamilyStore cfs)
     {
         class CacheFilter extends Transformation

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java
index 736e3a3..e146b8a 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -18,10 +18,17 @@
 package org.apache.cassandra.db;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.function.BiFunction;
 import java.util.function.LongPredicate;
 
 import javax.annotation.Nullable;
 
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,6 +46,7 @@ import org.apache.cassandra.index.Index;
 import org.apache.cassandra.index.IndexNotAvailableException;
 import org.apache.cassandra.index.IndexRegistry;
 import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.metrics.TableMetrics;
@@ -48,9 +56,12 @@ import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.service.ClientWarn;
 import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.HashingUtils;
 
 /**
  * General interface for storage-engine read commands (common to both range and
@@ -71,6 +82,23 @@ public abstract class ReadCommand extends AbstractReadQuery
     // if a digest query, the version for which the digest is expected. Ignored if not a digest.
     private int digestVersion;
 
+    // for data queries, coordinators may request information on the repaired data used in constructing the response
+    private boolean trackRepairedStatus = false;
+    // tracker for repaired data, initialized to a singleton null object
+    private static final RepairedDataInfo NULL_REPAIRED_DATA_INFO = new RepairedDataInfo()
+    {
+        void trackPartitionKey(DecoratedKey key){}
+        void trackDeletion(DeletionTime deletion){}
+        void trackRangeTombstoneMarker(RangeTombstoneMarker marker){}
+        void trackRow(Row row){}
+        boolean isConclusive(){ return true; }
+        ByteBuffer getDigest(){ return ByteBufferUtil.EMPTY_BYTE_BUFFER; }
+    };
+
+    private RepairedDataInfo repairedDataInfo = NULL_REPAIRED_DATA_INFO;
+
+    int oldestUnrepairedTombstone = Integer.MAX_VALUE;
+
     @Nullable
     private final IndexMetadata index;
 
@@ -187,6 +215,68 @@ public abstract class ReadCommand extends AbstractReadQuery
     }
 
     /**
+     * Activates repaired data tracking for this command.
+     *
+     * When active, a digest will be created from data read from repaired SSTables. The digests
+     * from each replica can then be compared on the coordinator to detect any divergence in their
+     * repaired datasets. In this context, an sstable is considered repaired if it is marked
+     * repaired or has a pending repair session which has been committed.
+     * In addition to the digest, a set of ids for any pending but as yet uncommitted repair sessions
+     * is recorded and returned to the coordinator. This is to help reduce false positives caused
+     * by compaction lagging, which can leave sstables from committed sessions in the pending state
+     * for a time.
+     */
+    public void trackRepairedStatus()
+    {
+        trackRepairedStatus = true;
+    }
+
+    /**
+     * Whether or not the repaired status of any data read is being tracked
+     *
+     * @return Whether repaired status tracking is active for this command
+     */
+    public boolean isTrackingRepairedStatus()
+    {
+        return trackRepairedStatus;
+    }
+
+    /**
+     * Returns a digest of the repaired data read in the execution of this command.
+     *
+     * If either repaired status tracking is not active or the command has not yet been
+     * executed, then this digest will be an empty buffer.
+     * Otherwise, it will contain a digest of the repaired data read, or an empty buffer
+     * if no repaired data was read.
+     * @return digest of the repaired data read in the execution of the command
+     */
+    public ByteBuffer getRepairedDataDigest()
+    {
+        return repairedDataInfo.getDigest();
+    }
+
+    /**
+     * Returns a boolean indicating whether the repaired data digest is conclusive, i.e. whether
+     * no relevant sstables were skipped during the read that produced it.
+     *
+     * If true, then no pending repair sessions or partition deletes have influenced the extent
+     * of the repaired sstables that went into generating the digest.
+     * This indicates whether or not the digest can reliably be used to infer inconsistency
+     * issues between the repaired sets across replicas.
+     *
+     * If either repaired status tracking is not active or the command has not yet been
+     * executed, then this will always return true.
+     *
+     * @return true if the digest of the repaired data can reliably be used to infer
+     * inconsistency issues between the repaired sets across replicas, false otherwise.
+     */
+    public boolean isRepairedDataDigestConclusive()
+    {
+        return repairedDataInfo.isConclusive();
+    }
+
+    /**
      * Index (metadata) chosen for this query. Can be null.
      *
      * @return index (metadata) chosen for this query
@@ -225,7 +315,11 @@ public abstract class ReadCommand extends AbstractReadQuery
 
     protected abstract UnfilteredPartitionIterator queryStorage(ColumnFamilyStore cfs, ReadExecutionController executionController);
 
-    protected abstract int oldestUnrepairedTombstone();
+    protected int oldestUnrepairedTombstone()
+    {
+        return oldestUnrepairedTombstone;
+    }
+
 
     @SuppressWarnings("resource")
     public ReadResponse createResponse(UnfilteredPartitionIterator iterator)
@@ -305,6 +399,9 @@ public abstract class ReadCommand extends AbstractReadQuery
             Tracing.trace("Executing read on {}.{} using index {}", cfs.metadata.keyspace, cfs.metadata.name, index.getIndexMetadata().name);
         }
 
+        if (isTrackingRepairedStatus())
+            repairedDataInfo = new RepairedDataInfo();
+
         UnfilteredPartitionIterator iterator = (null == searcher) ? queryStorage(cfs, executionController) : searcher.search(executionController);
 
         try
@@ -569,6 +666,253 @@ public abstract class ReadCommand extends AbstractReadQuery
         return toCQLString();
     }
 
+    private static UnfilteredPartitionIterator withRepairedDataInfo(final UnfilteredPartitionIterator iterator,
+                                                                    final RepairedDataInfo repairedDataInfo)
+    {
+        class WithRepairedDataTracking extends Transformation<UnfilteredRowIterator>
+        {
+            protected UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition)
+            {
+                return withRepairedDataInfo(partition, repairedDataInfo);
+            }
+        }
+
+        return Transformation.apply(iterator, new WithRepairedDataTracking());
+    }
+
+    private static UnfilteredRowIterator withRepairedDataInfo(final UnfilteredRowIterator iterator,
+                                                              final RepairedDataInfo repairedDataInfo)
+    {
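+        // mirrors every element of the row iterator - partition key, deletion info,
+        // range tombstone markers, static and regular rows - into the repaired data digest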
+        class WithTracking extends Transformation
+        {
+            protected DecoratedKey applyToPartitionKey(DecoratedKey key)
+            {
+                repairedDataInfo.trackPartitionKey(key);
+                return key;
+            }
+
+            protected DeletionTime applyToDeletion(DeletionTime deletionTime)
+            {
+                repairedDataInfo.trackDeletion(deletionTime);
+                return deletionTime;
+            }
+
+            protected RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker)
+            {
+                repairedDataInfo.trackRangeTombstoneMarker(marker);
+                return marker;
+            }
+
+            protected Row applyToStatic(Row row)
+            {
+                repairedDataInfo.trackRow(row);
+                return row;
+            }
+
+            protected Row applyToRow(Row row)
+            {
+                repairedDataInfo.trackRow(row);
+                return row;
+            }
+        }
+
+        return Transformation.apply(iterator, new WithTracking());
+    }
+
+    private static class RepairedDataInfo
+    {
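+        // the hasher is initialized lazily so that a read which touched no repaired
+        // data yields an empty digest rather than the hash of zero bytes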
+        private Hasher hasher;
+        private boolean isConclusive = true;
+
+        ByteBuffer getDigest()
+        {
+            return hasher == null
+                   ? ByteBufferUtil.EMPTY_BYTE_BUFFER
+                   : ByteBuffer.wrap(getHasher().hash().asBytes());
+        }
+
+        boolean isConclusive()
+        {
+            return isConclusive;
+        }
+
+        void markInconclusive()
+        {
+            isConclusive = false;
+        }
+
+        void trackPartitionKey(DecoratedKey key)
+        {
+            HashingUtils.updateBytes(getHasher(), key.getKey().duplicate());
+        }
+
+        void trackDeletion(DeletionTime deletion)
+        {
+            deletion.digest(getHasher());
+        }
+
+        void trackRangeTombstoneMarker(RangeTombstoneMarker marker)
+        {
+            marker.digest(getHasher());
+        }
+
+        void trackRow(Row row)
+        {
+            row.digest(getHasher());
+        }
+
+        private Hasher getHasher()
+        {
+            if (hasher == null)
+                hasher = Hashing.crc32c().newHasher();
+
+            return hasher;
+        }
+    }
+
+    @SuppressWarnings("resource") // resultant iterators are closed by their callers
+    InputCollector<UnfilteredRowIterator> iteratorsForPartition(ColumnFamilyStore.ViewFragment view)
+    {
+        BiFunction<List<UnfilteredRowIterator>, RepairedDataInfo, UnfilteredRowIterator> merge =
+            (unfilteredRowIterators, repairedDataInfo) ->
+                withRepairedDataInfo(UnfilteredRowIterators.merge(unfilteredRowIterators), repairedDataInfo);
+
+        return new InputCollector<>(view, repairedDataInfo, merge, isTrackingRepairedStatus());
+    }
+
+    @SuppressWarnings("resource") // resultant iterators are closed by their callers
+    InputCollector<UnfilteredPartitionIterator> iteratorsForRange(ColumnFamilyStore.ViewFragment view)
+    {
+        BiFunction<List<UnfilteredPartitionIterator>, RepairedDataInfo, UnfilteredPartitionIterator> merge =
+            (unfilteredPartitionIterators, repairedDataInfo) ->
+                withRepairedDataInfo(UnfilteredPartitionIterators.merge(unfilteredPartitionIterators, UnfilteredPartitionIterators.MergeListener.NOOP), repairedDataInfo);
+
+        return new InputCollector<>(view, repairedDataInfo, merge, isTrackingRepairedStatus());
+    }
+
+    /**
+     * Handles the collation of unfiltered row or partition iterators that comprise the
+     * input for a query. Separates them according to repaired status and, if repaired
+     * status is being tracked, handles merging the repaired iterators and wrapping them
+     * in a digest generator.
+     *
+     * Intentionally not AutoCloseable so we don't mistakenly use this in try-with-resources
+     * (ARM) blocks, as that would prematurely close the underlying iterators
+     */
+    static class InputCollector<T extends AutoCloseable>
+    {
+        final RepairedDataInfo repairedDataInfo;
+        private final boolean isTrackingRepairedStatus;
+        Set<SSTableReader> repairedSSTables;
+        BiFunction<List<T>, RepairedDataInfo, T> repairedMerger;
+        List<T> repairedIters;
+        List<T> unrepairedIters;
+
+        InputCollector(ColumnFamilyStore.ViewFragment view,
+                       RepairedDataInfo repairedDataInfo,
+                       BiFunction<List<T>, RepairedDataInfo, T> repairedMerger,
+                       boolean isTrackingRepairedStatus)
+        {
+            this.repairedDataInfo = repairedDataInfo;
+            this.isTrackingRepairedStatus = isTrackingRepairedStatus;
+            if (isTrackingRepairedStatus)
+            {
+                for (SSTableReader sstable : view.sstables)
+                {
+                    if (considerRepairedForTracking(sstable))
+                    {
+                        if (repairedSSTables == null)
+                            repairedSSTables = Sets.newHashSetWithExpectedSize(view.sstables.size());
+                        repairedSSTables.add(sstable);
+                    }
+                }
+            }
+            if (repairedSSTables == null)
+            {
+                repairedIters = Collections.emptyList();
+                unrepairedIters = new ArrayList<>(view.sstables.size());
+            }
+            else
+            {
+                repairedIters = new ArrayList<>(repairedSSTables.size());
+                // when we're done collating, we'll merge the repaired iters and add the
+                // result to the unrepaired list, so size that list accordingly
+                unrepairedIters = new ArrayList<>((view.sstables.size() - repairedSSTables.size()) + Iterables.size(view.memtables) + 1);
+            }
+            this.repairedMerger = repairedMerger;
+        }
+
+        void addMemtableIterator(T iter)
+        {
+            unrepairedIters.add(iter);
+        }
+
+        void addSSTableIterator(SSTableReader sstable, T iter)
+        {
+            if (repairedSSTables != null && repairedSSTables.contains(sstable))
+                repairedIters.add(iter);
+            else
+                unrepairedIters.add(iter);
+        }
+
+        List<T> finalizeIterators()
+        {
+            if (repairedIters.isEmpty())
+                return unrepairedIters;
+
+            // merge the repaired data before returning, wrapping in a digest generator
+            unrepairedIters.add(repairedMerger.apply(repairedIters, repairedDataInfo));
+            return unrepairedIters;
+        }
+
+        boolean isEmpty()
+        {
+            return repairedIters.isEmpty() && unrepairedIters.isEmpty();
+        }
+
+        // For tracking purposes we consider data repaired if the sstable is either:
+        // * marked repaired
+        // * marked pending, but the local session has been committed. This reduces the window
+        //   whereby the tracking is affected by compaction backlog causing repaired sstables to
+        //   remain in the pending state
+        // If an sstable is involved in a pending repair which is not yet committed, we mark the
+        // repaired data info inconclusive, as the same data on other replicas may be in a
+        // slightly different state.
+        private boolean considerRepairedForTracking(SSTableReader sstable)
+        {
+            if (!isTrackingRepairedStatus)
+                return false;
+
+            UUID pendingRepair = sstable.getPendingRepair();
+            if (pendingRepair != ActiveRepairService.NO_PENDING_REPAIR)
+            {
+                if (ActiveRepairService.instance.consistent.local.isSessionFinalized(pendingRepair))
+                    return true;
+
+                // In the edge case where compaction is backed up long enough for the session to
+                // time out and be purged by LocalSessions::cleanup, consider the sstable unrepaired
+                // as it will be marked unrepaired when compaction catches up
+                if (!ActiveRepairService.instance.consistent.local.sessionExists(pendingRepair))
+                    return false;
+
+                repairedDataInfo.markInconclusive();
+            }
+
+            return sstable.isRepaired();
+        }
+
+        void markInconclusive()
+        {
+            repairedDataInfo.markInconclusive();
+        }
+
+        public void close() throws Exception
+        {
+            FBUtilities.closeAll(unrepairedIters);
+            FBUtilities.closeAll(repairedIters);
+        }
+    }
+
     private static class Serializer implements IVersionedSerializer<ReadCommand>
     {
         private static int digestFlag(boolean isDigest)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java b/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
index a71e92d..1b28c2c 100644
--- a/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
@@ -23,6 +23,7 @@ import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.ParameterType;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.tracing.Tracing;
 
@@ -43,6 +44,9 @@ public class ReadCommandVerbHandler implements IVerbHandler<ReadCommand>
         ReadCommand command = message.payload;
         command.setMonitoringTime(message.constructionTime, message.isCrossNode(), message.getTimeout(), message.getSlowQueryTimeout());
 
+        if (message.parameters.containsKey(ParameterType.TRACK_REPAIRED_DATA))
+            command.trackRepairedStatus();
+
         ReadResponse response;
         try (ReadExecutionController executionController = command.executionController();
              UnfilteredPartitionIterator iterator = command.executeLocally(executionController))
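
The handler above is the replica-side entry point for tracking. A minimal sketch of
the flow it enables, using only methods added in this patch (message plumbing and
response dispatch elided):

    // replica side: the coordinator set TRACK_REPAIRED_DATA on the message
    ReadCommand command = message.payload;
    command.trackRepairedStatus();

    try (ReadExecutionController controller = command.executionController();
         UnfilteredPartitionIterator iterator = command.executeLocally(controller))
    {
        // building the response consumes the iterator; the digest and its
        // conclusive flag are then available via command.getRepairedDataDigest()
        // and command.isRepairedDataDigestConclusive() and travel back with it
        ReadResponse response = command.createResponse(iterator);
    }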

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/src/java/org/apache/cassandra/db/ReadResponse.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadResponse.java b/src/java/org/apache/cassandra/db/ReadResponse.java
index 486980d..2ddb6a7 100644
--- a/src/java/org/apache/cassandra/db/ReadResponse.java
+++ b/src/java/org/apache/cassandra/db/ReadResponse.java
@@ -51,11 +51,19 @@ public abstract class ReadResponse
     }
 
     @VisibleForTesting
-    public static ReadResponse createRemoteDataResponse(UnfilteredPartitionIterator data, ReadCommand command)
+    public static ReadResponse createRemoteDataResponse(UnfilteredPartitionIterator data,
+                                                        ByteBuffer repairedDataDigest,
+                                                        boolean isRepairedDigestConclusive,
+                                                        ReadCommand command,
+                                                        int version)
     {
-        return new RemoteDataResponse(LocalDataResponse.build(data, command.columnFilter()), MessagingService.current_version);
+        return new RemoteDataResponse(LocalDataResponse.build(data, command.columnFilter()),
+                                      repairedDataDigest,
+                                      isRepairedDigestConclusive,
+                                      version);
     }
 
+
     public static ReadResponse createDigestResponse(UnfilteredPartitionIterator data, ReadCommand command)
     {
         return new DigestResponse(makeDigest(data, command));
@@ -63,6 +71,9 @@ public abstract class ReadResponse
 
     public abstract UnfilteredPartitionIterator makeIterator(ReadCommand command);
     public abstract ByteBuffer digest(ReadCommand command);
+    public abstract ByteBuffer repairedDataDigest();
+    public abstract boolean isRepairedDigestConclusive();
+    public abstract boolean mayIncludeRepairedDigest();
 
     public abstract boolean isDigestResponse();
 
@@ -85,18 +96,22 @@ public abstract class ReadResponse
                 }
             }
         }
-        return "<key " + key + " not found>";
+        return String.format("<key %s not found (repaired_digest=%s repaired_digest_conclusive=%s)>",
+                             key, ByteBufferUtil.bytesToHex(repairedDataDigest()), isRepairedDigestConclusive());
     }
 
     private String toDebugString(UnfilteredRowIterator partition, TableMetadata metadata)
     {
         StringBuilder sb = new StringBuilder();
 
-        sb.append(String.format("[%s] key=%s partition_deletion=%s columns=%s",
+        sb.append(String.format("[%s] key=%s partition_deletion=%s columns=%s repaired_digest=%s repaired_digest_conclusive==%s",
                                 metadata,
                                 metadata.partitionKeyType.getString(partition.partitionKey().getKey()),
                                 partition.partitionLevelDeletion(),
-                                partition.columns()));
+                                partition.columns(),
+                                ByteBufferUtil.bytesToHex(repairedDataDigest()),
+                                isRepairedDigestConclusive()
+                                ));
 
         if (partition.staticRow() != Rows.EMPTY_STATIC_ROW)
             sb.append("\n    ").append(partition.staticRow().toString(metadata, true));
@@ -130,6 +145,21 @@ public abstract class ReadResponse
             throw new UnsupportedOperationException();
         }
 
+        public boolean mayIncludeRepairedDigest()
+        {
+            return false;
+        }
+
+        public ByteBuffer repairedDataDigest()
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        public boolean isRepairedDigestConclusive()
+        {
+            throw new UnsupportedOperationException();
+        }
+
         public ByteBuffer digest(ReadCommand command)
         {
             // We assume that the digest is in the proper version, which bug excluded should be true since this is called with
@@ -150,7 +180,11 @@ public abstract class ReadResponse
     {
         private LocalDataResponse(UnfilteredPartitionIterator iter, ReadCommand command)
         {
-            super(build(iter, command.columnFilter()), MessagingService.current_version, SerializationHelper.Flag.LOCAL);
+            super(build(iter, command.columnFilter()),
+                  command.getRepairedDataDigest(),
+                  command.isRepairedDataDigestConclusive(),
+                  MessagingService.current_version,
+                  SerializationHelper.Flag.LOCAL);
         }
 
         private static ByteBuffer build(UnfilteredPartitionIterator iter, ColumnFilter selection)
@@ -171,9 +205,12 @@ public abstract class ReadResponse
     // built on the coordinator node receiving a response
     private static class RemoteDataResponse extends DataResponse
     {
-        protected RemoteDataResponse(ByteBuffer data, int version)
+        protected RemoteDataResponse(ByteBuffer data,
+                                     ByteBuffer repairedDataDigest,
+                                     boolean isRepairedDigestConclusive,
+                                     int version)
         {
-            super(data, version, SerializationHelper.Flag.FROM_REMOTE);
+            super(data, repairedDataDigest, isRepairedDigestConclusive, version, SerializationHelper.Flag.FROM_REMOTE);
         }
     }
 
@@ -182,13 +219,21 @@ public abstract class ReadResponse
         // TODO: can the digest be calculated over the raw bytes now?
         // The response, serialized in the current messaging version
         private final ByteBuffer data;
+        private final ByteBuffer repairedDataDigest;
+        private final boolean isRepairedDigestConclusive;
         private final int dataSerializationVersion;
         private final SerializationHelper.Flag flag;
 
-        protected DataResponse(ByteBuffer data, int dataSerializationVersion, SerializationHelper.Flag flag)
+        protected DataResponse(ByteBuffer data,
+                               ByteBuffer repairedDataDigest,
+                               boolean isRepairedDigestConclusive,
+                               int dataSerializationVersion,
+                               SerializationHelper.Flag flag)
         {
             super();
             this.data = data;
+            this.repairedDataDigest = repairedDataDigest;
+            this.isRepairedDigestConclusive = isRepairedDigestConclusive;
             this.dataSerializationVersion = dataSerializationVersion;
             this.flag = flag;
         }
@@ -213,6 +258,21 @@ public abstract class ReadResponse
             }
         }
 
+        public boolean mayIncludeRepairedDigest()
+        {
+            return dataSerializationVersion >= MessagingService.VERSION_40;
+        }
+
+        public ByteBuffer repairedDataDigest()
+        {
+            return repairedDataDigest;
+        }
+
+        public boolean isRepairedDigestConclusive()
+        {
+            return isRepairedDigestConclusive;
+        }
+
         public ByteBuffer digest(ReadCommand command)
         {
             try (UnfilteredPartitionIterator iterator = makeIterator(command))
@@ -233,10 +293,25 @@ public abstract class ReadResponse
         {
             boolean isDigest = response instanceof DigestResponse;
             ByteBuffer digest = isDigest ? ((DigestResponse)response).digest : ByteBufferUtil.EMPTY_BYTE_BUFFER;
-
             ByteBufferUtil.writeWithVIntLength(digest, out);
             if (!isDigest)
             {
+                // From 4.0, a coordinator may request additional info about the repaired data that
+                // makes up the response, namely a digest generated from the repaired data and a
+                // flag indicating our level of confidence in that digest. The digest is considered
+                // inconclusive if it may have been affected by unrepaired data during the read,
+                // e.g. if some of the sstables read were part of pending, not yet committed repair
+                // sessions, or if an unrepaired partition tombstone meant that not all repaired
+                // sstables were read (though they might have been on other replicas).
+                // If the coordinator did not request this info, the response contains an empty digest
+                // and the isConclusive flag set to true.
+                // If the messaging version is < 4.0, these are omitted altogether.
+                if (version >= MessagingService.VERSION_40)
+                {
+                    ByteBufferUtil.writeWithVIntLength(response.repairedDataDigest(), out);
+                    out.writeBoolean(response.isRepairedDigestConclusive());
+                }
+
                 ByteBuffer data = ((DataResponse)response).data;
                 ByteBufferUtil.writeWithVIntLength(data, out);
             }
@@ -248,18 +323,42 @@ public abstract class ReadResponse
             if (digest.hasRemaining())
                 return new DigestResponse(digest);
 
+            // A data response may also contain a digest of the portion of its payload
+            // that comes from the replica's repaired set, along with a flag indicating
+            // whether or not the digest may have been influenced by unrepaired or
+            // pending-repair data
+            boolean repairedDigestConclusive;
+            if (version >= MessagingService.VERSION_40)
+            {
+                digest = ByteBufferUtil.readWithVIntLength(in);
+                repairedDigestConclusive = in.readBoolean();
+            }
+            else
+            {
+                digest = ByteBufferUtil.EMPTY_BYTE_BUFFER;
+                repairedDigestConclusive = true;
+            }
+
             ByteBuffer data = ByteBufferUtil.readWithVIntLength(in);
-            return new RemoteDataResponse(data, version);
+            return new RemoteDataResponse(data, digest, repairedDigestConclusive, version);
         }
 
         public long serializedSize(ReadResponse response, int version)
         {
             boolean isDigest = response instanceof DigestResponse;
             ByteBuffer digest = isDigest ? ((DigestResponse)response).digest : ByteBufferUtil.EMPTY_BYTE_BUFFER;
-
             long size = ByteBufferUtil.serializedSizeWithVIntLength(digest);
+
             if (!isDigest)
             {
+                // From 4.0, a coordinator may request additional info about the repaired data
+                // that makes up the response.
+                if (version >= MessagingService.VERSION_40)
+                {
+                    size += ByteBufferUtil.serializedSizeWithVIntLength(response.repairedDataDigest());
+                    size += 1;
+                }
+
                 // In theory, we should deserialize/re-serialize if the version asked is different from the current
                 // version as the content could have a different serialization format. So far though, we haven't made
                 // change to partition iterators serialization since 3.0 so we skip this.

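As implied by the serializer above, the post-4.0 on-wire layout of a data
(non-digest) response is:

    [vint length][digest bytes]                  -- empty for data responses
    [vint length][repaired data digest]          -- empty unless tracking was requested
    [boolean]                                    -- isRepairedDigestConclusive
    [vint length][serialized partition data]

When deserializing from a pre-4.0 peer, the middle two fields are absent and
the deserializer substitutes an empty digest with a conclusive flag of true.
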
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index c81185e..e99a487 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@ -22,7 +22,6 @@ import java.nio.ByteBuffer;
 import java.util.*;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
 
 import org.apache.cassandra.cache.IRowCacheEntry;
@@ -44,12 +43,11 @@ import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.metrics.TableMetrics;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.ParameterType;
 import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.schema.IndexMetadata;
 import org.apache.cassandra.schema.TableMetadata;
-import org.apache.cassandra.service.CacheService;
-import org.apache.cassandra.service.ClientState;
-import org.apache.cassandra.service.StorageProxy;
+import org.apache.cassandra.service.*;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.SearchIterator;
@@ -65,8 +63,6 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
     private final DecoratedKey partitionKey;
     private final ClusteringIndexFilter clusteringIndexFilter;
 
-    private int oldestUnrepairedTombstone = Integer.MAX_VALUE;
-
     @VisibleForTesting
     protected SinglePartitionReadCommand(boolean isDigest,
                                          int digestVersion,
@@ -393,7 +389,9 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
     @SuppressWarnings("resource") // we close the created iterator through closing the result of this method (and SingletonUnfilteredPartitionIterator ctor cannot fail)
     protected UnfilteredPartitionIterator queryStorage(final ColumnFamilyStore cfs, ReadExecutionController executionController)
     {
-        UnfilteredRowIterator partition = cfs.isRowCacheEnabled()
+        // skip the row cache and go directly to sstables/memtable if repaired status of
+        // data is being tracked. This is only requested after an initial digest mismatch
+        UnfilteredRowIterator partition = cfs.isRowCacheEnabled() && !isTrackingRepairedStatus()
                                         ? getThroughCache(cfs, executionController)
                                         : queryMemtableAndDisk(cfs, executionController);
         return new SingletonUnfilteredPartitionIterator(partition);
@@ -564,12 +562,6 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
         return queryMemtableAndDiskInternal(cfs);
     }
 
-    @Override
-    protected int oldestUnrepairedTombstone()
-    {
-        return oldestUnrepairedTombstone;
-    }
-
     private UnfilteredRowIterator queryMemtableAndDiskInternal(ColumnFamilyStore cfs)
     {
         /*
@@ -582,16 +574,19 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
          *      and if we have neither non-frozen collections/UDTs nor counters (indeed, for a non-frozen collection or UDT,
          *      we can't guarantee an older sstable won't have some elements that weren't in the most recent sstables,
          *      and counters are intrinsically a collection of shards and so have the same problem).
+         *      Also, if tracking repaired data, we skip this optimization so we can collate the repaired sstables
+         *      and generate a digest over their merge, which precludes an early return.
          */
-        if (clusteringIndexFilter() instanceof ClusteringIndexNamesFilter && !queriesMulticellType())
+        if (clusteringIndexFilter() instanceof ClusteringIndexNamesFilter && !queriesMulticellType() && !isTrackingRepairedStatus())
             return queryMemtableAndSSTablesInTimestampOrder(cfs, (ClusteringIndexNamesFilter)clusteringIndexFilter());
 
         Tracing.trace("Acquiring sstable references");
         ColumnFamilyStore.ViewFragment view = cfs.select(View.select(SSTableSet.LIVE, partitionKey()));
-        List<UnfilteredRowIterator> iterators = new ArrayList<>(Iterables.size(view.memtables) + view.sstables.size());
+        Collections.sort(view.sstables, SSTableReader.maxTimestampComparator);
         ClusteringIndexFilter filter = clusteringIndexFilter();
         long minTimestamp = Long.MAX_VALUE;
-
+        long mostRecentPartitionTombstone = Long.MIN_VALUE;
+        InputCollector<UnfilteredRowIterator> inputCollector = iteratorsForPartition(view);
         try
         {
             for (Memtable memtable : view.memtables)
@@ -604,8 +599,13 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
 
                 @SuppressWarnings("resource") // 'iter' is added to iterators which is closed on exception, or through the closing of the final merged iterator
                 UnfilteredRowIterator iter = filter.getUnfilteredRowIterator(columnFilter(), partition);
+
+                // Memtable data is always considered unrepaired
                 oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, partition.stats().minLocalDeletionTime);
-                iterators.add(iter);
+                inputCollector.addMemtableIterator(iter);
+
+                mostRecentPartitionTombstone = Math.max(mostRecentPartitionTombstone,
+                                                        iter.partitionLevelDeletion().markedForDeleteAt());
             }
 
             /*
@@ -620,18 +620,25 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
              * In other words, iterating in maxTimestamp order allow to do our mostRecentPartitionTombstone elimination
              * in one pass, and minimize the number of sstables for which we read a partition tombstone.
              */
-            Collections.sort(view.sstables, SSTableReader.maxTimestampComparator);
-            long mostRecentPartitionTombstone = Long.MIN_VALUE;
             int nonIntersectingSSTables = 0;
             List<SSTableReader> skippedSSTablesWithTombstones = null;
             SSTableReadMetricsCollector metricsCollector = new SSTableReadMetricsCollector();
 
+            if (isTrackingRepairedStatus())
+                Tracing.trace("Collecting data from sstables and tracking repaired status");
+
             for (SSTableReader sstable : view.sstables)
             {
                 // if we've already seen a partition tombstone with a timestamp greater
                 // than the most recent update to this sstable, we can skip it
+                // if we're tracking repaired status, we mark the repaired digest inconclusive
+                // as other replicas may not have seen this partition delete and so could include
+                // data from this sstable (or others) in their digests
                 if (sstable.getMaxTimestamp() < mostRecentPartitionTombstone)
+                {
+                    inputCollector.markInconclusive();
                     break;
+                }
 
                 if (!shouldInclude(sstable))
                 {
@@ -654,7 +661,7 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
                 if (!sstable.isRepaired())
                     oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, sstable.getMinLocalDeletionTime());
 
-                iterators.add(iter);
+                inputCollector.addSSTableIterator(sstable, iter);
                 mostRecentPartitionTombstone = Math.max(mostRecentPartitionTombstone,
                                                         iter.partitionLevelDeletion().markedForDeleteAt());
             }
@@ -674,7 +681,7 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
                     if (!sstable.isRepaired())
                         oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, sstable.getMinLocalDeletionTime());
 
-                    iterators.add(iter);
+                    inputCollector.addSSTableIterator(sstable, iter);
                     includedDueToTombstones++;
                 }
             }
@@ -682,21 +689,22 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
                 Tracing.trace("Skipped {}/{} non-slice-intersecting sstables, included {} due to tombstones",
                                nonIntersectingSSTables, view.sstables.size(), includedDueToTombstones);
 
-            if (iterators.isEmpty())
+            if (inputCollector.isEmpty())
                 return EmptyIterators.unfilteredRow(cfs.metadata(), partitionKey(), filter.isReversed());
 
             StorageHook.instance.reportRead(cfs.metadata().id, partitionKey());
-            return withSSTablesIterated(iterators, cfs.metric, metricsCollector);
+
+            return withSSTablesIterated(inputCollector.finalizeIterators(), cfs.metric, metricsCollector);
         }
         catch (RuntimeException | Error e)
         {
             try
             {
-                FBUtilities.closeAll(iterators);
+                inputCollector.close();
             }
-            catch (Exception suppressed)
+            catch (Exception e1)
             {
-                e.addSuppressed(suppressed);
+                e.addSuppressed(e1);
             }
             throw e;
         }

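The InputCollector referenced above is introduced elsewhere in this commit (on
ReadCommand); conceptually it partitions the inputs so that, when tracking is
enabled, a digest can be computed over the merge of just the repaired sstables.
A heavily simplified sketch of that idea (illustrative only, not the actual
implementation):

    // Illustrative: separate repaired from unrepaired inputs; the real
    // InputCollector also handles memtables, pending-repair classification
    // and the inconclusive marking shown above.
    class RepairedInputCollector
    {
        final List<UnfilteredRowIterator> repaired = new ArrayList<>();
        final List<UnfilteredRowIterator> unrepaired = new ArrayList<>();

        void addSSTableIterator(SSTableReader sstable, UnfilteredRowIterator iter)
        {
            if (sstable.isRepaired())
                repaired.add(iter);      // contributes to the repaired data digest
            else
                unrepaired.add(iter);
        }

        List<UnfilteredRowIterator> finalizeIterators()
        {
            if (repaired.isEmpty())
                return unrepaired;
            List<UnfilteredRowIterator> all = new ArrayList<>(unrepaired);
            // mergeAndDigest is hypothetical: merge the repaired inputs,
            // feeding the merged stream into the command's digest as it goes
            all.add(mergeAndDigest(repaired));
            return all;
        }
    }
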
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/src/java/org/apache/cassandra/db/filter/ClusteringIndexNamesFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ClusteringIndexNamesFilter.java b/src/java/org/apache/cassandra/db/filter/ClusteringIndexNamesFilter.java
index 4850fd5..f25dc91 100644
--- a/src/java/org/apache/cassandra/db/filter/ClusteringIndexNamesFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/ClusteringIndexNamesFilter.java
@@ -206,6 +206,20 @@ public class ClusteringIndexNamesFilter extends AbstractClusteringIndexFilter
         return sb.toString();
     }
 
+    public boolean equals(Object o)
+    {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        ClusteringIndexNamesFilter that = (ClusteringIndexNamesFilter) o;
+        return Objects.equals(clusterings, that.clusterings) &&
+               Objects.equals(reversed, that.reversed);
+    }
+
+    public int hashCode()
+    {
+        return Objects.hash(clusterings, reversed);
+    }
+
     public Kind kind()
     {
         return Kind.NAMES;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
index 851e447..42807a2 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
@@ -599,4 +599,5 @@ public abstract class UnfilteredRowIterators
             }
         }
     }
+
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java b/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
index 5a90804..f1df026 100644
--- a/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
@@ -22,6 +22,7 @@ import java.util.Set;
 import com.codahale.metrics.Counter;
 import com.codahale.metrics.Gauge;
 import com.codahale.metrics.Histogram;
+import com.codahale.metrics.Meter;
 import com.codahale.metrics.Timer;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
@@ -123,6 +124,24 @@ public class KeyspaceMetrics
     /** histogram over the number of partitions we have validated */
     public final Histogram partitionsValidated;
 
+    /*
+     * Metrics for inconsistencies detected between repaired data sets across replicas. These
+     * are tracked on the coordinator.
+     */
+
+    /**
+     * Incremented when an inconsistency is detected and there are no pending repair sessions affecting
+     * the data being read, indicating a genuine mismatch between replicas' repaired data sets.
+     */
+    public final Meter confirmedRepairedInconsistencies;
+    /**
+     * Incremented when an inconsistency is detected, but there are pending & uncommitted repair sessions
+     * in play on at least one replica. This may indicate a false positive as the inconsistency could be due to
+     * replicas marking the repair session as committed at slightly different times and so some consider it to
+     * be part of the repaired set whilst others do not.
+     */
+    public final Meter unconfirmedRepairedInconsistencies;
+
     public final MetricNameFactory factory;
     private Keyspace keyspace;
 
@@ -283,6 +302,9 @@ public class KeyspaceMetrics
         repairSyncTime = Metrics.timer(factory.createMetricName("RepairSyncTime"));
         partitionsValidated = Metrics.histogram(factory.createMetricName("PartitionsValidated"), false);
         bytesValidated = Metrics.histogram(factory.createMetricName("BytesValidated"), false);
+
+        confirmedRepairedInconsistencies = Metrics.meter(factory.createMetricName("RepairedDataInconsistenciesConfirmed"));
+        unconfirmedRepairedInconsistencies = Metrics.meter(factory.createMetricName("RepairedDataInconsistenciesUnconfirmed"));
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/src/java/org/apache/cassandra/metrics/TableMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/TableMetrics.java b/src/java/org/apache/cassandra/metrics/TableMetrics.java
index 53ebcb0..52c50b8 100644
--- a/src/java/org/apache/cassandra/metrics/TableMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/TableMetrics.java
@@ -217,6 +217,19 @@ public class TableMetrics
     public final Counter speculativeWrites;
     public final Gauge<Long> speculativeWriteLatencyNanos;
 
+    /**
+     * Metrics for inconsistencies detected between repaired data sets across replicas. These
+     * are tracked on the coordinator.
+     */
+    // Incremented when an inconsistency is detected and there are no pending repair sessions affecting
+    // the data being read, indicating a genuine mismatch between replicas' repaired data sets.
+    public final TableMeter confirmedRepairedInconsistencies;
+    // Incremented when an inconsistency is detected, but there are pending & uncommitted repair sessions
+    // in play on at least one replica. This may indicate a false positive as the inconsistency could be due to
+    // replicas marking the repair session as committed at slightly different times and so some consider it to
+    // be part of the repaired set whilst others do not.
+    public final TableMeter unconfirmedRepairedInconsistencies;
+
     public final static LatencyMetrics globalReadLatency = new LatencyMetrics(globalFactory, globalAliasFactory, "Read");
     public final static LatencyMetrics globalWriteLatency = new LatencyMetrics(globalFactory, globalAliasFactory, "Write");
     public final static LatencyMetrics globalRangeLatency = new LatencyMetrics(globalFactory, globalAliasFactory, "Range");
@@ -922,6 +935,9 @@ public class TableMetrics
 
         readRepairRequests = Metrics.meter(factory.createMetricName("ReadRepairRequests"));
         shortReadProtectionRequests = Metrics.meter(factory.createMetricName("ShortReadProtectionRequests"));
+
+        confirmedRepairedInconsistencies = createTableMeter("RepairedDataInconsistenciesConfirmed", cfs.keyspace.metric.confirmedRepairedInconsistencies);
+        unconfirmedRepairedInconsistencies = createTableMeter("RepairedDataInconsistenciesUnconfirmed", cfs.keyspace.metric.unconfirmedRepairedInconsistencies);
     }
 
     public void updateSSTableIterated(int count)
@@ -1091,6 +1107,21 @@ public class TableMetrics
                                             globalAliasFactory.createMetricName(alias)));
     }
 
+    protected TableMeter createTableMeter(String name, Meter keyspaceMeter)
+    {
+        return createTableMeter(name, name, keyspaceMeter);
+    }
+
+    protected TableMeter createTableMeter(String name, String alias, Meter keyspaceMeter)
+    {
+        Meter meter = Metrics.meter(factory.createMetricName(name), aliasFactory.createMetricName(alias));
+        register(name, alias, meter);
+        return new TableMeter(meter,
+                              keyspaceMeter,
+                              Metrics.meter(globalFactory.createMetricName(name),
+                                            globalAliasFactory.createMetricName(alias)));
+    }
+
     /**
      * Registers a metric to be removed when unloading CF.
      * @return true if first time metric with that name has been registered
@@ -1103,6 +1134,25 @@ public class TableMetrics
         return ret;
     }
 
+    public static class TableMeter
+    {
+        public final Meter[] all;
+        public final Meter table;
+        private TableMeter(Meter table, Meter keyspace, Meter global)
+        {
+            this.table = table;
+            this.all = new Meter[]{table, keyspace, global};
+        }
+
+        public void mark()
+        {
+            for (Meter meter : all)
+            {
+                meter.mark();
+            }
+        }
+    }
+
     public static class TableHistogram
     {
         public final Histogram[] all;

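TableMeter follows the existing TableHistogram pattern: a single mark() fans
out to the table, keyspace and global meters so every aggregation level stays
in step. For example, recording one confirmed mismatch:

    // updates the table, keyspace and global meters together
    cfs.metric.confirmedRepairedInconsistencies.mark();
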
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/src/java/org/apache/cassandra/net/ParameterType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/ParameterType.java b/src/java/org/apache/cassandra/net/ParameterType.java
index 0a1f73f..b7a88a8 100644
--- a/src/java/org/apache/cassandra/net/ParameterType.java
+++ b/src/java/org/apache/cassandra/net/ParameterType.java
@@ -39,7 +39,8 @@ public enum ParameterType
     FAILURE_REASON("FAIL_REASON", ShortVersionedSerializer.instance),
     FAILURE_CALLBACK("CAL_BAC", DummyByteVersionedSerializer.instance),
     TRACE_SESSION("TraceSession", UUIDSerializer.serializer),
-    TRACE_TYPE("TraceType", Tracing.traceTypeSerializer);
+    TRACE_TYPE("TraceType", Tracing.traceTypeSerializer),
+    TRACK_REPAIRED_DATA("TrackRepaired", DummyByteVersionedSerializer.instance);
 
     public static final Map<String, ParameterType> byName;
     public final String key;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java b/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java
index 4089e77..eac1ea0 100644
--- a/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java
+++ b/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java
@@ -783,6 +783,23 @@ public class LocalSessions
         return session != null && session.getState() != FINALIZED && session.getState() != FAILED;
     }
 
+    /**
+     * determines if a local session exists, and if it's in the finalized state
+     */
+    public boolean isSessionFinalized(UUID sessionID)
+    {
+        LocalSession session = getSession(sessionID);
+        return session != null && session.getState() == FINALIZED;
+    }
+
+    /**
+     * determines if a local session exists
+     */
+    public boolean sessionExists(UUID sessionID)
+    {
+        return getSession(sessionID) != null;
+    }
+
     @VisibleForTesting
     protected boolean sessionHasData(LocalSession session)
     {

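Presumably these helpers are consumed on the read path when deciding how an
sstable that belongs to a pending incremental repair should count towards the
repaired data digest. A sketch of the kind of check involved (illustrative;
the handle via ActiveRepairService is an assumption, not shown in this hunk):

    // Data in a pending session only counts as repaired once the session is
    // finalized; an unfinalized session makes any digest it feeds inconclusive.
    UUID pendingRepair = sstable.getSSTableMetadata().pendingRepair;
    boolean conclusive = pendingRepair == null
                         || ActiveRepairService.instance.consistent.local.isSessionFinalized(pendingRepair);
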
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index c23eb88..9d9c628 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -2122,12 +2122,21 @@ public class StorageProxy implements StorageProxyMBean
             Keyspace keyspace = Keyspace.open(command.metadata().keyspace);
 
             ReadCallback<EndpointsForRange, ReplicaLayout.ForRange> handler = new ReadCallback<>(resolver,
-                                                                              replicaLayout.consistencyLevel().blockFor(keyspace),
-                                                                              rangeCommand,
-                                                                              replicaLayout,
-                                                                              queryStartNanoTime);
+                                                                                                 replicaLayout.consistencyLevel().blockFor(keyspace),
+                                                                                                 rangeCommand,
+                                                                                                 replicaLayout,
+                                                                                                 queryStartNanoTime);
 
             handler.assureSufficientLiveNodes();
+
+            // If enabled, request repaired data tracking info from full replicas but
+            // only if there are multiple full replicas to compare results from
+            if (DatabaseDescriptor.getRepairedDataTrackingForRangeReadsEnabled()
+                && replicaLayout.selected().filter(Replica::isFull).size() > 1)
+            {
+                command.trackRepairedStatus();
+            }
+
             if (replicaLayout.selected().size() == 1 && replicaLayout.selected().get(0).isLocal())
             {
                 StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(rangeCommand, handler));
@@ -2137,7 +2146,10 @@ public class StorageProxy implements StorageProxyMBean
                 for (Replica replica : replicaLayout.selected())
                 {
                     Tracing.trace("Enqueuing request to {}", replica);
-                    MessagingService.instance().sendRRWithFailure(rangeCommand.createMessage(), replica.endpoint(), handler);
+                    MessageOut<ReadCommand> message = rangeCommand.createMessage();
+                    if (command.isTrackingRepairedStatus() && replica.isFull())
+                        message = message.withParameter(ParameterType.TRACK_REPAIRED_DATA, MessagingService.ONE_BYTE);
+                    MessagingService.instance().sendRRWithFailure(message, replica.endpoint(), handler);
                 }
             }
 
@@ -2798,6 +2810,60 @@ public class StorageProxy implements StorageProxyMBean
         DatabaseDescriptor.setOtcBacklogExpirationInterval(intervalInMillis);
     }
 
+    @Override
+    public void enableRepairedDataTrackingForRangeReads()
+    {
+        DatabaseDescriptor.setRepairedDataTrackingForRangeReadsEnabled(true);
+    }
+
+    @Override
+    public void disableRepairedDataTrackingForRangeReads()
+    {
+        DatabaseDescriptor.setRepairedDataTrackingForRangeReadsEnabled(false);
+    }
+
+    @Override
+    public boolean getRepairedDataTrackingEnabledForRangeReads()
+    {
+        return DatabaseDescriptor.getRepairedDataTrackingForRangeReadsEnabled();
+    }
+
+    @Override
+    public void enableRepairedDataTrackingForPartitionReads()
+    {
+        DatabaseDescriptor.setRepairedDataTrackingForPartitionReadsEnabled(true);
+    }
+
+    @Override
+    public void disableRepairedDataTrackingForPartitionReads()
+    {
+        DatabaseDescriptor.setRepairedDataTrackingForPartitionReadsEnabled(false);
+    }
+
+    @Override
+    public boolean getRepairedDataTrackingEnabledForPartitionReads()
+    {
+        return DatabaseDescriptor.getRepairedDataTrackingForPartitionReadsEnabled();
+    }
+
+    @Override
+    public void enableReportingUnconfirmedRepairedDataMismatches()
+    {
+        DatabaseDescriptor.reportUnconfirmedRepairedDataMismatches(true);
+    }
+
+    @Override
+    public void disableReportingUnconfirmedRepairedDataMismatches()
+    {
+        DatabaseDescriptor.reportUnconfirmedRepairedDataMismatches(false);
+    }
+
+    @Override
+    public boolean getReportingUnconfirmedRepairedDataMismatchesEnabled()
+    {
+        return DatabaseDescriptor.reportUnconfirmedRepairedDataMismatches();
+    }
+
     static class PaxosBallotAndContention
     {
         final UUID ballot;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/src/java/org/apache/cassandra/service/StorageProxyMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxyMBean.java b/src/java/org/apache/cassandra/service/StorageProxyMBean.java
index 76a6617..efc163d 100644
--- a/src/java/org/apache/cassandra/service/StorageProxyMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageProxyMBean.java
@@ -93,4 +93,19 @@ public interface StorageProxyMBean
      * Stop logging queries but leave any generated files on disk.
      */
     public void stopFullQueryLogger();
+
+    /**
+     * Tracking and reporting of variances in the repaired data set across replicas at read time
+     */
+    void enableRepairedDataTrackingForRangeReads();
+    void disableRepairedDataTrackingForRangeReads();
+    boolean getRepairedDataTrackingEnabledForRangeReads();
+
+    void enableRepairedDataTrackingForPartitionReads();
+    void disableRepairedDataTrackingForPartitionReads();
+    boolean getRepairedDataTrackingEnabledForPartitionReads();
+
+    void enableReportingUnconfirmedRepairedDataMismatches();
+    void disableReportingUnconfirmedRepairedDataMismatches();
+    boolean getReportingUnconfirmedRepairedDataMismatchesEnabled();
 }

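These operations can be toggled at runtime over JMX without a restart. For
example, against the existing StorageProxy MBean registration (assuming an
established JMXConnector named `connector`):

    import javax.management.MBeanServerConnection;
    import javax.management.ObjectName;

    MBeanServerConnection mbs = connector.getMBeanServerConnection();
    ObjectName name = new ObjectName("org.apache.cassandra.db:type=StorageProxy");
    mbs.invoke(name, "enableRepairedDataTrackingForPartitionReads", new Object[0], new String[0]);
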
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/src/java/org/apache/cassandra/service/reads/DataResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/DataResolver.java b/src/java/org/apache/cassandra/service/reads/DataResolver.java
index 9043e87..1f69d6a 100644
--- a/src/java/org/apache/cassandra/service/reads/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/reads/DataResolver.java
@@ -24,7 +24,6 @@ import java.util.List;
 
 import com.google.common.base.Joiner;
 import com.google.common.collect.Collections2;
-import com.google.common.collect.Iterables;
 
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.DeletionTime;
@@ -43,13 +42,12 @@ import org.apache.cassandra.db.transform.Filter;
 import org.apache.cassandra.db.transform.FilteredPartitions;
 import org.apache.cassandra.db.transform.Transformation;
 import org.apache.cassandra.locator.Endpoints;
-import org.apache.cassandra.locator.Replica;
-import org.apache.cassandra.locator.ReplicaCollection;
 import org.apache.cassandra.locator.ReplicaLayout;
-import org.apache.cassandra.locator.Replicas;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.service.reads.repair.ReadRepair;
+import org.apache.cassandra.service.reads.repair.RepairedDataTracker;
+import org.apache.cassandra.service.reads.repair.RepairedDataVerifier;
 
 import static com.google.common.collect.Iterables.*;
 
@@ -84,9 +82,25 @@ public class DataResolver<E extends Endpoints<E>, L extends ReplicaLayout<E, L>>
 
         E replicas = replicaLayout.all().keep(transform(messages, msg -> msg.from));
         List<UnfilteredPartitionIterator> iters = new ArrayList<>(
-                Collections2.transform(messages, msg -> msg.payload.makeIterator(command)));
+        Collections2.transform(messages, msg -> msg.payload.makeIterator(command)));
         assert replicas.size() == iters.size();
 
+        // If requested, inspect each response for a digest of the replica's repaired data set
+        RepairedDataTracker repairedDataTracker = command.isTrackingRepairedStatus()
+                                                  ? new RepairedDataTracker(getRepairedDataVerifier(command))
+                                                  : null;
+        if (repairedDataTracker != null)
+        {
+            messages.forEach(msg -> {
+                if (msg.payload.mayIncludeRepairedDigest() && replicas.byEndpoint().get(msg.from).isFull())
+                {
+                    repairedDataTracker.recordDigest(msg.from,
+                                                     msg.payload.repairedDataDigest(),
+                                                     msg.payload.isRepairedDigestConclusive());
+                }
+            });
+        }
+
         /*
          * Even though every response, individually, will honor the limit, it is possible that we will, after the merge,
          * have more rows than the client requested. To make sure that we still conform to the original limit,
@@ -105,17 +119,25 @@ public class DataResolver<E extends Endpoints<E>, L extends ReplicaLayout<E, L>>
 
         UnfilteredPartitionIterator merged = mergeWithShortReadProtection(iters,
                                                                           replicaLayout.withSelected(replicas),
-                                                                          mergedResultCounter);
+                                                                          mergedResultCounter,
+                                                                          repairedDataTracker);
         FilteredPartitions filtered = FilteredPartitions.filter(merged, new Filter(command.nowInSec(), command.metadata().enforceStrictLiveness()));
         PartitionIterator counted = Transformation.apply(filtered, mergedResultCounter);
         return Transformation.apply(counted, new EmptyPartitionsDiscarder());
     }
 
+    protected RepairedDataVerifier getRepairedDataVerifier(ReadCommand command)
+    {
+        return RepairedDataVerifier.simple(command);
+    }
+
     private UnfilteredPartitionIterator mergeWithShortReadProtection(List<UnfilteredPartitionIterator> results,
                                                                      L sources,
-                                                                     DataLimits.Counter mergedResultCounter)
+                                                                     DataLimits.Counter mergedResultCounter,
+                                                                     RepairedDataTracker repairedDataTracker)
     {
-        // If we have only one results, there is no read repair to do and we can't get short reads
+        // If we have only one result, there is no read repair to do, we can't get
+        // short reads, and we can't compare repaired data sets
         if (results.size() == 1)
             return results.get(0);
 
@@ -127,7 +149,7 @@ public class DataResolver<E extends Endpoints<E>, L extends ReplicaLayout<E, L>>
             for (int i = 0; i < results.size(); i++)
                 results.set(i, ShortReadProtection.extend(sources.selected().get(i), results.get(i), command, mergedResultCounter, queryStartNanoTime, enforceStrictLiveness));
 
-        return UnfilteredPartitionIterators.merge(results, wrapMergeListener(readRepair.getMergeListener(sources), sources));
+        return UnfilteredPartitionIterators.merge(results, wrapMergeListener(readRepair.getMergeListener(sources), sources, repairedDataTracker));
     }
 
     private String makeResponsesDebugString(DecoratedKey partitionKey)
@@ -135,7 +157,7 @@ public class DataResolver<E extends Endpoints<E>, L extends ReplicaLayout<E, L>>
         return Joiner.on(",\n").join(transform(getMessages().snapshot(), m -> m.from + " => " + m.payload.toDebugString(command, partitionKey)));
     }
 
-    private UnfilteredPartitionIterators.MergeListener wrapMergeListener(UnfilteredPartitionIterators.MergeListener partitionListener, L sources)
+    private UnfilteredPartitionIterators.MergeListener wrapMergeListener(UnfilteredPartitionIterators.MergeListener partitionListener, L sources, RepairedDataTracker repairedDataTracker)
     {
         return new UnfilteredPartitionIterators.MergeListener()
         {
@@ -224,6 +246,8 @@ public class DataResolver<E extends Endpoints<E>, L extends ReplicaLayout<E, L>>
             public void close()
             {
                 partitionListener.close();
+                if (repairedDataTracker != null)
+                    repairedDataTracker.verify();
             }
         };
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/src/java/org/apache/cassandra/service/reads/ShortReadProtection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/ShortReadProtection.java b/src/java/org/apache/cassandra/service/reads/ShortReadProtection.java
index ef1d45b..1a454f9 100644
--- a/src/java/org/apache/cassandra/service/reads/ShortReadProtection.java
+++ b/src/java/org/apache/cassandra/service/reads/ShortReadProtection.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.service.reads;
 
 import java.net.InetAddress;
 
+
 import org.apache.cassandra.db.ReadCommand;
 import org.apache.cassandra.db.filter.DataLimits;
 import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java b/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java
index 30dea74..493b9d0 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java
@@ -24,6 +24,7 @@ import java.util.function.Consumer;
 import com.google.common.base.Preconditions;
 
 import com.codahale.metrics.Meter;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.Keyspace;
@@ -32,11 +33,12 @@ import org.apache.cassandra.db.SinglePartitionReadCommand;
 import org.apache.cassandra.db.partitions.PartitionIterator;
 import org.apache.cassandra.exceptions.ReadTimeoutException;
 import org.apache.cassandra.locator.Endpoints;
-import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.locator.Replica;
 import org.apache.cassandra.locator.ReplicaLayout;
 import org.apache.cassandra.metrics.ReadRepairMetrics;
+import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.ParameterType;
 import org.apache.cassandra.service.reads.DataResolver;
 import org.apache.cassandra.service.reads.DigestResolver;
 import org.apache.cassandra.service.reads.ReadCallback;
@@ -75,9 +77,14 @@ public abstract class AbstractReadRepair<E extends Endpoints<E>, L extends Repli
         this.cfs = Keyspace.openAndGetStore(command.metadata());
     }
 
-    void sendReadCommand(InetAddressAndPort to, ReadCallback readCallback)
+    void sendReadCommand(Replica to, ReadCallback readCallback)
     {
-        MessagingService.instance().sendRRWithFailure(command.createMessage(), to, readCallback);
+        MessageOut<ReadCommand> message = command.createMessage();
+        // if enabled, request additional info about repaired data from any full replicas
+        if (command.isTrackingRepairedStatus() && to.isFull())
+            message = message.withParameter(ParameterType.TRACK_REPAIRED_DATA, MessagingService.ONE_BYTE);
+
+        MessagingService.instance().sendRRWithFailure(message, to.endpoint(), readCallback);
     }
 
     abstract Meter getRepairMeter();
@@ -94,10 +101,14 @@ public abstract class AbstractReadRepair<E extends Endpoints<E>, L extends Repli
 
         digestRepair = new DigestRepair(resolver, readCallback, resultConsumer);
 
+        // if enabled, request additional info about repaired data from any full replicas
+        if (DatabaseDescriptor.getRepairedDataTrackingForPartitionReadsEnabled())
+            command.trackRepairedStatus();
+
         for (Replica replica : replicaLayout.selected())
         {
             Tracing.trace("Enqueuing full data read to {}", replica);
-            sendReadCommand(replica.endpoint(), readCallback);
+            sendReadCommand(replica, readCallback);
         }
         ReadRepairDiagnostics.startRepair(this, replicaLayout.selected().endpoints(), digestResolver, replicaLayout.all().endpoints());
     }
@@ -137,7 +148,7 @@ public abstract class AbstractReadRepair<E extends Endpoints<E>, L extends Repli
 
             Replica replica = uncontacted.selected().iterator().next();
             Tracing.trace("Enqueuing speculative full data read to {}", replica);
-            sendReadCommand(replica.endpoint(), repair.readCallback);
+            sendReadCommand(replica, repair.readCallback);
             ReadRepairMetrics.speculatedRead.mark();
             ReadRepairDiagnostics.speculatedRead(this, replica.endpoint(), uncontacted.all().endpoints());
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/src/java/org/apache/cassandra/service/reads/repair/RepairedDataTracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/repair/RepairedDataTracker.java b/src/java/org/apache/cassandra/service/reads/repair/RepairedDataTracker.java
new file mode 100644
index 0000000..5024e86
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/reads/repair/RepairedDataTracker.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.reads.repair;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+public class RepairedDataTracker
+{
+    private final RepairedDataVerifier verifier;
+
+    public final Multimap<ByteBuffer, InetAddressAndPort> digests = HashMultimap.create();
+    public final Set<InetAddressAndPort> inconclusiveDigests = new HashSet<>();
+
+    public RepairedDataTracker(RepairedDataVerifier verifier)
+    {
+        this.verifier = verifier;
+    }
+
+    public void recordDigest(InetAddressAndPort source, ByteBuffer digest, boolean isConclusive)
+    {
+        digests.put(digest, source);
+        if (!isConclusive)
+            inconclusiveDigests.add(source);
+    }
+
+    public void verify()
+    {
+        verifier.verify(this);
+    }
+
+    public String toString()
+    {
+        return MoreObjects.toStringHelper(this)
+                          .add("digests", hexDigests())
+                          .add("inconclusive", inconclusiveDigests).toString();
+    }
+
+    private Map<String, Collection<InetAddressAndPort>> hexDigests()
+    {
+        Map<String, Collection<InetAddressAndPort>> hexDigests = new HashMap<>();
+        digests.asMap().forEach((k, v) -> hexDigests.put(ByteBufferUtil.bytesToHex(k), v));
+        return hexDigests;
+    }
+
+    public boolean equals(Object o)
+    {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        RepairedDataTracker that = (RepairedDataTracker) o;
+        return Objects.equals(digests, that.digests) &&
+               Objects.equals(inconclusiveDigests, that.inconclusiveDigests);
+    }
+
+    public int hashCode()
+    {
+        return Objects.hash(digests, inconclusiveDigests);
+    }
+}

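A condensed sketch of the coordinator-side flow over this class, using the API
above together with the simple verifier factory (replica addresses and digests
are placeholders):

    RepairedDataTracker tracker = new RepairedDataTracker(RepairedDataVerifier.simple(command));
    // one call per data response from a full replica (see DataResolver above)
    tracker.recordDigest(replica1, digest1, true);
    tracker.recordDigest(replica2, digest2, false);   // pending session in play
    // called when the merge listener closes; with two distinct digests and one
    // inconclusive source, a mismatch would be reported as unconfirmed
    tracker.verify();
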

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org