You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/06/21 18:46:40 UTC

svn commit: r1138084 - in /cassandra/trunk/src/java/org/apache/cassandra: db/compaction/CompactionManager.java io/sstable/IndexWriter.java io/sstable/Rebuilder.java io/sstable/SSTableWriter.java

Author: jbellis
Date: Tue Jun 21 16:46:40 2011
New Revision: 1138084

URL: http://svn.apache.org/viewvc?rev=1138084&view=rev
Log:
extract indexwriter, builder -> rebuilder

Added:
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/IndexWriter.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/Rebuilder.java
Modified:
    cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java?rev=1138084&r1=1138083&r2=1138084&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java Tue Jun 21 16:46:40 2011
@@ -832,7 +832,7 @@ public class CompactionManager implement
     public Future<SSTableReader> submitSSTableBuild(final Descriptor desc, OperationType type)
     {
         // invalid descriptions due to missing or dropped CFS are handled by SSTW and StreamInSession.
-        final SSTableWriter.Builder builder = SSTableWriter.createBuilder(desc, type);
+        final Rebuilder builder = SSTableWriter.createBuilder(desc, type);
         Callable<SSTableReader> callable = new Callable<SSTableReader>()
         {
             public SSTableReader call() throws IOException

Added: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/IndexWriter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/IndexWriter.java?rev=1138084&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/IndexWriter.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/IndexWriter.java Tue Jun 21 16:46:40 2011
@@ -0,0 +1,93 @@
+package org.apache.cassandra.io.sstable;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.io.util.BufferedRandomAccessFile;
+import org.apache.cassandra.io.util.FileMark;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.io.util.SegmentedFile;
+import org.apache.cassandra.utils.BloomFilter;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+/**
+ * Encapsulates writing the primary index and bloom filter components of an SSTable.
+ * Callers report each row appended to the data file via {@link #afterAppend}, and call
+ * {@link #close} once all rows are written. The public state of this object
+ * ({@code builder}, {@code summary}, {@code bf}) is not valid until it has been closed.
+ */
+class IndexWriter
+{
+    private static final Logger logger = LoggerFactory.getLogger(IndexWriter.class);
+
+    private final BufferedRandomAccessFile indexFile;
+    public final Descriptor desc;
+    public final IPartitioner partitioner;
+    public final SegmentedFile.Builder builder;
+    public final IndexSummary summary;
+    public final BloomFilter bf;
+    // position recorded by mark(), restored by reset()
+    private FileMark mark;
+
+    /**
+     * Opens the index component for writing and creates the in-memory summary and bloom filter.
+     *
+     * @param desc descriptor naming the SSTable whose index and filter components are written
+     * @param part partitioner for the SSTable's keys
+     * @param keyCount number of keys used to size the index summary and the bloom filter
+     * @throws IOException if the index file cannot be opened
+     */
+    IndexWriter(Descriptor desc, IPartitioner part, long keyCount) throws IOException
+    {
+        this.desc = desc;
+        this.partitioner = part;
+        indexFile = new BufferedRandomAccessFile(new File(desc.filenameFor(SSTable.COMPONENT_INDEX)), "rw", 8 * 1024 * 1024, true);
+        builder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode());
+        summary = new IndexSummary(keyCount);
+        bf = BloomFilter.getFilter(keyCount, 15);
+    }
+
+    /**
+     * Records a row that has just been appended to the data file. The key is added to the
+     * bloom filter. The key (short-length prefixed) and its data file position are written to
+     * the index file. The resulting index position is offered to the summary and segment builder.
+     *
+     * @param key key of the row just written
+     * @param dataPosition offset of that row in the data file
+     * @throws IOException if writing to the index file fails
+     */
+    public void afterAppend(DecoratedKey key, long dataPosition) throws IOException
+    {
+        bf.add(key.key);
+        long indexPosition = indexFile.getFilePointer();
+        ByteBufferUtil.writeWithShortLength(key.key, indexFile);
+        indexFile.writeLong(dataPosition);
+        if (logger.isTraceEnabled())
+            logger.trace("wrote index of " + key + " at " + indexPosition);
+
+        summary.maybeAddEntry(key, indexPosition);
+        // keep this last: reset() relies on the builder never having been updated for a failed append
+        builder.addPotentialBoundary(indexPosition);
+    }
+
+    /**
+     * Closes the index and bloomfilter, making the public state of this writer valid for consumption.
+     * The serialized filter is flushed and fsynced before its stream is closed. The index file is
+     * closed and then truncated to the final write position.
+     *
+     * @throws IOException if writing, syncing, closing or truncating either component fails
+     */
+    public void close() throws IOException
+    {
+        // bloom filter
+        FileOutputStream fos = new FileOutputStream(desc.filenameFor(SSTable.COMPONENT_FILTER));
+        DataOutputStream stream = new DataOutputStream(fos);
+        try
+        {
+            BloomFilter.serializer().serialize(bf, stream);
+            stream.flush();
+            fos.getFD().sync();
+        }
+        finally
+        {
+            // also closes fos; runs even if serialization or sync failed so the descriptor is not leaked
+            stream.close();
+        }
+
+        // index
+        long position = indexFile.getFilePointer();
+        indexFile.close(); // calls force
+        FileUtils.truncate(indexFile.getPath(), position);
+
+        // finalize in-memory index state
+        summary.complete();
+    }
+
+    /**
+     * Remembers the current index file position so a partially written entry can be discarded by {@link #reset}.
+     */
+    public void mark()
+    {
+        mark = indexFile.mark();
+    }
+
+    /**
+     * Rewinds the index file to the position saved by the last {@link #mark}. Only the index
+     * file is rewound; the bloom filter, summary and segment builder are left as they are.
+     *
+     * @throws IOException if the index file cannot be repositioned
+     */
+    public void reset() throws IOException
+    {
+        // we can't un-set the bloom filter addition, but extra keys in there are harmless.
+        // we can't reset builder either, but that is the last thing called in afterAppend so
+        // we assume that if that worked then we won't be trying to reset.
+        // NOTE(review): summary.maybeAddEntry is not rolled back either; it runs just before the
+        // builder call, so a failure in addPotentialBoundary would leave a stale summary entry.
+        indexFile.reset(mark);
+    }
+}

Added: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/Rebuilder.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/Rebuilder.java?rev=1138084&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/Rebuilder.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/Rebuilder.java Tue Jun 21 16:46:40 2011
@@ -0,0 +1,92 @@
+package org.apache.cassandra.io.sstable;
+
+import java.io.File;
+import java.io.IOError;
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.db.compaction.CompactionInfo;
+import org.apache.cassandra.db.compaction.CompactionType;
+import org.apache.cassandra.streaming.OperationType;
+
+/**
+ * Removes the given SSTable from temporary status and opens it, rebuilding the
+ * bloom filter and row index from the data file. Progress is exposed through
+ * {@link CompactionInfo.Holder} so the rebuild can be reported like a compaction.
+ */
+public class Rebuilder implements CompactionInfo.Holder
+{
+    private static final Logger logger = LoggerFactory.getLogger(Rebuilder.class);
+
+    private final Descriptor desc;
+    private final OperationType type;
+    private final ColumnFamilyStore cfs;
+    // created lazily by maybeOpenIndexer(); null until first needed
+    private SSTableWriter.RowIndexer indexer;
+
+    /**
+     * @param desc descriptor of the SSTable whose data file is to be indexed
+     * @param type operation that produced the data file; passed through to the row indexer
+     */
+    public Rebuilder(Descriptor desc, OperationType type)
+    {
+        this.desc = desc;
+        this.type = type;
+        cfs = Table.open(desc.ksname).getColumnFamilyStore(desc.cfname);
+    }
+
+    /**
+     * Reports rebuild progress as the indexer's current position in the data file against the
+     * data file's length. Opens the indexer first if it has not been opened yet.
+     *
+     * @throws IOError wrapping any IOException raised while opening or querying the data file
+     */
+    public CompactionInfo getCompactionInfo()
+    {
+        maybeOpenIndexer();
+        try
+        {
+            // both file offsets are still valid post-close
+            return new CompactionInfo(desc.ksname,
+                                      desc.cfname,
+                                      CompactionType.SSTABLE_BUILD,
+                                      indexer.dfile.getFilePointer(),
+                                      indexer.dfile.length());
+        }
+        catch (IOException e)
+        {
+            throw new IOError(e);
+        }
+    }
+
+    // lazy-initialize the file to avoid opening it until it's actually executing on the CompactionManager,
+    // since the 8MB buffers can use up heap quickly
+    private void maybeOpenIndexer()
+    {
+        if (indexer != null)
+            return;
+        try
+        {
+            // commutative (counter) column families need the specialized indexer
+            if (cfs.metadata.getDefaultValidator().isCommutative())
+                indexer = new SSTableWriter.CommutativeRowIndexer(desc, cfs, type);
+            else
+                indexer = new SSTableWriter.RowIndexer(desc, cfs, type);
+        }
+        catch (IOException e)
+        {
+            throw new IOError(e);
+        }
+    }
+
+    /**
+     * Builds the index and bloom filter components from the data file, renames the SSTable's
+     * components via {@code SSTableWriter.rename}, and opens the result.
+     *
+     * @return the opened reader, or {@code null} if the column family store has been invalidated
+     * @throws IOException if indexing, renaming or opening fails
+     */
+    public SSTableReader build() throws IOException
+    {
+        if (cfs.isInvalid())
+            return null;
+        maybeOpenIndexer();
+
+        File ifile = new File(desc.filenameFor(SSTable.COMPONENT_INDEX));
+        File ffile = new File(desc.filenameFor(SSTable.COMPONENT_FILTER));
+        assert !ifile.exists();
+        assert !ffile.exists();
+
+        long estimatedRows = indexer.prepareIndexing();
+
+        // build the index and filter
+        long rows = indexer.index();
+
+        // only compute the ratio when it will actually be logged
+        if (logger.isDebugEnabled())
+            logger.debug("estimated row count was {} of real count", ((double)estimatedRows) / rows);
+        return SSTableReader.open(SSTableWriter.rename(desc, SSTable.componentsFor(desc, false)));
+    }
+}

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java?rev=1138084&r1=1138083&r2=1138084&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java Tue Jun 21 16:46:40 2011
@@ -19,7 +19,9 @@
 
 package org.apache.cassandra.io.sstable;
 
-import java.io.*;
+import java.io.File;
+import java.io.IOError;
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Collections;
@@ -27,10 +29,6 @@ import java.util.HashSet;
 import java.util.Set;
 
 import com.google.common.collect.Sets;
-
-import org.apache.cassandra.db.commitlog.ReplayPosition;
-import org.apache.cassandra.db.compaction.*;
-import org.apache.cassandra.utils.ByteBufferUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,7 +37,10 @@ import org.apache.cassandra.config.Datab
 import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.Table;
+import org.apache.cassandra.db.commitlog.ReplayPosition;
+import org.apache.cassandra.db.compaction.AbstractCompactedRow;
+import org.apache.cassandra.db.compaction.CompactionController;
+import org.apache.cassandra.db.compaction.PrecompactedRow;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.io.util.BufferedRandomAccessFile;
 import org.apache.cassandra.io.util.FileMark;
@@ -47,7 +48,7 @@ import org.apache.cassandra.io.util.File
 import org.apache.cassandra.io.util.SegmentedFile;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.streaming.OperationType;
-import org.apache.cassandra.utils.BloomFilter;
+import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.EstimatedHistogram;
 import org.apache.cassandra.utils.FBUtilities;
 
@@ -232,7 +233,7 @@ public class SSTableWriter extends SSTab
         return dataFile.getFilePointer();
     }
     
-    public static Builder createBuilder(Descriptor desc, OperationType type)
+    public static Rebuilder createBuilder(Descriptor desc, OperationType type)
     {
         if (!desc.isLatestVersion)
             // TODO: streaming between different versions will fail: need support for
@@ -240,83 +241,7 @@ public class SSTableWriter extends SSTab
             throw new RuntimeException(String.format("Cannot recover SSTable with version %s (current version %s).",
                                                      desc.version, Descriptor.CURRENT_VERSION));
 
-        return new Builder(desc, type);
-    }
-
-    /**
-     * Removes the given SSTable from temporary status and opens it, rebuilding the
-     * bloom filter and row index from the data file.
-     */
-    public static class Builder implements CompactionInfo.Holder
-    {
-        private final Descriptor desc;
-        private final OperationType type;
-        private final ColumnFamilyStore cfs;
-        private RowIndexer indexer;
-
-        public Builder(Descriptor desc, OperationType type)
-        {
-            this.desc = desc;
-            this.type = type;
-            cfs = Table.open(desc.ksname).getColumnFamilyStore(desc.cfname);
-        }
-
-        public CompactionInfo getCompactionInfo()
-        {
-            maybeOpenIndexer();
-            try
-            {
-                // both file offsets are still valid post-close
-                return new CompactionInfo(desc.ksname,
-                                          desc.cfname,
-                                          CompactionType.SSTABLE_BUILD,
-                                          indexer.dfile.getFilePointer(),
-                                          indexer.dfile.length());
-            }
-            catch (IOException e)
-            {
-                throw new IOError(e);
-            }
-        }
-
-        // lazy-initialize the file to avoid opening it until it's actually executing on the CompactionManager,
-        // since the 8MB buffers can use up heap quickly
-        private void maybeOpenIndexer()
-        {
-            if (indexer != null)
-                return;
-            try
-            {
-                if (cfs.metadata.getDefaultValidator().isCommutative())
-                    indexer = new CommutativeRowIndexer(desc, cfs, type);
-                else
-                    indexer = new RowIndexer(desc, cfs, type);
-            }
-            catch (IOException e)
-            {
-                throw new IOError(e);
-            }
-        }
-
-        public SSTableReader build() throws IOException
-        {
-            if (cfs.isInvalid())
-                return null;
-            maybeOpenIndexer();
-
-            File ifile = new File(desc.filenameFor(SSTable.COMPONENT_INDEX));
-            File ffile = new File(desc.filenameFor(SSTable.COMPONENT_FILTER));
-            assert !ifile.exists();
-            assert !ffile.exists();
-
-            long estimatedRows = indexer.prepareIndexing();
-
-            // build the index and filter
-            long rows = indexer.index();
-
-            logger.debug("estimated row count was {} of real count", ((double)estimatedRows) / rows);
-            return SSTableReader.open(rename(desc, SSTable.componentsFor(desc, false)));
-        }
+        return new Rebuilder(desc, type);
     }
 
     static class RowIndexer
@@ -540,75 +465,4 @@ public class SSTableWriter extends SSTab
         }
     }
 
-    /**
-     * Encapsulates writing the index and filter for an SSTable. The state of this object is not valid until it has been closed.
-     */
-    static class IndexWriter
-    {
-        private final BufferedRandomAccessFile indexFile;
-        public final Descriptor desc;
-        public final IPartitioner partitioner;
-        public final SegmentedFile.Builder builder;
-        public final IndexSummary summary;
-        public final BloomFilter bf;
-        private FileMark mark;
-
-        IndexWriter(Descriptor desc, IPartitioner part, long keyCount) throws IOException
-        {
-            this.desc = desc;
-            this.partitioner = part;
-            indexFile = new BufferedRandomAccessFile(new File(desc.filenameFor(SSTable.COMPONENT_INDEX)), "rw", 8 * 1024 * 1024, true);
-            builder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode());
-            summary = new IndexSummary(keyCount);
-            bf = BloomFilter.getFilter(keyCount, 15);
-        }
-
-        public void afterAppend(DecoratedKey key, long dataPosition) throws IOException
-        {
-            bf.add(key.key);
-            long indexPosition = indexFile.getFilePointer();
-            ByteBufferUtil.writeWithShortLength(key.key, indexFile);
-            indexFile.writeLong(dataPosition);
-            if (logger.isTraceEnabled())
-                logger.trace("wrote index of " + key + " at " + indexPosition);
-
-            summary.maybeAddEntry(key, indexPosition);
-            builder.addPotentialBoundary(indexPosition);
-        }
-
-        /**
-         * Closes the index and bloomfilter, making the public state of this writer valid for consumption.
-         */
-        public void close() throws IOException
-        {
-            // bloom filter
-            FileOutputStream fos = new FileOutputStream(desc.filenameFor(SSTable.COMPONENT_FILTER));
-            DataOutputStream stream = new DataOutputStream(fos);
-            BloomFilter.serializer().serialize(bf, stream);
-            stream.flush();
-            fos.getFD().sync();
-            stream.close();
-
-            // index
-            long position = indexFile.getFilePointer();
-            indexFile.close(); // calls force
-            FileUtils.truncate(indexFile.getPath(), position);
-
-            // finalize in-memory index state
-            summary.complete();
-        }
-
-        public void mark()
-        {
-            mark = indexFile.mark();
-        }
-
-        public void reset() throws IOException
-        {
-            // we can't un-set the bloom filter addition, but extra keys in there are harmless.
-            // we can't reset dbuilder either, but that is the last thing called in afterappend so
-            // we assume that if that worked then we won't be trying to reset.
-            indexFile.reset(mark);
-        }
-    }
 }