You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2013/08/14 08:46:17 UTC

[1/2] git commit: Use a range aware scanner for cleanup to improve performance.

Updated Branches:
  refs/heads/trunk c651f8362 -> a69457c14


Use a range aware scanner for cleanup to improve performance.

Patch by thobbs, stuhood and marcuse, reviewed by marcuse for CASSANDRA-2524


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9e846d9f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9e846d9f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9e846d9f

Branch: refs/heads/trunk
Commit: 9e846d9ff69f825f6200f7f75fdfc53926bfc255
Parents: 0fc15e5
Author: Marcus Eriksson <ma...@spotify.com>
Authored: Wed Aug 14 08:41:30 2013 +0200
Committer: Marcus Eriksson <ma...@spotify.com>
Committed: Wed Aug 14 08:44:05 2013 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +-
 .../apache/cassandra/db/ColumnFamilyStore.java  |  12 +
 .../db/compaction/CompactionManager.java        | 178 ++++++++----
 .../cassandra/io/sstable/SSTableReader.java     |  18 +-
 .../cassandra/io/sstable/SSTableScanner.java    | 150 ++++++----
 .../org/apache/cassandra/db/RowCacheTest.java   |  37 ++-
 .../io/sstable/SSTableScannerTest.java          | 287 +++++++++++++++++++
 7 files changed, 563 insertions(+), 121 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/9e846d9f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index f7e439c..805dca2 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -3,7 +3,7 @@
  * Log Merkle tree stats (CASSANDRA-2698)
  * Switch from crc32 to adler32 for compressed sstable checksums (CASSANDRA-5862)
  * Improve offheap memcpy performance (CASSANDRA-5884)
-
+ * Use a range aware scanner for cleanup (CASSANDRA-2524)
 
 2.0.0-rc2
  * enable vnodes by default (CASSANDRA-5869)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9e846d9f/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 7b11672..2adda32 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -1459,6 +1459,18 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         return columns;
     }
 
+    public void cleanupCache()
+    {
+        Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(keyspace.getName());
+
+        for (RowCacheKey key : CacheService.instance.rowCache.getKeySet())
+        {
+            DecoratedKey dk = StorageService.getPartitioner().decorateKey(ByteBuffer.wrap(key.key));
+            if (key.cfId == metadata.cfId && !Range.isInRanges(dk.token, ranges))
+                invalidateCachedRow(dk);
+        }
+    }
+
     public static abstract class AbstractScanIterator extends AbstractIterator<Row> implements CloseableIterator<Row>
     {
         public boolean needsFiltering()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9e846d9f/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 35354c8..ed6770f 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -97,6 +97,8 @@ public class CompactionManager implements CompactionManagerMBean
 
     private final CompactionExecutor executor = new CompactionExecutor();
     private final CompactionExecutor validationExecutor = new ValidationExecutor();
+    private final static CompactionExecutor cacheCleanupExecutor = new CacheCleanupExecutor();
+
     private final CompactionMetrics metrics = new CompactionMetrics(executor, validationExecutor);
     private final Multiset<ColumnFamilyStore> compactingCF = ConcurrentHashMultiset.create();
 
@@ -461,7 +463,7 @@ public class CompactionManager implements CompactionManagerMBean
      *
      * @throws IOException
      */
-    private void doCleanupCompaction(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, CounterId.OneShotRenewer renewer) throws IOException
+    private void doCleanupCompaction(final ColumnFamilyStore cfs, Collection<SSTableReader> sstables, CounterId.OneShotRenewer renewer) throws IOException
     {
         assert !cfs.isIndex();
         Keyspace keyspace = cfs.keyspace;
@@ -472,8 +474,8 @@ public class CompactionManager implements CompactionManagerMBean
             return;
         }
 
-        boolean isCommutative = cfs.metadata.getDefaultValidator().isCommutative();
         boolean hasIndexes = !cfs.indexManager.getIndexes().isEmpty();
+        CleanupStrategy cleanupStrategy = CleanupStrategy.get(cfs, ranges, renewer);
 
         for (SSTableReader sstable : sstables)
         {
@@ -500,10 +502,9 @@ public class CompactionManager implements CompactionManagerMBean
             if (compactionFileLocation == null)
                 throw new IOException("disk full");
 
-            SSTableScanner scanner = sstable.getScanner(getRateLimiter());
-            List<Column> indexedColumnsInRow = null;
+            ICompactionScanner scanner = cleanupStrategy.getScanner(sstable, getRateLimiter());
+            CleanupInfo ci = new CleanupInfo(sstable, (SSTableScanner)scanner);
 
-            CleanupInfo ci = new CleanupInfo(sstable, scanner);
             metrics.beginCompaction(ci);
             SSTableWriter writer = createWriter(cfs,
                                                 compactionFileLocation,
@@ -517,50 +518,13 @@ public class CompactionManager implements CompactionManagerMBean
                     if (ci.isStopRequested())
                         throw new CompactionInterruptedException(ci.getCompactionInfo());
                     SSTableIdentityIterator row = (SSTableIdentityIterator) scanner.next();
-                    if (Range.isInRanges(row.getKey().token, ranges))
-                    {
-                        AbstractCompactedRow compactedRow = controller.getCompactedRow(row);
-                        if (writer.append(compactedRow) != null)
-                            totalkeysWritten++;
-                    }
-                    else
-                    {
-                        cfs.invalidateCachedRow(row.getKey());
-
-                        if (hasIndexes || isCommutative)
-                        {
-                            if (indexedColumnsInRow != null)
-                                indexedColumnsInRow.clear();
-
-                            while (row.hasNext())
-                            {
-                                OnDiskAtom column = row.next();
-                                if (column instanceof CounterColumn)
-                                    renewer.maybeRenew((CounterColumn) column);
-                                if (column instanceof Column && cfs.indexManager.indexes((Column) column))
-                                {
-                                    if (indexedColumnsInRow == null)
-                                        indexedColumnsInRow = new ArrayList<Column>();
-
-                                    indexedColumnsInRow.add((Column) column);
-                                }
-                            }
-
-                            if (indexedColumnsInRow != null && !indexedColumnsInRow.isEmpty())
-                            {
-                                // acquire memtable lock here because secondary index deletion may cause a race. See CASSANDRA-3712
-                                Keyspace.switchLock.readLock().lock();
-                                try
-                                {
-                                    cfs.indexManager.deleteFromIndexes(row.getKey(), indexedColumnsInRow);
-                                }
-                                finally
-                                {
-                                    Keyspace.switchLock.readLock().unlock();
-                                }
-                            }
-                        }
-                    }
+
+                    row = cleanupStrategy.cleanup(row);
+                    if (row == null)
+                        continue;
+                    AbstractCompactedRow compactedRow = controller.getCompactedRow(row);
+                    if (writer.append(compactedRow) != null)
+                        totalkeysWritten++;
                 }
                 if (totalkeysWritten > 0)
                     newSstable = writer.closeAndOpenReader(sstable.maxDataAge);
@@ -599,6 +563,114 @@ public class CompactionManager implements CompactionManagerMBean
         }
     }
 
+    private static abstract class CleanupStrategy
+    {
+        public static CleanupStrategy get(ColumnFamilyStore cfs, Collection<Range<Token>> ranges, CounterId.OneShotRenewer renewer)
+        {
+            if (!cfs.indexManager.getIndexes().isEmpty() || cfs.metadata.getDefaultValidator().isCommutative())
+                return new Full(cfs, ranges, renewer);
+
+            return new Bounded(cfs, ranges);
+        }
+
+        public abstract ICompactionScanner getScanner(SSTableReader sstable, RateLimiter limiter);
+        public abstract SSTableIdentityIterator cleanup(SSTableIdentityIterator row);
+
+        private static final class Bounded extends CleanupStrategy
+        {
+            private final Collection<Range<Token>> ranges;
+
+            public Bounded(final ColumnFamilyStore cfs, Collection<Range<Token>> ranges)
+            {
+                this.ranges = ranges;
+                cacheCleanupExecutor.submit(new Runnable()
+                {
+                    @Override
+                    public void run()
+                    {
+                        cfs.cleanupCache();
+                    }
+                });
+
+            }
+            @Override
+            public ICompactionScanner getScanner(SSTableReader sstable, RateLimiter limiter)
+            {
+                return sstable.getScanner(ranges, limiter);
+            }
+
+            @Override
+            public SSTableIdentityIterator cleanup(SSTableIdentityIterator row)
+            {
+                return row;
+            }
+        }
+
+        private static final class Full extends CleanupStrategy
+        {
+            private final Collection<Range<Token>> ranges;
+            private final ColumnFamilyStore cfs;
+            private List<Column> indexedColumnsInRow;
+            private final CounterId.OneShotRenewer renewer;
+
+            public Full(ColumnFamilyStore cfs, Collection<Range<Token>> ranges, CounterId.OneShotRenewer renewer)
+            {
+                this.cfs = cfs;
+                this.ranges = ranges;
+                this.indexedColumnsInRow = null;
+                this.renewer = renewer;
+            }
+
+            @Override
+            public ICompactionScanner getScanner(SSTableReader sstable, RateLimiter limiter)
+            {
+                return sstable.getScanner(limiter);
+            }
+
+            @Override
+            public SSTableIdentityIterator cleanup(SSTableIdentityIterator row)
+            {
+                if (Range.isInRanges(row.getKey().token, ranges))
+                    return row;
+
+                cfs.invalidateCachedRow(row.getKey());
+
+                if (indexedColumnsInRow != null)
+                    indexedColumnsInRow.clear();
+
+                while (row.hasNext())
+                {
+                    OnDiskAtom column = row.next();
+                    if (column instanceof CounterColumn)
+                        renewer.maybeRenew((CounterColumn) column);
+
+                    if (column instanceof Column && cfs.indexManager.indexes((Column) column))
+                    {
+                        if (indexedColumnsInRow == null)
+                            indexedColumnsInRow = new ArrayList<>();
+
+                        indexedColumnsInRow.add((Column) column);
+                    }
+                }
+
+                if (indexedColumnsInRow != null && !indexedColumnsInRow.isEmpty())
+                {
+                    // acquire memtable lock here because secondary index deletion may cause a race. See CASSANDRA-3712
+                    Keyspace.switchLock.readLock().lock();
+                    try
+                    {
+                        cfs.indexManager.deleteFromIndexes(row.getKey(), indexedColumnsInRow);
+                    }
+                    finally
+                    {
+                        Keyspace.switchLock.readLock().unlock();
+                    }
+                }
+                return null;
+            }
+        }
+    }
+
     public static SSTableWriter createWriter(ColumnFamilyStore cfs,
                                              File compactionFileLocation,
                                              int expectedBloomFilterSize,
@@ -847,6 +919,14 @@ public class CompactionManager implements CompactionManagerMBean
         }
     }
 
+    private static class CacheCleanupExecutor extends CompactionExecutor
+    {
+        public CacheCleanupExecutor()
+        {
+            super(1, "CacheCleanupExecutor");
+        }
+    }
+
     public interface CompactionExecutorStatsCollector
     {
         void beginCompaction(CompactionInfo.Holder ci);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9e846d9f/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index 4a62d85..bbca089 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -1081,11 +1081,23 @@ public class SSTableReader extends SSTable
     {
         if (range == null)
             return getScanner(limiter);
+        return getScanner(Collections.singletonList(range), limiter);
+    }
 
+   /**
+    * Direct I/O SSTableScanner over a defined collection of ranges of tokens.
+    *
+    * @param ranges the token ranges to cover
+    * @return A Scanner for seeking over the rows of the SSTable.
+    */
+    public ICompactionScanner getScanner(Collection<Range<Token>> ranges, RateLimiter limiter)
+    {
         // We want to avoid allocating a SSTableScanner if the range don't overlap the sstable (#5249)
-        return range.intersects(new Bounds(first.token, last.token))
-             ? new SSTableScanner(this, DataRange.forKeyRange(range), limiter)
-             : new EmptyCompactionScanner(getFilename());
+        List<Pair<Long, Long>> positions = getPositionsForRanges(Range.normalize(ranges));
+        if (positions.isEmpty())
+            return new EmptyCompactionScanner(getFilename());
+        else
+            return new SSTableScanner(this, ranges, limiter);
     }
 
     public FileDataInput getFileDataInput(long position)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9e846d9f/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java b/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
index 3e1db50..13ce706 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
@@ -18,18 +18,26 @@
 package org.apache.cassandra.io.sstable;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Iterator;
+import java.util.List;
 
-import com.google.common.util.concurrent.RateLimiter;
 import com.google.common.collect.AbstractIterator;
+import com.google.common.util.concurrent.RateLimiter;
 
 import org.apache.cassandra.db.DataRange;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.RowIndexEntry;
+import org.apache.cassandra.db.RowPosition;
 import org.apache.cassandra.db.columniterator.IColumnIteratorFactory;
 import org.apache.cassandra.db.columniterator.LazyColumnIterator;
 import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
 import org.apache.cassandra.db.compaction.ICompactionScanner;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.Bounds;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.io.util.RandomAccessReader;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -39,28 +47,17 @@ public class SSTableScanner implements ICompactionScanner
     protected final RandomAccessReader dfile;
     protected final RandomAccessReader ifile;
     public final SSTableReader sstable;
-    private final DataRange dataRange;
 
-    /*
-     * There is 2 cases:
-     *   - Either dataRange is not wrapping, and we just need to read
-     *     everything between startKey and endKey.
-     *   - Or dataRange is wrapping: we must the read everything between
-     *     the beginning of the file and the endKey, and then everything from
-     *     the startKey to the end of the file.
-     *
-     * In the first case, we seek to the start and read until stop. In the
-     * second one, we don't seek just yet, but read until stopAt and then
-     * seek to start (re-adjusting stopAt to be the end of the file in isDone())
-     */
-    private boolean hasSeeked;
-    private long stopAt;
+    private final Iterator<AbstractBounds<RowPosition>> rangeIterator;
+    private AbstractBounds<RowPosition> currentRange;
+
+    private final DataRange dataRange;
 
     protected Iterator<OnDiskAtomIterator> iterator;
 
     /**
      * @param sstable SSTable to scan; must not be null
-     * @param filter range of data to fetch; must not be null
+     * @param dataRange a single range to scan; must not be null
      * @param limiter background i/o RateLimiter; may be null
      */
     SSTableScanner(SSTableReader sstable, DataRange dataRange, RateLimiter limiter)
@@ -71,21 +68,50 @@ public class SSTableScanner implements ICompactionScanner
         this.ifile = sstable.openIndexReader();
         this.sstable = sstable;
         this.dataRange = dataRange;
-        this.stopAt = computeStopAt();
 
-        // If we wrap (stopKey == minimum don't count), we'll seek to start *after* having read from beginning till stopAt
-        if (dataRange.stopKey().isMinimum(sstable.partitioner) || !dataRange.isWrapAround())
-            seekToStart();
+        List<AbstractBounds<RowPosition>> boundsList = new ArrayList<>(2);
+        if (dataRange.isWrapAround() && !dataRange.stopKey().isMinimum(sstable.partitioner))
+        {
+            // split the wrapping range into two parts: 1) the part that starts at the beginning of the sstable, and
+            // 2) the part that comes before the wrap-around
+            boundsList.add(new Bounds<>(sstable.partitioner.getMinimumToken().minKeyBound(), dataRange.stopKey()));
+            boundsList.add(new Bounds<>(dataRange.startKey(), sstable.partitioner.getMinimumToken().maxKeyBound()));
+        }
+        else
+        {
+            boundsList.add(new Bounds<>(dataRange.startKey(), dataRange.stopKey()));
+        }
+        this.rangeIterator = boundsList.iterator();
     }
 
-    private void seekToStart()
+    /**
+     * @param sstable SSTable to scan; must not be null
+     * @param tokenRanges A set of token ranges to scan
+     * @param limiter background i/o RateLimiter; may be null
+     */
+    SSTableScanner(SSTableReader sstable, Collection<Range<Token>> tokenRanges, RateLimiter limiter)
     {
-        hasSeeked = true;
+        assert sstable != null;
+
+        this.dfile = limiter == null ? sstable.openDataReader() : sstable.openDataReader(limiter);
+        this.ifile = sstable.openIndexReader();
+        this.sstable = sstable;
+        this.dataRange = null;
+
+        List<Range<Token>> normalized = Range.normalize(tokenRanges);
+        List<AbstractBounds<RowPosition>> boundsList = new ArrayList<>(normalized.size());
+        for (Range<Token> range : normalized)
+            boundsList.add(new Range<RowPosition>(range.left.maxKeyBound(sstable.partitioner), range.right.maxKeyBound(sstable.partitioner)));
+
+        this.rangeIterator = boundsList.iterator();
+    }
 
-        if (dataRange.startKey().isMinimum(sstable.partitioner))
+    private void seekToCurrentRangeStart()
+    {
+        if (currentRange.left.isMinimum(sstable.partitioner))
             return;
 
-        long indexPosition = sstable.getIndexScanPosition(dataRange.startKey());
+        long indexPosition = sstable.getIndexScanPosition(currentRange.left);
         // -1 means the key is before everything in the sstable. So just start from the beginning.
         if (indexPosition == -1)
             return;
@@ -93,12 +119,15 @@ public class SSTableScanner implements ICompactionScanner
         ifile.seek(indexPosition);
         try
         {
+
             while (!ifile.isEOF())
             {
                 indexPosition = ifile.getFilePointer();
                 DecoratedKey indexDecoratedKey = sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(ifile));
-                int comparison = indexDecoratedKey.compareTo(dataRange.startKey());
-                if (comparison >= 0)
+                int comparison = indexDecoratedKey.compareTo(currentRange.left);
+                // because our range start may be inclusive or exclusive, we also need to check contains()
+                // instead of just checking (comparison >= 0)
+                if (comparison > 0 || currentRange.contains(indexDecoratedKey))
                 {
                     // Found, just read the dataPosition and seek into index and data files
                     long dataPosition = ifile.readLong();
@@ -117,16 +146,6 @@ public class SSTableScanner implements ICompactionScanner
             sstable.markSuspect();
             throw new CorruptSSTableException(e, sstable.getFilename());
         }
-
-    }
-
-    private long computeStopAt()
-    {
-        if (dataRange.stopKey().isMinimum(sstable.partitioner))
-            return dfile.length();
-
-        RowIndexEntry position = sstable.getPosition(dataRange.stopKey(), SSTableReader.Operator.GT);
-        return position == null ? dfile.length() : position.position;
     }
 
     public void close() throws IOException
@@ -184,55 +203,59 @@ public class SSTableScanner implements ICompactionScanner
         {
             try
             {
-                if (ifile.isEOF() && nextKey == null)
-                    return endOfData();
-
-                if (currentKey == null)
+                if (nextEntry == null)
                 {
-                    currentKey = sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(ifile));
-                    currentEntry = RowIndexEntry.serializer.deserialize(ifile, sstable.descriptor.version);
+                    do
+                    {
+                        // we're starting the first range or we just passed the end of the previous range
+                        if (!rangeIterator.hasNext())
+                            return endOfData();
+
+                        currentRange = rangeIterator.next();
+                        seekToCurrentRangeStart();
+
+                        if (ifile.isEOF())
+                            return endOfData();
+
+                        currentKey = sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(ifile));
+                        currentEntry = RowIndexEntry.serializer.deserialize(ifile, sstable.descriptor.version);
+                    } while (!currentRange.contains(currentKey));
                 }
                 else
                 {
+                    // we're in the middle of a range
                     currentKey = nextKey;
                     currentEntry = nextEntry;
                 }
 
-                if (currentEntry.position >= stopAt)
-                {
-                    // We're in the wrapping, if we have just read the first part (we haven't seeked yet),
-                    // seek to the beginning of the 2nd part and continue;
-                    if (!hasSeeked)
-                    {
-                        seekToStart(); // This sets hasSeeked
-                        stopAt = dfile.length();
-                        // reset currentKey and nextKey since we have seeked
-                        currentKey = null;
-                        nextKey = null;
-                        return computeNext();
-                    }
-                    return endOfData();
-                }
-
+                long readEnd;
                 if (ifile.isEOF())
                 {
-                    nextKey = null;
                     nextEntry = null;
+                    nextKey = null;
+                    readEnd = dfile.length();
                 }
                 else
                 {
+                    // we need the position of the start of the next key, regardless of whether it falls in the current range
                     nextKey = sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(ifile));
                     nextEntry = RowIndexEntry.serializer.deserialize(ifile, sstable.descriptor.version);
+                    readEnd = nextEntry.position;
+
+                    if (!currentRange.contains(nextKey))
+                    {
+                        nextKey = null;
+                        nextEntry = null;
+                    }
                 }
 
-                assert !dfile.isEOF();
-                if (dataRange.selectsFullRowFor(currentKey.key))
+                if (dataRange == null || dataRange.selectsFullRowFor(currentKey.key))
                 {
                     dfile.seek(currentEntry.position);
                     ByteBufferUtil.readWithShortLength(dfile); // key
                     if (sstable.descriptor.version.hasRowSizeAndColumnCount)
                         dfile.readLong();
-                    long dataSize = (nextEntry == null ? dfile.length() : nextEntry.position) - dfile.getFilePointer();
+                    long dataSize = readEnd - dfile.getFilePointer();
                     return new SSTableIdentityIterator(sstable, dfile, currentKey, dataSize);
                 }
 
@@ -243,6 +266,7 @@ public class SSTableScanner implements ICompactionScanner
                         return dataRange.columnFilter(currentKey.key).getSSTableColumnIterator(sstable, dfile, currentKey, currentEntry);
                     }
                 });
+
             }
             catch (IOException e)
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9e846d9f/test/unit/org/apache/cassandra/db/RowCacheTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RowCacheTest.java b/test/unit/org/apache/cassandra/db/RowCacheTest.java
index 85302ee..8934a27 100644
--- a/test/unit/org/apache/cassandra/db/RowCacheTest.java
+++ b/test/unit/org/apache/cassandra/db/RowCacheTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.cassandra.db;
 
+import java.net.InetAddress;
 import java.util.Collection;
 
 import org.junit.AfterClass;
@@ -26,8 +27,12 @@ import org.junit.Test;
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
 import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.dht.BytesToken;
+import org.apache.cassandra.locator.TokenMetadata;
 import org.apache.cassandra.service.CacheService;
+import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
+import static org.junit.Assert.assertEquals;
 
 public class RowCacheTest extends SchemaLoader
 {
@@ -134,7 +139,29 @@ public class RowCacheTest extends SchemaLoader
     public void testRowCacheLoad() throws Exception
     {
         CacheService.instance.setRowCacheCapacityInMB(1);
-        rowCacheLoad(100, Integer.MAX_VALUE);
+        rowCacheLoad(100, Integer.MAX_VALUE, 0);
+        CacheService.instance.setRowCacheCapacityInMB(0);
+    }
+
+    @Test
+    public void testRowCacheCleanup() throws Exception
+    {
+        StorageService.instance.initServer(0);
+        CacheService.instance.setRowCacheCapacityInMB(1);
+        rowCacheLoad(100, Integer.MAX_VALUE, 1000);
+
+        ColumnFamilyStore store = Keyspace.open(KEYSPACE).getColumnFamilyStore(COLUMN_FAMILY);
+        assertEquals(CacheService.instance.rowCache.getKeySet().size(), 100);
+        store.cleanupCache();
+        assertEquals(CacheService.instance.rowCache.getKeySet().size(), 100);
+        TokenMetadata tmd = StorageService.instance.getTokenMetadata();
+        byte[] tk1, tk2;
+        tk1 = "key1000".getBytes();
+        tk2 = "key1050".getBytes();
+        tmd.updateNormalToken(new BytesToken(tk1), InetAddress.getByName("127.0.0.1"));
+        tmd.updateNormalToken(new BytesToken(tk2), InetAddress.getByName("127.0.0.2"));
+        store.cleanupCache();
+        assertEquals(CacheService.instance.rowCache.getKeySet().size(), 50);
         CacheService.instance.setRowCacheCapacityInMB(0);
     }
 
@@ -142,11 +169,11 @@ public class RowCacheTest extends SchemaLoader
     public void testRowCachePartialLoad() throws Exception
     {
         CacheService.instance.setRowCacheCapacityInMB(1);
-        rowCacheLoad(100, 50);
+        rowCacheLoad(100, 50, 0);
         CacheService.instance.setRowCacheCapacityInMB(0);
     }
 
-    public void rowCacheLoad(int totalKeys, int keysToSave) throws Exception
+    public void rowCacheLoad(int totalKeys, int keysToSave, int offset) throws Exception
     {
         CompactionManager.instance.disableAutoCompaction();
 
@@ -157,8 +184,8 @@ public class RowCacheTest extends SchemaLoader
         assert CacheService.instance.rowCache.size() == 0;
 
         // insert data and fill the cache
-        insertData(KEYSPACE, COLUMN_FAMILY, 0, totalKeys);
-        readData(KEYSPACE, COLUMN_FAMILY, 0, totalKeys);
+        insertData(KEYSPACE, COLUMN_FAMILY, offset, totalKeys);
+        readData(KEYSPACE, COLUMN_FAMILY, offset, totalKeys);
         assert CacheService.instance.rowCache.size() == totalKeys;
 
         // force the cache to disk

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9e846d9f/test/unit/org/apache/cassandra/io/sstable/SSTableScannerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableScannerTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableScannerTest.java
new file mode 100644
index 0000000..3787b3e
--- /dev/null
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableScannerTest.java
@@ -0,0 +1,287 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.io.sstable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
+import org.apache.cassandra.db.compaction.ICompactionScanner;
+import org.apache.cassandra.dht.Bounds;
+import org.apache.cassandra.dht.BytesToken;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+import static junit.framework.Assert.*;
+
+public class SSTableScannerTest extends SchemaLoader
+{
+    public static final String KEYSPACE = "Keyspace1";
+    public static final String TABLE = "Standard1";
+
+    private static String toKey(int key)
+    {
+        return String.format("%03d", key);
+    }
+
+    /**
+     * Builds bounds stretching from the minimum key bound of the token for {@code start}
+     * to the maximum key bound of the token for {@code end}.
+     */
+    private static Bounds<RowPosition> boundsFor(int start, int end)
+    {
+        RowPosition lower = new BytesToken(toKey(start).getBytes()).minKeyBound();
+        RowPosition upper = new BytesToken(toKey(end).getBytes()).maxKeyBound();
+        return new Bounds<RowPosition>(lower, upper);
+    }
+
+
+    /**
+     * Builds a token range whose endpoints are the byte tokens of the formatted {@code start} and {@code end} keys.
+     */
+    private static Range<Token> rangeFor(int start, int end)
+    {
+        Token left = new BytesToken(toKey(start).getBytes());
+        Token right = new BytesToken(toKey(end).getBytes());
+        return new Range<Token>(left, right);
+    }
+
+    /**
+     * Turns a flat list of (start, end) key pairs into token ranges via {@code rangeFor}.
+     *
+     * @param keys alternating start and end keys; must contain an even number of entries
+     * @return one range per pair, in the order given
+     */
+    private static Collection<Range<Token>> makeRanges(int ... keys)
+    {
+        // fail with a clear message rather than an ArrayIndexOutOfBoundsException on a dangling start key
+        assert keys.length % 2 == 0 : "keys must be (start, end) pairs";
+
+        Collection<Range<Token>> ranges = new ArrayList<Range<Token>>(keys.length / 2);
+        for (int i = 0; i < keys.length; i += 2)
+            ranges.add(rangeFor(keys[i], keys[i + 1]));
+        return ranges;
+    }
+
+    /**
+     * Applies a mutation that writes a single column named "col" with an empty value to the row for {@code key}.
+     */
+    private static void insertRowWithKey(int key)
+    {
+        long now = System.currentTimeMillis();
+        DecoratedKey dk = Util.dk(toKey(key));
+
+        RowMutation mutation = new RowMutation(KEYSPACE, dk.key);
+        // NOTE(review): the trailing 1000 is presumably the column TTL -- confirm against RowMutation.add
+        mutation.add(TABLE, ByteBufferUtil.bytes("col"), ByteBufferUtil.EMPTY_BYTE_BUFFER, now, 1000);
+        mutation.apply();
+    }
+
+    /**
+     * Scans {@code sstable} over the bounds built from {@code scanStart}..{@code scanEnd} and asserts that the
+     * scanner returns exactly the keys {@code expectedStart}..{@code expectedEnd} (inclusive), in order.
+     */
+    private static void assertScanMatches(SSTableReader sstable, int scanStart, int scanEnd, int expectedStart, int expectedEnd)
+    {
+        SSTableScanner scanner = sstable.getScanner(new DataRange(boundsFor(scanStart, scanEnd), new IdentityQueryFilter()));
+        for (int i = expectedStart; i <= expectedEnd; i++)
+        {
+            // check hasNext() first so a scan that ends early fails with a message naming the missing key
+            assertTrue(String.format("Expected to see key %03d", i), scanner.hasNext());
+            assertEquals(toKey(i), new String(scanner.next().getKey().key.array()));
+        }
+        assertFalse(scanner.hasNext());
+    }
+
+    /**
+     * Asserts that scanning {@code sstable} over the bounds built from {@code scanStart}..{@code scanEnd}
+     * returns no rows.
+     */
+    private static void assertScanEmpty(SSTableReader sstable, int scanStart, int scanEnd)
+    {
+        SSTableScanner scanner = sstable.getScanner(new DataRange(boundsFor(scanStart, scanEnd), new IdentityQueryFilter()));
+        // the single-key scans in testSingleDataRange show these bounds include both endpoints,
+        // so the message reports a closed interval
+        assertFalse(String.format("scan of [%03d, %03d] should be empty", scanStart, scanEnd), scanner.hasNext());
+    }
+
+    /**
+     * Flushes keys 002..009 into a single sstable and checks scans over a single {@link DataRange}:
+     * a full scan, an interior chunk, both edges of the data, single-key bounds and bounds that miss the data.
+     */
+    @Test
+    public void testSingleDataRange()
+    {
+        Keyspace keyspace = Keyspace.open(KEYSPACE);
+        ColumnFamilyStore store = keyspace.getColumnFamilyStore(TABLE);
+        store.clearUnsafe();
+
+        // disable compaction while flushing
+        store.disableAutoCompaction();
+
+        for (int i = 2; i < 10; i++)
+            insertRowWithKey(i);
+        store.forceBlockingFlush();
+
+        assertEquals(1, store.getSSTables().size());
+        SSTableReader sstable = store.getSSTables().iterator().next();
+
+        // full range scan
+        SSTableScanner scanner = sstable.getScanner();
+        for (int i = 2; i < 10; i++)
+            assertEquals(toKey(i), new String(scanner.next().getKey().key.array()));
+        // nothing besides the inserted keys may come back
+        assertFalse(scanner.hasNext());
+
+        // a simple read of a chunk in the middle
+        assertScanMatches(sstable, 3, 6, 3, 6);
+
+        // start of range edge conditions: scan starts before, at, and after the first key
+        assertScanMatches(sstable, 1, 9, 2, 9);
+        assertScanMatches(sstable, 2, 9, 2, 9);
+        assertScanMatches(sstable, 3, 9, 3, 9);
+
+        // end of range edge conditions: scan ends before, at, and after the last key
+        assertScanMatches(sstable, 1, 8, 2, 8);
+        assertScanMatches(sstable, 1, 9, 2, 9);
+        assertScanMatches(sstable, 1, 10, 2, 9);
+
+        // single item ranges
+        assertScanMatches(sstable, 2, 2, 2, 2);
+        assertScanMatches(sstable, 5, 5, 5, 5);
+        assertScanMatches(sstable, 9, 9, 9, 9);
+
+        // empty ranges: entirely before and entirely after the data
+        assertScanEmpty(sstable, 0, 1);
+        assertScanEmpty(sstable, 10, 11);
+    }
+
+    /**
+     * Asserts that {@code scanner} yields exactly the keys covered by the given inclusive (start, end) pairs,
+     * one pair after another in the order listed, and nothing afterwards.
+     */
+    private static void assertScanContainsRanges(ICompactionScanner scanner, int ... rangePairs)
+    {
+        assert rangePairs.length % 2 == 0;
+
+        int idx = 0;
+        while (idx < rangePairs.length)
+        {
+            int first = rangePairs[idx];
+            int last = rangePairs[idx + 1];
+            idx += 2;
+
+            for (int key = first; key <= last; key++)
+            {
+                assertTrue(String.format("Expected to see key %03d", key), scanner.hasNext());
+                assertEquals(toKey(key), new String(scanner.next().getKey().key.array()));
+            }
+        }
+        assertFalse(scanner.hasNext());
+    }
+
+    /**
+     * Flushes three clusters of keys (002..009, 102..109, 202..209) into a single sstable and checks
+     * what the scanner built from a collection of token ranges returns for various range combinations.
+     */
+    @Test
+    public void testMultipleRanges()
+    {
+        Keyspace keyspace = Keyspace.open(KEYSPACE);
+        ColumnFamilyStore store = keyspace.getColumnFamilyStore(TABLE);
+        store.clearUnsafe();
+
+        // disable compaction while flushing
+        store.disableAutoCompaction();
+
+        // i selects the hundreds cluster, j the key within it
+        for (int i = 0; i < 3; i++)
+            for (int j = 2; j < 10; j++)
+                insertRowWithKey(i * 100 + j);
+        store.forceBlockingFlush();
+
+        assertEquals(1, store.getSSTables().size());
+        SSTableReader sstable = store.getSSTables().iterator().next();
+
+        // full range scan
+        SSTableScanner fullScanner = sstable.getScanner();
+        assertScanContainsRanges(fullScanner,
+                                 2, 9,
+                                 102, 109,
+                                 202, 209);
+
+
+        // scan all three ranges separately
+        ICompactionScanner scanner = sstable.getScanner(makeRanges(1, 9,
+                                                                   101, 109,
+                                                                   201, 209),
+                                                        null);
+        assertScanContainsRanges(scanner,
+                                 2, 9,
+                                 102, 109,
+                                 202, 209);
+
+        // skip the first range
+        scanner = sstable.getScanner(makeRanges(101, 109,
+                                                201, 209),
+                                     null);
+        assertScanContainsRanges(scanner,
+                                 102, 109,
+                                 202, 209);
+
+        // skip the second range
+        scanner = sstable.getScanner(makeRanges(1, 9,
+                                                201, 209),
+                                     null);
+        assertScanContainsRanges(scanner,
+                                 2, 9,
+                                 202, 209);
+
+
+        // skip the last range
+        scanner = sstable.getScanner(makeRanges(1, 9,
+                                                101, 109),
+                                     null);
+        assertScanContainsRanges(scanner,
+                                 2, 9,
+                                 102, 109);
+
+        // the first scanned range stops short of the actual data in the first range
+        scanner = sstable.getScanner(makeRanges(1, 5,
+                                                101, 109,
+                                                201, 209),
+                                     null);
+        assertScanContainsRanges(scanner,
+                                 2, 5,
+                                 102, 109,
+                                 202, 209);
+
+        // the first scanned range requests data beyond actual data in the first range
+        scanner = sstable.getScanner(makeRanges(1, 20,
+                                                101, 109,
+                                                201, 209),
+                                     null);
+        assertScanContainsRanges(scanner,
+                                 2, 9,
+                                 102, 109,
+                                 202, 209);
+
+
+        // the middle scan range splits the outside two data ranges; keys 6 and 206 are range starts
+        // and are expected to be missing from the scan, i.e. the start of a range is exclusive
+        scanner = sstable.getScanner(makeRanges(1, 5,
+                                                6, 205,
+                                                206, 209),
+                                     null);
+        assertScanContainsRanges(scanner,
+                                 2, 5,
+                                 7, 9,
+                                 102, 109,
+                                 202, 205,
+                                 207, 209);
+
+        // some ranges match no data and contribute nothing; the scan for (2, 20] is expected
+        // to begin at key 3 because the range start is exclusive
+        scanner = sstable.getScanner(makeRanges(0, 1,
+                                                2, 20,
+                                                101, 109,
+                                                150, 159,
+                                                201, 209,
+                                                1000, 1001),
+                                     null);
+        assertScanContainsRanges(scanner,
+                                 3, 9,
+                                 102, 109,
+                                 202, 209);
+
+        // out of order (and duplicated) ranges still produce each key once, in key order
+        scanner = sstable.getScanner(makeRanges(201, 209,
+                                                1, 20,
+                                                201, 209,
+                                                101, 109,
+                                                1000, 1001,
+                                                150, 159),
+                                     null);
+        assertScanContainsRanges(scanner,
+                                 2, 9,
+                                 102, 109,
+                                 202, 209);
+
+        // only empty ranges
+        scanner = sstable.getScanner(makeRanges(0, 1,
+                                                150, 159,
+                                                250, 259),
+                                     null);
+        assertFalse(scanner.hasNext());
+
+        // an empty collection of ranges yields an empty scan, not a full scan
+        scanner = sstable.getScanner(new ArrayList<Range<Token>>(), null);
+        assertFalse(scanner.hasNext());
+    }
+}


[2/2] git commit: Merge branch 'cassandra-2.0' into trunk

Posted by ma...@apache.org.
Merge branch 'cassandra-2.0' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a69457c1
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a69457c1
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a69457c1

Branch: refs/heads/trunk
Commit: a69457c14a36c5eb3db523b3eb42b6d399993177
Parents: c651f83 9e846d9
Author: Marcus Eriksson <ma...@spotify.com>
Authored: Wed Aug 14 08:45:34 2013 +0200
Committer: Marcus Eriksson <ma...@spotify.com>
Committed: Wed Aug 14 08:45:34 2013 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +-
 .../apache/cassandra/db/ColumnFamilyStore.java  |  12 +
 .../db/compaction/CompactionManager.java        | 178 ++++++++----
 .../cassandra/io/sstable/SSTableReader.java     |  18 +-
 .../cassandra/io/sstable/SSTableScanner.java    | 150 ++++++----
 .../org/apache/cassandra/db/RowCacheTest.java   |  37 ++-
 .../io/sstable/SSTableScannerTest.java          | 287 +++++++++++++++++++
 7 files changed, 563 insertions(+), 121 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/a69457c1/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a69457c1/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------