You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2009/03/27 06:39:42 UTC

svn commit: r759026 [1/3] - /incubator/cassandra/trunk/src/org/apache/cassandra/db/

Author: alakshman
Date: Fri Mar 27 05:39:40 2009
New Revision: 759026

URL: http://svn.apache.org/viewvc?rev=759026&view=rev
Log:
This is a wierd revert to fix some issues. Some changes will need to be re-applied.

Added:
    incubator/cassandra/trunk/src/org/apache/cassandra/db/PrimaryKey.java
      - copied unchanged from r758964, incubator/cassandra/trunk/src/org/apache/cassandra/db/PrimaryKey.java
    incubator/cassandra/trunk/src/org/apache/cassandra/db/SequentialScanner.java
    incubator/cassandra/trunk/src/org/apache/cassandra/db/WriteResponseMessage.java
      - copied unchanged from r758964, incubator/cassandra/trunk/src/org/apache/cassandra/db/WriteResponseMessage.java
Modified:
    incubator/cassandra/trunk/src/org/apache/cassandra/db/AbstractColumnFactory.java
    incubator/cassandra/trunk/src/org/apache/cassandra/db/BinaryMemtable.java
    incubator/cassandra/trunk/src/org/apache/cassandra/db/Column.java
    incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnComparatorFactory.java
    incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamily.java
    incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java
    incubator/cassandra/trunk/src/org/apache/cassandra/db/CommitLog.java
    incubator/cassandra/trunk/src/org/apache/cassandra/db/CountFilter.java
    incubator/cassandra/trunk/src/org/apache/cassandra/db/DBManager.java
    incubator/cassandra/trunk/src/org/apache/cassandra/db/FileStruct.java
    incubator/cassandra/trunk/src/org/apache/cassandra/db/FileStructComparator.java
    incubator/cassandra/trunk/src/org/apache/cassandra/db/HintedHandOffManager.java
    incubator/cassandra/trunk/src/org/apache/cassandra/db/IColumn.java
    incubator/cassandra/trunk/src/org/apache/cassandra/db/Memtable.java
    incubator/cassandra/trunk/src/org/apache/cassandra/db/MemtableManager.java
    incubator/cassandra/trunk/src/org/apache/cassandra/db/MinorCompactionManager.java
    incubator/cassandra/trunk/src/org/apache/cassandra/db/NamesFilter.java
    incubator/cassandra/trunk/src/org/apache/cassandra/db/RecoveryManager.java
    incubator/cassandra/trunk/src/org/apache/cassandra/db/Row.java
    incubator/cassandra/trunk/src/org/apache/cassandra/db/RowMutation.java
    incubator/cassandra/trunk/src/org/apache/cassandra/db/RowMutationVerbHandler.java
    incubator/cassandra/trunk/src/org/apache/cassandra/db/SuperColumn.java
    incubator/cassandra/trunk/src/org/apache/cassandra/db/SystemTable.java
    incubator/cassandra/trunk/src/org/apache/cassandra/db/Table.java
    incubator/cassandra/trunk/src/org/apache/cassandra/db/TimeFilter.java

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/AbstractColumnFactory.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/AbstractColumnFactory.java?rev=759026&r1=759025&r2=759026&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/AbstractColumnFactory.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/AbstractColumnFactory.java Fri Mar 27 05:39:40 2009
@@ -31,13 +31,13 @@
 abstract class AbstractColumnFactory
 {
     private static Map<String, AbstractColumnFactory> columnFactory_ = new HashMap<String, AbstractColumnFactory>();
-
+	
 	static
 	{
 		columnFactory_.put(ColumnFamily.getColumnType("Standard"),new ColumnFactory());
 		columnFactory_.put(ColumnFamily.getColumnType("Super"),new SuperColumnFactory());
 	}
-
+	
 	static AbstractColumnFactory getColumnFactory(String columnType)
 	{
 		/* Create based on the type required. */
@@ -46,11 +46,10 @@
 		else
 			return columnFactory_.get("Super");
 	}
-
+    
 	public abstract IColumn createColumn(String name);
 	public abstract IColumn createColumn(String name, byte[] value);
-    public abstract IColumn createColumn(String name, byte[] value, long timestamp);
-    public abstract IColumn createColumn(String name, byte[] value, long timestamp, boolean deleted);
+	public abstract IColumn createColumn(String name, byte[] value, long timestamp);
     public abstract ICompactSerializer2<IColumn> createColumnSerializer();
 }
 
@@ -60,21 +59,17 @@
 	{
 		return new Column(name);
 	}
-
+	
 	public IColumn createColumn(String name, byte[] value)
 	{
 		return new Column(name, value);
 	}
-
+	
 	public IColumn createColumn(String name, byte[] value, long timestamp)
 	{
 		return new Column(name, value, timestamp);
 	}
-
-    public IColumn createColumn(String name, byte[] value, long timestamp, boolean deleted) {
-        return new Column(name, value, timestamp, deleted);
-    }
-
+    
     public ICompactSerializer2<IColumn> createColumnSerializer()
     {
         return Column.serializer();
@@ -108,28 +103,29 @@
         }
 		return superColumn;
 	}
-
+	
 	public IColumn createColumn(String name, byte[] value)
 	{
-        return createColumn(name, value, 0);
+		String[] values = SuperColumnFactory.getSuperColumnAndColumn(name);
+        if ( values.length != 2 )
+            throw new IllegalArgumentException("Super Column " + name + " in invalid format. Must be in <super column name>:<column name> format.");
+        IColumn superColumn = new SuperColumn(values[0]);
+        IColumn subColumn = new Column(values[1], value);
+        superColumn.addColumn(values[1], subColumn);
+		return superColumn;
 	}
-
-    public IColumn createColumn(String name, byte[] value, long timestamp)
-    {
-        return createColumn(name, value, timestamp, false);
-    }
-
-    public IColumn createColumn(String name, byte[] value, long timestamp, boolean deleted)
+	
+	public IColumn createColumn(String name, byte[] value, long timestamp)
 	{
 		String[] values = SuperColumnFactory.getSuperColumnAndColumn(name);
         if ( values.length != 2 )
             throw new IllegalArgumentException("Super Column " + name + " in invalid format. Must be in <super column name>:<column name> format.");
         IColumn superColumn = new SuperColumn(values[0]);
-        IColumn subColumn = new Column(values[1], value, timestamp, deleted);
+        IColumn subColumn = new Column(values[1], value, timestamp);
         superColumn.addColumn(values[1], subColumn);
 		return superColumn;
 	}
-
+    
     public ICompactSerializer2<IColumn> createColumnSerializer()
     {
         return SuperColumn.serializer();

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/BinaryMemtable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/BinaryMemtable.java?rev=759026&r1=759025&r2=759026&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/BinaryMemtable.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/BinaryMemtable.java Fri Mar 27 05:39:40 2009
@@ -29,9 +29,8 @@
 import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.utils.BloomFilter;
 import org.apache.cassandra.io.SSTable;
-
+import org.apache.cassandra.utils.BloomFilter;
 import org.apache.log4j.Logger;
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
 

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/Column.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/Column.java?rev=759026&r1=759025&r2=759026&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/Column.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/Column.java Fri Mar 27 05:39:40 2009
@@ -18,41 +18,60 @@
 
 package org.apache.cassandra.db;
 
+import java.io.DataInput;
 import java.io.DataInputStream;
+import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.Collection;
-
-import org.apache.commons.lang.ArrayUtils;
-
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.io.IFileReader;
+import org.apache.cassandra.io.IFileWriter;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.HashingSchemes;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
 
 
 /**
- * Column is immutable, which prevents all kinds of confusion in a multithreaded environment.
- * (TODO: look at making SuperColumn immutable too.  This is trickier but is probably doable
- *  with something like PCollections -- http://code.google.com
- *
- * Author : Avinash Lakshman ( alakshman@facebook.com ) & Prashant Malik ( pmalik@facebook.com )
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
  */
 
-public final class Column implements IColumn
+public final class Column implements IColumn, Serializable
 {
-    private static ColumnSerializer serializer_ = new ColumnSerializer();
+	private static Logger logger_ = Logger.getLogger(SuperColumn.class);
+    private static ICompactSerializer2<IColumn> serializer_;
+	private final static String seperator_ = ":";
+    static
+    {
+        serializer_ = new ColumnSerializer();
+    }
 
-    static ColumnSerializer serializer()
+    static ICompactSerializer2<IColumn> serializer()
     {
         return serializer_;
     }
 
-    private final String name;
-    private final byte[] value;
-    private final long timestamp;
-    private final boolean isMarkedForDelete;
+    private String name_;
+    private byte[] value_ = new byte[0];
+    private long timestamp_ = 0;
+
+    private transient AtomicBoolean isMarkedForDelete_;
+
+    /* CTOR for JAXB */
+    Column()
+    {
+    }
 
     Column(String name)
     {
-        this(name, ArrayUtils.EMPTY_BYTE_ARRAY);
+        name_ = name;
     }
 
     Column(String name, byte[] value)
@@ -62,71 +81,54 @@
 
     Column(String name, byte[] value, long timestamp)
     {
-        this(name, value, timestamp, false);
-    }
-
-    Column(String name, byte[] value, long timestamp, boolean isDeleted)
-    {
-        assert name != null;
-        assert value != null;
-        this.name = name;
-        this.value = value;
-        this.timestamp = timestamp;
-        isMarkedForDelete = isDeleted;
+        this(name);
+        value_ = value;
+        timestamp_ = timestamp;
     }
 
     public String name()
     {
-        return name;
-    }
-
-    public IColumn getSubColumn(String columnName)
-    {
-        throw new UnsupportedOperationException("This operation is unsupported on simple columns.");
+        return name_;
     }
 
     public byte[] value()
     {
-        return value;
+        return value_;
     }
 
     public byte[] value(String key)
     {
-        throw new UnsupportedOperationException("This operation is unsupported on simple columns.");
+    	throw new UnsupportedOperationException("This operation is unsupported on simple columns.");
     }
 
     public Collection<IColumn> getSubColumns()
     {
-        throw new UnsupportedOperationException("This operation is unsupported on simple columns.");
+    	throw new UnsupportedOperationException("This operation is unsupported on simple columns.");
+    }
+
+    public IColumn getSubColumn( String columnName )
+    {
+    	throw new UnsupportedOperationException("This operation is unsupported on simple columns.");
     }
 
     public int getObjectCount()
     {
-        return 1;
+    	return 1;
     }
 
     public long timestamp()
     {
-        return timestamp;
+        return timestamp_;
     }
 
     public long timestamp(String key)
     {
-        throw new UnsupportedOperationException("This operation is unsupported on simple columns.");
+    	throw new UnsupportedOperationException("This operation is unsupported on simple columns.");
     }
 
     public boolean isMarkedForDelete()
     {
-        return isMarkedForDelete;
-    }
-
-    public long getMarkedForDeleteAt()
-    {
-        if (!isMarkedForDelete())
-        {
-            throw new IllegalStateException("column is not marked for delete");
-        }
-        return timestamp;
+        return (isMarkedForDelete_ != null) ? isMarkedForDelete_.get() : false;
     }
 
     public int size()
@@ -140,11 +142,11 @@
          * + entire byte array.
         */
 
-        /*
-           * We store the string as UTF-8 encoded, so when we calculate the length, it
-           * should be converted to UTF-8.
-           */
-        return IColumn.UtfPrefix_ + FBUtilities.getUTF8Length(name) + DBConstants.boolSize_ + DBConstants.tsSize_ + DBConstants.intSize_ + value.length;
+    	/*
+    	 * We store the string as UTF-8 encoded, so when we calculate the length, it
+    	 * should be converted to UTF-8.
+    	 */
+        return IColumn.UtfPrefix_ + FBUtilities.getUTF8Length(name_) + DBConstants.boolSize_ + DBConstants.tsSize_ + DBConstants.intSize_ + value_.length;
     }
 
     /*
@@ -153,43 +155,112 @@
     */
     public int serializedSize()
     {
-        return size();
+    	return size();
     }
 
     public void addColumn(String name, IColumn column)
     {
-        throw new UnsupportedOperationException("This operation is not supported for simple columns.");
+    	throw new UnsupportedOperationException("This operation is not supported for simple columns.");
+    }
+
+    public void delete()
+    {
+        if ( isMarkedForDelete_ == null )
+            isMarkedForDelete_ = new AtomicBoolean(true);
+        else
+            isMarkedForDelete_.set(true);
+    	value_ = new byte[0];
     }
 
+    public void repair(IColumn column)
+    {
+    	if( timestamp() < column.timestamp() )
+    	{
+    		value_ = column.value();
+    		timestamp_ = column.timestamp();
+    	}
+    }
     public IColumn diff(IColumn column)
     {
-        if (timestamp() < column.timestamp())
-        {
-            return column;
-        }
-        return null;
+    	IColumn  columnDiff = null;
+    	if( timestamp() < column.timestamp() )
+    	{
+    		columnDiff = new Column(column.name(),column.value(),column.timestamp());
+    	}
+    	return columnDiff;
+    }
+
+    /*
+     * Resolve the column by comparing timestamps
+     * if a newer vaue is being input
+     * take the change else ignore .
+     *
+     */
+    public boolean putColumn(IColumn column)
+    {
+    	if ( !(column instanceof Column))
+    		throw new UnsupportedOperationException("Only Column objects should be put here");
+    	if( !name_.equals(column.name()))
+    		throw new IllegalArgumentException("The name should match the name of the current column or super column");
+    	if(timestamp_ <= column.timestamp())
+    	{
+            return true;
+    	}
+        return false;
     }
 
     public String toString()
     {
-        StringBuilder sb = new StringBuilder();
-        sb.append(name);
-        sb.append(":");
-        sb.append(isMarkedForDelete());
-        sb.append(":");
-        sb.append(value().length);
-        sb.append("@");
-        sb.append(timestamp());
-        return sb.toString();
+    	StringBuilder sb = new StringBuilder();
+    	sb.append(name_);
+    	sb.append(":");
+    	sb.append(isMarkedForDelete());
+    	sb.append(":");
+    	sb.append(timestamp());
+    	sb.append(":");
+    	sb.append(value().length);
+    	sb.append(":");
+    	sb.append(value());
+    	sb.append(":");
+    	return sb.toString();
     }
 
     public byte[] digest()
     {
-        StringBuilder stringBuilder = new StringBuilder();
-        stringBuilder.append(name);
-        stringBuilder.append(":");
-        stringBuilder.append(timestamp);
-        return stringBuilder.toString().getBytes();
+    	StringBuilder stringBuilder = new StringBuilder();
+  		stringBuilder.append(name_);
+  		stringBuilder.append(seperator_);
+  		stringBuilder.append(timestamp_);
+    	return stringBuilder.toString().getBytes();
+    }
+    
+    /**
+     * This method is basically implemented for Writable interface
+     * for M/R. 
+     */
+    public void readFields(DataInput in) throws IOException
+    {
+        name_ = in.readUTF();
+        boolean delete = in.readBoolean();
+        long ts = in.readLong();
+        int size = in.readInt();
+        byte[] value = new byte[size];
+        in.readFully(value);        
+        if ( delete )
+            delete();
+    }
+    
+    /**
+     * This method is basically implemented for Writable interface
+     * for M/R. 
+     */
+    public void write(DataOutput out) throws IOException
+    {
+        out.writeUTF(name_);
+        out.writeBoolean(isMarkedForDelete());
+        out.writeLong(timestamp_);
+        out.writeInt(value().length);
+        out.write(value());
     }
 
 }
@@ -213,7 +284,9 @@
         int size = dis.readInt();
         byte[] value = new byte[size];
         dis.readFully(value);
-        column = new Column(name, value, ts, delete);
+        column = new Column(name, value, ts);
+        if ( delete )
+            column.delete();
         return column;
     }
 
@@ -230,7 +303,7 @@
     {
         if ( dis.available() == 0 )
             return null;
-
+                
         String name = dis.readUTF();
         IColumn column = new Column(name);
         column = filter.filter(column, dis);
@@ -266,8 +339,8 @@
             	/*
             	 * If this is being called with identity filter
             	 * since a column name is passed in we know
-            	 * that this is a final call
-            	 * Hence if the column is found set the filter to done
+            	 * that this is a final call 
+            	 * Hence if the column is found set the filter to done 
             	 * so that we do not look for the column in further files
             	 */
             	IdentityFilter f = (IdentityFilter)filter;
@@ -295,6 +368,5 @@
         /* size of the column */
         int size = dis.readInt();
         dis.skip(size);
-    }
+    }    
 }
-

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnComparatorFactory.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnComparatorFactory.java?rev=759026&r1=759025&r2=759026&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnComparatorFactory.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnComparatorFactory.java Fri Mar 27 05:39:40 2009
@@ -28,123 +28,127 @@
 
 public class ColumnComparatorFactory
 {
-    public static enum ComparatorType
-    {
-        NAME,
-        TIMESTAMP
-    }
-
-    private static Comparator<IColumn> nameComparator_ = new ColumnNameComparator();
-    private static Comparator<IColumn> timestampComparator_ = new ColumnTimestampComparator();
-
-    public static Comparator<IColumn> getComparator(ComparatorType comparatorType)
-    {
-        Comparator<IColumn> columnComparator = timestampComparator_;
-
-        switch (comparatorType)
-        {
-            case NAME:
-                columnComparator = nameComparator_;
-                break;
-
-            case TIMESTAMP:
-
-            default:
-                columnComparator = timestampComparator_;
-                break;
-        }
-
-        return columnComparator;
-    }
-
-    public static Comparator<IColumn> getComparator(int comparatorTypeInt)
-    {
-        ComparatorType comparatorType = ComparatorType.NAME;
-
-        if (comparatorTypeInt == ComparatorType.NAME.ordinal())
-        {
-            comparatorType = ComparatorType.NAME;
-        }
-        else if (comparatorTypeInt == ComparatorType.TIMESTAMP.ordinal())
-        {
-            comparatorType = ComparatorType.TIMESTAMP;
-        }
-        return getComparator(comparatorType);
-    }
-
+	public static enum ComparatorType
+	{
+		NAME,
+		TIMESTAMP
+	}
+
+	private static Comparator<IColumn> nameComparator_ = new ColumnNameComparator();
+	private static Comparator<IColumn> timestampComparator_ = new ColumnTimestampComparator();
+
+	public static Comparator<IColumn> getComparator(ComparatorType comparatorType)
+	{
+		Comparator<IColumn> columnComparator = timestampComparator_;
+
+		switch(comparatorType)
+		{
+			case NAME:
+				columnComparator = nameComparator_;
+				break;
+
+			case TIMESTAMP:
+
+			default:
+				columnComparator = timestampComparator_;
+				break;
+		}
+
+		return columnComparator;
+	}
+
+	public static Comparator<IColumn> getComparator(int comparatorTypeInt)
+	{
+		ComparatorType comparatorType = ComparatorType.NAME;
+
+		if(comparatorTypeInt == ComparatorType.NAME.ordinal())
+		{
+			comparatorType = ComparatorType.NAME;
+		}
+		else if(comparatorTypeInt == ComparatorType.TIMESTAMP.ordinal())
+		{
+			comparatorType = ComparatorType.TIMESTAMP;
+		}
+		return getComparator(comparatorType);
+	}
+
+	public static void main(String[] args)
+	{
+		IColumn col1 = new Column("Column-9");
+		IColumn col2 = new Column("Column-10");
+		System.out.println("Result of compare: " + getComparator(ComparatorType.NAME).compare(col1, col2));
+	}
 }
 
 abstract class AbstractColumnComparator implements Comparator<IColumn>, Serializable
 {
-    protected ColumnComparatorFactory.ComparatorType comparatorType_;
-
-    public AbstractColumnComparator(ColumnComparatorFactory.ComparatorType comparatorType)
-    {
-        comparatorType_ = comparatorType;
-    }
+	protected ColumnComparatorFactory.ComparatorType comparatorType_;
 
-    ColumnComparatorFactory.ComparatorType getComparatorType()
-    {
-        return comparatorType_;
-    }
+	public AbstractColumnComparator(ColumnComparatorFactory.ComparatorType comparatorType)
+	{
+		comparatorType_ = comparatorType;
+	}
+
+	ColumnComparatorFactory.ComparatorType getComparatorType()
+	{
+		return comparatorType_;
+	}
 }
 
 class ColumnTimestampComparator extends AbstractColumnComparator
 {
-    ColumnTimestampComparator()
-    {
-        super(ColumnComparatorFactory.ComparatorType.TIMESTAMP);
-    }
+	ColumnTimestampComparator()
+	{
+		super(ColumnComparatorFactory.ComparatorType.TIMESTAMP);
+	}
 
-    /* if the time-stamps are the same then sort by names */
+	/* if the time-stamps are the same then sort by names */
     public int compare(IColumn column1, IColumn column2)
     {
-        assert column1.getClass() == column2.getClass();
-        /* inverse sort by time to get hte latest first */
-        long result = column2.timestamp() - column1.timestamp();
-        int finalResult = 0;
-        if (result == 0)
-        {
-            result = column1.name().compareTo(column2.name());
-        }
-        if (result > 0)
-        {
-            finalResult = 1;
-        }
-        if (result < 0)
-        {
-            finalResult = -1;
-        }
+    	/* inverse sort by time to get hte latest first */
+    	long result = column2.timestamp() - column1.timestamp();
+    	int finalResult = 0;
+    	if(result == 0)
+    	{
+    		result = column1.name().compareTo(column2.name());
+    	}
+    	if(result > 0)
+    	{
+    		finalResult = 1;
+    	}
+    	if( result < 0 )
+    	{
+    		finalResult = -1;
+    	}
         return finalResult;
     }
 }
 
 class ColumnNameComparator extends AbstractColumnComparator
 {
-    ColumnNameComparator()
-    {
-        super(ColumnComparatorFactory.ComparatorType.NAME);
-    }
+	ColumnNameComparator()
+	{
+		super(ColumnComparatorFactory.ComparatorType.NAME);
+	}
 
     /* if the names are the same then sort by time-stamps */
     public int compare(IColumn column1, IColumn column2)
     {
-        assert column1.getClass() == column2.getClass();
-        long result = column1.name().compareTo(column2.name());
-        int finalResult = 0;
-        if (result == 0 && (column1 instanceof Column))
-        {
-            /* inverse sort by time to get the latest first */
-            result = column2.timestamp() - column1.timestamp();
-        }
-        if (result > 0)
-        {
-            finalResult = 1;
-        }
-        if (result < 0)
-        {
-            finalResult = -1;
-        }
+    	long result = column1.name().compareTo(column2.name());
+    	int finalResult = 0;
+    	if(result == 0)
+    	{
+    		/* inverse sort by time to get hte latest first */
+    		result = column2.timestamp() - column1.timestamp();
+    	}
+    	if(result > 0)
+    	{
+    		finalResult = 1;
+    	}
+    	if( result < 0 )
+    	{
+    		finalResult = -1;
+    	}
         return finalResult;
     }
 }

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamily.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamily.java?rev=759026&r1=759025&r2=759026&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamily.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamily.java Fri Mar 27 05:39:40 2009
@@ -18,29 +18,28 @@
 
 package org.apache.cassandra.db;
 
+import java.io.DataInput;
 import java.io.DataInputStream;
+import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.Serializable;
 import java.lang.reflect.Proxy;
-import java.util.Collection;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.SortedSet;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.commons.lang.StringUtils;
-import org.apache.log4j.Logger;
-
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.HashingSchemes;
+import org.apache.log4j.Logger;
+import org.apache.cassandra.io.*;
 
 /**
  * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
  */
-public final class ColumnFamily
+
+public final class ColumnFamily implements Serializable
 {
     private static ICompactSerializer2<ColumnFamily> serializer_;
     public static final short utfPrefix_ = 2;   
@@ -57,7 +56,7 @@
         /* TODO: These are the various column types. Hard coded for now. */
         columnTypes_.put("Standard", "Standard");
         columnTypes_.put("Super", "Super");
-
+        
         indexTypes_.put("Name", "Name");
         indexTypes_.put("Time", "Time");
     }
@@ -71,7 +70,7 @@
      * This method returns the serializer whose methods are
      * preprocessed by a dynamic proxy.
     */
-    public static ICompactSerializer2<ColumnFamily> serializerWithIndexes()
+    public static ICompactSerializer2<ColumnFamily> serializer2()
     {
         return (ICompactSerializer2<ColumnFamily>)Proxy.newProxyInstance( ColumnFamily.class.getClassLoader(), new Class[]{ICompactSerializer2.class}, new CompactSerializerInvocationHandler<ColumnFamily>(serializer_) );
     }
@@ -95,9 +94,9 @@
 
     private String name_;
 
-    private transient ICompactSerializer2<IColumn> columnSerializer_;
-    private long markedForDeleteAt = Long.MIN_VALUE;
-    private AtomicInteger size_ = new AtomicInteger(0);
+    private  transient ICompactSerializer2<IColumn> columnSerializer_;
+    private transient AtomicBoolean isMarkedForDelete_;
+    private  AtomicInteger size_ = new AtomicInteger(0);
     private EfficientBidiMap columns_;
 
     private Comparator<IColumn> columnComparator_;
@@ -123,16 +122,20 @@
 
 		return columnComparator_;
 	}
+    
+    ColumnFamily()
+    {
+    }
 
-    public ColumnFamily(String cfName)
+    public ColumnFamily(String cf)
     {
-        name_ = cfName;
+        name_ = cf;
         createColumnFactoryAndColumnSerializer();
     }
 
-    public ColumnFamily(String cfName, String columnType)
+    public ColumnFamily(String cf, String columnType)
     {
-        this(cfName);
+        name_ = cf;
         createColumnFactoryAndColumnSerializer(columnType);
     }
 
@@ -165,7 +168,7 @@
     ColumnFamily cloneMe()
     {
     	ColumnFamily cf = new ColumnFamily(name_);
-    	cf.markedForDeleteAt = markedForDeleteAt;
+    	cf.isMarkedForDelete_ = isMarkedForDelete_;
     	cf.columns_ = columns_.cloneMe();
     	return cf;
     }
@@ -175,15 +178,18 @@
         return name_;
     }
 
-    /*
+    /**
      *  We need to go through each column
      *  in the column family and resolve it before adding
     */
     void addColumns(ColumnFamily cf)
     {
-        for (IColumn column : cf.getAllColumns())
+        Map<String, IColumn> columns = cf.getColumns();
+        Set<String> cNames = columns.keySet();
+
+        for ( String cName : cNames )
         {
-            addColumn(column);
+        	addColumn(cName, columns.get(cName));
         }
     }
 
@@ -193,9 +199,10 @@
     	return columnSerializer_;
     }
 
-    public void addColumn(String name)
+    public void createColumn(String name)
     {
-    	addColumn(columnFactory_.createColumn(name));
+    	IColumn column = columnFactory_.createColumn(name);
+    	addColumn(column.name(), column);
     }
 
     int getColumnCount()
@@ -204,7 +211,7 @@
     	Map<String, IColumn> columns = columns_.getColumns();
     	if( columns != null )
     	{
-    		if(!isSuper())
+    		if(!DatabaseDescriptor.getColumnType(name_).equals("Super"))
     		{
     			count = columns.size();
     		}
@@ -220,26 +227,17 @@
     	return count;
     }
 
-    public boolean isSuper()
-    {
-        return DatabaseDescriptor.getColumnType(name_).equals("Super");
-    }
-
-    public void addColumn(String name, byte[] value)
+    public void createColumn(String name, byte[] value)
     {
-    	addColumn(name, value, 0);
+    	IColumn column = columnFactory_.createColumn(name, value);
+    	addColumn(column.name(), column);
     }
 
-    public void addColumn(String name, byte[] value, long timestamp)
-    {
-        addColumn(name, value, timestamp, false);
-    }
-
-    public void addColumn(String name, byte[] value, long timestamp, boolean deleted)
+	public void createColumn(String name, byte[] value, long timestamp)
 	{
-		IColumn column = columnFactory_.createColumn(name, value, timestamp, deleted);
-		addColumn(column);
-    }
+		IColumn column = columnFactory_.createColumn(name, value, timestamp);
+		addColumn(column.name(), column);
+	}
 
     void clear()
     {
@@ -250,30 +248,29 @@
      * If we find an old column that has the same name
      * the ask it to resolve itself else add the new column .
     */
-    void addColumn(IColumn column)
+    void addColumn(String name, IColumn column)
     {
-        String name = column.name();
+    	int newSize = 0;
         IColumn oldColumn = columns_.get(name);
-        if (oldColumn != null)
+        if ( oldColumn != null )
         {
-            if (oldColumn instanceof SuperColumn)
+            int oldSize = oldColumn.size();
+            if( oldColumn.putColumn(column))
             {
-                int oldSize = oldColumn.size();
-                ((SuperColumn) oldColumn).putColumn(column);
-                size_.addAndGet(oldColumn.size() - oldSize);
+            	// This will never be called for super column as put column always returns false.
+                columns_.put(name, column);
+            	newSize = column.size();
             }
             else
             {
-                if (oldColumn.timestamp() <= column.timestamp())
-                {
-                    columns_.put(name, column);
-                    size_.addAndGet(column.size());
-                }
+            	newSize = oldColumn.size();
             }
+            size_.addAndGet(newSize - oldSize);
         }
         else
         {
-            size_.addAndGet(column.size());
+            newSize = column.size();
+            size_.addAndGet(newSize);
             columns_.put(name, column);
         }
     }
@@ -283,12 +280,12 @@
         return columns_.get( name );
     }
 
-    public SortedSet<IColumn> getAllColumns()
+    public Collection<IColumn> getAllColumns()
     {
         return columns_.getSortedColumns();
     }
 
-    public Map<String, IColumn> getColumns()
+    Map<String, IColumn> getColumns()
     {
         return columns_.getColumns();
     }
@@ -298,14 +295,17 @@
     	columns_.remove(columnName);
     }
 
-    void delete(long timestamp)
+    void delete()
     {
-        markedForDeleteAt = timestamp;
+        if ( isMarkedForDelete_ == null )
+            isMarkedForDelete_ = new AtomicBoolean(true);
+        else
+            isMarkedForDelete_.set(true);
     }
 
-    public boolean isMarkedForDelete()
+    boolean isMarkedForDelete()
     {
-        return markedForDeleteAt > Long.MIN_VALUE;
+        return ( ( isMarkedForDelete_ == null ) ? false : isMarkedForDelete_.get() );
     }
 
     /*
@@ -334,8 +334,28 @@
      */
     void repair(ColumnFamily columnFamily)
     {
-        for (IColumn column : columnFamily.getAllColumns()) {
-            addColumn(column);
+        Map<String, IColumn> columns = columnFamily.getColumns();
+        Set<String> cNames = columns.keySet();
+
+        for ( String cName : cNames )
+        {
+        	IColumn columnInternal = columns_.get(cName);
+        	IColumn columnExternal = columns.get(cName);
+
+        	if( columnInternal == null )
+        	{                
+        		if(DatabaseDescriptor.getColumnFamilyType(name_).equals(ColumnFamily.getColumnType("Super")))
+        		{
+        			columnInternal = new SuperColumn(columnExternal.name());
+        			columns_.put(cName, columnInternal);
+        		}
+        		if(DatabaseDescriptor.getColumnFamilyType(name_).equals(ColumnFamily.getColumnType("Standard")))
+        		{
+        			columnInternal = columnExternal;
+        			columns_.put(cName, columnInternal);
+        		}
+        	}
+       		columnInternal.repair(columnExternal);
         }
     }
 
@@ -357,14 +377,14 @@
         	IColumn columnExternal = columns.get(cName);
         	if( columnInternal == null )
         	{
-        		cfDiff.addColumn(columnExternal);
+        		cfDiff.addColumn(cName, columnExternal);
         	}
         	else
         	{
             	IColumn columnDiff = columnInternal.diff(columnExternal);
         		if(columnDiff != null)
         		{
-        			cfDiff.addColumn(columnDiff);
+        			cfDiff.addColumn(cName, columnDiff);
         		}
         	}
         }
@@ -403,174 +423,193 @@
     public String toString()
     {
     	StringBuilder sb = new StringBuilder();
-        sb.append("ColumnFamily(");
     	sb.append(name_);
+    	sb.append(":");
+    	sb.append(isMarkedForDelete());
+    	sb.append(":");
+    	Collection<IColumn> columns = getAllColumns();
+        sb.append(columns.size());
+        sb.append(":");
 
-        if (isMarkedForDelete()) {
-            sb.append(" -delete at " + getMarkedForDeleteAt() + "-");
+        for ( IColumn column : columns )
+        {
+            sb.append(column.toString());
         }
-
-    	sb.append(" [");
-        sb.append(StringUtils.join(getAllColumns(), ", "));
-        sb.append("])");
-
+        sb.append(":");
     	return sb.toString();
     }
 
     public byte[] digest()
     {
     	Set<IColumn> columns = columns_.getSortedColumns();
-    	byte[] xorHash = null;
+    	byte[] xorHash = new byte[0];
+    	byte[] tmpHash = new byte[0];
     	for(IColumn column : columns)
     	{
-    		if(xorHash == null)
+    		if(xorHash.length == 0)
     		{
     			xorHash = column.digest();
     		}
     		else
     		{
-                xorHash = FBUtilities.xor(xorHash, column.digest());
+    			tmpHash = column.digest();
+    			xorHash = FBUtilities.xor(xorHash, tmpHash);
     		}
     	}
     	return xorHash;
     }
+}
 
-    public long getMarkedForDeleteAt() {
-        return markedForDeleteAt;
+class ColumnFamilySerializer implements ICompactSerializer2<ColumnFamily>
+{
+	/*
+	 * We are going to create indexes, and write out that information as well. The format
+	 * of the data serialized is as follows.
+	 *
+	 * 1) Without indexes:
+     *  // written by the data
+	 * 	<boolean false (index is not present)>
+	 * 	<column family id>
+	 * 	<is marked for delete>
+	 * 	<total number of columns>
+	 * 	<columns data>
+
+	 * 	<boolean true (index is present)>
+	 *
+	 *  This part is written by the column indexer
+	 * 	<size of index in bytes>
+	 * 	<list of column names and their offsets relative to the first column>
+	 *
+	 *  <size of the cf in bytes>
+	 * 	<column family id>
+	 * 	<is marked for delete>
+	 * 	<total number of columns>
+	 * 	<columns data>
+	*/
+    public void serialize(ColumnFamily columnFamily, DataOutputStream dos) throws IOException
+    {
+    	Collection<IColumn> columns = columnFamily.getAllColumns();
+
+        /* write the column family id */
+        dos.writeUTF(columnFamily.name());
+        /* write if this cf is marked for delete */
+        dos.writeBoolean(columnFamily.isMarkedForDelete());
+    	/* write the size is the number of columns */
+        dos.writeInt(columns.size());                    
+        /* write the column data */
+    	for ( IColumn column : columns )
+        {
+            columnFamily.getColumnSerializer().serialize(column, dos);            
+        }
     }
 
-    public static class ColumnFamilySerializer implements ICompactSerializer2<ColumnFamily>
+    /*
+     * Use this method to create a bare bones Column Family. This column family
+     * does not have any of the Column information.
+    */
+    private ColumnFamily defreezeColumnFamily(DataInputStream dis) throws IOException
     {
-        /*
-         * We are going to create indexes, and write out that information as well. The format
-         * of the data serialized is as follows.
-         *
-         * 1) Without indexes:
-         *  // written by the data
-         * 	<boolean false (index is not present)>
-         * 	<column family id>
-         * 	<is marked for delete>
-         * 	<total number of columns>
-         * 	<columns data>
-
-         * 	<boolean true (index is present)>
-         *
-         *  This part is written by the column indexer
-         * 	<size of index in bytes>
-         * 	<list of column names and their offsets relative to the first column>
-         *
-         *  <size of the cf in bytes>
-         * 	<column family id>
-         * 	<is marked for delete>
-         * 	<total number of columns>
-         * 	<columns data>
-        */
-        public void serialize(ColumnFamily columnFamily, DataOutputStream dos) throws IOException
-        {
-            Collection<IColumn> columns = columnFamily.getAllColumns();
-
-            /* write the column family id */
-            dos.writeUTF(columnFamily.name());
-            /* write if this cf is marked for delete */
-            dos.writeLong(columnFamily.getMarkedForDeleteAt());
-
-            /* write the size is the number of columns */
-            dos.writeInt(columns.size());
+        String name = dis.readUTF();
+        boolean delete = dis.readBoolean();
+        ColumnFamily cf = new ColumnFamily(name);
+        if ( delete )
+            cf.delete();
+        return cf;
+    }
 
-            /* write the column data */
-            for ( IColumn column : columns )
-            {
-                columnFamily.getColumnSerializer().serialize(column, dos);
-            }
+    /*
+     * This method fills the Column Family object with the column information
+     * from the DataInputStream. The "items" parameter tells us whether we need
+     * all the columns or just a subset of all the Columns that make up the
+     * Column Family. If "items" is -1 then we need all the columns if not we
+     * deserialize only as many columns as indicated by the "items" parameter.
+    */
+    private void fillColumnFamily(ColumnFamily cf,  DataInputStream dis) throws IOException
+    {
+        int size = dis.readInt();        	        	
+    	IColumn column = null;           
+        for ( int i = 0; i < size; ++i )
+        {
+        	column = cf.getColumnSerializer().deserialize(dis);
+        	if(column != null)
+        	{
+        		cf.addColumn(column.name(), column);
+        	}
         }
+    }
 
-        /*
-         * Use this method to create a bare bones Column Family. This column family
-         * does not have any of the Column information.
-        */
-        private ColumnFamily defreezeColumnFamily(DataInputStream dis) throws IOException
-        {
-            String name = dis.readUTF();
-            ColumnFamily cf = new ColumnFamily(name);
-            cf.delete(dis.readLong());
-            return cf;
-        }
-
-        public ColumnFamily deserialize(DataInputStream dis) throws IOException
-        {
-            ColumnFamily cf = defreezeColumnFamily(dis);
-            int size = dis.readInt();
-            IColumn column = null;
-            for ( int i = 0; i < size; ++i )
-            {
-                column = cf.getColumnSerializer().deserialize(dis);
-                if(column != null)
-                {
-                    cf.addColumn(column);
-                }
-            }
-            return cf;
-        }
+    public ColumnFamily deserialize(DataInputStream dis) throws IOException
+    {       
+        ColumnFamily cf = defreezeColumnFamily(dis);
+        if ( !cf.isMarkedForDelete() )
+            fillColumnFamily(cf,dis);
+        return cf;
+    }
 
-        /*
-         * This version of deserialize is used when we need a specific set if columns for
-         * a column family specified in the name cfName parameter.
-        */
-        public ColumnFamily deserialize(DataInputStream dis, IFilter filter) throws IOException
-        {
-            ColumnFamily cf = defreezeColumnFamily(dis);
-            int size = dis.readInt();
-            IColumn column = null;
+    /*
+     * This version of deserialize is used when we need a specific set if columns for
+     * a column family specified in the name cfName parameter.
+    */
+    public ColumnFamily deserialize(DataInputStream dis, IFilter filter) throws IOException
+    {        
+        ColumnFamily cf = defreezeColumnFamily(dis);
+        if ( !cf.isMarkedForDelete() )
+        {
+            int size = dis.readInt();        	        	
+        	IColumn column = null;
             for ( int i = 0; i < size; ++i )
             {
-                column = cf.getColumnSerializer().deserialize(dis, filter);
-                if(column != null)
-                {
-                    cf.addColumn(column);
-                    column = null;
-                    if(filter.isDone())
-                    {
-                        break;
-                    }
-                }
+            	column = cf.getColumnSerializer().deserialize(dis, filter);
+            	if(column != null)
+            	{
+            		cf.addColumn(column.name(), column);
+            		column = null;
+            		if(filter.isDone())
+            		{
+            			break;
+            		}
+            	}
             }
-            return cf;
         }
+        return cf;
+    }
 
-        /*
-         * Deserialize a particular column or super column or the entire columnfamily given a : seprated name
-         * name could be of the form cf:superColumn:column  or cf:column or cf
-         */
-        public ColumnFamily deserialize(DataInputStream dis, String name, IFilter filter) throws IOException
-        {
-            String[] names = RowMutation.getColumnAndColumnFamily(name);
-            String columnName = "";
-            if ( names.length == 1 )
-                return deserialize(dis, filter);
-            if( names.length == 2 )
-                columnName = names[1];
-            if( names.length == 3 )
-                columnName = names[1]+ ":" + names[2];
+    /*
+     * Deserialize a particular column or super column or the entire columnfamily given a : seprated name
+     * name could be of the form cf:superColumn:column  or cf:column or cf
+     */
+    public ColumnFamily deserialize(DataInputStream dis, String name, IFilter filter) throws IOException
+    {        
+        String[] names = RowMutation.getColumnAndColumnFamily(name);
+        String columnName = "";
+        if ( names.length == 1 )
+            return deserialize(dis, filter);
+        if( names.length == 2 )
+            columnName = names[1];
+        if( names.length == 3 )
+            columnName = names[1]+ ":" + names[2];
 
-            ColumnFamily cf = defreezeColumnFamily(dis);
+        ColumnFamily cf = defreezeColumnFamily(dis);
+        if ( !cf.isMarkedForDelete() )
+        {
             /* read the number of columns */
-            int size = dis.readInt();
+            int size = dis.readInt();            
             for ( int i = 0; i < size; ++i )
             {
-                IColumn column = cf.getColumnSerializer().deserialize(dis, columnName, filter);
-                if ( column != null )
-                {
-                    cf.addColumn(column);
-                    break;
-                }
+	            IColumn column = cf.getColumnSerializer().deserialize(dis, columnName, filter);
+	            if ( column != null )
+	            {
+	                cf.addColumn(column.name(), column);
+	                break;
+	            }
             }
-            return cf;
         }
+        return cf;
+    }
 
-        public void skip(DataInputStream dis) throws IOException
-        {
-            throw new UnsupportedOperationException("This operation is not yet supported.");
-        }
+    public void skip(DataInputStream dis) throws IOException
+    {
+        throw new UnsupportedOperationException("This operation is not yet supported.");
     }
-}
 
+}

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java?rev=759026&r1=759025&r2=759026&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java Fri Mar 27 05:39:40 2009
@@ -18,27 +18,18 @@
 
 package org.apache.cassandra.db;
 
-import java.io.File;
-import java.io.IOException;
+import java.io.*;
 import java.math.BigInteger;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.PriorityQueue;
-import java.util.Set;
-import java.util.StringTokenizer;
+import java.util.*;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import org.apache.log4j.Logger;
-
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.io.DataInputBuffer;
@@ -47,10 +38,16 @@
 import org.apache.cassandra.io.SSTable;
 import org.apache.cassandra.io.SequenceFile;
 import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.PartitionerType;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.BloomFilter;
 import org.apache.cassandra.utils.FileUtils;
 import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+import org.apache.cassandra.io.*;
+import org.apache.cassandra.utils.*;
 
 /**
  * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
@@ -80,7 +77,7 @@
     private ReentrantReadWriteLock lock_ = new ReentrantReadWriteLock(true);
 
     /* Flag indicates if a compaction is in process */
-    private AtomicBoolean isCompacting_ = new AtomicBoolean(false);
+    public AtomicBoolean isCompacting_ = new AtomicBoolean(false);
 
     ColumnFamilyStore(String table, String columnFamily) throws IOException
     {
@@ -130,7 +127,7 @@
             for (File file : files)
             {
                 String filename = file.getName();
-                if(((file.length() == 0) || (filename.contains("-" + SSTable.temporaryFile_)) ) && (filename.contains(columnFamily_)))
+                if(((file.length() == 0) || (filename.indexOf("-" + SSTable.temporaryFile_) != -1) ) && (filename.indexOf(columnFamily_) != -1))
                 {
                 	file.delete();
                 	continue;
@@ -139,7 +136,7 @@
                 String[] tblCfName = getTableAndColumnFamilyName(filename);
                 if (tblCfName[0].equals(table_)
                         && tblCfName[1].equals(columnFamily_)
-                        && filename.contains("-Data.db"))
+                        && filename.indexOf("-Data.db") != -1)
                 {
                     ssTables.add(file.getAbsoluteFile());
                 }
@@ -177,7 +174,7 @@
      * disk and the total space oocupied by the data files
      * associated with this Column Family.
     */
-    public String cfStats(String newLineSeparator)
+    public String cfStats(String newLineSeparator, java.text.DecimalFormat df)
     {
         StringBuilder sb = new StringBuilder();
         /*
@@ -260,7 +257,7 @@
     	if( ranges != null)
     		futurePtr = MinorCompactionManager.instance().submit(ColumnFamilyStore.this, ranges, target, fileList);
     	else
-    		MinorCompactionManager.instance().submitMajor(ColumnFamilyStore.this, skip);
+    		MinorCompactionManager.instance().submitMajor(ColumnFamilyStore.this, ranges, skip);
     	
         boolean result = true;
         try
@@ -333,7 +330,8 @@
     {
     	// Psuedo increment so that we do not generate consecutive numbers 
     	fileIndexGenerator_.incrementAndGet();
-        return table_ + "-" + columnFamily_ + "-" + fileIndexGenerator_.incrementAndGet();
+        String name = table_ + "-" + columnFamily_ + "-" + fileIndexGenerator_.incrementAndGet();
+        return name;
     }
 
     /*
@@ -343,7 +341,8 @@
     {
     	// Psuedo increment so that we do not generate consecutive numbers 
     	fileIndexGenerator_.incrementAndGet();
-        return table_ + "-" + columnFamily_ + "-" + SSTable.temporaryFile_ + "-" + fileIndexGenerator_.incrementAndGet();
+        String name = table_ + "-" + columnFamily_ + "-" + SSTable.temporaryFile_ + "-" + fileIndexGenerator_.incrementAndGet() ;
+        return name;
     }
 
     /*
@@ -364,8 +363,9 @@
     	lowestIndex = getIndexFromFileName(files.get(0));
    		
    		index = lowestIndex + 1 ;
-
-        return table_ + "-" + columnFamily_ + "-" + SSTable.temporaryFile_ + "-" + index;
+    	
+        String name = table_ + "-" + columnFamily_ + "-" + SSTable.temporaryFile_ + "-" + index ;
+        return name;
     }
 
     
@@ -383,6 +383,14 @@
     }
 
     /*
+     * This version is used when we forceflush.
+     */
+    void switchMemtable() throws IOException
+    {
+        memtable_.set( new Memtable(table_, columnFamily_) );
+    }
+
+    /*
      * This version is used only on start up when we are recovering from logs.
      * In the future we may want to parellelize the log processing for a table
      * by having a thread per log file present for recovery. Re-visit at that
@@ -394,14 +402,14 @@
         binaryMemtable_.get().put(key, buffer);
     }
 
-    void forceFlush() throws IOException
+    void forceFlush(boolean fRecovery) throws IOException
     {
         //MemtableManager.instance().submit(getColumnFamilyName(), memtable_.get() , CommitLog.CommitLogContext.NULL);
         //memtable_.get().flush(true, CommitLog.CommitLogContext.NULL);
-        memtable_.get().forceflush(this);
+        memtable_.get().forceflush(this, fRecovery);
     }
 
-    void forceFlushBinary()
+    void forceFlushBinary() throws IOException
     {
         BinaryMemtableManager.instance().submit(getColumnFamilyName(), binaryMemtable_.get());
         //binaryMemtable_.get().flush(true);
@@ -430,36 +438,66 @@
         binaryMemtable_.get().put(key, buffer);
     }
 
-    public ColumnFamily getColumnFamily(String key, String columnFamilyColumn, IFilter filter) throws IOException
-    {
-        List<ColumnFamily> columnFamilies = getColumnFamilies(key, columnFamilyColumn, filter);
-        return resolveAndRemoveDeleted(columnFamilies);
-    }
-
     /**
      *
      * Get the column family in the most efficient order.
      * 1. Memtable
      * 2. Sorted list of files
      */
-    List<ColumnFamily> getColumnFamilies(String key, String columnFamilyColumn, IFilter filter) throws IOException
+    public ColumnFamily getColumnFamily(String key, String cf, IFilter filter) throws IOException
     {
-        List<ColumnFamily> columnFamilies1 = new ArrayList<ColumnFamily>();
+    	List<ColumnFamily> columnFamilies = new ArrayList<ColumnFamily>();
+    	ColumnFamily columnFamily = null;
+    	long start = System.currentTimeMillis();
         /* Get the ColumnFamily from Memtable */
-        getColumnFamilyFromCurrentMemtable(key, columnFamilyColumn, filter, columnFamilies1);
-        if (columnFamilies1.size() == 0 || !filter.isDone())
+    	getColumnFamilyFromCurrentMemtable(key, cf, filter, columnFamilies);
+        if(columnFamilies.size() != 0)
         {
-            /* Check if MemtableManager has any historical information */
-            MemtableManager.instance().getColumnFamily(key, columnFamily_, columnFamilyColumn, filter, columnFamilies1);
+	        if(filter.isDone())
+	        	return columnFamilies.get(0);
         }
-        List<ColumnFamily> columnFamilies = columnFamilies1;
-        if (columnFamilies.size() == 0 || !filter.isDone())
+        /* Check if MemtableManager has any historical information */
+        MemtableManager.instance().getColumnFamily(key, columnFamily_, cf, filter, columnFamilies);
+        if(columnFamilies.size() != 0)
+        {
+        	columnFamily = resolve(columnFamilies);
+	        if(filter.isDone())
+	        	return columnFamily;
+	        columnFamilies.clear();
+	        columnFamilies.add(columnFamily);
+        }
+        getColumnFamilyFromDisk(key, cf, columnFamilies, filter);
+        logger_.debug("DISK TIME: " + (System.currentTimeMillis() - start)
+                + " ms.");
+        columnFamily = resolve(columnFamilies);
+       
+        return columnFamily;
+    }
+    
+    public ColumnFamily getColumnFamilyFromMemory(String key, String cf, IFilter filter) 
+    {
+        List<ColumnFamily> columnFamilies = new ArrayList<ColumnFamily>();
+        ColumnFamily columnFamily = null;
+        long start = System.currentTimeMillis();
+        /* Get the ColumnFamily from Memtable */
+        getColumnFamilyFromCurrentMemtable(key, cf, filter, columnFamilies);
+        if(columnFamilies.size() != 0)
         {
-            long start = System.currentTimeMillis();
-            getColumnFamilyFromDisk(key, columnFamilyColumn, columnFamilies, filter);
-            logger_.debug("DISK TIME: " + (System.currentTimeMillis() - start) + " ms.");
+            if(filter.isDone())
+                return columnFamilies.get(0);
+        }
+        /* Check if MemtableManager has any historical information */
+        MemtableManager.instance().getColumnFamily(key, columnFamily_, cf, filter, columnFamilies);
+        if(columnFamilies.size() != 0)
+        {
+            columnFamily = resolve(columnFamilies);
+            if(filter.isDone())
+                return columnFamily;
+            columnFamilies.clear();
+            columnFamilies.add(columnFamily);
         }
-        return columnFamilies;
+        columnFamily = resolve(columnFamilies);
+        return columnFamily;
     }
 
     /**
@@ -499,14 +537,33 @@
             long start = System.currentTimeMillis();
             if (columnFamily != null)
             {
+	            /*
+	             * TODO
+	             * By using the filter before removing deleted columns 
+	             * we have a efficient implementation of timefilter 
+	             * but for count filter this can return wrong results 
+	             * we need to take care of that later.
+	             */
+                /* suppress columns marked for delete */
+                Map<String, IColumn> columns = columnFamily.getColumns();
+                Set<String> cNames = columns.keySet();
+
+                for (String cName : cNames)
+                {
+                    IColumn column = columns.get(cName);
+                    if (column.isMarkedForDelete())
+                        columns.remove(cName);
+                }
                 columnFamilies.add(columnFamily);
                 if(filter.isDone())
                 {
                 	break;
                 }
             }
-            logger_.debug("DISK Data structure population  TIME: " + (System.currentTimeMillis() - start) + " ms.");
+            logger_.debug("DISK Data structure population  TIME: " + (System.currentTimeMillis() - start)
+                    + " ms.");
         }
+        files.clear();  	
     }
 
 
@@ -520,11 +577,12 @@
 		if (bufIn.getLength() == 0)
 			return null;
         start = System.currentTimeMillis();
-        ColumnFamily columnFamily = ColumnFamily.serializer().deserialize(bufIn, cf, filter);
+        ColumnFamily columnFamily = null;
+       	columnFamily = ColumnFamily.serializer().deserialize(bufIn, cf, filter);
 		logger_.debug("DISK Deserialize TIME: " + (System.currentTimeMillis() - start) + " ms.");
 		if (columnFamily == null)
-			return null;
-		return columnFamily;
+			return columnFamily;
+		return (!columnFamily.isMarkedForDelete()) ? columnFamily : null;
 	}
 
     private void getColumnFamilyFromCurrentMemtable(String key, String cf, IFilter filter, List<ColumnFamily> columnFamilies)
@@ -533,66 +591,20 @@
         ColumnFamily columnFamily = memtable_.get().get(key, cf, filter);
         if (columnFamily != null)
         {
-            columnFamilies.add(columnFamily);
+            if (!columnFamily.isMarkedForDelete())
+                columnFamilies.add(columnFamily);
         }
     }
     
-    /** merge all columnFamilies into a single instance, with only the newest versions of columns preserved. */
-    static ColumnFamily resolve(List<ColumnFamily> columnFamilies)
+    private ColumnFamily resolve(List<ColumnFamily> columnFamilies)
     {
         int size = columnFamilies.size();
         if (size == 0)
-            return null;
-
-        // start from nothing so that we don't include potential deleted columns from the first instance
-        String cfname = columnFamilies.get(0).name();
-        ColumnFamily cf = new ColumnFamily(cfname);
-
-        // merge
-        for (ColumnFamily cf2 : columnFamilies)
-        {
-            assert cf.name().equals(cf2.name());
-            cf.addColumns(cf2);
-            cf.delete(Math.max(cf.getMarkedForDeleteAt(), cf2.getMarkedForDeleteAt()));
-        }
-        return cf;
-    }
-
-    /** like resolve, but leaves the resolved CF as the only item in the list */
-    private static void merge(List<ColumnFamily> columnFamilies)
-    {
-        ColumnFamily cf = resolve(columnFamilies);
-        columnFamilies.clear();
-        columnFamilies.add(cf);
-    }
-
-    private static ColumnFamily resolveAndRemoveDeleted(List<ColumnFamily> columnFamilies) {
-        ColumnFamily cf = resolve(columnFamilies);
-        return removeDeleted(cf);
-    }
-
-    static ColumnFamily removeDeleted(ColumnFamily cf) {
-        if (cf == null) {
-            return null;
-        }
-        for (String cname : new ArrayList<String>(cf.getColumns().keySet())) {
-            IColumn c = cf.getColumns().get(cname);
-            if (c instanceof SuperColumn) {
-                long min_timestamp = Math.max(c.getMarkedForDeleteAt(), cf.getMarkedForDeleteAt());
-                // don't operate directly on the supercolumn, it could be the one in the memtable
-                cf.remove(cname);
-                IColumn sc = new SuperColumn(cname);
-                for (IColumn subColumn : c.getSubColumns()) {
-                    if (!subColumn.isMarkedForDelete() && subColumn.timestamp() >= min_timestamp) {
-                        sc.addColumn(subColumn.name(), subColumn);
-                    }
-                }
-                if (sc.getSubColumns().size() > 0) {
-                    cf.addColumn(sc);
-                }
-            } else if (c.isMarkedForDelete() || c.timestamp() < cf.getMarkedForDeleteAt()) {
-                cf.remove(cname);
-            }
+            return null;        
+        ColumnFamily cf = columnFamilies.get(0);
+        for ( int i = 1; i < size ; ++i )
+        {
+            cf.addColumns(columnFamilies.get(i));
         }
         return cf;
     }
@@ -605,7 +617,19 @@
      */
     void applyNow(String key, ColumnFamily columnFamily) throws IOException
     {
-         memtable_.get().putOnRecovery(key, columnFamily);
+        if (!columnFamily.isMarkedForDelete())
+            memtable_.get().putOnRecovery(key, columnFamily);
+    }
+
+    /*
+     * Delete doesn't mean we can blindly delete. We need to write this to disk
+     * as being marked for delete. This is to prevent a previous value from
+     * resuscitating a column family that has been deleted.
+     */
+    void delete(String key, ColumnFamily columnFamily)
+            throws IOException
+    {
+        memtable_.get().remove(key, columnFamily);
     }
 
     /*
@@ -629,7 +653,7 @@
      * param @ filename - filename just flushed to disk
      * param @ bf - bloom filter which indicates the keys that are in this file.
     */
-    void storeLocation(String filename, BloomFilter bf)
+    void storeLocation(String filename, BloomFilter bf) throws IOException
     {
         boolean doCompaction = false;
         int ssTableSize = 0;
@@ -664,7 +688,7 @@
         }
     }
 
-    PriorityQueue<FileStruct> initializePriorityQueue(List<String> files, List<Range> ranges, int minBufferSize)
+    PriorityQueue<FileStruct> initializePriorityQueue(List<String> files, List<Range> ranges, int minBufferSize) throws IOException
     {
         PriorityQueue<FileStruct> pq = new PriorityQueue<FileStruct>();
         if (files.size() > 1 || (ranges != null &&  files.size() > 0))
@@ -675,9 +699,13 @@
             {
             	try
             	{
-            		fs = new FileStruct(SequenceFile.bufferedReader(file, bufferSize));
-	                fs.getNextKey();
-	                if(fs.isExhausted())
+            		fs = new FileStruct();
+	                fs.bufIn_ = new DataInputBuffer();
+	                fs.bufOut_ = new DataOutputBuffer();
+	                fs.reader_ = SequenceFile.bufferedReader(file, bufferSize);                    
+	                fs.key_ = null;
+	                fs = getNextKey(fs);
+	                if(fs == null)
 	                	continue;
 	                pq.add(fs);
             	}
@@ -686,16 +714,17 @@
             		ex.printStackTrace();
             		try
             		{
-            			if (fs != null)
+            			if(fs != null)
             			{
-            				fs.close();
+            				fs.reader_.close();
             			}
             		}
             		catch(Exception e)
             		{
             			logger_.warn("Unable to close file :" + file);
             		}
-                }
+                    continue;
+            	}
             }
         }
         return pq;
@@ -747,7 +776,7 @@
     /*
      * Break the files into buckets and then compact.
      */
-    void doCompaction()
+    void doCompaction()  throws IOException
     {
         isCompacting_.set(true);
         List<String> files = new ArrayList<String>(ssTables_);
@@ -789,6 +818,7 @@
         {
         	isCompacting_.set(false);
         }
+        return;
     }
 
     void doMajorCompaction(long skip)  throws IOException
@@ -796,13 +826,18 @@
     	doMajorCompactionInternal( skip );
     }
 
+    void doMajorCompaction()  throws IOException
+    {
+    	doMajorCompactionInternal( 0 );
+    }
+    
     /*
      * Compact all the files irrespective of the size.
      * skip : is the ammount in Gb of the files to be skipped
      * all files greater than skip GB are skipped for this compaction.
      * Except if skip is 0 , in that case this is ignored and all files are taken.
      */
-    void doMajorCompactionInternal(long skip)
+    void doMajorCompactionInternal(long skip)  throws IOException
     {
         isCompacting_.set(true);
         List<String> filesInternal = new ArrayList<String>(ssTables_);
@@ -835,6 +870,7 @@
         {
         	isCompacting_.set(false);
         }
+        return ;
     }
 
     /*
@@ -872,14 +908,41 @@
     	return maxFile;
     }
 
-    boolean doAntiCompaction(List<Range> ranges, EndPoint target, List<String> fileList)
+    Range getMaxRange( List<Range> ranges )
+    {
+    	Range maxRange = new Range( BigInteger.ZERO, BigInteger.ZERO );
+    	for( Range range : ranges)
+    	{
+    		if( range.left().compareTo(maxRange.left()) > 0 )
+    		{
+    			maxRange = range;
+    		}
+    	}
+    	return maxRange;
+    }
+
+    boolean isLoopAround ( List<Range> ranges )
+    {
+    	boolean isLoop = false;
+    	for( Range range : ranges)
+    	{
+    		if( range.left().compareTo(range.right()) > 0 )
+    		{
+    			isLoop = true;
+    			break;
+    		}
+    	}
+    	return isLoop;
+    }
+
+    boolean doAntiCompaction(List<Range> ranges, EndPoint target, List<String> fileList) throws IOException
     {
         isCompacting_.set(true);
         List<String> files = new ArrayList<String>(ssTables_);
         boolean result = true;
         try
         {
-        	 result = doFileAntiCompaction(files, ranges, target, fileList, null);
+        	 result = doFileAntiCompaction(files, ranges, target, bufSize_, fileList, null);
         }
         catch ( Exception ex)
         {
@@ -893,6 +956,38 @@
 
     }
 
+    /*
+     * Read the next key from the data file , this fn will skip teh block index
+     * and read teh next available key into the filestruct that is passed.
+     * If it cannot read or a end of file is reached it will return null.
+     */
+    FileStruct getNextKey(FileStruct filestruct) throws IOException
+    {
+        filestruct.bufOut_.reset();
+        if (filestruct.reader_.isEOF())
+        {
+            filestruct.reader_.close();
+            return null;
+        }
+        
+        long bytesread = filestruct.reader_.next(filestruct.bufOut_);
+        if (bytesread == -1)
+        {
+            filestruct.reader_.close();
+            return null;
+        }
+
+        filestruct.bufIn_.reset(filestruct.bufOut_.getData(), filestruct.bufOut_.getLength());
+        filestruct.key_ = filestruct.bufIn_.readUTF();
+        /* If the key we read is the Block Index Key then we are done reading the keys so exit */
+        if ( filestruct.key_.equals(SSTable.blockIndexKey_) )
+        {
+            filestruct.reader_.close();
+            return null;
+        }
+        return filestruct;
+    }
+
     void forceCleanup()
     {
     	MinorCompactionManager.instance().submitCleanup(ColumnFamilyStore.this);
@@ -903,7 +998,7 @@
      * and only keeps keys that this node is responsible for.
      * @throws IOException
      */
-    void doCleanupCompaction()
+    void doCleanupCompaction() throws IOException
     {
         isCompacting_.set(true);
         List<String> files = new ArrayList<String>(ssTables_);
@@ -937,7 +1032,7 @@
     	Map<EndPoint, List<Range>> endPointtoRangeMap = StorageService.instance().constructEndPointToRangesMap();
     	myRanges = endPointtoRangeMap.get(StorageService.getLocalStorageEndPoint());
     	List<BloomFilter> compactedBloomFilters = new ArrayList<BloomFilter>();
-        doFileAntiCompaction(files, myRanges, null, newFiles, compactedBloomFilters);
+        doFileAntiCompaction(files, myRanges, null, bufSize_, newFiles, compactedBloomFilters);
         logger_.debug("Original file : " + file + " of size " + new File(file).length());
         lock_.writeLock().lock();
         try
@@ -968,11 +1063,12 @@
      * @param files
      * @param ranges
      * @param target
+     * @param minBufferSize
      * @param fileList
      * @return
      * @throws IOException
      */
-    boolean doFileAntiCompaction(List<String> files, List<Range> ranges, EndPoint target, List<String> fileList, List<BloomFilter> compactedBloomFilters)
+    boolean doFileAntiCompaction(List<String> files, List<Range> ranges, EndPoint target, int minBufferSize, List<String> fileList, List<BloomFilter> compactedBloomFilters) throws IOException
     {
     	boolean result = false;
         long startTime = System.currentTimeMillis();
@@ -998,7 +1094,7 @@
 	                    + expectedRangeFileSize + "   is greater than the safe limit of the disk space available.");
 	            return result;
 	        }
-	        PriorityQueue<FileStruct> pq = initializePriorityQueue(files, ranges, ColumnFamilyStore.bufSize_);
+	        PriorityQueue<FileStruct> pq = initializePriorityQueue(files, ranges, minBufferSize);
 	        if (pq.size() > 0)
 	        {
 	            mergedFileName = getTempFileName();
@@ -1021,11 +1117,11 @@
 	                    fs = pq.poll();
 	                }
 	                if (fs != null
-	                        && (lastkey == null || lastkey.compareTo(fs.getKey()) == 0))
+	                        && (lastkey == null || lastkey.compareTo(fs.key_) == 0))
 	                {
 	                    // The keys are the same so we need to add this to the
 	                    // ldfs list
-	                    lastkey = fs.getKey();
+	                    lastkey = fs.key_;
 	                    lfs.add(fs);
 	                }
 	                else
@@ -1040,30 +1136,38 @@
 		                    	try
 		                    	{
 	                                /* read the length although we don't need it */
-	                                filestruct.getBufIn().readInt();
+	                                filestruct.bufIn_.readInt();
 	                                // Skip the Index
-                                    IndexHelper.skipBloomFilterAndIndex(filestruct.getBufIn());
+                                    IndexHelper.skipBloomFilterAndIndex(filestruct.bufIn_);
 	                                // We want to add only 2 and resolve them right there in order to save on memory footprint
 	                                if(columnFamilies.size() > 1)
 	                                {
 	    		                        // Now merge the 2 column families
-                                        merge(columnFamilies);
+	    			                    columnFamily = resolve(columnFamilies);
+	    			                    columnFamilies.clear();
+	    			                    if( columnFamily != null)
+	    			                    {
+		    			                    // add the merged columnfamily back to the list
+		    			                    columnFamilies.add(columnFamily);
+	    			                    }
+
 	                                }
 			                        // deserialize into column families
-			                        columnFamilies.add(ColumnFamily.serializer().deserialize(filestruct.getBufIn()));
+			                        columnFamilies.add(ColumnFamily.serializer().deserialize(filestruct.bufIn_));
 		                    	}
 		                    	catch ( Exception ex)
 		                    	{
                                     logger_.warn(LogUtil.throwableToString(ex));
-                                }
+		                            continue;
+		                    	}
 		                    }
 		                    // Now after merging all crap append to the sstable
-		                    columnFamily = resolveAndRemoveDeleted(columnFamilies);
+		                    columnFamily = resolve(columnFamilies);
 		                    columnFamilies.clear();
 		                    if( columnFamily != null )
 		                    {
 			                	/* serialize the cf with column indexes */
-			                    ColumnFamily.serializerWithIndexes().serialize(columnFamily, bufOut);
+			                    ColumnFamily.serializer2().serialize(columnFamily, bufOut);
 		                    }
 	                    }
 	                    else
@@ -1072,17 +1176,17 @@
 	                    	try
 	                    	{
 		                        /* read the length although we don't need it */
-		                        int size = filestruct.getBufIn().readInt();
-		                        bufOut.write(filestruct.getBufIn(), size);
+		                        int size = filestruct.bufIn_.readInt();
+		                        bufOut.write(filestruct.bufIn_, size);
 	                    	}
 	                    	catch ( Exception ex)
 	                    	{
 	                    		logger_.warn(LogUtil.throwableToString(ex));
-	                            filestruct.close();
+	                            filestruct.reader_.close();
 	                            continue;
 	                    	}
 	                    }
-	                    if ( Range.isKeyInRanges(lastkey, ranges) )
+	                    if ( Range.isKeyInRanges(ranges, lastkey) )
 	                    {
 	                        if(ssTableRange == null )
 	                        {
@@ -1106,28 +1210,28 @@
 	                    {
 	                    	try
 	                    	{
-                                filestruct.getNextKey();
-	                    		if (filestruct.isExhausted())
+	                    		filestruct = getNextKey	( filestruct );
+	                    		if(filestruct == null)
 	                    		{
 	                    			continue;
 	                    		}
 	                    		/* keep on looping until we find a key in the range */
-	                            while ( !Range.isKeyInRanges(filestruct.getKey(), ranges) )
+	                            while ( !Range.isKeyInRanges(ranges, filestruct.key_ ) )
 	                            {
-                                    filestruct.getNextKey();
-                                    if (filestruct.isExhausted())
+		                    		filestruct = getNextKey	( filestruct );
+		                    		if(filestruct == null)
 		                    		{
 		                    			break;
 		                    		}
 	        	                    /* check if we need to continue , if we are done with ranges empty the queue and close all file handles and exit */
-	        	                    //if( !isLoop && StorageService.token(filestruct.key).compareTo(maxRange.right()) > 0 && !filestruct.key.equals(""))
+	        	                    //if( !isLoop && StorageService.hash(filestruct.key).compareTo(maxRange.right()) > 0 && !filestruct.key.equals(""))
 	        	                    //{
 	                                    //filestruct.reader.close();
 	                                    //filestruct = null;
 	                                    //break;
 	        	                    //}
 	                            }
-	                            if (!filestruct.isExhausted())
+	                            if ( filestruct != null)
 	                            {
 	                            	pq.add(filestruct);
 	                            }
@@ -1139,8 +1243,9 @@
 	                    		// in any case we have read as far as possible from it
 	                    		// and it will be deleted after compaction.
                                 logger_.warn(LogUtil.throwableToString(ex));
-	                            filestruct.close();
-                            }
+	                            filestruct.reader_.close();
+	                            continue;
+	                    	}
 	                    }
 	                    lfs.clear();
 	                    lastkey = null;
@@ -1173,10 +1278,39 @@
                 + totalBytesWritten + "   Total keys read ..." + totalkeysRead);
         return result;
     }
-
-    private void doFill(BloomFilter bf, String decoratedKey)
+    
+    private void doWrite(SSTable ssTable, String key, DataOutputBuffer bufOut) throws IOException
     {
-        bf.fill(StorageService.getPartitioner().undecorateKey(decoratedKey));
+    	PartitionerType pType = StorageService.getPartitionerType();    	
+    	switch ( pType )
+    	{
+    		case OPHF:
+    			ssTable.append(key, bufOut);
+    			break;
+    			
+    	    default:
+    	    	String[] peices = key.split(":");
+    	    	key = peices[1];
+    	    	BigInteger hash = new BigInteger(peices[0]);
+    	    	ssTable.append(key, hash, bufOut);
+    	    	break;
+    	}
+    }
+    
+    private void doFill(BloomFilter bf, String key)
+    {
+    	PartitionerType pType = StorageService.getPartitionerType();    	
+    	switch ( pType )
+    	{
+    		case OPHF:
+    			bf.fill(key);
+    			break;
+    			
+    	    default:
+    	    	String[] peices = key.split(":");    	    	
+    	    	bf.fill(peices[1]);
+    	    	break;
+    	}
     }
     
     /*
@@ -1190,7 +1324,7 @@
      * to get the latest data.
      *
      */
-    void  doFileCompaction(List<String> files,  int minBufferSize)
+    void  doFileCompaction(List<String> files,  int minBufferSize) throws IOException
     {
     	String newfile = null;
         long startTime = System.currentTimeMillis();
@@ -1234,11 +1368,11 @@
 	                    fs = pq.poll();                        
 	                }
 	                if (fs != null
-	                        && (lastkey == null || lastkey.compareTo(fs.getKey()) == 0))
+	                        && (lastkey == null || lastkey.compareTo(fs.key_) == 0))
 	                {
 	                    // The keys are the same so we need to add this to the
 	                    // ldfs list
-	                    lastkey = fs.getKey();
+	                    lastkey = fs.key_;
 	                    lfs.add(fs);
 	                }
 	                else
@@ -1253,29 +1387,37 @@
 		                    	try
 		                    	{
 	                                /* read the length although we don't need it */
-	                                filestruct.getBufIn().readInt();
+	                                filestruct.bufIn_.readInt();
 	                                // Skip the Index
-                                    IndexHelper.skipBloomFilterAndIndex(filestruct.getBufIn());
+                                    IndexHelper.skipBloomFilterAndIndex(filestruct.bufIn_);
 	                                // We want to add only 2 and resolve them right there in order to save on memory footprint
 	                                if(columnFamilies.size() > 1)
 	                                {
-	    		                        merge(columnFamilies);
+	    		                        // Now merge the 2 column families
+	    			                    columnFamily = resolve(columnFamilies);
+	    			                    columnFamilies.clear();
+	    			                    if( columnFamily != null)
+	    			                    {
+		    			                    // add the merged columnfamily back to the list
+		    			                    columnFamilies.add(columnFamily);
+	    			                    }
+
 	                                }
 			                        // deserialize into column families                                    
-			                        columnFamilies.add(ColumnFamily.serializer().deserialize(filestruct.getBufIn()));
+			                        columnFamilies.add(ColumnFamily.serializer().deserialize(filestruct.bufIn_));
 		                    	}
 		                    	catch ( Exception ex)
-		                    	{
-                                    logger_.warn("error in filecompaction", ex);
-                                }
+		                    	{                                    		                    		
+		                            continue;
+		                    	}
 		                    }
 		                    // Now after merging all crap append to the sstable
-		                    columnFamily = resolveAndRemoveDeleted(columnFamilies);
+		                    columnFamily = resolve(columnFamilies);
 		                    columnFamilies.clear();
 		                    if( columnFamily != null )
 		                    {
 			                	/* serialize the cf with column indexes */
-			                    ColumnFamily.serializerWithIndexes().serialize(columnFamily, bufOut);
+			                    ColumnFamily.serializer2().serialize(columnFamily, bufOut);
 		                    }
 	                    }
 	                    else
@@ -1284,23 +1426,24 @@
 	                    	try
 	                    	{
 		                        /* read the length although we don't need it */
-		                        int size = filestruct.getBufIn().readInt();
-		                        bufOut.write(filestruct.getBufIn(), size);
+		                        int size = filestruct.bufIn_.readInt();
+		                        bufOut.write(filestruct.bufIn_, size);
 	                    	}
 	                    	catch ( Exception ex)
 	                    	{
 	                    		ex.printStackTrace();
-	                            filestruct.close();
+	                            filestruct.reader_.close();
 	                            continue;
 	                    	}
 	                    }
 	                    	         
 	                    if ( ssTable == null )
 	                    {
-	                    	ssTable = new SSTable(compactionFileLocation, mergedFileName);	                    	
+	                    	PartitionerType pType = StorageService.getPartitionerType();
+	                    	ssTable = new SSTable(compactionFileLocation, mergedFileName, pType);	                    	
 	                    }
-                        ssTable.append(lastkey, bufOut);
-
+	                    doWrite(ssTable, lastkey, bufOut);	                 
+	                    
                         /* Fill the bloom filter with the key */
 	                    doFill(compactedBloomFilter, lastkey);                        
 	                    totalkeysWritten++;
@@ -1308,8 +1451,8 @@
 	                    {
 	                    	try
 	                    	{
-                                filestruct.getNextKey();
-	                    		if (filestruct.isExhausted())
+	                    		filestruct = getNextKey(filestruct);
+	                    		if(filestruct == null)
 	                    		{
 	                    			continue;
 	                    		}
@@ -1321,8 +1464,9 @@
 	                    		// Ignore the exception as it might be a corrupted file
 	                    		// in any case we have read as far as possible from it
 	                    		// and it will be deleted after compaction.
-	                            filestruct.close();
-                            }
+	                            filestruct.reader_.close();
+	                            continue;
+	                    	}
 	                    }
 	                    lfs.clear();
 	                    lastkey = null;
@@ -1373,25 +1517,6 @@
         logger_.debug("Total bytes Read for compaction  ..." + totalBytesRead);
         logger_.debug("Total bytes written for compaction  ..."
                 + totalBytesWritten + "   Total keys read ..." + totalkeysRead);
-    }
-
-    public boolean isSuper()
-    {
-        return DatabaseDescriptor.getColumnType(getColumnFamilyName()).equals("Super");
-    }
-
-    public void flushMemtableOnRecovery() throws IOException
-    {
-        memtable_.get().flushOnRecovery();
-    }
-
-    public Object getMemtable()
-    {
-        return memtable_.get();
-    }
-
-    public Set<String> getSSTableFilenames()
-    {
-        return Collections.unmodifiableSet(ssTables_);
+        return;
     }
 }

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/CommitLog.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/CommitLog.java?rev=759026&r1=759025&r2=759026&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/CommitLog.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/CommitLog.java Fri Mar 27 05:39:40 2009
@@ -21,18 +21,22 @@
 import java.io.*;
 import java.util.*;
 
+import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.io.DataInputBuffer;
 import org.apache.cassandra.io.DataOutputBuffer;
 import org.apache.cassandra.io.IFileReader;
 import org.apache.cassandra.io.IFileWriter;
 import org.apache.cassandra.io.SequenceFile;
+import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.FileUtils;
 import org.apache.cassandra.utils.LogUtil;
 import org.apache.log4j.Logger;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+import org.apache.cassandra.io.*;
+import org.apache.cassandra.utils.*;
 
 /*
  * Commit Log tracks every write operation into the system. The aim
@@ -372,7 +376,7 @@
                     try
                     {                        
                         Row row = Row.serializer().deserialize(bufIn);
-                        Map<String, ColumnFamily> columnFamilies = new HashMap<String, ColumnFamily>(row.getColumnFamilyMap());
+                        Map<String, ColumnFamily> columnFamilies = new HashMap<String, ColumnFamily>(row.getColumnFamilies());
                         /* remove column families that have already been flushed */
                     	Set<String> cNames = columnFamilies.keySet();
 
@@ -419,7 +423,7 @@
     */
     private void updateHeader(Row row) throws IOException
     {
-    	Map<String, ColumnFamily> columnFamilies = row.getColumnFamilyMap();
+    	Map<String, ColumnFamily> columnFamilies = row.getColumnFamilies();
         Table table = Table.open(table_);
         Set<String> cNames = columnFamilies.keySet();
         for ( String cName : cNames )
@@ -626,6 +630,9 @@
     public static void main(String[] args) throws Throwable
     {
         LogUtil.init();
+
+        // the return value is not used in this case
+        DatabaseDescriptor.init();
         
         File logDir = new File(DatabaseDescriptor.getLogFileLocation());
         File[] files = logDir.listFiles();