You are viewing a plain-text version of this content; the hyperlink to the canonical version is not preserved in this copy.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2014/06/30 08:47:49 UTC

git commit: Include high level sstables in lower level compactions if not compacted for a while.

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.0 536ae7ae9 -> fc9eeb99f


Include high level sstables in lower level compactions if not compacted for a while.

Patch by marcuse, reviewed by kohlisankalp and yukim for CASSANDRA-7414


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/fc9eeb99
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/fc9eeb99
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/fc9eeb99

Branch: refs/heads/cassandra-2.0
Commit: fc9eeb99f9c1c13335c7df19b725e8b1b80b8cbb
Parents: 536ae7a
Author: Marcus Eriksson <ma...@apache.org>
Authored: Tue Jun 17 09:12:51 2014 +0200
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Mon Jun 30 08:40:37 2014 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../db/compaction/LeveledManifest.java          | 86 +++++++++++++++++++-
 2 files changed, 84 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc9eeb99/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e722392..e88753f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -4,6 +4,7 @@
    operations to incorrect become full QUORUM (CASSANDRA-7345)
  * Properly handle unrecognized opcodes and flags (CASSANDRA-7440)
  * (Hadoop) close CqlRecordWriter clients when finished (CASSANDRA-7459)
+ * Make sure high level sstables get compacted (CASSANDRA-7414)
 
 
 2.0.9

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc9eeb99/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
index a78a867..1118ddc 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
@@ -38,6 +38,7 @@ import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.RowPosition;
 import org.apache.cassandra.dht.Bounds;
+import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.sstable.*;
 import org.apache.cassandra.io.util.FileUtils;
@@ -56,12 +57,19 @@ public class LeveledManifest
      * or even OOMing when compacting highly overlapping sstables
      */
     private static final int MAX_COMPACTING_L0 = 32;
+    /**
+     * If we go more than this many compaction rounds without compacting
+     * into the highest non-empty level, we start bringing in sstables from
+     * that level into lower level compactions (one sstable per compaction)
+     */
+    private static final int NO_COMPACTION_LIMIT = 25;
 
     private final ColumnFamilyStore cfs;
     private final List<SSTableReader>[] generations;
     private final RowPosition[] lastCompactedKeys;
     private final int maxSSTableSizeInBytes;
     private final SizeTieredCompactionStrategyOptions options;
+    private final int [] compactionCounter; // per level: compaction rounds since the last compaction into that level
 
     private LeveledManifest(ColumnFamilyStore cfs, int maxSSTableSizeInMB, SizeTieredCompactionStrategyOptions options)
     {
@@ -80,6 +88,7 @@ public class LeveledManifest
             generations[i] = new ArrayList<SSTableReader>();
             lastCompactedKeys[i] = cfs.partitioner.getMinimumToken().minKeyBound();
         }
+        compactionCounter = new int[n];
     }
 
     public static LeveledManifest create(ColumnFamilyStore cfs, int maxSSTableSize, List<SSTableReader> sstables)
@@ -276,10 +285,18 @@ public class LeveledManifest
 
                 // L0 is fine, proceed with this level
                 Collection<SSTableReader> candidates = getCandidatesFor(i);
-                if (logger.isDebugEnabled())
-                    logger.debug("Compaction candidates for L{} are {}", i, toString(candidates));
                 if (!candidates.isEmpty())
-                    return new CompactionCandidate(candidates, getNextLevel(candidates), cfs.getCompactionStrategy().getMaxSSTableBytes());
+                {
+                    int nextLevel = getNextLevel(candidates);
+                    candidates = getOverlappingStarvedSSTables(nextLevel, candidates);
+                    if (logger.isDebugEnabled())
+                        logger.debug("Compaction candidates for L{} are {}", i, toString(candidates));
+                    return new CompactionCandidate(candidates, nextLevel, cfs.getCompactionStrategy().getMaxSSTableBytes());
+                }
+                else
+                {
+                    logger.debug("No compaction candidates for L{}", i);
+                }
             }
         }
 
@@ -292,6 +309,69 @@ public class LeveledManifest
         return new CompactionCandidate(candidates, getNextLevel(candidates), cfs.getCompactionStrategy().getMaxSSTableBytes());
     }
 
+    /**
+     * If we do something that makes many levels contain too little data (cleanup, change sstable size) we will "never"
+     * compact the high levels. Each call counts one round for every level above L0 and resets the target level's count.
+     *
+     * If the highest non-empty level has gone more than NO_COMPACTION_LIMIT rounds without being compacted into, one of
+     * its sstables that is not compacting and whose range is contained in the candidates' key span is added.
+     *
+     * @param targetLevel the level the candidates will be compacted into; its round counter is reset to 0
+     * @param candidates the original sstables to compact; not modified
+     * @return a new set holding the candidates plus one starved sstable, or {@code candidates} itself if none was added
+     */
+    private Collection<SSTableReader> getOverlappingStarvedSSTables(int targetLevel, Collection<SSTableReader> candidates)
+    {
+        Set<SSTableReader> withStarvedCandidate = new HashSet<>(candidates);
+
+        for (int i = generations.length - 1; i > 0; i--)
+            compactionCounter[i]++; // one more round for every level above L0
+        compactionCounter[targetLevel] = 0; // the target level is being compacted into this round
+        if (logger.isDebugEnabled())
+        {
+            for (int j = 0; j < compactionCounter.length; j++)
+                logger.debug("CompactionCounter: {}: {}", j, compactionCounter[j]);
+        }
+
+        for (int i = generations.length - 1; i > 0; i--)
+        {
+            if (getLevelSize(i) > 0)
+            {
+                if (compactionCounter[i] > NO_COMPACTION_LIMIT)
+                {
+                    // we try to find an sstable that is fully contained within the boundaries we are compacting;
+                    // say we are compacting 3 sstables: 0->30 in L1 and 0->12, 12->33 in L2
+                    // this means that we will not create overlap in L2 if we add an sstable
+                    // contained within 0 -> 33 to the compaction. NOTE(review): if min equals max, Range(min, max) may cover the whole ring - confirm
+                    RowPosition max = null;
+                    RowPosition min = null;
+                    for (SSTableReader candidate : candidates)
+                    {
+                        if (min == null || candidate.first.compareTo(min) < 0)
+                            min = candidate.first;
+                        if (max == null || candidate.last.compareTo(max) > 0)
+                            max = candidate.last;
+                    }
+                    Set<SSTableReader> compacting = cfs.getDataTracker().getCompacting();
+                    Range<RowPosition> boundaries = new Range<>(min, max);
+                    for (SSTableReader sstable : getLevel(i))
+                    {
+                        Range<RowPosition> r = new Range<RowPosition>(sstable.first, sstable.last);
+                        if (boundaries.contains(r) && !compacting.contains(sstable))
+                        {
+                            logger.info("Adding high-level (L{}) {} to candidates", sstable.getSSTableLevel(), sstable);
+                            withStarvedCandidate.add(sstable);
+                            return withStarvedCandidate; // at most one starved sstable per compaction
+                        }
+                    }
+                }
+                return candidates; // only the highest non-empty level is considered
+            }
+        }
+
+        return candidates; // getLevelSize() was 0 for every level above L0
+    }
+
     public synchronized int getLevelSize(int i)
     {
         if (i >= generations.length)