You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by pm...@apache.org on 2009/03/02 07:04:22 UTC

svn commit: r749202 [4/6] - in /incubator/cassandra/src: ./ org/ org/apache/ org/apache/cassandra/ org/apache/cassandra/db/

Added: incubator/cassandra/src/org/apache/cassandra/db/IdentityFilter.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/db/IdentityFilter.java?rev=749202&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/db/IdentityFilter.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/db/IdentityFilter.java Mon Mar  2 06:04:20 2009
@@ -0,0 +1,72 @@
+package org.apache.cassandra.db;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.Collection;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.continuations.Suspendable;
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.io.SSTable;
+
+
+public class IdentityFilter implements IFilter
+{
+    private boolean isDone_ = false;
+    
+	public boolean isDone()
+	{
+		return isDone_;
+	}
+
+	public ColumnFamily filter(String cfString, ColumnFamily columnFamily)
+	{
+    	String[] values = RowMutation.getColumnAndColumnFamily(cfString);
+    	if( columnFamily == null )
+    		return columnFamily;
+		String cfName = columnFamily.name();
+		if ( values.length == 2 && !DatabaseDescriptor.getColumnType(cfName).equals("Super") )
+		{
+			Collection<IColumn> columns = columnFamily.getAllColumns();
+			if(columns.size() >= 1)
+				isDone_ = true;
+		}
+		if ( values.length == 3 && DatabaseDescriptor.getColumnType(cfName).equals("Super"))
+		{
+    		Collection<IColumn> columns = columnFamily.getAllColumns();
+    		for(IColumn column : columns)
+    		{
+    			SuperColumn superColumn = (SuperColumn)column;
+        		Collection<IColumn> subColumns = superColumn.getSubColumns();
+        		if( subColumns.size() >= 1 )
+        			isDone_ = true;
+    		}
+		}
+		return columnFamily;
+	}
+
+	public IColumn filter(IColumn column, DataInputStream dis) throws IOException
+	{
+		// TODO Auto-generated method stub
+		return column;
+	}
+
+	public DataInputBuffer next(String key, String cf, SSTable ssTable) throws IOException
+	{
+		return ssTable.next(key, cf);
+	}
+
+	public void setDone()
+	{
+		isDone_ = true;
+	}
+	/**
+	 * @param args
+	 */
+	public static void main(String[] args)
+	{
+		// TODO Auto-generated method stub
+
+	}
+
+}

Added: incubator/cassandra/src/org/apache/cassandra/db/LoadVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/db/LoadVerbHandler.java?rev=749202&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/db/LoadVerbHandler.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/db/LoadVerbHandler.java Mon Mar  2 06:04:20 2009
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class LoadVerbHandler implements IVerbHandler
+{
+    private static Logger logger_ = Logger.getLogger(LoadVerbHandler.class);    
+    
+    public void doVerb(Message message)
+    { 
+        try
+        {
+	        Object[] body = message.getMessageBody();
+	        RowMutationMessage rmMsg = (RowMutationMessage)body[0];
+	        RowMutation rm = rmMsg.getRowMutation();
+	
+			EndPoint[] endpoints = StorageService.instance().getNStorageEndPoint(rm.key());
+	
+			Message messageInternal = new Message(StorageService.getLocalStorageEndPoint(), 
+	                StorageService.mutationStage_,
+					StorageService.mutationVerbHandler_, 
+	                new Object[]{ rmMsg }
+	        );
+            
+            StringBuilder sb = new StringBuilder();
+			for(EndPoint endPoint : endpoints)
+			{                
+                sb.append(endPoint);
+				MessagingService.getMessagingInstance().sendOneWay(messageInternal, endPoint);
+			}
+            logger_.debug("Sent data to " + sb.toString());            
+        }        
+        catch ( Exception e )
+        {
+            logger_.debug(LogUtil.throwableToString(e));            
+        }        
+    }
+
+}

Added: incubator/cassandra/src/org/apache/cassandra/db/Memtable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/db/Memtable.java?rev=749202&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/db/Memtable.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/db/Memtable.java Mon Mar  2 06:04:20 2009
@@ -0,0 +1,500 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
+import org.apache.cassandra.concurrent.ThreadFactoryImpl;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.DataOutputBuffer;
+import org.apache.cassandra.io.SSTable;
+import org.apache.cassandra.service.IComponentShutdown;
+import org.apache.cassandra.service.PartitionerType;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.BloomFilter;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+import org.apache.cassandra.io.*;
+import org.apache.cassandra.utils.*;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class Memtable implements MemtableMBean, Comparable<Memtable>
+{
+	private static Logger logger_ = Logger.getLogger( Memtable.class );
+    private static Map<String, ExecutorService> apartments_ = new HashMap<String, ExecutorService>();
+    public static final String flushKey_ = "FlushKey";
+    public static void shutdown()
+    {
+    	Set<String> names = apartments_.keySet();
+    	for (String name : names)
+    	{
+    		apartments_.get(name).shutdownNow();
+    	}
+    }
+
+    private int threshold_ = DatabaseDescriptor.getMemtableSize()*1024*1024;
+    private int thresholdCount_ = DatabaseDescriptor.getMemtableObjectCount()*1024*1024;
+    private AtomicInteger currentSize_ = new AtomicInteger(0);
+    private AtomicInteger currentObjectCount_ = new AtomicInteger(0);
+
+    /* Table and ColumnFamily name are used to determine the ColumnFamilyStore */
+    private String table_;
+    private String cfName_;
+    /* Creation time of this Memtable */
+    private long creationTime_;
+    private boolean isFrozen_ = false;
+    private Map<String, ColumnFamily> columnFamilies_ = new HashMap<String, ColumnFamily>();
+    /* Lock and Condition for notifying new clients about Memtable switches */
+    Lock lock_ = new ReentrantLock();
+    Condition condition_;
+
+    Memtable(String table, String cfName) throws IOException
+    {
+        if ( apartments_.get(cfName) == null )
+        {
+            apartments_.put(cfName, new DebuggableThreadPoolExecutor( 1,
+                    1,
+                    Integer.MAX_VALUE,
+                    TimeUnit.SECONDS,
+                    new LinkedBlockingQueue<Runnable>(),
+                    new ThreadFactoryImpl("FAST-MEMTABLE-POOL")
+                    ));
+        }
+
+        condition_ = lock_.newCondition();
+        table_ = table;
+        cfName_ = cfName;
+        creationTime_ = System.currentTimeMillis();
+    }
+
+    class Putter implements Runnable
+    {
+        private String key_;
+        private ColumnFamily columnFamily_;
+
+        Putter(String key, ColumnFamily cf)
+        {
+            key_ = key;
+            columnFamily_ = cf;
+        }
+
+        public void run()
+        {
+        	resolve(key_, columnFamily_);
+        }
+    }
+
+    class Getter implements Callable<ColumnFamily>
+    {
+        private String key_;
+        private String columnFamilyName_;
+        private IFilter filter_;
+
+        Getter(String key, String cfName)
+        {
+            key_ = key;
+            columnFamilyName_ = cfName;
+        }
+        
+        Getter(String key, String cfName, IFilter filter)
+        {
+            this(key, cfName);
+            filter_ = filter;
+        }
+
+        public ColumnFamily call()
+        {
+        	ColumnFamily cf = getLocalCopy(key_, columnFamilyName_, filter_);            
+            return cf;
+        }
+    }
+
+    class Remover implements Runnable
+    {
+        private String key_;
+        private ColumnFamily columnFamily_;
+
+        Remover(String key, ColumnFamily columnFamily)
+        {
+            key_ = key;
+            columnFamily_ = columnFamily;
+        }
+
+        public void run()
+        {
+        	columnFamily_.delete();
+            columnFamilies_.put(key_, columnFamily_);
+        }
+    }
+
+    /**
+     * Compares two Memtable based on creation time. 
+     * @param rhs
+     * @return
+     */
+    public int compareTo(Memtable rhs)
+    {
+    	long diff = creationTime_ - rhs.creationTime_;
+    	if ( diff > 0 )
+    		return 1;
+    	else if ( diff < 0 )
+    		return -1;
+    	else
+    		return 0;
+    }
+
+    public int getMemtableThreshold()
+    {
+        return currentSize_.get();
+    }
+
+    void resolveSize(int oldSize, int newSize)
+    {
+        currentSize_.addAndGet(newSize - oldSize);
+    }
+
+    void resolveCount(int oldCount, int newCount)
+    {
+        currentObjectCount_.addAndGet(newCount - oldCount);
+    }
+
+    private boolean isLifetimeViolated()
+    {
+      /* Memtable lifetime in terms of milliseconds */
+      long lifetimeInMillis = DatabaseDescriptor.getMemtableLifetime() * 3600 * 1000;
+      return ( ( System.currentTimeMillis() - creationTime_ ) >= lifetimeInMillis );
+    }
+
+    boolean isThresholdViolated(String key)
+    {
+    	boolean bVal = false;//isLifetimeViolated();
+        if (currentSize_.get() >= threshold_ ||  currentObjectCount_.get() >= thresholdCount_ || bVal || key.equals(flushKey_))
+        {
+        	if ( bVal )
+        		logger_.info("Memtable's lifetime for " + cfName_ + " has been violated.");
+        	return true;
+        }
+        return false;
+    }
+
+    String getColumnFamily()
+    {
+    	return cfName_;
+    }
+
+    void printExecutorStats()
+    {
+    	DebuggableThreadPoolExecutor es = (DebuggableThreadPoolExecutor)apartments_.get(cfName_);
+    	long taskCount = (es.getTaskCount() - es.getCompletedTaskCount());
+    	logger_.debug("MEMTABLE TASKS : " + taskCount);
+    }
+
+    /*
+     * This version is used by the external clients to put data into
+     * the memtable. This version will respect the threshold and flush
+     * the memtable to disk when the size exceeds the threshold.
+    */
+    void put(String key, ColumnFamily columnFamily, CommitLog.CommitLogContext cLogCtx) throws IOException
+    {
+        if (isThresholdViolated(key) )
+        {
+            lock_.lock();
+            try
+            {
+                ColumnFamilyStore cfStore = Table.open(table_).getColumnFamilyStore(cfName_);
+                if (!isFrozen_)
+                {
+                    isFrozen_ = true;
+                    MemtableManager.instance().submit(cfStore.getColumnFamilyName(), this, cLogCtx);
+                    cfStore.switchMemtable(key, columnFamily, cLogCtx);
+                }
+                else
+                {
+                    cfStore.apply(key, columnFamily, cLogCtx);
+                }
+            }
+            finally
+            {
+                lock_.unlock();
+            }
+        }
+        else
+        {
+        	printExecutorStats();
+        	Runnable putter = new Putter(key, columnFamily);
+        	apartments_.get(cfName_).submit(putter);
+        }
+    }
+
+    /*
+     * This version is used to switch memtable and force flush.
+    */
+    void forceflush(ColumnFamilyStore cfStore, boolean fRecovery) throws IOException
+    {
+        if(!fRecovery)
+        {
+	    	RowMutation rm = new RowMutation(DatabaseDescriptor.getTables().get(0), flushKey_);
+	        try
+	        {
+	            rm.add(cfStore.columnFamily_ + ":Column","0".getBytes());
+	            rm.apply();
+	        }
+	        catch(ColumnFamilyNotDefinedException ex)
+	        {
+	            logger_.debug(LogUtil.throwableToString(ex));
+	        }
+        }
+        else
+        {
+        	flush(CommitLog.CommitLogContext.NULL);
+        }
+    }
+
+
+
+    private void resolve(String key, ColumnFamily columnFamily)
+    {
+    	ColumnFamily oldCf = columnFamilies_.get(key);
+        if ( oldCf != null )
+        {
+            int oldSize = oldCf.size();
+            int oldObjectCount = oldCf.getColumnCount();
+            oldCf.addColumns(columnFamily);
+            int newSize = oldCf.size();
+            int newObjectCount = oldCf.getColumnCount();
+            resolveSize(oldSize, newSize);
+            resolveCount(oldObjectCount, newObjectCount);
+        }
+        else
+        {
+            columnFamilies_.put(key, columnFamily);
+            currentSize_.addAndGet(columnFamily.size() + key.length());
+            currentObjectCount_.addAndGet(columnFamily.getColumnCount());
+        }
+    }
+
+    /*
+     * This version is called on commit log recovery. The threshold
+     * is not respected and a forceFlush() needs to be invoked to flush
+     * the contents to disk.
+    */
+    void putOnRecovery(String key, ColumnFamily columnFamily) throws IOException
+    {
+        if(!key.equals(Memtable.flushKey_))
+        	resolve(key, columnFamily);
+    }
+
+    
+    ColumnFamily getLocalCopy(String key, String cfName, IFilter filter)
+    {
+    	String[] values = RowMutation.getColumnAndColumnFamily(cfName);
+    	ColumnFamily columnFamily = null;
+        if(values.length == 1 )
+        {
+        	columnFamily = columnFamilies_.get(key);        	
+        }
+        else
+        {
+        	ColumnFamily cFamily = columnFamilies_.get(key);
+        	if(cFamily == null)
+        		return null;
+        	IColumn column = null;
+        	if(values.length == 2)
+        	{
+        		column = cFamily.getColumn(values[1]);
+        		if(column != null )
+        		{
+        			columnFamily = new ColumnFamily(cfName_);
+        			columnFamily.addColumn(column.name(), column);
+        		}
+        	}
+        	else
+        	{
+        		column = cFamily.getColumn(values[1]);
+        		if(column != null )
+        		{
+        			 
+        			IColumn subColumn = ((SuperColumn)column).getSubColumn(values[2]);
+        			if(subColumn != null)
+        			{
+	        			columnFamily = new ColumnFamily(cfName_);
+	            		columnFamily.createColumn(values[1] + ":" + values[2], subColumn.value(), subColumn.timestamp());
+        			}
+        		}
+        	}
+        }
+        /* Filter unnecessary data from the column based on the provided filter */
+        return filter.filter(cfName, columnFamily);
+    }
+
+    ColumnFamily get(String key, String cfName)
+    {
+    	printExecutorStats();
+    	Callable<ColumnFamily> call = new Getter(key, cfName);
+    	ColumnFamily cf = null;
+    	try
+    	{
+    		cf = apartments_.get(cfName_).submit(call).get();
+    	}
+    	catch ( ExecutionException ex )
+    	{
+    		logger_.debug(LogUtil.throwableToString(ex));
+    	}
+    	catch ( InterruptedException ex2 )
+    	{
+    		logger_.debug(LogUtil.throwableToString(ex2));
+    	}
+    	return cf;
+    }
+    
+    ColumnFamily get(String key, String cfName, IFilter filter)
+    {
+    	printExecutorStats();
+    	Callable<ColumnFamily> call = new Getter(key, cfName, filter);
+    	ColumnFamily cf = null;
+    	try
+    	{
+    		cf = apartments_.get(cfName_).submit(call).get();
+    	}
+    	catch ( ExecutionException ex )
+    	{
+    		logger_.debug(LogUtil.throwableToString(ex));
+    	}
+    	catch ( InterruptedException ex2 )
+    	{
+    		logger_.debug(LogUtil.throwableToString(ex2));
+    	}
+    	return cf;
+    }
+
+    /*
+     * Although the method is named remove() we cannot remove the key
+     * from memtable. We add it to the memtable but mark it as deleted.
+     * The reason for this because we do not want a successive get()
+     * for the same key to scan the ColumnFamilyStore files for this key.
+    */
+    void remove(String key, ColumnFamily columnFamily) throws IOException
+    {
+    	printExecutorStats();
+    	Runnable deleter = new Remover(key, columnFamily);
+    	apartments_.get(cfName_).submit(deleter);
+    }
+
+    /*
+     * param recoveryMode - indicates if this was invoked during
+     *                      recovery.
+    */
+    void flush(CommitLog.CommitLogContext cLogCtx) throws IOException
+    {
+        ColumnFamilyStore cfStore = Table.open(table_).getColumnFamilyStore(cfName_);
+        if ( columnFamilies_.size() == 0 )
+        {
+        	// This should be called even if size is 0
+        	// This is because we should try to delete the useless commitlogs
+        	// even though there is nothing to flush in memtables for a given family like Hints etc.
+            cfStore.onMemtableFlush(cLogCtx);
+            return;
+        }
+
+        PartitionerType pType = StorageService.getPartitionerType();
+        String directory = DatabaseDescriptor.getDataFileLocation();
+        String filename = cfStore.getNextFileName();
+        SSTable ssTable = new SSTable(directory, filename, pType);
+        switch (pType) 
+        {
+            case OPHF:
+                flushForOrderPreservingPartitioner(ssTable, cfStore, cLogCtx);
+                break;
+                
+            default:
+                flushForRandomPartitioner(ssTable, cfStore, cLogCtx);
+                break;
+        }
+        
+        columnFamilies_.clear();        
+    }
+    
+    private void flushForRandomPartitioner(SSTable ssTable, ColumnFamilyStore cfStore, CommitLog.CommitLogContext cLogCtx) throws IOException
+    {
+        /* List of primary keys in sorted order */
+        List<PrimaryKey> pKeys = PrimaryKey.create( columnFamilies_.keySet() );
+        DataOutputBuffer buffer = new DataOutputBuffer();
+        /* Use this BloomFilter to decide if a key exists in a SSTable */
+        BloomFilter bf = new BloomFilter(pKeys.size(), 15);
+        for ( PrimaryKey pKey : pKeys )
+        {
+            buffer.reset();
+            ColumnFamily columnFamily = columnFamilies_.get(pKey.key());
+            if ( columnFamily != null )
+            {
+                /* serialize the cf with column indexes */
+                ColumnFamily.serializer2().serialize( columnFamily, buffer );
+                /* Now write the key and value to disk */
+                ssTable.append(pKey.key(), pKey.hash(), buffer);
+                bf.fill(pKey.key());
+                columnFamily.clear();
+            }
+        }
+        ssTable.close(bf);
+        cfStore.onMemtableFlush(cLogCtx);
+        cfStore.storeLocation( ssTable.getDataFileLocation(), bf );
+        buffer.close();
+    }
+    
+    private void flushForOrderPreservingPartitioner(SSTable ssTable, ColumnFamilyStore cfStore, CommitLog.CommitLogContext cLogCtx) throws IOException
+    {
+        List<String> keys = new ArrayList<String>( columnFamilies_.keySet() );
+        Collections.sort(keys);
+        DataOutputBuffer buffer = new DataOutputBuffer();
+        /* Use this BloomFilter to decide if a key exists in a SSTable */
+        BloomFilter bf = new BloomFilter(keys.size(), 15);
+        for ( String key : keys )
+        {
+            buffer.reset();
+            ColumnFamily columnFamily = columnFamilies_.get(key);
+            if ( columnFamily != null )
+            {
+                /* serialize the cf with column indexes */
+                ColumnFamily.serializer2().serialize( columnFamily, buffer );
+                /* Now write the key and value to disk */
+                ssTable.append(key, buffer);
+                bf.fill(key);
+                columnFamily.clear();
+            }
+        }
+        ssTable.close(bf);
+        cfStore.onMemtableFlush(cLogCtx);
+        cfStore.storeLocation( ssTable.getDataFileLocation(), bf );
+        buffer.close();
+    }
+}

Added: incubator/cassandra/src/org/apache/cassandra/db/MemtableMBean.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/db/MemtableMBean.java?rev=749202&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/db/MemtableMBean.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/db/MemtableMBean.java Mon Mar  2 06:04:20 2009
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+
+public interface MemtableMBean
+{
+    public int getMemtableThreshold();
+}

Added: incubator/cassandra/src/org/apache/cassandra/db/MemtableManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/db/MemtableManager.java?rev=749202&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/db/MemtableManager.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/db/MemtableManager.java Mon Mar  2 06:04:20 2009
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
+import org.apache.cassandra.concurrent.ThreadFactoryImpl;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+import org.apache.cassandra.utils.*;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class MemtableManager
+{
+    private static MemtableManager instance_;
+    private static Lock lock_ = new ReentrantLock();
+    private static Logger logger_ = Logger.getLogger(MemtableManager.class);
+    private ReentrantReadWriteLock rwLock_ = new ReentrantReadWriteLock(true);
+    static MemtableManager instance() 
+    {
+        if ( instance_ == null )
+        {
+            lock_.lock();
+            try
+            {
+                if ( instance_ == null )
+                    instance_ = new MemtableManager();
+            }
+            finally
+            {
+                lock_.unlock();
+            }
+        }
+        return instance_;
+    }
+    
+    class MemtableFlusher implements Runnable
+    {
+        private Memtable memtable_;
+        private CommitLog.CommitLogContext cLogCtx_;
+        
+        MemtableFlusher(Memtable memtable, CommitLog.CommitLogContext cLogCtx)
+        {
+            memtable_ = memtable;
+            cLogCtx_ = cLogCtx;
+        }
+        
+        public void run()
+        {
+            try
+            {
+            	memtable_.flush(cLogCtx_);
+            }
+            catch (IOException e)
+            {
+                logger_.debug( LogUtil.throwableToString(e) );
+            }
+        	rwLock_.writeLock().lock();
+            try
+            {
+            	List<Memtable> memtables = history_.get(memtable_.getColumnFamily());
+                memtables.remove(memtable_);                	
+            }
+        	finally
+        	{
+            	rwLock_.writeLock().unlock();
+        	}
+        }
+    }
+    
+    private Map<String, List<Memtable>> history_ = new HashMap<String, List<Memtable>>();
+    private ExecutorService flusher_ = new DebuggableThreadPoolExecutor( 1,
+            1,
+            Integer.MAX_VALUE,
+            TimeUnit.SECONDS,
+            new LinkedBlockingQueue<Runnable>(),
+            new ThreadFactoryImpl("MEMTABLE-FLUSHER-POOL")
+            );  
+    
+    /* Submit memtables to be flushed to disk */
+    void submit(String cfName, Memtable memtbl, CommitLog.CommitLogContext cLogCtx)
+    {
+    	rwLock_.writeLock().lock();
+    	try
+    	{
+	        List<Memtable> memtables = history_.get(cfName);
+	        if ( memtables == null )
+	        {
+	            memtables = new ArrayList<Memtable>();
+	            history_.put(cfName, memtables);
+	        }
+	        memtables.add(memtbl);	        
+	        flusher_.submit( new MemtableFlusher(memtbl, cLogCtx) );
+    	}
+    	finally
+    	{
+        	rwLock_.writeLock().unlock();
+    	}
+    }
+    
+
+    /*
+     * Retrieve column family from the list of Memtables that have been
+     * submitted for flush but have not yet been flushed.
+     * It also filters out unneccesary columns based on the passed in filter.
+    */
+    void getColumnFamily(String key, String cfName, String cf, IFilter filter, List<ColumnFamily> columnFamilies)
+    {
+    	rwLock_.readLock().lock();
+    	try
+    	{
+	        /* Get all memtables associated with this column family */
+	        List<Memtable> memtables = history_.get(cfName);
+	        if ( memtables != null )
+	        {
+		        Collections.sort(memtables);
+	        	int size = memtables.size();
+	            for ( int i = size - 1; i >= 0; --i  )
+	            {
+	                ColumnFamily columnFamily = memtables.get(i).getLocalCopy(key, cf, filter);
+	                if ( columnFamily != null )
+	                {
+	                    columnFamilies.add(columnFamily);
+	                    if( filter.isDone())
+	                    	break;
+	                }
+	            }
+	        }        
+    	}
+    	finally
+    	{
+        	rwLock_.readLock().unlock();
+    	}
+    }
+
+
+
+
+}

Added: incubator/cassandra/src/org/apache/cassandra/db/MinorCompactionManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/db/MinorCompactionManager.java?rev=749202&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/db/MinorCompactionManager.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/db/MinorCompactionManager.java Mon Mar  2 06:04:20 2009
@@ -0,0 +1,237 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
+import org.apache.cassandra.concurrent.ThreadFactoryImpl;
+import org.apache.cassandra.db.HintedHandOffManager.HintedHandOff;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.service.IComponentShutdown;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.BloomFilter;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+import org.apache.cassandra.concurrent.*;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+class MinorCompactionManager implements IComponentShutdown
+{
+    private static MinorCompactionManager instance_;
+    private static Lock lock_ = new ReentrantLock();
+    private static Logger logger_ = Logger.getLogger(MinorCompactionManager.class);
+    final static long intervalInMins_ = 5;
+
+    public static MinorCompactionManager instance()
+    {
+        if ( instance_ == null )
+        {
+            lock_.lock();
+            try
+            {
+                if ( instance_ == null )
+                    instance_ = new MinorCompactionManager();
+            }
+            finally
+            {
+                lock_.unlock();
+            }
+        }
+        return instance_;
+    }
+
+    class FileCompactor implements Runnable
+    {
+        private ColumnFamilyStore columnFamilyStore_;
+
+        FileCompactor(ColumnFamilyStore columnFamilyStore)
+        {
+        	columnFamilyStore_ = columnFamilyStore;
+        }
+
+        public void run()
+        {
+            try
+            {
+                logger_.debug("Started  compaction ..."+columnFamilyStore_.columnFamily_);
+            	columnFamilyStore_.doCompaction();
+                logger_.debug("Finished compaction ..."+columnFamilyStore_.columnFamily_);
+            }
+            catch (IOException e)
+            {
+                logger_.debug( LogUtil.throwableToString(e) );
+            }
+            catch (Throwable th)
+            {
+                logger_.error( LogUtil.throwableToString(th) );
+            }
+        }
+    }
+
+    class FileCompactor2 implements Callable<Boolean>
+    {
+        private ColumnFamilyStore columnFamilyStore_;
+        private List<Range> ranges_;
+        private EndPoint target_;
+        private List<String> fileList_;
+
+        FileCompactor2(ColumnFamilyStore columnFamilyStore, List<Range> ranges)
+        {
+            columnFamilyStore_ = columnFamilyStore;
+            ranges_ = ranges;
+        }
+        
+        FileCompactor2(ColumnFamilyStore columnFamilyStore, List<Range> ranges, EndPoint target,List<String> fileList)
+        {
+            columnFamilyStore_ = columnFamilyStore;
+            ranges_ = ranges;
+            target_ = target;
+            fileList_ = fileList;
+        }
+
+        public Boolean call()
+        {
+        	boolean result = true;
+            try
+            {
+                logger_.debug("Started  compaction ..."+columnFamilyStore_.columnFamily_);
+                result = columnFamilyStore_.doAntiCompaction(ranges_, target_,fileList_);
+                logger_.debug("Finished compaction ..."+columnFamilyStore_.columnFamily_);
+            }
+            catch (IOException e)
+            {
+                logger_.debug( LogUtil.throwableToString(e) );
+            }
+            return result;
+        }
+    }
+
+    class OnDemandCompactor implements Runnable
+    {
+        private ColumnFamilyStore columnFamilyStore_;
+        private long skip_ = 0L;
+
+        OnDemandCompactor(ColumnFamilyStore columnFamilyStore, long skip)
+        {
+            columnFamilyStore_ = columnFamilyStore;
+            skip_ = skip;
+        }
+
+        public void run()
+        {
+            try
+            {
+                logger_.debug("Started  Major compaction ..."+columnFamilyStore_.columnFamily_);
+                columnFamilyStore_.doMajorCompaction(skip_);
+                logger_.debug("Finished Major compaction ..."+columnFamilyStore_.columnFamily_);
+            }
+            catch (IOException e)
+            {
+                logger_.debug( LogUtil.throwableToString(e) );
+            }
+            return ;
+        }
+    }
+
+    class CleanupCompactor implements Runnable
+    {
+        private ColumnFamilyStore columnFamilyStore_;
+
+        CleanupCompactor(ColumnFamilyStore columnFamilyStore)
+        {
+        	columnFamilyStore_ = columnFamilyStore;
+        }
+
+        public void run()
+        {
+            try
+            {
+                logger_.debug("Started  compaction ..."+columnFamilyStore_.columnFamily_);
+            	columnFamilyStore_.doCleanupCompaction();
+                logger_.debug("Finished compaction ..."+columnFamilyStore_.columnFamily_);
+            }
+            catch (IOException e)
+            {
+                logger_.debug( LogUtil.throwableToString(e) );
+            }
+            catch (Throwable th)
+            {
+                logger_.error( LogUtil.throwableToString(th) );
+            }
+        }
+    }
+    
+    
+    private ScheduledExecutorService compactor_ = new DebuggableScheduledThreadPoolExecutor(1, new ThreadFactoryImpl("MINOR-COMPACTION-POOL"));
+
+    public MinorCompactionManager()
+    {
+    	StorageService.instance().registerComponentForShutdown(this);
+	}
+
+    public void shutdown()
+    {
+    	compactor_.shutdownNow();
+    }
+
+    public void submitPeriodicCompaction(ColumnFamilyStore columnFamilyStore)
+    {        
+    	compactor_.scheduleWithFixedDelay(new FileCompactor(columnFamilyStore), MinorCompactionManager.intervalInMins_,
+    			MinorCompactionManager.intervalInMins_, TimeUnit.MINUTES);       
+    }
+
+    public void submit(ColumnFamilyStore columnFamilyStore)
+    {
+        compactor_.submit(new FileCompactor(columnFamilyStore));
+    }
+    
+    public void submitCleanup(ColumnFamilyStore columnFamilyStore)
+    {
+        compactor_.submit(new CleanupCompactor(columnFamilyStore));
+    }
+
+    public Future<Boolean> submit(ColumnFamilyStore columnFamilyStore, List<Range> ranges, EndPoint target, List<String> fileList)
+    {
+        return compactor_.submit( new FileCompactor2(columnFamilyStore, ranges, target, fileList) );
+    } 
+
+    public Future<Boolean> submit(ColumnFamilyStore columnFamilyStore, List<Range> ranges)
+    {
+        return compactor_.submit( new FileCompactor2(columnFamilyStore, ranges) );
+    }
+
+    public void  submitMajor(ColumnFamilyStore columnFamilyStore, List<Range> ranges, long skip)
+    {
+        compactor_.submit( new OnDemandCompactor(columnFamilyStore, skip) );
+    }
+}

Added: incubator/cassandra/src/org/apache/cassandra/db/NamesFilter.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/db/NamesFilter.java?rev=749202&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/db/NamesFilter.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/db/NamesFilter.java Mon Mar  2 06:04:20 2009
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.io.SSTable;
+
+
+
+public class NamesFilter implements IFilter
+{
+    /* list of column names to filter against. */
+    private List<String> names_ = new ArrayList<String>();  
+    
+    NamesFilter(List<String> names)
+    {
+        names_ = names;     
+    }
+    
+    public ColumnFamily filter(String cf, ColumnFamily columnFamily)
+    {
+        if ( columnFamily == null )
+        {
+            return columnFamily;
+        }
+    	String[] values = RowMutation.getColumnAndColumnFamily(cf);
+		String cfName = columnFamily.name();
+		ColumnFamily filteredCf = new ColumnFamily(cfName);
+		if( values.length == 1 )
+		{
+			Collection<IColumn> columns = columnFamily.getAllColumns();
+			for(IColumn column : columns)
+			{
+		        if ( names_.contains(column.name()) )
+		        {
+		            names_.remove(column.name());            
+					filteredCf.addColumn(column.name(), column);
+		        }
+				if( isDone() )
+				{
+					return filteredCf;
+				}
+			}
+		}
+		else if ( values.length == 2 && DatabaseDescriptor.getColumnType(cfName).equals("Super") )
+		{
+    		Collection<IColumn> columns = columnFamily.getAllColumns();
+    		for(IColumn column : columns)
+    		{
+    			SuperColumn superColumn = (SuperColumn)column;
+    			SuperColumn filteredSuperColumn = new SuperColumn(superColumn.name());
+				filteredCf.addColumn(filteredSuperColumn.name(), filteredSuperColumn);
+        		Collection<IColumn> subColumns = superColumn.getSubColumns();
+        		for(IColumn subColumn : subColumns)
+        		{
+    		        if ( names_.contains(subColumn.name()) )
+    		        {
+    		            names_.remove(subColumn.name());            
+    		            filteredSuperColumn.addColumn(subColumn.name(), subColumn);
+    		        }
+    				if( isDone() )
+    				{
+    					return filteredCf;
+    				}
+    			}
+    		}
+		}
+    	else 
+    	{
+    		throw new UnsupportedOperationException();
+    	}
+		return filteredCf;
+    }
+    
+    public IColumn filter(IColumn column, DataInputStream dis) throws IOException
+    {       
+        String columnName = column.name();
+        if ( names_.contains(columnName) )
+        {
+            names_.remove(columnName);            
+        }
+        else
+        {
+            column = null;
+        }
+        
+        return column;
+    }
+    
+    public boolean isDone()
+    {
+        return names_.isEmpty();
+    }
+
+    public DataInputBuffer next(String key, String cf, SSTable ssTable) throws IOException
+    {
+    	return ssTable.next(key, cf, names_);
+    }
+
+}

Added: incubator/cassandra/src/org/apache/cassandra/db/PrimaryKey.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/db/PrimaryKey.java?rev=749202&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/db/PrimaryKey.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/db/PrimaryKey.java Mon Mar  2 06:04:20 2009
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.service.PartitionerType;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
+
+
+public class PrimaryKey implements Comparable<PrimaryKey>
+{   
+    public static List<PrimaryKey> create(Set<String> keys)
+    {
+        List<PrimaryKey> list = new ArrayList<PrimaryKey>();
+        for ( String key : keys )
+        {
+            list.add( new PrimaryKey(key) );
+        }
+        Collections.sort(list);
+        return list;
+    }
+    
+    /* MD5 hash of the key_ */
+    private BigInteger hash_;
+    /* Key used by the application */
+    private String key_;
+    
+    PrimaryKey(String key)
+    {
+        PartitionerType pType = StorageService.getPartitionerType();
+        switch (pType)
+        {
+            case RANDOM:
+                hash_ = FBUtilities.hash(key);
+                break;
+            
+            case OPHF:
+                break;
+                
+            default:
+                hash_ = hash_ = FBUtilities.hash(key);
+                break;
+        }        
+        key_ = key;
+    }
+    
+    PrimaryKey(String key, BigInteger hash)
+    {
+        hash_ = hash;
+        key_ = key;
+    }
+    
+    public String key()
+    {
+        return key_;
+    }
+    
+    public BigInteger hash()
+    {
+        return hash_;
+    }
+    
+    /**
+     * This performs semantic comparison of Primary Keys.
+     * If the partition algorithm chosen is "Random" then
+     * the hash of the key is used for comparison. If it 
+     * is an OPHF then the key is used.
+     * 
+     * @param rhs primary against which we wish to compare.
+     * @return
+     */
+    public int compareTo(PrimaryKey rhs)
+    {
+        int value = 0;
+        PartitionerType pType = StorageService.getPartitionerType();
+        switch (pType)
+        {
+            case RANDOM:
+                value = hash_.compareTo(rhs.hash_);
+                break;
+            
+            case OPHF:
+                value = key_.compareTo(rhs.key_);
+                break;
+                
+            default:
+                value = hash_.compareTo(rhs.hash_);
+                break;
+        }        
+        return value;
+    }
+    
+    @Override
+    public String toString()
+    {
+        return (hash_ != null) ? (key_ + ":" + hash_) : key_;
+    }
+}

Added: incubator/cassandra/src/org/apache/cassandra/db/ReadMessage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/db/ReadMessage.java?rev=749202&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/db/ReadMessage.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/db/ReadMessage.java Mon Mar  2 06:04:20 2009
@@ -0,0 +1,233 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import javax.xml.bind.annotation.XmlElement;
+
+import org.apache.cassandra.continuations.Suspendable;
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.service.StorageService;
+
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class ReadMessage implements Serializable
+{
+    private static ICompactSerializer<ReadMessage> serializer_;	
+	
+    static
+    {
+        serializer_ = new ReadMessageSerializer();
+    }
+
+    static ICompactSerializer<ReadMessage> serializer()
+    {
+        return serializer_;
+    }
+    
+    public static Message makeReadMessage(ReadMessage readMessage) throws IOException
+    {
+    	ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        DataOutputStream dos = new DataOutputStream( bos );
+        ReadMessage.serializer().serialize(readMessage, dos);
+        Message message = new Message(StorageService.getLocalStorageEndPoint(), StorageService.readStage_, StorageService.readVerbHandler_, new Object[]{bos.toByteArray()});         
+        return message;
+    }
+    
+    @XmlElement(name="Table")
+    private String table_;
+    
+    @XmlElement(name="Key")
+    private String key_;
+    
+    @XmlElement(name="ColumnFamily")
+    private String columnFamily_column_ = null;
+    
+    @XmlElement(name="start")
+    private int start_ = -1;
+
+    @XmlElement(name="count")
+    private int count_ = -1 ;
+    
+    @XmlElement(name="sinceTimestamp")
+    private long sinceTimestamp_ = -1 ;
+
+    @XmlElement(name="columnNames")
+    private List<String> columns_ = new ArrayList<String>();
+    
+    @XmlElement(name="isDigestQuery")
+    private boolean isDigestQuery_ = false;
+        
+    private ReadMessage()
+    {
+    }
+    
+    public ReadMessage(String table, String key)
+    {
+        table_ = table;
+        key_ = key;
+    }
+
+    public ReadMessage(String table, String key, String columnFamily_column)
+    {
+        table_ = table;
+        key_ = key;
+        columnFamily_column_ = columnFamily_column;
+    }
+    
+    public ReadMessage(String table, String key, String columnFamily, List<String> columns)
+    {
+    	table_ = table;
+    	key_ = key;
+    	columnFamily_column_ = columnFamily;
+    	columns_ = columns;
+    }
+    
+    public ReadMessage(String table, String key, String columnFamily_column, int start, int count)
+    {
+        table_ = table;
+        key_ = key;
+        columnFamily_column_ = columnFamily_column;
+        start_ = start ;
+        count_ = count;
+    }
+
+    public ReadMessage(String table, String key, String columnFamily_column, long sinceTimestamp)
+    {
+        table_ = table;
+        key_ = key;
+        columnFamily_column_ = columnFamily_column;
+        sinceTimestamp_ = sinceTimestamp ;
+    }
+
+    String table()
+    {
+        return table_;
+    }
+    
+    String key()
+    {
+        return key_;
+    }
+
+    String columnFamily_column()
+    {
+        return columnFamily_column_;
+    }
+
+    int start()
+    {
+        return start_;
+    }
+
+    int count()
+    {
+        return count_;
+    }
+
+    long sinceTimestamp()
+    {
+        return sinceTimestamp_;
+    }
+
+    public boolean isDigestQuery()
+    {
+    	return isDigestQuery_;
+    }
+    
+    public void setIsDigestQuery(boolean isDigestQuery)
+    {
+    	isDigestQuery_ = isDigestQuery;
+    }
+    
+    public List<String> getColumnNames()
+    {
+    	return columns_;
+    }
+}
+
+class ReadMessageSerializer implements ICompactSerializer<ReadMessage>
+{
+	public void serialize(ReadMessage rm, DataOutputStream dos) throws IOException
+	{
+		dos.writeUTF(rm.table());
+		dos.writeUTF(rm.key());
+		dos.writeUTF(rm.columnFamily_column());
+		dos.writeInt(rm.start());
+		dos.writeInt(rm.count());
+		dos.writeLong(rm.sinceTimestamp());
+		dos.writeBoolean(rm.isDigestQuery());
+		List<String> columns = rm.getColumnNames();
+		dos.writeInt(columns.size());
+		if ( columns.size() > 0 )
+		{
+			for ( String column : columns )
+			{
+				dos.writeInt(column.getBytes().length);
+				dos.write(column.getBytes());
+			}
+		}
+	}
+	
+    public ReadMessage deserialize(DataInputStream dis) throws IOException
+    {
+		String table = dis.readUTF();
+		String key = dis.readUTF();
+		String columnFamily_column = dis.readUTF();
+		int start = dis.readInt();
+		int count = dis.readInt();
+		long sinceTimestamp = dis.readLong();
+		boolean isDigest = dis.readBoolean();
+		
+		int size = dis.readInt();
+		List<String> columns = new ArrayList<String>();		
+		for ( int i = 0; i < size; ++i )
+		{
+			byte[] bytes = new byte[dis.readInt()];
+			dis.readFully(bytes);
+			columns.add( new String(bytes) );
+		}
+		ReadMessage rm = null;
+		if ( columns.size() > 0 )
+		{
+			rm = new ReadMessage(table, key, columnFamily_column, columns);
+		}
+		if( sinceTimestamp > 0 )
+		{
+			rm = new ReadMessage(table, key, columnFamily_column, sinceTimestamp);
+		}
+		else
+		{
+			rm = new ReadMessage(table, key, columnFamily_column, start, count);
+		}
+		rm.setIsDigestQuery(isDigest);
+    	return rm;
+    }
+}
+

Added: incubator/cassandra/src/org/apache/cassandra/db/ReadRepairVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/db/ReadRepairVerbHandler.java?rev=749202&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/db/ReadRepairVerbHandler.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/db/ReadRepairVerbHandler.java Mon Mar  2 06:04:20 2009
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.io.*;
+
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+import org.apache.cassandra.service.*;
+import org.apache.cassandra.utils.*;
+import org.apache.cassandra.concurrent.*;
+import org.apache.cassandra.net.*;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class ReadRepairVerbHandler implements IVerbHandler
+{
+    private static Logger logger_ = Logger.getLogger(ReadRepairVerbHandler.class);    
+    
+    public void doVerb(Message message)
+    {          
+        byte[] body = (byte[])message.getMessageBody()[0];
+        DataInputBuffer buffer = new DataInputBuffer();
+        buffer.reset(body, body.length);        
+        
+        try
+        {
+            RowMutationMessage rmMsg = RowMutationMessage.serializer().deserialize(buffer);
+            RowMutation rm = rmMsg.getRowMutation();
+            rm.apply();                                   
+        }
+        catch( ColumnFamilyNotDefinedException ex )
+        {
+            logger_.debug(LogUtil.throwableToString(ex));
+        }
+        catch ( IOException e )
+        {
+            logger_.debug(LogUtil.throwableToString(e));            
+        }        
+    }
+}

Added: incubator/cassandra/src/org/apache/cassandra/db/ReadResponseMessage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/db/ReadResponseMessage.java?rev=749202&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/db/ReadResponseMessage.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/db/ReadResponseMessage.java Mon Mar  2 06:04:20 2009
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+
+import javax.xml.bind.annotation.XmlElement;
+
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.StorageService;
+
+
+/*
+ * The read response message is sent by the server when reading data 
+ * this encapsulates the tablename and teh row that has been read.
+ * The table name is needed so that we can use it to create repairs.
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+public class ReadResponseMessage implements Serializable 
+{
+private static ICompactSerializer<ReadResponseMessage> serializer_;	
+	
+    static
+    {
+        serializer_ = new ReadResponseMessageSerializer();
+    }
+
+    public static ICompactSerializer<ReadResponseMessage> serializer()
+    {
+        return serializer_;
+    }
+    
+	public static Message makeReadResponseMessage(ReadResponseMessage readResponseMessage) throws IOException
+    {
+    	ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        DataOutputStream dos = new DataOutputStream( bos );
+        ReadResponseMessage.serializer().serialize(readResponseMessage, dos);
+        Message message = new Message(StorageService.getLocalStorageEndPoint(), MessagingService.responseStage_, MessagingService.responseVerbHandler_, new Object[]{bos.toByteArray()});         
+        return message;
+    }
+	
+	@XmlElement(name = "Table")
+	private String table_;
+
+	@XmlElement(name = "Row")
+	private Row row_;
+
+	@XmlElement(name = "Digest")
+	private byte[] digest_ = new byte[0];
+
+    @XmlElement(name="isDigestQuery")
+    private boolean isDigestQuery_ = false;
+	
+	private ReadResponseMessage() {
+	}
+
+	public ReadResponseMessage(String table, byte[] digest ) {
+		table_ = table;
+		digest_= digest;
+	}
+
+	public ReadResponseMessage(String table, Row row) {
+		table_ = table;
+		row_ = row;
+	}
+
+	public String table() {
+		return table_;
+	}
+
+	public Row row() 
+    {
+		return row_;
+    }
+        
+	public byte[] digest() {
+		return digest_;
+	}
+
+	public boolean isDigestQuery()
+    {
+    	return isDigestQuery_;
+    }
+    
+    public void setIsDigestQuery(boolean isDigestQuery)
+    {
+    	isDigestQuery_ = isDigestQuery;
+    }
+}
+
+
+class ReadResponseMessageSerializer implements ICompactSerializer<ReadResponseMessage>
+{
+	public void serialize(ReadResponseMessage rm, DataOutputStream dos) throws IOException
+	{
+		dos.writeUTF(rm.table());
+        dos.writeInt(rm.digest().length);
+        dos.write(rm.digest());
+        dos.writeBoolean(rm.isDigestQuery());
+        
+        if( !rm.isDigestQuery() && rm.row() != null )
+        {            
+            Row.serializer().serialize(rm.row(), dos);
+        }				
+	}
+	
+    public ReadResponseMessage deserialize(DataInputStream dis) throws IOException
+    {
+    	String table = dis.readUTF();
+        int digestSize = dis.readInt();
+        byte[] digest = new byte[digestSize];
+        dis.read(digest, 0 , digestSize);
+        boolean isDigest = dis.readBoolean();
+        
+        Row row = null;
+        if ( !isDigest )
+        {
+            row = Row.serializer().deserialize(dis);
+        }
+		
+		ReadResponseMessage rmsg = null;
+    	if( isDigest  )
+        {
+    		rmsg =  new ReadResponseMessage(table, digest);
+        }
+    	else
+        {
+    		rmsg =  new ReadResponseMessage(table, row);
+        }
+        rmsg.setIsDigestQuery(isDigest);
+    	return rmsg;
+    } 
+}
\ No newline at end of file

Added: incubator/cassandra/src/org/apache/cassandra/db/ReadVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/db/ReadVerbHandler.java?rev=749202&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/db/ReadVerbHandler.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/db/ReadVerbHandler.java Mon Mar  2 06:04:20 2009
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+
+import org.apache.cassandra.continuations.Suspendable;
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.io.DataOutputBuffer;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+import org.apache.cassandra.net.*;
+import org.apache.cassandra.utils.*;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class ReadVerbHandler implements IVerbHandler
+{
+    private static class ReadContext
+    {
+        protected DataInputBuffer bufIn_ = new DataInputBuffer();
+        protected DataOutputBuffer bufOut_ = new DataOutputBuffer();
+    }
+
+    private static Logger logger_ = Logger.getLogger( ReadVerbHandler.class );
+    /* We use this so that we can reuse the same row mutation context for the mutation. */
+    private static ThreadLocal<ReadContext> tls_ = new InheritableThreadLocal<ReadContext>();
+
+    public void doVerb(Message message)
+    {
+        byte[] body = (byte[])message.getMessageBody()[0];
+        /* Obtain a Read Context from TLS */
+        ReadContext readCtx = tls_.get();
+        if ( readCtx == null )
+        {
+            readCtx = new ReadContext();
+            tls_.set(readCtx);
+        }
+        readCtx.bufIn_.reset(body, body.length);
+
+        try
+        {
+            ReadMessage readMessage = ReadMessage.serializer().deserialize(readCtx.bufIn_);
+            Table table = Table.open(readMessage.table());
+            Row row = null;
+            long start = System.currentTimeMillis();
+            if( readMessage.columnFamily_column() == null )
+            	row = table.get(readMessage.key());
+            else
+            {
+            	if(readMessage.getColumnNames().size() == 0)
+            	{
+	            	if(readMessage.count() > 0 && readMessage.start() >= 0)
+	            		row = table.getRow(readMessage.key(), readMessage.columnFamily_column(), readMessage.start(), readMessage.count());
+	            	else
+	            		row = table.getRow(readMessage.key(), readMessage.columnFamily_column());
+            	}
+            	else
+            	{
+            		row = table.getRow(readMessage.key(), readMessage.columnFamily_column(), readMessage.getColumnNames());            		
+            	}
+            }              
+            logger_.info("getRow()  TIME: " + (System.currentTimeMillis() - start) + " ms.");
+            start = System.currentTimeMillis();
+            ReadResponseMessage readResponseMessage = null;
+            if(readMessage.isDigestQuery())
+            {
+                readResponseMessage = new ReadResponseMessage(table.getTableName(), row.digest());
+
+            }
+            else
+            {
+                readResponseMessage = new ReadResponseMessage(table.getTableName(), row);
+            }
+            readResponseMessage.setIsDigestQuery(readMessage.isDigestQuery());
+            /* serialize the ReadResponseMessage. */
+            readCtx.bufOut_.reset();
+
+            start = System.currentTimeMillis();
+            ReadResponseMessage.serializer().serialize(readResponseMessage, readCtx.bufOut_);
+            logger_.info("serialize  TIME: " + (System.currentTimeMillis() - start) + " ms.");
+
+            byte[] bytes = new byte[readCtx.bufOut_.getLength()];
+            start = System.currentTimeMillis();
+            System.arraycopy(readCtx.bufOut_.getData(), 0, bytes, 0, bytes.length);
+            logger_.info("copy  TIME: " + (System.currentTimeMillis() - start) + " ms.");
+
+            Message response = message.getReply( StorageService.getLocalStorageEndPoint(), new Object[]{bytes} );
+            MessagingService.getMessagingInstance().sendOneWay(response, message.getFrom());
+            logger_.info("ReadVerbHandler  TIME 2: " + (System.currentTimeMillis() - start)
+                    + " ms.");
+        }
+        catch ( IOException ex)
+        {
+            logger_.info( LogUtil.throwableToString(ex) );
+        }
+        catch ( ColumnFamilyNotDefinedException ex)
+        {
+            logger_.info( LogUtil.throwableToString(ex) );
+        }
+    }
+}

Added: incubator/cassandra/src/org/apache/cassandra/db/RecoveryManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/db/RecoveryManager.java?rev=749202&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/db/RecoveryManager.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/db/RecoveryManager.java Mon Mar  2 06:04:20 2009
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.util.*;
+import java.io.*;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.utils.FileUtils;
+import org.apache.log4j.Logger;
+
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class RecoveryManager
+{
+    private static RecoveryManager instance_;
+    private static Logger logger_ = Logger.getLogger(RecoveryManager.class);
+    
+    synchronized static RecoveryManager instance() throws IOException
+    {
+        if ( instance_ == null )
+            instance_ = new RecoveryManager();
+        return instance_;
+    }
+
+    public static File[] getListofCommitLogs()
+    {
+        String directory = DatabaseDescriptor.getLogFileLocation();
+        File file = new File(directory);
+        File[] files = file.listFiles();
+        return files;
+    }
+    
+    public static Map<String, List<File>> getListOFCommitLogsPerTable()
+    {
+        File[] files = getListofCommitLogs();
+        /* Maintains a mapping of table name to a list of commit log files */
+        Map<String, List<File>> tableToCommitLogs = new HashMap<String, List<File>>();
+        
+        for (File f : files)
+        {
+            String table = CommitLog.getTableName(f.getName());
+            List<File> clogs = tableToCommitLogs.get(table);
+            if ( clogs == null )
+            {
+                clogs = new ArrayList<File>();
+                tableToCommitLogs.put(table, clogs);
+            }
+            clogs.add(f);
+        }
+        return tableToCommitLogs;
+    }
+    
+    public void doRecovery() throws IOException
+    {
+        File[] files = getListofCommitLogs();
+        Map<String, List<File>> tableToCommitLogs = getListOFCommitLogsPerTable();
+        recoverEachTable(tableToCommitLogs);
+        FileUtils.delete(files);
+    }
+    
+    private void recoverEachTable(Map<String, List<File>> tableToCommitLogs) throws IOException
+    {
+        Comparator<File> fCmp = new FileUtils.FileComparator();
+        Set<String> tables = tableToCommitLogs.keySet();
+        for ( String table : tables )
+        {
+            List<File> clogs = tableToCommitLogs.get(table);
+            Collections.sort(clogs, fCmp);
+            CommitLog clog = new CommitLog(table, true);
+            clog.recover(clogs);
+        }
+    }
+    
+    public static void main(String[] args) throws Throwable
+    {
+        DatabaseDescriptor.init();
+        long start = System.currentTimeMillis();
+        RecoveryManager rm = RecoveryManager.instance();
+        rm.doRecovery();  
+        logger_.debug( "Time taken : " + (System.currentTimeMillis() - start) + " ms.");
+    }
+}

Added: incubator/cassandra/src/org/apache/cassandra/db/Row.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/db/Row.java?rev=749202&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/db/Row.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/db/Row.java Mon Mar  2 06:04:20 2009
@@ -0,0 +1,258 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.log4j.Logger;
+import org.apache.cassandra.io.*;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+public class Row implements Serializable
+{
+    private static ICompactSerializer<Row> serializer_;
+	private static Logger logger_ = Logger.getLogger(Row.class);
+
+    static
+    {
+        serializer_ = new RowSerializer();
+    }
+
+    static ICompactSerializer<Row> serializer()
+    {
+        return serializer_;
+    }
+
+    private String key_;    
+    private Map<String, ColumnFamily> columnFamilies_ = new Hashtable<String, ColumnFamily>();
+    private transient AtomicInteger size_ = new AtomicInteger(0);
+
+    /* Ctor for JAXB */
+    protected Row()
+    {
+    }
+
+    public Row(String key)
+    {
+        key_ = key;        
+    }
+    
+    public String key()
+    {
+        return key_;
+    }
+    
+    void key(String key)
+    {
+        key_ = key;
+    }
+    
+    public ColumnFamily getColumnFamily(String cfName)
+    {
+        return columnFamilies_.get(cfName);
+    }
+
+    public Map<String, ColumnFamily> getColumnFamilies()
+    {
+        return columnFamilies_;
+    }
+
+    void addColumnFamily(ColumnFamily columnFamily)
+    {
+        columnFamilies_.put(columnFamily.name(), columnFamily);
+        size_.addAndGet(columnFamily.size());
+    }
+
+    void removeColumnFamily(ColumnFamily columnFamily)
+    {
+        columnFamilies_.remove(columnFamily.name());
+        int delta = (-1) * columnFamily.size();
+        size_.addAndGet(delta);
+    }
+
+    public int size()
+    {
+        return size_.get();
+    }
+
+    public boolean isEmpty()
+    {
+    	return ( columnFamilies_.size() == 0 );
+    }
+    
+    /**
+     * This is used as oldRow.merge(newRow). Basically we take the newRow
+     * and merge it into the oldRow.
+    */
+    void merge(Row row)
+    {
+        Map<String, ColumnFamily> columnFamilies = row.getColumnFamilies();
+        Set<String> cfNames = columnFamilies.keySet();
+
+        for ( String cfName : cfNames )
+        {
+            ColumnFamily cf = columnFamilies_.get(cfName);
+            if ( cf == null )
+                columnFamilies_.put(cfName, columnFamilies.get(cfName));
+            else
+            {
+                cf.merge(columnFamilies.get(cfName));
+            }
+        }
+    }
+
+    /**
+     * This function will repair the current row with the input row
+     * what that means is that if there are any differences between the 2 rows then
+     * this fn will make the current row take the latest changes .
+     */
+    public void repair(Row row)
+    {
+        Map<String, ColumnFamily> columnFamilies = row.getColumnFamilies();
+        Set<String> cfNames = columnFamilies.keySet();
+
+        for ( String cfName : cfNames )
+        {
+            ColumnFamily cf = columnFamilies_.get(cfName);
+            if ( cf == null )
+            {
+            	cf = new ColumnFamily(cfName);
+                columnFamilies_.put(cfName, cf);
+            }
+            cf.repair(columnFamilies.get(cfName));
+        }
+
+    }
+
+    /**
+     * This function will calculate the difference between 2 rows
+     * and return the resultant row. This assumes that the row that
+     * is being submitted is a super set of the current row so
+     * it only calculates additional
+     * difference and does not take care of what needs to be delted from the current row to make
+     * it same as the input row.
+     */
+    public Row diff(Row row)
+    {
+        Row rowDiff = new Row(key_);
+    	Map<String, ColumnFamily> columnFamilies = row.getColumnFamilies();
+        Set<String> cfNames = columnFamilies.keySet();
+
+        for ( String cfName : cfNames )
+        {
+            ColumnFamily cf = columnFamilies_.get(cfName);
+            ColumnFamily cfDiff = null;
+            if ( cf == null )
+            	rowDiff.getColumnFamilies().put(cfName, columnFamilies.get(cfName));
+            else
+            {
+            	cfDiff = cf.diff(columnFamilies.get(cfName));
+            	if(cfDiff != null)
+            		rowDiff.getColumnFamilies().put(cfName, cfDiff);
+            }
+        }
+        if(rowDiff.getColumnFamilies().size() != 0)
+        	return rowDiff;
+        else
+        	return null;
+    }
+    
+    public Row cloneMe()
+    {
+    	Row row = new Row(key_);
+    	row.columnFamilies_ = new HashMap<String, ColumnFamily>(columnFamilies_);
+    	return row;
+    }
+
+    public byte[] digest()
+    {
+        long start = System.currentTimeMillis();
+    	Set<String> cfamilies = columnFamilies_.keySet();
+    	byte[] xorHash = new byte[0];
+    	byte[] tmpHash = new byte[0];
+    	for(String cFamily : cfamilies)
+    	{
+    		if(xorHash.length == 0)
+    		{
+    			xorHash = columnFamilies_.get(cFamily).digest();
+    		}
+    		else
+    		{
+    			tmpHash = columnFamilies_.get(cFamily).digest();
+    			xorHash = FBUtilities.xor(xorHash, tmpHash);
+    		}
+    	}
+        logger_.info("DIGEST TIME: " + (System.currentTimeMillis() - start)
+                + " ms.");
+    	return xorHash;
+    }
+    
+    void clear()
+    {        
+        columnFamilies_.clear();
+    }
+}
+
+class RowSerializer implements ICompactSerializer<Row>
+{
+    public void serialize(Row row, DataOutputStream dos) throws IOException
+    {
+        dos.writeUTF(row.key());        
+        Map<String, ColumnFamily> columnFamilies = row.getColumnFamilies();
+        int size = columnFamilies.size();        
+        dos.writeInt(size);
+        
+        if ( size > 0 )
+        {   
+        	Set<String> cNames = columnFamilies.keySet();
+            for ( String cName : cNames )
+            {                
+                ColumnFamily.serializer().serialize(columnFamilies.get(cName), dos);
+            }
+        }
+    }
+
+    public Row deserialize(DataInputStream dis) throws IOException
+    {
+        String key = dis.readUTF();        
+        Row row = new Row(key);        
+        int size = dis.readInt();
+        
+        if ( size > 0 )
+        { 
+            for ( int i = 0; i < size; ++i )
+            {
+                ColumnFamily cf = ColumnFamily.serializer().deserialize(dis);
+                row.addColumnFamily(cf);
+            }
+        }
+        return row;
+    }
+}