You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/06/21 18:46:40 UTC
svn commit: r1138084 - in /cassandra/trunk/src/java/org/apache/cassandra:
db/compaction/CompactionManager.java io/sstable/IndexWriter.java
io/sstable/Rebuilder.java io/sstable/SSTableWriter.java
Author: jbellis
Date: Tue Jun 21 16:46:40 2011
New Revision: 1138084
URL: http://svn.apache.org/viewvc?rev=1138084&view=rev
Log:
extract indexwriter, builder -> rebuilder
Added:
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/IndexWriter.java
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/Rebuilder.java
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java?rev=1138084&r1=1138083&r2=1138084&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java Tue Jun 21 16:46:40 2011
@@ -832,7 +832,7 @@ public class CompactionManager implement
public Future<SSTableReader> submitSSTableBuild(final Descriptor desc, OperationType type)
{
// invalid descriptions due to missing or dropped CFS are handled by SSTW and StreamInSession.
- final SSTableWriter.Builder builder = SSTableWriter.createBuilder(desc, type);
+ final Rebuilder builder = SSTableWriter.createBuilder(desc, type);
Callable<SSTableReader> callable = new Callable<SSTableReader>()
{
public SSTableReader call() throws IOException
Added: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/IndexWriter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/IndexWriter.java?rev=1138084&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/IndexWriter.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/IndexWriter.java Tue Jun 21 16:46:40 2011
@@ -0,0 +1,93 @@
+package org.apache.cassandra.io.sstable;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.io.util.BufferedRandomAccessFile;
+import org.apache.cassandra.io.util.FileMark;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.io.util.SegmentedFile;
+import org.apache.cassandra.utils.BloomFilter;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+/**
+ * Encapsulates writing the primary index and bloom filter for an SSTable.
+ *
+ * For every appended row, {@link #afterAppend} adds the key to the bloom filter, writes a
+ * (key, data position) entry to the index file, and feeds the index summary and the
+ * segmented-file builder. The public state of this object is not valid until {@link #close}
+ * has been called.
+ */
+class IndexWriter
+{
+    // named after this class (not SSTableWriter) so trace output is attributed to the right component
+    private static final Logger logger = LoggerFactory.getLogger(IndexWriter.class);
+
+    private final BufferedRandomAccessFile indexFile;
+    public final Descriptor desc;
+    public final IPartitioner partitioner;
+    public final SegmentedFile.Builder builder;
+    public final IndexSummary summary;
+    public final BloomFilter bf;
+    // position saved by mark(); null until mark() is first called
+    private FileMark mark;
+
+    /**
+     * Opens the index component for writing and allocates the in-memory summary and bloom filter.
+     *
+     * @param desc descriptor of the SSTable whose index and filter components are written
+     * @param part partitioner of the SSTable
+     * @param keyCount key count handed to IndexSummary and BloomFilter.getFilter
+     *                 (presumably an estimate used for sizing -- confirm against callers)
+     * @throws IOException if the index file cannot be opened
+     */
+    IndexWriter(Descriptor desc, IPartitioner part, long keyCount) throws IOException
+    {
+        this.desc = desc;
+        this.partitioner = part;
+        indexFile = new BufferedRandomAccessFile(new File(desc.filenameFor(SSTable.COMPONENT_INDEX)), "rw", 8 * 1024 * 1024, true);
+        builder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode());
+        summary = new IndexSummary(keyCount);
+        bf = BloomFilter.getFilter(keyCount, 15);
+    }
+
+    /**
+     * Records a row that has just been appended to the data file.
+     *
+     * @param key decorated key of the appended row
+     * @param dataPosition position of the row in the data file (written verbatim into the index entry)
+     * @throws IOException if the index entry cannot be written
+     */
+    public void afterAppend(DecoratedKey key, long dataPosition) throws IOException
+    {
+        bf.add(key.key);
+        long indexPosition = indexFile.getFilePointer();
+        // index entry: short-length-prefixed key followed by the data position as a long
+        ByteBufferUtil.writeWithShortLength(key.key, indexFile);
+        indexFile.writeLong(dataPosition);
+        if (logger.isTraceEnabled())
+            logger.trace("wrote index of " + key + " at " + indexPosition);
+
+        summary.maybeAddEntry(key, indexPosition);
+        // keep this the last call: reset() cannot undo it
+        builder.addPotentialBoundary(indexPosition);
+    }
+
+    /**
+     * Closes the index and bloomfilter, making the public state of this writer valid for consumption.
+     *
+     * The bloom filter is serialized, flushed and fsynced before its stream is closed. The stream
+     * is closed even if serialization or the sync fails, so the file descriptor is never leaked.
+     *
+     * @throws IOException if writing, syncing, closing or truncating either component fails
+     */
+    public void close() throws IOException
+    {
+        // bloom filter
+        FileOutputStream fos = new FileOutputStream(desc.filenameFor(SSTable.COMPONENT_FILTER));
+        DataOutputStream stream = new DataOutputStream(fos);
+        try
+        {
+            BloomFilter.serializer().serialize(bf, stream);
+            stream.flush();
+            fos.getFD().sync();
+        }
+        finally
+        {
+            // closing the DataOutputStream also closes the underlying FileOutputStream
+            stream.close();
+        }
+
+        // index
+        long position = indexFile.getFilePointer();
+        indexFile.close(); // calls force
+        // drop anything past the last written entry (e.g. space left behind by a reset())
+        FileUtils.truncate(indexFile.getPath(), position);
+
+        // finalize in-memory index state
+        summary.complete();
+    }
+
+    /**
+     * Saves the current index file position so a partially written entry can be rolled back with {@link #reset}.
+     */
+    public void mark()
+    {
+        mark = indexFile.mark();
+    }
+
+    /**
+     * Rewinds the index file to the position saved by the most recent {@link #mark} call.
+     * NOTE(review): assumes mark() was called first; otherwise a null mark is passed to the file.
+     *
+     * @throws IOException if the index file cannot be repositioned
+     */
+    public void reset() throws IOException
+    {
+        // we can't un-set the bloom filter addition, but extra keys in there are harmless.
+        // we can't reset the segmented-file builder either, but that is the last thing called in
+        // afterAppend, so we assume that if that worked then we won't be trying to reset.
+        // NOTE(review): summary.maybeAddEntry() is not undone here either.
+        indexFile.reset(mark);
+    }
+}
Added: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/Rebuilder.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/Rebuilder.java?rev=1138084&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/Rebuilder.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/Rebuilder.java Tue Jun 21 16:46:40 2011
@@ -0,0 +1,92 @@
+package org.apache.cassandra.io.sstable;
+
+import java.io.File;
+import java.io.IOError;
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.db.compaction.CompactionInfo;
+import org.apache.cassandra.db.compaction.CompactionType;
+import org.apache.cassandra.streaming.OperationType;
+
+/**
+ * Removes the given SSTable from temporary status and opens it, rebuilding the
+ * bloom filter and row index from the data file.
+ *
+ * Also implements {@link CompactionInfo.Holder} so the rebuild's progress through the
+ * data file can be reported while it runs.
+ */
+public class Rebuilder implements CompactionInfo.Holder
+{
+    // named after this class (not SSTableWriter) so log output is attributed to the right component
+    private static final Logger logger = LoggerFactory.getLogger(Rebuilder.class);
+
+    private final Descriptor desc;
+    private final OperationType type;
+    private final ColumnFamilyStore cfs;
+    // created lazily by maybeOpenIndexer(); null until first needed
+    private SSTableWriter.RowIndexer indexer;
+
+    /**
+     * @param desc descriptor of the temporary SSTable to rebuild
+     * @param type operation on whose behalf the rebuild runs; passed through to the row indexer
+     */
+    public Rebuilder(Descriptor desc, OperationType type)
+    {
+        this.desc = desc;
+        this.type = type;
+        cfs = Table.open(desc.ksname).getColumnFamilyStore(desc.cfname);
+    }
+
+    /**
+     * Reports progress as the current position within, and total length of, the data file
+     * being indexed. Opens the indexer if that has not happened yet.
+     *
+     * @throws IOError wrapping any IOException from opening or querying the data file
+     */
+    public CompactionInfo getCompactionInfo()
+    {
+        maybeOpenIndexer();
+        try
+        {
+            // both file offsets are still valid post-close
+            return new CompactionInfo(desc.ksname,
+                                      desc.cfname,
+                                      CompactionType.SSTABLE_BUILD,
+                                      indexer.dfile.getFilePointer(),
+                                      indexer.dfile.length());
+        }
+        catch (IOException e)
+        {
+            throw new IOError(e);
+        }
+    }
+
+    // lazy-initialize the file to avoid opening it until it's actually executing on the CompactionManager,
+    // since the 8MB buffers can use up heap quickly
+    // NOTE(review): not synchronized; assumes getCompactionInfo() and build() do not race on first use
+    private void maybeOpenIndexer()
+    {
+        if (indexer != null)
+            return;
+        try
+        {
+            // column families with a commutative default validator use the commutative indexer
+            if (cfs.metadata.getDefaultValidator().isCommutative())
+                indexer = new SSTableWriter.CommutativeRowIndexer(desc, cfs, type);
+            else
+                indexer = new SSTableWriter.RowIndexer(desc, cfs, type);
+        }
+        catch (IOException e)
+        {
+            throw new IOError(e);
+        }
+    }
+
+    /**
+     * Rebuilds the index and bloom filter from the data file, then renames the SSTable's
+     * components out of temporary status and opens the result.
+     *
+     * @return the opened reader, or null if the column family store has been invalidated
+     * @throws IOException if reading the data file or writing/renaming components fails
+     */
+    public SSTableReader build() throws IOException
+    {
+        if (cfs.isInvalid())
+            return null;
+        maybeOpenIndexer();
+
+        // the index and filter are produced by this rebuild, so neither may exist yet
+        File ifile = new File(desc.filenameFor(SSTable.COMPONENT_INDEX));
+        File ffile = new File(desc.filenameFor(SSTable.COMPONENT_FILTER));
+        assert !ifile.exists();
+        assert !ffile.exists();
+
+        long estimatedRows = indexer.prepareIndexing();
+
+        // build the index and filter
+        long rows = indexer.index();
+
+        // only compute the ratio when it will actually be logged
+        if (logger.isDebugEnabled())
+            logger.debug("estimated row count was {} of real count", ((double)estimatedRows) / rows);
+        return SSTableReader.open(SSTableWriter.rename(desc, SSTable.componentsFor(desc, false)));
+    }
+}
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java?rev=1138084&r1=1138083&r2=1138084&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java Tue Jun 21 16:46:40 2011
@@ -19,7 +19,9 @@
package org.apache.cassandra.io.sstable;
-import java.io.*;
+import java.io.File;
+import java.io.IOError;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
@@ -27,10 +29,6 @@ import java.util.HashSet;
import java.util.Set;
import com.google.common.collect.Sets;
-
-import org.apache.cassandra.db.commitlog.ReplayPosition;
-import org.apache.cassandra.db.compaction.*;
-import org.apache.cassandra.utils.ByteBufferUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,7 +37,10 @@ import org.apache.cassandra.config.Datab
import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.Table;
+import org.apache.cassandra.db.commitlog.ReplayPosition;
+import org.apache.cassandra.db.compaction.AbstractCompactedRow;
+import org.apache.cassandra.db.compaction.CompactionController;
+import org.apache.cassandra.db.compaction.PrecompactedRow;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.io.util.BufferedRandomAccessFile;
import org.apache.cassandra.io.util.FileMark;
@@ -47,7 +48,7 @@ import org.apache.cassandra.io.util.File
import org.apache.cassandra.io.util.SegmentedFile;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.streaming.OperationType;
-import org.apache.cassandra.utils.BloomFilter;
+import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.EstimatedHistogram;
import org.apache.cassandra.utils.FBUtilities;
@@ -232,7 +233,7 @@ public class SSTableWriter extends SSTab
return dataFile.getFilePointer();
}
- public static Builder createBuilder(Descriptor desc, OperationType type)
+ public static Rebuilder createBuilder(Descriptor desc, OperationType type)
{
if (!desc.isLatestVersion)
// TODO: streaming between different versions will fail: need support for
@@ -240,83 +241,7 @@ public class SSTableWriter extends SSTab
throw new RuntimeException(String.format("Cannot recover SSTable with version %s (current version %s).",
desc.version, Descriptor.CURRENT_VERSION));
- return new Builder(desc, type);
- }
-
- /**
- * Removes the given SSTable from temporary status and opens it, rebuilding the
- * bloom filter and row index from the data file.
- */
- public static class Builder implements CompactionInfo.Holder
- {
- private final Descriptor desc;
- private final OperationType type;
- private final ColumnFamilyStore cfs;
- private RowIndexer indexer;
-
- public Builder(Descriptor desc, OperationType type)
- {
- this.desc = desc;
- this.type = type;
- cfs = Table.open(desc.ksname).getColumnFamilyStore(desc.cfname);
- }
-
- public CompactionInfo getCompactionInfo()
- {
- maybeOpenIndexer();
- try
- {
- // both file offsets are still valid post-close
- return new CompactionInfo(desc.ksname,
- desc.cfname,
- CompactionType.SSTABLE_BUILD,
- indexer.dfile.getFilePointer(),
- indexer.dfile.length());
- }
- catch (IOException e)
- {
- throw new IOError(e);
- }
- }
-
- // lazy-initialize the file to avoid opening it until it's actually executing on the CompactionManager,
- // since the 8MB buffers can use up heap quickly
- private void maybeOpenIndexer()
- {
- if (indexer != null)
- return;
- try
- {
- if (cfs.metadata.getDefaultValidator().isCommutative())
- indexer = new CommutativeRowIndexer(desc, cfs, type);
- else
- indexer = new RowIndexer(desc, cfs, type);
- }
- catch (IOException e)
- {
- throw new IOError(e);
- }
- }
-
- public SSTableReader build() throws IOException
- {
- if (cfs.isInvalid())
- return null;
- maybeOpenIndexer();
-
- File ifile = new File(desc.filenameFor(SSTable.COMPONENT_INDEX));
- File ffile = new File(desc.filenameFor(SSTable.COMPONENT_FILTER));
- assert !ifile.exists();
- assert !ffile.exists();
-
- long estimatedRows = indexer.prepareIndexing();
-
- // build the index and filter
- long rows = indexer.index();
-
- logger.debug("estimated row count was {} of real count", ((double)estimatedRows) / rows);
- return SSTableReader.open(rename(desc, SSTable.componentsFor(desc, false)));
- }
+ return new Rebuilder(desc, type);
}
static class RowIndexer
@@ -540,75 +465,4 @@ public class SSTableWriter extends SSTab
}
}
- /**
- * Encapsulates writing the index and filter for an SSTable. The state of this object is not valid until it has been closed.
- */
- static class IndexWriter
- {
- private final BufferedRandomAccessFile indexFile;
- public final Descriptor desc;
- public final IPartitioner partitioner;
- public final SegmentedFile.Builder builder;
- public final IndexSummary summary;
- public final BloomFilter bf;
- private FileMark mark;
-
- IndexWriter(Descriptor desc, IPartitioner part, long keyCount) throws IOException
- {
- this.desc = desc;
- this.partitioner = part;
- indexFile = new BufferedRandomAccessFile(new File(desc.filenameFor(SSTable.COMPONENT_INDEX)), "rw", 8 * 1024 * 1024, true);
- builder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode());
- summary = new IndexSummary(keyCount);
- bf = BloomFilter.getFilter(keyCount, 15);
- }
-
- public void afterAppend(DecoratedKey key, long dataPosition) throws IOException
- {
- bf.add(key.key);
- long indexPosition = indexFile.getFilePointer();
- ByteBufferUtil.writeWithShortLength(key.key, indexFile);
- indexFile.writeLong(dataPosition);
- if (logger.isTraceEnabled())
- logger.trace("wrote index of " + key + " at " + indexPosition);
-
- summary.maybeAddEntry(key, indexPosition);
- builder.addPotentialBoundary(indexPosition);
- }
-
- /**
- * Closes the index and bloomfilter, making the public state of this writer valid for consumption.
- */
- public void close() throws IOException
- {
- // bloom filter
- FileOutputStream fos = new FileOutputStream(desc.filenameFor(SSTable.COMPONENT_FILTER));
- DataOutputStream stream = new DataOutputStream(fos);
- BloomFilter.serializer().serialize(bf, stream);
- stream.flush();
- fos.getFD().sync();
- stream.close();
-
- // index
- long position = indexFile.getFilePointer();
- indexFile.close(); // calls force
- FileUtils.truncate(indexFile.getPath(), position);
-
- // finalize in-memory index state
- summary.complete();
- }
-
- public void mark()
- {
- mark = indexFile.mark();
- }
-
- public void reset() throws IOException
- {
- // we can't un-set the bloom filter addition, but extra keys in there are harmless.
- // we can't reset dbuilder either, but that is the last thing called in afterappend so
- // we assume that if that worked then we won't be trying to reset.
- indexFile.reset(mark);
- }
- }
}