You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/06/18 07:33:44 UTC
svn commit: r955840 - in /cassandra/trunk:
src/java/org/apache/cassandra/io/sstable/
src/java/org/apache/cassandra/streaming/
test/unit/org/apache/cassandra/streaming/
Author: jbellis
Date: Fri Jun 18 05:33:44 2010
New Revision: 955840
URL: http://svn.apache.org/viewvc?rev=955840&view=rev
Log:
Extract index/filter writing into IndexWriter; add recovery for non-essential sstable components; only send the datafile when streaming
patch by Stu Hood; reviewed by jbellis for CASSANDRA-579
Modified:
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/Descriptor.java
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatusHandler.java
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java
cassandra/trunk/test/unit/org/apache/cassandra/streaming/BootstrapTest.java
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/Descriptor.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/Descriptor.java?rev=955840&r1=955839&r2=955840&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/Descriptor.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/Descriptor.java Fri Jun 18 05:33:44 2010
@@ -159,4 +159,9 @@ public class Descriptor
{
return version.compareTo("d") < 0;
}
+
+ public boolean isLatestVersion()
+ {
+ return version.compareTo(CURRENT_VERSION) == 0;
+ }
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java?rev=955840&r1=955839&r2=955840&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java Fri Jun 18 05:33:44 2010
@@ -30,7 +30,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.utils.BloomFilter;
import org.apache.cassandra.io.util.FileUtils;
/**
@@ -58,8 +57,6 @@ public abstract class SSTable
protected Descriptor desc;
protected IPartitioner partitioner;
- protected BloomFilter bf;
- protected IndexSummary indexSummary;
public static final String TEMPFILE_MARKER = "tmp";
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java?rev=955840&r1=955839&r2=955840&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java Fri Jun 18 05:33:44 2010
@@ -44,6 +44,7 @@ import org.apache.cassandra.db.filter.Qu
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.io.ICompactSerializer2;
import org.apache.cassandra.io.util.FileDataInput;
+import org.apache.cassandra.utils.BloomFilter;
/**
* SSTableReaders are open()ed by Table.onStart; after that they are created by SSTableWriter.renameAndOpen.
@@ -109,6 +110,9 @@ public class SSTableReader extends SSTab
private SegmentedFile ifile;
private SegmentedFile dfile;
+ private IndexSummary indexSummary;
+ private BloomFilter bf;
+
private InstrumentedCache<Pair<Descriptor,DecoratedKey>, Long> keyCache;
private volatile SSTableDeletingReference phantomReference;
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java?rev=955840&r1=955839&r2=955840&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java Fri Jun 18 05:33:44 2010
@@ -37,10 +37,7 @@
package org.apache.cassandra.io.sstable;
-import java.io.DataOutputStream;
-import java.io.FileOutputStream;
-import java.io.IOError;
-import java.io.IOException;
+import java.io.*;
import org.apache.cassandra.io.AbstractCompactedRow;
import org.slf4j.Logger;
@@ -52,6 +49,7 @@ import org.apache.cassandra.dht.IPartiti
import org.apache.cassandra.io.util.BufferedRandomAccessFile;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.io.util.SegmentedFile;
+import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.BloomFilter;
import org.apache.cassandra.utils.FBUtilities;
@@ -59,22 +57,17 @@ public class SSTableWriter extends SSTab
{
private static Logger logger = LoggerFactory.getLogger(SSTableWriter.class);
- private SegmentedFile.Builder ibuilder;
+ private IndexWriter iwriter;
private SegmentedFile.Builder dbuilder;
private final BufferedRandomAccessFile dataFile;
- private final BufferedRandomAccessFile indexFile;
- private final BloomFilter bf;
private DecoratedKey lastWrittenKey;
public SSTableWriter(String filename, long keyCount, IPartitioner partitioner) throws IOException
{
super(filename, partitioner);
- indexSummary = new IndexSummary();
- ibuilder = SegmentedFile.getBuilder();
+ iwriter = new IndexWriter(desc, partitioner, keyCount);
dbuilder = SegmentedFile.getBuilder();
dataFile = new BufferedRandomAccessFile(getFilename(), "rw", (int)(DatabaseDescriptor.getFlushDataBufferSizeInMB() * 1024 * 1024));
- indexFile = new BufferedRandomAccessFile(indexFilename(), "rw", (int)(DatabaseDescriptor.getFlushIndexBufferSizeInMB() * 1024 * 1024));
- bf = BloomFilter.getFilter(keyCount, 15);
}
private long beforeAppend(DecoratedKey decoratedKey) throws IOException
@@ -95,20 +88,12 @@ public class SSTableWriter extends SSTab
private void afterAppend(DecoratedKey decoratedKey, long dataPosition) throws IOException
{
- byte[] diskKey = partitioner.convertToDiskFormat(decoratedKey);
- bf.add(diskKey);
lastWrittenKey = decoratedKey;
- long indexPosition = indexFile.getFilePointer();
- FBUtilities.writeShortByteArray(diskKey, indexFile);
- indexFile.writeLong(dataPosition);
+
if (logger.isTraceEnabled())
logger.trace("wrote " + decoratedKey + " at " + dataPosition);
- if (logger.isTraceEnabled())
- logger.trace("wrote index of " + decoratedKey + " at " + indexPosition);
-
- indexSummary.maybeAddEntry(decoratedKey, indexPosition);
- ibuilder.addPotentialBoundary(indexPosition);
dbuilder.addPotentialBoundary(dataPosition);
+ iwriter.afterAppend(decoratedKey, dataPosition);
}
public void append(AbstractCompactedRow row) throws IOException
@@ -148,17 +133,8 @@ public class SSTableWriter extends SSTab
public SSTableReader closeAndOpenReader(long maxDataAge) throws IOException
{
- // bloom filter
- FileOutputStream fos = new FileOutputStream(filterFilename());
- DataOutputStream stream = new DataOutputStream(fos);
- BloomFilter.serializer().serialize(bf, stream);
- stream.flush();
- fos.getFD().sync();
- stream.close();
-
- // index
- indexFile.getChannel().force(true);
- indexFile.close();
+ // index and filter
+ iwriter.close();
// main data
dataFile.close(); // calls force
@@ -167,12 +143,12 @@ public class SSTableWriter extends SSTab
Descriptor newdesc = rename(desc);
// finalize in-memory state for the reader
- indexSummary.complete();
- SegmentedFile ifile = ibuilder.complete(newdesc.filenameFor(SSTable.COMPONENT_INDEX));
+ SegmentedFile ifile = iwriter.builder.complete(newdesc.filenameFor(SSTable.COMPONENT_INDEX));
SegmentedFile dfile = dbuilder.complete(newdesc.filenameFor(SSTable.COMPONENT_DATA));
- ibuilder = null;
+ SSTableReader sstable = SSTableReader.internalOpen(newdesc, partitioner, ifile, dfile, iwriter.summary, iwriter.bf, maxDataAge);
+ iwriter = null;
dbuilder = null;
- return SSTableReader.internalOpen(newdesc, partitioner, ifile, dfile, indexSummary, bf, maxDataAge);
+ return sstable;
}
static Descriptor rename(Descriptor tmpdesc)
@@ -195,8 +171,160 @@ public class SSTableWriter extends SSTab
return dataFile.getFilePointer();
}
- public static SSTableReader renameAndOpen(Descriptor tmpdesc) throws IOException
- {
- return SSTableReader.open(rename(tmpdesc));
+ /**
+ * @return An estimate of the number of keys contained in the given data file.
+ */
+ private static long estimateRows(Descriptor desc, BufferedRandomAccessFile dfile) throws IOException
+ {
+ // collect sizes for the first 1000 keys, or first 100 megabytes of data
+ final int SAMPLES_CAP = 1000, BYTES_CAP = (int)Math.min(100000000, dfile.length());
+ int keys = 0;
+ long dataPosition = 0;
+ while (dataPosition < BYTES_CAP && keys < SAMPLES_CAP)
+ {
+ dfile.seek(dataPosition);
+ FBUtilities.readShortByteArray(dfile);
+ long dataSize = SSTableReader.readRowSize(dfile, desc);
+ dataPosition = dfile.getFilePointer() + dataSize;
+ keys++;
+ }
+ dfile.seek(0);
+ return dfile.length() / (dataPosition / keys);
+ }
+
+ /**
+ * If either the index file or the filter file is missing, rebuilds both.
+ * TODO: Builds most of the in-memory state of the sstable, but doesn't actually open it.
+ */
+ private static void maybeRecover(Descriptor desc) throws IOException
+ {
+ File ifile = new File(desc.filenameFor(SSTable.COMPONENT_INDEX));
+ File ffile = new File(desc.filenameFor(SSTable.COMPONENT_FILTER));
+ if (ifile.exists() && ffile.exists())
+ // nothing to do
+ return;
+
+ // remove existing files
+ ifile.delete();
+ ffile.delete();
+
+ // open the data file for input, and an IndexWriter for output
+ BufferedRandomAccessFile dfile = new BufferedRandomAccessFile(desc.filenameFor(SSTable.COMPONENT_DATA), "r", (int)(DatabaseDescriptor.getFlushDataBufferSizeInMB() * 1024 * 1024));
+ IndexWriter iwriter;
+ long estimatedRows;
+ try
+ {
+ estimatedRows = estimateRows(desc, dfile);
+ iwriter = new IndexWriter(desc, StorageService.getPartitioner(), estimatedRows);
+ }
+ catch(IOException e)
+ {
+ dfile.close();
+ throw e;
+ }
+
+ // build the index and filter
+ long rows = 0;
+ try
+ {
+ DecoratedKey key;
+ long dataPosition = 0;
+ while (dataPosition < dfile.length())
+ {
+ key = StorageService.getPartitioner().convertFromDiskFormat(FBUtilities.readShortByteArray(dfile));
+ long dataSize = SSTableReader.readRowSize(dfile, desc);
+ iwriter.afterAppend(key, dataPosition);
+ dataPosition = dfile.getFilePointer() + dataSize;
+ dfile.seek(dataPosition);
+ rows++;
+ }
+ }
+ finally
+ {
+ try
+ {
+ dfile.close();
+ iwriter.close();
+ }
+ catch (IOException e)
+ {
+ logger.error("Failed to close data or index file during recovery of " + desc, e);
+ }
+ }
+
+ logger.debug("estimated row count was %s of real count", ((double)estimatedRows) / rows);
+ }
+
+ /**
+ * Removes the given SSTable from temporary status and opens it, rebuilding the non-essential portions of the
+ * file if necessary.
+ */
+ public static SSTableReader recoverAndOpen(Descriptor desc) throws IOException
+ {
+ if (!desc.isLatestVersion())
+ throw new RuntimeException(String.format("Cannot recover SSTable with version %s (current version %s).",
+ desc.version, Descriptor.CURRENT_VERSION));
+
+ maybeRecover(desc);
+ return SSTableReader.open(rename(desc));
+ }
+
+ /**
+ * Encapsulates writing the index and filter for an SSTable. The state of this object is not valid until it has been closed.
+ */
+ static class IndexWriter
+ {
+ private final BufferedRandomAccessFile indexFile;
+ public final Descriptor desc;
+ public final IPartitioner partitioner;
+ public final SegmentedFile.Builder builder;
+ public final IndexSummary summary;
+ public final BloomFilter bf;
+
+ IndexWriter(Descriptor desc, IPartitioner part, long keyCount) throws IOException
+ {
+ this.desc = desc;
+ this.partitioner = part;
+ int bufferbytes = (int)(DatabaseDescriptor.getFlushIndexBufferSizeInMB() * 1024 * 1024);
+ indexFile = new BufferedRandomAccessFile(desc.filenameFor(SSTable.COMPONENT_INDEX), "rw", bufferbytes);
+ builder = SegmentedFile.getBuilder();
+ summary = new IndexSummary();
+ bf = BloomFilter.getFilter(keyCount, 15);
+ }
+
+ public void afterAppend(DecoratedKey key, long dataPosition) throws IOException
+ {
+ byte[] diskKey = partitioner.convertToDiskFormat(key);
+ bf.add(diskKey);
+ long indexPosition = indexFile.getFilePointer();
+ FBUtilities.writeShortByteArray(diskKey, indexFile);
+ indexFile.writeLong(dataPosition);
+ if (logger.isTraceEnabled())
+ logger.trace("wrote index of " + key + " at " + indexPosition);
+
+ summary.maybeAddEntry(key, indexPosition);
+ builder.addPotentialBoundary(indexPosition);
+ }
+
+ /**
+ * Closes the index and bloom filter, making the public state of this writer valid for consumption.
+ */
+ public void close() throws IOException
+ {
+ // bloom filter
+ FileOutputStream fos = new FileOutputStream(desc.filenameFor(SSTable.COMPONENT_FILTER));
+ DataOutputStream stream = new DataOutputStream(fos);
+ BloomFilter.serializer().serialize(bf, stream);
+ stream.flush();
+ fos.getFD().sync();
+ stream.close();
+
+ // index
+ indexFile.getChannel().force(true);
+ indexFile.close();
+
+ // finalize in-memory index state
+ summary.complete();
+ }
}
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatusHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatusHandler.java?rev=955840&r1=955839&r2=955840&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatusHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatusHandler.java Fri Jun 18 05:33:44 2010
@@ -27,6 +27,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.Table;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTable;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.io.sstable.SSTableWriter;
import org.apache.cassandra.net.MessagingService;
@@ -52,26 +54,17 @@ class FileStatusHandler
assert FileStatus.Action.DELETE == streamStatus.getAction() :
"Unknown stream action: " + streamStatus.getAction();
- // file was successfully streamed: if it was the last component of an sstable, assume that the rest
- // have already arrived
- if (pendingFile.getFilename().endsWith("-Data.db"))
+ // file was successfully streamed
+ Descriptor desc = pendingFile.getDescriptor();
+ try
{
- // last component triggers add: see TODO in SSTable.getAllComponents()
- String tableName = pendingFile.getDescriptor().ksname;
- File file = new File(pendingFile.getFilename());
- String fileName = file.getName();
- String [] temp = fileName.split("-");
-
- try
- {
- SSTableReader sstable = SSTableWriter.renameAndOpen(pendingFile.getDescriptor());
- Table.open(tableName).getColumnFamilyStore(temp[0]).addSSTable(sstable);
- logger.info("Streaming added " + sstable.getFilename());
- }
- catch (IOException e)
- {
- throw new RuntimeException("Not able to add streamed file " + pendingFile.getFilename(), e);
- }
+ SSTableReader sstable = SSTableWriter.recoverAndOpen(pendingFile.getDescriptor());
+ Table.open(desc.ksname).getColumnFamilyStore(desc.cfname).addSSTable(sstable);
+ logger.info("Streaming added " + sstable);
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException("Not able to add streamed file " + pendingFile.getFilename(), e);
}
// send a StreamStatus message telling the source node it can delete this file
Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java?rev=955840&r1=955839&r2=955840&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java Fri Jun 18 05:33:44 2010
@@ -103,20 +103,15 @@ public class StreamInitiateVerbHandler i
{
/* Create a local sstable for each remote sstable */
LinkedHashMap<PendingFile, PendingFile> mapping = new LinkedHashMap<PendingFile, PendingFile>();
- Map<Descriptor, Descriptor> sstables = new HashMap<Descriptor, Descriptor>();
for (PendingFile remote : remoteFiles)
{
Descriptor remotedesc = remote.getDescriptor();
- Descriptor localdesc = sstables.get(remotedesc);
- if (localdesc == null)
- {
- // new local sstable
- Table table = Table.open(remotedesc.ksname);
- ColumnFamilyStore cfStore = table.getColumnFamilyStore(remotedesc.cfname);
- localdesc = Descriptor.fromFilename(cfStore.getFlushPath());
- sstables.put(remotedesc, localdesc);
- }
+ // new local sstable
+ Table table = Table.open(remotedesc.ksname);
+ ColumnFamilyStore cfStore = table.getColumnFamilyStore(remotedesc.cfname);
+
+ Descriptor localdesc = Descriptor.fromFilename(cfStore.getFlushPath());
// add a local file for this component
mapping.put(remote, new PendingFile(localdesc, remote.getComponent(), remote.getExpectedBytes()));
Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java?rev=955840&r1=955839&r2=955840&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java Fri Jun 18 05:33:44 2010
@@ -117,16 +117,13 @@ public class StreamOut
*/
public static void transferSSTables(InetAddress target, List<SSTableReader> sstables, String table) throws IOException
{
- PendingFile[] pendingFiles = new PendingFile[SSTable.FILES_ON_DISK * sstables.size()];
+ PendingFile[] pendingFiles = new PendingFile[sstables.size()];
int i = 0;
for (SSTableReader sstable : sstables)
{
- for (String component : SSTable.components)
- {
- Descriptor desc = sstable.getDescriptor();
- long filelen = new File(desc.filenameFor(component)).length();
- pendingFiles[i++] = new PendingFile(desc, component, filelen);
- }
+ Descriptor desc = sstable.getDescriptor();
+ long filelen = new File(desc.filenameFor(SSTable.COMPONENT_DATA)).length();
+ pendingFiles[i++] = new PendingFile(desc, SSTable.COMPONENT_DATA, filelen);
}
logger.info("Stream context metadata " + StringUtils.join(pendingFiles, ", " + " " + sstables.size() + " sstables."));
StreamOutManager.get(target).addFilesToStream(pendingFiles);
@@ -145,5 +142,4 @@ public class StreamOut
logger.info("Done with transfer to " + target);
}
}
-
}
Modified: cassandra/trunk/test/unit/org/apache/cassandra/streaming/BootstrapTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/streaming/BootstrapTest.java?rev=955840&r1=955839&r2=955840&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/streaming/BootstrapTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/streaming/BootstrapTest.java Fri Jun 18 05:33:44 2010
@@ -35,9 +35,7 @@ public class BootstrapTest extends Schem
public void testGetNewNames() throws IOException
{
Descriptor desc = Descriptor.fromFilename(new File("Keyspace1", "Standard1-500-Data.db").toString());
- PendingFile[] pendingFiles = new PendingFile[]{ new PendingFile(desc, "Data.db", 100),
- new PendingFile(desc, "Index.db", 100),
- new PendingFile(desc, "Filter.db", 100) };
+ PendingFile[] pendingFiles = new PendingFile[]{ new PendingFile(desc, "Data.db", 100) };
StreamInitiateVerbHandler bivh = new StreamInitiateVerbHandler();
// map the input (remote) contexts to output (local) contexts