Posted to commits@cassandra.apache.org by sa...@apache.org on 2020/01/23 10:18:21 UTC

[cassandra] 02/02: Exclude purgeable tombstones from repaired data digest

This is an automated email from the ASF dual-hosted git repository.

samt pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 9a280516ca8b9e730ae0648e5e29ee6280605132
Author: Sam Tunnicliffe <sa...@beobal.com>
AuthorDate: Wed Dec 18 18:31:36 2019 +0000

    Exclude purgeable tombstones from repaired data digest
    
    Patch by Sam Tunnicliffe; reviewed by Marcus Eriksson for CASSANDRA-15462
---
 CHANGES.txt                                        |   1 +
 .../cassandra/db/PartitionRangeReadCommand.java    |   2 +-
 src/java/org/apache/cassandra/db/ReadCommand.java  | 160 ++++++++++--
 .../cassandra/db/SinglePartitionReadCommand.java   |   2 +-
 .../cassandra/db/partitions/PurgeFunction.java     |   7 +-
 .../distributed/test/RepairDigestTrackingTest.java |  54 ++++
 .../org/apache/cassandra/db/ReadCommandTest.java   | 272 ++++++++++++++++++++-
 7 files changed, 473 insertions(+), 25 deletions(-)
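
For context on the change itself: replicas purge tombstones independently once they pass
gc_grace_seconds (compaction may have dropped a tombstone on one node but not yet on
another), so hashing purgeable tombstones into the repaired data digest produces spurious
mismatches between replicas that are in fact consistent. A minimal standalone sketch of
the purge rule being applied, using hypothetical names rather than Cassandra's API:

    // Standalone sketch (hypothetical names, not Cassandra's API) of the rule this
    // commit applies when building the repaired data digest: a tombstone whose local
    // deletion time falls before the gcBefore cutoff may already have been compacted
    // away on some replicas, so it must not contribute to the digest.
    final class PurgeRuleSketch
    {
        static boolean isPurgeable(int localDeletionTime, int nowInSec, int gcGraceSeconds)
        {
            // the same cutoff the patch obtains via cfs.gcBefore(nowInSec)
            int gcBefore = nowInSec - gcGraceSeconds;
            return localDeletionTime < gcBefore;
        }
    }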

diff --git a/CHANGES.txt b/CHANGES.txt
index 3d5217a..43126ed 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0-alpha3
+ * Exclude purgeable tombstones from repaired data tracking (CASSANDRA-15462)
  * Exclude legacy counter shards from repaired data tracking (CASSANDRA-15461)
  * Make it easier to add trace headers to messages (CASSANDRA-15499)
  * Fix and optimise partial compressed sstable streaming (CASSANDRA-13938)
diff --git a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
index 2145389..cb68950 100644
--- a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
+++ b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
@@ -282,7 +282,7 @@ public class PartitionRangeReadCommand extends ReadCommand implements PartitionR
             if (inputCollector.isEmpty())
                 return EmptyIterators.unfilteredPartition(metadata());
 
-            return checkCacheFilter(UnfilteredPartitionIterators.mergeLazily(inputCollector.finalizeIterators()), cfs);
+            return checkCacheFilter(UnfilteredPartitionIterators.mergeLazily(inputCollector.finalizeIterators(cfs, nowInSec(), oldestUnrepairedTombstone)), cfs);
         }
         catch (RuntimeException | Error e)
         {
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java
index 9485abc..4f8ea3e 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -90,7 +90,7 @@ public abstract class ReadCommand extends AbstractReadQuery
 
     // for data queries, coordinators may request information on the repaired data used in constructing the response
     private boolean trackRepairedStatus = false;
-    // tracker for repaired data, initialized to singelton null object
+    // tracker for repaired data, initialized to singleton null object
     private static final RepairedDataInfo NULL_REPAIRED_DATA_INFO = new RepairedDataInfo()
     {
         void trackPartitionKey(DecoratedKey key){}
@@ -724,7 +724,7 @@ public abstract class ReadCommand extends AbstractReadQuery
     }
 
     private static UnfilteredPartitionIterator withRepairedDataInfo(final UnfilteredPartitionIterator iterator,
-                                                               final RepairedDataInfo repairedDataInfo)
+                                                                    final RepairedDataInfo repairedDataInfo)
     {
         class WithRepairedDataTracking extends Transformation<UnfilteredRowIterator>
         {
@@ -744,6 +744,7 @@ public abstract class ReadCommand extends AbstractReadQuery
         {
             protected DecoratedKey applyToPartitionKey(DecoratedKey key)
             {
+                repairedDataInfo.onNewPartition(iterator);
                 repairedDataInfo.trackPartitionKey(key);
                 return key;
             }
@@ -762,7 +763,7 @@ public abstract class ReadCommand extends AbstractReadQuery
 
             protected Row applyToStatic(Row row)
             {
-                repairedDataInfo.trackRow(row);
+                repairedDataInfo.trackStaticRow(row);
                 return row;
             }
 
@@ -771,21 +772,48 @@ public abstract class ReadCommand extends AbstractReadQuery
                 repairedDataInfo.trackRow(row);
                 return row;
             }
-        }
 
+            protected void onPartitionClose()
+            {
+                repairedDataInfo.onPartitionClose();
+            }
+        }
         return Transformation.apply(iterator, new WithTracking());
     }
 
     private static class RepairedDataInfo
     {
-        private Digest hasher;
+        // Keeps a digest of the partition currently being processed. Since we won't know
+        // whether a partition will be fully purged from a read result until it's been
+        // consumed, we buffer this per-partition digest and add it to the final digest
+        // when the partition is closed (if it wasn't fully purged).
+        private Digest perPartitionDigest;
+        private Digest perCommandDigest;
         private boolean isConclusive = true;
 
+        // Doesn't actually purge anything from the underlying iterators; it only excludes
+        // purged data from the digest. The purger can't be initialized until we've iterated
+        // all the sstables for the query, as it requires the oldest unrepaired tombstone.
+        private RepairedDataPurger purger;
+        private boolean isFullyPurged = true;
+
         ByteBuffer getDigest()
         {
-            return hasher == null
+            return perCommandDigest == null
                    ? ByteBufferUtil.EMPTY_BYTE_BUFFER
-                   : ByteBuffer.wrap(getHasher().digest());
+                   : ByteBuffer.wrap(perCommandDigest.digest());
+        }
+
+        protected void onNewPartition(UnfilteredRowIterator partition)
+        {
+            assert purger != null;
+            purger.setCurrentKey(partition.partitionKey());
+            purger.setIsReverseOrder(partition.isReverseOrder());
+        }
+
+        protected void setPurger(RepairedDataPurger purger)
+        {
+            this.purger = purger;
         }
 
         boolean isConclusive()
@@ -800,30 +828,128 @@ public abstract class ReadCommand extends AbstractReadQuery
 
         void trackPartitionKey(DecoratedKey key)
         {
-            getHasher().update(key.getKey());
+            getPerPartitionDigest().update(key.getKey());
         }
 
         void trackDeletion(DeletionTime deletion)
         {
-            deletion.digest(getHasher());
+            assert purger != null;
+            DeletionTime purged = purger.applyToDeletion(deletion);
+            if (!purged.isLive())
+                isFullyPurged = false;
+
+            purged.digest(getPerPartitionDigest());
         }
 
         void trackRangeTombstoneMarker(RangeTombstoneMarker marker)
         {
-            marker.digest(getHasher());
+            assert purger != null;
+            RangeTombstoneMarker purged = purger.applyToMarker(marker);
+            if (purged != null)
+            {
+                isFullyPurged = false;
+                purged.digest(getPerPartitionDigest());
+            }
+        }
+
+        void trackStaticRow(Row row)
+        {
+            assert purger != null;
+            Row purged = purger.applyToRow(row);
+            if (!purged.isEmpty())
+            {
+                isFullyPurged = false;
+                purged.digest(getPerPartitionDigest());
+            }
         }
 
         void trackRow(Row row)
         {
-            row.digest(getHasher());
+            assert purger != null;
+            Row purged = purger.applyToRow(row);
+            if (purged != null)
+            {
+                isFullyPurged = false;
+                purged.digest(getPerPartitionDigest());
+            }
+        }
+
+        private Digest getPerPartitionDigest()
+        {
+            if (perPartitionDigest == null)
+                perPartitionDigest = Digest.forRepairedDataTracking();
+
+            return perPartitionDigest;
+        }
+
+        private void onPartitionClose()
+        {
+            if (perPartitionDigest != null)
+            {
+                // If the partition wasn't completely emptied by the purger,
+                // calculate the digest for the partition and use it to
+                // update the overall digest
+                if (!isFullyPurged)
+                {
+                    if (perCommandDigest == null)
+                        perCommandDigest = Digest.forRepairedDataTracking();
+
+                    byte[] partitionDigest = perPartitionDigest.digest();
+                    perCommandDigest.update(partitionDigest, 0, partitionDigest.length);
+                    isFullyPurged = true;
+                }
+
+                perPartitionDigest = null;
+            }
+        }
+    }
+
+    /**
+     * Although PurgeFunction extends Transformation, this is never applied to an iterator.
+     * Instead, it is used by RepairedDataInfo during the generation of a repaired data
+     * digest to exclude data which will actually be purged later on in the read pipeline.
+     */
+    private static class RepairedDataPurger extends PurgeFunction
+    {
+        RepairedDataPurger(ColumnFamilyStore cfs,
+                           int nowInSec,
+                           int oldestUnrepairedTombstone)
+        {
+            super(nowInSec,
+                  cfs.gcBefore(nowInSec),
+                  oldestUnrepairedTombstone,
+                  cfs.getCompactionStrategyManager().onlyPurgeRepairedTombstones(),
+                  cfs.metadata.get().enforceStrictLiveness());
         }
 
-        private Digest getHasher()
+        protected LongPredicate getPurgeEvaluator()
         {
-            if (hasher == null)
-                hasher = Digest.forRepairedDataTracking();
+            return (time) -> true;
+        }
+
+        void setCurrentKey(DecoratedKey key)
+        {
+            super.onNewPartition(key);
+        }
+
+        void setIsReverseOrder(boolean isReverseOrder)
+        {
+            super.setReverseOrder(isReverseOrder);
+        }
 
-            return hasher;
+        public DeletionTime applyToDeletion(DeletionTime deletionTime)
+        {
+            return super.applyToDeletion(deletionTime);
+        }
+
+        public Row applyToRow(Row row)
+        {
+            return super.applyToRow(row);
+        }
+
+        public RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker)
+        {
+            return super.applyToMarker(marker);
         }
     }
 
@@ -912,12 +1038,14 @@ public abstract class ReadCommand extends AbstractReadQuery
                 unrepairedIters.add(iter);
         }
 
-        List<T> finalizeIterators()
+        List<T> finalizeIterators(ColumnFamilyStore cfs, int nowInSec, int oldestUnrepairedTombstone)
         {
             if (repairedIters.isEmpty())
                 return unrepairedIters;
 
             // merge the repaired data before returning, wrapping in a digest generator
+            RepairedDataPurger purger = new RepairedDataPurger(cfs, nowInSec, oldestUnrepairedTombstone);
+            repairedDataInfo.setPurger(purger);
             unrepairedIters.add(repairedMerger.apply(repairedIters, repairedDataInfo));
             return unrepairedIters;
         }
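
The core of the ReadCommand.java change is the two-level digest in RepairedDataInfo:
data surviving the purger is hashed into a per-partition digest, which is folded into
the per-command digest on partition close only if something survived purging. A
simplified, self-contained illustration of that fold (hypothetical names; the real code
hashes keys, deletions, markers and rows via Digest.forRepairedDataTracking()):

    import java.security.MessageDigest;
    import java.security.NoSuchAlgorithmException;

    // Sketch of RepairedDataInfo's buffering: whether a partition is fully purged
    // isn't known until it has been consumed, so its digest is buffered and only
    // folded into the command-level digest when the partition is closed.
    final class TwoLevelDigestSketch
    {
        private MessageDigest perPartition;
        private MessageDigest perCommand;
        private boolean fullyPurged = true;

        // called only with data the purger did not eliminate
        void trackSurviving(byte[] bytes)
        {
            fullyPurged = false;
            perPartition().update(bytes);
        }

        void onPartitionClose()
        {
            if (perPartition != null && !fullyPurged)
                perCommand().update(perPartition.digest());
            perPartition = null;
            fullyPurged = true;
        }

        byte[] digest()
        {
            return perCommand == null ? new byte[0] : perCommand.digest();
        }

        private MessageDigest perPartition()
        {
            if (perPartition == null) perPartition = newDigest();
            return perPartition;
        }

        private MessageDigest perCommand()
        {
            if (perCommand == null) perCommand = newDigest();
            return perCommand;
        }

        private static MessageDigest newDigest()
        {
            try { return MessageDigest.getInstance("MD5"); } // stand-in algorithm
            catch (NoSuchAlgorithmException e) { throw new AssertionError(e); }
        }
    }

An empty digest therefore means "no unpurged repaired data was read", which is exactly
the property the new purgingConsidersRepairedDataOnly test below asserts.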
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index eb57b93..4daad7d 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@ -695,7 +695,7 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
 
             StorageHook.instance.reportRead(cfs.metadata().id, partitionKey());
 
-            return withSSTablesIterated(inputCollector.finalizeIterators(), cfs.metric, metricsCollector);
+            return withSSTablesIterated(inputCollector.finalizeIterators(cfs, nowInSec(), oldestUnrepairedTombstone), cfs.metric, metricsCollector);
         }
         catch (RuntimeException | Error e)
         {
diff --git a/src/java/org/apache/cassandra/db/partitions/PurgeFunction.java b/src/java/org/apache/cassandra/db/partitions/PurgeFunction.java
index 8dcd359..d9e9036 100644
--- a/src/java/org/apache/cassandra/db/partitions/PurgeFunction.java
+++ b/src/java/org/apache/cassandra/db/partitions/PurgeFunction.java
@@ -60,13 +60,18 @@ public abstract class PurgeFunction extends Transformation<UnfilteredRowIterator
     {
     }
 
+    protected void setReverseOrder(boolean isReverseOrder)
+    {
+        this.isReverseOrder = isReverseOrder;
+    }
+
     @Override
     @SuppressWarnings("resource")
     protected UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition)
     {
         onNewPartition(partition.partitionKey());
 
-        isReverseOrder = partition.isReverseOrder();
+        setReverseOrder(partition.isReverseOrder());
         UnfilteredRowIterator purged = Transformation.apply(partition, this);
         if (purged.isEmpty())
         {
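
The setReverseOrder extraction above exists because RepairedDataPurger (in
ReadCommand.java) is never applied to an iterator via Transformation.apply();
RepairedDataInfo drives PurgeFunction's hooks by hand, so the per-partition state
setters need protected entry points. Roughly, with hypothetical stand-in types rather
than the actual call sites:

    import java.util.function.Consumer;

    // Sketch of the manual-drive pattern: per-partition state is set explicitly,
    // then each row is offered to the purge function and only survivors are hashed.
    interface Purger<K, R>
    {
        void setCurrentKey(K key);
        void setIsReverseOrder(boolean reverseOrder);
        R applyToRow(R row); // returns null when the row is fully purged
    }

    final class ManualDriveSketch
    {
        static <K, R> void digestPartition(K key, boolean reverseOrder, Iterable<R> rows,
                                           Purger<K, R> purger, Consumer<R> digest)
        {
            purger.setCurrentKey(key);            // mirrors RepairedDataInfo.onNewPartition
            purger.setIsReverseOrder(reverseOrder);
            for (R row : rows)
            {
                R survivor = purger.applyToRow(row);
                if (survivor != null)             // only surviving data reaches the digest
                    digest.accept(survivor);
            }
        }
    }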
diff --git a/test/distributed/org/apache/cassandra/distributed/test/RepairDigestTrackingTest.java b/test/distributed/org/apache/cassandra/distributed/test/RepairDigestTrackingTest.java
index a987ea3..1af329f 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/RepairDigestTrackingTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/RepairDigestTrackingTest.java
@@ -23,6 +23,7 @@ import java.io.Serializable;
 import java.util.EnumSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
 import org.junit.Assert;
 import org.junit.Test;
@@ -113,6 +114,59 @@ public class RepairDigestTrackingTest extends DistributedTestBase implements Ser
         }
     }
 
+    @Test
+    public void testPurgeableTombstonesAreIgnored() throws Throwable
+    {
+        try (Cluster cluster = init(Cluster.create(2)))
+        {
+
+            cluster.get(1).runOnInstance(() -> {
+                StorageProxy.instance.enableRepairedDataTrackingForRangeReads();
+            });
+
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl2 (k INT, c INT, v1 INT, v2 INT, PRIMARY KEY (k,c)) WITH gc_grace_seconds=0");
+            // on node1 only insert some tombstones, then flush
+            for (int i = 0; i < 10; i++)
+            {
+                cluster.get(1).executeInternal("DELETE v1 FROM " + KEYSPACE + ".tbl2 USING TIMESTAMP 0 WHERE k=? and c=? ", i, i);
+            }
+            cluster.get(1).runOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl2").forceBlockingFlush());
+
+            // insert data on both nodes and flush
+            for (int i = 0; i < 10; i++)
+            {
+                cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl2 (k, c, v2) VALUES (?, ?, ?) USING TIMESTAMP 1",
+                                               ConsistencyLevel.ALL,
+                                               i, i, i);
+            }
+            cluster.get(1).runOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl2").forceBlockingFlush());
+            cluster.get(2).runOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl2").forceBlockingFlush());
+
+            // nothing is repaired yet
+            cluster.get(1).runOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl2").getLiveSSTables().forEach(this::assertNotRepaired));
+            cluster.get(2).runOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl2").getLiveSSTables().forEach(this::assertNotRepaired));
+            // mark everything repaired
+            cluster.get(1).runOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl2").getLiveSSTables().forEach(this::markRepaired));
+            cluster.get(2).runOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl2").getLiveSSTables().forEach(this::markRepaired));
+            cluster.get(1).runOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl2").getLiveSSTables().forEach(this::assertRepaired));
+            cluster.get(2).runOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl2").getLiveSSTables().forEach(this::assertRepaired));
+
+            // now overwrite on node2 only to generate digest mismatches, but don't flush so the repaired dataset is not affected
+            for (int i = 0; i < 10; i++)
+            {
+                cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl2 (k, c, v2) VALUES (?, ?, ?) USING TIMESTAMP 2", i, i, i * 2);
+            }
+
+            long ccBefore = cluster.get(1).callOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl2").metric.confirmedRepairedInconsistencies.table.getCount());
+            // Unfortunately we need to sleep here to ensure that nowInSec > the local deletion time of the tombstones
+            TimeUnit.SECONDS.sleep(2);
+            cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl2", ConsistencyLevel.ALL);
+            long ccAfter = cluster.get(1).callOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl2").metric.confirmedRepairedInconsistencies.table.getCount());
+
+            Assert.assertEquals("No repaired data inconsistencies should be detected", ccBefore, ccAfter);
+        }
+    }
+
     private void assertNotRepaired(SSTableReader reader) {
         Assert.assertTrue("repaired at is set for sstable: " + reader.descriptor, getRepairedAt(reader) == ActiveRepairService.UNREPAIRED_SSTABLE);
     }
diff --git a/test/unit/org/apache/cassandra/db/ReadCommandTest.java b/test/unit/org/apache/cassandra/db/ReadCommandTest.java
index c04f489..4419c70 100644
--- a/test/unit/org/apache/cassandra/db/ReadCommandTest.java
+++ b/test/unit/org/apache/cassandra/db/ReadCommandTest.java
@@ -24,6 +24,7 @@ import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 import java.util.*;
 
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import org.junit.Assert;
@@ -40,10 +41,12 @@ import org.apache.cassandra.db.filter.RowFilter;
 import org.apache.cassandra.db.marshal.AsciiType;
 import org.apache.cassandra.db.marshal.BytesType;
 import org.apache.cassandra.db.marshal.CounterColumnType;
+import org.apache.cassandra.db.marshal.SetType;
 import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.db.rows.Row;
 import org.apache.cassandra.db.rows.RowIterator;
 import org.apache.cassandra.db.rows.SerializationHelper;
+import org.apache.cassandra.db.rows.Unfiltered;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 import org.apache.cassandra.db.rows.UnfilteredRowIterators;
 import org.apache.cassandra.dht.Range;
@@ -61,8 +64,11 @@ import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.net.Verb;
 import org.apache.cassandra.repair.consistent.LocalSessionAccessor;
 import org.apache.cassandra.schema.CachingParams;
+import org.apache.cassandra.schema.KeyspaceMetadata;
 import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.schema.TableParams;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.streaming.PreviewKind;
 import org.apache.cassandra.tracing.Tracing;
@@ -70,6 +76,7 @@ import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.UUIDGen;
 
+import static org.apache.cassandra.utils.ByteBufferUtil.EMPTY_BYTE_BUFFER;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -85,6 +92,7 @@ public class ReadCommandTest
     private static final String CF5 = "Standard5";
     private static final String CF6 = "Standard6";
     private static final String CF7 = "Counter7";
+    private static final String CF8 = "Standard8";
 
     private static final InetAddressAndPort REPAIR_COORDINATOR;
     static {
@@ -161,6 +169,14 @@ public class ReadCommandTest
                      .addClusteringColumn("col", AsciiType.instance)
                      .addRegularColumn("c", CounterColumnType.instance);
 
+        TableMetadata.Builder metadata8 =
+        TableMetadata.builder(KEYSPACE, CF8)
+                     .addPartitionKeyColumn("key", BytesType.instance)
+                     .addClusteringColumn("col", AsciiType.instance)
+                     .addRegularColumn("a", AsciiType.instance)
+                     .addRegularColumn("b", AsciiType.instance)
+                     .addRegularColumn("c", SetType.getInstance(AsciiType.instance, true));
+
         SchemaLoader.prepareServer();
         SchemaLoader.createKeyspace(KEYSPACE,
                                     KeyspaceParams.simple(1),
@@ -170,7 +186,8 @@ public class ReadCommandTest
                                     metadata4,
                                     metadata5,
                                     metadata6,
-                                    metadata7);
+                                    metadata7,
+                                    metadata8);
 
         LocalSessionAccessor.startup();
     }
@@ -683,7 +700,7 @@ public class ReadCommandTest
         // execute a read and capture the digest
         ReadCommand readCommand = Util.cmd(cfs, Util.dk("key")).build();
         ByteBuffer digestWithLegacyCounter0 = performReadAndVerifyRepairedInfo(readCommand, 1, 1, true);
-        assertFalse(ByteBufferUtil.EMPTY_BYTE_BUFFER.equals(digestWithLegacyCounter0));
+        assertFalse(EMPTY_BYTE_BUFFER.equals(digestWithLegacyCounter0));
 
         // truncate, then re-insert the same partition, but this time with a legacy
         // shard having the value 1. The repaired digest should match the previous, as
@@ -713,11 +730,254 @@ public class ReadCommandTest
         cfs.getLiveSSTables().forEach(sstable -> mutateRepaired(cfs, sstable, 111, null));
 
         ByteBuffer digestWithCounterCell = performReadAndVerifyRepairedInfo(readCommand, 1, 1, true);
-        assertFalse(ByteBufferUtil.EMPTY_BYTE_BUFFER.equals(digestWithCounterCell));
+        assertFalse(EMPTY_BYTE_BUFFER.equals(digestWithCounterCell));
         assertFalse(digestWithLegacyCounter0.equals(digestWithCounterCell));
         assertFalse(digestWithLegacyCounter1.equals(digestWithCounterCell));
     }
 
+    /**
+     * Writes several partitions, each containing a different kind of deletion (a simple tombstone,
+     * a complex deletion on a collection, a range tombstone, and a fully deleted row), and reads
+     * each with a partition query. The repaired data digests generated by executing the same
+     * queries before and after the tombstones become eligible for purging should not match each
+     * other. Also, no digest should be empty, as no partition is made empty by the purging.
+     */
+    @Test
+    public void purgeGCableTombstonesBeforeCalculatingDigest() throws Exception
+    {
+        KeyspaceMetadata keyspaceMetadata = Schema.instance.getKeyspaceMetadata(KEYSPACE);
+        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF8);
+        cfs.truncateBlocking();
+        cfs.disableAutoCompaction();
+        setGCGrace(cfs, 600);
+
+        DecoratedKey[] keys = new DecoratedKey[] { Util.dk("key0"), Util.dk("key1"), Util.dk("key2"), Util.dk("key3") };
+        int nowInSec = FBUtilities.nowInSeconds();
+        TableMetadata cfm = cfs.metadata();
+
+        // A simple tombstone
+        new RowUpdateBuilder(cfs.metadata(), 0, keys[0]).clustering("cc").delete("a").build().apply();
+
+        // Collection with an associated complex deletion
+        PartitionUpdate.SimpleBuilder builder = PartitionUpdate.simpleBuilder(cfs.metadata(), keys[1]).timestamp(0);
+        builder.row("cc").add("c", ImmutableSet.of("element1", "element2"));
+        builder.buildAsMutation().apply();
+
+        // RangeTombstone and a row (not covered by the RT). The row contains a regular tombstone which will not be
+        // purged. This is to prevent the partition from being fully purged and removed from the final results
+        new RowUpdateBuilder(cfs.metadata(), nowInSec, 0L, keys[2]).addRangeTombstone("aa", "bb").build().apply();
+        new RowUpdateBuilder(cfs.metadata(), nowInSec + 1000, 1000L, keys[2]).clustering("cc").delete("a").build().apply();
+
+        // Partition with 2 rows, one fully deleted
+        new RowUpdateBuilder(cfs.metadata.get(), 0, keys[3]).clustering("bb").add("a", ByteBufferUtil.bytes("a")).delete("b").build().apply();
+        RowUpdateBuilder.deleteRow(cfs.metadata(), 0, keys[3], "cc").apply();
+        cfs.forceBlockingFlush();
+        cfs.getLiveSSTables().forEach(sstable -> mutateRepaired(cfs, sstable, 111, null));
+
+        Map<DecoratedKey, ByteBuffer> digestsWithTombstones = new HashMap<>();
+        // Tombstones are not yet purgeable
+        for (DecoratedKey key : keys)
+        {
+            ReadCommand cmd = Util.cmd(cfs, key).withNowInSeconds(nowInSec).build();
+            cmd.trackRepairedStatus();
+            Partition partition = Util.getOnlyPartitionUnfiltered(cmd);
+            assertFalse(partition.isEmpty());
+            partition.unfilteredIterator().forEachRemaining(u -> {
+                // must be either a RT, or a row containing some kind of deletion
+                assertTrue(u.isRangeTombstoneMarker() || ((Row)u).hasDeletion(cmd.nowInSec()));
+            });
+            ByteBuffer digestWithTombstones = cmd.getRepairedDataDigest();
+            // None should generate an empty digest
+            assertDigestsDiffer(EMPTY_BYTE_BUFFER, digestWithTombstones);
+            digestsWithTombstones.put(key, digestWithTombstones);
+        }
+
+        // Make tombstones eligible for purging and re-run cmd with an incremented nowInSec
+        setGCGrace(cfs, 0);
+
+        // Tombstones are now purgeable, so they won't be in the read results and will produce different digests
+        for (DecoratedKey key : keys)
+        {
+            ReadCommand cmd = Util.cmd(cfs, key).withNowInSeconds(nowInSec + 1).build();
+            cmd.trackRepairedStatus();
+            Partition partition = Util.getOnlyPartitionUnfiltered(cmd);
+            assertFalse(partition.isEmpty());
+            partition.unfilteredIterator().forEachRemaining(u -> {
+                // After purging, only rows without any deletions should remain.
+                // The one exception is "key2:cc" which has a regular column tombstone which is not
+                // eligible for purging. This is to prevent the partition from being fully purged
+                // when its RT is removed.
+                assertTrue(u.isRow());
+                Row r = (Row)u;
+                assertTrue(!r.hasDeletion(cmd.nowInSec())
+                           || (key.equals(keys[2]) && r.clustering()
+                                                       .get(0)
+                                                       .equals(AsciiType.instance.fromString("cc"))));
+
+            });
+            ByteBuffer digestWithoutTombstones = cmd.getRepairedDataDigest();
+            // not an empty digest
+            assertDigestsDiffer(EMPTY_BYTE_BUFFER, digestWithoutTombstones);
+            // should not match the pre-purge digest
+            assertDigestsDiffer(digestsWithTombstones.get(key), digestWithoutTombstones);
+        }
+    }
+
+    private void setGCGrace(ColumnFamilyStore cfs, int gcGrace)
+    {
+        TableParams newParams = cfs.metadata().params.unbuild().gcGraceSeconds(gcGrace).build();
+        KeyspaceMetadata keyspaceMetadata = Schema.instance.getKeyspaceMetadata(cfs.metadata().keyspace);
+        Schema.instance.load(
+            keyspaceMetadata.withSwapped(
+                keyspaceMetadata.tables.withSwapped(
+                    cfs.metadata().withSwapped(newParams))));
+    }
+
+    private void assertDigestsDiffer(ByteBuffer b0, ByteBuffer b1)
+    {
+        assertTrue(ByteBufferUtil.compareUnsigned(b0, b1) != 0);
+    }
+
+    @Test
+    public void partitionReadFullyPurged() throws Exception
+    {
+        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF6);
+        ReadCommand partitionRead = Util.cmd(cfs, Util.dk("key")).build();
+        fullyPurgedPartitionCreatesEmptyDigest(cfs, partitionRead);
+    }
+
+    @Test
+    public void rangeReadFullyPurged() throws Exception
+    {
+        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF6);
+        ReadCommand rangeRead = Util.cmd(cfs).build();
+        fullyPurgedPartitionCreatesEmptyDigest(cfs, rangeRead);
+    }
+
+    /**
+     * Writes a single partition containing only a single row deletion and reads with either a range or
+     * partition query. Before the row deletion is eligible for purging, it should appear in the query
+     * results and cause a non-empty repaired data digest to be generated. When the query is repeated
+     * after the row deletion becomes eligible for purging, both the result set and the repaired data
+     * digest should be empty.
+     */
+    private void fullyPurgedPartitionCreatesEmptyDigest(ColumnFamilyStore cfs, ReadCommand command) throws Exception
+    {
+        cfs.truncateBlocking();
+        cfs.disableAutoCompaction();
+        setGCGrace(cfs, 600);
+
+        // Partition with a single, fully deleted row
+        RowUpdateBuilder.deleteRow(cfs.metadata(), 0, ByteBufferUtil.bytes("key"), "cc").apply();
+        cfs.forceBlockingFlush();
+        cfs.getLiveSSTables().forEach(sstable -> mutateRepaired(cfs, sstable, 111, null));
+
+        command.trackRepairedStatus();
+        List<ImmutableBTreePartition> partitions = Util.getAllUnfiltered(command);
+        assertEquals(1, partitions.size());
+        ByteBuffer digestWithTombstones = command.getRepairedDataDigest();
+        assertTrue(ByteBufferUtil.compareUnsigned(EMPTY_BYTE_BUFFER, digestWithTombstones) != 0);
+
+        // Make tombstones eligible for purging and re-run cmd with an incremented nowInSec
+        setGCGrace(cfs, 0);
+
+        AbstractReadCommandBuilder builder = command instanceof PartitionRangeReadCommand
+                                             ? Util.cmd(cfs)
+                                             : Util.cmd(cfs, Util.dk("key"));
+        builder.withNowInSeconds(command.nowInSec() + 60);
+        command = builder.build();
+        command.trackRepairedStatus();
+
+        partitions = Util.getAllUnfiltered(command);
+        assertTrue(partitions.isEmpty());
+        ByteBuffer digestWithoutTombstones = command.getRepairedDataDigest();
+        assertEquals(0, ByteBufferUtil.compareUnsigned(EMPTY_BYTE_BUFFER, digestWithoutTombstones));
+    }
+
+    /**
+     * Verifies that during range reads which include multiple partitions, fully purged partitions
+     * have no material effect on the calculated digest. This test writes two sstables, each containing
+     * a single partition; the first is live and the second fully deleted and eligible for purging.
+     * Initially, only the sstable containing the live partition is marked repaired, while a range read
+     * which covers both partitions is performed to generate a digest. Then the sstable containing the
+     * purged partition is also marked repaired and the query reexecuted. The digests produced by both
+     * queries should match as the digest calculation should exclude the fully purged partition.
+     */
+    @Test
+    public void mixedPurgedAndNonPurgedPartitions()
+    {
+        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF6);
+        cfs.truncateBlocking();
+        cfs.disableAutoCompaction();
+        setGCGrace(cfs, 0);
+
+        ReadCommand command = Util.cmd(cfs).withNowInSeconds(FBUtilities.nowInSeconds() + 60).build();
+
+        // Live partition in a repaired sstable, so included in the digest calculation
+        new RowUpdateBuilder(cfs.metadata.get(), 0, ByteBufferUtil.bytes("key-0")).clustering("cc").add("a", ByteBufferUtil.bytes("a")).build().apply();
+        cfs.forceBlockingFlush();
+        cfs.getLiveSSTables().forEach(sstable -> mutateRepaired(cfs, sstable, 111, null));
+        // Fully deleted partition in an unrepaired sstable, so not included in the initial digest
+        RowUpdateBuilder.deleteRow(cfs.metadata(), 0, ByteBufferUtil.bytes("key-1"), "cc").apply();
+        cfs.forceBlockingFlush();
+
+        command.trackRepairedStatus();
+        List<ImmutableBTreePartition> partitions = Util.getAllUnfiltered(command);
+        assertEquals(1, partitions.size());
+        ByteBuffer digestWithoutPurgedPartition = command.getRepairedDataDigest();
+        assertTrue(ByteBufferUtil.compareUnsigned(EMPTY_BYTE_BUFFER, digestWithoutPurgedPartition) != 0);
+
+        // mark the sstable containing the purged partition as repaired, so both partitions are now
+        // read during the digest calculation. Because the purged partition is entirely
+        // discarded, the resultant digest should match the earlier one.
+        cfs.getLiveSSTables().forEach(sstable -> mutateRepaired(cfs, sstable, 111, null));
+        command = Util.cmd(cfs).withNowInSeconds(command.nowInSec()).build();
+        command.trackRepairedStatus();
+
+        partitions = Util.getAllUnfiltered(command);
+        assertEquals(1, partitions.size());
+        ByteBuffer digestWithPurgedPartition = command.getRepairedDataDigest();
+        assertEquals(0, ByteBufferUtil.compareUnsigned(digestWithPurgedPartition, digestWithoutPurgedPartition));
+    }
+
+    @Test
+    public void purgingConsidersRepairedDataOnly() throws Exception
+    {
+        // 2 sstables: the first is repaired and contains data that is all purgeable;
+        // the second is unrepaired and contains non-purgeable data. Even though
+        // the partition itself is not fully purged, the repaired data digest
+        // should be empty as there was no non-purgeable, repaired data read.
+        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF6);
+        cfs.truncateBlocking();
+        cfs.disableAutoCompaction();
+        setGCGrace(cfs, 0);
+
+        // Partition with a single, fully deleted row which will be fully purged
+        DecoratedKey key = Util.dk("key");
+        RowUpdateBuilder.deleteRow(cfs.metadata(), 0, key, "cc").apply();
+        cfs.forceBlockingFlush();
+        cfs.getLiveSSTables().forEach(sstable -> mutateRepaired(cfs, sstable, 111, null));
+
+        new RowUpdateBuilder(cfs.metadata(), 1, key).clustering("cc").add("a", ByteBufferUtil.bytes("a")).build().apply();
+        cfs.forceBlockingFlush();
+
+        int nowInSec = FBUtilities.nowInSeconds() + 10;
+        ReadCommand cmd = Util.cmd(cfs, key).withNowInSeconds(nowInSec).build();
+        cmd.trackRepairedStatus();
+        Partition partition = Util.getOnlyPartitionUnfiltered(cmd);
+        assertFalse(partition.isEmpty());
+        // check that the partition contains only the single live row from the unrepaired sstable
+        try (UnfilteredRowIterator rows = partition.unfilteredIterator())
+        {
+            assertFalse(rows.isEmpty());
+            Unfiltered unfiltered = rows.next();
+            assertFalse(rows.hasNext());
+            assertTrue(unfiltered.isRow());
+            assertFalse(((Row) unfiltered).hasDeletion(nowInSec));
+        }
+        assertEquals(EMPTY_BYTE_BUFFER, cmd.getRepairedDataDigest());
+    }
+
     private long readCount(SSTableReader sstable)
     {
         return sstable.getReadMeter().count();
@@ -833,7 +1093,7 @@ public class ReadCommandTest
         Set<ByteBuffer> digests = new HashSet<>();
         // first time round, nothing has been marked repaired so we expect digest to be an empty buffer and to be marked conclusive
         ByteBuffer digest = performReadAndVerifyRepairedInfo(readCommand, numPartitions, rowsPerPartition, true);
-        assertEquals(ByteBufferUtil.EMPTY_BYTE_BUFFER, digest);
+        assertEquals(EMPTY_BYTE_BUFFER, digest);
         digests.add(digest);
 
         // add a pending repair session to table1, digest should remain the same but now we expect it to be marked inconclusive
@@ -872,12 +1132,12 @@ public class ReadCommandTest
                                         .delete()
                                         .build()).apply();
             digest = performReadAndVerifyRepairedInfo(readCommand, 0, rowsPerPartition, false);
-            assertEquals(ByteBufferUtil.EMPTY_BYTE_BUFFER, digest);
+            assertEquals(EMPTY_BYTE_BUFFER, digest);
 
             // now flush so we have an unrepaired table with the deletion and repeat the check
             cfs.forceBlockingFlush();
             digest = performReadAndVerifyRepairedInfo(readCommand, 0, rowsPerPartition, false);
-            assertEquals(ByteBufferUtil.EMPTY_BYTE_BUFFER, digest);
+            assertEquals(EMPTY_BYTE_BUFFER, digest);
         }
     }
 

