You are viewing a plain-text version of this content. The canonical link for it is here. (The link itself was not preserved in this plain-text rendering.)
Posted to commits@cassandra.apache.org by ma...@apache.org on 2021/07/02 08:07:59 UTC

[cassandra] branch trunk updated (d5e8e81 -> 296f65e)

This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git.


    from d5e8e81  Merge branch 'cassandra-4.0' into trunk
     new 3f6f7be  Display bytes per level in tablestats for LCS tables
     new 296f65e  Log when compacting many tombstones

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 CHANGES.txt                                        |   2 +
 conf/cassandra.yaml                                |   3 +
 src/java/org/apache/cassandra/config/Config.java   |   1 +
 .../cassandra/config/DatabaseDescriptor.java       |  10 ++
 .../org/apache/cassandra/db/ColumnFamilyStore.java |   5 +
 .../cassandra/db/ColumnFamilyStoreMBean.java       |   6 +
 .../db/compaction/CompactionStrategyManager.java   |  38 ++++++
 .../db/compaction/LeveledCompactionStrategy.java   |   5 +
 .../db/compaction/LeveledGenerations.java          |   8 ++
 .../cassandra/db/compaction/LeveledManifest.java   |   5 +
 .../io/sstable/format/big/BigTableWriter.java      |  10 ++
 .../io/sstable/metadata/MetadataCollector.java     |  14 ++
 .../apache/cassandra/service/StorageService.java   |  13 ++
 .../cassandra/service/StorageServiceMBean.java     |   3 +
 .../cassandra/tools/nodetool/stats/StatsTable.java |   1 +
 .../tools/nodetool/stats/TableStatsHolder.java     |  12 ++
 .../tools/nodetool/stats/TableStatsPrinter.java    |   6 +-
 .../cassandra/distributed/impl/Instance.java       |   6 +-
 .../distributed/test/TombstoneWarningTest.java     | 144 +++++++++++++++++++++
 .../compaction/LeveledCompactionStrategyTest.java  |  47 +++++++
 20 files changed, 337 insertions(+), 2 deletions(-)
 create mode 100644 test/distributed/org/apache/cassandra/distributed/test/TombstoneWarningTest.java

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[cassandra] 01/02: Display bytes per level in tablestats for LCS tables

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 3f6f7be63faf2d07b274caf25a2fa1dcbde14187
Author: Marcus Eriksson <ma...@apache.org>
AuthorDate: Wed Jun 30 16:34:09 2021 +0200

    Display bytes per level in tablestats for LCS tables
    
    Patch by marcuse; reviewed by Brandon Williams for CASSANDRA-16779
---
 CHANGES.txt                                        |  1 +
 .../org/apache/cassandra/db/ColumnFamilyStore.java |  5 +++
 .../cassandra/db/ColumnFamilyStoreMBean.java       |  6 +++
 .../db/compaction/CompactionStrategyManager.java   | 38 +++++++++++++++++
 .../db/compaction/LeveledCompactionStrategy.java   |  5 +++
 .../db/compaction/LeveledGenerations.java          |  8 ++++
 .../cassandra/db/compaction/LeveledManifest.java   |  5 +++
 .../cassandra/tools/nodetool/stats/StatsTable.java |  1 +
 .../tools/nodetool/stats/TableStatsHolder.java     | 12 ++++++
 .../tools/nodetool/stats/TableStatsPrinter.java    |  6 ++-
 .../compaction/LeveledCompactionStrategyTest.java  | 47 ++++++++++++++++++++++
 11 files changed, 133 insertions(+), 1 deletion(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index dcfacb4..d2a0cff 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.1
+ * Display bytes per level in tablestats for LCS tables (CASSANDRA-16799)
  * Add isolated flush timer to CommitLogMetrics and ensure writes correspond to single WaitingOnCommit data points (CASSANDRA-16701)
  * Add a system property to set hostId if not yet initialized (CASSANDRA-14582)
  * GossiperTest.testHasVersion3Nodes didn't take into account trunk version changes, fixed to rely on latest version (CASSANDRA-16651)
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 113a916..df9f763 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -2638,6 +2638,11 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         return compactionStrategyManager.getSSTableCountPerLevel();
     }
 
+    public long[] getPerLevelSizeBytes()
+    {
+        return compactionStrategyManager.getPerLevelSizeBytes();
+    }
+
     public int getLevelFanoutSize()
     {
         return compactionStrategyManager.getLevelFanoutSize();
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java b/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
index 0360e34..f06b65a 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
@@ -203,6 +203,12 @@ public interface ColumnFamilyStoreMBean
     public int[] getSSTableCountPerLevel();
 
     /**
+     * @return total size on disk for each level. null unless leveled compaction is used.
+     *         array index corresponds to level(int[0] is for level 0, ...).
+     */
+    public long[] getPerLevelSizeBytes();
+
+    /**
      * @return sstable fanout size for level compaction strategy.
      */
     public int getLevelFanoutSize();
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
index deece30..d7d3ba5 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
@@ -587,6 +587,29 @@ public class CompactionStrategyManager implements INotificationConsumer
         return null;
     }
 
+    public long[] getPerLevelSizeBytes()
+    {
+        readLock.lock();
+        try
+        {
+            if (repaired.first() instanceof LeveledCompactionStrategy)
+            {
+                long [] res = new long[LeveledGenerations.MAX_LEVEL_COUNT];
+                for (AbstractCompactionStrategy strategy : getAllStrategies())
+                {
+                    long[] repairedCountPerLevel = ((LeveledCompactionStrategy) strategy).getAllLevelSizeBytes();
+                    res = sumArrays(res, repairedCountPerLevel);
+                }
+                return res;
+            }
+            return null;
+        }
+        finally
+        {
+            readLock.unlock();
+        }
+    }
+
     static int[] sumArrays(int[] a, int[] b)
     {
         int[] res = new int[Math.max(a.length, b.length)];
@@ -602,6 +625,21 @@ public class CompactionStrategyManager implements INotificationConsumer
         return res;
     }
 
+    static long[] sumArrays(long[] a, long[] b)
+    {
+        long[] res = new long[Math.max(a.length, b.length)];
+        for (int i = 0; i < res.length; i++)
+        {
+            if (i < a.length && i < b.length)
+                res[i] = a[i] + b[i];
+            else if (i < a.length)
+                res[i] = a[i];
+            else
+                res[i] = b[i];
+        }
+        return res;
+    }
+
     /**
      * Should only be called holding the readLock
      */
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
index dd7c9df..6faca63 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
@@ -109,6 +109,11 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy
         return manifest.getAllLevelSize();
     }
 
+    public long[] getAllLevelSizeBytes()
+    {
+        return manifest.getAllLevelSizeBytes();
+    }
+
     @Override
     public void startup()
     {
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledGenerations.java b/src/java/org/apache/cassandra/db/compaction/LeveledGenerations.java
index 64027f2..2cc1b1a 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledGenerations.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledGenerations.java
@@ -226,6 +226,14 @@ class LeveledGenerations
         return counts;
     }
 
+    long[] getAllLevelSizeBytes()
+    {
+        long[] sums = new long[levelCount()];
+        for (int i = 0; i < sums.length; i++)
+            sums[i] = get(i).stream().map(SSTableReader::onDiskLength).reduce(0L, Long::sum);
+        return sums;
+    }
+
     Set<SSTableReader> allSSTables()
     {
         ImmutableSet.Builder<SSTableReader> builder = ImmutableSet.builder();
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
index 7c865c7..8b636fc 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
@@ -395,6 +395,11 @@ public class LeveledManifest
         return generations.getAllLevelSize();
     }
 
+    public synchronized long[] getAllLevelSizeBytes()
+    {
+        return generations.getAllLevelSizeBytes();
+    }
+
     @VisibleForTesting
     public synchronized int remove(SSTableReader reader)
     {
diff --git a/src/java/org/apache/cassandra/tools/nodetool/stats/StatsTable.java b/src/java/org/apache/cassandra/tools/nodetool/stats/StatsTable.java
index d897eab..6f5d904 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/stats/StatsTable.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/stats/StatsTable.java
@@ -69,5 +69,6 @@ public class StatsTable
     public long maximumTombstonesPerSliceLastFiveMinutes;
     public String droppedMutations;
     public List<String> sstablesInEachLevel = new ArrayList<>();
+    public List<String> sstableBytesInEachLevel = new ArrayList<>();
     public Boolean isInCorrectLocation = null; // null: option not active
 }
diff --git a/src/java/org/apache/cassandra/tools/nodetool/stats/TableStatsHolder.java b/src/java/org/apache/cassandra/tools/nodetool/stats/TableStatsHolder.java
index b1685e6..b66c802 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/stats/TableStatsHolder.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/stats/TableStatsHolder.java
@@ -118,6 +118,7 @@ public class TableStatsHolder implements StatsHolder
     {
         Map<String, Object> mpTable = new HashMap<>();
         mpTable.put("sstables_in_each_level", table.sstablesInEachLevel);
+        mpTable.put("sstable_bytes_in_each_level", table.sstableBytesInEachLevel);
         mpTable.put("space_used_live", table.spaceUsedLive);
         mpTable.put("space_used_total", table.spaceUsedTotal);
         mpTable.put("space_used_by_snapshots_total", table.spaceUsedBySnapshotsTotal);
@@ -235,6 +236,17 @@ public class TableStatsHolder implements StatsHolder
                     }
                 }
 
+                long[] leveledSSTablesBytes = table.getPerLevelSizeBytes();
+                if (leveledSSTablesBytes != null)
+                {
+                    statsTable.isLeveledSstable = true;
+                    for (int level = 0; level < leveledSSTablesBytes.length; level++)
+                    {
+                        long size = leveledSSTablesBytes[level];
+                        statsTable.sstableBytesInEachLevel.add(format(size, humanReadable));
+                    }
+                }
+
                 if (locationCheck)
                     statsTable.isInCorrectLocation = !table.hasMisplacedSSTables();
 
diff --git a/src/java/org/apache/cassandra/tools/nodetool/stats/TableStatsPrinter.java b/src/java/org/apache/cassandra/tools/nodetool/stats/TableStatsPrinter.java
index f4188e4..50a22d6 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/stats/TableStatsPrinter.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/stats/TableStatsPrinter.java
@@ -79,8 +79,12 @@ public class TableStatsPrinter<T extends StatsHolder>
             out.println(indent + "SSTable count: " + table.sstableCount);
             out.println(indent + "Old SSTable count: " + table.oldSSTableCount);
             if (table.isLeveledSstable)
+            {
                 out.println(indent + "SSTables in each level: [" + String.join(", ",
-                                                                          table.sstablesInEachLevel) + "]");
+                                                                               table.sstablesInEachLevel) + "]");
+                out.println(indent + "SSTable bytes in each level: [" + String.join(", ",
+                                                                                    table.sstableBytesInEachLevel) + "]");
+            }
 
             out.println(indent + "Space used (live): " + table.spaceUsedLive);
             out.println(indent + "Space used (total): " + table.spaceUsedTotal);
diff --git a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
index 2d6835b..68a9ac8 100644
--- a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
@@ -771,6 +771,53 @@ public class LeveledCompactionStrategyTest
         }
         return newLevels;
     }
+    @Test
+    public void testPerLevelSizeBytes() throws IOException
+    {
+        byte [] b = new byte[100];
+        new Random().nextBytes(b);
+        ByteBuffer value = ByteBuffer.wrap(b);
+        int rows = 5;
+        int columns = 5;
+
+        cfs.disableAutoCompaction();
+        for (int r = 0; r < rows; r++)
+        {
+            UpdateBuilder update = UpdateBuilder.create(cfs.metadata(), String.valueOf(r));
+            for (int c = 0; c < columns; c++)
+                update.newRow("column" + c).add("val", value);
+            update.applyUnsafe();
+        }
+        cfs.forceBlockingFlush();
+
+        SSTableReader sstable = cfs.getLiveSSTables().iterator().next();
+        long [] levelSizes = cfs.getPerLevelSizeBytes();
+        for (int i = 0; i < levelSizes.length; i++)
+        {
+            if (i != 0)
+                assertEquals(0, levelSizes[i]);
+            else
+                assertEquals(sstable.onDiskLength(), levelSizes[i]);
+        }
+
+        assertEquals(sstable.onDiskLength(), cfs.getPerLevelSizeBytes()[0]);
+
+        LeveledCompactionStrategy strategy = (LeveledCompactionStrategy) ( cfs.getCompactionStrategyManager()).getStrategies().get(1).get(0);
+        strategy.manifest.remove(sstable);
+        sstable.descriptor.getMetadataSerializer().mutateLevel(sstable.descriptor, 2);
+        sstable.reloadSSTableMetadata();
+        strategy.manifest.addSSTables(Collections.singleton(sstable));
+
+        levelSizes = cfs.getPerLevelSizeBytes();
+        for (int i = 0; i < levelSizes.length; i++)
+        {
+            if (i != 2)
+                assertEquals(0, levelSizes[i]);
+            else
+                assertEquals(sstable.onDiskLength(), levelSizes[i]);
+        }
+
+    }
 
     /**
      * brute-force checks if the new sstables can be added to the correct level in manifest

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[cassandra] 02/02: Log when compacting many tombstones

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 296f65e8d1c25f31a87481843d715f5b7dad9d7b
Author: Marcus Eriksson <ma...@apache.org>
AuthorDate: Wed Jun 30 16:15:17 2021 +0200

    Log when compacting many tombstones
    
    Patch by marcuse; reviewed by Brandon Williams for CASSANDRA-16780
---
 CHANGES.txt                                        |   1 +
 conf/cassandra.yaml                                |   3 +
 src/java/org/apache/cassandra/config/Config.java   |   1 +
 .../cassandra/config/DatabaseDescriptor.java       |  10 ++
 .../io/sstable/format/big/BigTableWriter.java      |  10 ++
 .../io/sstable/metadata/MetadataCollector.java     |  14 ++
 .../apache/cassandra/service/StorageService.java   |  13 ++
 .../cassandra/service/StorageServiceMBean.java     |   3 +
 .../cassandra/distributed/impl/Instance.java       |   6 +-
 .../distributed/test/TombstoneWarningTest.java     | 144 +++++++++++++++++++++
 10 files changed, 204 insertions(+), 1 deletion(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index d2a0cff..6efd0c0 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.1
+ * Log when compacting many tombstones (CASSANDRA-16780)
  * Display bytes per level in tablestats for LCS tables (CASSANDRA-16799)
  * Add isolated flush timer to CommitLogMetrics and ensure writes correspond to single WaitingOnCommit data points (CASSANDRA-16701)
  * Add a system property to set hostId if not yet initialized (CASSANDRA-14582)
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 46d94d9..852945f 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -1288,6 +1288,9 @@ unlogged_batch_across_partitions_warn_threshold: 10
 # Log a warning when compacting partitions larger than this value
 compaction_large_partition_warning_threshold_mb: 100
 
+# Log a warning when writing more tombstones than this value to a partition
+compaction_tombstone_warning_threshold: 100000
+
 # GC Pauses greater than 200 ms will be logged at INFO level
 # This threshold can be adjusted to minimize logging if necessary
 # gc_log_threshold_in_ms: 200
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index ae3e27e..96f047e 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -219,6 +219,7 @@ public class Config
     public volatile int compaction_throughput_mb_per_sec = 16;
     public volatile int compaction_large_partition_warning_threshold_mb = 100;
     public int min_free_space_per_drive_in_mb = 50;
+    public volatile Integer compaction_tombstone_warning_threshold = 100000;
 
     public volatile int concurrent_materialized_view_builders = 1;
     public volatile int reject_repair_compaction_threshold = Integer.MAX_VALUE;
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 00ef887..86448c7 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -1801,6 +1801,16 @@ public class DatabaseDescriptor
 
     public static long getCompactionLargePartitionWarningThreshold() { return ByteUnit.MEBI_BYTES.toBytes(conf.compaction_large_partition_warning_threshold_mb); }
 
+    public static int getCompactionTombstoneWarningThreshold()
+    {
+        return conf.compaction_tombstone_warning_threshold;
+    }
+
+    public static void setCompactionTombstoneWarningThreshold(int count)
+    {
+        conf.compaction_tombstone_warning_threshold = count;
+    }
+
     public static int getConcurrentValidations()
     {
         return conf.concurrent_validations;
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
index eeb9153..4607d99 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
@@ -231,6 +231,7 @@ public class BigTableWriter extends SSTableWriter
             long endPosition = dataFile.position();
             long rowSize = endPosition - startPosition;
             maybeLogLargePartitionWarning(key, rowSize);
+            maybeLogManyTombstonesWarning(key, metadataCollector.totalTombstones);
             metadataCollector.addPartitionSizeInBytes(rowSize);
             afterAppend(key, endPosition, entry, columnIndexWriter.buffer());
             return entry;
@@ -259,6 +260,15 @@ public class BigTableWriter extends SSTableWriter
         }
     }
 
+    private void maybeLogManyTombstonesWarning(DecoratedKey key, int tombstoneCount)
+    {
+        if (tombstoneCount > DatabaseDescriptor.getCompactionTombstoneWarningThreshold())
+        {
+            String keyString = metadata().partitionKeyType.getString(key.getKey());
+            logger.warn("Writing {} tombstones to {}/{}:{} in sstable {}", tombstoneCount, metadata.keyspace, metadata.name, keyString, getFilename());
+        }
+    }
+
     private static class StatsCollector extends Transformation
     {
         private final MetadataCollector collector;
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
index be824ef..1e2d121 100755
--- a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
@@ -39,6 +39,7 @@ import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.EstimatedHistogram;
+import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.MurmurHash;
 import org.apache.cassandra.utils.streamhist.TombstoneHistogram;
 import org.apache.cassandra.utils.streamhist.StreamingTombstoneHistogramBuilder;
@@ -105,6 +106,7 @@ public class MetadataCollector implements PartitionStatisticsCollector
     protected boolean hasLegacyCounterShards = false;
     protected long totalColumnsSet;
     protected long totalRows;
+    public int totalTombstones;
 
     /**
      * Default cardinality estimation method is to use HyperLogLog++.
@@ -114,6 +116,7 @@ public class MetadataCollector implements PartitionStatisticsCollector
      */
     protected ICardinality cardinality = new HyperLogLogPlus(13, 25);
     private final ClusteringComparator comparator;
+    private final int nowInSec = FBUtilities.nowInSeconds();
 
     private final UUID originatingHostId;
 
@@ -149,6 +152,7 @@ public class MetadataCollector implements PartitionStatisticsCollector
     {
         long hashed = MurmurHash.hash2_64(key, key.position(), key.remaining(), 0);
         cardinality.offerHashed(hashed);
+        totalTombstones = 0;
         return this;
     }
 
@@ -182,6 +186,8 @@ public class MetadataCollector implements PartitionStatisticsCollector
         updateTimestamp(newInfo.timestamp());
         updateTTL(newInfo.ttl());
         updateLocalDeletionTime(newInfo.localExpirationTime());
+        if (!newInfo.isLive(nowInSec))
+            updateTombstoneCount();
     }
 
     public void update(Cell<?> cell)
@@ -189,6 +195,8 @@ public class MetadataCollector implements PartitionStatisticsCollector
         updateTimestamp(cell.timestamp());
         updateTTL(cell.ttl());
         updateLocalDeletionTime(cell.localDeletionTime());
+        if (!cell.isLive(nowInSec))
+            updateTombstoneCount();
     }
 
     public void update(DeletionTime dt)
@@ -197,6 +205,7 @@ public class MetadataCollector implements PartitionStatisticsCollector
         {
             updateTimestamp(dt.markedForDeleteAt());
             updateLocalDeletionTime(dt.localDeletionTime());
+            updateTombstoneCount();
         }
     }
 
@@ -218,6 +227,11 @@ public class MetadataCollector implements PartitionStatisticsCollector
             estimatedTombstoneDropTime.update(newLocalDeletionTime);
     }
 
+    private void updateTombstoneCount()
+    {
+        ++totalTombstones;
+    }
+
     private void updateTTL(int newTTL)
     {
         ttlTracker.update(newTTL);
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index a4a9f9b..60cb739 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -5977,4 +5977,17 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         logger.info("Changing keyspace count warn threshold from {} to {}", getKeyspaceCountWarnThreshold(), value);
         DatabaseDescriptor.setKeyspaceCountWarnThreshold(value);
     }
+
+    public void setCompactionTombstoneWarningThreshold(int count)
+    {
+        if (count < 0)
+            throw new IllegalStateException("compaction tombstone warning threshold needs to be >= 0, not "+count);
+        logger.info("Setting compaction_tombstone_warning_threshold to {}", count);
+        DatabaseDescriptor.setCompactionTombstoneWarningThreshold(count);
+    }
+
+    public int getCompactionTombstoneWarningThreshold()
+    {
+        return DatabaseDescriptor.getCompactionTombstoneWarningThreshold();
+    }
 }
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index cc69fec..a5a6607 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -856,4 +856,7 @@ public interface StorageServiceMBean extends NotificationEmitter
     void setTableCountWarnThreshold(int value);
     int getKeyspaceCountWarnThreshold();
     void setKeyspaceCountWarnThreshold(int value);
+
+    public void setCompactionTombstoneWarningThreshold(int count);
+    public int getCompactionTombstoneWarningThreshold();
 }
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index d772d51..a58f2db 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@ -185,7 +185,11 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
         String suite = System.getProperty("suitename", "suitename_IS_UNDEFINED");
         String clusterId = ClusterIDDefiner.getId();
         String instanceId = InstanceIDDefiner.getInstanceId();
-        return new FileLogAction(new File(String.format("build/test/logs/%s/%s/%s/%s/system.log", tag, suite, clusterId, instanceId)));
+        File f = new File(String.format("build/test/logs/%s/%s/%s/%s/system.log", tag, suite, clusterId, instanceId));
+        // when creating a cluster globally in a test class we get the logs without the suite, try finding those logs:
+        if (!f.exists())
+            f = new File(String.format("build/test/logs/%s/%s/%s/system.log", tag, clusterId, instanceId));
+        return new FileLogAction(f);
     }
 
     @Override
diff --git a/test/distributed/org/apache/cassandra/distributed/test/TombstoneWarningTest.java b/test/distributed/org/apache/cassandra/distributed/test/TombstoneWarningTest.java
new file mode 100644
index 0000000..9406432
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/TombstoneWarningTest.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.ICluster;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.LogResult;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TombstoneWarningTest extends TestBaseImpl
+{
+    private static final int COMPACTION_TOMBSTONE_WARN = 75;
+    private static final ICluster<IInvokableInstance> cluster;
+
+    static
+    {
+        try
+        {
+            Cluster.Builder builder = Cluster.build(3);
+            builder.withConfig(c -> c.set("compaction_tombstone_warning_threshold", COMPACTION_TOMBSTONE_WARN));
+            cluster = builder.createWithoutStarting();
+        }
+        catch (IOException e)
+        {
+            throw new AssertionError(e);
+        }
+    }
+
+    @BeforeClass
+    public static void setupClass()
+    {
+        cluster.startup();
+    }
+
+    @Before
+    public void setup()
+    {
+        cluster.schemaChange("DROP KEYSPACE IF EXISTS " + KEYSPACE);
+        init(cluster);
+        cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
+    }
+
+    @Test
+    public void regularTombstonesLogTest()
+    {
+        for (int i = 0; i < 100; i++)
+            for (int j = 0; j < i; j++)
+                cluster.coordinator(1).execute(withKeyspace("update %s.tbl set v = null where pk = ? and ck = ?"), ConsistencyLevel.ALL, i, j);
+        assertTombstoneLogs(99 - COMPACTION_TOMBSTONE_WARN , false);
+    }
+
+    @Test
+    public void rowTombstonesLogTest()
+    {
+        for (int i = 0; i < 100; i++)
+            for (int j = 0; j < i; j++)
+                cluster.coordinator(1).execute(withKeyspace("delete from %s.tbl where pk = ? and ck = ?"), ConsistencyLevel.ALL, i, j);
+        assertTombstoneLogs(99 - COMPACTION_TOMBSTONE_WARN , false);
+    }
+
+    @Test
+    public void rangeTombstonesLogTest()
+    {
+        for (int i = 0; i < 100; i++)
+            for (int j = 0; j < i; j++)
+                cluster.coordinator(1).execute(withKeyspace("delete from %s.tbl where pk = ? and ck >= ? and ck <= ?"), ConsistencyLevel.ALL, i, j, j);
+        assertTombstoneLogs(99 - (COMPACTION_TOMBSTONE_WARN / 2), true);
+    }
+
+    @Test
+    public void ttlTest() throws InterruptedException
+    {
+        for (int i = 0; i < 100; i++)
+            for (int j = 0; j < i; j++)
+                cluster.coordinator(1).execute(withKeyspace("insert into %s.tbl (pk, ck, v) values (?, ?, ?) using ttl 1000"), ConsistencyLevel.ALL, i, j, j);
+        assertTombstoneLogs(0, true);
+        for (int i = 0; i < 100; i++)
+            for (int j = 0; j < i; j++)
+                cluster.coordinator(1).execute(withKeyspace("update %s.tbl using ttl 1 set v = 33 where pk = ? and ck = ?"), ConsistencyLevel.ALL, i, j);
+        Thread.sleep(1500);
+        assertTombstoneLogs(99 - COMPACTION_TOMBSTONE_WARN, false);
+    }
+
+    @Test
+    public void noTombstonesLogTest()
+    {
+        for (int i = 0; i < 100; i++)
+            for (int j = 0; j < i; j++)
+                cluster.coordinator(1).execute(withKeyspace("insert into %s.tbl (pk, ck, v) values (?, ?, ?)"), ConsistencyLevel.ALL, i, j, j);
+        assertTombstoneLogs(0, false);
+    }
+
+    private void assertTombstoneLogs(long expectedCount, boolean isRangeTombstones)
+    {
+        long mark = cluster.get(1).logs().mark();
+        cluster.get(1).flush(KEYSPACE);
+        String pattern = ".*Writing (?<tscount>\\d+) tombstones to distributed_test_keyspace/tbl:(?<key>\\d+).*";
+        LogResult<List<String>> res = cluster.get(1).logs().grep(mark, pattern);
+        assertEquals(expectedCount, res.getResult().size());
+        Pattern p = Pattern.compile(pattern);
+        for (String r : res.getResult())
+        {
+            Matcher m = p.matcher(r);
+            assertTrue(m.matches());
+            long tombstoneCount = Integer.parseInt(m.group("tscount"));
+            assertTrue(tombstoneCount > COMPACTION_TOMBSTONE_WARN);
+            assertEquals(r, Integer.parseInt(m.group("key")) * (isRangeTombstones ? 2 : 1), tombstoneCount);
+        }
+
+        mark = cluster.get(1).logs().mark();
+        cluster.get(1).forceCompact(KEYSPACE, "tbl");
+        res = cluster.get(1).logs().grep(mark, pattern);
+        assertEquals(expectedCount, res.getResult().size());
+    }
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org