You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by if...@apache.org on 2018/06/19 16:01:41 UTC

cassandra git commit: Make it possible to set the max/min compaction thresholds

Repository: cassandra
Updated Branches:
  refs/heads/trunk c6570fac1 -> d52bdaefd


Make it possible to set the max/min compaction thresholds

Patch by Marcus Eriksson, reviewed by Alex Petrov for CASSANDRA-14388


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d52bdaef
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d52bdaef
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d52bdaef

Branch: refs/heads/trunk
Commit: d52bdaefda366b4485acb4e8852b3c0549b184bd
Parents: c6570fa
Author: Marcus Eriksson <ma...@apache.org>
Authored: Mon Apr 16 16:14:28 2018 +0200
Committer: Alex Petrov <ol...@gmail.com>
Committed: Tue Jun 19 17:36:54 2018 +0200

----------------------------------------------------------------------
 NEWS.txt                                        |  5 ++
 .../compaction/LeveledCompactionStrategy.java   |  4 ++
 .../db/compaction/LeveledManifest.java          | 22 +++----
 .../apache/cassandra/db/TableCQLHelperTest.java |  2 +-
 .../db/compaction/CompactionsCQLTest.java       | 67 ++++++++++++++++++++
 5 files changed, 88 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/d52bdaef/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index da80684..e1e76de 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -38,6 +38,11 @@ using the provided 'sstableupgrade' tool.
 
 New features
 ------------
+   - LCS now respects the max_threshold parameter when compacting. It was previously hard-coded
+     to 32; now bigger compactions are possible when compacting from L0 to L1.
+     This also applies to STCS compactions in L0: if there are more than 32 sstables in L0,
+     an L0 STCS compaction will include at most max_threshold sstables. See CASSANDRA-14388
+     for more information.
    - There is now an option to automatically upgrade sstables after Cassandra upgrade, enable
      either in `cassandra.yaml:automatic_sstable_upgrade` or via JMX during runtime. See
      CASSANDRA-14197.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d52bdaef/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
index 7a7b1c1..b1091ce 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
@@ -31,6 +31,7 @@ import org.slf4j.LoggerFactory;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.node.JsonNodeFactory;
 import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.cassandra.schema.CompactionParams;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.config.Config;
 import org.apache.cassandra.db.ColumnFamilyStore;
@@ -569,6 +570,9 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy
 
         uncheckedOptions.remove(LEVEL_FANOUT_SIZE_OPTION);
 
+        uncheckedOptions.remove(CompactionParams.Option.MIN_THRESHOLD.toString());
+        uncheckedOptions.remove(CompactionParams.Option.MAX_THRESHOLD.toString());
+
         uncheckedOptions = SizeTieredCompactionStrategyOptions.validateOptions(options, uncheckedOptions);
 
         return uncheckedOptions;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d52bdaef/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
index 013f0b4..291973f 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
@@ -49,9 +49,8 @@ public class LeveledManifest
     private static final Logger logger = LoggerFactory.getLogger(LeveledManifest.class);
 
     /**
-     * limit the number of L0 sstables we do at once, because compaction bloom filter creation
-     * uses a pessimistic estimate of how many keys overlap (none), so we risk wasting memory
-     * or even OOMing when compacting highly overlapping sstables
+     * if we have more than MAX_COMPACTING_L0 sstables in L0, we will run a round of STCS with at most
+     * cfs.getMaximumCompactionThreshold() sstables.
      */
     private static final int MAX_COMPACTING_L0 = 32;
     /**
@@ -163,6 +162,7 @@ public class LeveledManifest
             // The add(..):ed sstable will be sent to level 0
             try
             {
+                logger.debug("Could not add sstable {} in level {} - dropping to 0", reader, reader.getSSTableLevel());
                 reader.descriptor.getMetadataSerializer().mutateLevel(reader.descriptor, 0);
                 reader.reloadSSTableMetadata();
             }
@@ -346,12 +346,12 @@ public class LeveledManifest
         // L2: 12  [ideal: 100]
         //
         // The problem is that L0 has a much higher score (almost 250) than L1 (11), so what we'll
-        // do is compact a batch of MAX_COMPACTING_L0 sstables with all 117 L1 sstables, and put the
-        // result (say, 120 sstables) in L1. Then we'll compact the next batch of MAX_COMPACTING_L0,
+        // do is compact a batch of cfs.getMaximumCompactionThreshold() sstables with all 117 L1 sstables, and put the
+        // result (say, 120 sstables) in L1. Then we'll compact the next batch of cfs.getMaximumCompactionThreshold(),
         // and so forth.  So we spend most of our i/o rewriting the L1 data with each batch.
         //
         // If we could just do *all* L0 a single time with L1, that would be ideal.  But we can't
-        // -- see the javadoc for MAX_COMPACTING_L0.
+        // because we might run out of memory (bloom filter sizing assumes the sstables share no keys).
         //
         // LevelDB's way around this is to simply block writes if L0 compaction falls behind.
         // We don't have that luxury.
@@ -667,7 +667,7 @@ public class LeveledManifest
             // 1a. add sstables to the candidate set until we have at least maxSSTableSizeInMB
             // 1b. prefer choosing older sstables as candidates, to newer ones
             // 1c. any L0 sstables that overlap a candidate, will also become candidates
-            // 2. At most MAX_COMPACTING_L0 sstables from L0 will be compacted at once
+            // 2. At most max_threshold sstables from L0 will be compacted at once
             // 3. If total candidate size is less than maxSSTableSizeInMB, we won't bother compacting with L1,
             //    and the result of the compaction will stay in L0 instead of being promoted (see promote())
             //
@@ -693,10 +693,10 @@ public class LeveledManifest
                     remaining.remove(newCandidate);
                 }
 
-                if (candidates.size() > MAX_COMPACTING_L0)
+                if (candidates.size() > cfs.getMaximumCompactionThreshold())
                 {
-                    // limit to only the MAX_COMPACTING_L0 oldest candidates
-                    candidates = new HashSet<>(ageSortedSSTables(candidates).subList(0, MAX_COMPACTING_L0));
+                    // limit to only the cfs.getMaximumCompactionThreshold() oldest candidates
+                    candidates = new HashSet<>(ageSortedSSTables(candidates).subList(0, cfs.getMaximumCompactionThreshold()));
                     break;
                 }
             }
@@ -820,7 +820,7 @@ public class LeveledManifest
 
         if (!DatabaseDescriptor.getDisableSTCSInL0() && getLevel(0).size() > MAX_COMPACTING_L0)
         {
-            int l0compactions = getLevel(0).size() / MAX_COMPACTING_L0;
+            int l0compactions = getLevel(0).size() / cfs.getMaximumCompactionThreshold();
             tasks += l0compactions;
             estimated[0] += l0compactions;
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d52bdaef/test/unit/org/apache/cassandra/db/TableCQLHelperTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/TableCQLHelperTest.java b/test/unit/org/apache/cassandra/db/TableCQLHelperTest.java
index 1e465b3..701afbb 100644
--- a/test/unit/org/apache/cassandra/db/TableCQLHelperTest.java
+++ b/test/unit/org/apache/cassandra/db/TableCQLHelperTest.java
@@ -274,7 +274,7 @@ public class TableCQLHelperTest extends CQLTester
         "\tAND speculative_retry = 'ALWAYS'\n" +
         "\tAND comment = 'comment'\n" +
         "\tAND caching = { 'keys': 'ALL', 'rows_per_partition': 'NONE' }\n" +
-        "\tAND compaction = { 'class': 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy', 'sstable_size_in_mb': '1' }\n" +
+        "\tAND compaction = { 'max_threshold': '32', 'min_threshold': '4', 'sstable_size_in_mb': '1', 'class': 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy' }\n" +
         "\tAND compression = { 'chunk_length_in_kb': '64', 'min_compress_ratio': '2.0', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor' }\n" +
         "\tAND cdc = false\n" +
         "\tAND extensions = { 'ext1': 0x76616c31 };"

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d52bdaef/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
index ca420da..c91d2fe 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
@@ -17,10 +17,12 @@
  */
 package org.apache.cassandra.db.compaction;
 
+import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 
 import org.apache.commons.lang.StringUtils;
 import org.junit.After;
@@ -31,11 +33,13 @@ import org.apache.cassandra.cql3.CQLTester;
 import org.apache.cassandra.cql3.UntypedResultSet;
 import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.DeletionTime;
 import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.db.RangeTombstone;
 import org.apache.cassandra.db.RowUpdateBuilder;
 import org.apache.cassandra.db.Slice;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.db.rows.Cell;
 import org.apache.cassandra.db.rows.Row;
@@ -388,6 +392,69 @@ public class CompactionsCQLTest extends CQLTester
         DatabaseDescriptor.setColumnIndexSize(maxSizePre);
     }
 
+
+    @Test
+    public void testLCSThresholdParams() throws Throwable
+    {
+        // max_threshold (60) exceeds the 50 sstables flushed below, so one L0 compaction should take all of them
+        createTable("create table %s (id int, id2 int, t blob, primary key (id, id2)) with compaction = {'class':'LeveledCompactionStrategy', 'sstable_size_in_mb':'1', 'max_threshold':'60'}");
+        ColumnFamilyStore store = getCurrentColumnFamilyStore();
+        store.disableAutoCompaction();
+        byte[] payload = new byte[100 * 1024];
+        new Random().nextBytes(payload);
+        ByteBuffer blob = ByteBuffer.wrap(payload);
+        int sstableCount = 50;
+        for (int partition = 0; partition < sstableCount; partition++)
+        {
+            for (int row = 0; row < 10; row++)
+                execute("insert into %s (id, id2, t) values (?, ?, ?)", partition, row, blob);
+            store.forceBlockingFlush(); // one new sstable per outer iteration
+        }
+        assertEquals(sstableCount, store.getLiveSSTables().size());
+        LeveledCompactionStrategy strategy = (LeveledCompactionStrategy) store.getCompactionStrategyManager().getUnrepaired().get(0);
+        AbstractCompactionTask task = strategy.getNextBackgroundTask(0);
+        // every flushed sstable should be part of this single compaction:
+        assertEquals(sstableCount, task.transaction.originals().size());
+        task.execute(null);
+    }
+
+    @Test
+    public void testSTCSinL0() throws Throwable
+    {
+        createTable("create table %s (id int, id2 int, t blob, primary key (id, id2)) with compaction = {'class':'LeveledCompactionStrategy', 'sstable_size_in_mb':'1', 'max_threshold':'60'}");
+        ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
+        cfs.disableAutoCompaction();
+        execute("insert into %s (id, id2, t) values (?, ?, ?)", 1,1,"L1");
+        cfs.forceBlockingFlush();
+        cfs.forceMajorCompaction();
+        SSTableReader l1sstable = cfs.getLiveSSTables().iterator().next();
+        assertEquals(1, l1sstable.getSSTableLevel());
+        // now we have a single L1 sstable, create many L0 ones (one per flush):
+        byte [] b = new byte[100 * 1024];
+        new Random().nextBytes(b);
+        ByteBuffer value = ByteBuffer.wrap(b);
+        for (int i = 0; i < 50; i++)
+        {
+            for (int j = 0; j < 10; j++)
+                execute("insert into %s (id, id2, t) values (?, ?, ?)", i, j, value);
+            cfs.forceBlockingFlush();
+        }
+        assertEquals(51, cfs.getLiveSSTables().size());
+
+        // mark the L1 sstable as compacting so LCS cannot compact L0 into L1 and falls back to STCS in L0:
+        LifecycleTransaction txn = cfs.getTracker().tryModify(l1sstable, OperationType.COMPACTION);
+        LeveledCompactionStrategy lcs = (LeveledCompactionStrategy) cfs.getCompactionStrategyManager().getUnrepaired().get(0);
+        AbstractCompactionTask act;
+        try
+        {
+            act = lcs.getNextBackgroundTask(0);
+            // 50 L0 sstables > MAX_COMPACTING_L0 (32) triggers STCS in L0; max_threshold (60) lets it take all 50
+            assertEquals(50, act.transaction.originals().size());
+            assertEquals(0, ((LeveledCompactionTask)act).getLevel());
+            assertTrue(act.transaction.originals().stream().allMatch(s -> s.getSSTableLevel() == 0));
+        }
+        finally
+        {
+            txn.abort(); // always unmark the L1 sstable, even if task selection or an assertion fails
+        }
+        act.execute(null);
+    }
+
     private void prepareWide() throws Throwable
     {
         createTable("CREATE TABLE %s (id int, id2 int, b text, primary key (id, id2))");


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org