You are viewing a plain-text version of this content. The canonical link for it ("here" in the original page) was not preserved in this plain-text copy.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/06/16 17:27:05 UTC

svn commit: r955267 - in /cassandra/trunk: src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/io/ src/java/org/apache/cassandra/io/sstable/ src/java/org/apache/cassandra/io/util/ src/java/org/apache/cassandra/tools/ test/unit/org/apache/cassandra/io/

Author: jbellis
Date: Wed Jun 16 15:27:05 2010
New Revision: 955267

URL: http://svn.apache.org/viewvc?rev=955267&view=rev
Log:
add LazilyCompactedRow
patch by jbellis; reviewed by Stu Hood for CASSANDRA-16


Added:
    cassandra/trunk/src/java/org/apache/cassandra/io/LazilyCompactedRow.java
    cassandra/trunk/src/java/org/apache/cassandra/io/util/IIterableColumns.java
    cassandra/trunk/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java
Modified:
    cassandra/trunk/src/java/org/apache/cassandra/db/Column.java
    cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java
    cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
    cassandra/trunk/src/java/org/apache/cassandra/db/ColumnIndexer.java
    cassandra/trunk/src/java/org/apache/cassandra/db/TimestampClock.java
    cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java
    cassandra/trunk/src/java/org/apache/cassandra/io/PrecompactedRow.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/IndexHelper.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
    cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Column.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Column.java?rev=955267&r1=955266&r2=955267&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Column.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Column.java Wed Jun 16 15:27:05 2010
@@ -18,6 +18,7 @@
 
 package org.apache.cassandra.db;
 
+import java.util.Arrays;
 import java.util.Collection;
 import java.security.MessageDigest;
 import java.io.IOException;
@@ -213,6 +214,33 @@ public class Column implements IColumn
         return rel;
     }
 
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o)
+            return true;
+        if (o == null || getClass() != o.getClass())
+            return false;
+
+        Column column = (Column)o;
+
+        if (clock != null ? !clock.equals(column.clock) : column.clock != null)
+            return false;
+        if (!Arrays.equals(name, column.name))
+            return false;
+
+        return Arrays.equals(value, column.value);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        int result = name != null ? Arrays.hashCode(name) : 0;
+        result = 31 * result + (value != null ? Arrays.hashCode(value) : 0);
+        result = 31 * result + (clock != null ? clock.hashCode() : 0);
+        return result;
+    }
+
     public String getString(AbstractType comparator)
     {
         StringBuilder sb = new StringBuilder();

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java?rev=955267&r1=955266&r2=955267&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java Wed Jun 16 15:27:05 2010
@@ -33,9 +33,10 @@ import org.apache.cassandra.config.Datab
 import org.apache.cassandra.io.ICompactSerializer2;
 import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.io.util.IIterableColumns;
 import org.apache.cassandra.utils.FBUtilities;
 
-public class ColumnFamily implements IColumnContainer
+public class ColumnFamily implements IColumnContainer, IIterableColumns
 {
     /* The column serializer for this Column Family. Create based on config. */
     private static ColumnFamilySerializer serializer = new ColumnFamilySerializer();
@@ -442,4 +443,14 @@ public class ColumnFamily implements ICo
             return;
         addAll(cf);
     }
+
+    public int getEstimatedColumnCount()
+    {
+        return getColumnCount();
+    }
+
+    public Iterator<IColumn> iterator()
+    {
+        return columns.values().iterator();
+    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java?rev=955267&r1=955266&r2=955267&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java Wed Jun 16 15:27:05 2010
@@ -73,9 +73,7 @@ public class ColumnFamilySerializer impl
     {
         try
         {
-            dos.writeInt(columnFamily.localDeletionTime.get());
-            IClock _markedForDeleteAt = columnFamily.markedForDeleteAt.get();
-            columnFamily.getClockType().serializer().serialize(_markedForDeleteAt, dos);
+            serializeCFInfo(columnFamily, dos);
 
             Collection<IColumn> columns = columnFamily.getSortedColumns();
             dos.writeInt(columns.size());
@@ -90,6 +88,13 @@ public class ColumnFamilySerializer impl
         }
     }
 
+    public void serializeCFInfo(ColumnFamily columnFamily, DataOutput dos) throws IOException
+    {
+        dos.writeInt(columnFamily.localDeletionTime.get());
+        IClock _markedForDeleteAt = columnFamily.markedForDeleteAt.get();
+        columnFamily.getClockType().serializer().serialize(_markedForDeleteAt, dos);
+    }
+
     public void serializeWithIndexes(ColumnFamily columnFamily, DataOutput dos)
     {
         ColumnIndexer.serialize(columnFamily, dos);
@@ -108,7 +113,7 @@ public class ColumnFamilySerializer impl
         return cf;
     }
 
-    private void deserializeColumns(DataInput dis, ColumnFamily cf) throws IOException
+    public void deserializeColumns(DataInput dis, ColumnFamily cf) throws IOException
     {
         int size = dis.readInt();
         for (int i = 0; i < size; ++i)

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnIndexer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnIndexer.java?rev=955267&r1=955266&r2=955267&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnIndexer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnIndexer.java Wed Jun 16 15:27:05 2010
@@ -22,12 +22,12 @@ import java.io.IOError;
 import java.io.IOException;
 import java.io.DataOutput;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.List;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.io.sstable.IndexHelper;
+import org.apache.cassandra.io.util.IIterableColumns;
 import org.apache.cassandra.utils.BloomFilter;
 
 
@@ -39,15 +39,15 @@ public class ColumnIndexer
 	/**
 	 * Given a column family this, function creates an in-memory structure that represents the
 	 * column index for the column family, and subsequently writes it to disk.
-	 * @param columnFamily Column family to create index for
+	 * @param columns Column family to create index for
 	 * @param dos data output stream
 	 * @throws IOException
 	 */
-    public static void serialize(ColumnFamily columnFamily, DataOutput dos)
+    public static void serialize(IIterableColumns columns, DataOutput dos)
     {
         try
         {
-            serializeInternal(columnFamily, dos);
+            serializeInternal(columns, dos);
         }
         catch (IOException e)
         {
@@ -55,19 +55,15 @@ public class ColumnIndexer
         }
     }
 
-    public static void serializeInternal(ColumnFamily columnFamily, DataOutput dos) throws IOException
+    public static void serializeInternal(IIterableColumns columns, DataOutput dos) throws IOException
     {
-        Collection<IColumn> columns = columnFamily.getSortedColumns();
-        int columnCount = columns.size();
+        int columnCount = columns.getEstimatedColumnCount();
 
         BloomFilter bf = BloomFilter.getFilter(columnCount, 4);
 
-        if (columns.isEmpty())
+        if (columnCount == 0)
         {
-            // write empty bloom filter and index
-            writeBloomFilter(dos, bf);
-            dos.writeInt(0);
-
+            writeEmptyHeader(dos, bf);
             return;
         }
 
@@ -98,8 +94,16 @@ public class ColumnIndexer
 
             lastColumn = column;
         }
+
+        // all columns were GC'd after all
+        if (lastColumn == null)
+        {
+            writeEmptyHeader(dos, bf);
+            return;
+        }
+
         // the last column may have fallen on an index boundary already.  if not, index it explicitly.
-        if (indexList.isEmpty() || columnFamily.getComparator().compare(indexList.get(indexList.size() - 1).lastName, lastColumn.name()) != 0)
+        if (indexList.isEmpty() || columns.getComparator().compare(indexList.get(indexList.size() - 1).lastName, lastColumn.name()) != 0)
         {
             IndexHelper.IndexInfo cIndexInfo = new IndexHelper.IndexInfo(firstColumn.name(), lastColumn.name(), startPosition, endPosition - startPosition);
             indexList.add(cIndexInfo);
@@ -118,6 +122,13 @@ public class ColumnIndexer
         }
 	}
 
+    private static void writeEmptyHeader(DataOutput dos, BloomFilter bf)
+            throws IOException
+    {
+        writeBloomFilter(dos, bf);
+        dos.writeInt(0);
+    }
+
     private static void writeBloomFilter(DataOutput dos, BloomFilter bf) throws IOException
     {
         DataOutputBuffer bufOut = new DataOutputBuffer();

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/TimestampClock.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/TimestampClock.java?rev=955267&r1=955266&r2=955267&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/TimestampClock.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/TimestampClock.java Wed Jun 16 15:27:05 2010
@@ -102,6 +102,18 @@ public class TimestampClock implements I
     {
         return Long.toString(timestamp);
     }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        return o instanceof TimestampClock && compare((TimestampClock)o) == ClockRelationship.EQUAL;
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return (int)(timestamp ^ (timestamp >>> 32));
+    }
 }
 
 class TimestampClockSerializer implements ICompactSerializer2<IClock>

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java?rev=955267&r1=955266&r2=955267&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java Wed Jun 16 15:27:05 2010
@@ -48,7 +48,7 @@ public class CompactionIterator extends 
 
     protected static final int FILE_BUFFER_SIZE = 1024 * 1024;
 
-    private final List<SSTableIdentityIterator> rows = new ArrayList<SSTableIdentityIterator>();
+    protected final List<SSTableIdentityIterator> rows = new ArrayList<SSTableIdentityIterator>();
     private final int gcBefore;
     private final boolean major;
 
@@ -103,7 +103,7 @@ public class CompactionIterator extends 
 
         try
         {
-            PrecompactedRow compactedRow = new PrecompactedRow(rows, major, gcBefore);
+            AbstractCompactedRow compactedRow = getCompactedRow();
             return compactedRow.isEmpty() ? null : compactedRow;
         }
         finally
@@ -120,6 +120,19 @@ public class CompactionIterator extends 
         }
     }
 
+    protected AbstractCompactedRow getCompactedRow()
+    {
+        long rowSize = 0;
+        for (SSTableIdentityIterator row : rows)
+        {
+            rowSize += row.getDataSize();
+        }
+        if (rowSize > 512 * 1024 * 1024)
+            return new LazilyCompactedRow(rows, major, gcBefore);
+        else
+            return new PrecompactedRow(rows, major, gcBefore);
+    }
+
     public void close() throws IOException
     {
         for (SSTableScanner scanner : getScanners())

Added: cassandra/trunk/src/java/org/apache/cassandra/io/LazilyCompactedRow.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/LazilyCompactedRow.java?rev=955267&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/LazilyCompactedRow.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/LazilyCompactedRow.java Wed Jun 16 15:27:05 2010
@@ -0,0 +1,192 @@
+package org.apache.cassandra.io;
+
+import java.io.DataOutput;
+import java.io.IOError;
+import java.io.IOException;
+import java.security.MessageDigest;
+import java.util.*;
+
+import com.google.common.base.Predicates;
+import com.google.common.collect.Iterators;
+import org.apache.commons.collections.iterators.CollatingIterator;
+
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.io.util.IIterableColumns;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.ReducingIterator;
+
+/**
+ * LazilyCompactedRow only computes the row bloom filter and column index in memory
+ * (at construction time); it does this by reading one column at a time from each
+ * of the rows being compacted, and merging them as it does so.  So the most we have
+ * in memory at a time is the bloom filter, the index, and one column from each
+ * pre-compaction row.
+ *
+ * When write() or update() is called, a second pass is made over the pre-compaction
+ * rows to write the merged columns or update the hash, again with at most one column
+ * from each row deserialized at a time.
+ */
+public class LazilyCompactedRow extends AbstractCompactedRow implements IIterableColumns
+{
+    private final List<SSTableIdentityIterator> rows;
+    private final boolean major;
+    private final int gcBefore;
+    private final DataOutputBuffer headerBuffer;
+    private ColumnFamily emptyColumnFamily;
+    private LazyColumnIterator iter;
+    private int columnCount;
+    private long columnSerializedSize;
+
+    public LazilyCompactedRow(List<SSTableIdentityIterator> rows, boolean major, int gcBefore)
+    {
+        super(rows.get(0).getKey());
+        this.major = major;
+        this.gcBefore = gcBefore;
+        this.rows = new ArrayList<SSTableIdentityIterator>(rows);
+
+        for (SSTableIdentityIterator row : rows)
+        {
+            ColumnFamily cf = row.getColumnFamily();
+
+            if (emptyColumnFamily == null)
+                emptyColumnFamily = cf;
+            else
+                emptyColumnFamily.delete(cf);
+        }
+
+        // initialize row header so isEmpty can be called
+        headerBuffer = new DataOutputBuffer();
+        ColumnIndexer.serialize(this, headerBuffer);
+        // reach into iterator used by ColumnIndexer to get column count and size
+        columnCount = iter.size;
+        columnSerializedSize = iter.serializedSize;
+        iter = null;
+    }
+
+    public void write(DataOutput out) throws IOException
+    {
+        if (rows.size() == 1 && !major)
+        {
+            SSTableIdentityIterator row = rows.get(0);
+            out.writeInt(row.getDataSize());
+            row.echoData(out);
+            return;
+        }
+
+        DataOutputBuffer clockOut = new DataOutputBuffer();
+        ColumnFamily.serializer().serializeCFInfo(emptyColumnFamily, clockOut);
+
+        out.writeInt((int)(headerBuffer.getLength() + clockOut.getLength() + columnSerializedSize));
+        out.write(headerBuffer.getData(), 0, headerBuffer.getLength());
+        out.write(clockOut.getData(), 0, clockOut.getLength());
+        out.writeInt(columnCount);
+
+        Iterator<IColumn> iter = iterator();
+        while (iter.hasNext())
+        {
+            IColumn column = iter.next();
+            emptyColumnFamily.getColumnSerializer().serialize(column, out);
+        }
+    }
+
+    public void update(MessageDigest digest)
+    {
+        // no special-case for rows.size == 1, we're actually skipping some bytes here so just
+        // blindly updating everything wouldn't be correct
+        digest.update(headerBuffer.getData(), 0, headerBuffer.getLength());
+        DataOutputBuffer out = new DataOutputBuffer();
+        Iterator<IColumn> iter = iterator();
+        while (iter.hasNext())
+        {
+            IColumn column = iter.next();
+            out.reset();
+            try
+            {
+                emptyColumnFamily.getColumnSerializer().serialize(column, out);
+            }
+            catch (IOException e)
+            {
+                throw new IOError(e);
+            }
+            digest.update(out.getData(), 0, out.getLength());
+        }
+    }
+
+    public boolean isEmpty()
+    {
+        boolean cfIrrelevant = ColumnFamilyStore.removeDeleted(emptyColumnFamily, gcBefore) == null;
+        return cfIrrelevant && columnCount == 0;
+    }
+
+    public int getEstimatedColumnCount()
+    {
+        int n = 0;
+        for (SSTableIdentityIterator row : rows)
+            n += row.getColumnCount();
+        return n;
+    }
+
+    public AbstractType getComparator()
+    {
+        return emptyColumnFamily.getComparator();
+    }
+
+    public Iterator<IColumn> iterator()
+    {
+        for (SSTableIdentityIterator row : rows)
+        {
+            row.reset();
+        }
+        Comparator<IColumn> nameComparator = new Comparator<IColumn>()
+        {
+            public int compare(IColumn o1, IColumn o2)
+            {
+                return getComparator().compare(o1.name(), o2.name());
+            }
+        };
+        iter = new LazyColumnIterator(new CollatingIterator(nameComparator, rows));
+        return Iterators.filter(iter, Predicates.notNull());
+    }
+
+    private class LazyColumnIterator extends ReducingIterator<IColumn, IColumn>
+    {
+        ColumnFamily container = emptyColumnFamily.cloneMeShallow();
+        long serializedSize = 4; // int for column count
+        int size = 0;
+
+        public LazyColumnIterator(Iterator<IColumn> source)
+        {
+            super(source);
+        }
+
+        @Override
+        protected boolean isEqual(IColumn o1, IColumn o2)
+        {
+            return Arrays.equals(o1.name(), o2.name());
+        }
+
+        public void reduce(IColumn current)
+        {
+            container.addColumn(current);
+        }
+
+        protected IColumn getReduced()
+        {
+            assert container != null;
+            IColumn reduced = container.iterator().next();
+            ColumnFamily purged = major ? ColumnFamilyStore.removeDeleted(container, gcBefore) : container;
+            if (purged == null || !purged.iterator().hasNext())
+            {
+                container.clear();
+                return null;
+            }
+            container.clear();
+            serializedSize += reduced.serializedSize();
+            size++;
+            return reduced;
+        }
+    }
+}

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/PrecompactedRow.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/PrecompactedRow.java?rev=955267&r1=955266&r2=955267&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/PrecompactedRow.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/PrecompactedRow.java Wed Jun 16 15:27:05 2010
@@ -42,7 +42,7 @@ public class PrecompactedRow extends Abs
                 ColumnFamily thisCF;
                 try
                 {
-                    thisCF = row.getColumnFamily();
+                    thisCF = row.getColumnFamilyWithColumns();
                 }
                 catch (IOException e)
                 {

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/IndexHelper.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/IndexHelper.java?rev=955267&r1=955266&r2=955267&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/IndexHelper.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/IndexHelper.java Wed Jun 16 15:27:05 2010
@@ -86,7 +86,7 @@ public class IndexHelper
      * @return bloom filter summarizing the column information
      * @throws java.io.IOException
      */
-    public static BloomFilter defreezeBloomFilter(FileDataInput file) throws IOException
+    public static BloomFilter defreezeBloomFilter(DataInput file) throws IOException
     {
         int size = file.readInt();
         byte[] bytes = new byte[size];

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java?rev=955267&r1=955266&r2=955267&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java Wed Jun 16 15:27:05 2010
@@ -22,6 +22,7 @@ package org.apache.cassandra.io.sstable;
 
 
 import java.io.*;
+import java.util.ArrayList;
 
 import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.DecoratedKey;
@@ -29,15 +30,14 @@ import org.apache.cassandra.db.IColumn;
 import org.apache.cassandra.db.filter.IColumnIterator;
 import org.apache.cassandra.io.util.BufferedRandomAccessFile;
 
-import com.google.common.collect.AbstractIterator;
-
-public class SSTableIdentityIterator extends AbstractIterator<IColumn> implements Comparable<SSTableIdentityIterator>, IColumnIterator
+public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterator>, IColumnIterator
 {
     private final DecoratedKey key;
     private final long finishedAt;
     private final BufferedRandomAccessFile file;
     private SSTableReader sstable;
     private long dataStart;
+    private final int dataSize;
 
     /**
      * Used to iterate through the columns of a row.
@@ -45,17 +45,18 @@ public class SSTableIdentityIterator ext
      * @param file Reading using this file.
      * @param key Key of this row.
      * @param dataStart Data for this row starts at this pos.
-     * @param finishedAt Data for this row ends at this pos.
+     * @param dataSize length of row data
      * @throws IOException
      */
-    public SSTableIdentityIterator(SSTableReader sstable, BufferedRandomAccessFile file, DecoratedKey key, long dataStart, long finishedAt)
+    public SSTableIdentityIterator(SSTableReader sstable, BufferedRandomAccessFile file, DecoratedKey key, long dataStart, int dataSize)
     throws IOException
     {
         this.sstable = sstable;
         this.file = file;
         this.key = key;
         this.dataStart = dataStart;
-        this.finishedAt = finishedAt;
+        this.dataSize = dataSize;
+        finishedAt = dataStart + dataSize;
     }
 
     public DecoratedKey getKey()
@@ -68,6 +69,11 @@ public class SSTableIdentityIterator ext
         return file.getPath();
     }
 
+    public int getDataSize()
+    {
+        return dataSize;
+    }
+
     public void echoData(DataOutput out) throws IOException
     {
         file.seek(dataStart);
@@ -77,38 +83,77 @@ public class SSTableIdentityIterator ext
         }
     }
 
-    // TODO r/m this and make compaction merge columns iteratively for CASSSANDRA-16
-    public ColumnFamily getColumnFamily() throws IOException
+    public ColumnFamily getColumnFamily()
     {
-        file.seek(dataStart);
-        IndexHelper.skipBloomFilter(file);
-        IndexHelper.skipIndex(file);
-        return ColumnFamily.serializer().deserializeFromSSTable(sstable, file);
+        ColumnFamily cf;
+        try
+        {
+            file.seek(dataStart);
+            IndexHelper.skipBloomFilter(file);
+            IndexHelper.skipIndex(file);
+            cf = sstable.makeColumnFamily();
+            ColumnFamily.serializer().deserializeFromSSTableNoColumns(cf, file);
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
+
+        return cf;
     }
 
-    protected IColumn computeNext()
+    public int getColumnCount()
     {
+        getColumnFamily(); // skips to column count
         try
         {
-            assert file.getFilePointer() <= finishedAt;
-            if (file.getFilePointer() == finishedAt)
-            {
-                return endOfData();
-            }
+            return file.readInt();
+        }
+        catch (IOException e)
+        {
+            throw new IOError(e);
+        }
+    }
 
+    public ColumnFamily getColumnFamilyWithColumns() throws IOException
+    {
+        ColumnFamily cf = getColumnFamily();
+        ColumnFamily.serializer().deserializeColumns(file, cf);
+        return cf;
+    }
+
+    public boolean hasNext()
+    {
+        return file.getFilePointer() < finishedAt;
+    }
+
+    public IColumn next()
+    {
+        try
+        {
             return sstable.getColumnSerializer().deserialize(file);
         }
         catch (IOException e)
         {
-            throw new RuntimeException(e);
+            throw new IOError(e);
         }
     }
 
+    public void remove()
+    {
+        throw new UnsupportedOperationException();
+    }
+
     public int compareTo(SSTableIdentityIterator o)
     {
         return key.compareTo(o.key);
     }
 
+    public void reset()
+    {
+        getColumnCount();
+    }
+
     public void close() throws IOException
     {
     }

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java?rev=955267&r1=955266&r2=955267&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java Wed Jun 16 15:27:05 2010
@@ -175,7 +175,9 @@ public class SSTableScanner implements I
 
                 if (filter == null)
                 {
-                    return row = new SSTableIdentityIterator(sstable, file, key, dataStart, finishedAt);
+                    row = new SSTableIdentityIterator(sstable, file, key, dataStart, dataSize);
+                    ((SSTableIdentityIterator)row).reset();
+                    return row;
                 }
                 else
                 {

Added: cassandra/trunk/src/java/org/apache/cassandra/io/util/IIterableColumns.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/util/IIterableColumns.java?rev=955267&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/util/IIterableColumns.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/util/IIterableColumns.java Wed Jun 16 15:27:05 2010
@@ -0,0 +1,13 @@
+package org.apache.cassandra.io.util;
+
+import java.util.Comparator;
+
+import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.db.marshal.AbstractType;
+
+public interface IIterableColumns extends Iterable<IColumn>
+{
+    public int getEstimatedColumnCount();
+
+    AbstractType getComparator();
+}

Modified: cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java?rev=955267&r1=955266&r2=955267&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java Wed Jun 16 15:27:05 2010
@@ -112,7 +112,7 @@ public class SSTableExport
     
     private static String serializeRow(SSTableIdentityIterator row) throws IOException
     {
-        ColumnFamily cf = row.getColumnFamily();
+        ColumnFamily cf = row.getColumnFamilyWithColumns();
         AbstractType comparator = cf.getComparator();
         StringBuilder json = new StringBuilder(asKey(bytesToHex(row.getKey().key)));
         
@@ -265,8 +265,8 @@ public class SSTableExport
             excludeSet = new HashSet<String>(Arrays.asList(excludes));
 
         outs.println("{");
-        
-        while(scanner.hasNext())
+
+        while (scanner.hasNext())
         {
             SSTableIdentityIterator row = (SSTableIdentityIterator) scanner.next();
             if (excludeSet.contains(bytesToHex(row.getKey().key)))

Added: cassandra/trunk/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java?rev=955267&view=auto
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java (added)
+++ cassandra/trunk/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java Wed Jun 16 15:27:05 2010
@@ -0,0 +1,207 @@
+package org.apache.cassandra.io;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+import org.junit.Test;
+
+import org.apache.cassandra.CleanupHelper;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.dht.CollatingOrderPreservingPartitioner;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.io.sstable.IndexHelper;
+import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.utils.BloomFilter;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertTrue;
+
+
+/**
+ * Checks that compacting with {@link LazilyCompactedRow} writes the same row data as the
+ * default row type produced by {@link CompactionIterator}.
+ */
+public class LazilyCompactedRowTest extends CleanupHelper
+{
+    private static final String KEYSPACE = "Keyspace1";
+    private static final String CF = "Standard1";
+
+    /**
+     * Compacts {@code sstables} twice, once with a plain {@link CompactionIterator} and once
+     * with {@link LazyCompactionIterator}. It then checks, section by section, that each pair
+     * of serialized rows matches. Bloom filters and row sizes are the exception: they are
+     * skipped or checked only for self-consistency.
+     *
+     * JUnit assertions are used instead of the {@code assert} statement. That way the checks,
+     * and the stream reads inside them, run whether or not the JVM has {@code -ea} enabled.
+     *
+     * @param sstables the sstables to compact
+     * @param gcBefore passed to both compaction iterators
+     * @param major    passed to both compaction iterators
+     */
+    private void assertBytes(Collection<SSTableReader> sstables, int gcBefore, boolean major) throws IOException
+    {
+        CompactionIterator ci1 = new CompactionIterator(sstables, gcBefore, major);
+        LazyCompactionIterator ci2 = new LazyCompactionIterator(sstables, gcBefore, major);
+
+        while (true)
+        {
+            // both iterators must produce the same number of rows
+            boolean hasMore = ci1.hasNext();
+            assertEquals("row count mismatch", hasMore, ci2.hasNext());
+            if (!hasMore)
+                break;
+
+            AbstractCompactedRow row1 = ci1.next();
+            AbstractCompactedRow row2 = ci2.next();
+            DataOutputBuffer out1 = new DataOutputBuffer();
+            DataOutputBuffer out2 = new DataOutputBuffer();
+            row1.write(out1);
+            row2.write(out2);
+            DataInputStream in1 = new DataInputStream(new ByteArrayInputStream(out1.getData(), 0, out1.getLength()));
+            DataInputStream in2 = new DataInputStream(new ByteArrayInputStream(out2.getData(), 0, out2.getLength()));
+
+            // the key isn't part of what a compacted row writes; SSTableWriter.append does that
+
+            // row size: each must describe its own buffer (+4 for the size int itself), but the
+            // two sizes may differ because the bloom filter counts can be different
+            int rowSize1 = in1.readInt();
+            int rowSize2 = in2.readInt();
+            assertEquals(out1.getLength(), rowSize1 + 4);
+            assertEquals(out2.getLength(), rowSize2 + 4);
+            // bloom filter: read past it without comparing
+            IndexHelper.defreezeBloomFilter(in1);
+            IndexHelper.defreezeBloomFilter(in2);
+            // index: must be byte-identical
+            int indexSize1 = in1.readInt();
+            int indexSize2 = in2.readInt();
+            assertEquals(indexSize1, indexSize2);
+            byte[] bytes1 = new byte[indexSize1];
+            byte[] bytes2 = new byte[indexSize2];
+            in1.readFully(bytes1);
+            in2.readFully(bytes2);
+            assertTrue(Arrays.equals(bytes1, bytes2));
+            // cf metadata
+            ColumnFamily cf1 = ColumnFamily.create(KEYSPACE, CF);
+            ColumnFamily cf2 = ColumnFamily.create(KEYSPACE, CF);
+            ColumnFamily.serializer().deserializeFromSSTableNoColumns(cf1, in1);
+            ColumnFamily.serializer().deserializeFromSSTableNoColumns(cf2, in2);
+            assertEquals(cf1.getLocalDeletionTime(), cf2.getLocalDeletionTime());
+            assertEquals(cf1.getMarkedForDeleteAt(), cf2.getMarkedForDeleteAt());
+            // columns: read in2's count unconditionally so both streams stay aligned
+            int columns = in1.readInt();
+            assertEquals(columns, in2.readInt());
+            for (int i = 0; i < columns; i++)
+            {
+                IColumn c1 = cf1.getColumnSerializer().deserialize(in1);
+                IColumn c2 = cf2.getColumnSerializer().deserialize(in2);
+                assertEquals(c1, c2);
+            }
+            // that should be everything
+            assertEquals(0, in1.available());
+            assertEquals(0, in2.available());
+        }
+    }
+
+    /**
+     * Disables automatic compaction and returns the store under test.
+     * Presumably auto-compaction is disabled so that the sstables each test flushes are still
+     * separate when {@link #assertBytes} compacts them.
+     */
+    private static ColumnFamilyStore prepareStore() throws IOException, ExecutionException, InterruptedException
+    {
+        CompactionManager.instance.disableAutoCompaction();
+        return Table.open(KEYSPACE).getColumnFamilyStore(CF);
+    }
+
+    @Test
+    public void testOneRow() throws IOException, ExecutionException, InterruptedException
+    {
+        ColumnFamilyStore cfs = prepareStore();
+
+        byte[] key = "k".getBytes();
+        RowMutation rm = new RowMutation(KEYSPACE, key);
+        rm.add(new QueryPath(CF, null, "c".getBytes()), new byte[0], new TimestampClock(0));
+        rm.apply();
+        cfs.forceBlockingFlush();
+
+        assertBytes(cfs.getSSTables(), Integer.MAX_VALUE, true);
+    }
+
+    @Test
+    public void testOneRowTwoColumns() throws IOException, ExecutionException, InterruptedException
+    {
+        ColumnFamilyStore cfs = prepareStore();
+
+        byte[] key = "k".getBytes();
+        RowMutation rm = new RowMutation(KEYSPACE, key);
+        rm.add(new QueryPath(CF, null, "c".getBytes()), new byte[0], new TimestampClock(0));
+        rm.add(new QueryPath(CF, null, "d".getBytes()), new byte[0], new TimestampClock(0));
+        rm.apply();
+        cfs.forceBlockingFlush();
+
+        assertBytes(cfs.getSSTables(), Integer.MAX_VALUE, true);
+    }
+
+    @Test
+    public void testTwoRows() throws IOException, ExecutionException, InterruptedException
+    {
+        ColumnFamilyStore cfs = prepareStore();
+
+        byte[] key = "k".getBytes();
+        RowMutation rm = new RowMutation(KEYSPACE, key);
+        rm.add(new QueryPath(CF, null, "c".getBytes()), new byte[0], new TimestampClock(0));
+        rm.apply();
+        cfs.forceBlockingFlush();
+
+        // apply and flush the same mutation again so the row appears in two flushes
+        rm.apply();
+        cfs.forceBlockingFlush();
+
+        assertBytes(cfs.getSSTables(), Integer.MAX_VALUE, true);
+    }
+
+    @Test
+    public void testTwoRowsTwoColumns() throws IOException, ExecutionException, InterruptedException
+    {
+        ColumnFamilyStore cfs = prepareStore();
+
+        byte[] key = "k".getBytes();
+        RowMutation rm = new RowMutation(KEYSPACE, key);
+        rm.add(new QueryPath(CF, null, "c".getBytes()), new byte[0], new TimestampClock(0));
+        rm.add(new QueryPath(CF, null, "d".getBytes()), new byte[0], new TimestampClock(0));
+        rm.apply();
+        cfs.forceBlockingFlush();
+
+        // apply and flush the same mutation again so the row appears in two flushes
+        rm.apply();
+        cfs.forceBlockingFlush();
+
+        assertBytes(cfs.getSSTables(), Integer.MAX_VALUE, true);
+    }
+
+    @Test
+    public void testManyRows() throws IOException, ExecutionException, InterruptedException
+    {
+        ColumnFamilyStore cfs = prepareStore();
+
+        // Flush (indexInterval * 3) / ROWS_PER_SSTABLE times. Each flush contains ROWS_PER_SSTABLE
+        // single-column mutations spread over keys "0"/"1" and column names "0".."4", and
+        // every mutation gets a strictly increasing timestamp.
+        final int ROWS_PER_SSTABLE = 10;
+        for (int j = 0; j < (SSTableReader.indexInterval() * 3) / ROWS_PER_SSTABLE; j++)
+        {
+            for (int i = 0; i < ROWS_PER_SSTABLE; i++)
+            {
+                byte[] key = String.valueOf(i % 2).getBytes();
+                RowMutation rm = new RowMutation(KEYSPACE, key);
+                rm.add(new QueryPath(CF, null, String.valueOf(i / 2).getBytes()), new byte[0], new TimestampClock(j * ROWS_PER_SSTABLE + i));
+                rm.apply();
+            }
+            cfs.forceBlockingFlush();
+        }
+
+        assertBytes(cfs.getSSTables(), Integer.MAX_VALUE, true);
+    }
+
+    /**
+     * A {@link CompactionIterator} that builds {@link LazilyCompactedRow}s instead of the
+     * default row type. It forwards the same {@code gcBefore}/{@code major} values it was
+     * constructed with, so both sides of {@link #assertBytes} compact under the same settings.
+     */
+    private static class LazyCompactionIterator extends CompactionIterator
+    {
+        private final int gcBefore;
+        private final boolean major;
+
+        public LazyCompactionIterator(Iterable<SSTableReader> sstables, int gcBefore, boolean major) throws IOException
+        {
+            super(sstables, gcBefore, major);
+            this.gcBefore = gcBefore;
+            this.major = major;
+        }
+
+        @Override
+        protected AbstractCompactedRow getCompactedRow()
+        {
+            // NOTE(review): argument order assumed to be (rows, major, gcBefore), matching the
+            // original (rows, true, Integer.MAX_VALUE) call; confirm against LazilyCompactedRow
+            return new LazilyCompactedRow(rows, major, gcBefore);
+        }
+    }
+}