Posted to commits@cassandra.apache.org by jm...@apache.org on 2014/12/16 22:11:40 UTC

[2/5] cassandra git commit: Merge branch 'cassandra-2.1' into trunk

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bee53d72/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
index 2488f86,0000000..fc346d1
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
@@@ -1,256 -1,0 +1,251 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.io.sstable.format.big;
 +
 +import com.google.common.util.concurrent.RateLimiter;
 +import org.apache.cassandra.cache.KeyCacheKey;
 +import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.db.DataRange;
 +import org.apache.cassandra.db.DecoratedKey;
 +import org.apache.cassandra.db.RowIndexEntry;
 +import org.apache.cassandra.db.RowPosition;
 +import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
- import org.apache.cassandra.db.compaction.ICompactionScanner;
 +import org.apache.cassandra.db.composites.CellName;
 +import org.apache.cassandra.db.filter.ColumnSlice;
 +import org.apache.cassandra.dht.IPartitioner;
 +import org.apache.cassandra.dht.Range;
 +import org.apache.cassandra.dht.Token;
 +import org.apache.cassandra.io.sstable.Component;
 +import org.apache.cassandra.io.sstable.CorruptSSTableException;
 +import org.apache.cassandra.io.sstable.Descriptor;
++import org.apache.cassandra.io.sstable.ISSTableScanner;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
 +import org.apache.cassandra.io.util.FileDataInput;
 +import org.apache.cassandra.io.util.FileUtils;
 +import org.apache.cassandra.tracing.Tracing;
 +import org.apache.cassandra.utils.ByteBufferUtil;
 +import org.apache.cassandra.utils.Pair;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import java.io.IOException;
 +import java.nio.ByteBuffer;
 +import java.util.*;
 +
 +/**
 + * SSTableReaders are open()ed by Keyspace.onStart; after that they are created by SSTableWriter.renameAndOpen.
 + * Do not re-call open() on existing SSTable files; use the references kept by ColumnFamilyStore post-start instead.
 + */
 +public class BigTableReader extends SSTableReader
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(BigTableReader.class);
 +
 +    BigTableReader(Descriptor desc, Set<Component> components, CFMetaData metadata, IPartitioner partitioner, Long maxDataAge, StatsMetadata sstableMetadata, OpenReason openReason)
 +    {
 +        super(desc, components, metadata, partitioner, maxDataAge, sstableMetadata, openReason);
 +    }
 +
 +    public OnDiskAtomIterator iterator(DecoratedKey key, SortedSet<CellName> columns)
 +    {
 +        return new SSTableNamesIterator(this, key, columns);
 +    }
 +
 +    public OnDiskAtomIterator iterator(FileDataInput input, DecoratedKey key, SortedSet<CellName> columns, RowIndexEntry indexEntry )
 +    {
 +        return new SSTableNamesIterator(this, input, key, columns, indexEntry);
 +    }
 +
 +    public OnDiskAtomIterator iterator(DecoratedKey key, ColumnSlice[] slices, boolean reverse)
 +    {
 +        return new SSTableSliceIterator(this, key, slices, reverse);
 +    }
 +
 +    public OnDiskAtomIterator iterator(FileDataInput input, DecoratedKey key, ColumnSlice[] slices, boolean reverse, RowIndexEntry indexEntry)
 +    {
 +        return new SSTableSliceIterator(this, input, key, slices, reverse, indexEntry);
 +    }
 +    /**
 +     *
 +     * @param dataRange filter to use when reading the columns
 +     * @return A Scanner for seeking over the rows of the SSTable.
 +     */
-     public ICompactionScanner getScanner(DataRange dataRange, RateLimiter limiter)
++    public ISSTableScanner getScanner(DataRange dataRange, RateLimiter limiter)
 +    {
-         return new BigTableScanner(this, dataRange, limiter);
++        return BigTableScanner.getScanner(this, dataRange, limiter);
 +    }
 +
 +
 +    /**
 +     * Direct I/O SSTableScanner over a defined collection of ranges of tokens.
 +     *
 +     * @param ranges the ranges of keys to cover
 +     * @return A Scanner for seeking over the rows of the SSTable.
 +     */
-     public ICompactionScanner getScanner(Collection<Range<Token>> ranges, RateLimiter limiter)
++    public ISSTableScanner getScanner(Collection<Range<Token>> ranges, RateLimiter limiter)
 +    {
-         // We want to avoid allocating a SSTableScanner if the range don't overlap the sstable (#5249)
-         List<Pair<Long, Long>> positions = getPositionsForRanges(Range.normalize(ranges));
-         if (positions.isEmpty())
-             return new EmptyCompactionScanner(getFilename());
-         else
-             return new BigTableScanner(this, ranges, limiter);
++        return BigTableScanner.getScanner(this, ranges, limiter);
 +    }
 +
 +
 +    /**
 +     * @param key The key to apply as the rhs to the given Operator. A 'fake' key is allowed to
 +     * allow key selection by token bounds, but only if op != EQ
 +     * @param op The Operator defining matching keys: the nearest key to the target matching the operator wins.
 +     * @param updateCacheAndStats true if updating stats and cache
 +     * @return The index entry corresponding to the key, or null if the key is not present
 +     */
 +    public RowIndexEntry getPosition(RowPosition key, Operator op, boolean updateCacheAndStats)
 +    {
 +        // first, check bloom filter
 +        if (op == Operator.EQ)
 +        {
 +            assert key instanceof DecoratedKey; // EQ only makes sense if the key is a valid row key
 +            if (!bf.isPresent(((DecoratedKey)key).getKey()))
 +            {
 +                Tracing.trace("Bloom filter allows skipping sstable {}", descriptor.generation);
 +                return null;
 +            }
 +        }
 +
 +        // next, the key cache (only makes sense for a valid row key)
 +        if ((op == Operator.EQ || op == Operator.GE) && (key instanceof DecoratedKey))
 +        {
 +            DecoratedKey decoratedKey = (DecoratedKey)key;
 +            KeyCacheKey cacheKey = new KeyCacheKey(metadata.cfId, descriptor, decoratedKey.getKey());
 +            RowIndexEntry cachedPosition = getCachedPosition(cacheKey, updateCacheAndStats);
 +            if (cachedPosition != null)
 +            {
 +                Tracing.trace("Key cache hit for sstable {}", descriptor.generation);
 +                return cachedPosition;
 +            }
 +        }
 +
 +        // check the smallest and greatest keys in the sstable to see if the key can't possibly be present
 +        if (first.compareTo(key) > 0 || last.compareTo(key) < 0)
 +        {
 +            if (op == Operator.EQ && updateCacheAndStats)
 +                bloomFilterTracker.addFalsePositive();
 +
 +            if (op.apply(1) < 0)
 +            {
 +                Tracing.trace("Check against min and max keys allows skipping sstable {}", descriptor.generation);
 +                return null;
 +            }
 +        }
 +
 +        int binarySearchResult = indexSummary.binarySearch(key);
 +        long sampledPosition = getIndexScanPositionFromBinarySearchResult(binarySearchResult, indexSummary);
 +        int sampledIndex = getIndexSummaryIndexFromBinarySearchResult(binarySearchResult);
 +
 +        // if we matched the -1th position, we'll start at the first position
 +        sampledPosition = sampledPosition == -1 ? 0 : sampledPosition;
 +
 +        int effectiveInterval = indexSummary.getEffectiveIndexIntervalAfterIndex(sampledIndex);
 +
 +        // scan the on-disk index, starting at the nearest sampled position.
 +        // The check against IndexInterval is to exit the loop in the EQ case when the key looked for is not present
 +        // (bloom filter false positive). But note that for non-EQ cases, we might need to check the first key of the
 +        // next index position, because the searched key can be greater than the last key of the checked index interval
 +        // while still being less than the first key of the next interval (in which case we must return the position of
 +        // the first key of the next interval).
 +        int i = 0;
 +        Iterator<FileDataInput> segments = ifile.iterator(sampledPosition);
 +        while (segments.hasNext() && i <= effectiveInterval)
 +        {
 +            FileDataInput in = segments.next();
 +            try
 +            {
 +                while (!in.isEOF() && i <= effectiveInterval)
 +                {
 +                    i++;
 +
 +                    ByteBuffer indexKey = ByteBufferUtil.readWithShortLength(in);
 +
 +                    boolean opSatisfied; // did we find an appropriate position for the op requested
 +                    boolean exactMatch; // is the current position an exact match for the key, suitable for caching
 +
 +                    // Compare raw keys if possible for performance, otherwise compare decorated keys.
 +                    if (op == Operator.EQ)
 +                    {
 +                        opSatisfied = exactMatch = indexKey.equals(((DecoratedKey) key).getKey());
 +                    }
 +                    else
 +                    {
 +                        DecoratedKey indexDecoratedKey = partitioner.decorateKey(indexKey);
 +                        int comparison = indexDecoratedKey.compareTo(key);
 +                        int v = op.apply(comparison);
 +                        opSatisfied = (v == 0);
 +                        exactMatch = (comparison == 0);
 +                        if (v < 0)
 +                        {
 +                            Tracing.trace("Partition index lookup allows skipping sstable {}", descriptor.generation);
 +                            return null;
 +                        }
 +                    }
 +
 +                    if (opSatisfied)
 +                    {
 +                        // read data position from index entry
 +                        RowIndexEntry indexEntry = rowIndexEntrySerializer.deserialize(in, descriptor.version);
 +                        if (exactMatch && updateCacheAndStats)
 +                        {
 +                            assert key instanceof DecoratedKey; // key can be == to the index key only if it's a true row key
 +                            DecoratedKey decoratedKey = (DecoratedKey)key;
 +
 +                            if (logger.isTraceEnabled())
 +                            {
 +                                // expensive sanity check!  see CASSANDRA-4687
 +                                FileDataInput fdi = dfile.getSegment(indexEntry.position);
 +                                DecoratedKey keyInDisk = partitioner.decorateKey(ByteBufferUtil.readWithShortLength(fdi));
 +                                if (!keyInDisk.equals(key))
 +                                    throw new AssertionError(String.format("%s != %s in %s", keyInDisk, key, fdi.getPath()));
 +                                fdi.close();
 +                            }
 +
 +                            // store exact match for the key
 +                            cacheKey(decoratedKey, indexEntry);
 +                        }
 +                        if (op == Operator.EQ && updateCacheAndStats)
 +                            bloomFilterTracker.addTruePositive();
 +                        Tracing.trace("Partition index with {} entries found for sstable {}", indexEntry.columnsIndex().size(), descriptor.generation);
 +                        return indexEntry;
 +                    }
 +
 +                    RowIndexEntry.Serializer.skip(in);
 +                }
 +            }
 +            catch (IOException e)
 +            {
 +                markSuspect();
 +                throw new CorruptSSTableException(e, in.getPath());
 +            }
 +            finally
 +            {
 +                FileUtils.closeQuietly(in);
 +            }
 +        }
 +
 +        if (op == SSTableReader.Operator.EQ && updateCacheAndStats)
 +            bloomFilterTracker.addFalsePositive();
 +        Tracing.trace("Partition index lookup complete (bloom filter false positive) for sstable {}", descriptor.generation);
 +        return null;
 +    }
 +
 +
 +}
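
The functional change in BigTableReader above is that getScanner() no longer constructs BigTableScanner directly; it delegates to BigTableScanner.getScanner(...) factory methods (in the next file), which only allocate a real scanner after taking a reference on the sstable, falling back to an empty scanner otherwise. A minimal sketch of that acquire-or-fall-back guard; the Resource/Scanner names below are illustrative placeholders, not Cassandra APIs:

import java.util.concurrent.atomic.AtomicInteger;

public class RefGuardSketch
{
    interface Scanner { boolean hasNext(); }

    static final class EmptyScanner implements Scanner
    {
        public boolean hasNext() { return false; }
    }

    static final class RealScanner implements Scanner
    {
        public boolean hasNext() { return true; } // real scanning elided
    }

    static final class Resource
    {
        private final AtomicInteger refs = new AtomicInteger(1);

        // Fails once the count has reached zero, i.e. the resource is already being deleted,
        // mirroring SSTableReader.acquireReference() racing with compaction-time deletion.
        boolean acquireReference()
        {
            while (true)
            {
                int n = refs.get();
                if (n <= 0)
                    return false;
                if (refs.compareAndSet(n, n + 1))
                    return true;
            }
        }
    }

    // Mirrors the factory methods: only build a real scanner if we hold a reference;
    // otherwise hand back a harmless empty scanner.
    static Scanner getScanner(Resource r)
    {
        return r.acquireReference() ? new RealScanner() : new EmptyScanner();
    }
}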

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bee53d72/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
index 7e3c877,0000000..73a5d76
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
@@@ -1,300 -1,0 +1,354 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.io.sstable.format.big;
 +
 +import java.io.IOException;
- import java.util.ArrayList;
- import java.util.Collection;
- import java.util.Iterator;
- import java.util.List;
++import java.util.*;
 +
 +import com.google.common.collect.AbstractIterator;
 +import com.google.common.util.concurrent.RateLimiter;
 +
 +import org.apache.cassandra.db.DataRange;
 +import org.apache.cassandra.db.DecoratedKey;
 +import org.apache.cassandra.db.RowIndexEntry;
 +import org.apache.cassandra.db.RowPosition;
 +import org.apache.cassandra.db.columniterator.IColumnIteratorFactory;
 +import org.apache.cassandra.db.columniterator.LazyColumnIterator;
 +import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
- import org.apache.cassandra.db.compaction.ICompactionScanner;
 +import org.apache.cassandra.dht.AbstractBounds;
 +import org.apache.cassandra.dht.Bounds;
 +import org.apache.cassandra.dht.Range;
 +import org.apache.cassandra.dht.Token;
 +import org.apache.cassandra.io.sstable.CorruptSSTableException;
++import org.apache.cassandra.io.sstable.ISSTableScanner;
 +import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.io.util.FileUtils;
 +import org.apache.cassandra.io.util.RandomAccessReader;
 +import org.apache.cassandra.utils.ByteBufferUtil;
++import org.apache.cassandra.utils.Pair;
 +
- public class BigTableScanner implements ICompactionScanner
++public class BigTableScanner implements ISSTableScanner
 +{
 +    protected final RandomAccessReader dfile;
 +    protected final RandomAccessReader ifile;
 +    public final SSTableReader sstable;
 +
 +    private final Iterator<AbstractBounds<RowPosition>> rangeIterator;
 +    private AbstractBounds<RowPosition> currentRange;
 +
 +    private final DataRange dataRange;
 +    private final RowIndexEntry.IndexSerializer rowIndexEntrySerializer;
 +
 +    protected Iterator<OnDiskAtomIterator> iterator;
 +
++    // We can race with the sstable being deleted during compaction.  If its reference count has already dropped to 0, skip it and return an empty scanner.
++    public static ISSTableScanner getScanner(SSTableReader sstable, DataRange dataRange, RateLimiter limiter)
++    {
++        return sstable.acquireReference()
++                ? new BigTableScanner(sstable, dataRange, limiter)
++                : new BigTableScanner.EmptySSTableScanner(sstable.getFilename());
++    }
++    public static ISSTableScanner getScanner(SSTableReader sstable, Collection<Range<Token>> tokenRanges, RateLimiter limiter)
++    {
++        // We want to avoid allocating an SSTableScanner if the ranges don't overlap the sstable (#5249)
++        List<Pair<Long, Long>> positions = sstable.getPositionsForRanges(Range.normalize(tokenRanges));
++        if (positions.isEmpty() || !sstable.acquireReference())
++            return new EmptySSTableScanner(sstable.getFilename());
++
++        return new BigTableScanner(sstable, tokenRanges, limiter);
++    }
++
 +    /**
 +     * @param sstable SSTable to scan; must not be null
 +     * @param dataRange a single range to scan; must not be null
 +     * @param limiter background i/o RateLimiter; may be null
 +     */
-     public BigTableScanner(SSTableReader sstable, DataRange dataRange, RateLimiter limiter)
++    private BigTableScanner(SSTableReader sstable, DataRange dataRange, RateLimiter limiter)
 +    {
 +        assert sstable != null;
-         sstable.acquireReference();
 +
 +        this.dfile = limiter == null ? sstable.openDataReader() : sstable.openDataReader(limiter);
 +        this.ifile = sstable.openIndexReader();
 +        this.sstable = sstable;
 +        this.dataRange = dataRange;
 +        this.rowIndexEntrySerializer = sstable.descriptor.version.getSSTableFormat().getIndexSerializer(sstable.metadata);
 +
 +        List<AbstractBounds<RowPosition>> boundsList = new ArrayList<>(2);
 +        if (dataRange.isWrapAround() && !dataRange.stopKey().isMinimum())
 +        {
 +            // split the wrapping range into two parts: 1) the part that starts at the beginning of the sstable, and
 +            // 2) the part that comes before the wrap-around
 +            boundsList.add(new Bounds<>(sstable.partitioner.getMinimumToken().minKeyBound(), dataRange.stopKey()));
 +            boundsList.add(new Bounds<>(dataRange.startKey(), sstable.partitioner.getMinimumToken().maxKeyBound()));
 +        }
 +        else
 +        {
 +            boundsList.add(new Bounds<>(dataRange.startKey(), dataRange.stopKey()));
 +        }
 +        this.rangeIterator = boundsList.iterator();
 +    }
 +
 +    /**
 +     * @param sstable SSTable to scan; must not be null
 +     * @param tokenRanges A set of token ranges to scan
 +     * @param limiter background i/o RateLimiter; may be null
 +     */
-     public BigTableScanner(SSTableReader sstable, Collection<Range<Token>> tokenRanges, RateLimiter limiter)
++    private BigTableScanner(SSTableReader sstable, Collection<Range<Token>> tokenRanges, RateLimiter limiter)
 +    {
 +        assert sstable != null;
-         sstable.acquireReference();
 +
 +        this.dfile = limiter == null ? sstable.openDataReader() : sstable.openDataReader(limiter);
 +        this.ifile = sstable.openIndexReader();
 +        this.sstable = sstable;
 +        this.dataRange = null;
 +        this.rowIndexEntrySerializer = sstable.descriptor.version.getSSTableFormat().getIndexSerializer(sstable.metadata);
 +
 +        List<Range<Token>> normalized = Range.normalize(tokenRanges);
 +        List<AbstractBounds<RowPosition>> boundsList = new ArrayList<>(normalized.size());
 +        for (Range<Token> range : normalized)
 +            boundsList.add(new Range<RowPosition>(range.left.maxKeyBound(), range.right.maxKeyBound()));
 +
 +        this.rangeIterator = boundsList.iterator();
 +    }
 +
 +    private void seekToCurrentRangeStart()
 +    {
 +        if (currentRange.left.isMinimum())
 +            return;
 +
 +        long indexPosition = sstable.getIndexScanPosition(currentRange.left);
 +        // -1 means the key is before everything in the sstable. So just start from the beginning.
 +        if (indexPosition == -1)
 +        {
 +            // Note: this method shouldn't assume we're at the start of the sstable already (see #6638) and
 +            // the seeks are no-ops anyway if we are.
 +            ifile.seek(0);
 +            dfile.seek(0);
 +            return;
 +        }
 +
 +        ifile.seek(indexPosition);
 +        try
 +        {
 +
 +            while (!ifile.isEOF())
 +            {
 +                indexPosition = ifile.getFilePointer();
 +                DecoratedKey indexDecoratedKey = sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(ifile));
 +                int comparison = indexDecoratedKey.compareTo(currentRange.left);
 +                // because our range start may be inclusive or exclusive, we also need to check contains()
 +                // instead of just checking (comparison >= 0)
 +                if (comparison > 0 || currentRange.contains(indexDecoratedKey))
 +                {
 +                    // Found, just read the dataPosition and seek into index and data files
 +                    long dataPosition = ifile.readLong();
 +                    ifile.seek(indexPosition);
 +                    dfile.seek(dataPosition);
 +                    break;
 +                }
 +                else
 +                {
 +                    RowIndexEntry.Serializer.skip(ifile);
 +                }
 +            }
 +        }
 +        catch (IOException e)
 +        {
 +            sstable.markSuspect();
 +            throw new CorruptSSTableException(e, sstable.getFilename());
 +        }
 +    }
 +
 +    public void close() throws IOException
 +    {
 +        FileUtils.close(dfile, ifile);
 +        sstable.releaseReference();
 +    }
 +
 +    public long getLengthInBytes()
 +    {
 +        return dfile.length();
 +    }
 +
 +    public long getCurrentPosition()
 +    {
 +        return dfile.getFilePointer();
 +    }
 +
 +    public String getBackingFiles()
 +    {
 +        return sstable.toString();
 +    }
 +
 +    public boolean hasNext()
 +    {
 +        if (iterator == null)
 +            iterator = createIterator();
 +        return iterator.hasNext();
 +    }
 +
 +    public OnDiskAtomIterator next()
 +    {
 +        if (iterator == null)
 +            iterator = createIterator();
 +        return iterator.next();
 +    }
 +
 +    public void remove()
 +    {
 +        throw new UnsupportedOperationException();
 +    }
 +
 +    private Iterator<OnDiskAtomIterator> createIterator()
 +    {
 +        return new KeyScanningIterator();
 +    }
 +
 +    protected class KeyScanningIterator extends AbstractIterator<OnDiskAtomIterator>
 +    {
 +        private DecoratedKey nextKey;
 +        private RowIndexEntry nextEntry;
 +        private DecoratedKey currentKey;
 +        private RowIndexEntry currentEntry;
 +
 +        protected OnDiskAtomIterator computeNext()
 +        {
 +            try
 +            {
 +                if (nextEntry == null)
 +                {
 +                    do
 +                    {
 +                        // we're starting the first range or we just passed the end of the previous range
 +                        if (!rangeIterator.hasNext())
 +                            return endOfData();
 +
 +                        currentRange = rangeIterator.next();
 +                        seekToCurrentRangeStart();
 +
 +                        if (ifile.isEOF())
 +                            return endOfData();
 +
 +                        currentKey = sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(ifile));
 +                        currentEntry = rowIndexEntrySerializer.deserialize(ifile, sstable.descriptor.version);
 +                    } while (!currentRange.contains(currentKey));
 +                }
 +                else
 +                {
 +                    // we're in the middle of a range
 +                    currentKey = nextKey;
 +                    currentEntry = nextEntry;
 +                }
 +
 +                long readEnd;
 +                if (ifile.isEOF())
 +                {
 +                    nextEntry = null;
 +                    nextKey = null;
 +                    readEnd = dfile.length();
 +                }
 +                else
 +                {
 +                    // we need the position of the start of the next key, regardless of whether it falls in the current range
 +                    nextKey = sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(ifile));
 +                    nextEntry = rowIndexEntrySerializer.deserialize(ifile, sstable.descriptor.version);
 +                    readEnd = nextEntry.position;
 +
 +                    if (!currentRange.contains(nextKey))
 +                    {
 +                        nextKey = null;
 +                        nextEntry = null;
 +                    }
 +                }
 +
 +                if (dataRange == null || dataRange.selectsFullRowFor(currentKey.getKey()))
 +                {
 +                    dfile.seek(currentEntry.position + currentEntry.headerOffset());
 +                    ByteBufferUtil.readWithShortLength(dfile); // key
 +                    return new SSTableIdentityIterator(sstable, dfile, currentKey);
 +                }
 +
 +                return new LazyColumnIterator(currentKey, new IColumnIteratorFactory()
 +                {
 +                    public OnDiskAtomIterator create()
 +                    {
 +                        return dataRange.columnFilter(currentKey.getKey()).getSSTableColumnIterator(sstable, dfile, currentKey, currentEntry);
 +                    }
 +                });
 +
 +            }
 +            catch (IOException e)
 +            {
 +                sstable.markSuspect();
 +                throw new CorruptSSTableException(e, sstable.getFilename());
 +            }
 +        }
 +    }
 +
 +    @Override
 +    public String toString()
 +    {
 +        return getClass().getSimpleName() + "(" +
 +               "dfile=" + dfile +
 +               " ifile=" + ifile +
 +               " sstable=" + sstable +
 +               ")";
 +    }
++
++    public static class EmptySSTableScanner implements ISSTableScanner
++    {
++        private final String filename;
++
++        public EmptySSTableScanner(String filename)
++        {
++            this.filename = filename;
++        }
++
++        public long getLengthInBytes()
++        {
++            return 0;
++        }
++
++        public long getCurrentPosition()
++        {
++            return 0;
++        }
++
++        public String getBackingFiles()
++        {
++            return filename;
++        }
++
++        public boolean hasNext()
++        {
++            return false;
++        }
++
++        public OnDiskAtomIterator next()
++        {
++            return null;
++        }
++
++        public void close() throws IOException { }
++
++        public void remove() { }
++    }
++
++
 +}
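
One detail worth calling out in the DataRange constructor above: a wrap-around range is split into two non-wrapping bounds before the scan starts. A rough sketch of the same split over plain long "tokens" (illustrative only; the real code builds RowPosition bounds from the partitioner's minimum token):

import java.util.ArrayList;
import java.util.List;

public class WrapAroundSplitSketch
{
    // A wrapping range (start > stop) is split into [MIN, stop] and [start, MAX],
    // mirroring the boundsList construction in the constructor above.
    static List<long[]> split(long start, long stop)
    {
        List<long[]> bounds = new ArrayList<>(2);
        if (start > stop)
        {
            bounds.add(new long[]{ Long.MIN_VALUE, stop });  // part at the beginning of the ring
            bounds.add(new long[]{ start, Long.MAX_VALUE }); // part before the wrap-around
        }
        else
        {
            bounds.add(new long[]{ start, stop });
        }
        return bounds;
    }
}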

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bee53d72/src/java/org/apache/cassandra/tools/SSTableExport.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/tools/SSTableExport.java
index 3a04a81,22aebdb..fa6b973
--- a/src/java/org/apache/cassandra/tools/SSTableExport.java
+++ b/src/java/org/apache/cassandra/tools/SSTableExport.java
@@@ -22,8 -22,6 +22,7 @@@ import java.io.IOException
  import java.io.PrintStream;
  import java.util.*;
  
- import org.apache.cassandra.db.compaction.ICompactionScanner;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
  import org.apache.commons.cli.*;
  
  import org.apache.cassandra.config.CFMetaData;
@@@ -320,10 -318,10 +319,10 @@@ public class SSTableExpor
          Set<String> excludeSet = new HashSet<String>();
  
          if (excludes != null)
 -            excludeSet = new HashSet<String>(Arrays.asList(excludes));
 +            excludeSet = new HashSet<>(Arrays.asList(excludes));
  
          SSTableIdentityIterator row;
-         ICompactionScanner scanner = reader.getScanner();
+         ISSTableScanner scanner = reader.getScanner();
          try
          {
              outs.println("[");

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bee53d72/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
index 2396acb,a09d8b4..63a49de
--- a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
@@@ -46,13 -37,15 +46,13 @@@ import org.apache.cassandra.db.ColumnFa
  import org.apache.cassandra.db.DecoratedKey;
  import org.apache.cassandra.db.Keyspace;
  import org.apache.cassandra.db.Mutation;
 -import org.apache.cassandra.dht.BytesToken;
 +import org.apache.cassandra.dht.ByteOrderedPartitioner.BytesToken;
  import org.apache.cassandra.dht.Range;
  import org.apache.cassandra.dht.Token;
- import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
+ import org.apache.cassandra.io.sstable.*;
 -import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 +import org.apache.cassandra.service.ActiveRepairService;
  import org.apache.cassandra.service.StorageService;
  import org.apache.cassandra.utils.ByteBufferUtil;
 -import org.junit.After;
 -import org.junit.Test;
  
  import com.google.common.collect.Iterables;
  
@@@ -207,38 -150,14 +207,38 @@@ public class AntiCompactionTes
          List<Range<Token>> ranges = Arrays.asList(range);
  
          SSTableReader.acquireReferences(sstables);
 -        CompactionManager.instance.performAnticompaction(store, ranges, sstables, 1);
 -
 -        assertThat(store.getSSTables().size(), is(1));
 -        assertThat(Iterables.get(store.getSSTables(), 0).isRepaired(), is(false));
 -        assertThat(Iterables.get(store.getSSTables(), 0).referenceCount(), is(1));
 -        assertThat(store.getDataTracker().getCompacting().size(), is(0));
 +        long repairedAt = 1000;
 +        CompactionManager.instance.performAnticompaction(store, ranges, sstables, repairedAt);
 +        /*
 +        Anticompaction will anti-compact the 10 sstables, but it does so two at a time,
 +        so there will be no net change in the number of sstables.
 +         */
 +        assertEquals(10, store.getSSTables().size());
 +        int repairedKeys = 0;
 +        int nonRepairedKeys = 0;
 +        for (SSTableReader sstable : store.getSSTables())
 +        {
-             ICompactionScanner scanner = sstable.getScanner();
++            ISSTableScanner scanner = sstable.getScanner();
 +            while (scanner.hasNext())
 +            {
 +                SSTableIdentityIterator row = (SSTableIdentityIterator) scanner.next();
 +                if (sstable.isRepaired())
 +                {
 +                    assertTrue(range.contains(row.getKey().getToken()));
 +                    assertEquals(repairedAt, sstable.getSSTableMetadata().repairedAt);
 +                    repairedKeys++;
 +                }
 +                else
 +                {
 +                    assertFalse(range.contains(row.getKey().getToken()));
 +                    assertEquals(ActiveRepairService.UNREPAIRED_SSTABLE, sstable.getSSTableMetadata().repairedAt);
 +                    nonRepairedKeys++;
 +                }
 +            }
 +        }
 +        assertEquals(repairedKeys, 40);
 +        assertEquals(nonRepairedKeys, 60);
      }
 -
      @Test
      public void shouldMutateRepairedAt() throws InterruptedException, ExecutionException, IOException
      {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bee53d72/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
index faf3808,4659b5c..a6ee3f9
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
@@@ -189,11 -158,11 +189,11 @@@ public class CompactionsTes
          // check that the shadowed column is gone
          SSTableReader sstable = cfs.getSSTables().iterator().next();
          Range keyRange = new Range<RowPosition>(key, sstable.partitioner.getMinimumToken().maxKeyBound());
-         ICompactionScanner scanner = sstable.getScanner(DataRange.forKeyRange(keyRange));
+         ISSTableScanner scanner = sstable.getScanner(DataRange.forKeyRange(keyRange));
          OnDiskAtomIterator iter = scanner.next();
          assertEquals(key, iter.getKey());
 -        assert iter.next() instanceof RangeTombstone;
 -        assert !iter.hasNext();
 +        assertTrue(iter.next() instanceof RangeTombstone);
 +        assertFalse(iter.hasNext());
      }
  
      @Test

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bee53d72/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
index d49d8bb,4c2236b..1eca4e6
--- a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
@@@ -36,15 -31,11 +36,16 @@@ import org.junit.runner.RunWith
  import org.apache.cassandra.OrderedJUnit4ClassRunner;
  import org.apache.cassandra.SchemaLoader;
  import org.apache.cassandra.Util;
 -import org.apache.cassandra.db.*;
 +import org.apache.cassandra.config.KSMetaData;
 +import org.apache.cassandra.db.ColumnFamilyStore;
 +import org.apache.cassandra.db.DecoratedKey;
 +import org.apache.cassandra.db.Keyspace;
 +import org.apache.cassandra.db.Mutation;
  import org.apache.cassandra.dht.Range;
  import org.apache.cassandra.dht.Token;
 +import org.apache.cassandra.exceptions.ConfigurationException;
+ import org.apache.cassandra.io.sstable.ISSTableScanner;
 -import org.apache.cassandra.io.sstable.SSTableReader;
 +import org.apache.cassandra.locator.SimpleStrategy;
  import org.apache.cassandra.notifications.SSTableAddedNotification;
  import org.apache.cassandra.notifications.SSTableRepairStatusChanged;
  import org.apache.cassandra.repair.RepairJobDesc;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bee53d72/test/unit/org/apache/cassandra/db/compaction/TTLExpiryTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/compaction/TTLExpiryTest.java
index 924c4b5,678601b..c56c910
--- a/test/unit/org/apache/cassandra/db/compaction/TTLExpiryTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/TTLExpiryTest.java
@@@ -29,11 -27,10 +29,12 @@@ import org.junit.runner.RunWith
  import org.apache.cassandra.OrderedJUnit4ClassRunner;
  import org.apache.cassandra.SchemaLoader;
  import org.apache.cassandra.Util;
 +import org.apache.cassandra.config.KSMetaData;
  import org.apache.cassandra.db.*;
  import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
 +import org.apache.cassandra.exceptions.ConfigurationException;
+ import org.apache.cassandra.io.sstable.ISSTableScanner;
 -import org.apache.cassandra.io.sstable.SSTableReader;
 +import org.apache.cassandra.locator.SimpleStrategy;
  import org.apache.cassandra.utils.ByteBufferUtil;
  
  import java.util.Collections;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bee53d72/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
index 2a0c3e6,03b5553..647a67b
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
@@@ -59,10 -56,9 +59,9 @@@ import org.apache.cassandra.db.Row
  import org.apache.cassandra.db.RowPosition;
  import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
  import org.apache.cassandra.db.compaction.CompactionManager;
- import org.apache.cassandra.db.compaction.ICompactionScanner;
  import org.apache.cassandra.db.composites.Composites;
  import org.apache.cassandra.dht.LocalPartitioner;
 -import org.apache.cassandra.dht.LocalToken;
 +import org.apache.cassandra.dht.LocalPartitioner.LocalToken;
  import org.apache.cassandra.dht.Range;
  import org.apache.cassandra.dht.Token;
  import org.apache.cassandra.io.util.FileDataInput;
@@@ -370,7 -346,7 +369,7 @@@ public class SSTableReaderTes
          boolean foundScanner = false;
          for (SSTableReader s : store.getSSTables())
          {
-             ICompactionScanner scanner = s.getScanner(new Range<Token>(t(0), t(1)), null);
 -            ISSTableScanner scanner = s.getScanner(new Range<Token>(t(0), t(1), s.partitioner), null);
++            ISSTableScanner scanner = s.getScanner(new Range<Token>(t(0), t(1)), null);
              scanner.next(); // throws exception pre 5407
              foundScanner = true;
          }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bee53d72/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
index 5eae831,ecf97c3..24a0091
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
@@@ -41,14 -37,10 +40,13 @@@ import org.apache.cassandra.db.Mutation
  import org.apache.cassandra.db.compaction.AbstractCompactedRow;
  import org.apache.cassandra.db.compaction.AbstractCompactionStrategy;
  import org.apache.cassandra.db.compaction.CompactionController;
- import org.apache.cassandra.db.compaction.ICompactionScanner;
  import org.apache.cassandra.db.compaction.LazilyCompactedRow;
  import org.apache.cassandra.db.compaction.OperationType;
 +import org.apache.cassandra.exceptions.ConfigurationException;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.io.sstable.format.SSTableWriter;
 +import org.apache.cassandra.locator.SimpleStrategy;
  import org.apache.cassandra.db.compaction.SSTableSplitter;
 -import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
  import org.apache.cassandra.metrics.StorageMetrics;
  import org.apache.cassandra.service.StorageService;
  import org.apache.cassandra.utils.ByteBufferUtil;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bee53d72/test/unit/org/apache/cassandra/io/sstable/SSTableScannerTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/io/sstable/SSTableScannerTest.java
index d4053eb,ff60481..9da895e
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableScannerTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableScannerTest.java
@@@ -27,15 -25,12 +27,14 @@@ import org.junit.Test
  
  import org.apache.cassandra.SchemaLoader;
  import org.apache.cassandra.Util;
 +import org.apache.cassandra.config.KSMetaData;
  import org.apache.cassandra.db.*;
  import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
- import org.apache.cassandra.db.compaction.ICompactionScanner;
  import org.apache.cassandra.dht.Bounds;
 -import org.apache.cassandra.dht.BytesToken;
 +import org.apache.cassandra.dht.ByteOrderedPartitioner.BytesToken;
  import org.apache.cassandra.dht.Range;
  import org.apache.cassandra.dht.Token;
 +import org.apache.cassandra.locator.SimpleStrategy;
  import org.apache.cassandra.utils.ByteBufferUtil;
  
  import static org.junit.Assert.*;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bee53d72/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
index 4f3d22a,b9a3821..00f07ff
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
@@@ -25,9 -25,6 +25,8 @@@ import java.util.*
  
  import org.apache.cassandra.db.*;
  import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
- import org.apache.cassandra.db.compaction.ICompactionScanner;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.io.sstable.format.SSTableWriter;
  import org.apache.cassandra.service.ActiveRepairService;
  import org.apache.cassandra.utils.ByteBufferUtil;