You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2012/06/26 17:55:14 UTC

[2/3] git commit: fixes for small-sstable compaction patch by slebresne; reviewed by jbellis for CASSANDRA-4341

fixes for small-sstable compaction
patch by slebresne; reviewed by jbellis for CASSANDRA-4341

Conflicts:

	src/java/org/apache/cassandra/db/compaction/CompactionTask.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/80d7d43e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/80d7d43e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/80d7d43e

Branch: refs/heads/trunk
Commit: 80d7d43e2952d7612ab4406bca800c2b6f30c85f
Parents: a846cd6
Author: Jonathan Ellis <jb...@apache.org>
Authored: Fri Jun 22 13:15:51 2012 -0500
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Tue Jun 26 17:53:28 2012 +0200

----------------------------------------------------------------------
 .../cassandra/db/compaction/CompactionTask.java    |    4 +-
 .../db/compaction/LeveledCompactionTask.java       |    4 +-
 .../cassandra/db/compaction/LeveledManifest.java   |   21 ++++++++++-----
 .../io/compress/CompressedSequentialWriter.java    |    6 ++++
 .../apache/cassandra/io/sstable/SSTableWriter.java |    5 +++
 .../apache/cassandra/io/util/SequentialWriter.java |   12 ++++++++
 6 files changed, 41 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/80d7d43e/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index a697b1f..e9fcdcd 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -170,7 +170,7 @@ public class CompactionTask extends AbstractCompactionTask
                         }
                     }
                 }
-                if (!nni.hasNext() || newSSTableSegmentThresholdReached(writer, position))
+                if (!nni.hasNext() || newSSTableSegmentThresholdReached(writer))
                 {
                     SSTableReader toIndex = writer.closeAndOpenReader(getMaxDataAge(toCompact));
                     cachedKeyMap.put(toIndex, cachedKeys);
@@ -230,7 +230,7 @@ public class CompactionTask extends AbstractCompactionTask
     }
 
     //extensibility point for other strategies that may want to limit the upper bounds of the sstable segment size
-    protected boolean newSSTableSegmentThresholdReached(SSTableWriter writer, long position)
+    protected boolean newSSTableSegmentThresholdReached(SSTableWriter writer) throws IOException
     {
         return false;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/80d7d43e/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java
index 2512f9d..3e58379 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java
@@ -66,9 +66,9 @@ public class LeveledCompactionTask extends CompactionTask
     }
 
     @Override
-    protected boolean newSSTableSegmentThresholdReached(SSTableWriter writer, long position)
+    protected boolean newSSTableSegmentThresholdReached(SSTableWriter writer) throws IOException
     {
-        return position > sstableSizeInMB * 1024L * 1024L;
+        return writer.getOnDiskFilePointer() > sstableSizeInMB * 1024L * 1024L;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/80d7d43e/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
index a53d519..dca8b7d 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
@@ -179,7 +179,7 @@ public class LeveledManifest
             return;
 
         int newLevel;
-        if (minimumLevel == 0 && maximumLevel == 0 && SSTable.getTotalBytes(removed) < maxSSTableSizeInBytes)
+        if (minimumLevel == 0 && maximumLevel == 0 && SSTable.getTotalBytes(removed) <= maxSSTableSizeInBytes)
         {
             // special case for tiny L0 sstables; see CASSANDRA-4341
             newLevel = 0;
@@ -273,7 +273,7 @@ public class LeveledManifest
             logger.debug("Compaction score for level {} is {}", i, score);
 
             // L0 gets a special case that if we don't have anything more important to do,
-            // we'll go ahead and compact even just one sstable
+            // we'll go ahead and compact if we have more than one sstable
             if (score > 1.001 || (i == 0 && sstables.size() > 1))
             {
                 Collection<SSTableReader> candidates = getCandidatesFor(i);
@@ -350,9 +350,9 @@ public class LeveledManifest
         sstableGenerations.put(sstable, Integer.valueOf(level));
     }
 
-    private static List<SSTableReader> overlapping(SSTableReader sstable, Iterable<SSTableReader> candidates)
+    private static Set<SSTableReader> overlapping(SSTableReader sstable, Iterable<SSTableReader> candidates)
     {
-        List<SSTableReader> overlapped = new ArrayList<SSTableReader>();
+        Set<SSTableReader> overlapped = new HashSet<SSTableReader>();
         overlapped.add(sstable);
 
         Range<Token> promotedRange = new Range<Token>(sstable.first.token, sstable.last.token);
@@ -378,7 +378,7 @@ public class LeveledManifest
             // 1a. add sstables to the candidate set until we have at least maxSSTableSizeInMB
             // 1b. prefer choosing older sstables as candidates, to newer ones
             // 1c. any L0 sstables that overlap a candidate, will also become candidates
-            // 2. At most MAX_COMPACTING_L0 sstables will be compacted at once
+            // 2. At most MAX_COMPACTING_L0 sstables from L0 will be compacted at once
             // 3. If total candidate size is less than maxSSTableSizeInMB, we won't bother compacting with L1,
             //    and the result of the compaction will stay in L0 instead of being promoted (see promote())
             //
@@ -408,7 +408,14 @@ public class LeveledManifest
                     // limit to only the MAX_COMPACTING_L0 oldest candidates
                     List<SSTableReader> ageSortedCandidates = new ArrayList<SSTableReader>(candidates);
                     Collections.sort(ageSortedCandidates, SSTable.maxTimestampComparator);
-                    return ageSortedCandidates.subList(0, MAX_COMPACTING_L0);
+                    candidates = new HashSet<SSTableReader>(ageSortedCandidates.subList(0, MAX_COMPACTING_L0));
+                    if (SSTable.getTotalBytes(candidates) > maxSSTableSizeInBytes)
+                    {
+                        // add sstables from L1 that overlap candidates
+                        for (SSTableReader candidate : new ArrayList<SSTableReader>(candidates))
+                            candidates.addAll(overlapping(candidate, generations[1]));
+                    }
+                    return candidates;
                 }
 
                 if (SSTable.getTotalBytes(candidates) > maxSSTableSizeInBytes)
@@ -443,7 +450,7 @@ public class LeveledManifest
         while (true)
         {
             SSTableReader sstable = generations[level].get(i);
-            List<SSTableReader> candidates = overlapping(sstable, generations[(level + 1)]);
+            Set<SSTableReader> candidates = overlapping(sstable, generations[(level + 1)]);
             for (SSTableReader candidate : candidates)
             {
                 if (candidate.isMarkedSuspect())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/80d7d43e/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
index 6a466f7..7c924fe 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
@@ -69,6 +69,12 @@ public class CompressedSequentialWriter extends SequentialWriter
     }
 
     @Override
+    public long getOnDiskFilePointer() throws IOException
+    {
+        return out.getFilePointer();
+    }
+
+    @Override
     public void sync() throws IOException
     {
         throw new UnsupportedOperationException();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/80d7d43e/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index d74514f..3e7e7a0 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@ -383,6 +383,11 @@ public class SSTableWriter extends SSTable
         return dataFile.getFilePointer();
     }
 
+    public long getOnDiskFilePointer() throws IOException
+    {
+        return dataFile.getOnDiskFilePointer();
+    }
+
     /**
      * Encapsulates writing the index and filter for an SSTable. The state of this object is not valid until it has been closed.
      */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/80d7d43e/src/java/org/apache/cassandra/io/util/SequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SequentialWriter.java b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
index 70098cb..0f0bc9e 100644
--- a/src/java/org/apache/cassandra/io/util/SequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
@@ -245,6 +245,18 @@ public class SequentialWriter extends OutputStream
         return current;
     }
 
+    /**
+     * Return the current file pointer of the underlying on-disk file.
+     * Note that since writes are buffered, this value increases in buffer-sized steps,
+     * so not every write to the writer will change it.
+     * Furthermore, for compressed files, this value refers to the compressed data, while
+     * the writer's getFilePointer() refers to the uncompressed data.
+     */
+    public long getOnDiskFilePointer() throws IOException
+    {
+        return getFilePointer();
+    }
+
     public long length() throws IOException
     {
         return Math.max(Math.max(current, out.length()), bufferOffset + validBufferBytes);