Posted to commits@cassandra.apache.org by pm...@apache.org on 2009/03/02 08:57:31 UTC

svn commit: r749218 [15/34] - in /incubator/cassandra: branches/ dist/ nightly/ site/ tags/ trunk/ trunk/lib/ trunk/src/ trunk/src/org/ trunk/src/org/apache/ trunk/src/org/apache/cassandra/ trunk/src/org/apache/cassandra/analytics/ trunk/src/org/apache...

Added: incubator/cassandra/trunk/src/org/apache/cassandra/db/RowMutation.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/RowMutation.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/RowMutation.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/RowMutation.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,333 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.util.*;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.cassandra.io.ICompactSerializer;
+
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class RowMutation implements Serializable
+{      
+	private static ICompactSerializer<RowMutation> serializer_;	
+	
+    static
+    {
+        serializer_ = new RowMutationSerializer();
+    }
+
+    static ICompactSerializer<RowMutation> serializer()
+    {
+        return serializer_;
+    }
+        
+    private String table_;
+    private String key_;       
+    protected Map<String, ColumnFamily> modifications_ = new HashMap<String, ColumnFamily>();       
+    protected Map<String, ColumnFamily> deletions_ = new HashMap<String, ColumnFamily>();
+    
+    /* Ctor for JAXB */
+    private RowMutation()
+    {
+    }
+    
+    public RowMutation(String table, String key)
+    {
+        table_ = table;
+        key_ = key;
+    }
+    
+    public RowMutation(String table, Row row)
+    {
+        table_ = table;
+        key_ = row.key();
+        Map<String, ColumnFamily> cfSet = row.getColumnFamilies();
+        Set<String> keyset = cfSet.keySet();
+        for(String cfName : keyset)
+        {
+        	add(cfName, cfSet.get(cfName));
+        }
+    }
+
+    protected RowMutation(String table, String key, Map<String, ColumnFamily> modifications, Map<String, ColumnFamily> deletions)
+    {
+    	table_ = table;
+    	key_ = key;
+    	modifications_ = modifications;
+    	deletions_ = deletions;
+    }
+    
+    public static String[] getColumnAndColumnFamily(String cf)
+    {
+        return cf.split(":");
+    }
+    
+    String table()
+    {
+        return table_;
+    }
+    
+    public String key()
+    {
+        return key_;
+    }
+    
+    void addHints(String hint) throws IOException, ColumnFamilyNotDefinedException
+    {        
+        String cfName = Table.hints_ + ":" + hint;
+        add(cfName, new byte[0]);
+    }
+    
+    /*
+     * Specify a column family name and the corresponding column
+     * family object. 
+     * param @ cf - column family name
+     * param @ columnFamily - the column family.
+    */
+    public void add(String cf, ColumnFamily columnFamily)
+    {       
+        modifications_.put(cf, columnFamily);
+    }
+    
+    /*
+     * Specify a column name and a corresponding value for
+     * the column. Column name is specified as <column family>:column.
+     * This will result in a ColumnFamily associated with
+     * <column family> as name and a Column with <column>
+     * as name.
+     * 
+     * param @ cf - column name as <column family>:<column>
+     * param @ value - value associated with the column
+    */
+    public void add(String cf, byte[] value) throws IOException, ColumnFamilyNotDefinedException
+    {
+        add(cf, value, 0);
+    }
+    
+    /*
+     * Specify a column name and a corresponding value for
+     * the column. Column name is specified as <column family>:column.
+     * This will result in a ColumnFamily associated with
+     * <column family> as name and a Column with <column>
+     * as name. The column can be further broken up as
+     * <super column name>:<column name> in the case of super columns.
+     * 
+     * param @ cf - column name as <column family>:<column>
+     * param @ value - value associated with the column
+     * param @ timestamp - ts associated with this data.
+    */
+    public void add(String cf, byte[] value, long timestamp)
+    {        
+        String[] values = RowMutation.getColumnAndColumnFamily(cf);
+       
+        if ( values.length < 2 || values.length > 3 )
+            throw new IllegalArgumentException("Column Family " + cf + " is in an invalid format. Must be in <column family>:<column> or <column family>:<super column>:<column> format.");
+        
+        ColumnFamily columnFamily = modifications_.get(values[0]);
+        if( values.length == 2 )
+        {
+            if ( columnFamily == null )
+            {
+            	columnFamily = new ColumnFamily(values[0], ColumnFamily.getColumnType("Standard"));
+            }
+        	columnFamily.createColumn(values[1], value, timestamp);
+        }
+        if( values.length == 3 )
+        {
+            if ( columnFamily == null )
+            {
+            	columnFamily = new ColumnFamily(values[0], ColumnFamily.getColumnType("Super"));
+            }
+        	columnFamily.createColumn(values[1]+ ":" + values[2], value, timestamp);
+        }
+        modifications_.put(values[0], columnFamily);
+    }
+    
+    /*
+     * Specify a column name to be deleted. Column name is
+     * specified as <column family>:column. This will result
+     * in a ColumnFamily associated with <column family> as
+     * name and, when a column is specified, a Column with
+     * <column> as name being marked as deleted.
+     * TODO: delete is not correct yet, since we do not know
+     * the CF type here; this needs to be fixed.
+     * param @ cf - column name as <column family>:<column>     
+    */
+    public void delete(String cf)
+    {        
+        String[] values = RowMutation.getColumnAndColumnFamily(cf);
+        
+        if ( values.length == 0 || values.length > 3 )
+            throw new IllegalArgumentException("Column Family " + cf + " is in an invalid format. Must be in <column family>[:<super column>][:<column>] format.");
+     
+        ColumnFamily columnFamily = modifications_.get(values[0]);
+        if ( columnFamily == null )
+            columnFamily = new ColumnFamily(values[0]);
+        if(values.length == 2 )
+        {
+	        columnFamily.createColumn( values[1]);
+        }
+        if(values.length == 3 )
+        {
+	        columnFamily.createColumn( values[1] + ":" + values[2]);
+        }
+        deletions_.put(values[0], columnFamily);
+    }
+    
+    /* 
+     * This is equivalent to calling commit. Applies the changes
+     * to the table that is obtained by calling Table.open().
+    */
+    public void apply() throws IOException, ColumnFamilyNotDefinedException
+    {  
+        Row row = new Row(key_);
+        Table table = Table.open(table_);
+        Set<String> cfNames = modifications_.keySet();
+        for (String cfName : cfNames )
+        {        
+            if ( !table.isValidColumnFamily(cfName) )
+                throw new ColumnFamilyNotDefinedException("Column Family " + cfName + " has not been defined.");
+            row.addColumnFamily( modifications_.get(cfName) );            
+        }
+        table.apply(row);
+                
+        Set<String> cfNames2 = deletions_.keySet();
+        for (String cfName : cfNames2 )
+        {    
+            if ( !table.isValidColumnFamily(cfName) )
+                throw new ColumnFamilyNotDefinedException("Column Family " + cfName + " has not been defined.");
+            row.addColumnFamily( deletions_.get(cfName) );        
+        }
+        if ( deletions_.size() > 0 )
+            table.delete(row);
+    }
+    
+    /* 
+     * This is equivalent to calling commit. Applies the changes
+     * to the table that is obtained by calling Table.open().
+    */
+    void apply(Row row) throws IOException, ColumnFamilyNotDefinedException
+    {                
+        Table table = Table.open(table_);        
+        Set<String> cfNames = modifications_.keySet();
+        for (String cfName : cfNames )
+        {        
+            if ( !table.isValidColumnFamily(cfName) )
+                throw new ColumnFamilyNotDefinedException("Column Family " + cfName + " has not been defined.");
+            row.addColumnFamily( modifications_.get(cfName) );            
+        }
+        table.apply(row);
+                
+        Set<String> cfNames2 = deletions_.keySet();
+        for (String cfName : cfNames2 )
+        {    
+            if ( !table.isValidColumnFamily(cfName) )
+                throw new ColumnFamilyNotDefinedException("Column Family " + cfName + " has not been defined.");
+            row.addColumnFamily( deletions_.get(cfName) );        
+        }
+        if ( deletions_.size() > 0 )
+            table.delete(row);
+    }
+    
+    /* 
+     * Loads the modifications in this mutation into the given row
+     * and hands it to Table.load() on the table obtained via Table.open().
+    */
+    void load(Row row) throws IOException, ColumnFamilyNotDefinedException
+    {                
+        Table table = Table.open(table_);        
+        Set<String> cfNames = modifications_.keySet();
+        for (String cfName : cfNames )
+        {        
+            if ( !table.isValidColumnFamily(cfName) )
+                throw new ColumnFamilyNotDefinedException("Column Family " + cfName + " has not been defined.");
+            row.addColumnFamily( modifications_.get(cfName) );            
+        }
+        table.load(row);
+    }  
+}
+
+class RowMutationSerializer implements ICompactSerializer<RowMutation>
+{
+	private void freezeTheMaps(Map<String, ColumnFamily> map, DataOutputStream dos) throws IOException
+	{
+		int size = map.size();
+        dos.writeInt(size);
+        if ( size > 0 )
+        {   
+            Set<String> keys = map.keySet();
+            for( String key : keys )
+            {                
+            	dos.writeUTF(key);
+                ColumnFamily cf = map.get(key);
+                if ( cf != null )
+                {
+                    ColumnFamily.serializer().serialize(cf, dos);
+                }                
+            }
+        }
+	}
+	
+	public void serialize(RowMutation rm, DataOutputStream dos) throws IOException
+	{
+		dos.writeUTF(rm.table());
+		dos.writeUTF(rm.key());
+		
+		/* serialize the modifications_ in the mutation */
+        freezeTheMaps(rm.modifications_, dos);
+        
+        /* serialize the deletions_ in the mutation */
+        freezeTheMaps(rm.deletions_, dos);
+	}
+	
+	private Map<String, ColumnFamily> defreezeTheMaps(DataInputStream dis) throws IOException
+	{
+		Map<String, ColumnFamily> map = new HashMap<String, ColumnFamily>();
+        int size = dis.readInt();
+        for ( int i = 0; i < size; ++i )
+        {
+            String key = dis.readUTF();  
+            ColumnFamily cf = ColumnFamily.serializer().deserialize(dis);
+            map.put(key, cf);            
+        }
+        return map;
+	}
+	
+    public RowMutation deserialize(DataInputStream dis) throws IOException
+    {
+    	String table = dis.readUTF();
+    	String key = dis.readUTF();
+    	
+    	/* Defreeze the modifications_ map */
+    	Map<String, ColumnFamily> modifications = defreezeTheMaps(dis);
+    	
+    	/* Defreeze the deletions_ map */
+    	Map<String, ColumnFamily> deletions = defreezeTheMaps(dis);
+    	
+    	return new RowMutation(table, key, modifications, deletions);
+    }
+}
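
A minimal usage sketch for the mutation API above, assuming a configured Table and same-package access; the table, key, and column names are illustrative, not part of this commit:

    import java.io.IOException;

    public class RowMutationExample
    {
        public static void main(String[] args) throws IOException, ColumnFamilyNotDefinedException
        {
            RowMutation rm = new RowMutation("Mailbox", "user-42");
            long now = System.currentTimeMillis();

            // Standard column family: <column family>:<column>
            rm.add("MessageIndex:subject", "hello".getBytes(), now);

            // Super column family: <column family>:<super column>:<column>
            rm.add("Threads:thread-7:participants", "alice,bob".getBytes(), now);

            // Deletion uses the same path format (see the TODO on delete()).
            rm.delete("MessageIndex:stale");

            // Equivalent to commit: routes modifications and deletions through Table.open().
            rm.apply();
        }
    }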

Added: incubator/cassandra/trunk/src/org/apache/cassandra/db/RowMutationMessage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/RowMutationMessage.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/RowMutationMessage.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/RowMutationMessage.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.*;
+
+import javax.xml.bind.annotation.XmlElement;
+
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
+
+
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class RowMutationMessage implements Serializable
+{   
+    public static final String hint_ = "HINT";
+    private static ICompactSerializer<RowMutationMessage> serializer_;	
+	
+    static
+    {
+        serializer_ = new RowMutationMessageSerializer();
+    }
+
+    static ICompactSerializer<RowMutationMessage> serializer()
+    {
+        return serializer_;
+    }
+
+    public static Message makeRowMutationMessage(RowMutationMessage rowMutationMessage) throws IOException
+    {         
+        return makeRowMutationMessage(rowMutationMessage, StorageService.mutationVerbHandler_);
+    }
+    
+    public static Message makeRowMutationMessage(RowMutationMessage rowMutationMessage, String verbHandlerName) throws IOException
+    {
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        DataOutputStream dos = new DataOutputStream( bos );
+        RowMutationMessage.serializer().serialize(rowMutationMessage, dos);
+        EndPoint local = StorageService.getLocalStorageEndPoint();
+        EndPoint from = ( local != null ) ? local : new EndPoint(FBUtilities.getHostName(), 7000); 
+        Message message = new Message(from, StorageService.mutationStage_, verbHandlerName, new Object[]{bos.toByteArray()});         
+        return message;
+    }
+    
+    @XmlElement(name="RowMutation")
+    private RowMutation rowMutation_;
+    
+    private RowMutationMessage()
+    {}
+    
+    public RowMutationMessage(RowMutation rowMutation)
+    {
+        rowMutation_ = rowMutation;
+    }
+    
+   public RowMutation getRowMutation()
+   {
+       return rowMutation_;
+   }
+}
+
+class RowMutationMessageSerializer implements ICompactSerializer<RowMutationMessage>
+{
+	public void serialize(RowMutationMessage rm, DataOutputStream dos) throws IOException
+	{
+		RowMutation.serializer().serialize(rm.getRowMutation(), dos);
+	}
+	
+    public RowMutationMessage deserialize(DataInputStream dis) throws IOException
+    {
+    	RowMutation rm = RowMutation.serializer().deserialize(dis);
+    	return new RowMutationMessage(rm);
+    }
+}
\ No newline at end of file
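
A hedged round-trip sketch of the compact serializer protocol above, assuming same-package access to serializer(); the byte-stream setup mirrors makeRowMutationMessage(), and the table and key are made up:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    public class RowMutationMessageRoundTrip
    {
        public static void main(String[] args) throws IOException
        {
            RowMutationMessage msg = new RowMutationMessage(new RowMutation("Mailbox", "user-42"));

            // Serialize, as makeRowMutationMessage() does before wrapping the bytes in a Message.
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            RowMutationMessage.serializer().serialize(msg, new DataOutputStream(bos));

            // Deserialize on the receiving side, as RowMutationVerbHandler does.
            DataInputStream dis = new DataInputStream(new ByteArrayInputStream(bos.toByteArray()));
            RowMutationMessage decoded = RowMutationMessage.serializer().deserialize(dis);
            System.out.println("round-trip key: " + decoded.getRowMutation().key());
        }
    }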

Added: incubator/cassandra/trunk/src/org/apache/cassandra/db/RowMutationVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/RowMutationVerbHandler.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/RowMutationVerbHandler.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/RowMutationVerbHandler.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.io.*;
+
+import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+import org.apache.cassandra.service.*;
+import org.apache.cassandra.utils.*;
+import org.apache.cassandra.concurrent.*;
+import org.apache.cassandra.net.*;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class RowMutationVerbHandler implements IVerbHandler
+{
+    protected static class RowMutationContext
+    {
+        protected Row row_ = new Row();
+        protected DataInputBuffer buffer_ = new DataInputBuffer();
+    }
+    
+    private static Logger logger_ = Logger.getLogger(RowMutationVerbHandler.class);     
+    /* Thread-local so that each thread can reuse the same row mutation context across mutations. */
+    private static ThreadLocal<RowMutationContext> tls_ = new InheritableThreadLocal<RowMutationContext>();
+    
+    public void doVerb(Message message)
+    {
+        /* For DEBUG only. Printing queue length */                         
+        logger_.info( "ROW MUTATION STAGE: " + StageManager.getStageTaskCount(StorageService.mutationStage_) );
+        /* END DEBUG */
+            
+        byte[] bytes = (byte[])message.getMessageBody()[0];
+        /* Obtain a Row Mutation Context from TLS */
+        RowMutationContext rowMutationCtx = tls_.get();
+        if ( rowMutationCtx == null )
+        {
+            rowMutationCtx = new RowMutationContext();
+            tls_.set(rowMutationCtx);
+        }
+                
+        rowMutationCtx.buffer_.reset(bytes, bytes.length);        
+        
+        try
+        {
+            RowMutationMessage rmMsg = RowMutationMessage.serializer().deserialize(rowMutationCtx.buffer_);
+            RowMutation rm = rmMsg.getRowMutation();
+            /* Check if there were any hints in this message */
+            byte[] hintedBytes = message.getHeader(RowMutationMessage.hint_);            
+            if ( hintedBytes != null && hintedBytes.length > 0 )
+            {
+            	EndPoint hint = EndPoint.fromBytes(hintedBytes);
+                /* add necessary hints to this mutation */
+                try
+                {
+                	RowMutation hintedMutation = new RowMutation(rm.table(), HintedHandOffManager.key_);
+                	hintedMutation.addHints(rm.key() + ":" + hint.getHost());
+                	hintedMutation.apply();
+                }
+                catch ( ColumnFamilyNotDefinedException ex )
+                {
+                    logger_.debug(LogUtil.throwableToString(ex));
+                }
+            }
+            
+            long start = System.currentTimeMillis(); 
+            
+            rowMutationCtx.row_.key(rm.key());
+            rm.apply(rowMutationCtx.row_);
+            
+            long end = System.currentTimeMillis();                       
+            logger_.info("ROW MUTATION APPLY: " + (end - start) + " ms.");
+            
+            /*WriteResponseMessage writeResponseMessage = new WriteResponseMessage(rm.table(), rm.key(), true);
+            Message response = message.getReply( StorageService.getLocalStorageEndPoint(), new Object[]{writeResponseMessage} );
+            logger_.debug("Sending teh response to " +  message.getFrom() + " for key :" + rm.key());
+            MessagingService.getMessagingInstance().sendOneWay(response, message.getFrom());  */                    
+        }         
+        catch( ColumnFamilyNotDefinedException ex )
+        {
+            logger_.debug(LogUtil.throwableToString(ex));
+        }        
+        catch ( IOException e )
+        {
+            logger_.debug(LogUtil.throwableToString(e));            
+        }        
+    }
+}
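
The thread-local context in doVerb() is a reuse idiom: allocate one mutable scratch object per thread and reset it per message instead of reallocating. A generic, self-contained sketch of the same pattern, with hypothetical names:

    // Mirrors the tls_ / RowMutationContext pattern above; all names here are hypothetical.
    class ReusableContext
    {
        final byte[] scratch = new byte[4096];
    }

    class ContextPerThread
    {
        private static final ThreadLocal<ReusableContext> tls = new ThreadLocal<ReusableContext>();

        static ReusableContext get()
        {
            ReusableContext ctx = tls.get();
            if (ctx == null)
            {
                ctx = new ReusableContext(); // allocated once per thread
                tls.set(ctx);
            }
            return ctx;                      // caller resets the scratch state per message
        }
    }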

Added: incubator/cassandra/trunk/src/org/apache/cassandra/db/Scanner.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/Scanner.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/Scanner.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/Scanner.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.util.*;
+import java.io.IOException;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+
+
+/**
+ * This class is used to loop through a retrieved column family
+ * to get all columns in Iterator style. Usage is as follows:
+ * Scanner scanner = new Scanner("table");
+ * scanner.fetch(key, "column-family");
+ * 
+ * while ( scanner.hasNext() )
+ * {
+ *     IColumn column = scanner.next();
+ *     // Do something with the column
+ * }
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class Scanner implements IScanner<IColumn>
+{
+    /* Table over which we are scanning. */
+    private String table_; 
+    /* Iterator when iterating over the columns of a given key in a column family */
+    private Iterator<IColumn> columnIt_;
+        
+    public Scanner(String table)
+    {
+        table_ = table;
+    }
+    
+    /**
+     * Fetch the columns associated with this key for the specified column family.
+     * This method basically sets up an iterator internally and then provides an 
+     * iterator like interface to iterate over the columns.
+     * @param key key we are interested in.
+     * @param cf column family we are interested in.
+     * @throws IOException
+     * @throws ColumnFamilyNotDefinedException
+     */
+    public void fetch(String key, String cf) throws IOException, ColumnFamilyNotDefinedException
+    {        
+        if ( cf != null )
+        {
+            Table table = Table.open(table_);
+            ColumnFamily columnFamily = table.get(key, cf);
+            if ( columnFamily != null )
+            {
+                Collection<IColumn> columns = columnFamily.getAllColumns();            
+                columnIt_ = columns.iterator();
+            }
+        }
+    }        
+    
+    public boolean hasNext() throws IOException
+    {
+        /* Guard against the case where fetch() found no column family. */
+        return columnIt_ != null && columnIt_.hasNext();
+    }
+    
+    public IColumn next()
+    {
+        return columnIt_.next();
+    }
+    
+    public void close() throws IOException
+    {
+        throw new UnsupportedOperationException("This operation is not supported in the Scanner");
+    }
+}
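
A compilable version of the loop in the class comment, assuming a table and column family that exist and same-package access to ColumnFamilyNotDefinedException; the names are illustrative:

    import java.io.IOException;

    public class ScannerExample
    {
        public static void main(String[] args) throws IOException, ColumnFamilyNotDefinedException
        {
            Scanner scanner = new Scanner("Mailbox");   // illustrative table name
            scanner.fetch("user-42", "MessageIndex");   // illustrative key and column family

            while (scanner.hasNext())
            {
                IColumn column = scanner.next();
                System.out.println(column.name());
            }
        }
    }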

Added: incubator/cassandra/trunk/src/org/apache/cassandra/db/SuperColumn.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/SuperColumn.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/SuperColumn.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/SuperColumn.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,492 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.io.UnsupportedEncodingException;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.HashingSchemes;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public final class SuperColumn implements IColumn, Serializable
+{
+	private static Logger logger_ = Logger.getLogger(SuperColumn.class);
+	private static ICompactSerializer2<IColumn> serializer_;
+	private final static String seperator_ = ":";
+
+    static
+    {
+        serializer_ = new SuperColumnSerializer();
+    }
+
+    static ICompactSerializer2<IColumn> serializer()
+    {
+        return serializer_;
+    }
+
+	private String name_;
+    private EfficientBidiMap columns_ = new EfficientBidiMap(ColumnComparatorFactory.getComparator(ColumnComparatorFactory.ComparatorType.TIMESTAMP));
+	private AtomicBoolean isMarkedForDelete_ = new AtomicBoolean(false);
+    private AtomicInteger size_ = new AtomicInteger(0);
+
+    SuperColumn()
+    {
+    }
+
+    SuperColumn(String name)
+    {
+    	name_ = name;
+    }
+
+	public boolean isMarkedForDelete()
+	{
+		return isMarkedForDelete_.get();
+	}
+
+    public String name()
+    {
+    	return name_;
+    }
+
+    public Collection<IColumn> getSubColumns()
+    {
+    	return columns_.getSortedColumns();
+    }
+
+    public IColumn getSubColumn( String columnName )
+    {
+    	IColumn column = columns_.get(columnName);
+    	if ( column instanceof SuperColumn )
+    		throw new UnsupportedOperationException("A super column cannot hold other super columns.");
+		return column;
+    }
+
+    public int compareTo(IColumn superColumn)
+    {
+        return (name_.compareTo(superColumn.name()));
+    }
+
+
+    public int size()
+    {
+        /*
+         * return the size of the individual columns
+         * that make up the super column. This is an
+         * APPROXIMATION of the size, used only by the
+         * Memtable.
+        */
+        return size_.get();
+    }
+
+    /**
+     * This returns the size of the super-column when serialized.
+     * @see org.apache.cassandra.db.IColumn#serializedSize()
+    */
+    public int serializedSize()
+    {
+        /*
+         * Size of a super-column is =
+         *   size of a name (UtfPrefix + length of the string)
+         * + 1 byte to indicate if the super-column has been deleted
+         * + 4 bytes for size of the sub-columns
+         * + 4 bytes for the number of sub-columns
+         * + size of all the sub-columns.
+        */
+
+    	/*
+    	 * We store the string as UTF-8 encoded, so when we calculate the length, it
+    	 * should be converted to UTF-8.
+    	 */
+    	/*
+    	 * We need to keep the way we are calculating the column size in sync with the
+    	 * way we are calculating the size for the column family serializer.
+    	 */
+    	return IColumn.UtfPrefix_ + FBUtilities.getUTF8Length(name_) + DBConstants.boolSize_ + DBConstants.intSize_ + DBConstants.intSize_ + getSizeOfAllColumns();
+    }
+
+    /**
+     * This calculates the exact size of the sub columns on the fly
+     */
+    int getSizeOfAllColumns()
+    {
+        int size = 0;
+        Collection<IColumn> subColumns = getSubColumns();
+        for ( IColumn subColumn : subColumns )
+        {
+            size += subColumn.serializedSize();
+        }
+        return size;
+    }
+
+    protected void remove(String columnName)
+    {
+    	columns_.remove(columnName);
+    }
+
+    public long timestamp()
+    {
+    	throw new UnsupportedOperationException("This operation is not supported for Super Columns.");
+    }
+
+    public long timestamp(String key)
+    {
+    	IColumn column = columns_.get(key);
+    	if ( column instanceof SuperColumn )
+    		throw new UnsupportedOperationException("A super column cannot hold other super columns.");
+    	if ( column != null )
+    		return column.timestamp();
+    	throw new IllegalArgumentException("Timestamp was requested for a column that does not exist.");
+    }
+
+    public byte[] value()
+    {
+    	throw new UnsupportedOperationException("This operation is not supported for Super Columns.");
+    }
+
+    public byte[] value(String key)
+    {
+    	IColumn column = columns_.get(key);
+    	if ( column instanceof SuperColumn )
+    		throw new UnsupportedOperationException("A super column cannot hold other super columns.");
+    	if ( column != null )
+    		return column.value();
+    	throw new IllegalArgumentException("Value was requested for a column that does not exist.");
+    }
+
+    public void addColumn(String name, IColumn column)
+    {
+    	if ( column instanceof SuperColumn )
+    		throw new UnsupportedOperationException("A super column cannot hold other super columns.");
+    	IColumn oldColumn = columns_.get(name);
+    	if ( oldColumn == null )
+        {
+    		columns_.put(name, column);
+            size_.addAndGet(column.size());
+        }
+    	else
+    	{
+    		if ( oldColumn.timestamp() <= column.timestamp() )
+            {
+    			columns_.put(name, column);
+                int delta = (-1)*oldColumn.size();
+                /* subtract the size of the oldColumn */
+                size_.addAndGet(delta);
+                /* add the size of the new column */
+                size_.addAndGet(column.size());
+            }
+    	}
+    }
+
+    /*
+     * Go through each sub column: if it already exists, ask it to
+     * resolve itself; if it does not exist, create it.
+     */
+    public boolean putColumn(IColumn column)
+    {
+    	if ( !(column instanceof SuperColumn))
+    		throw new UnsupportedOperationException("Only Super column objects should be put here");
+    	if( !name_.equals(column.name()))
+    		throw new IllegalArgumentException("The name should match the name of the current column or super column");
+    	Collection<IColumn> columns = column.getSubColumns();
+
+        for ( IColumn subColumn : columns )
+        {
+       		addColumn(subColumn.name(), subColumn);
+        }
+        return false;
+    }
+
+    public int getObjectCount()
+    {
+    	return 1 + columns_.size();
+    }
+
+    public void delete()
+    {
+    	columns_.clear();
+    	isMarkedForDelete_.set(true);
+    }
+
+    int getColumnCount()
+    {
+    	return columns_.size();
+    }
+
+    public void repair(IColumn column)
+    {
+    	Collection<IColumn> columns = column.getSubColumns();
+
+        for ( IColumn subColumn : columns )
+        {
+        	IColumn columnInternal = columns_.get(subColumn.name());
+        	if( columnInternal == null )
+        		columns_.put(subColumn.name(), subColumn);
+        	else
+        		columnInternal.repair(subColumn);
+        }
+    }
+
+
+    public IColumn diff(IColumn column)
+    {
+    	IColumn  columnDiff = new SuperColumn(column.name());
+    	Collection<IColumn> columns = column.getSubColumns();
+
+        for ( IColumn subColumn : columns )
+        {
+        	IColumn columnInternal = columns_.get(subColumn.name());
+        	if(columnInternal == null )
+        	{
+        		columnDiff.addColumn(subColumn.name(), subColumn);
+        	}
+        	else
+        	{
+            	IColumn subColumnDiff = columnInternal.diff(subColumn);
+        		if(subColumnDiff != null)
+        		{
+            		columnDiff.addColumn(subColumn.name(), subColumnDiff);
+        		}
+        	}
+        }
+        if(columnDiff.getSubColumns().size() != 0)
+        	return columnDiff;
+        else
+        	return null;
+    }
+
+    public byte[] digest()
+    {
+    	Set<IColumn> columns = columns_.getSortedColumns();
+    	byte[] xorHash = new byte[0];
+    	if(name_ == null)
+    		return xorHash;
+    	xorHash = name_.getBytes();
+    	for(IColumn column : columns)
+    	{
+			xorHash = FBUtilities.xor(xorHash, column.digest());
+    	}
+    	return xorHash;
+    }
+
+
+    public String toString()
+    {
+    	StringBuilder sb = new StringBuilder();
+    	sb.append(name_);
+    	sb.append(":");
+        sb.append(isMarkedForDelete());
+        sb.append(":");
+
+        Collection<IColumn> columns  = getSubColumns();
+        sb.append(columns.size());
+        sb.append(":");
+        sb.append(size());
+        sb.append(":");
+        for ( IColumn subColumn : columns )
+        {
+            sb.append(subColumn.toString());
+        }
+        sb.append(":");
+        return sb.toString();
+    }
+}
+
+class SuperColumnSerializer implements ICompactSerializer2<IColumn>
+{
+    public void serialize(IColumn column, DataOutputStream dos) throws IOException
+    {
+    	SuperColumn superColumn = (SuperColumn)column;
+        dos.writeUTF(superColumn.name());
+        dos.writeBoolean(superColumn.isMarkedForDelete());
+
+        Collection<IColumn> columns  = column.getSubColumns();
+        int size = columns.size();
+        dos.writeInt(size);
+
+        /*
+         * Add the total size of the columns. This is useful
+         * to skip over all the columns in this super column
+         * if we are not interested in this super column.
+        */
+        dos.writeInt(superColumn.getSizeOfAllColumns());
+        // dos.writeInt(superColumn.size());
+
+        for ( IColumn subColumn : columns )
+        {
+            Column.serializer().serialize(subColumn, dos);
+        }
+    }
+
+    /*
+     * Use this method to create a bare bones Super Column. This super column
+     * does not have any of the Column information.
+    */
+    private SuperColumn defreezeSuperColumn(DataInputStream dis) throws IOException
+    {
+        String name = dis.readUTF();
+        boolean delete = dis.readBoolean();
+        SuperColumn superColumn = new SuperColumn(name);
+        if ( delete )
+            superColumn.delete();
+        return superColumn;
+    }
+
+    public IColumn deserialize(DataInputStream dis) throws IOException
+    {
+        SuperColumn superColumn = defreezeSuperColumn(dis);
+        if ( !superColumn.isMarkedForDelete() )
+            fillSuperColumn(superColumn, dis);
+        return superColumn;
+    }
+
+    public void skip(DataInputStream dis) throws IOException
+    {
+        defreezeSuperColumn(dis);
+        /* read the number of columns stored */
+        dis.readInt();
+        /* read the size of all columns to skip */
+        int size = dis.readInt();
+        dis.skip(size);
+    }
+    
+    private void fillSuperColumn(IColumn superColumn, DataInputStream dis) throws IOException
+    {
+        if ( dis.available() == 0 )
+            return;
+        
+        /* read the number of columns */
+        int size = dis.readInt();
+        /* read the size of all columns */
+        dis.readInt();
+        for ( int i = 0; i < size; ++i )
+        {
+            IColumn subColumn = Column.serializer().deserialize(dis);
+            superColumn.addColumn(subColumn.name(), subColumn);
+        }
+    }
+
+    public IColumn deserialize(DataInputStream dis, IFilter filter) throws IOException
+    {
+        if ( dis.available() == 0 )
+            return null;
+        
+        IColumn superColumn = defreezeSuperColumn(dis);
+        superColumn = filter.filter(superColumn, dis);
+        if(superColumn != null)
+        {
+            if ( !superColumn.isMarkedForDelete() )
+                fillSuperColumn(superColumn, dis);
+            return superColumn;
+        }
+        else
+        {
+            /* read the number of columns stored */
+            dis.readInt();
+            /* read the size of all columns to skip */
+            int size = dis.readInt();
+            dis.skip(size);
+        	return null;
+        }
+    }
+
+    /*
+     * Deserialize a particular column since the name is in the form of
+     * superColumn:column.
+    */
+    public IColumn deserialize(DataInputStream dis, String name, IFilter filter) throws IOException
+    {
+        if ( dis.available() == 0 )
+            return null;
+        
+        String[] names = RowMutation.getColumnAndColumnFamily(name);
+        if ( names.length == 1 )
+        {
+            IColumn superColumn = defreezeSuperColumn(dis);
+            if(name.equals(superColumn.name()))
+            {
+                if ( !superColumn.isMarkedForDelete() )
+                {
+                    /* read the number of columns stored */
+                    int size = dis.readInt();
+                    /* read the size of all columns */
+                    dis.readInt();     
+                	IColumn column = null;
+                    for ( int i = 0; i < size; ++i )
+                    {
+                    	column = Column.serializer().deserialize(dis, filter);
+                    	if(column != null)
+                    	{
+                            superColumn.addColumn(column.name(), column);
+                    		column = null;
+                    		if(filter.isDone())
+                    		{
+                    			break;
+                    		}
+                    	}
+                    }
+                }
+                return superColumn;
+            }
+            else
+            {
+                /* read the number of columns stored */
+                dis.readInt();
+                /* read the size of all columns to skip */
+                int size = dis.readInt();
+                dis.skip(size);
+            	return null;
+            }
+        }
+
+        SuperColumn superColumn = defreezeSuperColumn(dis);
+        if ( !superColumn.isMarkedForDelete() )
+        {
+            int size = dis.readInt();
+            /* skip the size of the columns */
+            dis.readInt();
+            if ( size > 0 )
+            {
+                for ( int i = 0; i < size; ++i )
+                {
+                    IColumn subColumn = Column.serializer().deserialize(dis, names[1], filter);
+                    if ( subColumn != null )
+                    {
+                        superColumn.addColumn(subColumn.name(), subColumn);
+                        break;
+                    }
+                }
+            }
+        }
+        return superColumn;
+    }
+}
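
The layout spelled out in serializedSize() (UTF-prefixed name, delete flag, sub-column count, total sub-column bytes, then the sub-columns) is what makes skip() cheap. A sketch that reads just the fixed header under those same layout assumptions:

    import java.io.DataInputStream;
    import java.io.IOException;

    // Reads only the header of a serialized super column, per the layout in
    // SuperColumnSerializer.serialize(): writeUTF(name), writeBoolean(deleted),
    // writeInt(subColumnCount), writeInt(subColumnBytes), then the sub-columns.
    final class SuperColumnHeader
    {
        final String name;
        final boolean deleted;
        final int subColumnCount;
        final int subColumnBytes;

        SuperColumnHeader(DataInputStream dis) throws IOException
        {
            name = dis.readUTF();
            deleted = dis.readBoolean();
            subColumnCount = dis.readInt();
            subColumnBytes = dis.readInt();
            // A reader not interested in this super column can now dis.skip(subColumnBytes),
            // which is exactly what SuperColumnSerializer.skip() does.
        }
    }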

Added: incubator/cassandra/trunk/src/org/apache/cassandra/db/SystemTable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/SystemTable.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/SystemTable.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/SystemTable.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.locks.Lock;
+
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.io.DataOutputBuffer;
+import org.apache.cassandra.io.IFileReader;
+import org.apache.cassandra.io.IFileWriter;
+import org.apache.cassandra.io.SequenceFile;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+import org.apache.cassandra.io.*;
+import org.apache.cassandra.utils.*;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class SystemTable
+{
+    private static Logger logger_ = Logger.getLogger(SystemTable.class);
+    private static Map<String, SystemTable> instances_ = new HashMap<String, SystemTable>();
+
+    /* Name of the SystemTable */
+    public static final String name_ = "System";
+    /* Name of the only column family in the Table */
+    static final String cfName_ = "LocationInfo";
+    /* Name of columns in this table */
+    static final String generation_ = "Generation";
+    static final String token_ = "Token";
+
+    /* The ID associated with this column family */
+    static final int cfId_ = -1;
+
+    /* Table name. */
+    private String table_;
+    /* after the header position */
+    private long startPosition_ = 0L;
+    /* Cache the SystemRow that we read. */
+    private Row systemRow_;
+
+    /* Use the following writer/reader to write/read to System table */
+    private IFileWriter writer_;
+    private IFileReader reader_;
+
+    public static SystemTable openSystemTable(String tableName) throws IOException
+    {
+        SystemTable table = instances_.get(tableName);
+        if ( table == null )
+        {
+            table = new SystemTable(tableName);
+            instances_.put(tableName, table);
+        }
+        return table;
+    }
+
+    SystemTable(String table) throws IOException
+    {
+        table_ = table;
+        String systemTable = getFileName();
+        writer_ = SequenceFile.writer(systemTable);
+        reader_ = SequenceFile.reader(systemTable);
+    }
+
+    private String getFileName()
+    {
+        return DatabaseDescriptor.getMetadataDirectory() + System.getProperty("file.separator") + table_ + ".db";
+    }
+
+    /*
+     * Selects the row associated with the given key.
+    */
+    public Row get(String key) throws IOException
+    {
+        String file = getFileName();
+        DataOutputBuffer bufOut = new DataOutputBuffer();
+        reader_.next(bufOut);
+
+        if ( bufOut.getLength() > 0 )
+        {
+            DataInputBuffer bufIn = new DataInputBuffer();
+            bufIn.reset(bufOut.getData(), bufOut.getLength());
+            /*
+             * This buffer contains key and value so we need to strip
+             * certain parts
+            */
+            // read the key
+            bufIn.readUTF();
+            // read the data length and then deserialize
+            bufIn.readInt();
+            try
+            {
+                systemRow_ = Row.serializer().deserialize(bufIn);
+            }
+            catch ( IOException e )
+            {
+                logger_.debug( LogUtil.throwableToString(e) );
+            }
+        }
+        return systemRow_;
+    }
+
+    /*
+     * This is a one time thing and hence we do not need
+     * any commit log related activity. Just write in an
+     * atomic fashion to the underlying SequenceFile.
+    */
+    void apply(Row row) throws IOException
+    {
+        systemRow_ = row;
+        String file = getFileName();
+        long currentPos = writer_.getCurrentPosition();
+        DataOutputBuffer bufOut = new DataOutputBuffer();
+        Row.serializer().serialize(row, bufOut);
+        try
+        {
+            writer_.append(row.key(), bufOut);
+        }
+        catch ( IOException e )
+        {
+            writer_.seek(currentPos);
+            throw e;
+        }
+    }
+
+    /*
+     * This method is used to update the SystemTable with the
+     * new token.
+    */
+    public void updateToken(BigInteger token) throws IOException
+    {
+        if ( systemRow_ != null )
+        {
+            Map<String, ColumnFamily> columnFamilies = systemRow_.getColumnFamilies();
+            /* Retrieve the "LocationInfo" column family */
+            ColumnFamily columnFamily = columnFamilies.get(SystemTable.cfName_);
+            long oldTokenColumnTimestamp = columnFamily.getColumn(SystemTable.token_).timestamp();
+            /* create the "Token" whose value is the new token. */
+            IColumn tokenColumn = new Column(SystemTable.token_, token.toByteArray(), oldTokenColumnTimestamp + 1);
+            /* replace the old "Token" column with this new one. */
+            logger_.debug("Replacing old token " + new BigInteger( columnFamily.getColumn(SystemTable.token_).value() ).toString() + " with token " + token.toString());
+            columnFamily.addColumn(SystemTable.token_, tokenColumn);
+            reset(systemRow_);
+        }
+    }
+
+    public void reset(Row row) throws IOException
+    {
+        writer_.seek(startPosition_);
+        apply(row);
+    }
+
+    void delete(Row row) throws IOException
+    {
+        throw new UnsupportedOperationException("This operation is not supported for System tables");
+    }
+
+    public static void main(String[] args) throws Throwable
+    {
+        LogUtil.init();
+        StorageService.instance().start();
+        SystemTable.openSystemTable(SystemTable.cfName_).updateToken( StorageService.hash("503545744:0") );
+        System.out.println("Done");
+
+        /*
+        BigInteger hash = StorageService.hash("304700067:0");
+        List<Range> ranges = new ArrayList<Range>();
+        ranges.add( new Range(new BigInteger("1218069462158869448693347920504606362273788442553"), new BigInteger("1092770595533781724218060956188429069")) );
+        if ( Range.isKeyInRanges(ranges, "304700067:0") )
+        {
+            System.out.println("Done");
+        }
+        */
+    }
+}
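
updateToken() writes the replacement Token column with the old timestamp plus one so that it wins timestamp-based reconciliation (the same rule SuperColumn.addColumn() applies). A minimal illustration with hypothetical values, assuming same-package access:

    import java.math.BigInteger;

    public class TokenBumpExample
    {
        public static void main(String[] args)
        {
            long oldTimestamp = 100L;                      // hypothetical existing Token timestamp
            BigInteger newToken = new BigInteger("12345"); // hypothetical new token

            // Same construction as updateToken(): bump the timestamp so the new value wins.
            IColumn tokenColumn = new Column(SystemTable.token_, newToken.toByteArray(), oldTimestamp + 1);
            System.out.println("replacement wins: " + (tokenColumn.timestamp() > oldTimestamp));
        }
    }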

Added: incubator/cassandra/trunk/src/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/Table.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/Table.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/Table.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,967 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.util.*;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.File;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.cassandra.analytics.DBAnalyticsSource;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.continuations.Suspendable;
+import org.apache.cassandra.dht.BootstrapInitiateMessage;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.io.DataOutputBuffer;
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.io.IFileReader;
+import org.apache.cassandra.io.IFileWriter;
+import org.apache.cassandra.io.SSTable;
+import org.apache.cassandra.io.SequenceFile;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.io.IStreamComplete;
+import org.apache.cassandra.net.io.StreamContextManager;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.BasicUtilities;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.FileUtils;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+import org.apache.cassandra.io.*;
+import org.apache.cassandra.utils.*;
+import org.apache.cassandra.service.*;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+*/
+
+public class Table
+{
+    /*
+     * This class represents the metadata of this Table. The metadata
+     * is basically the column family name and the ID associated with
+     * this column family. We use this ID in the Commit Log header to
+     * determine when a log file that has been rolled can be deleted.
+    */    
+    public static class TableMetadata
+    {
+        /* Name of the column family */
+        public final static String cfName_ = "TableMetadata";
+        /* 
+         * Name of one of the columns. The other columns are the individual
+         * column families in the system. 
+        */
+        public static final String cardinality_ = "PrimaryCardinality";
+        private static ICompactSerializer<TableMetadata> serializer_;
+        static
+        {
+            serializer_ = new TableMetadataSerializer();
+        }
+        
+        private static TableMetadata tableMetadata_;
+        /* Use the following writer/reader to write/read to Metadata table */
+        private static IFileWriter writer_;
+        private static IFileReader reader_;
+        
+        public static Table.TableMetadata instance() throws IOException
+        {
+            if ( tableMetadata_ == null )
+            {
+                String file = getFileName();
+                writer_ = SequenceFile.writer(file);        
+                reader_ = SequenceFile.reader(file);
+                Table.TableMetadata.load();
+                if ( tableMetadata_ == null )
+                    tableMetadata_ = new Table.TableMetadata();
+            }
+            return tableMetadata_;
+        }
+
+        static ICompactSerializer<TableMetadata> serializer()
+        {
+            return serializer_;
+        }
+        
+        private static void load() throws IOException
+        {            
+            String file = Table.TableMetadata.getFileName();
+            File f = new File(file);
+            if ( f.exists() )
+            {
+                DataOutputBuffer bufOut = new DataOutputBuffer();
+                DataInputBuffer bufIn = new DataInputBuffer();
+                
+                if ( reader_ == null )
+                {
+                    reader_ = SequenceFile.reader(file);
+                }
+                
+                while ( !reader_.isEOF() )
+                {
+                    /* Read the metadata info. */
+                    reader_.next(bufOut);
+                    bufIn.reset(bufOut.getData(), bufOut.getLength());
+
+                    /* The key is the table name */
+                    String key = bufIn.readUTF();
+                    /* read the size of the data we ignore this value */
+                    bufIn.readInt();
+                    tableMetadata_ = Table.TableMetadata.serializer().deserialize(bufIn);
+                    break;
+                }        
+            }            
+        }
+        
+        /* The mapping between column family and the column type. */
+        private Map<String, String> cfTypeMap_ = new HashMap<String, String>();
+        private Map<String, Integer> cfIdMap_ = new HashMap<String, Integer>();
+        private Map<Integer, String> idCfMap_ = new HashMap<Integer, String>();        
+        
+        private static String getFileName()
+        {
+            String table = DatabaseDescriptor.getTables().get(0);
+            return DatabaseDescriptor.getMetadataDirectory() + System.getProperty("file.separator") + table + "-Metadata.db";
+        }
+
+        public void add(String cf, int id)
+        {
+            add(cf, id, "Standard");
+        }
+        
+        public void add(String cf, int id, String type)
+        {
+            cfIdMap_.put(cf, id);
+            idCfMap_.put(id, cf);
+            cfTypeMap_.put(cf, type);
+        }
+        
+        boolean isEmpty()
+        {
+            return cfIdMap_.isEmpty();
+        }
+
+        int getColumnFamilyId(String columnFamily)
+        {
+            return cfIdMap_.get(columnFamily);
+        }
+
+        String getColumnFamilyName(int id)
+        {
+            return idCfMap_.get(id);
+        }
+        
+        String getColumnFamilyType(String cfName)
+        {
+            return cfTypeMap_.get(cfName);
+        }
+
+        void setColumnFamilyType(String cfName, String type)
+        {
+            cfTypeMap_.put(cfName, type);
+        }
+
+        Set<String> getColumnFamilies()
+        {
+            return cfIdMap_.keySet();
+        }
+        
+        int size()
+        {
+            return cfIdMap_.size();
+        }
+        
+        boolean isValidColumnFamily(String cfName)
+        {
+            return cfIdMap_.containsKey(cfName);
+        }
+        
+        void apply() throws IOException
+        {
+            String table = DatabaseDescriptor.getTables().get(0);
+            DataOutputBuffer bufOut = new DataOutputBuffer();
+            Table.TableMetadata.serializer_.serialize(this, bufOut);
+            try
+            {
+                writer_.append(table, bufOut);
+            }
+            catch ( IOException ex )
+            {
+                writer_.seek(0L);
+                logger_.debug(LogUtil.throwableToString(ex));
+            }
+        }
+        
+        public void reset() throws IOException
+        {        
+            writer_.seek(0L);
+            apply();
+        }
+        
+        public String toString()
+        {
+            StringBuilder sb = new StringBuilder();
+            Set<String> cfNames = cfIdMap_.keySet();
+            
+            for ( String cfName : cfNames )
+            {
+                sb.append(cfName);
+                sb.append("---->");
+                sb.append(cfIdMap_.get(cfName));
+                sb.append(System.getProperty("line.separator"));
+            }
+            
+            return sb.toString();
+        }
+    }
+
+    static class TableMetadataSerializer implements ICompactSerializer<TableMetadata>
+    {
+        public void serialize(TableMetadata tmetadata, DataOutputStream dos) throws IOException
+        {
+            int size = tmetadata.cfIdMap_.size();
+            dos.writeInt(size);
+            Set<String> cfNames = tmetadata.cfIdMap_.keySet();
+
+            for ( String cfName : cfNames )
+            {
+                dos.writeUTF(cfName);
+                dos.writeInt( tmetadata.cfIdMap_.get(cfName).intValue() );
+                dos.writeUTF(tmetadata.getColumnFamilyType(cfName));
+            }            
+        }
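+
+        /*
+         * A sketch of the resulting record layout, assuming one column
+         * family "Standard1" with id 0 (hypothetical values):
+         *
+         *   writeInt(1)            // number of entries
+         *   writeUTF("Standard1")  // column family name
+         *   writeInt(0)            // column family id
+         *   writeUTF("Standard")   // column family type
+        */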
+
+        public TableMetadata deserialize(DataInputStream dis) throws IOException
+        {
+            TableMetadata tmetadata = new TableMetadata();
+            int size = dis.readInt();
+            for( int i = 0; i < size; ++i )
+            {
+                String cfName = dis.readUTF();
+                int id = dis.readInt();
+                String type = dis.readUTF();
+                tmetadata.add(cfName, id, type);
+            }            
+            return tmetadata;
+        }
+    }
+
+    /**
+     * This is the callback handler that is invoked when a single file
+     * has been completely streamed to us by a remote host during bootstrap.
+    */
+    public static class BootstrapCompletionHandler implements IStreamComplete
+    {                
+        public void onStreamCompletion(String host, StreamContextManager.StreamContext streamContext, StreamContextManager.StreamStatus streamStatus) throws IOException
+        {                        
+            /* Parse the stream context and add the file to the list of SSTables in the associated Column Family Store. */
+            if ( streamContext.getTargetFile().indexOf("-Data.db") != -1 )
+            {
+                File file = new File( streamContext.getTargetFile() );
+                String fileName = file.getName();
+                /*
+                 * If the file is a Data File we need to load the indices
+                 * associated with this file. We also need to cache the file
+                 * name in the SSTables list of the associated Column Family.
+                 * Also merge the CBF into the sampler.
+                */
+                SSTable ssTable = new SSTable(streamContext.getTargetFile() );
+                ssTable.close();
+                logger_.debug("Merging the counting bloom filter in the sampler ...");                
+                String[] pieces = FBUtilities.strip(fileName, "-");
+                Table.open(pieces[0]).getColumnFamilyStore(pieces[1]).addToList(streamContext.getTargetFile());
+            }
+            
+            EndPoint to = new EndPoint(host, DatabaseDescriptor.getStoragePort());
+            logger_.debug("Sending a bootstrap terminate message with " + streamStatus + " to " + to);
+            /* Send a StreamStatusMessage object which may require the source node to re-stream certain files. */
+            StreamContextManager.StreamStatusMessage streamStatusMessage = new StreamContextManager.StreamStatusMessage(streamStatus);
+            Message message = StreamContextManager.StreamStatusMessage.makeStreamStatusMessage(streamStatusMessage);
+            MessagingService.getMessagingInstance().sendOneWay(message, to);           
+        }
+    }
+
+    public static class BootStrapInitiateVerbHandler implements IVerbHandler
+    {
+        /*
+         * Here we handle the BootstrapInitiateMessage: we get the array
+         * of StreamContexts, determine the column families associated
+         * with the incoming files, and replace the file names with names
+         * obtained from the column family store on the receiving end.
+        */
+        public void doVerb(Message message)
+        {
+            byte[] body = (byte[])message.getMessageBody()[0];
+            DataInputBuffer bufIn = new DataInputBuffer();
+            bufIn.reset(body, body.length); 
+            
+            try
+            {
+                BootstrapInitiateMessage biMsg = BootstrapInitiateMessage.serializer().deserialize(bufIn);
+                StreamContextManager.StreamContext[] streamContexts = biMsg.getStreamContext();                
+                
+                Map<String, String> fileNames = getNewNames(streamContexts);
+                /*
+                 * For each stream context in the incoming message,
+                 * generate the new file name and store it in the
+                 * StreamContextManager.
+                */
+                for (StreamContextManager.StreamContext streamContext : streamContexts )
+                {                    
+                    StreamContextManager.StreamStatus streamStatus = new StreamContextManager.StreamStatus(streamContext.getTargetFile(), streamContext.getExpectedBytes() );
+                    File sourceFile = new File( streamContext.getTargetFile() );
+                    String[] pieces = FBUtilities.strip(sourceFile.getName(), "-");
+                    String newFileName = fileNames.get( pieces[1] + "-" + pieces[2] );
+                    
+                    String file = DatabaseDescriptor.getDataFileLocation() + System.getProperty("file.separator") + newFileName + "-Data.db";
+                    logger_.debug("Received Data from  : " + message.getFrom() + " " + streamContext.getTargetFile() + " " + file);
+                    streamContext.setTargetFile(file);
+                    addStreamContext(message.getFrom().getHost(), streamContext, streamStatus);                                            
+                }    
+                                             
+                StreamContextManager.registerStreamCompletionHandler(message.getFrom().getHost(), new Table.BootstrapCompletionHandler());
+                /* Send a bootstrap initiation done message to execute on default stage. */
+                logger_.debug("Sending a bootstrap initiate done message ...");                
+                Message doneMessage = new Message( StorageService.getLocalStorageEndPoint(), "", StorageService.bootStrapInitiateDoneVerbHandler_, new Object[]{new byte[0]} );
+                MessagingService.getMessagingInstance().sendOneWay(doneMessage, message.getFrom());
+            }
+            catch ( IOException ex )
+            {
+                logger_.info(LogUtil.throwableToString(ex));
+            }
+        }
+        
+        private Map<String, String> getNewNames(StreamContextManager.StreamContext[] streamContexts)
+        {
+            /*
+             * Mapping from each unique CF-i entry to a new file name. For
+             * example, for a file named <Table>-<CF>-<i>-Data.db there is a
+             * corresponding <Table>-<CF>-<i>-Index.db. We maintain a mapping
+             * from <CF>-<i> to a newly generated file name, as illustrated
+             * below.
+            */
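+            /*
+             * A worked example with hypothetical names: an incoming file
+             * Mailbox-Standard1-5-Data.db strips to the pieces
+             * ["Mailbox", "Standard1", "5"], so the key "Standard1-5" maps to
+             * a name from ColumnFamilyStore.getNextFileName(), shared by the
+             * renamed -Data.db and -Index.db files.
+            */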
+            Map<String, String> fileNames = new HashMap<String, String>();
+            /* Get the distinct entries from StreamContexts i.e have one entry per Data/Index file combination */
+            Set<String> distinctEntries = new HashSet<String>();
+            for ( StreamContextManager.StreamContext streamContext : streamContexts )
+            {
+                String[] pieces = FBUtilities.strip(streamContext.getTargetFile(), "-");
+                distinctEntries.add(pieces[1] + "-" + pieces[2]);
+            }
+            
+            /* Generate unique file names per entry */
+            Table table = Table.open( DatabaseDescriptor.getTables().get(0) );
+            Map<String, ColumnFamilyStore> columnFamilyStores = table.getColumnFamilyStores();
+            
+            for ( String distinctEntry : distinctEntries )
+            {
+                String[] pieces = FBUtilities.strip(distinctEntry, "-");
+                ColumnFamilyStore cfStore = columnFamilyStores.get(pieces[0]);
+                logger_.debug("Generating file name for " + distinctEntry + " ...");
+                fileNames.put(distinctEntry, cfStore.getNextFileName());
+            }
+            
+            return fileNames;
+        }
+        
+        private boolean isStreamContextForThisColumnFamily(StreamContextManager.StreamContext streamContext, String cf)
+        {
+            String[] pieces = FBUtilities.strip(streamContext.getTargetFile(), "-");
+            return pieces[1].equals(cf);
+        }
+        
+        private String getColumnFamilyFromFile(String file)
+        {
+            String[] pieces = FBUtilities.strip(file, "-");
+            return pieces[1];
+        }
+                
+        private void addStreamContext(String host, StreamContextManager.StreamContext streamContext, StreamContextManager.StreamStatus streamStatus)
+        {
+            logger_.debug("Adding stream context " + streamContext + " for " + host + " ...");
+            StreamContextManager.addStreamContext(host, streamContext, streamStatus);
+        }
+    }
+    
+    private static Logger logger_ = Logger.getLogger(Table.class);
+    public static final String newLine_ = System.getProperty("line.separator");
+    public static final String recycleBin_ = "RecycleColumnFamily";
+    public static final String hints_ = "HintsColumnFamily";
+    
+    /* Used to lock the factory for creation of Table instance */
+    private static Lock createLock_ = new ReentrantLock();
+    private static Map<String, Table> instances_ = new HashMap<String, Table>();
+    /* Table name. */
+    private String table_;
+    /* Handle to the Table Metadata */
+    private Table.TableMetadata tableMetadata_;
+    /* ColumnFamilyStore per column family */
+    private Map<String, ColumnFamilyStore> columnFamilyStores_ = new HashMap<String, ColumnFamilyStore>();
+    /* The AnalyticsSource instance which keeps track of statistics reported to Ganglia. */
+    private DBAnalyticsSource dbAnalyticsSource_;    
+    
+    public static Table open(String table)
+    {
+        Table tableInstance = instances_.get(table);
+        /*
+         * Instances are created lazily. Take the creation lock and re-check
+         * the map so that concurrent callers do not create duplicate Table
+         * instances for the same name.
+        */
+        if ( tableInstance == null )
+        {
+            Table.createLock_.lock();
+            try
+            {
+                /* Re-read under the lock; the check above was done without it. */
+                tableInstance = instances_.get(table);
+                if ( tableInstance == null )
+                {
+                    tableInstance = new Table(table);
+                    instances_.put(table, tableInstance);
+                }
+            }
+            finally
+            {
+                createLock_.unlock();
+            }
+        }
+        return tableInstance;
+    }
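+
+    /*
+     * Illustrative usage (a sketch; the table name and key are the
+     * hypothetical values also used in main() below):
+     *
+     *   Table table = Table.open("Mailbox"); // created on first access
+     *   Row row = table.get("35300190:1");   // reads every column family
+    */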
+        
+    public Set<String> getColumnFamilies()
+    {
+        return tableMetadata_.getColumnFamilies();
+    }
+
+    Map<String, ColumnFamilyStore> getColumnFamilyStores()
+    {
+        return columnFamilyStores_;
+    }
+
+    ColumnFamilyStore getColumnFamilyStore(String cfName)
+    {
+        return columnFamilyStores_.get(cfName);
+    }
+    
+    String getColumnFamilyType(String cfName)
+    {
+        String cfType = null;
+        if ( tableMetadata_ != null )
+          cfType = tableMetadata_.getColumnFamilyType(cfName);
+        return cfType;
+    }
+
+    public void setColumnFamilyType(String cfName, String type)
+    {
+        tableMetadata_.setColumnFamilyType(cfName, type);
+    }
+    
+    /*
+     * This method is called to obtain statistics about
+     * the table. It will return statistics about all
+     * the column families that make up this table. 
+    */
+    public String tableStats(String newLineSeparator, java.text.DecimalFormat df)
+    {
+        StringBuilder sb = new StringBuilder();
+        sb.append(table_).append(" statistics :");
+        sb.append(newLineSeparator);
+        int oldLength = sb.length();
+        
+        Set<String> cfNames = columnFamilyStores_.keySet();
+        for ( String cfName : cfNames )
+        {
+            ColumnFamilyStore cfStore = columnFamilyStores_.get(cfName);
+            sb.append(cfStore.cfStats(newLineSeparator, df));
+        }
+        int newLength = sb.length();
+        
+        /* Don't show anything if there is nothing to show. */
+        if ( newLength == oldLength )
+            return "";
+        
+        return sb.toString();
+    }
+
+    void onStart() throws IOException
+    {
+        /* Cache the callouts if any */
+        CalloutManager.instance().onStart();
+        Set<String> columnFamilies = tableMetadata_.getColumnFamilies();
+        for ( String columnFamily : columnFamilies )
+        {
+            ColumnFamilyStore cfStore = columnFamilyStores_.get( columnFamily );
+            if ( cfStore != null )
+                cfStore.onStart();
+        }        
+    }
+    
+    /** 
+     * Do a cleanup of keys that do not belong locally.
+     */
+    public void doGC()
+    {
+        Set<String> columnFamilies = tableMetadata_.getColumnFamilies();
+        for ( String columnFamily : columnFamilies )
+        {
+            ColumnFamilyStore cfStore = columnFamilyStores_.get( columnFamily );
+            if ( cfStore != null )
+                cfStore.forceCleanup();
+        }   
+    }
+    
+    
+    /*
+     * This method is used to ensure that all keys
+     * prior to the specified key, as determined by
+     * the SSTable index bucket it falls in, are in
+     * the buffer cache.
+    */
+    public void touch(String key, boolean fData) throws IOException
+    {
+        Set<String> columnFamilies = tableMetadata_.getColumnFamilies();
+        for ( String columnFamily : columnFamilies )
+        {
+            if ( DatabaseDescriptor.isApplicationColumnFamily(columnFamily) )
+            {
+                ColumnFamilyStore cfStore = columnFamilyStores_.get( columnFamily );
+                if ( cfStore != null )
+                    cfStore.touch(key, fData);
+            }
+        }
+    }
+    
+    /*
+     * Take a snapshot of the entire set of column families.
+    */
+    public void snapshot( String clientSuppliedName ) throws IOException
+    {
+    	String snapshotDirectory = DatabaseDescriptor.getSnapshotDirectory();
+    	File snapshotDir = new File(snapshotDirectory);
+    	if( !snapshotDir.exists() )
+    		snapshotDir.mkdir();
+
+    	String currentSnapshotDir = null;
+    	if( clientSuppliedName != null && !clientSuppliedName.equals("") )
+    		currentSnapshotDir = snapshotDirectory + System.getProperty("file.separator") + System.currentTimeMillis() + "-" + clientSuppliedName; 
+    	else
+    		currentSnapshotDir = snapshotDirectory + System.getProperty("file.separator") + System.currentTimeMillis();
+
+    	/* First take a snapshot of the commit logs */
+    	CommitLog.open(table_).snapshot( currentSnapshotDir );
+    	/* force roll over the commit log */
+    	CommitLog.open(table_).setForcedRollOver();
+    	/* Now take a snapshot of all columnfamily stores */
+        Set<String> columnFamilies = tableMetadata_.getColumnFamilies();
+        for ( String columnFamily : columnFamilies )
+        {
+            ColumnFamilyStore cfStore = columnFamilyStores_.get( columnFamily );
+            if ( cfStore != null )
+                cfStore.snapshot( currentSnapshotDir );
+        }
+    }
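+
+    /*
+     * For illustration only: with a snapshot directory of
+     * /var/cassandra/snapshot, a client-supplied name of "backup1" and a
+     * current time of 1235980800000 (all hypothetical values), the snapshot
+     * is written under /var/cassandra/snapshot/1235980800000-backup1.
+    */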
+
+    /*
+     * Clear the existing snapshots in the system
+     */
+    public void clearSnapshot()
+    {
+    	String snapshotDir = DatabaseDescriptor.getSnapshotDirectory();
+    	File snapshot = new File(snapshotDir);
+    	FileUtils.deleteDir(snapshot);
+    }
+    
+    /*
+     * This method is invoked only during a bootstrap process. We do a
+     * complete compaction, since we can figure out based on the ranges
+     * whether the files need to be split.
+    */
+    public boolean forceCompaction(List<Range> ranges, EndPoint target, List<String> fileList) throws IOException
+    {
+        boolean result = true;
+        Set<String> columnFamilies = tableMetadata_.getColumnFamilies();
+        for ( String columnFamily : columnFamilies )
+        {
+            if ( !isApplicationColumnFamily(columnFamily) )
+                continue;
+            
+            ColumnFamilyStore cfStore = columnFamilyStores_.get( columnFamily );
+            if ( cfStore != null )
+            {
+                /* Counting Bloom Filter for the Column Family */
+                cfStore.forceCompaction(ranges, target, 0, fileList);                
+            }
+        }
+        return result;
+    }
+    
+    /*
+     * This method is an ADMIN operation to force compaction
+     * of all SSTables on disk. 
+    */
+    public void forceCompaction() throws IOException
+    {
+        Set<String> columnFamilies = tableMetadata_.getColumnFamilies();
+        for ( String columnFamily : columnFamilies )
+        {
+            ColumnFamilyStore cfStore = columnFamilyStores_.get( columnFamily );
+            if ( cfStore != null )
+                MinorCompactionManager.instance().submitMajor(cfStore, null, 0);
+        }
+    }
+
+    /*
+     * Get the list of all SSTables on disk. 
+    */
+    public List<String> getAllSSTablesOnDisk()
+    {
+        List<String> list = new ArrayList<String>();
+        Set<String> columnFamilies = tableMetadata_.getColumnFamilies();
+        for ( String columnFamily : columnFamilies )
+        {
+            ColumnFamilyStore cfStore = columnFamilyStores_.get( columnFamily );
+            if ( cfStore != null )
+                list.addAll( cfStore.getAllSSTablesOnDisk() );
+        }
+        return list;
+    }
+
+    private Table(String table)
+    {
+        table_ = table;
+        dbAnalyticsSource_ = new DBAnalyticsSource();
+        try
+        {
+            tableMetadata_ = Table.TableMetadata.instance();
+            Set<String> columnFamilies = tableMetadata_.getColumnFamilies();
+            for ( String columnFamily : columnFamilies )
+            {
+                columnFamilyStores_.put( columnFamily, new ColumnFamilyStore(table, columnFamily) );
+            }
+        }
+        catch ( IOException ex )
+        {
+            logger_.info(LogUtil.throwableToString(ex));
+        }
+    }
+
+    String getTableName()
+    {
+        return table_;
+    }
+    
+    boolean isApplicationColumnFamily(String columnFamily)
+    {
+        return DatabaseDescriptor.isApplicationColumnFamily(columnFamily);
+    }
+
+    int getNumberOfColumnFamilies()
+    {
+        return tableMetadata_.size();
+    }
+
+    int getColumnFamilyId(String columnFamily)
+    {
+        return tableMetadata_.getColumnFamilyId(columnFamily);
+    }
+
+    String getColumnFamilyName(int id)
+    {
+        return tableMetadata_.getColumnFamilyName(id);
+    }
+
+    boolean isValidColumnFamily(String columnFamily)
+    {
+        return tableMetadata_.isValidColumnFamily(columnFamily);
+    }
+
+    /**
+     * Selects the row associated with the given key.
+    */
+    public Row get(String key) throws IOException
+    {        
+        Row row = new Row(key);
+        Set<String> columnFamilies = tableMetadata_.getColumnFamilies();
+        long start = System.currentTimeMillis();
+        for ( String columnFamily : columnFamilies )
+        {
+            ColumnFamilyStore cfStore = columnFamilyStores_.get(columnFamily);
+            if ( cfStore != null )
+            {                
+                ColumnFamily cf = cfStore.getColumnFamily(key, columnFamily, new IdentityFilter());                
+                if ( cf != null )
+                    row.addColumnFamily(cf);
+            }
+        }
+        
+        long timeTaken = System.currentTimeMillis() - start;
+        dbAnalyticsSource_.updateReadStatistics(timeTaken);
+        return row;
+    }
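+
+    /*
+     * Illustrative read sketch (hypothetical names): get("key1") assembles
+     * the row from every column family store, whereas get("key1", "Standard1")
+     * below consults only the "Standard1" store and throws
+     * ColumnFamilyNotDefinedException if that store does not exist.
+    */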
+    
+    public Row getRowFromMemory(String key)
+    {
+        Row row = new Row(key);
+        Set<String> columnFamilies = tableMetadata_.getColumnFamilies();
+        long start = System.currentTimeMillis();
+        for ( String columnFamily : columnFamilies )
+        {
+            ColumnFamilyStore cfStore = columnFamilyStores_.get(columnFamily);
+            if ( cfStore != null )
+            {    
+                ColumnFamily cf = cfStore.getColumnFamilyFromMemory(key, columnFamily, new IdentityFilter());
+                if ( cf != null )
+                    row.addColumnFamily(cf);
+            }
+        }
+        long timeTaken = System.currentTimeMillis() - start;
+        dbAnalyticsSource_.updateReadStatistics(timeTaken);
+        return row;
+    }
+
+
+    /**
+     * Selects the specified column family for the specified key.
+    */
+    public ColumnFamily get(String key, String cf) throws ColumnFamilyNotDefinedException, IOException
+    {
+        String[] values = RowMutation.getColumnAndColumnFamily(cf);
+        long start = System.currentTimeMillis();
+        ColumnFamilyStore cfStore = columnFamilyStores_.get(values[0]);
+        if ( cfStore != null )
+        {
+            ColumnFamily columnFamily = cfStore.getColumnFamily(key, cf, new IdentityFilter());
+            long timeTaken = System.currentTimeMillis() - start;
+            dbAnalyticsSource_.updateReadStatistics(timeTaken);
+            return columnFamily;
+        }
+        else
+        {
+            throw new ColumnFamilyNotDefinedException("Column family " + cf + " has not been defined");
+        }
+    }
+
+    /**
+     * Selects only the specified column family for the specified key.
+    */
+    public Row getRow(String key, String cf) throws ColumnFamilyNotDefinedException, IOException
+    {
+        Row row = new Row(key);
+        ColumnFamily columnFamily = get(key, cf);
+        if ( columnFamily != null )
+        	row.addColumnFamily(columnFamily);
+        return row;
+    }
+  
+    /**
+     * Selects only the specified column family for the specified key.
+    */
+    public Row getRow(String key, String cf, int start, int count) throws ColumnFamilyNotDefinedException, IOException
+    {
+        Row row = new Row(key);
+        String[] values = RowMutation.getColumnAndColumnFamily(cf);
+        ColumnFamilyStore cfStore = columnFamilyStores_.get(values[0]);
+        long start1 = System.currentTimeMillis();
+        if ( cfStore != null )
+        {
+            ColumnFamily columnFamily = cfStore.getColumnFamily(key, cf, new CountFilter(count));
+            if ( columnFamily != null )
+                row.addColumnFamily(columnFamily);
+            long timeTaken = System.currentTimeMillis() - start1;
+            dbAnalyticsSource_.updateReadStatistics(timeTaken);
+            return row;
+        }
+        else
+            throw new ColumnFamilyNotDefinedException("Column family " + cf + " has not been defined");
+    }
+    
+    public Row getRow(String key, String cf, long sinceTimeStamp) throws ColumnFamilyNotDefinedException, IOException
+    {
+        Row row = new Row(key);
+        String[] values = RowMutation.getColumnAndColumnFamily(cf);
+        ColumnFamilyStore cfStore = columnFamilyStores_.get(values[0]);
+        long start1 = System.currentTimeMillis();
+        if ( cfStore != null )
+        {
+            ColumnFamily columnFamily = cfStore.getColumnFamily(key, cf, new TimeFilter(sinceTimeStamp));
+            if ( columnFamily != null )
+                row.addColumnFamily(columnFamily);
+            long timeTaken = System.currentTimeMillis() - start1;
+            dbAnalyticsSource_.updateReadStatistics(timeTaken);
+            return row;
+        }
+        else
+            throw new ColumnFamilyNotDefinedException("Column family " + cf + " has not been defined");
+    }
+
+    /**
+     * This method returns the specified columns for the specified
+     * column family.
+     * 
+     * @param key key for which data is requested.
+     * @param cf column family we are interested in.
+     * @param columns columns that are part of the above column family.
+    */
+    public Row getRow(String key, String cf, List<String> columns) throws IOException
+    {
+    	Row row = new Row(key);
+        String[] values = RowMutation.getColumnAndColumnFamily(cf);
+        ColumnFamilyStore cfStore = columnFamilyStores_.get(values[0]);
+
+        if ( cfStore != null )
+        {
+        	ColumnFamily columnFamily = cfStore.getColumnFamily(key, cf, new NamesFilter(new ArrayList<String>(columns)));
+        	if ( columnFamily != null )
+        		row.addColumnFamily(columnFamily);
+        }
+    	return row;
+    }
+
+    /**
+     * This method adds the row to the Commit Log associated with this table.
+     * Once this happens the data associated with the individual column families
+     * is also written to the column family store's memtable.
+    */
+    public void apply(Row row) throws IOException
+    {        
+        String key = row.key();
+        /* Add row to the commit log. */
+        long start = System.currentTimeMillis();
+               
+        CommitLog.CommitLogContext cLogCtx = CommitLog.open(table_).add(row);
+        Map<String, ColumnFamily> columnFamilies = row.getColumnFamilies();
+        Set<String> cNames = columnFamilies.keySet();
+        for ( String cName : cNames )
+        {
+        	ColumnFamily columnFamily = columnFamilies.get(cName);
+            ColumnFamilyStore cfStore = columnFamilyStores_.get(columnFamily.name());
+            cfStore.apply( key, columnFamily, cLogCtx);            
+        }
+        row.clear();
+        long timeTaken = System.currentTimeMillis() - start;
+        dbAnalyticsSource_.updateWriteStatistics(timeTaken);
+    }
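+
+    /*
+     * A minimal write sketch, assuming a ColumnFamily instance cf obtained
+     * elsewhere (its construction is not shown in this class):
+     *
+     *   Row row = new Row("key1"); // "key1" is a hypothetical key
+     *   row.addColumnFamily(cf);
+     *   table.apply(row);          // commit log first, then the memtables
+    */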
+
+    void applyNow(Row row) throws IOException
+    {
+        String key = row.key();
+        Map<String, ColumnFamily> columnFamilies = row.getColumnFamilies();
+
+        Set<String> cNames = columnFamilies.keySet();
+        for ( String cName : cNames )
+        {
+            ColumnFamily columnFamily = columnFamilies.get(cName);
+            ColumnFamilyStore cfStore = columnFamilyStores_.get(columnFamily.name());
+            cfStore.applyNow( key, columnFamily );
+        }
+    }
+
+    public void flush(boolean fRecovery) throws IOException
+    {
+        Set<String> cfNames = columnFamilyStores_.keySet();
+        for ( String cfName : cfNames )
+        {
+            columnFamilyStores_.get(cfName).forceFlush(fRecovery);
+        }
+    }
+
+    void delete(Row row) throws IOException
+    {
+        String key = row.key();
+        Map<String, ColumnFamily> columnFamilies = row.getColumnFamilies();
+
+        /* Add row to commit log */
+        CommitLog.open(table_).add(row);
+        Set<String> cNames = columnFamilies.keySet();
+
+        for ( String cName : cNames )
+        {
+        	ColumnFamily columnFamily = columnFamilies.get(cName);
+            ColumnFamilyStore cfStore = columnFamilyStores_.get(columnFamily.name());
+            cfStore.delete( key, columnFamily );
+        }
+    }
+
+    void load(Row row) throws IOException
+    {
+        String key = row.key();
+        /* Add row to the commit log. */
+        long start = System.currentTimeMillis();
+                
+        Map<String, ColumnFamily> columnFamilies = row.getColumnFamilies();
+        Set<String> cNames = columnFamilies.keySet();
+        for ( String cName : cNames )
+        {
+        	if( cName.equals(Table.recycleBin_))
+        	{
+	        	ColumnFamily columnFamily = columnFamilies.get(cName);
+	        	Collection<IColumn> columns = columnFamily.getAllColumns();
+        		for(IColumn column : columns)
+        		{
+    	            ColumnFamilyStore cfStore = columnFamilyStores_.get(column.name());
+    	            /* The column timestamp encodes the operation to perform. */
+    	            if(column.timestamp() == 1)
+    	            {
+    	            	/* 1: force a binary memtable flush. */
+    	            	cfStore.forceFlushBinary();
+    	            }
+    	            else if(column.timestamp() == 2)
+    	            {
+    	            	/* 2: force a compaction; the column value decodes to the long passed through. */
+    	            	cfStore.forceCompaction(null, null, BasicUtilities.byteArrayToLong(column.value()), null);
+    	            }
+    	            else if(column.timestamp() == 3)
+    	            {
+    	            	/* 3: force a memtable flush. */
+    	            	cfStore.forceFlush(false);
+    	            }
+    	            else if(column.timestamp() == 4)
+    	            {
+    	            	/* 4: clean up keys that do not belong locally. */
+    	            	cfStore.forceCleanup();
+    	            }
+    	            else if(column.timestamp() == 5)
+    	            {
+    	            	/* 5: take a snapshot named by the column value. */
+    	            	cfStore.snapshot( new String(column.value()) );
+    	            }
+    	            else
+    	            {
+    	            	/* Any other timestamp carries binary data to apply directly. */
+    	            	cfStore.applyBinary(key, column.value());
+    	            }
+        		}
+        	}
+        }
+        row.clear();
+        long timeTaken = System.currentTimeMillis() - start;
+        dbAnalyticsSource_.updateWriteStatistics(timeTaken);
+    }
+
+    public static void main(String[] args) throws Throwable
+    {
+        StorageService service = StorageService.instance();
+        service.start();
+        Table table = Table.open("Mailbox");
+        Row row = table.get("35300190:1");
+        System.out.println( row.key() );
+    }
+}