You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/06/22 04:32:30 UTC

svn commit: r1138282 - in /cassandra/trunk/src/java/org/apache/cassandra: db/compaction/CompactionManager.java io/sstable/IndexWriter.java io/sstable/Rebuilder.java io/sstable/SSTableWriter.java

Author: jbellis
Date: Wed Jun 22 02:32:30 2011
New Revision: 1138282

URL: http://svn.apache.org/viewvc?rev=1138282&view=rev
Log:
revert r1138084

Removed:
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/IndexWriter.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/Rebuilder.java
Modified:
    cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java?rev=1138282&r1=1138281&r2=1138282&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java Wed Jun 22 02:32:30 2011
@@ -832,7 +832,7 @@ public class CompactionManager implement
     public Future<SSTableReader> submitSSTableBuild(final Descriptor desc, OperationType type)
     {
         // invalid descriptions due to missing or dropped CFS are handled by SSTW and StreamInSession.
-        final Rebuilder builder = SSTableWriter.createBuilder(desc, type);
+        final SSTableWriter.Builder builder = SSTableWriter.createBuilder(desc, type);
         Callable<SSTableReader> callable = new Callable<SSTableReader>()
         {
             public SSTableReader call() throws IOException

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java?rev=1138282&r1=1138281&r2=1138282&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java Wed Jun 22 02:32:30 2011
@@ -19,9 +19,7 @@
 
 package org.apache.cassandra.io.sstable;
 
-import java.io.File;
-import java.io.IOError;
-import java.io.IOException;
+import java.io.*;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Collections;
@@ -29,6 +27,10 @@ import java.util.HashSet;
 import java.util.Set;
 
 import com.google.common.collect.Sets;
+
+import org.apache.cassandra.db.commitlog.ReplayPosition;
+import org.apache.cassandra.db.compaction.*;
+import org.apache.cassandra.utils.ByteBufferUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -37,10 +39,7 @@ import org.apache.cassandra.config.Datab
 import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.commitlog.ReplayPosition;
-import org.apache.cassandra.db.compaction.AbstractCompactedRow;
-import org.apache.cassandra.db.compaction.CompactionController;
-import org.apache.cassandra.db.compaction.PrecompactedRow;
+import org.apache.cassandra.db.Table;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.io.util.BufferedRandomAccessFile;
 import org.apache.cassandra.io.util.FileMark;
@@ -48,7 +47,7 @@ import org.apache.cassandra.io.util.File
 import org.apache.cassandra.io.util.SegmentedFile;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.streaming.OperationType;
-import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.BloomFilter;
 import org.apache.cassandra.utils.EstimatedHistogram;
 import org.apache.cassandra.utils.FBUtilities;
 
@@ -233,7 +232,7 @@ public class SSTableWriter extends SSTab
         return dataFile.getFilePointer();
     }
     
-    public static Rebuilder createBuilder(Descriptor desc, OperationType type)
+    public static Builder createBuilder(Descriptor desc, OperationType type)
     {
         if (!desc.isLatestVersion)
             // TODO: streaming between different versions will fail: need support for
@@ -241,7 +240,83 @@ public class SSTableWriter extends SSTab
             throw new RuntimeException(String.format("Cannot recover SSTable with version %s (current version %s).",
                                                      desc.version, Descriptor.CURRENT_VERSION));
 
-        return new Rebuilder(desc, type);
+        return new Builder(desc, type);
+    }
+
+    /**
+     * Removes the given SSTable from temporary status and opens it, rebuilding the
+     * bloom filter and row index from the data file; build progress is exposed via getCompactionInfo().
+     */
+    public static class Builder implements CompactionInfo.Holder
+    {
+        private final Descriptor desc;
+        private final OperationType type;
+        private final ColumnFamilyStore cfs;
+        private RowIndexer indexer; // created lazily by maybeOpenIndexer()
+
+        public Builder(Descriptor desc, OperationType type)
+        {
+            this.desc = desc;
+            this.type = type;
+            cfs = Table.open(desc.ksname).getColumnFamilyStore(desc.cfname);
+        }
+
+        public CompactionInfo getCompactionInfo()
+        {
+            maybeOpenIndexer(); // NOTE(review): opens the indexer (and its file) if progress is polled before build() runs
+            try
+            {
+                // both file offsets are still valid post-close
+                return new CompactionInfo(desc.ksname,
+                                          desc.cfname,
+                                          CompactionType.SSTABLE_BUILD,
+                                          indexer.dfile.getFilePointer(),
+                                          indexer.dfile.length());
+            }
+            catch (IOException e)
+            {
+                throw new IOError(e);
+            }
+        }
+
+        // lazy-initialize the file to avoid opening it until it's actually executing on the CompactionManager,
+        // since the 8MB buffers can use up heap quickly
+        private void maybeOpenIndexer()
+        {
+            if (indexer != null)
+                return;
+            try
+            {
+                if (cfs.metadata.getDefaultValidator().isCommutative()) // commutative (presumably counter) CFs use CommutativeRowIndexer
+                    indexer = new CommutativeRowIndexer(desc, cfs, type);
+                else
+                    indexer = new RowIndexer(desc, cfs, type);
+            }
+            catch (IOException e)
+            {
+                throw new IOError(e);
+            }
+        }
+
+        public SSTableReader build() throws IOException
+        {
+            if (cfs.isInvalid()) // presumably the CF was dropped; nothing to build, caller receives null
+                return null;
+            maybeOpenIndexer();
+
+            File ifile = new File(desc.filenameFor(SSTable.COMPONENT_INDEX));
+            File ffile = new File(desc.filenameFor(SSTable.COMPONENT_FILTER));
+            assert !ifile.exists(); // index and filter are regenerated below and must not already exist
+            assert !ffile.exists();
+
+            long estimatedRows = indexer.prepareIndexing();
+
+            // build the index and filter
+            long rows = indexer.index();
+
+            logger.debug("estimated row count was {} of real count", ((double)estimatedRows) / rows); // ratio estimated/actual; Infinity or NaN when rows == 0
+            return SSTableReader.open(rename(desc, SSTable.componentsFor(desc, false)));
+        }
     }
 
     static class RowIndexer
@@ -465,4 +540,75 @@ public class SSTableWriter extends SSTab
         }
     }
 
+    /**
+     * Encapsulates writing the index and filter for an SSTable. The public state of this object is not valid until close() has been called.
+     */
+    static class IndexWriter
+    {
+        private final BufferedRandomAccessFile indexFile;
+        public final Descriptor desc;
+        public final IPartitioner partitioner;
+        public final SegmentedFile.Builder builder;
+        public final IndexSummary summary;
+        public final BloomFilter bf;
+        private FileMark mark; // set by mark(), consumed by reset()
+
+        IndexWriter(Descriptor desc, IPartitioner part, long keyCount) throws IOException
+        {
+            this.desc = desc;
+            this.partitioner = part;
+            indexFile = new BufferedRandomAccessFile(new File(desc.filenameFor(SSTable.COMPONENT_INDEX)), "rw", 8 * 1024 * 1024, true);
+            builder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode());
+            summary = new IndexSummary(keyCount);
+            bf = BloomFilter.getFilter(keyCount, 15); // NOTE(review): 15 is presumably buckets per element; confirm against BloomFilter.getFilter
+        }
+
+        public void afterAppend(DecoratedKey key, long dataPosition) throws IOException
+        {
+            bf.add(key.key);
+            long indexPosition = indexFile.getFilePointer();
+            ByteBufferUtil.writeWithShortLength(key.key, indexFile); // index entry: the key, then its position in the data file
+            indexFile.writeLong(dataPosition);
+            if (logger.isTraceEnabled())
+                logger.trace("wrote index of " + key + " at " + indexPosition);
+
+            summary.maybeAddEntry(key, indexPosition);
+            builder.addPotentialBoundary(indexPosition); // must stay the last step: reset() cannot undo it
+        }
+
+        /**
+         * Writes and syncs the bloom filter, closes and truncates the index file, and completes the summary, making the public state of this writer valid for consumption.
+         */
+        public void close() throws IOException
+        {
+            // bloom filter
+            FileOutputStream fos = new FileOutputStream(desc.filenameFor(SSTable.COMPONENT_FILTER)); // NOTE(review): not closed if serialize/flush/sync throws; consider try/finally
+            DataOutputStream stream = new DataOutputStream(fos);
+            BloomFilter.serializer().serialize(bf, stream);
+            stream.flush();
+            fos.getFD().sync();
+            stream.close();
+
+            // index
+            long position = indexFile.getFilePointer();
+            indexFile.close(); // calls force
+            FileUtils.truncate(indexFile.getPath(), position); // presumably drops bytes past the last write position (e.g. left behind after reset())
+
+            // finalize in-memory index state
+            summary.complete();
+        }
+
+        public void mark()
+        {
+            mark = indexFile.mark(); // lets reset() rewind the index file after a failed append
+        }
+
+        public void reset() throws IOException
+        {
+            // we can't un-set the bloom filter addition, but extra keys in there are harmless.
+            // we can't reset builder either, but addPotentialBoundary is the last thing called in afterAppend, so
+            // we assume that if that worked then we won't be trying to reset.
+            indexFile.reset(mark);
+        }
+    }
 }