You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/09/20 15:20:29 UTC

svn commit: r1173136 - in /cassandra/trunk: ./ contrib/ interface/thrift/gen-java/org/apache/cassandra/thrift/ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/db/compaction/ src/java/org/apache/cassandra/locator/

Author: jbellis
Date: Tue Sep 20 13:20:28 2011
New Revision: 1173136

URL: http://svn.apache.org/viewvc?rev=1173136&view=rev
Log:
merge #3224 from 1.0

Modified:
    cassandra/trunk/   (props changed)
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/build.xml
    cassandra/trunk/contrib/   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java   (props changed)
    cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
    cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
    cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java

Propchange: cassandra/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Sep 20 13:20:28 2011
@@ -4,8 +4,8 @@
 /cassandra/branches/cassandra-0.8:1090934-1125013,1125019-1171737,1172026,1172591
 /cassandra/branches/cassandra-0.8.0:1125021-1130369
 /cassandra/branches/cassandra-0.8.1:1101014-1125018
-/cassandra/branches/cassandra-1.0:1167085-1172719
-/cassandra/branches/cassandra-1.0.0:1167104-1167229,1167232-1172718
+/cassandra/branches/cassandra-1.0:1167085-1173134
+/cassandra/branches/cassandra-1.0.0:1167104-1167229,1167232-1173133
 /cassandra/tags/cassandra-0.7.0-rc3:1051699-1053689
 /cassandra/tags/cassandra-0.8.0-rc1:1102511-1125020
 /incubator/cassandra/branches/cassandra-0.3:774578-796573

Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1173136&r1=1173135&r2=1173136&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Tue Sep 20 13:20:28 2011
@@ -14,6 +14,8 @@
    true) and set default badness threshold to 0.1 (CASSANDRA-3229)
  * Base choice of random or "balanced" token on bootstrap on whether
    schema definitions were found (CASSANDRA-3219)
+ * Fixes for LeveledCompactionStrategy score computation, prioritization,
+   and scheduling (CASSANDRA-3224)
 
 
 1.0.0-beta1

Modified: cassandra/trunk/build.xml
URL: http://svn.apache.org/viewvc/cassandra/trunk/build.xml?rev=1173136&r1=1173135&r2=1173136&view=diff
==============================================================================
--- cassandra/trunk/build.xml (original)
+++ cassandra/trunk/build.xml Tue Sep 20 13:20:28 2011
@@ -881,6 +881,7 @@ url=${svn.entry.url}?pathrev=${svn.entry
         <fileset dir="${build.dir}">
           <include name="${final.name}.jar" />
           <include name="${ant.project.name}-thrift-${version}.jar" />
+          <include name="${ant.project.name}-clientutil-${version}.jar" />
         </fileset>
       </copy>
       <copy todir="${dist.dir}/javadoc">

Propchange: cassandra/trunk/contrib/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Sep 20 13:20:28 2011
@@ -4,8 +4,8 @@
 /cassandra/branches/cassandra-0.8/contrib:1090934-1125013,1125019-1171737,1172026,1172591
 /cassandra/branches/cassandra-0.8.0/contrib:1125021-1130369
 /cassandra/branches/cassandra-0.8.1/contrib:1101014-1125018
-/cassandra/branches/cassandra-1.0/contrib:1167085-1172719
-/cassandra/branches/cassandra-1.0.0/contrib:1167104-1167229,1167232-1172718
+/cassandra/branches/cassandra-1.0/contrib:1167085-1173134
+/cassandra/branches/cassandra-1.0.0/contrib:1167104-1167229,1167232-1173133
 /cassandra/tags/cassandra-0.7.0-rc3/contrib:1051699-1053689
 /cassandra/tags/cassandra-0.8.0-rc1/contrib:1102511-1125020
 /incubator/cassandra/branches/cassandra-0.3/contrib:774578-796573

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Sep 20 13:20:28 2011
@@ -4,8 +4,8 @@
 /cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1090934-1125013,1125019-1171737,1172026,1172591
 /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1125021-1130369
 /cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1101014-1125018
-/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1167085-1172719
-/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1167104-1167229,1167232-1172718
+/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1167085-1173134
+/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1167104-1167229,1167232-1173133
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1051699-1053689
 /cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1102511-1125020
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/Cassandra.java:774578-796573

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Sep 20 13:20:28 2011
@@ -4,8 +4,8 @@
 /cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1090934-1125013,1125019-1171737,1172026,1172591
 /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1125021-1130369
 /cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1101014-1125018
-/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1167085-1172719
-/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1167104-1167229,1167232-1172718
+/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1167085-1173134
+/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1167104-1167229,1167232-1173133
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1051699-1053689
 /cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1102511-1125020
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/column_t.java:774578-792198

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Sep 20 13:20:28 2011
@@ -4,8 +4,8 @@
 /cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1090934-1125013,1125019-1171737,1172026,1172591
 /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1125021-1130369
 /cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1101014-1125018
-/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1167085-1172719
-/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1167104-1167229,1167232-1172718
+/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1167085-1173134
+/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1167104-1167229,1167232-1173133
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1051699-1053689
 /cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1102511-1125020
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:774578-796573

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Sep 20 13:20:28 2011
@@ -4,8 +4,8 @@
 /cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1090934-1125013,1125019-1171737,1172026,1172591
 /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1125021-1130369
 /cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1101014-1125018
-/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1167085-1172719
-/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1167104-1167229,1167232-1172718
+/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1167085-1173134
+/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1167104-1167229,1167232-1173133
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1051699-1053689
 /cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1102511-1125020
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:774578-796573

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Sep 20 13:20:28 2011
@@ -4,8 +4,8 @@
 /cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1090934-1125013,1125019-1171737,1172026,1172591
 /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1125021-1130369
 /cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1101014-1125018
-/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1167085-1172719
-/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1167104-1167229,1167232-1172718
+/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1167085-1173134
+/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1167104-1167229,1167232-1173133
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1051699-1053689
 /cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1102511-1125020
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:774578-792198

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1173136&r1=1173135&r2=1173136&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Tue Sep 20 13:20:28 2011
@@ -328,7 +328,7 @@ public class ColumnFamilyStore implement
      */
     public static void scrubDataDirectories(String table, String columnFamily)
     {
-        logger.info("Removing compacted SSTable files (see http://wiki.apache.org/cassandra/MemtableSSTable)");
+        logger.info("Removing compacted SSTable files from " + columnFamily + " (see http://wiki.apache.org/cassandra/MemtableSSTable)");
         for (Map.Entry<Descriptor,Set<Component>> sstableFiles : files(table, columnFamily, true, true).entrySet())
         {
             Descriptor desc = sstableFiles.getKey();

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java?rev=1173136&r1=1173135&r2=1173136&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java Tue Sep 20 13:20:28 2011
@@ -76,9 +76,20 @@ public class LeveledCompactionStrategy e
         logger.info(this + " subscribed to the data tracker.");
 
         manifest = LeveledManifest.create(cfs, this.maxSSTableSize);
+        logger.debug("Created {}", manifest);
         // override min/max for this strategy
         cfs.setMaximumCompactionThreshold(Integer.MAX_VALUE);
         cfs.setMinimumCompactionThreshold(1);
+
+        // TODO this is redundant wrt the kickoff in AbstractCompactionStrategy, once CASSANDRA-X is done
+        Runnable runnable = new Runnable()
+        {
+            public void run()
+            {
+                CompactionManager.instance.submitBackground(LeveledCompactionStrategy.this.cfs);
+            }
+        };
+        StorageService.optionalTasks.scheduleAtFixedRate(runnable, 5 * 60, 5, TimeUnit.SECONDS);
     }
 
     public void shutdown()
@@ -96,12 +107,17 @@ public class LeveledCompactionStrategy e
     {
         LeveledCompactionTask currentTask = task.get();
         if (currentTask != null && !currentTask.isDone())
+        {
+            logger.debug("Compaction still in progress for {}", this);
             return Collections.emptyList();
+        }
 
         Collection<SSTableReader> sstables = manifest.getCompactionCandidates();
-        logger.debug("CompactionManager candidates are {}", StringUtils.join(sstables, ","));
         if (sstables.isEmpty())
+        {
+            logger.debug("No compaction necessary for {}", this);
             return Collections.emptyList();
+        }
 
         LeveledCompactionTask newTask = new LeveledCompactionTask(cfs, sstables, gcBefore, this.maxSSTableSize);
         return task.compareAndSet(currentTask, newTask)
@@ -139,4 +155,10 @@ public class LeveledCompactionStrategy e
             manifest.logDistribution();
         }
     }
+
+    @Override
+    public String toString()
+    {
+        return String.format("LCS@%d(%s)", hashCode(), cfs.columnFamily);
+    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java?rev=1173136&r1=1173135&r2=1173136&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java Tue Sep 20 13:20:28 2011
@@ -26,6 +26,7 @@ import java.io.IOError;
 import java.io.IOException;
 import java.util.*;
 
+import com.google.common.collect.Iterables;
 import org.apache.commons.lang.StringUtils;
 
 import org.slf4j.Logger;
@@ -37,6 +38,7 @@ import org.apache.cassandra.db.Decorated
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.io.sstable.SSTable;
 import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.io.util.FileUtils;
 import org.codehaus.jackson.JsonEncoding;
 import org.codehaus.jackson.JsonFactory;
 import org.codehaus.jackson.JsonGenerator;
@@ -47,6 +49,13 @@ public class LeveledManifest
 {
     private static final Logger logger = LoggerFactory.getLogger(LeveledCompactionStrategy.class);
 
+    /**
+     * limit the number of L0 sstables we compact at once, because compaction bloom filter creation
+     * uses a pessimistic estimate of how many keys overlap (none), so we risk wasting memory
+     * or even OOMing when compacting highly overlapping sstables
+     */
+    private static int MAX_COMPACTING_L0 = 32;
+
     private final ColumnFamilyStore cfs;
     private final List<SSTableReader>[] generations;
     private final DecoratedKey[] lastCompactedKeys;
@@ -85,37 +94,37 @@ public class LeveledManifest
 
     private static void load(ColumnFamilyStore cfs, LeveledManifest manifest)
     {
+        File manifestFile = tryGetManifest(cfs);
+        if (manifestFile == null)
+            return;
+
         ObjectMapper m = new ObjectMapper();
         try
         {
-            File manifestFile = tryGetManifest(cfs);
-
-            if (manifestFile != null && manifestFile.exists())
+            JsonNode rootNode = m.readValue(manifestFile, JsonNode.class);
+            JsonNode generations = rootNode.get("generations");
+            assert generations.isArray();
+            for (JsonNode generation : generations)
             {
-                JsonNode rootNode = m.readValue(manifestFile, JsonNode.class);
-                JsonNode generations = rootNode.get("generations");
-                assert generations.isArray();
-                for (JsonNode generation : generations)
+                int level = generation.get("generation").getIntValue();
+                JsonNode generationValues = generation.get("members");
+                for (JsonNode generationValue : generationValues)
                 {
-                    int level = generation.get("generation").getIntValue();
-                    JsonNode generationValues = generation.get("members");
-                    for (JsonNode generationValue : generationValues)
+                    for (SSTableReader ssTableReader : cfs.getSSTables())
                     {
-                        for (SSTableReader ssTableReader : cfs.getSSTables())
+                        if (ssTableReader.descriptor.generation == generationValue.getIntValue())
                         {
-                            if (ssTableReader.descriptor.generation == generationValue.getIntValue())
-                            {
-                                logger.debug("Loading {} at L{}", ssTableReader, level);
-                                manifest.add(ssTableReader, level);
-                            }
+                            logger.debug("Loading {} at L{}", ssTableReader, level);
+                            manifest.add(ssTableReader, level);
                         }
                     }
                 }
             }
         }
-        catch (IOException e)
+        catch (Exception e)
         {
-            throw new IOError(e);
+            // TODO try to recover -old first
+            logger.error("Manifest present but corrupt. Cassandra will compact levels from scratch", e);
         }
     }
 
@@ -140,8 +149,9 @@ public class LeveledManifest
 
     public synchronized void promote(Iterable<SSTableReader> removed, Iterable<SSTableReader> added)
     {
-        logger.debug("Replacing [{}] with [{}]", StringUtils.join(removed.iterator(), ", "), StringUtils.join(added.iterator(), ", "));
-        
+        if (logger.isDebugEnabled())
+            logger.debug((Iterables.isEmpty(added) ? "Removing [" : "Replacing [") + toString(removed) + "]");
+
         // the level for the added sstables is the max of the removed ones,
         // plus one if the removed were all on the same level
         int minimumLevel = Integer.MAX_VALUE;
@@ -160,16 +170,25 @@ public class LeveledManifest
 
         int newLevel = minimumLevel == maximumLevel ? maximumLevel + 1 : maximumLevel;
         newLevel = skipLevels(newLevel, added);
+        assert newLevel > 0;
+        logger.debug("Adding [{}] at L{}", StringUtils.join(added.iterator(), ", "), newLevel);
 
         lastCompactedKeys[minimumLevel] = SSTable.sstableOrdering.max(added).last;
-        logger.debug("Adding [{}] to L{}",
-                     StringUtils.join(added.iterator(), ", "), newLevel);
         for (SSTableReader ssTableReader : added)
             add(ssTableReader, newLevel);
 
         serialize();
     }
 
+    private String toString(Iterable<SSTableReader> sstables)
+    {
+        StringBuilder builder = new StringBuilder();
+        for (SSTableReader sstable : sstables)
+        {
+            builder.append(sstable.toString()).append(" (L").append(levelOf(sstable)).append("), ");
+        }
+        return builder.toString();
+    }
 
     private double maxBytesForLevel (int level)
     {
@@ -182,28 +201,47 @@ public class LeveledManifest
     {
         logDistribution();
 
-        double bestScore = -1;
-        int bestLevel = -1;
-        for (int level = 0; level < generations.length; level++)
+        // LevelDB gives each level a score of how much data it contains vs its ideal amount, and
+        // compacts the level with the highest score. But this falls apart spectacularly once you
+        // get behind.  Consider this set of levels:
+        // L0: 988 [ideal: 4]
+        // L1: 117 [ideal: 10]
+        // L2: 12  [ideal: 100]
+        //
+        // The problem is that L0 has a much higher score (almost 250) than L1 (11), so what we'll
+        // do is compact a batch of MAX_COMPACTING_L0 sstables with all 117 L1 sstables, and put the
+        // result (say, 120 sstables) in L1. Then we'll compact the next batch of MAX_COMPACTING_L0,
+        // and so forth.  So we spend most of our i/o rewriting the L1 data with each batch.
+        //
+        // If we could just do *all* L0 a single time with L1, that would be ideal.  But we can't
+        // -- see the javadoc for MAX_COMPACTING_L0.
+        //
+        // LevelDB's way around this is to simply block writes if L0 compaction falls behind.
+        // We don't have that luxury.
+        //
+        // So instead, we force compacting higher levels first.  This may not minimize the number
+        // of reads done as quickly in the short term, but it minimizes the i/o needed to compact
+        // optimally, which gives us a long-term win.
+        for (int i = generations.length - 1; i >= 0; i--)
         {
-            List<SSTableReader> sstables = generations[level];
+            List<SSTableReader> sstables = generations[i];
             if (sstables.isEmpty())
-                continue;
-
-            double score = SSTableReader.getTotalBytes(sstables) / maxBytesForLevel(level);
-            //if we're idle and we don't have anything better to do schedule a compaction for L0
-            //by setting its threshold to some very low value
-            score = (level == 0 && score < 1) ? 1.001 : 0;
-            logger.debug("Compaction score for level {} is {}", level, score);
-            if (score > bestScore)
+                continue; // mostly this just avoids polluting the debug log with zero scores
+            double score = SSTableReader.getTotalBytes(sstables) / maxBytesForLevel(i);
+            logger.debug("Compaction score for level {} is {}", i, score);
+
+            // L0 gets a special case that if we don't have anything more important to do,
+            // we'll go ahead and compact even just one sstable
+            if (score > 1 || i == 0)
             {
-                bestScore = score;
-                bestLevel = level;
+                Collection<SSTableReader> candidates = getCandidatesFor(i);
+                if (logger.isDebugEnabled())
+                    logger.debug("Compaction candidates for L{} are {}", i, toString(candidates));
+                return candidates;
             }
         }
 
-        // if we have met at least one of our thresholds then trigger a compaction
-        return bestScore > 1 ? getCandidatesFor(bestLevel) : Collections.<SSTableReader>emptyList();
+        return Collections.emptyList();
     }
 
     public int getLevelSize(int i)
@@ -215,7 +253,13 @@ public class LeveledManifest
     public void logDistribution()
     {
         for (int i = 0; i < generations.length; i++)
-            logger.debug("Level {} contains {} SSTables", i, generations[i].size());
+        {
+            if (!generations[i].isEmpty())
+            {
+                logger.debug("L{} contains {} SSTables ({} bytes) in {}",
+                             new Object[] {i, generations[i].size(), SSTableReader.getTotalBytes(generations[i]), this});
+            }
+        }
     }
 
     private int levelOf(SSTableReader sstable)
@@ -258,25 +302,21 @@ public class LeveledManifest
     private Collection<SSTableReader> getCandidatesFor(int level)
     {
         assert !generations[level].isEmpty();
+        logger.debug("Choosing candidates for L{}", level);
 
         if (level == 0)
         {
             // because L0 files may overlap each other, we treat compactions there specially:
             // a L0 compaction also checks other L0 files for overlap.
             Set<SSTableReader> candidates = new HashSet<SSTableReader>();
-            Set<SSTableReader> remaining = new HashSet<SSTableReader>(generations[0]);
-
-            while (!remaining.isEmpty())
-            {
-                // pick a random sstable from L0, and any that overlap with it
-                List<SSTableReader> L0 = overlapping(remaining.iterator().next(), remaining);
-                // add the overlapping ones from L1
-                for (SSTableReader sstable : L0)
-                {
-                    candidates.addAll(overlapping(sstable, generations[1]));
-                    remaining.remove(sstable);
-                }
-            }
+            // pick the oldest sstable from L0, and any that overlap with it
+            List<SSTableReader> ageSortedSSTables = new ArrayList<SSTableReader>(generations[0]);
+            Collections.sort(ageSortedSSTables, SSTable.maxTimestampComparator);
+            List<SSTableReader> L0 = overlapping(ageSortedSSTables.get(0), generations[0]);
+            L0 = L0.size() > MAX_COMPACTING_L0 ? L0.subList(0, MAX_COMPACTING_L0) : L0;
+            // add the overlapping ones from L1
+            for (SSTableReader sstable : L0)
+                candidates.addAll(overlapping(sstable, generations[1]));
             return candidates;
         }
 
@@ -294,18 +334,16 @@ public class LeveledManifest
 
     public synchronized void serialize()
     {
-        String dataFileLocation = getDataFilePrefix(cfs);
-        String tempManifestFileName = dataFileLocation + cfs.getColumnFamilyName() + "-" + "tmp.json";
-        String manifestFileName = dataFileLocation + cfs.getColumnFamilyName() + ".json";
-        String oldManifestFileName = dataFileLocation + cfs.getColumnFamilyName() + "-" + "old.json";
-
-        File tmpManifest = new File(tempManifestFileName);
+        File manifestFile = tryGetManifest(cfs);
+        if (manifestFile == null)
+            manifestFile = new File(new File(DatabaseDescriptor.getAllDataFileLocations()[0], cfs.table.name), cfs.columnFamily + ".json");
+        File oldFile = new File(manifestFile.getPath().replace(".json", "-old.json"));
+        File tmpFile = new File(manifestFile.getPath().replace(".json", "-tmp.json"));
 
         JsonFactory f = new JsonFactory();
-
         try
         {
-            JsonGenerator g = f.createJsonGenerator(tmpManifest, JsonEncoding.UTF8);
+            JsonGenerator g = f.createJsonGenerator(tmpFile, JsonEncoding.UTF8);
             g.useDefaultPrettyPrinter();
             g.writeStartObject();
             g.writeArrayFieldStart("generations");
@@ -323,36 +361,39 @@ public class LeveledManifest
             g.writeEndArray(); // for field generations
             g.writeEndObject(); // write global object
             g.close();
+
+            if (oldFile.exists() && manifestFile.exists())
+                FileUtils.deleteWithConfirm(oldFile);
+            if (manifestFile.exists())
+                FileUtils.renameWithConfirm(manifestFile, oldFile);
+            assert tmpFile.exists();
+            FileUtils.renameWithConfirm(tmpFile, manifestFile);
+            logger.debug("Saved manifest {}", manifestFile);
         }
         catch (IOException e)
         {
-            e.printStackTrace();
+            throw new IOError(e);
         }
-        File oldFile = new File(oldManifestFileName);
-        if (oldFile.exists())
-            oldFile.delete();
-        File currentManifest = new File(manifestFileName);
-        if (currentManifest.exists())
-            currentManifest.renameTo(new File(oldManifestFileName));
-        if (tmpManifest.exists())
-            tmpManifest.renameTo(new File(manifestFileName));
     }
 
     public static File tryGetManifest(ColumnFamilyStore cfs)
     {
-        for (String dataFileLocation : DatabaseDescriptor.getAllDataFileLocations())
+        for (String dir : DatabaseDescriptor.getAllDataFileLocations())
         {
-            dataFileLocation = getDataFilePrefix(cfs);
-            String manifestFileName = dataFileLocation + System.getProperty("file.separator") + cfs.table.name + ".json";
-            File manifestFile = new File(manifestFileName);
+            File manifestFile = new File(new File(dir, cfs.table.name), cfs.columnFamily + ".json");
             if (manifestFile.exists())
+            {
+                logger.debug("Loading manifest from {}", manifestFile);
                 return manifestFile;
+            }
         }
+        logger.debug("No level manifest found");
         return null;
     }
 
-    public static String getDataFilePrefix(ColumnFamilyStore cfs)
+    @Override
+    public String toString()
     {
-        return DatabaseDescriptor.getAllDataFileLocations()[0] + System.getProperty("file.separator") + cfs.table.name + System.getProperty("file.separator");
+        return "Manifest@" + hashCode();
     }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java?rev=1173136&r1=1173135&r2=1173136&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java Tue Sep 20 13:20:28 2011
@@ -43,22 +43,34 @@ public class TokenMetadata
     /* Maintains token to endpoint map of every node in the cluster. */
     private BiMap<Token, InetAddress> tokenToEndpointMap;
 
-    // Suppose that there is a ring of nodes A, C and E, with replication factor 3.
+    // Prior to CASSANDRA-603, we just had <tt>Map<Range, InetAddress> pendingRanges</tt>,
+    // which was added to when a node began bootstrap and removed from when it finished.
+    //
+    // This is inadequate when multiple changes are allowed simultaneously.  For example,
+    // suppose that there is a ring of nodes A, C and E, with replication factor 3.
     // Node D bootstraps between C and E, so its pending ranges will be E-A, A-C and C-D.
-    // Now suppose node B bootstraps between A and C at the same time. Its pending ranges would be C-E, E-A and A-B.
-    // Now both nodes have pending range E-A in their list, which will cause pending range collision
-    // even though we're only talking about replica range, not even primary range. The same thing happens
-    // for any nodes that boot simultaneously between same two nodes. For this we cannot simply make pending ranges a <tt>Multimap</tt>,
-    // since that would make us unable to notice the real problem of two nodes trying to boot using the same token.
-    // In order to do this properly, we need to know what tokens are booting at any time.
+    // Now suppose node B bootstraps between A and C at the same time. Its pending ranges
+    // would be C-E, E-A and A-B. Now both nodes need to be assigned pending range E-A,
+    // which we would be unable to represent with the old Map.  The same thing happens
+    // even more obviously for any nodes that boot simultaneously between the same two nodes.
+    //
+    // So, we made two changes:
+    //
+    // First, we changed pendingRanges to a <tt>Multimap<Range, InetAddress></tt> (now
+    // <tt>Map<String, Multimap<Range, InetAddress>></tt>, because replication strategy
+    // and options are per-keyspace).
+    //
+    // Second, we added the bootstrapTokens and leavingEndpoints collections, so we can
+    // rebuild pendingRanges from the complete information of what is going on, when
+    // additional changes are made mid-operation.
+    //
+    // Finally, note that recording the tokens of joining nodes in bootstrapTokens also
+    // means we can detect and reject the addition of multiple nodes at the same token
+    // before one becomes part of the ring.
     private BiMap<Token, InetAddress> bootstrapTokens = HashBiMap.create();
-
-    // we will need to know at all times what nodes are leaving and calculate ranges accordingly.
-    // An anonymous pending ranges list is not enough, as that does not tell which node is leaving
-    // and/or if the ranges are there because of bootstrap or leave operation.
-    // (See CASSANDRA-603 for more detail + examples).
+    // (don't need to record Token here since it's still part of tokenToEndpointMap until it's done leaving)
     private Set<InetAddress> leavingEndpoints = new HashSet<InetAddress>();
-
+    // this is a cache of the calculation from {tokenToEndpointMap, bootstrapTokens, leavingEndpoints}
     private ConcurrentMap<String, Multimap<Range, InetAddress>> pendingRanges = new ConcurrentHashMap<String, Multimap<Range, InetAddress>>();
 
     // nodes which are migrating to the new tokens in the ring