You are viewing a plain-text version of this content. The canonical link to it (a hyperlink in the original archive page) was lost in the plain-text conversion.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/06/22 04:32:30 UTC
svn commit: r1138282 - in /cassandra/trunk/src/java/org/apache/cassandra:
db/compaction/CompactionManager.java io/sstable/IndexWriter.java
io/sstable/Rebuilder.java io/sstable/SSTableWriter.java
Author: jbellis
Date: Wed Jun 22 02:32:30 2011
New Revision: 1138282
URL: http://svn.apache.org/viewvc?rev=1138282&view=rev
Log:
revert r1138084
Removed:
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/IndexWriter.java
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/Rebuilder.java
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java?rev=1138282&r1=1138281&r2=1138282&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java Wed Jun 22 02:32:30 2011
@@ -832,7 +832,7 @@ public class CompactionManager implement
public Future<SSTableReader> submitSSTableBuild(final Descriptor desc, OperationType type)
{
// invalid descriptions due to missing or dropped CFS are handled by SSTW and StreamInSession.
- final Rebuilder builder = SSTableWriter.createBuilder(desc, type);
+ final SSTableWriter.Builder builder = SSTableWriter.createBuilder(desc, type);
Callable<SSTableReader> callable = new Callable<SSTableReader>()
{
public SSTableReader call() throws IOException
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java?rev=1138282&r1=1138281&r2=1138282&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java Wed Jun 22 02:32:30 2011
@@ -19,9 +19,7 @@
package org.apache.cassandra.io.sstable;
-import java.io.File;
-import java.io.IOError;
-import java.io.IOException;
+import java.io.*;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
@@ -29,6 +27,10 @@ import java.util.HashSet;
import java.util.Set;
import com.google.common.collect.Sets;
+
+import org.apache.cassandra.db.commitlog.ReplayPosition;
+import org.apache.cassandra.db.compaction.*;
+import org.apache.cassandra.utils.ByteBufferUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,10 +39,7 @@ import org.apache.cassandra.config.Datab
import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.commitlog.ReplayPosition;
-import org.apache.cassandra.db.compaction.AbstractCompactedRow;
-import org.apache.cassandra.db.compaction.CompactionController;
-import org.apache.cassandra.db.compaction.PrecompactedRow;
+import org.apache.cassandra.db.Table;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.io.util.BufferedRandomAccessFile;
import org.apache.cassandra.io.util.FileMark;
@@ -48,7 +47,7 @@ import org.apache.cassandra.io.util.File
import org.apache.cassandra.io.util.SegmentedFile;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.streaming.OperationType;
-import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.BloomFilter;
import org.apache.cassandra.utils.EstimatedHistogram;
import org.apache.cassandra.utils.FBUtilities;
@@ -233,7 +232,7 @@ public class SSTableWriter extends SSTab
return dataFile.getFilePointer();
}
- public static Rebuilder createBuilder(Descriptor desc, OperationType type)
+ public static Builder createBuilder(Descriptor desc, OperationType type)
{
if (!desc.isLatestVersion)
// TODO: streaming between different versions will fail: need support for
@@ -241,7 +240,83 @@ public class SSTableWriter extends SSTab
throw new RuntimeException(String.format("Cannot recover SSTable with version %s (current version %s).",
desc.version, Descriptor.CURRENT_VERSION));
- return new Rebuilder(desc, type);
+ return new Builder(desc, type);
+ }
+
+ /**
+ * Removes the given SSTable from temporary status and opens it, rebuilding the
+ * bloom filter and row index from the data file.
+ */
+ public static class Builder implements CompactionInfo.Holder
+ {
+ private final Descriptor desc;
+ private final OperationType type;
+ private final ColumnFamilyStore cfs;
+ private RowIndexer indexer;
+
+ public Builder(Descriptor desc, OperationType type)
+ {
+ this.desc = desc;
+ this.type = type;
+ cfs = Table.open(desc.ksname).getColumnFamilyStore(desc.cfname);
+ }
+
+ public CompactionInfo getCompactionInfo()
+ {
+ maybeOpenIndexer();
+ try
+ {
+ // both file offsets are still valid post-close
+ return new CompactionInfo(desc.ksname,
+ desc.cfname,
+ CompactionType.SSTABLE_BUILD,
+ indexer.dfile.getFilePointer(),
+ indexer.dfile.length());
+ }
+ catch (IOException e)
+ {
+ throw new IOError(e);
+ }
+ }
+
+ // lazy-initialize the file to avoid opening it until it's actually executing on the CompactionManager,
+ // since the 8MB buffers can use up heap quickly
+ private void maybeOpenIndexer()
+ {
+ if (indexer != null)
+ return;
+ try
+ {
+ if (cfs.metadata.getDefaultValidator().isCommutative())
+ indexer = new CommutativeRowIndexer(desc, cfs, type);
+ else
+ indexer = new RowIndexer(desc, cfs, type);
+ }
+ catch (IOException e)
+ {
+ throw new IOError(e);
+ }
+ }
+
+ public SSTableReader build() throws IOException
+ {
+ if (cfs.isInvalid())
+ return null;
+ maybeOpenIndexer();
+
+ File ifile = new File(desc.filenameFor(SSTable.COMPONENT_INDEX));
+ File ffile = new File(desc.filenameFor(SSTable.COMPONENT_FILTER));
+ assert !ifile.exists();
+ assert !ffile.exists();
+
+ long estimatedRows = indexer.prepareIndexing();
+
+ // build the index and filter
+ long rows = indexer.index();
+
+ logger.debug("estimated row count was {} of real count", ((double)estimatedRows) / rows);
+ return SSTableReader.open(rename(desc, SSTable.componentsFor(desc, false)));
+ }
}
static class RowIndexer
@@ -465,4 +540,75 @@ public class SSTableWriter extends SSTab
}
}
+ /**
+ * Encapsulates writing the index and filter for an SSTable. The state of this object is not valid until it has been closed.
+ */
+ static class IndexWriter
+ {
+ private final BufferedRandomAccessFile indexFile;
+ public final Descriptor desc;
+ public final IPartitioner partitioner;
+ public final SegmentedFile.Builder builder;
+ public final IndexSummary summary;
+ public final BloomFilter bf;
+ private FileMark mark;
+
+ IndexWriter(Descriptor desc, IPartitioner part, long keyCount) throws IOException
+ {
+ this.desc = desc;
+ this.partitioner = part;
+ indexFile = new BufferedRandomAccessFile(new File(desc.filenameFor(SSTable.COMPONENT_INDEX)), "rw", 8 * 1024 * 1024, true);
+ builder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode());
+ summary = new IndexSummary(keyCount);
+ bf = BloomFilter.getFilter(keyCount, 15);
+ }
+
+ public void afterAppend(DecoratedKey key, long dataPosition) throws IOException
+ {
+ bf.add(key.key);
+ long indexPosition = indexFile.getFilePointer();
+ ByteBufferUtil.writeWithShortLength(key.key, indexFile);
+ indexFile.writeLong(dataPosition);
+ if (logger.isTraceEnabled())
+ logger.trace("wrote index of " + key + " at " + indexPosition);
+
+ summary.maybeAddEntry(key, indexPosition);
+ builder.addPotentialBoundary(indexPosition);
+ }
+
+ /**
+ * Closes the index and bloomfilter, making the public state of this writer valid for consumption.
+ */
+ public void close() throws IOException
+ {
+ // bloom filter
+ FileOutputStream fos = new FileOutputStream(desc.filenameFor(SSTable.COMPONENT_FILTER));
+ DataOutputStream stream = new DataOutputStream(fos);
+ BloomFilter.serializer().serialize(bf, stream);
+ stream.flush();
+ fos.getFD().sync();
+ stream.close();
+
+ // index
+ long position = indexFile.getFilePointer();
+ indexFile.close(); // calls force
+ FileUtils.truncate(indexFile.getPath(), position);
+
+ // finalize in-memory index state
+ summary.complete();
+ }
+
+ public void mark()
+ {
+ mark = indexFile.mark();
+ }
+
+ public void reset() throws IOException
+ {
+ // we can't un-set the bloom filter addition, but extra keys in there are harmless.
+ // we can't reset dbuilder either, but that is the last thing called in afterappend so
+ // we assume that if that worked then we won't be trying to reset.
+ indexFile.reset(mark);
+ }
+ }
}