Posted to commits@cassandra.apache.org by jm...@apache.org on 2015/01/07 21:13:06 UTC
[1/2] cassandra git commit: Remove ref counting in SSTableScanner, fix CompactionTask ordering
Repository: cassandra
Updated Branches:
refs/heads/trunk 493859bf6 -> 65ffc39b1
Remove ref counting in SSTableScanner, fix CompactionTask ordering
Patch by jmckenzie; reviewed by belliottsmith as a follow-up for CASSANDRA-8399
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/bdbb071f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/bdbb071f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/bdbb071f
Branch: refs/heads/trunk
Commit: bdbb071f4f87131d6996aac52f2b75a5833d5238
Parents: ddca610
Author: Joshua McKenzie <jm...@apache.org>
Authored: Wed Jan 7 13:05:31 2015 -0600
Committer: Joshua McKenzie <jm...@apache.org>
Committed: Wed Jan 7 14:05:40 2015 -0600
----------------------------------------------------------------------
.../cassandra/db/compaction/CompactionTask.java | 82 ++++++++++----------
.../cassandra/io/sstable/SSTableScanner.java | 8 +-
2 files changed, 45 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bdbb071f/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index 4885bc8..d215b4c 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -140,7 +140,6 @@ public class CompactionTask extends AbstractCompactionTask
try (CompactionController controller = getCompactionController(sstables);)
{
-
Set<SSTableReader> actuallyCompact = Sets.difference(sstables, controller.getFullyExpiredSSTables());
long estimatedTotalKeys = Math.max(cfs.metadata.getMinIndexInterval(), SSTableReader.getApproximateKeyCount(actuallyCompact));
@@ -149,11 +148,16 @@ public class CompactionTask extends AbstractCompactionTask
long expectedSSTableSize = Math.min(getExpectedWriteSize(), strategy.getMaxSSTableBytes());
logger.debug("Expected bloom filter size : {}", keysPerSSTable);
+ List<SSTableReader> newSStables;
+ AbstractCompactionIterable ci;
+
+ // SSTableScanners need to be closed before markCompactedSSTablesReplaced call as scanners contain references
+ // to both ifile and dfile and SSTR will throw deletion errors on Windows if it tries to delete before scanner is closed.
+ // See CASSANDRA-8019 and CASSANDRA-8399
try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(actuallyCompact))
{
- AbstractCompactionIterable ci = new CompactionIterable(compactionType, scanners.scanners, controller);
+ ci = new CompactionIterable(compactionType, scanners.scanners, controller);
Iterator<AbstractCompactedRow> iter = ci.iterator();
- List<SSTableReader> newSStables;
// we can't preheat until the tracker has been set. This doesn't happen until we tell the cfs to
// replace the old entries. Track entries to preheat here until then.
long minRepairedAt = getMinRepairedAt(actuallyCompact);
@@ -215,44 +219,44 @@ public class CompactionTask extends AbstractCompactionTask
if (collector != null)
collector.finishCompaction(ci);
}
+ }
- Collection<SSTableReader> oldSStables = this.sstables;
- if (!offline)
- cfs.getDataTracker().markCompactedSSTablesReplaced(oldSStables, newSStables, compactionType);
-
- // log a bunch of statistics about the result and save to system table compaction_history
- long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
- long startsize = SSTableReader.getTotalBytes(oldSStables);
- long endsize = SSTableReader.getTotalBytes(newSStables);
- double ratio = (double) endsize / (double) startsize;
-
- StringBuilder newSSTableNames = new StringBuilder();
- for (SSTableReader reader : newSStables)
- newSSTableNames.append(reader.descriptor.baseFilename()).append(",");
-
- double mbps = dTime > 0 ? (double) endsize / (1024 * 1024) / ((double) dTime / 1000) : 0;
- long totalSourceRows = 0;
- long[] counts = ci.getMergedRowCounts();
- StringBuilder mergeSummary = new StringBuilder(counts.length * 10);
- Map<Integer, Long> mergedRows = new HashMap<>();
- for (int i = 0; i < counts.length; i++)
- {
- long count = counts[i];
- if (count == 0)
- continue;
-
- int rows = i + 1;
- totalSourceRows += rows * count;
- mergeSummary.append(String.format("%d:%d, ", rows, count));
- mergedRows.put(rows, count);
- }
-
- SystemKeyspace.updateCompactionHistory(cfs.keyspace.getName(), cfs.name, System.currentTimeMillis(), startsize, endsize, mergedRows);
- logger.info(String.format("Compacted %d sstables to [%s]. %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s. %,d total partitions merged to %,d. Partition merge counts were {%s}",
- oldSStables.size(), newSSTableNames.toString(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalKeysWritten, mergeSummary.toString()));
- logger.debug(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize)));
- logger.debug("Actual #keys: {}, Estimated #keys:{}, Err%: {}", totalKeysWritten, estimatedTotalKeys, ((double)(totalKeysWritten - estimatedTotalKeys)/totalKeysWritten));
+ Collection<SSTableReader> oldSStables = this.sstables;
+ if (!offline)
+ cfs.getDataTracker().markCompactedSSTablesReplaced(oldSStables, newSStables, compactionType);
+
+ // log a bunch of statistics about the result and save to system table compaction_history
+ long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+ long startsize = SSTableReader.getTotalBytes(oldSStables);
+ long endsize = SSTableReader.getTotalBytes(newSStables);
+ double ratio = (double) endsize / (double) startsize;
+
+ StringBuilder newSSTableNames = new StringBuilder();
+ for (SSTableReader reader : newSStables)
+ newSSTableNames.append(reader.descriptor.baseFilename()).append(",");
+
+ double mbps = dTime > 0 ? (double) endsize / (1024 * 1024) / ((double) dTime / 1000) : 0;
+ long totalSourceRows = 0;
+ long[] counts = ci.getMergedRowCounts();
+ StringBuilder mergeSummary = new StringBuilder(counts.length * 10);
+ Map<Integer, Long> mergedRows = new HashMap<>();
+ for (int i = 0; i < counts.length; i++)
+ {
+ long count = counts[i];
+ if (count == 0)
+ continue;
+
+ int rows = i + 1;
+ totalSourceRows += rows * count;
+ mergeSummary.append(String.format("%d:%d, ", rows, count));
+ mergedRows.put(rows, count);
}
+
+ SystemKeyspace.updateCompactionHistory(cfs.keyspace.getName(), cfs.name, System.currentTimeMillis(), startsize, endsize, mergedRows);
+ logger.info(String.format("Compacted %d sstables to [%s]. %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s. %,d total partitions merged to %,d. Partition merge counts were {%s}",
+ oldSStables.size(), newSSTableNames.toString(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalKeysWritten, mergeSummary.toString()));
+ logger.debug(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize)));
+ logger.debug("Actual #keys: {}, Estimated #keys:{}, Err%: {}", totalKeysWritten, estimatedTotalKeys, ((double)(totalKeysWritten - estimatedTotalKeys)/totalKeysWritten));
}
}
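The ordering change above is easiest to see with the compaction plumbing stripped away. A minimal, self-contained sketch of the pattern (the class and method names below are hypothetical stand-ins, not the real CompactionTask code): the results produced inside the scanner's try-with-resources scope are hoisted out of it, so the scanners close before anything tries to delete the files they hold open.

    import java.util.Arrays;
    import java.util.List;

    public class OrderingSketch
    {
        // A stand-in scanner that "holds file handles" while open.
        static class Scanner implements AutoCloseable
        {
            List<String> scan() { return Arrays.asList("new-sstable-1", "new-sstable-2"); }
            @Override
            public void close() { System.out.println("scanner closed (ifile/dfile released)"); }
        }

        public static void main(String[] args)
        {
            List<String> newSStables;              // hoisted out of the try scope
            try (Scanner scanner = new Scanner())
            {
                newSStables = scanner.scan();      // handles still open here
            }
            // Handles are now closed, so it is safe to run the step that may
            // delete the old files (Windows refuses to delete open files).
            System.out.println("marking old sstables replaced by " + newSStables);
        }
    }

In the real patch, newSStables and ci play the role of the hoisted variable, and markCompactedSSTablesReplaced is the step that must run only after the scanners' ifile/dfile handles are released.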
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bdbb071f/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java b/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
index 5499195..dc065af 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
@@ -52,18 +52,15 @@ public class SSTableScanner implements ISSTableScanner
protected Iterator<OnDiskAtomIterator> iterator;
- // We can race with the sstable for deletion during compaction. If it's been ref counted to 0, skip
public static ISSTableScanner getScanner(SSTableReader sstable, DataRange dataRange, RateLimiter limiter)
{
- return sstable.acquireReference()
- ? new SSTableScanner(sstable, dataRange, limiter)
- : new SSTableScanner.EmptySSTableScanner(sstable.getFilename());
+ return new SSTableScanner(sstable, dataRange, limiter);
}
public static ISSTableScanner getScanner(SSTableReader sstable, Collection<Range<Token>> tokenRanges, RateLimiter limiter)
{
// We want to avoid allocating a SSTableScanner if the range don't overlap the sstable (#5249)
List<Pair<Long, Long>> positions = sstable.getPositionsForRanges(Range.normalize(tokenRanges));
- if (positions.isEmpty() || !sstable.acquireReference())
+ if (positions.isEmpty())
return new EmptySSTableScanner(sstable.getFilename());
return new SSTableScanner(sstable, tokenRanges, limiter);
@@ -173,7 +170,6 @@ public class SSTableScanner implements ISSTableScanner
public void close() throws IOException
{
FileUtils.close(dfile, ifile);
- sstable.releaseReference();
}
public long getLengthInBytes()
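With acquireReference()/releaseReference() removed, the scanner's close() contract shrinks to the two file handles it opened, and keeping the SSTableReader alive for the scanner's lifetime becomes the caller's job. A hedged, self-contained sketch of that narrowed contract (Handle and Scanner are illustrative stand-ins, not the real Cassandra types):

    import java.io.Closeable;
    import java.io.IOException;

    public class ScannerCloseSketch
    {
        static class Handle implements Closeable
        {
            final String name;
            Handle(String name) { this.name = name; }
            public void close() { System.out.println(name + " closed"); }
        }

        static class Scanner implements Closeable
        {
            final Handle dfile = new Handle("dfile");
            final Handle ifile = new Handle("ifile");

            // After the patch, close() releases only the handles the scanner
            // opened; there is no releaseReference() on the reader any more.
            public void close() throws IOException
            {
                dfile.close();
                ifile.close();
            }
        }

        public static void main(String[] args) throws IOException
        {
            try (Scanner scanner = new Scanner())
            {
                // iterate rows; keeping the underlying sstable alive is now
                // entirely the caller's responsibility, not the scanner's
            }
        }
    }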
[2/2] cassandra git commit: Merge branch 'cassandra-2.1' into trunk
Posted by jm...@apache.org.
Merge branch 'cassandra-2.1' into trunk
Conflicts:
src/java/org/apache/cassandra/db/compaction/CompactionTask.java
src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/65ffc39b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/65ffc39b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/65ffc39b
Branch: refs/heads/trunk
Commit: 65ffc39b16c83c6319498874b5f0c56149371aef
Parents: 493859b bdbb071
Author: Joshua McKenzie <jm...@apache.org>
Authored: Wed Jan 7 14:11:04 2015 -0600
Committer: Joshua McKenzie <jm...@apache.org>
Committed: Wed Jan 7 14:11:04 2015 -0600
----------------------------------------------------------------------
.../cassandra/db/compaction/CompactionTask.java | 82 ++++++++++----------
.../io/sstable/format/big/BigTableScanner.java | 8 +-
2 files changed, 45 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/65ffc39b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index 2543f47,d215b4c..8b5058b
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@@ -157,11 -148,16 +156,16 @@@ public class CompactionTask extends Abs
long expectedSSTableSize = Math.min(getExpectedWriteSize(), strategy.getMaxSSTableBytes());
logger.debug("Expected bloom filter size : {}", keysPerSSTable);
+ List<SSTableReader> newSStables;
+ AbstractCompactionIterable ci;
+
+ // SSTableScanners need to be closed before markCompactedSSTablesReplaced call as scanners contain references
+ // to both ifile and dfile and SSTR will throw deletion errors on Windows if it tries to delete before scanner is closed.
+ // See CASSANDRA-8019 and CASSANDRA-8399
try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(actuallyCompact))
{
- AbstractCompactionIterable ci = new CompactionIterable(compactionType, scanners.scanners, controller, sstableFormat);
- ci = new CompactionIterable(compactionType, scanners.scanners, controller);
++ ci = new CompactionIterable(compactionType, scanners.scanners, controller, sstableFormat);
Iterator<AbstractCompactedRow> iter = ci.iterator();
- List<SSTableReader> newSStables;
// we can't preheat until the tracker has been set. This doesn't happen until we tell the cfs to
// replace the old entries. Track entries to preheat here until then.
long minRepairedAt = getMinRepairedAt(actuallyCompact);
@@@ -223,44 -219,44 +227,44 @@@
if (collector != null)
collector.finishCompaction(ci);
}
+ }
- Collection<SSTableReader> oldSStables = this.sstables;
- if (!offline)
- cfs.getDataTracker().markCompactedSSTablesReplaced(oldSStables, newSStables, compactionType);
-
- // log a bunch of statistics about the result and save to system table compaction_history
- long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
- long startsize = SSTableReader.getTotalBytes(oldSStables);
- long endsize = SSTableReader.getTotalBytes(newSStables);
- double ratio = (double) endsize / (double) startsize;
-
- StringBuilder newSSTableNames = new StringBuilder();
- for (SSTableReader reader : newSStables)
- newSSTableNames.append(reader.descriptor.baseFilename()).append(",");
-
- double mbps = dTime > 0 ? (double) endsize / (1024 * 1024) / ((double) dTime / 1000) : 0;
- long totalSourceRows = 0;
- long[] counts = ci.getMergedRowCounts();
- StringBuilder mergeSummary = new StringBuilder(counts.length * 10);
- Map<Integer, Long> mergedRows = new HashMap<>();
- for (int i = 0; i < counts.length; i++)
- {
- long count = counts[i];
- if (count == 0)
- continue;
-
- int rows = i + 1;
- totalSourceRows += rows * count;
- mergeSummary.append(String.format("%d:%d, ", rows, count));
- mergedRows.put(rows, count);
- }
-
- SystemKeyspace.updateCompactionHistory(cfs.keyspace.getName(), cfs.name, System.currentTimeMillis(), startsize, endsize, mergedRows);
- logger.info(String.format("Compacted (%s) %d sstables to [%s] to level=%d. %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s. %,d total partitions merged to %,d. Partition merge counts were {%s}",
- taskIdLoggerMsg, oldSStables.size(), newSSTableNames.toString(), getLevel(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalKeysWritten, mergeSummary.toString()));
- logger.debug(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize)));
- logger.debug("Actual #keys: {}, Estimated #keys:{}, Err%: {}", totalKeysWritten, estimatedTotalKeys, ((double)(totalKeysWritten - estimatedTotalKeys)/totalKeysWritten));
+ Collection<SSTableReader> oldSStables = this.sstables;
+ if (!offline)
+ cfs.getDataTracker().markCompactedSSTablesReplaced(oldSStables, newSStables, compactionType);
+
+ // log a bunch of statistics about the result and save to system table compaction_history
+ long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+ long startsize = SSTableReader.getTotalBytes(oldSStables);
+ long endsize = SSTableReader.getTotalBytes(newSStables);
+ double ratio = (double) endsize / (double) startsize;
+
+ StringBuilder newSSTableNames = new StringBuilder();
+ for (SSTableReader reader : newSStables)
+ newSSTableNames.append(reader.descriptor.baseFilename()).append(",");
+
+ double mbps = dTime > 0 ? (double) endsize / (1024 * 1024) / ((double) dTime / 1000) : 0;
+ long totalSourceRows = 0;
+ long[] counts = ci.getMergedRowCounts();
+ StringBuilder mergeSummary = new StringBuilder(counts.length * 10);
+ Map<Integer, Long> mergedRows = new HashMap<>();
+ for (int i = 0; i < counts.length; i++)
+ {
+ long count = counts[i];
+ if (count == 0)
+ continue;
+
+ int rows = i + 1;
+ totalSourceRows += rows * count;
+ mergeSummary.append(String.format("%d:%d, ", rows, count));
+ mergedRows.put(rows, count);
}
+
+ SystemKeyspace.updateCompactionHistory(cfs.keyspace.getName(), cfs.name, System.currentTimeMillis(), startsize, endsize, mergedRows);
- logger.info(String.format("Compacted %d sstables to [%s]. %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s. %,d total partitions merged to %,d. Partition merge counts were {%s}",
- oldSStables.size(), newSSTableNames.toString(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalKeysWritten, mergeSummary.toString()));
++ logger.info(String.format("Compacted (%s) %d sstables to [%s] to level=%d. %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s. %,d total partitions merged to %,d. Partition merge counts were {%s}",
++ taskIdLoggerMsg, oldSStables.size(), newSSTableNames.toString(), getLevel(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalKeysWritten, mergeSummary.toString()));
+ logger.debug(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize)));
+ logger.debug("Actual #keys: {}, Estimated #keys:{}, Err%: {}", totalKeysWritten, estimatedTotalKeys, ((double)(totalKeysWritten - estimatedTotalKeys)/totalKeysWritten));
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/65ffc39b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
index 73a5d76,0000000..85bc37d
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
@@@ -1,354 -1,0 +1,350 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.format.big;
+
+import java.io.IOException;
+import java.util.*;
+
+import com.google.common.collect.AbstractIterator;
+import com.google.common.util.concurrent.RateLimiter;
+
+import org.apache.cassandra.db.DataRange;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.RowIndexEntry;
+import org.apache.cassandra.db.RowPosition;
+import org.apache.cassandra.db.columniterator.IColumnIteratorFactory;
+import org.apache.cassandra.db.columniterator.LazyColumnIterator;
+import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.Bounds;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.CorruptSSTableException;
+import org.apache.cassandra.io.sstable.ISSTableScanner;
+import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.Pair;
+
+public class BigTableScanner implements ISSTableScanner
+{
+ protected final RandomAccessReader dfile;
+ protected final RandomAccessReader ifile;
+ public final SSTableReader sstable;
+
+ private final Iterator<AbstractBounds<RowPosition>> rangeIterator;
+ private AbstractBounds<RowPosition> currentRange;
+
+ private final DataRange dataRange;
+ private final RowIndexEntry.IndexSerializer rowIndexEntrySerializer;
+
+ protected Iterator<OnDiskAtomIterator> iterator;
+
- // We can race with the sstable for deletion during compaction. If it's been ref counted to 0, skip
+ public static ISSTableScanner getScanner(SSTableReader sstable, DataRange dataRange, RateLimiter limiter)
+ {
- return sstable.acquireReference()
- ? new BigTableScanner(sstable, dataRange, limiter)
- : new BigTableScanner.EmptySSTableScanner(sstable.getFilename());
++ return new BigTableScanner(sstable, dataRange, limiter);
+ }
+ public static ISSTableScanner getScanner(SSTableReader sstable, Collection<Range<Token>> tokenRanges, RateLimiter limiter)
+ {
+ // We want to avoid allocating a SSTableScanner if the range don't overlap the sstable (#5249)
+ List<Pair<Long, Long>> positions = sstable.getPositionsForRanges(Range.normalize(tokenRanges));
- if (positions.isEmpty() || !sstable.acquireReference())
++ if (positions.isEmpty())
+ return new EmptySSTableScanner(sstable.getFilename());
+
+ return new BigTableScanner(sstable, tokenRanges, limiter);
+ }
+
+ /**
+ * @param sstable SSTable to scan; must not be null
+ * @param dataRange a single range to scan; must not be null
+ * @param limiter background i/o RateLimiter; may be null
+ */
+ private BigTableScanner(SSTableReader sstable, DataRange dataRange, RateLimiter limiter)
+ {
+ assert sstable != null;
+
+ this.dfile = limiter == null ? sstable.openDataReader() : sstable.openDataReader(limiter);
+ this.ifile = sstable.openIndexReader();
+ this.sstable = sstable;
+ this.dataRange = dataRange;
+ this.rowIndexEntrySerializer = sstable.descriptor.version.getSSTableFormat().getIndexSerializer(sstable.metadata);
+
+ List<AbstractBounds<RowPosition>> boundsList = new ArrayList<>(2);
+ if (dataRange.isWrapAround() && !dataRange.stopKey().isMinimum())
+ {
+ // split the wrapping range into two parts: 1) the part that starts at the beginning of the sstable, and
+ // 2) the part that comes before the wrap-around
+ boundsList.add(new Bounds<>(sstable.partitioner.getMinimumToken().minKeyBound(), dataRange.stopKey()));
+ boundsList.add(new Bounds<>(dataRange.startKey(), sstable.partitioner.getMinimumToken().maxKeyBound()));
+ }
+ else
+ {
+ boundsList.add(new Bounds<>(dataRange.startKey(), dataRange.stopKey()));
+ }
+ this.rangeIterator = boundsList.iterator();
+ }
+
+ /**
+ * @param sstable SSTable to scan; must not be null
+ * @param tokenRanges A set of token ranges to scan
+ * @param limiter background i/o RateLimiter; may be null
+ */
+ private BigTableScanner(SSTableReader sstable, Collection<Range<Token>> tokenRanges, RateLimiter limiter)
+ {
+ assert sstable != null;
+
+ this.dfile = limiter == null ? sstable.openDataReader() : sstable.openDataReader(limiter);
+ this.ifile = sstable.openIndexReader();
+ this.sstable = sstable;
+ this.dataRange = null;
+ this.rowIndexEntrySerializer = sstable.descriptor.version.getSSTableFormat().getIndexSerializer(sstable.metadata);
+
+ List<Range<Token>> normalized = Range.normalize(tokenRanges);
+ List<AbstractBounds<RowPosition>> boundsList = new ArrayList<>(normalized.size());
+ for (Range<Token> range : normalized)
+ boundsList.add(new Range<RowPosition>(range.left.maxKeyBound(), range.right.maxKeyBound()));
+
+ this.rangeIterator = boundsList.iterator();
+ }
+
+ private void seekToCurrentRangeStart()
+ {
+ if (currentRange.left.isMinimum())
+ return;
+
+ long indexPosition = sstable.getIndexScanPosition(currentRange.left);
+ // -1 means the key is before everything in the sstable. So just start from the beginning.
+ if (indexPosition == -1)
+ {
+ // Note: this method shouldn't assume we're at the start of the sstable already (see #6638) and
+ // the seeks are no-op anyway if we are.
+ ifile.seek(0);
+ dfile.seek(0);
+ return;
+ }
+
+ ifile.seek(indexPosition);
+ try
+ {
+
+ while (!ifile.isEOF())
+ {
+ indexPosition = ifile.getFilePointer();
+ DecoratedKey indexDecoratedKey = sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(ifile));
+ int comparison = indexDecoratedKey.compareTo(currentRange.left);
+ // because our range start may be inclusive or exclusive, we need to also contains()
+ // instead of just checking (comparison >= 0)
+ if (comparison > 0 || currentRange.contains(indexDecoratedKey))
+ {
+ // Found, just read the dataPosition and seek into index and data files
+ long dataPosition = ifile.readLong();
+ ifile.seek(indexPosition);
+ dfile.seek(dataPosition);
+ break;
+ }
+ else
+ {
+ RowIndexEntry.Serializer.skip(ifile);
+ }
+ }
+ }
+ catch (IOException e)
+ {
+ sstable.markSuspect();
+ throw new CorruptSSTableException(e, sstable.getFilename());
+ }
+ }
+
+ public void close() throws IOException
+ {
+ FileUtils.close(dfile, ifile);
- sstable.releaseReference();
+ }
+
+ public long getLengthInBytes()
+ {
+ return dfile.length();
+ }
+
+ public long getCurrentPosition()
+ {
+ return dfile.getFilePointer();
+ }
+
+ public String getBackingFiles()
+ {
+ return sstable.toString();
+ }
+
+ public boolean hasNext()
+ {
+ if (iterator == null)
+ iterator = createIterator();
+ return iterator.hasNext();
+ }
+
+ public OnDiskAtomIterator next()
+ {
+ if (iterator == null)
+ iterator = createIterator();
+ return iterator.next();
+ }
+
+ public void remove()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ private Iterator<OnDiskAtomIterator> createIterator()
+ {
+ return new KeyScanningIterator();
+ }
+
+ protected class KeyScanningIterator extends AbstractIterator<OnDiskAtomIterator>
+ {
+ private DecoratedKey nextKey;
+ private RowIndexEntry nextEntry;
+ private DecoratedKey currentKey;
+ private RowIndexEntry currentEntry;
+
+ protected OnDiskAtomIterator computeNext()
+ {
+ try
+ {
+ if (nextEntry == null)
+ {
+ do
+ {
+ // we're starting the first range or we just passed the end of the previous range
+ if (!rangeIterator.hasNext())
+ return endOfData();
+
+ currentRange = rangeIterator.next();
+ seekToCurrentRangeStart();
+
+ if (ifile.isEOF())
+ return endOfData();
+
+ currentKey = sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(ifile));
+ currentEntry = rowIndexEntrySerializer.deserialize(ifile, sstable.descriptor.version);
+ } while (!currentRange.contains(currentKey));
+ }
+ else
+ {
+ // we're in the middle of a range
+ currentKey = nextKey;
+ currentEntry = nextEntry;
+ }
+
+ long readEnd;
+ if (ifile.isEOF())
+ {
+ nextEntry = null;
+ nextKey = null;
+ readEnd = dfile.length();
+ }
+ else
+ {
+ // we need the position of the start of the next key, regardless of whether it falls in the current range
+ nextKey = sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(ifile));
+ nextEntry = rowIndexEntrySerializer.deserialize(ifile, sstable.descriptor.version);
+ readEnd = nextEntry.position;
+
+ if (!currentRange.contains(nextKey))
+ {
+ nextKey = null;
+ nextEntry = null;
+ }
+ }
+
+ if (dataRange == null || dataRange.selectsFullRowFor(currentKey.getKey()))
+ {
+ dfile.seek(currentEntry.position + currentEntry.headerOffset());
+ ByteBufferUtil.readWithShortLength(dfile); // key
+ return new SSTableIdentityIterator(sstable, dfile, currentKey);
+ }
+
+ return new LazyColumnIterator(currentKey, new IColumnIteratorFactory()
+ {
+ public OnDiskAtomIterator create()
+ {
+ return dataRange.columnFilter(currentKey.getKey()).getSSTableColumnIterator(sstable, dfile, currentKey, currentEntry);
+ }
+ });
+
+ }
+ catch (IOException e)
+ {
+ sstable.markSuspect();
+ throw new CorruptSSTableException(e, sstable.getFilename());
+ }
+ }
+ }
+
+ @Override
+ public String toString()
+ {
+ return getClass().getSimpleName() + "(" +
+ "dfile=" + dfile +
+ " ifile=" + ifile +
+ " sstable=" + sstable +
+ ")";
+ }
+
+ public static class EmptySSTableScanner implements ISSTableScanner
+ {
+ private final String filename;
+
+ public EmptySSTableScanner(String filename)
+ {
+ this.filename = filename;
+ }
+
+ public long getLengthInBytes()
+ {
+ return 0;
+ }
+
+ public long getCurrentPosition()
+ {
+ return 0;
+ }
+
+ public String getBackingFiles()
+ {
+ return filename;
+ }
+
+ public boolean hasNext()
+ {
+ return false;
+ }
+
+ public OnDiskAtomIterator next()
+ {
+ return null;
+ }
+
+ public void close() throws IOException { }
+
+ public void remove() { }
+ }
+
+
+}
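For trunk's BigTableScanner the shape of the change is the same: getScanner() still short-circuits to EmptySSTableScanner when the requested ranges don't overlap the sstable (#5249), but it no longer consults a reference count. A self-contained sketch of the remaining guard (Scanner below is a stand-in, not the real ISSTableScanner):

    import java.util.Collections;
    import java.util.List;

    public class EmptyScannerSketch
    {
        interface Scanner { boolean hasNext(); }

        static Scanner getScanner(List<long[]> positions)
        {
            // Only the range check remains; the '|| !sstable.acquireReference()'
            // arm is gone, so a scanner over live data can no longer come back
            // empty because of a racing reference count.
            if (positions.isEmpty())
                return () -> false;   // nothing in range: don't open dfile/ifile
            return () -> true;        // stand-in for a real BigTableScanner
        }

        public static void main(String[] args)
        {
            System.out.println(getScanner(Collections.emptyList()).hasNext()); // false
        }
    }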