You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2009/03/27 06:39:42 UTC

svn commit: r759026 [3/3] - /incubator/cassandra/trunk/src/org/apache/cassandra/db/

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/SuperColumn.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/SuperColumn.java?rev=759026&r1=759025&r2=759026&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/SuperColumn.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/SuperColumn.java Fri Mar 27 05:39:40 2009
@@ -18,19 +18,21 @@
 
 package org.apache.cassandra.db;
 
+import java.io.DataInput;
 import java.io.DataInputStream;
+import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.Serializable;
-import java.util.Collection;
-import java.util.Set;
+import java.io.UnsupportedEncodingException;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.commons.lang.ArrayUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.log4j.Logger;
-
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.HashingSchemes;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
 
 /**
  * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
@@ -39,17 +41,22 @@
 public final class SuperColumn implements IColumn, Serializable
 {
 	private static Logger logger_ = Logger.getLogger(SuperColumn.class);
-	private static SuperColumnSerializer serializer_ = new SuperColumnSerializer();
+	private static ICompactSerializer2<IColumn> serializer_;
 	private final static String seperator_ = ":";
 
-    static SuperColumnSerializer serializer()
+    static
+    {
+        serializer_ = new SuperColumnSerializer();
+    }
+
+    static ICompactSerializer2<IColumn> serializer()
     {
         return serializer_;
     }
 
 	private String name_;
     private EfficientBidiMap columns_ = new EfficientBidiMap(ColumnComparatorFactory.getComparator(ColumnComparatorFactory.ComparatorType.TIMESTAMP));
-	private long markedForDeleteAt = Long.MIN_VALUE;
+	private AtomicBoolean isMarkedForDelete_ = new AtomicBoolean(false);
     private AtomicInteger size_ = new AtomicInteger(0);
 
     SuperColumn()
@@ -63,7 +70,7 @@
 
 	public boolean isMarkedForDelete()
 	{
-		return markedForDeleteAt > Long.MIN_VALUE;
+		return isMarkedForDelete_.get();
 	}
 
     public String name()
@@ -76,11 +83,12 @@
     	return columns_.getSortedColumns();
     }
 
-    public IColumn getSubColumn(String columnName)
+    public IColumn getSubColumn( String columnName )
     {
-        IColumn column = columns_.get(columnName);
-        assert column instanceof Column;
-        return column;
+    	IColumn column = columns_.get(columnName);
+    	if ( column instanceof SuperColumn )
+    		throw new UnsupportedOperationException("A super column cannot hold other super columns.");
+		return column;
     }
 
     public int compareTo(IColumn superColumn)
@@ -140,7 +148,7 @@
         return size;
     }
 
-    public void remove(String columnName)
+    protected void remove(String columnName)
     {
     	columns_.remove(columnName);
     }
@@ -168,7 +176,8 @@
     public byte[] value(String key)
     {
     	IColumn column = columns_.get(key);
-    	assert column instanceof Column;
+    	if ( column instanceof SuperColumn )
+    		throw new UnsupportedOperationException("A super column cannot hold other super columns.");
     	if ( column != null )
     		return column.value();
     	throw new IllegalArgumentException("Value was requested for a column that does not exist.");
@@ -202,18 +211,19 @@
      * Go through each sub column if it exists then as it to resolve itself
      * if the column does not exist then create it.
      */
-    public void putColumn(IColumn column)
+    public boolean putColumn(IColumn column)
     {
     	if ( !(column instanceof SuperColumn))
     		throw new UnsupportedOperationException("Only Super column objects should be put here");
     	if( !name_.equals(column.name()))
     		throw new IllegalArgumentException("The name should match the name of the current column or super column");
+    	Collection<IColumn> columns = column.getSubColumns();
 
-        for (IColumn subColumn : column.getSubColumns())
+        for ( IColumn subColumn : columns )
         {
-        	addColumn(subColumn.name(), subColumn);
+       		addColumn(subColumn.name(), subColumn);
         }
-        markedForDeleteAt = Math.max(markedForDeleteAt, column.getMarkedForDeleteAt());
+        return false;
     }
 
     public int getObjectCount()
@@ -221,8 +231,10 @@
     	return 1 + columns_.size();
     }
 
-    public long getMarkedForDeleteAt() {
-        return markedForDeleteAt;
+    public void delete()
+    {
+    	columns_.clear();
+    	isMarkedForDelete_.set(true);
     }
 
     int getColumnCount()
@@ -230,6 +242,21 @@
     	return columns_.size();
     }
 
+    public void repair(IColumn column)
+    {
+    	Collection<IColumn> columns = column.getSubColumns();
+
+        for ( IColumn subColumn : columns )
+        {
+        	IColumn columnInternal = columns_.get(subColumn.name());
+        	if( columnInternal == null )
+        		columns_.put(subColumn.name(), subColumn);
+        	else
+        		columnInternal.repair(subColumn);
+        }
+    }
+
+
     public IColumn diff(IColumn column)
     {
     	IColumn  columnDiff = new SuperColumn(column.name());
@@ -260,7 +287,7 @@
     public byte[] digest()
     {
     	Set<IColumn> columns = columns_.getSortedColumns();
-    	byte[] xorHash = ArrayUtils.EMPTY_BYTE_ARRAY;
+    	byte[] xorHash = new byte[0];
     	if(name_ == null)
     		return xorHash;
     	xorHash = name_.getBytes();
@@ -275,23 +302,23 @@
     public String toString()
     {
     	StringBuilder sb = new StringBuilder();
-        sb.append("SuperColumn(");
     	sb.append(name_);
-
-        if (isMarkedForDelete()) {
-            sb.append(" -delete at " + getMarkedForDeleteAt() + "-");
+    	sb.append(":");
+        sb.append(isMarkedForDelete());
+        sb.append(":");
+
+        Collection<IColumn> columns  = getSubColumns();
+        sb.append(columns.size());
+        sb.append(":");
+        sb.append(size());
+        sb.append(":");
+        for ( IColumn subColumn : columns )
+        {
+            sb.append(subColumn.toString());
         }
-
-        sb.append(" [");
-        sb.append(StringUtils.join(getSubColumns(), ", "));
-        sb.append("])");
-
+        sb.append(":");
         return sb.toString();
     }
-
-    public void markForDeleteAt(long timestamp) {
-        this.markedForDeleteAt = timestamp;
-    }
 }
 
 class SuperColumnSerializer implements ICompactSerializer2<IColumn>
@@ -300,7 +327,7 @@
     {
     	SuperColumn superColumn = (SuperColumn)column;
         dos.writeUTF(superColumn.name());
-        dos.writeLong(superColumn.getMarkedForDeleteAt());
+        dos.writeBoolean(superColumn.isMarkedForDelete());
 
         Collection<IColumn> columns  = column.getSubColumns();
         int size = columns.size();
@@ -327,15 +354,18 @@
     private SuperColumn defreezeSuperColumn(DataInputStream dis) throws IOException
     {
         String name = dis.readUTF();
+        boolean delete = dis.readBoolean();
         SuperColumn superColumn = new SuperColumn(name);
-        superColumn.markForDeleteAt(dis.readLong());
+        if ( delete )
+            superColumn.delete();
         return superColumn;
     }
 
     public IColumn deserialize(DataInputStream dis) throws IOException
     {
         SuperColumn superColumn = defreezeSuperColumn(dis);
-        fillSuperColumn(superColumn, dis);
+        if ( !superColumn.isMarkedForDelete() )
+            fillSuperColumn(superColumn, dis);
         return superColumn;
     }
 
@@ -348,11 +378,12 @@
         int size = dis.readInt();
         dis.skip(size);
     }
-
+    
     private void fillSuperColumn(IColumn superColumn, DataInputStream dis) throws IOException
     {
-        assert dis.available() != 0;
-
+        if ( dis.available() == 0 )
+            return;
+        
         /* read the number of columns */
         int size = dis.readInt();
         /* read the size of all columns */
@@ -368,12 +399,13 @@
     {
         if ( dis.available() == 0 )
             return null;
-
+        
         IColumn superColumn = defreezeSuperColumn(dis);
         superColumn = filter.filter(superColumn, dis);
         if(superColumn != null)
         {
-            fillSuperColumn(superColumn, dis);
+            if ( !superColumn.isMarkedForDelete() )
+                fillSuperColumn(superColumn, dis);
             return superColumn;
         }
         else
@@ -395,29 +427,32 @@
     {
         if ( dis.available() == 0 )
             return null;
-
+        
         String[] names = RowMutation.getColumnAndColumnFamily(name);
         if ( names.length == 1 )
         {
             IColumn superColumn = defreezeSuperColumn(dis);
             if(name.equals(superColumn.name()))
             {
-                /* read the number of columns stored */
-                int size = dis.readInt();
-                /* read the size of all columns */
-                dis.readInt();
-                IColumn column = null;
-                for ( int i = 0; i < size; ++i )
+                if ( !superColumn.isMarkedForDelete() )
                 {
-                    column = Column.serializer().deserialize(dis, filter);
-                    if(column != null)
+                    /* read the number of columns stored */
+                    int size = dis.readInt();
+                    /* read the size of all columns */
+                    dis.readInt();     
+                	IColumn column = null;
+                    for ( int i = 0; i < size; ++i )
                     {
-                        superColumn.addColumn(column.name(), column);
-                        column = null;
-                        if(filter.isDone())
-                        {
-                            break;
-                        }
+                    	column = Column.serializer().deserialize(dis, filter);
+                    	if(column != null)
+                    	{
+                            superColumn.addColumn(column.name(), column);
+                    		column = null;
+                    		if(filter.isDone())
+                    		{
+                    			break;
+                    		}
+                    	}
                     }
                 }
                 return superColumn;

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/SystemTable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/SystemTable.java?rev=759026&r1=759025&r2=759026&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/SystemTable.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/SystemTable.java Fri Mar 27 05:39:40 2009
@@ -18,13 +18,25 @@
 
 package org.apache.cassandra.db;
 
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
 import java.io.IOException;
+import java.math.BigInteger;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Hashtable;
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.locks.Lock;
 
-import org.apache.log4j.Logger;
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.io.DataInputBuffer;
 import org.apache.cassandra.io.DataOutputBuffer;
 import org.apache.cassandra.io.IFileReader;
@@ -32,8 +44,9 @@
 import org.apache.cassandra.io.SequenceFile;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.LogUtil;
-import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.dht.IPartitioner;
+import org.apache.log4j.Logger;
+import org.apache.cassandra.io.*;
+import org.apache.cassandra.utils.*;
 
 /**
  * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
@@ -47,7 +60,7 @@
     /* Name of the SystemTable */
     public static final String name_ = "System";
     /* Name of the only column family in the Table */
-    public static final String cfName_ = "LocationInfo";
+    static final String cfName_ = "LocationInfo";
     /* Name of columns in this table */
     static final String generation_ = "Generation";
     static final String token_ = "Token";
@@ -150,20 +163,19 @@
      * This method is used to update the SystemTable with the
      * new token.
     */
-    public void updateToken(Token token) throws IOException
+    public void updateToken(BigInteger token) throws IOException
     {
-        IPartitioner p = StorageService.getPartitioner();
         if ( systemRow_ != null )
         {
-            Map<String, ColumnFamily> columnFamilies = systemRow_.getColumnFamilyMap();
+            Map<String, ColumnFamily> columnFamilies = systemRow_.getColumnFamilies();
             /* Retrieve the "LocationInfo" column family */
             ColumnFamily columnFamily = columnFamilies.get(SystemTable.cfName_);
             long oldTokenColumnTimestamp = columnFamily.getColumn(SystemTable.token_).timestamp();
             /* create the "Token" whose value is the new token. */
-            IColumn tokenColumn = new Column(SystemTable.token_, p.getTokenFactory().toByteArray(token), oldTokenColumnTimestamp + 1);
+            IColumn tokenColumn = new Column(SystemTable.token_, token.toByteArray(), oldTokenColumnTimestamp + 1);
             /* replace the old "Token" column with this new one. */
-            logger_.debug("Replacing old token " + p.getTokenFactory().fromByteArray(columnFamily.getColumn(SystemTable.token_).value()) + " with " + token);
-            columnFamily.addColumn(tokenColumn);
+            logger_.debug("Replacing old token " + new BigInteger( columnFamily.getColumn(SystemTable.token_).value() ).toString() + " with token " + token.toString());
+            columnFamily.addColumn(SystemTable.token_, tokenColumn);
             reset(systemRow_);
         }
     }
@@ -183,7 +195,17 @@
     {
         LogUtil.init();
         StorageService.instance().start();
-        SystemTable.openSystemTable(SystemTable.cfName_).updateToken(StorageService.token("503545744:0"));
+        SystemTable.openSystemTable(SystemTable.cfName_).updateToken( StorageService.hash("503545744:0") );
         System.out.println("Done");
+
+        /*
+        BigInteger hash = StorageService.hash("304700067:0");
+        List<Range> ranges = new ArrayList<Range>();
+        ranges.add( new Range(new BigInteger("1218069462158869448693347920504606362273788442553"), new BigInteger("1092770595533781724218060956188429069")) );
+        if ( Range.isKeyInRanges(ranges, "304700067:0") )
+        {
+            System.out.println("Done");
+        }
+        */
     }
 }

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/Table.java?rev=759026&r1=759025&r2=759026&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/Table.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/Table.java Fri Mar 27 05:39:40 2009
@@ -19,14 +19,18 @@
 package org.apache.cassandra.db;
 
 import java.util.*;
+import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.File;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import org.apache.cassandra.analytics.DBAnalyticsSource;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.continuations.Suspendable;
 import org.apache.cassandra.dht.BootstrapInitiateMessage;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.io.DataInputBuffer;
@@ -48,6 +52,9 @@
 import org.apache.cassandra.utils.FileUtils;
 import org.apache.cassandra.utils.LogUtil;
 import org.apache.log4j.Logger;
+import org.apache.cassandra.io.*;
+import org.apache.cassandra.utils.*;
+import org.apache.cassandra.service.*;
 
 /**
  * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
@@ -60,7 +67,7 @@
      * is basically the column family name and the ID associated with
      * this column family. We use this ID in the Commit Log header to
      * determine when a log file that has been rolled can be deleted.
-    */
+    */    
     public static class TableMetadata
     {
         /* Name of the column family */
@@ -153,7 +160,7 @@
             cfTypeMap_.put(cf, type);
         }
         
-        public boolean isEmpty()
+        boolean isEmpty()
         {
             return cfIdMap_.isEmpty();
         }
@@ -193,7 +200,7 @@
             return cfIdMap_.containsKey(cfName);
         }
         
-        public void apply() throws IOException
+        void apply() throws IOException
         {
             String table = DatabaseDescriptor.getTables().get(0);
             DataOutputBuffer bufOut = new DataOutputBuffer();
@@ -454,7 +461,7 @@
         return columnFamilyStores_;
     }
 
-    public ColumnFamilyStore getColumnFamilyStore(String cfName)
+    ColumnFamilyStore getColumnFamilyStore(String cfName)
     {
         return columnFamilyStores_.get(cfName);
     }
@@ -488,7 +495,7 @@
         for ( String cfName : cfNames )
         {
             ColumnFamilyStore cfStore = columnFamilyStores_.get(cfName);
-            sb.append(cfStore.cfStats(newLineSeparator));
+            sb.append(cfStore.cfStats(newLineSeparator, df));
         }
         int newLength = sb.toString().length();
         
@@ -592,7 +599,7 @@
         {
             ColumnFamilyStore cfStore = columnFamilyStores_.get( columnFamily );
             if ( cfStore != null )
-                MinorCompactionManager.instance().submitMajor(cfStore, 0);
+                MinorCompactionManager.instance().submitMajor(cfStore, null, 0);
         }
     }
 
@@ -684,6 +691,26 @@
         dbAnalyticsSource_.updateReadStatistics(timeTaken);
         return row;
     }
+    
+    public Row getRowFromMemory(String key)
+    {
+        Row row = new Row(key);
+        Set<String> columnFamilies = tableMetadata_.getColumnFamilies();
+        long start = System.currentTimeMillis();
+        for ( String columnFamily : columnFamilies )
+        {
+            ColumnFamilyStore cfStore = columnFamilyStores_.get(columnFamily);
+            if ( cfStore != null )
+            {    
+                ColumnFamily cf = cfStore.getColumnFamilyFromMemory(key, columnFamily, new IdentityFilter());
+                if ( cf != null )
+                    row.addColumnFamily(cf);
+            }
+        }
+        long timeTaken = System.currentTimeMillis() - start;
+        dbAnalyticsSource_.updateReadStatistics(timeTaken);
+        return row;
+    }
 
 
     /**
@@ -788,18 +815,22 @@
      * Once this happens the data associated with the individual column families
      * is also written to the column family store's memtable.
     */
-    void apply(Row row) throws IOException
-    {
+    public void apply(Row row) throws IOException
+    {        
+        String key = row.key();
         /* Add row to the commit log. */
         long start = System.currentTimeMillis();
+               
         CommitLog.CommitLogContext cLogCtx = CommitLog.open(table_).add(row);
-
-        for (ColumnFamily columnFamily : row.getColumnFamilies())
+        Map<String, ColumnFamily> columnFamilies = row.getColumnFamilies();
+        Set<String> cNames = columnFamilies.keySet();
+        for ( String cName : cNames )
         {
+        	ColumnFamily columnFamily = columnFamilies.get(cName);
             ColumnFamilyStore cfStore = columnFamilyStores_.get(columnFamily.name());
-            cfStore.apply(row.key(), columnFamily, cLogCtx);
+            cfStore.apply( key, columnFamily, cLogCtx);            
         }
-
+        row.clear();
         long timeTaken = System.currentTimeMillis() - start;
         dbAnalyticsSource_.updateWriteStatistics(timeTaken);
     }
@@ -807,7 +838,7 @@
     void applyNow(Row row) throws IOException
     {
         String key = row.key();
-        Map<String, ColumnFamily> columnFamilies = row.getColumnFamilyMap();
+        Map<String, ColumnFamily> columnFamilies = row.getColumnFamilies();
 
         Set<String> cNames = columnFamilies.keySet();
         for ( String cName : cNames )
@@ -823,11 +854,24 @@
         Set<String> cfNames = columnFamilyStores_.keySet();
         for ( String cfName : cfNames )
         {
-            if (fRecovery) {
-                columnFamilyStores_.get(cfName).flushMemtableOnRecovery();
-            } else {
-                columnFamilyStores_.get(cfName).forceFlush();
-            }
+            columnFamilyStores_.get(cfName).forceFlush(fRecovery);
+        }
+    }
+
+    void delete(Row row) throws IOException
+    {
+        String key = row.key();
+        Map<String, ColumnFamily> columnFamilies = row.getColumnFamilies();
+
+        /* Add row to commit log */
+        CommitLog.open(table_).add(row);
+        Set<String> cNames = columnFamilies.keySet();
+
+        for ( String cName : cNames )
+        {
+        	ColumnFamily columnFamily = columnFamilies.get(cName);
+            ColumnFamilyStore cfStore = columnFamilyStores_.get(columnFamily.name());
+            cfStore.delete( key, columnFamily );
         }
     }
 
@@ -837,7 +881,7 @@
         /* Add row to the commit log. */
         long start = System.currentTimeMillis();
                 
-        Map<String, ColumnFamily> columnFamilies = row.getColumnFamilyMap();
+        Map<String, ColumnFamily> columnFamilies = row.getColumnFamilies();
         Set<String> cNames = columnFamilies.keySet();
         for ( String cName : cNames )
         {
@@ -858,7 +902,7 @@
     	            }
     	            else if(column.timestamp() == 3)
     	            {
-    	            	cfStore.forceFlush();
+    	            	cfStore.forceFlush(false);
     	            }
     	            else if(column.timestamp() == 4)
     	            {
@@ -876,16 +920,12 @@
         dbAnalyticsSource_.updateWriteStatistics(timeTaken);
     }
 
-    public Set<String> getApplicationColumnFamilies()
+    public static void main(String[] args) throws Throwable
     {
-        Set<String> set = new HashSet<String>();
-        for (String cfName : getColumnFamilies())
-        {
-            if (DatabaseDescriptor.isApplicationColumnFamily(cfName))
-            {
-                set.add(cfName);
-            }
-        }
-        return set;
+        StorageService service = StorageService.instance();
+        service.start();
+        Table table = Table.open("Mailbox");
+        Row row = table.get("35300190:1");
+        System.out.println( row.key() );
     }
 }

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/TimeFilter.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/TimeFilter.java?rev=759026&r1=759025&r2=759026&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/TimeFilter.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/TimeFilter.java Fri Mar 27 05:39:40 2009
@@ -47,10 +47,7 @@
 
 	public ColumnFamily filter(String cf, ColumnFamily columnFamily)
 	{
-    	if (columnFamily == null)
-    		return columnFamily;
-
-        String[] values = RowMutation.getColumnAndColumnFamily(cf);
+    	String[] values = RowMutation.getColumnAndColumnFamily(cf);
 		String cfName = columnFamily.name();
 		ColumnFamily filteredCf = new ColumnFamily(cfName);
 		if( values.length == 1 && !DatabaseDescriptor.getColumnType(cfName).equals("Super"))
@@ -61,7 +58,7 @@
     		{
     			if ( column.timestamp() >=  timeLimit_ )
     			{
-    				filteredCf.addColumn(column);
+    				filteredCf.addColumn(column.name(), column);
     				++i;
     			}
     			else
@@ -85,8 +82,8 @@
     		for(IColumn column : columns)
     		{
     			SuperColumn superColumn = (SuperColumn)column;
-       			SuperColumn filteredSuperColumn = new SuperColumn(superColumn.name());
-				filteredCf.addColumn(filteredSuperColumn);
+    			SuperColumn filteredSuperColumn = new SuperColumn(superColumn.name());
+				filteredCf.addColumn(filteredSuperColumn.name(), filteredSuperColumn);
         		Collection<IColumn> subColumns = superColumn.getSubColumns();
         		int i = 0;
         		for(IColumn subColumn : subColumns)
@@ -146,6 +143,6 @@
 
 	public DataInputBuffer next(String key, String cf, SSTable ssTable) throws IOException
     {
-    	return ssTable.next( key, cf, null, new IndexHelper.TimeRange( timeLimit_, System.currentTimeMillis() ) );
+    	return ssTable.next( key, cf, new IndexHelper.TimeRange( timeLimit_, System.currentTimeMillis() ) );
     }
 }