You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/07/06 21:58:06 UTC
svn commit: r791591 - in /incubator/cassandra/trunk:
src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/io/
src/java/org/apache/cassandra/test/ test/unit/org/apache/cassandra/db/
test/unit/org/apache/cassandra/io/
Author: jbellis
Date: Mon Jul 6 19:58:05 2009
New Revision: 791591
URL: http://svn.apache.org/viewvc?rev=791591&view=rev
Log:
refactor sstable into SSTable, SSTableReader, and SSTableWriter.
patch by jbellis; reviewed by Jun Rao for CASSANDRA-259
Added:
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java
- copied, changed from r791590, incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableWriter.java
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnIterator.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/DataFileVerbHandler.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/IFilter.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/IdentityFilter.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/NamesFilter.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeFilter.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/TimeFilter.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/FileStruct.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SequenceFile.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/test/DBTest.java
incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java
incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java
incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/SSTableTest.java
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java?rev=791591&r1=791590&r2=791591&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java Mon Jul 6 19:58:05 2009
@@ -28,6 +28,8 @@
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import org.apache.cassandra.io.SSTableReader;
+import org.apache.cassandra.io.SSTableWriter;
import org.apache.cassandra.io.SSTable;
import org.apache.cassandra.service.StorageService;
@@ -143,7 +145,7 @@
*/
ColumnFamilyStore cfStore = Table.open(table_).getColumnFamilyStore(cfName_);
List<String> keys = new ArrayList<String>( columnFamilies_.keySet() );
- SSTable ssTable = new SSTable(cfStore.getTempSSTablePath(), keys.size(), StorageService.getPartitioner());
+ SSTableWriter writer = new SSTableWriter(cfStore.getTempSSTablePath(), keys.size(), StorageService.getPartitioner());
Collections.sort(keys);
/* Use this BloomFilter to decide if a key exists in a SSTable */
for ( String key : keys )
@@ -152,11 +154,10 @@
if ( bytes.length > 0 )
{
/* Now write the key and value to disk */
- ssTable.append(key, bytes);
+ writer.append(key, bytes);
}
}
- ssTable.close();
- cfStore.storeLocation(ssTable);
+ cfStore.storeLocation(writer.closeAndOpenReader());
columnFamilies_.clear();
}
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=791591&r1=791590&r2=791591&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Mon Jul 6 19:58:05 2009
@@ -81,7 +81,7 @@
private AtomicReference<BinaryMemtable> binaryMemtable_;
/* SSTables on disk for this column family */
- private SortedMap<String, SSTable> ssTables_ = new TreeMap<String, SSTable>(new FileNameComparator(FileNameComparator.Descending));
+ private SortedMap<String, SSTableReader> ssTables_ = new TreeMap<String, SSTableReader>(new FileNameComparator(FileNameComparator.Descending));
/* Modification lock used for protecting reads from compactions. */
private ReentrantReadWriteLock lock_ = new ReentrantReadWriteLock(true);
@@ -157,7 +157,7 @@
for (File file : files)
{
String filename = file.getName();
- if (((file.length() == 0) || (filename.contains("-" + SSTable.temporaryFile_))) && (filename.contains(columnFamily_)))
+ if (((file.length() == 0) || (filename.contains("-" + SSTable.TEMPFILE_MARKER))) && (filename.contains(columnFamily_)))
{
file.delete();
continue;
@@ -180,7 +180,7 @@
String filename = file.getAbsolutePath();
try
{
- SSTable sstable = SSTable.open(filename, StorageService.getPartitioner());
+ SSTableReader sstable = SSTableReader.open(filename, StorageService.getPartitioner());
ssTables_.put(filename, sstable);
}
catch (IOException ex)
@@ -231,7 +231,7 @@
sb.append("Number of files on disk : " + ssTables_.size());
sb.append(newLineSeparator);
double totalSpace = 0d;
- for (SSTable sstable: ssTables_.values())
+ for (SSTableReader sstable: ssTables_.values())
{
File f = new File(sstable.getFilename());
totalSpace += f.length();
@@ -248,7 +248,7 @@
* This is called after bootstrap to add the files
* to the list of files maintained.
*/
- void addToList(SSTable file)
+ void addToList(SSTableReader file)
{
lock_.writeLock().lock();
try
@@ -376,7 +376,7 @@
{
fileIndexGenerator_.incrementAndGet();
return String.format("%s-%s-%s-%s-Data.db",
- table_, columnFamily_, SSTable.temporaryFile_, fileIndexGenerator_.incrementAndGet());
+ table_, columnFamily_, SSTable.TEMPFILE_MARKER, fileIndexGenerator_.incrementAndGet());
}
/*
@@ -401,7 +401,7 @@
index = lowestIndex + 1;
return String.format("%s-%s-%s-%s-Data.db",
- table_, columnFamily_, SSTable.temporaryFile_, index);
+ table_, columnFamily_, SSTable.TEMPFILE_MARKER, index);
}
void switchMemtable()
@@ -549,7 +549,7 @@
lock_.readLock().lock();
try
{
- for (SSTable sstable : ssTables_.values())
+ for (SSTableReader sstable : ssTables_.values())
{
ColumnFamily columnFamily = null;
try
@@ -573,7 +573,7 @@
}
}
- private ColumnFamily fetchColumnFamily(String key, String cf, IFilter filter, SSTable ssTable) throws IOException
+ private ColumnFamily fetchColumnFamily(String key, String cf, IFilter filter, SSTableReader ssTable) throws IOException
{
DataInputBuffer bufIn;
bufIn = filter.next(key, cf, ssTable);
@@ -720,7 +720,7 @@
* param @ filename - filename just flushed to disk
* param @ bf - bloom filter which indicates the keys that are in this file.
*/
- void storeLocation(SSTable sstable)
+ void storeLocation(SSTableReader sstable)
{
int ssTableCount;
lock_.writeLock().lock();
@@ -756,7 +756,7 @@
{
try
{
- fs = SSTable.get(file).getFileStruct();
+ fs = SSTableReader.get(file).getFileStruct();
fs.advance();
if (fs.isExhausted())
{
@@ -985,9 +985,9 @@
if (logger_.isDebugEnabled())
logger_.debug("New file : " + newfile + " of size " + new File(newfile).length());
assert newfile != null;
- ssTables_.put(newfile, SSTable.open(newfile, StorageService.getPartitioner()));
+ ssTables_.put(newfile, SSTableReader.open(newfile, StorageService.getPartitioner()));
}
- SSTable.get(file).delete();
+ SSTableReader.get(file).delete();
}
finally
{
@@ -1036,12 +1036,12 @@
}
mergedFileName = getTempSSTableFileName();
- SSTable ssTableRange = null;
+ SSTableWriter rangeWriter = null;
String lastkey = null;
List<FileStruct> lfs = new ArrayList<FileStruct>();
DataOutputBuffer bufOut = new DataOutputBuffer();
- int expectedBloomFilterSize = SSTable.getApproximateKeyCount(files);
- expectedBloomFilterSize = (expectedBloomFilterSize > 0) ? expectedBloomFilterSize : SSTable.indexInterval();
+ int expectedBloomFilterSize = SSTableReader.getApproximateKeyCount(files);
+ expectedBloomFilterSize = (expectedBloomFilterSize > 0) ? expectedBloomFilterSize : SSTableReader.indexInterval();
if (logger_.isDebugEnabled())
logger_.debug("Expected bloom filter size : " + expectedBloomFilterSize);
List<ColumnFamily> columnFamilies = new ArrayList<ColumnFamily>();
@@ -1108,7 +1108,7 @@
}
if (Range.isTokenInRanges(StorageService.getPartitioner().getInitialToken(lastkey), ranges))
{
- if (ssTableRange == null)
+ if (rangeWriter == null)
{
if (target != null)
{
@@ -1116,11 +1116,11 @@
}
FileUtils.createDirectory(rangeFileLocation);
String fname = new File(rangeFileLocation, mergedFileName).getAbsolutePath();
- ssTableRange = new SSTable(fname, expectedBloomFilterSize, StorageService.getPartitioner());
+ rangeWriter = new SSTableWriter(fname, expectedBloomFilterSize, StorageService.getPartitioner());
}
try
{
- ssTableRange.append(lastkey, bufOut);
+ rangeWriter.append(lastkey, bufOut);
}
catch (Exception ex)
{
@@ -1172,12 +1172,12 @@
}
}
- if (ssTableRange != null)
+ if (rangeWriter != null)
{
- ssTableRange.close();
+ rangeWriter.closeAndOpenReader();
if (fileList != null)
{
- fileList.add(ssTableRange.getFilename());
+ fileList.add(rangeWriter.getFilename());
}
}
@@ -1235,12 +1235,13 @@
}
String mergedFileName = getTempFileName(files);
- SSTable ssTable = null;
+ SSTableWriter writer = null;
+ SSTableReader ssTable = null;
String lastkey = null;
List<FileStruct> lfs = new ArrayList<FileStruct>();
DataOutputBuffer bufOut = new DataOutputBuffer();
- int expectedBloomFilterSize = SSTable.getApproximateKeyCount(files);
- expectedBloomFilterSize = (expectedBloomFilterSize > 0) ? expectedBloomFilterSize : SSTable.indexInterval();
+ int expectedBloomFilterSize = SSTableReader.getApproximateKeyCount(files);
+ expectedBloomFilterSize = (expectedBloomFilterSize > 0) ? expectedBloomFilterSize : SSTableReader.indexInterval();
if (logger_.isDebugEnabled())
logger_.debug("Expected bloom filter size : " + expectedBloomFilterSize);
List<ColumnFamily> columnFamilies = new ArrayList<ColumnFamily>();
@@ -1305,12 +1306,12 @@
bufOut.write(filestruct.getBufIn(), size);
}
- if (ssTable == null)
+ if (writer == null)
{
String fname = new File(compactionFileLocation, mergedFileName).getAbsolutePath();
- ssTable = new SSTable(fname, expectedBloomFilterSize, StorageService.getPartitioner());
+ writer = new SSTableWriter(fname, expectedBloomFilterSize, StorageService.getPartitioner());
}
- ssTable.append(lastkey, bufOut);
+ writer.append(lastkey, bufOut);
totalkeysWritten++;
for (FileStruct filestruct : lfs)
@@ -1343,11 +1344,11 @@
}
}
}
- if (ssTable != null)
+ if (writer != null)
{
// TODO if all the keys were the same nothing will be done here
- ssTable.close();
- newfile = ssTable.getFilename();
+ ssTable = writer.closeAndOpenReader();
+ newfile = writer.getFilename();
}
lock_.writeLock().lock();
try
@@ -1363,7 +1364,7 @@
}
for (String file : files)
{
- SSTable.get(file).delete();
+ SSTableReader.get(file).delete();
}
}
finally
@@ -1498,7 +1499,7 @@
}
/** not threadsafe. caller must have lock_ acquired. */
- public Collection<SSTable> getSSTables()
+ public Collection<SSTableReader> getSSTables()
{
return Collections.unmodifiableCollection(ssTables_.values());
}
@@ -1560,8 +1561,8 @@
}
/* add the SSTables on disk */
- List<SSTable> sstables = new ArrayList<SSTable>(ssTables_.values());
- for (SSTable sstable : sstables)
+ List<SSTableReader> sstables = new ArrayList<SSTableReader>(ssTables_.values());
+ for (SSTableReader sstable : sstables)
{
iter = new SSTableColumnIterator(sstable.getFilename(), key, cfName, startColumn, isAscending);
if (iter.hasNext())
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnIterator.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnIterator.java?rev=791591&r1=791590&r2=791591&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnIterator.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnIterator.java Mon Jul 6 19:58:05 2009
@@ -23,7 +23,7 @@
import org.apache.cassandra.io.DataInputBuffer;
import org.apache.cassandra.io.DataOutputBuffer;
-import org.apache.cassandra.io.SSTable;
+import org.apache.cassandra.io.SSTableReader;
import org.apache.cassandra.io.SequenceFile.ColumnGroupReader;
import org.apache.cassandra.service.StorageService;
import com.google.common.collect.AbstractIterator;
@@ -58,7 +58,7 @@
throws IOException
{
this.isAscending = isAscending;
- SSTable ssTable = SSTable.open(filename, StorageService.getPartitioner());
+ SSTableReader ssTable = SSTableReader.open(filename, StorageService.getPartitioner());
reader = ssTable.getColumnGroupReader(key, cfName, startColumn, isAscending);
this.startColumn = startColumn;
curColumnIndex = isAscending ? 0 : -1;
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/DataFileVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/DataFileVerbHandler.java?rev=791591&r1=791590&r2=791591&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/DataFileVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/DataFileVerbHandler.java Mon Jul 6 19:58:05 2009
@@ -23,13 +23,11 @@
import java.io.IOException;
import java.util.List;
-import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.LogUtil;
-import org.apache.cassandra.io.SSTable;
+import org.apache.cassandra.io.SSTableReader;
import org.apache.log4j.Logger;
@@ -46,11 +44,11 @@
try
{
- List<SSTable> ssTables = Table.open(table).getAllSSTablesOnDisk();
+ List<SSTableReader> ssTables = Table.open(table).getAllSSTablesOnDisk();
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(bos);
dos.writeInt(ssTables.size());
- for (SSTable sstable : ssTables)
+ for (SSTableReader sstable : ssTables)
{
dos.writeUTF(sstable.getFilename());
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/IFilter.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/IFilter.java?rev=791591&r1=791590&r2=791591&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/IFilter.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/IFilter.java Mon Jul 6 19:58:05 2009
@@ -22,12 +22,12 @@
import java.io.IOException;
import org.apache.cassandra.io.DataInputBuffer;
-import org.apache.cassandra.io.SSTable;
+import org.apache.cassandra.io.SSTableReader;
public interface IFilter
{
public ColumnFamily filter(String cfName, ColumnFamily cf);
public IColumn filter(IColumn column, DataInputStream dis) throws IOException;
- public DataInputBuffer next(String key, String cf, SSTable ssTable) throws IOException;
+ public DataInputBuffer next(String key, String cf, SSTableReader ssTable) throws IOException;
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/IdentityFilter.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/IdentityFilter.java?rev=791591&r1=791590&r2=791591&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/IdentityFilter.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/IdentityFilter.java Mon Jul 6 19:58:05 2009
@@ -20,10 +20,9 @@
import java.io.DataInputStream;
import java.io.IOException;
-import java.util.Collection;
import org.apache.cassandra.io.DataInputBuffer;
-import org.apache.cassandra.io.SSTable;
+import org.apache.cassandra.io.SSTableReader;
public class IdentityFilter implements IFilter
@@ -38,7 +37,7 @@
return column;
}
- public DataInputBuffer next(String key, String cf, SSTable ssTable) throws IOException
+ public DataInputBuffer next(String key, String cf, SSTableReader ssTable) throws IOException
{
return ssTable.next(key, cf);
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java?rev=791591&r1=791590&r2=791591&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java Mon Jul 6 19:58:05 2009
@@ -29,7 +29,8 @@
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.io.DataOutputBuffer;
-import org.apache.cassandra.io.SSTable;
+import org.apache.cassandra.io.SSTableReader;
+import org.apache.cassandra.io.SSTableWriter;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.DestructivePQIterator;
import org.apache.log4j.Logger;
@@ -250,7 +251,7 @@
logger_.info("Flushing " + this);
ColumnFamilyStore cfStore = Table.open(table_).getColumnFamilyStore(cfName_);
- SSTable ssTable = new SSTable(cfStore.getTempSSTablePath(), columnFamilies_.size(), StorageService.getPartitioner());
+ SSTableWriter writer = new SSTableWriter(cfStore.getTempSSTablePath(), columnFamilies_.size(), StorageService.getPartitioner());
// sort keys in the order they would be in when decorated
final IPartitioner partitioner = StorageService.getPartitioner();
@@ -273,10 +274,10 @@
/* serialize the cf with column indexes */
ColumnFamily.serializerWithIndexes().serialize( columnFamily, buffer );
/* Now write the key and value to disk */
- ssTable.append(partitioner.decorateKey(key), buffer);
+ writer.append(partitioner.decorateKey(key), buffer);
}
}
- ssTable.close();
+ SSTableReader ssTable = writer.closeAndOpenReader();
cfStore.onMemtableFlush(cLogCtx);
cfStore.storeLocation(ssTable);
buffer.close();
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/NamesFilter.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/NamesFilter.java?rev=791591&r1=791590&r2=791591&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/NamesFilter.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/NamesFilter.java Mon Jul 6 19:58:05 2009
@@ -23,9 +23,8 @@
import java.util.Collection;
import java.util.List;
-import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.DataInputBuffer;
-import org.apache.cassandra.io.SSTable;
+import org.apache.cassandra.io.SSTableReader;
@@ -92,7 +91,7 @@
return column;
}
- public DataInputBuffer next(String key, String cf, SSTable ssTable) throws IOException
+ public DataInputBuffer next(String key, String cf, SSTableReader ssTable) throws IOException
{
return ssTable.next(key, cf, names_, null);
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeFilter.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeFilter.java?rev=791591&r1=791590&r2=791591&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeFilter.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeFilter.java Mon Jul 6 19:58:05 2009
@@ -23,7 +23,7 @@
import java.util.Collection;
import org.apache.cassandra.io.DataInputBuffer;
-import org.apache.cassandra.io.SSTable;
+import org.apache.cassandra.io.SSTableReader;
/**
* Filters columns to satisfy colmin <= colname <= colmax
@@ -86,7 +86,7 @@
return null;
}
- public DataInputBuffer next(String key, String cf, SSTable ssTable)
+ public DataInputBuffer next(String key, String cf, SSTableReader ssTable)
throws IOException
{
return ssTable.next(key, cf);
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=791591&r1=791590&r2=791591&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Mon Jul 6 19:58:05 2009
@@ -33,8 +33,7 @@
import org.apache.cassandra.dht.BootstrapInitiateMessage;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.io.DataInputBuffer;
-import org.apache.cassandra.io.SSTable;
-import org.apache.cassandra.io.SequenceFile;
+import org.apache.cassandra.io.SSTableReader;
import org.apache.cassandra.io.FileStruct;
import org.apache.cassandra.net.EndPoint;
import org.apache.cassandra.net.IVerbHandler;
@@ -186,7 +185,7 @@
* list of the associated Column Family. Also merge the CBF into the
* sampler.
*/
- SSTable sstable = SSTable.open(streamContext.getTargetFile(), StorageService.getPartitioner());
+ SSTableReader sstable = SSTableReader.open(streamContext.getTargetFile(), StorageService.getPartitioner());
logger_.debug("Merging the counting bloom filter in the sampler ...");
String[] peices = FBUtilities.strip(fileName, "-");
Table.open(peices[0]).getColumnFamilyStore(peices[1]).addToList(sstable);
@@ -454,9 +453,9 @@
/*
* Get the list of all SSTables on disk. Not safe unless you aquire the CFS readlocks!
*/
- public List<SSTable> getAllSSTablesOnDisk()
+ public List<SSTableReader> getAllSSTablesOnDisk()
{
- List<SSTable> list = new ArrayList<SSTable>();
+ List<SSTableReader> list = new ArrayList<SSTableReader>();
Set<String> columnFamilies = tableMetadata_.getColumnFamilies();
for ( String columnFamily : columnFamilies )
{
@@ -778,7 +777,7 @@
}
// sstables
- for (SSTable sstable : cfs.getSSTables())
+ for (SSTableReader sstable : cfs.getSSTables())
{
FileStruct fs = sstable.getFileStruct();
fs.seekTo(startWith);
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/TimeFilter.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/TimeFilter.java?rev=791591&r1=791590&r2=791591&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/TimeFilter.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/TimeFilter.java Mon Jul 6 19:58:05 2009
@@ -21,10 +21,9 @@
import java.io.IOException;
import java.util.Collection;
-import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.DataInputBuffer;
import org.apache.cassandra.io.IndexHelper;
-import org.apache.cassandra.io.SSTable;
+import org.apache.cassandra.io.SSTableReader;
/**
@@ -122,7 +121,7 @@
return column;
}
- public DataInputBuffer next(String key, String cfName, SSTable ssTable) throws IOException
+ public DataInputBuffer next(String key, String cfName, SSTableReader ssTable) throws IOException
{
return ssTable.next(key, cfName, null, new IndexHelper.TimeRange(timeLimit_, Long.MAX_VALUE));
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/FileStruct.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/FileStruct.java?rev=791591&r1=791590&r2=791591&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/FileStruct.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/FileStruct.java Mon Jul 6 19:58:05 2009
@@ -24,8 +24,8 @@
import org.apache.cassandra.io.DataInputBuffer;
import org.apache.cassandra.io.DataOutputBuffer;
import org.apache.cassandra.io.IFileReader;
-import org.apache.cassandra.io.SSTable;
-import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.io.SSTableReader;
+
import org.apache.log4j.Logger;
import com.google.common.collect.AbstractIterator;
@@ -39,10 +39,10 @@
private IFileReader reader;
private DataInputBuffer bufIn;
private DataOutputBuffer bufOut;
- private SSTable sstable;
+ private SSTableReader sstable;
private FileStructIterator iterator;
- FileStruct(SSTable sstable) throws IOException
+ FileStruct(SSTableReader sstable) throws IOException
{
this.reader = SequenceFile.bufferedReader(sstable.getFilename(), 1024 * 1024);
this.sstable = sstable;
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java?rev=791591&r1=791590&r2=791591&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java Mon Jul 6 19:58:05 2009
@@ -1,596 +1,104 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.io;
-
-import java.io.*;
-import java.util.*;
-
-import org.apache.log4j.Logger;
-import org.apache.commons.lang.StringUtils;
-
-import org.apache.cassandra.db.RowMutation;
-import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.io.SequenceFile.ColumnGroupReader;
-import org.apache.cassandra.utils.BloomFilter;
-import org.apache.cassandra.utils.FileUtils;
-import org.cliffc.high_scale_lib.NonBlockingHashMap;
-import com.reardencommerce.kernel.collections.shared.evictable.ConcurrentLinkedHashMap;
-
-/**
- * This class is built on top of the SequenceFile. It stores
- * data on disk in sorted fashion. However the sorting is upto
- * the application. This class expects keys to be handed to it
- * in sorted order.
- *
- * A separate index file is maintained as well, containing the
- * SSTable keys and the offset into the SSTable at which they are found.
- * Every 1/indexInterval key is read into memory when the SSTable is opened.
- *
- * Finally, a bloom filter file is also kept for the keys in each SSTable.
- */
-
-public class SSTable
-{
- private static Logger logger_ = Logger.getLogger(SSTable.class);
- /* Every 128th index entry is loaded into memory so we know where to start looking for the actual key w/o seeking */
- private static final int indexInterval_ = 128;
- /* Required extension for temporary files created during compactions. */
- public static final String temporaryFile_ = "tmp";
-
- private static FileSSTableMap openedFiles = new FileSSTableMap();
-
- private ConcurrentLinkedHashMap<String, Long> keyCache = ConcurrentLinkedHashMap.create(ConcurrentLinkedHashMap.EvictionPolicy.SECOND_CHANCE, 1000);
-
-
- public static int indexInterval()
- {
- return indexInterval_;
- }
-
- // todo can we refactor to take list of sstables?
- public static int getApproximateKeyCount(List<String> dataFiles)
- {
- int count = 0;
-
- for (String dataFileName : dataFiles)
- {
- SSTable sstable = openedFiles.get(dataFileName);
- assert sstable != null;
- int indexKeyCount = sstable.getIndexPositions().size();
- count = count + (indexKeyCount + 1) * indexInterval_;
- if (logger_.isDebugEnabled())
- logger_.debug("index size for bloom filter calc for file : " + dataFileName + " : " + count);
- }
-
- return count;
- }
-
- /**
- * Get all indexed keys in the SSTable.
- */
- public static List<String> getIndexedKeys()
- {
- List<String> indexedKeys = new ArrayList<String>();
-
- for (SSTable sstable : openedFiles.values())
- {
- for (KeyPosition kp : sstable.getIndexPositions())
- {
- indexedKeys.add(kp.key);
- }
- }
- Collections.sort(indexedKeys);
-
- return indexedKeys;
- }
-
- String dataFile_;
- private long keysWritten;
- private IFileWriter dataWriter_;
- private BufferedRandomAccessFile indexRAF_;
- private String lastWrittenKey_;
- private IPartitioner partitioner_;
- List<KeyPosition> indexPositions_;
- BloomFilter bf;
-
- public static synchronized SSTable open(String dataFileName, IPartitioner partitioner) throws IOException
- {
- SSTable sstable = openedFiles.get(dataFileName);
- if (sstable == null)
- {
- assert partitioner != null;
- sstable = new SSTable(dataFileName, partitioner);
-
- long start = System.currentTimeMillis();
- sstable.loadIndexFile();
- sstable.loadBloomFilter();
- if (logger_.isDebugEnabled())
- logger_.debug("INDEX LOAD TIME for " + dataFileName + ": " + (System.currentTimeMillis() - start) + " ms.");
-
- openedFiles.put(dataFileName, sstable);
- }
- return sstable;
- }
-
- public static synchronized SSTable get(String dataFileName) throws IOException
- {
- SSTable sstable = openedFiles.get(dataFileName);
- assert sstable != null;
- return sstable;
- }
-
- private SSTable(String filename, IPartitioner partitioner)
- {
- assert filename.endsWith("-Data.db");
- dataFile_ = filename;
- partitioner_ = partitioner;
- }
-
- public SSTable(String filename, int keyCount, IPartitioner partitioner) throws IOException
- {
- assert filename.endsWith("-Data.db");
- dataFile_ = filename;
- partitioner_ = partitioner;
- dataWriter_ = SequenceFile.bufferedWriter(dataFile_, 4 * 1024 * 1024);
- indexRAF_ = new BufferedRandomAccessFile(indexFilename(), "rw", 1024 * 1024);
- bf = new BloomFilter(keyCount, 15);
- }
-
- static String parseTableName(String filename)
- {
- String[] parts = new File(filename).getName().split("-"); // table, cf, index, [filetype]
- return parts[0];
- }
-
- public List<KeyPosition> getIndexPositions()
- {
- return indexPositions_;
- }
-
- private void loadBloomFilter() throws IOException
- {
- DataInputStream stream = new DataInputStream(new FileInputStream(filterFilename()));
- bf = BloomFilter.serializer().deserialize(stream);
- }
-
- private void loadIndexFile() throws IOException
- {
- BufferedRandomAccessFile input = new BufferedRandomAccessFile(indexFilename(), "r");
- indexPositions_ = new ArrayList<KeyPosition>();
-
- int i = 0;
- long indexSize = input.length();
- while (true)
- {
- long indexPosition = input.getFilePointer();
- if (indexPosition == indexSize)
- {
- break;
- }
- String decoratedKey = input.readUTF();
- input.readLong();
- if (i++ % indexInterval_ == 0)
- {
- indexPositions_.add(new KeyPosition(decoratedKey, indexPosition));
- }
- }
- }
-
- private static String indexFilename(String dataFile)
- {
- String[] parts = dataFile.split("-");
- parts[parts.length - 1] = "Index.db";
- return StringUtils.join(parts, "-");
- }
- private String indexFilename()
- {
- return indexFilename(dataFile_);
- }
-
- private static String filterFilename(String dataFile)
- {
- String[] parts = dataFile.split("-");
- parts[parts.length - 1] = "Filter.db";
- return StringUtils.join(parts, "-");
- }
- private String filterFilename()
- {
- return filterFilename(dataFile_);
- }
-
- public String getFilename()
- {
- return dataFile_;
- }
-
- private long beforeAppend(String decoratedKey) throws IOException
- {
- if (decoratedKey == null)
- {
- throw new IOException("Keys must not be null.");
- }
- Comparator<String> c = partitioner_.getDecoratedKeyComparator();
- if (lastWrittenKey_ != null && c.compare(lastWrittenKey_, decoratedKey) > 0)
- {
- logger_.info("Last written key : " + lastWrittenKey_);
- logger_.info("Current key : " + decoratedKey);
- logger_.info("Writing into file " + dataFile_);
- throw new IOException("Keys must be written in ascending order.");
- }
- return (lastWrittenKey_ == null) ? 0 : dataWriter_.getCurrentPosition();
- }
-
- private void afterAppend(String decoratedKey, long position) throws IOException
- {
- bf.add(decoratedKey);
- lastWrittenKey_ = decoratedKey;
- long indexPosition = indexRAF_.getFilePointer();
- indexRAF_.writeUTF(decoratedKey);
- indexRAF_.writeLong(position);
- logger_.trace("wrote " + decoratedKey + " at " + position);
-
- if (keysWritten++ % indexInterval_ != 0)
- return;
- if (indexPositions_ == null)
- {
- indexPositions_ = new ArrayList<KeyPosition>();
- }
- indexPositions_.add(new KeyPosition(decoratedKey, indexPosition));
- logger_.trace("wrote index of " + decoratedKey + " at " + indexPosition);
- }
-
- // TODO make this take a DataOutputStream and wrap the byte[] version to combine them
- public void append(String decoratedKey, DataOutputBuffer buffer) throws IOException
- {
- long currentPosition = beforeAppend(decoratedKey);
- dataWriter_.append(decoratedKey, buffer);
- afterAppend(decoratedKey, currentPosition);
- }
-
- public void append(String decoratedKey, byte[] value) throws IOException
- {
- long currentPosition = beforeAppend(decoratedKey);
- dataWriter_.append(decoratedKey, value);
- afterAppend(decoratedKey, currentPosition);
- }
-
- /** get the position in the index file to start scanning to find the given key (at most indexInterval keys away) */
- private long getIndexScanPosition(String decoratedKey, IPartitioner partitioner)
- {
- assert indexPositions_ != null && indexPositions_.size() > 0;
- int index = Collections.binarySearch(indexPositions_, new KeyPosition(decoratedKey, -1));
- if (index < 0)
- {
- // binary search gives us the first index _greater_ than the key searched for,
- // i.e., its insertion position
- int greaterThan = (index + 1) * -1;
- if (greaterThan == 0)
- return -1;
- return indexPositions_.get(greaterThan - 1).position;
- }
- else
- {
- return indexPositions_.get(index).position;
- }
- }
-
- /**
- * returns the position in the data file to find the given key, or -1 if the key is not present
- */
- public long getPosition(String decoratedKey, IPartitioner partitioner) throws IOException
- {
- if (!bf.isPresent(decoratedKey))
- return -1;
- Long cachedPosition = keyCache.get(decoratedKey);
- if (cachedPosition != null)
- {
- return cachedPosition;
- }
- long start = getIndexScanPosition(decoratedKey, partitioner);
- if (start < 0)
- {
- return -1;
- }
-
- // TODO mmap the index file?
- BufferedRandomAccessFile input = new BufferedRandomAccessFile(indexFilename(dataFile_), "r");
- input.seek(start);
- int i = 0;
- try
- {
- do
- {
- String indexDecoratedKey;
- try
- {
- indexDecoratedKey = input.readUTF();
- }
- catch (EOFException e)
- {
- return -1;
- }
- long position = input.readLong();
- int v = partitioner.getDecoratedKeyComparator().compare(indexDecoratedKey, decoratedKey);
- if (v == 0)
- {
- keyCache.put(decoratedKey, position);
- return position;
- }
- if (v > 0)
- return -1;
- } while (++i < indexInterval_);
- }
- finally
- {
- input.close();
- }
- return -1;
- }
-
- /** like getPosition, but if key is not found will return the location of the first key _greater_ than the desired one, or -1 if no such key exists. */
- public long getNearestPosition(String decoratedKey) throws IOException
- {
- long start = getIndexScanPosition(decoratedKey, partitioner_);
- if (start < 0)
- {
- return 0;
- }
- BufferedRandomAccessFile input = new BufferedRandomAccessFile(indexFilename(dataFile_), "r");
- input.seek(start);
- try
- {
- while (true)
- {
- String indexDecoratedKey;
- try
- {
- indexDecoratedKey = input.readUTF();
- }
- catch (EOFException e)
- {
- return -1;
- }
- long position = input.readLong();
- int v = partitioner_.getDecoratedKeyComparator().compare(indexDecoratedKey, decoratedKey);
- if (v >= 0)
- return position;
- }
- }
- finally
- {
- input.close();
- }
- }
-
- public DataInputBuffer next(final String clientKey, String cfName, List<String> columnNames) throws IOException
- {
- return next(clientKey, cfName, columnNames, null);
- }
-
- public DataInputBuffer next(final String clientKey, String cfName, List<String> columnNames, IndexHelper.TimeRange timeRange) throws IOException
- {
- IFileReader dataReader = null;
- try
- {
- dataReader = SequenceFile.reader(dataFile_);
- String decoratedKey = partitioner_.decorateKey(clientKey);
- long position = getPosition(decoratedKey, partitioner_);
-
- DataOutputBuffer bufOut = new DataOutputBuffer();
- DataInputBuffer bufIn = new DataInputBuffer();
- long bytesRead = dataReader.next(decoratedKey, bufOut, cfName, columnNames, timeRange, position);
- if (bytesRead != -1L)
- {
- if (bufOut.getLength() > 0)
- {
- bufIn.reset(bufOut.getData(), bufOut.getLength());
- /* read the key even though we do not use it */
- bufIn.readUTF();
- bufIn.readInt();
- }
- }
- return bufIn;
- }
- finally
- {
- if (dataReader != null)
- {
- dataReader.close();
- }
- }
- }
-
- public DataInputBuffer next(String clientKey, String columnFamilyColumn) throws IOException
- {
- String[] values = RowMutation.getColumnAndColumnFamily(columnFamilyColumn);
- String columnFamilyName = values[0];
- List<String> cnNames = (values.length == 1) ? null : Arrays.asList(values[1]);
- return next(clientKey, columnFamilyName, cnNames);
- }
-
- private static String rename(String tmpFilename)
- {
- String filename = tmpFilename.replace("-" + temporaryFile_, "");
- new File(tmpFilename).renameTo(new File(filename));
- return filename;
- }
-
- /**
- * Renames temporary SSTable files to valid data, index, and bloom filter files
- */
- public void close() throws IOException
- {
- // bloom filter
- FileOutputStream fos = new FileOutputStream(filterFilename());
- DataOutputStream stream = new DataOutputStream(fos);
- BloomFilter.serializer().serialize(bf, stream);
- stream.flush();
- fos.getFD().sync();
- stream.close();
-
- // index
- indexRAF_.getChannel().force(true);
- indexRAF_.close();
-
- // main data
- dataWriter_.close(); // calls force
-
- rename(indexFilename());
- rename(filterFilename());
- dataFile_ = rename(dataFile_); // important to do this last since index & filter file names are derived from it
-
- openedFiles.put(dataFile_, this);
- }
-
- /**
- * obtain a BlockReader for the getColumnSlice call.
- */
- public ColumnGroupReader getColumnGroupReader(String key, String cfName, String startColumn, boolean isAscending) throws IOException
- {
- IFileReader dataReader = SequenceFile.reader(dataFile_);
-
- try
- {
- /* Morph key into actual key based on the partition type. */
- String decoratedKey = partitioner_.decorateKey(key);
- long position = getPosition(decoratedKey, partitioner_);
- return new ColumnGroupReader(dataFile_, decoratedKey, cfName, startColumn, isAscending, position);
- }
- finally
- {
- dataReader.close();
- }
- }
-
- public void delete() throws IOException
- {
- FileUtils.deleteWithConfirm(new File(dataFile_));
- FileUtils.deleteWithConfirm(new File(indexFilename(dataFile_)));
- FileUtils.deleteWithConfirm(new File(filterFilename(dataFile_)));
- openedFiles.remove(dataFile_);
- }
-
- /** obviously only for testing */
- public void forceBloomFilterFailures()
- {
- bf = BloomFilter.alwaysMatchingBloomFilter();
- }
-
- static void reopenUnsafe() throws IOException // testing only
- {
- Collection<SSTable> sstables = new ArrayList<SSTable>(openedFiles.values());
- openedFiles.clear();
- for (SSTable sstable : sstables)
- {
- SSTable.open(sstable.dataFile_, sstable.partitioner_);
- }
- }
-
- IPartitioner getPartitioner()
- {
- return partitioner_;
- }
-
- public FileStruct getFileStruct() throws IOException
- {
- return new FileStruct(this);
- }
-
- public static void deleteAll() throws IOException
- {
- for (SSTable sstable : openedFiles.values())
- {
- sstable.delete();
- }
- }
-
- /**
- * This is a simple container for the index Key and its corresponding position
- * in the data file. Binary search is performed on a list of these objects
- * to lookup keys within the SSTable data file.
- *
- * All keys are decorated.
- */
- class KeyPosition implements Comparable<KeyPosition>
- {
- public final String key; // decorated
- public final long position;
-
- public KeyPosition(String key, long position)
- {
- this.key = key;
- this.position = position;
- }
-
- public int compareTo(KeyPosition kp)
- {
- return partitioner_.getDecoratedKeyComparator().compare(key, kp.key);
- }
-
- public String toString()
- {
- return key + ":" + position;
- }
- }
-
-}
-
-class FileSSTableMap
-{
- private final Map<String, SSTable> map = new NonBlockingHashMap<String, SSTable>();
-
- public SSTable get(String filename)
- {
- try
- {
- return map.get(new File(filename).getCanonicalPath());
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- }
-
- public SSTable put(String filename, SSTable value)
- {
- try
- {
- return map.put(new File(filename).getCanonicalPath(), value);
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- }
-
- public Collection<SSTable> values()
- {
- return map.values();
- }
-
- public void clear()
- {
- map.clear();
- }
-
- public void remove(String filename) throws IOException
- {
- map.remove(new File(filename).getCanonicalPath());
- }
-}
+package org.apache.cassandra.io;
+
+import java.io.File;
+import java.util.List;
+
+import org.apache.commons.lang.StringUtils;
+
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.utils.BloomFilter;
+
+/**
+ * Common base class for SSTableReader and SSTableWriter.
+ *
+ * An SSTable is built on top of the SequenceFile. It stores
+ * data on disk in sorted fashion. However, the sorting is up to
+ * the application: keys are expected to be handed over
+ * in sorted order.
+ *
+ * A separate index file is maintained as well, containing the
+ * SSTable keys and the offset into the SSTable at which they are found.
+ * Every INDEX_INTERVAL-th index entry is read into memory when the SSTable is opened.
+ *
+ * Finally, a bloom filter file is also kept for the keys in each SSTable.
+ */
+public abstract class SSTable
+{
+ /* path of the data file; the constructor asserts that it ends with "-Data.db" */
+ protected String dataFile;
+ /* supplies the decorated-key comparator used by KeyPosition.compareTo */
+ protected IPartitioner partitioner;
+ /* bloom filter over this SSTable's keys; not assigned anywhere in this class */
+ /* NOTE(review): SSTableWriter declares its own private bf field, which hides this one -- confirm that is intended */
+ protected BloomFilter bf;
+ /* in-memory sample of the index file; not assigned anywhere in this class */
+ protected List<KeyPosition> indexPositions;
+
+ /* Every 128th index entry is loaded into memory so we know where to start looking for the actual key w/o seeking */
+ public static final int INDEX_INTERVAL = 128;
+ /* Required extension for temporary files created during compactions. */
+ public static final String TEMPFILE_MARKER = "tmp";
+
+ /**
+ * @param filename path of the data file; must end with "-Data.db"
+ * @param partitioner partitioner whose decorated-key comparator orders the keys
+ */
+ public SSTable(String filename, IPartitioner partitioner)
+ {
+ assert filename.endsWith("-Data.db");
+ this.dataFile = filename;
+ this.partitioner = partitioner;
+ }
+
+ /**
+ * Derives the index file name from a data file name by replacing the
+ * last "-"-separated component with "Index.db".
+ */
+ protected static String indexFilename(String dataFile)
+ {
+ String[] parts = dataFile.split("-");
+ parts[parts.length - 1] = "Index.db";
+ return StringUtils.join(parts, "-");
+ }
+
+ /** @return the index file name belonging to this SSTable's data file */
+ protected String indexFilename()
+ {
+ return indexFilename(dataFile);
+ }
+
+ /**
+ * Derives the bloom filter file name from a data file name by replacing the
+ * last "-"-separated component with "Filter.db".
+ */
+ protected static String filterFilename(String dataFile)
+ {
+ String[] parts = dataFile.split("-");
+ parts[parts.length - 1] = "Filter.db";
+ return StringUtils.join(parts, "-");
+ }
+
+ /** @return the bloom filter file name belonging to this SSTable's data file */
+ protected String filterFilename()
+ {
+ return filterFilename(dataFile);
+ }
+
+ /** @return the data file path this SSTable was created with */
+ public String getFilename()
+ {
+ return dataFile;
+ }
+
+ /**
+ * @return the first "-"-separated component of the file's base name
+ * (directories are stripped first), which is the table name
+ */
+ static String parseTableName(String filename)
+ {
+ String[] parts = new File(filename).getName().split("-"); // table, cf, index, [filetype]
+ return parts[0];
+ }
+
+ /**
+ * This is a simple container for an index key and its corresponding position.
+ * Binary search is performed on a list of these objects
+ * to look up keys within the SSTable.
+ *
+ * NOTE(review): SSTableReader.loadIndexFile stores the offset of the entry
+ * in the index file here, not an offset into the data file.
+ *
+ * This is a non-static inner class: compareTo uses the enclosing SSTable's partitioner.
+ *
+ * All keys are decorated.
+ */
+ class KeyPosition implements Comparable<KeyPosition>
+ {
+ public final String key; // decorated
+ public final long position;
+
+ public KeyPosition(String key, long position)
+ {
+ this.key = key;
+ this.position = position;
+ }
+
+ /** Orders by key only, using the partitioner's decorated-key comparator; position is ignored. */
+ public int compareTo(KeyPosition kp)
+ {
+ return partitioner.getDecoratedKeyComparator().compare(key, kp.key);
+ }
+
+ public String toString()
+ {
+ return key + ":" + position;
+ }
+ }
+}
Copied: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java (from r791590, incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java)
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java?p2=incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java&p1=incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java&r1=791590&r2=791591&rev=791591&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java Mon Jul 6 19:58:05 2009
@@ -22,7 +22,6 @@
import java.util.*;
import org.apache.log4j.Logger;
-import org.apache.commons.lang.StringUtils;
import org.apache.cassandra.db.RowMutation;
import org.apache.cassandra.dht.IPartitioner;
@@ -32,35 +31,15 @@
import org.cliffc.high_scale_lib.NonBlockingHashMap;
import com.reardencommerce.kernel.collections.shared.evictable.ConcurrentLinkedHashMap;
-/**
- * This class is built on top of the SequenceFile. It stores
- * data on disk in sorted fashion. However the sorting is upto
- * the application. This class expects keys to be handed to it
- * in sorted order.
- *
- * A separate index file is maintained as well, containing the
- * SSTable keys and the offset into the SSTable at which they are found.
- * Every 1/indexInterval key is read into memory when the SSTable is opened.
- *
- * Finally, a bloom filter file is also kept for the keys in each SSTable.
- */
-
-public class SSTable
+public class SSTableReader extends SSTable
{
- private static Logger logger_ = Logger.getLogger(SSTable.class);
- /* Every 128th index entry is loaded into memory so we know where to start looking for the actual key w/o seeking */
- private static final int indexInterval_ = 128;
- /* Required extension for temporary files created during compactions. */
- public static final String temporaryFile_ = "tmp";
+ private static Logger logger = Logger.getLogger(SSTableReader.class);
private static FileSSTableMap openedFiles = new FileSSTableMap();
- private ConcurrentLinkedHashMap<String, Long> keyCache = ConcurrentLinkedHashMap.create(ConcurrentLinkedHashMap.EvictionPolicy.SECOND_CHANCE, 1000);
-
-
public static int indexInterval()
{
- return indexInterval_;
+ return INDEX_INTERVAL;
}
// todo can we refactor to take list of sstables?
@@ -70,12 +49,11 @@
for (String dataFileName : dataFiles)
{
- SSTable sstable = openedFiles.get(dataFileName);
+ SSTableReader sstable = openedFiles.get(dataFileName);
assert sstable != null;
int indexKeyCount = sstable.getIndexPositions().size();
- count = count + (indexKeyCount + 1) * indexInterval_;
- if (logger_.isDebugEnabled())
- logger_.debug("index size for bloom filter calc for file : " + dataFileName + " : " + count);
+ count = count + (indexKeyCount + 1) * INDEX_INTERVAL;
+ logger.debug("index size for bloom filter calc for file : " + dataFileName + " : " + count);
}
return count;
@@ -88,7 +66,7 @@
{
List<String> indexedKeys = new ArrayList<String>();
- for (SSTable sstable : openedFiles.values())
+ for (SSTableReader sstable : openedFiles.values())
{
for (KeyPosition kp : sstable.getIndexPositions())
{
@@ -100,67 +78,53 @@
return indexedKeys;
}
- String dataFile_;
- private long keysWritten;
- private IFileWriter dataWriter_;
- private BufferedRandomAccessFile indexRAF_;
- private String lastWrittenKey_;
- private IPartitioner partitioner_;
- List<KeyPosition> indexPositions_;
- BloomFilter bf;
-
- public static synchronized SSTable open(String dataFileName, IPartitioner partitioner) throws IOException
+ public static synchronized SSTableReader open(String dataFileName, IPartitioner partitioner) throws IOException
{
- SSTable sstable = openedFiles.get(dataFileName);
+ SSTableReader sstable = openedFiles.get(dataFileName);
if (sstable == null)
{
assert partitioner != null;
- sstable = new SSTable(dataFileName, partitioner);
+ sstable = new SSTableReader(dataFileName, partitioner);
long start = System.currentTimeMillis();
sstable.loadIndexFile();
sstable.loadBloomFilter();
- if (logger_.isDebugEnabled())
- logger_.debug("INDEX LOAD TIME for " + dataFileName + ": " + (System.currentTimeMillis() - start) + " ms.");
+ logger.debug("INDEX LOAD TIME for " + dataFileName + ": " + (System.currentTimeMillis() - start) + " ms.");
openedFiles.put(dataFileName, sstable);
}
return sstable;
}
- public static synchronized SSTable get(String dataFileName) throws IOException
+ public static synchronized SSTableReader get(String dataFileName) throws IOException
{
- SSTable sstable = openedFiles.get(dataFileName);
+ SSTableReader sstable = openedFiles.get(dataFileName);
assert sstable != null;
return sstable;
}
- private SSTable(String filename, IPartitioner partitioner)
- {
- assert filename.endsWith("-Data.db");
- dataFile_ = filename;
- partitioner_ = partitioner;
- }
- public SSTable(String filename, int keyCount, IPartitioner partitioner) throws IOException
+ private ConcurrentLinkedHashMap<String, Long> keyCache = ConcurrentLinkedHashMap.create(ConcurrentLinkedHashMap.EvictionPolicy.SECOND_CHANCE, 1000);
+
+ /**
+ * Creates a reader from an already-built index sample and bloom filter
+ * and registers it in openedFiles under the given file name.
+ *
+ * @param filename path of the data file; must end with "-Data.db"
+ * @param partitioner partitioner whose decorated-key comparator orders the keys
+ * @param indexPositions in-memory sample of the index file
+ * @param bloomFilter bloom filter over the keys of this SSTable
+ */
+ SSTableReader(String filename, IPartitioner partitioner, List<KeyPosition> indexPositions, BloomFilter bloomFilter)
{
- assert filename.endsWith("-Data.db");
- dataFile_ = filename;
- partitioner_ = partitioner;
- dataWriter_ = SequenceFile.bufferedWriter(dataFile_, 4 * 1024 * 1024);
- indexRAF_ = new BufferedRandomAccessFile(indexFilename(), "rw", 1024 * 1024);
- bf = new BloomFilter(keyCount, 15);
+ super(filename, partitioner);
+ this.indexPositions = indexPositions;
+ this.bf = bloomFilter;
+ // Lock on the class: that is the monitor held by the static synchronized open() and get().
+ // Locking on the instance under construction would exclude nobody, since no other thread can see it yet.
+ synchronized (SSTableReader.class)
+ {
+ openedFiles.put(filename, this);
+ }
}
- static String parseTableName(String filename)
+ private SSTableReader(String filename, IPartitioner partitioner)
{
- String[] parts = new File(filename).getName().split("-"); // table, cf, index, [filetype]
- return parts[0];
+ super(filename, partitioner);
}
public List<KeyPosition> getIndexPositions()
{
- return indexPositions_;
+ return indexPositions;
}
private void loadBloomFilter() throws IOException
@@ -172,7 +136,7 @@
private void loadIndexFile() throws IOException
{
BufferedRandomAccessFile input = new BufferedRandomAccessFile(indexFilename(), "r");
- indexPositions_ = new ArrayList<KeyPosition>();
+ indexPositions = new ArrayList<KeyPosition>();
int i = 0;
long indexSize = input.length();
@@ -185,96 +149,18 @@
}
String decoratedKey = input.readUTF();
input.readLong();
- if (i++ % indexInterval_ == 0)
+ if (i++ % INDEX_INTERVAL == 0)
{
- indexPositions_.add(new KeyPosition(decoratedKey, indexPosition));
+ indexPositions.add(new KeyPosition(decoratedKey, indexPosition));
}
}
}
- private static String indexFilename(String dataFile)
- {
- String[] parts = dataFile.split("-");
- parts[parts.length - 1] = "Index.db";
- return StringUtils.join(parts, "-");
- }
- private String indexFilename()
- {
- return indexFilename(dataFile_);
- }
-
- private static String filterFilename(String dataFile)
- {
- String[] parts = dataFile.split("-");
- parts[parts.length - 1] = "Filter.db";
- return StringUtils.join(parts, "-");
- }
- private String filterFilename()
- {
- return filterFilename(dataFile_);
- }
-
- public String getFilename()
- {
- return dataFile_;
- }
-
- private long beforeAppend(String decoratedKey) throws IOException
- {
- if (decoratedKey == null)
- {
- throw new IOException("Keys must not be null.");
- }
- Comparator<String> c = partitioner_.getDecoratedKeyComparator();
- if (lastWrittenKey_ != null && c.compare(lastWrittenKey_, decoratedKey) > 0)
- {
- logger_.info("Last written key : " + lastWrittenKey_);
- logger_.info("Current key : " + decoratedKey);
- logger_.info("Writing into file " + dataFile_);
- throw new IOException("Keys must be written in ascending order.");
- }
- return (lastWrittenKey_ == null) ? 0 : dataWriter_.getCurrentPosition();
- }
-
- private void afterAppend(String decoratedKey, long position) throws IOException
- {
- bf.add(decoratedKey);
- lastWrittenKey_ = decoratedKey;
- long indexPosition = indexRAF_.getFilePointer();
- indexRAF_.writeUTF(decoratedKey);
- indexRAF_.writeLong(position);
- logger_.trace("wrote " + decoratedKey + " at " + position);
-
- if (keysWritten++ % indexInterval_ != 0)
- return;
- if (indexPositions_ == null)
- {
- indexPositions_ = new ArrayList<KeyPosition>();
- }
- indexPositions_.add(new KeyPosition(decoratedKey, indexPosition));
- logger_.trace("wrote index of " + decoratedKey + " at " + indexPosition);
- }
-
- // TODO make this take a DataOutputStream and wrap the byte[] version to combine them
- public void append(String decoratedKey, DataOutputBuffer buffer) throws IOException
- {
- long currentPosition = beforeAppend(decoratedKey);
- dataWriter_.append(decoratedKey, buffer);
- afterAppend(decoratedKey, currentPosition);
- }
-
- public void append(String decoratedKey, byte[] value) throws IOException
- {
- long currentPosition = beforeAppend(decoratedKey);
- dataWriter_.append(decoratedKey, value);
- afterAppend(decoratedKey, currentPosition);
- }
-
/** get the position in the index file to start scanning to find the given key (at most indexInterval keys away) */
private long getIndexScanPosition(String decoratedKey, IPartitioner partitioner)
{
- assert indexPositions_ != null && indexPositions_.size() > 0;
- int index = Collections.binarySearch(indexPositions_, new KeyPosition(decoratedKey, -1));
+ assert indexPositions != null && indexPositions.size() > 0;
+ int index = Collections.binarySearch(indexPositions, new KeyPosition(decoratedKey, -1));
if (index < 0)
{
// binary search gives us the first index _greater_ than the key searched for,
@@ -282,11 +168,11 @@
int greaterThan = (index + 1) * -1;
if (greaterThan == 0)
return -1;
- return indexPositions_.get(greaterThan - 1).position;
+ return indexPositions.get(greaterThan - 1).position;
}
else
{
- return indexPositions_.get(index).position;
+ return indexPositions.get(index).position;
}
}
@@ -309,7 +195,7 @@
}
// TODO mmap the index file?
- BufferedRandomAccessFile input = new BufferedRandomAccessFile(indexFilename(dataFile_), "r");
+ BufferedRandomAccessFile input = new BufferedRandomAccessFile(indexFilename(dataFile), "r");
input.seek(start);
int i = 0;
try
@@ -334,7 +220,7 @@
}
if (v > 0)
return -1;
- } while (++i < indexInterval_);
+ } while (++i < INDEX_INTERVAL);
}
finally
{
@@ -346,12 +232,12 @@
/** like getPosition, but if key is not found will return the location of the first key _greater_ than the desired one, or -1 if no such key exists. */
public long getNearestPosition(String decoratedKey) throws IOException
{
- long start = getIndexScanPosition(decoratedKey, partitioner_);
+ long start = getIndexScanPosition(decoratedKey, partitioner);
if (start < 0)
{
return 0;
}
- BufferedRandomAccessFile input = new BufferedRandomAccessFile(indexFilename(dataFile_), "r");
+ BufferedRandomAccessFile input = new BufferedRandomAccessFile(indexFilename(dataFile), "r");
input.seek(start);
try
{
@@ -367,7 +253,7 @@
return -1;
}
long position = input.readLong();
- int v = partitioner_.getDecoratedKeyComparator().compare(indexDecoratedKey, decoratedKey);
+ int v = partitioner.getDecoratedKeyComparator().compare(indexDecoratedKey, decoratedKey);
if (v >= 0)
return position;
}
@@ -388,9 +274,9 @@
IFileReader dataReader = null;
try
{
- dataReader = SequenceFile.reader(dataFile_);
- String decoratedKey = partitioner_.decorateKey(clientKey);
- long position = getPosition(decoratedKey, partitioner_);
+ dataReader = SequenceFile.reader(dataFile);
+ String decoratedKey = partitioner.decorateKey(clientKey);
+ long position = getPosition(decoratedKey, partitioner);
DataOutputBuffer bufOut = new DataOutputBuffer();
DataInputBuffer bufIn = new DataInputBuffer();
@@ -424,53 +310,19 @@
return next(clientKey, columnFamilyName, cnNames);
}
- private static String rename(String tmpFilename)
- {
- String filename = tmpFilename.replace("-" + temporaryFile_, "");
- new File(tmpFilename).renameTo(new File(filename));
- return filename;
- }
-
- /**
- * Renames temporary SSTable files to valid data, index, and bloom filter files
- */
- public void close() throws IOException
- {
- // bloom filter
- FileOutputStream fos = new FileOutputStream(filterFilename());
- DataOutputStream stream = new DataOutputStream(fos);
- BloomFilter.serializer().serialize(bf, stream);
- stream.flush();
- fos.getFD().sync();
- stream.close();
-
- // index
- indexRAF_.getChannel().force(true);
- indexRAF_.close();
-
- // main data
- dataWriter_.close(); // calls force
-
- rename(indexFilename());
- rename(filterFilename());
- dataFile_ = rename(dataFile_); // important to do this last since index & filter file names are derived from it
-
- openedFiles.put(dataFile_, this);
- }
-
/**
* obtain a BlockReader for the getColumnSlice call.
*/
public ColumnGroupReader getColumnGroupReader(String key, String cfName, String startColumn, boolean isAscending) throws IOException
{
- IFileReader dataReader = SequenceFile.reader(dataFile_);
+ IFileReader dataReader = SequenceFile.reader(dataFile);
try
{
/* Morph key into actual key based on the partition type. */
- String decoratedKey = partitioner_.decorateKey(key);
- long position = getPosition(decoratedKey, partitioner_);
- return new ColumnGroupReader(dataFile_, decoratedKey, cfName, startColumn, isAscending, position);
+ String decoratedKey = partitioner.decorateKey(key);
+ long position = getPosition(decoratedKey, partitioner);
+ return new ColumnGroupReader(dataFile, decoratedKey, cfName, startColumn, isAscending, position);
}
finally
{
@@ -480,10 +332,10 @@
public void delete() throws IOException
{
- FileUtils.deleteWithConfirm(new File(dataFile_));
- FileUtils.deleteWithConfirm(new File(indexFilename(dataFile_)));
- FileUtils.deleteWithConfirm(new File(filterFilename(dataFile_)));
- openedFiles.remove(dataFile_);
+ FileUtils.deleteWithConfirm(new File(dataFile));
+ FileUtils.deleteWithConfirm(new File(indexFilename(dataFile)));
+ FileUtils.deleteWithConfirm(new File(filterFilename(dataFile)));
+ openedFiles.remove(dataFile);
}
/** obviously only for testing */
@@ -494,17 +346,17 @@
static void reopenUnsafe() throws IOException // testing only
{
- Collection<SSTable> sstables = new ArrayList<SSTable>(openedFiles.values());
+ Collection<SSTableReader> sstables = new ArrayList<SSTableReader>(openedFiles.values());
openedFiles.clear();
- for (SSTable sstable : sstables)
+ for (SSTableReader sstable : sstables)
{
- SSTable.open(sstable.dataFile_, sstable.partitioner_);
+ SSTableReader.open(sstable.dataFile, sstable.partitioner);
}
}
IPartitioner getPartitioner()
{
- return partitioner_;
+ return partitioner;
}
public FileStruct getFileStruct() throws IOException
@@ -514,48 +366,18 @@
public static void deleteAll() throws IOException
{
- for (SSTable sstable : openedFiles.values())
+ for (SSTableReader sstable : openedFiles.values())
{
sstable.delete();
}
}
-
- /**
- * This is a simple container for the index Key and its corresponding position
- * in the data file. Binary search is performed on a list of these objects
- * to lookup keys within the SSTable data file.
- *
- * All keys are decorated.
- */
- class KeyPosition implements Comparable<KeyPosition>
- {
- public final String key; // decorated
- public final long position;
-
- public KeyPosition(String key, long position)
- {
- this.key = key;
- this.position = position;
- }
-
- public int compareTo(KeyPosition kp)
- {
- return partitioner_.getDecoratedKeyComparator().compare(key, kp.key);
- }
-
- public String toString()
- {
- return key + ":" + position;
- }
- }
-
}
class FileSSTableMap
{
- private final Map<String, SSTable> map = new NonBlockingHashMap<String, SSTable>();
+ private final Map<String, SSTableReader> map = new NonBlockingHashMap<String, SSTableReader>();
- public SSTable get(String filename)
+ public SSTableReader get(String filename)
{
try
{
@@ -567,7 +389,7 @@
}
}
- public SSTable put(String filename, SSTable value)
+ public SSTableReader put(String filename, SSTableReader value)
{
try
{
@@ -579,7 +401,7 @@
}
}
- public Collection<SSTable> values()
+ public Collection<SSTableReader> values()
{
return map.values();
}
Added: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableWriter.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableWriter.java?rev=791591&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableWriter.java (added)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableWriter.java Mon Jul 6 19:58:05 2009
@@ -0,0 +1,118 @@
+package org.apache.cassandra.io;
+
+import java.io.IOException;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.DataOutputStream;
+import java.util.Comparator;
+import java.util.ArrayList;
+
+import org.apache.log4j.Logger;
+
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.utils.BloomFilter;
+
+public class SSTableWriter extends SSTable
+{
+ private static Logger logger = Logger.getLogger(SSTableWriter.class);
+
+ private long keysWritten;
+ private IFileWriter dataWriter;
+ private BufferedRandomAccessFile indexRAF;
+ private String lastWrittenKey;
+ private BloomFilter bf;
+
+ public SSTableWriter(String filename, int keyCount, IPartitioner partitioner) throws IOException
+ {
+ super(filename, partitioner);
+ dataWriter = SequenceFile.bufferedWriter(dataFile, 4 * 1024 * 1024);
+ indexRAF = new BufferedRandomAccessFile(indexFilename(), "rw", 1024 * 1024);
+ bf = new BloomFilter(keyCount, 15);
+ }
+
+ private long beforeAppend(String decoratedKey) throws IOException
+ {
+ if (decoratedKey == null)
+ {
+ throw new IOException("Keys must not be null.");
+ }
+ Comparator<String> c = partitioner.getDecoratedKeyComparator();
+ if (lastWrittenKey != null && c.compare(lastWrittenKey, decoratedKey) > 0)
+ {
+ logger.info("Last written key : " + lastWrittenKey);
+ logger.info("Current key : " + decoratedKey);
+ logger.info("Writing into file " + dataFile);
+ throw new IOException("Keys must be written in ascending order.");
+ }
+ return (lastWrittenKey == null) ? 0 : dataWriter.getCurrentPosition();
+ }
+
+ private void afterAppend(String decoratedKey, long position) throws IOException
+ {
+ bf.add(decoratedKey);
+ lastWrittenKey = decoratedKey;
+ long indexPosition = indexRAF.getFilePointer();
+ indexRAF.writeUTF(decoratedKey);
+ indexRAF.writeLong(position);
+ logger.trace("wrote " + decoratedKey + " at " + position);
+
+ if (keysWritten++ % INDEX_INTERVAL != 0)
+ return;
+ if (indexPositions == null)
+ {
+ indexPositions = new ArrayList<KeyPosition>();
+ }
+ indexPositions.add(new KeyPosition(decoratedKey, indexPosition));
+ logger.trace("wrote index of " + decoratedKey + " at " + indexPosition);
+ }
+
+ // TODO make this take a DataOutputStream and wrap the byte[] version to combine them
+ public void append(String decoratedKey, DataOutputBuffer buffer) throws IOException
+ {
+ long currentPosition = beforeAppend(decoratedKey);
+ dataWriter.append(decoratedKey, buffer);
+ afterAppend(decoratedKey, currentPosition);
+ }
+
+ public void append(String decoratedKey, byte[] value) throws IOException
+ {
+ long currentPosition = beforeAppend(decoratedKey);
+ dataWriter.append(decoratedKey, value);
+ afterAppend(decoratedKey, currentPosition);
+ }
+
+ private static String rename(String tmpFilename)
+ {
+ String filename = tmpFilename.replace("-" + TEMPFILE_MARKER, "");
+ new File(tmpFilename).renameTo(new File(filename));
+ return filename;
+ }
+
+ /**
+ * Renames temporary SSTable files to valid data, index, and bloom filter files
+ */
+ public SSTableReader closeAndOpenReader() throws IOException
+ {
+ // bloom filter
+ FileOutputStream fos = new FileOutputStream(filterFilename());
+ DataOutputStream stream = new DataOutputStream(fos);
+ BloomFilter.serializer().serialize(bf, stream);
+ stream.flush();
+ fos.getFD().sync();
+ stream.close();
+
+ // index
+ indexRAF.getChannel().force(true);
+ indexRAF.close();
+
+ // main data
+ dataWriter.close(); // calls force
+
+ rename(indexFilename());
+ rename(filterFilename());
+ dataFile = rename(dataFile); // important to do this last since index & filter file names are derived from it
+
+ return new SSTableReader(dataFile, partitioner, indexPositions, bf);
+ }
+
+}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SequenceFile.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SequenceFile.java?rev=791591&r1=791590&r2=791591&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SequenceFile.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SequenceFile.java Mon Jul 6 19:58:05 2009
@@ -19,12 +19,7 @@
package org.apache.cassandra.io;
import java.io.*;
-import java.lang.reflect.Method;
import java.nio.ByteBuffer;
-import java.nio.MappedByteBuffer;
-import java.nio.channels.FileChannel;
-import java.security.AccessController;
-import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -32,7 +27,6 @@
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.utils.BloomFilter;
-import org.apache.cassandra.utils.LogUtil;
import org.apache.log4j.Logger;
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/test/DBTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/test/DBTest.java?rev=791591&r1=791590&r2=791591&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/test/DBTest.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/test/DBTest.java Mon Jul 6 19:58:05 2009
@@ -18,30 +18,17 @@
package org.apache.cassandra.test;
-import java.io.FileInputStream;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
-import java.util.List;
import java.util.Set;
-import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.IColumn;
import org.apache.cassandra.db.Row;
import org.apache.cassandra.db.RowMutation;
import org.apache.cassandra.db.Scanner;
import org.apache.cassandra.db.Table;
-import org.apache.cassandra.io.DataInputBuffer;
-import org.apache.cassandra.io.DataOutputBuffer;
-import org.apache.cassandra.io.IFileReader;
-import org.apache.cassandra.io.IFileWriter;
-import org.apache.cassandra.io.SSTable;
-import org.apache.cassandra.io.SequenceFile;
import org.apache.cassandra.net.EndPoint;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.BloomFilter;
-import org.apache.cassandra.utils.LogUtil;
public class DBTest
Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java?rev=791591&r1=791590&r2=791591&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java Mon Jul 6 19:58:05 2009
@@ -30,9 +30,7 @@
import static junit.framework.Assert.assertEquals;
import org.apache.cassandra.CleanupHelper;
-import org.apache.cassandra.io.DataInputBuffer;
-import org.apache.cassandra.io.SSTable;
-import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.io.SSTableReader;
public class ColumnFamilyStoreTest extends CleanupHelper
{
@@ -114,7 +112,7 @@
rm.apply();
store.forceBlockingFlush();
- List<SSTable> ssTables = table.getAllSSTablesOnDisk();
+ List<SSTableReader> ssTables = table.getAllSSTablesOnDisk();
assertEquals(1, ssTables.size());
ssTables.get(0).forceBloomFilterFailures();
ColumnFamily cf = store.getColumnFamily("key2", "Standard1:Column1", new IdentityFilter());
Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java?rev=791591&r1=791590&r2=791591&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java Mon Jul 6 19:58:05 2009
@@ -27,7 +27,7 @@
import org.junit.Test;
-import org.apache.cassandra.io.SSTable;
+import org.apache.cassandra.io.SSTableReader;
import org.apache.cassandra.CleanupHelper;
import static junit.framework.Assert.assertEquals;
@@ -42,7 +42,7 @@
final int ROWS_PER_SSTABLE = 10;
Set<String> inserted = new HashSet<String>();
- for (int j = 0; j < (SSTable.indexInterval() * 3) / ROWS_PER_SSTABLE; j++) {
+ for (int j = 0; j < (SSTableReader.indexInterval() * 3) / ROWS_PER_SSTABLE; j++) {
for (int i = 0; i < ROWS_PER_SSTABLE; i++) {
String key = String.valueOf(i % 2);
RowMutation rm = new RowMutation("Table1", key);
Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java?rev=791591&r1=791590&r2=791591&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java Mon Jul 6 19:58:05 2009
@@ -26,7 +26,7 @@
import static junit.framework.Assert.*;
import org.apache.cassandra.CleanupHelper;
-import org.apache.cassandra.io.SSTable;
+import org.apache.cassandra.io.SSTableReader;
public class TableTest extends CleanupHelper
{
@@ -134,7 +134,7 @@
table.getColumnFamilyStore("Standard2").forceBlockingFlush();
validateGetSliceNoMatch(table);
- Collection<SSTable> ssTables = table.getColumnFamilyStore("Standard2").getSSTables();
+ Collection<SSTableReader> ssTables = table.getColumnFamilyStore("Standard2").getSSTables();
assertEquals(1, ssTables.size());
ssTables.iterator().next().forceBloomFilterFailures();
validateGetSliceNoMatch(table);
Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/SSTableTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/SSTableTest.java?rev=791591&r1=791590&r2=791591&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/SSTableTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/SSTableTest.java Mon Jul 6 19:58:05 2009
@@ -35,27 +35,27 @@
File f = tempSSTableFileName();
// write test data
- SSTable ssTable = new SSTable(f.getAbsolutePath(), 1, new OrderPreservingPartitioner());
+ SSTableWriter writer = new SSTableWriter(f.getAbsolutePath(), 1, new OrderPreservingPartitioner());
Random random = new Random();
byte[] bytes = new byte[1024];
random.nextBytes(bytes);
String key = Integer.toString(1);
- ssTable.append(key, bytes);
- ssTable.close();
+ writer.append(key, bytes);
+ SSTableReader ssTable = writer.closeAndOpenReader();
// verify
verifySingle(ssTable, bytes, key);
- SSTable.reopenUnsafe(); // force reloading the index
+ SSTableReader.reopenUnsafe(); // force reloading the index
verifySingle(ssTable, bytes, key);
}
private File tempSSTableFileName() throws IOException
{
- return File.createTempFile("sstable", "-" + SSTable.temporaryFile_ + "-Data.db");
+ return File.createTempFile("sstable", "-" + SSTable.TEMPFILE_MARKER + "-Data.db");
}
- private void verifySingle(SSTable sstable, byte[] bytes, String key) throws IOException
+ private void verifySingle(SSTableReader sstable, byte[] bytes, String key) throws IOException
{
FileStruct fs = sstable.getFileStruct();
fs.seekTo(key);
@@ -76,22 +76,22 @@
}
// write
- SSTable ssTable = new SSTable(f.getAbsolutePath(), 1000, new OrderPreservingPartitioner());
+ SSTableWriter writer = new SSTableWriter(f.getAbsolutePath(), 1000, new OrderPreservingPartitioner());
for (String key: map.navigableKeySet())
{
- ssTable.append(key, map.get(key));
+ writer.append(key, map.get(key));
}
- ssTable.close();
+ SSTableReader ssTable = writer.closeAndOpenReader();
// verify
verifyMany(ssTable, map);
- SSTable.reopenUnsafe(); // force reloading the index
+ SSTableReader.reopenUnsafe(); // force reloading the index
verifyMany(ssTable, map);
}
- private void verifyMany(SSTable sstable, TreeMap<String, byte[]> map) throws IOException
+ private void verifyMany(SSTableReader sstable, TreeMap<String, byte[]> map) throws IOException
{
- List<String> keys = new ArrayList(map.keySet());
+ List<String> keys = new ArrayList<String>(map.keySet());
Collections.shuffle(keys);
FileStruct fs = sstable.getFileStruct();
for (String key : keys)