You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2021/07/02 08:08:01 UTC

[cassandra] 02/02: Log when compacting many tombstones

This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 296f65e8d1c25f31a87481843d715f5b7dad9d7b
Author: Marcus Eriksson <ma...@apache.org>
AuthorDate: Wed Jun 30 16:15:17 2021 +0200

    Log when compacting many tombstones
    
    Patch by marcuse; reviewed by Brandon Williams for CASSANDRA-16780
---
 CHANGES.txt                                        |   1 +
 conf/cassandra.yaml                                |   3 +
 src/java/org/apache/cassandra/config/Config.java   |   1 +
 .../cassandra/config/DatabaseDescriptor.java       |  10 ++
 .../io/sstable/format/big/BigTableWriter.java      |  10 ++
 .../io/sstable/metadata/MetadataCollector.java     |  14 ++
 .../apache/cassandra/service/StorageService.java   |  13 ++
 .../cassandra/service/StorageServiceMBean.java     |   3 +
 .../cassandra/distributed/impl/Instance.java       |   6 +-
 .../distributed/test/TombstoneWarningTest.java     | 144 +++++++++++++++++++++
 10 files changed, 204 insertions(+), 1 deletion(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index d2a0cff..6efd0c0 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.1
+ * Log when compacting many tombstones (CASSANDRA-16780)
  * Display bytes per level in tablestats for LCS tables (CASSANDRA-16799)
  * Add isolated flush timer to CommitLogMetrics and ensure writes correspond to single WaitingOnCommit data points (CASSANDRA-16701)
  * Add a system property to set hostId if not yet initialized (CASSANDRA-14582)
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 46d94d9..852945f 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -1288,6 +1288,9 @@ unlogged_batch_across_partitions_warn_threshold: 10
 # Log a warning when compacting partitions larger than this value
 compaction_large_partition_warning_threshold_mb: 100
 
+# Log a warning when a partition written to an sstable (during flush or compaction) contains more tombstones than this value
+compaction_tombstone_warning_threshold: 100000
+
 # GC Pauses greater than 200 ms will be logged at INFO level
 # This threshold can be adjusted to minimize logging if necessary
 # gc_log_threshold_in_ms: 200
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index ae3e27e..96f047e 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -219,6 +219,8 @@ public class Config
     public volatile int compaction_throughput_mb_per_sec = 16;
     public volatile int compaction_large_partition_warning_threshold_mb = 100;
     public int min_free_space_per_drive_in_mb = 50;
+    // warn when a single partition written to an sstable contains more tombstones than this
+    public volatile int compaction_tombstone_warning_threshold = 100000;
 
     public volatile int concurrent_materialized_view_builders = 1;
     public volatile int reject_repair_compaction_threshold = Integer.MAX_VALUE;
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 00ef887..86448c7 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -1801,6 +1801,16 @@ public class DatabaseDescriptor
 
     public static long getCompactionLargePartitionWarningThreshold() { return ByteUnit.MEBI_BYTES.toBytes(conf.compaction_large_partition_warning_threshold_mb); }
 
+    /** @return the per-partition tombstone count above which sstable writers log a warning */
+    public static int getCompactionTombstoneWarningThreshold() { return conf.compaction_tombstone_warning_threshold; }
+
+    /**
+     * Replaces the tombstone warning threshold held in {@code conf}. No range check happens here;
+     * StorageService's JMX setter rejects negative values before delegating to this method.
+     * @param count new threshold
+     */
+    public static void setCompactionTombstoneWarningThreshold(int count) { conf.compaction_tombstone_warning_threshold = count; }
+
     public static int getConcurrentValidations()
     {
         return conf.concurrent_validations;
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
index eeb9153..4607d99 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
@@ -231,6 +231,7 @@ public class BigTableWriter extends SSTableWriter
             long endPosition = dataFile.position();
             long rowSize = endPosition - startPosition;
             maybeLogLargePartitionWarning(key, rowSize);
+            maybeLogManyTombstonesWarning(key, metadataCollector.totalTombstones);
             metadataCollector.addPartitionSizeInBytes(rowSize);
             afterAppend(key, endPosition, entry, columnIndexWriter.buffer());
             return entry;
@@ -259,6 +260,15 @@ public class BigTableWriter extends SSTableWriter
         }
     }
 
+    private void maybeLogManyTombstonesWarning(DecoratedKey key, int tombstoneCount)
+    {
+        // nothing to report while the partition stays at or below the configured threshold
+        if (tombstoneCount <= DatabaseDescriptor.getCompactionTombstoneWarningThreshold())
+            return;
+        String printableKey = metadata().partitionKeyType.getString(key.getKey());
+        logger.warn("Writing {} tombstones to {}/{}:{} in sstable {}", tombstoneCount, metadata.keyspace, metadata.name, printableKey, getFilename());
+    }
+
     private static class StatsCollector extends Transformation
     {
         private final MetadataCollector collector;
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
index be824ef..1e2d121 100755
--- a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
@@ -39,6 +39,7 @@ import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.EstimatedHistogram;
+import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.MurmurHash;
 import org.apache.cassandra.utils.streamhist.TombstoneHistogram;
 import org.apache.cassandra.utils.streamhist.StreamingTombstoneHistogramBuilder;
@@ -105,6 +106,7 @@ public class MetadataCollector implements PartitionStatisticsCollector
     protected boolean hasLegacyCounterShards = false;
     protected long totalColumnsSet;
     protected long totalRows;
+    public int totalTombstones;
 
     /**
      * Default cardinality estimation method is to use HyperLogLog++.
@@ -114,6 +116,7 @@ public class MetadataCollector implements PartitionStatisticsCollector
      */
     protected ICardinality cardinality = new HyperLogLogPlus(13, 25);
     private final ClusteringComparator comparator;
+    private final int nowInSec = FBUtilities.nowInSeconds();
 
     private final UUID originatingHostId;
 
@@ -149,6 +152,7 @@ public class MetadataCollector implements PartitionStatisticsCollector
     {
         long hashed = MurmurHash.hash2_64(key, key.position(), key.remaining(), 0);
         cardinality.offerHashed(hashed);
+        totalTombstones = 0;
         return this;
     }
 
@@ -182,6 +186,8 @@ public class MetadataCollector implements PartitionStatisticsCollector
         updateTimestamp(newInfo.timestamp());
         updateTTL(newInfo.ttl());
         updateLocalDeletionTime(newInfo.localExpirationTime());
+        if (!newInfo.isLive(nowInSec))
+            updateTombstoneCount();
     }
 
     public void update(Cell<?> cell)
@@ -189,6 +195,8 @@ public class MetadataCollector implements PartitionStatisticsCollector
         updateTimestamp(cell.timestamp());
         updateTTL(cell.ttl());
         updateLocalDeletionTime(cell.localDeletionTime());
+        if (!cell.isLive(nowInSec))
+            updateTombstoneCount();
     }
 
     public void update(DeletionTime dt)
@@ -197,6 +205,7 @@ public class MetadataCollector implements PartitionStatisticsCollector
         {
             updateTimestamp(dt.markedForDeleteAt());
             updateLocalDeletionTime(dt.localDeletionTime());
+            updateTombstoneCount();
         }
     }
 
@@ -218,6 +227,11 @@ public class MetadataCollector implements PartitionStatisticsCollector
             estimatedTombstoneDropTime.update(newLocalDeletionTime);
     }
 
+    private void updateTombstoneCount()
+    {
+        totalTombstones += 1; // per-partition tally; zeroed again alongside the key-cardinality update
+    }
+
     private void updateTTL(int newTTL)
     {
         ttlTracker.update(newTTL);
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index a4a9f9b..60cb739 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -5977,4 +5977,17 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         logger.info("Changing keyspace count warn threshold from {} to {}", getKeyspaceCountWarnThreshold(), value);
         DatabaseDescriptor.setKeyspaceCountWarnThreshold(value);
     }
+
+    public void setCompactionTombstoneWarningThreshold(int count)
+    {
+        // reject negative thresholds before anything is logged or stored
+        if (count < 0)
+            throw new IllegalStateException("compaction tombstone warning threshold needs to be >= 0, not " + count);
+
+        logger.info("Setting compaction_tombstone_warning_threshold to {}", count);
+        DatabaseDescriptor.setCompactionTombstoneWarningThreshold(count);
+    }
+
+    /** @return the compaction tombstone warning threshold currently held by DatabaseDescriptor */
+    public int getCompactionTombstoneWarningThreshold() { return DatabaseDescriptor.getCompactionTombstoneWarningThreshold(); }
 }
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index cc69fec..a5a6607 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -856,4 +856,9 @@ public interface StorageServiceMBean extends NotificationEmitter
     void setTableCountWarnThreshold(int value);
     int getKeyspaceCountWarnThreshold();
     void setKeyspaceCountWarnThreshold(int value);
+
+    /** Sets the per-partition tombstone count above which a warning is logged when writing sstables; must be >= 0. */
+    void setCompactionTombstoneWarningThreshold(int count);
+    /** @return the per-partition tombstone count above which a warning is logged when writing sstables */
+    int getCompactionTombstoneWarningThreshold();
 }
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index d772d51..a58f2db 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@ -185,7 +185,11 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
         String suite = System.getProperty("suitename", "suitename_IS_UNDEFINED");
         String clusterId = ClusterIDDefiner.getId();
         String instanceId = InstanceIDDefiner.getInstanceId();
-        return new FileLogAction(new File(String.format("build/test/logs/%s/%s/%s/%s/system.log", tag, suite, clusterId, instanceId)));
+        File f = new File(String.format("build/test/logs/%s/%s/%s/%s/system.log", tag, suite, clusterId, instanceId));
+        // when creating a cluster globally in a test class we get the logs without the suite, try finding those logs:
+        if (!f.exists())
+            f = new File(String.format("build/test/logs/%s/%s/%s/system.log", tag, clusterId, instanceId));
+        return new FileLogAction(f);
     }
 
     @Override
diff --git a/test/distributed/org/apache/cassandra/distributed/test/TombstoneWarningTest.java b/test/distributed/org/apache/cassandra/distributed/test/TombstoneWarningTest.java
new file mode 100644
index 0000000..9406432
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/TombstoneWarningTest.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.ICluster;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.LogResult;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * In-JVM dtest for the "Writing N tombstones to ks/tbl:key" warning that sstable writers emit when a
+ * partition exceeds compaction_tombstone_warning_threshold. Tests write partitions 0..99 (partition i
+ * receiving i writes) and then check the warnings produced on node 1 by a flush and by a forced
+ * compaction.
+ */
+public class TombstoneWarningTest extends TestBaseImpl
+{
+    private static final int COMPACTION_TOMBSTONE_WARN = 75;
+    // matches the BigTableWriter warning for KEYSPACE.tbl, capturing the tombstone count and the int partition key
+    private static final String TOMBSTONE_WARNING_REGEX = ".*Writing (?<tscount>\\d+) tombstones to " + KEYSPACE + "/tbl:(?<key>\\d+).*";
+    private static final Pattern TOMBSTONE_WARNING_PATTERN = Pattern.compile(TOMBSTONE_WARNING_REGEX);
+    private static final ICluster<IInvokableInstance> cluster;
+
+    static
+    {
+        try
+        {
+            Cluster.Builder builder = Cluster.build(3);
+            builder.withConfig(c -> c.set("compaction_tombstone_warning_threshold", COMPACTION_TOMBSTONE_WARN));
+            // started in setupClass()
+            cluster = builder.createWithoutStarting();
+        }
+        catch (IOException e)
+        {
+            throw new AssertionError(e);
+        }
+    }
+
+    @BeforeClass
+    public static void setupClass()
+    {
+        cluster.startup();
+    }
+
+    @AfterClass
+    public static void teardownClass() throws Exception
+    {
+        cluster.close();
+    }
+
+    @Before
+    public void setup()
+    {
+        cluster.schemaChange("DROP KEYSPACE IF EXISTS " + KEYSPACE);
+        init(cluster);
+        cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
+    }
+
+    @Test
+    public void regularTombstonesLogTest()
+    {
+        // partition i gets i null-cell writes, so only partitions 76..99 exceed the threshold
+        for (int i = 0; i < 100; i++)
+            for (int j = 0; j < i; j++)
+                cluster.coordinator(1).execute(withKeyspace("update %s.tbl set v = null where pk = ? and ck = ?"), ConsistencyLevel.ALL, i, j);
+        assertTombstoneLogs(99 - COMPACTION_TOMBSTONE_WARN, false);
+    }
+
+    @Test
+    public void rowTombstonesLogTest()
+    {
+        for (int i = 0; i < 100; i++)
+            for (int j = 0; j < i; j++)
+                cluster.coordinator(1).execute(withKeyspace("delete from %s.tbl where pk = ? and ck = ?"), ConsistencyLevel.ALL, i, j);
+        assertTombstoneLogs(99 - COMPACTION_TOMBSTONE_WARN, false);
+    }
+
+    @Test
+    public void rangeTombstonesLogTest()
+    {
+        // each deleted range is counted twice (see assertTombstoneLogs), so partitions 38..99 exceed the threshold
+        for (int i = 0; i < 100; i++)
+            for (int j = 0; j < i; j++)
+                cluster.coordinator(1).execute(withKeyspace("delete from %s.tbl where pk = ? and ck >= ? and ck <= ?"), ConsistencyLevel.ALL, i, j, j);
+        assertTombstoneLogs(99 - (COMPACTION_TOMBSTONE_WARN / 2), true);
+    }
+
+    @Test
+    public void ttlTest() throws InterruptedException
+    {
+        for (int i = 0; i < 100; i++)
+            for (int j = 0; j < i; j++)
+                cluster.coordinator(1).execute(withKeyspace("insert into %s.tbl (pk, ck, v) values (?, ?, ?) using ttl 1000"), ConsistencyLevel.ALL, i, j, j);
+        assertTombstoneLogs(0, true);
+        for (int i = 0; i < 100; i++)
+            for (int j = 0; j < i; j++)
+                cluster.coordinator(1).execute(withKeyspace("update %s.tbl using ttl 1 set v = 33 where pk = ? and ck = ?"), ConsistencyLevel.ALL, i, j);
+        // let the 1s TTL lapse so the overwritten cells are already expired when the sstable is written
+        Thread.sleep(1500);
+        assertTombstoneLogs(99 - COMPACTION_TOMBSTONE_WARN, false);
+    }
+
+    @Test
+    public void noTombstonesLogTest()
+    {
+        for (int i = 0; i < 100; i++)
+            for (int j = 0; j < i; j++)
+                cluster.coordinator(1).execute(withKeyspace("insert into %s.tbl (pk, ck, v) values (?, ?, ?)"), ConsistencyLevel.ALL, i, j, j);
+        assertTombstoneLogs(0, false);
+    }
+
+    /**
+     * Flushes and then force-compacts node 1, asserting after each step that exactly {@code expectedCount}
+     * tombstone warnings were logged. For the flush, every warning must also exceed the threshold and report
+     * {@code pk} tombstones, or {@code 2 * pk} when {@code isRangeTombstones} is set.
+     */
+    private void assertTombstoneLogs(long expectedCount, boolean isRangeTombstones)
+    {
+        long mark = cluster.get(1).logs().mark();
+        cluster.get(1).flush(KEYSPACE);
+        LogResult<List<String>> res = cluster.get(1).logs().grep(mark, TOMBSTONE_WARNING_REGEX);
+        assertEquals(expectedCount, res.getResult().size());
+        for (String r : res.getResult())
+        {
+            Matcher m = TOMBSTONE_WARNING_PATTERN.matcher(r);
+            assertTrue(m.matches());
+            long tombstoneCount = Long.parseLong(m.group("tscount"));
+            assertTrue(tombstoneCount > COMPACTION_TOMBSTONE_WARN);
+            assertEquals(r, Long.parseLong(m.group("key")) * (isRangeTombstones ? 2 : 1), tombstoneCount);
+        }
+
+        mark = cluster.get(1).logs().mark();
+        cluster.get(1).forceCompact(KEYSPACE, "tbl");
+        res = cluster.get(1).logs().grep(mark, TOMBSTONE_WARNING_REGEX);
+        assertEquals(expectedCount, res.getResult().size());
+    }
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org