You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2015/02/11 16:58:32 UTC
[3/5] cassandra git commit: Merge branch 'cassandra-2.1' into trunk
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2d374379/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
index 4fcbc31,0000000..127a60c
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
@@@ -1,250 -1,0 +1,260 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.format.big;
+
+import com.google.common.util.concurrent.RateLimiter;
+import org.apache.cassandra.cache.KeyCacheKey;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.DataRange;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.RowIndexEntry;
+import org.apache.cassandra.db.RowPosition;
+import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
+import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.db.filter.ColumnSlice;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.CorruptSSTableException;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.ISSTableScanner;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
+import org.apache.cassandra.io.util.FileDataInput;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.*;
+
+/**
+ * SSTableReaders are open()ed by Keyspace.onStart; after that they are created by SSTableWriter.renameAndOpen.
+ * Do not re-call open() on existing SSTable files; use the references kept by ColumnFamilyStore post-start instead.
+ */
+public class BigTableReader extends SSTableReader
+{
+ private static final Logger logger = LoggerFactory.getLogger(BigTableReader.class);
+
+ BigTableReader(Descriptor desc, Set<Component> components, CFMetaData metadata, IPartitioner partitioner, Long maxDataAge, StatsMetadata sstableMetadata, OpenReason openReason)
+ {
+ super(desc, components, metadata, partitioner, maxDataAge, sstableMetadata, openReason);
+ }
+
+ public OnDiskAtomIterator iterator(DecoratedKey key, SortedSet<CellName> columns)
+ {
+ return new SSTableNamesIterator(this, key, columns);
+ }
+
+ public OnDiskAtomIterator iterator(FileDataInput input, DecoratedKey key, SortedSet<CellName> columns, RowIndexEntry indexEntry )
+ {
+ return new SSTableNamesIterator(this, input, key, columns, indexEntry);
+ }
+
+ public OnDiskAtomIterator iterator(DecoratedKey key, ColumnSlice[] slices, boolean reverse)
+ {
+ return new SSTableSliceIterator(this, key, slices, reverse);
+ }
+
+ public OnDiskAtomIterator iterator(FileDataInput input, DecoratedKey key, ColumnSlice[] slices, boolean reverse, RowIndexEntry indexEntry)
+ {
+ return new SSTableSliceIterator(this, input, key, slices, reverse, indexEntry);
+ }
+ /**
+ *
+ * @param dataRange filter to use when reading the columns
+ * @return A Scanner for seeking over the rows of the SSTable.
+ */
+ public ISSTableScanner getScanner(DataRange dataRange, RateLimiter limiter)
+ {
+ return BigTableScanner.getScanner(this, dataRange, limiter);
+ }
+
+
+ /**
+ * Direct I/O SSTableScanner over a defined collection of ranges of tokens.
+ *
+ * @param ranges the range of keys to cover
+ * @return A Scanner for seeking over the rows of the SSTable.
+ */
+ public ISSTableScanner getScanner(Collection<Range<Token>> ranges, RateLimiter limiter)
+ {
+ return BigTableScanner.getScanner(this, ranges, limiter);
+ }
+
+
+ /**
+ * @param key The key to apply as the rhs to the given Operator. A 'fake' key is allowed to
+ * allow key selection by token bounds, but only if op != EQ
+ * @param op The Operator defining matching keys: the nearest key to the target matching the operator wins.
+ * @param updateCacheAndStats true if updating stats and cache
+ * @return The index entry corresponding to the key, or null if the key is not present
+ */
- public RowIndexEntry getPosition(RowPosition key, Operator op, boolean updateCacheAndStats)
++ protected RowIndexEntry getPosition(RowPosition key, Operator op, boolean updateCacheAndStats, boolean permitMatchPastLast)
+ {
- // first, check bloom filter
+ if (op == Operator.EQ)
+ {
+ assert key instanceof DecoratedKey; // EQ only make sense if the key is a valid row key
+ if (!bf.isPresent(((DecoratedKey)key).getKey()))
+ {
+ Tracing.trace("Bloom filter allows skipping sstable {}", descriptor.generation);
+ return null;
+ }
+ }
+
+ // next, the key cache (only make sense for valid row key)
+ if ((op == Operator.EQ || op == Operator.GE) && (key instanceof DecoratedKey))
+ {
+ DecoratedKey decoratedKey = (DecoratedKey)key;
+ KeyCacheKey cacheKey = new KeyCacheKey(metadata.cfId, descriptor, decoratedKey.getKey());
+ RowIndexEntry cachedPosition = getCachedPosition(cacheKey, updateCacheAndStats);
+ if (cachedPosition != null)
+ {
+ Tracing.trace("Key cache hit for sstable {}", descriptor.generation);
+ return cachedPosition;
+ }
+ }
+
+ // check the smallest and greatest keys in the sstable to see if it can't be present
- if (first.compareTo(key) > 0 || last.compareTo(key) < 0)
++ boolean skip = false;
++ if (key.compareTo(first) < 0)
++ {
++ if (op == Operator.EQ)
++ skip = true;
++ else
++ key = first;
++
++ op = Operator.EQ;
++ }
++ else
++ {
++ int l = last.compareTo(key);
++ // l <= 0 => we may be looking past the end of the file; we then narrow our behaviour to:
++ // 1) skipping if strictly greater for GE and EQ;
++ // 2) skipping if equal and searching GT, and we aren't permitting matching past last
++ skip = l <= 0 && (l < 0 || (!permitMatchPastLast && op == Operator.GT));
++ }
++ if (skip)
+ {
+ if (op == Operator.EQ && updateCacheAndStats)
+ bloomFilterTracker.addFalsePositive();
-
- if (op.apply(1) < 0)
- {
- Tracing.trace("Check against min and max keys allows skipping sstable {}", descriptor.generation);
- return null;
- }
++ Tracing.trace("Check against min and max keys allows skipping sstable {}", descriptor.generation);
++ return null;
+ }
+
+ int binarySearchResult = indexSummary.binarySearch(key);
+ long sampledPosition = getIndexScanPositionFromBinarySearchResult(binarySearchResult, indexSummary);
+ int sampledIndex = getIndexSummaryIndexFromBinarySearchResult(binarySearchResult);
+
- // if we matched the -1th position, we'll start at the first position
- sampledPosition = sampledPosition == -1 ? 0 : sampledPosition;
-
+ int effectiveInterval = indexSummary.getEffectiveIndexIntervalAfterIndex(sampledIndex);
+
+ // scan the on-disk index, starting at the nearest sampled position.
+ // The check against IndexInterval is to exit the loop in the EQ case when the key looked for is not present
+ // (bloom filter false positive). But note that for non-EQ cases, we might need to check the first key of the
+ // next index position because the searched key can be greater than the last key of the index interval checked if it
+ // is less than the first key of the next interval (and in that case we must return the position of the first key
+ // of the next interval).
+ int i = 0;
+ Iterator<FileDataInput> segments = ifile.iterator(sampledPosition);
+ while (segments.hasNext() && i <= effectiveInterval)
+ {
+ FileDataInput in = segments.next();
+ try
+ {
+ while (!in.isEOF() && i <= effectiveInterval)
+ {
+ i++;
+
+ ByteBuffer indexKey = ByteBufferUtil.readWithShortLength(in);
+
+ boolean opSatisfied; // did we find an appropriate position for the op requested
+ boolean exactMatch; // is the current position an exact match for the key, suitable for caching
+
+ // Compare raw keys if possible for performance, otherwise compare decorated keys.
+ if (op == Operator.EQ)
+ {
+ opSatisfied = exactMatch = indexKey.equals(((DecoratedKey) key).getKey());
+ }
+ else
+ {
+ DecoratedKey indexDecoratedKey = partitioner.decorateKey(indexKey);
+ int comparison = indexDecoratedKey.compareTo(key);
+ int v = op.apply(comparison);
+ opSatisfied = (v == 0);
+ exactMatch = (comparison == 0);
+ if (v < 0)
+ {
+ Tracing.trace("Partition index lookup allows skipping sstable {}", descriptor.generation);
+ return null;
+ }
+ }
+
+ if (opSatisfied)
+ {
+ // read data position from index entry
+ RowIndexEntry indexEntry = rowIndexEntrySerializer.deserialize(in, descriptor.version);
+ if (exactMatch && updateCacheAndStats)
+ {
+ assert key instanceof DecoratedKey; // key can be == to the index key only if it's a true row key
+ DecoratedKey decoratedKey = (DecoratedKey)key;
+
+ if (logger.isTraceEnabled())
+ {
+ // expensive sanity check! see CASSANDRA-4687
+ FileDataInput fdi = dfile.getSegment(indexEntry.position);
+ DecoratedKey keyInDisk = partitioner.decorateKey(ByteBufferUtil.readWithShortLength(fdi));
+ if (!keyInDisk.equals(key))
+ throw new AssertionError(String.format("%s != %s in %s", keyInDisk, key, fdi.getPath()));
+ fdi.close();
+ }
+
+ // store exact match for the key
+ cacheKey(decoratedKey, indexEntry);
+ }
+ if (op == Operator.EQ && updateCacheAndStats)
+ bloomFilterTracker.addTruePositive();
+ Tracing.trace("Partition index with {} entries found for sstable {}", indexEntry.columnsIndex().size(), descriptor.generation);
+ return indexEntry;
+ }
+
+ RowIndexEntry.Serializer.skip(in);
+ }
+ }
+ catch (IOException e)
+ {
+ markSuspect();
+ throw new CorruptSSTableException(e, in.getPath());
+ }
+ finally
+ {
+ FileUtils.closeQuietly(in);
+ }
+ }
+
+ if (op == SSTableReader.Operator.EQ && updateCacheAndStats)
+ bloomFilterTracker.addFalsePositive();
+ Tracing.trace("Partition index lookup complete (bloom filter false positive) for sstable {}", descriptor.generation);
+ return null;
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2d374379/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
index 1e187ff,0000000..2e28d0b
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
@@@ -1,350 -1,0 +1,365 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.format.big;
+
+import java.io.IOException;
+import java.util.*;
+
+import com.google.common.collect.AbstractIterator;
++import com.google.common.collect.Ordering;
+import com.google.common.util.concurrent.RateLimiter;
+
+import org.apache.cassandra.db.DataRange;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.RowIndexEntry;
+import org.apache.cassandra.db.RowPosition;
+import org.apache.cassandra.db.columniterator.IColumnIteratorFactory;
+import org.apache.cassandra.db.columniterator.LazyColumnIterator;
+import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.Bounds;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.CorruptSSTableException;
+import org.apache.cassandra.io.sstable.ISSTableScanner;
+import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.Pair;
+
+public class BigTableScanner implements ISSTableScanner
+{
+ protected final RandomAccessReader dfile;
+ protected final RandomAccessReader ifile;
+ public final SSTableReader sstable;
+
+ private final Iterator<AbstractBounds<RowPosition>> rangeIterator;
+ private AbstractBounds<RowPosition> currentRange;
+
+ private final DataRange dataRange;
+ private final RowIndexEntry.IndexSerializer rowIndexEntrySerializer;
+
+ protected Iterator<OnDiskAtomIterator> iterator;
+
+ public static ISSTableScanner getScanner(SSTableReader sstable, DataRange dataRange, RateLimiter limiter)
+ {
+ return new BigTableScanner(sstable, dataRange, limiter);
+ }
+ public static ISSTableScanner getScanner(SSTableReader sstable, Collection<Range<Token>> tokenRanges, RateLimiter limiter)
+ {
+ // We want to avoid allocating a SSTableScanner if the ranges don't overlap the sstable (#5249)
+ List<Pair<Long, Long>> positions = sstable.getPositionsForRanges(tokenRanges);
+ if (positions.isEmpty())
+ return new EmptySSTableScanner(sstable.getFilename());
+
+ return new BigTableScanner(sstable, tokenRanges, limiter);
+ }
+
+ /**
+ * @param sstable SSTable to scan; must not be null
+ * @param dataRange a single range to scan; must not be null
+ * @param limiter background i/o RateLimiter; may be null
+ */
+ private BigTableScanner(SSTableReader sstable, DataRange dataRange, RateLimiter limiter)
+ {
+ assert sstable != null;
+
+ this.dfile = limiter == null ? sstable.openDataReader() : sstable.openDataReader(limiter);
+ this.ifile = sstable.openIndexReader();
+ this.sstable = sstable;
+ this.dataRange = dataRange;
+ this.rowIndexEntrySerializer = sstable.descriptor.version.getSSTableFormat().getIndexSerializer(sstable.metadata);
+
+ List<AbstractBounds<RowPosition>> boundsList = new ArrayList<>(2);
- if (dataRange.isWrapAround() && !dataRange.stopKey().isMinimum())
++ // we enforce the first/last keys of the sstablereader
++ if (dataRange.isWrapAround())
+ {
- // split the wrapping range into two parts: 1) the part that starts at the beginning of the sstable, and
- // 2) the part that comes before the wrap-around
- boundsList.add(new Bounds<>(sstable.partitioner.getMinimumToken().minKeyBound(), dataRange.stopKey()));
- boundsList.add(new Bounds<>(dataRange.startKey(), sstable.partitioner.getMinimumToken().maxKeyBound()));
++ if (dataRange.stopKey().isMinimum()
++ || dataRange.stopKey().compareTo(sstable.last) >= 0
++ || dataRange.startKey().compareTo(sstable.first) <= 0)
++ {
++ boundsList.add(new Bounds<RowPosition>(sstable.first, sstable.last));
++ }
++ else
++ {
++ if (dataRange.startKey().compareTo(sstable.last) <= 0)
++ boundsList.add(new Bounds<>(dataRange.startKey(), sstable.last));
++ if (dataRange.stopKey().compareTo(sstable.first) >= 0)
++ boundsList.add(new Bounds<>(sstable.first, dataRange.stopKey()));
++ }
+ }
+ else
+ {
- boundsList.add(new Bounds<>(dataRange.startKey(), dataRange.stopKey()));
++ assert dataRange.startKey().compareTo(dataRange.stopKey()) <= 0 || dataRange.stopKey().isMinimum();
++ RowPosition left = Ordering.natural().max(dataRange.startKey(), sstable.first);
++ // apparently isWrapAround() doesn't count Bounds that extend to the limit (min) as wrapping
++ RowPosition right = dataRange.stopKey().isMinimum() ? sstable.last : Ordering.natural().min(dataRange.stopKey(), sstable.last);
++ if (left.compareTo(right) <= 0)
++ boundsList.add(new Bounds<>(left, right));
+ }
+ this.rangeIterator = boundsList.iterator();
+ }
+
+ /**
+ * @param sstable SSTable to scan; must not be null
+ * @param tokenRanges A set of token ranges to scan
+ * @param limiter background i/o RateLimiter; may be null
+ */
+ private BigTableScanner(SSTableReader sstable, Collection<Range<Token>> tokenRanges, RateLimiter limiter)
+ {
+ assert sstable != null;
+
+ this.dfile = limiter == null ? sstable.openDataReader() : sstable.openDataReader(limiter);
+ this.ifile = sstable.openIndexReader();
+ this.sstable = sstable;
+ this.dataRange = null;
+ this.rowIndexEntrySerializer = sstable.descriptor.version.getSSTableFormat().getIndexSerializer(sstable.metadata);
+
+ List<Range<Token>> normalized = Range.normalize(tokenRanges);
+ List<AbstractBounds<RowPosition>> boundsList = new ArrayList<>(normalized.size());
++ // we enforce the first/last keys of the sstablereader
+ for (Range<Token> range : normalized)
- boundsList.add(new Range<RowPosition>(range.left.maxKeyBound(), range.right.maxKeyBound()));
++ {
++ // cap our ranges by the start/end of the sstable
++ RowPosition right = range.right.maxKeyBound();
++ if (right.compareTo(sstable.last) > 0)
++ right = sstable.last;
++
++ RowPosition left = range.left.maxKeyBound();
++ if (left.compareTo(sstable.first) < 0)
++ {
++ if (sstable.first.compareTo(right) <= 0)
++ boundsList.add(new Bounds<>(sstable.first, right));
++ }
++ else if (left.compareTo(right) < 0)
++ boundsList.add(new Range<>(left, right));
++ }
+
+ this.rangeIterator = boundsList.iterator();
+ }
+
+ private void seekToCurrentRangeStart()
+ {
- if (currentRange.left.isMinimum())
- return;
-
+ long indexPosition = sstable.getIndexScanPosition(currentRange.left);
- // -1 means the key is before everything in the sstable. So just start from the beginning.
- if (indexPosition == -1)
- {
- // Note: this method shouldn't assume we're at the start of the sstable already (see #6638) and
- // the seeks are no-op anyway if we are.
- ifile.seek(0);
- dfile.seek(0);
- return;
- }
-
+ ifile.seek(indexPosition);
+ try
+ {
+
+ while (!ifile.isEOF())
+ {
+ indexPosition = ifile.getFilePointer();
+ DecoratedKey indexDecoratedKey = sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(ifile));
- int comparison = indexDecoratedKey.compareTo(currentRange.left);
- // because our range start may be inclusive or exclusive, we need to also contains()
- // instead of just checking (comparison >= 0)
- if (comparison > 0 || currentRange.contains(indexDecoratedKey))
++ if (indexDecoratedKey.compareTo(currentRange.left) > 0 || currentRange.contains(indexDecoratedKey))
+ {
+ // Found, just read the dataPosition and seek into index and data files
+ long dataPosition = ifile.readLong();
+ ifile.seek(indexPosition);
+ dfile.seek(dataPosition);
+ break;
+ }
+ else
+ {
+ RowIndexEntry.Serializer.skip(ifile);
+ }
+ }
+ }
+ catch (IOException e)
+ {
+ sstable.markSuspect();
+ throw new CorruptSSTableException(e, sstable.getFilename());
+ }
+ }
+
+ public void close() throws IOException
+ {
+ FileUtils.close(dfile, ifile);
+ }
+
+ public long getLengthInBytes()
+ {
+ return dfile.length();
+ }
+
+ public long getCurrentPosition()
+ {
+ return dfile.getFilePointer();
+ }
+
+ public String getBackingFiles()
+ {
+ return sstable.toString();
+ }
+
+ public boolean hasNext()
+ {
+ if (iterator == null)
+ iterator = createIterator();
+ return iterator.hasNext();
+ }
+
+ public OnDiskAtomIterator next()
+ {
+ if (iterator == null)
+ iterator = createIterator();
+ return iterator.next();
+ }
+
+ public void remove()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ private Iterator<OnDiskAtomIterator> createIterator()
+ {
+ return new KeyScanningIterator();
+ }
+
+ protected class KeyScanningIterator extends AbstractIterator<OnDiskAtomIterator>
+ {
+ private DecoratedKey nextKey;
+ private RowIndexEntry nextEntry;
+ private DecoratedKey currentKey;
+ private RowIndexEntry currentEntry;
+
+ protected OnDiskAtomIterator computeNext()
+ {
+ try
+ {
+ if (nextEntry == null)
+ {
+ do
+ {
+ // we're starting the first range or we just passed the end of the previous range
+ if (!rangeIterator.hasNext())
+ return endOfData();
+
+ currentRange = rangeIterator.next();
+ seekToCurrentRangeStart();
+
+ if (ifile.isEOF())
+ return endOfData();
+
+ currentKey = sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(ifile));
+ currentEntry = rowIndexEntrySerializer.deserialize(ifile, sstable.descriptor.version);
+ } while (!currentRange.contains(currentKey));
+ }
+ else
+ {
+ // we're in the middle of a range
+ currentKey = nextKey;
+ currentEntry = nextEntry;
+ }
+
+ long readEnd;
+ if (ifile.isEOF())
+ {
+ nextEntry = null;
+ nextKey = null;
+ readEnd = dfile.length();
+ }
+ else
+ {
+ // we need the position of the start of the next key, regardless of whether it falls in the current range
+ nextKey = sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(ifile));
+ nextEntry = rowIndexEntrySerializer.deserialize(ifile, sstable.descriptor.version);
+ readEnd = nextEntry.position;
+
+ if (!currentRange.contains(nextKey))
+ {
+ nextKey = null;
+ nextEntry = null;
+ }
+ }
+
+ if (dataRange == null || dataRange.selectsFullRowFor(currentKey.getKey()))
+ {
+ dfile.seek(currentEntry.position + currentEntry.headerOffset());
+ ByteBufferUtil.readWithShortLength(dfile); // key
+ return new SSTableIdentityIterator(sstable, dfile, currentKey);
+ }
+
+ return new LazyColumnIterator(currentKey, new IColumnIteratorFactory()
+ {
+ public OnDiskAtomIterator create()
+ {
+ return dataRange.columnFilter(currentKey.getKey()).getSSTableColumnIterator(sstable, dfile, currentKey, currentEntry);
+ }
+ });
+
+ }
+ catch (IOException e)
+ {
+ sstable.markSuspect();
+ throw new CorruptSSTableException(e, sstable.getFilename());
+ }
+ }
+ }
+
+ @Override
+ public String toString()
+ {
+ return getClass().getSimpleName() + "(" +
+ "dfile=" + dfile +
+ " ifile=" + ifile +
+ " sstable=" + sstable +
+ ")";
+ }
+
+ public static class EmptySSTableScanner implements ISSTableScanner
+ {
+ private final String filename;
+
+ public EmptySSTableScanner(String filename)
+ {
+ this.filename = filename;
+ }
+
+ public long getLengthInBytes()
+ {
+ return 0;
+ }
+
+ public long getCurrentPosition()
+ {
+ return 0;
+ }
+
+ public String getBackingFiles()
+ {
+ return filename;
+ }
+
+ public boolean hasNext()
+ {
+ return false;
+ }
+
+ public OnDiskAtomIterator next()
+ {
+ return null;
+ }
+
+ public void close() throws IOException { }
+
+ public void remove() { }
+ }
+
+
+}