You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/09/20 15:20:29 UTC
svn commit: r1173136 - in /cassandra/trunk: ./ contrib/
interface/thrift/gen-java/org/apache/cassandra/thrift/
src/java/org/apache/cassandra/db/
src/java/org/apache/cassandra/db/compaction/
src/java/org/apache/cassandra/locator/
Author: jbellis
Date: Tue Sep 20 13:20:28 2011
New Revision: 1173136
URL: http://svn.apache.org/viewvc?rev=1173136&view=rev
Log:
merge #3224 from 1.0
Modified:
cassandra/trunk/ (props changed)
cassandra/trunk/CHANGES.txt
cassandra/trunk/build.xml
cassandra/trunk/contrib/ (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java (props changed)
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java
Propchange: cassandra/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Sep 20 13:20:28 2011
@@ -4,8 +4,8 @@
/cassandra/branches/cassandra-0.8:1090934-1125013,1125019-1171737,1172026,1172591
/cassandra/branches/cassandra-0.8.0:1125021-1130369
/cassandra/branches/cassandra-0.8.1:1101014-1125018
-/cassandra/branches/cassandra-1.0:1167085-1172719
-/cassandra/branches/cassandra-1.0.0:1167104-1167229,1167232-1172718
+/cassandra/branches/cassandra-1.0:1167085-1173134
+/cassandra/branches/cassandra-1.0.0:1167104-1167229,1167232-1173133
/cassandra/tags/cassandra-0.7.0-rc3:1051699-1053689
/cassandra/tags/cassandra-0.8.0-rc1:1102511-1125020
/incubator/cassandra/branches/cassandra-0.3:774578-796573
Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1173136&r1=1173135&r2=1173136&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Tue Sep 20 13:20:28 2011
@@ -14,6 +14,8 @@
true) and set default badness threshold to 0.1 (CASSANDRA-3229)
* Base choice of random or "balanced" token on bootstrap on whether
schema definitions were found (CASSANDRA-3219)
+ * Fixes for LeveledCompactionStrategy score computation, prioritization,
+ and scheduling (CASSANDRA-3224)
1.0.0-beta1
Modified: cassandra/trunk/build.xml
URL: http://svn.apache.org/viewvc/cassandra/trunk/build.xml?rev=1173136&r1=1173135&r2=1173136&view=diff
==============================================================================
--- cassandra/trunk/build.xml (original)
+++ cassandra/trunk/build.xml Tue Sep 20 13:20:28 2011
@@ -881,6 +881,7 @@ url=${svn.entry.url}?pathrev=${svn.entry
<fileset dir="${build.dir}">
<include name="${final.name}.jar" />
<include name="${ant.project.name}-thrift-${version}.jar" />
+ <include name="${ant.project.name}-clientutil-${version}.jar" />
</fileset>
</copy>
<copy todir="${dist.dir}/javadoc">
Propchange: cassandra/trunk/contrib/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Sep 20 13:20:28 2011
@@ -4,8 +4,8 @@
/cassandra/branches/cassandra-0.8/contrib:1090934-1125013,1125019-1171737,1172026,1172591
/cassandra/branches/cassandra-0.8.0/contrib:1125021-1130369
/cassandra/branches/cassandra-0.8.1/contrib:1101014-1125018
-/cassandra/branches/cassandra-1.0/contrib:1167085-1172719
-/cassandra/branches/cassandra-1.0.0/contrib:1167104-1167229,1167232-1172718
+/cassandra/branches/cassandra-1.0/contrib:1167085-1173134
+/cassandra/branches/cassandra-1.0.0/contrib:1167104-1167229,1167232-1173133
/cassandra/tags/cassandra-0.7.0-rc3/contrib:1051699-1053689
/cassandra/tags/cassandra-0.8.0-rc1/contrib:1102511-1125020
/incubator/cassandra/branches/cassandra-0.3/contrib:774578-796573
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Sep 20 13:20:28 2011
@@ -4,8 +4,8 @@
/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1090934-1125013,1125019-1171737,1172026,1172591
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1125021-1130369
/cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1101014-1125018
-/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1167085-1172719
-/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1167104-1167229,1167232-1172718
+/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1167085-1173134
+/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1167104-1167229,1167232-1173133
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1051699-1053689
/cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1102511-1125020
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/Cassandra.java:774578-796573
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Sep 20 13:20:28 2011
@@ -4,8 +4,8 @@
/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1090934-1125013,1125019-1171737,1172026,1172591
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1125021-1130369
/cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1101014-1125018
-/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1167085-1172719
-/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1167104-1167229,1167232-1172718
+/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1167085-1173134
+/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1167104-1167229,1167232-1173133
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1051699-1053689
/cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1102511-1125020
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/column_t.java:774578-792198
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Sep 20 13:20:28 2011
@@ -4,8 +4,8 @@
/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1090934-1125013,1125019-1171737,1172026,1172591
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1125021-1130369
/cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1101014-1125018
-/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1167085-1172719
-/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1167104-1167229,1167232-1172718
+/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1167085-1173134
+/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1167104-1167229,1167232-1173133
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1051699-1053689
/cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1102511-1125020
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:774578-796573
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Sep 20 13:20:28 2011
@@ -4,8 +4,8 @@
/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1090934-1125013,1125019-1171737,1172026,1172591
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1125021-1130369
/cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1101014-1125018
-/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1167085-1172719
-/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1167104-1167229,1167232-1172718
+/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1167085-1173134
+/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1167104-1167229,1167232-1173133
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1051699-1053689
/cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1102511-1125020
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:774578-796573
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Sep 20 13:20:28 2011
@@ -4,8 +4,8 @@
/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1090934-1125013,1125019-1171737,1172026,1172591
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1125021-1130369
/cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1101014-1125018
-/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1167085-1172719
-/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1167104-1167229,1167232-1172718
+/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1167085-1173134
+/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1167104-1167229,1167232-1173133
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1051699-1053689
/cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1102511-1125020
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:774578-792198
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1173136&r1=1173135&r2=1173136&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Tue Sep 20 13:20:28 2011
@@ -328,7 +328,7 @@ public class ColumnFamilyStore implement
*/
public static void scrubDataDirectories(String table, String columnFamily)
{
- logger.info("Removing compacted SSTable files (see http://wiki.apache.org/cassandra/MemtableSSTable)");
+ logger.info("Removing compacted SSTable files from " + columnFamily + " (see http://wiki.apache.org/cassandra/MemtableSSTable)");
for (Map.Entry<Descriptor,Set<Component>> sstableFiles : files(table, columnFamily, true, true).entrySet())
{
Descriptor desc = sstableFiles.getKey();
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java?rev=1173136&r1=1173135&r2=1173136&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java Tue Sep 20 13:20:28 2011
@@ -76,9 +76,20 @@ public class LeveledCompactionStrategy e
logger.info(this + " subscribed to the data tracker.");
manifest = LeveledManifest.create(cfs, this.maxSSTableSize);
+ logger.debug("Created {}", manifest);
// override min/max for this strategy
cfs.setMaximumCompactionThreshold(Integer.MAX_VALUE);
cfs.setMinimumCompactionThreshold(1);
+
+ // TODO this is redundant wrt the kickoff in AbstractCompactionStrategy, once CASSANDRA-X is done
+ Runnable runnable = new Runnable()
+ {
+ public void run()
+ {
+ CompactionManager.instance.submitBackground(LeveledCompactionStrategy.this.cfs);
+ }
+ };
+ StorageService.optionalTasks.scheduleAtFixedRate(runnable, 5 * 60, 5, TimeUnit.SECONDS);
}
public void shutdown()
@@ -96,12 +107,17 @@ public class LeveledCompactionStrategy e
{
LeveledCompactionTask currentTask = task.get();
if (currentTask != null && !currentTask.isDone())
+ {
+ logger.debug("Compaction still in progress for {}", this);
return Collections.emptyList();
+ }
Collection<SSTableReader> sstables = manifest.getCompactionCandidates();
- logger.debug("CompactionManager candidates are {}", StringUtils.join(sstables, ","));
if (sstables.isEmpty())
+ {
+ logger.debug("No compaction necessary for {}", this);
return Collections.emptyList();
+ }
LeveledCompactionTask newTask = new LeveledCompactionTask(cfs, sstables, gcBefore, this.maxSSTableSize);
return task.compareAndSet(currentTask, newTask)
@@ -139,4 +155,10 @@ public class LeveledCompactionStrategy e
manifest.logDistribution();
}
}
+
+ @Override
+ public String toString()
+ {
+ return String.format("LCS@%d(%s)", hashCode(), cfs.columnFamily);
+ }
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java?rev=1173136&r1=1173135&r2=1173136&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java Tue Sep 20 13:20:28 2011
@@ -26,6 +26,7 @@ import java.io.IOError;
import java.io.IOException;
import java.util.*;
+import com.google.common.collect.Iterables;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
@@ -37,6 +38,7 @@ import org.apache.cassandra.db.Decorated
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.io.sstable.SSTable;
import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.io.util.FileUtils;
import org.codehaus.jackson.JsonEncoding;
import org.codehaus.jackson.JsonFactory;
import org.codehaus.jackson.JsonGenerator;
@@ -47,6 +49,13 @@ public class LeveledManifest
{
private static final Logger logger = LoggerFactory.getLogger(LeveledCompactionStrategy.class);
+ /**
+ * limit the number of L0 sstables we do at once, because compaction bloom filter creation
+ * uses a pessimistic estimate of how many keys overlap (none), so we risk wasting memory
+ * or even OOMing when compacting highly overlapping sstables
+ */
+ private static int MAX_COMPACTING_L0 = 32;
+
private final ColumnFamilyStore cfs;
private final List<SSTableReader>[] generations;
private final DecoratedKey[] lastCompactedKeys;
@@ -85,37 +94,37 @@ public class LeveledManifest
private static void load(ColumnFamilyStore cfs, LeveledManifest manifest)
{
+ File manifestFile = tryGetManifest(cfs);
+ if (manifestFile == null)
+ return;
+
ObjectMapper m = new ObjectMapper();
try
{
- File manifestFile = tryGetManifest(cfs);
-
- if (manifestFile != null && manifestFile.exists())
+ JsonNode rootNode = m.readValue(manifestFile, JsonNode.class);
+ JsonNode generations = rootNode.get("generations");
+ assert generations.isArray();
+ for (JsonNode generation : generations)
{
- JsonNode rootNode = m.readValue(manifestFile, JsonNode.class);
- JsonNode generations = rootNode.get("generations");
- assert generations.isArray();
- for (JsonNode generation : generations)
+ int level = generation.get("generation").getIntValue();
+ JsonNode generationValues = generation.get("members");
+ for (JsonNode generationValue : generationValues)
{
- int level = generation.get("generation").getIntValue();
- JsonNode generationValues = generation.get("members");
- for (JsonNode generationValue : generationValues)
+ for (SSTableReader ssTableReader : cfs.getSSTables())
{
- for (SSTableReader ssTableReader : cfs.getSSTables())
+ if (ssTableReader.descriptor.generation == generationValue.getIntValue())
{
- if (ssTableReader.descriptor.generation == generationValue.getIntValue())
- {
- logger.debug("Loading {} at L{}", ssTableReader, level);
- manifest.add(ssTableReader, level);
- }
+ logger.debug("Loading {} at L{}", ssTableReader, level);
+ manifest.add(ssTableReader, level);
}
}
}
}
}
- catch (IOException e)
+ catch (Exception e)
{
- throw new IOError(e);
+ // TODO try to recover -old first
+ logger.error("Manifest present but corrupt. Cassandra will compact levels from scratch", e);
}
}
@@ -140,8 +149,9 @@ public class LeveledManifest
public synchronized void promote(Iterable<SSTableReader> removed, Iterable<SSTableReader> added)
{
- logger.debug("Replacing [{}] with [{}]", StringUtils.join(removed.iterator(), ", "), StringUtils.join(added.iterator(), ", "));
-
+ if (logger.isDebugEnabled())
+ logger.debug((Iterables.isEmpty(added) ? "Removing [" : "Replacing [") + toString(removed) + "]");
+
// the level for the added sstables is the max of the removed ones,
// plus one if the removed were all on the same level
int minimumLevel = Integer.MAX_VALUE;
@@ -160,16 +170,25 @@ public class LeveledManifest
int newLevel = minimumLevel == maximumLevel ? maximumLevel + 1 : maximumLevel;
newLevel = skipLevels(newLevel, added);
+ assert newLevel > 0;
+ logger.debug("Adding [{}] at L{}", StringUtils.join(added.iterator(), ", "), newLevel);
lastCompactedKeys[minimumLevel] = SSTable.sstableOrdering.max(added).last;
- logger.debug("Adding [{}] to L{}",
- StringUtils.join(added.iterator(), ", "), newLevel);
for (SSTableReader ssTableReader : added)
add(ssTableReader, newLevel);
serialize();
}
+ private String toString(Iterable<SSTableReader> sstables)
+ {
+ StringBuilder builder = new StringBuilder();
+ for (SSTableReader sstable : sstables)
+ {
+ builder.append(sstable.toString()).append(" (L").append(levelOf(sstable)).append("), ");
+ }
+ return builder.toString();
+ }
private double maxBytesForLevel (int level)
{
@@ -182,28 +201,47 @@ public class LeveledManifest
{
logDistribution();
- double bestScore = -1;
- int bestLevel = -1;
- for (int level = 0; level < generations.length; level++)
+ // LevelDB gives each level a score of how much data it contains vs its ideal amount, and
+ // compacts the level with the highest score. But this falls apart spectacularly once you
+ // get behind. Consider this set of levels:
+ // L0: 988 [ideal: 4]
+ // L1: 117 [ideal: 10]
+ // L2: 12 [ideal: 100]
+ //
+ // The problem is that L0 has a much higher score (almost 250) than L1 (11), so what we'll
+ // do is compact a batch of MAX_COMPACTING_L0 sstables with all 117 L1 sstables, and put the
+ // result (say, 120 sstables) in L1. Then we'll compact the next batch of MAX_COMPACTING_L0,
+ // and so forth. So we spend most of our i/o rewriting the L1 data with each batch.
+ //
+ // If we could just do *all* L0 a single time with L1, that would be ideal. But we can't
+ // -- see the javadoc for MAX_COMPACTING_L0.
+ //
+ // LevelDB's way around this is to simply block writes if L0 compaction falls behind.
+ // We don't have that luxury.
+ //
+ // So instead, we force compacting higher levels first. This may not minimize the number
+ // of reads done as quickly in the short term, but it minimizes the i/o needed to compact
+ // optimally, which gives us a long term win.
+ for (int i = generations.length - 1; i >= 0; i--)
{
- List<SSTableReader> sstables = generations[level];
+ List<SSTableReader> sstables = generations[i];
if (sstables.isEmpty())
- continue;
-
- double score = SSTableReader.getTotalBytes(sstables) / maxBytesForLevel(level);
- //if we're idle and we don't have anything better to do schedule a compaction for L0
- //by setting its threshold to some very low value
- score = (level == 0 && score < 1) ? 1.001 : 0;
- logger.debug("Compaction score for level {} is {}", level, score);
- if (score > bestScore)
+ continue; // mostly this just avoids polluting the debug log with zero scores
+ double score = SSTableReader.getTotalBytes(sstables) / maxBytesForLevel(i);
+ logger.debug("Compaction score for level {} is {}", i, score);
+
+ // L0 gets a special case that if we don't have anything more important to do,
+ // we'll go ahead and compact even just one sstable
+ if (score > 1 || i == 0)
{
- bestScore = score;
- bestLevel = level;
+ Collection<SSTableReader> candidates = getCandidatesFor(i);
+ if (logger.isDebugEnabled())
+ logger.debug("Compaction candidates for L{} are {}", i, toString(candidates));
+ return candidates;
}
}
- // if we have met at least one of our thresholds then trigger a compaction
- return bestScore > 1 ? getCandidatesFor(bestLevel) : Collections.<SSTableReader>emptyList();
+ return Collections.emptyList();
}
public int getLevelSize(int i)
@@ -215,7 +253,13 @@ public class LeveledManifest
public void logDistribution()
{
for (int i = 0; i < generations.length; i++)
- logger.debug("Level {} contains {} SSTables", i, generations[i].size());
+ {
+ if (!generations[i].isEmpty())
+ {
+ logger.debug("L{} contains {} SSTables ({} bytes) in {}",
+ new Object[] {i, generations[i].size(), SSTableReader.getTotalBytes(generations[i]), this});
+ }
+ }
}
private int levelOf(SSTableReader sstable)
@@ -258,25 +302,21 @@ public class LeveledManifest
private Collection<SSTableReader> getCandidatesFor(int level)
{
assert !generations[level].isEmpty();
+ logger.debug("Choosing candidates for L{}", level);
if (level == 0)
{
// because L0 files may overlap each other, we treat compactions there specially:
// a L0 compaction also checks other L0 files for overlap.
Set<SSTableReader> candidates = new HashSet<SSTableReader>();
- Set<SSTableReader> remaining = new HashSet<SSTableReader>(generations[0]);
-
- while (!remaining.isEmpty())
- {
- // pick a random sstable from L0, and any that overlap with it
- List<SSTableReader> L0 = overlapping(remaining.iterator().next(), remaining);
- // add the overlapping ones from L1
- for (SSTableReader sstable : L0)
- {
- candidates.addAll(overlapping(sstable, generations[1]));
- remaining.remove(sstable);
- }
- }
+ // pick the oldest sstable from L0, and any that overlap with it
+ List<SSTableReader> ageSortedSSTables = new ArrayList<SSTableReader>(generations[0]);
+ Collections.sort(ageSortedSSTables, SSTable.maxTimestampComparator);
+ List<SSTableReader> L0 = overlapping(ageSortedSSTables.get(0), generations[0]);
+ L0 = L0.size() > MAX_COMPACTING_L0 ? L0.subList(0, MAX_COMPACTING_L0) : L0;
+ // add the overlapping ones from L1
+ for (SSTableReader sstable : L0)
+ candidates.addAll(overlapping(sstable, generations[1]));
return candidates;
}
@@ -294,18 +334,16 @@ public class LeveledManifest
public synchronized void serialize()
{
- String dataFileLocation = getDataFilePrefix(cfs);
- String tempManifestFileName = dataFileLocation + cfs.getColumnFamilyName() + "-" + "tmp.json";
- String manifestFileName = dataFileLocation + cfs.getColumnFamilyName() + ".json";
- String oldManifestFileName = dataFileLocation + cfs.getColumnFamilyName() + "-" + "old.json";
-
- File tmpManifest = new File(tempManifestFileName);
+ File manifestFile = tryGetManifest(cfs);
+ if (manifestFile == null)
+ manifestFile = new File(new File(DatabaseDescriptor.getAllDataFileLocations()[0], cfs.table.name), cfs.columnFamily + ".json");
+ File oldFile = new File(manifestFile.getPath().replace(".json", "-old.json"));
+ File tmpFile = new File(manifestFile.getPath().replace(".json", "-tmp.json"));
JsonFactory f = new JsonFactory();
-
try
{
- JsonGenerator g = f.createJsonGenerator(tmpManifest, JsonEncoding.UTF8);
+ JsonGenerator g = f.createJsonGenerator(tmpFile, JsonEncoding.UTF8);
g.useDefaultPrettyPrinter();
g.writeStartObject();
g.writeArrayFieldStart("generations");
@@ -323,36 +361,39 @@ public class LeveledManifest
g.writeEndArray(); // for field generations
g.writeEndObject(); // write global object
g.close();
+
+ if (oldFile.exists() && manifestFile.exists())
+ FileUtils.deleteWithConfirm(oldFile);
+ if (manifestFile.exists())
+ FileUtils.renameWithConfirm(manifestFile, oldFile);
+ assert tmpFile.exists();
+ FileUtils.renameWithConfirm(tmpFile, manifestFile);
+ logger.debug("Saved manifest {}", manifestFile);
}
catch (IOException e)
{
- e.printStackTrace();
+ throw new IOError(e);
}
- File oldFile = new File(oldManifestFileName);
- if (oldFile.exists())
- oldFile.delete();
- File currentManifest = new File(manifestFileName);
- if (currentManifest.exists())
- currentManifest.renameTo(new File(oldManifestFileName));
- if (tmpManifest.exists())
- tmpManifest.renameTo(new File(manifestFileName));
}
public static File tryGetManifest(ColumnFamilyStore cfs)
{
- for (String dataFileLocation : DatabaseDescriptor.getAllDataFileLocations())
+ for (String dir : DatabaseDescriptor.getAllDataFileLocations())
{
- dataFileLocation = getDataFilePrefix(cfs);
- String manifestFileName = dataFileLocation + System.getProperty("file.separator") + cfs.table.name + ".json";
- File manifestFile = new File(manifestFileName);
+ File manifestFile = new File(new File(dir, cfs.table.name), cfs.columnFamily + ".json");
if (manifestFile.exists())
+ {
+ logger.debug("Loading manifest from {}", manifestFile);
return manifestFile;
+ }
}
+ logger.debug("No level manifest found");
return null;
}
- public static String getDataFilePrefix(ColumnFamilyStore cfs)
+ @Override
+ public String toString()
{
- return DatabaseDescriptor.getAllDataFileLocations()[0] + System.getProperty("file.separator") + cfs.table.name + System.getProperty("file.separator");
+ return "Manifest@" + hashCode();
}
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java?rev=1173136&r1=1173135&r2=1173136&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java Tue Sep 20 13:20:28 2011
@@ -43,22 +43,34 @@ public class TokenMetadata
/* Maintains token to endpoint map of every node in the cluster. */
private BiMap<Token, InetAddress> tokenToEndpointMap;
- // Suppose that there is a ring of nodes A, C and E, with replication factor 3.
+ // Prior to CASSANDRA-603, we just had <tt>Map<Range, InetAddress> pendingRanges</tt>,
+ // which was added to when a node began bootstrap and removed from when it finished.
+ //
+ // This is inadequate when multiple changes are allowed simultaneously. For example,
+ // suppose that there is a ring of nodes A, C and E, with replication factor 3.
// Node D bootstraps between C and E, so its pending ranges will be E-A, A-C and C-D.
- // Now suppose node B bootstraps between A and C at the same time. Its pending ranges would be C-E, E-A and A-B.
- // Now both nodes have pending range E-A in their list, which will cause pending range collision
- // even though we're only talking about replica range, not even primary range. The same thing happens
- // for any nodes that boot simultaneously between same two nodes. For this we cannot simply make pending ranges a <tt>Multimap</tt>,
- // since that would make us unable to notice the real problem of two nodes trying to boot using the same token.
- // In order to do this properly, we need to know what tokens are booting at any time.
+ // Now suppose node B bootstraps between A and C at the same time. Its pending ranges
+ // would be C-E, E-A and A-B. Now both nodes need to be assigned pending range E-A,
+ // which we would be unable to represent with the old Map. The same thing happens
+ // even more obviously for any nodes that boot simultaneously between the same two nodes.
+ //
+ // So, we made two changes:
+ //
+ // First, we changed pendingRanges to a <tt>Multimap<Range, InetAddress></tt> (now
+ // <tt>Map<String, Multimap<Range, InetAddress>></tt>, because replication strategy
+ // and options are per-KeySpace).
+ //
+ // Second, we added the bootstrapTokens and leavingEndpoints collections, so we can
+ // rebuild pendingRanges from the complete information of what is going on, when
+ // additional changes are made mid-operation.
+ //
+ // Finally, note that recording the tokens of joining nodes in bootstrapTokens also
+ // means we can detect and reject the addition of multiple nodes at the same token
+ // before one becomes part of the ring.
private BiMap<Token, InetAddress> bootstrapTokens = HashBiMap.create();
-
- // we will need to know at all times what nodes are leaving and calculate ranges accordingly.
- // An anonymous pending ranges list is not enough, as that does not tell which node is leaving
- // and/or if the ranges are there because of bootstrap or leave operation.
- // (See CASSANDRA-603 for more detail + examples).
+ // (don't need to record Token here since it's still part of tokenToEndpointMap until it's done leaving)
private Set<InetAddress> leavingEndpoints = new HashSet<InetAddress>();
-
+ // this is a cache of the calculation from {tokenToEndpointMap, bootstrapTokens, leavingEndpoints}
private ConcurrentMap<String, Multimap<Range, InetAddress>> pendingRanges = new ConcurrentHashMap<String, Multimap<Range, InetAddress>>();
// nodes which are migrating to the new tokens in the ring