You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by pm...@apache.org on 2009/03/02 07:12:49 UTC

svn commit: r749205 [15/16] - in /incubator/cassandra/src/org/apache/cassandra: analytics/ cli/ concurrent/ config/ continuations/ cql/ cql/common/ cql/compiler/ cql/compiler/common/ cql/compiler/parse/ cql/compiler/sem/ cql/driver/ cql/execution/ dht/...

Added: incubator/cassandra/src/org/apache/cassandra/io/SSTable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/io/SSTable.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/io/SSTable.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/io/SSTable.java Mon Mar  2 06:12:46 2009
@@ -0,0 +1,1146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io;
+
+import java.io.*;
+import java.math.BigInteger;
+import java.nio.channels.FileChannel;
+import java.util.*;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.service.PartitionerType;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.BasicUtilities;
+import org.apache.cassandra.utils.BloomFilter;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.FileUtils;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+import org.apache.cassandra.utils.*;
+
+/**
+ * This class is built on top of the SequenceFile. It stores
+ * data on disk in sorted fashion. However the sorting is up to
+ * the application. This class expects keys to be handed to it
+ * in sorted order. SSTable is broken up into blocks where each
+ * block contains 128 keys. At the end of the file  the block 
+ * index is written which contains the offsets to the keys in the
+ * block. SSTable also maintains an index file to which every 128th 
+ * key is written with a pointer to the block index which is the block 
+ * that actually contains the key. This index file is then read and 
+ * maintained in memory. SSTable is append only and immutable. SSTable
+ * on disk looks as follows:
+ * 
+ *                 -------------------------
+ *                 |------------------------|<-------|
+ *                 |                        |        |  BLOCK-INDEX PTR
+ *                 |                        |        |
+ *                 |------------------------|--------
+ *                 |------------------------|<-------|
+ *                 |                        |        |
+ *                 |                        |        |  BLOCK-INDEX PTR 
+ *                 |                        |        |
+ *                 |------------------------|---------
+ *                 |------------------------|<--------|
+ *                 |                        |         |
+ *                 |                        |         |
+ *                 |                        |         | BLOCK-INDEX PTR
+ *                 |                        |         |
+ *                 |------------------------|         |
+ *                 |------------------------|----------
+ *                 |------------------------|-----------------> BLOOM-FILTER
+ * version-info <--|----------|-------------|-------> relative offset to last block index.
+ *                 
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class SSTable
+{
+    private static Logger logger_ = Logger.getLogger(SSTable.class);
+    /* Monitor used to serialize loading of an SSTable's index into memory. */
+    private static Object indexLoadLock_ = new Object();
+    /* Every 128th key is an index. */
+    private static final int indexInterval_ = 128;
+    /* Key associated with block index written to disk */
+    public static final String blockIndexKey_ = "BLOCK-INDEX";
+    /* Position in SSTable after the first Block Index.
+       NOTE(review): this is static but assigned per-instance in the write
+       ctors; concurrent writers to different SSTables would clobber each
+       other's value — confirm writers are serialized. */
+    private static long positionAfterFirstBlockIndex_ = 0L;
+    /* Required extension for temporary files created during compactions. */
+    public static final String temporaryFile_ = "tmp";
+    /* Use this long as a 64 bit entity to turn on some bits for various settings */
+    private static final long version_ = 0L;
+    /*
+     * This map has the SSTable data file name as key and a BloomFilter
+     * as value. The BloomFilter tells us if a key is in the SSTable.
+     * If not we can avoid scanning it.
+     */
+    private static Map<String, BloomFilter> bfs_ = new Hashtable<String, BloomFilter>();
+    /* LRU cache of "touched" keys (see TouchedKeyCache below). */
+    private static LinkedHashMap<String, Long> touchCache_ = new TouchedKeyCache(DatabaseDescriptor.getTouchKeyCacheSize());        
+    
+    /**
+     * This class holds the position of a key in a block
+     * and the size of the data associated with this key. 
+    */
+    protected static class BlockMetadata
+    {
+        protected static final BlockMetadata NULL = new BlockMetadata(-1L, -1L);
+        
+        long position_;
+        long size_;
+        
+        BlockMetadata(long position, long size)
+        {
+            position_ = position;
+            size_ = size;
+        }
+    }
+    
+    /*
+     * This abstraction provides LRU symantics for the keys that are 
+     * "touched". Currently it holds the offset of the key in a data
+     * file. May change to hold a reference to a IFileReader which
+     * memory maps the key and its associated data on a touch.
+    */
+    private static class TouchedKeyCache extends LinkedHashMap<String, Long>
+    {
+        private final int capacity_;
+        
+        TouchedKeyCache(int capacity)
+        {
+            super(capacity + 1, 1.1f, true);
+            capacity_ = capacity;
+        }
+        
+        protected boolean removeEldestEntry(Map.Entry<String, Long> entry)
+        {
+            return ( size() > capacity_ );
+        }
+    }
+    
+    /**
+     * This compares two strings and does it in reverse
+     * order.
+     * 
+     * @author alakshman
+     *
+     */
+    private static class OrderPreservingPartitionerComparator implements Comparator<String>
+    {
+        public int compare(String c1, String c2) 
+        {
+            return c2.compareTo(c1);
+        } 
+    }
+
+    /**
+     * This class compares two BigInteger's passes in
+     * as strings and does so in reverse order.
+     * @author alakshman
+     *
+     */
+    private static class RandomPartitionerComparator implements Comparator<String>
+    {
+        public int compare(String c1, String c2) 
+        {
+            BigInteger b1 = new BigInteger(c1);
+            BigInteger b2 = new BigInteger(c2);
+            return b2.compareTo(b1);
+        } 
+    }
+    
+    /**
+     * This is a simple container for the index Key and its corresponding position
+     * in the data file. Binary search is performed on a list of these objects
+     * to lookup keys within the SSTable data file.
+    */
+    public static class KeyPositionInfo implements Comparable<KeyPositionInfo>
+    {
+        private String key_;
+        private long position_;
+
+        public KeyPositionInfo(String key)
+        {
+            key_ = key;            
+        }
+
+        public KeyPositionInfo(String key, long position)
+        {
+            this(key);
+            position_ = position;
+        }
+
+        public String key()
+        {
+            return key_;
+        }
+
+        public long position()
+        {
+            return position_;
+        }
+
+        public int compareTo(KeyPositionInfo kPosInfo)
+        {
+            int value = 0;
+            PartitionerType pType = StorageService.getPartitionerType();
+            switch( pType )
+            {
+                case OPHF:
+                    value = key_.compareTo(kPosInfo.key_);                    
+                    break;
+                    
+                default:
+                    BigInteger b = new BigInteger(key_);
+                    value = b.compareTo( new BigInteger(kPosInfo.key_) );
+                    break;
+            }
+            return value;
+        }
+
+        public String toString()
+        {
+        	return key_ + ":" + position_;
+        }
+    }
+    
+    /** @return the index interval: one index entry is kept per this many keys. */
+    public static int indexInterval()
+    {
+    	return indexInterval_;
+    }
+    
+    /*
+     * Maintains a list of KeyPositionInfo objects per SSTable file loaded.
+     * We do this so that we don't read the index file into memory multiple
+     * times.
+    */
+    private static Map<String, List<KeyPositionInfo>> indexMetadataMap_ = new Hashtable<String, List<KeyPositionInfo>>();
+    
+    /** 
+     * This method deletes both the specified data file
+     * and the associated index file
+     *
+     * @param dataFile - data file associated with the SSTable
+    */
+    public static void delete(String dataFile)
+    {        
+        /* remove the cached index table from memory */
+        indexMetadataMap_.remove(dataFile);
+        
+        File file = new File(dataFile);
+        if ( file.exists() )
+        {
+            /* delete the data file */
+			if (file.delete())
+			{			    
+			    logger_.info("** Deleted " + file.getName() + " **");
+			}
+			else
+			{			  
+			    logger_.error("Failed to delete " + file.getName());
+			}
+        }
+    }
+
+    /**
+     * Approximates the total number of keys across the given data files:
+     * each cached index entry stands for roughly indexInterval_ keys.
+     * Files without a cached index contribute nothing.
+     *
+     * @param dataFiles data files whose cached indexes are consulted
+     * @return approximate total key count
+     */
+    public static int getApproximateKeyCount( List<String> dataFiles)
+    {
+    	int count = 0 ;
+
+    	for(String dataFile : dataFiles )
+    	{    		
+    		List<KeyPositionInfo> index = indexMetadataMap_.get(dataFile);
+    		if (index != null )
+    		{
+    			int indexKeyCount = index.size();
+    			count = count + (indexKeyCount+1) * indexInterval_ ;
+    	        /* NOTE(review): logs the running total, not this file's index size. */
+    	        logger_.debug("index size for bloom filter calc for file  : " + dataFile + "   : " + count);
+    		}
+    	}
+
+    	return count;
+    }
+
+    /**
+     * Get all indexed keys in the SSTable.
+    */
+    public static List<String> getIndexedKeys()
+    {
+        Set<String> indexFiles = indexMetadataMap_.keySet();
+        List<KeyPositionInfo> keyPositionInfos = new ArrayList<KeyPositionInfo>();
+
+        for ( String indexFile : indexFiles )
+        {
+            keyPositionInfos.addAll( indexMetadataMap_.get(indexFile) );
+        }
+
+        List<String> indexedKeys = new ArrayList<String>();
+        for ( KeyPositionInfo keyPositionInfo : keyPositionInfos )
+        {
+            indexedKeys.add(keyPositionInfo.key_);
+        }
+
+        Collections.sort(indexedKeys);
+        return indexedKeys;
+    }
+    
+    /*
+     * Intialize the index files and also cache the Bloom Filters
+     * associated with these files. Also caches the file handles 
+     * associated with these files.
+    */
+    public static void onStart(List<String> filenames) throws IOException
+    {
+        for ( String filename : filenames )
+        {
+            SSTable ssTable = null;
+            try
+            {
+                ssTable = new SSTable(filename);                                
+            }
+            catch ( IOException ex )
+            {
+                logger_.info("Deleting corrupted file " + filename);
+                FileUtils.delete(filename);
+                logger_.warn(LogUtil.throwableToString(ex));
+            }
+            finally
+            {
+                if ( ssTable != null )
+                {
+                    ssTable.close();
+                }
+            }
+        }
+    }
+
+    /*
+     * Stores the Bloom Filter associated with the given file.
+    */
+    public static void storeBloomFilter(String filename, BloomFilter bf)
+    {
+        bfs_.put(filename, bf);
+    }
+
+    /*
+     * Removes the bloom filter associated with the specified file.
+    */
+    public static void removeAssociatedBloomFilter(String filename)
+    {
+        bfs_.remove(filename);
+    }
+
+    /*
+     * Determines if the given key is in the specified file. If the
+     * key is not present then we skip processing this file.
+    */
+    public static boolean isKeyInFile(String key, String filename)
+    {
+        boolean bVal = false;
+        BloomFilter bf = bfs_.get(filename);
+        if ( bf != null )
+        {
+            bVal = bf.isPresent(key);
+        }
+        return bVal;
+    }
+
+    /* Full path of the data file backing this SSTable. */
+    private String dataFile_;    
+    /* Writer for this SSTable; null when opened via the read ctor. */
+    private IFileWriter dataWriter_;
+    /* Last key appended; used to enforce ascending key order. */
+    private String lastWrittenKey_;    
+    /* Position of the first dumped block index (see dumpBlockIndexes). */
+    private long firstBlockPosition_ = 0L;    
+    /* Number of keys accumulated in the current (unsealed) block. */
+    private int indexKeysWritten_ = 0;
+    /* Holds the keys and their respective positions of the current block index */
+    private SortedMap<String, BlockMetadata> blockIndex_;    
+    /* Holds all the block indices for this SSTable */
+    private List<SortedMap<String, BlockMetadata>> blockIndexes_;
+    
+    /**
+     * This ctor basically gets passed in the full path name
+     * of the data file associated with this SSTable. Use this
+     * ctor to read the data in this file: it loads the index and
+     * bloom filter (via init) and opens no writer.
+    */
+    public SSTable(String dataFileName) throws IOException
+    {        
+        dataFile_ = dataFileName;
+        init();
+    }
+
+    /**
+     * This ctor is used for writing data into the SSTable. Use this
+     * version for non DB writes to the SSTable.
+    */
+    public SSTable(String directory, String filename) throws IOException
+    {  
+        dataFile_ = directory + System.getProperty("file.separator") + filename + "-Data.db";                
+        blockIndex_ = new TreeMap<String, BlockMetadata>(Collections.reverseOrder());
+        blockIndexes_ = new ArrayList<SortedMap<String, BlockMetadata>>();        
+        // dataWriter_ = SequenceFile.writer(dataFile_);
+        dataWriter_ = SequenceFile.bufferedWriter(dataFile_, 4*1024*1024);
+        SSTable.positionAfterFirstBlockIndex_ = dataWriter_.getCurrentPosition(); 
+    } 
+    
+    /* Create the block index using the globally configured partitioner. */
+    private void initBlockIndex()
+    {
+        initBlockIndex(StorageService.getPartitionerType());
+    }
+    
+    /*
+     * Create an empty block index ordered per the given partitioner type:
+     * reverse-lexical for OPHF, reverse-numeric (BigInteger) otherwise.
+    */
+    private void initBlockIndex(PartitionerType pType)
+    {
+        switch ( pType )
+        {
+            case OPHF: 
+                blockIndex_ = new TreeMap<String, BlockMetadata>( new SSTable.OrderPreservingPartitionerComparator() );                
+               break;
+               
+            default:
+                blockIndex_ = new TreeMap<String, BlockMetadata>( new SSTable.RandomPartitionerComparator() );
+                break;
+        }
+    }
+    
+    /**
+     * This ctor is used for DB writes into the SSTable. Use this
+     * version to write to the SSTable.
+    */
+    public SSTable(String directory, String filename, PartitionerType pType) throws IOException
+    {        
+        dataFile_ = directory + System.getProperty("file.separator") + filename + "-Data.db";  
+        dataWriter_ = SequenceFile.bufferedWriter(dataFile_, 4*1024*1024);        
+        SSTable.positionAfterFirstBlockIndex_ = dataWriter_.getCurrentPosition(); 
+        /* set up the block index based on partition type */
+        initBlockIndex(pType);
+        blockIndexes_ = new ArrayList<SortedMap<String, BlockMetadata>>();
+    }
+    
+    /*
+     * Loads the serialized BloomFilter from the tail of the data file into
+     * the static bfs_ cache (keyed by data file name), unless one is
+     * already cached. The last 8 bytes of the file hold the filter's
+     * position as a relative offset backwards from that point.
+     *
+     * @param indexReader reader over the data file
+     * @param size total length (EOF) of the data file
+    */
+    private void loadBloomFilter(IFileReader indexReader, long size) throws IOException
+    {        
+        /* read the relative position of the bloom filter */
+        indexReader.seek(size - 8);
+        byte[] bytes = new byte[8];
+        long currentPosition = indexReader.getCurrentPosition();
+        indexReader.readDirect(bytes);
+        long position = BasicUtilities.byteArrayToLong(bytes);
+        /* seek backwards to the start of the bloom filter record */
+        indexReader.seek(currentPosition - position);
+        DataOutputBuffer bufOut = new DataOutputBuffer();
+        DataInputBuffer bufIn = new DataInputBuffer();
+        /* read the bloom filter record from disk */
+        indexReader.next(bufOut);   
+        bufOut.close();
+        bufIn.reset(bufOut.getData(), bufOut.getLength());
+        String key = bufIn.readUTF();
+        if ( key.equals(SequenceFile.marker_) )
+        {
+            /*
+             * We are now reading the serialized Bloom Filter. We read
+             * the length and then pass the bufIn to the serializer of
+             * the BloomFilter. We then store the Bloom filter in the
+             * map. However if the Bloom Filter already exists then we
+             * need not read the rest of the file.
+            */
+            bufIn.readInt();
+            if ( bfs_.get(dataFile_) == null )
+                bfs_.put(dataFile_, BloomFilter.serializer().deserialize(bufIn));
+        }
+    }
+    
+    /*
+     * Reads every block index from the tail of the data file and builds
+     * the in-memory index for dataFile_ (one KeyPositionInfo per block).
+     * Also loads the file's BloomFilter into the static cache. The 8
+     * bytes at (size - 16) hold the relative offset back to the first
+     * block index.
+    */
+    private void loadIndexFile() throws IOException
+    {    
+        IFileReader indexReader = null;
+        /* Read all block indexes to maintain an index in memory */
+        try
+        {            
+            indexReader = SequenceFile.bufferedReader(dataFile_, 4*1024*1024);
+            long size = indexReader.getEOF();
+            
+            /* load the bloom filter into memory */
+            loadBloomFilter(indexReader, size);
+            /* read the position of the last block index */
+            byte[] bytes = new byte[8];
+            /* seek to the position to read the relative position of the first block index */
+            indexReader.seek(size - 16L);
+            /* the beginning of the first block index */
+            long currentPosition = indexReader.getCurrentPosition();
+            indexReader.readDirect(bytes);
+            long firstBlockIndexPosition = BasicUtilities.byteArrayToLong(bytes);  
+            List<KeyPositionInfo> keyPositionInfos = new ArrayList<KeyPositionInfo>();
+            indexMetadataMap_.put(dataFile_, keyPositionInfos);
+            DataOutputBuffer bufOut = new DataOutputBuffer();
+            DataInputBuffer bufIn = new DataInputBuffer();        
+            
+            long nextPosition = currentPosition - firstBlockIndexPosition;
+            indexReader.seek(nextPosition);
+            /* read block indexes forward from the first one until a
+               non-index record terminates the loop.
+               NOTE(review): nextPosition is never updated inside the loop
+               and the bytesRead == -1 case neither breaks nor advances —
+               if next() ever returns -1 here this loops forever; confirm
+               the on-disk layout guarantees the break is always reached. */
+            while ( nextPosition > 0 )
+            {
+                bufOut.reset();
+                /* position @ the current block index being processed */
+                currentPosition = indexReader.getCurrentPosition();
+                long bytesRead = indexReader.next(bufOut);
+                if ( bytesRead != -1 )
+                {
+                    bufIn.reset(bufOut.getData(), bufOut.getLength());
+                    /* read the block key. */
+                    String blockIndexKey = bufIn.readUTF();
+                    if ( !blockIndexKey.equals(SSTable.blockIndexKey_) )
+                    {
+                    	logger_.debug(" Done reading the block indexes, Index has been created");
+                    	break;
+                    }
+                    /* read the size of the block index */
+                    bufIn.readInt();                    
+                    /* Number of keys in the block. */
+                    int keys = bufIn.readInt();
+                    String largestKeyInBlock = null;
+                    for ( int i = 0; i < keys; ++i )
+                    {
+                        String keyInBlock = bufIn.readUTF();
+                        if ( i == 0 )
+                        {
+                            /* first entry is the block's largest key (blocks are reverse ordered) */
+                            largestKeyInBlock = keyInBlock;
+                            /* relative offset in the block for the key*/
+                            bufIn.readLong();
+                            /* size of data associated with the key */
+                            bufIn.readLong();
+                            /* load the actual position of the block index into the index map */
+                            keyPositionInfos.add( new KeyPositionInfo(largestKeyInBlock, currentPosition) );
+                        }
+                        else
+                        {
+                            /*
+                             * This is not the key we are looking for. So read its position
+                             * and the size of the data associated with it. This was stored
+                             * as the BlockMetadata.
+                            */
+                            bufIn.readLong();
+                            bufIn.readLong();
+                        }
+                    }
+                }
+            }
+            bufIn.close();
+            bufOut.close();
+            Collections.sort(keyPositionInfos);
+        }        
+        finally
+        {
+            if ( indexReader != null )
+            {
+                indexReader.close();
+            }
+        }        
+    }
+
+    /*
+     * Loads the index for dataFile_ exactly once.
+    */
+    private void init() throws IOException
+    {        
+        /*
+         * this is to prevent multiple threads from
+         * loading the same index files multiple times
+         * into memory.
+        */
+        synchronized( indexLoadLock_ )
+        {
+            if ( indexMetadataMap_.get(dataFile_) == null )
+            {
+                long start = System.currentTimeMillis();
+                loadIndexFile();
+                logger_.debug("INDEX LOAD TIME: " + (System.currentTimeMillis() - start) + " ms.");                
+            }
+        }
+    }
+
+    /* Resolve name to an absolute path; throws if the file does not exist. */
+    private String getFile(String name) throws IOException
+    {
+        File file = new File(name);
+        if ( file.exists() )
+            return file.getAbsolutePath();
+        throw new IOException("File " + name + " was not found on disk.");
+    }
+
+    /** @return absolute path of this SSTable's data file; throws if it is missing on disk. */
+    public String getDataFileLocation() throws IOException
+    {
+        return getFile(dataFile_);
+    }
+
+    /**
+     * Last-modified time as reported by the writer.
+     * NOTE(review): dataWriter_ is null when this SSTable was opened via
+     * the read ctor — this would NPE in that case; confirm it is only
+     * called on writable instances.
+     */
+    public long lastModified()
+    {
+        return dataWriter_.lastModified();
+    }
+    
+    /*
+     * Seeks to the specified key on disk.
+    */
+    public void touch(String key, boolean fData) throws IOException
+    {
+        if ( touchCache_.containsKey(key) )
+            return;
+        
+        IFileReader dataReader = SequenceFile.reader(dataFile_); 
+        try
+        {
+        	/* Morph the key */
+        	key = morphKey(key);
+            Coordinate fileCoordinate = getCoordinates(key, dataReader);
+            /* Get offset of key from block Index */
+            dataReader.seek(fileCoordinate.end_);
+            BlockMetadata blockMetadata = dataReader.getBlockMetadata(key);
+            if ( blockMetadata.position_ != -1L )
+            {
+                touchCache_.put(dataFile_ + ":" + key, blockMetadata.position_);                  
+            } 
+            
+            if ( fData )
+            {
+                /* Read the data associated with this key and pull it into the Buffer Cache */
+                if ( blockMetadata.position_ != -1L )
+                {
+                    dataReader.seek(blockMetadata.position_);
+                    DataOutputBuffer bufOut = new DataOutputBuffer();
+                    dataReader.next(bufOut);
+                    bufOut.reset();
+                    logger_.debug("Finished the touch of the key to pull it into buffer cache.");
+                }
+            }
+        }
+        finally
+        {
+            if ( dataReader != null )
+                dataReader.close();
+        }
+    }
+
+    /*
+     * Validates that keys are appended in strictly ascending order and
+     * returns the file position at which this key's data will start.
+     * Throws IOException for a null, duplicate, or out-of-order key.
+    */
+    private long beforeAppend(String key) throws IOException
+    {
+    	if(key == null )
+            throw new IOException("Keys must not be null.");
+        if ( lastWrittenKey_ != null && key.compareTo(lastWrittenKey_) <= 0 )
+        {
+            logger_.info("Last written key : " + lastWrittenKey_);
+            logger_.info("Current key : " + key);
+            logger_.info("Writing into file " + dataFile_);
+            throw new IOException("Keys must be written in ascending order.");
+        }
+        /* the first append starts right after the reserved first block index */
+        long currentPosition = (lastWrittenKey_ == null) ? SSTable.positionAfterFirstBlockIndex_ : dataWriter_.getCurrentPosition();
+        return currentPosition;
+    }
+    
+    /*
+     * Hash-partitioner variant of beforeAppend: enforces strictly
+     * ascending hash order (lastWrittenKey_ holds the previous hash's
+     * decimal string) and returns the write position for this key.
+    */
+    private long beforeAppend(BigInteger hash) throws IOException
+    {
+        if(hash == null )
+            throw new IOException("Keys must not be null.");
+        if ( lastWrittenKey_ != null )
+        {
+            BigInteger previousKey = new BigInteger(lastWrittenKey_);
+            if ( hash.compareTo(previousKey) <= 0 )
+            {
+                logger_.info("Last written key : " + previousKey);
+                logger_.info("Current key : " + hash);
+                logger_.info("Writing into file " + dataFile_);
+                throw new IOException("Keys must be written in ascending order.");
+            }
+        }
+        long currentPosition = (lastWrittenKey_ == null) ? SSTable.positionAfterFirstBlockIndex_ : dataWriter_.getCurrentPosition();
+        return currentPosition;
+    }
+
+    /*
+     * Records the key's position/size in the current block index; after
+     * indexInterval_ keys the block is sealed into blockIndexes_ and a
+     * fresh one started.
+     * NOTE(review): this overload recreates blockIndex_ with
+     * Collections.reverseOrder(), while the BigInteger overload uses the
+     * partitioner comparator via initBlockIndex() — confirm the
+     * divergence is intentional.
+    */
+    private void afterAppend(String key, long position, long size) throws IOException
+    {
+        ++indexKeysWritten_;
+        lastWrittenKey_ = key;
+        blockIndex_.put(key, new BlockMetadata(position, size));
+        if ( indexKeysWritten_ == indexInterval_ )
+        {
+        	blockIndexes_.add(blockIndex_);
+        	blockIndex_ = new TreeMap<String, BlockMetadata>(Collections.reverseOrder());
+            indexKeysWritten_ = 0;
+        }                
+    }
+    
+    /*
+     * Same as afterAppend(String) but keys the block index by the hash's
+     * decimal string and starts the next block via initBlockIndex()
+     * (partitioner-aware comparator).
+    */
+    private void afterAppend(BigInteger hash, long position, long size) throws IOException
+    {
+        ++indexKeysWritten_;
+        String key = hash.toString();
+        lastWrittenKey_ = key;
+        blockIndex_.put(key, new BlockMetadata(position, size));
+        if ( indexKeysWritten_ == indexInterval_ )
+        {
+            blockIndexes_.add(blockIndex_);
+            initBlockIndex();
+            indexKeysWritten_ = 0;
+        }                
+    }
+    
+    /**
+     * Dumps all the block indicies for this SSTable
+     * at the end of the file.
+     * @throws IOException
+     */
+    private void dumpBlockIndexes() throws IOException
+    {    	
+        long position = dataWriter_.getCurrentPosition();
+        firstBlockPosition_ = position;
+    	for( SortedMap<String, BlockMetadata> block : blockIndexes_ )
+    	{
+    		dumpBlockIndex( block );
+    	}  	
+    }    
+    
+    /*
+     * Serializes one block index record: key count, then for each key
+     * (in the block's reverse order) the key, its offset relative to this
+     * record's position, and its data size. Also registers the block's
+     * first key in the in-memory index map, then clears the block.
+    */
+    private void dumpBlockIndex( SortedMap<String, BlockMetadata> blockIndex) throws IOException
+    {
+        /* Block Index is empty so bail. */
+        if ( blockIndex.size() == 0 )
+            return;
+        
+        DataOutputBuffer bufOut = new DataOutputBuffer();
+        /* 
+         * Record the position where we start writing the block index. This will be
+         * used as the position of the lastWrittenKey in the block in the index file
+        */
+        long position = dataWriter_.getCurrentPosition();
+        Set<String> keys = blockIndex.keySet();                
+        /* Number of keys in this block */
+        bufOut.writeInt(keys.size());
+        for ( String key : keys )
+        {            
+            bufOut.writeUTF(key);
+            BlockMetadata blockMetadata = blockIndex.get(key);
+            /* position of the key as a relative offset */
+            bufOut.writeLong(position - blockMetadata.position_);
+            bufOut.writeLong(blockMetadata.size_);
+        }
+        /* Write out the block index. */
+        dataWriter_.append(SSTable.blockIndexKey_, bufOut);
+        /* Load this index into the in memory index map */
+        List<KeyPositionInfo> keyPositionInfos = SSTable.indexMetadataMap_.get(dataFile_);
+        if ( keyPositionInfos == null )
+        {
+        	keyPositionInfos = new ArrayList<KeyPositionInfo>();
+        	SSTable.indexMetadataMap_.put(dataFile_, keyPositionInfos);
+        }
+        
+        /* firstKey() is the block's largest key under the reverse comparators. */
+        keyPositionInfos.add(new KeyPositionInfo(blockIndex.firstKey(), position));
+        blockIndex.clear();        
+    }
+
+    /** Appends the serialized row in buffer under key; enforces ascending key order. */
+    public void append(String key, DataOutputBuffer buffer) throws IOException
+    {
+        long currentPosition = beforeAppend(key);
+        dataWriter_.append(key, buffer);
+        afterAppend(key, currentPosition, buffer.getLength());
+    }
+    
+    /** Appends under the random partitioner; the on-disk key is "hash:key" and ordering is by hash. */
+    public void append(String key, BigInteger hash, DataOutputBuffer buffer) throws IOException
+    {
+        long currentPosition = beforeAppend(hash);
+        /* Use as key - hash + ":" + key */
+        dataWriter_.append(hash + ":" + key, buffer);
+        afterAppend(hash, currentPosition, buffer.getLength());
+    }
+
+    /** Appends the raw value bytes under key; enforces ascending key order. */
+    public void append(String key, byte[] value) throws IOException
+    {
+        long currentPosition = beforeAppend(key);
+        dataWriter_.append(key, value);
+        afterAppend(key, currentPosition, value.length );
+    }
+    
+    /** Appends raw bytes under the random partitioner; the on-disk key is "hash:key". */
+    public void append(String key, BigInteger hash, byte[] value) throws IOException
+    {
+        long currentPosition = beforeAppend(hash);
+        /* Use as key - hash + ":" + key */
+        dataWriter_.append(hash + ":" + key, value);
+        afterAppend(hash, currentPosition, value.length);
+    }
+
+    /*
+     * Computes the [start, end) byte range of the data file that can
+     * contain the given (already morphed) key, via binary search over the
+     * in-memory block index. With no index loaded, the whole file is
+     * returned for a linear scan.
+    */
+    private Coordinate getCoordinates(String key, IFileReader dataReader) throws IOException
+    {
+    	List<KeyPositionInfo> indexInfo = indexMetadataMap_.get(dataFile_);
+    	int size = (indexInfo == null) ? 0 : indexInfo.size();
+    	long start = 0L;
+    	long end = dataReader.getEOF();
+        if ( size > 0 )
+        {
+            int index = Collections.binarySearch(indexInfo, new KeyPositionInfo(key));
+            if ( index < 0 )
+            {
+                /*
+                 * We are here which means that the requested
+                 * key is not an index. Convert binarySearch's negative
+                 * return into the insertion point.
+                */
+                index = (++index)*(-1);
+                /* scan from the preceding index entry (or the file start) */
+                start = (index == 0) ? 0 : indexInfo.get(index - 1).position();
+                if ( index < size )
+                {
+                    end = indexInfo.get(index).position();
+                }
+                else
+                {
+                    /* This is the Block Index in the file. */
+                    end = start;
+                }
+            }
+            else
+            {
+                /*
+                 * If we are here that means the key is in the index file
+                 * and we can retrieve it w/o a scan. In reality we would
+                 * like to have a retrieve(key, fromPosition) but for now
+                 * we use scan(start, start + 1) - a hack.
+                */
+                start = indexInfo.get(index).position();                
+                end = start;
+            }
+        }
+        else
+        {
+            /*
+             * We are here which means there are less than
+             * 128 keys in the system and hence our only recourse
+             * is a linear scan from start to finish. Automatically
+             * use memory mapping since we have a huge file and very
+             * few keys.
+            */
+            end = dataReader.getEOF();
+        }  
+        
+        return new Coordinate(start, end);
+    }
+    
+    /**
+     * Convert the application key into the appropriate application
+     * key based on the partition type.
+     * 
+     * @param key the application key
+     * @return the appropriate key based on partition mechanism
+    */
+    private String morphKey(String key)
+    {
+        String internalKey = key;
+        PartitionerType pType = StorageService.getPartitionerType();
+        switch ( pType )
+        {
+            case OPHF:
+                break;
+                
+            default:
+                internalKey = FBUtilities.hash(key).toString();
+                break;
+        }
+        return internalKey;
+    }
+    
+    public DataInputBuffer next(String key, String cf, List<String> cNames) throws IOException
+    {
+    	DataInputBuffer bufIn = null;        
+        IFileReader dataReader = null;
+        try
+        {
+            dataReader = SequenceFile.reader(dataFile_);
+            /* Morph key into actual key based on the partition type. */ 
+            key = morphKey(key);
+            Coordinate fileCoordinate = getCoordinates(key, dataReader);    
+            /*
+             * we have the position we have to read from in order to get the
+             * column family, get the column family and column(s) needed.
+            */          
+            bufIn = getData(dataReader, key, cf, cNames, fileCoordinate);
+        }
+        finally
+        {
+            if ( dataReader != null )
+            {
+                dataReader.close();
+            }
+        }
+        return bufIn;
+    }
+    
+    public DataInputBuffer next(String key, String columnName) throws IOException
+    {
+        DataInputBuffer bufIn = null;
+        IFileReader dataReader = null;
+        try
+        {
+            dataReader = SequenceFile.reader(dataFile_);
+            /* Morph key into actual key based on the partition type. */ 
+            key = morphKey(key);
+            Coordinate fileCoordinate = getCoordinates(key, dataReader);
+            /*
+             * we have the position we have to read from in order to get the
+             * column family, get the column family and column(s) needed.
+            */            
+            bufIn = getData(dataReader, key, columnName, fileCoordinate);
+        }
+        finally
+        {
+            if ( dataReader != null )
+            {
+                dataReader.close();
+            }
+        }
+        return bufIn;
+    }
+    
+    public DataInputBuffer next(String key, String columnName, IndexHelper.TimeRange timeRange) throws IOException
+    {
+        DataInputBuffer bufIn = null;
+        IFileReader dataReader = null;
+        try
+        {
+            dataReader = SequenceFile.reader(dataFile_);
+            /* Morph key into actual key based on the partition type. */ 
+            key = morphKey(key);
+            Coordinate fileCoordinate = getCoordinates(key, dataReader);
+            /*
+             * we have the position we have to read from in order to get the
+             * column family, get the column family and column(s) needed.
+            */  
+            bufIn = getData(dataReader, key, columnName, timeRange, fileCoordinate);
+        }
+        finally
+        {
+            if ( dataReader != null )
+            {
+                dataReader.close();
+            }
+        }
+        return bufIn;
+    }
+    
+    long getSeekPosition(String key, long start)
+    {
+        Long seekStart = touchCache_.get(dataFile_ + ":" + key);
+        if( seekStart != null)
+        {
+            return seekStart;
+        }
+        return start;
+    }
+        
+    /*
+     * Get the data for the key from the position passed in. 
+    */
+    private DataInputBuffer getData(IFileReader dataReader, String key, String column, Coordinate section) throws IOException
+    {
+        DataOutputBuffer bufOut = new DataOutputBuffer();
+        DataInputBuffer bufIn = new DataInputBuffer();
+        
+        long bytesRead = dataReader.next(key, bufOut, column, section);
+        if ( bytesRead != -1L )
+        {
+            if ( bufOut.getLength() > 0 )
+            {                              
+                bufIn.reset(bufOut.getData(), bufOut.getLength());            
+                /* read the key even though we do not use it */
+                bufIn.readUTF();
+                bufIn.readInt();            
+            }
+        }
+        
+        return bufIn;
+    }
+    
+    private DataInputBuffer getData(IFileReader dataReader, String key, String cf, List<String> columns, Coordinate section) throws IOException
+    {
+        DataOutputBuffer bufOut = new DataOutputBuffer();
+        DataInputBuffer bufIn = new DataInputBuffer();
+                  
+        long bytesRead = dataReader.next(key, bufOut, cf, columns, section);
+        if ( bytesRead != -1L )
+        {
+            if ( bufOut.getLength() > 0 )
+            {                     
+                bufIn.reset(bufOut.getData(), bufOut.getLength());             
+                /* read the key even though we do not use it */
+                bufIn.readUTF();
+                bufIn.readInt();            
+            }        
+        }
+        return bufIn;
+    }
+    
+    /*
+     * Get the data for the key from the position passed in. 
+    */
+    private DataInputBuffer getData(IFileReader dataReader, String key, String column, IndexHelper.TimeRange timeRange, Coordinate section) throws IOException
+    {
+        DataOutputBuffer bufOut = new DataOutputBuffer();
+        DataInputBuffer bufIn = new DataInputBuffer();
+                
+        try
+        {
+            dataReader.next(key, bufOut, column, timeRange, section);
+            if ( bufOut.getLength() > 0 )
+            {                              
+                bufIn.reset(bufOut.getData(), bufOut.getLength());            
+                /* read the key even though we do not use it */
+                bufIn.readUTF();
+                bufIn.readInt();            
+            }
+        }
+        catch ( IOException ex )
+        {
+            logger_.warn(LogUtil.throwableToString(ex));
+        }
+        return bufIn;
+    }
+    
+    /*
+     * Given a key we are interested in this method gets the
+     * closest index before the key on disk.
+     *
+     *  param @ key - key we are interested in.
+     *  return position of the closest index before the key
+     *  on disk or -1 if this key is not on disk.
+    */
+    private long getClosestIndexPositionToKeyOnDisk(String key)
+    {
+        long position = -1L;
+        List<KeyPositionInfo> indexInfo = indexMetadataMap_.get(dataFile_);
+        int size = indexInfo.size();
+        int index = Collections.binarySearch(indexInfo, new KeyPositionInfo(key));
+        if ( index < 0 )
+        {
+            /*
+             * We are here which means that the requested
+             * key is not an index.
+            */
+            index = (++index)*(-1);
+            /* this means key is not present at all */
+            if ( index >= size )
+                return position;
+            /* a scan is in order. */
+            position = (index == 0) ? 0 : indexInfo.get(index - 1).position();
+        }
+        else
+        {
+            /*
+             * If we are here that means the key is in the index file
+             * and we can retrieve it w/o a scan. In reality we would
+             * like to have a retreive(key, fromPosition) but for now
+             * we use scan(start, start + 1) - a hack.
+            */
+            position = indexInfo.get(index).position();
+        }
+        return position;
+    }
+
+    public void close() throws IOException
+    {
+        close( new byte[0], 0 );
+    }
+
+    public void close(BloomFilter bf) throws IOException
+    {
+        /* Any remnants in the blockIndex should be added to the dump */
+    	blockIndexes_.add(blockIndex_);
+    	dumpBlockIndexes();
+        
+    	/* reset the buffer and serialize the Bloom Filter. */
+        DataOutputBuffer bufOut = new DataOutputBuffer();
+        BloomFilter.serializer().serialize(bf, bufOut);
+        bufOut.close();
+
+        close(bufOut.getData(), bufOut.getLength());
+        // byte[] bytes = new byte[bufOut.getLength()];        
+        // System.arraycopy(bufOut.getData(), 0, bytes, 0, bufOut.getLength());
+        // close(bytes, bytes.length);             
+    }
+
+    /**
+     * Renames a temporary SSTable file to a valid data and index file
+     */
+    public void closeRename(BloomFilter bf) throws IOException
+    {
+    	close(bf);
+        String tmpDataFile = dataFile_;
+    	String dataFileName = dataFile_.replace("-" + temporaryFile_,"");    	
+    	File dataFile = new File(dataFile_);
+    	dataFile.renameTo(new File(dataFileName));    	    	
+    	dataFile_ = dataFileName;        
+    	/* Now repair the in memory index associated with the old name */
+    	List<KeyPositionInfo> keyPositionInfos = SSTable.indexMetadataMap_.remove(tmpDataFile);    	    	  	    	
+    	SSTable.indexMetadataMap_.put(dataFile_, keyPositionInfos);
+    }
+    
+    public void closeRename(BloomFilter bf, List<String> files) throws IOException
+    {
+        close( bf);
+        String tmpDataFile = dataFile_;
+        String dataFileName = dataFile_.replace("-" + temporaryFile_,"");
+        File dataFile = new File(dataFile_);
+        dataFile.renameTo(new File(dataFileName));
+        dataFile_ = dataFileName;
+        /* Now repair the in memory index associated with the old name */
+        List<KeyPositionInfo> keyPositionInfos = SSTable.indexMetadataMap_.remove(tmpDataFile);                         
+        SSTable.indexMetadataMap_.put(dataFile_, keyPositionInfos);
+        if ( files != null )
+        {            
+            files.add(dataFile_);
+        }
+    }
+    
    /*
     * Write the bloom filter for this SSTable.
     * Then write three longs one which is a version
     * and one which is a pointer to the last written
     * block index and the last one is the position of
     * the Bloom Filter.
     *
     * On-disk trailer layout (in write order):
     *   [footer bytes][version][block-index delta][bloom-filter delta]
     * The two deltas are stored relative to the position at which they are
     * written, so readers can seek backwards from EOF without absolute offsets.
     * No-op when the table was opened read-only (dataWriter_ == null).
     */
    private void close(byte[] footer, int size) throws IOException
    {
        /*
         * Write the bloom filter for this SSTable.
         * Then write three longs one which is a version
         * and one which is a pointer to the last written
         * block index and the last one is the position of
         * the Bloom Filter.
         */
        if ( dataWriter_ != null )
        {            
            // Capture where the footer (Bloom Filter) begins before it is written.
            long bloomFilterPosition = dataWriter_.getCurrentPosition();
            dataWriter_.close(footer, size);
            /* write the version field into the SSTable */           
            dataWriter_.writeDirect(BasicUtilities.longToByteArray(version_));
            /* write the relative position of the first block index from current position */
            long blockPosition = dataWriter_.getCurrentPosition() - firstBlockPosition_;
            dataWriter_.writeDirect(BasicUtilities.longToByteArray(blockPosition));
            
            /* write the position of the bloom filter */
            long bloomFilterRelativePosition = dataWriter_.getCurrentPosition() - bloomFilterPosition;
            dataWriter_.writeDirect(BasicUtilities.longToByteArray(bloomFilterRelativePosition));            
            dataWriter_.close();
        }
    } 
+}