You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sa...@apache.org on 2018/09/05 18:13:33 UTC
[1/2] cassandra git commit: Detect inconsistencies in repaired data
on the read path
Repository: cassandra
Updated Branches:
refs/heads/trunk 744973e4c -> 5fbb938ad
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/src/java/org/apache/cassandra/service/reads/repair/RepairedDataVerifier.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/repair/RepairedDataVerifier.java b/src/java/org/apache/cassandra/service/reads/repair/RepairedDataVerifier.java
new file mode 100644
index 0000000..816fe9f
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/reads/repair/RepairedDataVerifier.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.reads.repair;
+
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.PartitionRangeReadCommand;
+import org.apache.cassandra.db.ReadCommand;
+import org.apache.cassandra.db.SinglePartitionReadCommand;
+import org.apache.cassandra.metrics.TableMetrics;
+import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.utils.NoSpamLogger;
+
+/**
+ * Examines the repaired data digests gathered in a {@link RepairedDataTracker}
+ * for a read and reports any mismatch between them.
+ */
+public interface RepairedDataVerifier
+{
+ /**
+ * Checks the repaired data info held by the supplied tracker.
+ */
+ public void verify(RepairedDataTracker tracker);
+
+ /**
+ * Creates a {@link SimpleVerifier} for the given read command.
+ */
+ static RepairedDataVerifier simple(ReadCommand command)
+ {
+ return new SimpleVerifier(command);
+ }
+
+ /**
+ * Verifier which, when the tracker holds more than one distinct digest, marks a
+ * table metric and logs a warning through {@link NoSpamLogger} naming the table
+ * and the partition key or key range that was read.
+ */
+ static class SimpleVerifier implements RepairedDataVerifier
+ {
+ private static final Logger logger = LoggerFactory.getLogger(SimpleVerifier.class);
+ private final ReadCommand command;
+
+ // placeholders, in order: keyspace, table name, partition key or key range, tracker
+ private static final String INCONSISTENCY_WARNING = "Detected mismatch between repaired datasets for table {}.{} during read of {}. {}";
+
+ SimpleVerifier(ReadCommand command)
+ {
+ this.command = command;
+ }
+
+ @Override
+ public void verify(RepairedDataTracker tracker)
+ {
+ Tracing.trace("Verifying repaired data tracker {}", tracker);
+
+ // some mismatch occurred between the repaired datasets on the replicas
+ if (tracker.digests.keySet().size() > 1)
+ {
+ // a digest is considered inconclusive when there were pending repair sessions
+ // which had not yet been committed, or unrepaired partition deletes which meant
+ // some sstables were skipped during reads. Only if NONE of the digests are
+ // inconclusive is the inconsistency marked as confirmed
+ if (tracker.inconclusiveDigests.isEmpty())
+ {
+ TableMetrics metrics = ColumnFamilyStore.metricsFor(command.metadata().id);
+ metrics.confirmedRepairedInconsistencies.mark();
+ NoSpamLogger.log(logger, NoSpamLogger.Level.WARN, 1, TimeUnit.MINUTES,
+ INCONSISTENCY_WARNING, command.metadata().keyspace,
+ command.metadata().name, getCommandString(), tracker);
+ }
+ // at least one digest was inconclusive: the mismatch is unconfirmed and is only
+ // recorded and logged when reporting of unconfirmed mismatches is switched on
+ else if (DatabaseDescriptor.reportUnconfirmedRepairedDataMismatches())
+ {
+ TableMetrics metrics = ColumnFamilyStore.metricsFor(command.metadata().id);
+ metrics.unconfirmedRepairedInconsistencies.mark();
+ NoSpamLogger.log(logger, NoSpamLogger.Level.WARN, 1, TimeUnit.MINUTES,
+ INCONSISTENCY_WARNING, command.metadata().keyspace,
+ command.metadata().name, getCommandString(), tracker);
+ }
+ }
+ }
+
+ /**
+ * Describes what was read: the partition key for a single partition read,
+ * otherwise the key range of the command (which is cast to
+ * {@link PartitionRangeReadCommand} without a further type check).
+ */
+ private String getCommandString()
+ {
+ return command instanceof SinglePartitionReadCommand
+ ? ((SinglePartitionReadCommand)command).partitionKey().toString()
+ : ((PartitionRangeReadCommand)command).dataRange().keyRange().getString(command.metadata().partitionKeyType);
+
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/test/unit/org/apache/cassandra/db/ReadCommandTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ReadCommandTest.java b/test/unit/org/apache/cassandra/db/ReadCommandTest.java
index 5020b95..8df7651 100644
--- a/test/unit/org/apache/cassandra/db/ReadCommandTest.java
+++ b/test/unit/org/apache/cassandra/db/ReadCommandTest.java
@@ -20,10 +20,12 @@ package org.apache.cassandra.db;
import java.io.IOException;
import java.io.OutputStream;
+import java.net.UnknownHostException;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.*;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -37,28 +39,36 @@ import org.apache.cassandra.db.filter.DataLimits;
import org.apache.cassandra.db.filter.RowFilter;
import org.apache.cassandra.db.marshal.AsciiType;
import org.apache.cassandra.db.marshal.BytesType;
-import org.apache.cassandra.db.partitions.FilteredPartition;
-import org.apache.cassandra.db.partitions.PartitionIterator;
-import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
-import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
+import org.apache.cassandra.db.partitions.*;
import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.db.rows.RowIterator;
import org.apache.cassandra.db.rows.SerializationHelper;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.db.rows.UnfilteredRowIterators;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.util.DataInputBuffer;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.io.util.WrappedDataOutputStreamPlus;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.repair.consistent.LocalSessionAccessor;
+import org.apache.cassandra.schema.CachingParams;
import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.streaming.PreviewKind;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.UUIDGen;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
public class ReadCommandTest
{
@@ -68,6 +78,20 @@ public class ReadCommandTest
private static final String CF3 = "Standard3";
private static final String CF4 = "Standard4";
private static final String CF5 = "Standard5";
+ private static final String CF6 = "Standard6";
+
+ // address passed to mutateRepaired's fake repair sessions, both as the coordinator
+ // of the parent session and in the set handed to LocalSessionAccessor.prepareUnsafe
+ private static final InetAddressAndPort REPAIR_COORDINATOR;
+ static {
+ try
+ {
+ REPAIR_COORDINATOR = InetAddressAndPort.getByName("10.0.0.1");
+ }
+ catch (UnknownHostException e)
+ {
+
+ // fail class initialisation outright rather than leave the constant unset
+ throw new AssertionError(e);
+ }
+ }
@BeforeClass
public static void defineSchema() throws ConfigurationException
@@ -116,6 +140,14 @@ public class ReadCommandTest
.addRegularColumn("e", AsciiType.instance)
.addRegularColumn("f", AsciiType.instance);
+ TableMetadata.Builder metadata6 =
+ TableMetadata.builder(KEYSPACE, CF6)
+ .addPartitionKeyColumn("key", BytesType.instance)
+ .addClusteringColumn("col", AsciiType.instance)
+ .addRegularColumn("a", AsciiType.instance)
+ .addRegularColumn("b", AsciiType.instance)
+ .caching(CachingParams.CACHE_EVERYTHING);
+
SchemaLoader.prepareServer();
SchemaLoader.createKeyspace(KEYSPACE,
KeyspaceParams.simple(1),
@@ -123,7 +155,10 @@ public class ReadCommandTest
metadata2,
metadata3,
metadata4,
- metadata5);
+ metadata5,
+ metadata6);
+
+ LocalSessionAccessor.startup();
}
@Test
@@ -540,4 +575,242 @@ public class ReadCommandTest
assertEquals(1, cfs.metric.tombstoneScannedHistogram.cf.getSnapshot().getMax());
}
+ // runs the shared repaired data tracking checks with a read of partition "key"
+ // built without any row restrictions
+ @Test
+ public void testSinglePartitionSliceRepairedDataTracking() throws Exception
+ {
+ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF2);
+ ReadCommand readCommand = Util.cmd(cfs, Util.dk("key")).build();
+ testRepairedDataTracking(cfs, readCommand);
+ }
+
+ // runs the shared repaired data tracking checks with a command built without a partition key
+ @Test
+ public void testPartitionRangeRepairedDataTracking() throws Exception
+ {
+ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF2);
+ ReadCommand readCommand = Util.cmd(cfs).build();
+ testRepairedDataTracking(cfs, readCommand);
+ }
+
+ // runs the shared repaired data tracking checks with a read of partition "key"
+ // restricted to the rows "cc" and "dd"
+ @Test
+ public void testSinglePartitionNamesRepairedDataTracking() throws Exception
+ {
+ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF2);
+ ReadCommand readCommand = Util.cmd(cfs, Util.dk("key")).includeRow("cc").includeRow("dd").build();
+ testRepairedDataTracking(cfs, readCommand);
+ }
+
+ @Test
+ public void testSinglePartitionNamesSkipsOptimisationsIfTrackingRepairedData()
+ {
+ // when tracking, the optimizations of querying sstables in timestamp order and
+ // returning once all requested columns have been read are not available, so just
+ // assert that all sstables are read when performing such queries
+ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF2);
+ cfs.truncateBlocking();
+ cfs.disableAutoCompaction();
+
+ // write the same row twice, flushing after each write so that each version
+ // ends up in its own sstable
+ new RowUpdateBuilder(cfs.metadata(), 0, ByteBufferUtil.bytes("key"))
+ .clustering("dd")
+ .add("a", ByteBufferUtil.bytes("abcd"))
+ .build()
+ .apply();
+
+ cfs.forceBlockingFlush();
+
+ new RowUpdateBuilder(cfs.metadata(), 1, ByteBufferUtil.bytes("key"))
+ .clustering("dd")
+ .add("a", ByteBufferUtil.bytes("wxyz"))
+ .build()
+ .apply();
+
+ cfs.forceBlockingFlush();
+ List<SSTableReader> sstables = new ArrayList<>(cfs.getLiveSSTables());
+ assertEquals(2, sstables.size());
+ Collections.sort(sstables, SSTableReader.maxTimestampComparator);
+
+ ReadCommand readCommand = Util.cmd(cfs, Util.dk("key")).includeRow("dd").columns("a").build();
+
+ // neither sstable has been read yet
+ assertEquals(0, readCount(sstables.get(0)));
+ assertEquals(0, readCount(sstables.get(1)));
+ ReadCommand withTracking = readCommand.copy();
+ withTracking.trackRepairedStatus();
+ Util.getAll(withTracking);
+ // with tracking enabled, both sstables are read
+ assertEquals(1, readCount(sstables.get(0)));
+ assertEquals(1, readCount(sstables.get(1)));
+
+ // same command without tracking touches only the table with the higher timestamp
+ Util.getAll(readCommand.copy());
+ assertEquals(2, readCount(sstables.get(0)));
+ assertEquals(1, readCount(sstables.get(1)));
+ }
+
+ // current count of the given sstable's read meter
+ private long readCount(SSTableReader sstable)
+ {
+ return sstable.getReadMeter().count();
+ }
+
+ // checks that a read which tracks repaired status does not register a row cache hit,
+ // even though the same read without tracking does
+ @Test
+ public void skipRowCacheIfTrackingRepairedData()
+ {
+ // CF6 is the table defined with CachingParams.CACHE_EVERYTHING
+ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF6);
+
+ cfs.truncateBlocking();
+ cfs.disableAutoCompaction();
+
+ new RowUpdateBuilder(cfs.metadata(), 0, ByteBufferUtil.bytes("key"))
+ .clustering("cc")
+ .add("a", ByteBufferUtil.bytes("abcd"))
+ .build()
+ .apply();
+
+ cfs.forceBlockingFlush();
+
+ ReadCommand readCommand = Util.cmd(cfs, Util.dk("key")).build();
+ assertTrue(cfs.isRowCacheEnabled());
+ // warm the cache
+ assertFalse(Util.getAll(readCommand).isEmpty());
+ long cacheHits = cfs.metric.rowCacheHit.getCount();
+
+ // a second, untracked read must be counted as a cache hit
+ Util.getAll(readCommand);
+ assertTrue(cfs.metric.rowCacheHit.getCount() > cacheHits);
+ cacheHits = cfs.metric.rowCacheHit.getCount();
+
+ // the tracked read must leave the hit count unchanged
+ ReadCommand withRepairedInfo = readCommand.copy();
+ withRepairedInfo.trackRepairedStatus();
+ Util.getAll(withRepairedInfo);
+ assertEquals(cacheHits, cfs.metric.rowCacheHit.getCount());
+ }
+
+ // Shared body of the repaired data tracking tests: writes two rows of partition "key"
+ // into two separate sstables, then changes the sstables' repair metadata step by step,
+ // re-running readCommand after each change and checking the repaired data digest and
+ // whether it is reported as conclusive.
+ private void testRepairedDataTracking(ColumnFamilyStore cfs, ReadCommand readCommand) throws IOException
+ {
+ cfs.truncateBlocking();
+ cfs.disableAutoCompaction();
+
+ new RowUpdateBuilder(cfs.metadata(), 0, ByteBufferUtil.bytes("key"))
+ .clustering("cc")
+ .add("a", ByteBufferUtil.bytes("abcd"))
+ .build()
+ .apply();
+
+ cfs.forceBlockingFlush();
+
+ new RowUpdateBuilder(cfs.metadata(), 1, ByteBufferUtil.bytes("key"))
+ .clustering("dd")
+ .add("a", ByteBufferUtil.bytes("abcd"))
+ .build()
+ .apply();
+
+ cfs.forceBlockingFlush();
+ List<SSTableReader> sstables = new ArrayList<>(cfs.getLiveSSTables());
+ assertEquals(2, sstables.size());
+ // both sstables must start out neither repaired nor pending repair
+ sstables.forEach(sstable -> assertFalse(sstable.isRepaired() || sstable.isPendingRepair()));
+ SSTableReader sstable1 = sstables.get(0);
+ SSTableReader sstable2 = sstables.get(1);
+
+ int numPartitions = 1;
+ int rowsPerPartition = 2;
+
+ // Capture all the digest versions as we mutate the table's repaired status. Each time
+ // we make a change, we expect a different digest.
+ Set<ByteBuffer> digests = new HashSet<>();
+ // first time round, nothing has been marked repaired so we expect digest to be an empty buffer and to be marked conclusive
+ ByteBuffer digest = performReadAndVerifyRepairedInfo(readCommand, numPartitions, rowsPerPartition, true);
+ assertEquals(ByteBufferUtil.EMPTY_BYTE_BUFFER, digest);
+ digests.add(digest);
+
+ // add a pending repair session to table1, digest should remain the same but now we expect it to be marked inconclusive
+ UUID session1 = UUIDGen.getTimeUUID();
+ mutateRepaired(cfs, sstable1, ActiveRepairService.UNREPAIRED_SSTABLE, session1);
+ digests.add(performReadAndVerifyRepairedInfo(readCommand, numPartitions, rowsPerPartition, false));
+ assertEquals(1, digests.size());
+
+ // add a different pending session to table2, digest should remain the same and still consider it inconclusive
+ UUID session2 = UUIDGen.getTimeUUID();
+ mutateRepaired(cfs, sstable2, ActiveRepairService.UNREPAIRED_SSTABLE, session2);
+ digests.add(performReadAndVerifyRepairedInfo(readCommand, numPartitions, rowsPerPartition, false));
+ assertEquals(1, digests.size());
+
+ // mark one table repaired
+ mutateRepaired(cfs, sstable1, 111, null);
+ // this time, digest should not be empty, session2 still means that the result is inconclusive
+ digests.add(performReadAndVerifyRepairedInfo(readCommand, numPartitions, rowsPerPartition, false));
+ assertEquals(2, digests.size());
+
+ // mark the second table repaired
+ mutateRepaired(cfs, sstable2, 222, null);
+ // digest should be updated again and as there are no longer any pending sessions, it should be considered conclusive
+ digests.add(performReadAndVerifyRepairedInfo(readCommand, numPartitions, rowsPerPartition, true));
+ assertEquals(3, digests.size());
+
+ // insert a partition tombstone into the memtable, then re-check the repaired info.
+ // This is to ensure that the optimisations which skip reading from sstables when a
+ // newer partition tombstone has already been seen cause the digest to be marked
+ // as inconclusive.
+ // the exception to this case is for partition range reads, where we always read
+ // and generate digests for all sstables, so we only test this path for single partition reads
+ if (readCommand.isLimitedToOnePartition())
+ {
+ new Mutation(PartitionUpdate.simpleBuilder(cfs.metadata(), ByteBufferUtil.bytes("key"))
+ .delete()
+ .build()).apply();
+ // the partition is now deleted, so no partitions are expected back
+ digest = performReadAndVerifyRepairedInfo(readCommand, 0, rowsPerPartition, false);
+ assertEquals(ByteBufferUtil.EMPTY_BYTE_BUFFER, digest);
+
+ // now flush so we have an unrepaired table with the deletion and repeat the check
+ cfs.forceBlockingFlush();
+ digest = performReadAndVerifyRepairedInfo(readCommand, 0, rowsPerPartition, false);
+ assertEquals(ByteBufferUtil.EMPTY_BYTE_BUFFER, digest);
+ }
+ }
+
+ // Rewrites the repair metadata (repairedAt and pending session id) of the given sstable
+ // and reloads it. When a pending session is supplied, a matching parent repair session
+ // and local session are also registered, using REPAIR_COORDINATOR as the only peer.
+ private void mutateRepaired(ColumnFamilyStore cfs, SSTableReader sstable, long repairedAt, UUID pendingSession) throws IOException
+ {
+ sstable.descriptor.getMetadataSerializer().mutateRepairMetadata(sstable.descriptor, repairedAt, pendingSession, false);
+ sstable.reloadSSTableMetadata();
+ if (pendingSession != null)
+ {
+ // setup a minimal repair session. This is necessary because we
+ // check for sessions which have exceeded timeout and been purged
+ Range<Token> range = new Range<>(cfs.metadata().partitioner.getMinimumToken(),
+ cfs.metadata().partitioner.getRandomToken());
+ ActiveRepairService.instance.registerParentRepairSession(pendingSession,
+ REPAIR_COORDINATOR,
+ Lists.newArrayList(cfs),
+ Sets.newHashSet(range),
+ true,
+ repairedAt,
+ true,
+ PreviewKind.NONE);
+
+ LocalSessionAccessor.prepareUnsafe(pendingSession, null, Sets.newHashSet(REPAIR_COORDINATOR));
+ }
+ }
+
+ // Executes a tracking copy of the command ten times, asserting on every run the number
+ // of partitions, the rows per partition, that the repaired data digest never changes
+ // between runs, and that its conclusive flag matches expectConclusive.
+ // Returns the single digest that was observed.
+ private ByteBuffer performReadAndVerifyRepairedInfo(ReadCommand command,
+ int expectedPartitions,
+ int expectedRowsPerPartition,
+ boolean expectConclusive)
+ {
+ // perform equivalent read command multiple times and assert that
+ // the repaired data info is always consistent. Return the digest
+ // so we can verify that it changes when the repaired status of
+ // the queried tables does.
+ Set<ByteBuffer> digests = new HashSet<>();
+ for (int i = 0; i < 10; i++)
+ {
+ // work on a fresh copy each time so the caller's command is never switched to tracking
+ ReadCommand withRepairedInfo = command.copy();
+ withRepairedInfo.trackRepairedStatus();
+
+ List<FilteredPartition> partitions = Util.getAll(withRepairedInfo);
+ assertEquals(expectedPartitions, partitions.size());
+ partitions.forEach(p -> assertEquals(expectedRowsPerPartition, p.rowCount()));
+
+ ByteBuffer digest = withRepairedInfo.getRepairedDataDigest();
+ digests.add(digest);
+ assertEquals(1, digests.size());
+ assertEquals(expectConclusive, withRepairedInfo.isRepairedDataDigestConclusive());
+ }
+ return digests.iterator().next();
+ }
+
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/test/unit/org/apache/cassandra/db/ReadCommandVerbHandlerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ReadCommandVerbHandlerTest.java b/test/unit/org/apache/cassandra/db/ReadCommandVerbHandlerTest.java
new file mode 100644
index 0000000..0c43661
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/ReadCommandVerbHandlerTest.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.net.UnknownHostException;
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+
+import com.google.common.collect.ImmutableMap;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.db.filter.ClusteringIndexSliceFilter;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.filter.RowFilter;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.net.IMessageSink;
+import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.net.MessageOut;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.ParameterType;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests that {@link ReadCommandVerbHandler} switches a read command to tracking
+ * repaired status only when the incoming message carries the
+ * {@code TRACK_REPAIRED_DATA} parameter.
+ */
+public class ReadCommandVerbHandlerTest
+{
+ private static final String TEST_NAME = "read_command_vh_test_";
+ private static final String KEYSPACE = TEST_NAME + "cql_keyspace";
+ private static final String TABLE = "table1";
+
+ private final Random random = new Random();
+ private ReadCommandVerbHandler handler;
+ private TableMetadata metadata;
+
+ @BeforeClass
+ public static void init()
+ {
+ SchemaLoader.loadSchema();
+ SchemaLoader.schemaDefinition(TEST_NAME);
+ }
+
+ @Before
+ public void setup()
+ {
+ // replace any existing sinks with one whose callbacks return false for every
+ // message (presumably so that nothing is actually sent or delivered - confirm
+ // against IMessageSink)
+ MessagingService.instance().clearMessageSinks();
+ MessagingService.instance().addMessageSink(new IMessageSink()
+ {
+ public boolean allowOutgoingMessage(MessageOut message, int id, InetAddressAndPort to)
+ {
+ return false;
+ }
+
+ public boolean allowIncomingMessage(MessageIn message, int id)
+ {
+ return false;
+ }
+ });
+
+ metadata = Schema.instance.getTableMetadata(KEYSPACE, TABLE);
+ handler = new ReadCommandVerbHandler();
+ }
+
+ // the TRACK_REPAIRED_DATA parameter is present, so the handler must enable tracking
+ @Test
+ public void setRepairedDataTrackingFlagIfHeaderPresent()
+ {
+ ReadCommand command = command(key());
+ assertFalse(command.isTrackingRepairedStatus());
+ Map<ParameterType, Object> params = ImmutableMap.of(ParameterType.TRACK_REPAIRED_DATA,
+ MessagingService.ONE_BYTE);
+ handler.doVerb(MessageIn.create(peer(),
+ command,
+ params,
+ MessagingService.Verb.READ,
+ MessagingService.current_version),
+ messageId());
+ assertTrue(command.isTrackingRepairedStatus());
+ }
+
+ // only an unrelated parameter (TRACE_SESSION) is present, so tracking must stay off
+ @Test
+ public void dontSetRepairedDataTrackingFlagUnlessHeaderPresent()
+ {
+ ReadCommand command = command(key());
+ assertFalse(command.isTrackingRepairedStatus());
+ Map<ParameterType, Object> params = ImmutableMap.of(ParameterType.TRACE_SESSION,
+ UUID.randomUUID());
+ handler.doVerb(MessageIn.create(peer(),
+ command,
+ params,
+ MessagingService.Verb.READ,
+ MessagingService.current_version),
+ messageId());
+ assertFalse(command.isTrackingRepairedStatus());
+ }
+
+ // no parameters at all, so tracking must stay off
+ @Test
+ public void dontSetRepairedDataTrackingFlagIfHeadersEmpty()
+ {
+ ReadCommand command = command(key());
+ assertFalse(command.isTrackingRepairedStatus());
+ handler.doVerb(MessageIn.create(peer(),
+ command,
+ ImmutableMap.of(),
+ MessagingService.Verb.READ,
+ MessagingService.current_version),
+ messageId());
+ assertFalse(command.isTrackingRepairedStatus());
+ }
+
+ // random partition key value for a test command
+ private int key()
+ {
+ return random.nextInt();
+ }
+
+ // random message id to pass to doVerb
+ private int messageId()
+ {
+ return random.nextInt();
+ }
+
+ // fixed sender address (127.0.0.9) for the test messages
+ private InetAddressAndPort peer()
+ {
+ try
+ {
+ return InetAddressAndPort.getByAddress(new byte[]{ 127, 0, 0, 9});
+ }
+ catch (UnknownHostException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ // builds a single partition read of the given key over all columns and all slices,
+ // with no row filter and no limits
+ private ReadCommand command(int key)
+ {
+ return new SinglePartitionReadCommand(false,
+ 0,
+ false,
+ metadata,
+ FBUtilities.nowInSeconds(),
+ ColumnFilter.all(metadata),
+ RowFilter.NONE,
+ DataLimits.NONE,
+ metadata.partitioner.decorateKey(ByteBufferUtil.bytes(key)),
+ new ClusteringIndexSliceFilter(Slices.ALL, false),
+ null);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/test/unit/org/apache/cassandra/db/ReadResponseTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ReadResponseTest.java b/test/unit/org/apache/cassandra/db/ReadResponseTest.java
new file mode 100644
index 0000000..6e1a804
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/ReadResponseTest.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Random;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.filter.RowFilter;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests how {@link ReadResponse} carries the repaired data digest and its
+ * conclusive flag, both when created from a command and after a round trip
+ * through {@code ReadResponse.serializer}.
+ */
+public class ReadResponseTest
+{
+
+ private final Random random = new Random();
+ private TableMetadata metadata;
+
+ @Before
+ public void setup()
+ {
+ metadata = TableMetadata.builder("ks", "t1")
+ .addPartitionKeyColumn("p", Int32Type.instance)
+ .addRegularColumn("v", Int32Type.instance)
+ .partitioner(Murmur3Partitioner.instance)
+ .build();
+ }
+
+ @Test
+ public void fromCommandWithConclusiveRepairedDigest()
+ {
+ ByteBuffer digest = digest();
+ ReadCommand command = command(key(), metadata, digest, true);
+ ReadResponse response = command.createResponse(EmptyIterators.unfilteredPartition(metadata));
+ assertTrue(response.isRepairedDigestConclusive());
+ assertEquals(digest, response.repairedDataDigest());
+ verifySerDe(response);
+ }
+
+ @Test
+ public void fromCommandWithInconclusiveRepairedDigest()
+ {
+ ByteBuffer digest = digest();
+ ReadCommand command = command(key(), metadata, digest, false);
+ ReadResponse response = command.createResponse(EmptyIterators.unfilteredPartition(metadata));
+ assertFalse(response.isRepairedDigestConclusive());
+ assertEquals(digest, response.repairedDataDigest());
+ verifySerDe(response);
+ }
+
+ @Test
+ public void fromCommandWithConclusiveEmptyRepairedDigest()
+ {
+ ReadCommand command = command(key(), metadata, ByteBufferUtil.EMPTY_BYTE_BUFFER, true);
+ ReadResponse response = command.createResponse(EmptyIterators.unfilteredPartition(metadata));
+ assertTrue(response.isRepairedDigestConclusive());
+ assertEquals(ByteBufferUtil.EMPTY_BYTE_BUFFER, response.repairedDataDigest());
+ verifySerDe(response);
+ }
+
+ @Test
+ public void fromCommandWithInconclusiveEmptyRepairedDigest()
+ {
+ ReadCommand command = command(key(), metadata, ByteBufferUtil.EMPTY_BYTE_BUFFER, false);
+ ReadResponse response = command.createResponse(EmptyIterators.unfilteredPartition(metadata));
+ assertFalse(response.isRepairedDigestConclusive());
+ assertEquals(ByteBufferUtil.EMPTY_BYTE_BUFFER, response.repairedDataDigest());
+ verifySerDe(response);
+ }
+
+ /*
+ * Digest responses should never include repaired data tracking as we only request
+ * it in read repair or for range queries
+ */
+ @Test (expected = UnsupportedOperationException.class)
+ public void digestResponseErrorsIfRepairedDataDigestRequested()
+ {
+ ReadCommand command = digestCommand(key(), metadata);
+ ReadResponse response = command.createResponse(EmptyIterators.unfilteredPartition(metadata));
+ assertTrue(response.isDigestResponse());
+ assertFalse(response.mayIncludeRepairedDigest());
+ response.repairedDataDigest();
+ }
+
+ @Test (expected = UnsupportedOperationException.class)
+ public void digestResponseErrorsIfIsConclusiveRequested()
+ {
+ ReadCommand command = digestCommand(key(), metadata);
+ ReadResponse response = command.createResponse(EmptyIterators.unfilteredPartition(metadata));
+ assertTrue(response.isDigestResponse());
+ assertFalse(response.mayIncludeRepairedDigest());
+ response.isRepairedDigestConclusive();
+ }
+
+ @Test (expected = UnsupportedOperationException.class)
+ public void digestResponseErrorsIfIteratorRequested()
+ {
+ ReadCommand command = digestCommand(key(), metadata);
+ ReadResponse response = command.createResponse(EmptyIterators.unfilteredPartition(metadata));
+ assertTrue(response.isDigestResponse());
+ assertFalse(response.mayIncludeRepairedDigest());
+ response.makeIterator(command);
+ }
+
+ @Test
+ public void makeDigestDoesntConsiderRepairedDataInfo()
+ {
+ // It shouldn't be possible to get false positive DigestMismatchExceptions based
+ // on differing repaired data tracking info because it isn't requested on initial
+ // requests, only following a digest mismatch. Having a test doesn't hurt though
+ int key = key();
+ ByteBuffer digest1 = digest();
+ ReadCommand command1 = command(key, metadata, digest1, true);
+ ReadResponse response1 = command1.createResponse(EmptyIterators.unfilteredPartition(metadata));
+
+ ByteBuffer digest2 = digest();
+ ReadCommand command2 = command(key, metadata, digest2, false);
+ // the second response must come from command2, otherwise both responses carry
+ // the same repaired data info and the comparison below proves nothing
+ ReadResponse response2 = command2.createResponse(EmptyIterators.unfilteredPartition(metadata));
+
+ assertEquals(response1.digest(command1), response2.digest(command2));
+ }
+
+ private void verifySerDe(ReadResponse response) {
+ // check that roundtripping through ReadResponse.serializer behaves as expected.
+ // ReadResponses from pre-4.0 nodes will never contain repaired data digest
+ // or pending session info, but we run all messages through both pre/post 4.0
+ // serde to check that the defaults are correctly applied
+ roundTripSerialization(response, MessagingService.current_version);
+ roundTripSerialization(response, MessagingService.VERSION_30);
+
+ }
+
+ // Serializes and deserializes the response at the given messaging version, then checks
+ // the repaired data info on the result: defaults (empty digest, conclusive) for versions
+ // before VERSION_40, otherwise the same values as the original response.
+ private void roundTripSerialization(ReadResponse response, int version)
+ {
+ try
+ {
+ DataOutputBuffer out = new DataOutputBuffer();
+ ReadResponse.serializer.serialize(response, out, version);
+
+ DataInputBuffer in = new DataInputBuffer(out.buffer(), false);
+ ReadResponse deser = ReadResponse.serializer.deserialize(in, version);
+ if (version < MessagingService.VERSION_40)
+ {
+ assertFalse(deser.mayIncludeRepairedDigest());
+ // even though that means they should never be used, verify that the default values are present
+ assertEquals(ByteBufferUtil.EMPTY_BYTE_BUFFER, deser.repairedDataDigest());
+ assertTrue(deser.isRepairedDigestConclusive());
+ }
+ else
+ {
+ assertTrue(deser.mayIncludeRepairedDigest());
+ assertEquals(response.repairedDataDigest(), deser.repairedDataDigest());
+ assertEquals(response.isRepairedDigestConclusive(), deser.isRepairedDigestConclusive());
+ }
+ }
+ catch (IOException e)
+ {
+ fail("Caught unexpected IOException during SerDe: " + e.getMessage());
+ }
+ }
+
+
+ // random partition key value for a test command
+ private int key()
+ {
+ return random.nextInt();
+ }
+
+ // four random bytes standing in for a repaired data digest
+ private ByteBuffer digest()
+ {
+ byte[] bytes = new byte[4];
+ random.nextBytes(bytes);
+ return ByteBuffer.wrap(bytes);
+ }
+
+ // stub command flagged as a digest query, with an empty, conclusive repaired digest
+ private ReadCommand digestCommand(int key, TableMetadata metadata)
+ {
+ return new StubReadCommand(key, metadata, true, ByteBufferUtil.EMPTY_BYTE_BUFFER, true);
+ }
+
+ // stub non-digest command reporting the given repaired digest and conclusive flag
+ private ReadCommand command(int key, TableMetadata metadata, ByteBuffer repairedDigest, boolean conclusive)
+ {
+ return new StubReadCommand(key, metadata, false, repairedDigest, conclusive);
+ }
+
+ /**
+ * Read command which returns a fixed repaired data digest and conclusive flag,
+ * and an empty iterator when executed locally.
+ */
+ private static class StubReadCommand extends SinglePartitionReadCommand
+ {
+
+ private final ByteBuffer repairedDigest;
+ private final boolean conclusive;
+
+ StubReadCommand(int key, TableMetadata metadata,
+ boolean isDigest,
+ final ByteBuffer repairedDigest,
+ final boolean conclusive)
+ {
+ super(isDigest,
+ 0,
+ false,
+ metadata,
+ FBUtilities.nowInSeconds(),
+ ColumnFilter.all(metadata),
+ RowFilter.NONE,
+ DataLimits.NONE,
+ metadata.partitioner.decorateKey(ByteBufferUtil.bytes(key)),
+ null,
+ null);
+ this.repairedDigest = repairedDigest;
+ this.conclusive = conclusive;
+ }
+
+ @Override
+ public ByteBuffer getRepairedDataDigest()
+ {
+ return repairedDigest;
+ }
+
+ @Override
+ public boolean isRepairedDataDigestConclusive()
+ {
+ return conclusive;
+ }
+
+ public UnfilteredPartitionIterator executeLocally(ReadExecutionController controller)
+ {
+ return EmptyIterators.unfilteredPartition(this.metadata());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/test/unit/org/apache/cassandra/service/reads/AbstractReadResponseTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/reads/AbstractReadResponseTest.java b/test/unit/org/apache/cassandra/service/reads/AbstractReadResponseTest.java
index c6f2232..582aff8 100644
--- a/test/unit/org/apache/cassandra/service/reads/AbstractReadResponseTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/AbstractReadResponseTest.java
@@ -19,6 +19,7 @@
package org.apache.cassandra.service.reads;
import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Iterator;
import java.util.SortedSet;
@@ -214,15 +215,38 @@ public abstract class AbstractReadResponseTest
return dk(Integer.toString(k));
}
- static MessageIn<ReadResponse> response(ReadCommand command, InetAddressAndPort from, UnfilteredPartitionIterator data, boolean digest)
+
+ static MessageIn<ReadResponse> response(ReadCommand command,
+ InetAddressAndPort from,
+ UnfilteredPartitionIterator data,
+ boolean isDigestResponse,
+ int fromVersion,
+ ByteBuffer repairedDataDigest,
+ boolean hasPendingRepair)
+ {
+ ReadResponse response = isDigestResponse
+ ? ReadResponse.createDigestResponse(data, command)
+ : ReadResponse.createRemoteDataResponse(data, repairedDataDigest, hasPendingRepair, command, fromVersion);
+ return MessageIn.create(from, response, Collections.emptyMap(), MessagingService.Verb.READ, fromVersion);
+ }
+
+ static MessageIn<ReadResponse> response(InetAddressAndPort from,
+ UnfilteredPartitionIterator partitionIterator,
+ ByteBuffer repairedDigest,
+ boolean hasPendingRepair,
+ ReadCommand cmd)
+ {
+ return response(cmd, from, partitionIterator, false, MessagingService.current_version, repairedDigest, hasPendingRepair);
+ }
+
+ static MessageIn<ReadResponse> response(ReadCommand command, InetAddressAndPort from, UnfilteredPartitionIterator data, boolean isDigestResponse)
{
- ReadResponse response = digest ? ReadResponse.createDigestResponse(data, command) : ReadResponse.createDataResponse(data, command);
- return MessageIn.create(from, response, Collections.emptyMap(), MessagingService.Verb.READ, MessagingService.current_version);
+ // isDigestResponse must occupy the 4th (digest) slot; the trailing flag is hasPendingRepair and stays false here.
+ // Passing it last instead would always build a data response and silently drop the digest request.
+ return response(command, from, data, isDigestResponse, MessagingService.current_version, ByteBufferUtil.EMPTY_BYTE_BUFFER, false);
}
static MessageIn<ReadResponse> response(ReadCommand command, InetAddressAndPort from, UnfilteredPartitionIterator data)
{
- return response(command, from, data, false);
+ return response(command, from, data, false, MessagingService.current_version, ByteBufferUtil.EMPTY_BYTE_BUFFER, false); // data (non-digest) response, current version, empty repaired digest, trailing flag false
}
public RangeTombstone tombstone(Object start, Object end, long markedForDeleteAt, int localDeletionTime)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java b/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java
index abec25d..968ef16 100644
--- a/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java
@@ -55,8 +55,13 @@ import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.db.rows.RowIterator;
import org.apache.cassandra.locator.EndpointsForRange;
import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.locator.ReplicaLayout;
import org.apache.cassandra.locator.ReplicaUtils;
+import org.apache.cassandra.net.*;
+import org.apache.cassandra.service.reads.repair.ReadRepair;
+import org.apache.cassandra.service.reads.repair.RepairedDataTracker;
+import org.apache.cassandra.service.reads.repair.RepairedDataVerifier;
import org.apache.cassandra.service.reads.repair.TestableReadRepair;
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -81,6 +86,7 @@ public class DataResolverTest extends AbstractReadResponseTest
public void setup()
{
command = Util.cmd(cfs, dk).withNowInSeconds(nowInSec).build();
+ command.trackRepairedStatus(); // NOTE(review): presumably enables repaired data tracking on the shared command for every test - confirm against ReadCommand
readRepair = new TestableReadRepair(command, ConsistencyLevel.QUORUM);
}
@@ -586,6 +592,7 @@ public class DataResolverTest extends AbstractReadResponseTest
public void testRepairRangeTombstoneWithPartitionDeletion()
{
EndpointsForRange replicas = makeReplicas(2);
+
DataResolver resolver = new DataResolver(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime());
InetAddressAndPort peer1 = replicas.get(0).endpoint();
InetAddressAndPort peer2 = replicas.get(1).endpoint();
@@ -898,6 +905,347 @@ public class DataResolverTest extends AbstractReadResponseTest
Assert.assertNull(readRepair.sent.get(peer2));
}
+ /** Tests for repaired data tracking */
+
+    @Test
+    public void trackMatchingEmptyDigestsWithAllConclusive()
+    {
+        // both replicas report an empty repaired digest, each with the flag set to true
+        final EndpointsForRange endpoints = makeReplicas(2);
+        final InetAddressAndPort first = endpoints.get(0).endpoint();
+        final InetAddressAndPort second = endpoints.get(1).endpoint();
+        final ByteBuffer emptyDigest = ByteBufferUtil.EMPTY_BYTE_BUFFER;
+
+        final TestRepairedDataVerifier tracking = new TestRepairedDataVerifier();
+        tracking.expectDigest(first, emptyDigest, true);
+        tracking.expectDigest(second, emptyDigest, true);
+
+        final DataResolver underTest = resolverWithVerifier(command, plan(endpoints, ConsistencyLevel.ALL), readRepair, System.nanoTime(), tracking);
+        underTest.preprocess(response(first, iter(PartitionUpdate.emptyUpdate(cfm, dk)), emptyDigest, true, command));
+        underTest.preprocess(response(second, iter(PartitionUpdate.emptyUpdate(cfm, dk)), emptyDigest, true, command));
+
+        resolveAndConsume(underTest);
+        assertTrue(tracking.verified);
+    }
+
+    @Test
+    public void trackMatchingEmptyDigestsWithSomeConclusive()
+    {
+        // same empty digest from both replicas; only the second one is flagged true
+        final EndpointsForRange endpoints = makeReplicas(2);
+        final InetAddressAndPort first = endpoints.get(0).endpoint();
+        final InetAddressAndPort second = endpoints.get(1).endpoint();
+        final ByteBuffer emptyDigest = ByteBufferUtil.EMPTY_BYTE_BUFFER;
+
+        final TestRepairedDataVerifier tracking = new TestRepairedDataVerifier();
+        tracking.expectDigest(first, emptyDigest, false);
+        tracking.expectDigest(second, emptyDigest, true);
+
+        final DataResolver underTest = resolverWithVerifier(command, plan(endpoints, ConsistencyLevel.ALL), readRepair, System.nanoTime(), tracking);
+        underTest.preprocess(response(first, iter(PartitionUpdate.emptyUpdate(cfm, dk)), emptyDigest, false, command));
+        underTest.preprocess(response(second, iter(PartitionUpdate.emptyUpdate(cfm, dk)), emptyDigest, true, command));
+
+        resolveAndConsume(underTest);
+        assertTrue(tracking.verified);
+    }
+
+    @Test
+    public void trackMatchingEmptyDigestsWithNoneConclusive()
+    {
+        // same empty digest from both replicas, neither flagged true
+        final EndpointsForRange endpoints = makeReplicas(2);
+        final InetAddressAndPort first = endpoints.get(0).endpoint();
+        final InetAddressAndPort second = endpoints.get(1).endpoint();
+        final ByteBuffer emptyDigest = ByteBufferUtil.EMPTY_BYTE_BUFFER;
+
+        final TestRepairedDataVerifier tracking = new TestRepairedDataVerifier();
+        tracking.expectDigest(first, emptyDigest, false);
+        tracking.expectDigest(second, emptyDigest, false);
+
+        final DataResolver underTest = resolverWithVerifier(command, plan(endpoints, ConsistencyLevel.ALL), readRepair, System.nanoTime(), tracking);
+        underTest.preprocess(response(first, iter(PartitionUpdate.emptyUpdate(cfm, dk)), emptyDigest, false, command));
+        underTest.preprocess(response(second, iter(PartitionUpdate.emptyUpdate(cfm, dk)), emptyDigest, false, command));
+
+        resolveAndConsume(underTest);
+        assertTrue(tracking.verified);
+    }
+
+    @Test
+    public void trackMatchingDigestsWithAllConclusive()
+    {
+        // identical non-empty digest from both replicas, both flagged true
+        final EndpointsForRange endpoints = makeReplicas(2);
+        final InetAddressAndPort first = endpoints.get(0).endpoint();
+        final InetAddressAndPort second = endpoints.get(1).endpoint();
+        final ByteBuffer sharedDigest = ByteBufferUtil.bytes("digest1");
+
+        final TestRepairedDataVerifier tracking = new TestRepairedDataVerifier();
+        tracking.expectDigest(first, sharedDigest, true);
+        tracking.expectDigest(second, sharedDigest, true);
+
+        final DataResolver underTest = resolverWithVerifier(command, plan(endpoints, ConsistencyLevel.ALL), readRepair, System.nanoTime(), tracking);
+        underTest.preprocess(response(first, iter(PartitionUpdate.emptyUpdate(cfm, dk)), sharedDigest, true, command));
+        underTest.preprocess(response(second, iter(PartitionUpdate.emptyUpdate(cfm, dk)), sharedDigest, true, command));
+
+        resolveAndConsume(underTest);
+        assertTrue(tracking.verified);
+    }
+
+    @Test
+    public void trackMatchingDigestsWithSomeConclusive()
+    {
+        // identical non-empty digest from both replicas; only the first is flagged true
+        final EndpointsForRange endpoints = makeReplicas(2);
+        final InetAddressAndPort first = endpoints.get(0).endpoint();
+        final InetAddressAndPort second = endpoints.get(1).endpoint();
+        final ByteBuffer sharedDigest = ByteBufferUtil.bytes("digest1");
+
+        final TestRepairedDataVerifier tracking = new TestRepairedDataVerifier();
+        tracking.expectDigest(first, sharedDigest, true);
+        tracking.expectDigest(second, sharedDigest, false);
+
+        final DataResolver underTest = resolverWithVerifier(command, plan(endpoints, ConsistencyLevel.ALL), readRepair, System.nanoTime(), tracking);
+        underTest.preprocess(response(first, iter(PartitionUpdate.emptyUpdate(cfm, dk)), sharedDigest, true, command));
+        underTest.preprocess(response(second, iter(PartitionUpdate.emptyUpdate(cfm, dk)), sharedDigest, false, command));
+
+        resolveAndConsume(underTest);
+        assertTrue(tracking.verified);
+    }
+
+    @Test
+    public void trackMatchingDigestsWithNoneConclusive()
+    {
+        // identical non-empty digest from both replicas, neither flagged true
+        final EndpointsForRange endpoints = makeReplicas(2);
+        final InetAddressAndPort first = endpoints.get(0).endpoint();
+        final InetAddressAndPort second = endpoints.get(1).endpoint();
+        final ByteBuffer sharedDigest = ByteBufferUtil.bytes("digest1");
+
+        final TestRepairedDataVerifier tracking = new TestRepairedDataVerifier();
+        tracking.expectDigest(first, sharedDigest, false);
+        tracking.expectDigest(second, sharedDigest, false);
+
+        final DataResolver underTest = resolverWithVerifier(command, plan(endpoints, ConsistencyLevel.ALL), readRepair, System.nanoTime(), tracking);
+        underTest.preprocess(response(first, iter(PartitionUpdate.emptyUpdate(cfm, dk)), sharedDigest, false, command));
+        underTest.preprocess(response(second, iter(PartitionUpdate.emptyUpdate(cfm, dk)), sharedDigest, false, command));
+
+        resolveAndConsume(underTest);
+        assertTrue(tracking.verified);
+    }
+
+    @Test
+    public void trackMatchingRepairedDigestsWithDifferentData()
+    {
+        // The response payloads differ (one row vs. empty) but that is irrelevant to
+        // repaired data tracking; only the digests and flags are recorded
+        final EndpointsForRange endpoints = makeReplicas(2);
+        final InetAddressAndPort first = endpoints.get(0).endpoint();
+        final InetAddressAndPort second = endpoints.get(1).endpoint();
+        final ByteBuffer sharedDigest = ByteBufferUtil.bytes("digest1");
+
+        final TestRepairedDataVerifier tracking = new TestRepairedDataVerifier();
+        tracking.expectDigest(first, sharedDigest, true);
+        tracking.expectDigest(second, sharedDigest, true);
+
+        final DataResolver underTest = resolverWithVerifier(command, plan(endpoints, ConsistencyLevel.ALL), readRepair, System.nanoTime(), tracking);
+        underTest.preprocess(response(first, iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk).clustering("1").buildUpdate()), sharedDigest, true, command));
+        underTest.preprocess(response(second, iter(PartitionUpdate.emptyUpdate(cfm, dk)), sharedDigest, true, command));
+
+        resolveAndConsume(underTest);
+        assertTrue(tracking.verified);
+    }
+
+    @Test
+    public void trackMismatchingRepairedDigestsWithAllConclusive()
+    {
+        // each replica reports its own digest, both flagged true
+        final EndpointsForRange endpoints = makeReplicas(2);
+        final InetAddressAndPort first = endpoints.get(0).endpoint();
+        final InetAddressAndPort second = endpoints.get(1).endpoint();
+        final ByteBuffer firstDigest = ByteBufferUtil.bytes("digest1");
+        final ByteBuffer secondDigest = ByteBufferUtil.bytes("digest2");
+
+        final TestRepairedDataVerifier tracking = new TestRepairedDataVerifier();
+        tracking.expectDigest(first, firstDigest, true);
+        tracking.expectDigest(second, secondDigest, true);
+
+        final DataResolver underTest = resolverWithVerifier(command, plan(endpoints, ConsistencyLevel.ALL), readRepair, System.nanoTime(), tracking);
+        underTest.preprocess(response(first, iter(PartitionUpdate.emptyUpdate(cfm, dk)), firstDigest, true, command));
+        underTest.preprocess(response(second, iter(PartitionUpdate.emptyUpdate(cfm, dk)), secondDigest, true, command));
+
+        resolveAndConsume(underTest);
+        assertTrue(tracking.verified);
+    }
+
+    @Test
+    public void trackMismatchingRepairedDigestsWithSomeConclusive()
+    {
+        // each replica reports its own digest; only the second is flagged true
+        final EndpointsForRange endpoints = makeReplicas(2);
+        final InetAddressAndPort first = endpoints.get(0).endpoint();
+        final InetAddressAndPort second = endpoints.get(1).endpoint();
+        final ByteBuffer firstDigest = ByteBufferUtil.bytes("digest1");
+        final ByteBuffer secondDigest = ByteBufferUtil.bytes("digest2");
+
+        final TestRepairedDataVerifier tracking = new TestRepairedDataVerifier();
+        tracking.expectDigest(first, firstDigest, false);
+        tracking.expectDigest(second, secondDigest, true);
+
+        final DataResolver underTest = resolverWithVerifier(command, plan(endpoints, ConsistencyLevel.ALL), readRepair, System.nanoTime(), tracking);
+        underTest.preprocess(response(first, iter(PartitionUpdate.emptyUpdate(cfm, dk)), firstDigest, false, command));
+        underTest.preprocess(response(second, iter(PartitionUpdate.emptyUpdate(cfm, dk)), secondDigest, true, command));
+
+        resolveAndConsume(underTest);
+        assertTrue(tracking.verified);
+    }
+
+    @Test
+    public void trackMismatchingRepairedDigestsWithNoneConclusive()
+    {
+        // each replica reports its own digest, neither flagged true
+        final EndpointsForRange endpoints = makeReplicas(2);
+        final InetAddressAndPort first = endpoints.get(0).endpoint();
+        final InetAddressAndPort second = endpoints.get(1).endpoint();
+        final ByteBuffer firstDigest = ByteBufferUtil.bytes("digest1");
+        final ByteBuffer secondDigest = ByteBufferUtil.bytes("digest2");
+
+        final TestRepairedDataVerifier tracking = new TestRepairedDataVerifier();
+        tracking.expectDigest(first, firstDigest, false);
+        tracking.expectDigest(second, secondDigest, false);
+
+        final DataResolver underTest = resolverWithVerifier(command, plan(endpoints, ConsistencyLevel.ALL), readRepair, System.nanoTime(), tracking);
+        underTest.preprocess(response(first, iter(PartitionUpdate.emptyUpdate(cfm, dk)), firstDigest, false, command));
+        underTest.preprocess(response(second, iter(PartitionUpdate.emptyUpdate(cfm, dk)), secondDigest, false, command));
+
+        resolveAndConsume(underTest);
+        assertTrue(tracking.verified);
+    }
+
+    @Test
+    public void trackMismatchingRepairedDigestsWithDifferentData()
+    {
+        // digests differ and so do the payloads (one row vs. empty); both flagged true
+        final EndpointsForRange endpoints = makeReplicas(2);
+        final InetAddressAndPort first = endpoints.get(0).endpoint();
+        final InetAddressAndPort second = endpoints.get(1).endpoint();
+        final ByteBuffer firstDigest = ByteBufferUtil.bytes("digest1");
+        final ByteBuffer secondDigest = ByteBufferUtil.bytes("digest2");
+
+        final TestRepairedDataVerifier tracking = new TestRepairedDataVerifier();
+        tracking.expectDigest(first, firstDigest, true);
+        tracking.expectDigest(second, secondDigest, true);
+
+        final DataResolver underTest = resolverWithVerifier(command, plan(endpoints, ConsistencyLevel.ALL), readRepair, System.nanoTime(), tracking);
+        underTest.preprocess(response(first, iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk).clustering("1").buildUpdate()), firstDigest, true, command));
+        underTest.preprocess(response(second, iter(PartitionUpdate.emptyUpdate(cfm, dk)), secondDigest, true, command));
+
+        resolveAndConsume(underTest);
+        assertTrue(tracking.verified);
+    }
+
+    @Test
+    public void noVerificationForSingletonResponse()
+    {
+        // A coordinator shouldn't ask for repaired data tracking when CL <= 1, but even
+        // if it did, we can cheaply assert that no verification is attempted
+        final EndpointsForRange endpoints = makeReplicas(2);
+        final InetAddressAndPort onlyResponder = endpoints.get(0).endpoint();
+        final ByteBuffer digest = ByteBufferUtil.bytes("digest1");
+
+        final TestRepairedDataVerifier tracking = new TestRepairedDataVerifier();
+        tracking.expectDigest(onlyResponder, digest, true);
+
+        final DataResolver underTest = resolverWithVerifier(command, plan(endpoints, ConsistencyLevel.ALL), readRepair, System.nanoTime(), tracking);
+        underTest.preprocess(response(onlyResponder, iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest, true, command));
+
+        resolveAndConsume(underTest);
+        assertFalse(tracking.verified);
+    }
+
+    @Test
+    public void responsesFromOlderVersionsAreNotTracked()
+    {
+        // In a mixed version cluster, responses from replicas running older versions carry no
+        // tracking info, so the digest and pending session status are defaulted. DataResolver must
+        // ignore those defaults, otherwise they would produce false positives
+        final EndpointsForRange endpoints = makeReplicas(2);
+        final InetAddressAndPort current = endpoints.get(0).endpoint();
+        final InetAddressAndPort legacy = endpoints.get(1).endpoint();
+        final ByteBuffer digest = ByteBufferUtil.bytes("digest1");
+
+        // only the up-to-date replica is expected to be recorded
+        final TestRepairedDataVerifier tracking = new TestRepairedDataVerifier();
+        tracking.expectDigest(current, digest, true);
+
+        final DataResolver underTest = resolverWithVerifier(command, plan(endpoints, ConsistencyLevel.ALL), readRepair, System.nanoTime(), tracking);
+        underTest.preprocess(response(current, iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest, true, command));
+
+        // The second replica advertises an older version. Two things follow:
+        // i) its serialized response cannot contain tracking info, so deserialization falls back to
+        //    an empty digest and pending sessions = false
+        // ii) normally that would mismatch with the first replica, but because of the older version
+        //     it does not here
+        underTest.preprocess(response(command,
+                                      legacy,
+                                      iter(PartitionUpdate.emptyUpdate(cfm, dk)),
+                                      false,
+                                      MessagingService.VERSION_30,
+                                      ByteBufferUtil.EMPTY_BYTE_BUFFER,
+                                      false));
+
+        resolveAndConsume(underTest);
+        assertTrue(tracking.verified);
+    }
+
+    @Test
+    public void responsesFromTransientReplicasAreNotTracked()
+    {
+        // rebuild the replica set so that the second endpoint is a transient replica
+        final EndpointsForRange allFull = makeReplicas(2);
+        final EndpointsForRange.Mutable builder = allFull.newMutable(2);
+        builder.add(allFull.get(0));
+        builder.add(Replica.transientReplica(allFull.get(1).endpoint(), allFull.range()));
+        final EndpointsForRange endpoints = builder.asSnapshot();
+
+        final InetAddressAndPort fullPeer = endpoints.get(0).endpoint();
+        final InetAddressAndPort transientPeer = endpoints.get(1).endpoint();
+        final ByteBuffer fullDigest = ByteBufferUtil.bytes("digest1");
+        final ByteBuffer transientDigest = ByteBufferUtil.bytes("digest2");
+
+        // only the full replica's digest is expected to be recorded
+        final TestRepairedDataVerifier tracking = new TestRepairedDataVerifier();
+        tracking.expectDigest(fullPeer, fullDigest, true);
+
+        final DataResolver underTest = resolverWithVerifier(command, plan(endpoints, ConsistencyLevel.ALL), readRepair, System.nanoTime(), tracking);
+        underTest.preprocess(response(fullPeer, iter(PartitionUpdate.emptyUpdate(cfm, dk)), fullDigest, true, command));
+        underTest.preprocess(response(transientPeer, iter(PartitionUpdate.emptyUpdate(cfm, dk)), transientDigest, true, command));
+
+        resolveAndConsume(underTest);
+        assertTrue(tracking.verified);
+    }
+
+    /**
+     * Verifier double: tests pre-load the digests they expect to be recorded, and {@link #verify}
+     * stores whether the tracker handed over by the resolver equals that expectation.
+     */
+    private static class TestRepairedDataVerifier implements RepairedDataVerifier
+    {
+        private final RepairedDataTracker expectedState = new RepairedDataTracker(null);
+        // stays false until verify() is called with a tracker equal to the expected state
+        private boolean verified;
+
+        private void expectDigest(InetAddressAndPort from, ByteBuffer digest, boolean conclusive)
+        {
+            expectedState.recordDigest(from, digest, conclusive);
+        }
+
+        @Override
+        public void verify(RepairedDataTracker tracker)
+        {
+            verified = expectedState.equals(tracker);
+        }
+    }
+
+    /**
+     * Creates a {@link DataResolver} whose {@code getRepairedDataVerifier} hook hands back the
+     * supplied {@code verifier}, so tests can observe what the resolver passes to verification.
+     */
+    private DataResolver resolverWithVerifier(final ReadCommand command,
+                                              final ReplicaLayout.ForRange plan,
+                                              final ReadRepair readRepair,
+                                              final long queryStartNanoTime,
+                                              final RepairedDataVerifier verifier)
+    {
+        return new DataResolver(command, plan, readRepair, queryStartNanoTime)
+        {
+            // @Override makes the compiler reject this if the hook's signature ever changes;
+            // without it the test verifier would silently stop being injected
+            @Override
+            protected RepairedDataVerifier getRepairedDataVerifier(ReadCommand cmd)
+            {
+                return verifier;
+            }
+        };
+    }
+
private void assertRepairContainsDeletions(Mutation mutation,
DeletionTime deletionTime,
RangeTombstone...rangeTombstones)
@@ -954,4 +1302,19 @@ public class DataResolverTest extends AbstractReadResponseTest
{
return new ReplicaLayout.ForRange(ks, consistencyLevel, ReplicaUtils.FULL_BOUNDS, replicas, replicas);
}
+
+    /**
+     * Runs {@code resolver.resolve()} and walks every row of every partition it yields, discarding
+     * the rows and closing each iterator on the way out.
+     */
+    private static void resolveAndConsume(DataResolver resolver)
+    {
+        try (PartitionIterator partitions = resolver.resolve())
+        {
+            while (partitions.hasNext())
+            {
+                try (RowIterator rows = partitions.next())
+                {
+                    while (rows.hasNext())
+                    {
+                        rows.next();
+                    }
+                }
+            }
+        }
+    }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java b/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java
index a574d02..a4b7615 100644
--- a/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java
@@ -93,10 +93,10 @@ public class BlockingReadRepairTest extends AbstractReadRepairTest
ReadCallback readCallback = null;
@Override
- void sendReadCommand(InetAddressAndPort to, ReadCallback callback)
+ void sendReadCommand(Replica to, ReadCallback callback) // test override: records the recipient and callback
{
assert readCallback == null || readCallback == callback;
- readCommandRecipients.add(to);
+ readCommandRecipients.add(to.endpoint()); // recipients are still recorded by endpoint address
readCallback = callback;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java b/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java
index c345ee6..a5efe27 100644
--- a/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java
@@ -141,7 +141,7 @@ public class DiagEventsBlockingReadRepairTest extends AbstractReadRepairTest
Assert.assertNotNull(e.toMap());
}
- void sendReadCommand(InetAddressAndPort to, ReadCallback callback)
+ void sendReadCommand(Replica to, ReadCallback callback)
{
assert readCallback == null || readCallback == callback;
readCallback = callback;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/test/unit/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepairTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepairTest.java b/test/unit/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepairTest.java
index 9bb7eed..bee5ddd 100644
--- a/test/unit/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepairTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepairTest.java
@@ -30,6 +30,7 @@ import org.apache.cassandra.db.ReadCommand;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
import org.apache.cassandra.locator.Endpoints;
import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.locator.ReplicaLayout;
import org.apache.cassandra.service.reads.ReadCallback;
@@ -46,10 +47,10 @@ public class ReadOnlyReadRepairTest extends AbstractReadRepairTest
ReadCallback readCallback = null;
@Override
- void sendReadCommand(InetAddressAndPort to, ReadCallback callback)
+ void sendReadCommand(Replica to, ReadCallback callback) // test override: records the recipient and callback
{
assert readCallback == null || readCallback == callback;
- readCommandRecipients.add(to);
+ readCommandRecipients.add(to.endpoint()); // recipients are still recorded by endpoint address
readCallback = callback;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/test/unit/org/apache/cassandra/service/reads/repair/RepairedDataVerifierTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/reads/repair/RepairedDataVerifierTest.java b/test/unit/org/apache/cassandra/service/reads/repair/RepairedDataVerifierTest.java
new file mode 100644
index 0000000..c916d13
--- /dev/null
+++ b/test/unit/org/apache/cassandra/service/reads/repair/RepairedDataVerifierTest.java
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.reads.repair;
+
+import java.net.UnknownHostException;
+import java.util.Random;
+
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.ReadCommand;
+import org.apache.cassandra.db.SinglePartitionReadCommand;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.filter.RowFilter;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.metrics.TableMetrics;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.junit.Assert.assertEquals;
+
+public class RepairedDataVerifierTest
+{
+ private static final String TEST_NAME = "read_command_vh_test_"; // prefix handed to SchemaLoader.schemaDefinition() in init()
+ private static final String KEYSPACE = TEST_NAME + "cql_keyspace"; // keyspace looked up in setup()
+ private static final String TABLE = "table1"; // table looked up in setup()
+
+ private final Random random = new Random(); // unseeded; supplies the values returned by key()
+ private TableMetadata metadata; // assigned in setup()
+ private TableMetrics metrics; // assigned in setup(); read by confirmedCount() / unconfirmedCount()
+
+ // counter to generate the last byte of peer addresses
+ private int addressSuffix = 10;
+
+ @BeforeClass
+ public static void init()
+ {
+ SchemaLoader.loadSchema();
+ SchemaLoader.schemaDefinition(TEST_NAME); // setup() later resolves KEYSPACE/TABLE, which share this prefix
+ DatabaseDescriptor.reportUnconfirmedRepairedDataMismatches(true); // NOTE(review): presumably required for the "unconfirmedCount + 1" assertions below - confirm
+ }
+
+    /**
+     * Resolves the test table's metadata and the metrics object registered for its id.
+     */
+    @Before
+    public void setup()
+    {
+        final TableMetadata table = Schema.instance.getTableMetadata(KEYSPACE, TABLE);
+        metadata = table;
+        metrics = ColumnFamilyStore.metricsFor(table.id);
+    }
+
+    @Test
+    public void repairedDataMismatchWithSomeConclusive()
+    {
+        // capture both counters before anything is recorded
+        final long confirmedBefore = confirmedCount();
+        final long unconfirmedBefore = unconfirmedCount();
+        final InetAddressAndPort first = peer();
+        final InetAddressAndPort second = peer();
+
+        final RepairedDataTracker underTest = new RepairedDataTracker(new RepairedDataVerifier.SimpleVerifier(command(key())));
+        underTest.recordDigest(first, ByteBufferUtil.bytes("digest1"), false);
+        underTest.recordDigest(second, ByteBufferUtil.bytes("digest2"), true);
+        underTest.verify();
+
+        // differing digests, one not conclusive: only the unconfirmed counter moves
+        assertEquals(confirmedBefore, confirmedCount());
+        assertEquals(unconfirmedBefore + 1, unconfirmedCount());
+    }
+
+    @Test
+    public void repairedDataMismatchWithNoneConclusive()
+    {
+        // capture both counters before anything is recorded
+        final long confirmedBefore = confirmedCount();
+        final long unconfirmedBefore = unconfirmedCount();
+        final InetAddressAndPort first = peer();
+        final InetAddressAndPort second = peer();
+
+        final RepairedDataTracker underTest = new RepairedDataTracker(new RepairedDataVerifier.SimpleVerifier(command(key())));
+        underTest.recordDigest(first, ByteBufferUtil.bytes("digest1"), false);
+        underTest.recordDigest(second, ByteBufferUtil.bytes("digest2"), false);
+        underTest.verify();
+
+        // differing digests, neither conclusive: only the unconfirmed counter moves
+        assertEquals(confirmedBefore, confirmedCount());
+        assertEquals(unconfirmedBefore + 1, unconfirmedCount());
+    }
+
+    @Test
+    public void repairedDataMismatchWithAllConclusive()
+    {
+        // capture both counters before anything is recorded
+        final long confirmedBefore = confirmedCount();
+        final long unconfirmedBefore = unconfirmedCount();
+        final InetAddressAndPort first = peer();
+        final InetAddressAndPort second = peer();
+
+        final RepairedDataTracker underTest = new RepairedDataTracker(new RepairedDataVerifier.SimpleVerifier(command(key())));
+        underTest.recordDigest(first, ByteBufferUtil.bytes("digest1"), true);
+        underTest.recordDigest(second, ByteBufferUtil.bytes("digest2"), true);
+        underTest.verify();
+
+        // differing digests, both conclusive: only the confirmed counter moves
+        assertEquals(confirmedBefore + 1, confirmedCount());
+        assertEquals(unconfirmedBefore, unconfirmedCount());
+    }
+
+    @Test
+    public void repairedDataMatchesWithAllConclusive()
+    {
+        // capture both counters before anything is recorded
+        final long confirmedBefore = confirmedCount();
+        final long unconfirmedBefore = unconfirmedCount();
+        final InetAddressAndPort first = peer();
+        final InetAddressAndPort second = peer();
+
+        final RepairedDataTracker underTest = new RepairedDataTracker(new RepairedDataVerifier.SimpleVerifier(command(key())));
+        underTest.recordDigest(first, ByteBufferUtil.bytes("digest1"), true);
+        underTest.recordDigest(second, ByteBufferUtil.bytes("digest1"), true);
+        underTest.verify();
+
+        // equal digests: neither counter moves
+        assertEquals(confirmedBefore, confirmedCount());
+        assertEquals(unconfirmedBefore, unconfirmedCount());
+    }
+
+    @Test
+    public void repairedDataMatchesWithSomeConclusive()
+    {
+        // capture both counters before anything is recorded
+        final long confirmedBefore = confirmedCount();
+        final long unconfirmedBefore = unconfirmedCount();
+        final InetAddressAndPort first = peer();
+        final InetAddressAndPort second = peer();
+
+        final RepairedDataTracker underTest = new RepairedDataTracker(new RepairedDataVerifier.SimpleVerifier(command(key())));
+        underTest.recordDigest(first, ByteBufferUtil.bytes("digest1"), true);
+        underTest.recordDigest(second, ByteBufferUtil.bytes("digest1"), false);
+        underTest.verify();
+
+        // equal digests: neither counter moves, even with one not conclusive
+        assertEquals(confirmedBefore, confirmedCount());
+        assertEquals(unconfirmedBefore, unconfirmedCount());
+    }
+
+    @Test
+    public void repairedDataMatchesWithNoneConclusive()
+    {
+        // capture both counters before anything is recorded
+        final long confirmedBefore = confirmedCount();
+        final long unconfirmedBefore = unconfirmedCount();
+        final InetAddressAndPort first = peer();
+        final InetAddressAndPort second = peer();
+
+        final RepairedDataTracker underTest = new RepairedDataTracker(new RepairedDataVerifier.SimpleVerifier(command(key())));
+        underTest.recordDigest(first, ByteBufferUtil.bytes("digest1"), false);
+        underTest.recordDigest(second, ByteBufferUtil.bytes("digest1"), false);
+        underTest.verify();
+
+        // equal digests: neither counter moves, even with none conclusive
+        assertEquals(confirmedBefore, confirmedCount());
+        assertEquals(unconfirmedBefore, unconfirmedCount());
+    }
+
+    @Test
+    public void allEmptyDigestWithAllConclusive()
+    {
+        // digests are empty when a read touched no repaired sstables
+        final long confirmedBefore = confirmedCount();
+        final long unconfirmedBefore = unconfirmedCount();
+        final InetAddressAndPort first = peer();
+        final InetAddressAndPort second = peer();
+
+        final RepairedDataTracker underTest = new RepairedDataTracker(new RepairedDataVerifier.SimpleVerifier(command(key())));
+        underTest.recordDigest(first, ByteBufferUtil.EMPTY_BYTE_BUFFER, true);
+        underTest.recordDigest(second, ByteBufferUtil.EMPTY_BYTE_BUFFER, true);
+        underTest.verify();
+
+        // neither counter moves
+        assertEquals(confirmedBefore, confirmedCount());
+        assertEquals(unconfirmedBefore, unconfirmedCount());
+    }
+
+    @Test
+    public void allEmptyDigestsWithSomeConclusive()
+    {
+        // digests are empty when a read touched no repaired sstables
+        final long confirmedBefore = confirmedCount();
+        final long unconfirmedBefore = unconfirmedCount();
+        final InetAddressAndPort first = peer();
+        final InetAddressAndPort second = peer();
+
+        final RepairedDataTracker underTest = new RepairedDataTracker(new RepairedDataVerifier.SimpleVerifier(command(key())));
+        underTest.recordDigest(first, ByteBufferUtil.EMPTY_BYTE_BUFFER, true);
+        underTest.recordDigest(second, ByteBufferUtil.EMPTY_BYTE_BUFFER, false);
+        underTest.verify();
+
+        // neither counter moves
+        assertEquals(confirmedBefore, confirmedCount());
+        assertEquals(unconfirmedBefore, unconfirmedCount());
+    }
+
+    @Test
+    public void allEmptyDigestsWithNoneConclusive()
+    {
+        // digests are empty when a read touched no repaired sstables
+        final long confirmedBefore = confirmedCount();
+        final long unconfirmedBefore = unconfirmedCount();
+        final InetAddressAndPort first = peer();
+        final InetAddressAndPort second = peer();
+
+        final RepairedDataTracker underTest = new RepairedDataTracker(new RepairedDataVerifier.SimpleVerifier(command(key())));
+        underTest.recordDigest(first, ByteBufferUtil.EMPTY_BYTE_BUFFER, false);
+        underTest.recordDigest(second, ByteBufferUtil.EMPTY_BYTE_BUFFER, false);
+        underTest.verify();
+
+        // neither counter moves
+        assertEquals(confirmedBefore, confirmedCount());
+        assertEquals(unconfirmedBefore, unconfirmedCount());
+    }
+
+ @Test
+ public void noTrackingDataRecorded()
+ {
+ // if a read didn't land on any replicas which support repaired data tracking, nothing will be recorded
+ long confirmedCount = confirmedCount();
+ long unconfirmedCount = unconfirmedCount();
+ RepairedDataVerifier.SimpleVerifier verifier = new RepairedDataVerifier.SimpleVerifier(command(key()));
+ RepairedDataTracker tracker = new RepairedDataTracker(verifier);
+ tracker.verify();
+ assertEquals(confirmedCount, confirmedCount());
+ assertEquals(unconfirmedCount, unconfirmedCount());
+ }
+
+ private long confirmedCount()
+ {
+ return metrics.confirmedRepairedInconsistencies.table.getCount();
+ }
+
+ private long unconfirmedCount()
+ {
+ return metrics.unconfirmedRepairedInconsistencies.table.getCount();
+ }
+
+ private InetAddressAndPort peer()
+ {
+ try
+ {
+ return InetAddressAndPort.getByAddress(new byte[]{ 127, 0, 0, (byte) addressSuffix++ });
+ }
+ catch (UnknownHostException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private int key()
+ {
+ return random.nextInt();
+ }
+
+ private ReadCommand command(int key)
+ {
+ return new StubReadCommand(key, metadata, false);
+ }
+
+ private static class StubReadCommand extends SinglePartitionReadCommand
+ {
+ StubReadCommand(int key, TableMetadata metadata, boolean isDigest)
+ {
+ super(isDigest,
+ 0,
+ false,
+ metadata,
+ FBUtilities.nowInSeconds(),
+ ColumnFilter.all(metadata),
+ RowFilter.NONE,
+ DataLimits.NONE,
+ metadata.partitioner.decorateKey(ByteBufferUtil.bytes(key)),
+ null,
+ null);
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org
[2/2] cassandra git commit: Detect inconsistencies in repaired data
on the read path
Posted by sa...@apache.org.
Detect inconsistencies in repaired data on the read path
Patch by Sam Tunnicliffe; reviewed by Marcus Eriksson and Jordan West for CASSANDRA-14145
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5fbb938a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5fbb938a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5fbb938a
Branch: refs/heads/trunk
Commit: 5fbb938adaafd91e7bea1672f09a03c7ac5b9b9d
Parents: 744973e
Author: Sam Tunnicliffe <sa...@beobal.com>
Authored: Wed May 9 18:57:30 2018 +0100
Committer: Sam Tunnicliffe <sa...@beobal.com>
Committed: Wed Sep 5 19:07:12 2018 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
conf/cassandra.yaml | 18 +
.../org/apache/cassandra/config/Config.java | 20 +
.../cassandra/config/DatabaseDescriptor.java | 29 ++
.../cassandra/db/PartitionRangeReadCommand.java | 33 +-
.../org/apache/cassandra/db/ReadCommand.java | 346 +++++++++++++++++-
.../cassandra/db/ReadCommandVerbHandler.java | 4 +
.../org/apache/cassandra/db/ReadResponse.java | 123 ++++++-
.../db/SinglePartitionReadCommand.java | 60 +--
.../db/filter/ClusteringIndexNamesFilter.java | 14 +
.../db/rows/UnfilteredRowIterators.java | 1 +
.../cassandra/metrics/KeyspaceMetrics.java | 22 ++
.../apache/cassandra/metrics/TableMetrics.java | 50 +++
.../org/apache/cassandra/net/ParameterType.java | 3 +-
.../repair/consistent/LocalSessions.java | 17 +
.../apache/cassandra/service/StorageProxy.java | 76 +++-
.../cassandra/service/StorageProxyMBean.java | 15 +
.../cassandra/service/reads/DataResolver.java | 44 ++-
.../service/reads/ShortReadProtection.java | 1 +
.../reads/repair/AbstractReadRepair.java | 21 +-
.../reads/repair/RepairedDataTracker.java | 87 +++++
.../reads/repair/RepairedDataVerifier.java | 95 +++++
.../apache/cassandra/db/ReadCommandTest.java | 287 ++++++++++++++-
.../db/ReadCommandVerbHandlerTest.java | 171 +++++++++
.../apache/cassandra/db/ReadResponseTest.java | 261 +++++++++++++
.../service/reads/AbstractReadResponseTest.java | 32 +-
.../service/reads/DataResolverTest.java | 363 +++++++++++++++++++
.../reads/repair/BlockingReadRepairTest.java | 4 +-
.../DiagEventsBlockingReadRepairTest.java | 2 +-
.../reads/repair/ReadOnlyReadRepairTest.java | 5 +-
.../reads/repair/RepairedDataVerifierTest.java | 291 +++++++++++++++
31 files changed, 2399 insertions(+), 97 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 301f97f..5cfc7ab 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0
+ * Detect inconsistencies in repaired data on the read path (CASSANDRA-14145)
* Add checksumming to the native protocol (CASSANDRA-13304)
* Make AuthCache more easily extendable (CASSANDRA-14662)
* Extend RolesCache to include detailed role info (CASSANDRA-14497)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 995a520..190ce77 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -1238,3 +1238,21 @@ diagnostic_events_enabled: false
# Define use of legacy delayed flusher for replies to TCP connections. This will increase latency, but might be beneficial for
# legacy use-cases where only a single connection is used for each Cassandra node. Default is false.
#native_transport_flush_in_batches_legacy: false
+
+# Enable tracking of repaired state of data during reads and comparison between replicas
+# Mismatches between the repaired sets of replicas can be characterized as either confirmed
+# or unconfirmed. In this context, unconfirmed indicates that the presence of pending repair
+# sessions, unrepaired partition tombstones, or some other condition means that the disparity
+# cannot be considered conclusive. Confirmed mismatches should be a trigger for investigation
+# as they may be indicative of corruption or data loss.
+# There are separate flags for range vs partition reads as single partition reads are only tracked
+# when CL > 1 and a digest mismatch occurs. Currently, range queries don't use digests so if
+# enabled for range reads, all range reads will include repaired data tracking. As this adds
+# some overhead, operators may wish to disable it whilst still enabling it for partition reads
+repaired_data_tracking_for_range_reads_enabled: false
+repaired_data_tracking_for_partition_reads_enabled: false
+# If false, only confirmed mismatches will be reported. If true, a separate metric for unconfirmed
+ # mismatches will also be recorded. This is to avoid potential signal:noise issues as unconfirmed
+# mismatches are less actionable than confirmed ones.
+report_unconfirmed_repaired_data_mismatches: false
+
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index b04f9ec..782815e 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -395,6 +395,26 @@ public class Config
public volatile boolean diagnostic_events_enabled = false;
/**
+ * flags for enabling tracking repaired state of data during reads
+ * separate flags for range & single partition reads as single partition reads are only tracked
+ * when CL > 1 and a digest mismatch occurs. Currently, range queries don't use digests so if
+ * enabled for range reads, all such reads will include repaired data tracking. As this adds
+ * some overhead, operators may wish to disable it whilst still enabling it for partition reads
+ */
+ public volatile boolean repaired_data_tracking_for_range_reads_enabled = false;
+ public volatile boolean repaired_data_tracking_for_partition_reads_enabled = false;
+ /* If true, unconfirmed mismatches (those which cannot be considered conclusive proof of out of
+ * sync repaired data due to the presence of pending repair sessions, or unrepaired partition
+ * deletes) will increment a metric, distinct from confirmed mismatches. If false, unconfirmed
+ * mismatches are simply ignored by the coordinator.
+ * This is purely to allow operators to avoid potential signal:noise issues as these types of
+ * mismatches are considerably less actionable than their confirmed counterparts. Setting this
+ * to false only disables the incrementing of the counters when an unconfirmed mismatch is found
+ * and has no other effect on the collection or processing of the repaired data.
+ */
+ public volatile boolean report_unconfirmed_repaired_data_mismatches = false;
+
+ /**
* @deprecated migrate to {@link DatabaseDescriptor#isClientInitialized()}
*/
@Deprecated
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index ddea8f4..2ad9b18 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -2672,4 +2672,33 @@ public class DatabaseDescriptor
conf.corrupted_tombstone_strategy = strategy;
}
+ public static boolean getRepairedDataTrackingForRangeReadsEnabled()
+ {
+ return conf.repaired_data_tracking_for_range_reads_enabled;
+ }
+
+ public static void setRepairedDataTrackingForRangeReadsEnabled(boolean enabled)
+ {
+ conf.repaired_data_tracking_for_range_reads_enabled = enabled;
+ }
+
+ public static boolean getRepairedDataTrackingForPartitionReadsEnabled()
+ {
+ return conf.repaired_data_tracking_for_partition_reads_enabled;
+ }
+
+ public static void setRepairedDataTrackingForPartitionReadsEnabled(boolean enabled)
+ {
+ conf.repaired_data_tracking_for_partition_reads_enabled = enabled;
+ }
+
+ public static boolean reportUnconfirmedRepairedDataMismatches()
+ {
+ return conf.report_unconfirmed_repaired_data_mismatches;
+ }
+
+ public static void reportUnconfirmedRepairedDataMismatches(boolean enabled)
+ {
+ conf.report_unconfirmed_repaired_data_mismatches = enabled;
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
index 7eab016..79db18a 100644
--- a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
+++ b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
@@ -18,12 +18,10 @@
package org.apache.cassandra.db;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Iterables;
+import org.apache.cassandra.net.ParameterType;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.filter.*;
@@ -46,7 +44,6 @@ import org.apache.cassandra.schema.IndexMetadata;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.tracing.Tracing;
-import org.apache.cassandra.utils.FBUtilities;
/**
* A read command that selects a (part of a) range of partitions.
@@ -56,7 +53,6 @@ public class PartitionRangeReadCommand extends ReadCommand implements PartitionR
protected static final SelectionDeserializer selectionDeserializer = new Deserializer();
private final DataRange dataRange;
- private int oldestUnrepairedTombstone = Integer.MAX_VALUE;
private PartitionRangeReadCommand(boolean isDigest,
int digestVersion,
@@ -257,8 +253,7 @@ public class PartitionRangeReadCommand extends ReadCommand implements PartitionR
Tracing.trace("Executing seq scan across {} sstables for {}", view.sstables.size(), dataRange().keyRange().getString(metadata().partitionKeyType));
// fetch data from current memtable, historical memtables, and SSTables in the correct order.
- final List<UnfilteredPartitionIterator> iterators = new ArrayList<>(Iterables.size(view.memtables) + view.sstables.size());
-
+ InputCollector<UnfilteredPartitionIterator> inputCollector = iteratorsForRange(view);
try
{
for (Memtable memtable : view.memtables)
@@ -266,7 +261,7 @@ public class PartitionRangeReadCommand extends ReadCommand implements PartitionR
@SuppressWarnings("resource") // We close on exception and on closing the result returned by this method
Memtable.MemtableUnfilteredPartitionIterator iter = memtable.makePartitionIterator(columnFilter(), dataRange());
oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, iter.getMinLocalDeletionTime());
- iterators.add(iter);
+ inputCollector.addMemtableIterator(iter);
}
SSTableReadsListener readCountUpdater = newReadCountUpdater();
@@ -274,25 +269,27 @@ public class PartitionRangeReadCommand extends ReadCommand implements PartitionR
{
@SuppressWarnings("resource") // We close on exception and on closing the result returned by this method
UnfilteredPartitionIterator iter = sstable.getScanner(columnFilter(), dataRange(), readCountUpdater);
- iterators.add(iter);
+ inputCollector.addSSTableIterator(sstable, iter);
+
if (!sstable.isRepaired())
oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, sstable.getMinLocalDeletionTime());
}
// iterators can be empty for offline tools
- return iterators.isEmpty() ? EmptyIterators.unfilteredPartition(metadata())
- : checkCacheFilter(UnfilteredPartitionIterators.mergeLazily(iterators), cfs);
+ if (inputCollector.isEmpty())
+ return EmptyIterators.unfilteredPartition(metadata());
+
+ return checkCacheFilter(UnfilteredPartitionIterators.mergeLazily(inputCollector.finalizeIterators()), cfs);
}
catch (RuntimeException | Error e)
{
try
{
- FBUtilities.closeAll(iterators);
+ inputCollector.close();
}
- catch (Exception suppressed)
+ catch (Exception e1)
{
- e.addSuppressed(suppressed);
+ e.addSuppressed(e1);
}
-
throw e;
}
}
@@ -313,12 +310,6 @@ public class PartitionRangeReadCommand extends ReadCommand implements PartitionR
};
}
- @Override
- protected int oldestUnrepairedTombstone()
- {
- return oldestUnrepairedTombstone;
- }
-
private UnfilteredPartitionIterator checkCacheFilter(UnfilteredPartitionIterator iter, final ColumnFamilyStore cfs)
{
class CacheFilter extends Transformation
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java
index 736e3a3..e146b8a 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -18,10 +18,17 @@
package org.apache.cassandra.db;
import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.function.BiFunction;
import java.util.function.LongPredicate;
import javax.annotation.Nullable;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,6 +46,7 @@ import org.apache.cassandra.index.Index;
import org.apache.cassandra.index.IndexNotAvailableException;
import org.apache.cassandra.index.IndexRegistry;
import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.metrics.TableMetrics;
@@ -48,9 +56,12 @@ import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.SchemaConstants;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.service.ClientWarn;
import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.HashingUtils;
/**
* General interface for storage-engine read commands (common to both range and
@@ -71,6 +82,23 @@ public abstract class ReadCommand extends AbstractReadQuery
// if a digest query, the version for which the digest is expected. Ignored if not a digest.
private int digestVersion;
+ // for data queries, coordinators may request information on the repaired data used in constructing the response
+ private boolean trackRepairedStatus = false;
+ // tracker for repaired data, initialized to singleton null object
+ private static final RepairedDataInfo NULL_REPAIRED_DATA_INFO = new RepairedDataInfo()
+ {
+ void trackPartitionKey(DecoratedKey key){}
+ void trackDeletion(DeletionTime deletion){}
+ void trackRangeTombstoneMarker(RangeTombstoneMarker marker){}
+ void trackRow(Row row){}
+ boolean isConclusive(){ return true; }
+ ByteBuffer getDigest(){ return ByteBufferUtil.EMPTY_BYTE_BUFFER; }
+ };
+
+ private RepairedDataInfo repairedDataInfo = NULL_REPAIRED_DATA_INFO;
+
+ int oldestUnrepairedTombstone = Integer.MAX_VALUE;
+
@Nullable
private final IndexMetadata index;
@@ -187,6 +215,68 @@ public abstract class ReadCommand extends AbstractReadQuery
}
/**
+ * Activates repaired data tracking for this command.
+ *
+ * When active, a digest will be created from data read from repaired SSTables. The digests
+ * from each replica can then be compared on the coordinator to detect any divergence in their
+ * repaired datasets. In this context, an sstable is considered repaired if it is marked
+ * repaired or has a pending repair session which has been committed.
+ * In addition to the digest, a set of ids for any pending but as yet uncommitted repair sessions
+ * is recorded and returned to the coordinator. This is to help reduce false positives caused
+ * by compaction lagging which can leave sstables from committed sessions in the pending state
+ * for a time.
+ */
+ public void trackRepairedStatus()
+ {
+ trackRepairedStatus = true;
+ }
+
+ /**
+ * Whether or not repaired status of any data read is being tracked
+ *
+ * @return Whether repaired status tracking is active for this command
+ */
+ public boolean isTrackingRepairedStatus()
+ {
+ return trackRepairedStatus;
+ }
+
+ /**
+ * Returns a digest of the repaired data read in the execution of this command.
+ *
+ * If either repaired status tracking is not active or the command has not yet been
+ * executed, then this digest will be an empty buffer.
+ * Otherwise, it will contain a digest of the repaired data read, or empty buffer
+ * if no repaired data was read.
+ * @return digest of the repaired data read in the execution of the command
+ */
+ public ByteBuffer getRepairedDataDigest()
+ {
+ return repairedDataInfo.getDigest();
+ }
+
+ /**
+ * Returns a boolean indicating whether any relevant sstables were skipped during the read
+ * that produced the repaired data digest.
+ *
+ * If true, then no pending repair sessions or partition deletes have influenced the extent
+ * of the repaired sstables that went into generating the digest.
+ * This indicates whether or not the digest can reliably be used to infer consistency
+ * issues between the repaired sets across replicas.
+ *
+ * If either repaired status tracking is not active or the command has not yet been
+ * executed, then this will always return true.
+ *
+ * @return boolean to indicate whether or not the digest of the repaired data can
+ * reliably be used to infer inconsistency issues between the repaired sets across
+ * replicas.
+ */
+ public boolean isRepairedDataDigestConclusive()
+ {
+ return repairedDataInfo.isConclusive();
+ }
+
+ /**
* Index (metadata) chosen for this query. Can be null.
*
* @return index (metadata) chosen for this query
@@ -225,7 +315,11 @@ public abstract class ReadCommand extends AbstractReadQuery
protected abstract UnfilteredPartitionIterator queryStorage(ColumnFamilyStore cfs, ReadExecutionController executionController);
- protected abstract int oldestUnrepairedTombstone();
+ protected int oldestUnrepairedTombstone()
+ {
+ return oldestUnrepairedTombstone;
+ }
+
@SuppressWarnings("resource")
public ReadResponse createResponse(UnfilteredPartitionIterator iterator)
@@ -305,6 +399,9 @@ public abstract class ReadCommand extends AbstractReadQuery
Tracing.trace("Executing read on {}.{} using index {}", cfs.metadata.keyspace, cfs.metadata.name, index.getIndexMetadata().name);
}
+ if (isTrackingRepairedStatus())
+ repairedDataInfo = new RepairedDataInfo();
+
UnfilteredPartitionIterator iterator = (null == searcher) ? queryStorage(cfs, executionController) : searcher.search(executionController);
try
@@ -569,6 +666,253 @@ public abstract class ReadCommand extends AbstractReadQuery
return toCQLString();
}
+ private static UnfilteredPartitionIterator withRepairedDataInfo(final UnfilteredPartitionIterator iterator,
+ final RepairedDataInfo repairedDataInfo)
+ {
+ class WithRepairedDataTracking extends Transformation<UnfilteredRowIterator>
+ {
+ protected UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition)
+ {
+ return withRepairedDataInfo(partition, repairedDataInfo);
+ }
+ }
+
+ return Transformation.apply(iterator, new WithRepairedDataTracking());
+ }
+
+ private static UnfilteredRowIterator withRepairedDataInfo(final UnfilteredRowIterator iterator,
+ final RepairedDataInfo repairedDataInfo)
+ {
+ class WithTracking extends Transformation
+ {
+ protected DecoratedKey applyToPartitionKey(DecoratedKey key)
+ {
+ repairedDataInfo.trackPartitionKey(key);
+ return key;
+ }
+
+ protected DeletionTime applyToDeletion(DeletionTime deletionTime)
+ {
+ repairedDataInfo.trackDeletion(deletionTime);
+ return deletionTime;
+ }
+
+ protected RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker)
+ {
+ repairedDataInfo.trackRangeTombstoneMarker(marker);
+ return marker;
+ }
+
+ protected Row applyToStatic(Row row)
+ {
+ repairedDataInfo.trackRow(row);
+ return row;
+ }
+
+ protected Row applyToRow(Row row)
+ {
+ repairedDataInfo.trackRow(row);
+ return row;
+ }
+ }
+
+ return Transformation.apply(iterator, new WithTracking());
+ }
+
+ private static class RepairedDataInfo
+ {
+ private Hasher hasher;
+ private boolean isConclusive = true;
+
+ ByteBuffer getDigest()
+ {
+ return hasher == null
+ ? ByteBufferUtil.EMPTY_BYTE_BUFFER
+ : ByteBuffer.wrap(getHasher().hash().asBytes());
+ }
+
+ boolean isConclusive()
+ {
+ return isConclusive;
+ }
+
+ void markInconclusive()
+ {
+ isConclusive = false;
+ }
+
+ void trackPartitionKey(DecoratedKey key)
+ {
+ HashingUtils.updateBytes(getHasher(), key.getKey().duplicate());
+ }
+
+ void trackDeletion(DeletionTime deletion)
+ {
+ deletion.digest(getHasher());
+ }
+
+ void trackRangeTombstoneMarker(RangeTombstoneMarker marker)
+ {
+ marker.digest(getHasher());
+ }
+
+ void trackRow(Row row)
+ {
+ row.digest(getHasher());
+ }
+
+ private Hasher getHasher()
+ {
+ if (hasher == null)
+ hasher = Hashing.crc32c().newHasher();
+
+ return hasher;
+ }
+ }
+
+ @SuppressWarnings("resource") // resultant iterators are closed by their callers
+ InputCollector<UnfilteredRowIterator> iteratorsForPartition(ColumnFamilyStore.ViewFragment view)
+ {
+ BiFunction<List<UnfilteredRowIterator>, RepairedDataInfo, UnfilteredRowIterator> merge =
+ (unfilteredRowIterators, repairedDataInfo) ->
+ withRepairedDataInfo(UnfilteredRowIterators.merge(unfilteredRowIterators), repairedDataInfo);
+
+ return new InputCollector<>(view, repairedDataInfo, merge, isTrackingRepairedStatus());
+ }
+
+ @SuppressWarnings("resource") // resultant iterators are closed by their callers
+ InputCollector<UnfilteredPartitionIterator> iteratorsForRange(ColumnFamilyStore.ViewFragment view)
+ {
+ BiFunction<List<UnfilteredPartitionIterator>, RepairedDataInfo, UnfilteredPartitionIterator> merge =
+ (unfilteredPartitionIterators, repairedDataInfo) ->
+ withRepairedDataInfo(UnfilteredPartitionIterators.merge(unfilteredPartitionIterators, UnfilteredPartitionIterators.MergeListener.NOOP), repairedDataInfo);
+
+ return new InputCollector<>(view, repairedDataInfo, merge, isTrackingRepairedStatus());
+ }
+
+ /**
+ * Handles the collation of unfiltered row or partition iterators that comprise the
+ * input for a query. Separates them according to repaired status and if repaired
+ * status is being tracked, handles the merge and wrapping in a digest generator of
+ * the repaired iterators.
+ *
+ * Intentionally not AutoCloseable so we don't mistakenly use this in ARM blocks
+ * as this prematurely closes the underlying iterators
+ */
+ static class InputCollector<T extends AutoCloseable>
+ {
+ final RepairedDataInfo repairedDataInfo;
+ private final boolean isTrackingRepairedStatus;
+ Set<SSTableReader> repairedSSTables;
+ BiFunction<List<T>, RepairedDataInfo, T> repairedMerger;
+ List<T> repairedIters;
+ List<T> unrepairedIters;
+
+ InputCollector(ColumnFamilyStore.ViewFragment view,
+ RepairedDataInfo repairedDataInfo,
+ BiFunction<List<T>, RepairedDataInfo, T> repairedMerger,
+ boolean isTrackingRepairedStatus)
+ {
+ this.repairedDataInfo = repairedDataInfo;
+ this.isTrackingRepairedStatus = isTrackingRepairedStatus;
+ if (isTrackingRepairedStatus)
+ {
+ for (SSTableReader sstable : view.sstables)
+ {
+ if (considerRepairedForTracking(sstable))
+ {
+ if (repairedSSTables == null)
+ repairedSSTables = Sets.newHashSetWithExpectedSize(view.sstables.size());
+ repairedSSTables.add(sstable);
+ }
+ }
+ }
+ if (repairedSSTables == null)
+ {
+ repairedIters = Collections.emptyList();
+ unrepairedIters = new ArrayList<>(view.sstables.size());
+ }
+ else
+ {
+ repairedIters = new ArrayList<>(repairedSSTables.size());
+ // when we're done collating, we'll merge the repaired iters and add the
+ // result to the unrepaired list, so size that list accordingly
+ unrepairedIters = new ArrayList<>((view.sstables.size() - repairedSSTables.size()) + Iterables.size(view.memtables) + 1);
+ }
+ this.repairedMerger = repairedMerger;
+ }
+
+ void addMemtableIterator(T iter)
+ {
+ unrepairedIters.add(iter);
+ }
+
+ void addSSTableIterator(SSTableReader sstable, T iter)
+ {
+ if (repairedSSTables != null && repairedSSTables.contains(sstable))
+ repairedIters.add(iter);
+ else
+ unrepairedIters.add(iter);
+ }
+
+ List<T> finalizeIterators()
+ {
+ if (repairedIters.isEmpty())
+ return unrepairedIters;
+
+ // merge the repaired data before returning, wrapping in a digest generator
+ unrepairedIters.add(repairedMerger.apply(repairedIters, repairedDataInfo));
+ return unrepairedIters;
+ }
+
+ boolean isEmpty()
+ {
+ return repairedIters.isEmpty() && unrepairedIters.isEmpty();
+ }
+
+ // For tracking purposes we consider data repaired if the sstable is either:
+ // * marked repaired
+ // * marked pending, but the local session has been committed. This reduces the window
+ // whereby the tracking is affected by compaction backlog causing repaired sstables to
+ // remain in the pending state
+ // If an sstable is involved in a pending repair which is not yet committed, we mark the
+ // repaired data info inconclusive, as the same data on other replicas may be in a
+ // slightly different state.
+ private boolean considerRepairedForTracking(SSTableReader sstable)
+ {
+ if (!isTrackingRepairedStatus)
+ return false;
+
+ UUID pendingRepair = sstable.getPendingRepair();
+ if (pendingRepair != ActiveRepairService.NO_PENDING_REPAIR)
+ {
+ if (ActiveRepairService.instance.consistent.local.isSessionFinalized(pendingRepair))
+ return true;
+
+ // In the edge case where compaction is backed up long enough for the session to
+ // timeout and be purged by LocalSessions::cleanup, consider the sstable unrepaired
+ // as it will be marked unrepaired when compaction catches up
+ if (!ActiveRepairService.instance.consistent.local.sessionExists(pendingRepair))
+ return false;
+
+ repairedDataInfo.markInconclusive();
+ }
+
+ return sstable.isRepaired();
+ }
+
+ void markInconclusive()
+ {
+ repairedDataInfo.markInconclusive();
+ }
+
+ public void close() throws Exception
+ {
+ FBUtilities.closeAll(unrepairedIters);
+ FBUtilities.closeAll(repairedIters);
+ }
+ }
+
private static class Serializer implements IVersionedSerializer<ReadCommand>
{
private static int digestFlag(boolean isDigest)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java b/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
index a71e92d..1b28c2c 100644
--- a/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
@@ -23,6 +23,7 @@ import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.ParameterType;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.tracing.Tracing;
@@ -43,6 +44,9 @@ public class ReadCommandVerbHandler implements IVerbHandler<ReadCommand>
ReadCommand command = message.payload;
command.setMonitoringTime(message.constructionTime, message.isCrossNode(), message.getTimeout(), message.getSlowQueryTimeout());
+ if (message.parameters.containsKey(ParameterType.TRACK_REPAIRED_DATA))
+ command.trackRepairedStatus();
+
ReadResponse response;
try (ReadExecutionController executionController = command.executionController();
UnfilteredPartitionIterator iterator = command.executeLocally(executionController))
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/src/java/org/apache/cassandra/db/ReadResponse.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadResponse.java b/src/java/org/apache/cassandra/db/ReadResponse.java
index 486980d..2ddb6a7 100644
--- a/src/java/org/apache/cassandra/db/ReadResponse.java
+++ b/src/java/org/apache/cassandra/db/ReadResponse.java
@@ -51,11 +51,19 @@ public abstract class ReadResponse
}
@VisibleForTesting
- public static ReadResponse createRemoteDataResponse(UnfilteredPartitionIterator data, ReadCommand command)
+ public static ReadResponse createRemoteDataResponse(UnfilteredPartitionIterator data,
+ ByteBuffer repairedDataDigest,
+ boolean isRepairedDigestConclusive,
+ ReadCommand command,
+ int version)
{
- return new RemoteDataResponse(LocalDataResponse.build(data, command.columnFilter()), MessagingService.current_version);
+ return new RemoteDataResponse(LocalDataResponse.build(data, command.columnFilter()),
+ repairedDataDigest,
+ isRepairedDigestConclusive,
+ version);
}
+
public static ReadResponse createDigestResponse(UnfilteredPartitionIterator data, ReadCommand command)
{
return new DigestResponse(makeDigest(data, command));
@@ -63,6 +71,9 @@ public abstract class ReadResponse
public abstract UnfilteredPartitionIterator makeIterator(ReadCommand command);
public abstract ByteBuffer digest(ReadCommand command);
+ public abstract ByteBuffer repairedDataDigest();
+ public abstract boolean isRepairedDigestConclusive();
+ public abstract boolean mayIncludeRepairedDigest();
public abstract boolean isDigestResponse();
@@ -85,18 +96,22 @@ public abstract class ReadResponse
}
}
}
- return "<key " + key + " not found>";
+ return String.format("<key %s not found (repaired_digest=%s repaired_digest_conclusive=%s)>",
+ key, ByteBufferUtil.bytesToHex(repairedDataDigest()), isRepairedDigestConclusive());
}
private String toDebugString(UnfilteredRowIterator partition, TableMetadata metadata)
{
StringBuilder sb = new StringBuilder();
- sb.append(String.format("[%s] key=%s partition_deletion=%s columns=%s",
+ sb.append(String.format("[%s] key=%s partition_deletion=%s columns=%s repaired_digest=%s repaired_digest_conclusive==%s",
metadata,
metadata.partitionKeyType.getString(partition.partitionKey().getKey()),
partition.partitionLevelDeletion(),
- partition.columns()));
+ partition.columns(),
+ ByteBufferUtil.bytesToHex(repairedDataDigest()),
+ isRepairedDigestConclusive()
+ ));
if (partition.staticRow() != Rows.EMPTY_STATIC_ROW)
sb.append("\n ").append(partition.staticRow().toString(metadata, true));
@@ -130,6 +145,21 @@ public abstract class ReadResponse
throw new UnsupportedOperationException();
}
+ public boolean mayIncludeRepairedDigest()
+ {
+ return false;
+ }
+
+ public ByteBuffer repairedDataDigest()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public boolean isRepairedDigestConclusive()
+ {
+ throw new UnsupportedOperationException();
+ }
+
public ByteBuffer digest(ReadCommand command)
{
// We assume that the digest is in the proper version, which bug excluded should be true since this is called with
@@ -150,7 +180,11 @@ public abstract class ReadResponse
{
private LocalDataResponse(UnfilteredPartitionIterator iter, ReadCommand command)
{
- super(build(iter, command.columnFilter()), MessagingService.current_version, SerializationHelper.Flag.LOCAL);
+ super(build(iter, command.columnFilter()),
+ command.getRepairedDataDigest(),
+ command.isRepairedDataDigestConclusive(),
+ MessagingService.current_version,
+ SerializationHelper.Flag.LOCAL);
}
private static ByteBuffer build(UnfilteredPartitionIterator iter, ColumnFilter selection)
@@ -171,9 +205,12 @@ public abstract class ReadResponse
// built on the coordinator node receiving a response
private static class RemoteDataResponse extends DataResponse
{
- protected RemoteDataResponse(ByteBuffer data, int version)
+ protected RemoteDataResponse(ByteBuffer data,
+ ByteBuffer repairedDataDigest,
+ boolean isRepairedDigestConclusive,
+ int version)
{
- super(data, version, SerializationHelper.Flag.FROM_REMOTE);
+ super(data, repairedDataDigest, isRepairedDigestConclusive, version, SerializationHelper.Flag.FROM_REMOTE);
}
}
@@ -182,13 +219,21 @@ public abstract class ReadResponse
// TODO: can the digest be calculated over the raw bytes now?
// The response, serialized in the current messaging version
private final ByteBuffer data;
+ private final ByteBuffer repairedDataDigest;
+ private final boolean isRepairedDigestConclusive;
private final int dataSerializationVersion;
private final SerializationHelper.Flag flag;
- protected DataResponse(ByteBuffer data, int dataSerializationVersion, SerializationHelper.Flag flag)
+ protected DataResponse(ByteBuffer data,
+ ByteBuffer repairedDataDigest,
+ boolean isRepairedDigestConclusive,
+ int dataSerializationVersion,
+ SerializationHelper.Flag flag)
{
super();
this.data = data;
+ this.repairedDataDigest = repairedDataDigest;
+ this.isRepairedDigestConclusive = isRepairedDigestConclusive;
this.dataSerializationVersion = dataSerializationVersion;
this.flag = flag;
}
@@ -213,6 +258,21 @@ public abstract class ReadResponse
}
}
+ public boolean mayIncludeRepairedDigest()
+ {
+ return dataSerializationVersion >= MessagingService.VERSION_40;
+ }
+
+ public ByteBuffer repairedDataDigest()
+ {
+ return repairedDataDigest;
+ }
+
+ public boolean isRepairedDigestConclusive()
+ {
+ return isRepairedDigestConclusive;
+ }
+
public ByteBuffer digest(ReadCommand command)
{
try (UnfilteredPartitionIterator iterator = makeIterator(command))
@@ -233,10 +293,25 @@ public abstract class ReadResponse
{
boolean isDigest = response instanceof DigestResponse;
ByteBuffer digest = isDigest ? ((DigestResponse)response).digest : ByteBufferUtil.EMPTY_BYTE_BUFFER;
-
ByteBufferUtil.writeWithVIntLength(digest, out);
if (!isDigest)
{
+ // From 4.0, a coordinator may request additional info about the repaired data that
+ // makes up the response, namely a digest generated from the repaired data and a
+ // flag indicating our level of confidence in that digest. The digest may be considered
+ // inconclusive if it may have been affected by some unrepaired data during read.
+ // e.g. some sstables read during this read were involved in pending but not yet
+ // committed repair sessions or an unrepaired partition tombstone meant that not all
+ // repaired sstables were read (but they might be on other replicas).
+ // If the coordinator did not request this info, the response contains an empty digest
+ // and a true for the isConclusive flag.
+ // If the messaging version is < 4.0, these are omitted altogether.
+ if (version >= MessagingService.VERSION_40)
+ {
+ ByteBufferUtil.writeWithVIntLength(response.repairedDataDigest(), out);
+ out.writeBoolean(response.isRepairedDigestConclusive());
+ }
+
ByteBuffer data = ((DataResponse)response).data;
ByteBufferUtil.writeWithVIntLength(data, out);
}
@@ -248,18 +323,42 @@ public abstract class ReadResponse
if (digest.hasRemaining())
return new DigestResponse(digest);
+ // A data response may also contain a digest of the portion of its payload
+ // that comes from the replica's repaired set, along with a flag indicating
+ // whether or not the digest may be influenced by unrepaired/pending
+ // repaired data
+ boolean repairedDigestConclusive;
+ if (version >= MessagingService.VERSION_40)
+ {
+ digest = ByteBufferUtil.readWithVIntLength(in);
+ repairedDigestConclusive = in.readBoolean();
+ }
+ else
+ {
+ digest = ByteBufferUtil.EMPTY_BYTE_BUFFER;
+ repairedDigestConclusive = true;
+ }
+
ByteBuffer data = ByteBufferUtil.readWithVIntLength(in);
- return new RemoteDataResponse(data, version);
+ return new RemoteDataResponse(data, digest, repairedDigestConclusive, version);
}
public long serializedSize(ReadResponse response, int version)
{
boolean isDigest = response instanceof DigestResponse;
ByteBuffer digest = isDigest ? ((DigestResponse)response).digest : ByteBufferUtil.EMPTY_BYTE_BUFFER;
-
long size = ByteBufferUtil.serializedSizeWithVIntLength(digest);
+
if (!isDigest)
{
+ // From 4.0, a coordinator may request additional info about the repaired data
+ // that makes up the response.
+ if (version >= MessagingService.VERSION_40)
+ {
+ size += ByteBufferUtil.serializedSizeWithVIntLength(response.repairedDataDigest());
+ size += 1;
+ }
+
// In theory, we should deserialize/re-serialize if the version asked is different from the current
// version as the content could have a different serialization format. So far though, we haven't made
// change to partition iterators serialization since 3.0 so we skip this.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index c81185e..e99a487 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@ -22,7 +22,6 @@ import java.nio.ByteBuffer;
import java.util.*;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import org.apache.cassandra.cache.IRowCacheEntry;
@@ -44,12 +43,11 @@ import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.metrics.TableMetrics;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.ParameterType;
import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.schema.IndexMetadata;
import org.apache.cassandra.schema.TableMetadata;
-import org.apache.cassandra.service.CacheService;
-import org.apache.cassandra.service.ClientState;
-import org.apache.cassandra.service.StorageProxy;
+import org.apache.cassandra.service.*;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.SearchIterator;
@@ -65,8 +63,6 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
private final DecoratedKey partitionKey;
private final ClusteringIndexFilter clusteringIndexFilter;
- private int oldestUnrepairedTombstone = Integer.MAX_VALUE;
-
@VisibleForTesting
protected SinglePartitionReadCommand(boolean isDigest,
int digestVersion,
@@ -393,7 +389,9 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
@SuppressWarnings("resource") // we close the created iterator through closing the result of this method (and SingletonUnfilteredPartitionIterator ctor cannot fail)
protected UnfilteredPartitionIterator queryStorage(final ColumnFamilyStore cfs, ReadExecutionController executionController)
{
- UnfilteredRowIterator partition = cfs.isRowCacheEnabled()
+ // skip the row cache and go directly to sstables/memtable if repaired status of
+ // data is being tracked. This is only requested after an initial digest mismatch
+ UnfilteredRowIterator partition = cfs.isRowCacheEnabled() && !isTrackingRepairedStatus()
? getThroughCache(cfs, executionController)
: queryMemtableAndDisk(cfs, executionController);
return new SingletonUnfilteredPartitionIterator(partition);
@@ -564,12 +562,6 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
return queryMemtableAndDiskInternal(cfs);
}
- @Override
- protected int oldestUnrepairedTombstone()
- {
- return oldestUnrepairedTombstone;
- }
-
private UnfilteredRowIterator queryMemtableAndDiskInternal(ColumnFamilyStore cfs)
{
/*
@@ -582,16 +574,19 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
* and if we have neither non-frozen collections/UDTs nor counters (indeed, for a non-frozen collection or UDT,
* we can't guarantee an older sstable won't have some elements that weren't in the most recent sstables,
* and counters are intrinsically a collection of shards and so have the same problem).
+ * Also, if tracking repaired data then we skip this optimization so we can collate the repaired sstables
+ * and generate a digest over their merge, which precludes an early return.
*/
- if (clusteringIndexFilter() instanceof ClusteringIndexNamesFilter && !queriesMulticellType())
+ if (clusteringIndexFilter() instanceof ClusteringIndexNamesFilter && !queriesMulticellType() && !isTrackingRepairedStatus())
return queryMemtableAndSSTablesInTimestampOrder(cfs, (ClusteringIndexNamesFilter)clusteringIndexFilter());
Tracing.trace("Acquiring sstable references");
ColumnFamilyStore.ViewFragment view = cfs.select(View.select(SSTableSet.LIVE, partitionKey()));
- List<UnfilteredRowIterator> iterators = new ArrayList<>(Iterables.size(view.memtables) + view.sstables.size());
+ Collections.sort(view.sstables, SSTableReader.maxTimestampComparator);
ClusteringIndexFilter filter = clusteringIndexFilter();
long minTimestamp = Long.MAX_VALUE;
-
+ long mostRecentPartitionTombstone = Long.MIN_VALUE;
+ InputCollector<UnfilteredRowIterator> inputCollector = iteratorsForPartition(view);
try
{
for (Memtable memtable : view.memtables)
@@ -604,8 +599,13 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
@SuppressWarnings("resource") // 'iter' is added to iterators which is closed on exception, or through the closing of the final merged iterator
UnfilteredRowIterator iter = filter.getUnfilteredRowIterator(columnFilter(), partition);
+
+ // Memtable data is always considered unrepaired
oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, partition.stats().minLocalDeletionTime);
- iterators.add(iter);
+ inputCollector.addMemtableIterator(iter);
+
+ mostRecentPartitionTombstone = Math.max(mostRecentPartitionTombstone,
+ iter.partitionLevelDeletion().markedForDeleteAt());
}
/*
@@ -620,18 +620,25 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
* In other words, iterating in maxTimestamp order allow to do our mostRecentPartitionTombstone elimination
* in one pass, and minimize the number of sstables for which we read a partition tombstone.
*/
- Collections.sort(view.sstables, SSTableReader.maxTimestampComparator);
- long mostRecentPartitionTombstone = Long.MIN_VALUE;
int nonIntersectingSSTables = 0;
List<SSTableReader> skippedSSTablesWithTombstones = null;
SSTableReadMetricsCollector metricsCollector = new SSTableReadMetricsCollector();
+ if (isTrackingRepairedStatus())
+ Tracing.trace("Collecting data from sstables and tracking repaired status");
+
for (SSTableReader sstable : view.sstables)
{
// if we've already seen a partition tombstone with a timestamp greater
// than the most recent update to this sstable, we can skip it
+ // if we're tracking repaired status, we mark the repaired digest inconclusive
+ // as other replicas may not have seen this partition delete and so could include
+ // data from this sstable (or others) in their digests
if (sstable.getMaxTimestamp() < mostRecentPartitionTombstone)
+ {
+ inputCollector.markInconclusive();
break;
+ }
if (!shouldInclude(sstable))
{
@@ -654,7 +661,7 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
if (!sstable.isRepaired())
oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, sstable.getMinLocalDeletionTime());
- iterators.add(iter);
+ inputCollector.addSSTableIterator(sstable, iter);
mostRecentPartitionTombstone = Math.max(mostRecentPartitionTombstone,
iter.partitionLevelDeletion().markedForDeleteAt());
}
@@ -674,7 +681,7 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
if (!sstable.isRepaired())
oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, sstable.getMinLocalDeletionTime());
- iterators.add(iter);
+ inputCollector.addSSTableIterator(sstable, iter);
includedDueToTombstones++;
}
}
@@ -682,21 +689,22 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
Tracing.trace("Skipped {}/{} non-slice-intersecting sstables, included {} due to tombstones",
nonIntersectingSSTables, view.sstables.size(), includedDueToTombstones);
- if (iterators.isEmpty())
+ if (inputCollector.isEmpty())
return EmptyIterators.unfilteredRow(cfs.metadata(), partitionKey(), filter.isReversed());
StorageHook.instance.reportRead(cfs.metadata().id, partitionKey());
- return withSSTablesIterated(iterators, cfs.metric, metricsCollector);
+
+ return withSSTablesIterated(inputCollector.finalizeIterators(), cfs.metric, metricsCollector);
}
catch (RuntimeException | Error e)
{
try
{
- FBUtilities.closeAll(iterators);
+ inputCollector.close();
}
- catch (Exception suppressed)
+ catch (Exception e1)
{
- e.addSuppressed(suppressed);
+ e.addSuppressed(e1);
}
throw e;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/src/java/org/apache/cassandra/db/filter/ClusteringIndexNamesFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ClusteringIndexNamesFilter.java b/src/java/org/apache/cassandra/db/filter/ClusteringIndexNamesFilter.java
index 4850fd5..f25dc91 100644
--- a/src/java/org/apache/cassandra/db/filter/ClusteringIndexNamesFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/ClusteringIndexNamesFilter.java
@@ -206,6 +206,20 @@ public class ClusteringIndexNamesFilter extends AbstractClusteringIndexFilter
return sb.toString();
}
+ public boolean equals(Object o)
+ {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ ClusteringIndexNamesFilter that = (ClusteringIndexNamesFilter) o;
+ return Objects.equals(clusterings, that.clusterings) &&
+ Objects.equals(reversed, that.reversed);
+ }
+
+ public int hashCode()
+ {
+ return Objects.hash(clusterings, reversed);
+ }
+
public Kind kind()
{
return Kind.NAMES;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
index 851e447..42807a2 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
@@ -599,4 +599,5 @@ public abstract class UnfilteredRowIterators
}
}
}
+
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java b/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
index 5a90804..f1df026 100644
--- a/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
@@ -22,6 +22,7 @@ import java.util.Set;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Histogram;
+import com.codahale.metrics.Meter;
import com.codahale.metrics.Timer;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
@@ -123,6 +124,24 @@ public class KeyspaceMetrics
/** histogram over the number of partitions we have validated */
public final Histogram partitionsValidated;
+ /*
+ * Metrics for inconsistencies detected between repaired data sets across replicas. These
+ * are tracked on the coordinator.
+ */
+
+ /**
+ * Incremented where an inconsistency is detected and there are no pending repair sessions affecting
+ * the data being read, indicating a genuine mismatch between replicas' repaired data sets.
+ */
+ public final Meter confirmedRepairedInconsistencies;
+ /**
+ * Incremented where an inconsistency is detected, but there are pending & uncommitted repair sessions
+ * in play on at least one replica. This may indicate a false positive as the inconsistency could be due to
+ * replicas marking the repair session as committed at slightly different times and so some consider it to
+ * be part of the repaired set whilst others do not.
+ */
+ public final Meter unconfirmedRepairedInconsistencies;
+
public final MetricNameFactory factory;
private Keyspace keyspace;
@@ -283,6 +302,9 @@ public class KeyspaceMetrics
repairSyncTime = Metrics.timer(factory.createMetricName("RepairSyncTime"));
partitionsValidated = Metrics.histogram(factory.createMetricName("PartitionsValidated"), false);
bytesValidated = Metrics.histogram(factory.createMetricName("BytesValidated"), false);
+
+ confirmedRepairedInconsistencies = Metrics.meter(factory.createMetricName("RepairedDataInconsistenciesConfirmed"));
+ unconfirmedRepairedInconsistencies = Metrics.meter(factory.createMetricName("RepairedDataInconsistenciesUnconfirmed"));
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/src/java/org/apache/cassandra/metrics/TableMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/TableMetrics.java b/src/java/org/apache/cassandra/metrics/TableMetrics.java
index 53ebcb0..52c50b8 100644
--- a/src/java/org/apache/cassandra/metrics/TableMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/TableMetrics.java
@@ -217,6 +217,19 @@ public class TableMetrics
public final Counter speculativeWrites;
public final Gauge<Long> speculativeWriteLatencyNanos;
+ /**
+ * Metrics for inconsistencies detected between repaired data sets across replicas. These
+ * are tracked on the coordinator.
+ */
+ // Incremented where an inconsistency is detected and there are no pending repair sessions affecting
+ // the data being read, indicating a genuine mismatch between replicas' repaired data sets.
+ public final TableMeter confirmedRepairedInconsistencies;
+ // Incremented where an inconsistency is detected, but there are pending & uncommitted repair sessions
+ // in play on at least one replica. This may indicate a false positive as the inconsistency could be due to
+ // replicas marking the repair session as committed at slightly different times and so some consider it to
+ // be part of the repaired set whilst others do not.
+ public final TableMeter unconfirmedRepairedInconsistencies;
+
public final static LatencyMetrics globalReadLatency = new LatencyMetrics(globalFactory, globalAliasFactory, "Read");
public final static LatencyMetrics globalWriteLatency = new LatencyMetrics(globalFactory, globalAliasFactory, "Write");
public final static LatencyMetrics globalRangeLatency = new LatencyMetrics(globalFactory, globalAliasFactory, "Range");
@@ -922,6 +935,9 @@ public class TableMetrics
readRepairRequests = Metrics.meter(factory.createMetricName("ReadRepairRequests"));
shortReadProtectionRequests = Metrics.meter(factory.createMetricName("ShortReadProtectionRequests"));
+
+ confirmedRepairedInconsistencies = createTableMeter("RepairedDataInconsistenciesConfirmed", cfs.keyspace.metric.confirmedRepairedInconsistencies);
+ unconfirmedRepairedInconsistencies = createTableMeter("RepairedDataInconsistenciesUnconfirmed", cfs.keyspace.metric.unconfirmedRepairedInconsistencies);
}
public void updateSSTableIterated(int count)
@@ -1091,6 +1107,21 @@ public class TableMetrics
globalAliasFactory.createMetricName(alias)));
}
+ protected TableMeter createTableMeter(String name, Meter keyspaceMeter)
+ {
+ return createTableMeter(name, name, keyspaceMeter);
+ }
+
+ protected TableMeter createTableMeter(String name, String alias, Meter keyspaceMeter)
+ {
+ Meter meter = Metrics.meter(factory.createMetricName(name), aliasFactory.createMetricName(alias));
+ register(name, alias, meter);
+ return new TableMeter(meter,
+ keyspaceMeter,
+ Metrics.meter(globalFactory.createMetricName(name),
+ globalAliasFactory.createMetricName(alias)));
+ }
+
/**
* Registers a metric to be removed when unloading CF.
* @return true if first time metric with that name has been registered
@@ -1103,6 +1134,25 @@ public class TableMetrics
return ret;
}
+ public static class TableMeter
+ {
+ public final Meter[] all;
+ public final Meter table;
+ private TableMeter(Meter table, Meter keyspace, Meter global)
+ {
+ this.table = table;
+ this.all = new Meter[]{table, keyspace, global};
+ }
+
+ public void mark()
+ {
+ for (Meter meter : all)
+ {
+ meter.mark();
+ }
+ }
+ }
+
public static class TableHistogram
{
public final Histogram[] all;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/src/java/org/apache/cassandra/net/ParameterType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/ParameterType.java b/src/java/org/apache/cassandra/net/ParameterType.java
index 0a1f73f..b7a88a8 100644
--- a/src/java/org/apache/cassandra/net/ParameterType.java
+++ b/src/java/org/apache/cassandra/net/ParameterType.java
@@ -39,7 +39,8 @@ public enum ParameterType
FAILURE_REASON("FAIL_REASON", ShortVersionedSerializer.instance),
FAILURE_CALLBACK("CAL_BAC", DummyByteVersionedSerializer.instance),
TRACE_SESSION("TraceSession", UUIDSerializer.serializer),
- TRACE_TYPE("TraceType", Tracing.traceTypeSerializer);
+ TRACE_TYPE("TraceType", Tracing.traceTypeSerializer),
+ TRACK_REPAIRED_DATA("TrackRepaired", DummyByteVersionedSerializer.instance);
public static final Map<String, ParameterType> byName;
public final String key;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java b/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java
index 4089e77..eac1ea0 100644
--- a/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java
+++ b/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java
@@ -783,6 +783,23 @@ public class LocalSessions
return session != null && session.getState() != FINALIZED && session.getState() != FAILED;
}
+ /**
+ * determines if a local session exists, and if it's in the finalized state
+ */
+ public boolean isSessionFinalized(UUID sessionID)
+ {
+ LocalSession session = getSession(sessionID);
+ return session != null && session.getState() == FINALIZED;
+ }
+
+ /**
+ * determines if a local session exists
+ */
+ public boolean sessionExists(UUID sessionID)
+ {
+ return getSession(sessionID) != null;
+ }
+
@VisibleForTesting
protected boolean sessionHasData(LocalSession session)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index c23eb88..9d9c628 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -2122,12 +2122,21 @@ public class StorageProxy implements StorageProxyMBean
Keyspace keyspace = Keyspace.open(command.metadata().keyspace);
ReadCallback<EndpointsForRange, ReplicaLayout.ForRange> handler = new ReadCallback<>(resolver,
- replicaLayout.consistencyLevel().blockFor(keyspace),
- rangeCommand,
- replicaLayout,
- queryStartNanoTime);
+ replicaLayout.consistencyLevel().blockFor(keyspace),
+ rangeCommand,
+ replicaLayout,
+ queryStartNanoTime);
handler.assureSufficientLiveNodes();
+
+ // If enabled, request repaired data tracking info from full replicas but
+ // only if there are multiple full replicas to compare results from
+ if (DatabaseDescriptor.getRepairedDataTrackingForPartitionReadsEnabled()
+ && replicaLayout.selected().filter(Replica::isFull).size() > 1)
+ {
+ command.trackRepairedStatus();
+ }
+
if (replicaLayout.selected().size() == 1 && replicaLayout.selected().get(0).isLocal())
{
StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(rangeCommand, handler));
@@ -2137,7 +2146,10 @@ public class StorageProxy implements StorageProxyMBean
for (Replica replica : replicaLayout.selected())
{
Tracing.trace("Enqueuing request to {}", replica);
- MessagingService.instance().sendRRWithFailure(rangeCommand.createMessage(), replica.endpoint(), handler);
+ MessageOut<ReadCommand> message = rangeCommand.createMessage();
+ if (command.isTrackingRepairedStatus() && replica.isFull())
+ message = message.withParameter(ParameterType.TRACK_REPAIRED_DATA, MessagingService.ONE_BYTE);
+ MessagingService.instance().sendRRWithFailure(message, replica.endpoint(), handler);
}
}
@@ -2798,6 +2810,60 @@ public class StorageProxy implements StorageProxyMBean
DatabaseDescriptor.setOtcBacklogExpirationInterval(intervalInMillis);
}
+ @Override
+ public void enableRepairedDataTrackingForRangeReads()
+ {
+ DatabaseDescriptor.setRepairedDataTrackingForRangeReadsEnabled(true);
+ }
+
+ @Override
+ public void disableRepairedDataTrackingForRangeReads()
+ {
+ DatabaseDescriptor.setRepairedDataTrackingForRangeReadsEnabled(false);
+ }
+
+ @Override
+ public boolean getRepairedDataTrackingEnabledForRangeReads()
+ {
+ return DatabaseDescriptor.getRepairedDataTrackingForRangeReadsEnabled();
+ }
+
+ @Override
+ public void enableRepairedDataTrackingForPartitionReads()
+ {
+ DatabaseDescriptor.setRepairedDataTrackingForPartitionReadsEnabled(true);
+ }
+
+ @Override
+ public void disableRepairedDataTrackingForPartitionReads()
+ {
+ DatabaseDescriptor.setRepairedDataTrackingForPartitionReadsEnabled(false);
+ }
+
+ @Override
+ public boolean getRepairedDataTrackingEnabledForPartitionReads()
+ {
+ return DatabaseDescriptor.getRepairedDataTrackingForPartitionReadsEnabled();
+ }
+
+ @Override
+ public void enableReportingUnconfirmedRepairedDataMismatches()
+ {
+ DatabaseDescriptor.reportUnconfirmedRepairedDataMismatches(true);
+ }
+
+ @Override
+ public void disableReportingUnconfirmedRepairedDataMismatches()
+ {
+ DatabaseDescriptor.reportUnconfirmedRepairedDataMismatches(false);
+ }
+
+ @Override
+ public boolean getReportingUnconfirmedRepairedDataMismatchesEnabled()
+ {
+ return DatabaseDescriptor.reportUnconfirmedRepairedDataMismatches();
+ }
+
static class PaxosBallotAndContention
{
final UUID ballot;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/src/java/org/apache/cassandra/service/StorageProxyMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxyMBean.java b/src/java/org/apache/cassandra/service/StorageProxyMBean.java
index 76a6617..efc163d 100644
--- a/src/java/org/apache/cassandra/service/StorageProxyMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageProxyMBean.java
@@ -93,4 +93,19 @@ public interface StorageProxyMBean
* Stop logging queries but leave any generated files on disk.
*/
public void stopFullQueryLogger();
+
+ /**
+ * Tracking and reporting of variances in the repaired data set across replicas at read time
+ */
+ void enableRepairedDataTrackingForRangeReads();
+ void disableRepairedDataTrackingForRangeReads();
+ boolean getRepairedDataTrackingEnabledForRangeReads();
+
+ void enableRepairedDataTrackingForPartitionReads();
+ void disableRepairedDataTrackingForPartitionReads();
+ boolean getRepairedDataTrackingEnabledForPartitionReads();
+
+ void enableReportingUnconfirmedRepairedDataMismatches();
+ void disableReportingUnconfirmedRepairedDataMismatches();
+ boolean getReportingUnconfirmedRepairedDataMismatchesEnabled();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/src/java/org/apache/cassandra/service/reads/DataResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/DataResolver.java b/src/java/org/apache/cassandra/service/reads/DataResolver.java
index 9043e87..1f69d6a 100644
--- a/src/java/org/apache/cassandra/service/reads/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/reads/DataResolver.java
@@ -24,7 +24,6 @@ import java.util.List;
import com.google.common.base.Joiner;
import com.google.common.collect.Collections2;
-import com.google.common.collect.Iterables;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.DeletionTime;
@@ -43,13 +42,12 @@ import org.apache.cassandra.db.transform.Filter;
import org.apache.cassandra.db.transform.FilteredPartitions;
import org.apache.cassandra.db.transform.Transformation;
import org.apache.cassandra.locator.Endpoints;
-import org.apache.cassandra.locator.Replica;
-import org.apache.cassandra.locator.ReplicaCollection;
import org.apache.cassandra.locator.ReplicaLayout;
-import org.apache.cassandra.locator.Replicas;
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.service.reads.repair.ReadRepair;
+import org.apache.cassandra.service.reads.repair.RepairedDataTracker;
+import org.apache.cassandra.service.reads.repair.RepairedDataVerifier;
import static com.google.common.collect.Iterables.*;
@@ -84,9 +82,25 @@ public class DataResolver<E extends Endpoints<E>, L extends ReplicaLayout<E, L>>
E replicas = replicaLayout.all().keep(transform(messages, msg -> msg.from));
List<UnfilteredPartitionIterator> iters = new ArrayList<>(
- Collections2.transform(messages, msg -> msg.payload.makeIterator(command)));
+ Collections2.transform(messages, msg -> msg.payload.makeIterator(command)));
assert replicas.size() == iters.size();
+ // If requested, inspect each response for a digest of the replica's repaired data set
+ RepairedDataTracker repairedDataTracker = command.isTrackingRepairedStatus()
+ ? new RepairedDataTracker(getRepairedDataVerifier(command))
+ : null;
+ if (repairedDataTracker != null)
+ {
+ messages.forEach(msg -> {
+ if (msg.payload.mayIncludeRepairedDigest() && replicas.byEndpoint().get(msg.from).isFull())
+ {
+ repairedDataTracker.recordDigest(msg.from,
+ msg.payload.repairedDataDigest(),
+ msg.payload.isRepairedDigestConclusive());
+ }
+ });
+ }
+
/*
* Even though every response, individually, will honor the limit, it is possible that we will, after the merge,
* have more rows than the client requested. To make sure that we still conform to the original limit,
@@ -105,17 +119,25 @@ public class DataResolver<E extends Endpoints<E>, L extends ReplicaLayout<E, L>>
UnfilteredPartitionIterator merged = mergeWithShortReadProtection(iters,
replicaLayout.withSelected(replicas),
- mergedResultCounter);
+ mergedResultCounter,
+ repairedDataTracker);
FilteredPartitions filtered = FilteredPartitions.filter(merged, new Filter(command.nowInSec(), command.metadata().enforceStrictLiveness()));
PartitionIterator counted = Transformation.apply(filtered, mergedResultCounter);
return Transformation.apply(counted, new EmptyPartitionsDiscarder());
}
+    /**
+     * Supplies the verifier that is handed to a newly created {@link RepairedDataTracker}.
+     * This implementation delegates to {@code RepairedDataVerifier.simple(command)}.
+     *
+     * @param command the read command the verifier is built for
+     * @return the verifier produced for {@code command}
+     */
+    protected RepairedDataVerifier getRepairedDataVerifier(ReadCommand command)
+    {
+        RepairedDataVerifier verifier = RepairedDataVerifier.simple(command);
+        return verifier;
+    }
+
private UnfilteredPartitionIterator mergeWithShortReadProtection(List<UnfilteredPartitionIterator> results,
L sources,
- DataLimits.Counter mergedResultCounter)
+ DataLimits.Counter mergedResultCounter,
+ RepairedDataTracker repairedDataTracker)
{
- // If we have only one results, there is no read repair to do and we can't get short reads
+ // If we have only one result, there is no read repair to do, we can't get short
+ // reads and we can't make a comparison between repaired data sets
if (results.size() == 1)
return results.get(0);
@@ -127,7 +149,7 @@ public class DataResolver<E extends Endpoints<E>, L extends ReplicaLayout<E, L>>
for (int i = 0; i < results.size(); i++)
results.set(i, ShortReadProtection.extend(sources.selected().get(i), results.get(i), command, mergedResultCounter, queryStartNanoTime, enforceStrictLiveness));
- return UnfilteredPartitionIterators.merge(results, wrapMergeListener(readRepair.getMergeListener(sources), sources));
+ return UnfilteredPartitionIterators.merge(results, wrapMergeListener(readRepair.getMergeListener(sources), sources, repairedDataTracker));
}
private String makeResponsesDebugString(DecoratedKey partitionKey)
@@ -135,7 +157,7 @@ public class DataResolver<E extends Endpoints<E>, L extends ReplicaLayout<E, L>>
return Joiner.on(",\n").join(transform(getMessages().snapshot(), m -> m.from + " => " + m.payload.toDebugString(command, partitionKey)));
}
- private UnfilteredPartitionIterators.MergeListener wrapMergeListener(UnfilteredPartitionIterators.MergeListener partitionListener, L sources)
+ private UnfilteredPartitionIterators.MergeListener wrapMergeListener(UnfilteredPartitionIterators.MergeListener partitionListener, L sources, RepairedDataTracker repairedDataTracker)
{
return new UnfilteredPartitionIterators.MergeListener()
{
@@ -224,6 +246,8 @@ public class DataResolver<E extends Endpoints<E>, L extends ReplicaLayout<E, L>>
public void close()
{
partitionListener.close();
+ if (repairedDataTracker != null)
+ repairedDataTracker.verify();
}
};
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/src/java/org/apache/cassandra/service/reads/ShortReadProtection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/ShortReadProtection.java b/src/java/org/apache/cassandra/service/reads/ShortReadProtection.java
index ef1d45b..1a454f9 100644
--- a/src/java/org/apache/cassandra/service/reads/ShortReadProtection.java
+++ b/src/java/org/apache/cassandra/service/reads/ShortReadProtection.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.service.reads;
import java.net.InetAddress;
+
import org.apache.cassandra.db.ReadCommand;
import org.apache.cassandra.db.filter.DataLimits;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java b/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java
index 30dea74..493b9d0 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java
@@ -24,6 +24,7 @@ import java.util.function.Consumer;
import com.google.common.base.Preconditions;
import com.codahale.metrics.Meter;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.Keyspace;
@@ -32,11 +33,12 @@ import org.apache.cassandra.db.SinglePartitionReadCommand;
import org.apache.cassandra.db.partitions.PartitionIterator;
import org.apache.cassandra.exceptions.ReadTimeoutException;
import org.apache.cassandra.locator.Endpoints;
-import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.locator.ReplicaLayout;
import org.apache.cassandra.metrics.ReadRepairMetrics;
+import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.ParameterType;
import org.apache.cassandra.service.reads.DataResolver;
import org.apache.cassandra.service.reads.DigestResolver;
import org.apache.cassandra.service.reads.ReadCallback;
@@ -75,9 +77,14 @@ public abstract class AbstractReadRepair<E extends Endpoints<E>, L extends Repli
this.cfs = Keyspace.openAndGetStore(command.metadata());
}
- void sendReadCommand(InetAddressAndPort to, ReadCallback readCallback)
+ void sendReadCommand(Replica to, ReadCallback readCallback)
{
- MessagingService.instance().sendRRWithFailure(command.createMessage(), to, readCallback);
+ MessageOut<ReadCommand> message = command.createMessage();
+ // if enabled, request additional info about repaired data from any full replicas
+ if (command.isTrackingRepairedStatus() && to.isFull())
+ message = message.withParameter(ParameterType.TRACK_REPAIRED_DATA, MessagingService.ONE_BYTE);
+
+ MessagingService.instance().sendRRWithFailure(message, to.endpoint(), readCallback);
}
abstract Meter getRepairMeter();
@@ -94,10 +101,14 @@ public abstract class AbstractReadRepair<E extends Endpoints<E>, L extends Repli
digestRepair = new DigestRepair(resolver, readCallback, resultConsumer);
+ // if enabled, request additional info about repaired data from any full replicas
+ if (DatabaseDescriptor.getRepairedDataTrackingForPartitionReadsEnabled())
+ command.trackRepairedStatus();
+
for (Replica replica : replicaLayout.selected())
{
Tracing.trace("Enqueuing full data read to {}", replica);
- sendReadCommand(replica.endpoint(), readCallback);
+ sendReadCommand(replica, readCallback);
}
ReadRepairDiagnostics.startRepair(this, replicaLayout.selected().endpoints(), digestResolver, replicaLayout.all().endpoints());
}
@@ -137,7 +148,7 @@ public abstract class AbstractReadRepair<E extends Endpoints<E>, L extends Repli
Replica replica = uncontacted.selected().iterator().next();
Tracing.trace("Enqueuing speculative full data read to {}", replica);
- sendReadCommand(replica.endpoint(), repair.readCallback);
+ sendReadCommand(replica, repair.readCallback);
ReadRepairMetrics.speculatedRead.mark();
ReadRepairDiagnostics.speculatedRead(this, replica.endpoint(), uncontacted.all().endpoints());
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/src/java/org/apache/cassandra/service/reads/repair/RepairedDataTracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/repair/RepairedDataTracker.java b/src/java/org/apache/cassandra/service/reads/repair/RepairedDataTracker.java
new file mode 100644
index 0000000..5024e86
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/reads/repair/RepairedDataTracker.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.reads.repair;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+/**
+ * Accumulates the digests reported by individual endpoints and, on request, hands
+ * itself to a {@link RepairedDataVerifier} for inspection.
+ */
+public class RepairedDataTracker
+{
+    private final RepairedDataVerifier verifier;
+
+    /** Every recorded digest, mapped to the endpoints that reported it. */
+    public final Multimap<ByteBuffer, InetAddressAndPort> digests = HashMultimap.create();
+    /** Endpoints whose digest was recorded with {@code isConclusive == false}. */
+    public final Set<InetAddressAndPort> inconclusiveDigests = new HashSet<>();
+
+    /**
+     * @param verifier the verifier that {@link #verify()} passes this tracker to
+     */
+    public RepairedDataTracker(RepairedDataVerifier verifier)
+    {
+        this.verifier = verifier;
+    }
+
+    /**
+     * Registers the digest reported by one endpoint.
+     *
+     * @param source       endpoint the digest came from
+     * @param digest       digest value reported by {@code source}
+     * @param isConclusive when false, {@code source} is also added to {@link #inconclusiveDigests}
+     */
+    public void recordDigest(InetAddressAndPort source, ByteBuffer digest, boolean isConclusive)
+    {
+        digests.put(digest, source);
+        if (isConclusive)
+            return;
+
+        inconclusiveDigests.add(source);
+    }
+
+    /** Passes this tracker to the verifier supplied at construction. */
+    public void verify()
+    {
+        verifier.verify(this);
+    }
+
+    /** Renders the recorded digests (hex encoded) and the inconclusive endpoints. */
+    public String toString()
+    {
+        MoreObjects.ToStringHelper helper = MoreObjects.toStringHelper(this);
+        helper.add("digests", hexDigests());
+        helper.add("inconclusive", inconclusiveDigests);
+        return helper.toString();
+    }
+
+    /** Builds a copy of the digest map keyed by the hex form of each digest. */
+    private Map<String, Collection<InetAddressAndPort>> hexDigests()
+    {
+        Map<String, Collection<InetAddressAndPort>> byHex = new HashMap<>();
+        for (Map.Entry<ByteBuffer, Collection<InetAddressAndPort>> entry : digests.asMap().entrySet())
+            byHex.put(ByteBufferUtil.bytesToHex(entry.getKey()), entry.getValue());
+        return byHex;
+    }
+
+    /** Two trackers are equal when their recorded state matches; the verifier is not compared. */
+    public boolean equals(Object o)
+    {
+        if (this == o)
+            return true;
+        if (o == null || getClass() != o.getClass())
+            return false;
+
+        RepairedDataTracker other = (RepairedDataTracker) o;
+        if (!Objects.equals(digests, other.digests))
+            return false;
+        return Objects.equals(inconclusiveDigests, other.inconclusiveDigests);
+    }
+
+    /** Hashes the same two fields that {@link #equals(Object)} compares. */
+    public int hashCode()
+    {
+        return Objects.hash(digests, inconclusiveDigests);
+    }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org