You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by mc...@apache.org on 2021/01/20 22:27:43 UTC

[cassandra] branch trunk updated: Release StreamingTombstoneHistogramBuilder spool when switching writers

This is an automated email from the ASF dual-hosted git repository.

mck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 5e8f7f5  Release StreamingTombstoneHistogramBuilder spool when switching writers
5e8f7f5 is described below

commit 5e8f7f591dfec5a61d8eb2e9e977ec29f3a2bbe4
Author: Adam Holmberg <ad...@datastax.com>
AuthorDate: Mon Dec 14 14:42:50 2020 -0600

    Release StreamingTombstoneHistogramBuilder spool when switching writers
    
     patch by Adam Holmberg; reviewed by Berenguer Blasi, Mick Semb Wever for CASSANDRA-14834
---
 CHANGES.txt                                        |   1 +
 .../cassandra/db/compaction/CompactionTask.java    |   6 -
 .../compaction/writers/CompactionAwareWriter.java  |  11 --
 .../writers/MajorLeveledCompactionWriter.java      |  12 --
 .../compaction/writers/MaxSSTableSizeWriter.java   |  13 --
 .../SplittingSizeTieredCompactionWriter.java       |   2 +-
 .../cassandra/io/sstable/SSTableRewriter.java      |  31 ++---
 .../cassandra/io/sstable/format/SSTableWriter.java |   5 +
 .../io/sstable/metadata/MetadataCollector.java     |   8 ++
 .../StreamingTombstoneHistogramBuilder.java        |  31 +++--
 .../cassandra/io/sstable/SSTableRewriterTest.java  | 139 +++++++++++++++------
 11 files changed, 152 insertions(+), 107 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 1c63268..7699940 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0-beta5
+ * Release StreamingTombstoneHistogramBuilder spool when switching writers (CASSANDRA-14834)
  * Correct memtable on-heap size calculations to match actual use (CASSANDRA-16318)
  * Fix client notifications in CQL protocol v5 (CASSANDRA-16353)
  * Too defensive check when picking sstables for preview repair (CASSANDRA-16284)
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index 764ad5b..13c9725 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -59,12 +59,6 @@ public class CompactionTask extends AbstractCompactionTask
         this(cfs, txn, gcBefore, false);
     }
 
-    @Deprecated
-    public CompactionTask(ColumnFamilyStore cfs, LifecycleTransaction txn, int gcBefore, boolean offline, boolean keepOriginals)
-    {
-        this(cfs, txn, gcBefore, keepOriginals);
-    }
-
     public CompactionTask(ColumnFamilyStore cfs, LifecycleTransaction txn, int gcBefore, boolean keepOriginals)
     {
         super(cfs, txn);
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
index c1ae9ec..d363dcf 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
@@ -66,17 +66,6 @@ public abstract class CompactionAwareWriter extends Transactional.AbstractTransa
     private final List<PartitionPosition> diskBoundaries;
     private int locationIndex;
 
-    @Deprecated
-    public CompactionAwareWriter(ColumnFamilyStore cfs,
-                                 Directories directories,
-                                 LifecycleTransaction txn,
-                                 Set<SSTableReader> nonExpiredSSTables,
-                                 boolean offline,
-                                 boolean keepOriginals)
-    {
-        this(cfs, directories, txn, nonExpiredSSTables, keepOriginals);
-    }
-
     public CompactionAwareWriter(ColumnFamilyStore cfs,
                                  Directories directories,
                                  LifecycleTransaction txn,
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
index 2b93eb4..1c53600 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
@@ -51,18 +51,6 @@ public class MajorLeveledCompactionWriter extends CompactionAwareWriter
         this(cfs, directories, txn, nonExpiredSSTables, maxSSTableSize, false);
     }
 
-    @Deprecated
-    public MajorLeveledCompactionWriter(ColumnFamilyStore cfs,
-                                        Directories directories,
-                                        LifecycleTransaction txn,
-                                        Set<SSTableReader> nonExpiredSSTables,
-                                        long maxSSTableSize,
-                                        boolean offline,
-                                        boolean keepOriginals)
-    {
-        this(cfs, directories, txn, nonExpiredSSTables, maxSSTableSize, keepOriginals);
-    }
-
     @SuppressWarnings("resource")
     public MajorLeveledCompactionWriter(ColumnFamilyStore cfs,
                                         Directories directories,
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
index df7eeaf..915f96b 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
@@ -48,19 +48,6 @@ public class MaxSSTableSizeWriter extends CompactionAwareWriter
         this(cfs, directories, txn, nonExpiredSSTables, maxSSTableSize, level, false);
     }
 
-    @Deprecated
-    public MaxSSTableSizeWriter(ColumnFamilyStore cfs,
-                                Directories directories,
-                                LifecycleTransaction txn,
-                                Set<SSTableReader> nonExpiredSSTables,
-                                long maxSSTableSize,
-                                int level,
-                                boolean offline,
-                                boolean keepOriginals)
-    {
-        this(cfs, directories, txn, nonExpiredSSTables, maxSSTableSize, level, keepOriginals);
-    }
-
     public MaxSSTableSizeWriter(ColumnFamilyStore cfs,
                                 Directories directories,
                                 LifecycleTransaction txn,
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
index 7533f1d..d29061c 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
@@ -58,7 +58,7 @@ public class SplittingSizeTieredCompactionWriter extends CompactionAwareWriter
 
     public SplittingSizeTieredCompactionWriter(ColumnFamilyStore cfs, Directories directories, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables, long smallestSSTable)
     {
-        super(cfs, directories, txn, nonExpiredSSTables, false, false);
+        super(cfs, directories, txn, nonExpiredSSTables, false);
         this.allSSTables = txn.originals();
         totalSize = cfs.getExpectedCompactedFileSize(nonExpiredSSTables, txn.opType());
         double[] potentialRatios = new double[20];
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
index fb3aa2d..a3d5ae9 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
@@ -66,6 +66,7 @@ public class SSTableRewriter extends Transactional.AbstractTransactional impleme
 
     private final List<SSTableWriter> writers = new ArrayList<>();
     private final boolean keepOriginals; // true if we do not want to obsolete the originals
+    private final boolean eagerWriterMetaRelease; // true if the writer metadata should be released when switch is called
 
     private SSTableWriter writer;
     private Map<DecoratedKey, RowIndexEntry> cachedKeys = new HashMap<>();
@@ -74,44 +75,33 @@ public class SSTableRewriter extends Transactional.AbstractTransactional impleme
     private boolean throwEarly, throwLate;
 
     @Deprecated
-    public SSTableRewriter(ILifecycleTransaction transaction, long maxAge, boolean isOffline)
-    {
-        this(transaction, maxAge, isOffline, true);
-    }
-    @Deprecated
-    public SSTableRewriter(ILifecycleTransaction transaction, long maxAge, boolean isOffline, boolean shouldOpenEarly)
+    public SSTableRewriter(ILifecycleTransaction transaction, long maxAge, long preemptiveOpenInterval, boolean keepOriginals)
     {
-        this(transaction, maxAge, calculateOpenInterval(shouldOpenEarly), false);
+        this(transaction, maxAge, preemptiveOpenInterval, keepOriginals, false);
     }
 
-    @VisibleForTesting
-    public SSTableRewriter(ILifecycleTransaction transaction, long maxAge, long preemptiveOpenInterval, boolean keepOriginals)
+    SSTableRewriter(ILifecycleTransaction transaction, long maxAge, long preemptiveOpenInterval, boolean keepOriginals, boolean eagerWriterMetaRelease)
     {
         this.transaction = transaction;
         this.maxAge = maxAge;
-        this.keepOriginals = keepOriginals;
         this.preemptiveOpenInterval = preemptiveOpenInterval;
-    }
-
-    @Deprecated
-    public static SSTableRewriter constructKeepingOriginals(ILifecycleTransaction transaction, boolean keepOriginals, long maxAge, boolean isOffline)
-    {
-        return constructKeepingOriginals(transaction, keepOriginals, maxAge);
+        this.keepOriginals = keepOriginals;
+        this.eagerWriterMetaRelease = eagerWriterMetaRelease;
     }
 
     public static SSTableRewriter constructKeepingOriginals(ILifecycleTransaction transaction, boolean keepOriginals, long maxAge)
     {
-        return new SSTableRewriter(transaction, maxAge, calculateOpenInterval(true), keepOriginals);
+        return new SSTableRewriter(transaction, maxAge, calculateOpenInterval(true), keepOriginals, true);
     }
 
     public static SSTableRewriter constructWithoutEarlyOpening(ILifecycleTransaction transaction, boolean keepOriginals, long maxAge)
     {
-        return new SSTableRewriter(transaction, maxAge, calculateOpenInterval(false), keepOriginals);
+        return new SSTableRewriter(transaction, maxAge, calculateOpenInterval(false), keepOriginals, true);
     }
 
     public static SSTableRewriter construct(ColumnFamilyStore cfs, ILifecycleTransaction transaction, boolean keepOriginals, long maxAge)
     {
-        return new SSTableRewriter(transaction, maxAge, calculateOpenInterval(cfs.supportsEarlyOpen()), keepOriginals);
+        return new SSTableRewriter(transaction, maxAge, calculateOpenInterval(cfs.supportsEarlyOpen()), keepOriginals, true);
     }
 
     private static long calculateOpenInterval(boolean shouldOpenEarly)
@@ -303,6 +293,9 @@ public class SSTableRewriter extends Transactional.AbstractTransactional impleme
         if (newWriter != null)
             writers.add(newWriter.setMaxDataAge(maxAge));
 
+        if (eagerWriterMetaRelease && writer != null)
+            writer.releaseMetadataOverhead();
+
         if (writer == null || writer.getFilePointer() == 0)
         {
             if (writer != null)
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
index f54bc03..1dbfcdb 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
@@ -325,6 +325,11 @@ public abstract class SSTableWriter extends SSTable implements Transactional
         return (StatsMetadata) finalizeMetadata().get(MetadataType.STATS);
     }
 
+    public void releaseMetadataOverhead()
+    {
+        metadataCollector.release();
+    }
+
     public static void rename(Descriptor tmpdesc, Descriptor newdesc, Set<Component> components)
     {
         for (Component component : Sets.difference(components, Sets.newHashSet(Component.DATA, Component.SUMMARY)))
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
index 65d534e..0d49ea1 100755
--- a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
@@ -272,6 +272,14 @@ public class MetadataCollector implements PartitionStatisticsCollector
         return components;
     }
 
+    /**
+     * Release large memory objects while keeping metrics intact
+     */
+    public void release()
+    {
+        estimatedTombstoneDropTime.releaseBuffers();
+    }
+
     private static List<ByteBuffer> makeList(ByteBuffer[] values)
     {
         // In most case, l will be the same size than values, but it's possible for it to be smaller
diff --git a/src/java/org/apache/cassandra/utils/streamhist/StreamingTombstoneHistogramBuilder.java b/src/java/org/apache/cassandra/utils/streamhist/StreamingTombstoneHistogramBuilder.java
index eda88bc..76630ec 100755
--- a/src/java/org/apache/cassandra/utils/streamhist/StreamingTombstoneHistogramBuilder.java
+++ b/src/java/org/apache/cassandra/utils/streamhist/StreamingTombstoneHistogramBuilder.java
@@ -40,12 +40,12 @@ import org.apache.cassandra.db.rows.Cell;
  * <ol>
  *     <li>If point <i>p</i> is already exists in collection, add <i>m</i> to recorded value of point <i>p</i> </li>
  *     <li>If there is no point <i>p</i> in the collection, add point <i>p</i> with weight <i>m</i> </li>
- *     <li>If point was added and collection size became lorger than maxBinSize:</li>
+ *     <li>If point was added and collection size became larger than maxBinSize:</li>
  * </ol>
  *
  * <ol type="a">
  *     <li>Find nearest points <i>p1</i> and <i>p2</i> in the collection </li>
- *     <li>Replace theese two points with one weighted point <i>p3 = (p1*m1+p2*m2)/(p1+p2)</i></li>
+ *     <li>Replace these two points with one weighted point <i>p3 = (p1*m1+p2*m2)/(p1+p2)</i></li>
  * </ol>
  *
  * <p>
@@ -54,7 +54,7 @@ import org.apache.cassandra.db.rows.Cell;
  *     <li>Spool: big map that saves from excessively merging of small bin. This map can contains up to maxSpoolSize points and accumulate weight from same points.
  *     For example, if spoolSize=100, binSize=10 and there are only 50 different points. it will be only 40 merges regardless how many points will be added.</li>
  *     <li>Spool is organized as open-addressing primitive hash map where odd elements are points and event elements are values.
- *     Spool can not resize => when number of collisions became bigger than threashold or size became large that <i>array_size/2</i> Spool is drained to bin</li>
+ *     Spool can not resize => when number of collisions became bigger than threshold or size became large that <i>array_size/2</i> Spool is drained to bin</li>
  *     <li>Bin is organized as sorted arrays. It reduces garbage collection pressure and allows to find elements in log(binSize) time via binary search</li>
  *     <li>To use existing Arrays.binarySearch <i></>{point, values}</i> in bin pairs is packed in one long</li>
  * </ol>
@@ -69,7 +69,7 @@ public class StreamingTombstoneHistogramBuilder
     private final DataHolder bin;
 
     // Keep a second, larger buffer to spool data in, before finalizing it into `bin`
-    private final Spool spool;
+    private Spool spool;
 
     // voluntarily give up resolution for speed
     private final int roundSeconds;
@@ -98,6 +98,7 @@ public class StreamingTombstoneHistogramBuilder
      */
     public void update(int point, int value)
     {
+        assert spool != null: "update is being called after releaseBuffers. This could be functionally okay, but this assertion is a canary to alert about unintended use before it is necessary.";
         point = ceilKey(point, roundSeconds);
 
         if (spool.capacity > 0)
@@ -120,8 +121,22 @@ public class StreamingTombstoneHistogramBuilder
      */
     public void flushHistogram()
     {
-        spool.forEach(this::flushValue);
-        spool.clear();
+        Spool spool = this.spool;
+        if (spool != null)
+        {
+            spool.forEach(this::flushValue);
+            spool.clear();
+        }
+    }
+
+    /**
+     * Release inner spool buffers. Histogram remains readable and writable, but with lesser performance.
+     * Not intended for use before finalization.
+     */
+    public void releaseBuffers()
+    {
+       flushHistogram();
+       spool = null;
     }
 
     private void flushValue(int key, int spoolValue)
@@ -135,7 +150,7 @@ public class StreamingTombstoneHistogramBuilder
     }
 
     /**
-     * Creates a 'finished' snapshot of the current state of the historgram, but leaves this builder instance
+     * Creates a 'finished' snapshot of the current state of the histogram, but leaves this builder instance
      * open for subsequent additions to the histograms. Basically, this allows us to have some degree of sanity
      * wrt sstable early open.
      */
@@ -220,7 +235,7 @@ public class StreamingTombstoneHistogramBuilder
 
         /**
          *  Finds nearest points <i>p1</i> and <i>p2</i> in the collection
-         *  Replaces theese two points with one weighted point <i>p3 = (p1*m1+p2*m2)/(p1+p2)
+         *  Replaces these two points with one weighted point <i>p3 = (p1*m1+p2*m2)/(p1+p2)
          */
         @VisibleForTesting
         void mergeNearestPoints()
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
index 7c47c8b..1895653 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
@@ -19,24 +19,21 @@
 package org.apache.cassandra.io.sstable;
 
 import java.io.File;
-import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.Uninterruptibles;
 import org.junit.Test;
 
 import org.apache.cassandra.Util;
 import org.apache.cassandra.UpdateBuilder;
 import org.apache.cassandra.concurrent.NamedThreadFactory;
-import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.DeletionTime;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.RowUpdateBuilder;
 import org.apache.cassandra.db.SerializationHeader;
@@ -49,25 +46,27 @@ import org.apache.cassandra.db.compaction.CompactionIterator;
 import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.db.compaction.SSTableSplitter;
 import org.apache.cassandra.db.partitions.ImmutableBTreePartition;
+import org.apache.cassandra.dht.ByteOrderedPartitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.db.lifecycle.SSTableSet;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SSTableWriter;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.metrics.StorageMetrics;
-import org.apache.cassandra.streaming.PreviewKind;
-import org.apache.cassandra.streaming.StreamSession;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.UUIDGen;
 
 import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public class SSTableRewriterTest extends SSTableWriterTestBase
 {
     @Test
-    public void basicTest() throws InterruptedException
+    public void basicTest()
     {
         Keyspace keyspace = Keyspace.open(KEYSPACE);
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
@@ -106,7 +105,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase
         truncate(cfs);
     }
     @Test
-    public void basicTest2() throws InterruptedException
+    public void basicTest2()
     {
         Keyspace keyspace = Keyspace.open(KEYSPACE);
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
@@ -120,7 +119,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase
         int nowInSec = FBUtilities.nowInSeconds();
         try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategyManager().getScanners(sstables);
              LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN);
-             SSTableRewriter writer = new SSTableRewriter(txn, 1000, 10000000, false);
+             SSTableRewriter writer = new SSTableRewriter(txn, 1000, 10000000, false, true);
              CompactionController controller = new CompactionController(cfs, sstables, cfs.gcBefore(nowInSec));
              CompactionIterator ci = new CompactionIterator(OperationType.COMPACTION, scanners.scanners, controller, nowInSec, UUIDGen.getTimeUUID()))
         {
@@ -138,7 +137,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase
     }
 
     @Test
-    public void getPositionsTest() throws InterruptedException
+    public void getPositionsTest()
     {
         Keyspace keyspace = Keyspace.open(KEYSPACE);
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
@@ -153,7 +152,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase
         boolean checked = false;
         try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategyManager().getScanners(sstables);
              LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN);
-             SSTableRewriter writer = new SSTableRewriter(txn, 1000, 10000000, false);
+             SSTableRewriter writer = new SSTableRewriter(txn, 1000, 10000000, false, true);
              CompactionController controller = new CompactionController(cfs, sstables, cfs.gcBefore(nowInSec));
              CompactionIterator ci = new CompactionIterator(OperationType.COMPACTION, scanners.scanners, controller, nowInSec, UUIDGen.getTimeUUID()))
         {
@@ -194,7 +193,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase
     }
 
     @Test
-    public void testNumberOfFilesAndSizes() throws Exception
+    public void testNumberOfFilesAndSizes()
     {
         Keyspace keyspace = Keyspace.open(KEYSPACE);
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
@@ -211,7 +210,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase
         try (ISSTableScanner scanner = s.getScanner();
              CompactionController controller = new CompactionController(cfs, compacting, 0);
              LifecycleTransaction txn = cfs.getTracker().tryModify(compacting, OperationType.UNKNOWN);
-             SSTableRewriter rewriter = new SSTableRewriter(txn, 1000, 10000000, false);
+             SSTableRewriter rewriter = new SSTableRewriter(txn, 1000, 10000000, false, true);
              CompactionIterator ci = new CompactionIterator(OperationType.COMPACTION, Collections.singletonList(scanner), controller, FBUtilities.nowInSeconds(), UUIDGen.getTimeUUID()))
         {
             rewriter.switchWriter(getWriter(cfs, s.descriptor.directory, txn));
@@ -250,7 +249,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase
     }
 
     @Test
-    public void testNumberOfFiles_dont_clean_readers() throws Exception
+    public void testNumberOfFiles_dont_clean_readers()
     {
         Keyspace keyspace = Keyspace.open(KEYSPACE);
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
@@ -266,7 +265,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase
         try (ISSTableScanner scanner = s.getScanner();
              CompactionController controller = new CompactionController(cfs, compacting, 0);
              LifecycleTransaction txn = cfs.getTracker().tryModify(compacting, OperationType.UNKNOWN);
-             SSTableRewriter rewriter = new SSTableRewriter(txn, 1000, 10000000, false);
+             SSTableRewriter rewriter = new SSTableRewriter(txn, 1000, 10000000, false, true);
              CompactionIterator ci = new CompactionIterator(OperationType.COMPACTION, Collections.singletonList(scanner), controller, FBUtilities.nowInSeconds(), UUIDGen.getTimeUUID()))
         {
             rewriter.switchWriter(getWriter(cfs, s.descriptor.directory, txn));
@@ -401,7 +400,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase
                         LifecycleTransaction txn);
     }
 
-    private void testNumberOfFiles_abort(RewriterTest test) throws Exception
+    private void testNumberOfFiles_abort(RewriterTest test)
     {
         Keyspace keyspace = Keyspace.open(KEYSPACE);
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
@@ -417,7 +416,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase
         try (ISSTableScanner scanner = s.getScanner();
              CompactionController controller = new CompactionController(cfs, compacting, 0);
              LifecycleTransaction txn = cfs.getTracker().tryModify(compacting, OperationType.UNKNOWN);
-             SSTableRewriter rewriter = new SSTableRewriter(txn, 1000, 10000000, false))
+             SSTableRewriter rewriter = new SSTableRewriter(txn, 1000, 10000000, false, true))
         {
             rewriter.switchWriter(getWriter(cfs, s.descriptor.directory, txn));
             test.run(scanner, controller, s, cfs, rewriter, txn);
@@ -434,7 +433,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase
     }
 
     @Test
-    public void testNumberOfFiles_finish_empty_new_writer() throws Exception
+    public void testNumberOfFiles_finish_empty_new_writer()
     {
         Keyspace keyspace = Keyspace.open(KEYSPACE);
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
@@ -449,7 +448,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase
         try (ISSTableScanner scanner = s.getScanner();
              CompactionController controller = new CompactionController(cfs, compacting, 0);
              LifecycleTransaction txn = cfs.getTracker().tryModify(compacting, OperationType.UNKNOWN);
-             SSTableRewriter rewriter = new SSTableRewriter(txn, 1000, 10000000, false);
+             SSTableRewriter rewriter = new SSTableRewriter(txn, 1000, 10000000, false, true);
              CompactionIterator ci = new CompactionIterator(OperationType.COMPACTION, Collections.singletonList(scanner), controller, FBUtilities.nowInSeconds(), UUIDGen.getTimeUUID()))
         {
             rewriter.switchWriter(getWriter(cfs, s.descriptor.directory, txn));
@@ -479,7 +478,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase
     }
 
     @Test
-    public void testNumberOfFiles_truncate() throws Exception
+    public void testNumberOfFiles_truncate()
     {
         Keyspace keyspace = Keyspace.open(KEYSPACE);
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
@@ -495,7 +494,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase
         try (ISSTableScanner scanner = s.getScanner();
              CompactionController controller = new CompactionController(cfs, compacting, 0);
              LifecycleTransaction txn = cfs.getTracker().tryModify(compacting, OperationType.UNKNOWN);
-             SSTableRewriter rewriter = new SSTableRewriter(txn, 1000, 10000000, false);
+             SSTableRewriter rewriter = new SSTableRewriter(txn, 1000, 10000000, false, true);
              CompactionIterator ci = new CompactionIterator(OperationType.COMPACTION, Collections.singletonList(scanner), controller, FBUtilities.nowInSeconds(), UUIDGen.getTimeUUID()))
         {
             rewriter.switchWriter(getWriter(cfs, s.descriptor.directory, txn));
@@ -510,7 +509,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase
                 }
             }
 
-            sstables = rewriter.finish();
+            rewriter.finish();
         }
 
         LifecycleTransaction.waitForDeletions();
@@ -519,7 +518,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase
     }
 
     @Test
-    public void testSmallFiles() throws Exception
+    public void testSmallFiles()
     {
         Keyspace keyspace = Keyspace.open(KEYSPACE);
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
@@ -535,7 +534,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase
         try (ISSTableScanner scanner = s.getScanner();
              CompactionController controller = new CompactionController(cfs, compacting, 0);
              LifecycleTransaction txn = cfs.getTracker().tryModify(compacting, OperationType.UNKNOWN);
-             SSTableRewriter rewriter = new SSTableRewriter(txn, 1000, 1000000, false);
+             SSTableRewriter rewriter = new SSTableRewriter(txn, 1000, 1000000, false, true);
              CompactionIterator ci = new CompactionIterator(OperationType.COMPACTION, Collections.singletonList(scanner), controller, FBUtilities.nowInSeconds(), UUIDGen.getTimeUUID()))
         {
             rewriter.switchWriter(getWriter(cfs, s.descriptor.directory, txn));
@@ -561,7 +560,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase
     }
 
     @Test
-    public void testSSTableSplit() throws InterruptedException
+    public void testSSTableSplit()
     {
         Keyspace keyspace = Keyspace.open(KEYSPACE);
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
@@ -603,12 +602,12 @@ public class SSTableRewriterTest extends SSTableWriterTestBase
     }
 
     @Test
-    public void testAbort2() throws Exception
+    public void testAbort2()
     {
         testAbortHelper(true, false);
     }
 
-    private void testAbortHelper(boolean earlyException, boolean offline) throws Exception
+    private void testAbortHelper(boolean earlyException, boolean offline)
     {
         Keyspace keyspace = Keyspace.open(KEYSPACE);
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
@@ -621,7 +620,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase
              CompactionController controller = new CompactionController(cfs, compacting, 0);
              LifecycleTransaction txn = offline ? LifecycleTransaction.offline(OperationType.UNKNOWN, compacting)
                                        : cfs.getTracker().tryModify(compacting, OperationType.UNKNOWN);
-             SSTableRewriter rewriter = new SSTableRewriter(txn, 100, 10000000, false);
+             SSTableRewriter rewriter = new SSTableRewriter(txn, 100, 10000000, false, true);
              CompactionIterator ci = new CompactionIterator(OperationType.COMPACTION, Collections.singletonList(scanner), controller, FBUtilities.nowInSeconds(), UUIDGen.getTimeUUID())
         )
         {
@@ -711,7 +710,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase
         try (ISSTableScanner scanner = compacting.iterator().next().getScanner();
              CompactionController controller = new CompactionController(cfs, compacting, 0);
              LifecycleTransaction txn = cfs.getTracker().tryModify(compacting, OperationType.UNKNOWN);
-             SSTableRewriter rewriter = new SSTableRewriter(txn, 1000, 1, false);
+             SSTableRewriter rewriter = new SSTableRewriter(txn, 1000, 1, false, true);
              CompactionIterator ci = new CompactionIterator(OperationType.COMPACTION, Collections.singletonList(scanner), controller, FBUtilities.nowInSeconds(), UUIDGen.getTimeUUID())
         )
         {
@@ -735,7 +734,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase
     }
 
     @Test
-    public void testCanonicalView() throws Exception
+    public void testCanonicalView()
     {
         Keyspace keyspace = Keyspace.open(KEYSPACE);
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
@@ -749,7 +748,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase
         try (ISSTableScanner scanner = sstables.iterator().next().getScanner();
              CompactionController controller = new CompactionController(cfs, sstables, 0);
              LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN);
-             SSTableRewriter writer = new SSTableRewriter(txn, 1000, 10000000, false);
+             SSTableRewriter writer = new SSTableRewriter(txn, 1000, 10000000, false, true);
              CompactionIterator ci = new CompactionIterator(OperationType.COMPACTION, Collections.singletonList(scanner), controller, FBUtilities.nowInSeconds(), UUIDGen.getTimeUUID())
         )
         {
@@ -775,11 +774,9 @@ public class SSTableRewriterTest extends SSTableWriterTestBase
 
     /**
      * emulates anticompaction - writing from one source sstable to two new sstables
-     *
-     * @throws IOException
      */
     @Test
-    public void testTwoWriters() throws IOException
+    public void testTwoWriters()
     {
         Keyspace keyspace = Keyspace.open(KEYSPACE);
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
@@ -853,6 +850,74 @@ public class SSTableRewriterTest extends SSTableWriterTestBase
 
     }
 
+    /**
+     * tests SSTableRewriter ctor arg controlling whether writers metadata buffers are released.
+     * Verifies that writers trip an assert when updated after cleared on switch
+     *
+     * CASSANDRA-14834
+     */
+    @Test
+    public void testWriterClearing()
+    {
+        Keyspace keyspace = Keyspace.open(KEYSPACE);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+        File dir = cfs.getDirectories().getDirectoryForNewSSTables();
+
+        // Can't update a writer that is eagerly cleared on switch
+        boolean eagerWriterMetaRelease = true;
+        try (LifecycleTransaction txn = cfs.getTracker().tryModify(new HashSet<>(), OperationType.UNKNOWN);
+             SSTableRewriter rewriter = new SSTableRewriter(txn, 1000, 1000000, false, eagerWriterMetaRelease)
+        )
+        {
+            SSTableWriter firstWriter = getWriter(cfs, dir, txn);
+            rewriter.switchWriter(firstWriter);
+            rewriter.switchWriter(getWriter(cfs, dir, txn));
+            try
+            {
+                UnfilteredRowIterator uri = mock(UnfilteredRowIterator.class);
+                when(uri.partitionLevelDeletion()).thenReturn(new DeletionTime(0,0));
+                when(uri.partitionKey()).thenReturn(bopKeyFromInt(0));
+                // should not be able to append after buffer release on switch
+                firstWriter.append(uri);
+                fail("Expected AssertionError was not thrown.");
+            }
+            catch(AssertionError ae) {
+                if (!ae.getMessage().contains("update is being called after releaseBuffers"))
+                    throw ae;
+            }
+        }
+
+        // Can update a writer that is not eagerly cleared on switch
+        eagerWriterMetaRelease = false;
+        try (LifecycleTransaction txn = cfs.getTracker().tryModify(new HashSet<>(), OperationType.UNKNOWN);
+             SSTableRewriter rewriter = new SSTableRewriter(txn, 1000, 1000000, false, eagerWriterMetaRelease)
+        )
+        {
+            SSTableWriter firstWriter = getWriter(cfs, dir, txn);
+            rewriter.switchWriter(firstWriter);
+
+            // At least one write so it's not aborted when switched out.
+            UnfilteredRowIterator uri = mock(UnfilteredRowIterator.class);
+            when(uri.partitionLevelDeletion()).thenReturn(new DeletionTime(0,0));
+            when(uri.partitionKey()).thenReturn(bopKeyFromInt(0));
+            rewriter.append(uri);
+
+            rewriter.switchWriter(getWriter(cfs, dir, txn));
+
+            // should be able to append after switch, and assert is not tripped
+            when(uri.partitionKey()).thenReturn(bopKeyFromInt(1));
+            firstWriter.append(uri);
+        }
+    }
+
+    static DecoratedKey bopKeyFromInt(int i)
+    {
+        ByteBuffer bb = ByteBuffer.allocate(4);
+        bb.putInt(i);
+        bb.rewind();
+        return ByteOrderedPartitioner.instance.decorateKey(bb);
+    }
+
     private void validateKeys(Keyspace ks)
     {
         for (int i = 0; i < 100; i++)
@@ -865,10 +930,10 @@ public class SSTableRewriterTest extends SSTableWriterTestBase
 
     public static SSTableReader writeFile(ColumnFamilyStore cfs, int count)
     {
-        return Iterables.getFirst(writeFiles(cfs, 1, count * 5, count / 100, 1000), null);
+        return Iterables.getFirst(writeFiles(cfs, 1, count * 5, count / 100), null);
     }
 
-    public static Set<SSTableReader> writeFiles(ColumnFamilyStore cfs, int fileCount, int partitionCount, int cellCount, int cellSize)
+    public static Set<SSTableReader> writeFiles(ColumnFamilyStore cfs, int fileCount, int partitionCount, int cellCount)
     {
         int i = 0;
         Set<SSTableReader> result = new LinkedHashSet<>();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org