You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by pm...@apache.org on 2009/03/02 08:57:31 UTC

svn commit: r749218 [13/34] - in /incubator/cassandra: branches/ dist/ nightly/ site/ tags/ trunk/ trunk/lib/ trunk/src/ trunk/src/org/ trunk/src/org/apache/ trunk/src/org/apache/cassandra/ trunk/src/org/apache/cassandra/analytics/ trunk/src/org/apache...

Added: incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnIndexer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnIndexer.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnIndexer.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnIndexer.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.DataOutputBuffer;
+import org.apache.cassandra.io.IndexHelper;
+import org.apache.cassandra.io.SSTable.KeyPositionInfo;
+import org.apache.cassandra.utils.BloomFilter;
+import org.apache.cassandra.utils.FBUtilities;
+
+
+/**
+ * Help to create an index for a column family based on size of columns
+ * Author : Karthik Ranganathan ( kranganathan@facebook.com )
+ */
+
+public class ColumnIndexer
+{
+	/**
+	 * Writes the bloom filter built from the column family's columns to the stream,
+	 * prefixed by its serialized length, followed by the column index (see doIndexing).
+	 * @param columnFamily Column family to create index for
+	 * @param dos data output stream the bloom filter and the index are written to
+	 * @throws IOException
+	 */
+    public static void serialize(ColumnFamily columnFamily, DataOutputStream dos) throws IOException
+	{
+        Collection<IColumn> columns = columnFamily.getAllColumns();
+        BloomFilter bf = createColumnBloomFilter(columns);                    
+        /* Serialize the bloom filter into a scratch buffer first so its length is known. */
+        DataOutputBuffer bufOut = new DataOutputBuffer(); 
+        BloomFilter.serializer().serialize(bf, bufOut);
+        /* write the length of the serialized bloom filter. */
+        dos.writeInt(bufOut.getLength());
+        /* write out the serialized bytes. */
+        dos.write(bufOut.getData(), 0, bufOut.getLength());
+        /* fetch the TypeInfo DatabaseDescriptor returns for this column family name and write the index. */
+        TypeInfo typeInfo = DatabaseDescriptor.getTypeInfo(columnFamily.name());        
+        doIndexing(typeInfo, columns, dos);        
+	}
+    
+    /**
+     * Create a bloom filter that contains the names of the columns, and of the
+     * subcolumns of every SuperColumn, that make up this Column Family.
+     * @param columns columns of the ColumnFamily
+     * @return BloomFilter filled with every column and subcolumn name.
+     */
+    private static BloomFilter createColumnBloomFilter(Collection<IColumn> columns)
+    {
+        int columnCount = 0; /* sum of getObjectCount() over all columns; passed to the BloomFilter constructor */
+        for ( IColumn column : columns )
+        {
+            columnCount += column.getObjectCount();
+        }
+        /* NOTE(review): the meaning of the second constructor argument (4) is not visible here - confirm in BloomFilter. */
+        BloomFilter bf = new BloomFilter(columnCount, 4);
+        for ( IColumn column : columns )
+        {
+            bf.fill(column.name());
+            /* If this is SuperColumn type Column Family we need to get the subColumns too. */
+            if ( column instanceof SuperColumn )
+            {
+                Collection<IColumn> subColumns = column.getSubColumns();
+                for ( IColumn subColumn : subColumns )
+                {
+                    bf.fill(subColumn.name());
+                }
+            }
+        }
+        return bf;
+    }
+    
+    private static IndexHelper.ColumnIndexInfo getColumnIndexInfo(TypeInfo typeInfo, IColumn column)
+    {   /* Builds the index entry for a column: keyed by name for SuperColumns, otherwise as typeInfo dictates. */
+        IndexHelper.ColumnIndexInfo cIndexInfo = null;
+        /* SuperColumns always get a STRING entry holding the column name, whatever typeInfo says. */
+        if ( column instanceof SuperColumn )
+        {
+            cIndexInfo = IndexHelper.ColumnIndexFactory.instance(TypeInfo.STRING);            
+            cIndexInfo.set(column.name());
+        }
+        else
+        {
+            cIndexInfo = IndexHelper.ColumnIndexFactory.instance(typeInfo);                        
+            switch(typeInfo)
+            {
+                case STRING: /* entry keyed by the column name */
+                    cIndexInfo.set(column.name());                        
+                    break;
+                    
+                case LONG: /* entry keyed by the column timestamp */
+                    cIndexInfo.set(column.timestamp());                        
+                    break;
+            } /* NOTE(review): no default case - any other TypeInfo value leaves the entry unset; confirm STRING/LONG are the only values. */
+        }
+        
+        return cIndexInfo;
+    }
+
+    /**
+     * Given the collection of columns in the Column Family,
+     * the column index is generated and written into the provided
+     * stream.
+     * @param typeInfo selects how plain columns are keyed in the index
+     *           (see getColumnIndexInfo).
+     * @param columns for whom the index needs to be generated
+     * @param dos stream into which the serialized index needs
+     *            to be written.
+     * @throws IOException
+     */
+    private static void doIndexing(TypeInfo typeInfo, Collection<IColumn> columns, DataOutputStream dos) throws IOException
+    {
+        /* we are going to write column indexes */
+        int numColumns = 0; /* columns visited since the last index entry was added */
+        int position = 0; /* running sum of serializedSize() of the columns visited so far */
+        int indexSizeInBytes = 0; /* sum of size() over all index entries */
+        int sizeSummarized = 0; /* value of position when the last index entry was added */
+        
+        /*
+         * Maintains a list of ColumnIndexInfo objects for the columns in this
+         * column family. Each entry carries the key of a column and the
+         * relative offset of that column from the start of the list.
+         * We do this so that we don't read all the columns into memory.
+        */
+        
+        List<IndexHelper.ColumnIndexInfo> columnIndexList = new ArrayList<IndexHelper.ColumnIndexInfo>();        
+        
+        /* column offsets at the right thresholds into the index map. */
+        for ( IColumn column : columns )
+        {
+            /* if we hit the column index size that we have to index after, go ahead and index it */
+            if(position - sizeSummarized >= DatabaseDescriptor.getColumnIndexSize())
+            {      
+                /*
+                 * ColumnSort applies only to columns. So in case of 
+                 * SuperColumn always use the name indexing scheme for
+                 * the SuperColumns. We will fix this later.
+                 */
+                IndexHelper.ColumnIndexInfo cIndexInfo = getColumnIndexInfo(typeInfo, column);                
+                cIndexInfo.position(position); /* offset of this column, before its own size is added */
+                cIndexInfo.count(numColumns);  /* number of columns since the previous entry */
+                columnIndexList.add(cIndexInfo);
+                /*
+                 * we will be writing this object as a UTF8 string and two ints,
+                 * so calculate the size accordingly. Note that we store the string
+                 * as UTF-8 encoded, so when we calculate the length, it should be
+                 * converted to UTF-8.
+                 */
+                indexSizeInBytes += cIndexInfo.size();
+                sizeSummarized = position;
+                numColumns = 0;
+            }
+            position += column.serializedSize();
+            ++numColumns;
+        }
+        /* write the column index list */
+        IndexHelper.serialize(indexSizeInBytes, columnIndexList, dos);
+    }
+}

Added: incubator/cassandra/trunk/src/org/apache/cassandra/db/CommitLog.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/CommitLog.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/CommitLog.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/CommitLog.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,684 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.io.*;
+import java.nio.file.Path;
+import java.util.*;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.io.DataOutputBuffer;
+import org.apache.cassandra.io.IFileReader;
+import org.apache.cassandra.io.IFileWriter;
+import org.apache.cassandra.io.SequenceFile;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.FileUtils;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.cassandra.io.*;
+import org.apache.cassandra.utils.*;
+
+/*
+ * Commit Log tracks every write operation into the system. The aim
+ * of the commit log is to be able to successfully recover data that was
+ * not stored to disk via the Memtable. Every Commit Log maintains a
+ * header represented by the abstraction CommitLogHeader. The header
+ * contains a bit array and an array of longs and both the arrays are
+ * of size, #column families for the Table, the Commit Log represents.
+ * Whenever a ColumnFamily is written to, for the first time its bit flag
+ * is set to one in the CommitLogHeader. When it is flushed to disk by the
+ * Memtable its corresponding bit in the header is set to zero. This helps
+ * track which CommitLogs can be thrown away as a result of Memtable flushes.
+ * However if a ColumnFamily is flushed and again written to disk then its
+ * entry in the array of longs is updated with the offset in the Commit Log
+ * file where it was written. This helps speed up recovery since we can seek
+ * to these offsets and start processing the commit log.
+ * Every Commit Log is rolled over every time it reaches its threshold in size.
+ * Over time there could be a number of commit logs that would be generated.
+ * However, whenever we flush a column family to disk and update its bit flag we
+ * take this bit array and bitwise & it with the headers of the other commit
+ * logs that are older.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+class CommitLog
+{
+    private static final int bufSize_ = 128*1024*1024; /* base size passed (plus 4MB) to SequenceFile.fastWriter in createWriter() */
+    private static Map<String, CommitLog> instances_ = new HashMap<String, CommitLog>(); /* table name -> CommitLog, filled by open() */
+    private static Lock lock_ = new ReentrantLock(); /* held by open() while it creates a missing instance */
+    private static Logger logger_ = Logger.getLogger(CommitLog.class);
+    private static Map<String, CommitLogHeader> clHeaders_ = new HashMap<String, CommitLogHeader>(); /* commit log file name -> its header; used by discard() and checkThresholdAndRollLog() */
+    
+    public static final class CommitLogContext
+    {   /* Pairs a commit log file with the offset in it at which a row was written. */
+        static CommitLogContext NULL = new CommitLogContext(null, -1L); /* no file, position -1: isValidContext() is false */
+        /* Commit Log associated with this operation */
+        private String file_;
+        /* Offset within the Commit Log where this row was added */
+        private long position_;
+
+        public CommitLogContext(String file, long position)
+        {
+            file_ = file;
+            position_ = position;
+        }
+
+        boolean isValidContext()
+        {   /* a position of -1 marks a context that does not point into any commit log */
+            return (position_ != -1L);
+        }
+
+        String file()
+        {
+            return file_;
+        }
+
+        long position()
+        {
+            return position_;
+        }
+    }
+
+    public static class CommitLogFileComparator implements Comparator<String>
+    {   /* Orders commit log file names by the creation time embedded in the name (see getCreationTime). */
+        public int compare(String f, String f2)
+        {   /* Compare as longs: casting the difference to int flips the sign for gaps above ~24.8 days. */
+            return Long.valueOf(getCreationTime(f)).compareTo(Long.valueOf(getCreationTime(f2)));
+        }
+
+        public boolean equals(Object o)
+        {   /* all instances are interchangeable: equal to any other CommitLogFileComparator */
+            if ( !(o instanceof CommitLogFileComparator) )
+                return false;
+            return true;
+        }
+    }
+
+    static long getCreationTime(String file)
+    {   /* Parses the second-to-last token of the name as a long (names look like CommitLog-<table>-<timestamp>.log). */
+        String[] entries = FBUtilities.strip(file, "-."); /* presumably splits on '-' and '.' - TODO confirm in FBUtilities */
+        return Long.parseLong(entries[entries.length - 2]);
+    }
+
+    /* Write the serialized commit log header at offset 0 of the specified commit log. */
+    private static void writeCommitLogHeader(String commitLogFileName, byte[] bytes) throws IOException
+    {
+        IFileWriter logWriter = CommitLog.createWriter(commitLogFileName);
+        try
+        {
+            logWriter.seek(0L);
+            logWriter.writeDirect(bytes);
+        }
+        finally { logWriter.close(); } /* always release the writer, even when the seek or the write fails */
+    }
+
+    private static IFileWriter createWriter(String file) throws IOException
+    {   /* Opens a writer on the given file; which SequenceFile writer is used depends on the fast-sync setting. */
+        if ( DatabaseDescriptor.isFastSync() )
+        {
+            /* Add this (4MB) to bufSize_; the sum is fastWriter's second argument (presumably a buffer size - confirm) */
+            int bufSize = 4*1024*1024;
+            return SequenceFile.fastWriter(file, CommitLog.bufSize_ + bufSize);
+        }
+        else
+            return SequenceFile.writer(file);
+    }
+
+    static CommitLog open(String table) throws IOException
+    {   /* Returns the CommitLog cached for the table in instances_, creating and caching it on first use. */
+        CommitLog commitLog = instances_.get(table); /* NOTE(review): unlocked read of a plain HashMap - confirm this is safe for concurrent callers */
+        if ( commitLog == null )
+        {
+            CommitLog.lock_.lock();
+            try
+            {   /* look again under the lock before creating the instance */
+                commitLog = instances_.get(table);
+                if ( commitLog == null )
+                {
+                    commitLog = new CommitLog(table, false);
+                    instances_.put(table, commitLog);
+                }
+            }
+            finally
+            {
+                CommitLog.lock_.unlock();
+            }
+        }
+        return commitLog;
+    }
+
+    static String getTableName(String file)
+    {   /* Returns the second '-'-separated token of the name, i.e. <table> in CommitLog-<table>-<timestamp>.log. */
+        String[] values = file.split("-"); /* NOTE(review): a table name containing '-' would be cut short here */
+        return values[1];
+    }
+
+    private String table_; /* name of the table this commit log belongs to */
+    /* Current commit log file */
+    private String logFile_;
+    /* header for current commit log */
+    private CommitLogHeader clHeader_;
+    private IFileWriter logWriter_; /* writer on logFile_; replaced on rollover */
+    private long commitHeaderStartPos_; /* offset in the log file at which the header is written */
+    /* Force rollover the commit log on the next insert */
+    private boolean forcedRollOver_ = false;
+
+
+    /*
+     * Sets logFile_ to a name of the format CommitLog-<table>-<currentTimeMillis>.log in the
+     * directory returned by DatabaseDescriptor.getLogFileLocation(). No file is opened here.
+    */
+    private void setNextFileName()
+    {
+        logFile_ = DatabaseDescriptor.getLogFileLocation() +
+                            System.getProperty("file.separator") +
+                            "CommitLog-" +
+                            table_ +
+                            "-" +
+                            System.currentTimeMillis() +
+                            ".log";
+    }
+
+    /*
+     * param @ table - name of table for which we are maintaining
+     *                 this commit log.
+     * param @ recoveryMode - when true no log file is opened; otherwise a new
+     *                        file is named, opened and given an initial header.
+    */
+    CommitLog(String table, boolean recoveryMode) throws IOException
+    {
+        table_ = table;
+        if ( !recoveryMode )
+        {
+            setNextFileName();            
+            logWriter_ = CommitLog.createWriter(logFile_);
+            writeCommitLogHeader();
+        }
+    }
+
+    /*
+     * This ctor is currently used only for debugging. We
+     * are now using it to modify the header so that recovery
+     * can be tested in as many scenarios as we could imagine.
+     *
+     * param @ logFile - logfile which we wish to modify.
+    */
+    CommitLog(File logFile) throws IOException
+    {
+        table_ = CommitLog.getTableName(logFile.getName()); /* table name is taken from the file name */
+        logFile_ = logFile.getAbsolutePath();        
+        logWriter_ = CommitLog.createWriter(logFile_); /* NOTE(review): clHeader_ is left unset by this ctor */
+        commitHeaderStartPos_ = 0L; /* the header is taken to start at offset 0 */
+    }
+
+    String getLogFile()
+    {   /* returns logFile_, the current commit log file */
+        return logFile_;
+    }
+
+    void readCommitLogHeader(String logFile, byte[] bytes) throws IOException
+    {   /* Fills bytes from the given log file via readDirect (presumably from the start of the file - confirm). */
+        IFileReader logReader = SequenceFile.reader(logFile);
+        try
+        {
+            logReader.readDirect(bytes);
+        }
+        finally
+        {   /* the reader is closed whether or not the read succeeded */
+            logReader.close();
+        }
+    }
+
+    /*
+     * This is invoked on startup via the ctor. It creates a fresh header sized for the
+     * table's column families, remembers where it starts and writes it to the log.
+    */
+    private void writeCommitLogHeader() throws IOException
+    {
+        Table table = Table.open(table_);
+        int cfSize = table.getNumberOfColumnFamilies();
+        /* record the beginning of the commit header */
+        commitHeaderStartPos_ = logWriter_.getCurrentPosition();
+        /* write the commit log header */
+        clHeader_ = new CommitLogHeader(cfSize);
+        writeCommitLogHeader(clHeader_.toByteArray(), false); /* reset=false: the writer stays where the header write left it */
+    }
+
+    private void writeCommitLogHeader(byte[] bytes, boolean reset) throws IOException
+    {   /* Writes bytes at commitHeaderStartPos_; when reset is true the writer is moved back to where it was. */
+        /* record the current position */
+        long currentPos = logWriter_.getCurrentPosition();
+        logWriter_.seek(commitHeaderStartPos_);
+        /* write the commit log header */
+        logWriter_.writeDirect(bytes);
+        if ( reset )
+        {
+            /* seek back to the old position */
+            logWriter_.seek(currentPos);
+        }
+    }
+
+    void recover(List<File> clogs) throws IOException
+    {   /* Starts at the last file in clogs, collects the earlier files still needed, then runs doRecovery. */
+        Table table = Table.open(table_);
+        int cfSize = table.getNumberOfColumnFamilies();
+        int size = CommitLogHeader.size(cfSize);
+        byte[] header = new byte[size];
+        byte[] header2 = new byte[size];
+        int index = clogs.size() - 1;
+        if ( index < 0 ) return; /* no commit logs: nothing to recover, and clogs.get(-1) would throw */
+        File file = clogs.get(index);
+        readCommitLogHeader(file.getAbsolutePath(), header);
+
+        Stack<File> filesNeeded = new Stack<File>();
+        filesNeeded.push(file);
+
+        /*
+         * Identify files that we need for processing. This can be done
+         * using the information in the header of each file. Simply and
+         * the byte[] (which are the headers) and stop at the file where
+         * the result is a zero.
+        */
+        for ( int i = (index - 1); i >= 0; --i )
+        {
+            file = clogs.get(i);
+            readCommitLogHeader(file.getAbsolutePath(), header2);
+            byte[] result = CommitLogHeader.and(header, header2);
+            if ( !CommitLogHeader.isZero(result) )
+            {
+                filesNeeded.push(file);
+            }
+            else
+            {
+                break;
+            }
+        }
+
+        doRecovery(filesNeeded, header);
+    }
+
+    private void printHeader(byte[] header)
+    {   /* Logs the header bytes at debug level as space-separated decimal values. */
+        StringBuilder sb = new StringBuilder("");
+        for ( byte b : header )
+        {
+            sb.append(b);
+            sb.append(" ");
+        }
+        logger_.debug(sb.toString());
+    }
+
+    private void doRecovery(Stack<File> filesNeeded, byte[] header) throws IOException
+    {   /* Replays every file popped from filesNeeded into the table; header is reused as the read buffer. */
+        Table table = Table.open(table_);
+
+        DataInputBuffer bufIn = new DataInputBuffer();
+        DataOutputBuffer bufOut = new DataOutputBuffer();
+
+        while ( !filesNeeded.isEmpty() )
+        {
+            File file = filesNeeded.pop();
+            IFileReader reader = SequenceFile.reader(file.getAbsolutePath());
+            try
+            {
+                reader.readDirect(header);
+                /* deserialize the commit log header */
+                bufIn.reset(header, 0, header.length);
+                CommitLogHeader clHeader = CommitLogHeader.serializer().deserialize(bufIn);
+                /* seek to the lowest position */
+                int lowPos = CommitLogHeader.getLowestPosition(clHeader);
+                /*
+                 * If lowPos == 0 then we need to skip the processing of this
+                 * file only: release its reader and go on to the next file.
+                */
+                if (lowPos == 0)
+                {
+                    reader.close();
+                    continue;
+                }
+                reader.seek(lowPos);
+
+                /* read the logs populate RowMutation and apply */
+                while ( !reader.isEOF() )
+                {
+                    bufOut.reset();
+                    long bytesRead = reader.next(bufOut);
+                    if ( bytesRead == -1 )
+                        break;
+
+                    bufIn.reset(bufOut.getData(), bufOut.getLength());
+                    /* Skip over the commit log key portion */
+                    bufIn.readUTF();
+                    /* Skip over data size */
+                    bufIn.readInt();
+
+                    /* read the commit log entry */
+                    try
+                    {
+                        Row row = Row.serializer().deserialize(bufIn);
+                        /* iterate over a copy, because the row is modified inside the loop */
+                        Map<String, ColumnFamily> columnFamilies = new HashMap<String, ColumnFamily>(row.getColumnFamilies());
+                        /* remove column families that have already been flushed */
+                        Set<String> cNames = columnFamilies.keySet();
+                        for ( String cName : cNames )
+                        {
+                            ColumnFamily columnFamily = columnFamilies.get(cName);
+                            /* TODO: Remove this to not process Hints */
+                            if ( !DatabaseDescriptor.isApplicationColumnFamily(cName) )
+                            {
+                                row.removeColumnFamily(columnFamily);
+                                continue;
+                            }
+                            int id = table.getColumnFamilyId(columnFamily.name());
+                            if ( clHeader.get(id) == 0 || reader.getCurrentPosition() < clHeader.getPosition(id) )
+                                row.removeColumnFamily(columnFamily);
+                        }
+                        if ( !row.isEmpty() )
+                        {
+                            table.applyNow(row);
+                        }
+                    }
+                    catch ( IOException e )
+                    {
+                        /* an entry that fails here is only logged; replay goes on with the next entry */
+                        logger_.debug( LogUtil.throwableToString(e) );
+                    }
+                }
+                reader.close();
+                /* apply the rows read */
+                table.flush(true);
+            }
+            catch ( Throwable th )
+            {
+                logger_.info( LogUtil.throwableToString(th) );
+                /* close the reader and delete this commit log. */
+                reader.close();
+                FileUtils.delete( new File[]{file} );
+            }
+        }
+    }
+
+    /*
+     * Update the header of the commit log if a new column family
+     * is encountered for the first time: its bit is turned on with the
+     * current write position and the header is written back to the log.
+    */
+    private void updateHeader(Row row) throws IOException
+    {
+        Map<String, ColumnFamily> columnFamilies = row.getColumnFamilies();
+        Table table = Table.open(table_);
+        for ( ColumnFamily columnFamily : columnFamilies.values() )
+        {
+            int id = table.getColumnFamilyId(columnFamily.name());
+            /* bit not set yet, or set but without a recorded position */
+            boolean needsUpdate = clHeader_.get(id) == 0
+                                  || ( clHeader_.get(id) == 1 && clHeader_.getPosition(id) == 0 );
+            if ( needsUpdate )
+            {
+                clHeader_.turnOn( id, logWriter_.getCurrentPosition() );
+                /* reset=true: the writer returns to its position after the header write */
+                writeCommitLogHeader(clHeader_.toByteArray(), true);
+            }
+        }
+    }
+
+    /*
+     * Adds the specified row to the commit log and returns the context (file,
+     * offset) at which it was written. On an IOException the writer is moved
+     * back to the offset it had before the append, so that the next entry
+     * overwrites whatever the failed write left behind; the exception is rethrown.
+    */
+    synchronized CommitLogContext add(Row row) throws IOException
+    {
+        long currentPosition = -1L; /* -1 until the writer position has been read */
+        CommitLogContext cLogCtx = null;
+        DataOutputBuffer cfBuffer = new DataOutputBuffer();
+        long fileSize = 0L;
+        
+        try
+        {
+            /* serialize the row */
+            cfBuffer.reset();
+            Row.serializer().serialize(row, cfBuffer);
+            currentPosition = logWriter_.getCurrentPosition();
+            cLogCtx = new CommitLogContext(logFile_, currentPosition);
+            /* Update the header */
+            updateHeader(row);
+            logWriter_.append(table_, cfBuffer);
+            fileSize = logWriter_.getFileSize();                       
+            checkThresholdAndRollLog(fileSize);            
+        }
+        catch (IOException e)
+        {
+            if ( currentPosition != -1 )
+                logWriter_.seek(currentPosition);
+            throw e;
+        }
+        finally
+        {                  	
+            cfBuffer.close();            
+        }
+        return cLogCtx;
+    }
+
+    /*
+     * This is called on Memtable flush. It looks up the id of the
+     * flushed column family and hands it, with the commit log context,
+     * to discard(), which updates the headers and decides which
+     * commit log files can be deleted.
+    */
+    synchronized void onMemtableFlush(String cf, CommitLog.CommitLogContext cLogCtx) throws IOException
+    {
+        Table table = Table.open(table_);
+        int id = table.getColumnFamilyId(cf);
+        /* trying discarding old commit log files */
+        discard(cLogCtx, id);
+    }
+
+
+    /*
+     * Check if old commit logs can be deleted. However we cannot
+     * do this anymore in the Fast Sync mode and hence I think we
+     * should get rid of Fast Sync mode altogether. If there is
+     * a pathological event where few CF's are rarely being updated
+     * then their Memtable never gets flushed.
+     * This will prevent commit logs from being deleted. WE NEED to
+     * fix this using some heuristic and force flushing such Memtables.
+     *
+     * param @ cLogCtx The commitLog context .
+     * param @ id id of the columnFamily being flushed to disk.
+     *
+    */
+    private void discard(CommitLog.CommitLogContext cLogCtx, int id) throws IOException
+    {
+        /* retrieve the commit log header associated with the file in the context */
+        CommitLogHeader commitLogHeader = clHeaders_.get(cLogCtx.file());
+        if(commitLogHeader == null )
+        {
+            if( logFile_.equals(cLogCtx.file()) )
+            {
+                /* this means we are dealing with the current commit log. */
+                commitLogHeader = clHeader_;
+                clHeaders_.put(cLogCtx.file(), clHeader_);
+            }
+            else
+                return; /* the file is neither tracked in clHeaders_ nor the current log: nothing to do */
+        }
+        /*
+         * We do any processing only if there is a change in the position in the context.
+         * This can happen if an older Memtable's flush comes in after a newer Memtable's
+         * flush. Right now this cannot happen since Memtables are flushed on a single
+         * thread.
+        */
+        if ( cLogCtx.position() < commitLogHeader.getPosition(id) )
+            return;
+        commitLogHeader.turnOff(id);
+        /* Sort the commit logs based on creation time */
+        List<String> oldFiles = new ArrayList<String>(clHeaders_.keySet());
+        Collections.sort(oldFiles, new CommitLogFileComparator());
+        List<String> listOfDeletedFiles = new ArrayList<String>(); /* removed from clHeaders_ after the loop */
+        /*
+         * Loop through all the commit log files in the history. Now process
+         * all files that are older than the one in the context. For each of
+         * these files the header needs to modified by performing a bitwise &
+         * of the header with the header of the file in the context. If we
+         * encounter the file in the context in our list of old commit log files
+         * then we update the header and write it back to the commit log.
+        */
+        for(String oldFile : oldFiles)
+        {
+            if(oldFile.equals(cLogCtx.file()))
+            {
+                /*
+                 * We need to turn on again. This is because we always keep
+                 * the bit turned on and the position indicates from where the
+                 * commit log needs to be read. When a flush occurs we turn off
+                 * perform & operation and then turn on with the new position.
+                */
+                commitLogHeader.turnOn(id, cLogCtx.position());
+                writeCommitLogHeader(cLogCtx.file(), commitLogHeader.toByteArray());
+                break; /* files sorted after the context's file are left untouched */
+            }
+            else
+            {
+                CommitLogHeader oldCommitLogHeader = clHeaders_.get(oldFile);
+                oldCommitLogHeader.and(commitLogHeader);
+                if(oldCommitLogHeader.isSafeToDelete())
+                {
+                	logger_.debug("Deleting commit log:"+ oldFile);
+                    FileUtils.deleteAsync(oldFile);
+                    listOfDeletedFiles.add(oldFile);
+                }
+                else
+                {
+                    writeCommitLogHeader(oldFile, oldCommitLogHeader.toByteArray());
+                }
+            }
+        }
+
+        for ( String deletedFile : listOfDeletedFiles)
+        {
+            clHeaders_.remove(deletedFile);
+        }
+    }
+
+    private void checkThresholdAndRollLog( long fileSize )
+    {   /* Rolls over to a new commit log file when the size threshold is reached or a rollover was forced. */
+        try
+        {
+            if ( fileSize >= DatabaseDescriptor.getLogFileSizeThreshold() || forcedRollOver_ )
+            {
+                if ( logWriter_.getFileSize() >= DatabaseDescriptor.getLogFileSizeThreshold() || forcedRollOver_ )
+                {
+	                /* Rolls the current log file over to a new one. */
+	                setNextFileName();
+	                String oldLogFile = logWriter_.getFileName();
+	                //history_.add(oldLogFile);
+	                logWriter_.close();
+	
+	                /* point reader/writer to a new commit log file. */
+	                // logWriter_ = SequenceFile.writer(logFile_);
+	                logWriter_ = CommitLog.createWriter(logFile_);
+	                /* squirrel away a copy of the old commit log header under the old file name */
+	                clHeaders_.put(oldLogFile, new CommitLogHeader( clHeader_ ));
+	                /*
+	                 * We need to zero out positions because the positions in
+	                 * the old file do not make sense in the new one.
+	                */
+	                clHeader_.zeroPositions();
+	                writeCommitLogHeader(clHeader_.toByteArray(), false);
+	                // Get the list of files in commit log directory if it is greater than a certain number  
+	                // Force flush all the column families that way we ensure that a slowly populated column family is not screwing up 
+	                // by accumulating the commit logs .
+                }
+            }
+        }
+        catch ( IOException e )
+        {   /* NOTE(review): a failed rollover is only logged; logFile_ may already name the new file - confirm callers tolerate this */
+            logger_.info(LogUtil.throwableToString(e));
+        }
+        finally
+        {   /* the forced-rollover request is cleared whether or not a rollover happened */
+        	forcedRollOver_ = false;
+        }
+    }
+
+    public void setForcedRollOver()
+    {   /* sets the flag that checkThresholdAndRollLog() reads to roll the log regardless of its size */
+    	forcedRollOver_ = true;
+    }
+
+    synchronized  public void snapshot( String snapshotDirectory ) throws IOException
+    {   /* Creates a link to each of this table's commit logs under <snapshotDirectory>/CommitLogs. */
+        Map<String, List<File>> tableToCommitLogs = RecoveryManager.getListOFCommitLogsPerTable();
+        List<File> clogs = tableToCommitLogs.get(table_);
+        /* nothing is created when no commit logs are listed for this table */
+        if( clogs != null )
+        {
+        	File snapshotDir = new File(snapshotDirectory);
+        	if( !snapshotDir.exists() )
+        		snapshotDir.mkdir(); /* NOTE(review): result of mkdir() is not checked */
+        	File commitLogSnapshotDir = new File(snapshotDirectory + System.getProperty("file.separator") + "CommitLogs");
+        	if( !commitLogSnapshotDir.exists() )
+        		commitLogSnapshotDir.mkdir();
+            for (File file : clogs)
+            {
+            	Path existingLink = file.toPath();
+            	File hardLinkFile = new File(commitLogSnapshotDir.getAbsolutePath() + System.getProperty("file.separator") + file.getName());
+            	Path hardLink = hardLinkFile.toPath();
+            	hardLink.createLink(existingLink); /* link with the same file name, pointing at the existing log */
+            }
+        }
+    }
+    public static void main(String[] args) throws Throwable
+    {   /* Debug tool: prints the header of every file in the commit log directory. */
+        /* args[0] is the number of column families, used to size the header buffer */
+        if ( args.length < 1 )
+        {
+            System.err.println("Usage: CommitLog <number of column families>");
+            return;
+        }
+        LogUtil.init();
+
+        // the return value is not used in this case
+        DatabaseDescriptor.init();
+
+        File logDir = new File(DatabaseDescriptor.getLogFileLocation());
+        File[] files = logDir.listFiles();
+        if ( files == null )
+            return; /* listFiles() returns null when the location is not a readable directory */
+        Arrays.sort( files, new FileUtils.FileComparator() );
+
+        byte[] bytes = new byte[CommitLogHeader.size(Integer.parseInt(args[0]))];
+        for ( File file : files )
+        {
+            CommitLog clog = new CommitLog( file );
+            clog.readCommitLogHeader(file.getAbsolutePath(), bytes);
+            DataInputBuffer bufIn = new DataInputBuffer();
+            bufIn.reset(bytes, 0, bytes.length);
+            CommitLogHeader clHeader = CommitLogHeader.serializer().deserialize(bufIn);
+            System.out.println("FILE:" + file);
+            System.out.println(clHeader.toString());
+        }
+    }
+}
\ No newline at end of file

Added: incubator/cassandra/trunk/src/org/apache/cassandra/db/CommitLogEntry.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/CommitLogEntry.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/CommitLogEntry.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/CommitLogEntry.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.io.ByteArrayInputStream;
+import java.io.DataOutputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+
+import org.apache.cassandra.io.ICompactSerializer;
+
+/*
+ * An instance of this class represents an update to a table. 
+ * This is written to the CommitLog to be replayed on recovery. It
+ * contains enough information to be written to a SSTable to 
+ * capture events that happened before some catastrophe.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+class CommitLogEntry
+{
+    private static AtomicInteger lsnGenerator_ = new AtomicInteger(0);
+    /* Single serializer instance shared by all entries. */
+    private static ICompactSerializer<CommitLogEntry> serializer_ = new CommitLogEntrySerializer();
+
+    /* Payload bytes of this entry; starts out as an empty array. */
+    private byte[] value_ = new byte[0];
+    /* Length recorded for this entry; tracked separately from value_.length. */
+    private int length_;
+
+    /** Creates an entry with an empty value and a length of zero. */
+    CommitLogEntry()
+    {
+    }
+
+    /** Creates an entry holding the given bytes with its length set to zero. */
+    CommitLogEntry(byte[] value)
+    {
+        this(value, 0);
+    }
+
+    /** Creates an entry holding the given bytes (not copied) and the given length. */
+    CommitLogEntry(byte[] value, int length)
+    {
+        length_ = length;
+        value_ = value;
+    }
+
+    /** Returns the serializer used to read and write entries. */
+    static ICompactSerializer<CommitLogEntry> serializer()
+    {
+        return serializer_;
+    }
+
+    /** Returns the payload array itself, not a copy. */
+    byte[] value()
+    {
+        return value_;
+    }
+
+    /** Replaces the payload; the recorded length is left untouched. */
+    void value(byte[] bytes)
+    {
+        value_ = bytes;
+    }
+
+    /** Returns the recorded length. */
+    int length()
+    {
+        return length_;
+    }
+
+    /** Sets the recorded length. */
+    void length(int size)
+    {
+        length_ = size;
+    }
+}
+
+/*
+ * Reads and writes CommitLogEntry instances as a four byte length
+ * followed by that many payload bytes.
+ */
+class CommitLogEntrySerializer implements ICompactSerializer<CommitLogEntry>
+{
+    /**
+     * Writes the entry's recorded length and then the first length bytes
+     * of its value.
+     *
+     * @throws IOException if the underlying stream fails.
+     */
+    public void serialize(CommitLogEntry logEntry, DataOutputStream dos) throws IOException
+    {
+        int length = logEntry.length();
+        dos.writeInt(length);
+        dos.write(logEntry.value(), 0, length);
+    }
+
+    /**
+     * Reads one entry written by serialize(). The returned entry carries
+     * the length that was read, so serializing it again reproduces the
+     * same bytes.
+     *
+     * @throws IOException if the stream fails, ends early, or contains a
+     *                     negative length.
+     */
+    public CommitLogEntry deserialize(DataInputStream dis) throws IOException
+    {
+        int length = dis.readInt();
+        /* A negative length can only come from corrupt data; report it as an I/O problem. */
+        if ( length < 0 )
+            throw new IOException("Invalid commit log entry length: " + length);
+        byte[] value = new byte[length];
+        dis.readFully(value);
+        /* The one argument constructor would record a length of zero. */
+        return new CommitLogEntry(value, length);
+    }
+}
+

Added: incubator/cassandra/trunk/src/org/apache/cassandra/db/CommitLogHeader.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/CommitLogHeader.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/CommitLogHeader.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/CommitLogHeader.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,277 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.io.*;
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.io.*;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+/*
+ * Header of a commit log file: one flag byte and one int position per
+ * slot. According to the notes in size(), there is one slot per column
+ * family and the flags track who has been flushed.
+ */
+class CommitLogHeader
+{
+    private static ICompactSerializer<CommitLogHeader> serializer_;
+
+    static
+    {
+        serializer_ = new CommitLogHeaderSerializer();
+    }
+
+    /** Returns the serializer used to read and write headers. */
+    static ICompactSerializer<CommitLogHeader> serializer()
+    {
+        return serializer_;
+    }
+
+    /**
+     * Number of bytes a serialized header with the given number of slots
+     * occupies.
+     *
+     * @param size number of slots (# of column families).
+     */
+    static int size(int size)
+    {
+        /*
+         * The header is written as an "int" holding the number of
+         * slots (the leading 4), then one flag byte per slot, then
+         * one 4 byte position per slot.
+         */
+        return 4 + size + (4 * size);
+    }
+
+    /**
+     * Returns the smallest strictly positive position in the header,
+     * or 0 when no position is positive.
+     */
+    static int getLowestPosition(CommitLogHeader clHeader)
+    {
+        int[] positions = clHeader.getPositions();
+        int minPosition = Integer.MAX_VALUE;
+        for ( int position : positions )
+        {
+            if ( position < minPosition && position > 0)
+            {
+                minPosition = position;
+            }
+        }
+
+        if(minPosition == Integer.MAX_VALUE)
+            minPosition = 0;
+        return minPosition;
+    }
+
+    /**
+     * Deserializes two headers and returns the bitwise & of their flag
+     * arrays in a newly allocated array. The result has the length of
+     * the first header's flags, so the second header needs at least as
+     * many slots.
+     *
+     * @throws IOException if either byte[] cannot be deserialized.
+     */
+    static byte[] and(byte[] bytes, byte[] bytes2) throws IOException
+    {
+        DataInputBuffer bufIn = new DataInputBuffer();
+        bufIn.reset(bytes, 0, bytes.length);
+        CommitLogHeader clHeader = CommitLogHeader.serializer().deserialize(bufIn);
+        byte[] clh = clHeader.getBitSet();
+
+        bufIn.reset(bytes2, 0, bytes2.length);
+        CommitLogHeader clHeader2 = CommitLogHeader.serializer().deserialize(bufIn);
+        byte[] clh2 = clHeader2.getBitSet();
+
+        byte[] result = new byte[clh.length];
+        for ( int i = 0; i < clh.length; ++i )
+        {
+            result[i] = (byte)(clh[i] & clh2[i]);
+        }
+
+        return result;
+    }
+
+    /** Returns false if any byte equals 1, true otherwise. */
+    static boolean isZero(byte[] bytes)
+    {
+        for ( byte b : bytes )
+        {
+            if ( b == 1 )
+                return false;
+        }
+        return true;
+    }
+
+    /* One flag byte per slot; turnOn() stores 1, turnOff() stores 0. */
+    private byte[] header_ = new byte[0];
+    /* One position per slot, same indexing as header_. */
+    private int[] position_ = new int[0];
+
+    /** Creates a header with size slots, all flags and positions zero. */
+    CommitLogHeader(int size)
+    {
+        header_ = new byte[size];
+        position_ = new int[size];
+    }
+
+    /*
+     * This ctor is used while deserializing. The arrays are
+     * stored as given, without copying.
+    */
+    CommitLogHeader(byte[] header, int[] position)
+    {
+        header_ = header;
+        position_ = position;
+    }
+
+    /** Copy ctor: duplicates both arrays of the given header. */
+    CommitLogHeader(CommitLogHeader clHeader)
+    {
+        header_ = new byte[clHeader.header_.length];
+        System.arraycopy(clHeader.header_, 0, header_, 0, header_.length);
+        position_ = new int[clHeader.position_.length];
+        System.arraycopy(clHeader.position_, 0, position_, 0, position_.length);
+    }
+
+    /** Returns the flag byte of the given slot. */
+    byte get(int index)
+    {
+        return header_[index];
+    }
+
+    /** Returns the position stored for the given slot. */
+    int getPosition(int index)
+    {
+        return position_[index];
+    }
+
+    /** Sets the flag of the slot in this header and records its position. */
+    void turnOn(int index, long position)
+    {
+        turnOn(header_, index, position);
+    }
+
+    /**
+     * Sets the flag in the supplied array and records the position in
+     * this header. The position is narrowed to an int before it is stored.
+     */
+    void turnOn(byte[] bytes, int index, long position)
+    {
+        bytes[index] = (byte)1;
+        position_[index] = (int)position;
+    }
+
+    /** Clears the flag of the slot in this header and zeroes its position. */
+    void turnOff(int index)
+    {
+        turnOff(header_, index);
+    }
+
+    /** Clears the flag in the supplied array and zeroes the position in this header. */
+    void turnOff(byte[] bytes, int index)
+    {
+        bytes[index] = (byte)0;
+        position_[index] = 0;
+    }
+
+    /** Returns true when no flag of this header equals 1. */
+    boolean isSafeToDelete() throws IOException
+    {
+        return isSafeToDelete(header_);
+    }
+
+    /** Returns true when no byte of the supplied array equals 1. */
+    boolean isSafeToDelete(byte[] bytes) throws IOException
+    {
+        /* Same scan as isZero(); keep a single implementation. */
+        return isZero(bytes);
+    }
+
+    /** Returns the flag array itself, not a copy. */
+    byte[] getBitSet()
+    {
+        return header_;
+    }
+
+    /** Returns the position array itself, not a copy. */
+    int[] getPositions()
+    {
+        return position_;
+    }
+
+    /** Replaces the positions with a fresh all zero array of the same length. */
+    void zeroPositions()
+    {
+        int size = position_.length;
+        position_ = new int[size];
+    }
+
+    /**
+     * Bitwise & of this header's flags with those of the given header,
+     * stored back into this header. The given header needs at least as
+     * many slots as this one.
+     */
+    void and (CommitLogHeader commitLogHeader)
+    {
+        byte[] clh2 = commitLogHeader.header_;
+        for ( int i = 0; i < header_.length; ++i )
+        {
+            header_[i] = (byte)(header_[i] & clh2[i]);
+        }
+    }
+
+    /** Serializes this header with serializer() and returns the bytes. */
+    byte[] toByteArray() throws IOException
+    {
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        DataOutputStream dos = new DataOutputStream(bos);
+        CommitLogHeader.serializer().serialize(this, dos);
+        return bos.toByteArray();
+    }
+
+    /**
+     * Renders every flag as "flag:columnFamilyName", followed by " | "
+     * and the positions. Column family names are taken from the first
+     * table listed by DatabaseDescriptor.
+     */
+    @Override
+    public String toString()
+    {
+        StringBuilder sb = new StringBuilder("");
+        /* Opened at most once, and not at all for a header without slots. */
+        Table table = null;
+        for ( int i = 0; i < header_.length; ++i )
+        {
+            if ( table == null )
+                table = Table.open( DatabaseDescriptor.getTables().get(0));
+            sb.append(header_[i]);
+            sb.append(":");
+            sb.append(table.getColumnFamilyName(i));
+            sb.append(" ");
+        }
+        sb.append(" | " );
+        for ( int position : position_ )
+        {
+            sb.append(position);
+            sb.append(" ");
+        }
+        return sb.toString();
+    }
+}
+
+class CommitLogHeaderSerializer implements ICompactSerializer<CommitLogHeader>
+{
+    public void serialize(CommitLogHeader clHeader, DataOutputStream dos) throws IOException
+    {        
+        dos.writeInt(clHeader.getBitSet().length);
+        dos.write(clHeader.getBitSet());
+        int[] positions = clHeader.getPositions();        
+        
+        for ( int position : positions )
+        {
+            dos.writeInt(position);
+        }
+    }
+    
+    public CommitLogHeader deserialize(DataInputStream dis) throws IOException
+    {
+        int size = dis.readInt();
+        byte[] bitFlags = new byte[size];
+        dis.readFully(bitFlags);
+        
+        int[] position = new int[size];
+        for ( int i = 0; i < size; ++i )
+        {
+            position[i] = dis.readInt();
+        }
+                                                 
+        return new CommitLogHeader(bitFlags, position);
+    }
+}
+
+

Added: incubator/cassandra/trunk/src/org/apache/cassandra/db/CompactSerializerInvocationHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/CompactSerializerInvocationHandler.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/CompactSerializerInvocationHandler.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/CompactSerializerInvocationHandler.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+
+import org.apache.cassandra.io.DataOutputBuffer;
+
+
+/*
+ * This is the abstraction that pre-processes calls to implementations
+ * of the ICompactSerializer2 serialize() via dynamic proxies.
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class CompactSerializerInvocationHandler<T> implements InvocationHandler
+{
+    private ICompactSerializer2<T> serializer_;
+
+    public CompactSerializerInvocationHandler(ICompactSerializer2<T> serializer)
+    {
+        serializer_ = serializer;
+    }
+
+    /*
+     * This dynamic runtime proxy adds the indexes before the actual coumns are serialized.
+    */
+    public Object invoke(Object proxy, Method m, Object[] args) throws Throwable
+    {
+        /* Do the preprocessing here. */
+    	ColumnFamily cf = (ColumnFamily)args[0];
+    	DataOutputBuffer bufOut = (DataOutputBuffer)args[1];
+    	ColumnIndexer.serialize(cf, bufOut);
+        return m.invoke(serializer_, args);
+    }
+}

Added: incubator/cassandra/trunk/src/org/apache/cassandra/db/CountFilter.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/CountFilter.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/CountFilter.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/CountFilter.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.Collection;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.io.SSTable;
+
+
+/**
+ * This class provides a filter for filtering out columns
+ * greater than a certain count.
+ * 
+ * @author pmalik
+ *
+ */
+public class CountFilter implements IFilter
+{
+    /* Number of columns that may still be let through. */
+    private long countLimit_;
+    /* Set once the limit is used up, or when setDone() is called. */
+    private boolean isDone_;
+
+    CountFilter(int countLimit)
+    {
+        isDone_ = false;
+        countLimit_ = countLimit;
+    }
+
+    /*
+     * Uses up one unit of the limit. Marks the filter as done and
+     * returns true when nothing is left afterwards.
+    */
+    private boolean consumeOne()
+    {
+        countLimit_--;
+        if ( countLimit_ > 0 )
+            return false;
+        isDone_ = true;
+        return true;
+    }
+
+    /**
+     * Copies columns of the given column family into a new one until the
+     * remaining limit is used up. A one part name copies the columns
+     * directly. A two part name on a column family of type "Super" copies
+     * the sub columns of every super column and counts those. Any other
+     * combination throws UnsupportedOperationException.
+     *
+     * @return the filtered column family, or null when columnFamily is null.
+     */
+    public ColumnFamily filter(String cfNameParam, ColumnFamily columnFamily)
+    {
+        String[] nameParts = RowMutation.getColumnAndColumnFamily(cfNameParam);
+        if ( columnFamily == null )
+            return null;
+
+        String cfName = columnFamily.name();
+        ColumnFamily result = new ColumnFamily(cfName);
+        if ( countLimit_ <= 0 )
+        {
+            isDone_ = true;
+            return result;
+        }
+
+        /* Decide the mode up front; the column type is only looked up for two part names. */
+        boolean plainColumns = ( nameParts.length == 1 );
+        if ( !plainColumns
+             && !( nameParts.length == 2 && DatabaseDescriptor.getColumnType(cfName).equals("Super") ) )
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        for ( IColumn column : columnFamily.getAllColumns() )
+        {
+            if ( plainColumns )
+            {
+                result.addColumn(column.name(), column);
+                if ( consumeOne() )
+                    return result;
+                continue;
+            }
+
+            /* The super column is added before its sub columns, so it shows up even when it stays empty. */
+            SuperColumn source = (SuperColumn)column;
+            SuperColumn copy = new SuperColumn(source.name());
+            result.addColumn(copy.name(), copy);
+            for ( IColumn subColumn : source.getSubColumns() )
+            {
+                copy.addColumn(subColumn.name(), subColumn);
+                if ( consumeOne() )
+                    return result;
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Counts the column against the limit and hands it back unchanged.
+     * The stream is not read.
+     */
+    public IColumn filter(IColumn column, DataInputStream dis) throws IOException
+    {
+        consumeOne();
+        return column;
+    }
+
+    public boolean isDone()
+    {
+        return isDone_;
+    }
+
+    public void setDone()
+    {
+        isDone_ = true;
+    }
+
+    /** Delegates to SSTable.next() for the given key and column family. */
+    public DataInputBuffer next(String key, String cf, SSTable ssTable) throws IOException
+    {
+        return ssTable.next(key, cf);
+    }
+}

Added: incubator/cassandra/trunk/src/org/apache/cassandra/db/DBConstants.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/DBConstants.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/DBConstants.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/DBConstants.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,9 @@
+package org.apache.cassandra.db;
+
+final class DBConstants
+{
+	public static final int boolSize_ = 1;
+	public static final int intSize_ = 4;
+	public static final int longSize_ = 8;
+	public static final int tsSize_ = 8;
+}

Added: incubator/cassandra/trunk/src/org/apache/cassandra/db/DBManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/DBManager.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/DBManager.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/DBManager.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.io.File;
+import java.io.IOException;
+import java.math.BigInteger;
+import java.net.InetAddress;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.DataOutputBuffer;
+import org.apache.cassandra.io.IFileWriter;
+import org.apache.cassandra.io.SequenceFile;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.BasicUtilities;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.GuidGenerator;
+import org.apache.cassandra.utils.HashingSchemes;
+
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class DBManager
+{
+    private static DBManager dbMgr_;
+    private static Lock lock_ = new ReentrantLock();
+
+    public static DBManager instance() throws Throwable
+    {
+        if ( dbMgr_ == null )
+        {
+            lock_.lock();
+            try
+            {
+                if ( dbMgr_ == null )
+                    dbMgr_ = new DBManager();
+            }
+            finally
+            {
+                lock_.unlock();
+            }
+        }
+        return dbMgr_;
+    }
+
+    public static class StorageMetadata
+    {
+        private BigInteger storageId_;
+        private int generation_;
+
+        StorageMetadata(BigInteger storageId, int generation)
+        {
+            storageId_ = storageId;
+            generation_ = generation;
+        }
+
+        public BigInteger getStorageId()
+        {
+            return storageId_;
+        }
+
+        public void setStorageId(BigInteger storageId)
+        {
+            storageId_ = storageId;
+        }
+
+        public int getGeneration()
+        {
+            return generation_;
+        }
+    }
+
+    public DBManager() throws Throwable
+    {
+        /* Read the configuration file */
+        Map<String, Map<String, CFMetaData>> tableToColumnFamilyMap = DatabaseDescriptor.init();
+        storeMetadata(tableToColumnFamilyMap);
+        Set<String> tables = tableToColumnFamilyMap.keySet();
+        
+        for (String table : tables)
+        {
+            Table tbl = Table.open(table);
+            tbl.onStart();
+        }
+        /* Do recovery if need be. */
+        RecoveryManager recoveryMgr = RecoveryManager.instance();
+        recoveryMgr.doRecovery();
+    }
+
+    /*
+     * Create the metadata tables. This table has information about
+     * the table name and the column families that make up the table.
+     * Each column family also has an associated ID which is an int.
+    */
+    private static void storeMetadata(Map<String, Map<String, CFMetaData>> tableToColumnFamilyMap) throws Throwable
+    {
+        AtomicInteger idGenerator = new AtomicInteger(0);
+        Set<String> tables = tableToColumnFamilyMap.keySet();
+
+        for ( String table : tables )
+        {
+            Table.TableMetadata tmetadata = Table.TableMetadata.instance();
+            if ( tmetadata.isEmpty() )
+            {
+                tmetadata = Table.TableMetadata.instance();
+                /* Column families associated with this table */
+                Map<String, CFMetaData> columnFamilies = tableToColumnFamilyMap.get(table);
+
+                for (String columnFamily : columnFamilies.keySet())
+                {
+                    tmetadata.add(columnFamily, idGenerator.getAndIncrement(), DatabaseDescriptor.getColumnType(columnFamily));
+                }
+
+                /*
+                 * Here we add all the system related column families. 
+                */
+                /* Add the TableMetadata column family to this map. */
+                tmetadata.add(Table.TableMetadata.cfName_, idGenerator.getAndIncrement());
+                /* Add the LocationInfo column family to this map. */
+                tmetadata.add(SystemTable.cfName_, idGenerator.getAndIncrement());
+                /* Add the recycle column family to this map. */
+                tmetadata.add(Table.recycleBin_, idGenerator.getAndIncrement());
+                /* Add the Hints column family to this map. */
+                tmetadata.add(Table.hints_, idGenerator.getAndIncrement(), ColumnFamily.getColumnType("Super"));
+                tmetadata.apply();
+                idGenerator.set(0);
+            }
+        }
+    }
+
+    /*
+     * This method reads the system table and retrieves the metadata
+     * associated with this storage instance. Currently we store the
+     * metadata in a Column Family called LocatioInfo which has two
+     * columns namely "Token" and "Generation". This is the token that
+     * gets gossiped around and the generation info is used for FD.
+    */
+    public DBManager.StorageMetadata start() throws IOException
+    {
+        StorageMetadata storageMetadata = null;
+        /* Read the system table to retrieve the storage ID and the generation */
+        SystemTable sysTable = SystemTable.openSystemTable(SystemTable.name_);
+        Row row = sysTable.get(FBUtilities.getHostName());
+
+        Random random = new Random();
+        if ( row == null )
+        {
+        	/* Generate a token for this Storage node */                       
+            String guid = GuidGenerator.guid();
+            BigInteger token = StorageService.hash(guid);
+            if ( token.signum() == -1 )
+                token = token.multiply(BigInteger.valueOf(-1L));
+
+            int generation = 1;
+
+            String key = FBUtilities.getHostName();
+            row = new Row(key);
+            ColumnFamily cf = new ColumnFamily(SystemTable.cfName_);
+            cf.addColumn(SystemTable.token_, new Column(SystemTable.token_, token.toByteArray()) );
+            cf.addColumn(SystemTable.generation_, new Column(SystemTable.generation_, BasicUtilities.intToByteArray(generation)) );
+            row.addColumnFamily(cf);
+            sysTable.apply(row);
+            storageMetadata = new StorageMetadata( token, generation);
+        }
+        else
+        {
+            /* we crashed and came back up need to bump generation # */
+        	Map<String, ColumnFamily> columnFamilies = row.getColumnFamilies();
+        	Set<String> cfNames = columnFamilies.keySet();
+
+            for ( String cfName : cfNames )
+            {
+            	ColumnFamily columnFamily = columnFamilies.get(cfName);
+
+                IColumn token = columnFamily.getColumn(SystemTable.token_);
+                BigInteger bi = new BigInteger( token.value() );
+
+                IColumn generation = columnFamily.getColumn(SystemTable.generation_);
+                int gen = BasicUtilities.byteArrayToInt(generation.value()) + 1;
+
+                Column generation2 = new Column("Generation", BasicUtilities.intToByteArray(gen), generation.timestamp() + 1);
+                columnFamily.addColumn("Generation", generation2);
+                storageMetadata = new StorageMetadata( bi, gen );
+                break;
+            }
+            sysTable.reset(row);
+        }
+        return storageMetadata;
+    }
+
+    public static void main(String[] args) throws Throwable
+    {
+        DBManager.instance().start();
+    }
+}

Added: incubator/cassandra/trunk/src/org/apache/cassandra/db/DataFileVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/DataFileVerbHandler.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/DataFileVerbHandler.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/DataFileVerbHandler.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,46 @@
+package org.apache.cassandra.db;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+
+/*
+ * Answers a request that names a table with the list of SSTable files
+ * that table reports as being on disk.
+ */
+public class DataFileVerbHandler implements IVerbHandler
+{
+    private static Logger logger_ = Logger.getLogger( DataFileVerbHandler.class );
+
+    /**
+     * Reads the table name from the first element of the message body and
+     * replies to the sender with a byte[] holding the number of files
+     * followed by each file name. An IOException while building or sending
+     * the reply is logged as a warning and not propagated.
+     */
+    public void doVerb(Message message)
+    {
+        /* The first body element carries the table name as raw bytes. */
+        byte[] nameBytes = (byte[])message.getMessageBody()[0];
+        String table = new String(nameBytes);
+        logger_.info("**** Received a request from " + message.getFrom());
+
+        List<String> ssTables = Table.open(table).getAllSSTablesOnDisk();
+        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+        DataOutputStream out = new DataOutputStream(buffer);
+        try
+        {
+            /* Count first, then every name. */
+            out.writeInt(ssTables.size());
+            for ( String ssTable : ssTables )
+                out.writeUTF(ssTable);
+
+            Message response = message.getReply( StorageService.getLocalStorageEndPoint(), new Object[]{buffer.toByteArray()});
+            MessagingService.getMessagingInstance().sendOneWay(response, message.getFrom());
+        }
+        catch ( IOException ex )
+        {
+            logger_.warn(LogUtil.throwableToString(ex));
+        }
+    }
+}

Added: incubator/cassandra/trunk/src/org/apache/cassandra/db/EfficientBidiMap.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/EfficientBidiMap.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/EfficientBidiMap.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/EfficientBidiMap.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.io.Serializable;
+import java.util.*;
+
+import org.apache.cassandra.db.ColumnComparatorFactory.ComparatorType;
+import org.apache.cassandra.service.StorageService;
+
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ *
+ * Holds the same columns in two structures that are updated together: a map
+ * keyed by column name and a set sorted by the supplied column comparator.
+ */
+class EfficientBidiMap implements Serializable
+{
+    private Map<String, IColumn> map_ = new HashMap<String, IColumn>();
+    private SortedSet<IColumn> sortedSet_;
+    private Comparator<IColumn> columnComparator_;
+
+    /** Creates an empty map whose sorted view uses the TIMESTAMP comparator. */
+    EfficientBidiMap()
+    {
+        this(ColumnComparatorFactory.getComparator(ComparatorType.TIMESTAMP));
+    }
+
+    /** Creates an empty map whose sorted view is ordered by the given comparator. */
+    EfficientBidiMap(Comparator<IColumn> columnComparator)
+    {
+        columnComparator_ = columnComparator;
+        sortedSet_ = new TreeSet<IColumn>(columnComparator);
+    }
+
+    /**
+     * Wraps the given collections as-is; they are not copied, so the caller
+     * must hand over a map and a set that already hold the same columns.
+     */
+    EfficientBidiMap(Map<String, IColumn> map, SortedSet<IColumn> set, Comparator<IColumn> comparator)
+    {
+        map_ = map;
+        sortedSet_ = set;
+        columnComparator_ = comparator;
+    }
+
+    /**
+     * Builds the map from an array whose elements are all IColumn instances.
+     * When several elements share a name the last one wins in both views.
+     */
+    EfficientBidiMap(Object[] objects, Comparator<IColumn> columnComparator)
+    {
+        columnComparator_ = columnComparator;
+        sortedSet_ = new TreeSet<IColumn>(columnComparator);
+        for ( Object object : objects )
+        {
+            IColumn column = (IColumn)object;
+            store(column.name(), column);
+        }
+    }
+
+    public Comparator<IColumn> getComparator()
+    {
+        return columnComparator_;
+    }
+
+    /** Associates the column with the key, replacing any column previously stored under it. */
+    public void put(String key, IColumn column)
+    {
+        store(key, column);
+    }
+
+    /** Returns the column stored under the key, or null when there is none. */
+    public IColumn get(String key)
+    {
+        return map_.get(key);
+    }
+
+    /** Returns the live sorted view (not a copy). */
+    public SortedSet<IColumn> getSortedColumns()
+    {
+        return sortedSet_;
+    }
+
+    /** Returns the live name-to-column map (not a copy). */
+    public Map<String, IColumn> getColumns()
+    {
+        return map_;
+    }
+
+    public int size()
+    {
+        return map_.size();
+    }
+
+    /** Removes the named column from both views; does nothing when the name is unknown. */
+    public void remove (String columnName)
+    {
+        IColumn column = map_.remove(columnName);
+        // Never hand null to the sorted set: a comparator-backed TreeSet would
+        // pass it to the comparator when the name is not present.
+        if ( column != null )
+            sortedSet_.remove(column);
+    }
+
+    void clear()
+    {
+        map_.clear();
+        sortedSet_.clear();
+    }
+
+    /** Returns the comparator's type; the comparator must be an AbstractColumnComparator. */
+    ColumnComparatorFactory.ComparatorType getComparatorType()
+    {
+        return ((AbstractColumnComparator)columnComparator_).getComparatorType();
+    }
+
+    /**
+     * Returns a new instance backed by fresh copies of the map and the sorted
+     * set; the IColumn objects themselves are shared with this instance.
+     */
+    EfficientBidiMap cloneMe()
+    {
+        Map<String, IColumn> map = new HashMap<String, IColumn>(map_);
+        SortedSet<IColumn> set = new TreeSet<IColumn>(sortedSet_);
+        return new EfficientBidiMap(map, set, columnComparator_);
+    }
+
+    /* Drops the column previously stored under the key from the sorted view, then stores the new one in both views. */
+    private void store(String key, IColumn column)
+    {
+        IColumn oldColumn = map_.get(key);
+        if ( oldColumn != null )
+            sortedSet_.remove( oldColumn );
+        map_.put(key, column);
+        sortedSet_.add(column);
+    }
+}
+
+

Added: incubator/cassandra/trunk/src/org/apache/cassandra/db/FileNameComparator.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/FileNameComparator.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/FileNameComparator.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/FileNameComparator.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,32 @@
+package org.apache.cassandra.db;
+
+import java.util.Comparator;
+
+/**
+ * Orders file names by the index that ColumnFamilyStore.getIndexFromFileName
+ * extracts from each name, in ascending or descending order.
+ */
+class FileNameComparator implements Comparator<String>
+{
+    public static final int  Ascending = 0 ;
+    public static final int  Descending = 1 ;
+
+    // Descending sorts from highest index to lowest; any other value sorts ascending.
+    private int order_ = Descending ;
+
+    /**
+     * @param order Descending for highest index first; any other value
+     *              (normally Ascending) for lowest index first
+     */
+    FileNameComparator( int order )
+    {
+        order_ = order;
+    }
+
+    public int compare(String f, String f2)
+    {
+        int index = ColumnFamilyStore.getIndexFromFileName(f);
+        int index2 = ColumnFamilyStore.getIndexFromFileName(f2);
+        // Explicit comparison instead of subtraction so the sign cannot be
+        // flipped by integer overflow.
+        int ascending = ( index < index2 ) ? -1 : ( ( index == index2 ) ? 0 : 1 );
+        return isDescending() ? -ascending : ascending;
+    }
+
+    /**
+     * Two comparators are equal only when they impose the same ordering,
+     * i.e. both are descending or both are ascending.
+     */
+    public boolean equals(Object o)
+    {
+        if (!(o instanceof FileNameComparator))
+            return false;
+        return isDescending() == ((FileNameComparator)o).isDescending();
+    }
+
+    /* Kept consistent with equals: depends only on the sort direction. */
+    public int hashCode()
+    {
+        return isDescending() ? 1 : 0;
+    }
+
+    private boolean isDescending()
+    {
+        return order_ == Descending;
+    }
+}
\ No newline at end of file

Added: incubator/cassandra/trunk/src/org/apache/cassandra/db/FileStruct.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/FileStruct.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/FileStruct.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/FileStruct.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.io.IOException;
+import java.math.BigInteger;
+
+import org.apache.cassandra.continuations.Suspendable;
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.io.DataOutputBuffer;
+import org.apache.cassandra.io.IFileReader;
+import org.apache.cassandra.io.SSTable;
+import org.apache.cassandra.io.SequenceFile;
+import org.apache.cassandra.service.PartitionerType;
+import org.apache.cassandra.service.StorageService;
+
+
+/**
+ * Cursor over the entries of a single file: holds the reader, the key of
+ * the entry most recently read and the buffers that entry was read into.
+ * Instances compare by current key, according to the configured partitioner.
+ */
+public class FileStruct implements Comparable<FileStruct>
+{
+    IFileReader reader_;
+    String key_;
+    DataInputBuffer bufIn_;
+    DataOutputBuffer bufOut_;
+
+    /** Creates an empty instance; no reader or buffers are set up. */
+    public FileStruct()
+    {
+    }
+
+    /**
+     * Opens a buffered reader on the file and positions it on the first key.
+     *
+     * @param file    file to read
+     * @param bufSize buffer size handed to SequenceFile.bufferedReader
+     * @throws IOException if reading fails or the file yields no entry
+     */
+    public FileStruct(String file, int bufSize) throws IOException
+    {
+        bufIn_ = new DataInputBuffer();
+        bufOut_ = new DataOutputBuffer();
+        reader_ = SequenceFile.bufferedReader(file, bufSize);
+        long bytesRead;
+        try
+        {
+            bytesRead = advance();
+        }
+        catch ( IOException ex )
+        {
+            // Do not leak the freshly opened reader when the first read fails.
+            try
+            {
+                reader_.close();
+            }
+            catch ( IOException ignored )
+            {
+                // The original failure is the one worth reporting.
+            }
+            throw ex;
+        }
+        if ( bytesRead == -1L )
+            throw new IOException("Either the file is empty or EOF has been reached.");
+    }
+
+    public String getKey()
+    {
+        return key_;
+    }
+
+    public DataOutputBuffer getBuffer()
+    {
+        return bufOut_;
+    }
+
+    /**
+     * Reads the next entry into the output buffer and updates the current
+     * key, skipping one Block Index entry if that is what comes next. The
+     * reader is closed when EOF is reached.
+     *
+     * @return the value returned by the reader for the entry read, or -1 at EOF
+     * @throws IOException if the underlying read fails
+     */
+    public long advance() throws IOException
+    {
+        bufOut_.reset();
+        /* advance and read the next key in the file. */
+        if (reader_.isEOF())
+        {
+            reader_.close();
+            return -1L;
+        }
+
+        long bytesRead = readEntry();
+        /* If the key we read is the Block Index Key then omit and read the next key. */
+        if ( bytesRead != -1 && key_.equals(SSTable.blockIndexKey_) )
+        {
+            bufOut_.reset();
+            bytesRead = readEntry();
+        }
+        return bytesRead;
+    }
+
+    /* Reads one entry into bufOut_ and decodes its key; closes the reader and returns -1 at EOF. */
+    private long readEntry() throws IOException
+    {
+        long bytesRead = reader_.next(bufOut_);
+        if (bytesRead == -1)
+        {
+            reader_.close();
+            return bytesRead;
+        }
+        bufIn_.reset(bufOut_.getData(), bufOut_.getLength());
+        key_ = bufIn_.readUTF();
+        return bytesRead;
+    }
+
+    /**
+     * Compares by current key. With the OPHF partitioner keys are compared
+     * as strings; otherwise the part of each key before the first ':' is
+     * parsed as a BigInteger and those numbers are compared.
+     */
+    public int compareTo(FileStruct f)
+    {
+        int value = 0;
+        PartitionerType pType = StorageService.getPartitionerType();
+        switch( pType )
+        {
+            case OPHF:
+                value = key_.compareTo(f.key_);
+                break;
+
+            default:
+                String lhs = key_.split(":")[0];
+                BigInteger b = new BigInteger(lhs);
+                String rhs = f.key_.split(":")[0];
+                BigInteger b2 = new BigInteger(rhs);
+                value = b.compareTo(b2);
+                break;
+        }
+        return value;
+    }
+
+    /**
+     * Closes both buffers and the reader. Each resource is closed even if
+     * closing an earlier one throws, and resources that were never set up
+     * (default constructor) are skipped.
+     */
+    public void close() throws IOException
+    {
+        try
+        {
+            if ( bufIn_ != null )
+                bufIn_.close();
+        }
+        finally
+        {
+            try
+            {
+                if ( bufOut_ != null )
+                    bufOut_.close();
+            }
+            finally
+            {
+                if ( reader_ != null )
+                    reader_.close();
+            }
+        }
+    }
+}

Added: incubator/cassandra/trunk/src/org/apache/cassandra/db/FileStructComparator.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/FileStructComparator.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/FileStructComparator.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/FileStructComparator.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,18 @@
+package org.apache.cassandra.db;
+
+import java.util.Comparator;
+
+/**
+ * Orders FileStruct instances by the name of the file their reader is
+ * reading, using String's natural ordering.
+ */
+class FileStructComparator implements Comparator<FileStruct>
+{
+    public int compare(FileStruct f, FileStruct f2)
+    {
+        return f.reader_.getFileName().compareTo(f2.reader_.getFileName());
+    }
+
+    /** The comparator is stateless, so every instance is equal to every other. */
+    public boolean equals(Object o)
+    {
+        return o instanceof FileStructComparator;
+    }
+
+    /* Kept consistent with equals: all instances share one hash code. */
+    public int hashCode()
+    {
+        return FileStructComparator.class.hashCode();
+    }
+}
\ No newline at end of file

Added: incubator/cassandra/trunk/src/org/apache/cassandra/db/HintedHandOffManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/HintedHandOffManager.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/HintedHandOffManager.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/HintedHandOffManager.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,284 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.util.Collection;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
+import org.apache.cassandra.concurrent.ThreadFactoryImpl;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.IComponentShutdown;
+import org.apache.cassandra.service.IResponseResolver;
+import org.apache.cassandra.service.QuorumResponseHandler;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.service.WriteResponseResolver;
+import org.apache.log4j.Logger;
+
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ *
+ * Singleton that runs hinted-handoff tasks on a single-threaded scheduled
+ * executor: either periodically for a column family store, or once for a
+ * specific endpoint.
+ */
+public class HintedHandOffManager implements IComponentShutdown
+{
+    // volatile so the unsynchronized first check in instance() can never
+    // observe a partially constructed manager.
+    private static volatile HintedHandOffManager instance_;
+    private static Lock lock_ = new ReentrantLock();
+    private static Logger logger_ = Logger.getLogger(HintedHandOffManager.class);
+    public static final String key_ = "HintedHandOffKey";
+    final static long intervalInMins_ = 20;
+    private ScheduledExecutorService executor_ = new DebuggableScheduledThreadPoolExecutor(1, new ThreadFactoryImpl("HINTED-HANDOFF-POOL"));
+
+
+    /** Returns the shared instance, creating it under the lock on first use. */
+    public static HintedHandOffManager instance()
+    {
+        if ( instance_ == null )
+        {
+            lock_.lock();
+            try
+            {
+                if ( instance_ == null )
+                    instance_ = new HintedHandOffManager();
+            }
+            finally
+            {
+                lock_.unlock();
+            }
+        }
+        return instance_;
+    }
+
+    /**
+     * Task that delivers hints. Built with a ColumnFamilyStore it processes
+     * every hinted key; built with an EndPoint it delivers only the hints
+     * addressed to that endpoint.
+     */
+    class HintedHandOff implements Runnable
+    {
+        private ColumnFamilyStore columnFamilyStore_ = null;
+        private EndPoint endPoint_ = null;
+
+        HintedHandOff(ColumnFamilyStore columnFamilyStore)
+        {
+            columnFamilyStore_ = columnFamilyStore;
+        }
+        HintedHandOff(EndPoint endPoint)
+        {
+            endPoint_ = endPoint;
+        }
+
+        /**
+         * Sends the row stored under the key to the endpoint's storage port,
+         * but only if the failure detector reports the endpoint (at the
+         * control port) as alive.
+         *
+         * @return true if the endpoint was alive and the message was handed
+         *         to the messaging service, false if nothing was sent
+         */
+        private boolean sendMessage(String endpointAddress, String key) throws Exception
+        {
+            boolean success = false; // TODO : fix the hack we need to make sure the data is written on the other end.
+            if(FailureDetector.instance().isAlive(new EndPoint(endpointAddress, DatabaseDescriptor.getControlPort())))
+            {
+                success = true;
+            }
+            else
+            {
+                return success;
+            }
+            Table table = Table.open(DatabaseDescriptor.getTables().get(0));
+            Row row = null;
+            row = table.get(key);
+            RowMutation rm = new RowMutation(DatabaseDescriptor.getTables().get(0), row);
+            RowMutationMessage rmMsg = new RowMutationMessage(rm);
+            Message message = RowMutationMessage.makeRowMutationMessage( rmMsg );
+            EndPoint endPoint = new EndPoint(endpointAddress, DatabaseDescriptor.getStoragePort());
+            MessagingService.getMessagingInstance().sendOneWay(message, endPoint);
+            return success;
+        }
+
+        /** Deletes the given endpoint from the hint entry of the key. */
+        private void deleteEndPoint(String endpointAddress, String key) throws Exception
+        {
+            RowMutation rm = new RowMutation(DatabaseDescriptor.getTables().get(0), key_);
+            rm.delete(Table.hints_ + ":" + key + ":" + endpointAddress);
+            rm.apply();
+        }
+
+        /** Deletes the whole hint entry of the key. */
+        private void deleteKey(String key) throws Exception
+        {
+            RowMutation rm = new RowMutation(DatabaseDescriptor.getTables().get(0), key_);
+            rm.delete(Table.hints_ + ":" + key);
+            rm.apply();
+        }
+
+        /**
+         * Delivers every pending hint, then flushes and compacts the column
+         * family store this task was created with.
+         */
+        private void runHints()
+        {
+            logger_.debug("Started  hinted handoff " + columnFamilyStore_.columnFamily_);
+
+            // 1. Scan through all the keys that we need to handoff
+            // 2. For each key read the list of recipients and send
+            // 3. Delete that recipient from the key if write was successful
+            // 4. If all writes were success for a given key we can even delete the key .
+            // 5. Now force a flush
+            // 6. Do major compaction to clean up all deletes etc.
+            // 7. I guess we r done
+            Table table =  Table.open(DatabaseDescriptor.getTables().get(0));
+            ColumnFamily hintedColumnFamily = null;
+            boolean success = false;
+            boolean allsuccess = true;
+            try
+            {
+                hintedColumnFamily = table.get(key_, Table.hints_);
+                if(hintedColumnFamily == null)
+                {
+                    // Force flush now
+                    columnFamilyStore_.forceFlush(false);
+                    return;
+                }
+                Collection<IColumn> keys = hintedColumnFamily.getAllColumns();
+                if(keys != null)
+                {
+                    for(IColumn key : keys)
+                    {
+                        // Get all the endpoints for the key
+                        Collection<IColumn> endpoints =  key.getSubColumns();
+                        allsuccess = true;
+                        if ( endpoints != null )
+                        {
+                            for(IColumn endpoint : endpoints )
+                            {
+                                success = sendMessage(endpoint.name(), key.name());
+                                if(success)
+                                {
+                                    // Delete the endpoint from the list
+                                    deleteEndPoint(endpoint.name(), key.name());
+                                }
+                                else
+                                {
+                                    allsuccess = false;
+                                }
+                            }
+                        }
+                        if(endpoints == null || allsuccess)
+                        {
+                            // Delete the key itself.
+                            deleteKey(key.name());
+                        }
+                    }
+                }
+                // Force flush now
+                columnFamilyStore_.forceFlush(false);
+
+                // Now do a major compaction
+                columnFamilyStore_.forceCompaction(null, null, 0, null);
+            }
+            catch ( Exception ex)
+            {
+                // Pass the throwable along so the stack trace is not lost.
+                logger_.warn(ex.getMessage(), ex);
+            }
+            logger_.debug("Finished hinted handoff ..."+columnFamilyStore_.columnFamily_);
+        }
+
+        /**
+         * Delivers only the hints whose recipient name equals the host of the
+         * endpoint this task was created with.
+         *
+         * @param to not read by this method; the task's own endPoint_ is used
+         *           (run() passes that same value)
+         */
+        private void runDeliverHints(EndPoint to)
+        {
+            logger_.debug("Started  hinted handoff for endPoint " + endPoint_.getHost());
+
+            // 1. Scan through all the keys that we need to handoff
+            // 2. For each key read the list of recipients if the endpoint matches send
+            // 3. Delete that recipient from the key if write was successful
+
+            Table table =  Table.open(DatabaseDescriptor.getTables().get(0));
+            ColumnFamily hintedColumnFamily = null;
+            boolean success = false;
+            try
+            {
+                hintedColumnFamily = table.get(key_, Table.hints_);
+                if(hintedColumnFamily == null)
+                    return;
+                Collection<IColumn> keys = hintedColumnFamily.getAllColumns();
+                if(keys != null)
+                {
+                    for(IColumn key : keys)
+                    {
+                        // Get all the endpoints for the key
+                        Collection<IColumn> endpoints =  key.getSubColumns();
+                        if ( endpoints != null )
+                        {
+                            for(IColumn endpoint : endpoints )
+                            {
+                                if(endpoint.name().equals(endPoint_.getHost()))
+                                {
+                                    success = sendMessage(endpoint.name(), key.name());
+                                    if(success)
+                                    {
+                                        // Delete the endpoint from the list
+                                        deleteEndPoint(endpoint.name(), key.name());
+                                    }
+                                }
+                            }
+                        }
+                        if(endpoints == null)
+                        {
+                            // Delete the key itself.
+                            deleteKey(key.name());
+                        }
+                    }
+                }
+            }
+            catch ( Exception ex)
+            {
+                // Pass the throwable along so the stack trace is not lost.
+                logger_.warn(ex.getMessage(), ex);
+            }
+            logger_.debug("Finished hinted handoff for endpoint ..." + endPoint_.getHost());
+        }
+
+        /** Runs the full handoff, or the per-endpoint delivery when an endpoint was given. */
+        public void run()
+        {
+            if(endPoint_ == null)
+            {
+                runHints();
+            }
+            else
+            {
+                runDeliverHints(endPoint_);
+            }
+
+        }
+    }
+
+    /** Registers this manager with the StorageService so it is shut down with it. */
+    public HintedHandOffManager()
+    {
+        StorageService.instance().registerComponentForShutdown(this);
+    }
+
+    /**
+     * Schedules a recurring handoff for the column family store; the first
+     * run and the delay between runs are both intervalInMins_ minutes.
+     */
+    public void submit(ColumnFamilyStore columnFamilyStore)
+    {
+        executor_.scheduleWithFixedDelay(new HintedHandOff(columnFamilyStore), HintedHandOffManager.intervalInMins_,
+                HintedHandOffManager.intervalInMins_, TimeUnit.MINUTES);
+    }
+
+    /*
+     * This method is used to deliver hints to a particular endpoint.
+     * When we learn that some endpoint is back up we deliver the data
+     * to him via an event driven mechanism.
+    */
+    public void deliverHints(EndPoint to)
+    {
+        executor_.submit(new HintedHandOff(to));
+    }
+
+    /** Stops the executor immediately via shutdownNow(). */
+    public void shutdown()
+    {
+        executor_.shutdownNow();
+    }
+}