You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jm...@apache.org on 2015/01/07 21:13:06 UTC

[1/2] cassandra git commit: Remove ref counting in SSTableScanner, fix CompactionTask ordering

Repository: cassandra
Updated Branches:
  refs/heads/trunk 493859bf6 -> 65ffc39b1


Remove ref counting in SSTableScanner, fix CompactionTask ordering

Patch by jmckenzie; reviewed by belliottsmith as a follow-up for CASSANDRA-8399


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/bdbb071f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/bdbb071f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/bdbb071f

Branch: refs/heads/trunk
Commit: bdbb071f4f87131d6996aac52f2b75a5833d5238
Parents: ddca610
Author: Joshua McKenzie <jm...@apache.org>
Authored: Wed Jan 7 13:05:31 2015 -0600
Committer: Joshua McKenzie <jm...@apache.org>
Committed: Wed Jan 7 14:05:40 2015 -0600

----------------------------------------------------------------------
 .../cassandra/db/compaction/CompactionTask.java | 82 ++++++++++----------
 .../cassandra/io/sstable/SSTableScanner.java    |  8 +-
 2 files changed, 45 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/bdbb071f/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index 4885bc8..d215b4c 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -140,7 +140,6 @@ public class CompactionTask extends AbstractCompactionTask
 
         try (CompactionController controller = getCompactionController(sstables);)
         {
-
             Set<SSTableReader> actuallyCompact = Sets.difference(sstables, controller.getFullyExpiredSSTables());
 
             long estimatedTotalKeys = Math.max(cfs.metadata.getMinIndexInterval(), SSTableReader.getApproximateKeyCount(actuallyCompact));
@@ -149,11 +148,16 @@ public class CompactionTask extends AbstractCompactionTask
             long expectedSSTableSize = Math.min(getExpectedWriteSize(), strategy.getMaxSSTableBytes());
             logger.debug("Expected bloom filter size : {}", keysPerSSTable);
 
+            List<SSTableReader> newSStables;
+            AbstractCompactionIterable ci;
+
+            // SSTableScanners need to be closed before markCompactedSSTablesReplaced call as scanners contain references
+            // to both ifile and dfile and SSTR will throw deletion errors on Windows if it tries to delete before scanner is closed.
+            // See CASSANDRA-8019 and CASSANDRA-8399
             try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(actuallyCompact))
             {
-                AbstractCompactionIterable ci = new CompactionIterable(compactionType, scanners.scanners, controller);
+                ci = new CompactionIterable(compactionType, scanners.scanners, controller);
                 Iterator<AbstractCompactedRow> iter = ci.iterator();
-                List<SSTableReader> newSStables;
                 // we can't preheat until the tracker has been set. This doesn't happen until we tell the cfs to
                 // replace the old entries.  Track entries to preheat here until then.
                 long minRepairedAt = getMinRepairedAt(actuallyCompact);
@@ -215,44 +219,44 @@ public class CompactionTask extends AbstractCompactionTask
                     if (collector != null)
                         collector.finishCompaction(ci);
                 }
+            }
 
-                Collection<SSTableReader> oldSStables = this.sstables;
-                if (!offline)
-                    cfs.getDataTracker().markCompactedSSTablesReplaced(oldSStables, newSStables, compactionType);
-
-                // log a bunch of statistics about the result and save to system table compaction_history
-                long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
-                long startsize = SSTableReader.getTotalBytes(oldSStables);
-                long endsize = SSTableReader.getTotalBytes(newSStables);
-                double ratio = (double) endsize / (double) startsize;
-
-                StringBuilder newSSTableNames = new StringBuilder();
-                for (SSTableReader reader : newSStables)
-                    newSSTableNames.append(reader.descriptor.baseFilename()).append(",");
-
-                double mbps = dTime > 0 ? (double) endsize / (1024 * 1024) / ((double) dTime / 1000) : 0;
-                long totalSourceRows = 0;
-                long[] counts = ci.getMergedRowCounts();
-                StringBuilder mergeSummary = new StringBuilder(counts.length * 10);
-                Map<Integer, Long> mergedRows = new HashMap<>();
-                for (int i = 0; i < counts.length; i++)
-                {
-                    long count = counts[i];
-                    if (count == 0)
-                        continue;
-
-                    int rows = i + 1;
-                    totalSourceRows += rows * count;
-                    mergeSummary.append(String.format("%d:%d, ", rows, count));
-                    mergedRows.put(rows, count);
-                }
-
-                SystemKeyspace.updateCompactionHistory(cfs.keyspace.getName(), cfs.name, System.currentTimeMillis(), startsize, endsize, mergedRows);
-                logger.info(String.format("Compacted %d sstables to [%s].  %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s.  %,d total partitions merged to %,d.  Partition merge counts were {%s}",
-                                          oldSStables.size(), newSSTableNames.toString(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalKeysWritten, mergeSummary.toString()));
-                logger.debug(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize)));
-                logger.debug("Actual #keys: {}, Estimated #keys:{}, Err%: {}", totalKeysWritten, estimatedTotalKeys, ((double)(totalKeysWritten - estimatedTotalKeys)/totalKeysWritten));
+            Collection<SSTableReader> oldSStables = this.sstables;
+            if (!offline)
+                cfs.getDataTracker().markCompactedSSTablesReplaced(oldSStables, newSStables, compactionType);
+
+            // log a bunch of statistics about the result and save to system table compaction_history
+            long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+            long startsize = SSTableReader.getTotalBytes(oldSStables);
+            long endsize = SSTableReader.getTotalBytes(newSStables);
+            double ratio = (double) endsize / (double) startsize;
+
+            StringBuilder newSSTableNames = new StringBuilder();
+            for (SSTableReader reader : newSStables)
+                newSSTableNames.append(reader.descriptor.baseFilename()).append(",");
+
+            double mbps = dTime > 0 ? (double) endsize / (1024 * 1024) / ((double) dTime / 1000) : 0;
+            long totalSourceRows = 0;
+            long[] counts = ci.getMergedRowCounts();
+            StringBuilder mergeSummary = new StringBuilder(counts.length * 10);
+            Map<Integer, Long> mergedRows = new HashMap<>();
+            for (int i = 0; i < counts.length; i++)
+            {
+                long count = counts[i];
+                if (count == 0)
+                    continue;
+
+                int rows = i + 1;
+                totalSourceRows += rows * count;
+                mergeSummary.append(String.format("%d:%d, ", rows, count));
+                mergedRows.put(rows, count);
             }
+
+            SystemKeyspace.updateCompactionHistory(cfs.keyspace.getName(), cfs.name, System.currentTimeMillis(), startsize, endsize, mergedRows);
+            logger.info(String.format("Compacted %d sstables to [%s].  %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s.  %,d total partitions merged to %,d.  Partition merge counts were {%s}",
+                                      oldSStables.size(), newSSTableNames.toString(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalKeysWritten, mergeSummary.toString()));
+            logger.debug(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize)));
+            logger.debug("Actual #keys: {}, Estimated #keys:{}, Err%: {}", totalKeysWritten, estimatedTotalKeys, ((double)(totalKeysWritten - estimatedTotalKeys)/totalKeysWritten));
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bdbb071f/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java b/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
index 5499195..dc065af 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
@@ -52,18 +52,15 @@ public class SSTableScanner implements ISSTableScanner
 
     protected Iterator<OnDiskAtomIterator> iterator;
 
-    // We can race with the sstable for deletion during compaction.  If it's been ref counted to 0, skip
     public static ISSTableScanner getScanner(SSTableReader sstable, DataRange dataRange, RateLimiter limiter)
     {
-        return sstable.acquireReference()
-                ? new SSTableScanner(sstable, dataRange, limiter)
-                : new SSTableScanner.EmptySSTableScanner(sstable.getFilename());
+        return new SSTableScanner(sstable, dataRange, limiter);
     }
     public static ISSTableScanner getScanner(SSTableReader sstable, Collection<Range<Token>> tokenRanges, RateLimiter limiter)
     {
         // We want to avoid allocating a SSTableScanner if the range don't overlap the sstable (#5249)
         List<Pair<Long, Long>> positions = sstable.getPositionsForRanges(Range.normalize(tokenRanges));
-        if (positions.isEmpty() || !sstable.acquireReference())
+        if (positions.isEmpty())
             return new EmptySSTableScanner(sstable.getFilename());
 
         return new SSTableScanner(sstable, tokenRanges, limiter);
@@ -173,7 +170,6 @@ public class SSTableScanner implements ISSTableScanner
     public void close() throws IOException
     {
         FileUtils.close(dfile, ifile);
-        sstable.releaseReference();
     }
 
     public long getLengthInBytes()


[2/2] cassandra git commit: Merge branch 'cassandra-2.1' into trunk

Posted by jm...@apache.org.
Merge branch 'cassandra-2.1' into trunk

Conflicts:
	src/java/org/apache/cassandra/db/compaction/CompactionTask.java
	src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/65ffc39b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/65ffc39b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/65ffc39b

Branch: refs/heads/trunk
Commit: 65ffc39b16c83c6319498874b5f0c56149371aef
Parents: 493859b bdbb071
Author: Joshua McKenzie <jm...@apache.org>
Authored: Wed Jan 7 14:11:04 2015 -0600
Committer: Joshua McKenzie <jm...@apache.org>
Committed: Wed Jan 7 14:11:04 2015 -0600

----------------------------------------------------------------------
 .../cassandra/db/compaction/CompactionTask.java | 82 ++++++++++----------
 .../io/sstable/format/big/BigTableScanner.java  |  8 +-
 2 files changed, 45 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/65ffc39b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index 2543f47,d215b4c..8b5058b
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@@ -157,11 -148,16 +156,16 @@@ public class CompactionTask extends Abs
              long expectedSSTableSize = Math.min(getExpectedWriteSize(), strategy.getMaxSSTableBytes());
              logger.debug("Expected bloom filter size : {}", keysPerSSTable);
  
+             List<SSTableReader> newSStables;
+             AbstractCompactionIterable ci;
+ 
+             // SSTableScanners need to be closed before markCompactedSSTablesReplaced call as scanners contain references
+             // to both ifile and dfile and SSTR will throw deletion errors on Windows if it tries to delete before scanner is closed.
+             // See CASSANDRA-8019 and CASSANDRA-8399
              try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(actuallyCompact))
              {
-                 AbstractCompactionIterable ci = new CompactionIterable(compactionType, scanners.scanners, controller, sstableFormat);
 -                ci = new CompactionIterable(compactionType, scanners.scanners, controller);
++                ci = new CompactionIterable(compactionType, scanners.scanners, controller, sstableFormat);
                  Iterator<AbstractCompactedRow> iter = ci.iterator();
-                 List<SSTableReader> newSStables;
                  // we can't preheat until the tracker has been set. This doesn't happen until we tell the cfs to
                  // replace the old entries.  Track entries to preheat here until then.
                  long minRepairedAt = getMinRepairedAt(actuallyCompact);
@@@ -223,44 -219,44 +227,44 @@@
                      if (collector != null)
                          collector.finishCompaction(ci);
                  }
+             }
  
-                 Collection<SSTableReader> oldSStables = this.sstables;
-                 if (!offline)
-                     cfs.getDataTracker().markCompactedSSTablesReplaced(oldSStables, newSStables, compactionType);
- 
-                 // log a bunch of statistics about the result and save to system table compaction_history
-                 long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
-                 long startsize = SSTableReader.getTotalBytes(oldSStables);
-                 long endsize = SSTableReader.getTotalBytes(newSStables);
-                 double ratio = (double) endsize / (double) startsize;
- 
-                 StringBuilder newSSTableNames = new StringBuilder();
-                 for (SSTableReader reader : newSStables)
-                     newSSTableNames.append(reader.descriptor.baseFilename()).append(",");
- 
-                 double mbps = dTime > 0 ? (double) endsize / (1024 * 1024) / ((double) dTime / 1000) : 0;
-                 long totalSourceRows = 0;
-                 long[] counts = ci.getMergedRowCounts();
-                 StringBuilder mergeSummary = new StringBuilder(counts.length * 10);
-                 Map<Integer, Long> mergedRows = new HashMap<>();
-                 for (int i = 0; i < counts.length; i++)
-                 {
-                     long count = counts[i];
-                     if (count == 0)
-                         continue;
- 
-                     int rows = i + 1;
-                     totalSourceRows += rows * count;
-                     mergeSummary.append(String.format("%d:%d, ", rows, count));
-                     mergedRows.put(rows, count);
-                 }
- 
-                 SystemKeyspace.updateCompactionHistory(cfs.keyspace.getName(), cfs.name, System.currentTimeMillis(), startsize, endsize, mergedRows);
-                 logger.info(String.format("Compacted (%s) %d sstables to [%s] to level=%d.  %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s.  %,d total partitions merged to %,d.  Partition merge counts were {%s}",
-                                           taskIdLoggerMsg, oldSStables.size(), newSSTableNames.toString(), getLevel(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalKeysWritten, mergeSummary.toString()));
-                 logger.debug(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize)));
-                 logger.debug("Actual #keys: {}, Estimated #keys:{}, Err%: {}", totalKeysWritten, estimatedTotalKeys, ((double)(totalKeysWritten - estimatedTotalKeys)/totalKeysWritten));
+             Collection<SSTableReader> oldSStables = this.sstables;
+             if (!offline)
+                 cfs.getDataTracker().markCompactedSSTablesReplaced(oldSStables, newSStables, compactionType);
+ 
+             // log a bunch of statistics about the result and save to system table compaction_history
+             long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+             long startsize = SSTableReader.getTotalBytes(oldSStables);
+             long endsize = SSTableReader.getTotalBytes(newSStables);
+             double ratio = (double) endsize / (double) startsize;
+ 
+             StringBuilder newSSTableNames = new StringBuilder();
+             for (SSTableReader reader : newSStables)
+                 newSSTableNames.append(reader.descriptor.baseFilename()).append(",");
+ 
+             double mbps = dTime > 0 ? (double) endsize / (1024 * 1024) / ((double) dTime / 1000) : 0;
+             long totalSourceRows = 0;
+             long[] counts = ci.getMergedRowCounts();
+             StringBuilder mergeSummary = new StringBuilder(counts.length * 10);
+             Map<Integer, Long> mergedRows = new HashMap<>();
+             for (int i = 0; i < counts.length; i++)
+             {
+                 long count = counts[i];
+                 if (count == 0)
+                     continue;
+ 
+                 int rows = i + 1;
+                 totalSourceRows += rows * count;
+                 mergeSummary.append(String.format("%d:%d, ", rows, count));
+                 mergedRows.put(rows, count);
              }
+ 
+             SystemKeyspace.updateCompactionHistory(cfs.keyspace.getName(), cfs.name, System.currentTimeMillis(), startsize, endsize, mergedRows);
 -            logger.info(String.format("Compacted %d sstables to [%s].  %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s.  %,d total partitions merged to %,d.  Partition merge counts were {%s}",
 -                                      oldSStables.size(), newSSTableNames.toString(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalKeysWritten, mergeSummary.toString()));
++            logger.info(String.format("Compacted (%s) %d sstables to [%s] to level=%d.  %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s.  %,d total partitions merged to %,d.  Partition merge counts were {%s}",
++                                      taskIdLoggerMsg, oldSStables.size(), newSSTableNames.toString(), getLevel(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalKeysWritten, mergeSummary.toString()));
+             logger.debug(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize)));
+             logger.debug("Actual #keys: {}, Estimated #keys:{}, Err%: {}", totalKeysWritten, estimatedTotalKeys, ((double)(totalKeysWritten - estimatedTotalKeys)/totalKeysWritten));
          }
      }
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/65ffc39b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
index 73a5d76,0000000..85bc37d
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
@@@ -1,354 -1,0 +1,350 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.io.sstable.format.big;
 +
 +import java.io.IOException;
 +import java.util.*;
 +
 +import com.google.common.collect.AbstractIterator;
 +import com.google.common.util.concurrent.RateLimiter;
 +
 +import org.apache.cassandra.db.DataRange;
 +import org.apache.cassandra.db.DecoratedKey;
 +import org.apache.cassandra.db.RowIndexEntry;
 +import org.apache.cassandra.db.RowPosition;
 +import org.apache.cassandra.db.columniterator.IColumnIteratorFactory;
 +import org.apache.cassandra.db.columniterator.LazyColumnIterator;
 +import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
 +import org.apache.cassandra.dht.AbstractBounds;
 +import org.apache.cassandra.dht.Bounds;
 +import org.apache.cassandra.dht.Range;
 +import org.apache.cassandra.dht.Token;
 +import org.apache.cassandra.io.sstable.CorruptSSTableException;
 +import org.apache.cassandra.io.sstable.ISSTableScanner;
 +import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.io.util.FileUtils;
 +import org.apache.cassandra.io.util.RandomAccessReader;
 +import org.apache.cassandra.utils.ByteBufferUtil;
 +import org.apache.cassandra.utils.Pair;
 +
 +public class BigTableScanner implements ISSTableScanner
 +{
 +    protected final RandomAccessReader dfile;
 +    protected final RandomAccessReader ifile;
 +    public final SSTableReader sstable;
 +
 +    private final Iterator<AbstractBounds<RowPosition>> rangeIterator;
 +    private AbstractBounds<RowPosition> currentRange;
 +
 +    private final DataRange dataRange;
 +    private final RowIndexEntry.IndexSerializer rowIndexEntrySerializer;
 +
 +    protected Iterator<OnDiskAtomIterator> iterator;
 +
-     // We can race with the sstable for deletion during compaction.  If it's been ref counted to 0, skip
 +    public static ISSTableScanner getScanner(SSTableReader sstable, DataRange dataRange, RateLimiter limiter)
 +    {
-         return sstable.acquireReference()
-                 ? new BigTableScanner(sstable, dataRange, limiter)
-                 : new BigTableScanner.EmptySSTableScanner(sstable.getFilename());
++        return new BigTableScanner(sstable, dataRange, limiter);
 +    }
 +    public static ISSTableScanner getScanner(SSTableReader sstable, Collection<Range<Token>> tokenRanges, RateLimiter limiter)
 +    {
 +        // We want to avoid allocating a SSTableScanner if the range don't overlap the sstable (#5249)
 +        List<Pair<Long, Long>> positions = sstable.getPositionsForRanges(Range.normalize(tokenRanges));
-         if (positions.isEmpty() || !sstable.acquireReference())
++        if (positions.isEmpty())
 +            return new EmptySSTableScanner(sstable.getFilename());
 +
 +        return new BigTableScanner(sstable, tokenRanges, limiter);
 +    }
 +
 +    /**
 +     * @param sstable SSTable to scan; must not be null
 +     * @param dataRange a single range to scan; must not be null
 +     * @param limiter background i/o RateLimiter; may be null
 +     */
 +    private BigTableScanner(SSTableReader sstable, DataRange dataRange, RateLimiter limiter)
 +    {
 +        assert sstable != null;
 +
 +        this.dfile = limiter == null ? sstable.openDataReader() : sstable.openDataReader(limiter);
 +        this.ifile = sstable.openIndexReader();
 +        this.sstable = sstable;
 +        this.dataRange = dataRange;
 +        this.rowIndexEntrySerializer = sstable.descriptor.version.getSSTableFormat().getIndexSerializer(sstable.metadata);
 +
 +        List<AbstractBounds<RowPosition>> boundsList = new ArrayList<>(2);
 +        if (dataRange.isWrapAround() && !dataRange.stopKey().isMinimum())
 +        {
 +            // split the wrapping range into two parts: 1) the part that starts at the beginning of the sstable, and
 +            // 2) the part that comes before the wrap-around
 +            boundsList.add(new Bounds<>(sstable.partitioner.getMinimumToken().minKeyBound(), dataRange.stopKey()));
 +            boundsList.add(new Bounds<>(dataRange.startKey(), sstable.partitioner.getMinimumToken().maxKeyBound()));
 +        }
 +        else
 +        {
 +            boundsList.add(new Bounds<>(dataRange.startKey(), dataRange.stopKey()));
 +        }
 +        this.rangeIterator = boundsList.iterator();
 +    }
 +
 +    /**
 +     * @param sstable SSTable to scan; must not be null
 +     * @param tokenRanges A set of token ranges to scan
 +     * @param limiter background i/o RateLimiter; may be null
 +     */
 +    private BigTableScanner(SSTableReader sstable, Collection<Range<Token>> tokenRanges, RateLimiter limiter)
 +    {
 +        assert sstable != null;
 +
 +        this.dfile = limiter == null ? sstable.openDataReader() : sstable.openDataReader(limiter);
 +        this.ifile = sstable.openIndexReader();
 +        this.sstable = sstable;
 +        this.dataRange = null;
 +        this.rowIndexEntrySerializer = sstable.descriptor.version.getSSTableFormat().getIndexSerializer(sstable.metadata);
 +
 +        List<Range<Token>> normalized = Range.normalize(tokenRanges);
 +        List<AbstractBounds<RowPosition>> boundsList = new ArrayList<>(normalized.size());
 +        for (Range<Token> range : normalized)
 +            boundsList.add(new Range<RowPosition>(range.left.maxKeyBound(), range.right.maxKeyBound()));
 +
 +        this.rangeIterator = boundsList.iterator();
 +    }
 +
 +    private void seekToCurrentRangeStart()
 +    {
 +        if (currentRange.left.isMinimum())
 +            return;
 +
 +        long indexPosition = sstable.getIndexScanPosition(currentRange.left);
 +        // -1 means the key is before everything in the sstable. So just start from the beginning.
 +        if (indexPosition == -1)
 +        {
 +            // Note: this method shouldn't assume we're at the start of the sstable already (see #6638) and
 +            // the seeks are no-op anyway if we are.
 +            ifile.seek(0);
 +            dfile.seek(0);
 +            return;
 +        }
 +
 +        ifile.seek(indexPosition);
 +        try
 +        {
 +
 +            while (!ifile.isEOF())
 +            {
 +                indexPosition = ifile.getFilePointer();
 +                DecoratedKey indexDecoratedKey = sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(ifile));
 +                int comparison = indexDecoratedKey.compareTo(currentRange.left);
 +                // because our range start may be inclusive or exclusive, we need to also contains()
 +                // instead of just checking (comparison >= 0)
 +                if (comparison > 0 || currentRange.contains(indexDecoratedKey))
 +                {
 +                    // Found, just read the dataPosition and seek into index and data files
 +                    long dataPosition = ifile.readLong();
 +                    ifile.seek(indexPosition);
 +                    dfile.seek(dataPosition);
 +                    break;
 +                }
 +                else
 +                {
 +                    RowIndexEntry.Serializer.skip(ifile);
 +                }
 +            }
 +        }
 +        catch (IOException e)
 +        {
 +            sstable.markSuspect();
 +            throw new CorruptSSTableException(e, sstable.getFilename());
 +        }
 +    }
 +
 +    public void close() throws IOException
 +    {
 +        FileUtils.close(dfile, ifile);
-         sstable.releaseReference();
 +    }
 +
 +    public long getLengthInBytes()
 +    {
 +        return dfile.length();
 +    }
 +
 +    public long getCurrentPosition()
 +    {
 +        return dfile.getFilePointer();
 +    }
 +
 +    public String getBackingFiles()
 +    {
 +        return sstable.toString();
 +    }
 +
 +    public boolean hasNext()
 +    {
 +        if (iterator == null)
 +            iterator = createIterator();
 +        return iterator.hasNext();
 +    }
 +
 +    public OnDiskAtomIterator next()
 +    {
 +        if (iterator == null)
 +            iterator = createIterator();
 +        return iterator.next();
 +    }
 +
 +    public void remove()
 +    {
 +        throw new UnsupportedOperationException();
 +    }
 +
 +    private Iterator<OnDiskAtomIterator> createIterator()
 +    {
 +        return new KeyScanningIterator();
 +    }
 +
 +    protected class KeyScanningIterator extends AbstractIterator<OnDiskAtomIterator>
 +    {
 +        private DecoratedKey nextKey;
 +        private RowIndexEntry nextEntry;
 +        private DecoratedKey currentKey;
 +        private RowIndexEntry currentEntry;
 +
 +        protected OnDiskAtomIterator computeNext()
 +        {
 +            try
 +            {
 +                if (nextEntry == null)
 +                {
 +                    do
 +                    {
 +                        // we're starting the first range or we just passed the end of the previous range
 +                        if (!rangeIterator.hasNext())
 +                            return endOfData();
 +
 +                        currentRange = rangeIterator.next();
 +                        seekToCurrentRangeStart();
 +
 +                        if (ifile.isEOF())
 +                            return endOfData();
 +
 +                        currentKey = sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(ifile));
 +                        currentEntry = rowIndexEntrySerializer.deserialize(ifile, sstable.descriptor.version);
 +                    } while (!currentRange.contains(currentKey));
 +                }
 +                else
 +                {
 +                    // we're in the middle of a range
 +                    currentKey = nextKey;
 +                    currentEntry = nextEntry;
 +                }
 +
 +                long readEnd;
 +                if (ifile.isEOF())
 +                {
 +                    nextEntry = null;
 +                    nextKey = null;
 +                    readEnd = dfile.length();
 +                }
 +                else
 +                {
 +                    // we need the position of the start of the next key, regardless of whether it falls in the current range
 +                    nextKey = sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(ifile));
 +                    nextEntry = rowIndexEntrySerializer.deserialize(ifile, sstable.descriptor.version);
 +                    readEnd = nextEntry.position;
 +
 +                    if (!currentRange.contains(nextKey))
 +                    {
 +                        nextKey = null;
 +                        nextEntry = null;
 +                    }
 +                }
 +
 +                if (dataRange == null || dataRange.selectsFullRowFor(currentKey.getKey()))
 +                {
 +                    dfile.seek(currentEntry.position + currentEntry.headerOffset());
 +                    ByteBufferUtil.readWithShortLength(dfile); // key
 +                    return new SSTableIdentityIterator(sstable, dfile, currentKey);
 +                }
 +
 +                return new LazyColumnIterator(currentKey, new IColumnIteratorFactory()
 +                {
 +                    public OnDiskAtomIterator create()
 +                    {
 +                        return dataRange.columnFilter(currentKey.getKey()).getSSTableColumnIterator(sstable, dfile, currentKey, currentEntry);
 +                    }
 +                });
 +
 +            }
 +            catch (IOException e)
 +            {
 +                sstable.markSuspect();
 +                throw new CorruptSSTableException(e, sstable.getFilename());
 +            }
 +        }
 +    }
 +
 +    @Override
 +    public String toString()
 +    {
 +        return getClass().getSimpleName() + "(" +
 +               "dfile=" + dfile +
 +               " ifile=" + ifile +
 +               " sstable=" + sstable +
 +               ")";
 +    }
 +
 +    public static class EmptySSTableScanner implements ISSTableScanner
 +    {
 +        private final String filename;
 +
 +        public EmptySSTableScanner(String filename)
 +        {
 +            this.filename = filename;
 +        }
 +
 +        public long getLengthInBytes()
 +        {
 +            return 0;
 +        }
 +
 +        public long getCurrentPosition()
 +        {
 +            return 0;
 +        }
 +
 +        public String getBackingFiles()
 +        {
 +            return filename;
 +        }
 +
 +        public boolean hasNext()
 +        {
 +            return false;
 +        }
 +
 +        public OnDiskAtomIterator next()
 +        {
 +            return null;
 +        }
 +
 +        public void close() throws IOException { }
 +
 +        public void remove() { }
 +    }
 +
 +
 +}