You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/06/18 07:33:44 UTC

svn commit: r955840 - in /cassandra/trunk: src/java/org/apache/cassandra/io/sstable/ src/java/org/apache/cassandra/streaming/ test/unit/org/apache/cassandra/streaming/

Author: jbellis
Date: Fri Jun 18 05:33:44 2010
New Revision: 955840

URL: http://svn.apache.org/viewvc?rev=955840&view=rev
Log:
Extract index/filter writing into IndexWriter; add recovery for non-essential sstable components; only send the datafile when streaming
patch by Stu Hood; reviewed by jbellis for CASSANDRA-579

Modified:
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/Descriptor.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatusHandler.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java
    cassandra/trunk/test/unit/org/apache/cassandra/streaming/BootstrapTest.java

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/Descriptor.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/Descriptor.java?rev=955840&r1=955839&r2=955840&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/Descriptor.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/Descriptor.java Fri Jun 18 05:33:44 2010
@@ -159,4 +159,9 @@ public class Descriptor
     {
         return version.compareTo("d") < 0;
     }
+
+    public boolean isLatestVersion()
+    {
+        return version.compareTo(CURRENT_VERSION) == 0; // true only when this descriptor's version compares equal to CURRENT_VERSION
+    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java?rev=955840&r1=955839&r2=955840&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java Fri Jun 18 05:33:44 2010
@@ -30,7 +30,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.utils.BloomFilter;
 import org.apache.cassandra.io.util.FileUtils;
 
 /**
@@ -58,8 +57,6 @@ public abstract class SSTable
 
     protected Descriptor desc;
     protected IPartitioner partitioner;
-    protected BloomFilter bf;
-    protected IndexSummary indexSummary;
 
     public static final String TEMPFILE_MARKER = "tmp";
 

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java?rev=955840&r1=955839&r2=955840&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java Fri Jun 18 05:33:44 2010
@@ -44,6 +44,7 @@ import org.apache.cassandra.db.filter.Qu
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.io.ICompactSerializer2;
 import org.apache.cassandra.io.util.FileDataInput;
+import org.apache.cassandra.utils.BloomFilter;
 
 /**
  * SSTableReaders are open()ed by Table.onStart; after that they are created by SSTableWriter.renameAndOpen.
@@ -109,6 +110,9 @@ public class SSTableReader extends SSTab
     private SegmentedFile ifile;
     private SegmentedFile dfile;
 
+    private IndexSummary indexSummary;
+    private BloomFilter bf;
+
     private InstrumentedCache<Pair<Descriptor,DecoratedKey>, Long> keyCache;
 
     private volatile SSTableDeletingReference phantomReference;

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java?rev=955840&r1=955839&r2=955840&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java Fri Jun 18 05:33:44 2010
@@ -37,10 +37,7 @@
 
 package org.apache.cassandra.io.sstable;
 
-import java.io.DataOutputStream;
-import java.io.FileOutputStream;
-import java.io.IOError;
-import java.io.IOException;
+import java.io.*;
 
 import org.apache.cassandra.io.AbstractCompactedRow;
 import org.slf4j.Logger;
@@ -52,6 +49,7 @@ import org.apache.cassandra.dht.IPartiti
 import org.apache.cassandra.io.util.BufferedRandomAccessFile;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.io.util.SegmentedFile;
+import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.BloomFilter;
 import org.apache.cassandra.utils.FBUtilities;
 
@@ -59,22 +57,17 @@ public class SSTableWriter extends SSTab
 {
     private static Logger logger = LoggerFactory.getLogger(SSTableWriter.class);
 
-    private SegmentedFile.Builder ibuilder;
+    private IndexWriter iwriter;
     private SegmentedFile.Builder dbuilder;
     private final BufferedRandomAccessFile dataFile;
-    private final BufferedRandomAccessFile indexFile;
-    private final BloomFilter bf;
     private DecoratedKey lastWrittenKey;
 
     public SSTableWriter(String filename, long keyCount, IPartitioner partitioner) throws IOException
     {
         super(filename, partitioner);
-        indexSummary = new IndexSummary();
-        ibuilder = SegmentedFile.getBuilder();
+        iwriter = new IndexWriter(desc, partitioner, keyCount); // now owns the index file, index summary, index segment builder and bloom filter that were created inline here
         dbuilder = SegmentedFile.getBuilder();
         dataFile = new BufferedRandomAccessFile(getFilename(), "rw", (int)(DatabaseDescriptor.getFlushDataBufferSizeInMB() * 1024 * 1024));
-        indexFile = new BufferedRandomAccessFile(indexFilename(), "rw", (int)(DatabaseDescriptor.getFlushIndexBufferSizeInMB() * 1024 * 1024));
-        bf = BloomFilter.getFilter(keyCount, 15);
     }
 
     private long beforeAppend(DecoratedKey decoratedKey) throws IOException
@@ -95,20 +88,12 @@ public class SSTableWriter extends SSTab
 
     private void afterAppend(DecoratedKey decoratedKey, long dataPosition) throws IOException
     {
-        byte[] diskKey = partitioner.convertToDiskFormat(decoratedKey);
-        bf.add(diskKey);
         lastWrittenKey = decoratedKey;
-        long indexPosition = indexFile.getFilePointer();
-        FBUtilities.writeShortByteArray(diskKey, indexFile);
-        indexFile.writeLong(dataPosition);
+
         if (logger.isTraceEnabled())
             logger.trace("wrote " + decoratedKey + " at " + dataPosition);
-        if (logger.isTraceEnabled())
-            logger.trace("wrote index of " + decoratedKey + " at " + indexPosition);
-
-        indexSummary.maybeAddEntry(decoratedKey, indexPosition);
-        ibuilder.addPotentialBoundary(indexPosition);
         dbuilder.addPotentialBoundary(dataPosition);
+        iwriter.afterAppend(decoratedKey, dataPosition); // the IndexWriter updates the bloom filter, index file and index summary for this key (previously done inline here)
     }
 
     public void append(AbstractCompactedRow row) throws IOException
@@ -148,17 +133,8 @@ public class SSTableWriter extends SSTab
 
     public SSTableReader closeAndOpenReader(long maxDataAge) throws IOException
     {
-        // bloom filter
-        FileOutputStream fos = new FileOutputStream(filterFilename());
-        DataOutputStream stream = new DataOutputStream(fos);
-        BloomFilter.serializer().serialize(bf, stream);
-        stream.flush();
-        fos.getFD().sync();
-        stream.close();
-
-        // index
-        indexFile.getChannel().force(true);
-        indexFile.close();
+        // index and filter
+        iwriter.close();
 
         // main data
         dataFile.close(); // calls force
@@ -167,12 +143,12 @@ public class SSTableWriter extends SSTab
         Descriptor newdesc = rename(desc);
 
         // finalize in-memory state for the reader
-        indexSummary.complete();
-        SegmentedFile ifile = ibuilder.complete(newdesc.filenameFor(SSTable.COMPONENT_INDEX));
+        SegmentedFile ifile = iwriter.builder.complete(newdesc.filenameFor(SSTable.COMPONENT_INDEX));
         SegmentedFile dfile = dbuilder.complete(newdesc.filenameFor(SSTable.COMPONENT_DATA));
-        ibuilder = null;
+        SSTableReader sstable = SSTableReader.internalOpen(newdesc, partitioner, ifile, dfile, iwriter.summary, iwriter.bf, maxDataAge);
+        iwriter = null;
         dbuilder = null;
-        return SSTableReader.internalOpen(newdesc, partitioner, ifile, dfile, indexSummary, bf, maxDataAge);
+        return sstable;
     }
 
     static Descriptor rename(Descriptor tmpdesc)
@@ -195,8 +171,160 @@ public class SSTableWriter extends SSTab
         return dataFile.getFilePointer();
     }
     
-    public static SSTableReader renameAndOpen(Descriptor tmpdesc) throws IOException
-    {
-        return SSTableReader.open(rename(tmpdesc));
+    /**
+     * Samples the sizes of the first 1000 rows (or first 100 megabytes) of the data file and extrapolates from their average.
+     * @return An estimate of the number of keys contained in the given data file; 0 if no row could be sampled.
+     */
+    private static long estimateRows(Descriptor desc, BufferedRandomAccessFile dfile) throws IOException
+    {
+        final int SAMPLES_CAP = 1000, BYTES_CAP = (int)Math.min(100000000, dfile.length());
+        int keys = 0;
+        long dataPosition = 0;
+        while (dataPosition < BYTES_CAP && keys < SAMPLES_CAP)
+        {
+            dfile.seek(dataPosition);
+            FBUtilities.readShortByteArray(dfile); // the key is read only to get past it
+            long dataSize = SSTableReader.readRowSize(dfile, desc);
+            dataPosition = dfile.getFilePointer() + dataSize; // where the next sample starts
+            keys++;
+        }
+        dfile.seek(0); // rewind: the caller scans the file from its current position
+        return keys == 0 ? 0 : dfile.length() / (dataPosition / keys); // an empty file samples nothing: avoid dividing by zero
+    }
+
+    /**
+     * If either of the index or filter files are missing, removes whichever remains and rebuilds both by scanning the data file.
+     * TODO: Builds most of the in-memory state of the sstable, but doesn't actually open it.
+     * @throws IOException if sizing or reading the data file, or writing the index, fails; failures while closing are only logged
+     */
+    private static void maybeRecover(Descriptor desc) throws IOException
+    {
+        File ifile = new File(desc.filenameFor(SSTable.COMPONENT_INDEX));
+        File ffile = new File(desc.filenameFor(SSTable.COMPONENT_FILTER));
+        if (ifile.exists() && ffile.exists())
+            return; // nothing to do
+
+        // remove existing files
+        ifile.delete();
+        ffile.delete();
+
+        // open the data file for input, and an IndexWriter for output
+        BufferedRandomAccessFile dfile = new BufferedRandomAccessFile(desc.filenameFor(SSTable.COMPONENT_DATA), "r", (int)(DatabaseDescriptor.getFlushDataBufferSizeInMB() * 1024 * 1024));
+        IndexWriter iwriter = null;
+        long estimatedRows;
+        long rows = 0;
+        try
+        {
+            estimatedRows = estimateRows(desc, dfile);
+            iwriter = new IndexWriter(desc, StorageService.getPartitioner(), estimatedRows);
+
+            // build the index and filter
+            long dataPosition = 0;
+            while (dataPosition < dfile.length())
+            {
+                DecoratedKey key = StorageService.getPartitioner().convertFromDiskFormat(FBUtilities.readShortByteArray(dfile));
+                long dataSize = SSTableReader.readRowSize(dfile, desc);
+                iwriter.afterAppend(key, dataPosition);
+                dataPosition = dfile.getFilePointer() + dataSize;
+                dfile.seek(dataPosition);
+                rows++;
+            }
+        }
+        finally
+        {
+            // close each file in its own try so one failed close cannot leak the other; NOTE(review): after a failed scan this still writes partial index/filter files that the exists() check above would later accept
+            try
+            {
+                dfile.close();
+            }
+            catch (IOException e)
+            {
+                logger.error("Failed to close data or index file during recovery of " + desc, e);
+            }
+            try
+            {
+                if (iwriter != null) // still null when estimateRows or the IndexWriter constructor threw
+                    iwriter.close();
+            }
+            catch (IOException e)
+            {
+                logger.error("Failed to close data or index file during recovery of " + desc, e);
+            }
+        }
+
+        logger.debug("estimated row count was {} of real count", ((double)estimatedRows) / rows);
+    }
+
+    /**
+     * Takes the given SSTable out of temporary status and returns an open reader for it; a missing index or filter
+     * component is rebuilt first. A descriptor that is not of the latest version is rejected with a RuntimeException.
+     */
+    public static SSTableReader recoverAndOpen(Descriptor desc) throws IOException
+    {
+        if (desc.isLatestVersion())
+        {
+            maybeRecover(desc);
+            return SSTableReader.open(rename(desc));
+        }
+        throw new RuntimeException(String.format("Cannot recover SSTable with version %s (current version %s).", desc.version, Descriptor.CURRENT_VERSION));
+    }
+
+    /**
+     * Encapsulates writing the index and filter for an SSTable. The state of this object is not valid until it has been closed. Each afterAppend() call adds one key to the bloom filter, the index file and the index summary; close() writes the filter file, syncs and closes the index file, and completes the summary.
+     */
+    static class IndexWriter
+    {
+        private final BufferedRandomAccessFile indexFile; // index component of desc, opened "rw" by the constructor
+        public final Descriptor desc; // names the index and filter files that are written
+        public final IPartitioner partitioner; // converts keys to their on-disk byte form
+        public final SegmentedFile.Builder builder; // receives every index position as a potential segment boundary
+        public final IndexSummary summary; // offered every (key, index position) pair; completed in close()
+        public final BloomFilter bf; // receives the disk-format bytes of every appended key
+        
+        IndexWriter(Descriptor desc, IPartitioner part, long keyCount) throws IOException
+        {
+            this.desc = desc;
+            this.partitioner = part;
+            int bufferbytes = (int)(DatabaseDescriptor.getFlushIndexBufferSizeInMB() * 1024 * 1024); // configured in megabytes: convert to bytes
+            indexFile = new BufferedRandomAccessFile(desc.filenameFor(SSTable.COMPONENT_INDEX), "rw", bufferbytes);
+            builder = SegmentedFile.getBuilder();
+            summary = new IndexSummary();
+            bf = BloomFilter.getFilter(keyCount, 15); // presumably 15 is the target number of buckets per key - confirm against BloomFilter.getFilter
+        }
+
+        public void afterAppend(DecoratedKey key, long dataPosition) throws IOException
+        {
+            byte[] diskKey = partitioner.convertToDiskFormat(key); // the same bytes go into the filter and the index file
+            bf.add(diskKey);
+            long indexPosition = indexFile.getFilePointer(); // taken before the writes below, so it is where this entry starts
+            FBUtilities.writeShortByteArray(diskKey, indexFile);
+            indexFile.writeLong(dataPosition); // an index entry is the key bytes followed by the row's position in the data file
+            if (logger.isTraceEnabled())
+                logger.trace("wrote index of " + key + " at " + indexPosition);
+
+            summary.maybeAddEntry(key, indexPosition);
+            builder.addPotentialBoundary(indexPosition);
+        }
+
+        /**
+         * Closes the index and bloomfilter, making the public state of this writer valid for consumption. The filter is serialized to the filter component of desc and synced, the index file is forced to disk and closed, and only then is the summary completed.
+         */
+        public void close() throws IOException
+        {
+            // bloom filter: serialize it to the filter component and sync it to disk before closing the stream
+            FileOutputStream fos = new FileOutputStream(desc.filenameFor(SSTable.COMPONENT_FILTER));
+            DataOutputStream stream = new DataOutputStream(fos);
+            BloomFilter.serializer().serialize(bf, stream);
+            stream.flush();
+            fos.getFD().sync();
+            stream.close(); // NOTE(review): skipped if anything above throws, and indexFile is then never closed either - consider try/finally
+
+            // index: force buffered content and file metadata to disk, then close
+            indexFile.getChannel().force(true);
+            indexFile.close();
+
+            // finalize in-memory index state
+            summary.complete();
+        }
     }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatusHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatusHandler.java?rev=955840&r1=955839&r2=955840&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatusHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatusHandler.java Fri Jun 18 05:33:44 2010
@@ -27,6 +27,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.Table;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTable;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.io.sstable.SSTableWriter;
 import org.apache.cassandra.net.MessagingService;
@@ -52,26 +54,17 @@ class FileStatusHandler
         assert FileStatus.Action.DELETE == streamStatus.getAction() :
             "Unknown stream action: " + streamStatus.getAction();
 
-        // file was successfully streamed: if it was the last component of an sstable, assume that the rest
-        // have already arrived
-        if (pendingFile.getFilename().endsWith("-Data.db"))
+        // file was successfully streamed
+        Descriptor desc = pendingFile.getDescriptor();
+        try
         {
-            // last component triggers add: see TODO in SSTable.getAllComponents()
-            String tableName = pendingFile.getDescriptor().ksname;
-            File file = new File(pendingFile.getFilename());
-            String fileName = file.getName();
-            String [] temp = fileName.split("-");
-
-            try
-            {
-                SSTableReader sstable = SSTableWriter.renameAndOpen(pendingFile.getDescriptor());
-                Table.open(tableName).getColumnFamilyStore(temp[0]).addSSTable(sstable);
-                logger.info("Streaming added " + sstable.getFilename());
-            }
-            catch (IOException e)
-            {
-                throw new RuntimeException("Not able to add streamed file " + pendingFile.getFilename(), e);
-            }
+            SSTableReader sstable = SSTableWriter.recoverAndOpen(pendingFile.getDescriptor());
+            Table.open(desc.ksname).getColumnFamilyStore(desc.cfname).addSSTable(sstable);
+            logger.info("Streaming added " + sstable);
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException("Not able to add streamed file " + pendingFile.getFilename(), e);
         }
 
         // send a StreamStatus message telling the source node it can delete this file

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java?rev=955840&r1=955839&r2=955840&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java Fri Jun 18 05:33:44 2010
@@ -103,20 +103,15 @@ public class StreamInitiateVerbHandler i
     {
         /* Create a local sstable for each remote sstable */
         LinkedHashMap<PendingFile, PendingFile> mapping = new LinkedHashMap<PendingFile, PendingFile>();
-        Map<Descriptor, Descriptor> sstables = new HashMap<Descriptor, Descriptor>();
         for (PendingFile remote : remoteFiles)
         {
             Descriptor remotedesc = remote.getDescriptor();
-            Descriptor localdesc = sstables.get(remotedesc);
-            if (localdesc == null)
-            {
-                // new local sstable
-                Table table = Table.open(remotedesc.ksname);
-                ColumnFamilyStore cfStore = table.getColumnFamilyStore(remotedesc.cfname);
 
-                localdesc = Descriptor.fromFilename(cfStore.getFlushPath());
-                sstables.put(remotedesc, localdesc);
-            }
+            // new local sstable
+            Table table = Table.open(remotedesc.ksname);
+            ColumnFamilyStore cfStore = table.getColumnFamilyStore(remotedesc.cfname);
+
+            Descriptor localdesc = Descriptor.fromFilename(cfStore.getFlushPath());
 
             // add a local file for this component
             mapping.put(remote, new PendingFile(localdesc, remote.getComponent(), remote.getExpectedBytes()));

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java?rev=955840&r1=955839&r2=955840&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java Fri Jun 18 05:33:44 2010
@@ -117,16 +117,13 @@ public class StreamOut
      */
     public static void transferSSTables(InetAddress target, List<SSTableReader> sstables, String table) throws IOException
     {
-        PendingFile[] pendingFiles = new PendingFile[SSTable.FILES_ON_DISK * sstables.size()];
+        PendingFile[] pendingFiles = new PendingFile[sstables.size()];
         int i = 0;
         for (SSTableReader sstable : sstables)
         {
-            for (String component : SSTable.components)
-            {
-                Descriptor desc = sstable.getDescriptor();
-                long filelen = new File(desc.filenameFor(component)).length();
-                pendingFiles[i++] = new PendingFile(desc, component, filelen);
-            }
+            Descriptor desc = sstable.getDescriptor();
+            long filelen = new File(desc.filenameFor(SSTable.COMPONENT_DATA)).length();
+            pendingFiles[i++] = new PendingFile(desc, SSTable.COMPONENT_DATA, filelen);
         }
         logger.info("Stream context metadata " + StringUtils.join(pendingFiles, ", " + " " + sstables.size() + " sstables."));
         StreamOutManager.get(target).addFilesToStream(pendingFiles);
@@ -145,5 +142,4 @@ public class StreamOut
             logger.info("Done with transfer to " + target);
         }
     }
-
 }

Modified: cassandra/trunk/test/unit/org/apache/cassandra/streaming/BootstrapTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/streaming/BootstrapTest.java?rev=955840&r1=955839&r2=955840&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/streaming/BootstrapTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/streaming/BootstrapTest.java Fri Jun 18 05:33:44 2010
@@ -35,9 +35,7 @@ public class BootstrapTest extends Schem
     public void testGetNewNames() throws IOException
     {
         Descriptor desc = Descriptor.fromFilename(new File("Keyspace1", "Standard1-500-Data.db").toString());
-        PendingFile[] pendingFiles = new PendingFile[]{ new PendingFile(desc, "Data.db", 100),
-                                                        new PendingFile(desc, "Index.db", 100),
-                                                        new PendingFile(desc, "Filter.db", 100) };
+        PendingFile[] pendingFiles = new PendingFile[]{ new PendingFile(desc, "Data.db", 100) };
         StreamInitiateVerbHandler bivh = new StreamInitiateVerbHandler();
 
         // map the input (remote) contexts to output (local) contexts