You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by if...@apache.org on 2018/06/19 16:01:41 UTC
cassandra git commit: Make it possible to set the max/min compaction
thresholds
Repository: cassandra
Updated Branches:
refs/heads/trunk c6570fac1 -> d52bdaefd
Make it possible to set the max/min compaction thresholds
Patch by Marcus Eriksson, reviewed by Alex Petrov for CASSANDRA-14388
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d52bdaef
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d52bdaef
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d52bdaef
Branch: refs/heads/trunk
Commit: d52bdaefda366b4485acb4e8852b3c0549b184bd
Parents: c6570fa
Author: Marcus Eriksson <ma...@apache.org>
Authored: Mon Apr 16 16:14:28 2018 +0200
Committer: Alex Petrov <ol...@gmail.com>
Committed: Tue Jun 19 17:36:54 2018 +0200
----------------------------------------------------------------------
NEWS.txt | 5 ++
.../compaction/LeveledCompactionStrategy.java | 4 ++
.../db/compaction/LeveledManifest.java | 22 +++----
.../apache/cassandra/db/TableCQLHelperTest.java | 2 +-
.../db/compaction/CompactionsCQLTest.java | 67 ++++++++++++++++++++
5 files changed, 88 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d52bdaef/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index da80684..e1e76de 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -38,6 +38,11 @@ using the provided 'sstableupgrade' tool.
New features
------------
+ - LCS now respects the max_threshold parameter when compacting; it was previously hard-coded
+ to 32, so bigger compactions from L0 to L1 are now possible by raising max_threshold.
+ This also applies to STCS compactions in L0: if there are more than 32 sstables in L0,
+ an L0 STCS compaction will include at most max_threshold sstables. See CASSANDRA-14388
+ for more information.
- There is now an option to automatically upgrade sstables after Cassandra upgrade, enable
either in `cassandra.yaml:automatic_sstable_upgrade` or via JMX during runtime. See
CASSANDRA-14197.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d52bdaef/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
index 7a7b1c1..b1091ce 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
@@ -31,6 +31,7 @@ import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.cassandra.schema.CompactionParams;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.config.Config;
import org.apache.cassandra.db.ColumnFamilyStore;
@@ -569,6 +570,9 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy
uncheckedOptions.remove(LEVEL_FANOUT_SIZE_OPTION);
+ uncheckedOptions.remove(CompactionParams.Option.MIN_THRESHOLD.toString());
+ uncheckedOptions.remove(CompactionParams.Option.MAX_THRESHOLD.toString());
+
uncheckedOptions = SizeTieredCompactionStrategyOptions.validateOptions(options, uncheckedOptions);
return uncheckedOptions;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d52bdaef/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
index 013f0b4..291973f 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
@@ -49,9 +49,8 @@ public class LeveledManifest
private static final Logger logger = LoggerFactory.getLogger(LeveledManifest.class);
/**
- * limit the number of L0 sstables we do at once, because compaction bloom filter creation
- * uses a pessimistic estimate of how many keys overlap (none), so we risk wasting memory
- * or even OOMing when compacting highly overlapping sstables
+ * If we have more than MAX_COMPACTING_L0 sstables in L0, we will run a round of STCS with at most
+ * cfs.getMaximumCompactionThreshold() sstables.
*/
private static final int MAX_COMPACTING_L0 = 32;
/**
@@ -163,6 +162,7 @@ public class LeveledManifest
// The add(..):ed sstable will be sent to level 0
try
{
+ logger.debug("Could not add sstable {} in level {} - dropping to 0", reader, reader.getSSTableLevel());
reader.descriptor.getMetadataSerializer().mutateLevel(reader.descriptor, 0);
reader.reloadSSTableMetadata();
}
@@ -346,12 +346,12 @@ public class LeveledManifest
// L2: 12 [ideal: 100]
//
// The problem is that L0 has a much higher score (almost 250) than L1 (11), so what we'll
- // do is compact a batch of MAX_COMPACTING_L0 sstables with all 117 L1 sstables, and put the
- // result (say, 120 sstables) in L1. Then we'll compact the next batch of MAX_COMPACTING_L0,
+ // do is compact a batch of cfs.getMaximumCompactionThreshold() sstables with all 117 L1 sstables, and put the
+ // result (say, 120 sstables) in L1. Then we'll compact the next batch of cfs.getMaximumCompactionThreshold(),
// and so forth. So we spend most of our i/o rewriting the L1 data with each batch.
//
// If we could just do *all* L0 a single time with L1, that would be ideal. But we can't
- // -- see the javadoc for MAX_COMPACTING_L0.
+ // since we might run out of memory
//
// LevelDB's way around this is to simply block writes if L0 compaction falls behind.
// We don't have that luxury.
@@ -667,7 +667,7 @@ public class LeveledManifest
// 1a. add sstables to the candidate set until we have at least maxSSTableSizeInMB
// 1b. prefer choosing older sstables as candidates, to newer ones
// 1c. any L0 sstables that overlap a candidate, will also become candidates
- // 2. At most MAX_COMPACTING_L0 sstables from L0 will be compacted at once
+ // 2. At most max_threshold sstables from L0 will be compacted at once
// 3. If total candidate size is less than maxSSTableSizeInMB, we won't bother compacting with L1,
// and the result of the compaction will stay in L0 instead of being promoted (see promote())
//
@@ -693,10 +693,10 @@ public class LeveledManifest
remaining.remove(newCandidate);
}
- if (candidates.size() > MAX_COMPACTING_L0)
+ if (candidates.size() > cfs.getMaximumCompactionThreshold())
{
- // limit to only the MAX_COMPACTING_L0 oldest candidates
- candidates = new HashSet<>(ageSortedSSTables(candidates).subList(0, MAX_COMPACTING_L0));
+ // limit to only the cfs.getMaximumCompactionThreshold() oldest candidates
+ candidates = new HashSet<>(ageSortedSSTables(candidates).subList(0, cfs.getMaximumCompactionThreshold()));
break;
}
}
@@ -820,7 +820,7 @@ public class LeveledManifest
if (!DatabaseDescriptor.getDisableSTCSInL0() && getLevel(0).size() > MAX_COMPACTING_L0)
{
- int l0compactions = getLevel(0).size() / MAX_COMPACTING_L0;
+ int l0compactions = getLevel(0).size() / cfs.getMaximumCompactionThreshold();
tasks += l0compactions;
estimated[0] += l0compactions;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d52bdaef/test/unit/org/apache/cassandra/db/TableCQLHelperTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/TableCQLHelperTest.java b/test/unit/org/apache/cassandra/db/TableCQLHelperTest.java
index 1e465b3..701afbb 100644
--- a/test/unit/org/apache/cassandra/db/TableCQLHelperTest.java
+++ b/test/unit/org/apache/cassandra/db/TableCQLHelperTest.java
@@ -274,7 +274,7 @@ public class TableCQLHelperTest extends CQLTester
"\tAND speculative_retry = 'ALWAYS'\n" +
"\tAND comment = 'comment'\n" +
"\tAND caching = { 'keys': 'ALL', 'rows_per_partition': 'NONE' }\n" +
- "\tAND compaction = { 'class': 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy', 'sstable_size_in_mb': '1' }\n" +
+ "\tAND compaction = { 'max_threshold': '32', 'min_threshold': '4', 'sstable_size_in_mb': '1', 'class': 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy' }\n" +
"\tAND compression = { 'chunk_length_in_kb': '64', 'min_compress_ratio': '2.0', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor' }\n" +
"\tAND cdc = false\n" +
"\tAND extensions = { 'ext1': 0x76616c31 };"
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d52bdaef/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
index ca420da..c91d2fe 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
@@ -17,10 +17,12 @@
*/
package org.apache.cassandra.db.compaction;
+import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Random;
import org.apache.commons.lang.StringUtils;
import org.junit.After;
@@ -31,11 +33,13 @@ import org.apache.cassandra.cql3.CQLTester;
import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DeletionTime;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.RangeTombstone;
import org.apache.cassandra.db.RowUpdateBuilder;
import org.apache.cassandra.db.Slice;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.db.rows.Cell;
import org.apache.cassandra.db.rows.Row;
@@ -388,6 +392,69 @@ public class CompactionsCQLTest extends CQLTester
DatabaseDescriptor.setColumnIndexSize(maxSizePre);
}
+
+ @Test
+ public void testLCSThresholdParams() throws Throwable
+ { // LCS should honour max_threshold (60 here) rather than a hard-coded 32 when choosing how many L0 sstables to compact at once
+ createTable("create table %s (id int, id2 int, t blob, primary key (id, id2)) with compaction = {'class':'LeveledCompactionStrategy', 'sstable_size_in_mb':'1', 'max_threshold':'60'}");
+ ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
+ cfs.disableAutoCompaction();
+ byte [] b = new byte[100 * 1024];
+ new Random().nextBytes(b);
+ ByteBuffer value = ByteBuffer.wrap(b);
+ for (int i = 0; i < 50; i++)
+ { // one flush per partition -> 50 separate sstables (auto-compaction is disabled above)
+ for (int j = 0; j < 10; j++)
+ {
+ execute("insert into %s (id, id2, t) values (?, ?, ?)", i, j, value);
+ }
+ cfs.forceBlockingFlush();
+ }
+ assertEquals(50, cfs.getLiveSSTables().size());
+ LeveledCompactionStrategy lcs = (LeveledCompactionStrategy) cfs.getCompactionStrategyManager().getUnrepaired().get(0);
+ AbstractCompactionTask act = lcs.getNextBackgroundTask(0);
+ // max_threshold is 60, so all 50 sstables should be picked together instead of being capped at 32:
+ assertEquals(50, act.transaction.originals().size());
+ act.execute(null);
+ }
+
+ @Test
+ public void testSTCSinL0() throws Throwable
+ { // With more than MAX_COMPACTING_L0 (32) sstables in L0 and the L1 sstable unavailable, LCS should run STCS within L0, bounded by max_threshold
+ createTable("create table %s (id int, id2 int, t blob, primary key (id, id2)) with compaction = {'class':'LeveledCompactionStrategy', 'sstable_size_in_mb':'1', 'max_threshold':'60'}");
+ ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
+ cfs.disableAutoCompaction();
+ execute("insert into %s (id, id2, t) values (?, ?, ?)", 1,1,"L1");
+ cfs.forceBlockingFlush();
+ cfs.forceMajorCompaction();
+ SSTableReader l1sstable = cfs.getLiveSSTables().iterator().next();
+ assertEquals(1, l1sstable.getSSTableLevel());
+ // the major compaction left a single sstable, in L1; now flush 50 more, which land in L0:
+ byte [] b = new byte[100 * 1024];
+ new Random().nextBytes(b);
+ ByteBuffer value = ByteBuffer.wrap(b);
+ for (int i = 0; i < 50; i++)
+ {
+ for (int j = 0; j < 10; j++)
+ {
+ execute("insert into %s (id, id2, t) values (?, ?, ?)", i, j, value);
+ }
+ cfs.forceBlockingFlush();
+ }
+ assertEquals(51, cfs.getLiveSSTables().size());
+
+ // mark the L1 sstable as compacting so it is unavailable, which makes LCS fall back to STCS within L0:
+ LifecycleTransaction txn = cfs.getTracker().tryModify(l1sstable, OperationType.COMPACTION);
+ LeveledCompactionStrategy lcs = (LeveledCompactionStrategy) cfs.getCompactionStrategyManager().getUnrepaired().get(0);
+ AbstractCompactionTask act = lcs.getNextBackgroundTask(0);
+ // L0 holds 50 sstables (> MAX_COMPACTING_L0 = 32), so STCS runs in L0; since max_threshold (60) exceeds 50, all 50 are picked rather than being capped at 32
+ assertEquals(50, act.transaction.originals().size());
+ assertEquals(0, ((LeveledCompactionTask)act).getLevel());
+ assertTrue(act.transaction.originals().stream().allMatch(s -> s.getSSTableLevel() == 0));
+ txn.abort(); // release the L1 sstable so it is no longer marked as compacting
+ act.execute(null);
+ }
+
private void prepareWide() throws Throwable
{
createTable("CREATE TABLE %s (id int, id2 int, b text, primary key (id, id2))");
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org