Posted to commits@cassandra.apache.org by sa...@apache.org on 2020/01/23 10:18:21 UTC
[cassandra] 02/02: Exclude purgeable tombstones from repaired data digest
This is an automated email from the ASF dual-hosted git repository.
samt pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 9a280516ca8b9e730ae0648e5e29ee6280605132
Author: Sam Tunnicliffe <sa...@beobal.com>
AuthorDate: Wed Dec 18 18:31:36 2019 +0000
Exclude purgeable tombstones from repaired data digest
Patch by Sam Tunnicliffe; reviewed by Marcus Eriksson for CASSANDRA-15462
---
CHANGES.txt | 1 +
.../cassandra/db/PartitionRangeReadCommand.java | 2 +-
src/java/org/apache/cassandra/db/ReadCommand.java | 160 ++++++++++--
.../cassandra/db/SinglePartitionReadCommand.java | 2 +-
.../cassandra/db/partitions/PurgeFunction.java | 7 +-
.../distributed/test/RepairDigestTrackingTest.java | 54 ++++
.../org/apache/cassandra/db/ReadCommandTest.java | 272 ++++++++++++++++++++-
7 files changed, 473 insertions(+), 25 deletions(-)
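At its core, the patch buffers a digest per partition and folds it into the per-command digest only when the partition closes without having been fully purged; buffering is needed because whether a partition is fully purgeable is only known once it has been completely consumed. Below is a minimal, self-contained sketch of that pattern (a hypothetical class, not the patch itself: java.security.MessageDigest stands in for Cassandra's Digest, MD5 is an arbitrary stand-in algorithm, and the purger's per-row verdict is reduced to a boolean argument):

import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Minimal sketch of the per-partition digest buffering used by the patch.
final class RepairedDataDigestSketch
{
    private MessageDigest perPartitionDigest; // buffered until the partition closes
    private MessageDigest perCommandDigest;   // only ever sees unpurged partitions
    private boolean isFullyPurged = true;

    void trackRow(byte[] rowBytes, boolean purgedToNothing)
    {
        if (purgedToNothing)
            return; // purgeable data contributes nothing to either digest
        isFullyPurged = false;
        getPerPartitionDigest().update(rowBytes);
    }

    void onPartitionClose()
    {
        // Fold the buffered digest into the overall one only if the purger
        // left some data behind; a fully purged partition leaves no trace.
        if (perPartitionDigest != null && !isFullyPurged)
            getPerCommandDigest().update(perPartitionDigest.digest());
        perPartitionDigest = null;
        isFullyPurged = true;
    }

    ByteBuffer getDigest()
    {
        return perCommandDigest == null
             ? ByteBuffer.allocate(0)
             : ByteBuffer.wrap(perCommandDigest.digest());
    }

    private MessageDigest getPerPartitionDigest()
    {
        if (perPartitionDigest == null)
            perPartitionDigest = newDigest();
        return perPartitionDigest;
    }

    private MessageDigest getPerCommandDigest()
    {
        if (perCommandDigest == null)
            perCommandDigest = newDigest();
        return perCommandDigest;
    }

    private static MessageDigest newDigest()
    {
        try { return MessageDigest.getInstance("MD5"); }
        catch (NoSuchAlgorithmException e) { throw new AssertionError(e); }
    }
}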
diff --git a/CHANGES.txt b/CHANGES.txt
index 3d5217a..43126ed 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0-alpha3
+ * Exclude purgeable tombstones from repaired data tracking (CASSANDRA-15462)
* Exclude legacy counter shards from repaired data tracking (CASSANDRA-15461)
* Make it easier to add trace headers to messages (CASSANDRA-15499)
* Fix and optimise partial compressed sstable streaming (CASSANDRA-13938)
diff --git a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
index 2145389..cb68950 100644
--- a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
+++ b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
@@ -282,7 +282,7 @@ public class PartitionRangeReadCommand extends ReadCommand implements PartitionR
if (inputCollector.isEmpty())
return EmptyIterators.unfilteredPartition(metadata());
- return checkCacheFilter(UnfilteredPartitionIterators.mergeLazily(inputCollector.finalizeIterators()), cfs);
+ return checkCacheFilter(UnfilteredPartitionIterators.mergeLazily(inputCollector.finalizeIterators(cfs, nowInSec(), oldestUnrepairedTombstone)), cfs);
}
catch (RuntimeException | Error e)
{
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java
index 9485abc..4f8ea3e 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -90,7 +90,7 @@ public abstract class ReadCommand extends AbstractReadQuery
// for data queries, coordinators may request information on the repaired data used in constructing the response
private boolean trackRepairedStatus = false;
- // tracker for repaired data, initialized to singelton null object
+ // tracker for repaired data, initialized to singleton null object
private static final RepairedDataInfo NULL_REPAIRED_DATA_INFO = new RepairedDataInfo()
{
void trackPartitionKey(DecoratedKey key){}
@@ -724,7 +724,7 @@ public abstract class ReadCommand extends AbstractReadQuery
}
private static UnfilteredPartitionIterator withRepairedDataInfo(final UnfilteredPartitionIterator iterator,
- final RepairedDataInfo repairedDataInfo)
+ final RepairedDataInfo repairedDataInfo)
{
class WithRepairedDataTracking extends Transformation<UnfilteredRowIterator>
{
@@ -744,6 +744,7 @@ public abstract class ReadCommand extends AbstractReadQuery
{
protected DecoratedKey applyToPartitionKey(DecoratedKey key)
{
+ repairedDataInfo.onNewPartition(iterator);
repairedDataInfo.trackPartitionKey(key);
return key;
}
@@ -762,7 +763,7 @@ public abstract class ReadCommand extends AbstractReadQuery
protected Row applyToStatic(Row row)
{
- repairedDataInfo.trackRow(row);
+ repairedDataInfo.trackStaticRow(row);
return row;
}
@@ -771,21 +772,48 @@ public abstract class ReadCommand extends AbstractReadQuery
repairedDataInfo.trackRow(row);
return row;
}
- }
+ protected void onPartitionClose()
+ {
+ repairedDataInfo.onPartitionClose();
+ }
+ }
return Transformation.apply(iterator, new WithTracking());
}
private static class RepairedDataInfo
{
- private Digest hasher;
+ // Keeps a digest of the partition currently being processed. Since we won't know
+ // whether a partition will be fully purged from a read result until it's been
+ // consumed, we buffer this per-partition digest and add it to the final digest
+ // when the partition is closed (if it wasn't fully purged).
+ private Digest perPartitionDigest;
+ private Digest perCommandDigest;
private boolean isConclusive = true;
+ // Doesn't actually purge from the underlying iterators, but excludes purgeable data from the digest.
+ // The purger can't be initialized until we've iterated all the sstables for the query,
+ // as it requires the oldest unrepaired tombstone
+ private RepairedDataPurger purger;
+ private boolean isFullyPurged = true;
+
ByteBuffer getDigest()
{
- return hasher == null
+ return perCommandDigest == null
? ByteBufferUtil.EMPTY_BYTE_BUFFER
- : ByteBuffer.wrap(getHasher().digest());
+ : ByteBuffer.wrap(perCommandDigest.digest());
+ }
+
+ protected void onNewPartition(UnfilteredRowIterator partition)
+ {
+ assert purger != null;
+ purger.setCurrentKey(partition.partitionKey());
+ purger.setIsReverseOrder(partition.isReverseOrder());
+ }
+
+ protected void setPurger(RepairedDataPurger purger)
+ {
+ this.purger = purger;
}
boolean isConclusive()
@@ -800,30 +828,128 @@ public abstract class ReadCommand extends AbstractReadQuery
void trackPartitionKey(DecoratedKey key)
{
- getHasher().update(key.getKey());
+ getPerPartitionDigest().update(key.getKey());
}
void trackDeletion(DeletionTime deletion)
{
- deletion.digest(getHasher());
+ assert purger != null;
+ DeletionTime purged = purger.applyToDeletion(deletion);
+ if (!purged.isLive())
+ isFullyPurged = false;
+
+ purged.digest(getPerPartitionDigest());
}
void trackRangeTombstoneMarker(RangeTombstoneMarker marker)
{
- marker.digest(getHasher());
+ assert purger != null;
+ RangeTombstoneMarker purged = purger.applyToMarker(marker);
+ if (purged != null)
+ {
+ isFullyPurged = false;
+ purged.digest(getPerPartitionDigest());
+ }
+ }
+
+ void trackStaticRow(Row row)
+ {
+ assert purger != null;
+ Row purged = purger.applyToRow(row);
+ if (!purged.isEmpty())
+ {
+ isFullyPurged = false;
+ purged.digest(getPerPartitionDigest());
+ }
}
void trackRow(Row row)
{
- row.digest(getHasher());
+ assert purger != null;
+ Row purged = purger.applyToRow(row);
+ if (purged != null)
+ {
+ isFullyPurged = false;
+ purged.digest(getPerPartitionDigest());
+ }
+ }
+
+ private Digest getPerPartitionDigest()
+ {
+ if (perPartitionDigest == null)
+ perPartitionDigest = Digest.forRepairedDataTracking();
+
+ return perPartitionDigest;
+ }
+
+ private void onPartitionClose()
+ {
+ if (perPartitionDigest != null)
+ {
+ // If the partition wasn't completely emptied by the purger,
+ // calculate the digest for the partition and use it to
+ // update the overall digest
+ if (!isFullyPurged)
+ {
+ if (perCommandDigest == null)
+ perCommandDigest = Digest.forRepairedDataTracking();
+
+ byte[] partitionDigest = perPartitionDigest.digest();
+ perCommandDigest.update(partitionDigest, 0, partitionDigest.length);
+ isFullyPurged = true;
+ }
+
+ perPartitionDigest = null;
+ }
+ }
+ }
+
+ /**
+ * Although PurgeFunction extends Transformation, this is never applied to an iterator.
+ * Instead, it is used by RepairedDataInfo during the generation of a repaired data
+ * digest to exclude data which will actually be purged later on in the read pipeline.
+ */
+ private static class RepairedDataPurger extends PurgeFunction
+ {
+ RepairedDataPurger(ColumnFamilyStore cfs,
+ int nowInSec,
+ int oldestUnrepairedTombstone)
+ {
+ super(nowInSec,
+ cfs.gcBefore(nowInSec),
+ oldestUnrepairedTombstone,
+ cfs.getCompactionStrategyManager().onlyPurgeRepairedTombstones(),
+ cfs.metadata.get().enforceStrictLiveness());
}
- private Digest getHasher()
+ protected LongPredicate getPurgeEvaluator()
{
- if (hasher == null)
- hasher = Digest.forRepairedDataTracking();
+ return (time) -> true;
+ }
+
+ void setCurrentKey(DecoratedKey key)
+ {
+ super.onNewPartition(key);
+ }
+
+ void setIsReverseOrder(boolean isReverseOrder)
+ {
+ super.setReverseOrder(isReverseOrder);
+ }
- return hasher;
+ public DeletionTime applyToDeletion(DeletionTime deletionTime)
+ {
+ return super.applyToDeletion(deletionTime);
+ }
+
+ public Row applyToRow(Row row)
+ {
+ return super.applyToRow(row);
+ }
+
+ public RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker)
+ {
+ return super.applyToMarker(marker);
}
}
@@ -912,12 +1038,14 @@ public abstract class ReadCommand extends AbstractReadQuery
unrepairedIters.add(iter);
}
- List<T> finalizeIterators()
+ List<T> finalizeIterators(ColumnFamilyStore cfs, int nowInSec, int oldestUnrepairedTombstone)
{
if (repairedIters.isEmpty())
return unrepairedIters;
// merge the repaired data before returning, wrapping in a digest generator
+ RepairedDataPurger purger = new RepairedDataPurger(cfs, nowInSec, oldestUnrepairedTombstone);
+ repairedDataInfo.setPurger(purger);
unrepairedIters.add(repairedMerger.apply(repairedIters, repairedDataInfo));
return unrepairedIters;
}
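Because getPurgeEvaluator() above unconditionally returns true, RepairedDataPurger's purge decision effectively reduces to the gc_grace cutoff: once a repaired tombstone's local deletion time passes gcBefore, any replica may legitimately have compacted it away, so it must not feed the digest. A rough, self-contained sketch of that decision under those assumptions (a hypothetical class, not PurgeFunction's exact logic):

import java.util.function.LongPredicate;

// Assumed simplification of the purgeability test RepairedDataPurger applies:
// with an evaluator of (time) -> true, eligibility reduces to the tombstone's
// local deletion time having passed gcBefore (nowInSec - gc_grace_seconds).
final class PurgeCheckSketch
{
    private final int gcBefore;
    private final LongPredicate purgeEvaluator = time -> true;

    PurgeCheckSketch(int nowInSec, int gcGraceSeconds)
    {
        this.gcBefore = nowInSec - gcGraceSeconds;
    }

    boolean isPurgeable(long markedForDeleteAt, int localDeletionTime)
    {
        return localDeletionTime < gcBefore && purgeEvaluator.test(markedForDeleteAt);
    }
}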
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index eb57b93..4daad7d 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@ -695,7 +695,7 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
StorageHook.instance.reportRead(cfs.metadata().id, partitionKey());
- return withSSTablesIterated(inputCollector.finalizeIterators(), cfs.metric, metricsCollector);
+ return withSSTablesIterated(inputCollector.finalizeIterators(cfs, nowInSec(), oldestUnrepairedTombstone), cfs.metric, metricsCollector);
}
catch (RuntimeException | Error e)
{
diff --git a/src/java/org/apache/cassandra/db/partitions/PurgeFunction.java b/src/java/org/apache/cassandra/db/partitions/PurgeFunction.java
index 8dcd359..d9e9036 100644
--- a/src/java/org/apache/cassandra/db/partitions/PurgeFunction.java
+++ b/src/java/org/apache/cassandra/db/partitions/PurgeFunction.java
@@ -60,13 +60,18 @@ public abstract class PurgeFunction extends Transformation<UnfilteredRowIterator
{
}
+ protected void setReverseOrder(boolean isReverseOrder)
+ {
+ this.isReverseOrder = isReverseOrder;
+ }
+
@Override
@SuppressWarnings("resource")
protected UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition)
{
onNewPartition(partition.partitionKey());
- isReverseOrder = partition.isReverseOrder();
+ setReverseOrder(partition.isReverseOrder());
UnfilteredRowIterator purged = Transformation.apply(partition, this);
if (purged.isEmpty())
{
diff --git a/test/distributed/org/apache/cassandra/distributed/test/RepairDigestTrackingTest.java b/test/distributed/org/apache/cassandra/distributed/test/RepairDigestTrackingTest.java
index a987ea3..1af329f 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/RepairDigestTrackingTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/RepairDigestTrackingTest.java
@@ -23,6 +23,7 @@ import java.io.Serializable;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import org.junit.Assert;
import org.junit.Test;
@@ -113,6 +114,59 @@ public class RepairDigestTrackingTest extends DistributedTestBase implements Ser
}
}
+ @Test
+ public void testPurgeableTombstonesAreIgnored() throws Throwable
+ {
+ try (Cluster cluster = init(Cluster.create(2)))
+ {
+
+ cluster.get(1).runOnInstance(() -> {
+ StorageProxy.instance.enableRepairedDataTrackingForRangeReads();
+ });
+
+ cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl2 (k INT, c INT, v1 INT, v2 INT, PRIMARY KEY (k,c)) WITH gc_grace_seconds=0");
+ // on node1 only insert some tombstones, then flush
+ for (int i = 0; i < 10; i++)
+ {
+ cluster.get(1).executeInternal("DELETE v1 FROM " + KEYSPACE + ".tbl2 USING TIMESTAMP 0 WHERE k=? and c=? ", i, i);
+ }
+ cluster.get(1).runOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl2").forceBlockingFlush());
+
+ // insert data on both nodes and flush
+ for (int i = 0; i < 10; i++)
+ {
+ cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl2 (k, c, v2) VALUES (?, ?, ?) USING TIMESTAMP 1",
+ ConsistencyLevel.ALL,
+ i, i, i);
+ }
+ cluster.get(1).runOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl2").forceBlockingFlush());
+ cluster.get(2).runOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl2").forceBlockingFlush());
+
+ // nothing is repaired yet
+ cluster.get(1).runOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl2").getLiveSSTables().forEach(this::assertNotRepaired));
+ cluster.get(2).runOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl2").getLiveSSTables().forEach(this::assertNotRepaired));
+ // mark everything repaired
+ cluster.get(1).runOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl2").getLiveSSTables().forEach(this::markRepaired));
+ cluster.get(2).runOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl2").getLiveSSTables().forEach(this::markRepaired));
+ cluster.get(1).runOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl2").getLiveSSTables().forEach(this::assertRepaired));
+ cluster.get(2).runOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl2").getLiveSSTables().forEach(this::assertRepaired));
+
+ // now overwrite on node2 only to generate digest mismatches, but don't flush so the repaired dataset is not affected
+ for (int i = 0; i < 10; i++)
+ {
+ cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl2 (k, c, v2) VALUES (?, ?, ?) USING TIMESTAMP 2", i, i, i * 2);
+ }
+
+ long ccBefore = cluster.get(1).callOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl2").metric.confirmedRepairedInconsistencies.table.getCount());
+ // Unfortunately we need to sleep here to ensure that nowInSec > the local deletion time of the tombstones
+ TimeUnit.SECONDS.sleep(2);
+ cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl2", ConsistencyLevel.ALL);
+ long ccAfter = cluster.get(1).callOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl2").metric.confirmedRepairedInconsistencies.table.getCount());
+
+ Assert.assertEquals("No repaired data inconsistencies should be detected", ccBefore, ccAfter);
+ }
+ }
+
private void assertNotRepaired(SSTableReader reader) {
Assert.assertTrue("repaired at is set for sstable: " + reader.descriptor, getRepairedAt(reader) == ActiveRepairService.UNREPAIRED_SSTABLE);
}
diff --git a/test/unit/org/apache/cassandra/db/ReadCommandTest.java b/test/unit/org/apache/cassandra/db/ReadCommandTest.java
index c04f489..4419c70 100644
--- a/test/unit/org/apache/cassandra/db/ReadCommandTest.java
+++ b/test/unit/org/apache/cassandra/db/ReadCommandTest.java
@@ -24,6 +24,7 @@ import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.*;
+import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.junit.Assert;
@@ -40,10 +41,12 @@ import org.apache.cassandra.db.filter.RowFilter;
import org.apache.cassandra.db.marshal.AsciiType;
import org.apache.cassandra.db.marshal.BytesType;
import org.apache.cassandra.db.marshal.CounterColumnType;
+import org.apache.cassandra.db.marshal.SetType;
import org.apache.cassandra.db.partitions.*;
import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.db.rows.RowIterator;
import org.apache.cassandra.db.rows.SerializationHelper;
+import org.apache.cassandra.db.rows.Unfiltered;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.db.rows.UnfilteredRowIterators;
import org.apache.cassandra.dht.Range;
@@ -61,8 +64,11 @@ import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.Verb;
import org.apache.cassandra.repair.consistent.LocalSessionAccessor;
import org.apache.cassandra.schema.CachingParams;
+import org.apache.cassandra.schema.KeyspaceMetadata;
import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.schema.TableParams;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.streaming.PreviewKind;
import org.apache.cassandra.tracing.Tracing;
@@ -70,6 +76,7 @@ import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.UUIDGen;
+import static org.apache.cassandra.utils.ByteBufferUtil.EMPTY_BYTE_BUFFER;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@@ -85,6 +92,7 @@ public class ReadCommandTest
private static final String CF5 = "Standard5";
private static final String CF6 = "Standard6";
private static final String CF7 = "Counter7";
+ private static final String CF8 = "Standard8";
private static final InetAddressAndPort REPAIR_COORDINATOR;
static {
@@ -161,6 +169,14 @@ public class ReadCommandTest
.addClusteringColumn("col", AsciiType.instance)
.addRegularColumn("c", CounterColumnType.instance);
+ TableMetadata.Builder metadata8 =
+ TableMetadata.builder(KEYSPACE, CF8)
+ .addPartitionKeyColumn("key", BytesType.instance)
+ .addClusteringColumn("col", AsciiType.instance)
+ .addRegularColumn("a", AsciiType.instance)
+ .addRegularColumn("b", AsciiType.instance)
+ .addRegularColumn("c", SetType.getInstance(AsciiType.instance, true));
+
SchemaLoader.prepareServer();
SchemaLoader.createKeyspace(KEYSPACE,
KeyspaceParams.simple(1),
@@ -170,7 +186,8 @@ public class ReadCommandTest
metadata4,
metadata5,
metadata6,
- metadata7);
+ metadata7,
+ metadata8);
LocalSessionAccessor.startup();
}
@@ -683,7 +700,7 @@ public class ReadCommandTest
// execute a read and capture the digest
ReadCommand readCommand = Util.cmd(cfs, Util.dk("key")).build();
ByteBuffer digestWithLegacyCounter0 = performReadAndVerifyRepairedInfo(readCommand, 1, 1, true);
- assertFalse(ByteBufferUtil.EMPTY_BYTE_BUFFER.equals(digestWithLegacyCounter0));
+ assertFalse(EMPTY_BYTE_BUFFER.equals(digestWithLegacyCounter0));
// truncate, then re-insert the same partition, but this time with a legacy
// shard having the value 1. The repaired digest should match the previous, as
@@ -713,11 +730,254 @@ public class ReadCommandTest
cfs.getLiveSSTables().forEach(sstable -> mutateRepaired(cfs, sstable, 111, null));
ByteBuffer digestWithCounterCell = performReadAndVerifyRepairedInfo(readCommand, 1, 1, true);
- assertFalse(ByteBufferUtil.EMPTY_BYTE_BUFFER.equals(digestWithCounterCell));
+ assertFalse(EMPTY_BYTE_BUFFER.equals(digestWithCounterCell));
assertFalse(digestWithLegacyCounter0.equals(digestWithCounterCell));
assertFalse(digestWithLegacyCounter1.equals(digestWithCounterCell));
}
+ /**
+ * Writes four partitions, each with its own flavour of deletion: a simple column tombstone; a
+ * collection with an associated complex deletion; a range tombstone plus a row with a
+ * non-purgeable column tombstone; and a pair of rows, one fully deleted. Each partition is
+ * read with a single partition query. The repaired data digests generated by executing the
+ * same query before and after the tombstones become eligible for purging should not match
+ * each other. Also, no digest should be empty, as none of the partitions is made empty by
+ * the purging.
+ */
+ @Test
+ public void purgeGCableTombstonesBeforeCalculatingDigest() throws Exception
+ {
+ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF8);
+ cfs.truncateBlocking();
+ cfs.disableAutoCompaction();
+ setGCGrace(cfs, 600);
+
+ DecoratedKey[] keys = new DecoratedKey[] { Util.dk("key0"), Util.dk("key1"), Util.dk("key2"), Util.dk("key3") };
+ int nowInSec = FBUtilities.nowInSeconds();
+ TableMetadata cfm = cfs.metadata();
+
+ // A simple tombstone
+ new RowUpdateBuilder(cfs.metadata(), 0, keys[0]).clustering("cc").delete("a").build().apply();
+
+ // Collection with an associated complex deletion
+ PartitionUpdate.SimpleBuilder builder = PartitionUpdate.simpleBuilder(cfs.metadata(), keys[1]).timestamp(0);
+ builder.row("cc").add("c", ImmutableSet.of("element1", "element2"));
+ builder.buildAsMutation().apply();
+
+ // RangeTombstone and a row (not covered by the RT). The row contains a regular tombstone which will not be
+ // purged. This is to prevent the partition from being fully purged and removed from the final results
+ new RowUpdateBuilder(cfs.metadata(), nowInSec, 0L, keys[2]).addRangeTombstone("aa", "bb").build().apply();
+ new RowUpdateBuilder(cfs.metadata(), nowInSec + 1000, 1000L, keys[2]).clustering("cc").delete("a").build().apply();
+
+ // Partition with 2 rows, one fully deleted
+ new RowUpdateBuilder(cfs.metadata.get(), 0, keys[3]).clustering("bb").add("a", ByteBufferUtil.bytes("a")).delete("b").build().apply();
+ RowUpdateBuilder.deleteRow(cfs.metadata(), 0, keys[3], "cc").apply();
+ cfs.forceBlockingFlush();
+ cfs.getLiveSSTables().forEach(sstable -> mutateRepaired(cfs, sstable, 111, null));
+
+ Map<DecoratedKey, ByteBuffer> digestsWithTombstones = new HashMap<>();
+ // Tombstones are not yet purgeable
+ for (DecoratedKey key : keys)
+ {
+ ReadCommand cmd = Util.cmd(cfs, key).withNowInSeconds(nowInSec).build();
+ cmd.trackRepairedStatus();
+ Partition partition = Util.getOnlyPartitionUnfiltered(cmd);
+ assertFalse(partition.isEmpty());
+ partition.unfilteredIterator().forEachRemaining(u -> {
+ // must be either a RT, or a row containing some kind of deletion
+ assertTrue(u.isRangeTombstoneMarker() || ((Row)u).hasDeletion(cmd.nowInSec()));
+ });
+ ByteBuffer digestWithTombstones = cmd.getRepairedDataDigest();
+ // None should generate an empty digest
+ assertDigestsDiffer(EMPTY_BYTE_BUFFER, digestWithTombstones);
+ digestsWithTombstones.put(key, digestWithTombstones);
+ }
+
+ // Make tombstones eligible for purging and re-run cmd with an incremented nowInSec
+ setGCGrace(cfs, 0);
+
+ // Tombstones are now purgeable, so they won't be in the read results and will produce different digests
+ for (DecoratedKey key : keys)
+ {
+ ReadCommand cmd = Util.cmd(cfs, key).withNowInSeconds(nowInSec + 1).build();
+ cmd.trackRepairedStatus();
+ Partition partition = Util.getOnlyPartitionUnfiltered(cmd);
+ assertFalse(partition.isEmpty());
+ partition.unfilteredIterator().forEachRemaining(u -> {
+ // After purging, only rows without any deletions should remain.
+ // The one exception is "key2:cc" which has a regular column tombstone which is not
+ // eligible for purging. This is to prevent the partition from being fully purged
+ // when its RT is removed.
+ assertTrue(u.isRow());
+ Row r = (Row)u;
+ assertTrue(!r.hasDeletion(cmd.nowInSec())
+ || (key.equals(keys[2]) && r.clustering()
+ .get(0)
+ .equals(AsciiType.instance.fromString("cc"))));
+
+ });
+ ByteBuffer digestWithoutTombstones = cmd.getRepairedDataDigest();
+ // not an empty digest
+ assertDigestsDiffer(EMPTY_BYTE_BUFFER, digestWithoutTombstones);
+ // should not match the pre-purge digest
+ assertDigestsDiffer(digestsWithTombstones.get(key), digestWithoutTombstones);
+ }
+ }
+
+ private void setGCGrace(ColumnFamilyStore cfs, int gcGrace)
+ {
+ TableParams newParams = cfs.metadata().params.unbuild().gcGraceSeconds(gcGrace).build();
+ KeyspaceMetadata keyspaceMetadata = Schema.instance.getKeyspaceMetadata(cfs.metadata().keyspace);
+ Schema.instance.load(
+ keyspaceMetadata.withSwapped(
+ keyspaceMetadata.tables.withSwapped(
+ cfs.metadata().withSwapped(newParams))));
+ }
+
+ private void assertDigestsDiffer(ByteBuffer b0, ByteBuffer b1)
+ {
+ assertTrue(ByteBufferUtil.compareUnsigned(b0, b1) != 0);
+ }
+
+ @Test
+ public void partitionReadFullyPurged() throws Exception
+ {
+ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF6);
+ ReadCommand partitionRead = Util.cmd(cfs, Util.dk("key")).build();
+ fullyPurgedPartitionCreatesEmptyDigest(cfs, partitionRead);
+ }
+
+ @Test
+ public void rangeReadFullyPurged() throws Exception
+ {
+ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF6);
+ ReadCommand rangeRead = Util.cmd(cfs).build();
+ fullyPurgedPartitionCreatesEmptyDigest(cfs, rangeRead);
+ }
+
+ /**
+ * Writes a single partition containing only a single row deletion and reads with either a range or
+ * partition query. Before the row deletion is eligible for purging, it should appear in the query
+ * results and cause a non-empty repaired data digest to be generated. Repeating the query after
+ * the row deletion is eligible for purging, both the result set and the repaired data digest should
+ * be empty.
+ */
+ private void fullyPurgedPartitionCreatesEmptyDigest(ColumnFamilyStore cfs, ReadCommand command) throws Exception
+ {
+ cfs.truncateBlocking();
+ cfs.disableAutoCompaction();
+ setGCGrace(cfs, 600);
+
+ // Partition with a single, fully deleted row
+ RowUpdateBuilder.deleteRow(cfs.metadata(), 0, ByteBufferUtil.bytes("key"), "cc").apply();
+ cfs.forceBlockingFlush();
+ cfs.getLiveSSTables().forEach(sstable -> mutateRepaired(cfs, sstable, 111, null));
+
+ command.trackRepairedStatus();
+ List<ImmutableBTreePartition> partitions = Util.getAllUnfiltered(command);
+ assertEquals(1, partitions.size());
+ ByteBuffer digestWithTombstones = command.getRepairedDataDigest();
+ assertTrue(ByteBufferUtil.compareUnsigned(EMPTY_BYTE_BUFFER, digestWithTombstones) != 0);
+
+ // Make tombstones eligible for purging and re-run cmd with an incremented nowInSec
+ setGCGrace(cfs, 0);
+
+ AbstractReadCommandBuilder builder = command instanceof PartitionRangeReadCommand
+ ? Util.cmd(cfs)
+ : Util.cmd(cfs, Util.dk("key"));
+ builder.withNowInSeconds(command.nowInSec() + 60);
+ command = builder.build();
+ command.trackRepairedStatus();
+
+ partitions = Util.getAllUnfiltered(command);
+ assertTrue(partitions.isEmpty());
+ ByteBuffer digestWithoutTombstones = command.getRepairedDataDigest();
+ assertEquals(0, ByteBufferUtil.compareUnsigned(EMPTY_BYTE_BUFFER, digestWithoutTombstones));
+ }
+
+ /**
+ * Verifies that during range reads which include multiple partitions, fully purged partitions
+ * have no material effect on the calculated digest. This test writes two sstables, each containing
+ * a single partition; the first is live and the second fully deleted and eligible for purging.
+ * Initially, only the sstable containing the live partition is marked repaired, while a range read
+ * which covers both partitions is performed to generate a digest. Then the sstable containing the
+ * purged partition is also marked repaired and the query reexecuted. The digests produced by both
+ * queries should match as the digest calculation should exclude the fully purged partition.
+ */
+ @Test
+ public void mixedPurgedAndNonPurgedPartitions()
+ {
+ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF6);
+ cfs.truncateBlocking();
+ cfs.disableAutoCompaction();
+ setGCGrace(cfs, 0);
+
+ ReadCommand command = Util.cmd(cfs).withNowInSeconds(FBUtilities.nowInSeconds() + 60).build();
+
+ // Live partition in a repaired sstable, so included in the digest calculation
+ new RowUpdateBuilder(cfs.metadata.get(), 0, ByteBufferUtil.bytes("key-0")).clustering("cc").add("a", ByteBufferUtil.bytes("a")).build().apply();
+ cfs.forceBlockingFlush();
+ cfs.getLiveSSTables().forEach(sstable -> mutateRepaired(cfs, sstable, 111, null));
+ // Fully deleted partition in an unrepaired sstable, so not included in the initial digest
+ RowUpdateBuilder.deleteRow(cfs.metadata(), 0, ByteBufferUtil.bytes("key-1"), "cc").apply();
+ cfs.forceBlockingFlush();
+
+ command.trackRepairedStatus();
+ List<ImmutableBTreePartition> partitions = Util.getAllUnfiltered(command);
+ assertEquals(1, partitions.size());
+ ByteBuffer digestWithoutPurgedPartition = command.getRepairedDataDigest();
+ assertTrue(ByteBufferUtil.compareUnsigned(EMPTY_BYTE_BUFFER, digestWithoutPurgedPartition) != 0);
+
+ // mark the sstable containing the purged partition as repaired, so both partitions are now
+ // read during the digest calculation. Because the purged partition is entirely
+ // discarded, the resultant digest should match the earlier one.
+ cfs.getLiveSSTables().forEach(sstable -> mutateRepaired(cfs, sstable, 111, null));
+ command = Util.cmd(cfs).withNowInSeconds(command.nowInSec()).build();
+ command.trackRepairedStatus();
+
+ partitions = Util.getAllUnfiltered(command);
+ assertEquals(1, partitions.size());
+ ByteBuffer digestWithPurgedPartition = command.getRepairedDataDigest();
+ assertEquals(0, ByteBufferUtil.compareUnsigned(digestWithPurgedPartition, digestWithoutPurgedPartition));
+ }
+
+ @Test
+ public void purgingConsidersRepairedDataOnly() throws Exception
+ {
+ // 2 sstables: the first is repaired and contains data that is all purgeable;
+ // the second is unrepaired and contains non-purgeable data. Even though
+ // the partition itself is not fully purged, the repaired data digest
+ // should be empty as there was no non-purgeable, repaired data read.
+ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF6);
+ cfs.truncateBlocking();
+ cfs.disableAutoCompaction();
+ setGCGrace(cfs, 0);
+
+ // Partition with a single, fully deleted row which will be fully purged
+ DecoratedKey key = Util.dk("key");
+ RowUpdateBuilder.deleteRow(cfs.metadata(), 0, key, "cc").apply();
+ cfs.forceBlockingFlush();
+ cfs.getLiveSSTables().forEach(sstable -> mutateRepaired(cfs, sstable, 111, null));
+
+ new RowUpdateBuilder(cfs.metadata(), 1, key).clustering("cc").add("a", ByteBufferUtil.bytes("a")).build().apply();
+ cfs.forceBlockingFlush();
+
+ int nowInSec = FBUtilities.nowInSeconds() + 10;
+ ReadCommand cmd = Util.cmd(cfs, key).withNowInSeconds(nowInSec).build();
+ cmd.trackRepairedStatus();
+ Partition partition = Util.getOnlyPartitionUnfiltered(cmd);
+ assertFalse(partition.isEmpty());
+ // check that the result contains just the one row and that its deletion has been purged
+ try (UnfilteredRowIterator rows = partition.unfilteredIterator())
+ {
+ assertFalse(rows.isEmpty());
+ Unfiltered unfiltered = rows.next();
+ assertFalse(rows.hasNext());
+ assertTrue(unfiltered.isRow());
+ assertFalse(((Row) unfiltered).hasDeletion(nowInSec));
+ }
+ assertEquals(EMPTY_BYTE_BUFFER, cmd.getRepairedDataDigest());
+ }
+
private long readCount(SSTableReader sstable)
{
return sstable.getReadMeter().count();
@@ -833,7 +1093,7 @@ public class ReadCommandTest
Set<ByteBuffer> digests = new HashSet<>();
// first time round, nothing has been marked repaired so we expect digest to be an empty buffer and to be marked conclusive
ByteBuffer digest = performReadAndVerifyRepairedInfo(readCommand, numPartitions, rowsPerPartition, true);
- assertEquals(ByteBufferUtil.EMPTY_BYTE_BUFFER, digest);
+ assertEquals(EMPTY_BYTE_BUFFER, digest);
digests.add(digest);
// add a pending repair session to table1, digest should remain the same but now we expect it to be marked inconclusive
@@ -872,12 +1132,12 @@ public class ReadCommandTest
.delete()
.build()).apply();
digest = performReadAndVerifyRepairedInfo(readCommand, 0, rowsPerPartition, false);
- assertEquals(ByteBufferUtil.EMPTY_BYTE_BUFFER, digest);
+ assertEquals(EMPTY_BYTE_BUFFER, digest);
// now flush so we have an unrepaired table with the deletion and repeat the check
cfs.forceBlockingFlush();
digest = performReadAndVerifyRepairedInfo(readCommand, 0, rowsPerPartition, false);
- assertEquals(ByteBufferUtil.EMPTY_BYTE_BUFFER, digest);
+ assertEquals(EMPTY_BYTE_BUFFER, digest);
}
}