You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2013/10/30 20:11:46 UTC
[3/6] git commit: Use non-pooled SegmentedFile builders when bulk
loading Patch by Sam Tunnicliffe,
reviewed by brandonwilliams for CASSANDRA-6272
Use non-pooled SegmentedFile builders when bulk loading
Patch by Sam Tunnicliffe, reviewed by brandonwilliams for CASSANDRA-6272
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/18f79353
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/18f79353
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/18f79353
Branch: refs/heads/trunk
Commit: 18f7935370b1628cba8fa35819c05f235ba918aa
Parents: 620bb80
Author: Brandon Williams <br...@apache.org>
Authored: Wed Oct 30 14:01:44 2013 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Wed Oct 30 14:03:08 2013 -0500
----------------------------------------------------------------------
.../cassandra/io/sstable/SSTableReader.java | 44 ++++++--------------
.../cassandra/io/sstable/SSTableReaderTest.java | 38 +++++++++++++++++
2 files changed, 50 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/18f79353/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index ed221d9..4af1cdc 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -166,8 +166,19 @@ public class SSTableReader extends SSTable implements Closeable
partitioner,
System.currentTimeMillis(),
sstableMetadata);
+
+ // special implementation of load to use non-pooled SegmentedFile builders
+ SegmentedFile.Builder ibuilder = new BufferedSegmentedFile.Builder();
+ SegmentedFile.Builder dbuilder = sstable.compression
+ ? new CompressedSegmentedFile.Builder()
+ : new BufferedSegmentedFile.Builder();
+
+ if (!loadSummary(sstable, ibuilder, dbuilder))
+ sstable.buildSummary(false, ibuilder, dbuilder, false);
+ sstable.ifile = ibuilder.complete(sstable.descriptor.filenameFor(Component.PRIMARY_INDEX));
+ sstable.dfile = dbuilder.complete(sstable.descriptor.filenameFor(Component.DATA));
+
sstable.bf = new AlwaysPresentFilter();
- sstable.loadForBatch();
return sstable;
}
@@ -409,37 +420,6 @@ public class SSTableReader extends SSTable implements Closeable
saveSummary(this, ibuilder, dbuilder);
}
- /**
- * A simplified load that creates a minimal partition index
- */
- private void loadForBatch() throws IOException
- {
- SegmentedFile.Builder ibuilder = new BufferedSegmentedFile.Builder();
- SegmentedFile.Builder dbuilder = compression
- ? new CompressedSegmentedFile.Builder()
- : new BufferedSegmentedFile.Builder();
-
- // build a bare-bones IndexSummary
- IndexSummaryBuilder summaryBuilder = new IndexSummaryBuilder(1);
- RandomAccessReader in = RandomAccessReader.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)), true);
- try
- {
- ByteBuffer key = ByteBufferUtil.readWithShortLength(in);
- first = decodeKey(partitioner, descriptor, key);
- summaryBuilder.maybeAddEntry(first, 0);
- indexSummary = summaryBuilder.build(partitioner);
- }
- finally
- {
- FileUtils.closeQuietly(in);
- }
-
- last = null; // shouldn't need this for batch operations
-
- ifile = ibuilder.complete(descriptor.filenameFor(Component.PRIMARY_INDEX));
- dfile = dbuilder.complete(descriptor.filenameFor(Component.DATA));
- }
-
private void buildSummary(boolean recreatebloom, SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder, boolean summaryLoaded) throws IOException
{
// we read the positions in a BRAF so we don't have to worry about an entry spanning a mmap boundary.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/18f79353/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
index 02b6855..fef27f1 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
@@ -27,8 +27,10 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.ExecutionException;
+import com.google.common.collect.Sets;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -315,6 +317,42 @@ public class SSTableReaderTest extends SchemaLoader
assert reopened.first.token instanceof LocalToken;
}
+ @Test
+ public void testGetPositionsForRangesFromTableOpenedForBulkLoading() throws IOException, ExecutionException, InterruptedException
+ {
+ Table table = Table.open("Keyspace1");
+ ColumnFamilyStore store = table.getColumnFamilyStore("Standard2");
+
+ // insert data and compact to a single sstable. The
+ // number of keys inserted is greater than index_interval
+ // to ensure multiple segments in the index file
+ CompactionManager.instance.disableAutoCompaction();
+ for (int j = 0; j < 130; j++)
+ {
+ ByteBuffer key = ByteBufferUtil.bytes(String.valueOf(j));
+ RowMutation rm = new RowMutation("Keyspace1", key);
+ rm.add(new QueryPath("Standard2", null, ByteBufferUtil.bytes("0")), ByteBufferUtil.EMPTY_BYTE_BUFFER, j);
+ rm.apply();
+ }
+ store.forceBlockingFlush();
+ CompactionManager.instance.performMaximal(store);
+
+ // construct a range which is present in the sstable, but whose
+ // keys are not found in the first segment of the index.
+ List<Range<Token>> ranges = new ArrayList<Range<Token>>();
+ ranges.add(new Range<Token>(t(98), t(99)));
+
+ SSTableReader sstable = store.getSSTables().iterator().next();
+ List<Pair<Long,Long>> sections = sstable.getPositionsForRanges(ranges);
+ assert sections.size() == 1 : "Expected to find range in sstable" ;
+
+ // re-open the same sstable as it would be during bulk loading
+ Set<Component> components = Sets.newHashSet(Component.DATA, Component.PRIMARY_INDEX);
+ SSTableReader bulkLoaded = SSTableReader.openForBatch(sstable.descriptor, components, sstable.partitioner);
+ sections = bulkLoaded.getPositionsForRanges(ranges);
+ assert sections.size() == 1 : "Expected to find range in sstable opened for bulk loading";
+ }
+
private void assertIndexQueryWorks(ColumnFamilyStore indexedCFS) throws IOException
{
assert "Indexed1".equals(indexedCFS.getColumnFamilyName());