You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jm...@apache.org on 2014/11/13 19:07:42 UTC
[1/2] cassandra git commit: Fix ordering on SSTableScanner / SSTableReader close
Repository: cassandra
Updated Branches:
refs/heads/trunk 4ff6e7dff -> 9c66482e1
Fix ordering on SSTableScanner / SSTableReader close
Patch by jmckenzie; reviewed by marcuse for CASSANDRA-8019
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3484181e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3484181e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3484181e
Branch: refs/heads/trunk
Commit: 3484181e2b68ce04acfe4e9028d5d1d76335094a
Parents: 5615a79
Author: Joshua McKenzie <jm...@apache.org>
Authored: Thu Nov 13 11:56:34 2014 -0600
Committer: Joshua McKenzie <jm...@apache.org>
Committed: Thu Nov 13 11:56:34 2014 -0600
----------------------------------------------------------------------
.../cassandra/io/sstable/SSTableScanner.java | 3 +
.../io/sstable/SSTableRewriterTest.java | 190 ++++++++++---------
2 files changed, 106 insertions(+), 87 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3484181e/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java b/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
index 62ac175..3f1f1f0 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
@@ -63,6 +63,7 @@ public class SSTableScanner implements ICompactionScanner
SSTableScanner(SSTableReader sstable, DataRange dataRange, RateLimiter limiter)
{
assert sstable != null;
+ sstable.acquireReference();
this.dfile = limiter == null ? sstable.openDataReader() : sstable.openDataReader(limiter);
this.ifile = sstable.openIndexReader();
@@ -92,6 +93,7 @@ public class SSTableScanner implements ICompactionScanner
SSTableScanner(SSTableReader sstable, Collection<Range<Token>> tokenRanges, RateLimiter limiter)
{
assert sstable != null;
+ sstable.acquireReference();
this.dfile = limiter == null ? sstable.openDataReader() : sstable.openDataReader(limiter);
this.ifile = sstable.openIndexReader();
@@ -159,6 +161,7 @@ public class SSTableScanner implements ICompactionScanner
public void close() throws IOException
{
FileUtils.close(dfile, ifile);
+ sstable.releaseReference();
}
public long getLengthInBytes()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3484181e/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
index 4d248bd..8a494a6 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
@@ -69,14 +69,16 @@ public class SSTableRewriterTest extends SchemaLoader
Set<SSTableReader> sstables = new HashSet<>(cfs.getSSTables());
assertEquals(1, sstables.size());
SSTableRewriter writer = new SSTableRewriter(cfs, sstables, 1000, false);
- AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables);
- ICompactionScanner scanner = scanners.scanners.get(0);
- CompactionController controller = new CompactionController(cfs, sstables, cfs.gcBefore(System.currentTimeMillis()));
- writer.switchWriter(getWriter(cfs, sstables.iterator().next().descriptor.directory));
- while(scanner.hasNext())
+ try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables);)
{
- AbstractCompactedRow row = new LazilyCompactedRow(controller, Arrays.asList(scanner.next()));
- writer.append(row);
+ ICompactionScanner scanner = scanners.scanners.get(0);
+ CompactionController controller = new CompactionController(cfs, sstables, cfs.gcBefore(System.currentTimeMillis()));
+ writer.switchWriter(getWriter(cfs, sstables.iterator().next().descriptor.directory));
+ while(scanner.hasNext())
+ {
+ AbstractCompactedRow row = new LazilyCompactedRow(controller, Arrays.asList(scanner.next()));
+ writer.append(row);
+ }
}
cfs.getDataTracker().markCompactedSSTablesReplaced(sstables, writer.finish(), OperationType.COMPACTION);
@@ -144,7 +146,7 @@ public class SSTableRewriterTest extends SchemaLoader
@Test
- public void testNumberOfFilesAndSizes() throws InterruptedException
+ public void testNumberOfFilesAndSizes() throws Exception
{
Keyspace keyspace = Keyspace.open(KEYSPACE);
ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
@@ -158,20 +160,22 @@ public class SSTableRewriterTest extends SchemaLoader
SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
- ICompactionScanner scanner = s.getScanner();
- CompactionController controller = new CompactionController(cfs, compacting, 0);
int files = 1;
- while(scanner.hasNext())
+ try (ICompactionScanner scanner = s.getScanner();
+ CompactionController controller = new CompactionController(cfs, compacting, 0))
{
- rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
- if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000)
+ while(scanner.hasNext())
{
- rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
- files++;
- assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
- assertEquals(s.bytesOnDisk(), cfs.metric.liveDiskSpaceUsed.count());
- assertEquals(s.bytesOnDisk(), cfs.metric.totalDiskSpaceUsed.count());
-
+ rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
+ if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000)
+ {
+ rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+ files++;
+ assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
+ assertEquals(s.bytesOnDisk(), cfs.metric.liveDiskSpaceUsed.count());
+ assertEquals(s.bytesOnDisk(), cfs.metric.totalDiskSpaceUsed.count());
+
+ }
}
}
List<SSTableReader> sstables = rewriter.finish();
@@ -191,7 +195,7 @@ public class SSTableRewriterTest extends SchemaLoader
}
@Test
- public void testNumberOfFiles_dont_clean_readers() throws InterruptedException
+ public void testNumberOfFiles_dont_clean_readers() throws Exception
{
Keyspace keyspace = Keyspace.open(KEYSPACE);
ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
@@ -205,17 +209,19 @@ public class SSTableRewriterTest extends SchemaLoader
SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
- ICompactionScanner scanner = s.getScanner();
- CompactionController controller = new CompactionController(cfs, compacting, 0);
int files = 1;
- while(scanner.hasNext())
+ try (ICompactionScanner scanner = s.getScanner();
+ CompactionController controller = new CompactionController(cfs, compacting, 0))
{
- rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
- if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000)
+ while(scanner.hasNext())
{
- rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
- files++;
- assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
+ rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
+ if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000)
+ {
+ rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+ files++;
+ assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
+ }
}
}
List<SSTableReader> sstables = rewriter.finish();
@@ -230,7 +236,7 @@ public class SSTableRewriterTest extends SchemaLoader
@Test
- public void testNumberOfFiles_abort() throws InterruptedException
+ public void testNumberOfFiles_abort() throws Exception
{
Keyspace keyspace = Keyspace.open(KEYSPACE);
ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
@@ -246,17 +252,19 @@ public class SSTableRewriterTest extends SchemaLoader
SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
- ICompactionScanner scanner = s.getScanner();
- CompactionController controller = new CompactionController(cfs, compacting, 0);
int files = 1;
- while(scanner.hasNext())
+ try (ICompactionScanner scanner = s.getScanner();
+ CompactionController controller = new CompactionController(cfs, compacting, 0))
{
- rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
- if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000)
+ while(scanner.hasNext())
{
- rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
- files++;
- assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
+ rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
+ if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000)
+ {
+ rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+ files++;
+ assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
+ }
}
}
rewriter.abort();
@@ -271,7 +279,7 @@ public class SSTableRewriterTest extends SchemaLoader
}
@Test
- public void testNumberOfFiles_abort2() throws InterruptedException
+ public void testNumberOfFiles_abort2() throws Exception
{
Keyspace keyspace = Keyspace.open(KEYSPACE);
ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
@@ -287,23 +295,25 @@ public class SSTableRewriterTest extends SchemaLoader
SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
- ICompactionScanner scanner = s.getScanner();
- CompactionController controller = new CompactionController(cfs, compacting, 0);
int files = 1;
- while(scanner.hasNext())
+ try (ICompactionScanner scanner = s.getScanner();
+ CompactionController controller = new CompactionController(cfs, compacting, 0))
{
- rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
- if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000)
+ while(scanner.hasNext())
{
- rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
- files++;
- assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
- }
- if (files == 3)
- {
- //testing to abort when we have nothing written in the new file
- rewriter.abort();
- break;
+ rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
+ if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000)
+ {
+ rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+ files++;
+ assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
+ }
+ if (files == 3)
+ {
+ //testing to abort when we have nothing written in the new file
+ rewriter.abort();
+ break;
+ }
}
}
Thread.sleep(1000);
@@ -316,7 +326,7 @@ public class SSTableRewriterTest extends SchemaLoader
}
@Test
- public void testNumberOfFiles_finish_empty_new_writer() throws InterruptedException
+ public void testNumberOfFiles_finish_empty_new_writer() throws Exception
{
Keyspace keyspace = Keyspace.open(KEYSPACE);
ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
@@ -330,24 +340,26 @@ public class SSTableRewriterTest extends SchemaLoader
SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
- ICompactionScanner scanner = s.getScanner();
- CompactionController controller = new CompactionController(cfs, compacting, 0);
int files = 1;
- while(scanner.hasNext())
+ try (ICompactionScanner scanner = s.getScanner();
+ CompactionController controller = new CompactionController(cfs, compacting, 0))
{
- rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
- if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000)
- {
- rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
- files++;
- assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
- }
- if (files == 3)
+ while(scanner.hasNext())
{
- //testing to finish when we have nothing written in the new file
- List<SSTableReader> sstables = rewriter.finish();
- cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, sstables, OperationType.COMPACTION);
- break;
+ rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
+ if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000)
+ {
+ rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+ files++;
+ assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
+ }
+ if (files == 3)
+ {
+ //testing to finish when we have nothing written in the new file
+ List<SSTableReader> sstables = rewriter.finish();
+ cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, sstables, OperationType.COMPACTION);
+ break;
+ }
}
}
Thread.sleep(1000);
@@ -357,7 +369,7 @@ public class SSTableRewriterTest extends SchemaLoader
}
@Test
- public void testNumberOfFiles_truncate() throws InterruptedException
+ public void testNumberOfFiles_truncate() throws Exception
{
Keyspace keyspace = Keyspace.open(KEYSPACE);
ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
@@ -371,17 +383,19 @@ public class SSTableRewriterTest extends SchemaLoader
SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
- ICompactionScanner scanner = s.getScanner();
- CompactionController controller = new CompactionController(cfs, compacting, 0);
int files = 1;
- while(scanner.hasNext())
+ try (ICompactionScanner scanner = s.getScanner();
+ CompactionController controller = new CompactionController(cfs, compacting, 0))
{
- rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
- if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000)
+ while(scanner.hasNext())
{
- rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
- files++;
- assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
+ rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
+ if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000)
+ {
+ rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+ files++;
+ assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
+ }
}
}
List<SSTableReader> sstables = rewriter.finish();
@@ -393,7 +407,7 @@ public class SSTableRewriterTest extends SchemaLoader
}
@Test
- public void testSmallFiles() throws InterruptedException
+ public void testSmallFiles() throws Exception
{
Keyspace keyspace = Keyspace.open(KEYSPACE);
ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
@@ -408,18 +422,20 @@ public class SSTableRewriterTest extends SchemaLoader
SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
- ICompactionScanner scanner = s.getScanner();
- CompactionController controller = new CompactionController(cfs, compacting, 0);
int files = 1;
- while(scanner.hasNext())
+ try (ICompactionScanner scanner = s.getScanner();
+ CompactionController controller = new CompactionController(cfs, compacting, 0))
{
- rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
- if (rewriter.currentWriter().getOnDiskFilePointer() > 2500000)
+ while(scanner.hasNext())
{
- assertEquals(1, cfs.getSSTables().size()); // we dont open small files early ...
- assertEquals(origFirst, cfs.getSSTables().iterator().next().first); // ... and the first key should stay the same
- rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
- files++;
+ rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
+ if (rewriter.currentWriter().getOnDiskFilePointer() > 2500000)
+ {
+ assertEquals(1, cfs.getSSTables().size()); // we dont open small files early ...
+ assertEquals(origFirst, cfs.getSSTables().iterator().next().first); // ... and the first key should stay the same
+ rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+ files++;
+ }
}
}
List<SSTableReader> sstables = rewriter.finish();
[2/2] cassandra git commit: Merge branch 'cassandra-2.1' into trunk
Posted by jm...@apache.org.
Merge branch 'cassandra-2.1' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9c66482e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9c66482e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9c66482e
Branch: refs/heads/trunk
Commit: 9c66482e16b2fe45605dae8132fec8aff5eda470
Parents: 4ff6e7d 3484181
Author: Joshua McKenzie <jm...@apache.org>
Authored: Thu Nov 13 12:01:19 2014 -0600
Committer: Joshua McKenzie <jm...@apache.org>
Committed: Thu Nov 13 12:01:19 2014 -0600
----------------------------------------------------------------------
.../io/sstable/format/big/BigTableScanner.java | 3 +
.../io/sstable/SSTableRewriterTest.java | 190 ++++++++++---------
2 files changed, 106 insertions(+), 87 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c66482e/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
index c1fc079,0000000..db55353
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
@@@ -1,299 -1,0 +1,302 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.format.big;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+import com.google.common.collect.AbstractIterator;
+import com.google.common.util.concurrent.RateLimiter;
+
+import org.apache.cassandra.db.DataRange;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.RowIndexEntry;
+import org.apache.cassandra.db.RowPosition;
+import org.apache.cassandra.db.columniterator.IColumnIteratorFactory;
+import org.apache.cassandra.db.columniterator.LazyColumnIterator;
+import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
+import org.apache.cassandra.db.compaction.ICompactionScanner;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.Bounds;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.CorruptSSTableException;
+import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+public class BigTableScanner implements ICompactionScanner
+{
+ protected final RandomAccessReader dfile;
+ protected final RandomAccessReader ifile;
+ public final SSTableReader sstable;
+
+ private final Iterator<AbstractBounds<RowPosition>> rangeIterator;
+ private AbstractBounds<RowPosition> currentRange;
+
+ private final DataRange dataRange;
+ private final RowIndexEntry.IndexSerializer rowIndexEntrySerializer;
+
+ protected Iterator<OnDiskAtomIterator> iterator;
+
+ /**
+ * @param sstable SSTable to scan; must not be null
+ * @param dataRange a single range to scan; must not be null
+ * @param limiter background i/o RateLimiter; may be null
+ */
+ public BigTableScanner(SSTableReader sstable, DataRange dataRange, RateLimiter limiter)
+ {
+ assert sstable != null;
++ sstable.acquireReference();
+
+ this.dfile = limiter == null ? sstable.openDataReader() : sstable.openDataReader(limiter);
+ this.ifile = sstable.openIndexReader();
+ this.sstable = sstable;
+ this.dataRange = dataRange;
+ this.rowIndexEntrySerializer = sstable.descriptor.version.getSSTableFormat().getIndexSerializer(sstable.metadata);
+
+ List<AbstractBounds<RowPosition>> boundsList = new ArrayList<>(2);
+ if (dataRange.isWrapAround() && !dataRange.stopKey().isMinimum(sstable.partitioner))
+ {
+ // split the wrapping range into two parts: 1) the part that starts at the beginning of the sstable, and
+ // 2) the part that comes before the wrap-around
+ boundsList.add(new Bounds<>(sstable.partitioner.getMinimumToken().minKeyBound(), dataRange.stopKey(), sstable.partitioner));
+ boundsList.add(new Bounds<>(dataRange.startKey(), sstable.partitioner.getMinimumToken().maxKeyBound(), sstable.partitioner));
+ }
+ else
+ {
+ boundsList.add(new Bounds<>(dataRange.startKey(), dataRange.stopKey(), sstable.partitioner));
+ }
+ this.rangeIterator = boundsList.iterator();
+ }
+
+ /**
+ * @param sstable SSTable to scan; must not be null
+ * @param tokenRanges A set of token ranges to scan
+ * @param limiter background i/o RateLimiter; may be null
+ */
+ public BigTableScanner(SSTableReader sstable, Collection<Range<Token>> tokenRanges, RateLimiter limiter)
+ {
+ assert sstable != null;
++ sstable.acquireReference();
+
+ this.dfile = limiter == null ? sstable.openDataReader() : sstable.openDataReader(limiter);
+ this.ifile = sstable.openIndexReader();
+ this.sstable = sstable;
+ this.dataRange = null;
+ this.rowIndexEntrySerializer = sstable.descriptor.version.getSSTableFormat().getIndexSerializer(sstable.metadata);
+
+ List<Range<Token>> normalized = Range.normalize(tokenRanges);
+ List<AbstractBounds<RowPosition>> boundsList = new ArrayList<>(normalized.size());
+ for (Range<Token> range : normalized)
+ boundsList.add(new Range<RowPosition>(range.left.maxKeyBound(sstable.partitioner),
+ range.right.maxKeyBound(sstable.partitioner),
+ sstable.partitioner));
+
+ this.rangeIterator = boundsList.iterator();
+ }
+
+ private void seekToCurrentRangeStart()
+ {
+ if (currentRange.left.isMinimum(sstable.partitioner))
+ return;
+
+ long indexPosition = sstable.getIndexScanPosition(currentRange.left);
+ // -1 means the key is before everything in the sstable. So just start from the beginning.
+ if (indexPosition == -1)
+ {
+ // Note: this method shouldn't assume we're at the start of the sstable already (see #6638) and
+ // the seeks are no-op anyway if we are.
+ ifile.seek(0);
+ dfile.seek(0);
+ return;
+ }
+
+ ifile.seek(indexPosition);
+ try
+ {
+
+ while (!ifile.isEOF())
+ {
+ indexPosition = ifile.getFilePointer();
+ DecoratedKey indexDecoratedKey = sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(ifile));
+ int comparison = indexDecoratedKey.compareTo(currentRange.left);
+ // because our range start may be inclusive or exclusive, we need to also contains()
+ // instead of just checking (comparison >= 0)
+ if (comparison > 0 || currentRange.contains(indexDecoratedKey))
+ {
+ // Found, just read the dataPosition and seek into index and data files
+ long dataPosition = ifile.readLong();
+ ifile.seek(indexPosition);
+ dfile.seek(dataPosition);
+ break;
+ }
+ else
+ {
+ RowIndexEntry.Serializer.skip(ifile);
+ }
+ }
+ }
+ catch (IOException e)
+ {
+ sstable.markSuspect();
+ throw new CorruptSSTableException(e, sstable.getFilename());
+ }
+ }
+
+ public void close() throws IOException
+ {
+ FileUtils.close(dfile, ifile);
++ sstable.releaseReference();
+ }
+
+ public long getLengthInBytes()
+ {
+ return dfile.length();
+ }
+
+ public long getCurrentPosition()
+ {
+ return dfile.getFilePointer();
+ }
+
+ public String getBackingFiles()
+ {
+ return sstable.toString();
+ }
+
+ public boolean hasNext()
+ {
+ if (iterator == null)
+ iterator = createIterator();
+ return iterator.hasNext();
+ }
+
+ public OnDiskAtomIterator next()
+ {
+ if (iterator == null)
+ iterator = createIterator();
+ return iterator.next();
+ }
+
+ public void remove()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ private Iterator<OnDiskAtomIterator> createIterator()
+ {
+ return new KeyScanningIterator();
+ }
+
+ protected class KeyScanningIterator extends AbstractIterator<OnDiskAtomIterator>
+ {
+ private DecoratedKey nextKey;
+ private RowIndexEntry nextEntry;
+ private DecoratedKey currentKey;
+ private RowIndexEntry currentEntry;
+
+ protected OnDiskAtomIterator computeNext()
+ {
+ try
+ {
+ if (nextEntry == null)
+ {
+ do
+ {
+ // we're starting the first range or we just passed the end of the previous range
+ if (!rangeIterator.hasNext())
+ return endOfData();
+
+ currentRange = rangeIterator.next();
+ seekToCurrentRangeStart();
+
+ if (ifile.isEOF())
+ return endOfData();
+
+ currentKey = sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(ifile));
+ currentEntry = rowIndexEntrySerializer.deserialize(ifile, sstable.descriptor.version);
+ } while (!currentRange.contains(currentKey));
+ }
+ else
+ {
+ // we're in the middle of a range
+ currentKey = nextKey;
+ currentEntry = nextEntry;
+ }
+
+ long readEnd;
+ if (ifile.isEOF())
+ {
+ nextEntry = null;
+ nextKey = null;
+ readEnd = dfile.length();
+ }
+ else
+ {
+ // we need the position of the start of the next key, regardless of whether it falls in the current range
+ nextKey = sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(ifile));
+ nextEntry = rowIndexEntrySerializer.deserialize(ifile, sstable.descriptor.version);
+ readEnd = nextEntry.position;
+
+ if (!currentRange.contains(nextKey))
+ {
+ nextKey = null;
+ nextEntry = null;
+ }
+ }
+
+ if (dataRange == null || dataRange.selectsFullRowFor(currentKey.getKey()))
+ {
+ dfile.seek(currentEntry.position + currentEntry.headerOffset());
+ ByteBufferUtil.readWithShortLength(dfile); // key
+ return new SSTableIdentityIterator(sstable, dfile, currentKey);
+ }
+
+ return new LazyColumnIterator(currentKey, new IColumnIteratorFactory()
+ {
+ public OnDiskAtomIterator create()
+ {
+ return dataRange.columnFilter(currentKey.getKey()).getSSTableColumnIterator(sstable, dfile, currentKey, currentEntry);
+ }
+ });
+
+ }
+ catch (IOException e)
+ {
+ sstable.markSuspect();
+ throw new CorruptSSTableException(e, sstable.getFilename());
+ }
+ }
+ }
+
+ @Override
+ public String toString()
+ {
+ return getClass().getSimpleName() + "(" +
+ "dfile=" + dfile +
+ " ifile=" + ifile +
+ " sstable=" + sstable +
+ ")";
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c66482e/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
----------------------------------------------------------------------