You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by mc...@apache.org on 2021/01/20 22:27:43 UTC
[cassandra] branch trunk updated: Release StreamingTombstoneHistogramBuilder spool when switching writers
This is an automated email from the ASF dual-hosted git repository.
mck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 5e8f7f5 Release StreamingTombstoneHistogramBuilder spool when switching writers
5e8f7f5 is described below
commit 5e8f7f591dfec5a61d8eb2e9e977ec29f3a2bbe4
Author: Adam Holmberg <ad...@datastax.com>
AuthorDate: Mon Dec 14 14:42:50 2020 -0600
Release StreamingTombstoneHistogramBuilder spool when switching writers
patch by Adam Holmberg; reviewed by Berenguer Blasi, Mick Semb Wever for CASSANDRA-14834
---
CHANGES.txt | 1 +
.../cassandra/db/compaction/CompactionTask.java | 6 -
.../compaction/writers/CompactionAwareWriter.java | 11 --
.../writers/MajorLeveledCompactionWriter.java | 12 --
.../compaction/writers/MaxSSTableSizeWriter.java | 13 --
.../SplittingSizeTieredCompactionWriter.java | 2 +-
.../cassandra/io/sstable/SSTableRewriter.java | 31 ++---
.../cassandra/io/sstable/format/SSTableWriter.java | 5 +
.../io/sstable/metadata/MetadataCollector.java | 8 ++
.../StreamingTombstoneHistogramBuilder.java | 31 +++--
.../cassandra/io/sstable/SSTableRewriterTest.java | 139 +++++++++++++++------
11 files changed, 152 insertions(+), 107 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 1c63268..7699940 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0-beta5
+ * Release StreamingTombstoneHistogramBuilder spool when switching writers (CASSANDRA-14834)
* Correct memtable on-heap size calculations to match actual use (CASSANDRA-16318)
* Fix client notifications in CQL protocol v5 (CASSANDRA-16353)
* Too defensive check when picking sstables for preview repair (CASSANDRA-16284)
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index 764ad5b..13c9725 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -59,12 +59,6 @@ public class CompactionTask extends AbstractCompactionTask
this(cfs, txn, gcBefore, false);
}
- @Deprecated
- public CompactionTask(ColumnFamilyStore cfs, LifecycleTransaction txn, int gcBefore, boolean offline, boolean keepOriginals)
- {
- this(cfs, txn, gcBefore, keepOriginals);
- }
-
public CompactionTask(ColumnFamilyStore cfs, LifecycleTransaction txn, int gcBefore, boolean keepOriginals)
{
super(cfs, txn);
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
index c1ae9ec..d363dcf 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
@@ -66,17 +66,6 @@ public abstract class CompactionAwareWriter extends Transactional.AbstractTransa
private final List<PartitionPosition> diskBoundaries;
private int locationIndex;
- @Deprecated
- public CompactionAwareWriter(ColumnFamilyStore cfs,
- Directories directories,
- LifecycleTransaction txn,
- Set<SSTableReader> nonExpiredSSTables,
- boolean offline,
- boolean keepOriginals)
- {
- this(cfs, directories, txn, nonExpiredSSTables, keepOriginals);
- }
-
public CompactionAwareWriter(ColumnFamilyStore cfs,
Directories directories,
LifecycleTransaction txn,
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
index 2b93eb4..1c53600 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
@@ -51,18 +51,6 @@ public class MajorLeveledCompactionWriter extends CompactionAwareWriter
this(cfs, directories, txn, nonExpiredSSTables, maxSSTableSize, false);
}
- @Deprecated
- public MajorLeveledCompactionWriter(ColumnFamilyStore cfs,
- Directories directories,
- LifecycleTransaction txn,
- Set<SSTableReader> nonExpiredSSTables,
- long maxSSTableSize,
- boolean offline,
- boolean keepOriginals)
- {
- this(cfs, directories, txn, nonExpiredSSTables, maxSSTableSize, keepOriginals);
- }
-
@SuppressWarnings("resource")
public MajorLeveledCompactionWriter(ColumnFamilyStore cfs,
Directories directories,
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
index df7eeaf..915f96b 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
@@ -48,19 +48,6 @@ public class MaxSSTableSizeWriter extends CompactionAwareWriter
this(cfs, directories, txn, nonExpiredSSTables, maxSSTableSize, level, false);
}
- @Deprecated
- public MaxSSTableSizeWriter(ColumnFamilyStore cfs,
- Directories directories,
- LifecycleTransaction txn,
- Set<SSTableReader> nonExpiredSSTables,
- long maxSSTableSize,
- int level,
- boolean offline,
- boolean keepOriginals)
- {
- this(cfs, directories, txn, nonExpiredSSTables, maxSSTableSize, level, keepOriginals);
- }
-
public MaxSSTableSizeWriter(ColumnFamilyStore cfs,
Directories directories,
LifecycleTransaction txn,
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
index 7533f1d..d29061c 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
@@ -58,7 +58,7 @@ public class SplittingSizeTieredCompactionWriter extends CompactionAwareWriter
public SplittingSizeTieredCompactionWriter(ColumnFamilyStore cfs, Directories directories, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables, long smallestSSTable)
{
- super(cfs, directories, txn, nonExpiredSSTables, false, false);
+ super(cfs, directories, txn, nonExpiredSSTables, false);
this.allSSTables = txn.originals();
totalSize = cfs.getExpectedCompactedFileSize(nonExpiredSSTables, txn.opType());
double[] potentialRatios = new double[20];
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
index fb3aa2d..a3d5ae9 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
@@ -66,6 +66,7 @@ public class SSTableRewriter extends Transactional.AbstractTransactional impleme
private final List<SSTableWriter> writers = new ArrayList<>();
private final boolean keepOriginals; // true if we do not want to obsolete the originals
+ private final boolean eagerWriterMetaRelease; // true if the writer metadata should be released when switch is called
private SSTableWriter writer;
private Map<DecoratedKey, RowIndexEntry> cachedKeys = new HashMap<>();
@@ -74,44 +75,33 @@ public class SSTableRewriter extends Transactional.AbstractTransactional impleme
private boolean throwEarly, throwLate;
@Deprecated
- public SSTableRewriter(ILifecycleTransaction transaction, long maxAge, boolean isOffline)
- {
- this(transaction, maxAge, isOffline, true);
- }
- @Deprecated
- public SSTableRewriter(ILifecycleTransaction transaction, long maxAge, boolean isOffline, boolean shouldOpenEarly)
+ public SSTableRewriter(ILifecycleTransaction transaction, long maxAge, long preemptiveOpenInterval, boolean keepOriginals)
{
- this(transaction, maxAge, calculateOpenInterval(shouldOpenEarly), false);
+ this(transaction, maxAge, preemptiveOpenInterval, keepOriginals, false);
}
- @VisibleForTesting
- public SSTableRewriter(ILifecycleTransaction transaction, long maxAge, long preemptiveOpenInterval, boolean keepOriginals)
+ SSTableRewriter(ILifecycleTransaction transaction, long maxAge, long preemptiveOpenInterval, boolean keepOriginals, boolean eagerWriterMetaRelease)
{
this.transaction = transaction;
this.maxAge = maxAge;
- this.keepOriginals = keepOriginals;
this.preemptiveOpenInterval = preemptiveOpenInterval;
- }
-
- @Deprecated
- public static SSTableRewriter constructKeepingOriginals(ILifecycleTransaction transaction, boolean keepOriginals, long maxAge, boolean isOffline)
- {
- return constructKeepingOriginals(transaction, keepOriginals, maxAge);
+ this.keepOriginals = keepOriginals;
+ this.eagerWriterMetaRelease = eagerWriterMetaRelease;
}
public static SSTableRewriter constructKeepingOriginals(ILifecycleTransaction transaction, boolean keepOriginals, long maxAge)
{
- return new SSTableRewriter(transaction, maxAge, calculateOpenInterval(true), keepOriginals);
+ return new SSTableRewriter(transaction, maxAge, calculateOpenInterval(true), keepOriginals, true);
}
public static SSTableRewriter constructWithoutEarlyOpening(ILifecycleTransaction transaction, boolean keepOriginals, long maxAge)
{
- return new SSTableRewriter(transaction, maxAge, calculateOpenInterval(false), keepOriginals);
+ return new SSTableRewriter(transaction, maxAge, calculateOpenInterval(false), keepOriginals, true);
}
public static SSTableRewriter construct(ColumnFamilyStore cfs, ILifecycleTransaction transaction, boolean keepOriginals, long maxAge)
{
- return new SSTableRewriter(transaction, maxAge, calculateOpenInterval(cfs.supportsEarlyOpen()), keepOriginals);
+ return new SSTableRewriter(transaction, maxAge, calculateOpenInterval(cfs.supportsEarlyOpen()), keepOriginals, true);
}
private static long calculateOpenInterval(boolean shouldOpenEarly)
@@ -303,6 +293,9 @@ public class SSTableRewriter extends Transactional.AbstractTransactional impleme
if (newWriter != null)
writers.add(newWriter.setMaxDataAge(maxAge));
+ if (eagerWriterMetaRelease && writer != null)
+ writer.releaseMetadataOverhead();
+
if (writer == null || writer.getFilePointer() == 0)
{
if (writer != null)
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
index f54bc03..1dbfcdb 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
@@ -325,6 +325,11 @@ public abstract class SSTableWriter extends SSTable implements Transactional
return (StatsMetadata) finalizeMetadata().get(MetadataType.STATS);
}
+ public void releaseMetadataOverhead()
+ {
+ metadataCollector.release();
+ }
+
public static void rename(Descriptor tmpdesc, Descriptor newdesc, Set<Component> components)
{
for (Component component : Sets.difference(components, Sets.newHashSet(Component.DATA, Component.SUMMARY)))
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
index 65d534e..0d49ea1 100755
--- a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
@@ -272,6 +272,14 @@ public class MetadataCollector implements PartitionStatisticsCollector
return components;
}
+ /**
+ * Release large memory objects while keeping metrics intact
+ */
+ public void release()
+ {
+ estimatedTombstoneDropTime.releaseBuffers();
+ }
+
private static List<ByteBuffer> makeList(ByteBuffer[] values)
{
// In most case, l will be the same size than values, but it's possible for it to be smaller
diff --git a/src/java/org/apache/cassandra/utils/streamhist/StreamingTombstoneHistogramBuilder.java b/src/java/org/apache/cassandra/utils/streamhist/StreamingTombstoneHistogramBuilder.java
index eda88bc..76630ec 100755
--- a/src/java/org/apache/cassandra/utils/streamhist/StreamingTombstoneHistogramBuilder.java
+++ b/src/java/org/apache/cassandra/utils/streamhist/StreamingTombstoneHistogramBuilder.java
@@ -40,12 +40,12 @@ import org.apache.cassandra.db.rows.Cell;
* <ol>
* <li>If point <i>p</i> is already exists in collection, add <i>m</i> to recorded value of point <i>p</i> </li>
* <li>If there is no point <i>p</i> in the collection, add point <i>p</i> with weight <i>m</i> </li>
- * <li>If point was added and collection size became lorger than maxBinSize:</li>
+ * <li>If point was added and collection size became larger than maxBinSize:</li>
* </ol>
*
* <ol type="a">
* <li>Find nearest points <i>p1</i> and <i>p2</i> in the collection </li>
- * <li>Replace theese two points with one weighted point <i>p3 = (p1*m1+p2*m2)/(p1+p2)</i></li>
+ * <li>Replace these two points with one weighted point <i>p3 = (p1*m1+p2*m2)/(p1+p2)</i></li>
* </ol>
*
* <p>
@@ -54,7 +54,7 @@ import org.apache.cassandra.db.rows.Cell;
* <li>Spool: big map that saves from excessively merging of small bin. This map can contains up to maxSpoolSize points and accumulate weight from same points.
* For example, if spoolSize=100, binSize=10 and there are only 50 different points. it will be only 40 merges regardless how many points will be added.</li>
* <li>Spool is organized as open-addressing primitive hash map where odd elements are points and event elements are values.
- * Spool can not resize => when number of collisions became bigger than threashold or size became large that <i>array_size/2</i> Spool is drained to bin</li>
+ * Spool can not resize => when number of collisions became bigger than threshold or size became large that <i>array_size/2</i> Spool is drained to bin</li>
* <li>Bin is organized as sorted arrays. It reduces garbage collection pressure and allows to find elements in log(binSize) time via binary search</li>
* <li>To use existing Arrays.binarySearch <i></>{point, values}</i> in bin pairs is packed in one long</li>
* </ol>
@@ -69,7 +69,7 @@ public class StreamingTombstoneHistogramBuilder
private final DataHolder bin;
// Keep a second, larger buffer to spool data in, before finalizing it into `bin`
- private final Spool spool;
+ private Spool spool;
// voluntarily give up resolution for speed
private final int roundSeconds;
@@ -98,6 +98,7 @@ public class StreamingTombstoneHistogramBuilder
*/
public void update(int point, int value)
{
+ assert spool != null: "update is being called after releaseBuffers. This could be functionally okay, but this assertion is a canary to alert about unintended use before it is necessary.";
point = ceilKey(point, roundSeconds);
if (spool.capacity > 0)
@@ -120,8 +121,22 @@ public class StreamingTombstoneHistogramBuilder
*/
public void flushHistogram()
{
- spool.forEach(this::flushValue);
- spool.clear();
+ Spool spool = this.spool;
+ if (spool != null)
+ {
+ spool.forEach(this::flushValue);
+ spool.clear();
+ }
+ }
+
+ /**
+ * Release inner spool buffers. Histogram remains readable and writable, but with lesser performance.
+ * Not intended for use before finalization.
+ */
+ public void releaseBuffers()
+ {
+ flushHistogram();
+ spool = null;
}
private void flushValue(int key, int spoolValue)
@@ -135,7 +150,7 @@ public class StreamingTombstoneHistogramBuilder
}
/**
- * Creates a 'finished' snapshot of the current state of the historgram, but leaves this builder instance
+ * Creates a 'finished' snapshot of the current state of the histogram, but leaves this builder instance
* open for subsequent additions to the histograms. Basically, this allows us to have some degree of sanity
* wrt sstable early open.
*/
@@ -220,7 +235,7 @@ public class StreamingTombstoneHistogramBuilder
/**
* Finds nearest points <i>p1</i> and <i>p2</i> in the collection
- * Replaces theese two points with one weighted point <i>p3 = (p1*m1+p2*m2)/(p1+p2)
+ * Replaces these two points with one weighted point <i>p3 = (p1*m1+p2*m2)/(p1+p2)
*/
@VisibleForTesting
void mergeNearestPoints()
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
index 7c47c8b..1895653 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
@@ -19,24 +19,21 @@
package org.apache.cassandra.io.sstable;
import java.io.File;
-import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.Uninterruptibles;
import org.junit.Test;
import org.apache.cassandra.Util;
import org.apache.cassandra.UpdateBuilder;
import org.apache.cassandra.concurrent.NamedThreadFactory;
-import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.DeletionTime;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.RowUpdateBuilder;
import org.apache.cassandra.db.SerializationHeader;
@@ -49,25 +46,27 @@ import org.apache.cassandra.db.compaction.CompactionIterator;
import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.db.compaction.SSTableSplitter;
import org.apache.cassandra.db.partitions.ImmutableBTreePartition;
+import org.apache.cassandra.dht.ByteOrderedPartitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.db.lifecycle.SSTableSet;
import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SSTableWriter;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.metrics.StorageMetrics;
-import org.apache.cassandra.streaming.PreviewKind;
-import org.apache.cassandra.streaming.StreamSession;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.UUIDGen;
import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
public class SSTableRewriterTest extends SSTableWriterTestBase
{
@Test
- public void basicTest() throws InterruptedException
+ public void basicTest()
{
Keyspace keyspace = Keyspace.open(KEYSPACE);
ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
@@ -106,7 +105,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase
truncate(cfs);
}
@Test
- public void basicTest2() throws InterruptedException
+ public void basicTest2()
{
Keyspace keyspace = Keyspace.open(KEYSPACE);
ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
@@ -120,7 +119,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase
int nowInSec = FBUtilities.nowInSeconds();
try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategyManager().getScanners(sstables);
LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN);
- SSTableRewriter writer = new SSTableRewriter(txn, 1000, 10000000, false);
+ SSTableRewriter writer = new SSTableRewriter(txn, 1000, 10000000, false, true);
CompactionController controller = new CompactionController(cfs, sstables, cfs.gcBefore(nowInSec));
CompactionIterator ci = new CompactionIterator(OperationType.COMPACTION, scanners.scanners, controller, nowInSec, UUIDGen.getTimeUUID()))
{
@@ -138,7 +137,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase
}
@Test
- public void getPositionsTest() throws InterruptedException
+ public void getPositionsTest()
{
Keyspace keyspace = Keyspace.open(KEYSPACE);
ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
@@ -153,7 +152,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase
boolean checked = false;
try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategyManager().getScanners(sstables);
LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN);
- SSTableRewriter writer = new SSTableRewriter(txn, 1000, 10000000, false);
+ SSTableRewriter writer = new SSTableRewriter(txn, 1000, 10000000, false, true);
CompactionController controller = new CompactionController(cfs, sstables, cfs.gcBefore(nowInSec));
CompactionIterator ci = new CompactionIterator(OperationType.COMPACTION, scanners.scanners, controller, nowInSec, UUIDGen.getTimeUUID()))
{
@@ -194,7 +193,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase
}
@Test
- public void testNumberOfFilesAndSizes() throws Exception
+ public void testNumberOfFilesAndSizes()
{
Keyspace keyspace = Keyspace.open(KEYSPACE);
ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
@@ -211,7 +210,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase
try (ISSTableScanner scanner = s.getScanner();
CompactionController controller = new CompactionController(cfs, compacting, 0);
LifecycleTransaction txn = cfs.getTracker().tryModify(compacting, OperationType.UNKNOWN);
- SSTableRewriter rewriter = new SSTableRewriter(txn, 1000, 10000000, false);
+ SSTableRewriter rewriter = new SSTableRewriter(txn, 1000, 10000000, false, true);
CompactionIterator ci = new CompactionIterator(OperationType.COMPACTION, Collections.singletonList(scanner), controller, FBUtilities.nowInSeconds(), UUIDGen.getTimeUUID()))
{
rewriter.switchWriter(getWriter(cfs, s.descriptor.directory, txn));
@@ -250,7 +249,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase
}
@Test
- public void testNumberOfFiles_dont_clean_readers() throws Exception
+ public void testNumberOfFiles_dont_clean_readers()
{
Keyspace keyspace = Keyspace.open(KEYSPACE);
ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
@@ -266,7 +265,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase
try (ISSTableScanner scanner = s.getScanner();
CompactionController controller = new CompactionController(cfs, compacting, 0);
LifecycleTransaction txn = cfs.getTracker().tryModify(compacting, OperationType.UNKNOWN);
- SSTableRewriter rewriter = new SSTableRewriter(txn, 1000, 10000000, false);
+ SSTableRewriter rewriter = new SSTableRewriter(txn, 1000, 10000000, false, true);
CompactionIterator ci = new CompactionIterator(OperationType.COMPACTION, Collections.singletonList(scanner), controller, FBUtilities.nowInSeconds(), UUIDGen.getTimeUUID()))
{
rewriter.switchWriter(getWriter(cfs, s.descriptor.directory, txn));
@@ -401,7 +400,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase
LifecycleTransaction txn);
}
- private void testNumberOfFiles_abort(RewriterTest test) throws Exception
+ private void testNumberOfFiles_abort(RewriterTest test)
{
Keyspace keyspace = Keyspace.open(KEYSPACE);
ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
@@ -417,7 +416,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase
try (ISSTableScanner scanner = s.getScanner();
CompactionController controller = new CompactionController(cfs, compacting, 0);
LifecycleTransaction txn = cfs.getTracker().tryModify(compacting, OperationType.UNKNOWN);
- SSTableRewriter rewriter = new SSTableRewriter(txn, 1000, 10000000, false))
+ SSTableRewriter rewriter = new SSTableRewriter(txn, 1000, 10000000, false, true))
{
rewriter.switchWriter(getWriter(cfs, s.descriptor.directory, txn));
test.run(scanner, controller, s, cfs, rewriter, txn);
@@ -434,7 +433,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase
}
@Test
- public void testNumberOfFiles_finish_empty_new_writer() throws Exception
+ public void testNumberOfFiles_finish_empty_new_writer()
{
Keyspace keyspace = Keyspace.open(KEYSPACE);
ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
@@ -449,7 +448,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase
try (ISSTableScanner scanner = s.getScanner();
CompactionController controller = new CompactionController(cfs, compacting, 0);
LifecycleTransaction txn = cfs.getTracker().tryModify(compacting, OperationType.UNKNOWN);
- SSTableRewriter rewriter = new SSTableRewriter(txn, 1000, 10000000, false);
+ SSTableRewriter rewriter = new SSTableRewriter(txn, 1000, 10000000, false, true);
CompactionIterator ci = new CompactionIterator(OperationType.COMPACTION, Collections.singletonList(scanner), controller, FBUtilities.nowInSeconds(), UUIDGen.getTimeUUID()))
{
rewriter.switchWriter(getWriter(cfs, s.descriptor.directory, txn));
@@ -479,7 +478,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase
}
@Test
- public void testNumberOfFiles_truncate() throws Exception
+ public void testNumberOfFiles_truncate()
{
Keyspace keyspace = Keyspace.open(KEYSPACE);
ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
@@ -495,7 +494,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase
try (ISSTableScanner scanner = s.getScanner();
CompactionController controller = new CompactionController(cfs, compacting, 0);
LifecycleTransaction txn = cfs.getTracker().tryModify(compacting, OperationType.UNKNOWN);
- SSTableRewriter rewriter = new SSTableRewriter(txn, 1000, 10000000, false);
+ SSTableRewriter rewriter = new SSTableRewriter(txn, 1000, 10000000, false, true);
CompactionIterator ci = new CompactionIterator(OperationType.COMPACTION, Collections.singletonList(scanner), controller, FBUtilities.nowInSeconds(), UUIDGen.getTimeUUID()))
{
rewriter.switchWriter(getWriter(cfs, s.descriptor.directory, txn));
@@ -510,7 +509,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase
}
}
- sstables = rewriter.finish();
+ rewriter.finish();
}
LifecycleTransaction.waitForDeletions();
@@ -519,7 +518,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase
}
@Test
- public void testSmallFiles() throws Exception
+ public void testSmallFiles()
{
Keyspace keyspace = Keyspace.open(KEYSPACE);
ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
@@ -535,7 +534,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase
try (ISSTableScanner scanner = s.getScanner();
CompactionController controller = new CompactionController(cfs, compacting, 0);
LifecycleTransaction txn = cfs.getTracker().tryModify(compacting, OperationType.UNKNOWN);
- SSTableRewriter rewriter = new SSTableRewriter(txn, 1000, 1000000, false);
+ SSTableRewriter rewriter = new SSTableRewriter(txn, 1000, 1000000, false, true);
CompactionIterator ci = new CompactionIterator(OperationType.COMPACTION, Collections.singletonList(scanner), controller, FBUtilities.nowInSeconds(), UUIDGen.getTimeUUID()))
{
rewriter.switchWriter(getWriter(cfs, s.descriptor.directory, txn));
@@ -561,7 +560,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase
}
@Test
- public void testSSTableSplit() throws InterruptedException
+ public void testSSTableSplit()
{
Keyspace keyspace = Keyspace.open(KEYSPACE);
ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
@@ -603,12 +602,12 @@ public class SSTableRewriterTest extends SSTableWriterTestBase
}
@Test
- public void testAbort2() throws Exception
+ public void testAbort2()
{
testAbortHelper(true, false);
}
- private void testAbortHelper(boolean earlyException, boolean offline) throws Exception
+ private void testAbortHelper(boolean earlyException, boolean offline)
{
Keyspace keyspace = Keyspace.open(KEYSPACE);
ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
@@ -621,7 +620,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase
CompactionController controller = new CompactionController(cfs, compacting, 0);
LifecycleTransaction txn = offline ? LifecycleTransaction.offline(OperationType.UNKNOWN, compacting)
: cfs.getTracker().tryModify(compacting, OperationType.UNKNOWN);
- SSTableRewriter rewriter = new SSTableRewriter(txn, 100, 10000000, false);
+ SSTableRewriter rewriter = new SSTableRewriter(txn, 100, 10000000, false, true);
CompactionIterator ci = new CompactionIterator(OperationType.COMPACTION, Collections.singletonList(scanner), controller, FBUtilities.nowInSeconds(), UUIDGen.getTimeUUID())
)
{
@@ -711,7 +710,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase
try (ISSTableScanner scanner = compacting.iterator().next().getScanner();
CompactionController controller = new CompactionController(cfs, compacting, 0);
LifecycleTransaction txn = cfs.getTracker().tryModify(compacting, OperationType.UNKNOWN);
- SSTableRewriter rewriter = new SSTableRewriter(txn, 1000, 1, false);
+ SSTableRewriter rewriter = new SSTableRewriter(txn, 1000, 1, false, true);
CompactionIterator ci = new CompactionIterator(OperationType.COMPACTION, Collections.singletonList(scanner), controller, FBUtilities.nowInSeconds(), UUIDGen.getTimeUUID())
)
{
@@ -735,7 +734,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase
}
@Test
- public void testCanonicalView() throws Exception
+ public void testCanonicalView()
{
Keyspace keyspace = Keyspace.open(KEYSPACE);
ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
@@ -749,7 +748,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase
try (ISSTableScanner scanner = sstables.iterator().next().getScanner();
CompactionController controller = new CompactionController(cfs, sstables, 0);
LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN);
- SSTableRewriter writer = new SSTableRewriter(txn, 1000, 10000000, false);
+ SSTableRewriter writer = new SSTableRewriter(txn, 1000, 10000000, false, true);
CompactionIterator ci = new CompactionIterator(OperationType.COMPACTION, Collections.singletonList(scanner), controller, FBUtilities.nowInSeconds(), UUIDGen.getTimeUUID())
)
{
@@ -775,11 +774,9 @@ public class SSTableRewriterTest extends SSTableWriterTestBase
/**
* emulates anticompaction - writing from one source sstable to two new sstables
- *
- * @throws IOException
*/
@Test
- public void testTwoWriters() throws IOException
+ public void testTwoWriters()
{
Keyspace keyspace = Keyspace.open(KEYSPACE);
ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
@@ -853,6 +850,74 @@ public class SSTableRewriterTest extends SSTableWriterTestBase
}
+ /**
+ * tests SSTableRewriter ctor arg controlling whether writers metadata buffers are released.
+ * Verifies that writers trip an assert when updated after cleared on switch
+ *
+ * CASSANDRA-14834
+ */
+ @Test
+ public void testWriterClearing()
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE);
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+ File dir = cfs.getDirectories().getDirectoryForNewSSTables();
+
+ // Can't update a writer that is eagerly cleared on switch
+ boolean eagerWriterMetaRelease = true;
+ try (LifecycleTransaction txn = cfs.getTracker().tryModify(new HashSet<>(), OperationType.UNKNOWN);
+ SSTableRewriter rewriter = new SSTableRewriter(txn, 1000, 1000000, false, eagerWriterMetaRelease)
+ )
+ {
+ SSTableWriter firstWriter = getWriter(cfs, dir, txn);
+ rewriter.switchWriter(firstWriter);
+ rewriter.switchWriter(getWriter(cfs, dir, txn));
+ try
+ {
+ UnfilteredRowIterator uri = mock(UnfilteredRowIterator.class);
+ when(uri.partitionLevelDeletion()).thenReturn(new DeletionTime(0,0));
+ when(uri.partitionKey()).thenReturn(bopKeyFromInt(0));
+ // should not be able to append after buffer release on switch
+ firstWriter.append(uri);
+ fail("Expected AssertionError was not thrown.");
+ }
+ catch(AssertionError ae) {
+ if (!ae.getMessage().contains("update is being called after releaseBuffers"))
+ throw ae;
+ }
+ }
+
+ // Can update a writer that is not eagerly cleared on switch
+ eagerWriterMetaRelease = false;
+ try (LifecycleTransaction txn = cfs.getTracker().tryModify(new HashSet<>(), OperationType.UNKNOWN);
+ SSTableRewriter rewriter = new SSTableRewriter(txn, 1000, 1000000, false, eagerWriterMetaRelease)
+ )
+ {
+ SSTableWriter firstWriter = getWriter(cfs, dir, txn);
+ rewriter.switchWriter(firstWriter);
+
+ // At least one write so it's not aborted when switched out.
+ UnfilteredRowIterator uri = mock(UnfilteredRowIterator.class);
+ when(uri.partitionLevelDeletion()).thenReturn(new DeletionTime(0,0));
+ when(uri.partitionKey()).thenReturn(bopKeyFromInt(0));
+ rewriter.append(uri);
+
+ rewriter.switchWriter(getWriter(cfs, dir, txn));
+
+ // should be able to append after switch, and assert is not tripped
+ when(uri.partitionKey()).thenReturn(bopKeyFromInt(1));
+ firstWriter.append(uri);
+ }
+ }
+
+ static DecoratedKey bopKeyFromInt(int i)
+ {
+ ByteBuffer bb = ByteBuffer.allocate(4);
+ bb.putInt(i);
+ bb.rewind();
+ return ByteOrderedPartitioner.instance.decorateKey(bb);
+ }
+
private void validateKeys(Keyspace ks)
{
for (int i = 0; i < 100; i++)
@@ -865,10 +930,10 @@ public class SSTableRewriterTest extends SSTableWriterTestBase
public static SSTableReader writeFile(ColumnFamilyStore cfs, int count)
{
- return Iterables.getFirst(writeFiles(cfs, 1, count * 5, count / 100, 1000), null);
+ return Iterables.getFirst(writeFiles(cfs, 1, count * 5, count / 100), null);
}
- public static Set<SSTableReader> writeFiles(ColumnFamilyStore cfs, int fileCount, int partitionCount, int cellCount, int cellSize)
+ public static Set<SSTableReader> writeFiles(ColumnFamilyStore cfs, int fileCount, int partitionCount, int cellCount)
{
int i = 0;
Set<SSTableReader> result = new LinkedHashSet<>();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org