Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/07/07 03:50:41 UTC

svn commit: r1143627 - in /cassandra/trunk: src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/db/commitlog/ src/java/org/apache/cassandra/db/compaction/ src/java/org/apache/cassandra/io/sstable/ src/j...

Author: jbellis
Date: Thu Jul  7 01:50:40 2011
New Revision: 1143627

URL: http://svn.apache.org/viewvc?rev=1143627&view=rev
Log:
track max client timestamp per-sstable
patch by Alan Liang; reviewed by jbellis for CASSANDRA-2753
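
In outline, writers now route per-sstable statistics through a builder-style
collector. A minimal sketch of the flow this patch introduces (class and method
names are from the patch; the surrounding variables are illustrative):

    // seed the collector, as ColumnFamilyStore.createFlushWriter now does
    SSTableMetadata.Collector collector = SSTableMetadata.createCollector().replayPosition(context);

    // per appended row (see SSTableWriter.append):
    collector.addRowSize(rowSizeOnDisk);              // row-size histogram
    collector.addColumnCount(columnCount);            // column-count histogram
    collector.updateMaxTimestamp(cf.maxTimestamp());  // max client timestamp

    // at close, the stats are frozen and written to the sstable's stats component
    SSTableMetadata sstableMetadata = collector.finalizeMetadata();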

Added:
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableMetadata.java
    cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableMetadataSerializerTest.java
Modified:
    cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java
    cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
    cassandra/trunk/src/java/org/apache/cassandra/db/Column.java
    cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java
    cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
    cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    cassandra/trunk/src/java/org/apache/cassandra/db/EchoedRow.java
    cassandra/trunk/src/java/org/apache/cassandra/db/IColumn.java
    cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java
    cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/ReplayPosition.java
    cassandra/trunk/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java
    cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
    cassandra/trunk/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/Descriptor.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
    cassandra/trunk/src/java/org/apache/cassandra/utils/EstimatedHistogram.java
    cassandra/trunk/test/unit/org/apache/cassandra/Util.java
    cassandra/trunk/test/unit/org/apache/cassandra/db/CleanupTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableWriterCommutativeTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java

Modified: cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java?rev=1143627&r1=1143626&r2=1143627&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java Thu Jul  7 01:50:40 2011
@@ -32,8 +32,10 @@ import org.apache.avro.util.Utf8;
 import org.apache.cassandra.cache.IRowCacheProvider;
 import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.db.migration.avro.ColumnDef;
+import org.apache.cassandra.db.Column;
 import org.apache.cassandra.db.ColumnFamilyType;
 import org.apache.cassandra.db.HintedHandOffManager;
+import org.apache.cassandra.db.SuperColumn;
 import org.apache.cassandra.db.SystemTable;
 import org.apache.cassandra.db.Table;
 import org.apache.cassandra.db.ColumnFamilyStore;
@@ -43,6 +45,7 @@ import org.apache.cassandra.db.marshal.T
 import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.db.migration.Migration;
 import org.apache.cassandra.db.compaction.AbstractCompactionStrategy;
+import org.apache.cassandra.io.IColumnSerializer;
 import org.apache.cassandra.io.SerDeUtils;
 import org.apache.cassandra.thrift.InvalidRequestException;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -1088,6 +1091,13 @@ public final class CFMetaData
         return comparator.getString(columnName).replaceAll("\\W", "") + "_idx";
     }
 
+    public IColumnSerializer getColumnSerializer()
+    {
+        if (cfType == ColumnFamilyType.Standard)
+            return Column.serializer();
+        return SuperColumn.serializer(subcolumnComparator);
+    }
+
     @Override
     public String toString()
     {
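
In use, the new helper spares callers the Standard/Super branch; a hedged
sketch (the wrapper method is illustrative, not part of the patch):

    public static IColumn readOneColumn(CFMetaData cfm, DataInput in) throws IOException
    {
        // the serializer matches cfm.cfType, so this yields a Column or a SuperColumn
        return cfm.getColumnSerializer().deserialize(in);
    }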

Modified: cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=1143627&r1=1143626&r2=1143627&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Thu Jul  7 01:50:40 2011
@@ -728,6 +728,11 @@ public class DatabaseDescriptor
         return conf.in_memory_compaction_limit_in_mb * 1024 * 1024;
     }
 
+    public static void setInMemoryCompactionLimit(int sizeInMB)
+    {
+        conf.in_memory_compaction_limit_in_mb = sizeInMB;
+    }
+
     public static int getConcurrentCompactors()
     {
         return conf.concurrent_compactors;

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Column.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Column.java?rev=1143627&r1=1143626&r2=1143627&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Column.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Column.java Thu Jul  7 01:50:40 2011
@@ -98,6 +98,11 @@ public class Column implements IColumn
         return timestamp;
     }
 
+    public long maxTimestamp()
+    {
+        return timestamp;
+    }
+
     public boolean isMarkedForDelete()
     {
         return false;

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java?rev=1143627&r1=1143626&r2=1143627&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java Thu Jul  7 01:50:40 2011
@@ -73,8 +73,8 @@ public class ColumnFamily extends Abstra
         super(map);
         assert cfm != null;
         this.cfm = cfm;
-        columnSerializer = cfm.cfType == ColumnFamilyType.Standard ? Column.serializer() : SuperColumn.serializer(cfm.subcolumnComparator);
-     }
+        columnSerializer = cfm.getColumnSerializer();
+    }
     
     public ColumnFamily cloneMeShallow()
     {
@@ -237,6 +237,14 @@ public class ColumnFamily extends Abstra
         return size;
     }
 
+    public long maxTimestamp()
+    {
+        long maxTimestamp = Long.MIN_VALUE;
+        for (IColumn column : columns.values())
+            maxTimestamp = Math.max(maxTimestamp, column.maxTimestamp());
+        return maxTimestamp;
+    }
+
     public int hashCode()
     {
         throw new RuntimeException("Not implemented.");

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java?rev=1143627&r1=1143626&r2=1143627&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java Thu Jul  7 01:50:40 2011
@@ -24,14 +24,15 @@ package org.apache.cassandra.db;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.io.RandomAccessFile;
 import java.util.Collection;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.io.ICompactSerializer2;
 import org.apache.cassandra.io.ICompactSerializer3;
+import org.apache.cassandra.io.sstable.SSTableMetadata;
 
 public class ColumnFamilySerializer implements ICompactSerializer3<ColumnFamily>
 {
@@ -148,4 +149,18 @@ public class ColumnFamilySerializer impl
     {
         return cf.serializedSize();
     }
+
+    /**
+     * Observes columns in a single row, without adding them to the column family.
+     */
+    public void observeColumnsInSSTable(CFMetaData cfm, RandomAccessFile dis, SSTableMetadata.Collector sstableMetadataCollector) throws IOException
+    {
+        int size = dis.readInt();
+        sstableMetadataCollector.addColumnCount(size);
+        for (int i = 0; i < size; ++i)
+        {
+            IColumn column = cfm.getColumnSerializer().deserialize(dis);
+            sstableMetadataCollector.updateMaxTimestamp(column.maxTimestamp());
+        }
+    }
 }
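
For illustration, a hedged sketch of the observer in use; positioning the file
is assumed here (in this patch, SSTableWriter.RowIndexer.doIndexing performs
the preceding reads and skips):

    SSTableMetadata.Collector collector = SSTableMetadata.createCollector();
    RandomAccessFile dfile = new RandomAccessFile(dataFilePath, "r"); // dataFilePath is illustrative
    // per row: read the key and data size, skip the bloom filter and column index,
    // and read the row's ColumnFamily header (deserializeFromSSTableNoColumns), then:
    ColumnFamily.serializer().observeColumnsInSSTable(cfm, dfile, collector);
    // the collector now holds the row's column count and its max column timestamp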

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1143627&r1=1143626&r2=1143627&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Thu Jul  7 01:50:40 2011
@@ -2089,13 +2089,24 @@ public class ColumnFamilyStore implement
 
     public SSTableWriter createFlushWriter(long estimatedRows, long estimatedSize, ReplayPosition context) throws IOException
     {
-        return new SSTableWriter(getFlushPath(estimatedSize, Descriptor.CURRENT_VERSION), estimatedRows, metadata, partitioner, context);
+        SSTableMetadata.Collector sstableMetadataCollector = SSTableMetadata.createCollector().replayPosition(context);
+        return new SSTableWriter(getFlushPath(estimatedSize, Descriptor.CURRENT_VERSION),
+                                 estimatedRows,
+                                 metadata,
+                                 partitioner,
+                                 sstableMetadataCollector);
     }
 
     public SSTableWriter createCompactionWriter(long estimatedRows, String location, Collection<SSTableReader> sstables) throws IOException
     {
         ReplayPosition rp = ReplayPosition.getReplayPosition(sstables);
-        return new SSTableWriter(getTempSSTablePath(location), estimatedRows, metadata, partitioner, rp);
+        SSTableMetadata.Collector sstableMetadataCollector = SSTableMetadata.createCollector().replayPosition(rp);
+
+        // get the max timestamp of the precompacted sstables
+        for (SSTableReader sstable : sstables)
+            sstableMetadataCollector.updateMaxTimestamp(sstable.getMaxTimestamp());
+
+        return new SSTableWriter(getTempSSTablePath(location), estimatedRows, metadata, partitioner, sstableMetadataCollector);
     }
 
     public Iterable<ColumnFamilyStore> concatWithIndexes()

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/EchoedRow.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/EchoedRow.java?rev=1143627&r1=1143626&r2=1143627&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/EchoedRow.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/EchoedRow.java Thu Jul  7 01:50:40 2011
@@ -69,4 +69,9 @@ public class EchoedRow extends AbstractC
     {
         return row.columnCount;
     }
+
+    public long maxTimestamp()
+    {
+        throw new UnsupportedOperationException();
+    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/IColumn.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/IColumn.java?rev=1143627&r1=1143626&r2=1143627&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/IColumn.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/IColumn.java Thu Jul  7 01:50:40 2011
@@ -60,4 +60,10 @@ public interface IColumn
      * supercolumn deleted-at time.
      */
     boolean isLive();
+
+    /**
+     * For a standard column, this is the same as timestamp().
+     * For a super column, this is the max timestamp among its sub columns.
+     */
+    public long maxTimestamp();
 }
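
A worked example of the contract, as a hedged sketch (constructor shapes
assumed from this era of the codebase):

    Column a = new Column(ByteBufferUtil.bytes("a"), ByteBufferUtil.EMPTY_BYTE_BUFFER, 3L);
    SuperColumn sc = new SuperColumn(ByteBufferUtil.bytes("sc"), BytesType.instance);
    sc.addColumn(a);
    sc.addColumn(new Column(ByteBufferUtil.bytes("b"), ByteBufferUtil.EMPTY_BYTE_BUFFER, 7L));

    assert a.maxTimestamp() == 3L;  // standard column: same as timestamp()
    assert sc.maxTimestamp() == 7L; // super column: max over the subcolumns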

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java?rev=1143627&r1=1143626&r2=1143627&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java Thu Jul  7 01:50:40 2011
@@ -133,6 +133,14 @@ public class SuperColumn extends Abstrac
     	throw new UnsupportedOperationException("This operation is not supported for Super Columns.");
     }
 
+    public long maxTimestamp()
+    {
+        long maxTimestamp = Long.MIN_VALUE;
+        for (IColumn subColumn : getSubColumns())
+            maxTimestamp = Math.max(maxTimestamp, subColumn.maxTimestamp());
+        return maxTimestamp;
+    }
+
     public long mostRecentLiveChangeAt()
     {
         long max = Long.MIN_VALUE;

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/ReplayPosition.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/ReplayPosition.java?rev=1143627&r1=1143626&r2=1143627&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/ReplayPosition.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/ReplayPosition.java Thu Jul  7 01:50:40 2011
@@ -31,7 +31,7 @@ import com.google.common.collect.Iterabl
 import com.google.common.collect.Ordering;
 
 import org.apache.cassandra.io.ICompactSerializer2;
-import org.apache.cassandra.io.sstable.SSTable;
+import org.apache.cassandra.io.sstable.SSTableReader;
 
 public class ReplayPosition implements Comparable<ReplayPosition>
 {
@@ -48,16 +48,16 @@ public class ReplayPosition implements C
      * @param sstables
      * @return the most recent (highest) replay position
      */
-    public static ReplayPosition getReplayPosition(Iterable<? extends SSTable> sstables)
+    public static ReplayPosition getReplayPosition(Iterable<? extends SSTableReader> sstables)
     {
         if (Iterables.isEmpty(sstables))
             return NONE;
 
-        Function<SSTable, ReplayPosition> f = new Function<SSTable, ReplayPosition>()
+        Function<SSTableReader, ReplayPosition> f = new Function<SSTableReader, ReplayPosition>()
         {
-            public ReplayPosition apply(SSTable sstable)
+            public ReplayPosition apply(SSTableReader sstable)
             {
-                return sstable.replayPosition;
+                return sstable.getReplayPosition();
             }
         };
         Ordering<ReplayPosition> ordering = Ordering.from(ReplayPosition.comparator);

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java?rev=1143627&r1=1143626&r2=1143627&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java Thu Jul  7 01:50:40 2011
@@ -60,4 +60,9 @@ public abstract class AbstractCompactedR
      * @return the number of columns in the row
      */
     public abstract int columnCount();
+
+    /**
+     * @return the max column timestamp in the row
+     */
+    public abstract long maxTimestamp();
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java?rev=1143627&r1=1143626&r2=1143627&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java Thu Jul  7 01:50:40 2011
@@ -30,11 +30,13 @@ import java.util.*;
 import com.google.common.base.Predicates;
 import com.google.common.collect.Iterators;
 
+import org.apache.cassandra.db.Column;
 import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.ColumnIndexer;
 import org.apache.cassandra.db.CounterColumn;
 import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.db.SuperColumn;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
 import org.apache.cassandra.io.util.DataOutputBuffer;
@@ -61,6 +63,7 @@ public class LazilyCompactedRow extends 
     private ColumnFamily emptyColumnFamily;
     private Reducer reducer;
     private int columnCount;
+    private long maxTimestamp;
     private long columnSerializedSize;
 
     public LazilyCompactedRow(CompactionController controller, List<SSTableIdentityIterator> rows)
@@ -83,9 +86,10 @@ public class LazilyCompactedRow extends 
         // initialize row header so isEmpty can be called
         headerBuffer = new DataOutputBuffer();
         ColumnIndexer.serialize(this, headerBuffer);
-        // reach into the reducer used during iteration to get column count and size
+        // reach into the reducer used during iteration to get column count, size, max column timestamp
         columnCount = reducer.size;
         columnSerializedSize = reducer.serializedSize;
+        maxTimestamp = reducer.maxTimestampSeen;
         reducer = null;
     }
 
@@ -166,11 +170,17 @@ public class LazilyCompactedRow extends 
         return columnCount;
     }
 
+    public long maxTimestamp()
+    {
+        return maxTimestamp;
+    }
+
     private class Reducer extends MergeIterator.Reducer<IColumn, IColumn>
     {
         ColumnFamily container = emptyColumnFamily.cloneMeShallow();
         long serializedSize = 4; // int for column count
         int size = 0;
+        long maxTimestampSeen = Long.MIN_VALUE;
 
         public void reduce(IColumn current)
         {
@@ -194,6 +204,7 @@ public class LazilyCompactedRow extends 
             container.clear();
             serializedSize += reduced.serializedSize();
             size++;
+            maxTimestampSeen = Math.max(maxTimestampSeen, reduced.maxTimestamp());
             return reduced;
         }
     }

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java?rev=1143627&r1=1143626&r2=1143627&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java Thu Jul  7 01:50:40 2011
@@ -132,6 +132,11 @@ public class PrecompactedRow extends Abs
         return compactedCf == null ? 0 : compactedCf.getColumnCount();
     }
 
+    public long maxTimestamp()
+    {
+        return compactedCf.maxTimestamp();
+    }
+
     /**
      * @return the full column family represented by this compacted row.
      *

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/Descriptor.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/Descriptor.java?rev=1143627&r1=1143626&r2=1143627&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/Descriptor.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/Descriptor.java Thu Jul  7 01:50:40 2011
@@ -39,7 +39,7 @@ import org.apache.cassandra.utils.Pair;
 public class Descriptor
 {
     public static final String LEGACY_VERSION = "a";
-    public static final String CURRENT_VERSION = "g";
+    public static final String CURRENT_VERSION = "h";
 
     public final File directory;
     public final String version;
@@ -54,6 +54,7 @@ public class Descriptor
     public final boolean hasEncodedKeys;
     public final boolean isLatestVersion;
     public final boolean usesOldBloomFilter;
+    public final boolean usesHistogramAndReplayPositionStatsFile;
 
     public enum TempState
     {
@@ -93,6 +94,7 @@ public class Descriptor
         hasIntRowSize = version.compareTo("d") < 0;
         hasEncodedKeys = version.compareTo("e") < 0;
         usesOldBloomFilter = version.compareTo("f") < 0;
+        usesHistogramAndReplayPositionStatsFile = version.compareTo("h") < 0;
         isLatestVersion = version.compareTo(CURRENT_VERSION) == 0;
     }
 
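The flag reads "true for sstables written before this patch"; a hedged sketch
of the gate (the filename is illustrative):

    Descriptor legacy = Descriptor.fromFilename("/data/Keyspace1/Standard1-g-1-Data.db");
    // "g" sorts before "h": old stats layout, histograms + replay position but no max timestamp
    assert legacy.usesHistogramAndReplayPositionStatsFile;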

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java?rev=1143627&r1=1143626&r2=1143627&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java Thu Jul  7 01:50:40 2011
@@ -30,12 +30,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.db.commitlog.ReplayPosition;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.io.util.BufferedRandomAccessFile;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.EstimatedHistogram;
 import org.apache.cassandra.utils.Pair;
 
 /**
@@ -67,31 +65,18 @@ public abstract class SSTable
     public final CFMetaData metadata;
     public final IPartitioner partitioner;
 
-    public final ReplayPosition replayPosition;
-
-    protected final EstimatedHistogram estimatedRowSize;
-    protected final EstimatedHistogram estimatedColumnCount;
-
-    protected SSTable(Descriptor descriptor, CFMetaData metadata, ReplayPosition replayPosition, IPartitioner partitioner)
-    {
-        this(descriptor, new HashSet<Component>(), metadata, replayPosition, partitioner);
-    }
-
-    protected SSTable(Descriptor descriptor, Set<Component> components, CFMetaData metadata, ReplayPosition replayPosition, IPartitioner partitioner)
+    protected SSTable(Descriptor descriptor, CFMetaData metadata, IPartitioner partitioner)
     {
-        this(descriptor, components, metadata, replayPosition, partitioner, defaultRowHistogram(), defaultColumnHistogram());
+        this(descriptor, new HashSet<Component>(), metadata, partitioner);
     }
 
-    protected SSTable(Descriptor descriptor, Set<Component> components, CFMetaData metadata, ReplayPosition replayPosition, IPartitioner partitioner, EstimatedHistogram rowSizes, EstimatedHistogram columnCounts)
+    protected SSTable(Descriptor descriptor, Set<Component> components, CFMetaData metadata, IPartitioner partitioner)
     {
         // In almost all cases, metadata shouldn't be null, but allowing null makes it possible to create a mostly
         // functional SSTable without a full schema definition; SSTableLoader uses that ability.
         assert descriptor != null;
         assert components != null;
-        assert replayPosition != null;
         assert partitioner != null;
-        assert rowSizes != null;
-        assert columnCounts != null;
 
         this.descriptor = descriptor;
         Set<Component> dataComponents = new HashSet<Component>(components);
@@ -99,30 +84,7 @@ public abstract class SSTable
             assert component.type != Component.Type.COMPACTED_MARKER;
         this.components = Collections.unmodifiableSet(dataComponents);
         this.metadata = metadata;
-        this.replayPosition = replayPosition;
         this.partitioner = partitioner;
-        estimatedRowSize = rowSizes;
-        estimatedColumnCount = columnCounts;
-    }
-
-    static EstimatedHistogram defaultColumnHistogram()
-    {
-        return new EstimatedHistogram(114);
-    }
-
-    static EstimatedHistogram defaultRowHistogram()
-    {
-        return new EstimatedHistogram(150);
-    }
-
-    public EstimatedHistogram getEstimatedRowSize()
-    {
-        return estimatedRowSize;
-    }
-
-    public EstimatedHistogram getEstimatedColumnCount()
-    {
-        return estimatedColumnCount;
     }
 
     /**

Added: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableMetadata.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableMetadata.java?rev=1143627&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableMetadata.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableMetadata.java Thu Jul  7 01:50:40 2011
@@ -0,0 +1,224 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.cassandra.io.sstable;
+
+import java.io.BufferedInputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.commitlog.ReplayPosition;
+import org.apache.cassandra.io.ICompactSerializer2;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.utils.EstimatedHistogram;
+
+/**
+ * Metadata for a SSTable.
+ * Metadata includes:
+ *  - estimated row size histogram
+ *  - estimated column count histogram
+ *  - replay position
+ *  - max column timestamp
+ *
+ * An SSTableMetadata should be obtained via a Collector, the serializer's
+ * deserialize(), or createDefaultInstance().
+ */
+public class SSTableMetadata
+{
+    private static Logger logger = LoggerFactory.getLogger(SSTableMetadata.class);
+    protected final EstimatedHistogram estimatedRowSize;
+    protected final EstimatedHistogram estimatedColumnCount;
+    protected final ReplayPosition replayPosition;
+    protected final long maxTimestamp;
+    public static final SSTableMetadataSerializer serializer = new SSTableMetadataSerializer();
+
+    private SSTableMetadata()
+    {
+        this(defaultRowHistogram(), defaultColumnHistogram(), ReplayPosition.NONE);
+    }
+
+    // when no max timestamp was recorded (pre-"h" sstables), conservatively default to max long
+    private SSTableMetadata(EstimatedHistogram rowSizes, EstimatedHistogram columnCounts, ReplayPosition replayPosition)
+    {
+        this(rowSizes, columnCounts, replayPosition, Long.MAX_VALUE);
+    }
+
+    private SSTableMetadata(EstimatedHistogram rowSizes, EstimatedHistogram columnCounts, ReplayPosition replayPosition, long maxTimestamp)
+    {
+        this.estimatedRowSize = rowSizes;
+        this.estimatedColumnCount = columnCounts;
+        this.replayPosition = replayPosition;
+        this.maxTimestamp = maxTimestamp;
+    }
+
+    public static SSTableMetadata createDefaultInstance()
+    {
+        return new SSTableMetadata();
+    }
+
+    public static Collector createCollector()
+    {
+        return new Collector();
+    }
+
+    public EstimatedHistogram getEstimatedRowSize()
+    {
+        return estimatedRowSize;
+    }
+
+    public EstimatedHistogram getEstimatedColumnCount()
+    {
+        return estimatedColumnCount;
+    }
+
+    public ReplayPosition getReplayPosition()
+    {
+        return replayPosition;
+    }
+
+    public long getMaxTimestamp()
+    {
+        return maxTimestamp;
+    }
+
+    static EstimatedHistogram defaultColumnHistogram()
+    {
+        return new EstimatedHistogram(114);
+    }
+
+    static EstimatedHistogram defaultRowHistogram()
+    {
+        return new EstimatedHistogram(150);
+    }
+
+    public static class Collector
+    {
+        protected EstimatedHistogram estimatedRowSize;
+        protected EstimatedHistogram estimatedColumnCount;
+        protected ReplayPosition replayPosition;
+        protected long maxTimestamp;
+
+        private Collector()
+        {
+            this.estimatedRowSize = defaultRowHistogram();
+            this.estimatedColumnCount = defaultColumnHistogram();
+            this.replayPosition = ReplayPosition.NONE;
+            this.maxTimestamp = Long.MIN_VALUE;
+        }
+
+        public void addRowSize(long rowSize)
+        {
+            estimatedRowSize.add(rowSize);
+        }
+
+        public void addColumnCount(long columnCount)
+        {
+            estimatedColumnCount.add(columnCount);
+        }
+
+        public void updateMaxTimestamp(long potentialMax)
+        {
+            maxTimestamp = Math.max(maxTimestamp, potentialMax);
+        }
+
+        public SSTableMetadata finalizeMetadata()
+        {
+            return new SSTableMetadata(estimatedRowSize, estimatedColumnCount, replayPosition, maxTimestamp);
+        }
+
+        public Collector estimatedRowSize(EstimatedHistogram estimatedRowSize)
+        {
+            this.estimatedRowSize = estimatedRowSize;
+            return this;
+        }
+
+        public Collector estimatedColumnCount(EstimatedHistogram estimatedColumnCount)
+        {
+            this.estimatedColumnCount = estimatedColumnCount;
+            return this;
+        }
+
+        public Collector replayPosition(ReplayPosition replayPosition)
+        {
+            this.replayPosition = replayPosition;
+            return this;
+        }
+    }
+
+    public static class SSTableMetadataSerializer implements ICompactSerializer2<SSTableMetadata>
+    {
+        private static final Logger logger = LoggerFactory.getLogger(SSTableMetadataSerializer.class);
+
+        public void serialize(SSTableMetadata sstableStats, DataOutput dos) throws IOException
+        {
+            EstimatedHistogram.serializer.serialize(sstableStats.getEstimatedRowSize(), dos);
+            EstimatedHistogram.serializer.serialize(sstableStats.getEstimatedColumnCount(), dos);
+            ReplayPosition.serializer.serialize(sstableStats.getReplayPosition(), dos);
+            dos.writeLong(sstableStats.getMaxTimestamp());
+        }
+
+        public SSTableMetadata deserialize(Descriptor descriptor) throws IOException
+        {
+            File statsFile = new File(descriptor.filenameFor(SSTable.COMPONENT_STATS));
+            if (!statsFile.exists())
+            {
+                logger.debug("No sstable stats for {}", descriptor);
+                return new SSTableMetadata();
+            }
+
+            DataInputStream dis = null;
+            try
+            {
+                logger.debug("Load metadata for {}", descriptor);
+                dis = new DataInputStream(new BufferedInputStream(new FileInputStream(statsFile)));
+
+                if (!descriptor.usesHistogramAndReplayPositionStatsFile)
+                    return deserialize(dis);
+
+                EstimatedHistogram rowSizes = EstimatedHistogram.serializer.deserialize(dis);
+                EstimatedHistogram columnCounts = EstimatedHistogram.serializer.deserialize(dis);
+                ReplayPosition replayPosition = descriptor.hasReplayPosition()
+                                              ? ReplayPosition.serializer.deserialize(dis)
+                                              : ReplayPosition.NONE;
+
+                return new SSTableMetadata(rowSizes, columnCounts, replayPosition);
+            }
+            finally
+            {
+                FileUtils.closeQuietly(dis);
+            }
+        }
+
+        public SSTableMetadata deserialize(DataInput dis) throws IOException
+        {
+            EstimatedHistogram rowSizes = EstimatedHistogram.serializer.deserialize(dis);
+            EstimatedHistogram columnCounts = EstimatedHistogram.serializer.deserialize(dis);
+            ReplayPosition replayPosition = ReplayPosition.serializer.deserialize(dis);
+            long maxTimestamp = dis.readLong();
+            return new SSTableMetadata(rowSizes, columnCounts, replayPosition, maxTimestamp);
+        }
+    }
+}
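
A minimal serialize/deserialize round trip, in the spirit of the
SSTableMetadataSerializerTest added by this patch (values are arbitrary):

    SSTableMetadata.Collector collector = SSTableMetadata.createCollector();
    collector.addRowSize(1024);
    collector.addColumnCount(10);
    collector.updateMaxTimestamp(1309916400000L);
    SSTableMetadata original = collector.finalizeMetadata();

    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    SSTableMetadata.serializer.serialize(original, new DataOutputStream(bytes));

    DataInput in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
    SSTableMetadata reloaded = SSTableMetadata.serializer.deserialize(in);
    assert reloaded.getMaxTimestamp() == original.getMaxTimestamp();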

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java?rev=1143627&r1=1143626&r2=1143627&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java Thu Jul  7 01:50:40 2011
@@ -121,6 +121,8 @@ public class SSTableReader extends SSTab
 
     private volatile SSTableDeletingReference phantomReference;
 
+    private final SSTableMetadata sstableMetadata;
+
     public static long getApproximateKeyCount(Iterable<SSTableReader> sstables)
     {
         long count = 0;
@@ -157,35 +159,20 @@ public class SSTableReader extends SSTab
         long start = System.currentTimeMillis();
         logger.info("Opening " + descriptor);
 
-        EstimatedHistogram rowSizes;
-        EstimatedHistogram columnCounts;
-        File statsFile = new File(descriptor.filenameFor(SSTable.COMPONENT_STATS));
-        ReplayPosition rp = ReplayPosition.NONE;
-        if (components.contains(Component.STATS) && statsFile.exists())
-        {
-            DataInputStream dis = null;
-            try
-            {
-                logger.debug("Load metadata for {}", descriptor);
-                dis = new DataInputStream(new BufferedInputStream(new FileInputStream(statsFile)));
-                rowSizes = EstimatedHistogram.serializer.deserialize(dis);
-                columnCounts = EstimatedHistogram.serializer.deserialize(dis);
-                if (descriptor.hasReplayPosition())
-                    rp = ReplayPosition.serializer.deserialize(dis);
-            }
-            finally
-            {
-                FileUtils.closeQuietly(dis);
-            }
-        }
-        else
-        {
-            logger.debug("No statistics for {}", descriptor);
-            rowSizes = SSTable.defaultRowHistogram();
-            columnCounts = SSTable.defaultColumnHistogram();
-        }
-
-        SSTableReader sstable = new SSTableReader(descriptor, components, metadata, rp, partitioner, null, null, null, null, System.currentTimeMillis(), rowSizes, columnCounts);
+        SSTableMetadata sstableMetadata = components.contains(Component.STATS)
+                                        ? SSTableMetadata.serializer.deserialize(descriptor)
+                                        : SSTableMetadata.createDefaultInstance();
+
+        SSTableReader sstable = new SSTableReader(descriptor,
+                                                  components,
+                                                  metadata,
+                                                  partitioner,
+                                                  null,
+                                                  null,
+                                                  null,
+                                                  null,
+                                                  System.currentTimeMillis(),
+                                                  sstableMetadata);
         sstable.setTrackedBy(tracker);
 
         // versions before 'c' encoded keys as utf-16 before hashing to the filter
@@ -210,28 +197,43 @@ public class SSTableReader extends SSTab
     /**
      * Open a RowIndexedReader which already has its state initialized (by SSTableWriter).
      */
-    static SSTableReader internalOpen(Descriptor desc, Set<Component> components, CFMetaData metadata, ReplayPosition replayPosition, IPartitioner partitioner, SegmentedFile ifile, SegmentedFile dfile, IndexSummary isummary, Filter bf, long maxDataAge, EstimatedHistogram rowsize,
-                                      EstimatedHistogram columncount) throws IOException
-    {
-        assert desc != null && partitioner != null && ifile != null && dfile != null && isummary != null && bf != null;
-        return new SSTableReader(desc, components, metadata, replayPosition, partitioner, ifile, dfile, isummary, bf, maxDataAge, rowsize, columncount);
+    static SSTableReader internalOpen(Descriptor desc,
+                                      Set<Component> components,
+                                      CFMetaData metadata,
+                                      IPartitioner partitioner,
+                                      SegmentedFile ifile,
+                                      SegmentedFile dfile,
+                                      IndexSummary isummary,
+                                      Filter bf,
+                                      long maxDataAge,
+                                      SSTableMetadata sstableMetadata) throws IOException
+    {
+        assert desc != null && partitioner != null && ifile != null && dfile != null && isummary != null && bf != null && sstableMetadata != null;
+        return new SSTableReader(desc,
+                                 components,
+                                 metadata,
+                                 partitioner,
+                                 ifile, dfile,
+                                 isummary,
+                                 bf,
+                                 maxDataAge,
+                                 sstableMetadata);
     }
 
     private SSTableReader(Descriptor desc,
                           Set<Component> components,
                           CFMetaData metadata,
-                          ReplayPosition replayPosition,
                           IPartitioner partitioner,
                           SegmentedFile ifile,
                           SegmentedFile dfile,
                           IndexSummary indexSummary,
                           Filter bloomFilter,
                           long maxDataAge,
-                          EstimatedHistogram rowSizes,
-                          EstimatedHistogram columnCounts)
+                          SSTableMetadata sstableMetadata)
     throws IOException
     {
-        super(desc, components, metadata, replayPosition, partitioner, rowSizes, columnCounts);
+        super(desc, components, metadata, partitioner);
+        this.sstableMetadata = sstableMetadata;
         this.maxDataAge = maxDataAge;
 
         this.ifile = ifile;
@@ -773,4 +775,24 @@ public class SSTableReader extends SSTab
     {
         return keyCache;
     }
+
+    public EstimatedHistogram getEstimatedRowSize()
+    {
+        return sstableMetadata.getEstimatedRowSize();
+    }
+
+    public EstimatedHistogram getEstimatedColumnCount()
+    {
+        return sstableMetadata.getEstimatedColumnCount();
+    }
+
+    public ReplayPosition getReplayPosition()
+    {
+        return sstableMetadata.getReplayPosition();
+    }
+
+    public long getMaxTimestamp()
+    {
+        return sstableMetadata.getMaxTimestamp();
+    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java?rev=1143627&r1=1143626&r2=1143627&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java Thu Jul  7 01:50:40 2011
@@ -27,20 +27,19 @@ import java.util.HashSet;
 import java.util.Set;
 
 import com.google.common.collect.Sets;
-
-import org.apache.cassandra.db.commitlog.ReplayPosition;
-import org.apache.cassandra.db.compaction.*;
-import org.apache.cassandra.utils.ByteBufferUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.compaction.*;
 import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.Table;
 import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.io.*;
+import org.apache.cassandra.io.sstable.SSTableMetadata;
 import org.apache.cassandra.io.util.BufferedRandomAccessFile;
 import org.apache.cassandra.io.util.FileMark;
 import org.apache.cassandra.io.util.FileUtils;
@@ -48,7 +47,7 @@ import org.apache.cassandra.io.util.Segm
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.streaming.OperationType;
 import org.apache.cassandra.utils.BloomFilter;
-import org.apache.cassandra.utils.EstimatedHistogram;
+import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 
 public class SSTableWriter extends SSTable
@@ -60,24 +59,31 @@ public class SSTableWriter extends SSTab
     private final BufferedRandomAccessFile dataFile;
     private DecoratedKey lastWrittenKey;
     private FileMark dataMark;
+    private SSTableMetadata.Collector sstableMetadataCollector;
 
     public SSTableWriter(String filename, long keyCount) throws IOException
     {
-        this(filename, keyCount, DatabaseDescriptor.getCFMetaData(Descriptor.fromFilename(filename)), StorageService.getPartitioner(), ReplayPosition.NONE);
+        this(filename,
+             keyCount,
+             DatabaseDescriptor.getCFMetaData(Descriptor.fromFilename(filename)),
+             StorageService.getPartitioner(),
+             SSTableMetadata.createCollector());
     }
 
-    public SSTableWriter(String filename, long keyCount, CFMetaData metadata, IPartitioner partitioner, ReplayPosition replayPosition) throws IOException
+    public SSTableWriter(String filename,
+                         long keyCount,
+                         CFMetaData metadata,
+                         IPartitioner partitioner,
+                         SSTableMetadata.Collector sstableMetadataCollector) throws IOException
     {
         super(Descriptor.fromFilename(filename),
               new HashSet<Component>(Arrays.asList(Component.DATA, Component.FILTER, Component.PRIMARY_INDEX, Component.STATS)),
               metadata,
-              replayPosition,
-              partitioner,
-              SSTable.defaultRowHistogram(),
-              SSTable.defaultColumnHistogram());
+              partitioner);
         iwriter = new IndexWriter(descriptor, partitioner, keyCount);
         dbuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
         dataFile = new BufferedRandomAccessFile(new File(getFilename()), "rw", BufferedRandomAccessFile.DEFAULT_BUFFER_SIZE, true);
+        this.sstableMetadataCollector = sstableMetadataCollector;
     }
     
     public void mark()
@@ -130,8 +136,10 @@ public class SSTableWriter extends SSTab
         long currentPosition = beforeAppend(row.key);
         ByteBufferUtil.writeWithShortLength(row.key.key, dataFile);
         row.write(dataFile);
-        estimatedRowSize.add(dataFile.getFilePointer() - currentPosition);
-        estimatedColumnCount.add(row.columnCount());
+        // max timestamp is not collected here, because we want to avoid deserializing an EchoedRow
+        // instead, it is collected when calling ColumnFamilyStore.createCompactionWriter
+        sstableMetadataCollector.addRowSize(dataFile.getFilePointer() - currentPosition);
+        sstableMetadataCollector.addColumnCount(row.columnCount());
         afterAppend(row.key, currentPosition);
         return currentPosition;
     }
@@ -154,8 +162,10 @@ public class SSTableWriter extends SSTab
         // finally, reset for next row
         dataFile.seek(endPosition);
         afterAppend(decoratedKey, startPosition);
-        estimatedRowSize.add(endPosition - startPosition);
-        estimatedColumnCount.add(columnCount);
+        // track max column timestamp
+        sstableMetadataCollector.updateMaxTimestamp(cf.maxTimestamp());
+        sstableMetadataCollector.addRowSize(endPosition - startPosition);
+        sstableMetadataCollector.addColumnCount(columnCount);
     }
 
     public void append(DecoratedKey decoratedKey, ByteBuffer value) throws IOException
@@ -204,7 +214,8 @@ public class SSTableWriter extends SSTab
         FileUtils.truncate(dataFile.getPath(), position);
 
         // write sstable statistics
-        writeMetadata(descriptor, estimatedRowSize, estimatedColumnCount, replayPosition);
+        SSTableMetadata sstableMetadata = sstableMetadataCollector.finalizeMetadata();
+        writeMetadata(descriptor, sstableMetadata);
 
         // remove the 'tmp' marker from all components
         final Descriptor newdesc = rename(descriptor, components);
@@ -212,21 +223,28 @@ public class SSTableWriter extends SSTab
         // finalize in-memory state for the reader
         SegmentedFile ifile = iwriter.builder.complete(newdesc.filenameFor(SSTable.COMPONENT_INDEX));
         SegmentedFile dfile = dbuilder.complete(newdesc.filenameFor(SSTable.COMPONENT_DATA));
-        SSTableReader sstable = SSTableReader.internalOpen(newdesc, components, metadata, replayPosition, partitioner, ifile, dfile, iwriter.summary, iwriter.bf, maxDataAge, estimatedRowSize, estimatedColumnCount);
+        SSTableReader sstable = SSTableReader.internalOpen(newdesc,
+                                                           components,
+                                                           metadata,
+                                                           partitioner,
+                                                           ifile,
+                                                           dfile,
+                                                           iwriter.summary,
+                                                           iwriter.bf,
+                                                           maxDataAge,
+                                                           sstableMetadata);
         iwriter = null;
         dbuilder = null;
         return sstable;
     }
 
-    private static void writeMetadata(Descriptor desc, EstimatedHistogram rowSizes, EstimatedHistogram columnCounts, ReplayPosition rp) throws IOException
+    private static void writeMetadata(Descriptor desc, SSTableMetadata sstableMetadata) throws IOException
     {
         BufferedRandomAccessFile out = new BufferedRandomAccessFile(new File(desc.filenameFor(SSTable.COMPONENT_STATS)),
                                                                      "rw",
                                                                      BufferedRandomAccessFile.DEFAULT_BUFFER_SIZE,
                                                                      true);
-        EstimatedHistogram.serializer.serialize(rowSizes, out);
-        EstimatedHistogram.serializer.serialize(columnCounts, out);
-        ReplayPosition.serializer.serialize(rp, out);
+        SSTableMetadata.serializer.serialize(sstableMetadata, out);
         out.close();
     }
 
@@ -374,6 +392,7 @@ public class SSTableWriter extends SSTab
 
         protected IndexWriter iwriter;
         protected ColumnFamilyStore cfs;
+        protected final SSTableMetadata.Collector sstableMetadataCollector = SSTableMetadata.createCollector();
 
         RowIndexer(Descriptor desc, ColumnFamilyStore cfs, OperationType type) throws IOException
         {
@@ -480,11 +499,10 @@ public class SSTableWriter extends SSTab
 
         protected long doIndexing() throws IOException
         {
-            EstimatedHistogram rowSizes = SSTable.defaultRowHistogram();
-            EstimatedHistogram columnCounts = SSTable.defaultColumnHistogram();
             long rows = 0;
             DecoratedKey key;
             long rowPosition = 0;
+            ColumnFamily cf = ColumnFamily.create(cfs.metadata);
             while (rowPosition < dfile.length())
             {
                 // read key
@@ -497,19 +515,23 @@ public class SSTableWriter extends SSTab
 
                 IndexHelper.skipBloomFilter(dfile);
                 IndexHelper.skipIndex(dfile);
-                ColumnFamily.serializer().deserializeFromSSTableNoColumns(ColumnFamily.create(cfs.metadata), dfile);
+                ColumnFamily.serializer().deserializeFromSSTableNoColumns(cf, dfile);
+
+                // We can't simply get the max column timestamp here by calling cf.maxTimestamp() because
+                // the columns have not been deserialized yet. observeColumnsInSSTable() will deserialize
+                // and get the max timestamp instead.
+                ColumnFamily.serializer().observeColumnsInSSTable(cfs.metadata, dfile, sstableMetadataCollector);
 
                 // don't move that statement around, it expects the dfile to be before the columns
                 updateCache(key, dataSize, null);
 
-                rowSizes.add(dataSize);
-                columnCounts.add(dfile.readInt());
+                sstableMetadataCollector.addRowSize(dataSize);
                 
                 dfile.seek(rowPosition);
 
                 rows++;
             }
-            writeMetadata(desc, rowSizes, columnCounts, ReplayPosition.NONE);
+            writeMetadata(desc, sstableMetadataCollector.finalizeMetadata());
             return rows;
         }
 
@@ -543,8 +565,6 @@ public class SSTableWriter extends SSTab
         @Override
         protected long doIndexing() throws IOException
         {
-            EstimatedHistogram rowSizes = SSTable.defaultRowHistogram();
-            EstimatedHistogram columnCounts = SSTable.defaultColumnHistogram();
             long rows = 0L;
             DecoratedKey key;
 
@@ -561,8 +581,9 @@ public class SSTableWriter extends SSTab
                 AbstractCompactedRow row = controller.getCompactedRow(iter);
                 updateCache(key, dataSize, row);
 
-                rowSizes.add(dataSize);
-                columnCounts.add(row.columnCount());
+                sstableMetadataCollector.addRowSize(dataSize);
+                sstableMetadataCollector.addColumnCount(row.columnCount());
+                sstableMetadataCollector.updateMaxTimestamp(row.maxTimestamp());
 
                 // update index writer
                 iwriter.afterAppend(key, writerDfile.getFilePointer());
@@ -572,7 +593,7 @@ public class SSTableWriter extends SSTab
 
                 rows++;
             }
-            writeMetadata(desc, rowSizes, columnCounts, ReplayPosition.NONE);
+            writeMetadata(desc, sstableMetadataCollector.finalizeMetadata());
 
             if (writerDfile.getFilePointer() != dfile.getFilePointer())
             {
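
Pulling the pieces together, a hedged sketch of constructing a writer with a
pre-seeded collector, condensed from the createCompactionWriter change above:

    SSTableMetadata.Collector collector = SSTableMetadata.createCollector()
                                                         .replayPosition(ReplayPosition.getReplayPosition(sstables));
    for (SSTableReader sstable : sstables)
        collector.updateMaxTimestamp(sstable.getMaxTimestamp()); // carry max timestamps across the compaction
    SSTableWriter writer = new SSTableWriter(path, estimatedRows, metadata, partitioner, collector);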

Modified: cassandra/trunk/src/java/org/apache/cassandra/utils/EstimatedHistogram.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/utils/EstimatedHistogram.java?rev=1143627&r1=1143626&r2=1143627&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/utils/EstimatedHistogram.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/utils/EstimatedHistogram.java Thu Jul  7 01:50:40 2011
@@ -188,6 +188,12 @@ public class EstimatedHistogram
         return buckets.get(buckets.length() - 1) > 0;
     }
 
+    public boolean equals(EstimatedHistogram o)
+    {
+        return Arrays.equals(getBucketOffsets(), o.getBucketOffsets()) &&
+               Arrays.equals(getBuckets(false), o.getBuckets(false));
+    }
+
     public static class EstimatedHistogramSerializer implements ICompactSerializer2<EstimatedHistogram>
     {
         public void serialize(EstimatedHistogram eh, DataOutput dos) throws IOException

Modified: cassandra/trunk/test/unit/org/apache/cassandra/Util.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/Util.java?rev=1143627&r1=1143626&r2=1143627&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/Util.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/Util.java Thu Jul  7 01:50:40 2011
@@ -103,8 +103,13 @@ public class Util
     
     public static List<Row> getRangeSlice(ColumnFamilyStore cfs) throws IOException, ExecutionException, InterruptedException
     {
+        return getRangeSlice(cfs, null);
+    }
+
+    public static List<Row> getRangeSlice(ColumnFamilyStore cfs, ByteBuffer superColumn) throws IOException, ExecutionException, InterruptedException
+    {
         Token min = StorageService.getPartitioner().getMinimumToken();
-        return cfs.getRangeSlice(null,
+        return cfs.getRangeSlice(superColumn,
                                  new Bounds(min, min),
                                  10000,
                                  new IdentityQueryFilter());

Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/CleanupTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/CleanupTest.java?rev=1143627&r1=1143626&r2=1143627&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/CleanupTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/CleanupTest.java Thu Jul  7 01:50:40 2011
@@ -26,6 +26,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.net.InetAddress;
 import java.util.Arrays;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
 
@@ -41,6 +42,7 @@ import org.apache.cassandra.db.filter.Qu
 import org.apache.cassandra.dht.BytesToken;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.locator.TokenMetadata;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.thrift.IndexClause;
@@ -75,12 +77,19 @@ public class CleanupTest extends Cleanup
 
         // insert data and verify we get it back w/ range query
         fillCF(cfs, LOOPS);
+
+        // record max timestamps of the sstables pre-cleanup
+        List<Long> expectedMaxTimestamps = getMaxTimestampList(cfs);
+
         rows = cfs.getRangeSlice(null, Util.range("", ""), 1000, new IdentityQueryFilter());
         assertEquals(LOOPS, rows.size());
 
         // with one token in the ring, owned by the local node, cleanup should be a no-op
         CompactionManager.instance.performCleanup(cfs, new NodeId.OneShotRenewer());
 
+        // ensure max timestamp of the sstables are retained post-cleanup
+        assert expectedMaxTimestamps.equals(getMaxTimestampList(cfs));
+
         // check data is still there
         rows = cfs.getRangeSlice(null, Util.range("", ""), 1000, new IdentityQueryFilter());
         assertEquals(LOOPS, rows.size());
@@ -151,4 +160,12 @@ public class CleanupTest extends Cleanup
 
         cfs.forceBlockingFlush();
     }
+
+    protected List<Long> getMaxTimestampList(ColumnFamilyStore cfs)
+    {
+        List<Long> list = new LinkedList<Long>();
+        for (SSTableReader sstable : cfs.getSSTables())
+            list.add(sstable.getMaxTimestamp());
+        return list;
+    }
 }

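One subtlety in the assertion above: getMaxTimestampList builds a LinkedList in getSSTables() iteration order and the test compares with List.equals, so the check is order-sensitive. That is fine here because the single-token cleanup is a no-op and leaves the same sstables in place; if the ordering ever proved unstable, a sorted comparison (a hypothetical hardening, using java.util.Collections) would be the obvious fix:

    List<Long> expected = getMaxTimestampList(cfs);
    CompactionManager.instance.performCleanup(cfs, new NodeId.OneShotRenewer());
    List<Long> observed = getMaxTimestampList(cfs);
    // Sort both sides so the comparison ignores sstable iteration order.
    Collections.sort(expected);
    Collections.sort(observed);
    assert expected.equals(observed);
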
Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java?rev=1143627&r1=1143626&r2=1143627&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java Thu Jul  7 01:50:40 2011
@@ -19,6 +19,7 @@
 package org.apache.cassandra.db.compaction;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.Collection;
@@ -45,33 +46,99 @@ public class CompactionsTest extends Cle
     public static final String TABLE1 = "Keyspace1";
 
     @Test
-    public void testCompactions() throws IOException, ExecutionException, InterruptedException
+    public void testStandardColumnCompactions() throws IOException, ExecutionException, InterruptedException
     {
         // this test does enough rows to force multiple block indexes to be used
         Table table = Table.open(TABLE1);
         ColumnFamilyStore store = table.getColumnFamilyStore("Standard1");
 
         final int ROWS_PER_SSTABLE = 10;
-        final int SSTABLES = (DatabaseDescriptor.getIndexInterval() * 3 / ROWS_PER_SSTABLE);
+        final int SSTABLES = DatabaseDescriptor.getIndexInterval() * 3 / ROWS_PER_SSTABLE;
 
         // disable compaction while flushing
         store.disableAutoCompaction();
 
+        long maxTimestampExpected = Long.MIN_VALUE;
         Set<DecoratedKey> inserted = new HashSet<DecoratedKey>();
         for (int j = 0; j < SSTABLES; j++) {
             for (int i = 0; i < ROWS_PER_SSTABLE; i++) {
                 DecoratedKey key = Util.dk(String.valueOf(i % 2));
                 RowMutation rm = new RowMutation(TABLE1, key.key);
-                rm.add(new QueryPath("Standard1", null, ByteBufferUtil.bytes(String.valueOf(i / 2))), ByteBufferUtil.EMPTY_BYTE_BUFFER, j * ROWS_PER_SSTABLE + i);
+                long timestamp = j * ROWS_PER_SSTABLE + i;
+                rm.add(new QueryPath("Standard1", null, ByteBufferUtil.bytes(String.valueOf(i / 2))),
+                       ByteBufferUtil.EMPTY_BYTE_BUFFER,
+                       timestamp);
+                maxTimestampExpected = Math.max(timestamp, maxTimestampExpected);
                 rm.apply();
                 inserted.add(key);
             }
             store.forceBlockingFlush();
+            assertMaxTimestamp(store, maxTimestampExpected);
             assertEquals(inserted.toString(), inserted.size(), Util.getRangeSlice(store).size());
         }
+
+        forceCompactions(store);
+
+        assertEquals(inserted.size(), Util.getRangeSlice(store).size());
+
+        // make sure max timestamp of compacted sstables is recorded properly after compaction.
+        assertMaxTimestamp(store, maxTimestampExpected);
+    }
+
+    @Test
+    public void testSuperColumnCompactions() throws IOException, ExecutionException, InterruptedException
+    {
+        Table table = Table.open(TABLE1);
+        ColumnFamilyStore store = table.getColumnFamilyStore("Super1");
+
+        final int ROWS_PER_SSTABLE = 10;
+        final int SSTABLES = DatabaseDescriptor.getIndexInterval() * 3 / ROWS_PER_SSTABLE;
+
+        // disable compaction while flushing
+        store.disableAutoCompaction();
+
+        long maxTimestampExpected = Long.MIN_VALUE;
+        Set<DecoratedKey> inserted = new HashSet<DecoratedKey>();
+        ByteBuffer superColumn = ByteBufferUtil.bytes("TestSuperColumn");
+        for (int j = 0; j < SSTABLES; j++) {
+            for (int i = 0; i < ROWS_PER_SSTABLE; i++) {
+                DecoratedKey key = Util.dk(String.valueOf(i % 2));
+                RowMutation rm = new RowMutation(TABLE1, key.key);
+                long timestamp = j * ROWS_PER_SSTABLE + i;
+                rm.add(new QueryPath("Super1", superColumn, ByteBufferUtil.bytes(String.valueOf(i / 2))),
+                       ByteBufferUtil.EMPTY_BYTE_BUFFER,
+                       timestamp);
+                maxTimestampExpected = Math.max(timestamp, maxTimestampExpected);
+                rm.apply();
+                inserted.add(key);
+            }
+            store.forceBlockingFlush();
+            assertMaxTimestamp(store, maxTimestampExpected);
+            assertEquals(inserted.toString(), inserted.size(), Util.getRangeSlice(store, superColumn).size());
+        }
+
+        forceCompactions(store);
+
+        assertEquals(inserted.size(), Util.getRangeSlice(store, superColumn).size());
+
+        // make sure max timestamp of compacted sstables is recorded properly after compaction.
+        assertMaxTimestamp(store, maxTimestampExpected);
+    }
+
+    public void assertMaxTimestamp(ColumnFamilyStore store, long maxTimestampExpected)
+    {
+        long maxTimestampObserved = Long.MIN_VALUE;
+        for (SSTableReader sstable : store.getSSTables())
+            maxTimestampObserved = Math.max(sstable.getMaxTimestamp(), maxTimestampObserved);
+        assertEquals(maxTimestampExpected, maxTimestampObserved);
+    }
+
+    private void forceCompactions(ColumnFamilyStore store) throws ExecutionException, InterruptedException
+    {
         // re-enable compaction with thresholds low enough to force a few rounds
         store.setMinimumCompactionThreshold(2);
         store.setMaximumCompactionThreshold(4);
+
         // loop submitting parallel compactions until they all return 0
         while (true)
         {
@@ -91,7 +159,6 @@ public class CompactionsTest extends Cle
         {
             CompactionManager.instance.performMaximal(store);
         }
-        assertEquals(inserted.size(), Util.getRangeSlice(store).size());
     }
 
     @Test

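The expected maximum in these tests can be worked out by hand: timestamps are assigned as j * ROWS_PER_SSTABLE + i, so the largest value written is (SSTABLES - 1) * ROWS_PER_SSTABLE + (ROWS_PER_SSTABLE - 1). For example, assuming the default index_interval of 128:

    int ROWS_PER_SSTABLE = 10;
    int SSTABLES = 128 * 3 / ROWS_PER_SSTABLE;              // 38
    long maxTimestampExpected =
        (long) (SSTABLES - 1) * ROWS_PER_SSTABLE
        + (ROWS_PER_SSTABLE - 1);                           // 37 * 10 + 9 = 379
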
Added: cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableMetadataSerializerTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableMetadataSerializerTest.java?rev=1143627&view=auto
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableMetadataSerializerTest.java (added)
+++ cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableMetadataSerializerTest.java Thu Jul  7 01:50:40 2011
@@ -0,0 +1,73 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.io.sstable;
+
+import java.io.ByteArrayOutputStream;
+import java.io.ByteArrayInputStream;
+import java.io.DataOutputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+
+import org.junit.Test;
+
+import org.apache.cassandra.db.commitlog.ReplayPosition;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.EstimatedHistogram;
+
+public class SSTableMetadataSerializerTest
+{
+    @Test
+    public void testSerialization() throws IOException
+    {
+        EstimatedHistogram rowSizes = new EstimatedHistogram(
+            new long[] { 1L, 2L },
+            new long[] { 3L, 4L, 5L });
+        EstimatedHistogram columnCounts = new EstimatedHistogram(
+            new long[] { 6L, 7L },
+            new long[] { 8L, 9L, 10L });
+        ReplayPosition rp = new ReplayPosition(11L, 12);
+        long maxTimestamp = 4162517136L;
+
+        SSTableMetadata.Collector collector = SSTableMetadata.createCollector()
+                                                             .estimatedRowSize(rowSizes)
+                                                             .estimatedColumnCount(columnCounts)
+                                                             .replayPosition(rp);
+        collector.updateMaxTimestamp(maxTimestamp);
+        SSTableMetadata originalMetadata = collector.finalizeMetadata();
+
+        ByteArrayOutputStream byteOutput = new ByteArrayOutputStream();
+        DataOutputStream dos = new DataOutputStream(byteOutput);
+
+        SSTableMetadata.serializer.serialize(originalMetadata, dos);
+
+        ByteArrayInputStream byteInput = new ByteArrayInputStream(byteOutput.toByteArray());
+        DataInputStream dis = new DataInputStream(byteInput);
+
+        SSTableMetadata stats = SSTableMetadata.serializer.deserialize(dis);
+
+        assert stats.getEstimatedRowSize().equals(originalMetadata.getEstimatedRowSize());
+        assert stats.getEstimatedRowSize().equals(rowSizes);
+        assert stats.getEstimatedColumnCount().equals(originalMetadata.getEstimatedColumnCount());
+        assert stats.getEstimatedColumnCount().equals(columnCounts);
+        assert stats.getReplayPosition().equals(originalMetadata.getReplayPosition());
+        assert stats.getReplayPosition().equals(rp);
+        assert stats.getMaxTimestamp() == maxTimestamp;
+        assert stats.getMaxTimestamp() == originalMetadata.getMaxTimestamp();
+    }
+}

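Two details of the test above worth calling out: the chosen timestamp, 4162517136L, is deliberately larger than Integer.MAX_VALUE, so any accidental truncation to an int during serialization would corrupt it; and updateMaxTimestamp is presumably a running maximum (the writer would call it once per column), so repeated calls should keep only the largest value. A small sketch of that assumption:

    SSTableMetadata.Collector collector = SSTableMetadata.createCollector();
    collector.updateMaxTimestamp(10L);
    collector.updateMaxTimestamp(4162517136L);  // > Integer.MAX_VALUE on purpose
    collector.updateMaxTimestamp(7L);           // a smaller, later timestamp must not lower the max
    assert collector.finalizeMetadata().getMaxTimestamp() == 4162517136L;
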
Modified: cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableWriterCommutativeTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableWriterCommutativeTest.java?rev=1143627&r1=1143626&r2=1143627&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableWriterCommutativeTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableWriterCommutativeTest.java Thu Jul  7 01:50:40 2011
@@ -34,6 +34,7 @@ import java.util.concurrent.ExecutionExc
 
 import org.apache.cassandra.CleanupHelper;
 import org.apache.cassandra.Util;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.context.CounterContext;
@@ -54,10 +55,38 @@ public class SSTableWriterCommutativeTes
     private static final CounterColumnType ctype = CounterColumnType.instance;
 
     @Test
-    public void testRecoverAndOpenCommutative() throws IOException, ExecutionException, InterruptedException, UnknownHostException
+    public void testStandardColumn() throws IOException, ExecutionException, InterruptedException, UnknownHostException
+    {
+        testRecoverAndOpenCommutative(false, false);
+    }
+
+    @Test
+    public void testStandardColumnExceedMemoryLimit() throws IOException, ExecutionException, InterruptedException, UnknownHostException
+    {
+        testRecoverAndOpenCommutative(false, true);
+    }
+
+    @Test
+    public void testSuperColumn() throws IOException, ExecutionException, InterruptedException, UnknownHostException
+    {
+        testRecoverAndOpenCommutative(true, false);
+    }
+
+    @Test
+    public void testSuperColumnExceedMemoryLimit() throws IOException, ExecutionException, InterruptedException, UnknownHostException
+    {
+        testRecoverAndOpenCommutative(true, true);
+    }
+
+    /**
+     * Tests recovery and opening of commutative (counter) columns.
+     * @param superColumns whether to exercise super columns
+     * @param forceExceedMemoryLimit if true, sets "in_memory_compaction_limit_in_mb" to 0 to force use of LazilyCompactedRow; otherwise PrecompactedRow is used
+     */
+    public void testRecoverAndOpenCommutative(boolean superColumns, boolean forceExceedMemoryLimit) throws IOException, ExecutionException, InterruptedException, UnknownHostException
     {
         String keyspace = "Keyspace1";
-        String cfname   = "Counter1";
+        String cfname   = superColumns ? "SuperCounter1" : "Counter1";
 
         Map<String, ColumnFamily> entries = new HashMap<String, ColumnFamily>();
         Map<String, ColumnFamily> cleanedEntries = new HashMap<String, ColumnFamily>();
@@ -65,6 +95,9 @@ public class SSTableWriterCommutativeTes
         ColumnFamily cf;
         ColumnFamily cfCleaned;
         CounterContext.ContextState state;
+        IColumn column;
+        IColumn columnCleaned;
+        ByteBuffer superColumnName;
 
         // key: k
         cf = ColumnFamily.create(keyspace, cfname);
@@ -74,16 +107,36 @@ public class SSTableWriterCommutativeTes
         state.writeElement(NodeId.fromInt(4), 4L, 2L);
         state.writeElement(NodeId.fromInt(6), 3L, 3L);
         state.writeElement(NodeId.fromInt(8), 2L, 4L);
-        cf.addColumn(new CounterColumn( ByteBufferUtil.bytes("x"), state.context, 0L));
-        cfCleaned.addColumn(new CounterColumn( ByteBufferUtil.bytes("x"), cc.clearAllDelta(state.context), 0L));
+        column = new CounterColumn( ByteBufferUtil.bytes("x"), state.context, 0L);
+        columnCleaned = new CounterColumn( ByteBufferUtil.bytes("x"), cc.clearAllDelta(state.context), 0L);
+
+        if (superColumns)
+        {
+            superColumnName = ByteBufferUtil.bytes("TestSuperColumn1");
+            column = superCounterColumnify(superColumnName, column);
+            columnCleaned = superCounterColumnify(superColumnName, columnCleaned);
+        }
+
+        cf.addColumn(column);
+        cfCleaned.addColumn(columnCleaned);
 
         state = CounterContext.ContextState.allocate(4, 1);
         state.writeElement(NodeId.fromInt(1), 7L, 12L);
         state.writeElement(NodeId.fromInt(2), 5L, 3L, true);
         state.writeElement(NodeId.fromInt(3), 2L, 33L);
         state.writeElement(NodeId.fromInt(9), 1L, 24L);
-        cf.addColumn(new CounterColumn( ByteBufferUtil.bytes("y"), state.context, 0L));
-        cfCleaned.addColumn(new CounterColumn( ByteBufferUtil.bytes("y"), cc.clearAllDelta(state.context), 0L));
+        column = new CounterColumn( ByteBufferUtil.bytes("y"), state.context, 0L);
+        columnCleaned = new CounterColumn( ByteBufferUtil.bytes("y"), cc.clearAllDelta(state.context), 0L);
+
+        if (superColumns)
+        {
+            superColumnName = ByteBufferUtil.bytes("TestSuperColumn2");
+            column = superCounterColumnify(superColumnName, column);
+            columnCleaned = superCounterColumnify(superColumnName, columnCleaned);
+        }
+
+        cf.addColumn(column);
+        cfCleaned.addColumn(columnCleaned);
 
         entries.put("k", cf);
         cleanedEntries.put("k", cfCleaned);
@@ -96,32 +149,59 @@ public class SSTableWriterCommutativeTes
         state.writeElement(NodeId.fromInt(4), 4L, 2L);
         state.writeElement(NodeId.fromInt(6), 3L, 3L);
         state.writeElement(NodeId.fromInt(8), 2L, 4L);
-        cf.addColumn(new CounterColumn( ByteBufferUtil.bytes("x"), state.context, 0L));
-        cfCleaned.addColumn(new CounterColumn( ByteBufferUtil.bytes("x"), cc.clearAllDelta(state.context), 0L));
+        column = new CounterColumn( ByteBufferUtil.bytes("x"), state.context, 0L);
+        columnCleaned = new CounterColumn( ByteBufferUtil.bytes("x"), cc.clearAllDelta(state.context), 0L);
+
+        if (superColumns)
+        {
+            superColumnName = ByteBufferUtil.bytes("TestSuperColumn3");
+            column = superCounterColumnify(superColumnName, column);
+            columnCleaned = superCounterColumnify(superColumnName, columnCleaned);
+        }
+
+        cf.addColumn(column);
+        cfCleaned.addColumn(columnCleaned);
 
         state = CounterContext.ContextState.allocate(3, 0);
         state.writeElement(NodeId.fromInt(1), 7L, 12L);
         state.writeElement(NodeId.fromInt(3), 2L, 33L);
         state.writeElement(NodeId.fromInt(9), 1L, 24L);
-        cf.addColumn(new CounterColumn( ByteBufferUtil.bytes("y"), state.context, 0L));
-        cfCleaned.addColumn(new CounterColumn( ByteBufferUtil.bytes("y"), cc.clearAllDelta(state.context), 0L));
+        column = new CounterColumn( ByteBufferUtil.bytes("y"), state.context, 0L);
+        columnCleaned = new CounterColumn( ByteBufferUtil.bytes("y"), cc.clearAllDelta(state.context), 0L);
+
+        if (superColumns)
+        {
+            superColumnName = ByteBufferUtil.bytes("TestSuperColumn4");
+            column = superCounterColumnify(superColumnName, column);
+            columnCleaned = superCounterColumnify(superColumnName, columnCleaned);
+        }
+
+        cf.addColumn(column);
+        cfCleaned.addColumn(columnCleaned);
 
         entries.put("l", cf);
         cleanedEntries.put("l", cfCleaned);
 
         // write out unmodified CF
         SSTableReader orig = SSTableUtils.prepare().ks(keyspace).cf(cfname).generation(0).write(entries);
+        long origMaxTimestamp = orig.getMaxTimestamp();
 
         // whack the index to trigger the recover
         FileUtils.deleteWithConfirm(orig.descriptor.filenameFor(Component.PRIMARY_INDEX));
         FileUtils.deleteWithConfirm(orig.descriptor.filenameFor(Component.FILTER));
 
+        // set in_memory_compaction_limit_in_mb small to force use of LazilyCompactedRow; otherwise PrecompactedRow is used
+        DatabaseDescriptor.setInMemoryCompactionLimit(forceExceedMemoryLimit ? 0 : 256);
+
         // re-build inline
         SSTableReader rebuilt = CompactionManager.instance.submitSSTableBuild(
             orig.descriptor,
             OperationType.AES
             ).get();
 
+        // ensure max timestamp is captured during rebuild
+        assert rebuilt.getMaxTimestamp() == origMaxTimestamp;
+
         // write out cleaned CF
         SSTableReader cleaned = SSTableUtils.prepare().ks(keyspace).cf(cfname).generation(0).write(cleanedEntries);
 
@@ -136,4 +216,11 @@ public class SSTableWriterCommutativeTes
         assert origFile.getFilePointer() == origFile.length();
         assert cleanedFile.getFilePointer() == cleanedFile.length();
     }
+
+    private IColumn superCounterColumnify(ByteBuffer superColumnName, IColumn column)
+    {
+        SuperColumn superColumn = new SuperColumn(superColumnName, CounterColumnType.instance);
+        superColumn.addColumn(column);
+        return superColumn;
+    }
 }

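The forceExceedMemoryLimit flag works by dropping in_memory_compaction_limit_in_mb to 0 so that every non-empty row exceeds it. A toy model of the decision it steers (illustrative names only, not the actual CompactionManager code):

    // Rows larger than the in-memory limit are compacted lazily (streamed,
    // never fully deserialized); smaller rows take the in-memory
    // PrecompactedRow path. A limit of 0 forces the lazy path for all rows.
    static boolean useLazyCompaction(long rowSizeBytes, long limitBytes)
    {
        return rowSizeBytes > limitBytes;
    }
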
Modified: cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java?rev=1143627&r1=1143626&r2=1143627&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java Thu Jul  7 01:50:40 2011
@@ -31,22 +31,24 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
 
+import org.junit.Test;
+
 import org.apache.cassandra.CleanupHelper;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.filter.IFilter;
-import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
 import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.db.marshal.LongType;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.streaming.OperationType;
 import org.apache.cassandra.thrift.IndexClause;
 import org.apache.cassandra.thrift.IndexExpression;
 import org.apache.cassandra.thrift.IndexOperator;
-import org.junit.Test;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 public class SSTableWriterTest extends CleanupHelper {
@@ -65,7 +67,7 @@ public class SSTableWriterTest extends C
         // "k2"
         cf = ColumnFamily.create("Keyspace1", "Indexed1");        
         cf.addColumn(new Column(ByteBufferUtil.bytes("birthdate"), ByteBufferUtil.bytes(1L), 0));
-        cf.addColumn(new Column(ByteBufferUtil.bytes("anydate"), ByteBufferUtil.bytes(1L), 0));
+        cf.addColumn(new Column(ByteBufferUtil.bytes("anydate"), ByteBufferUtil.bytes(1L), 1234L));
         entries.put("k2", cf);        
         
         // "k3"
@@ -74,12 +76,17 @@ public class SSTableWriterTest extends C
         entries.put("k3", cf);        
         
         SSTableReader orig = SSTableUtils.prepare().cf("Indexed1").write(entries);        
+
         // whack the index to trigger the recover
         FileUtils.deleteWithConfirm(orig.descriptor.filenameFor(Component.PRIMARY_INDEX));
         FileUtils.deleteWithConfirm(orig.descriptor.filenameFor(Component.FILTER));
 
         SSTableReader sstr = CompactionManager.instance.submitSSTableBuild(orig.descriptor, OperationType.AES).get();
         assert sstr != null;
+
+        // ensure max timestamp is captured during rebuild
+        assert sstr.getMaxTimestamp() == 1234L;
+
         ColumnFamilyStore cfs = Table.open("Keyspace1").getColumnFamilyStore("Indexed1");
         cfs.addSSTable(sstr);
         cfs.buildSecondaryIndexes(cfs.getSSTables(), cfs.getIndexedColumns());
@@ -94,4 +101,40 @@ public class SSTableWriterTest extends C
         assertEquals("IndexExpression should return two rows on recoverAndOpen", 2, rows.size());
         assertTrue("First result should be 'k1'",ByteBufferUtil.bytes("k1").equals(rows.get(0).key.key));
     }
+
+    @Test
+    public void testRecoverAndOpenSuperColumn() throws IOException, ExecutionException, InterruptedException
+    {
+        // add data via the usual write path
+        RowMutation rm = new RowMutation("Keyspace1", ByteBufferUtil.bytes("k1"));
+        ByteBuffer superColumnName = ByteBufferUtil.bytes("TestSuperColumn1");
+        rm.add(new QueryPath("Super1", superColumnName, ByteBufferUtil.bytes("birthdate")), ByteBufferUtil.bytes(1L), 0);
+        rm.apply();
+
+        // and add an sstable outside the right path (as if via streaming)
+        Map<String, ColumnFamily> entries = new HashMap<String, ColumnFamily>();
+        ColumnFamily cf = ColumnFamily.create("Keyspace1", "Super1");
+        SuperColumn superColumn = new SuperColumn(superColumnName, LongType.instance);
+        superColumn.addColumn(new Column(ByteBufferUtil.bytes("city"), ByteBufferUtil.bytes(1L), 4321L));
+        cf.addColumn(superColumn);
+        entries.put("k2", cf);
+
+        cf = ColumnFamily.create("Keyspace1", "Super1");
+        superColumn = new SuperColumn(ByteBufferUtil.bytes("TestSuperColumn2"), LongType.instance);
+        superColumn.addColumn(new Column(ByteBufferUtil.bytes("country"), ByteBufferUtil.bytes(1L), 1234L));
+        superColumn.addColumn(new Column(ByteBufferUtil.bytes("address"), ByteBufferUtil.bytes(1L), 0L));
+        cf.addColumn(superColumn);
+        entries.put("k3", cf);
+
+        SSTableReader orig = SSTableUtils.prepare().cf("Super1").write(entries);
+
+        // whack the index to trigger the recover
+        FileUtils.deleteWithConfirm(orig.descriptor.filenameFor(Component.PRIMARY_INDEX));
+        FileUtils.deleteWithConfirm(orig.descriptor.filenameFor(Component.FILTER));
+
+        SSTableReader sstr = CompactionManager.instance.submitSSTableBuild(orig.descriptor, OperationType.AES).get();
+
+        // ensure max timestamp is captured during rebuild
+        assert sstr.getMaxTimestamp() == 4321L;
+    }
 }
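
As in the standard column test, the expected value is simply the largest client timestamp written into the streamed sstable, nested super column subcolumns included: max(4321L, 1234L, 0L) == 4321L. (The birthdate mutation at timestamp 0 goes through the regular write path, not the rebuilt sstable, so it does not participate.) Stated as arithmetic:

    assert sstr.getMaxTimestamp() == Math.max(4321L, Math.max(1234L, 0L));  // 4321L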