You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2021/07/02 08:08:01 UTC
[cassandra] 02/02: Log when compacting many tombstones
This is an automated email from the ASF dual-hosted git repository.
marcuse pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 296f65e8d1c25f31a87481843d715f5b7dad9d7b
Author: Marcus Eriksson <ma...@apache.org>
AuthorDate: Wed Jun 30 16:15:17 2021 +0200
Log when compacting many tombstones
Patch by marcuse; reviewed by Brandon Williams for CASSANDRA-16780
---
CHANGES.txt | 1 +
conf/cassandra.yaml | 3 +
src/java/org/apache/cassandra/config/Config.java | 1 +
.../cassandra/config/DatabaseDescriptor.java | 10 ++
.../io/sstable/format/big/BigTableWriter.java | 10 ++
.../io/sstable/metadata/MetadataCollector.java | 14 ++
.../apache/cassandra/service/StorageService.java | 13 ++
.../cassandra/service/StorageServiceMBean.java | 3 +
.../cassandra/distributed/impl/Instance.java | 6 +-
.../distributed/test/TombstoneWarningTest.java | 144 +++++++++++++++++++++
10 files changed, 204 insertions(+), 1 deletion(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index d2a0cff..6efd0c0 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.1
+ * Log when compacting many tombstones (CASSANDRA-16780)
* Display bytes per level in tablestats for LCS tables (CASSANDRA-16799)
* Add isolated flush timer to CommitLogMetrics and ensure writes correspond to single WaitingOnCommit data points (CASSANDRA-16701)
* Add a system property to set hostId if not yet initialized (CASSANDRA-14582)
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 46d94d9..852945f 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -1288,6 +1288,9 @@ unlogged_batch_across_partitions_warn_threshold: 10
# Log a warning when compacting partitions larger than this value
compaction_large_partition_warning_threshold_mb: 100
+# Log a warning when more than this many tombstones are written to a single partition (on flush or compaction)
+compaction_tombstone_warning_threshold: 100000
+
# GC Pauses greater than 200 ms will be logged at INFO level
# This threshold can be adjusted to minimize logging if necessary
# gc_log_threshold_in_ms: 200
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index ae3e27e..96f047e 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -219,6 +219,7 @@ public class Config
public volatile int compaction_throughput_mb_per_sec = 16;
public volatile int compaction_large_partition_warning_threshold_mb = 100;
public int min_free_space_per_drive_in_mb = 50;
+ public volatile Integer compaction_tombstone_warning_threshold = 100000;
public volatile int concurrent_materialized_view_builders = 1;
public volatile int reject_repair_compaction_threshold = Integer.MAX_VALUE;
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 00ef887..86448c7 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -1801,6 +1801,16 @@ public class DatabaseDescriptor
public static long getCompactionLargePartitionWarningThreshold() { return ByteUnit.MEBI_BYTES.toBytes(conf.compaction_large_partition_warning_threshold_mb); }
+    /**
+     * Returns the configured {@code compaction_tombstone_warning_threshold}.
+     * <p>
+     * NOTE(review): the backing {@code Config} field is declared as a boxed {@code Integer}, so this
+     * getter unboxes it; a {@code null} config value would surface here as a NullPointerException.
+     *
+     * @return the current tombstone warning threshold
+     */
+    public static int getCompactionTombstoneWarningThreshold()
+    {
+        final int threshold = conf.compaction_tombstone_warning_threshold;
+        return threshold;
+    }
+
+    /**
+     * Replaces the configured {@code compaction_tombstone_warning_threshold}.
+     *
+     * @param count the new threshold; stored as-is, no validation is performed here
+     */
+    public static void setCompactionTombstoneWarningThreshold(int count)
+    {
+        // The config field is a boxed Integer; box explicitly rather than relying on autoboxing.
+        conf.compaction_tombstone_warning_threshold = Integer.valueOf(count);
+    }
+
public static int getConcurrentValidations()
{
return conf.concurrent_validations;
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
index eeb9153..4607d99 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
@@ -231,6 +231,7 @@ public class BigTableWriter extends SSTableWriter
long endPosition = dataFile.position();
long rowSize = endPosition - startPosition;
maybeLogLargePartitionWarning(key, rowSize);
+ maybeLogManyTombstonesWarning(key, metadataCollector.totalTombstones);
metadataCollector.addPartitionSizeInBytes(rowSize);
afterAppend(key, endPosition, entry, columnIndexWriter.buffer());
return entry;
@@ -259,6 +260,15 @@ public class BigTableWriter extends SSTableWriter
}
}
+    /**
+     * Logs a warning if the given tombstone count is above
+     * {@link DatabaseDescriptor#getCompactionTombstoneWarningThreshold()}.
+     *
+     * @param key            partition key the count belongs to; rendered with the table's partition key type
+     * @param tombstoneCount number of tombstones counted for that partition
+     */
+    private void maybeLogManyTombstonesWarning(DecoratedKey key, int tombstoneCount)
+    {
+        // The threshold is exclusive: a count equal to it does not warn.
+        if (tombstoneCount <= DatabaseDescriptor.getCompactionTombstoneWarningThreshold())
+            return;
+
+        String partitionKey = metadata().partitionKeyType.getString(key.getKey());
+        logger.warn("Writing {} tombstones to {}/{}:{} in sstable {}",
+                    tombstoneCount,
+                    metadata.keyspace,
+                    metadata.name,
+                    partitionKey,
+                    getFilename());
+    }
+
private static class StatsCollector extends Transformation
{
private final MetadataCollector collector;
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
index be824ef..1e2d121 100755
--- a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
@@ -39,6 +39,7 @@ import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.EstimatedHistogram;
+import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.MurmurHash;
import org.apache.cassandra.utils.streamhist.TombstoneHistogram;
import org.apache.cassandra.utils.streamhist.StreamingTombstoneHistogramBuilder;
@@ -105,6 +106,7 @@ public class MetadataCollector implements PartitionStatisticsCollector
protected boolean hasLegacyCounterShards = false;
protected long totalColumnsSet;
protected long totalRows;
+ public int totalTombstones;
/**
* Default cardinality estimation method is to use HyperLogLog++.
@@ -114,6 +116,7 @@ public class MetadataCollector implements PartitionStatisticsCollector
*/
protected ICardinality cardinality = new HyperLogLogPlus(13, 25);
private final ClusteringComparator comparator;
+ private final int nowInSec = FBUtilities.nowInSeconds();
private final UUID originatingHostId;
@@ -149,6 +152,7 @@ public class MetadataCollector implements PartitionStatisticsCollector
{
long hashed = MurmurHash.hash2_64(key, key.position(), key.remaining(), 0);
cardinality.offerHashed(hashed);
+ totalTombstones = 0;
return this;
}
@@ -182,6 +186,8 @@ public class MetadataCollector implements PartitionStatisticsCollector
updateTimestamp(newInfo.timestamp());
updateTTL(newInfo.ttl());
updateLocalDeletionTime(newInfo.localExpirationTime());
+ if (!newInfo.isLive(nowInSec))
+ updateTombstoneCount();
}
public void update(Cell<?> cell)
@@ -189,6 +195,8 @@ public class MetadataCollector implements PartitionStatisticsCollector
updateTimestamp(cell.timestamp());
updateTTL(cell.ttl());
updateLocalDeletionTime(cell.localDeletionTime());
+ if (!cell.isLive(nowInSec))
+ updateTombstoneCount();
}
public void update(DeletionTime dt)
@@ -197,6 +205,7 @@ public class MetadataCollector implements PartitionStatisticsCollector
{
updateTimestamp(dt.markedForDeleteAt());
updateLocalDeletionTime(dt.localDeletionTime());
+ updateTombstoneCount();
}
}
@@ -218,6 +227,11 @@ public class MetadataCollector implements PartitionStatisticsCollector
estimatedTombstoneDropTime.update(newLocalDeletionTime);
}
+    /** Adds one to the running {@code totalTombstones} counter. */
+    private void updateTombstoneCount()
+    {
+        totalTombstones += 1;
+    }
+
private void updateTTL(int newTTL)
{
ttlTracker.update(newTTL);
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index a4a9f9b..60cb739 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -5977,4 +5977,17 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
logger.info("Changing keyspace count warn threshold from {} to {}", getKeyspaceCountWarnThreshold(), value);
DatabaseDescriptor.setKeyspaceCountWarnThreshold(value);
}
+
+    /**
+     * Validates and applies a new compaction tombstone warning threshold, logging the change at INFO.
+     *
+     * @param count the new threshold; must be {@code >= 0}
+     * @throws IllegalStateException if {@code count} is negative
+     */
+    public void setCompactionTombstoneWarningThreshold(int count)
+    {
+        final boolean negative = count < 0;
+        if (negative)
+            throw new IllegalStateException("compaction tombstone warning threshold needs to be >= 0, not " + count);
+
+        logger.info("Setting compaction_tombstone_warning_threshold to {}", count);
+        DatabaseDescriptor.setCompactionTombstoneWarningThreshold(count);
+    }
+
+    /**
+     * Reads the compaction tombstone warning threshold from {@link DatabaseDescriptor}.
+     *
+     * @return the threshold currently in effect
+     */
+    public int getCompactionTombstoneWarningThreshold()
+    {
+        final int threshold = DatabaseDescriptor.getCompactionTombstoneWarningThreshold();
+        return threshold;
+    }
}
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index cc69fec..a5a6607 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -856,4 +856,7 @@ public interface StorageServiceMBean extends NotificationEmitter
void setTableCountWarnThreshold(int value);
int getKeyspaceCountWarnThreshold();
void setKeyspaceCountWarnThreshold(int value);
+
+ public void setCompactionTombstoneWarningThreshold(int count);
+ public int getCompactionTombstoneWarningThreshold();
}
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index d772d51..a58f2db 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@ -185,7 +185,11 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
String suite = System.getProperty("suitename", "suitename_IS_UNDEFINED");
String clusterId = ClusterIDDefiner.getId();
String instanceId = InstanceIDDefiner.getInstanceId();
- return new FileLogAction(new File(String.format("build/test/logs/%s/%s/%s/%s/system.log", tag, suite, clusterId, instanceId)));
+ File f = new File(String.format("build/test/logs/%s/%s/%s/%s/system.log", tag, suite, clusterId, instanceId));
+ // when creating a cluster globally in a test class we get the logs without the suite, try finding those logs:
+ if (!f.exists())
+ f = new File(String.format("build/test/logs/%s/%s/%s/system.log", tag, clusterId, instanceId));
+ return new FileLogAction(f);
}
@Override
diff --git a/test/distributed/org/apache/cassandra/distributed/test/TombstoneWarningTest.java b/test/distributed/org/apache/cassandra/distributed/test/TombstoneWarningTest.java
new file mode 100644
index 0000000..9406432
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/TombstoneWarningTest.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.ICluster;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.LogResult;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Verifies that node 1 logs a "Writing N tombstones to ..." warning for every partition holding more than
+ * {@code compaction_tombstone_warning_threshold} tombstones, both when the data is flushed and when it is
+ * compacted.
+ * <p>
+ * Each test writes partitions {@code pk = 0..99}, where partition {@code i} gets {@code i} rows
+ * ({@code ck = 0..i-1}), so the number of partitions over the threshold can be computed up front.
+ */
+public class TombstoneWarningTest extends TestBaseImpl
+{
+    private static final int COMPACTION_TOMBSTONE_WARN = 75;
+
+    // Warning line to look for; the named groups capture the logged tombstone count and the partition key.
+    private static final String TOMBSTONE_LOG_REGEX = ".*Writing (?<tscount>\\d+) tombstones to distributed_test_keyspace/tbl:(?<key>\\d+).*";
+    // Compiled once instead of on every assertTombstoneLogs call.
+    private static final Pattern TOMBSTONE_LOG_PATTERN = Pattern.compile(TOMBSTONE_LOG_REGEX);
+
+    private static final ICluster<IInvokableInstance> cluster;
+
+    static
+    {
+        try
+        {
+            Cluster.Builder builder = Cluster.build(3);
+            builder.withConfig(c -> c.set("compaction_tombstone_warning_threshold", COMPACTION_TOMBSTONE_WARN));
+            cluster = builder.createWithoutStarting();
+        }
+        catch (IOException e)
+        {
+            throw new AssertionError(e);
+        }
+    }
+
+    @BeforeClass
+    public static void setupClass()
+    {
+        cluster.startup();
+    }
+
+    @AfterClass
+    public static void teardownClass() throws Exception
+    {
+        // The cluster is shared by all tests in this class and created outside try-with-resources,
+        // so it has to be shut down explicitly once the class is done.
+        cluster.close();
+    }
+
+    @Before
+    public void setup()
+    {
+        // Start every test from an empty table.
+        cluster.schemaChange("DROP KEYSPACE IF EXISTS " + KEYSPACE);
+        init(cluster);
+        cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
+    }
+
+    /** Cell tombstones: partition i holds i tombstones, so partitions 76..99 are expected to warn. */
+    @Test
+    public void regularTombstonesLogTest()
+    {
+        for (int i = 0; i < 100; i++)
+            for (int j = 0; j < i; j++)
+                cluster.coordinator(1).execute(withKeyspace("update %s.tbl set v = null where pk = ? and ck = ?"), ConsistencyLevel.ALL, i, j);
+        assertTombstoneLogs(99 - COMPACTION_TOMBSTONE_WARN , false);
+    }
+
+    /** Row tombstones: partition i holds i tombstones, so partitions 76..99 are expected to warn. */
+    @Test
+    public void rowTombstonesLogTest()
+    {
+        for (int i = 0; i < 100; i++)
+            for (int j = 0; j < i; j++)
+                cluster.coordinator(1).execute(withKeyspace("delete from %s.tbl where pk = ? and ck = ?"), ConsistencyLevel.ALL, i, j);
+        assertTombstoneLogs(99 - COMPACTION_TOMBSTONE_WARN , false);
+    }
+
+    /**
+     * Range tombstones: every range delete is expected to be counted twice (see the multiplier in
+     * {@link #assertTombstoneLogs}), so partitions 38..99 are expected to warn.
+     */
+    @Test
+    public void rangeTombstonesLogTest()
+    {
+        for (int i = 0; i < 100; i++)
+            for (int j = 0; j < i; j++)
+                cluster.coordinator(1).execute(withKeyspace("delete from %s.tbl where pk = ? and ck >= ? and ck <= ?"), ConsistencyLevel.ALL, i, j, j);
+        assertTombstoneLogs(99 - (COMPACTION_TOMBSTONE_WARN / 2), true);
+    }
+
+    /** Unexpired TTL'd data must not warn; once the TTL has passed the expired cells count as tombstones. */
+    @Test
+    public void ttlTest() throws InterruptedException
+    {
+        for (int i = 0; i < 100; i++)
+            for (int j = 0; j < i; j++)
+                cluster.coordinator(1).execute(withKeyspace("insert into %s.tbl (pk, ck, v) values (?, ?, ?) using ttl 1000"), ConsistencyLevel.ALL, i, j, j);
+        assertTombstoneLogs(0, true);
+        for (int i = 0; i < 100; i++)
+            for (int j = 0; j < i; j++)
+                cluster.coordinator(1).execute(withKeyspace("update %s.tbl using ttl 1 set v = 33 where pk = ? and ck = ?"), ConsistencyLevel.ALL, i, j);
+        // Wait past the 1 second TTL so the updated cells are expired when they are flushed.
+        Thread.sleep(1500);
+        assertTombstoneLogs(99 - COMPACTION_TOMBSTONE_WARN, false);
+    }
+
+    /** Plain live inserts must never trigger the warning. */
+    @Test
+    public void noTombstonesLogTest()
+    {
+        for (int i = 0; i < 100; i++)
+            for (int j = 0; j < i; j++)
+                cluster.coordinator(1).execute(withKeyspace("insert into %s.tbl (pk, ck, v) values (?, ?, ?)"), ConsistencyLevel.ALL, i, j, j);
+        assertTombstoneLogs(0, false);
+    }
+
+    /**
+     * Flushes and then compacts the table on node 1, asserting after each step that exactly
+     * {@code expectedCount} tombstone warnings were logged since the step began.
+     *
+     * @param expectedCount     number of warning lines expected after the flush and again after the compaction
+     * @param isRangeTombstones whether the logged count is expected to be twice the partition key instead of
+     *                          equal to it
+     */
+    private void assertTombstoneLogs(long expectedCount, boolean isRangeTombstones)
+    {
+        long mark = cluster.get(1).logs().mark();
+        cluster.get(1).flush(KEYSPACE);
+        LogResult<List<String>> res = cluster.get(1).logs().grep(mark, TOMBSTONE_LOG_REGEX);
+        assertEquals(expectedCount, res.getResult().size());
+
+        int tombstonesPerDelete = isRangeTombstones ? 2 : 1;
+        for (String line : res.getResult())
+        {
+            Matcher m = TOMBSTONE_LOG_PATTERN.matcher(line);
+            assertTrue(m.matches());
+            int tombstoneCount = Integer.parseInt(m.group("tscount"));
+            assertTrue(tombstoneCount > COMPACTION_TOMBSTONE_WARN);
+            // Partition i was given i deletes, so the logged count must follow from the partition key.
+            assertEquals(line, Integer.parseInt(m.group("key")) * tombstonesPerDelete, tombstoneCount);
+        }
+
+        // Only lines logged after this new mark are attributed to the compaction.
+        mark = cluster.get(1).logs().mark();
+        cluster.get(1).forceCompact(KEYSPACE, "tbl");
+        res = cluster.get(1).logs().grep(mark, TOMBSTONE_LOG_REGEX);
+        assertEquals(expectedCount, res.getResult().size());
+    }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org