You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2012/06/26 17:55:14 UTC
[2/3] git commit: fixes for small-sstable compaction patch by
slebresne; reviewed by jbellis for CASSANDRA-4341
fixes for small-sstable compaction
patch by slebresne; reviewed by jbellis for CASSANDRA-4341
Conflicts:
src/java/org/apache/cassandra/db/compaction/CompactionTask.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/80d7d43e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/80d7d43e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/80d7d43e
Branch: refs/heads/trunk
Commit: 80d7d43e2952d7612ab4406bca800c2b6f30c85f
Parents: a846cd6
Author: Jonathan Ellis <jb...@apache.org>
Authored: Fri Jun 22 13:15:51 2012 -0500
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Tue Jun 26 17:53:28 2012 +0200
----------------------------------------------------------------------
.../cassandra/db/compaction/CompactionTask.java | 4 +-
.../db/compaction/LeveledCompactionTask.java | 4 +-
.../cassandra/db/compaction/LeveledManifest.java | 21 ++++++++++-----
.../io/compress/CompressedSequentialWriter.java | 6 ++++
.../apache/cassandra/io/sstable/SSTableWriter.java | 5 +++
.../apache/cassandra/io/util/SequentialWriter.java | 12 ++++++++
6 files changed, 41 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/80d7d43e/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index a697b1f..e9fcdcd 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -170,7 +170,7 @@ public class CompactionTask extends AbstractCompactionTask
}
}
}
- if (!nni.hasNext() || newSSTableSegmentThresholdReached(writer, position))
+ if (!nni.hasNext() || newSSTableSegmentThresholdReached(writer))
{
SSTableReader toIndex = writer.closeAndOpenReader(getMaxDataAge(toCompact));
cachedKeyMap.put(toIndex, cachedKeys);
@@ -230,7 +230,7 @@ public class CompactionTask extends AbstractCompactionTask
}
//extensibility point for other strategies that may want to limit the upper bounds of the sstable segment size
- protected boolean newSSTableSegmentThresholdReached(SSTableWriter writer, long position)
+ protected boolean newSSTableSegmentThresholdReached(SSTableWriter writer) throws IOException
{
return false;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/80d7d43e/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java
index 2512f9d..3e58379 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java
@@ -66,9 +66,9 @@ public class LeveledCompactionTask extends CompactionTask
}
@Override
- protected boolean newSSTableSegmentThresholdReached(SSTableWriter writer, long position)
+ protected boolean newSSTableSegmentThresholdReached(SSTableWriter writer) throws IOException
{
- return position > sstableSizeInMB * 1024L * 1024L;
+ return writer.getOnDiskFilePointer() > sstableSizeInMB * 1024L * 1024L;
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/80d7d43e/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
index a53d519..dca8b7d 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
@@ -179,7 +179,7 @@ public class LeveledManifest
return;
int newLevel;
- if (minimumLevel == 0 && maximumLevel == 0 && SSTable.getTotalBytes(removed) < maxSSTableSizeInBytes)
+ if (minimumLevel == 0 && maximumLevel == 0 && SSTable.getTotalBytes(removed) <= maxSSTableSizeInBytes)
{
// special case for tiny L0 sstables; see CASSANDRA-4341
newLevel = 0;
@@ -273,7 +273,7 @@ public class LeveledManifest
logger.debug("Compaction score for level {} is {}", i, score);
// L0 gets a special case that if we don't have anything more important to do,
- // we'll go ahead and compact even just one sstable
+ // we'll go ahead and compact if we have more than one sstable
if (score > 1.001 || (i == 0 && sstables.size() > 1))
{
Collection<SSTableReader> candidates = getCandidatesFor(i);
@@ -350,9 +350,9 @@ public class LeveledManifest
sstableGenerations.put(sstable, Integer.valueOf(level));
}
- private static List<SSTableReader> overlapping(SSTableReader sstable, Iterable<SSTableReader> candidates)
+ private static Set<SSTableReader> overlapping(SSTableReader sstable, Iterable<SSTableReader> candidates)
{
- List<SSTableReader> overlapped = new ArrayList<SSTableReader>();
+ Set<SSTableReader> overlapped = new HashSet<SSTableReader>();
overlapped.add(sstable);
Range<Token> promotedRange = new Range<Token>(sstable.first.token, sstable.last.token);
@@ -378,7 +378,7 @@ public class LeveledManifest
// 1a. add sstables to the candidate set until we have at least maxSSTableSizeInMB
// 1b. prefer choosing older sstables as candidates, to newer ones
// 1c. any L0 sstables that overlap a candidate, will also become candidates
- // 2. At most MAX_COMPACTING_L0 sstables will be compacted at once
+ // 2. At most MAX_COMPACTING_L0 sstables from L0 will be compacted at once
// 3. If total candidate size is less than maxSSTableSizeInMB, we won't bother compacting with L1,
// and the result of the compaction will stay in L0 instead of being promoted (see promote())
//
@@ -408,7 +408,14 @@ public class LeveledManifest
// limit to only the MAX_COMPACTING_L0 oldest candidates
List<SSTableReader> ageSortedCandidates = new ArrayList<SSTableReader>(candidates);
Collections.sort(ageSortedCandidates, SSTable.maxTimestampComparator);
- return ageSortedCandidates.subList(0, MAX_COMPACTING_L0);
+ candidates = new HashSet<SSTableReader>(ageSortedCandidates.subList(0, MAX_COMPACTING_L0));
+ if (SSTable.getTotalBytes(candidates) > maxSSTableSizeInBytes)
+ {
+ // add sstables from L1 that overlap candidates
+ for (SSTableReader candidate : new ArrayList<SSTableReader>(candidates))
+ candidates.addAll(overlapping(candidate, generations[1]));
+ }
+ return candidates;
}
if (SSTable.getTotalBytes(candidates) > maxSSTableSizeInBytes)
@@ -443,7 +450,7 @@ public class LeveledManifest
while (true)
{
SSTableReader sstable = generations[level].get(i);
- List<SSTableReader> candidates = overlapping(sstable, generations[(level + 1)]);
+ Set<SSTableReader> candidates = overlapping(sstable, generations[(level + 1)]);
for (SSTableReader candidate : candidates)
{
if (candidate.isMarkedSuspect())
http://git-wip-us.apache.org/repos/asf/cassandra/blob/80d7d43e/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
index 6a466f7..7c924fe 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
@@ -69,6 +69,12 @@ public class CompressedSequentialWriter extends SequentialWriter
}
@Override
+ public long getOnDiskFilePointer() throws IOException
+ {
+ return out.getFilePointer();
+ }
+
+ @Override
public void sync() throws IOException
{
throw new UnsupportedOperationException();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/80d7d43e/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index d74514f..3e7e7a0 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@ -383,6 +383,11 @@ public class SSTableWriter extends SSTable
return dataFile.getFilePointer();
}
+ public long getOnDiskFilePointer() throws IOException
+ {
+ return dataFile.getOnDiskFilePointer();
+ }
+
/**
* Encapsulates writing the index and filter for an SSTable. The state of this object is not valid until it has been closed.
*/
http://git-wip-us.apache.org/repos/asf/cassandra/blob/80d7d43e/src/java/org/apache/cassandra/io/util/SequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SequentialWriter.java b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
index 70098cb..0f0bc9e 100644
--- a/src/java/org/apache/cassandra/io/util/SequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
@@ -245,6 +245,18 @@ public class SequentialWriter extends OutputStream
return current;
}
+ /**
+ * Return the current file pointer of the underlying on-disk file.
+ * Note that for writers that buffer data (e.g. compressed writers), this value may only
+ * advance when buffered data is flushed, so not every write to the writer necessarily changes it.
+ * Furthermore, for compressed files, this value refers to the compressed data, while the
+ * writer's getFilePointer() refers to the uncompressed data.
+ */
+ public long getOnDiskFilePointer() throws IOException
+ {
+ return getFilePointer();
+ }
+
public long length() throws IOException
{
return Math.max(Math.max(current, out.length()), bufferOffset + validBufferBytes);