You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2013/08/14 08:46:17 UTC
[1/2] git commit: Use a range aware scanner for cleanup to improve
performance.
Updated Branches:
refs/heads/trunk c651f8362 -> a69457c14
Use a range aware scanner for cleanup to improve performance.
Patch by thobbs, stuhood and marcuse, reviewed by marcuse for CASSANDRA-2524
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9e846d9f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9e846d9f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9e846d9f
Branch: refs/heads/trunk
Commit: 9e846d9ff69f825f6200f7f75fdfc53926bfc255
Parents: 0fc15e5
Author: Marcus Eriksson <ma...@spotify.com>
Authored: Wed Aug 14 08:41:30 2013 +0200
Committer: Marcus Eriksson <ma...@spotify.com>
Committed: Wed Aug 14 08:44:05 2013 +0200
----------------------------------------------------------------------
CHANGES.txt | 2 +-
.../apache/cassandra/db/ColumnFamilyStore.java | 12 +
.../db/compaction/CompactionManager.java | 178 ++++++++----
.../cassandra/io/sstable/SSTableReader.java | 18 +-
.../cassandra/io/sstable/SSTableScanner.java | 150 ++++++----
.../org/apache/cassandra/db/RowCacheTest.java | 37 ++-
.../io/sstable/SSTableScannerTest.java | 287 +++++++++++++++++++
7 files changed, 563 insertions(+), 121 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9e846d9f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index f7e439c..805dca2 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -3,7 +3,7 @@
* Log Merkle tree stats (CASSANDRA-2698)
* Switch from crc32 to adler32 for compressed sstable checksums (CASSANDRA-5862)
* Improve offheap memcpy performance (CASSANDRA-5884)
-
+ * Use a range aware scanner for cleanup (CASSANDRA-2524)
2.0.0-rc2
* enable vnodes by default (CASSANDRA-5869)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9e846d9f/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 7b11672..2adda32 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -1459,6 +1459,18 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
return columns;
}
+ /**
+ * Invalidates every row-cache entry that belongs to this table and whose key falls
+ * outside the ranges this node owns locally for the keyspace.
+ */
+ public void cleanupCache()
+ {
+ Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(keyspace.getName());
+
+ for (RowCacheKey key : CacheService.instance.rowCache.getKeySet())
+ {
+ // Compare table ids by value: `==` on object ids only matches identical instances.
+ // Checking the id first also avoids decorating keys that belong to other tables.
+ if (!key.cfId.equals(metadata.cfId))
+ continue;
+
+ DecoratedKey dk = StorageService.getPartitioner().decorateKey(ByteBuffer.wrap(key.key));
+ if (!Range.isInRanges(dk.token, ranges))
+ invalidateCachedRow(dk);
+ }
+ }
+
public static abstract class AbstractScanIterator extends AbstractIterator<Row> implements CloseableIterator<Row>
{
public boolean needsFiltering()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9e846d9f/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 35354c8..ed6770f 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -97,6 +97,8 @@ public class CompactionManager implements CompactionManagerMBean
private final CompactionExecutor executor = new CompactionExecutor();
private final CompactionExecutor validationExecutor = new ValidationExecutor();
+ private final static CompactionExecutor cacheCleanupExecutor = new CacheCleanupExecutor();
+
private final CompactionMetrics metrics = new CompactionMetrics(executor, validationExecutor);
private final Multiset<ColumnFamilyStore> compactingCF = ConcurrentHashMultiset.create();
@@ -461,7 +463,7 @@ public class CompactionManager implements CompactionManagerMBean
*
* @throws IOException
*/
- private void doCleanupCompaction(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, CounterId.OneShotRenewer renewer) throws IOException
+ private void doCleanupCompaction(final ColumnFamilyStore cfs, Collection<SSTableReader> sstables, CounterId.OneShotRenewer renewer) throws IOException
{
assert !cfs.isIndex();
Keyspace keyspace = cfs.keyspace;
@@ -472,8 +474,8 @@ public class CompactionManager implements CompactionManagerMBean
return;
}
- boolean isCommutative = cfs.metadata.getDefaultValidator().isCommutative();
boolean hasIndexes = !cfs.indexManager.getIndexes().isEmpty();
+ CleanupStrategy cleanupStrategy = CleanupStrategy.get(cfs, ranges, renewer);
for (SSTableReader sstable : sstables)
{
@@ -500,10 +502,9 @@ public class CompactionManager implements CompactionManagerMBean
if (compactionFileLocation == null)
throw new IOException("disk full");
- SSTableScanner scanner = sstable.getScanner(getRateLimiter());
- List<Column> indexedColumnsInRow = null;
+ ICompactionScanner scanner = cleanupStrategy.getScanner(sstable, getRateLimiter());
+ CleanupInfo ci = new CleanupInfo(sstable, (SSTableScanner)scanner);
- CleanupInfo ci = new CleanupInfo(sstable, scanner);
metrics.beginCompaction(ci);
SSTableWriter writer = createWriter(cfs,
compactionFileLocation,
@@ -517,50 +518,13 @@ public class CompactionManager implements CompactionManagerMBean
if (ci.isStopRequested())
throw new CompactionInterruptedException(ci.getCompactionInfo());
SSTableIdentityIterator row = (SSTableIdentityIterator) scanner.next();
- if (Range.isInRanges(row.getKey().token, ranges))
- {
- AbstractCompactedRow compactedRow = controller.getCompactedRow(row);
- if (writer.append(compactedRow) != null)
- totalkeysWritten++;
- }
- else
- {
- cfs.invalidateCachedRow(row.getKey());
-
- if (hasIndexes || isCommutative)
- {
- if (indexedColumnsInRow != null)
- indexedColumnsInRow.clear();
-
- while (row.hasNext())
- {
- OnDiskAtom column = row.next();
- if (column instanceof CounterColumn)
- renewer.maybeRenew((CounterColumn) column);
- if (column instanceof Column && cfs.indexManager.indexes((Column) column))
- {
- if (indexedColumnsInRow == null)
- indexedColumnsInRow = new ArrayList<Column>();
-
- indexedColumnsInRow.add((Column) column);
- }
- }
-
- if (indexedColumnsInRow != null && !indexedColumnsInRow.isEmpty())
- {
- // acquire memtable lock here because secondary index deletion may cause a race. See CASSANDRA-3712
- Keyspace.switchLock.readLock().lock();
- try
- {
- cfs.indexManager.deleteFromIndexes(row.getKey(), indexedColumnsInRow);
- }
- finally
- {
- Keyspace.switchLock.readLock().unlock();
- }
- }
- }
- }
+
+ row = cleanupStrategy.cleanup(row);
+ if (row == null)
+ continue;
+ AbstractCompactedRow compactedRow = controller.getCompactedRow(row);
+ if (writer.append(compactedRow) != null)
+ totalkeysWritten++;
}
if (totalkeysWritten > 0)
newSstable = writer.closeAndOpenReader(sstable.maxDataAge);
@@ -599,6 +563,114 @@ public class CompactionManager implements CompactionManagerMBean
}
}
+ /**
+ * Decides how cleanup reads an sstable and what happens to each row it reads:
+ * getScanner() chooses which rows are read, and cleanup() returns the row to keep,
+ * or null when the row must be dropped.
+ */
+ private static abstract class CleanupStrategy
+ {
+ /**
+ * Picks the strategy for a table. Full is used when the table has secondary indexes or a
+ * commutative default validator (e.g. counters), because dropped rows then need per-column
+ * work. Bounded is used otherwise.
+ */
+ public static CleanupStrategy get(ColumnFamilyStore cfs, Collection<Range<Token>> ranges, CounterId.OneShotRenewer renewer)
+ {
+ if (!cfs.indexManager.getIndexes().isEmpty() || cfs.metadata.getDefaultValidator().isCommutative())
+ return new Full(cfs, ranges, renewer);
+
+ return new Bounded(cfs, ranges);
+ }
+
+ /** Returns the scanner used to read {@code sstable} during cleanup. */
+ public abstract ICompactionScanner getScanner(SSTableReader sstable, RateLimiter limiter);
+ /** Returns {@code row} if it should be written to the new sstable, or null to drop it. */
+ public abstract SSTableIdentityIterator cleanup(SSTableIdentityIterator row);
+
+ /**
+ * Reads only the rows inside {@code ranges}, so every row it yields is kept. Rows outside
+ * the ranges are never read. Their row-cache entries are instead invalidated by
+ * cfs.cleanupCache(), which is submitted to cacheCleanupExecutor when this strategy is built.
+ */
+ private static final class Bounded extends CleanupStrategy
+ {
+ private final Collection<Range<Token>> ranges;
+
+ public Bounded(final ColumnFamilyStore cfs, Collection<Range<Token>> ranges)
+ {
+ this.ranges = ranges;
+ // NOTE(review): side effect in a constructor. The cache cleanup runs asynchronously,
+ // so it may not have finished when the constructor returns.
+ cacheCleanupExecutor.submit(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ cfs.cleanupCache();
+ }
+ });
+
+ }
+ @Override
+ public ICompactionScanner getScanner(SSTableReader sstable, RateLimiter limiter)
+ {
+ // range-aware scan: rows outside `ranges` are skipped instead of being read
+ return sstable.getScanner(ranges, limiter);
+ }
+
+ @Override
+ public SSTableIdentityIterator cleanup(SSTableIdentityIterator row)
+ {
+ // the scanner already restricted reads to `ranges`, so there is nothing to drop here
+ return row;
+ }
+ }
+
+ /**
+ * Reads every row of the sstable and keeps the rows inside {@code ranges}. For each row
+ * outside them it invalidates the cached row, passes counter columns to
+ * renewer.maybeRenew(), and deletes the row's indexed columns from the secondary indexes.
+ */
+ private static final class Full extends CleanupStrategy
+ {
+ private final Collection<Range<Token>> ranges;
+ private final ColumnFamilyStore cfs;
+ // reused across dropped rows; allocated lazily and cleared at the start of each dropped row
+ private List<Column> indexedColumnsInRow;
+ private final CounterId.OneShotRenewer renewer;
+
+ public Full(ColumnFamilyStore cfs, Collection<Range<Token>> ranges, CounterId.OneShotRenewer renewer)
+ {
+ this.cfs = cfs;
+ this.ranges = ranges;
+ this.indexedColumnsInRow = null;
+ this.renewer = renewer;
+ }
+
+ @Override
+ public ICompactionScanner getScanner(SSTableReader sstable, RateLimiter limiter)
+ {
+ // full scan: out-of-range rows must be read so their columns can be processed below
+ return sstable.getScanner(limiter);
+ }
+
+ @Override
+ public SSTableIdentityIterator cleanup(SSTableIdentityIterator row)
+ {
+ if (Range.isInRanges(row.getKey().token, ranges))
+ return row;
+
+ cfs.invalidateCachedRow(row.getKey());
+
+ if (indexedColumnsInRow != null)
+ indexedColumnsInRow.clear();
+
+ // walk every column of the dropped row: counter columns are handed to the renewer,
+ // and indexed columns are collected for deletion from the indexes
+ while (row.hasNext())
+ {
+ OnDiskAtom column = row.next();
+ if (column instanceof CounterColumn)
+ renewer.maybeRenew((CounterColumn) column);
+
+ if (column instanceof Column && cfs.indexManager.indexes((Column) column))
+ {
+ if (indexedColumnsInRow == null)
+ indexedColumnsInRow = new ArrayList<>();
+
+ indexedColumnsInRow.add((Column) column);
+ }
+ }
+
+ if (indexedColumnsInRow != null && !indexedColumnsInRow.isEmpty())
+ {
+ // acquire memtable lock here because secondary index deletion may cause a race. See CASSANDRA-3712
+ Keyspace.switchLock.readLock().lock();
+ try
+ {
+ cfs.indexManager.deleteFromIndexes(row.getKey(), indexedColumnsInRow);
+ }
+ finally
+ {
+ Keyspace.switchLock.readLock().unlock();
+ }
+ }
+ // null tells the caller to skip this row
+ return null;
+ }
+ }
+ }
+
public static SSTableWriter createWriter(ColumnFamilyStore cfs,
File compactionFileLocation,
int expectedBloomFilterSize,
@@ -847,6 +919,14 @@ public class CompactionManager implements CompactionManagerMBean
}
}
+ /**
+ * Executor reserved for row-cache cleanup tasks, kept apart from the compaction and
+ * validation executors. It is constructed with a size argument of 1 (presumably a single
+ * worker thread; confirm against the CompactionExecutor constructor).
+ */
+ private static final class CacheCleanupExecutor extends CompactionExecutor
+ {
+ public CacheCleanupExecutor()
+ {
+ super(1, "CacheCleanupExecutor");
+ }
+ }
+
public interface CompactionExecutorStatsCollector
{
void beginCompaction(CompactionInfo.Holder ci);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9e846d9f/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index 4a62d85..bbca089 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -1081,11 +1081,23 @@ public class SSTableReader extends SSTable
{
if (range == null)
return getScanner(limiter);
+ return getScanner(Collections.singletonList(range), limiter);
+ }
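+ /**
+ * Direct I/O SSTableScanner over a defined collection of token ranges.
+ *
+ * @param ranges the token ranges to cover
+ * @param limiter background i/o RateLimiter; may be null
+ * @return A Scanner over the rows of the SSTable that fall in {@code ranges}, or an empty scanner if none do.
+ */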
+ public ICompactionScanner getScanner(Collection<Range<Token>> ranges, RateLimiter limiter)
+ {
// We want to avoid allocating a SSTableScanner if the range don't overlap the sstable (#5249)
- return range.intersects(new Bounds(first.token, last.token))
- ? new SSTableScanner(this, DataRange.forKeyRange(range), limiter)
- : new EmptyCompactionScanner(getFilename());
+ List<Pair<Long, Long>> positions = getPositionsForRanges(Range.normalize(ranges));
+ if (positions.isEmpty())
+ return new EmptyCompactionScanner(getFilename());
+ else
+ return new SSTableScanner(this, ranges, limiter);
}
public FileDataInput getFileDataInput(long position)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9e846d9f/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java b/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
index 3e1db50..13ce706 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
@@ -18,18 +18,26 @@
package org.apache.cassandra.io.sstable;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.Iterator;
+import java.util.List;
-import com.google.common.util.concurrent.RateLimiter;
import com.google.common.collect.AbstractIterator;
+import com.google.common.util.concurrent.RateLimiter;
import org.apache.cassandra.db.DataRange;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.RowIndexEntry;
+import org.apache.cassandra.db.RowPosition;
import org.apache.cassandra.db.columniterator.IColumnIteratorFactory;
import org.apache.cassandra.db.columniterator.LazyColumnIterator;
import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
import org.apache.cassandra.db.compaction.ICompactionScanner;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.Bounds;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.io.util.RandomAccessReader;
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -39,28 +47,17 @@ public class SSTableScanner implements ICompactionScanner
protected final RandomAccessReader dfile;
protected final RandomAccessReader ifile;
public final SSTableReader sstable;
- private final DataRange dataRange;
- /*
- * There is 2 cases:
- * - Either dataRange is not wrapping, and we just need to read
- * everything between startKey and endKey.
- * - Or dataRange is wrapping: we must the read everything between
- * the beginning of the file and the endKey, and then everything from
- * the startKey to the end of the file.
- *
- * In the first case, we seek to the start and read until stop. In the
- * second one, we don't seek just yet, but read until stopAt and then
- * seek to start (re-adjusting stopAt to be the end of the file in isDone())
- */
- private boolean hasSeeked;
- private long stopAt;
+ private final Iterator<AbstractBounds<RowPosition>> rangeIterator;
+ private AbstractBounds<RowPosition> currentRange;
+
+ private final DataRange dataRange;
protected Iterator<OnDiskAtomIterator> iterator;
/**
* @param sstable SSTable to scan; must not be null
- * @param filter range of data to fetch; must not be null
+ * @param dataRange a single range to scan; must not be null
* @param limiter background i/o RateLimiter; may be null
*/
SSTableScanner(SSTableReader sstable, DataRange dataRange, RateLimiter limiter)
@@ -71,21 +68,50 @@ public class SSTableScanner implements ICompactionScanner
this.ifile = sstable.openIndexReader();
this.sstable = sstable;
this.dataRange = dataRange;
- this.stopAt = computeStopAt();
- // If we wrap (stopKey == minimum don't count), we'll seek to start *after* having read from beginning till stopAt
- if (dataRange.stopKey().isMinimum(sstable.partitioner) || !dataRange.isWrapAround())
- seekToStart();
+ List<AbstractBounds<RowPosition>> boundsList = new ArrayList<>(2);
+ if (dataRange.isWrapAround() && !dataRange.stopKey().isMinimum(sstable.partitioner))
+ {
+ // split the wrapping range into two parts: 1) the part that starts at the beginning of the sstable, and
+ // 2) the part that comes before the wrap-around
+ boundsList.add(new Bounds<>(sstable.partitioner.getMinimumToken().minKeyBound(), dataRange.stopKey()));
+ boundsList.add(new Bounds<>(dataRange.startKey(), sstable.partitioner.getMinimumToken().maxKeyBound()));
+ }
+ else
+ {
+ boundsList.add(new Bounds<>(dataRange.startKey(), dataRange.stopKey()));
+ }
+ this.rangeIterator = boundsList.iterator();
}
- private void seekToStart()
+ /**
+ * @param sstable SSTable to scan; must not be null
+ * @param tokenRanges A set of token ranges to scan
+ * @param limiter background i/o RateLimiter; may be null
+ */
+ SSTableScanner(SSTableReader sstable, Collection<Range<Token>> tokenRanges, RateLimiter limiter)
{
- hasSeeked = true;
+ assert sstable != null;
+
+ this.dfile = limiter == null ? sstable.openDataReader() : sstable.openDataReader(limiter);
+ this.ifile = sstable.openIndexReader();
+ this.sstable = sstable;
+ this.dataRange = null;
+
+ List<Range<Token>> normalized = Range.normalize(tokenRanges);
+ List<AbstractBounds<RowPosition>> boundsList = new ArrayList<>(normalized.size());
+ for (Range<Token> range : normalized)
+ boundsList.add(new Range<RowPosition>(range.left.maxKeyBound(sstable.partitioner), range.right.maxKeyBound(sstable.partitioner)));
+
+ this.rangeIterator = boundsList.iterator();
+ }
- if (dataRange.startKey().isMinimum(sstable.partitioner))
+ private void seekToCurrentRangeStart()
+ {
+ if (currentRange.left.isMinimum(sstable.partitioner))
return;
- long indexPosition = sstable.getIndexScanPosition(dataRange.startKey());
+ long indexPosition = sstable.getIndexScanPosition(currentRange.left);
// -1 means the key is before everything in the sstable. So just start from the beginning.
if (indexPosition == -1)
return;
@@ -93,12 +119,15 @@ public class SSTableScanner implements ICompactionScanner
ifile.seek(indexPosition);
try
{
+
while (!ifile.isEOF())
{
indexPosition = ifile.getFilePointer();
DecoratedKey indexDecoratedKey = sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(ifile));
- int comparison = indexDecoratedKey.compareTo(dataRange.startKey());
- if (comparison >= 0)
+ int comparison = indexDecoratedKey.compareTo(currentRange.left);
+ // because our range start may be inclusive or exclusive, we must also check contains()
+ // instead of just checking (comparison >= 0)
+ if (comparison > 0 || currentRange.contains(indexDecoratedKey))
{
// Found, just read the dataPosition and seek into index and data files
long dataPosition = ifile.readLong();
@@ -117,16 +146,6 @@ public class SSTableScanner implements ICompactionScanner
sstable.markSuspect();
throw new CorruptSSTableException(e, sstable.getFilename());
}
-
- }
-
- private long computeStopAt()
- {
- if (dataRange.stopKey().isMinimum(sstable.partitioner))
- return dfile.length();
-
- RowIndexEntry position = sstable.getPosition(dataRange.stopKey(), SSTableReader.Operator.GT);
- return position == null ? dfile.length() : position.position;
}
public void close() throws IOException
@@ -184,55 +203,59 @@ public class SSTableScanner implements ICompactionScanner
{
try
{
- if (ifile.isEOF() && nextKey == null)
- return endOfData();
-
- if (currentKey == null)
+ if (nextEntry == null)
{
- currentKey = sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(ifile));
- currentEntry = RowIndexEntry.serializer.deserialize(ifile, sstable.descriptor.version);
+ do
+ {
+ // we're starting the first range or we just passed the end of the previous range
+ if (!rangeIterator.hasNext())
+ return endOfData();
+
+ currentRange = rangeIterator.next();
+ seekToCurrentRangeStart();
+
+ if (ifile.isEOF())
+ return endOfData();
+
+ currentKey = sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(ifile));
+ currentEntry = RowIndexEntry.serializer.deserialize(ifile, sstable.descriptor.version);
+ } while (!currentRange.contains(currentKey));
}
else
{
+ // we're in the middle of a range
currentKey = nextKey;
currentEntry = nextEntry;
}
- if (currentEntry.position >= stopAt)
- {
- // We're in the wrapping, if we have just read the first part (we haven't seeked yet),
- // seek to the beginning of the 2nd part and continue;
- if (!hasSeeked)
- {
- seekToStart(); // This sets hasSeeked
- stopAt = dfile.length();
- // reset currentKey and nextKey since we have seeked
- currentKey = null;
- nextKey = null;
- return computeNext();
- }
- return endOfData();
- }
-
+ long readEnd;
if (ifile.isEOF())
{
- nextKey = null;
nextEntry = null;
+ nextKey = null;
+ readEnd = dfile.length();
}
else
{
+ // we need the position of the start of the next key, regardless of whether it falls in the current range
nextKey = sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(ifile));
nextEntry = RowIndexEntry.serializer.deserialize(ifile, sstable.descriptor.version);
+ readEnd = nextEntry.position;
+
+ if (!currentRange.contains(nextKey))
+ {
+ nextKey = null;
+ nextEntry = null;
+ }
}
- assert !dfile.isEOF();
- if (dataRange.selectsFullRowFor(currentKey.key))
+ if (dataRange == null || dataRange.selectsFullRowFor(currentKey.key))
{
dfile.seek(currentEntry.position);
ByteBufferUtil.readWithShortLength(dfile); // key
if (sstable.descriptor.version.hasRowSizeAndColumnCount)
dfile.readLong();
- long dataSize = (nextEntry == null ? dfile.length() : nextEntry.position) - dfile.getFilePointer();
+ long dataSize = readEnd - dfile.getFilePointer();
return new SSTableIdentityIterator(sstable, dfile, currentKey, dataSize);
}
@@ -243,6 +266,7 @@ public class SSTableScanner implements ICompactionScanner
return dataRange.columnFilter(currentKey.key).getSSTableColumnIterator(sstable, dfile, currentKey, currentEntry);
}
});
+
}
catch (IOException e)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9e846d9f/test/unit/org/apache/cassandra/db/RowCacheTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RowCacheTest.java b/test/unit/org/apache/cassandra/db/RowCacheTest.java
index 85302ee..8934a27 100644
--- a/test/unit/org/apache/cassandra/db/RowCacheTest.java
+++ b/test/unit/org/apache/cassandra/db/RowCacheTest.java
@@ -18,6 +18,7 @@
package org.apache.cassandra.db;
+import java.net.InetAddress;
import java.util.Collection;
import org.junit.AfterClass;
@@ -26,8 +27,12 @@ import org.junit.Test;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.Util;
import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.dht.BytesToken;
+import org.apache.cassandra.locator.TokenMetadata;
import org.apache.cassandra.service.CacheService;
+import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;
+import static org.junit.Assert.assertEquals;
public class RowCacheTest extends SchemaLoader
{
@@ -134,7 +139,29 @@ public class RowCacheTest extends SchemaLoader
public void testRowCacheLoad() throws Exception
{
CacheService.instance.setRowCacheCapacityInMB(1);
- rowCacheLoad(100, Integer.MAX_VALUE);
+ rowCacheLoad(100, Integer.MAX_VALUE, 0);
+ CacheService.instance.setRowCacheCapacityInMB(0);
+ }
+
+ @Test
+ public void testRowCacheCleanup() throws Exception
+ {
+ StorageService.instance.initServer(0);
+ CacheService.instance.setRowCacheCapacityInMB(1);
+ rowCacheLoad(100, Integer.MAX_VALUE, 1000);
+
+ ColumnFamilyStore store = Keyspace.open(KEYSPACE).getColumnFamilyStore(COLUMN_FAMILY);
+ assertEquals(CacheService.instance.rowCache.getKeySet().size(), 100);
+ store.cleanupCache();
+ assertEquals(CacheService.instance.rowCache.getKeySet().size(), 100);
+ TokenMetadata tmd = StorageService.instance.getTokenMetadata();
+ byte[] tk1, tk2;
+ tk1 = "key1000".getBytes();
+ tk2 = "key1050".getBytes();
+ tmd.updateNormalToken(new BytesToken(tk1), InetAddress.getByName("127.0.0.1"));
+ tmd.updateNormalToken(new BytesToken(tk2), InetAddress.getByName("127.0.0.2"));
+ store.cleanupCache();
+ assertEquals(CacheService.instance.rowCache.getKeySet().size(), 50);
CacheService.instance.setRowCacheCapacityInMB(0);
}
@@ -142,11 +169,11 @@ public class RowCacheTest extends SchemaLoader
public void testRowCachePartialLoad() throws Exception
{
CacheService.instance.setRowCacheCapacityInMB(1);
- rowCacheLoad(100, 50);
+ rowCacheLoad(100, 50, 0);
CacheService.instance.setRowCacheCapacityInMB(0);
}
- public void rowCacheLoad(int totalKeys, int keysToSave) throws Exception
+ public void rowCacheLoad(int totalKeys, int keysToSave, int offset) throws Exception
{
CompactionManager.instance.disableAutoCompaction();
@@ -157,8 +184,8 @@ public class RowCacheTest extends SchemaLoader
assert CacheService.instance.rowCache.size() == 0;
// insert data and fill the cache
- insertData(KEYSPACE, COLUMN_FAMILY, 0, totalKeys);
- readData(KEYSPACE, COLUMN_FAMILY, 0, totalKeys);
+ insertData(KEYSPACE, COLUMN_FAMILY, offset, totalKeys);
+ readData(KEYSPACE, COLUMN_FAMILY, offset, totalKeys);
assert CacheService.instance.rowCache.size() == totalKeys;
// force the cache to disk
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9e846d9f/test/unit/org/apache/cassandra/io/sstable/SSTableScannerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableScannerTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableScannerTest.java
new file mode 100644
index 0000000..3787b3e
--- /dev/null
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableScannerTest.java
@@ -0,0 +1,287 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.io.sstable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
+import org.apache.cassandra.db.compaction.ICompactionScanner;
+import org.apache.cassandra.dht.Bounds;
+import org.apache.cassandra.dht.BytesToken;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+import static junit.framework.Assert.*;
+
+public class SSTableScannerTest extends SchemaLoader
+{
+ public static final String KEYSPACE = "Keyspace1";
+ public static final String TABLE = "Standard1";
+
+ private static String toKey(int key)
+ {
+ return String.format("%03d", key);
+ }
+
+ private static Bounds<RowPosition> boundsFor(int start, int end)
+ {
+ return new Bounds<RowPosition>(new BytesToken(toKey(start).getBytes()).minKeyBound(),
+ new BytesToken(toKey(end).getBytes()).maxKeyBound());
+ }
+
+
+ private static Range<Token> rangeFor(int start, int end)
+ {
+ return new Range<Token>(new BytesToken(toKey(start).getBytes()),
+ new BytesToken(toKey(end).getBytes()));
+ }
+
+ private static Collection<Range<Token>> makeRanges(int ... keys)
+ {
+ Collection<Range<Token>> ranges = new ArrayList<Range<Token>>(keys.length / 2);
+ for (int i = 0; i < keys.length; i += 2)
+ ranges.add(rangeFor(keys[i], keys[i + 1]));
+ return ranges;
+ }
+
+ private static void insertRowWithKey(int key)
+ {
+ long timestamp = System.currentTimeMillis();
+ DecoratedKey decoratedKey = Util.dk(toKey(key));
+ RowMutation rm = new RowMutation(KEYSPACE, decoratedKey.key);
+ rm.add(TABLE, ByteBufferUtil.bytes("col"), ByteBufferUtil.EMPTY_BYTE_BUFFER, timestamp, 1000);
+ rm.apply();
+ }
+
+ private static void assertScanMatches(SSTableReader sstable, int scanStart, int scanEnd, int expectedStart, int expectedEnd)
+ {
+ SSTableScanner scanner = sstable.getScanner(new DataRange(boundsFor(scanStart, scanEnd), new IdentityQueryFilter()));
+ for (int i = expectedStart; i <= expectedEnd; i++)
+ assertEquals(toKey(i), new String(scanner.next().getKey().key.array()));
+ assertFalse(scanner.hasNext());
+ }
+
+ private static void assertScanEmpty(SSTableReader sstable, int scanStart, int scanEnd)
+ {
+ SSTableScanner scanner = sstable.getScanner(new DataRange(boundsFor(scanStart, scanEnd), new IdentityQueryFilter()));
+ assertFalse(String.format("scan of (%03d, %03d] should be empty", scanStart, scanEnd), scanner.hasNext());
+ }
+
+ @Test
+ public void testSingleDataRange()
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE);
+ ColumnFamilyStore store = keyspace.getColumnFamilyStore(TABLE);
+ store.clearUnsafe();
+
+ // disable compaction while flushing
+ store.disableAutoCompaction();
+
+ for (int i = 2; i < 10; i++)
+ insertRowWithKey(i);
+ store.forceBlockingFlush();
+
+ assertEquals(1, store.getSSTables().size());
+ SSTableReader sstable = store.getSSTables().iterator().next();
+
+ // full range scan
+ SSTableScanner scanner = sstable.getScanner();
+ for (int i = 2; i < 10; i++)
+ assertEquals(toKey(i), new String(scanner.next().getKey().key.array()));
+
+ // a simple read of a chunk in the middle
+ assertScanMatches(sstable, 3, 6, 3, 6);
+
+ // start of range edge conditions
+ assertScanMatches(sstable, 1, 9, 2, 9);
+ assertScanMatches(sstable, 2, 9, 2, 9);
+ assertScanMatches(sstable, 3, 9, 3, 9);
+
+ // end of range edge conditions
+ assertScanMatches(sstable, 1, 8, 2, 8);
+ assertScanMatches(sstable, 1, 9, 2, 9);
+ assertScanMatches(sstable, 1, 9, 2, 9);
+
+ // single item ranges
+ assertScanMatches(sstable, 2, 2, 2, 2);
+ assertScanMatches(sstable, 5, 5, 5, 5);
+ assertScanMatches(sstable, 9, 9, 9, 9);
+
+ // empty ranges
+ assertScanEmpty(sstable, 0, 1);
+ assertScanEmpty(sstable, 10, 11);
+ }
+
+ private static void assertScanContainsRanges(ICompactionScanner scanner, int ... rangePairs)
+ {
+ assert rangePairs.length % 2 == 0;
+
+ for (int pairIdx = 0; pairIdx < rangePairs.length; pairIdx += 2)
+ {
+ int rangeStart = rangePairs[pairIdx];
+ int rangeEnd = rangePairs[pairIdx + 1];
+
+ for (int expected = rangeStart; expected <= rangeEnd; expected++)
+ {
+ assertTrue(String.format("Expected to see key %03d", expected), scanner.hasNext());
+ assertEquals(toKey(expected), new String(scanner.next().getKey().key.array()));
+ }
+ }
+ assertFalse(scanner.hasNext());
+ }
+
+ @Test
+ public void testMultipleRanges()
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE);
+ ColumnFamilyStore store = keyspace.getColumnFamilyStore(TABLE);
+ store.clearUnsafe();
+
+ // disable compaction while flushing
+ store.disableAutoCompaction();
+
+ for (int i = 0; i < 3; i++)
+ for (int j = 2; j < 10; j++)
+ insertRowWithKey(i * 100 + j);
+ store.forceBlockingFlush();
+
+ assertEquals(1, store.getSSTables().size());
+ SSTableReader sstable = store.getSSTables().iterator().next();
+
+ // full range scan
+ SSTableScanner fullScanner = sstable.getScanner();
+ assertScanContainsRanges(fullScanner,
+ 2, 9,
+ 102, 109,
+ 202, 209);
+
+
+ // scan all three ranges separately
+ ICompactionScanner scanner = sstable.getScanner(makeRanges(1, 9,
+ 101, 109,
+ 201, 209),
+ null);
+ assertScanContainsRanges(scanner,
+ 2, 9,
+ 102, 109,
+ 202, 209);
+
+ // skip the first range
+ scanner = sstable.getScanner(makeRanges(101, 109,
+ 201, 209),
+ null);
+ assertScanContainsRanges(scanner,
+ 102, 109,
+ 202, 209);
+
+ // skip the second range
+ scanner = sstable.getScanner(makeRanges(1, 9,
+ 201, 209),
+ null);
+ assertScanContainsRanges(scanner,
+ 2, 9,
+ 202, 209);
+
+
+ // skip the last range
+ scanner = sstable.getScanner(makeRanges(1, 9,
+ 101, 109),
+ null);
+ assertScanContainsRanges(scanner,
+ 2, 9,
+ 102, 109);
+
+ // the first scanned range stops short of the actual data in the first range
+ scanner = sstable.getScanner(makeRanges(1, 5,
+ 101, 109,
+ 201, 209),
+ null);
+ assertScanContainsRanges(scanner,
+ 2, 5,
+ 102, 109,
+ 202, 209);
+
+ // the first scanned range requests data beyond actual data in the first range
+ scanner = sstable.getScanner(makeRanges(1, 20,
+ 101, 109,
+ 201, 209),
+ null);
+ assertScanContainsRanges(scanner,
+ 2, 9,
+ 102, 109,
+ 202, 209);
+
+
+ // the middle scan range splits the outside two data ranges
+ scanner = sstable.getScanner(makeRanges(1, 5,
+ 6, 205,
+ 206, 209),
+ null);
+ assertScanContainsRanges(scanner,
+ 2, 5,
+ 7, 9,
+ 102, 109,
+ 202, 205,
+ 207, 209);
+
+ // empty ranges
+ scanner = sstable.getScanner(makeRanges(0, 1,
+ 2, 20,
+ 101, 109,
+ 150, 159,
+ 201, 209,
+ 1000, 1001),
+ null);
+ assertScanContainsRanges(scanner,
+ 3, 9,
+ 102, 109,
+ 202, 209);
+
+ // out of order ranges
+ scanner = sstable.getScanner(makeRanges(201, 209,
+ 1, 20,
+ 201, 209,
+ 101, 109,
+ 1000, 1001,
+ 150, 159),
+ null);
+ assertScanContainsRanges(scanner,
+ 2, 9,
+ 102, 109,
+ 202, 209);
+
+ // only empty ranges
+ scanner = sstable.getScanner(makeRanges(0, 1,
+ 150, 159,
+ 250, 259),
+ null);
+ assertFalse(scanner.hasNext());
+
+ // no ranges is equivalent to a full scan
+ scanner = sstable.getScanner(new ArrayList<Range<Token>>(), null);
+ assertFalse(scanner.hasNext());
+ }
+}
[2/2] git commit: Merge branch 'cassandra-2.0' into trunk
Posted by ma...@apache.org.
Merge branch 'cassandra-2.0' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a69457c1
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a69457c1
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a69457c1
Branch: refs/heads/trunk
Commit: a69457c14a36c5eb3db523b3eb42b6d399993177
Parents: c651f83 9e846d9
Author: Marcus Eriksson <ma...@spotify.com>
Authored: Wed Aug 14 08:45:34 2013 +0200
Committer: Marcus Eriksson <ma...@spotify.com>
Committed: Wed Aug 14 08:45:34 2013 +0200
----------------------------------------------------------------------
CHANGES.txt | 2 +-
.../apache/cassandra/db/ColumnFamilyStore.java | 12 +
.../db/compaction/CompactionManager.java | 178 ++++++++----
.../cassandra/io/sstable/SSTableReader.java | 18 +-
.../cassandra/io/sstable/SSTableScanner.java | 150 ++++++----
.../org/apache/cassandra/db/RowCacheTest.java | 37 ++-
.../io/sstable/SSTableScannerTest.java | 287 +++++++++++++++++++
7 files changed, 563 insertions(+), 121 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a69457c1/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a69457c1/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------