You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2014/06/30 08:47:49 UTC
git commit: Include high level sstables in lower level compactions if
not compacted for a while.
Repository: cassandra
Updated Branches:
refs/heads/cassandra-2.0 536ae7ae9 -> fc9eeb99f
Include high level sstables in lower level compactions if not compacted for a while.
Patch by marcuse, reviewed by kohlisankalp and yukim for CASSANDRA-7414
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/fc9eeb99
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/fc9eeb99
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/fc9eeb99
Branch: refs/heads/cassandra-2.0
Commit: fc9eeb99f9c1c13335c7df19b725e8b1b80b8cbb
Parents: 536ae7a
Author: Marcus Eriksson <ma...@apache.org>
Authored: Tue Jun 17 09:12:51 2014 +0200
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Mon Jun 30 08:40:37 2014 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../db/compaction/LeveledManifest.java | 86 +++++++++++++++++++-
2 files changed, 84 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc9eeb99/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e722392..e88753f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -4,6 +4,7 @@
operations to incorrect become full QUORUM (CASSANDRA-7345)
* Properly handle unrecognized opcodes and flags (CASSANDRA-7440)
* (Hadoop) close CqlRecordWriter clients when finished (CASSANDRA-7459)
+ * Make sure high level sstables get compacted (CASSANDRA-7414)
2.0.9
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc9eeb99/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
index a78a867..1118ddc 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
@@ -38,6 +38,7 @@ import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.RowPosition;
import org.apache.cassandra.dht.Bounds;
+import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.sstable.*;
import org.apache.cassandra.io.util.FileUtils;
@@ -56,12 +57,19 @@ public class LeveledManifest
* or even OOMing when compacting highly overlapping sstables
*/
private static final int MAX_COMPACTING_L0 = 32;
+ /**
+ * If we go this many rounds without compacting
+ * in the highest level, we start bringing in sstables from
+ * that level into lower level compactions
+ */
+ private static final int NO_COMPACTION_LIMIT = 25;
private final ColumnFamilyStore cfs;
private final List<SSTableReader>[] generations;
private final RowPosition[] lastCompactedKeys;
private final int maxSSTableSizeInBytes;
private final SizeTieredCompactionStrategyOptions options;
+ private final int [] compactionCounter;
private LeveledManifest(ColumnFamilyStore cfs, int maxSSTableSizeInMB, SizeTieredCompactionStrategyOptions options)
{
@@ -80,6 +88,7 @@ public class LeveledManifest
generations[i] = new ArrayList<SSTableReader>();
lastCompactedKeys[i] = cfs.partitioner.getMinimumToken().minKeyBound();
}
+ compactionCounter = new int[n];
}
public static LeveledManifest create(ColumnFamilyStore cfs, int maxSSTableSize, List<SSTableReader> sstables)
@@ -276,10 +285,18 @@ public class LeveledManifest
// L0 is fine, proceed with this level
Collection<SSTableReader> candidates = getCandidatesFor(i);
- if (logger.isDebugEnabled())
- logger.debug("Compaction candidates for L{} are {}", i, toString(candidates));
if (!candidates.isEmpty())
- return new CompactionCandidate(candidates, getNextLevel(candidates), cfs.getCompactionStrategy().getMaxSSTableBytes());
+ {
+ int nextLevel = getNextLevel(candidates);
+ candidates = getOverlappingStarvedSSTables(nextLevel, candidates);
+ if (logger.isDebugEnabled())
+ logger.debug("Compaction candidates for L{} are {}", i, toString(candidates));
+ return new CompactionCandidate(candidates, nextLevel, cfs.getCompactionStrategy().getMaxSSTableBytes());
+ }
+ else
+ {
+ logger.debug("No compaction candidates for L{}", i);
+ }
}
}
@@ -292,6 +309,69 @@ public class LeveledManifest
return new CompactionCandidate(candidates, getNextLevel(candidates), cfs.getCompactionStrategy().getMaxSSTableBytes());
}
+ /**
+ * If we do something that makes many levels contain too little data (cleanup, change sstable size) we will "never"
+ * compact the high levels.
+ *
+ * This method finds if we have gone many compaction rounds without doing any high-level compaction, if so
+ * we start bringing in one sstable from the highest level until that level is either empty or is doing compaction.
+ *
+ * @param targetLevel the level the candidates will be compacted into
+ * @param candidates the original sstables to compact
+ * @return
+ */
+ private Collection<SSTableReader> getOverlappingStarvedSSTables(int targetLevel, Collection<SSTableReader> candidates)
+ {
+ Set<SSTableReader> withStarvedCandidate = new HashSet<>(candidates);
+
+ for (int i = generations.length - 1; i > 0; i--)
+ compactionCounter[i]++;
+ compactionCounter[targetLevel] = 0;
+ if (logger.isDebugEnabled())
+ {
+ for (int j = 0; j < compactionCounter.length; j++)
+ logger.debug("CompactionCounter: {}: {}", j, compactionCounter[j]);
+ }
+
+ for (int i = generations.length - 1; i > 0; i--)
+ {
+ if (getLevelSize(i) > 0)
+ {
+ if (compactionCounter[i] > NO_COMPACTION_LIMIT)
+ {
+ // we try to find an sstable that is fully contained within the boundaries we are compacting;
+ // say we are compacting 3 sstables: 0->30 in L1 and 0->12, 12->33 in L2
+ // this means that we will not create overlap in L2 if we add an sstable
+ // contained within 0 -> 33 to the compaction
+ RowPosition max = null;
+ RowPosition min = null;
+ for (SSTableReader candidate : candidates)
+ {
+ if (min == null || candidate.first.compareTo(min) < 0)
+ min = candidate.first;
+ if (max == null || candidate.last.compareTo(max) > 0)
+ max = candidate.last;
+ }
+ Set<SSTableReader> compacting = cfs.getDataTracker().getCompacting();
+ Range<RowPosition> boundaries = new Range<>(min, max);
+ for (SSTableReader sstable : getLevel(i))
+ {
+ Range<RowPosition> r = new Range<RowPosition>(sstable.first, sstable.last);
+ if (boundaries.contains(r) && !compacting.contains(sstable))
+ {
+ logger.info("Adding high-level (L{}) {} to candidates", sstable.getSSTableLevel(), sstable);
+ withStarvedCandidate.add(sstable);
+ return withStarvedCandidate;
+ }
+ }
+ }
+ return candidates;
+ }
+ }
+
+ return candidates;
+ }
+
public synchronized int getLevelSize(int i)
{
if (i >= generations.length)