You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/07/06 21:58:06 UTC

svn commit: r791591 - in /incubator/cassandra/trunk: src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/io/ src/java/org/apache/cassandra/test/ test/unit/org/apache/cassandra/db/ test/unit/org/apache/cassandra/io/

Author: jbellis
Date: Mon Jul  6 19:58:05 2009
New Revision: 791591

URL: http://svn.apache.org/viewvc?rev=791591&view=rev
Log:
refactor sstable into SSTable, SSTableReader, and SSTableWriter.
patch by jbellis; reviewed by Jun Rao for CASSANDRA-259

Added:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java
      - copied, changed from r791590, incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableWriter.java
Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnIterator.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/DataFileVerbHandler.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/IFilter.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/IdentityFilter.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/NamesFilter.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeFilter.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/TimeFilter.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/io/FileStruct.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SequenceFile.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/test/DBTest.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/SSTableTest.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java?rev=791591&r1=791590&r2=791591&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java Mon Jul  6 19:58:05 2009
@@ -28,6 +28,8 @@
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
+import org.apache.cassandra.io.SSTableReader;
+import org.apache.cassandra.io.SSTableWriter;
 import org.apache.cassandra.io.SSTable;
 import org.apache.cassandra.service.StorageService;
 
@@ -143,7 +145,7 @@
         */
         ColumnFamilyStore cfStore = Table.open(table_).getColumnFamilyStore(cfName_);
         List<String> keys = new ArrayList<String>( columnFamilies_.keySet() );
-        SSTable ssTable = new SSTable(cfStore.getTempSSTablePath(), keys.size(), StorageService.getPartitioner());
+        SSTableWriter writer = new SSTableWriter(cfStore.getTempSSTablePath(), keys.size(), StorageService.getPartitioner());
         Collections.sort(keys);
         /* Use this BloomFilter to decide if a key exists in a SSTable */
         for ( String key : keys )
@@ -152,11 +154,10 @@
             if ( bytes.length > 0 )
             {            	
                 /* Now write the key and value to disk */
-                ssTable.append(key, bytes);
+                writer.append(key, bytes);
             }
         }
-        ssTable.close();
-        cfStore.storeLocation(ssTable);
+        cfStore.storeLocation(writer.closeAndOpenReader());
         columnFamilies_.clear();       
     }
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=791591&r1=791590&r2=791591&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Mon Jul  6 19:58:05 2009
@@ -81,7 +81,7 @@
     private AtomicReference<BinaryMemtable> binaryMemtable_;
 
     /* SSTables on disk for this column family */
-    private SortedMap<String, SSTable> ssTables_ = new TreeMap<String, SSTable>(new FileNameComparator(FileNameComparator.Descending));
+    private SortedMap<String, SSTableReader> ssTables_ = new TreeMap<String, SSTableReader>(new FileNameComparator(FileNameComparator.Descending));
 
     /* Modification lock used for protecting reads from compactions. */
     private ReentrantReadWriteLock lock_ = new ReentrantReadWriteLock(true);
@@ -157,7 +157,7 @@
             for (File file : files)
             {
                 String filename = file.getName();
-                if (((file.length() == 0) || (filename.contains("-" + SSTable.temporaryFile_))) && (filename.contains(columnFamily_)))
+                if (((file.length() == 0) || (filename.contains("-" + SSTable.TEMPFILE_MARKER))) && (filename.contains(columnFamily_)))
                 {
                     file.delete();
                     continue;
@@ -180,7 +180,7 @@
             String filename = file.getAbsolutePath();
             try
             {
-                SSTable sstable = SSTable.open(filename, StorageService.getPartitioner());
+                SSTableReader sstable = SSTableReader.open(filename, StorageService.getPartitioner());
                 ssTables_.put(filename, sstable);
             }
             catch (IOException ex)
@@ -231,7 +231,7 @@
         sb.append("Number of files on disk : " + ssTables_.size());
         sb.append(newLineSeparator);
         double totalSpace = 0d;
-        for (SSTable sstable: ssTables_.values())
+        for (SSTableReader sstable: ssTables_.values())
         {
             File f = new File(sstable.getFilename());
             totalSpace += f.length();
@@ -248,7 +248,7 @@
      * This is called after bootstrap to add the files
      * to the list of files maintained.
     */
-    void addToList(SSTable file)
+    void addToList(SSTableReader file)
     {
         lock_.writeLock().lock();
         try
@@ -376,7 +376,7 @@
     {
         fileIndexGenerator_.incrementAndGet();
         return String.format("%s-%s-%s-%s-Data.db",
-                             table_, columnFamily_, SSTable.temporaryFile_, fileIndexGenerator_.incrementAndGet());
+                             table_, columnFamily_, SSTable.TEMPFILE_MARKER, fileIndexGenerator_.incrementAndGet());
     }
 
     /*
@@ -401,7 +401,7 @@
         index = lowestIndex + 1;
 
         return String.format("%s-%s-%s-%s-Data.db",
-                             table_, columnFamily_, SSTable.temporaryFile_, index);
+                             table_, columnFamily_, SSTable.TEMPFILE_MARKER, index);
     }
 
     void switchMemtable()
@@ -549,7 +549,7 @@
         lock_.readLock().lock();
         try
         {
-            for (SSTable sstable : ssTables_.values())
+            for (SSTableReader sstable : ssTables_.values())
             {
                 ColumnFamily columnFamily = null;
                 try
@@ -573,7 +573,7 @@
         }
     }
 
-    private ColumnFamily fetchColumnFamily(String key, String cf, IFilter filter, SSTable ssTable) throws IOException
+    private ColumnFamily fetchColumnFamily(String key, String cf, IFilter filter, SSTableReader ssTable) throws IOException
     {
         DataInputBuffer bufIn;
         bufIn = filter.next(key, cf, ssTable);
@@ -720,7 +720,7 @@
      * param @ filename - filename just flushed to disk
      * param @ bf - bloom filter which indicates the keys that are in this file.
     */
-    void storeLocation(SSTable sstable)
+    void storeLocation(SSTableReader sstable)
     {
         int ssTableCount;
         lock_.writeLock().lock();
@@ -756,7 +756,7 @@
             {
                 try
                 {
-                    fs = SSTable.get(file).getFileStruct();
+                    fs = SSTableReader.get(file).getFileStruct();
                     fs.advance();
                     if (fs.isExhausted())
                     {
@@ -985,9 +985,9 @@
                 if (logger_.isDebugEnabled())
                   logger_.debug("New file : " + newfile + " of size " + new File(newfile).length());
                 assert newfile != null;
-                ssTables_.put(newfile, SSTable.open(newfile, StorageService.getPartitioner()));
+                ssTables_.put(newfile, SSTableReader.open(newfile, StorageService.getPartitioner()));
             }
-            SSTable.get(file).delete();
+            SSTableReader.get(file).delete();
         }
         finally
         {
@@ -1036,12 +1036,12 @@
         }
 
         mergedFileName = getTempSSTableFileName();
-        SSTable ssTableRange = null;
+        SSTableWriter rangeWriter = null;
         String lastkey = null;
         List<FileStruct> lfs = new ArrayList<FileStruct>();
         DataOutputBuffer bufOut = new DataOutputBuffer();
-        int expectedBloomFilterSize = SSTable.getApproximateKeyCount(files);
-        expectedBloomFilterSize = (expectedBloomFilterSize > 0) ? expectedBloomFilterSize : SSTable.indexInterval();
+        int expectedBloomFilterSize = SSTableReader.getApproximateKeyCount(files);
+        expectedBloomFilterSize = (expectedBloomFilterSize > 0) ? expectedBloomFilterSize : SSTableReader.indexInterval();
         if (logger_.isDebugEnabled())
           logger_.debug("Expected bloom filter size : " + expectedBloomFilterSize);
         List<ColumnFamily> columnFamilies = new ArrayList<ColumnFamily>();
@@ -1108,7 +1108,7 @@
                 }
                 if (Range.isTokenInRanges(StorageService.getPartitioner().getInitialToken(lastkey), ranges))
                 {
-                    if (ssTableRange == null)
+                    if (rangeWriter == null)
                     {
                         if (target != null)
                         {
@@ -1116,11 +1116,11 @@
                         }
                         FileUtils.createDirectory(rangeFileLocation);
                         String fname = new File(rangeFileLocation, mergedFileName).getAbsolutePath();
-                        ssTableRange = new SSTable(fname, expectedBloomFilterSize, StorageService.getPartitioner());
+                        rangeWriter = new SSTableWriter(fname, expectedBloomFilterSize, StorageService.getPartitioner());
                     }
                     try
                     {
-                        ssTableRange.append(lastkey, bufOut);
+                        rangeWriter.append(lastkey, bufOut);
                     }
                     catch (Exception ex)
                     {
@@ -1172,12 +1172,12 @@
             }
         }
 
-        if (ssTableRange != null)
+        if (rangeWriter != null)
         {
-            ssTableRange.close();
+            rangeWriter.closeAndOpenReader();
             if (fileList != null)
             {
-                fileList.add(ssTableRange.getFilename());
+                fileList.add(rangeWriter.getFilename());
             }
         }
 
@@ -1235,12 +1235,13 @@
         }
 
         String mergedFileName = getTempFileName(files);
-        SSTable ssTable = null;
+        SSTableWriter writer = null;
+        SSTableReader ssTable = null;
         String lastkey = null;
         List<FileStruct> lfs = new ArrayList<FileStruct>();
         DataOutputBuffer bufOut = new DataOutputBuffer();
-        int expectedBloomFilterSize = SSTable.getApproximateKeyCount(files);
-        expectedBloomFilterSize = (expectedBloomFilterSize > 0) ? expectedBloomFilterSize : SSTable.indexInterval();
+        int expectedBloomFilterSize = SSTableReader.getApproximateKeyCount(files);
+        expectedBloomFilterSize = (expectedBloomFilterSize > 0) ? expectedBloomFilterSize : SSTableReader.indexInterval();
         if (logger_.isDebugEnabled())
           logger_.debug("Expected bloom filter size : " + expectedBloomFilterSize);
         List<ColumnFamily> columnFamilies = new ArrayList<ColumnFamily>();
@@ -1305,12 +1306,12 @@
                     bufOut.write(filestruct.getBufIn(), size);
                 }
 
-                if (ssTable == null)
+                if (writer == null)
                 {
                     String fname = new File(compactionFileLocation, mergedFileName).getAbsolutePath();
-                    ssTable = new SSTable(fname, expectedBloomFilterSize, StorageService.getPartitioner());
+                    writer = new SSTableWriter(fname, expectedBloomFilterSize, StorageService.getPartitioner());
                 }
-                ssTable.append(lastkey, bufOut);
+                writer.append(lastkey, bufOut);
                 totalkeysWritten++;
 
                 for (FileStruct filestruct : lfs)
@@ -1343,11 +1344,11 @@
                 }
             }
         }
-        if (ssTable != null)
+        if (writer != null)
         {
             // TODO if all the keys were the same nothing will be done here
-            ssTable.close();
-            newfile = ssTable.getFilename();
+            ssTable = writer.closeAndOpenReader();
+            newfile = writer.getFilename();
         }
         lock_.writeLock().lock();
         try
@@ -1363,7 +1364,7 @@
             }
             for (String file : files)
             {
-                SSTable.get(file).delete();
+                SSTableReader.get(file).delete();
             }
         }
         finally
@@ -1498,7 +1499,7 @@
     }
 
     /** not threadsafe.  caller must have lock_ acquired. */
-    public Collection<SSTable> getSSTables()
+    public Collection<SSTableReader> getSSTables()
     {
         return Collections.unmodifiableCollection(ssTables_.values());
     }
@@ -1560,8 +1561,8 @@
             }
 
             /* add the SSTables on disk */
-            List<SSTable> sstables = new ArrayList<SSTable>(ssTables_.values());
-            for (SSTable sstable : sstables)
+            List<SSTableReader> sstables = new ArrayList<SSTableReader>(ssTables_.values());
+            for (SSTableReader sstable : sstables)
             {
                 iter = new SSTableColumnIterator(sstable.getFilename(), key, cfName, startColumn, isAscending);
                 if (iter.hasNext())

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnIterator.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnIterator.java?rev=791591&r1=791590&r2=791591&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnIterator.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnIterator.java Mon Jul  6 19:58:05 2009
@@ -23,7 +23,7 @@
 
 import org.apache.cassandra.io.DataInputBuffer;
 import org.apache.cassandra.io.DataOutputBuffer;
-import org.apache.cassandra.io.SSTable;
+import org.apache.cassandra.io.SSTableReader;
 import org.apache.cassandra.io.SequenceFile.ColumnGroupReader;
 import org.apache.cassandra.service.StorageService;
 import com.google.common.collect.AbstractIterator;
@@ -58,7 +58,7 @@
     throws IOException
     {
         this.isAscending = isAscending;
-        SSTable ssTable = SSTable.open(filename, StorageService.getPartitioner());
+        SSTableReader ssTable = SSTableReader.open(filename, StorageService.getPartitioner());
         reader = ssTable.getColumnGroupReader(key, cfName, startColumn, isAscending);
         this.startColumn = startColumn;
         curColumnIndex = isAscending ? 0 : -1;

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/DataFileVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/DataFileVerbHandler.java?rev=791591&r1=791590&r2=791591&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/DataFileVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/DataFileVerbHandler.java Mon Jul  6 19:58:05 2009
@@ -23,13 +23,11 @@
 import java.io.IOException;
 import java.util.List;
 
-import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.LogUtil;
-import org.apache.cassandra.io.SSTable;
+import org.apache.cassandra.io.SSTableReader;
 
 import org.apache.log4j.Logger;
 
@@ -46,11 +44,11 @@
         
         try
         {
-            List<SSTable> ssTables = Table.open(table).getAllSSTablesOnDisk();
+            List<SSTableReader> ssTables = Table.open(table).getAllSSTablesOnDisk();
             ByteArrayOutputStream bos = new ByteArrayOutputStream();
             DataOutputStream dos = new DataOutputStream(bos);
             dos.writeInt(ssTables.size());
-            for (SSTable sstable : ssTables)
+            for (SSTableReader sstable : ssTables)
             {
                 dos.writeUTF(sstable.getFilename());
             }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/IFilter.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/IFilter.java?rev=791591&r1=791590&r2=791591&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/IFilter.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/IFilter.java Mon Jul  6 19:58:05 2009
@@ -22,12 +22,12 @@
 import java.io.IOException;
 
 import org.apache.cassandra.io.DataInputBuffer;
-import org.apache.cassandra.io.SSTable;
+import org.apache.cassandra.io.SSTableReader;
 
 
 public interface IFilter
 {
 	public ColumnFamily filter(String cfName, ColumnFamily cf);
     public IColumn filter(IColumn column, DataInputStream dis) throws IOException;
-    public DataInputBuffer next(String key, String cf, SSTable ssTable) throws IOException;
+    public DataInputBuffer next(String key, String cf, SSTableReader ssTable) throws IOException;
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/IdentityFilter.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/IdentityFilter.java?rev=791591&r1=791590&r2=791591&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/IdentityFilter.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/IdentityFilter.java Mon Jul  6 19:58:05 2009
@@ -20,10 +20,9 @@
 
 import java.io.DataInputStream;
 import java.io.IOException;
-import java.util.Collection;
 
 import org.apache.cassandra.io.DataInputBuffer;
-import org.apache.cassandra.io.SSTable;
+import org.apache.cassandra.io.SSTableReader;
 
 
 public class IdentityFilter implements IFilter
@@ -38,7 +37,7 @@
 		return column;
 	}
 
-	public DataInputBuffer next(String key, String cf, SSTable ssTable) throws IOException
+	public DataInputBuffer next(String key, String cf, SSTableReader ssTable) throws IOException
 	{
 		return ssTable.next(key, cf);
 	}

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java?rev=791591&r1=791590&r2=791591&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java Mon Jul  6 19:58:05 2009
@@ -29,7 +29,8 @@
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.io.DataOutputBuffer;
-import org.apache.cassandra.io.SSTable;
+import org.apache.cassandra.io.SSTableReader;
+import org.apache.cassandra.io.SSTableWriter;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.DestructivePQIterator;
 import org.apache.log4j.Logger;
@@ -250,7 +251,7 @@
         logger_.info("Flushing " + this);
         ColumnFamilyStore cfStore = Table.open(table_).getColumnFamilyStore(cfName_);
 
-        SSTable ssTable = new SSTable(cfStore.getTempSSTablePath(), columnFamilies_.size(), StorageService.getPartitioner());
+        SSTableWriter writer = new SSTableWriter(cfStore.getTempSSTablePath(), columnFamilies_.size(), StorageService.getPartitioner());
 
         // sort keys in the order they would be in when decorated
         final IPartitioner partitioner = StorageService.getPartitioner();
@@ -273,10 +274,10 @@
                 /* serialize the cf with column indexes */
                 ColumnFamily.serializerWithIndexes().serialize( columnFamily, buffer );
                 /* Now write the key and value to disk */
-                ssTable.append(partitioner.decorateKey(key), buffer);
+                writer.append(partitioner.decorateKey(key), buffer);
             }
         }
-        ssTable.close();
+        SSTableReader ssTable = writer.closeAndOpenReader();
         cfStore.onMemtableFlush(cLogCtx);
         cfStore.storeLocation(ssTable);
         buffer.close();

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/NamesFilter.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/NamesFilter.java?rev=791591&r1=791590&r2=791591&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/NamesFilter.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/NamesFilter.java Mon Jul  6 19:58:05 2009
@@ -23,9 +23,8 @@
 import java.util.Collection;
 import java.util.List;
 
-import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.io.DataInputBuffer;
-import org.apache.cassandra.io.SSTable;
+import org.apache.cassandra.io.SSTableReader;
 
 
 
@@ -92,7 +91,7 @@
         return column;
     }
 
-    public DataInputBuffer next(String key, String cf, SSTable ssTable) throws IOException
+    public DataInputBuffer next(String key, String cf, SSTableReader ssTable) throws IOException
     {
     	return ssTable.next(key, cf, names_, null);
     }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeFilter.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeFilter.java?rev=791591&r1=791590&r2=791591&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeFilter.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeFilter.java Mon Jul  6 19:58:05 2009
@@ -23,7 +23,7 @@
 import java.util.Collection;
 
 import org.apache.cassandra.io.DataInputBuffer;
-import org.apache.cassandra.io.SSTable;
+import org.apache.cassandra.io.SSTableReader;
 
 /**
  * Filters columns to satisfy colmin <= colname <= colmax
@@ -86,7 +86,7 @@
         return null;
     }
 
-    public DataInputBuffer next(String key, String cf, SSTable ssTable)
+    public DataInputBuffer next(String key, String cf, SSTableReader ssTable)
             throws IOException
     {
         return ssTable.next(key, cf);

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=791591&r1=791590&r2=791591&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Mon Jul  6 19:58:05 2009
@@ -33,8 +33,7 @@
 import org.apache.cassandra.dht.BootstrapInitiateMessage;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.io.DataInputBuffer;
-import org.apache.cassandra.io.SSTable;
-import org.apache.cassandra.io.SequenceFile;
+import org.apache.cassandra.io.SSTableReader;
 import org.apache.cassandra.io.FileStruct;
 import org.apache.cassandra.net.EndPoint;
 import org.apache.cassandra.net.IVerbHandler;
@@ -186,7 +185,7 @@
                  * list of the associated Column Family. Also merge the CBF into the
                  * sampler.
                 */                
-                SSTable sstable = SSTable.open(streamContext.getTargetFile(), StorageService.getPartitioner());
+                SSTableReader sstable = SSTableReader.open(streamContext.getTargetFile(), StorageService.getPartitioner());
                 logger_.debug("Merging the counting bloom filter in the sampler ...");                
                 String[] peices = FBUtilities.strip(fileName, "-");
                 Table.open(peices[0]).getColumnFamilyStore(peices[1]).addToList(sstable);                
@@ -454,9 +453,9 @@
     /*
      * Get the list of all SSTables on disk.  Not safe unless you aquire the CFS readlocks!
     */
-    public List<SSTable> getAllSSTablesOnDisk()
+    public List<SSTableReader> getAllSSTablesOnDisk()
     {
-        List<SSTable> list = new ArrayList<SSTable>();
+        List<SSTableReader> list = new ArrayList<SSTableReader>();
         Set<String> columnFamilies = tableMetadata_.getColumnFamilies();
         for ( String columnFamily : columnFamilies )
         {
@@ -778,7 +777,7 @@
             }
 
             // sstables
-            for (SSTable sstable : cfs.getSSTables())
+            for (SSTableReader sstable : cfs.getSSTables())
             {
                 FileStruct fs = sstable.getFileStruct();
                 fs.seekTo(startWith);

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/TimeFilter.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/TimeFilter.java?rev=791591&r1=791590&r2=791591&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/TimeFilter.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/TimeFilter.java Mon Jul  6 19:58:05 2009
@@ -21,10 +21,9 @@
 import java.io.IOException;
 import java.util.Collection;
 
-import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.io.DataInputBuffer;
 import org.apache.cassandra.io.IndexHelper;
-import org.apache.cassandra.io.SSTable;
+import org.apache.cassandra.io.SSTableReader;
 
 
 /**
@@ -122,7 +121,7 @@
         return column;
     }
 
-    public DataInputBuffer next(String key, String cfName, SSTable ssTable) throws IOException
+    public DataInputBuffer next(String key, String cfName, SSTableReader ssTable) throws IOException
     {
         return ssTable.next(key, cfName, null, new IndexHelper.TimeRange(timeLimit_, Long.MAX_VALUE));
     }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/FileStruct.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/FileStruct.java?rev=791591&r1=791590&r2=791591&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/FileStruct.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/FileStruct.java Mon Jul  6 19:58:05 2009
@@ -24,8 +24,8 @@
 import org.apache.cassandra.io.DataInputBuffer;
 import org.apache.cassandra.io.DataOutputBuffer;
 import org.apache.cassandra.io.IFileReader;
-import org.apache.cassandra.io.SSTable;
-import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.io.SSTableReader;
+
 import org.apache.log4j.Logger;
 import com.google.common.collect.AbstractIterator;
 
@@ -39,10 +39,10 @@
     private IFileReader reader;
     private DataInputBuffer bufIn;
     private DataOutputBuffer bufOut;
-    private SSTable sstable;
+    private SSTableReader sstable;
     private FileStructIterator iterator;
 
-    FileStruct(SSTable sstable) throws IOException
+    FileStruct(SSTableReader sstable) throws IOException
     {
         this.reader = SequenceFile.bufferedReader(sstable.getFilename(), 1024 * 1024);
         this.sstable = sstable;

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java?rev=791591&r1=791590&r2=791591&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java Mon Jul  6 19:58:05 2009
@@ -1,596 +1,104 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.io;
-
-import java.io.*;
-import java.util.*;
-
-import org.apache.log4j.Logger;
-import org.apache.commons.lang.StringUtils;
-
-import org.apache.cassandra.db.RowMutation;
-import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.io.SequenceFile.ColumnGroupReader;
-import org.apache.cassandra.utils.BloomFilter;
-import org.apache.cassandra.utils.FileUtils;
-import org.cliffc.high_scale_lib.NonBlockingHashMap;
-import com.reardencommerce.kernel.collections.shared.evictable.ConcurrentLinkedHashMap;
-
-/**
- * This class is built on top of the SequenceFile. It stores
- * data on disk in sorted fashion. However the sorting is upto
- * the application. This class expects keys to be handed to it
- * in sorted order.
- *
- * A separate index file is maintained as well, containing the
- * SSTable keys and the offset into the SSTable at which they are found.
- * Every 1/indexInterval key is read into memory when the SSTable is opened.
- *
- * Finally, a bloom filter file is also kept for the keys in each SSTable.
- */
-
-public class SSTable
-{
-    private static Logger logger_ = Logger.getLogger(SSTable.class);
-    /* Every 128th index entry is loaded into memory so we know where to start looking for the actual key w/o seeking */
-    private static final int indexInterval_ = 128;
-    /* Required extension for temporary files created during compactions. */
-    public static final String temporaryFile_ = "tmp";
-
-    private static FileSSTableMap openedFiles = new FileSSTableMap();
-
-    private ConcurrentLinkedHashMap<String, Long> keyCache = ConcurrentLinkedHashMap.create(ConcurrentLinkedHashMap.EvictionPolicy.SECOND_CHANCE, 1000);
-
-
-    public static int indexInterval()
-    {
-        return indexInterval_;
-    }
-
-    // todo can we refactor to take list of sstables?
-    public static int getApproximateKeyCount(List<String> dataFiles)
-    {
-        int count = 0;
-
-        for (String dataFileName : dataFiles)
-        {
-            SSTable sstable = openedFiles.get(dataFileName);
-            assert sstable != null;
-            int indexKeyCount = sstable.getIndexPositions().size();
-            count = count + (indexKeyCount + 1) * indexInterval_;
-            if (logger_.isDebugEnabled())
-              logger_.debug("index size for bloom filter calc for file  : " + dataFileName + "   : " + count);
-        }
-
-        return count;
-    }
-
-    /**
-     * Get all indexed keys in the SSTable.
-     */
-    public static List<String> getIndexedKeys()
-    {
-        List<String> indexedKeys = new ArrayList<String>();
-
-        for (SSTable sstable : openedFiles.values())
-        {
-            for (KeyPosition kp : sstable.getIndexPositions())
-            {
-                indexedKeys.add(kp.key);
-            }
-        }
-        Collections.sort(indexedKeys);
-
-        return indexedKeys;
-    }
-
-    String dataFile_;
-    private long keysWritten;
-    private IFileWriter dataWriter_;
-    private BufferedRandomAccessFile indexRAF_;
-    private String lastWrittenKey_;
-    private IPartitioner partitioner_;
-    List<KeyPosition> indexPositions_;
-    BloomFilter bf;
-
-    public static synchronized SSTable open(String dataFileName, IPartitioner partitioner) throws IOException
-    {
-        SSTable sstable = openedFiles.get(dataFileName);
-        if (sstable == null)
-        {
-            assert partitioner != null;
-            sstable = new SSTable(dataFileName, partitioner);
-
-            long start = System.currentTimeMillis();
-            sstable.loadIndexFile();
-            sstable.loadBloomFilter();
-            if (logger_.isDebugEnabled())
-              logger_.debug("INDEX LOAD TIME for "  + dataFileName + ": " + (System.currentTimeMillis() - start) + " ms.");
-
-            openedFiles.put(dataFileName, sstable);
-        }
-        return sstable;
-    }
-
-    public static synchronized SSTable get(String dataFileName) throws IOException
-    {
-        SSTable sstable = openedFiles.get(dataFileName);
-        assert sstable != null;
-        return sstable;
-    }
-
-    private SSTable(String filename, IPartitioner partitioner)
-    {
-        assert filename.endsWith("-Data.db");
-        dataFile_ = filename;
-        partitioner_ = partitioner;
-    }
-
-    public SSTable(String filename, int keyCount, IPartitioner partitioner) throws IOException
-    {
-        assert filename.endsWith("-Data.db");
-        dataFile_ = filename;
-        partitioner_ = partitioner;
-        dataWriter_ = SequenceFile.bufferedWriter(dataFile_, 4 * 1024 * 1024);
-        indexRAF_ = new BufferedRandomAccessFile(indexFilename(), "rw", 1024 * 1024);
-        bf = new BloomFilter(keyCount, 15);
-    }
-
-    static String parseTableName(String filename)
-    {
-        String[] parts = new File(filename).getName().split("-"); // table, cf, index, [filetype]
-        return parts[0];
-    }
-
-    public List<KeyPosition> getIndexPositions()
-    {
-        return indexPositions_;
-    }
-
-    private void loadBloomFilter() throws IOException
-    {
-        DataInputStream stream = new DataInputStream(new FileInputStream(filterFilename()));
-        bf = BloomFilter.serializer().deserialize(stream);
-    }
-
-    private void loadIndexFile() throws IOException
-    {
-        BufferedRandomAccessFile input = new BufferedRandomAccessFile(indexFilename(), "r");
-        indexPositions_ = new ArrayList<KeyPosition>();
-
-        int i = 0;
-        long indexSize = input.length();
-        while (true)
-        {
-            long indexPosition = input.getFilePointer();
-            if (indexPosition == indexSize)
-            {
-                break;
-            }
-            String decoratedKey = input.readUTF();
-            input.readLong();
-            if (i++ % indexInterval_ == 0)
-            {
-                indexPositions_.add(new KeyPosition(decoratedKey, indexPosition));
-            }
-        }
-    }
-
-    private static String indexFilename(String dataFile)
-    {
-        String[] parts = dataFile.split("-");
-        parts[parts.length - 1] = "Index.db";
-        return StringUtils.join(parts, "-");
-    }
-    private String indexFilename()
-    {
-        return indexFilename(dataFile_);
-    }
-
-    private static String filterFilename(String dataFile)
-    {
-        String[] parts = dataFile.split("-");
-        parts[parts.length - 1] = "Filter.db";
-        return StringUtils.join(parts, "-");
-    }
-    private String filterFilename()
-    {
-        return filterFilename(dataFile_);
-    }
-
-    public String getFilename()
-    {
-        return dataFile_;
-    }
-
-    private long beforeAppend(String decoratedKey) throws IOException
-    {
-        if (decoratedKey == null)
-        {
-            throw new IOException("Keys must not be null.");
-        }
-        Comparator<String> c = partitioner_.getDecoratedKeyComparator();
-        if (lastWrittenKey_ != null && c.compare(lastWrittenKey_, decoratedKey) > 0)
-        {
-            logger_.info("Last written key : " + lastWrittenKey_);
-            logger_.info("Current key : " + decoratedKey);
-            logger_.info("Writing into file " + dataFile_);
-            throw new IOException("Keys must be written in ascending order.");
-        }
-        return (lastWrittenKey_ == null) ? 0 : dataWriter_.getCurrentPosition();
-    }
-
-    private void afterAppend(String decoratedKey, long position) throws IOException
-    {
-        bf.add(decoratedKey);
-        lastWrittenKey_ = decoratedKey;
-        long indexPosition = indexRAF_.getFilePointer();
-        indexRAF_.writeUTF(decoratedKey);
-        indexRAF_.writeLong(position);
-        logger_.trace("wrote " + decoratedKey + " at " + position);
-
-        if (keysWritten++ % indexInterval_ != 0)
-            return;
-        if (indexPositions_ == null)
-        {
-            indexPositions_ = new ArrayList<KeyPosition>();
-        }
-        indexPositions_.add(new KeyPosition(decoratedKey, indexPosition));
-        logger_.trace("wrote index of " + decoratedKey + " at " + indexPosition);
-    }
-
-    // TODO make this take a DataOutputStream and wrap the byte[] version to combine them
-    public void append(String decoratedKey, DataOutputBuffer buffer) throws IOException
-    {
-        long currentPosition = beforeAppend(decoratedKey);
-        dataWriter_.append(decoratedKey, buffer);
-        afterAppend(decoratedKey, currentPosition);
-    }
-
-    public void append(String decoratedKey, byte[] value) throws IOException
-    {
-        long currentPosition = beforeAppend(decoratedKey);
-        dataWriter_.append(decoratedKey, value);
-        afterAppend(decoratedKey, currentPosition);
-    }
-
-    /** get the position in the index file to start scanning to find the given key (at most indexInterval keys away) */
-    private long getIndexScanPosition(String decoratedKey, IPartitioner partitioner)
-    {
-        assert indexPositions_ != null && indexPositions_.size() > 0;
-        int index = Collections.binarySearch(indexPositions_, new KeyPosition(decoratedKey, -1));
-        if (index < 0)
-        {
-            // binary search gives us the first index _greater_ than the key searched for,
-            // i.e., its insertion position
-            int greaterThan = (index + 1) * -1;
-            if (greaterThan == 0)
-                return -1;
-            return indexPositions_.get(greaterThan - 1).position;
-        }
-        else
-        {
-            return indexPositions_.get(index).position;
-        }
-    }
-
-    /**
-     * returns the position in the data file to find the given key, or -1 if the key is not present
-     */
-    public long getPosition(String decoratedKey, IPartitioner partitioner) throws IOException
-    {
-        if (!bf.isPresent(decoratedKey))
-            return -1;
-        Long cachedPosition = keyCache.get(decoratedKey);
-        if (cachedPosition != null)
-        {
-            return cachedPosition;
-        }
-        long start = getIndexScanPosition(decoratedKey, partitioner);
-        if (start < 0)
-        {
-            return -1;
-        }
-
-        // TODO mmap the index file?
-        BufferedRandomAccessFile input = new BufferedRandomAccessFile(indexFilename(dataFile_), "r");
-        input.seek(start);
-        int i = 0;
-        try
-        {
-            do
-            {
-                String indexDecoratedKey;
-                try
-                {
-                    indexDecoratedKey = input.readUTF();
-                }
-                catch (EOFException e)
-                {
-                    return -1;
-                }
-                long position = input.readLong();
-                int v = partitioner.getDecoratedKeyComparator().compare(indexDecoratedKey, decoratedKey);
-                if (v == 0)
-                {
-                    keyCache.put(decoratedKey, position);
-                    return position;
-                }
-                if (v > 0)
-                    return -1;
-            } while  (++i < indexInterval_);
-        }
-        finally
-        {
-            input.close();
-        }
-        return -1;
-    }
-
-    /** like getPosition, but if key is not found will return the location of the first key _greater_ than the desired one, or -1 if no such key exists. */
-    public long getNearestPosition(String decoratedKey) throws IOException
-    {
-        long start = getIndexScanPosition(decoratedKey, partitioner_);
-        if (start < 0)
-        {
-            return 0;
-        }
-        BufferedRandomAccessFile input = new BufferedRandomAccessFile(indexFilename(dataFile_), "r");
-        input.seek(start);
-        try
-        {
-            while (true)
-            {
-                String indexDecoratedKey;
-                try
-                {
-                    indexDecoratedKey = input.readUTF();
-                }
-                catch (EOFException e)
-                {
-                    return -1;
-                }
-                long position = input.readLong();
-                int v = partitioner_.getDecoratedKeyComparator().compare(indexDecoratedKey, decoratedKey);
-                if (v >= 0)
-                    return position;
-            }
-        }
-        finally
-        {
-            input.close();
-        }
-    }
-
-    public DataInputBuffer next(final String clientKey, String cfName, List<String> columnNames) throws IOException
-    {
-        return next(clientKey, cfName, columnNames, null);
-    }
-
-    public DataInputBuffer next(final String clientKey, String cfName, List<String> columnNames, IndexHelper.TimeRange timeRange) throws IOException
-    {
-        IFileReader dataReader = null;
-        try
-        {
-            dataReader = SequenceFile.reader(dataFile_);
-            String decoratedKey = partitioner_.decorateKey(clientKey);
-            long position = getPosition(decoratedKey, partitioner_);
-
-            DataOutputBuffer bufOut = new DataOutputBuffer();
-            DataInputBuffer bufIn = new DataInputBuffer();
-            long bytesRead = dataReader.next(decoratedKey, bufOut, cfName, columnNames, timeRange, position);
-            if (bytesRead != -1L)
-            {
-                if (bufOut.getLength() > 0)
-                {
-                    bufIn.reset(bufOut.getData(), bufOut.getLength());
-                    /* read the key even though we do not use it */
-                    bufIn.readUTF();
-                    bufIn.readInt();
-                }
-            }
-            return bufIn;
-        }
-        finally
-        {
-            if (dataReader != null)
-            {
-                dataReader.close();
-            }
-        }
-    }
-
-    public DataInputBuffer next(String clientKey, String columnFamilyColumn) throws IOException
-    {
-        String[] values = RowMutation.getColumnAndColumnFamily(columnFamilyColumn);
-        String columnFamilyName = values[0];
-        List<String> cnNames = (values.length == 1) ? null : Arrays.asList(values[1]);
-        return next(clientKey, columnFamilyName, cnNames);
-    }
-
-    private static String rename(String tmpFilename)
-    {
-        String filename = tmpFilename.replace("-" + temporaryFile_, "");
-        new File(tmpFilename).renameTo(new File(filename));
-        return filename;
-    }
-
-    /**
-     * Renames temporary SSTable files to valid data, index, and bloom filter files
-     */
-    public void close() throws IOException
-    {
-        // bloom filter
-        FileOutputStream fos = new FileOutputStream(filterFilename());
-        DataOutputStream stream = new DataOutputStream(fos);
-        BloomFilter.serializer().serialize(bf, stream);
-        stream.flush();
-        fos.getFD().sync();
-        stream.close();
-
-        // index
-        indexRAF_.getChannel().force(true);
-        indexRAF_.close();
-
-        // main data
-        dataWriter_.close(); // calls force
-
-        rename(indexFilename());
-        rename(filterFilename());
-        dataFile_ = rename(dataFile_); // important to do this last since index & filter file names are derived from it
-
-        openedFiles.put(dataFile_, this);
-    }
-
-    /**
-     * obtain a BlockReader for the getColumnSlice call.
-     */
-    public ColumnGroupReader getColumnGroupReader(String key, String cfName, String startColumn, boolean isAscending) throws IOException
-    {
-        IFileReader dataReader = SequenceFile.reader(dataFile_);
-
-        try
-        {
-            /* Morph key into actual key based on the partition type. */
-            String decoratedKey = partitioner_.decorateKey(key);
-            long position = getPosition(decoratedKey, partitioner_);
-            return new ColumnGroupReader(dataFile_, decoratedKey, cfName, startColumn, isAscending, position);
-        }
-        finally
-        {
-            dataReader.close();
-        }
-    }
-
-    public void delete() throws IOException
-    {
-        FileUtils.deleteWithConfirm(new File(dataFile_));
-        FileUtils.deleteWithConfirm(new File(indexFilename(dataFile_)));
-        FileUtils.deleteWithConfirm(new File(filterFilename(dataFile_)));
-        openedFiles.remove(dataFile_);
-    }
-
-    /** obviously only for testing */
-    public void forceBloomFilterFailures()
-    {
-        bf = BloomFilter.alwaysMatchingBloomFilter();
-    }
-
-    static void reopenUnsafe() throws IOException // testing only
-    {
-        Collection<SSTable> sstables = new ArrayList<SSTable>(openedFiles.values());
-        openedFiles.clear();
-        for (SSTable sstable : sstables)
-        {
-            SSTable.open(sstable.dataFile_, sstable.partitioner_);
-        }
-    }
-
-    IPartitioner getPartitioner()
-    {
-        return partitioner_;
-    }
-
-    public FileStruct getFileStruct() throws IOException
-    {
-        return new FileStruct(this);
-    }
-
-    public static void deleteAll() throws IOException
-    {
-        for (SSTable sstable : openedFiles.values())
-        {
-            sstable.delete();
-        }
-    }
-
-    /**
-     * This is a simple container for the index Key and its corresponding position
-     * in the data file. Binary search is performed on a list of these objects
-     * to lookup keys within the SSTable data file.
-     *
-     * All keys are decorated.
-     */
-    class KeyPosition implements Comparable<KeyPosition>
-    {
-        public final String key; // decorated
-        public final long position;
-
-        public KeyPosition(String key, long position)
-        {
-            this.key = key;
-            this.position = position;
-        }
-
-        public int compareTo(KeyPosition kp)
-        {
-            return partitioner_.getDecoratedKeyComparator().compare(key, kp.key);
-        }
-
-        public String toString()
-        {
-            return key + ":" + position;
-        }
-    }
-
-}
-
-class FileSSTableMap
-{
-    private final Map<String, SSTable> map = new NonBlockingHashMap<String, SSTable>();
-
-    public SSTable get(String filename)
-    {
-        try
-        {
-            return map.get(new File(filename).getCanonicalPath());
-        }
-        catch (IOException e)
-        {
-            throw new RuntimeException(e);
-        }
-    }
-
-    public SSTable put(String filename, SSTable value)
-    {
-        try
-        {
-            return map.put(new File(filename).getCanonicalPath(), value);
-        }
-        catch (IOException e)
-        {
-            throw new RuntimeException(e);
-        }
-    }
-
-    public Collection<SSTable> values()
-    {
-        return map.values();
-    }
-
-    public void clear()
-    {
-        map.clear();
-    }
-
-    public void remove(String filename) throws IOException
-    {
-        map.remove(new File(filename).getCanonicalPath());
-    }
-}
+package org.apache.cassandra.io;
+
+import java.io.File;
+import java.util.List;
+
+import org.apache.commons.lang.StringUtils;
+
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.utils.BloomFilter;
+
+/**
+ * This class is built on top of the SequenceFile. It stores
+ * data on disk in sorted fashion. However, the sorting is up to
+ * the application. This class expects keys to be handed to it
+ * in sorted order.
+ *
+ * A separate index file is maintained as well, containing the
+ * SSTable keys and the offset into the SSTable at which they are found.
+ * Every INDEX_INTERVAL-th index entry is read into memory when the SSTable is opened.
+ *
+ * Finally, a bloom filter file is also kept for the keys in each SSTable.
+ */
+public abstract class SSTable
+{
+    protected String dataFile; // name of the "-Data.db" file; index and filter file names are derived from it
+    protected IPartitioner partitioner; // supplies the decorated-key comparator used by KeyPosition
+    protected BloomFilter bf;
+    protected List<KeyPosition> indexPositions; // not set by this constructor; subclasses populate it
+
+    /* Every 128th index entry is loaded into memory so we know where to start looking for the actual key w/o seeking */
+    public static final int INDEX_INTERVAL = 128;
+    public static final String TEMPFILE_MARKER = "tmp"; /* Required extension for temporary files created during compactions. */
+
+    public SSTable(String filename, IPartitioner partitioner)
+    {
+        assert filename.endsWith("-Data.db"); // only checked when assertions are enabled (-ea)
+        this.dataFile = filename;
+        this.partitioner = partitioner;
+    }
+
+    protected static String indexFilename(String dataFile)
+    {
+        String[] parts = dataFile.split("-");
+        parts[parts.length - 1] = "Index.db"; // replace the last "-"-separated component with the index suffix
+        return StringUtils.join(parts, "-");
+    }
+
+    protected String indexFilename()
+    {
+        return indexFilename(dataFile);
+    }
+
+    protected static String filterFilename(String dataFile)
+    {
+        String[] parts = dataFile.split("-");
+        parts[parts.length - 1] = "Filter.db"; // replace the last "-"-separated component with the filter suffix
+        return StringUtils.join(parts, "-");
+    }
+
+    protected String filterFilename()
+    {
+        return filterFilename(dataFile);
+    }
+
+    public String getFilename()
+    {
+        return dataFile;
+    }
+
+    static String parseTableName(String filename)
+    {
+        String[] parts = new File(filename).getName().split("-"); // table, cf, index, [filetype]
+        return parts[0];
+    }
+
+    /**
+     * This is a simple container for the index Key and its corresponding position
+     * in the data file. Binary search is performed on a list of these objects
+     * to lookup keys within the SSTable data file.
+     *
+     * All keys are decorated.
+     */
+    class KeyPosition implements Comparable<KeyPosition>
+    {
+        public final String key; // decorated
+        public final long position;
+
+        public KeyPosition(String key, long position)
+        {
+            this.key = key;
+            this.position = position;
+        }
+
+        public int compareTo(KeyPosition kp)
+        {
+            return partitioner.getDecoratedKeyComparator().compare(key, kp.key); // ordering comes from the enclosing SSTable's partitioner; position is ignored
+        }
+
+        public String toString()
+        {
+            return key + ":" + position;
+        }
+    }
+}

Copied: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java (from r791590, incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java)
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java?p2=incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java&p1=incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java&r1=791590&r2=791591&rev=791591&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java Mon Jul  6 19:58:05 2009
@@ -22,7 +22,6 @@
 import java.util.*;
 
 import org.apache.log4j.Logger;
-import org.apache.commons.lang.StringUtils;
 
 import org.apache.cassandra.db.RowMutation;
 import org.apache.cassandra.dht.IPartitioner;
@@ -32,35 +31,15 @@
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
 import com.reardencommerce.kernel.collections.shared.evictable.ConcurrentLinkedHashMap;
 
-/**
- * This class is built on top of the SequenceFile. It stores
- * data on disk in sorted fashion. However the sorting is upto
- * the application. This class expects keys to be handed to it
- * in sorted order.
- *
- * A separate index file is maintained as well, containing the
- * SSTable keys and the offset into the SSTable at which they are found.
- * Every 1/indexInterval key is read into memory when the SSTable is opened.
- *
- * Finally, a bloom filter file is also kept for the keys in each SSTable.
- */
-
-public class SSTable
+public class SSTableReader extends SSTable
 {
-    private static Logger logger_ = Logger.getLogger(SSTable.class);
-    /* Every 128th index entry is loaded into memory so we know where to start looking for the actual key w/o seeking */
-    private static final int indexInterval_ = 128;
-    /* Required extension for temporary files created during compactions. */
-    public static final String temporaryFile_ = "tmp";
+    private static Logger logger = Logger.getLogger(SSTableReader.class);
 
     private static FileSSTableMap openedFiles = new FileSSTableMap();
 
-    private ConcurrentLinkedHashMap<String, Long> keyCache = ConcurrentLinkedHashMap.create(ConcurrentLinkedHashMap.EvictionPolicy.SECOND_CHANCE, 1000);
-
-
     public static int indexInterval()
     {
-        return indexInterval_;
+        return INDEX_INTERVAL;
     }
 
     // todo can we refactor to take list of sstables?
@@ -70,12 +49,11 @@
 
         for (String dataFileName : dataFiles)
         {
-            SSTable sstable = openedFiles.get(dataFileName);
+            SSTableReader sstable = openedFiles.get(dataFileName);
             assert sstable != null;
             int indexKeyCount = sstable.getIndexPositions().size();
-            count = count + (indexKeyCount + 1) * indexInterval_;
-            if (logger_.isDebugEnabled())
-              logger_.debug("index size for bloom filter calc for file  : " + dataFileName + "   : " + count);
+            count = count + (indexKeyCount + 1) * INDEX_INTERVAL;
+            logger.debug("index size for bloom filter calc for file  : " + dataFileName + "   : " + count);
         }
 
         return count;
@@ -88,7 +66,7 @@
     {
         List<String> indexedKeys = new ArrayList<String>();
 
-        for (SSTable sstable : openedFiles.values())
+        for (SSTableReader sstable : openedFiles.values())
         {
             for (KeyPosition kp : sstable.getIndexPositions())
             {
@@ -100,67 +78,53 @@
         return indexedKeys;
     }
 
-    String dataFile_;
-    private long keysWritten;
-    private IFileWriter dataWriter_;
-    private BufferedRandomAccessFile indexRAF_;
-    private String lastWrittenKey_;
-    private IPartitioner partitioner_;
-    List<KeyPosition> indexPositions_;
-    BloomFilter bf;
-
-    public static synchronized SSTable open(String dataFileName, IPartitioner partitioner) throws IOException
+    public static synchronized SSTableReader open(String dataFileName, IPartitioner partitioner) throws IOException
     {
-        SSTable sstable = openedFiles.get(dataFileName);
+        SSTableReader sstable = openedFiles.get(dataFileName);
         if (sstable == null)
         {
             assert partitioner != null;
-            sstable = new SSTable(dataFileName, partitioner);
+            sstable = new SSTableReader(dataFileName, partitioner);
 
             long start = System.currentTimeMillis();
             sstable.loadIndexFile();
             sstable.loadBloomFilter();
-            if (logger_.isDebugEnabled())
-              logger_.debug("INDEX LOAD TIME for "  + dataFileName + ": " + (System.currentTimeMillis() - start) + " ms.");
+            logger.debug("INDEX LOAD TIME for "  + dataFileName + ": " + (System.currentTimeMillis() - start) + " ms.");
 
             openedFiles.put(dataFileName, sstable);
         }
         return sstable;
     }
 
-    public static synchronized SSTable get(String dataFileName) throws IOException
+    public static synchronized SSTableReader get(String dataFileName) throws IOException
     {
-        SSTable sstable = openedFiles.get(dataFileName);
+        SSTableReader sstable = openedFiles.get(dataFileName);
         assert sstable != null;
         return sstable;
     }
 
-    private SSTable(String filename, IPartitioner partitioner)
-    {
-        assert filename.endsWith("-Data.db");
-        dataFile_ = filename;
-        partitioner_ = partitioner;
-    }
 
-    public SSTable(String filename, int keyCount, IPartitioner partitioner) throws IOException
+    private ConcurrentLinkedHashMap<String, Long> keyCache = ConcurrentLinkedHashMap.create(ConcurrentLinkedHashMap.EvictionPolicy.SECOND_CHANCE, 1000);
+
+    SSTableReader(String filename, IPartitioner partitioner, List<KeyPosition> indexPositions, BloomFilter bloomFilter)
     {
-        assert filename.endsWith("-Data.db");
-        dataFile_ = filename;
-        partitioner_ = partitioner;
-        dataWriter_ = SequenceFile.bufferedWriter(dataFile_, 4 * 1024 * 1024);
-        indexRAF_ = new BufferedRandomAccessFile(indexFilename(), "rw", 1024 * 1024);
-        bf = new BloomFilter(keyCount, 15);
+        super(filename, partitioner);
+        this.indexPositions = indexPositions;
+        this.bf = bloomFilter;
+        synchronized (SSTableReader.class) // same monitor the static synchronized open()/get() hold; locking the instance under construction excluded nothing
+        {
+            openedFiles.put(filename, this);
+        }
     }
 
-    static String parseTableName(String filename)
+    private SSTableReader(String filename, IPartitioner partitioner)
     {
-        String[] parts = new File(filename).getName().split("-"); // table, cf, index, [filetype]
-        return parts[0];
+        super(filename, partitioner);
     }
 
     public List<KeyPosition> getIndexPositions()
     {
-        return indexPositions_;
+        return indexPositions;
     }
 
     private void loadBloomFilter() throws IOException
@@ -172,7 +136,7 @@
     private void loadIndexFile() throws IOException
     {
         BufferedRandomAccessFile input = new BufferedRandomAccessFile(indexFilename(), "r");
-        indexPositions_ = new ArrayList<KeyPosition>();
+        indexPositions = new ArrayList<KeyPosition>();
 
         int i = 0;
         long indexSize = input.length();
@@ -185,96 +149,18 @@
             }
             String decoratedKey = input.readUTF();
             input.readLong();
-            if (i++ % indexInterval_ == 0)
+            if (i++ % INDEX_INTERVAL == 0)
             {
-                indexPositions_.add(new KeyPosition(decoratedKey, indexPosition));
+                indexPositions.add(new KeyPosition(decoratedKey, indexPosition));
             }
         }
     }
 
-    private static String indexFilename(String dataFile)
-    {
-        String[] parts = dataFile.split("-");
-        parts[parts.length - 1] = "Index.db";
-        return StringUtils.join(parts, "-");
-    }
-    private String indexFilename()
-    {
-        return indexFilename(dataFile_);
-    }
-
-    private static String filterFilename(String dataFile)
-    {
-        String[] parts = dataFile.split("-");
-        parts[parts.length - 1] = "Filter.db";
-        return StringUtils.join(parts, "-");
-    }
-    private String filterFilename()
-    {
-        return filterFilename(dataFile_);
-    }
-
-    public String getFilename()
-    {
-        return dataFile_;
-    }
-
-    private long beforeAppend(String decoratedKey) throws IOException
-    {
-        if (decoratedKey == null)
-        {
-            throw new IOException("Keys must not be null.");
-        }
-        Comparator<String> c = partitioner_.getDecoratedKeyComparator();
-        if (lastWrittenKey_ != null && c.compare(lastWrittenKey_, decoratedKey) > 0)
-        {
-            logger_.info("Last written key : " + lastWrittenKey_);
-            logger_.info("Current key : " + decoratedKey);
-            logger_.info("Writing into file " + dataFile_);
-            throw new IOException("Keys must be written in ascending order.");
-        }
-        return (lastWrittenKey_ == null) ? 0 : dataWriter_.getCurrentPosition();
-    }
-
-    private void afterAppend(String decoratedKey, long position) throws IOException
-    {
-        bf.add(decoratedKey);
-        lastWrittenKey_ = decoratedKey;
-        long indexPosition = indexRAF_.getFilePointer();
-        indexRAF_.writeUTF(decoratedKey);
-        indexRAF_.writeLong(position);
-        logger_.trace("wrote " + decoratedKey + " at " + position);
-
-        if (keysWritten++ % indexInterval_ != 0)
-            return;
-        if (indexPositions_ == null)
-        {
-            indexPositions_ = new ArrayList<KeyPosition>();
-        }
-        indexPositions_.add(new KeyPosition(decoratedKey, indexPosition));
-        logger_.trace("wrote index of " + decoratedKey + " at " + indexPosition);
-    }
-
-    // TODO make this take a DataOutputStream and wrap the byte[] version to combine them
-    public void append(String decoratedKey, DataOutputBuffer buffer) throws IOException
-    {
-        long currentPosition = beforeAppend(decoratedKey);
-        dataWriter_.append(decoratedKey, buffer);
-        afterAppend(decoratedKey, currentPosition);
-    }
-
-    public void append(String decoratedKey, byte[] value) throws IOException
-    {
-        long currentPosition = beforeAppend(decoratedKey);
-        dataWriter_.append(decoratedKey, value);
-        afterAppend(decoratedKey, currentPosition);
-    }
-
     /** get the position in the index file to start scanning to find the given key (at most indexInterval keys away) */
     private long getIndexScanPosition(String decoratedKey, IPartitioner partitioner)
     {
-        assert indexPositions_ != null && indexPositions_.size() > 0;
-        int index = Collections.binarySearch(indexPositions_, new KeyPosition(decoratedKey, -1));
+        assert indexPositions != null && indexPositions.size() > 0;
+        int index = Collections.binarySearch(indexPositions, new KeyPosition(decoratedKey, -1));
         if (index < 0)
         {
             // binary search gives us the first index _greater_ than the key searched for,
@@ -282,11 +168,11 @@
             int greaterThan = (index + 1) * -1;
             if (greaterThan == 0)
                 return -1;
-            return indexPositions_.get(greaterThan - 1).position;
+            return indexPositions.get(greaterThan - 1).position;
         }
         else
         {
-            return indexPositions_.get(index).position;
+            return indexPositions.get(index).position;
         }
     }
 
@@ -309,7 +195,7 @@
         }
 
         // TODO mmap the index file?
-        BufferedRandomAccessFile input = new BufferedRandomAccessFile(indexFilename(dataFile_), "r");
+        BufferedRandomAccessFile input = new BufferedRandomAccessFile(indexFilename(dataFile), "r");
         input.seek(start);
         int i = 0;
         try
@@ -334,7 +220,7 @@
                 }
                 if (v > 0)
                     return -1;
-            } while  (++i < indexInterval_);
+            } while  (++i < INDEX_INTERVAL);
         }
         finally
         {
@@ -346,12 +232,12 @@
     /** like getPosition, but if key is not found will return the location of the first key _greater_ than the desired one, or -1 if no such key exists. */
     public long getNearestPosition(String decoratedKey) throws IOException
     {
-        long start = getIndexScanPosition(decoratedKey, partitioner_);
+        long start = getIndexScanPosition(decoratedKey, partitioner);
         if (start < 0)
         {
             return 0;
         }
-        BufferedRandomAccessFile input = new BufferedRandomAccessFile(indexFilename(dataFile_), "r");
+        BufferedRandomAccessFile input = new BufferedRandomAccessFile(indexFilename(dataFile), "r");
         input.seek(start);
         try
         {
@@ -367,7 +253,7 @@
                     return -1;
                 }
                 long position = input.readLong();
-                int v = partitioner_.getDecoratedKeyComparator().compare(indexDecoratedKey, decoratedKey);
+                int v = partitioner.getDecoratedKeyComparator().compare(indexDecoratedKey, decoratedKey);
                 if (v >= 0)
                     return position;
             }
@@ -388,9 +274,9 @@
         IFileReader dataReader = null;
         try
         {
-            dataReader = SequenceFile.reader(dataFile_);
-            String decoratedKey = partitioner_.decorateKey(clientKey);
-            long position = getPosition(decoratedKey, partitioner_);
+            dataReader = SequenceFile.reader(dataFile);
+            String decoratedKey = partitioner.decorateKey(clientKey);
+            long position = getPosition(decoratedKey, partitioner);
 
             DataOutputBuffer bufOut = new DataOutputBuffer();
             DataInputBuffer bufIn = new DataInputBuffer();
@@ -424,53 +310,19 @@
         return next(clientKey, columnFamilyName, cnNames);
     }
 
-    private static String rename(String tmpFilename)
-    {
-        String filename = tmpFilename.replace("-" + temporaryFile_, "");
-        new File(tmpFilename).renameTo(new File(filename));
-        return filename;
-    }
-
-    /**
-     * Renames temporary SSTable files to valid data, index, and bloom filter files
-     */
-    public void close() throws IOException
-    {
-        // bloom filter
-        FileOutputStream fos = new FileOutputStream(filterFilename());
-        DataOutputStream stream = new DataOutputStream(fos);
-        BloomFilter.serializer().serialize(bf, stream);
-        stream.flush();
-        fos.getFD().sync();
-        stream.close();
-
-        // index
-        indexRAF_.getChannel().force(true);
-        indexRAF_.close();
-
-        // main data
-        dataWriter_.close(); // calls force
-
-        rename(indexFilename());
-        rename(filterFilename());
-        dataFile_ = rename(dataFile_); // important to do this last since index & filter file names are derived from it
-
-        openedFiles.put(dataFile_, this);
-    }
-
     /**
      * obtain a BlockReader for the getColumnSlice call.
      */
     public ColumnGroupReader getColumnGroupReader(String key, String cfName, String startColumn, boolean isAscending) throws IOException
     {
-        IFileReader dataReader = SequenceFile.reader(dataFile_);
+        IFileReader dataReader = SequenceFile.reader(dataFile);
 
         try
         {
             /* Morph key into actual key based on the partition type. */
-            String decoratedKey = partitioner_.decorateKey(key);
-            long position = getPosition(decoratedKey, partitioner_);
-            return new ColumnGroupReader(dataFile_, decoratedKey, cfName, startColumn, isAscending, position);
+            String decoratedKey = partitioner.decorateKey(key);
+            long position = getPosition(decoratedKey, partitioner);
+            return new ColumnGroupReader(dataFile, decoratedKey, cfName, startColumn, isAscending, position);
         }
         finally
         {
@@ -480,10 +332,10 @@
 
     public void delete() throws IOException
     {
-        FileUtils.deleteWithConfirm(new File(dataFile_));
-        FileUtils.deleteWithConfirm(new File(indexFilename(dataFile_)));
-        FileUtils.deleteWithConfirm(new File(filterFilename(dataFile_)));
-        openedFiles.remove(dataFile_);
+        FileUtils.deleteWithConfirm(new File(dataFile));
+        FileUtils.deleteWithConfirm(new File(indexFilename(dataFile)));
+        FileUtils.deleteWithConfirm(new File(filterFilename(dataFile)));
+        openedFiles.remove(dataFile);
     }
 
     /** obviously only for testing */
@@ -494,17 +346,17 @@
 
     static void reopenUnsafe() throws IOException // testing only
     {
-        Collection<SSTable> sstables = new ArrayList<SSTable>(openedFiles.values());
+        Collection<SSTableReader> sstables = new ArrayList<SSTableReader>(openedFiles.values());
         openedFiles.clear();
-        for (SSTable sstable : sstables)
+        for (SSTableReader sstable : sstables)
         {
-            SSTable.open(sstable.dataFile_, sstable.partitioner_);
+            SSTableReader.open(sstable.dataFile, sstable.partitioner);
         }
     }
 
     IPartitioner getPartitioner()
     {
-        return partitioner_;
+        return partitioner;
     }
 
     public FileStruct getFileStruct() throws IOException
@@ -514,48 +366,18 @@
 
     public static void deleteAll() throws IOException
     {
-        for (SSTable sstable : openedFiles.values())
+        for (SSTableReader sstable : openedFiles.values())
         {
             sstable.delete();
         }
     }
-
-    /**
-     * This is a simple container for the index Key and its corresponding position
-     * in the data file. Binary search is performed on a list of these objects
-     * to lookup keys within the SSTable data file.
-     *
-     * All keys are decorated.
-     */
-    class KeyPosition implements Comparable<KeyPosition>
-    {
-        public final String key; // decorated
-        public final long position;
-
-        public KeyPosition(String key, long position)
-        {
-            this.key = key;
-            this.position = position;
-        }
-
-        public int compareTo(KeyPosition kp)
-        {
-            return partitioner_.getDecoratedKeyComparator().compare(key, kp.key);
-        }
-
-        public String toString()
-        {
-            return key + ":" + position;
-        }
-    }
-
 }
 
 class FileSSTableMap
 {
-    private final Map<String, SSTable> map = new NonBlockingHashMap<String, SSTable>();
+    private final Map<String, SSTableReader> map = new NonBlockingHashMap<String, SSTableReader>();
 
-    public SSTable get(String filename)
+    public SSTableReader get(String filename)
     {
         try
         {
@@ -567,7 +389,7 @@
         }
     }
 
-    public SSTable put(String filename, SSTable value)
+    public SSTableReader put(String filename, SSTableReader value)
     {
         try
         {
@@ -579,7 +401,7 @@
         }
     }
 
-    public Collection<SSTable> values()
+    public Collection<SSTableReader> values()
     {
         return map.values();
     }

Added: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableWriter.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableWriter.java?rev=791591&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableWriter.java (added)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableWriter.java Mon Jul  6 19:58:05 2009
@@ -0,0 +1,118 @@
+package org.apache.cassandra.io;
+
+import java.io.IOException;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.DataOutputStream;
+import java.util.Comparator;
+import java.util.ArrayList;
+
+import org.apache.log4j.Logger;
+
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.utils.BloomFilter;
+
+public class SSTableWriter extends SSTable
+{
+    private static Logger logger = Logger.getLogger(SSTableWriter.class);
+
+    private long keysWritten;
+    private IFileWriter dataWriter;
+    private BufferedRandomAccessFile indexRAF;
+    private String lastWrittenKey;
+    private BloomFilter bf;
+
+    public SSTableWriter(String filename, int keyCount, IPartitioner partitioner) throws IOException
+    {
+        super(filename, partitioner);
+        dataWriter = SequenceFile.bufferedWriter(dataFile, 4 * 1024 * 1024);
+        indexRAF = new BufferedRandomAccessFile(indexFilename(), "rw", 1024 * 1024);
+        bf = new BloomFilter(keyCount, 15);
+    }
+
+    private long beforeAppend(String decoratedKey) throws IOException
+    {
+        if (decoratedKey == null)
+        {
+            throw new IOException("Keys must not be null.");
+        }
+        Comparator<String> c = partitioner.getDecoratedKeyComparator();
+        if (lastWrittenKey != null && c.compare(lastWrittenKey, decoratedKey) > 0)
+        {
+            logger.info("Last written key : " + lastWrittenKey);
+            logger.info("Current key : " + decoratedKey);
+            logger.info("Writing into file " + dataFile);
+            throw new IOException("Keys must be written in ascending order.");
+        }
+        return (lastWrittenKey == null) ? 0 : dataWriter.getCurrentPosition();
+    }
+
+    private void afterAppend(String decoratedKey, long position) throws IOException
+    {
+        bf.add(decoratedKey);
+        lastWrittenKey = decoratedKey;
+        long indexPosition = indexRAF.getFilePointer();
+        indexRAF.writeUTF(decoratedKey);
+        indexRAF.writeLong(position);
+        logger.trace("wrote " + decoratedKey + " at " + position);
+
+        if (keysWritten++ % INDEX_INTERVAL != 0)
+            return;
+        if (indexPositions == null)
+        {
+            indexPositions = new ArrayList<KeyPosition>();
+        }
+        indexPositions.add(new KeyPosition(decoratedKey, indexPosition));
+        logger.trace("wrote index of " + decoratedKey + " at " + indexPosition);
+    }
+
+    // TODO make this take a DataOutputStream and wrap the byte[] version to combine them
+    public void append(String decoratedKey, DataOutputBuffer buffer) throws IOException
+    {
+        long currentPosition = beforeAppend(decoratedKey);
+        dataWriter.append(decoratedKey, buffer);
+        afterAppend(decoratedKey, currentPosition);
+    }
+
+    public void append(String decoratedKey, byte[] value) throws IOException
+    {
+        long currentPosition = beforeAppend(decoratedKey);
+        dataWriter.append(decoratedKey, value);
+        afterAppend(decoratedKey, currentPosition);
+    }
+
+    private static String rename(String tmpFilename)
+    {
+        String filename = tmpFilename.replace("-" + TEMPFILE_MARKER, "");
+        new File(tmpFilename).renameTo(new File(filename));
+        return filename;
+    }
+
+    /**
+     * Writes the bloom filter to disk, closes the index and data files, renames all three from their temporary names, and returns an SSTableReader for the finished table
+     */
+    public SSTableReader closeAndOpenReader() throws IOException
+    {
+        // bloom filter
+        FileOutputStream fos = new FileOutputStream(filterFilename());
+        DataOutputStream stream = new DataOutputStream(fos);
+        BloomFilter.serializer().serialize(bf, stream);
+        stream.flush();
+        fos.getFD().sync();
+        stream.close();
+
+        // index
+        indexRAF.getChannel().force(true);
+        indexRAF.close();
+
+        // main data
+        dataWriter.close(); // calls force
+
+        rename(indexFilename());
+        rename(filterFilename());
+        dataFile = rename(dataFile); // important to do this last since index & filter file names are derived from it
+
+        return new SSTableReader(dataFile, partitioner, indexPositions, bf);
+    }
+
+}

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SequenceFile.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SequenceFile.java?rev=791591&r1=791590&r2=791591&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SequenceFile.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SequenceFile.java Mon Jul  6 19:58:05 2009
@@ -19,12 +19,7 @@
 package org.apache.cassandra.io;
 
 import java.io.*;
-import java.lang.reflect.Method;
 import java.nio.ByteBuffer;
-import java.nio.MappedByteBuffer;
-import java.nio.channels.FileChannel;
-import java.security.AccessController;
-import java.security.PrivilegedAction;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -32,7 +27,6 @@
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.utils.BloomFilter;
-import org.apache.cassandra.utils.LogUtil;
 
 import org.apache.log4j.Logger;
 

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/test/DBTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/test/DBTest.java?rev=791591&r1=791590&r2=791591&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/test/DBTest.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/test/DBTest.java Mon Jul  6 19:58:05 2009
@@ -18,30 +18,17 @@
 
 package org.apache.cassandra.test;
 
-import java.io.FileInputStream;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Set;
 
-import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.IColumn;
 import org.apache.cassandra.db.Row;
 import org.apache.cassandra.db.RowMutation;
 import org.apache.cassandra.db.Scanner;
 import org.apache.cassandra.db.Table;
-import org.apache.cassandra.io.DataInputBuffer;
-import org.apache.cassandra.io.DataOutputBuffer;
-import org.apache.cassandra.io.IFileReader;
-import org.apache.cassandra.io.IFileWriter;
-import org.apache.cassandra.io.SSTable;
-import org.apache.cassandra.io.SequenceFile;
 import org.apache.cassandra.net.EndPoint;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.BloomFilter;
-import org.apache.cassandra.utils.LogUtil;
 
 
 public class DBTest

Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java?rev=791591&r1=791590&r2=791591&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java Mon Jul  6 19:58:05 2009
@@ -30,9 +30,7 @@
 
 import static junit.framework.Assert.assertEquals;
 import org.apache.cassandra.CleanupHelper;
-import org.apache.cassandra.io.DataInputBuffer;
-import org.apache.cassandra.io.SSTable;
-import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.io.SSTableReader;
 
 public class ColumnFamilyStoreTest extends CleanupHelper
 {
@@ -114,7 +112,7 @@
         rm.apply();
         store.forceBlockingFlush();
 
-        List<SSTable> ssTables = table.getAllSSTablesOnDisk();
+        List<SSTableReader> ssTables = table.getAllSSTablesOnDisk();
         assertEquals(1, ssTables.size());
         ssTables.get(0).forceBloomFilterFailures();
         ColumnFamily cf = store.getColumnFamily("key2", "Standard1:Column1", new IdentityFilter());

Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java?rev=791591&r1=791590&r2=791591&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java Mon Jul  6 19:58:05 2009
@@ -27,7 +27,7 @@
 
 import org.junit.Test;
 
-import org.apache.cassandra.io.SSTable;
+import org.apache.cassandra.io.SSTableReader;
 import org.apache.cassandra.CleanupHelper;
 import static junit.framework.Assert.assertEquals;
 
@@ -42,7 +42,7 @@
 
         final int ROWS_PER_SSTABLE = 10;
         Set<String> inserted = new HashSet<String>();
-        for (int j = 0; j < (SSTable.indexInterval() * 3) / ROWS_PER_SSTABLE; j++) {
+        for (int j = 0; j < (SSTableReader.indexInterval() * 3) / ROWS_PER_SSTABLE; j++) {
             for (int i = 0; i < ROWS_PER_SSTABLE; i++) {
                 String key = String.valueOf(i % 2);
                 RowMutation rm = new RowMutation("Table1", key);

Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java?rev=791591&r1=791590&r2=791591&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java Mon Jul  6 19:58:05 2009
@@ -26,7 +26,7 @@
 
 import static junit.framework.Assert.*;
 import org.apache.cassandra.CleanupHelper;
-import org.apache.cassandra.io.SSTable;
+import org.apache.cassandra.io.SSTableReader;
 
 public class TableTest extends CleanupHelper
 {
@@ -134,7 +134,7 @@
         table.getColumnFamilyStore("Standard2").forceBlockingFlush();
         validateGetSliceNoMatch(table);
 
-        Collection<SSTable> ssTables = table.getColumnFamilyStore("Standard2").getSSTables();
+        Collection<SSTableReader> ssTables = table.getColumnFamilyStore("Standard2").getSSTables();
         assertEquals(1, ssTables.size());
         ssTables.iterator().next().forceBloomFilterFailures();
         validateGetSliceNoMatch(table);

Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/SSTableTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/SSTableTest.java?rev=791591&r1=791590&r2=791591&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/SSTableTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/SSTableTest.java Mon Jul  6 19:58:05 2009
@@ -35,27 +35,27 @@
         File f = tempSSTableFileName();
 
         // write test data
-        SSTable ssTable = new SSTable(f.getAbsolutePath(), 1, new OrderPreservingPartitioner());
+        SSTableWriter writer = new SSTableWriter(f.getAbsolutePath(), 1, new OrderPreservingPartitioner());
         Random random = new Random();
         byte[] bytes = new byte[1024];
         random.nextBytes(bytes);
 
         String key = Integer.toString(1);
-        ssTable.append(key, bytes);
-        ssTable.close();
+        writer.append(key, bytes);
+        SSTableReader ssTable = writer.closeAndOpenReader();
 
         // verify
         verifySingle(ssTable, bytes, key);
-        SSTable.reopenUnsafe(); // force reloading the index
+        SSTableReader.reopenUnsafe(); // force reloading the index
         verifySingle(ssTable, bytes, key);
     }
 
     private File tempSSTableFileName() throws IOException
     {
-        return File.createTempFile("sstable", "-" + SSTable.temporaryFile_ + "-Data.db");
+        return File.createTempFile("sstable", "-" + SSTable.TEMPFILE_MARKER + "-Data.db");
     }
 
-    private void verifySingle(SSTable sstable, byte[] bytes, String key) throws IOException
+    private void verifySingle(SSTableReader sstable, byte[] bytes, String key) throws IOException
     {
         FileStruct fs = sstable.getFileStruct();
         fs.seekTo(key);
@@ -76,22 +76,22 @@
         }
 
         // write
-        SSTable ssTable = new SSTable(f.getAbsolutePath(), 1000, new OrderPreservingPartitioner());
+        SSTableWriter writer = new SSTableWriter(f.getAbsolutePath(), 1000, new OrderPreservingPartitioner());
         for (String key: map.navigableKeySet())
         {
-            ssTable.append(key, map.get(key));
+            writer.append(key, map.get(key));
         }
-        ssTable.close();
+        SSTableReader ssTable = writer.closeAndOpenReader();
 
         // verify
         verifyMany(ssTable, map);
-        SSTable.reopenUnsafe(); // force reloading the index
+        SSTableReader.reopenUnsafe(); // force reloading the index
         verifyMany(ssTable, map);
     }
 
-    private void verifyMany(SSTable sstable, TreeMap<String, byte[]> map) throws IOException
+    private void verifyMany(SSTableReader sstable, TreeMap<String, byte[]> map) throws IOException
     {
-        List<String> keys = new ArrayList(map.keySet());
+        List<String> keys = new ArrayList<String>(map.keySet());
         Collections.shuffle(keys);
         FileStruct fs = sstable.getFileStruct();
         for (String key : keys)