You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jm...@apache.org on 2014/11/13 19:07:42 UTC

[1/2] cassandra git commit: Fix ordering on SSTableScanner / SSTableReader close

Repository: cassandra
Updated Branches:
  refs/heads/trunk 4ff6e7dff -> 9c66482e1


Fix ordering on SSTableScanner / SSTableReader close

Patch by jmckenzie; reviewed by marcuse for CASSANDRA-8019


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3484181e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3484181e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3484181e

Branch: refs/heads/trunk
Commit: 3484181e2b68ce04acfe4e9028d5d1d76335094a
Parents: 5615a79
Author: Joshua McKenzie <jm...@apache.org>
Authored: Thu Nov 13 11:56:34 2014 -0600
Committer: Joshua McKenzie <jm...@apache.org>
Committed: Thu Nov 13 11:56:34 2014 -0600

----------------------------------------------------------------------
 .../cassandra/io/sstable/SSTableScanner.java    |   3 +
 .../io/sstable/SSTableRewriterTest.java         | 190 ++++++++++---------
 2 files changed, 106 insertions(+), 87 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/3484181e/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java b/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
index 62ac175..3f1f1f0 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
@@ -63,6 +63,7 @@ public class SSTableScanner implements ICompactionScanner
     SSTableScanner(SSTableReader sstable, DataRange dataRange, RateLimiter limiter)
     {
         assert sstable != null;
+        sstable.acquireReference();
 
         this.dfile = limiter == null ? sstable.openDataReader() : sstable.openDataReader(limiter);
         this.ifile = sstable.openIndexReader();
@@ -92,6 +93,7 @@ public class SSTableScanner implements ICompactionScanner
     SSTableScanner(SSTableReader sstable, Collection<Range<Token>> tokenRanges, RateLimiter limiter)
     {
         assert sstable != null;
+        sstable.acquireReference();
 
         this.dfile = limiter == null ? sstable.openDataReader() : sstable.openDataReader(limiter);
         this.ifile = sstable.openIndexReader();
@@ -159,6 +161,7 @@ public class SSTableScanner implements ICompactionScanner
     public void close() throws IOException
     {
         FileUtils.close(dfile, ifile);
+        sstable.releaseReference();
     }
 
     public long getLengthInBytes()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3484181e/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
index 4d248bd..8a494a6 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
@@ -69,14 +69,16 @@ public class SSTableRewriterTest extends SchemaLoader
         Set<SSTableReader> sstables = new HashSet<>(cfs.getSSTables());
         assertEquals(1, sstables.size());
         SSTableRewriter writer = new SSTableRewriter(cfs, sstables, 1000, false);
-        AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables);
-        ICompactionScanner scanner = scanners.scanners.get(0);
-        CompactionController controller = new CompactionController(cfs, sstables, cfs.gcBefore(System.currentTimeMillis()));
-        writer.switchWriter(getWriter(cfs, sstables.iterator().next().descriptor.directory));
-        while(scanner.hasNext())
+        try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables);)
         {
-            AbstractCompactedRow row = new LazilyCompactedRow(controller, Arrays.asList(scanner.next()));
-            writer.append(row);
+            ICompactionScanner scanner = scanners.scanners.get(0);
+            CompactionController controller = new CompactionController(cfs, sstables, cfs.gcBefore(System.currentTimeMillis()));
+            writer.switchWriter(getWriter(cfs, sstables.iterator().next().descriptor.directory));
+            while(scanner.hasNext())
+            {
+                AbstractCompactedRow row = new LazilyCompactedRow(controller, Arrays.asList(scanner.next()));
+                writer.append(row);
+            }
         }
         cfs.getDataTracker().markCompactedSSTablesReplaced(sstables, writer.finish(), OperationType.COMPACTION);
 
@@ -144,7 +146,7 @@ public class SSTableRewriterTest extends SchemaLoader
 
 
     @Test
-    public void testNumberOfFilesAndSizes() throws InterruptedException
+    public void testNumberOfFilesAndSizes() throws Exception
     {
         Keyspace keyspace = Keyspace.open(KEYSPACE);
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
@@ -158,20 +160,22 @@ public class SSTableRewriterTest extends SchemaLoader
         SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
         rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
 
-        ICompactionScanner scanner = s.getScanner();
-        CompactionController controller = new CompactionController(cfs, compacting, 0);
         int files = 1;
-        while(scanner.hasNext())
+        try (ICompactionScanner scanner = s.getScanner();
+             CompactionController controller = new CompactionController(cfs, compacting, 0))
         {
-            rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
-            if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000)
+            while(scanner.hasNext())
             {
-                rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
-                files++;
-                assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
-                assertEquals(s.bytesOnDisk(), cfs.metric.liveDiskSpaceUsed.count());
-                assertEquals(s.bytesOnDisk(), cfs.metric.totalDiskSpaceUsed.count());
-
+                rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
+                if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000)
+                {
+                    rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+                    files++;
+                    assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
+                    assertEquals(s.bytesOnDisk(), cfs.metric.liveDiskSpaceUsed.count());
+                    assertEquals(s.bytesOnDisk(), cfs.metric.totalDiskSpaceUsed.count());
+
+                }
             }
         }
         List<SSTableReader> sstables = rewriter.finish();
@@ -191,7 +195,7 @@ public class SSTableRewriterTest extends SchemaLoader
     }
 
     @Test
-    public void testNumberOfFiles_dont_clean_readers() throws InterruptedException
+    public void testNumberOfFiles_dont_clean_readers() throws Exception
     {
         Keyspace keyspace = Keyspace.open(KEYSPACE);
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
@@ -205,17 +209,19 @@ public class SSTableRewriterTest extends SchemaLoader
         SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
         rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
 
-        ICompactionScanner scanner = s.getScanner();
-        CompactionController controller = new CompactionController(cfs, compacting, 0);
         int files = 1;
-        while(scanner.hasNext())
+        try (ICompactionScanner scanner = s.getScanner();
+             CompactionController controller = new CompactionController(cfs, compacting, 0))
         {
-            rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
-            if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000)
+            while(scanner.hasNext())
             {
-                rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
-                files++;
-                assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
+                rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
+                if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000)
+                {
+                    rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+                    files++;
+                    assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
+                }
             }
         }
         List<SSTableReader> sstables = rewriter.finish();
@@ -230,7 +236,7 @@ public class SSTableRewriterTest extends SchemaLoader
 
 
     @Test
-    public void testNumberOfFiles_abort() throws InterruptedException
+    public void testNumberOfFiles_abort() throws Exception
     {
         Keyspace keyspace = Keyspace.open(KEYSPACE);
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
@@ -246,17 +252,19 @@ public class SSTableRewriterTest extends SchemaLoader
         SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
         rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
 
-        ICompactionScanner scanner = s.getScanner();
-        CompactionController controller = new CompactionController(cfs, compacting, 0);
         int files = 1;
-        while(scanner.hasNext())
+        try (ICompactionScanner scanner = s.getScanner();
+             CompactionController controller = new CompactionController(cfs, compacting, 0))
         {
-            rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
-            if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000)
+            while(scanner.hasNext())
             {
-                rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
-                files++;
-                assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
+                rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
+                if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000)
+                {
+                    rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+                    files++;
+                    assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
+                }
             }
         }
         rewriter.abort();
@@ -271,7 +279,7 @@ public class SSTableRewriterTest extends SchemaLoader
     }
 
     @Test
-    public void testNumberOfFiles_abort2() throws InterruptedException
+    public void testNumberOfFiles_abort2() throws Exception
     {
         Keyspace keyspace = Keyspace.open(KEYSPACE);
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
@@ -287,23 +295,25 @@ public class SSTableRewriterTest extends SchemaLoader
         SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
         rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
 
-        ICompactionScanner scanner = s.getScanner();
-        CompactionController controller = new CompactionController(cfs, compacting, 0);
         int files = 1;
-        while(scanner.hasNext())
+        try (ICompactionScanner scanner = s.getScanner();
+             CompactionController controller = new CompactionController(cfs, compacting, 0))
         {
-            rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
-            if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000)
+            while(scanner.hasNext())
             {
-                rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
-                files++;
-                assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
-            }
-            if (files == 3)
-            {
-                //testing to abort when we have nothing written in the new file
-                rewriter.abort();
-                break;
+                rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
+                if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000)
+                {
+                    rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+                    files++;
+                    assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
+                }
+                if (files == 3)
+                {
+                    //testing to abort when we have nothing written in the new file
+                    rewriter.abort();
+                    break;
+                }
             }
         }
         Thread.sleep(1000);
@@ -316,7 +326,7 @@ public class SSTableRewriterTest extends SchemaLoader
     }
 
     @Test
-    public void testNumberOfFiles_finish_empty_new_writer() throws InterruptedException
+    public void testNumberOfFiles_finish_empty_new_writer() throws Exception
     {
         Keyspace keyspace = Keyspace.open(KEYSPACE);
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
@@ -330,24 +340,26 @@ public class SSTableRewriterTest extends SchemaLoader
         SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
         rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
 
-        ICompactionScanner scanner = s.getScanner();
-        CompactionController controller = new CompactionController(cfs, compacting, 0);
         int files = 1;
-        while(scanner.hasNext())
+        try (ICompactionScanner scanner = s.getScanner();
+             CompactionController controller = new CompactionController(cfs, compacting, 0))
         {
-            rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
-            if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000)
-            {
-                rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
-                files++;
-                assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
-            }
-            if (files == 3)
+            while(scanner.hasNext())
             {
-                //testing to finish when we have nothing written in the new file
-                List<SSTableReader> sstables = rewriter.finish();
-                cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, sstables, OperationType.COMPACTION);
-                break;
+                rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
+                if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000)
+                {
+                    rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+                    files++;
+                    assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
+                }
+                if (files == 3)
+                {
+                    //testing to finish when we have nothing written in the new file
+                    List<SSTableReader> sstables = rewriter.finish();
+                    cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, sstables, OperationType.COMPACTION);
+                    break;
+                }
             }
         }
         Thread.sleep(1000);
@@ -357,7 +369,7 @@ public class SSTableRewriterTest extends SchemaLoader
     }
 
     @Test
-    public void testNumberOfFiles_truncate() throws InterruptedException
+    public void testNumberOfFiles_truncate() throws Exception
     {
         Keyspace keyspace = Keyspace.open(KEYSPACE);
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
@@ -371,17 +383,19 @@ public class SSTableRewriterTest extends SchemaLoader
         SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
         rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
 
-        ICompactionScanner scanner = s.getScanner();
-        CompactionController controller = new CompactionController(cfs, compacting, 0);
         int files = 1;
-        while(scanner.hasNext())
+        try (ICompactionScanner scanner = s.getScanner();
+             CompactionController controller = new CompactionController(cfs, compacting, 0))
         {
-            rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
-            if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000)
+            while(scanner.hasNext())
             {
-                rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
-                files++;
-                assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
+                rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
+                if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000)
+                {
+                    rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+                    files++;
+                    assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
+                }
             }
         }
         List<SSTableReader> sstables = rewriter.finish();
@@ -393,7 +407,7 @@ public class SSTableRewriterTest extends SchemaLoader
     }
 
     @Test
-    public void testSmallFiles() throws InterruptedException
+    public void testSmallFiles() throws Exception
     {
         Keyspace keyspace = Keyspace.open(KEYSPACE);
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
@@ -408,18 +422,20 @@ public class SSTableRewriterTest extends SchemaLoader
         SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
         rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
 
-        ICompactionScanner scanner = s.getScanner();
-        CompactionController controller = new CompactionController(cfs, compacting, 0);
         int files = 1;
-        while(scanner.hasNext())
+        try (ICompactionScanner scanner = s.getScanner();
+             CompactionController controller = new CompactionController(cfs, compacting, 0))
         {
-            rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
-            if (rewriter.currentWriter().getOnDiskFilePointer() > 2500000)
+            while(scanner.hasNext())
             {
-                assertEquals(1, cfs.getSSTables().size()); // we dont open small files early ...
-                assertEquals(origFirst, cfs.getSSTables().iterator().next().first); // ... and the first key should stay the same
-                rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
-                files++;
+                rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
+                if (rewriter.currentWriter().getOnDiskFilePointer() > 2500000)
+                {
+                    assertEquals(1, cfs.getSSTables().size()); // we dont open small files early ...
+                    assertEquals(origFirst, cfs.getSSTables().iterator().next().first); // ... and the first key should stay the same
+                    rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+                    files++;
+                }
             }
         }
         List<SSTableReader> sstables = rewriter.finish();


[2/2] cassandra git commit: Merge branch 'cassandra-2.1' into trunk

Posted by jm...@apache.org.
Merge branch 'cassandra-2.1' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9c66482e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9c66482e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9c66482e

Branch: refs/heads/trunk
Commit: 9c66482e16b2fe45605dae8132fec8aff5eda470
Parents: 4ff6e7d 3484181
Author: Joshua McKenzie <jm...@apache.org>
Authored: Thu Nov 13 12:01:19 2014 -0600
Committer: Joshua McKenzie <jm...@apache.org>
Committed: Thu Nov 13 12:01:19 2014 -0600

----------------------------------------------------------------------
 .../io/sstable/format/big/BigTableScanner.java  |   3 +
 .../io/sstable/SSTableRewriterTest.java         | 190 ++++++++++---------
 2 files changed, 106 insertions(+), 87 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c66482e/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
index c1fc079,0000000..db55353
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
@@@ -1,299 -1,0 +1,302 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.io.sstable.format.big;
 +
 +import java.io.IOException;
 +import java.util.ArrayList;
 +import java.util.Collection;
 +import java.util.Iterator;
 +import java.util.List;
 +
 +import com.google.common.collect.AbstractIterator;
 +import com.google.common.util.concurrent.RateLimiter;
 +
 +import org.apache.cassandra.db.DataRange;
 +import org.apache.cassandra.db.DecoratedKey;
 +import org.apache.cassandra.db.RowIndexEntry;
 +import org.apache.cassandra.db.RowPosition;
 +import org.apache.cassandra.db.columniterator.IColumnIteratorFactory;
 +import org.apache.cassandra.db.columniterator.LazyColumnIterator;
 +import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
 +import org.apache.cassandra.db.compaction.ICompactionScanner;
 +import org.apache.cassandra.dht.AbstractBounds;
 +import org.apache.cassandra.dht.Bounds;
 +import org.apache.cassandra.dht.Range;
 +import org.apache.cassandra.dht.Token;
 +import org.apache.cassandra.io.sstable.CorruptSSTableException;
 +import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.io.util.FileUtils;
 +import org.apache.cassandra.io.util.RandomAccessReader;
 +import org.apache.cassandra.utils.ByteBufferUtil;
 +
 +public class BigTableScanner implements ICompactionScanner
 +{
 +    protected final RandomAccessReader dfile;
 +    protected final RandomAccessReader ifile;
 +    public final SSTableReader sstable;
 +
 +    private final Iterator<AbstractBounds<RowPosition>> rangeIterator;
 +    private AbstractBounds<RowPosition> currentRange;
 +
 +    private final DataRange dataRange;
 +    private final RowIndexEntry.IndexSerializer rowIndexEntrySerializer;
 +
 +    protected Iterator<OnDiskAtomIterator> iterator;
 +
 +    /**
 +     * @param sstable SSTable to scan; must not be null
 +     * @param dataRange a single range to scan; must not be null
 +     * @param limiter background i/o RateLimiter; may be null
 +     */
 +    public BigTableScanner(SSTableReader sstable, DataRange dataRange, RateLimiter limiter)
 +    {
 +        assert sstable != null;
++        sstable.acquireReference();
 +
 +        this.dfile = limiter == null ? sstable.openDataReader() : sstable.openDataReader(limiter);
 +        this.ifile = sstable.openIndexReader();
 +        this.sstable = sstable;
 +        this.dataRange = dataRange;
 +        this.rowIndexEntrySerializer = sstable.descriptor.version.getSSTableFormat().getIndexSerializer(sstable.metadata);
 +
 +        List<AbstractBounds<RowPosition>> boundsList = new ArrayList<>(2);
 +        if (dataRange.isWrapAround() && !dataRange.stopKey().isMinimum(sstable.partitioner))
 +        {
 +            // split the wrapping range into two parts: 1) the part that starts at the beginning of the sstable, and
 +            // 2) the part that comes before the wrap-around
 +            boundsList.add(new Bounds<>(sstable.partitioner.getMinimumToken().minKeyBound(), dataRange.stopKey(), sstable.partitioner));
 +            boundsList.add(new Bounds<>(dataRange.startKey(), sstable.partitioner.getMinimumToken().maxKeyBound(), sstable.partitioner));
 +        }
 +        else
 +        {
 +            boundsList.add(new Bounds<>(dataRange.startKey(), dataRange.stopKey(), sstable.partitioner));
 +        }
 +        this.rangeIterator = boundsList.iterator();
 +    }
 +
 +    /**
 +     * @param sstable SSTable to scan; must not be null
 +     * @param tokenRanges A set of token ranges to scan
 +     * @param limiter background i/o RateLimiter; may be null
 +     */
 +    public BigTableScanner(SSTableReader sstable, Collection<Range<Token>> tokenRanges, RateLimiter limiter)
 +    {
 +        assert sstable != null;
++        sstable.acquireReference();
 +
 +        this.dfile = limiter == null ? sstable.openDataReader() : sstable.openDataReader(limiter);
 +        this.ifile = sstable.openIndexReader();
 +        this.sstable = sstable;
 +        this.dataRange = null;
 +        this.rowIndexEntrySerializer = sstable.descriptor.version.getSSTableFormat().getIndexSerializer(sstable.metadata);
 +
 +        List<Range<Token>> normalized = Range.normalize(tokenRanges);
 +        List<AbstractBounds<RowPosition>> boundsList = new ArrayList<>(normalized.size());
 +        for (Range<Token> range : normalized)
 +            boundsList.add(new Range<RowPosition>(range.left.maxKeyBound(sstable.partitioner),
 +                                                  range.right.maxKeyBound(sstable.partitioner),
 +                                                  sstable.partitioner));
 +
 +        this.rangeIterator = boundsList.iterator();
 +    }
 +
 +    private void seekToCurrentRangeStart()
 +    {
 +        if (currentRange.left.isMinimum(sstable.partitioner))
 +            return;
 +
 +        long indexPosition = sstable.getIndexScanPosition(currentRange.left);
 +        // -1 means the key is before everything in the sstable. So just start from the beginning.
 +        if (indexPosition == -1)
 +        {
 +            // Note: this method shouldn't assume we're at the start of the sstable already (see #6638) and
 +            // the seeks are no-op anyway if we are.
 +            ifile.seek(0);
 +            dfile.seek(0);
 +            return;
 +        }
 +
 +        ifile.seek(indexPosition);
 +        try
 +        {
 +
 +            while (!ifile.isEOF())
 +            {
 +                indexPosition = ifile.getFilePointer();
 +                DecoratedKey indexDecoratedKey = sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(ifile));
 +                int comparison = indexDecoratedKey.compareTo(currentRange.left);
 +                // because our range start may be inclusive or exclusive, we need to also contains()
 +                // instead of just checking (comparison >= 0)
 +                if (comparison > 0 || currentRange.contains(indexDecoratedKey))
 +                {
 +                    // Found, just read the dataPosition and seek into index and data files
 +                    long dataPosition = ifile.readLong();
 +                    ifile.seek(indexPosition);
 +                    dfile.seek(dataPosition);
 +                    break;
 +                }
 +                else
 +                {
 +                    RowIndexEntry.Serializer.skip(ifile);
 +                }
 +            }
 +        }
 +        catch (IOException e)
 +        {
 +            sstable.markSuspect();
 +            throw new CorruptSSTableException(e, sstable.getFilename());
 +        }
 +    }
 +
 +    public void close() throws IOException
 +    {
 +        FileUtils.close(dfile, ifile);
++        sstable.releaseReference();
 +    }
 +
 +    public long getLengthInBytes()
 +    {
 +        return dfile.length();
 +    }
 +
 +    public long getCurrentPosition()
 +    {
 +        return dfile.getFilePointer();
 +    }
 +
 +    public String getBackingFiles()
 +    {
 +        return sstable.toString();
 +    }
 +
 +    public boolean hasNext()
 +    {
 +        if (iterator == null)
 +            iterator = createIterator();
 +        return iterator.hasNext();
 +    }
 +
 +    public OnDiskAtomIterator next()
 +    {
 +        if (iterator == null)
 +            iterator = createIterator();
 +        return iterator.next();
 +    }
 +
 +    public void remove()
 +    {
 +        throw new UnsupportedOperationException();
 +    }
 +
 +    private Iterator<OnDiskAtomIterator> createIterator()
 +    {
 +        return new KeyScanningIterator();
 +    }
 +
 +    protected class KeyScanningIterator extends AbstractIterator<OnDiskAtomIterator>
 +    {
 +        private DecoratedKey nextKey;
 +        private RowIndexEntry nextEntry;
 +        private DecoratedKey currentKey;
 +        private RowIndexEntry currentEntry;
 +
 +        protected OnDiskAtomIterator computeNext()
 +        {
 +            try
 +            {
 +                if (nextEntry == null)
 +                {
 +                    do
 +                    {
 +                        // we're starting the first range or we just passed the end of the previous range
 +                        if (!rangeIterator.hasNext())
 +                            return endOfData();
 +
 +                        currentRange = rangeIterator.next();
 +                        seekToCurrentRangeStart();
 +
 +                        if (ifile.isEOF())
 +                            return endOfData();
 +
 +                        currentKey = sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(ifile));
 +                        currentEntry = rowIndexEntrySerializer.deserialize(ifile, sstable.descriptor.version);
 +                    } while (!currentRange.contains(currentKey));
 +                }
 +                else
 +                {
 +                    // we're in the middle of a range
 +                    currentKey = nextKey;
 +                    currentEntry = nextEntry;
 +                }
 +
 +                long readEnd;
 +                if (ifile.isEOF())
 +                {
 +                    nextEntry = null;
 +                    nextKey = null;
 +                    readEnd = dfile.length();
 +                }
 +                else
 +                {
 +                    // we need the position of the start of the next key, regardless of whether it falls in the current range
 +                    nextKey = sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(ifile));
 +                    nextEntry = rowIndexEntrySerializer.deserialize(ifile, sstable.descriptor.version);
 +                    readEnd = nextEntry.position;
 +
 +                    if (!currentRange.contains(nextKey))
 +                    {
 +                        nextKey = null;
 +                        nextEntry = null;
 +                    }
 +                }
 +
 +                if (dataRange == null || dataRange.selectsFullRowFor(currentKey.getKey()))
 +                {
 +                    dfile.seek(currentEntry.position + currentEntry.headerOffset());
 +                    ByteBufferUtil.readWithShortLength(dfile); // key
 +                    return new SSTableIdentityIterator(sstable, dfile, currentKey);
 +                }
 +
 +                return new LazyColumnIterator(currentKey, new IColumnIteratorFactory()
 +                {
 +                    public OnDiskAtomIterator create()
 +                    {
 +                        return dataRange.columnFilter(currentKey.getKey()).getSSTableColumnIterator(sstable, dfile, currentKey, currentEntry);
 +                    }
 +                });
 +
 +            }
 +            catch (IOException e)
 +            {
 +                sstable.markSuspect();
 +                throw new CorruptSSTableException(e, sstable.getFilename());
 +            }
 +        }
 +    }
 +
 +    @Override
 +    public String toString()
 +    {
 +        return getClass().getSimpleName() + "(" +
 +               "dfile=" + dfile +
 +               " ifile=" + ifile +
 +               " sstable=" + sstable +
 +               ")";
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c66482e/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
----------------------------------------------------------------------