You are viewing a plain-text version of this content. The canonical link for it (a hyperlink in the original page) was not preserved in this copy.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2013/06/02 01:30:11 UTC
[2/4] git commit: Reduce SSTableLoader memory usage patch by jbellis;
reviewed by dbrosius for CASSANDRA-5555
Reduce SSTableLoader memory usage
patch by jbellis; reviewed by dbrosius for CASSANDRA-5555
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/079ae68f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/079ae68f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/079ae68f
Branch: refs/heads/trunk
Commit: 079ae68fb7086259439491e6a10bc2d8a947f52c
Parents: 2f72f8b
Author: Jonathan Ellis <jb...@apache.org>
Authored: Thu May 30 23:14:40 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Thu May 30 23:14:40 2013 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/db/ColumnFamilyStore.java | 2 +-
.../apache/cassandra/io/sstable/SSTableLoader.java | 2 +-
.../apache/cassandra/io/sstable/SSTableReader.java | 150 ++++++++++-----
4 files changed, 108 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/079ae68f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 9d53d17..a746c09 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
1.2.6
+ * Reduce SSTableLoader memory usage (CASSANDRA-5555)
* Scale hinted_handoff_throttle_in_kb to cluster size (CASSANDRA-5272)
* (Hadoop) Fix InputKeyRange in CFIF (CASSANDRA-5536)
* Fix dealing with ridiculously large max sstable sizes in LCS (CASSANDRA-5589)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/079ae68f/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 429859e..81ced05 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -232,7 +232,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
if (loadSSTables)
{
Directories.SSTableLister sstableFiles = directories.sstableLister().skipTemporary(true);
- Collection<SSTableReader> sstables = SSTableReader.batchOpen(sstableFiles.list().entrySet(), metadata, this.partitioner);
+ Collection<SSTableReader> sstables = SSTableReader.openAll(sstableFiles.list().entrySet(), metadata, this.partitioner);
if (metadata.getDefaultValidator().isCommutative())
{
// Filter non-compacted sstables, remove compacted ones
http://git-wip-us.apache.org/repos/asf/cassandra/blob/079ae68f/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
index 9965138..67c6a02 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
@@ -95,7 +95,7 @@ public class SSTableLoader
try
{
- sstables.add(SSTableReader.open(desc, components, null, client.getPartitioner()));
+ sstables.add(SSTableReader.openForBatch(desc, components, client.getPartitioner()));
}
catch (IOException e)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/079ae68f/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index ea9c451..574465d 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -24,7 +24,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.*;
-import com.google.common.primitives.Longs;
import com.google.common.util.concurrent.RateLimiter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -154,41 +153,33 @@ public class SSTableReader extends SSTable
return open(descriptor, components, metadata, partitioner, true);
}
+ public static SSTableReader openForBatch(Descriptor descriptor, Set<Component> components, IPartitioner partitioner) throws IOException
+ {
+ SSTableMetadata sstableMetadata = openMetadata(descriptor, components, partitioner);
+ SSTableReader sstable = new SSTableReader(descriptor,
+ components,
+ null,
+ partitioner,
+ System.currentTimeMillis(),
+ sstableMetadata);
+ sstable.bf = new AlwaysPresentFilter();
+ sstable.loadForBatch();
+ return sstable;
+ }
+
private static SSTableReader open(Descriptor descriptor,
Set<Component> components,
CFMetaData metadata,
IPartitioner partitioner,
boolean validate) throws IOException
{
- assert partitioner != null;
- // Minimum components without which we can't do anything
- assert components.contains(Component.DATA) : "Data component is missing for sstable" + descriptor;
- assert components.contains(Component.PRIMARY_INDEX) : "Primary index component is missing for sstable " + descriptor;
-
long start = System.currentTimeMillis();
- logger.info("Opening {} ({} bytes)", descriptor, new File(descriptor.filenameFor(COMPONENT_DATA)).length());
-
- SSTableMetadata sstableMetadata = SSTableMetadata.serializer.deserialize(descriptor);
-
- // Check if sstable is created using same partitioner.
- // Partitioner can be null, which indicates older version of sstable or no stats available.
- // In that case, we skip the check.
- String partitionerName = partitioner.getClass().getCanonicalName();
- if (sstableMetadata.partitioner != null && !partitionerName.equals(sstableMetadata.partitioner))
- {
- logger.error(String.format("Cannot open %s; partitioner %s does not match system partitioner %s. Note that the default partitioner starting with Cassandra 1.2 is Murmur3Partitioner, so you will need to edit that to match your old partitioner if upgrading.",
- descriptor, sstableMetadata.partitioner, partitionerName));
- System.exit(1);
- }
+ SSTableMetadata sstableMetadata = openMetadata(descriptor, components, partitioner);
SSTableReader sstable = new SSTableReader(descriptor,
components,
metadata,
partitioner,
- null,
- null,
- null,
- null,
System.currentTimeMillis(),
sstableMetadata);
// versions before 'c' encoded keys as utf-16 before hashing to the filter
@@ -214,6 +205,30 @@ public class SSTableReader extends SSTable
return sstable;
}
+ private static SSTableMetadata openMetadata(Descriptor descriptor, Set<Component> components, IPartitioner partitioner) throws IOException
+ {
+ assert partitioner != null;
+ // Minimum components without which we can't do anything
+ assert components.contains(Component.DATA) : "Data component is missing for sstable" + descriptor;
+ assert components.contains(Component.PRIMARY_INDEX) : "Primary index component is missing for sstable " + descriptor;
+
+ logger.info("Opening {} ({} bytes)", descriptor, new File(descriptor.filenameFor(COMPONENT_DATA)).length());
+
+ SSTableMetadata sstableMetadata = SSTableMetadata.serializer.deserialize(descriptor);
+
+ // Check if sstable is created using same partitioner.
+ // Partitioner can be null, which indicates older version of sstable or no stats available.
+ // In that case, we skip the check.
+ String partitionerName = partitioner.getClass().getCanonicalName();
+ if (sstableMetadata.partitioner != null && !partitionerName.equals(sstableMetadata.partitioner))
+ {
+ logger.error(String.format("Cannot open %s; partitioner %s does not match system partitioner %s. Note that the default partitioner starting with Cassandra 1.2 is Murmur3Partitioner, so you will need to edit that to match your old partitioner if upgrading.",
+ descriptor, sstableMetadata.partitioner, partitionerName));
+ System.exit(1);
+ }
+ return sstableMetadata;
+ }
+
public static void logOpenException(Descriptor descriptor, IOException e)
{
if (e instanceof FileNotFoundException)
@@ -222,9 +237,9 @@ public class SSTableReader extends SSTable
logger.error("Corrupt sstable " + descriptor + "; skipped", e);
}
- public static Collection<SSTableReader> batchOpen(Set<Map.Entry<Descriptor, Set<Component>>> entries,
- final CFMetaData metadata,
- final IPartitioner partitioner)
+ public static Collection<SSTableReader> openAll(Set<Map.Entry<Descriptor, Set<Component>>> entries,
+ final CFMetaData metadata,
+ final IPartitioner partitioner)
{
final Collection<SSTableReader> sstables = new LinkedBlockingQueue<SSTableReader>();
@@ -295,6 +310,20 @@ public class SSTableReader extends SSTable
Set<Component> components,
CFMetaData metadata,
IPartitioner partitioner,
+ long maxDataAge,
+ SSTableMetadata sstableMetadata)
+ {
+ super(desc, components, metadata, partitioner);
+ this.sstableMetadata = sstableMetadata;
+ this.maxDataAge = maxDataAge;
+
+ this.deletingTask = new SSTableDeletingTask(this);
+ }
+
+ private SSTableReader(Descriptor desc,
+ Set<Component> components,
+ CFMetaData metadata,
+ IPartitioner partitioner,
SegmentedFile ifile,
SegmentedFile dfile,
IndexSummary indexSummary,
@@ -302,15 +331,12 @@ public class SSTableReader extends SSTable
long maxDataAge,
SSTableMetadata sstableMetadata)
{
- super(desc, components, metadata, partitioner);
- this.sstableMetadata = sstableMetadata;
- this.maxDataAge = maxDataAge;
+ this(desc, components, metadata, partitioner, maxDataAge, sstableMetadata);
this.ifile = ifile;
this.dfile = dfile;
this.indexSummary = indexSummary;
this.bf = bloomFilter;
- this.deletingTask = new SSTableDeletingTask(this);
}
public void setTrackedBy(DataTracker tracker)
@@ -349,16 +375,56 @@ public class SSTableReader extends SSTable
{
SegmentedFile.Builder ibuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode());
SegmentedFile.Builder dbuilder = compression
- ? SegmentedFile.getCompressedBuilder()
- : SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
+ ? SegmentedFile.getCompressedBuilder()
+ : SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
+
+ boolean summaryLoaded = loadSummary(this, ibuilder, dbuilder);
+ if (recreatebloom || !summaryLoaded)
+ buildSummary(recreatebloom, ibuilder, dbuilder, summaryLoaded);
+
+ ifile = ibuilder.complete(descriptor.filenameFor(Component.PRIMARY_INDEX));
+ dfile = dbuilder.complete(descriptor.filenameFor(Component.DATA));
+ if (recreatebloom || !summaryLoaded) // save summary information to disk
+ saveSummary(this, ibuilder, dbuilder);
+ }
+
+ /**
+ * A simplified load that creates a minimal partition index
+ */
+ private void loadForBatch() throws IOException
+ {
+ // force buffered i/o in non-compressed mode so we don't need to worry about mmap segments
+ SegmentedFile.Builder ibuilder = new BufferedSegmentedFile.Builder();
+ SegmentedFile.Builder dbuilder = compression
+ ? SegmentedFile.getCompressedBuilder()
+ : new BufferedSegmentedFile.Builder();
+
+ // build a bare-bones IndexSummary
+ IndexSummaryBuilder summaryBuilder = new IndexSummaryBuilder(1);
+ RandomAccessReader in = RandomAccessReader.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)), true);
+ try
+ {
+ ByteBuffer key = ByteBufferUtil.readWithShortLength(in);
+ first = decodeKey(partitioner, descriptor, key);
+ summaryBuilder.maybeAddEntry(first, 0);
+ indexSummary = summaryBuilder.build(partitioner);
+ }
+ finally
+ {
+ FileUtils.closeQuietly(in);
+ }
+
+ last = null; // shouldn't need this for batch operations
+
+ ifile = ibuilder.complete(descriptor.filenameFor(Component.PRIMARY_INDEX));
+ dfile = dbuilder.complete(descriptor.filenameFor(Component.DATA));
+ }
+
+ private void buildSummary(boolean recreatebloom, SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder, boolean summaryLoaded) throws IOException
+ {
// we read the positions in a BRAF so we don't have to worry about an entry spanning a mmap boundary.
RandomAccessReader primaryIndex = RandomAccessReader.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)), true);
-
- // try to load summaries from the disk and check if we need
- // to read primary index because we should re-create a BloomFilter or pre-load KeyCache
- final boolean summaryLoaded = loadSummary(this, ibuilder, dbuilder);
- final boolean readIndex = recreatebloom || !summaryLoaded;
try
{
long indexSize = primaryIndex.length();
@@ -374,7 +440,7 @@ public class SSTableReader extends SSTable
summaryBuilder = new IndexSummaryBuilder(estimatedKeys);
long indexPosition;
- while (readIndex && (indexPosition = primaryIndex.getFilePointer()) != indexSize)
+ while ((indexPosition = primaryIndex.getFilePointer()) != indexSize)
{
ByteBuffer key = ByteBufferUtil.readWithShortLength(primaryIndex);
RowIndexEntry indexEntry = RowIndexEntry.serializer.deserialize(primaryIndex, descriptor.version);
@@ -405,12 +471,6 @@ public class SSTableReader extends SSTable
first = getMinimalKey(first);
last = getMinimalKey(last);
- // finalize the load.
- // finalize the state of the reader
- ifile = ibuilder.complete(descriptor.filenameFor(Component.PRIMARY_INDEX));
- dfile = dbuilder.complete(descriptor.filenameFor(Component.DATA));
- if (readIndex) // save summary information to disk
- saveSummary(this, ibuilder, dbuilder);
}
public static boolean loadSummary(SSTableReader reader, SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder)