You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2013/08/15 22:17:29 UTC
[3/5] git commit: Cleanup doesn't need to inspect sstables that
contain only local data; patch by Tyler Hobbs,
reviewed by jbellis for CASSANDRA-5722
Cleanup doesn't need to inspect sstables that contain only local data
patch by Tyler Hobbs; reviewed by jbellis for CASSANDRA-5722
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/bc12d73a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/bc12d73a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/bc12d73a
Branch: refs/heads/trunk
Commit: bc12d73a5a0f31ab8258b3d2a35063b5750df91c
Parents: 16fcd15
Author: Jonathan Ellis <jb...@apache.org>
Authored: Thu Aug 15 15:17:13 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Thu Aug 15 15:17:13 2013 -0500
----------------------------------------------------------------------
CHANGES.txt | 3 +
.../db/compaction/CompactionManager.java | 58 ++++++++++
.../org/apache/cassandra/dht/IPartitioner.java | 3 +-
.../cassandra/io/sstable/SSTableReader.java | 44 +++++++-
.../apache/cassandra/io/util/SegmentedFile.java | 2 +-
.../db/compaction/CompactionsTest.java | 112 +++++++++++++++++++
6 files changed, 216 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc12d73a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 805dca2..4c9f9a1 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -4,6 +4,9 @@
* Switch from crc32 to adler32 for compressed sstable checksums (CASSANDRA-5862)
* Improve offheap memcpy performance (CASSANDRA-5884)
* Use a range aware scanner for cleanup (CASSANDRA-2524)
+ * Cleanup doesn't need to inspect sstables that contain only local data
+ (CASSANDRA-5722)
+
2.0.0-rc2
* enable vnodes by default (CASSANDRA-5869)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc12d73a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index ed6770f..8e8220f 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -458,6 +458,59 @@ public class CompactionManager implements CompactionManagerMBean
}
/**
+ * Determines if a cleanup would actually remove any data in this SSTable based
+ * on a set of owned ranges.
+ *
+ * @param sstable the sstable to inspect; only its first key and its index entries are read
+ * @param ownedRanges the token ranges this node owns; must not be empty
+ * @return true as soon as a key is found that lies outside every owned range,
+ *         false if no such key exists (so cleanup can skip this sstable)
+ */
+ static boolean needsCleanup(SSTableReader sstable, Collection<Range<Token>> ownedRanges)
+ {
+ assert !ownedRanges.isEmpty(); // cleanup checks for this
+
+ // unwrap and sort the ranges by LHS token
+ List<Range<Token>> sortedRanges = Range.normalize(ownedRanges);
+
+ // see if there are any keys LTE the token for the start of the first range
+ // (token range ownership is exclusive on the LHS.)
+ Range<Token> firstRange = sortedRanges.get(0);
+ if (sstable.first.token.compareTo(firstRange.left) <= 0)
+ return true;
+
+ // then, iterate over all owned ranges and see if the next key beyond the end of the owned
+ // range falls before the start of the next range
+ for (int i = 0; i < sortedRanges.size(); i++)
+ {
+ Range<Token> range = sortedRanges.get(i);
+ if (range.right.isMinimum())
+ {
+ // we split a wrapping range and this is the second half.
+ // there can't be any keys beyond this (and this is the last range)
+ return false;
+ }
+
+ DecoratedKey firstBeyondRange = sstable.firstKeyBeyond(range.right.maxKeyBound());
+ if (firstBeyondRange == null)
+ {
+ // we ran off the end of the sstable looking for the next key; we don't need to check any more ranges
+ return false;
+ }
+
+ // normalize() unwraps and reorders the ranges, so its size can differ from
+ // ownedRanges.size(); always bound the loop by the normalized list
+ if (i == (sortedRanges.size() - 1))
+ {
+ // we're at the last range and we found a key beyond the end of the range
+ return true;
+ }
+
+ Range<Token> nextRange = sortedRanges.get(i + 1);
+ if (firstBeyondRange.token.compareTo(nextRange.left) <= 0)
+ {
+ // we found a key in the gap between this range and the next one
+ return true;
+ }
+ // otherwise the key lies past nextRange.left: it is either inside nextRange, or
+ // nextRange holds no data and the same key is re-checked on the next iteration
+ }
+
+ return false;
+ }
+
+ /**
* This function goes over each file and removes the keys that the node is not responsible for
* and only keeps keys that this node is responsible for.
*
@@ -484,6 +537,11 @@ public class CompactionManager implements CompactionManagerMBean
cfs.replaceCompactedSSTables(Arrays.asList(sstable), Collections.<SSTableReader>emptyList(), OperationType.CLEANUP);
continue;
}
+ if (!needsCleanup(sstable, ranges))
+ {
+ logger.debug("Skipping {} for cleanup; all rows should be kept", sstable);
+ continue;
+ }
CompactionController controller = new CompactionController(cfs, Collections.singleton(sstable), getDefaultGcBefore(cfs));
long start = System.nanoTime();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc12d73a/src/java/org/apache/cassandra/dht/IPartitioner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/IPartitioner.java b/src/java/org/apache/cassandra/dht/IPartitioner.java
index 084a1e5..46165b8 100644
--- a/src/java/org/apache/cassandra/dht/IPartitioner.java
+++ b/src/java/org/apache/cassandra/dht/IPartitioner.java
@@ -43,7 +43,8 @@ public interface IPartitioner<T extends Token>
public Token midpoint(Token left, Token right);
/**
- * @return The minimum possible Token in the range that is being partitioned.
+ * @return A Token that sorts before every other Token in the range being partitioned.
+ * It is not legal to assign to a node or key, but it may be used in range scans.
*/
public T getMinimumToken();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc12d73a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index bbca089..9e3f774 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -68,9 +68,6 @@ public class SSTableReader extends SSTable
{
private static final Logger logger = LoggerFactory.getLogger(SSTableReader.class);
- // guesstimated size of INDEX_INTERVAL index entries
- private static final int INDEX_FILE_BUFFER_BYTES = 16 * CFMetaData.DEFAULT_INDEX_INTERVAL;
-
/**
* maxDataAge is a timestamp in local server time (e.g. System.currentTimeMilli) which represents an uppper bound
* to the newest piece of data stored in the sstable. In other words, this sstable does not contain items created
@@ -878,7 +875,7 @@ public class SSTableReader extends SSTable
// is lesser than the first key of next interval (and in that case we must return the position of the first key
// of the next interval).
int i = 0;
- Iterator<FileDataInput> segments = ifile.iterator(sampledPosition, INDEX_FILE_BUFFER_BYTES);
+ Iterator<FileDataInput> segments = ifile.iterator(sampledPosition);
while (segments.hasNext() && i <= indexSummary.getIndexInterval())
{
FileDataInput in = segments.next();
@@ -961,6 +958,45 @@ public class SSTableReader extends SSTable
}
/**
+ * Finds and returns the first key beyond a given token in this SSTable or null if no such key exists.
+ *
+ * @param token the position to search past; a key qualifies only if it sorts strictly after it
+ * @return the first key in this sstable's primary index that sorts after {@code token},
+ *         or null if the index holds no such key
+ * @throws CorruptSSTableException if reading the index fails; the sstable is marked suspect first
+ */
+ public DecoratedKey firstKeyBeyond(RowPosition token)
+ {
+ // getIndexScanPosition() supplies the index-file offset to start scanning from;
+ // -1 is treated as "scan from the beginning of the index"
+ long sampledPosition = getIndexScanPosition(token);
+ if (sampledPosition == -1)
+ sampledPosition = 0;
+
+ Iterator<FileDataInput> segments = ifile.iterator(sampledPosition);
+ while (segments.hasNext())
+ {
+ FileDataInput in = segments.next();
+ try
+ {
+ while (!in.isEOF())
+ {
+ // each index entry is a short-length-prefixed key followed by a RowIndexEntry
+ ByteBuffer indexKey = ByteBufferUtil.readWithShortLength(in);
+ DecoratedKey indexDecoratedKey = partitioner.decorateKey(indexKey);
+ if (indexDecoratedKey.compareTo(token) > 0)
+ return indexDecoratedKey;
+
+ // not past token yet: skip this entry's RowIndexEntry to reach the next key
+ RowIndexEntry.serializer.skip(in);
+ }
+ }
+ catch (IOException e)
+ {
+ markSuspect();
+ throw new CorruptSSTableException(e, in.getPath());
+ }
+ finally
+ {
+ // each segment must be closed after use, including on the early-return path
+ FileUtils.closeQuietly(in);
+ }
+ }
+
+ return null;
+ }
+
+ /**
* @return The length in bytes of the data for this SSTable. For
* compressed files, this is not the same thing as the on disk size (see
* onDiskLength())
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc12d73a/src/java/org/apache/cassandra/io/util/SegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SegmentedFile.java b/src/java/org/apache/cassandra/io/util/SegmentedFile.java
index 8681b03..6231fd7 100644
--- a/src/java/org/apache/cassandra/io/util/SegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/SegmentedFile.java
@@ -82,7 +82,7 @@ public abstract class SegmentedFile
/**
+ * @param position the position whose containing segment is the first one returned
* @return An Iterator over segments, beginning with the segment containing the given position: each segment must be closed after use.
*/
- public Iterator<FileDataInput> iterator(long position, int bufferSize)
+ public Iterator<FileDataInput> iterator(long position)
{
return new SegmentIterator(position);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc12d73a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
index a775988..bc89f4f 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
@@ -39,7 +39,9 @@ import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
import org.apache.cassandra.db.filter.QueryFilter;
import org.apache.cassandra.db.marshal.CompositeType;
+import org.apache.cassandra.dht.BytesToken;
import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.io.sstable.SSTableScanner;
@@ -343,4 +345,114 @@ public class CompactionsTest extends SchemaLoader
cf = cfs.getColumnFamily(filter);
assert cf == null || cf.getColumnCount() == 0 : "should be empty: " + cf;
}
+
+ /**
+ * Builds a Range between the BytesTokens of start and end, each formatted as a
+ * zero-padded three-digit string (e.g. 7 -> "007"), the same format insertRowWithKey uses for keys.
+ */
+ private static Range<Token> rangeFor(int start, int end)
+ {
+ return new Range<Token>(new BytesToken(String.format("%03d", start).getBytes()),
+ new BytesToken(String.format("%03d", end).getBytes()));
+ }
+
+ /**
+ * Builds one range per consecutive pair of arguments: makeRanges(0, 9, 100, 109) yields
+ * rangeFor(0, 9) and rangeFor(100, 109). Expects an even number of arguments; an odd
+ * trailing value makes keys[i + 1] throw ArrayIndexOutOfBoundsException.
+ */
+ private static Collection<Range<Token>> makeRanges(int ... keys)
+ {
+ Collection<Range<Token>> ranges = new ArrayList<Range<Token>>(keys.length / 2);
+ for (int i = 0; i < keys.length; i += 2)
+ ranges.add(rangeFor(keys[i], keys[i + 1]));
+ return ranges;
+ }
+
+ /**
+ * Applies a mutation writing column "col" (empty value) to Standard1 for the key formatted
+ * as a zero-padded three-digit string (e.g. 7 -> "007"). Does not flush.
+ */
+ private static void insertRowWithKey(int key)
+ {
+ long timestamp = System.currentTimeMillis();
+ DecoratedKey decoratedKey = Util.dk(String.format("%03d", key));
+ RowMutation rm = new RowMutation(KEYSPACE1, decoratedKey.key);
+ // NOTE(review): the trailing 1000 is presumably a TTL in seconds — confirm against RowMutation.add
+ rm.add("Standard1", ByteBufferUtil.bytes("col"), ByteBufferUtil.EMPTY_BYTE_BUFFER, timestamp, 1000);
+ rm.apply();
+ }
+
+ /**
+ * Checks CompactionManager.needsCleanup against a single flushed sstable holding the keys
+ * 001-009, 101-109 and 201-209, using various sets of owned ranges.
+ */
+ @Test
+ public void testNeedsCleanup() throws IOException
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE1);
+ ColumnFamilyStore store = keyspace.getColumnFamilyStore("Standard1");
+ store.clearUnsafe();
+
+ // disable compaction while flushing
+ store.disableAutoCompaction();
+
+ // write three groups of 9 keys: 001, 002, ... 008, 009
+ // 101, 102, ... 108, 109
+ // 201, 202, ... 208, 209
+ for (int i = 1; i < 10; i++)
+ {
+ insertRowWithKey(i);
+ insertRowWithKey(i + 100);
+ insertRowWithKey(i + 200);
+ }
+ store.forceBlockingFlush();
+
+ // everything must land in one sstable so needsCleanup sees all 27 keys at once
+ assertEquals(1, store.getSSTables().size());
+ SSTableReader sstable = store.getSSTables().iterator().next();
+
+
+ // contiguous range spans all data
+ assertFalse(CompactionManager.needsCleanup(sstable, makeRanges(0, 209)));
+ assertFalse(CompactionManager.needsCleanup(sstable, makeRanges(0, 210)));
+
+ // separate ranges span all data
+ assertFalse(CompactionManager.needsCleanup(sstable, makeRanges(0, 9,
+ 100, 109,
+ 200, 209)));
+ assertFalse(CompactionManager.needsCleanup(sstable, makeRanges(0, 109,
+ 200, 210)));
+ assertFalse(CompactionManager.needsCleanup(sstable, makeRanges(0, 9,
+ 100, 210)));
+
+ // one range is missing completely
+ assertTrue(CompactionManager.needsCleanup(sstable, makeRanges(100, 109,
+ 200, 209)));
+ assertTrue(CompactionManager.needsCleanup(sstable, makeRanges(0, 9,
+ 200, 209)));
+ assertTrue(CompactionManager.needsCleanup(sstable, makeRanges(0, 9,
+ 100, 109)));
+
+
+ // the beginning of one range is missing
+ assertTrue(CompactionManager.needsCleanup(sstable, makeRanges(1, 9,
+ 100, 109,
+ 200, 209)));
+ assertTrue(CompactionManager.needsCleanup(sstable, makeRanges(0, 9,
+ 101, 109,
+ 200, 209)));
+ assertTrue(CompactionManager.needsCleanup(sstable, makeRanges(0, 9,
+ 100, 109,
+ 201, 209)));
+
+ // the end of one range is missing
+ assertTrue(CompactionManager.needsCleanup(sstable, makeRanges(0, 8,
+ 100, 109,
+ 200, 209)));
+ assertTrue(CompactionManager.needsCleanup(sstable, makeRanges(0, 9,
+ 100, 108,
+ 200, 209)));
+ assertTrue(CompactionManager.needsCleanup(sstable, makeRanges(0, 9,
+ 100, 109,
+ 200, 208)));
+
+ // NOTE(review): both cases below include rangeFor(0, 0), whose endpoints are equal. If Range
+ // treats left == right as the whole ring, these assertions hold regardless of the other
+ // ranges; in particular the second one would not hold without it, since keys 104-109 fall
+ // outside (100, 103]. Confirm and consider dropping the (0, 0) pair.
+ // some ranges don't contain any data
+ assertFalse(CompactionManager.needsCleanup(sstable, makeRanges(0, 0,
+ 0, 9,
+ 50, 51,
+ 100, 109,
+ 150, 199,
+ 200, 209,
+ 300, 301)));
+ // same case, but with a middle range not covering some of the existing data
+ assertFalse(CompactionManager.needsCleanup(sstable, makeRanges(0, 0,
+ 0, 9,
+ 50, 51,
+ 100, 103,
+ 150, 199,
+ 200, 209,
+ 300, 301)));
+ }
}