Posted to commits@cassandra.apache.org by yu...@apache.org on 2013/12/13 23:52:54 UTC

[1/2] SSTable metadata (Stats.db) format change

Updated Branches:
  refs/heads/trunk 84d85ee70 -> 74bf5aa16


http://git-wip-us.apache.org/repos/asf/cassandra/blob/74bf5aa1/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java b/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java
new file mode 100644
index 0000000..a691591
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.metadata;
+
+import java.io.*;
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import com.google.common.collect.Maps;
+
+import org.apache.cassandra.db.commitlog.ReplayPosition;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.EstimatedHistogram;
+import org.apache.cassandra.utils.StreamingHistogram;
+
+/**
+ * Serializer for SSTable metadata from legacy versions
+ */
+@Deprecated
+public class LegacyMetadataSerializer extends MetadataSerializer
+{
+    public static final double NO_BLOOM_FILTER_FP_CHANCE = -1.0;
+
+    /**
+     * Legacy serialization is only used for SSTable level reset.
+     */
+    @Override
+    public void serialize(Map<MetadataType, MetadataComponent> components, DataOutput out) throws IOException
+    {
+        ValidationMetadata validation = (ValidationMetadata) components.get(MetadataType.VALIDATION);
+        StatsMetadata stats = (StatsMetadata) components.get(MetadataType.STATS);
+        CompactionMetadata compaction = (CompactionMetadata) components.get(MetadataType.COMPACTION);
+
+        assert validation != null && stats != null && compaction != null && validation.partitioner != null;
+
+        EstimatedHistogram.serializer.serialize(stats.estimatedRowSize, out);
+        EstimatedHistogram.serializer.serialize(stats.estimatedColumnCount, out);
+        ReplayPosition.serializer.serialize(stats.replayPosition, out);
+        out.writeLong(stats.minTimestamp);
+        out.writeLong(stats.maxTimestamp);
+        out.writeInt(stats.maxLocalDeletionTime);
+        out.writeDouble(validation.bloomFilterFPChance);
+        out.writeDouble(stats.compressionRatio);
+        out.writeUTF(validation.partitioner);
+        out.writeInt(compaction.ancestors.size());
+        for (Integer g : compaction.ancestors)
+            out.writeInt(g);
+        StreamingHistogram.serializer.serialize(stats.estimatedTombstoneDropTime, out);
+        out.writeInt(stats.sstableLevel);
+        out.writeInt(stats.minColumnNames.size());
+        for (ByteBuffer columnName : stats.minColumnNames)
+            ByteBufferUtil.writeWithShortLength(columnName, out);
+        out.writeInt(stats.maxColumnNames.size());
+        for (ByteBuffer columnName : stats.maxColumnNames)
+            ByteBufferUtil.writeWithShortLength(columnName, out);
+    }
+
+    /**
+     * The legacy serializer deserializes all components regardless of which types are requested.
+     */
+    @Override
+    public Map<MetadataType, MetadataComponent> deserialize(Descriptor descriptor, EnumSet<MetadataType> types) throws IOException
+    {
+        Map<MetadataType, MetadataComponent> components = Maps.newHashMap();
+
+        File statsFile = new File(descriptor.filenameFor(Component.STATS));
+        if (!statsFile.exists() && types.contains(MetadataType.STATS))
+        {
+            components.put(MetadataType.STATS, MetadataCollector.defaultStatsMetadata());
+        }
+        else
+        {
+            try (DataInputStream in = new DataInputStream(new BufferedInputStream(new FileInputStream(statsFile))))
+            {
+                EstimatedHistogram rowSizes = EstimatedHistogram.serializer.deserialize(in);
+                EstimatedHistogram columnCounts = EstimatedHistogram.serializer.deserialize(in);
+                ReplayPosition replayPosition = ReplayPosition.serializer.deserialize(in);
+                long minTimestamp = in.readLong();
+                long maxTimestamp = in.readLong();
+                int maxLocalDeletionTime = descriptor.version.tracksMaxLocalDeletionTime ? in.readInt() : Integer.MAX_VALUE;
+                double bloomFilterFPChance = descriptor.version.hasBloomFilterFPChance ? in.readDouble() : NO_BLOOM_FILTER_FP_CHANCE;
+                double compressionRatio = in.readDouble();
+                String partitioner = in.readUTF();
+                int nbAncestors = in.readInt();
+                Set<Integer> ancestors = new HashSet<>(nbAncestors);
+                for (int i = 0; i < nbAncestors; i++)
+                    ancestors.add(in.readInt());
+                StreamingHistogram tombstoneHistogram = StreamingHistogram.serializer.deserialize(in);
+                int sstableLevel = 0;
+                if (in.available() > 0)
+                    sstableLevel = in.readInt();
+
+                List<ByteBuffer> minColumnNames;
+                List<ByteBuffer> maxColumnNames;
+                if (descriptor.version.tracksMaxMinColumnNames)
+                {
+                    int colCount = in.readInt();
+                    minColumnNames = new ArrayList<>(colCount);
+                    for (int i = 0; i < colCount; i++)
+                    {
+                        minColumnNames.add(ByteBufferUtil.readWithShortLength(in));
+                    }
+                    colCount = in.readInt();
+                    maxColumnNames = new ArrayList<>(colCount);
+                    for (int i = 0; i < colCount; i++)
+                    {
+                        maxColumnNames.add(ByteBufferUtil.readWithShortLength(in));
+                    }
+                }
+                else
+                {
+                    minColumnNames = Collections.emptyList();
+                    maxColumnNames = Collections.emptyList();
+                }
+
+                if (types.contains(MetadataType.VALIDATION))
+                    components.put(MetadataType.VALIDATION,
+                                   new ValidationMetadata(partitioner, bloomFilterFPChance));
+                if (types.contains(MetadataType.STATS))
+                    components.put(MetadataType.STATS,
+                                   new StatsMetadata(rowSizes,
+                                                     columnCounts,
+                                                     replayPosition,
+                                                     minTimestamp,
+                                                     maxTimestamp,
+                                                     maxLocalDeletionTime,
+                                                     compressionRatio,
+                                                     tombstoneHistogram,
+                                                     sstableLevel,
+                                                     minColumnNames,
+                                                     maxColumnNames));
+                if (types.contains(MetadataType.COMPACTION))
+                    components.put(MetadataType.COMPACTION,
+                                   new CompactionMetadata(ancestors));
+            }
+        }
+        return components;
+    }
+}

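For reference, a minimal sketch (not part of this patch) of driving the legacy serializer directly, e.g. to rewrite Stats.db in the old format after a level reset. `descriptor` is assumed to point at an existing pre-'k' sstable; only the API shown above is used:

    import java.io.DataOutputStream;
    import java.io.FileOutputStream;
    import java.util.EnumSet;
    import java.util.Map;
    import org.apache.cassandra.io.sstable.Component;
    import org.apache.cassandra.io.sstable.metadata.*;

    // Round-trip the legacy Stats.db: read all components, then write them back.
    LegacyMetadataSerializer serializer = new LegacyMetadataSerializer();
    Map<MetadataType, MetadataComponent> components =
        serializer.deserialize(descriptor, EnumSet.allOf(MetadataType.class));
    try (DataOutputStream out = new DataOutputStream(
             new FileOutputStream(descriptor.filenameFor(Component.STATS))))
    {
        serializer.serialize(components, out);
    }
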
http://git-wip-us.apache.org/repos/asf/cassandra/blob/74bf5aa1/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
new file mode 100644
index 0000000..4b9329f
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.metadata;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import com.google.common.collect.Maps;
+
+import org.apache.cassandra.db.commitlog.ReplayPosition;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.io.sstable.*;
+import org.apache.cassandra.utils.EstimatedHistogram;
+import org.apache.cassandra.utils.StreamingHistogram;
+
+public class MetadataCollector
+{
+    public static final double NO_COMPRESSION_RATIO = -1.0;
+
+    static EstimatedHistogram defaultColumnCountHistogram()
+    {
+        // EH of 114 can track a max value of 2395318855, i.e., > 2B columns
+        return new EstimatedHistogram(114);
+    }
+
+    static EstimatedHistogram defaultRowSizeHistogram()
+    {
+        // EH of 150 can track a max value of 1697806495183, i.e., > 1.5TB
+        return new EstimatedHistogram(150);
+    }
+
+    static StreamingHistogram defaultTombstoneDropTimeHistogram()
+    {
+        return new StreamingHistogram(SSTable.TOMBSTONE_HISTOGRAM_BIN_SIZE);
+    }
+
+    public static StatsMetadata defaultStatsMetadata()
+    {
+        return new StatsMetadata(defaultRowSizeHistogram(),
+                                 defaultColumnCountHistogram(),
+                                 ReplayPosition.NONE,
+                                 Long.MIN_VALUE,
+                                 Long.MAX_VALUE,
+                                 Integer.MAX_VALUE,
+                                 NO_COMPRESSION_RATIO,
+                                 defaultTombstoneDropTimeHistogram(),
+                                 0,
+                                 Collections.<ByteBuffer>emptyList(),
+                                 Collections.<ByteBuffer>emptyList());
+    }
+
+    protected EstimatedHistogram estimatedRowSize = defaultRowSizeHistogram();
+    protected EstimatedHistogram estimatedColumnCount = defaultColumnCountHistogram();
+    protected ReplayPosition replayPosition = ReplayPosition.NONE;
+    protected long minTimestamp = Long.MAX_VALUE;
+    protected long maxTimestamp = Long.MIN_VALUE;
+    protected int maxLocalDeletionTime = Integer.MIN_VALUE;
+    protected double compressionRatio = NO_COMPRESSION_RATIO;
+    protected Set<Integer> ancestors = new HashSet<>();
+    protected StreamingHistogram estimatedTombstoneDropTime = defaultTombstoneDropTimeHistogram();
+    protected int sstableLevel;
+    protected List<ByteBuffer> minColumnNames = Collections.emptyList();
+    protected List<ByteBuffer> maxColumnNames = Collections.emptyList();
+    private final AbstractType<?> columnNameComparator;
+
+    public MetadataCollector(AbstractType<?> columnNameComparator)
+    {
+        this.columnNameComparator = columnNameComparator;
+    }
+
+    public MetadataCollector(Collection<SSTableReader> sstables, AbstractType<?> columnNameComparator, int level)
+    {
+        this(columnNameComparator);
+
+        replayPosition(ReplayPosition.getReplayPosition(sstables));
+        sstableLevel(level);
+        // Get the max timestamp of the precompacted sstables
+        // and adds generation of live ancestors
+        for (SSTableReader sstable : sstables)
+        {
+            addAncestor(sstable.descriptor.generation);
+            for (Integer i : sstable.getAncestors())
+            {
+                if (new File(sstable.descriptor.withGeneration(i).filenameFor(Component.DATA)).exists())
+                    addAncestor(i);
+            }
+        }
+    }
+
+    public void addRowSize(long rowSize)
+    {
+        estimatedRowSize.add(rowSize);
+    }
+
+    public void addColumnCount(long columnCount)
+    {
+        estimatedColumnCount.add(columnCount);
+    }
+
+    public void mergeTombstoneHistogram(StreamingHistogram histogram)
+    {
+        estimatedTombstoneDropTime.merge(histogram);
+    }
+
+    /**
+     * Ratio is compressed/uncompressed; a ratio of 1.x or higher
+     * means compression isn't helping.
+     */
+    public void addCompressionRatio(long compressed, long uncompressed)
+    {
+        compressionRatio = (double) compressed/uncompressed;
+    }
+
+    public void updateMinTimestamp(long potentialMin)
+    {
+        minTimestamp = Math.min(minTimestamp, potentialMin);
+    }
+
+    public void updateMaxTimestamp(long potentialMax)
+    {
+        maxTimestamp = Math.max(maxTimestamp, potentialMax);
+    }
+
+    public void updateMaxLocalDeletionTime(int maxLocalDeletionTime)
+    {
+        this.maxLocalDeletionTime = Math.max(this.maxLocalDeletionTime, maxLocalDeletionTime);
+    }
+
+    public MetadataCollector estimatedRowSize(EstimatedHistogram estimatedRowSize)
+    {
+        this.estimatedRowSize = estimatedRowSize;
+        return this;
+    }
+
+    public MetadataCollector estimatedColumnCount(EstimatedHistogram estimatedColumnCount)
+    {
+        this.estimatedColumnCount = estimatedColumnCount;
+        return this;
+    }
+
+    public MetadataCollector replayPosition(ReplayPosition replayPosition)
+    {
+        this.replayPosition = replayPosition;
+        return this;
+    }
+
+    public MetadataCollector addAncestor(int generation)
+    {
+        this.ancestors.add(generation);
+        return this;
+    }
+
+    public void update(long size, ColumnStats stats)
+    {
+        updateMinTimestamp(stats.minTimestamp);
+        updateMaxTimestamp(stats.maxTimestamp);
+        updateMaxLocalDeletionTime(stats.maxLocalDeletionTime);
+        addRowSize(size);
+        addColumnCount(stats.columnCount);
+        mergeTombstoneHistogram(stats.tombstoneHistogram);
+        updateMinColumnNames(stats.minColumnNames);
+        updateMaxColumnNames(stats.maxColumnNames);
+    }
+
+    public MetadataCollector sstableLevel(int sstableLevel)
+    {
+        this.sstableLevel = sstableLevel;
+        return this;
+    }
+
+    public MetadataCollector updateMinColumnNames(List<ByteBuffer> minColumnNames)
+    {
+        if (minColumnNames.size() > 0)
+            this.minColumnNames = ColumnNameHelper.mergeMin(this.minColumnNames, minColumnNames, columnNameComparator);
+        return this;
+    }
+
+    public MetadataCollector updateMaxColumnNames(List<ByteBuffer> maxColumnNames)
+    {
+        if (maxColumnNames.size() > 0)
+            this.maxColumnNames = ColumnNameHelper.mergeMax(this.maxColumnNames, maxColumnNames, columnNameComparator);
+        return this;
+    }
+
+    public Map<MetadataType, MetadataComponent> finalizeMetadata(String partitioner, double bloomFilterFPChance)
+    {
+        Map<MetadataType, MetadataComponent> components = Maps.newHashMap();
+        components.put(MetadataType.VALIDATION, new ValidationMetadata(partitioner, bloomFilterFPChance));
+        components.put(MetadataType.STATS, new StatsMetadata(estimatedRowSize,
+                                                             estimatedColumnCount,
+                                                             replayPosition,
+                                                             minTimestamp,
+                                                             maxTimestamp,
+                                                             maxLocalDeletionTime,
+                                                             compressionRatio,
+                                                             estimatedTombstoneDropTime,
+                                                             sstableLevel,
+                                                             minColumnNames,
+                                                             maxColumnNames));
+        components.put(MetadataType.COMPACTION, new CompactionMetadata(ancestors));
+        return components;
+    }
+
+}

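MetadataCollector replaces the old SSTableMetadata.Collector builder: writers feed it per-row statistics and call finalizeMetadata() once at the end to obtain all three components. A minimal usage sketch, assuming a BytesType-compared table; the timestamps, counts, partitioner name and bloom filter FP chance are illustrative:

    import java.util.Map;
    import org.apache.cassandra.db.marshal.BytesType;
    import org.apache.cassandra.io.sstable.metadata.*;

    MetadataCollector collector = new MetadataCollector(BytesType.instance).sstableLevel(0);
    // per-row updates, as an SSTable writer would perform them
    collector.updateMinTimestamp(1386979200000000L);
    collector.updateMaxTimestamp(1386979201000000L);
    collector.addRowSize(4096);
    collector.addColumnCount(17);
    // one map holding the VALIDATION, STATS and COMPACTION components
    Map<MetadataType, MetadataComponent> components =
        collector.finalizeMetadata("org.apache.cassandra.dht.Murmur3Partitioner", 0.01);
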
http://git-wip-us.apache.org/repos/asf/cassandra/blob/74bf5aa1/src/java/org/apache/cassandra/io/sstable/metadata/MetadataComponent.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataComponent.java b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataComponent.java
new file mode 100644
index 0000000..bf8a9af
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataComponent.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.metadata;
+
+/**
+ * MetadataComponent is a component for SSTable metadata and serialized to Stats.db.
+ */
+public abstract class MetadataComponent implements Comparable<MetadataComponent>
+{
+    /**
+     * @return Metadata component type
+     */
+    public abstract MetadataType getType();
+
+    public int compareTo(MetadataComponent o)
+    {
+        return this.getType().compareTo(o.getType());
+    }
+}

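Since compareTo() delegates to the MetadataType ordinal, sorting any mix of components always yields the enum declaration order, which is exactly what MetadataSerializer relies on when laying out the toc. An illustrative check, with `validation`, `stats` and `compaction` assumed to be existing components:

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;
    import org.apache.cassandra.io.sstable.metadata.MetadataComponent;

    List<MetadataComponent> parts = Arrays.<MetadataComponent>asList(stats, compaction, validation);
    Collections.sort(parts);
    // parts is now [validation, compaction, stats]: VALIDATION(0), COMPACTION(1), STATS(2)
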
http://git-wip-us.apache.org/repos/asf/cassandra/blob/74bf5aa1/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
new file mode 100644
index 0000000..d7962de
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.metadata;
+
+import java.io.*;
+import java.util.*;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.util.FileDataInput;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * Metadata serializer for SSTable versions >= 'k'.
+ *
+ * <pre>
+ * File format := | number of components (4 bytes) | toc | component1 | component2 | ... |
+ * toc         := | component type (4 bytes) | position of component (4 bytes) |
+ * </pre>
+ *
+ * MetadataType's ordinal() defines the order of serialization.
+ */
+public class MetadataSerializer implements IMetadataSerializer
+{
+    private static final Logger logger = LoggerFactory.getLogger(MetadataSerializer.class);
+
+    public void serialize(Map<MetadataType, MetadataComponent> components, DataOutput out) throws IOException
+    {
+        // sort components by type
+        List<MetadataComponent> sortedComponents = Lists.newArrayList(components.values());
+        Collections.sort(sortedComponents);
+
+        // write number of components
+        out.writeInt(components.size());
+        // build and write toc
+        int lastPosition = 4 + (8 * sortedComponents.size()); // toc size: 4-byte count + 8 bytes per entry
+        for (MetadataComponent component : sortedComponents)
+        {
+            MetadataType type = component.getType();
+            // serialize type
+            out.writeInt(type.ordinal());
+            // serialize position
+            out.writeInt(lastPosition);
+            lastPosition += type.serializer.serializedSize(component);
+        }
+        // serialize components
+        for (MetadataComponent component : sortedComponents)
+        {
+            component.getType().serializer.serialize(component, out);
+        }
+    }
+
+    public Map<MetadataType, MetadataComponent> deserialize(Descriptor descriptor, EnumSet<MetadataType> types) throws IOException
+    {
+        Map<MetadataType, MetadataComponent> components;
+        logger.debug("Load metadata for {}", descriptor);
+        File statsFile = new File(descriptor.filenameFor(Component.STATS));
+        if (!statsFile.exists())
+        {
+            logger.debug("No sstable stats for {}", descriptor);
+            components = Maps.newHashMap();
+            components.put(MetadataType.STATS, MetadataCollector.defaultStatsMetadata());
+        }
+        else
+        {
+            try (RandomAccessReader r = RandomAccessReader.open(statsFile))
+            {
+                components = deserialize(descriptor, r, types);
+            }
+        }
+        return components;
+    }
+
+    public MetadataComponent deserialize(Descriptor descriptor, MetadataType type) throws IOException
+    {
+        return deserialize(descriptor, EnumSet.of(type)).get(type);
+    }
+
+    public Map<MetadataType, MetadataComponent> deserialize(Descriptor descriptor, FileDataInput in, EnumSet<MetadataType> types) throws IOException
+    {
+        Map<MetadataType, MetadataComponent> components = Maps.newHashMap();
+        // read number of components
+        int numComponents = in.readInt();
+        // read toc
+        Map<MetadataType, Integer> toc = new HashMap<>(numComponents);
+        for (int i = 0; i < numComponents; i++)
+        {
+            toc.put(MetadataType.values()[in.readInt()], in.readInt());
+        }
+        for (MetadataType type : types)
+        {
+            MetadataComponent component = null;
+            if (toc.containsKey(type))
+            {
+                in.seek(toc.get(type));
+                component = type.serializer.deserialize(descriptor.version, in);
+            }
+            components.put(type, component);
+        }
+        return components;
+    }
+
+    public void mutateLevel(Descriptor descriptor, int newLevel) throws IOException
+    {
+        logger.debug("Mutating {} to level {}", descriptor.filenameFor(Component.STATS), newLevel);
+        Map<MetadataType, MetadataComponent> currentComponents = deserialize(descriptor, EnumSet.allOf(MetadataType.class));
+        StatsMetadata stats = (StatsMetadata) currentComponents.remove(MetadataType.STATS);
+        // mutate level
+        currentComponents.put(MetadataType.STATS, stats.mutateLevel(newLevel));
+        Descriptor tmpDescriptor = descriptor.asTemporary(true);
+
+        try (DataOutputStream out = new DataOutputStream(new FileOutputStream(tmpDescriptor.filenameFor(Component.STATS))))
+        {
+            serialize(currentComponents, out);
+            out.flush();
+        }
+        // we can't move a file on top of another file on Windows:
+        if (!FBUtilities.isUnix())
+            FileUtils.delete(descriptor.filenameFor(Component.STATS));
+        FileUtils.renameWithConfirm(tmpDescriptor.filenameFor(Component.STATS), descriptor.filenameFor(Component.STATS));
+    }
+}

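A worked sketch of the resulting Stats.db layout, assuming all three components are present (component sizes vary per sstable, so the later positions are symbolic):

    count = 3                                            bytes 0-3
    toc entry VALIDATION(0) -> 28                        bytes 4-11
    toc entry COMPACTION(1) -> 28 + size(VALIDATION)     bytes 12-19
    toc entry STATS(2)      -> prev + size(COMPACTION)   bytes 20-27
    components, in toc order                             bytes 28..

Because the toc is read first, a caller that needs a single component can seek straight to it instead of deserializing everything, e.g. (with `descriptor` assumed to point at an existing sstable):

    import org.apache.cassandra.io.sstable.metadata.MetadataType;
    import org.apache.cassandra.io.sstable.metadata.StatsMetadata;

    // Read just the STATS component, skipping VALIDATION and COMPACTION on disk.
    StatsMetadata stats = (StatsMetadata)
        descriptor.getMetadataSerializer().deserialize(descriptor, MetadataType.STATS);
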
http://git-wip-us.apache.org/repos/asf/cassandra/blob/74bf5aa1/src/java/org/apache/cassandra/io/sstable/metadata/MetadataType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataType.java b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataType.java
new file mode 100644
index 0000000..9717da1
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataType.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.metadata;
+
+/**
+ * Defines the metadata component types.
+ */
+public enum MetadataType
+{
+    /** Metadata only used for SSTable validation */
+    VALIDATION(ValidationMetadata.serializer),
+    /** Metadata only used at compaction */
+    COMPACTION(CompactionMetadata.serializer),
+    /** Metadata always kept in memory */
+    STATS(StatsMetadata.serializer);
+
+    public final IMetadataComponentSerializer<MetadataComponent> serializer;
+
+    private MetadataType(IMetadataComponentSerializer<MetadataComponent> serializer)
+    {
+        this.serializer = serializer;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/74bf5aa1/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java b/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
new file mode 100644
index 0000000..8055c77
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.metadata;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.db.commitlog.ReplayPosition;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.EstimatedHistogram;
+import org.apache.cassandra.utils.StreamingHistogram;
+
+/**
+ * SSTable metadata that always stays on heap.
+ */
+public class StatsMetadata extends MetadataComponent
+{
+    public static final IMetadataComponentSerializer serializer = new StatsMetadataSerializer();
+
+    public final EstimatedHistogram estimatedRowSize;
+    public final EstimatedHistogram estimatedColumnCount;
+    public final ReplayPosition replayPosition;
+    public final long minTimestamp;
+    public final long maxTimestamp;
+    public final int maxLocalDeletionTime;
+    public final double compressionRatio;
+    public final StreamingHistogram estimatedTombstoneDropTime;
+    public final int sstableLevel;
+    public final List<ByteBuffer> maxColumnNames;
+    public final List<ByteBuffer> minColumnNames;
+
+    public StatsMetadata(EstimatedHistogram estimatedRowSize,
+                         EstimatedHistogram estimatedColumnCount,
+                         ReplayPosition replayPosition,
+                         long minTimestamp,
+                         long maxTimestamp,
+                         int maxLocalDeletionTime,
+                         double compressionRatio,
+                         StreamingHistogram estimatedTombstoneDropTime,
+                         int sstableLevel,
+                         List<ByteBuffer> minColumnNames,
+                         List<ByteBuffer> maxColumnNames)
+    {
+        this.estimatedRowSize = estimatedRowSize;
+        this.estimatedColumnCount = estimatedColumnCount;
+        this.replayPosition = replayPosition;
+        this.minTimestamp = minTimestamp;
+        this.maxTimestamp = maxTimestamp;
+        this.maxLocalDeletionTime = maxLocalDeletionTime;
+        this.compressionRatio = compressionRatio;
+        this.estimatedTombstoneDropTime = estimatedTombstoneDropTime;
+        this.sstableLevel = sstableLevel;
+        this.minColumnNames = minColumnNames;
+        this.maxColumnNames = maxColumnNames;
+    }
+
+    public MetadataType getType()
+    {
+        return MetadataType.STATS;
+    }
+
+    /**
+     * @param gcBefore gc time in seconds
+     * @return estimated droppable tombstone ratio at given gcBefore time.
+     */
+    public double getEstimatedDroppableTombstoneRatio(int gcBefore)
+    {
+        long estimatedColumnCount = this.estimatedColumnCount.mean() * this.estimatedColumnCount.count();
+        if (estimatedColumnCount > 0)
+        {
+            double droppable = getDroppableTombstonesBefore(gcBefore);
+            return droppable / estimatedColumnCount;
+        }
+        return 0.0;
+    }
+
+    /**
+     * @param gcBefore gc time in seconds
+     * @return amount of droppable tombstones
+     */
+    public double getDroppableTombstonesBefore(int gcBefore)
+    {
+        return estimatedTombstoneDropTime.sum(gcBefore);
+    }
+
+    public StatsMetadata mutateLevel(int newLevel)
+    {
+        return new StatsMetadata(estimatedRowSize,
+                                 estimatedColumnCount,
+                                 replayPosition,
+                                 minTimestamp,
+                                 maxTimestamp,
+                                 maxLocalDeletionTime,
+                                 compressionRatio,
+                                 estimatedTombstoneDropTime,
+                                 newLevel,
+                                 minColumnNames,
+                                 maxColumnNames);
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        StatsMetadata that = (StatsMetadata) o;
+        return new EqualsBuilder()
+                       .append(estimatedRowSize, that.estimatedRowSize)
+                       .append(estimatedColumnCount, that.estimatedColumnCount)
+                       .append(replayPosition, that.replayPosition)
+                       .append(minTimestamp, that.minTimestamp)
+                       .append(maxTimestamp, that.maxTimestamp)
+                       .append(maxLocalDeletionTime, that.maxLocalDeletionTime)
+                       .append(compressionRatio, that.compressionRatio)
+                       .append(estimatedTombstoneDropTime, that.estimatedTombstoneDropTime)
+                       .append(sstableLevel, that.sstableLevel)
+                       .append(maxColumnNames, that.maxColumnNames)
+                       .append(minColumnNames, that.minColumnNames)
+                       .build();
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return new HashCodeBuilder()
+                       .append(estimatedRowSize)
+                       .append(estimatedColumnCount)
+                       .append(replayPosition)
+                       .append(minTimestamp)
+                       .append(maxTimestamp)
+                       .append(maxLocalDeletionTime)
+                       .append(compressionRatio)
+                       .append(estimatedTombstoneDropTime)
+                       .append(sstableLevel)
+                       .append(maxColumnNames)
+                       .append(minColumnNames)
+                       .build();
+    }
+
+    public static class StatsMetadataSerializer implements IMetadataComponentSerializer<StatsMetadata>
+    {
+        public int serializedSize(StatsMetadata component) throws IOException
+        {
+            int size = 0;
+            size += EstimatedHistogram.serializer.serializedSize(component.estimatedRowSize, TypeSizes.NATIVE);
+            size += EstimatedHistogram.serializer.serializedSize(component.estimatedColumnCount, TypeSizes.NATIVE);
+            size += ReplayPosition.serializer.serializedSize(component.replayPosition, TypeSizes.NATIVE);
+            size += 8 + 8 + 4 + 8; // min/max timestamp (long), maxLocalDeletionTime (int), compressionRatio (double)
+            size += StreamingHistogram.serializer.serializedSize(component.estimatedTombstoneDropTime, TypeSizes.NATIVE);
+            size += TypeSizes.NATIVE.sizeof(component.sstableLevel);
+            // min column names
+            size += 4;
+            for (ByteBuffer columnName : component.minColumnNames)
+                size += 2 + columnName.remaining(); // with short length
+            // max column names
+            size += 4;
+            for (ByteBuffer columnName : component.maxColumnNames)
+                size += 2 + columnName.remaining(); // with short length
+            return size;
+        }
+
+        public void serialize(StatsMetadata component, DataOutput out) throws IOException
+        {
+            EstimatedHistogram.serializer.serialize(component.estimatedRowSize, out);
+            EstimatedHistogram.serializer.serialize(component.estimatedColumnCount, out);
+            ReplayPosition.serializer.serialize(component.replayPosition, out);
+            out.writeLong(component.minTimestamp);
+            out.writeLong(component.maxTimestamp);
+            out.writeInt(component.maxLocalDeletionTime);
+            out.writeDouble(component.compressionRatio);
+            StreamingHistogram.serializer.serialize(component.estimatedTombstoneDropTime, out);
+            out.writeInt(component.sstableLevel);
+            out.writeInt(component.minColumnNames.size());
+            for (ByteBuffer columnName : component.minColumnNames)
+                ByteBufferUtil.writeWithShortLength(columnName, out);
+            out.writeInt(component.maxColumnNames.size());
+            for (ByteBuffer columnName : component.maxColumnNames)
+                ByteBufferUtil.writeWithShortLength(columnName, out);
+        }
+
+        public StatsMetadata deserialize(Descriptor.Version version, DataInput in) throws IOException
+        {
+            EstimatedHistogram rowSizes = EstimatedHistogram.serializer.deserialize(in);
+            EstimatedHistogram columnCounts = EstimatedHistogram.serializer.deserialize(in);
+            ReplayPosition replayPosition = ReplayPosition.serializer.deserialize(in);
+            long minTimestamp = in.readLong();
+            long maxTimestamp = in.readLong();
+            int maxLocalDeletionTime = version.tracksMaxLocalDeletionTime ? in.readInt() : Integer.MAX_VALUE;
+            double compressionRatio = in.readDouble();
+            StreamingHistogram tombstoneHistogram = StreamingHistogram.serializer.deserialize(in);
+            int sstableLevel = in.readInt();
+            List<ByteBuffer> minColumnNames;
+            List<ByteBuffer> maxColumnNames;
+            if (version.tracksMaxMinColumnNames)
+            {
+                int colCount = in.readInt();
+                minColumnNames = new ArrayList<>(colCount);
+                for (int i = 0; i < colCount; i++)
+                {
+                    minColumnNames.add(ByteBufferUtil.readWithShortLength(in));
+                }
+                colCount = in.readInt();
+                maxColumnNames = new ArrayList<>(colCount);
+                for (int i = 0; i < colCount; i++)
+                {
+                    maxColumnNames.add(ByteBufferUtil.readWithShortLength(in));
+                }
+            }
+            else
+            {
+                minColumnNames = Collections.emptyList();
+                maxColumnNames = Collections.emptyList();
+            }
+            return new StatsMetadata(rowSizes,
+                                     columnCounts,
+                                     replayPosition,
+                                     minTimestamp,
+                                     maxTimestamp,
+                                     maxLocalDeletionTime,
+                                     compressionRatio,
+                                     tombstoneHistogram,
+                                     sstableLevel,
+                                     minColumnNames,
+                                     maxColumnNames);
+        }
+    }
+}

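getEstimatedDroppableTombstoneRatio() divides the tombstone histogram mass below gcBefore by the approximate total column count (mean bucket value times bucket count). A usage sketch with illustrative numbers; `stats` is assumed to be a deserialized StatsMetadata:

    // With ~5000 columns estimated in total and 250 tombstones droppable
    // before gcBefore, the ratio comes out to 250 / 5000 = 0.05.
    int gcBefore = (int) (System.currentTimeMillis() / 1000); // "now", as the metadata viewer uses
    double droppable = stats.getEstimatedDroppableTombstoneRatio(gcBefore);
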
http://git-wip-us.apache.org/repos/asf/cassandra/blob/74bf5aa1/src/java/org/apache/cassandra/io/sstable/metadata/ValidationMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/ValidationMetadata.java b/src/java/org/apache/cassandra/io/sstable/metadata/ValidationMetadata.java
new file mode 100644
index 0000000..c2722f5
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/ValidationMetadata.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.metadata;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.sstable.Descriptor;
+
+/**
+ * SSTable metadata component used only to validate the SSTable.
+ *
+ * It is read before the main Data.db file is opened for validation
+ * and discarded immediately afterwards.
+ */
+public class ValidationMetadata extends MetadataComponent
+{
+    public static final IMetadataComponentSerializer serializer = new ValidationMetadataSerializer();
+
+    public final String partitioner;
+    public final double bloomFilterFPChance;
+
+    public ValidationMetadata(String partitioner, double bloomFilterFPChance)
+    {
+        this.partitioner = partitioner;
+        this.bloomFilterFPChance = bloomFilterFPChance;
+    }
+
+    public MetadataType getType()
+    {
+        return MetadataType.VALIDATION;
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        ValidationMetadata that = (ValidationMetadata) o;
+        return Double.compare(that.bloomFilterFPChance, bloomFilterFPChance) == 0 && partitioner.equals(that.partitioner);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        int result;
+        long temp;
+        result = partitioner.hashCode();
+        temp = Double.doubleToLongBits(bloomFilterFPChance);
+        result = 31 * result + (int) (temp ^ (temp >>> 32));
+        return result;
+    }
+
+    public static class ValidationMetadataSerializer implements IMetadataComponentSerializer<ValidationMetadata>
+    {
+        public int serializedSize(ValidationMetadata component) throws IOException
+        {
+            return TypeSizes.NATIVE.sizeof(component.partitioner) + 8;
+        }
+
+        public void serialize(ValidationMetadata component, DataOutput out) throws IOException
+        {
+            out.writeUTF(component.partitioner);
+            out.writeDouble(component.bloomFilterFPChance);
+        }
+
+        public ValidationMetadata deserialize(Descriptor.Version version, DataInput in) throws IOException
+        {
+            return new ValidationMetadata(in.readUTF(), in.readDouble());
+        }
+    }
+}

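Size bookkeeping for this component is straightforward: writeUTF() emits a two-byte length prefix plus the UTF-8 bytes, followed by one eight-byte double. As an illustration, for the 43-character ASCII partitioner name "org.apache.cassandra.dht.Murmur3Partitioner" the component occupies 2 + 43 + 8 = 53 bytes, which is what TypeSizes.NATIVE.sizeof(partitioner) + 8 computes.
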
http://git-wip-us.apache.org/repos/asf/cassandra/blob/74bf5aa1/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java b/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java
index b61cb57..bb5d465 100644
--- a/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java
@@ -25,8 +25,8 @@ import com.yammer.metrics.util.RatioGauge;
 
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.io.sstable.SSTableMetadata;
 import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.utils.EstimatedHistogram;
 
 /**
@@ -166,13 +166,13 @@ public class ColumnFamilyMetrics
                 int total = 0;
                 for (SSTableReader sstable : cfs.getSSTables())
                 {
-                    if (sstable.getCompressionRatio() != SSTableMetadata.NO_COMPRESSION_RATIO)
+                    if (sstable.getCompressionRatio() != MetadataCollector.NO_COMPRESSION_RATIO)
                     {
                         sum += sstable.getCompressionRatio();
                         total++;
                     }
                 }
-                return total != 0 ? (double) sum / total : 0;
+                return total != 0 ? sum / total : 0;
             }
         });
         readLatency = new LatencyMetrics(factory, "Read");

http://git-wip-us.apache.org/repos/asf/cassandra/blob/74bf5aa1/src/java/org/apache/cassandra/tools/SSTableLevelResetter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/SSTableLevelResetter.java b/src/java/org/apache/cassandra/tools/SSTableLevelResetter.java
index 999f440..aa555fc 100644
--- a/src/java/org/apache/cassandra/tools/SSTableLevelResetter.java
+++ b/src/java/org/apache/cassandra/tools/SSTableLevelResetter.java
@@ -23,11 +23,10 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.cassandra.db.Directories;
-import org.apache.cassandra.db.compaction.LeveledManifest;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.io.sstable.SSTableMetadata;
-import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.io.sstable.metadata.MetadataType;
+import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
 
 /**
  * Reset level to 0 on a given set of sstables
@@ -65,9 +64,9 @@ public class SSTableLevelResetter
             {
                 foundSSTable = true;
                 Descriptor descriptor = sstable.getKey();
-                Pair<SSTableMetadata, Set<Integer>> metadata = SSTableMetadata.serializer.deserialize(descriptor);
-                out.println("Changing level from " + metadata.left.sstableLevel + " to 0 on " + descriptor.filenameFor(Component.DATA));
-                LeveledManifest.mutateLevel(metadata, descriptor, descriptor.filenameFor(Component.STATS), 0);
+                StatsMetadata metadata = (StatsMetadata) descriptor.getMetadataSerializer().deserialize(descriptor, MetadataType.STATS);
+                out.println("Changing level from " + metadata.sstableLevel + " to 0 on " + descriptor.filenameFor(Component.DATA));
+                descriptor.getMetadataSerializer().mutateLevel(descriptor, 0);
             }
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/74bf5aa1/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java b/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java
index 5cfd778..aeec284 100644
--- a/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java
+++ b/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java
@@ -19,9 +19,11 @@ package org.apache.cassandra.tools;
 
 import java.io.IOException;
 import java.io.PrintStream;
+import java.util.EnumSet;
+import java.util.Map;
 
 import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.io.sstable.SSTableMetadata;
+import org.apache.cassandra.io.sstable.metadata.*;
 
 /**
  * Shows the contents of sstable metadata
@@ -43,21 +45,31 @@ public class SSTableMetadataViewer
         for (String fname : args)
         {
             Descriptor descriptor = Descriptor.fromFilename(fname);
-            SSTableMetadata metadata = SSTableMetadata.serializer.deserialize(descriptor).left;
+            Map<MetadataType, MetadataComponent> metadata = descriptor.getMetadataSerializer().deserialize(descriptor, EnumSet.allOf(MetadataType.class));
+            ValidationMetadata validation = (ValidationMetadata) metadata.get(MetadataType.VALIDATION);
+            StatsMetadata stats = (StatsMetadata) metadata.get(MetadataType.STATS);
+            CompactionMetadata compaction = (CompactionMetadata) metadata.get(MetadataType.COMPACTION);
 
             out.printf("SSTable: %s%n", descriptor);
-            out.printf("Partitioner: %s%n", metadata.partitioner);
-            out.printf("Maximum timestamp: %s%n", metadata.maxTimestamp);
-            out.printf("SSTable max local deletion time: %s%n", metadata.maxLocalDeletionTime);
-            out.printf("Compression ratio: %s%n", metadata.compressionRatio);
-            out.printf("Estimated droppable tombstones: %s%n", metadata.getEstimatedDroppableTombstoneRatio((int) (System.currentTimeMillis() / 1000)));
-            out.printf("SSTable Level: %d%n", metadata.sstableLevel);
-            out.println(metadata.replayPosition);
-            printHistograms(metadata, out);
+            if (validation != null)
+            {
+                out.printf("Partitioner: %s%n", validation.partitioner);
+                out.printf("Bloom Filter FP chance: %f%n", validation.bloomFilterFPChance);
+            }
+            if (stats != null)
+            {
+                out.printf("Maximum timestamp: %s%n", stats.maxTimestamp);
+                out.printf("SSTable max local deletion time: %s%n", stats.maxLocalDeletionTime);
+                out.printf("Compression ratio: %s%n", stats.compressionRatio);
+                out.printf("Estimated droppable tombstones: %s%n", stats.getEstimatedDroppableTombstoneRatio((int) (System.currentTimeMillis() / 1000)));
+                out.printf("SSTable Level: %d%n", stats.sstableLevel);
+                out.println(stats.replayPosition);
+                printHistograms(stats, out);
+            }
         }
     }
 
-    private static void printHistograms(SSTableMetadata metadata, PrintStream out)
+    private static void printHistograms(StatsMetadata metadata, PrintStream out)
     {
         long[] offsets = metadata.estimatedRowSize.getBucketOffsets();
         long[] ersh = metadata.estimatedRowSize.getBuckets(false);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/74bf5aa1/src/java/org/apache/cassandra/utils/EstimatedHistogram.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/EstimatedHistogram.java b/src/java/org/apache/cassandra/utils/EstimatedHistogram.java
index 1b8ffe1..b05abe7 100644
--- a/src/java/org/apache/cassandra/utils/EstimatedHistogram.java
+++ b/src/java/org/apache/cassandra/utils/EstimatedHistogram.java
@@ -337,9 +337,19 @@ public class EstimatedHistogram
             return new EstimatedHistogram(offsets, buckets);
         }
 
-        public long serializedSize(EstimatedHistogram object, TypeSizes typeSizes)
+        public long serializedSize(EstimatedHistogram eh, TypeSizes typeSizes)
         {
-            throw new UnsupportedOperationException();
+            int size = 0;
+
+            long[] offsets = eh.getBucketOffsets();
+            long[] buckets = eh.getBuckets(false);
+            size += typeSizes.sizeof(buckets.length);
+            for (int i = 0; i < buckets.length; i++)
+            {
+                size += typeSizes.sizeof(offsets[i == 0 ? 0 : i - 1]);
+                size += typeSizes.sizeof(buckets[i]);
+            }
+            return size;
         }
     }
 }

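The serializedSize() implementation above replaces the old UnsupportedOperationException so the new toc can be built; it mirrors what serialize() writes, namely a four-byte bucket count followed by an offset/bucket pair of longs per bucket. An illustrative sanity check, not part of the patch:

    import org.apache.cassandra.db.TypeSizes;
    import org.apache.cassandra.utils.EstimatedHistogram;

    EstimatedHistogram eh = new EstimatedHistogram(114);
    long expected = 4 + 16L * eh.getBuckets(false).length; // count + (offset, bucket) per bucket
    assert EstimatedHistogram.serializer.serializedSize(eh, TypeSizes.NATIVE) == expected;
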
http://git-wip-us.apache.org/repos/asf/cassandra/blob/74bf5aa1/src/java/org/apache/cassandra/utils/StreamingHistogram.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/StreamingHistogram.java b/src/java/org/apache/cassandra/utils/StreamingHistogram.java
index c4ba956..daa702f 100644
--- a/src/java/org/apache/cassandra/utils/StreamingHistogram.java
+++ b/src/java/org/apache/cassandra/utils/StreamingHistogram.java
@@ -17,15 +17,16 @@
  */
 package org.apache.cassandra.utils;
 
-import org.apache.cassandra.db.TypeSizes;
-import org.apache.cassandra.io.ISerializer;
-
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.*;
+
 import com.google.common.base.Objects;
 
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.ISerializer;
+
 /**
  * Histogram that can be constructed from streaming of data.
  *
@@ -50,13 +51,13 @@ public class StreamingHistogram
     public StreamingHistogram(int maxBinSize)
     {
         this.maxBinSize = maxBinSize;
-        bin = new TreeMap<Double, Long>();
+        bin = new TreeMap<>();
     }
 
     private StreamingHistogram(int maxBinSize, Map<Double, Long> bin)
     {
         this.maxBinSize = maxBinSize;
-        this.bin = new TreeMap<Double, Long>(bin);
+        this.bin = new TreeMap<>(bin);
     }
 
     /**
@@ -185,7 +186,7 @@ public class StreamingHistogram
         {
             int maxBinSize = in.readInt();
             int size = in.readInt();
-            Map<Double, Long> tmp = new HashMap<Double, Long>(size);
+            Map<Double, Long> tmp = new HashMap<>(size);
             for (int i = 0; i < size; i++)
             {
                 tmp.put(in.readDouble(), in.readLong());
@@ -196,7 +197,12 @@ public class StreamingHistogram
 
         public long serializedSize(StreamingHistogram histogram, TypeSizes typeSizes)
         {
-            throw new UnsupportedOperationException();
+            long size = typeSizes.sizeof(histogram.maxBinSize);
+            Map<Double, Long> entries = histogram.getAsMap();
+            size += typeSizes.sizeof(entries.size());
+            // size of entries = size * (8(double) + 8(long))
+            size += entries.size() * (8 + 8);
+            return size;
         }
     }
 

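Likewise for StreamingHistogram: maxBinSize and the bin count are two four-byte ints, and each bin is a double/long pair. A small illustrative check, with arbitrary values:

    import org.apache.cassandra.db.TypeSizes;
    import org.apache.cassandra.utils.StreamingHistogram;

    StreamingHistogram h = new StreamingHistogram(100);
    h.update(1);
    h.update(2);
    h.update(3);
    // 4 (maxBinSize) + 4 (bin count) + 3 bins * 16 bytes = 56
    long size = StreamingHistogram.serializer.serializedSize(h, TypeSizes.NATIVE);
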
http://git-wip-us.apache.org/repos/asf/cassandra/blob/74bf5aa1/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
index e9d7336..ace32e7 100644
--- a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
+++ b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
@@ -49,6 +49,7 @@ import org.apache.cassandra.db.marshal.LexicalUUIDType;
 import org.apache.cassandra.db.marshal.LongType;
 import org.apache.cassandra.dht.*;
 import org.apache.cassandra.io.sstable.*;
+import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.thrift.*;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -1593,7 +1594,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
         {
             protected SSTableWriter getWriter()
             {
-                SSTableMetadata.Collector collector = SSTableMetadata.createCollector(cfmeta.comparator);
+                MetadataCollector collector = new MetadataCollector(cfmeta.comparator);
                 collector.addAncestor(sstable1.descriptor.generation); // add ancestor from previously written sstable
                 return new SSTableWriter(makeFilename(directory, metadata.ksName, metadata.cfName),
                                          0,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/74bf5aa1/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
index a008de1..f8c3c00 100644
--- a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
@@ -183,7 +183,7 @@ public class LeveledCompactionStrategyTest extends SchemaLoader
         {
             assertTrue(s.getSSTableLevel() != 6);
             strategy.manifest.remove(s);
-            LeveledManifest.mutateLevel(Pair.create(s.getSSTableMetadata(), s.getAncestors()), s.descriptor, s.descriptor.filenameFor(Component.STATS), 6);
+            s.descriptor.getMetadataSerializer().mutateLevel(s.descriptor, 6);
             s.reloadSSTableMetadata();
             strategy.manifest.add(s);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/74bf5aa1/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java b/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
index ee32a0e..fa96270 100644
--- a/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
+++ b/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
@@ -25,7 +25,7 @@ import org.junit.Test;
 
 import org.apache.cassandra.db.marshal.BytesType;
 import org.apache.cassandra.io.sstable.CorruptSSTableException;
-import org.apache.cassandra.io.sstable.SSTableMetadata;
+import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.io.util.*;
 
 import static org.junit.Assert.assertEquals;
@@ -55,7 +55,7 @@ public class CompressedRandomAccessReaderTest
 
         try
         {
-            SSTableMetadata.Collector sstableMetadataCollector = SSTableMetadata.createCollector(BytesType.instance).replayPosition(null);
+            MetadataCollector sstableMetadataCollector = new MetadataCollector(BytesType.instance).replayPosition(null);
             SequentialWriter writer = compressed
                 ? new CompressedSequentialWriter(f, filename + ".metadata", false, new CompressionParameters(SnappyCompressor.instance), sstableMetadataCollector)
                 : new SequentialWriter(f, CompressionParameters.DEFAULT_CHUNK_LENGTH, false);
@@ -106,7 +106,7 @@ public class CompressedRandomAccessReaderTest
         File metadata = new File(file.getPath() + ".meta");
         metadata.deleteOnExit();
 
-        SSTableMetadata.Collector sstableMetadataCollector = SSTableMetadata.createCollector(BytesType.instance).replayPosition(null);
+        MetadataCollector sstableMetadataCollector = new MetadataCollector(BytesType.instance).replayPosition(null);
         SequentialWriter writer = new CompressedSequentialWriter(file, metadata.getPath(), false, new CompressionParameters(SnappyCompressor.instance), sstableMetadataCollector);
 
         writer.write(CONTENT.getBytes());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/74bf5aa1/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java
index 8d013f9..e9cca75 100644
--- a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java
@@ -82,7 +82,7 @@ public class IndexSummaryTest
         dos.writeUTF("JUNK");
         FileUtils.closeQuietly(dos);
         DataInputStream dis = new DataInputStream(new ByteArrayInputStream(aos.toByteArray()));
-        IndexSummary is = IndexSummary.serializer.deserialize(dis, DatabaseDescriptor.getPartitioner(), false);
+        IndexSummary is = IndexSummary.serializer.deserialize(dis, DatabaseDescriptor.getPartitioner(), false, 1);
         for (int i = 0; i < 100; i++)
             assertEquals(i, is.binarySearch(random.left.get(i)));
         // read the junk
@@ -106,7 +106,7 @@ public class IndexSummaryTest
         DataOutputStream dos = new DataOutputStream(aos);
         IndexSummary.serializer.serialize(summary, dos, false);
         DataInputStream dis = new DataInputStream(new ByteArrayInputStream(aos.toByteArray()));
-        IndexSummary loaded = IndexSummary.serializer.deserialize(dis, p, false);
+        IndexSummary loaded = IndexSummary.serializer.deserialize(dis, p, false, 1);
 
         assertEquals(1, loaded.size());
         assertEquals(summary.getPosition(0), loaded.getPosition(0));
@@ -253,4 +253,4 @@ public class IndexSummaryTest
         assertEquals(64 * indexInterval, Downsampling.getEffectiveIndexIntervalAfterIndex(0, 2, indexInterval));
         assertEquals(64 * indexInterval, Downsampling.getEffectiveIndexIntervalAfterIndex(1, 2, indexInterval));
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/74bf5aa1/test/unit/org/apache/cassandra/io/sstable/SSTableMetadataSerializerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableMetadataSerializerTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableMetadataSerializerTest.java
deleted file mode 100644
index 13b2f26..0000000
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableMetadataSerializerTest.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*    http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-package org.apache.cassandra.io.sstable;
-
-import java.io.ByteArrayOutputStream;
-import java.io.ByteArrayInputStream;
-import java.io.DataOutputStream;
-import java.io.DataInputStream;
-import java.io.File;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Set;
-
-import org.junit.Test;
-
-import org.apache.cassandra.db.commitlog.ReplayPosition;
-import org.apache.cassandra.db.marshal.BytesType;
-import org.apache.cassandra.dht.RandomPartitioner;
-import org.apache.cassandra.utils.EstimatedHistogram;
-import org.apache.cassandra.utils.Pair;
-
-public class SSTableMetadataSerializerTest
-{
-    @Test
-    public void testSerialization() throws IOException
-    {
-        EstimatedHistogram rowSizes = new EstimatedHistogram(
-            new long[] { 1L, 2L },
-            new long[] { 3L, 4L, 5L });
-        EstimatedHistogram columnCounts = new EstimatedHistogram(
-            new long[] { 6L, 7L },
-            new long[] { 8L, 9L, 10L });
-        ReplayPosition rp = new ReplayPosition(11L, 12);
-        long minTimestamp = 2162517136L;
-        long maxTimestamp = 4162517136L;
-
-        SSTableMetadata.Collector collector = SSTableMetadata.createCollector(BytesType.instance)
-                                                             .estimatedRowSize(rowSizes)
-                                                             .estimatedColumnCount(columnCounts)
-                                                             .replayPosition(rp);
-        collector.updateMinTimestamp(minTimestamp);
-        collector.updateMaxTimestamp(maxTimestamp);
-        SSTableMetadata originalMetadata = collector.finalizeMetadata(RandomPartitioner.class.getCanonicalName(), 0.1);
-
-        ByteArrayOutputStream byteOutput = new ByteArrayOutputStream();
-        DataOutputStream out = new DataOutputStream(byteOutput);
-        
-        Set<Integer> ancestors = new HashSet<Integer>();
-        ancestors.addAll(Arrays.asList(1,2,3,4));
-
-        SSTableMetadata.serializer.serialize(originalMetadata, ancestors, out);
-
-        ByteArrayInputStream byteInput = new ByteArrayInputStream(byteOutput.toByteArray());
-        DataInputStream in = new DataInputStream(byteInput);
-        Descriptor desc = new Descriptor(Descriptor.Version.CURRENT, new File("."), "", "", 0, false);
-        Pair<SSTableMetadata, Set<Integer>> statsPair = SSTableMetadata.serializer.deserialize(in, desc);
-        SSTableMetadata stats = statsPair.left;
-
-        assert stats.estimatedRowSize.equals(originalMetadata.estimatedRowSize);
-        assert stats.estimatedRowSize.equals(rowSizes);
-        assert stats.estimatedColumnCount.equals(originalMetadata.estimatedColumnCount);
-        assert stats.estimatedColumnCount.equals(columnCounts);
-        assert stats.replayPosition.equals(originalMetadata.replayPosition);
-        assert stats.replayPosition.equals(rp);
-        assert stats.minTimestamp == minTimestamp;
-        assert stats.maxTimestamp == maxTimestamp;
-        assert stats.minTimestamp == originalMetadata.minTimestamp;
-        assert stats.maxTimestamp == originalMetadata.maxTimestamp;
-        assert stats.bloomFilterFPChance == originalMetadata.bloomFilterFPChance;
-        assert RandomPartitioner.class.getCanonicalName().equals(stats.partitioner);
-        assert ancestors.equals(statsPair.right);
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/74bf5aa1/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java b/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java
new file mode 100644
index 0000000..e0b87cd
--- /dev/null
+++ b/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.metadata;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.collect.Sets;
+import org.junit.Test;
+
+import org.apache.cassandra.db.commitlog.ReplayPosition;
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.dht.RandomPartitioner;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.utils.EstimatedHistogram;
+
+import static org.junit.Assert.assertEquals;
+
+public class MetadataSerializerTest
+{
+    @Test
+    public void testSerialization() throws IOException
+    {
+        EstimatedHistogram rowSizes = new EstimatedHistogram(new long[] { 1L, 2L },
+                                                             new long[] { 3L, 4L, 5L });
+        EstimatedHistogram columnCounts = new EstimatedHistogram(new long[] { 6L, 7L },
+                                                                 new long[] { 8L, 9L, 10L });
+        ReplayPosition rp = new ReplayPosition(11L, 12);
+        long minTimestamp = 2162517136L;
+        long maxTimestamp = 4162517136L;
+
+        MetadataCollector collector = new MetadataCollector(BytesType.instance)
+                                                      .estimatedRowSize(rowSizes)
+                                                      .estimatedColumnCount(columnCounts)
+                                                      .replayPosition(rp);
+        collector.updateMinTimestamp(minTimestamp);
+        collector.updateMaxTimestamp(maxTimestamp);
+
+        Set<Integer> ancestors = Sets.newHashSet(1, 2, 3, 4);
+        for (int i : ancestors)
+            collector.addAncestor(i);
+
+        String partitioner = RandomPartitioner.class.getCanonicalName();
+        double bfFpChance = 0.1;
+        Map<MetadataType, MetadataComponent> originalMetadata = collector.finalizeMetadata(partitioner, bfFpChance);
+
+        MetadataSerializer serializer = new MetadataSerializer();
+        // Serialize to tmp file
+        File statsFile = File.createTempFile(Component.STATS.name, null);
+        try (DataOutputStream out = new DataOutputStream(new FileOutputStream(statsFile)))
+        {
+            serializer.serialize(originalMetadata, out);
+        }
+
+        Descriptor desc = new Descriptor(Descriptor.Version.CURRENT, statsFile.getParentFile(), "", "", 0, false);
+        try (RandomAccessReader in = RandomAccessReader.open(statsFile))
+        {
+            Map<MetadataType, MetadataComponent> deserialized = serializer.deserialize(desc, in, EnumSet.allOf(MetadataType.class));
+
+            for (MetadataType type : MetadataType.values())
+            {
+                assertEquals(originalMetadata.get(type), deserialized.get(type));
+            }
+        }
+    }
+}
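
The test round-trips all components at once via the EnumSet overload; the new interface also supports reading a single component, as the ColumnFamilyStore hunk further down uses for COMPACTION. A sketch (the descriptor is assumed to point at an existing sstable):

    // Read just the STATS component instead of the whole Stats.db map.
    public static StatsMetadata readStats(Descriptor desc) throws IOException
    {
        return (StatsMetadata) desc.getMetadataSerializer()
                                   .deserialize(desc, MetadataType.STATS);
    }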

http://git-wip-us.apache.org/repos/asf/cassandra/blob/74bf5aa1/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
index 027c84c..efe51c6 100644
--- a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
+++ b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
@@ -32,7 +32,7 @@ import org.apache.cassandra.io.compress.CompressionParameters;
 import org.apache.cassandra.io.compress.SnappyCompressor;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.io.sstable.SSTableMetadata;
+import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.utils.Pair;
 
 /**
@@ -60,7 +60,7 @@ public class CompressedInputStreamTest
         // write compressed data file of longs
         File tmp = new File(File.createTempFile("cassandra", "unittest").getParent(), "ks-cf-ib-1-Data.db");
         Descriptor desc = Descriptor.fromFilename(tmp.getAbsolutePath());
-        SSTableMetadata.Collector collector = SSTableMetadata.createCollector(BytesType.instance);
+        MetadataCollector collector = new MetadataCollector(BytesType.instance);
         CompressionParameters param = new CompressionParameters(SnappyCompressor.instance, 32, Collections.EMPTY_MAP);
         CompressedSequentialWriter writer = new CompressedSequentialWriter(tmp, desc.filenameFor(Component.COMPRESSION_INFO), false, param, collector);
         Map<Long, Long> index = new HashMap<Long, Long>();


[2/2] git commit: SSTable metadata(Stats.db) format change

Posted by yu...@apache.org.
SSTable metadata(Stats.db) format change

patch by yukim; reviewed by thobbs for CASSANDRA-6356


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/74bf5aa1
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/74bf5aa1
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/74bf5aa1

Branch: refs/heads/trunk
Commit: 74bf5aa16e7080360febca1745307a4d7ced32dc
Parents: 84d85ee
Author: Yuki Morishita <yu...@apache.org>
Authored: Fri Dec 13 16:33:26 2013 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Fri Dec 13 16:33:26 2013 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/cassandra/db/ColumnFamilyStore.java  |  14 +-
 .../org/apache/cassandra/db/DataTracker.java    |   9 +-
 src/java/org/apache/cassandra/db/Memtable.java  |   4 +-
 .../cassandra/db/commitlog/ReplayPosition.java  |   4 +-
 .../db/compaction/CompactionManager.java        |   3 +-
 .../cassandra/db/compaction/CompactionTask.java |   3 +-
 .../db/compaction/LeveledManifest.java          |  31 +-
 .../cassandra/db/compaction/Upgrader.java       |   3 +-
 .../io/compress/CompressedSequentialWriter.java |   9 +-
 .../io/sstable/AbstractSSTableSimpleWriter.java |   3 +-
 .../apache/cassandra/io/sstable/Descriptor.java |  50 +-
 .../cassandra/io/sstable/IndexSummary.java      |   9 +-
 .../cassandra/io/sstable/SSTableMetadata.java   | 518 -------------------
 .../cassandra/io/sstable/SSTableReader.java     | 183 ++++---
 .../cassandra/io/sstable/SSTableWriter.java     |  32 +-
 .../io/sstable/metadata/CompactionMetadata.java |  93 ++++
 .../metadata/IMetadataComponentSerializer.java  |  58 +++
 .../sstable/metadata/IMetadataSerializer.java   |  68 +++
 .../metadata/LegacyMetadataSerializer.java      | 156 ++++++
 .../io/sstable/metadata/MetadataCollector.java  | 220 ++++++++
 .../io/sstable/metadata/MetadataComponent.java  |  34 ++
 .../io/sstable/metadata/MetadataSerializer.java | 144 ++++++
 .../io/sstable/metadata/MetadataType.java       |  38 ++
 .../io/sstable/metadata/StatsMetadata.java      | 253 +++++++++
 .../io/sstable/metadata/ValidationMetadata.java |  91 ++++
 .../cassandra/metrics/ColumnFamilyMetrics.java  |   6 +-
 .../cassandra/tools/SSTableLevelResetter.java   |  11 +-
 .../cassandra/tools/SSTableMetadataViewer.java  |  34 +-
 .../cassandra/utils/EstimatedHistogram.java     |  14 +-
 .../cassandra/utils/StreamingHistogram.java     |  20 +-
 .../cassandra/db/ColumnFamilyStoreTest.java     |   3 +-
 .../LeveledCompactionStrategyTest.java          |   2 +-
 .../CompressedRandomAccessReaderTest.java       |   6 +-
 .../cassandra/io/sstable/IndexSummaryTest.java  |   6 +-
 .../sstable/SSTableMetadataSerializerTest.java  |  90 ----
 .../metadata/MetadataSerializerTest.java        |  88 ++++
 .../compress/CompressedInputStreamTest.java     |   4 +-
 38 files changed, 1505 insertions(+), 810 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/74bf5aa1/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6e00201..4c74ea9 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -15,6 +15,7 @@
  * Use of o.a.c.metrics in nodetool (CASSANDRA-5871, 6406)
  * Batch read from OTC's queue and cleanup (CASSANDRA-1632)
  * Secondary index support for collections (CASSANDRA-4511)
+ * SSTable metadata(Stats.db) format change (CASSANDRA-6356)
 
 
 2.0.4

http://git-wip-us.apache.org/repos/asf/cassandra/blob/74bf5aa1/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 4e54af0..a98e30b 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -62,6 +62,8 @@ import org.apache.cassandra.io.FSReadError;
 import org.apache.cassandra.io.compress.CompressionParameters;
 import org.apache.cassandra.io.sstable.*;
 import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.metadata.CompactionMetadata;
+import org.apache.cassandra.io.sstable.metadata.MetadataType;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.metrics.ColumnFamilyMetrics;
 import org.apache.cassandra.service.CacheService;
@@ -486,7 +488,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         Directories directories = Directories.create(keyspace, columnfamily);
 
         // sanity-check unfinishedGenerations
-        Set<Integer> allGenerations = new HashSet<Integer>();
+        Set<Integer> allGenerations = new HashSet<>();
         for (Descriptor desc : directories.sstableLister().list().keySet())
             allGenerations.add(desc.generation);
         if (!allGenerations.containsAll(unfinishedGenerations))
@@ -497,7 +499,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
         // remove new sstables from compactions that didn't complete, and compute
         // set of ancestors that shouldn't exist anymore
-        Set<Integer> completedAncestors = new HashSet<Integer>();
+        Set<Integer> completedAncestors = new HashSet<>();
         for (Map.Entry<Descriptor, Set<Component>> sstableFiles : directories.sstableLister().list().entrySet())
         {
             Descriptor desc = sstableFiles.getKey();
@@ -506,7 +508,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             Set<Integer> ancestors;
             try
             {
-                ancestors = SSTableMetadata.serializer.deserialize(desc).right;
+                CompactionMetadata compactionMetadata = (CompactionMetadata) desc.getMetadataSerializer().deserialize(desc, MetadataType.COMPACTION);
+                ancestors = compactionMetadata.ancestors;
             }
             catch (IOException e)
             {
@@ -595,10 +598,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             try
             {
                 if (new File(descriptor.filenameFor(Component.STATS)).exists())
-                {
-                    Pair<SSTableMetadata, Set<Integer>> oldMetadata = SSTableMetadata.serializer.deserialize(descriptor);
-                    LeveledManifest.mutateLevel(oldMetadata, descriptor, descriptor.filenameFor(Component.STATS), 0);
-                }
+                    descriptor.getMetadataSerializer().mutateLevel(descriptor, 0);
             }
             catch (IOException e)
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/74bf5aa1/src/java/org/apache/cassandra/db/DataTracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DataTracker.java b/src/java/org/apache/cassandra/db/DataTracker.java
index 64b088d..692c5c5 100644
--- a/src/java/org/apache/cassandra/db/DataTracker.java
+++ b/src/java/org/apache/cassandra/db/DataTracker.java
@@ -29,7 +29,6 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.compaction.OperationType;
-import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.metrics.StorageMetrics;
@@ -460,11 +459,7 @@ public class DataTracker
             allDroppable += sstable.getDroppableTombstonesBefore(localTime - sstable.metadata.getGcGraceSeconds());
             allColumns += sstable.getEstimatedColumnCount().mean() * sstable.getEstimatedColumnCount().count();
         }
-        if (allColumns > 0)
-        {
-            return allDroppable / allColumns;
-        }
-        return 0;
+        return allColumns > 0 ? allDroppable / allColumns : 0;
     }
 
     public void notifySSTablesChanged(Collection<SSTableReader> removed, Collection<SSTableReader> added, OperationType compactionType)
@@ -630,4 +625,4 @@ public class DataTracker
             return String.format("View(pending_count=%d, sstables=%s, compacting=%s)", memtablesPendingFlush.size(), sstables, compacting);
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/74bf5aa1/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index 064851a..785f0c2 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -35,9 +35,9 @@ import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.commitlog.ReplayPosition;
 import org.apache.cassandra.db.index.SecondaryIndexManager;
 import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.io.sstable.SSTableMetadata;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.io.sstable.SSTableWriter;
+import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.io.util.DiskAwareRunnable;
 import org.apache.cassandra.utils.Allocator;
 import org.github.jamm.MemoryMeter;
@@ -381,7 +381,7 @@ public class Memtable
 
         public SSTableWriter createFlushWriter(String filename) throws ExecutionException, InterruptedException
         {
-            SSTableMetadata.Collector sstableMetadataCollector = SSTableMetadata.createCollector(cfs.metadata.comparator).replayPosition(context.get());
+            MetadataCollector sstableMetadataCollector = new MetadataCollector(cfs.metadata.comparator).replayPosition(context.get());
             return new SSTableWriter(filename,
                                      rows.size(),
                                      cfs.metadata,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/74bf5aa1/src/java/org/apache/cassandra/db/commitlog/ReplayPosition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/ReplayPosition.java b/src/java/org/apache/cassandra/db/commitlog/ReplayPosition.java
index 354444b..fb78ed3 100644
--- a/src/java/org/apache/cassandra/db/commitlog/ReplayPosition.java
+++ b/src/java/org/apache/cassandra/db/commitlog/ReplayPosition.java
@@ -130,9 +130,9 @@ public class ReplayPosition implements Comparable<ReplayPosition>
             return new ReplayPosition(in.readLong(), in.readInt());
         }
 
-        public long serializedSize(ReplayPosition object, TypeSizes typeSizes)
+        public long serializedSize(ReplayPosition rp, TypeSizes typeSizes)
         {
-            throw new UnsupportedOperationException();
+            return typeSizes.sizeof(rp.segment) + typeSizes.sizeof(rp.position);
         }
     }
 }
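
ReplayPosition gets the same treatment: instead of throwing, serializedSize now mirrors the writeLong(segment)/writeInt(position) pair used by serialize(). A quick sanity check, assuming TypeSizes.NATIVE with its fixed 8-byte long and 4-byte int:

    // Illustrative only; not part of the patch.
    public static void checkReplayPositionSize()
    {
        ReplayPosition rp = new ReplayPosition(11L, 12);
        // 8 bytes (segment, long) + 4 bytes (position, int)
        assert ReplayPosition.serializer.serializedSize(rp, TypeSizes.NATIVE) == 12;
    }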

http://git-wip-us.apache.org/repos/asf/cassandra/blob/74bf5aa1/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 5bcaca9..2090b6f 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -46,6 +46,7 @@ import org.apache.cassandra.dht.Bounds;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.sstable.*;
+import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.metrics.CompactionMetrics;
 import org.apache.cassandra.repair.Validator;
@@ -738,7 +739,7 @@ public class CompactionManager implements CompactionManagerMBean
                                  expectedBloomFilterSize,
                                  cfs.metadata,
                                  cfs.partitioner,
-                                 SSTableMetadata.createCollector(Collections.singleton(sstable), cfs.metadata.comparator, sstable.getSSTableLevel()));
+                                 new MetadataCollector(Collections.singleton(sstable), cfs.metadata.comparator, sstable.getSSTableLevel()));
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/74bf5aa1/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index f4cc500..59f2f2f 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -32,6 +32,7 @@ import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.compaction.CompactionManager.CompactionExecutorStatsCollector;
 import org.apache.cassandra.io.sstable.*;
+import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.utils.CloseableIterator;
 
 public class CompactionTask extends AbstractCompactionTask
@@ -281,7 +282,7 @@ public class CompactionTask extends AbstractCompactionTask
                                  keysPerSSTable,
                                  cfs.metadata,
                                  cfs.partitioner,
-                                 SSTableMetadata.createCollector(toCompact, cfs.metadata.comparator, getLevel()));
+                                 new MetadataCollector(toCompact, cfs.metadata.comparator, getLevel()));
     }
 
     protected int getLevel()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/74bf5aa1/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
index 232d1f7..2ec42e4 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
@@ -188,7 +188,7 @@ public class LeveledManifest
         String metaDataFile = sstable.descriptor.filenameFor(Component.STATS);
         try
         {
-            mutateLevel(Pair.create(sstable.getSSTableMetadata(), sstable.getAncestors()), sstable.descriptor, metaDataFile, 0);
+            sstable.descriptor.getMetadataSerializer().mutateLevel(sstable.descriptor, 0);
             sstable.reloadSSTableMetadata();
             add(sstable);
         }
@@ -571,33 +571,4 @@ public class LeveledManifest
         return newLevel;
 
     }
-
-    /**
-     * Scary method mutating existing sstable component
-     *
-     * Tries to do it safely by moving the new file on top of the old one
-     *
-     * Caller needs to reload the sstable metadata (sstableReader.reloadSSTableMetadata())
-     *
-     * @see org.apache.cassandra.io.sstable.SSTableReader#reloadSSTableMetadata()
-     *
-     * @param oldMetadata
-     * @param descriptor
-     * @param filename
-     * @param level
-     * @throws IOException
-     */
-    public static synchronized void mutateLevel(Pair<SSTableMetadata, Set<Integer>> oldMetadata, Descriptor descriptor, String filename, int level) throws IOException
-    {
-        logger.debug("Mutating {} to level {}", descriptor.filenameFor(Component.STATS), level);
-        SSTableMetadata metadata = SSTableMetadata.copyWithNewSSTableLevel(oldMetadata.left, level);
-        DataOutputStream out = new DataOutputStream(new FileOutputStream(filename + "-tmp"));
-        SSTableMetadata.serializer.legacySerialize(metadata, oldMetadata.right, descriptor, out);
-        out.flush();
-        out.close();
-        // we cant move a file on top of another file in windows:
-        if (!FBUtilities.isUnix())
-            FileUtils.delete(filename);
-        FileUtils.renameWithConfirm(filename + "-tmp", filename);
-    }
 }
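
With the static helper removed, a level reset is two calls on the reader, using only the API visible in this diff. A minimal sketch:

    // Reset an sstable's level to 0, as the updated call site above does.
    public static void resetLevel(SSTableReader sstable) throws IOException
    {
        // The descriptor picks a serializer matching the file's on-disk
        // version, so this covers legacy and new-format Stats components.
        sstable.descriptor.getMetadataSerializer().mutateLevel(sstable.descriptor, 0);
        sstable.reloadSSTableMetadata();
    }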

http://git-wip-us.apache.org/repos/asf/cassandra/blob/74bf5aa1/src/java/org/apache/cassandra/db/compaction/Upgrader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Upgrader.java b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
index 383ff00..ef881c4 100644
--- a/src/java/org/apache/cassandra/db/compaction/Upgrader.java
+++ b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
@@ -26,6 +26,7 @@ import com.google.common.base.Throwables;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.io.sstable.*;
+import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.utils.CloseableIterator;
 import org.apache.cassandra.utils.OutputHandler;
 
@@ -62,7 +63,7 @@ public class Upgrader
 
     private SSTableWriter createCompactionWriter()
     {
-        SSTableMetadata.Collector sstableMetadataCollector = SSTableMetadata.createCollector(cfs.getComparator());
+        MetadataCollector sstableMetadataCollector = new MetadataCollector(cfs.getComparator());
 
         // Get the max timestamp of the precompacted sstables
         // and adds generation of live ancestors

http://git-wip-us.apache.org/repos/asf/cassandra/blob/74bf5aa1/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
index 54b990f..99bbd85 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
@@ -21,13 +21,12 @@ import java.io.EOFException;
 import java.io.File;
 import java.io.IOException;
 import java.util.zip.Adler32;
-import java.util.zip.CRC32;
 import java.util.zip.Checksum;
 
 import org.apache.cassandra.io.FSReadError;
 import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.io.sstable.CorruptSSTableException;
-import org.apache.cassandra.io.sstable.SSTableMetadata.Collector;
+import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.io.util.FileMark;
 import org.apache.cassandra.io.util.SequentialWriter;
 
@@ -37,7 +36,7 @@ public class CompressedSequentialWriter extends SequentialWriter
                                         String indexFilePath,
                                         boolean skipIOCache,
                                         CompressionParameters parameters,
-                                        Collector sstableMetadataCollector)
+                                        MetadataCollector sstableMetadataCollector)
     {
         return new CompressedSequentialWriter(new File(dataFilePath), indexFilePath, skipIOCache, parameters, sstableMetadataCollector);
     }
@@ -60,13 +59,13 @@ public class CompressedSequentialWriter extends SequentialWriter
 
     private long originalSize = 0, compressedSize = 0;
 
-    private final Collector sstableMetadataCollector;
+    private final MetadataCollector sstableMetadataCollector;
 
     public CompressedSequentialWriter(File file,
                                       String indexFilePath,
                                       boolean skipIOCache,
                                       CompressionParameters parameters,
-                                      Collector sstableMetadataCollector)
+                                      MetadataCollector sstableMetadataCollector)
     {
         super(file, parameters.chunkLength(), skipIOCache);
         this.compressor = parameters.sstableCompressor;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/74bf5aa1/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
index 0059fda..6018369 100644
--- a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
@@ -30,6 +30,7 @@ import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.context.CounterContext;
 import org.apache.cassandra.db.marshal.CompositeType;
 import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.utils.CounterId;
 import org.apache.cassandra.utils.Pair;
 
@@ -56,7 +57,7 @@ public abstract class AbstractSSTableSimpleWriter
             0, // We don't care about the bloom filter
             metadata,
             DatabaseDescriptor.getPartitioner(),
-            SSTableMetadata.createCollector(metadata.comparator));
+            new MetadataCollector(metadata.comparator));
     }
 
     // find available generation and pick up filename from that

http://git-wip-us.apache.org/repos/asf/cassandra/blob/74bf5aa1/src/java/org/apache/cassandra/io/sstable/Descriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/Descriptor.java b/src/java/org/apache/cassandra/io/sstable/Descriptor.java
index fef6a1e..6f84296 100644
--- a/src/java/org/apache/cassandra/io/sstable/Descriptor.java
+++ b/src/java/org/apache/cassandra/io/sstable/Descriptor.java
@@ -22,6 +22,9 @@ import java.util.StringTokenizer;
 
 import com.google.common.base.Objects;
 
+import org.apache.cassandra.io.sstable.metadata.IMetadataSerializer;
+import org.apache.cassandra.io.sstable.metadata.LegacyMetadataSerializer;
+import org.apache.cassandra.io.sstable.metadata.MetadataSerializer;
 import org.apache.cassandra.utils.Pair;
 
 import static org.apache.cassandra.io.sstable.Component.separator;
@@ -43,8 +46,8 @@ public class Descriptor
     // we always incremented the major version.
     public static class Version
     {
-        // This needs to be at the beginning for initialization sake
-        public static final String current_version = "jc";
+        // This needs to be at the beginning for initialization sake
+        public static final String current_version = "ka";
 
         // ic (1.2.5): omits per-row bloom filter of column names
         // ja (2.0.0): super columns are serialized as composites (note that there is no real format change,
@@ -57,7 +60,8 @@ public class Descriptor
         //             tracks max/min column values (according to comparator)
         // jb (2.0.1): switch from crc32 to adler32 for compression checksums
         //             checksum the compressed data
-        // jc (2.1.0): index summaries can be downsampled and the sampling level is persisted
+        // ka (2.1.0): new Statistics.db file format
+        //             index summaries can be downsampled and the sampling level is persisted
 
         public static final Version CURRENT = new Version(current_version);
 
@@ -72,6 +76,7 @@ public class Descriptor
         public final boolean tracksMaxMinColumnNames;
         public final boolean hasPostCompressionAdlerChecksums;
         public final boolean hasSamplingLevel;
+        public final boolean newStatsFile;
 
         public Version(String version)
         {
@@ -84,7 +89,8 @@ public class Descriptor
             hasRowSizeAndColumnCount = version.compareTo("ja") < 0;
             tracksMaxMinColumnNames = version.compareTo("ja") >= 0;
             hasPostCompressionAdlerChecksums = version.compareTo("jb") >= 0;
-            hasSamplingLevel = version.compareTo("jc") >= 0;
+            hasSamplingLevel = version.compareTo("ka") >= 0;
+            newStatsFile = version.compareTo("ka") >= 0;
         }
 
         /**
@@ -102,11 +108,6 @@ public class Descriptor
             return version.compareTo("ic") >= 0 && version.charAt(0) <= CURRENT.version.charAt(0);
         }
 
-        public boolean isStreamCompatible()
-        {
-            return isCompatible() && version.charAt(0) >= 'j';
-        }
-
         @Override
         public String toString()
         {
@@ -116,11 +117,7 @@ public class Descriptor
         @Override
         public boolean equals(Object o)
         {
-            if (o == this)
-                return true;
-            if (!(o instanceof Version))
-                return false;
-            return version.equals(((Version)o).version);
+            return o == this || o instanceof Version && version.equals(((Version) o).version);
         }
 
         @Override
@@ -256,23 +253,20 @@ public class Descriptor
         return new Descriptor(version, directory, ksname, cfname, generation, temporary);
     }
 
-    /**
-     * @return true if the current Cassandra version can read the given sstable version
-     */
-    public boolean isCompatible()
+    public IMetadataSerializer getMetadataSerializer()
     {
-        return version.isCompatible();
+        if (version.newStatsFile)
+            return new MetadataSerializer();
+        else
+            return new LegacyMetadataSerializer();
     }
 
     /**
-     * @return true if the current Cassandra version can stream the given sstable version
-     * from another node.  This is stricter than opening it locally [isCompatible] because
-     * streaming needs to rebuild all the non-data components, and it only knows how to write
-     * the latest version.
+     * @return true if the current Cassandra version can read the given sstable version
      */
-    public boolean isStreamCompatible()
+    public boolean isCompatible()
     {
-        return version.isStreamCompatible();
+        return version.isCompatible();
     }
 
     @Override
@@ -289,7 +283,11 @@ public class Descriptor
         if (!(o instanceof Descriptor))
             return false;
         Descriptor that = (Descriptor)o;
-        return that.directory.equals(this.directory) && that.generation == this.generation && that.ksname.equals(this.ksname) && that.cfname.equals(this.cfname) && that.temporary == this.temporary;
+        return that.directory.equals(this.directory)
+                       && that.generation == this.generation
+                       && that.ksname.equals(this.ksname)
+                       && that.cfname.equals(this.cfname)
+                       && that.temporary == this.temporary;
     }
 
     @Override
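
Both new flags hinge on a plain string comparison against "ka", so their behaviour is easy to pin down. An illustrative check (not code from the patch):

    public static void versionFlagExamples()
    {
        // "jb" sorts before "ka": legacy Stats.db format, no persisted
        // sampling level.
        Descriptor.Version jb = new Descriptor.Version("jb");
        assert !jb.newStatsFile && !jb.hasSamplingLevel;

        // "ka" and anything later flips both flags on.
        Descriptor.Version ka = new Descriptor.Version("ka");
        assert ka.newStatsFile && ka.hasSamplingLevel;
    }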

http://git-wip-us.apache.org/repos/asf/cassandra/blob/74bf5aa1/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummary.java b/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
index 4fc4737..6d8712a 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
@@ -202,9 +202,14 @@ public class IndexSummary implements Closeable
             FBUtilities.copy(new MemoryInputStream(t.bytes), out, t.bytes.size());
         }
 
-        public IndexSummary deserialize(DataInputStream in, IPartitioner partitioner, boolean haveSamplingLevel) throws IOException
+        public IndexSummary deserialize(DataInputStream in, IPartitioner partitioner, boolean haveSamplingLevel, int expectedIndexInterval) throws IOException
         {
             int indexInterval = in.readInt();
+            if (indexInterval != expectedIndexInterval)
+            {
+                throw new IOException(String.format("Cannot read index summary because the index interval changed from %d to %d.",
+                                                           indexInterval, expectedIndexInterval));
+            }
             int summarySize = in.readInt();
             long offheapSize = in.readLong();
             int samplingLevel, fullSamplingSummarySize;
@@ -229,4 +234,4 @@ public class IndexSummary implements Closeable
     {
         bytes.free();
     }
-}
\ No newline at end of file
+}
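
The extra parameter lets callers fail fast when index_interval has been reconfigured since the summary was written, rather than silently using stale sampling; the updated tests above simply pass 1 for their hand-built summaries. A hypothetical call site:

    // Rejects the summary (IOException) if the on-disk interval no longer
    // matches the table's configured one.
    public static IndexSummary loadSummary(DataInputStream in,
                                           IPartitioner partitioner,
                                           Descriptor.Version version,
                                           int configuredInterval) throws IOException
    {
        return IndexSummary.serializer.deserialize(in, partitioner,
                                                   version.hasSamplingLevel,
                                                   configuredInterval);
    }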

http://git-wip-us.apache.org/repos/asf/cassandra/blob/74bf5aa1/src/java/org/apache/cassandra/io/sstable/SSTableMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableMetadata.java b/src/java/org/apache/cassandra/io/sstable/SSTableMetadata.java
deleted file mode 100644
index 8ddfdd7..0000000
--- a/src/java/org/apache/cassandra/io/sstable/SSTableMetadata.java
+++ /dev/null
@@ -1,518 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.io.sstable;
-
-import java.io.*;
-import java.nio.ByteBuffer;
-import java.util.*;
-
-import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.Pair;
-import org.apache.cassandra.utils.StreamingHistogram;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.db.commitlog.ReplayPosition;
-import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.utils.EstimatedHistogram;
-
-/**
- * Metadata for a SSTable.
- * Metadata includes:
- *  - estimated row size histogram
- *  - estimated column count histogram
- *  - replay position
- *  - max column timestamp
- *  - max local deletion time
- *  - bloom filter fp chance
- *  - compression ratio
- *  - partitioner
- *  - generations of sstables from which this sstable was compacted, if any
- *  - tombstone drop time histogram
- *
- * An SSTableMetadata should be instantiated via the Collector, openFromDescriptor()
- * or createDefaultInstance()
- */
-public class SSTableMetadata
-{
-    public static final double NO_BLOOM_FLITER_FP_CHANCE = -1.0;
-    public static final double NO_COMPRESSION_RATIO = -1.0;
-    public static final SSTableMetadataSerializer serializer = new SSTableMetadataSerializer();
-
-    public final EstimatedHistogram estimatedRowSize;
-    public final EstimatedHistogram estimatedColumnCount;
-    public final ReplayPosition replayPosition;
-    public final long minTimestamp;
-    public final long maxTimestamp;
-    public final int maxLocalDeletionTime;
-    public final double bloomFilterFPChance;
-    public final double compressionRatio;
-    public final String partitioner;
-    public final StreamingHistogram estimatedTombstoneDropTime;
-    public final int sstableLevel;
-    public final List<ByteBuffer> maxColumnNames;
-    public final List<ByteBuffer> minColumnNames;
-
-    private SSTableMetadata()
-    {
-        this(defaultRowSizeHistogram(),
-             defaultColumnCountHistogram(),
-             ReplayPosition.NONE,
-             Long.MIN_VALUE,
-             Long.MAX_VALUE,
-             Integer.MAX_VALUE,
-             NO_BLOOM_FLITER_FP_CHANCE,
-             NO_COMPRESSION_RATIO,
-             null,
-             defaultTombstoneDropTimeHistogram(),
-             0,
-             Collections.<ByteBuffer>emptyList(),
-             Collections.<ByteBuffer>emptyList());
-    }
-
-    private SSTableMetadata(EstimatedHistogram rowSizes,
-                            EstimatedHistogram columnCounts,
-                            ReplayPosition replayPosition,
-                            long minTimestamp,
-                            long maxTimestamp,
-                            int maxLocalDeletionTime,
-                            double bloomFilterFPChance,
-                            double compressionRatio,
-                            String partitioner,
-                            StreamingHistogram estimatedTombstoneDropTime,
-                            int sstableLevel,
-                            List<ByteBuffer> minColumnNames,
-                            List<ByteBuffer> maxColumnNames)
-    {
-        this.estimatedRowSize = rowSizes;
-        this.estimatedColumnCount = columnCounts;
-        this.replayPosition = replayPosition;
-        this.minTimestamp = minTimestamp;
-        this.maxTimestamp = maxTimestamp;
-        this.maxLocalDeletionTime = maxLocalDeletionTime;
-        this.bloomFilterFPChance = bloomFilterFPChance;
-        this.compressionRatio = compressionRatio;
-        this.partitioner = partitioner;
-        this.estimatedTombstoneDropTime = estimatedTombstoneDropTime;
-        this.sstableLevel = sstableLevel;
-        this.minColumnNames = minColumnNames;
-        this.maxColumnNames = maxColumnNames;
-    }
-
-    public static Collector createCollector(AbstractType<?> columnNameComparator)
-    {
-        return new Collector(columnNameComparator);
-    }
-
-    public static Collector createCollector(Collection<SSTableReader> sstables, AbstractType<?> columnNameComparator, int level)
-    {
-        Collector collector = new Collector(columnNameComparator);
-
-        collector.replayPosition(ReplayPosition.getReplayPosition(sstables));
-        collector.sstableLevel(level);
-        // Get the max timestamp of the precompacted sstables
-        // and adds generation of live ancestors
-        for (SSTableReader sstable : sstables)
-        {
-            collector.addAncestor(sstable.descriptor.generation);
-            for (Integer i : sstable.getAncestors())
-            {
-                if (new File(sstable.descriptor.withGeneration(i).filenameFor(Component.DATA)).exists())
-                    collector.addAncestor(i);
-            }
-        }
-
-        return collector;
-    }
-
-    /**
-     * Used when updating sstablemetadata files with an sstable level
-     * @param metadata
-     * @param sstableLevel
-     * @return
-     */
-    @Deprecated
-    public static SSTableMetadata copyWithNewSSTableLevel(SSTableMetadata metadata, int sstableLevel)
-    {
-        return new SSTableMetadata(metadata.estimatedRowSize,
-                                   metadata.estimatedColumnCount,
-                                   metadata.replayPosition,
-                                   metadata.minTimestamp,
-                                   metadata.maxTimestamp,
-                                   metadata.maxLocalDeletionTime,
-                                   metadata.bloomFilterFPChance,
-                                   metadata.compressionRatio,
-                                   metadata.partitioner,
-                                   metadata.estimatedTombstoneDropTime,
-                                   sstableLevel,
-                                   metadata.minColumnNames,
-                                   metadata.maxColumnNames);
-
-    }
-
-    static EstimatedHistogram defaultColumnCountHistogram()
-    {
-        // EH of 114 can track a max value of 2395318855, i.e., > 2B columns
-        return new EstimatedHistogram(114);
-    }
-
-    static EstimatedHistogram defaultRowSizeHistogram()
-    {
-        // EH of 150 can track a max value of 1697806495183, i.e., > 1.5PB
-        return new EstimatedHistogram(150);
-    }
-
-    static StreamingHistogram defaultTombstoneDropTimeHistogram()
-    {
-        return new StreamingHistogram(SSTable.TOMBSTONE_HISTOGRAM_BIN_SIZE);
-    }
-
-    /**
-     * @param gcBefore
-     * @return estimated droppable tombstone ratio at given gcBefore time.
-     */
-    public double getEstimatedDroppableTombstoneRatio(int gcBefore)
-    {
-        long estimatedColumnCount = this.estimatedColumnCount.mean() * this.estimatedColumnCount.count();
-        if (estimatedColumnCount > 0)
-        {
-            double droppable = getDroppableTombstonesBefore(gcBefore);
-            return droppable / estimatedColumnCount;
-        }
-        return 0.0f;
-    }
-
-    /**
-     * Get the amount of droppable tombstones
-     * @param gcBefore the gc time
-     * @return amount of droppable tombstones
-     */
-    public double getDroppableTombstonesBefore(int gcBefore)
-    {
-        return estimatedTombstoneDropTime.sum(gcBefore);
-    }
-
-    public static class Collector
-    {
-        protected EstimatedHistogram estimatedRowSize = defaultRowSizeHistogram();
-        protected EstimatedHistogram estimatedColumnCount = defaultColumnCountHistogram();
-        protected ReplayPosition replayPosition = ReplayPosition.NONE;
-        protected long minTimestamp = Long.MAX_VALUE;
-        protected long maxTimestamp = Long.MIN_VALUE;
-        protected int maxLocalDeletionTime = Integer.MIN_VALUE;
-        protected double compressionRatio = NO_COMPRESSION_RATIO;
-        protected Set<Integer> ancestors = new HashSet<Integer>();
-        protected StreamingHistogram estimatedTombstoneDropTime = defaultTombstoneDropTimeHistogram();
-        protected int sstableLevel;
-        protected List<ByteBuffer> minColumnNames = Collections.emptyList();
-        protected List<ByteBuffer> maxColumnNames = Collections.emptyList();
-        private final AbstractType<?> columnNameComparator;
-
-        private Collector(AbstractType<?> columnNameComparator)
-        {
-            this.columnNameComparator = columnNameComparator;
-        }
-        public void addRowSize(long rowSize)
-        {
-            estimatedRowSize.add(rowSize);
-        }
-
-        public void addColumnCount(long columnCount)
-        {
-            estimatedColumnCount.add(columnCount);
-        }
-
-        public void mergeTombstoneHistogram(StreamingHistogram histogram)
-        {
-            estimatedTombstoneDropTime.merge(histogram);
-        }
-
-        /**
-         * Ratio is compressed/uncompressed and it is
-         * if you have 1.x then compression isn't helping
-         */
-        public void addCompressionRatio(long compressed, long uncompressed)
-        {
-            compressionRatio = (double) compressed/uncompressed;
-        }
-
-        public void updateMinTimestamp(long potentialMin)
-        {
-            minTimestamp = Math.min(minTimestamp, potentialMin);
-        }
-
-        public void updateMaxTimestamp(long potentialMax)
-        {
-            maxTimestamp = Math.max(maxTimestamp, potentialMax);
-        }
-
-        public void updateMaxLocalDeletionTime(int maxLocalDeletionTime)
-        {
-            this.maxLocalDeletionTime = Math.max(this.maxLocalDeletionTime, maxLocalDeletionTime);
-        }
-
-        public SSTableMetadata finalizeMetadata(String partitioner, double bloomFilterFPChance)
-        {
-            return new SSTableMetadata(estimatedRowSize,
-                                       estimatedColumnCount,
-                                       replayPosition,
-                                       minTimestamp,
-                                       maxTimestamp,
-                                       maxLocalDeletionTime,
-                                       bloomFilterFPChance,
-                                       compressionRatio,
-                                       partitioner,
-                                       estimatedTombstoneDropTime,
-                                       sstableLevel,
-                                       minColumnNames,
-                                       maxColumnNames);
-        }
-
-        public Collector estimatedRowSize(EstimatedHistogram estimatedRowSize)
-        {
-            this.estimatedRowSize = estimatedRowSize;
-            return this;
-        }
-
-        public Collector estimatedColumnCount(EstimatedHistogram estimatedColumnCount)
-        {
-            this.estimatedColumnCount = estimatedColumnCount;
-            return this;
-        }
-
-        public Collector replayPosition(ReplayPosition replayPosition)
-        {
-            this.replayPosition = replayPosition;
-            return this;
-        }
-
-        public Collector addAncestor(int generation)
-        {
-            this.ancestors.add(generation);
-            return this;
-        }
-
-        void update(long size, ColumnStats stats)
-        {
-            updateMinTimestamp(stats.minTimestamp);
-            /*
-             * The max timestamp is not always collected here (more precisely, row.maxTimestamp() may return Long.MIN_VALUE)
-             * to avoid deserializing an EchoedRow.
-             * This is why it is collected first when calling ColumnFamilyStore.createCompactionWriter.
-             * However, for old sstables without timestamps, we still want to update the timestamp (and we know
-             * that in this case we will not use EchoedRow, since CompactionController.needsDeserialize() will be true).
-             */
-            updateMaxTimestamp(stats.maxTimestamp);
-            updateMaxLocalDeletionTime(stats.maxLocalDeletionTime);
-            addRowSize(size);
-            addColumnCount(stats.columnCount);
-            mergeTombstoneHistogram(stats.tombstoneHistogram);
-            updateMinColumnNames(stats.minColumnNames);
-            updateMaxColumnNames(stats.maxColumnNames);
-        }
-
-        public Collector sstableLevel(int sstableLevel)
-        {
-            this.sstableLevel = sstableLevel;
-            return this;
-        }
-
-        public Collector updateMinColumnNames(List<ByteBuffer> minColumnNames)
-        {
-            if (minColumnNames.size() > 0)
-                this.minColumnNames = ColumnNameHelper.mergeMin(this.minColumnNames, minColumnNames, columnNameComparator);
-            return this;
-        }
-
-        public Collector updateMaxColumnNames(List<ByteBuffer> maxColumnNames)
-        {
-            if (maxColumnNames.size() > 0)
-                this.maxColumnNames = ColumnNameHelper.mergeMax(this.maxColumnNames, maxColumnNames, columnNameComparator);
-            return this;
-        }
-    }
-
-    public static class SSTableMetadataSerializer
-    {
-        private static final Logger logger = LoggerFactory.getLogger(SSTableMetadataSerializer.class);
-
-        public void serialize(SSTableMetadata sstableStats, Set<Integer> ancestors, DataOutput out) throws IOException
-        {
-            assert sstableStats.partitioner != null;
-
-            EstimatedHistogram.serializer.serialize(sstableStats.estimatedRowSize, out);
-            EstimatedHistogram.serializer.serialize(sstableStats.estimatedColumnCount, out);
-            ReplayPosition.serializer.serialize(sstableStats.replayPosition, out);
-            out.writeLong(sstableStats.minTimestamp);
-            out.writeLong(sstableStats.maxTimestamp);
-            out.writeInt(sstableStats.maxLocalDeletionTime);
-            out.writeDouble(sstableStats.bloomFilterFPChance);
-            out.writeDouble(sstableStats.compressionRatio);
-            out.writeUTF(sstableStats.partitioner);
-            out.writeInt(ancestors.size());
-            for (Integer g : ancestors)
-                out.writeInt(g);
-            StreamingHistogram.serializer.serialize(sstableStats.estimatedTombstoneDropTime, out);
-            out.writeInt(sstableStats.sstableLevel);
-            serializeMinMaxColumnNames(sstableStats.minColumnNames, sstableStats.maxColumnNames, out);
-        }
-
-        private void serializeMinMaxColumnNames(List<ByteBuffer> minColNames, List<ByteBuffer> maxColNames, DataOutput out) throws IOException
-        {
-            out.writeInt(minColNames.size());
-            for (ByteBuffer columnName : minColNames)
-                ByteBufferUtil.writeWithShortLength(columnName, out);
-            out.writeInt(maxColNames.size());
-            for (ByteBuffer columnName : maxColNames)
-                ByteBufferUtil.writeWithShortLength(columnName, out);
-        }
-        /**
-         * Used to serialize to an old version - needed to be able to update sstable level without a full compaction.
-         *
-         * @deprecated will be removed once the minimum supported upgrade-from version is the version that this
-         * patch first shipped in
-         *
-         * @param sstableStats
-         * @param legacyDesc
-         * @param out
-         * @throws IOException
-         */
-        @Deprecated
-        public void legacySerialize(SSTableMetadata sstableStats, Set<Integer> ancestors, Descriptor legacyDesc, DataOutput out) throws IOException
-        {
-            EstimatedHistogram.serializer.serialize(sstableStats.estimatedRowSize, out);
-            EstimatedHistogram.serializer.serialize(sstableStats.estimatedColumnCount, out);
-            ReplayPosition.serializer.serialize(sstableStats.replayPosition, out);
-            out.writeLong(sstableStats.minTimestamp);
-            out.writeLong(sstableStats.maxTimestamp);
-            if (legacyDesc.version.tracksMaxLocalDeletionTime)
-                out.writeInt(sstableStats.maxLocalDeletionTime);
-            if (legacyDesc.version.hasBloomFilterFPChance)
-                out.writeDouble(sstableStats.bloomFilterFPChance);
-            out.writeDouble(sstableStats.compressionRatio);
-            out.writeUTF(sstableStats.partitioner);
-            out.writeInt(ancestors.size());
-            for (Integer g : ancestors)
-                out.writeInt(g);
-            StreamingHistogram.serializer.serialize(sstableStats.estimatedTombstoneDropTime, out);
-            out.writeInt(sstableStats.sstableLevel);
-            if (legacyDesc.version.tracksMaxMinColumnNames)
-                serializeMinMaxColumnNames(sstableStats.minColumnNames, sstableStats.maxColumnNames, out);
-        }
-
-        /**
-         * Deserializes the metadata.
-         *
-         * Returns a pair containing the part of the metadata meant to be kept in memory and the part
-         * that need not be.
-         *
-         * @param descriptor the descriptor
-         * @return a pair containing data that needs to be in memory and data that is potentially big and is not needed
-         *         in memory
-         * @throws IOException
-         */
-        public Pair<SSTableMetadata, Set<Integer>> deserialize(Descriptor descriptor) throws IOException
-        {
-            return deserialize(descriptor, true);
-        }
-
-        public Pair<SSTableMetadata, Set<Integer>> deserialize(Descriptor descriptor, boolean loadSSTableLevel) throws IOException
-        {
-            logger.debug("Load metadata for {}", descriptor);
-            File statsFile = new File(descriptor.filenameFor(Component.STATS));
-            if (!statsFile.exists())
-            {
-                logger.debug("No sstable stats for {}", descriptor);
-                return Pair.create(new SSTableMetadata(), Collections.<Integer>emptySet());
-            }
-
-            DataInputStream in = new DataInputStream(new BufferedInputStream(new FileInputStream(statsFile)));
-            try
-            {
-                return deserialize(in, descriptor, loadSSTableLevel);
-            }
-            finally
-            {
-                FileUtils.closeQuietly(in);
-            }
-        }
-        public Pair<SSTableMetadata, Set<Integer>> deserialize(DataInputStream in, Descriptor desc) throws IOException
-        {
-            return deserialize(in, desc, true);
-        }
-
-        public Pair<SSTableMetadata, Set<Integer>> deserialize(DataInputStream in, Descriptor desc, boolean loadSSTableLevel) throws IOException
-        {
-            EstimatedHistogram rowSizes = EstimatedHistogram.serializer.deserialize(in);
-            EstimatedHistogram columnCounts = EstimatedHistogram.serializer.deserialize(in);
-            ReplayPosition replayPosition = ReplayPosition.serializer.deserialize(in);
-            long minTimestamp = in.readLong();
-            long maxTimestamp = in.readLong();
-            int maxLocalDeletionTime = desc.version.tracksMaxLocalDeletionTime ? in.readInt() : Integer.MAX_VALUE;
-            double bloomFilterFPChance = desc.version.hasBloomFilterFPChance ? in.readDouble() : NO_BLOOM_FLITER_FP_CHANCE;
-            double compressionRatio = in.readDouble();
-            String partitioner = in.readUTF();
-            int nbAncestors = in.readInt();
-            Set<Integer> ancestors = new HashSet<Integer>(nbAncestors);
-            for (int i = 0; i < nbAncestors; i++)
-                ancestors.add(in.readInt());
-            StreamingHistogram tombstoneHistogram = StreamingHistogram.serializer.deserialize(in);
-            int sstableLevel = 0;
-
-            if (loadSSTableLevel && in.available() > 0)
-                sstableLevel = in.readInt();
-
-            List<ByteBuffer> minColumnNames;
-            List<ByteBuffer> maxColumnNames;
-            if (desc.version.tracksMaxMinColumnNames)
-            {
-                int colCount = in.readInt();
-                minColumnNames = new ArrayList<ByteBuffer>(colCount);
-                for (int i = 0; i < colCount; i++)
-                {
-                    minColumnNames.add(ByteBufferUtil.readWithShortLength(in));
-                }
-                colCount = in.readInt();
-                maxColumnNames = new ArrayList<ByteBuffer>(colCount);
-                for (int i = 0; i < colCount; i++)
-                {
-                    maxColumnNames.add(ByteBufferUtil.readWithShortLength(in));
-                }
-            }
-            else
-            {
-                minColumnNames = Collections.emptyList();
-                maxColumnNames = Collections.emptyList();
-            }
-            return Pair.create(new SSTableMetadata(rowSizes,
-                                       columnCounts,
-                                       replayPosition,
-                                       minTimestamp,
-                                       maxTimestamp,
-                                       maxLocalDeletionTime,
-                                       bloomFilterFPChance,
-                                       compressionRatio,
-                                       partitioner,
-                                       tombstoneHistogram,
-                                       sstableLevel,
-                                       minColumnNames,
-                                       maxColumnNames), ancestors);
-        }
-    }
-}

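For reference, the legacy serializer deleted above wrote Stats.db as one fixed sequence of fields: row-size and column-count histograms, the replay position, min/max timestamps, max local deletion time, bloom filter fp chance, compression ratio, partitioner name, ancestor generations, the tombstone histogram, the sstable level, and the min/max column names. Below is a minimal standalone sketch of the fixed-width middle of that layout, using only JDK streams; the histogram and replay-position encodings are elided, and every value is an illustrative placeholder:

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    public class LegacyStatsLayoutSketch
    {
        // Mirrors the field order of the deleted serialize() above;
        // EstimatedHistogram and ReplayPosition encodings precede these fields.
        public static byte[] writeFixedFields() throws IOException
        {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeLong(0L);                 // minTimestamp
            out.writeLong(Long.MAX_VALUE);     // maxTimestamp
            out.writeInt(Integer.MAX_VALUE);   // maxLocalDeletionTime
            out.writeDouble(0.01);             // bloomFilterFPChance
            out.writeDouble(0.5);              // compressionRatio
            out.writeUTF("org.apache.cassandra.dht.Murmur3Partitioner"); // partitioner
            out.writeInt(0);                   // ancestor count, then one int per ancestor
            out.flush();
            return bytes.toByteArray();
        }
    }
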
http://git-wip-us.apache.org/repos/asf/cassandra/blob/74bf5aa1/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index f82af1f..055f4b6 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -46,9 +46,11 @@ import org.apache.cassandra.db.commitlog.ReplayPosition;
 import org.apache.cassandra.db.compaction.ICompactionScanner;
 import org.apache.cassandra.db.index.SecondaryIndex;
 import org.apache.cassandra.dht.*;
+import org.apache.cassandra.io.FSReadError;
 import org.apache.cassandra.io.compress.CompressedRandomAccessReader;
 import org.apache.cassandra.io.compress.CompressedThrottledReader;
 import org.apache.cassandra.io.compress.CompressionMetadata;
+import org.apache.cassandra.io.sstable.metadata.*;
 import org.apache.cassandra.io.util.*;
 import org.apache.cassandra.metrics.RestorableMeter;
 import org.apache.cassandra.service.CacheService;
@@ -123,7 +125,7 @@ public class SSTableReader extends SSTable implements Closeable
 
     private final SSTableDeletingTask deletingTask;
     // not final since we need to be able to change level on a file.
-    private volatile SSTableMetadata sstableMetadata;
+    private volatile StatsMetadata sstableMetadata;
 
     private final AtomicLong keyCacheHit = new AtomicLong(0);
     private final AtomicLong keyCacheRequest = new AtomicLong(0);
@@ -173,38 +175,68 @@ public class SSTableReader extends SSTable implements Closeable
         return open(desc, componentsFor(desc), metadata, p);
     }
 
+    public static SSTableReader open(Descriptor descriptor, Set<Component> components, CFMetaData metadata, IPartitioner partitioner) throws IOException
+    {
+        return open(descriptor, components, metadata, partitioner, true);
+    }
+
     public static SSTableReader openNoValidation(Descriptor descriptor, Set<Component> components, CFMetaData metadata) throws IOException
     {
         return open(descriptor, components, metadata, StorageService.getPartitioner(), false);
     }
 
+    /**
+     * Open an SSTable reader to be used in batch mode (such as by sstableloader).
+     *
+     * @param descriptor
+     * @param components
+     * @param metadata
+     * @param partitioner
+     * @return opened SSTableReader
+     * @throws IOException
+     */
     public static SSTableReader openForBatch(Descriptor descriptor, Set<Component> components, CFMetaData metadata, IPartitioner partitioner) throws IOException
     {
-        SSTableMetadata sstableMetadata = openMetadata(descriptor, components, partitioner);
+        // Minimum components without which we can't do anything
+        assert components.contains(Component.DATA) : "Data component is missing for sstable " + descriptor;
+        assert components.contains(Component.PRIMARY_INDEX) : "Primary index component is missing for sstable " + descriptor;
+
+        Map<MetadataType, MetadataComponent> sstableMetadata = descriptor.getMetadataSerializer().deserialize(descriptor,
+                                                                                                               EnumSet.of(MetadataType.VALIDATION, MetadataType.STATS));
+        ValidationMetadata validationMetadata = (ValidationMetadata) sstableMetadata.get(MetadataType.VALIDATION);
+        StatsMetadata statsMetadata = (StatsMetadata) sstableMetadata.get(MetadataType.STATS);
+
+        // Check whether the sstable was created with the same partitioner.
+        // The partitioner can be null, which indicates an older sstable version or no stats available;
+        // in that case, we skip the check.
+        String partitionerName = partitioner.getClass().getCanonicalName();
+        if (validationMetadata != null && !partitionerName.equals(validationMetadata.partitioner))
+        {
+            logger.error(String.format("Cannot open %s; partitioner %s does not match system partitioner %s.  Note that the default partitioner starting with Cassandra 1.2 is Murmur3Partitioner, so you will need to edit that to match your old partitioner if upgrading.",
+                                              descriptor, validationMetadata.partitioner, partitionerName));
+            System.exit(1);
+        }
+
+        logger.info("Opening {} ({} bytes)", descriptor, new File(descriptor.filenameFor(Component.DATA)).length());
         SSTableReader sstable = new SSTableReader(descriptor,
                                                   components,
                                                   metadata,
                                                   partitioner,
                                                   System.currentTimeMillis(),
-                                                  sstableMetadata);
+                                                  statsMetadata);
 
         // special implementation of load to use non-pooled SegmentedFile builders
         SegmentedFile.Builder ibuilder = new BufferedSegmentedFile.Builder();
         SegmentedFile.Builder dbuilder = sstable.compression
                                        ? new CompressedSegmentedFile.Builder()
                                        : new BufferedSegmentedFile.Builder();
-        if (!sstable.loadSummary(ibuilder, dbuilder, sstable.metadata))
+        if (!sstable.loadSummary(ibuilder, dbuilder))
             sstable.buildSummary(false, ibuilder, dbuilder, false, Downsampling.BASE_SAMPLING_LEVEL);
         sstable.ifile = ibuilder.complete(sstable.descriptor.filenameFor(Component.PRIMARY_INDEX));
         sstable.dfile = dbuilder.complete(sstable.descriptor.filenameFor(Component.DATA));
-
         sstable.bf = FilterFactory.AlwaysPresent;
-        return sstable;
-    }
 
-    public static SSTableReader open(Descriptor descriptor, Set<Component> components, CFMetaData metadata, IPartitioner partitioner) throws IOException
-    {
-        return open(descriptor, components, metadata, partitioner, true);
+        return sstable;
     }
 
     private static SSTableReader open(Descriptor descriptor,
@@ -213,53 +245,48 @@ public class SSTableReader extends SSTable implements Closeable
                                       IPartitioner partitioner,
                                       boolean validate) throws IOException
     {
-        long start = System.nanoTime();
-        SSTableMetadata sstableMetadata = openMetadata(descriptor, components, partitioner);
+        // Minimum components without which we can't do anything
+        assert components.contains(Component.DATA) : "Data component is missing for sstable " + descriptor;
+        assert components.contains(Component.PRIMARY_INDEX) : "Primary index component is missing for sstable " + descriptor;
+
+        Map<MetadataType, MetadataComponent> sstableMetadata = descriptor.getMetadataSerializer().deserialize(descriptor,
+                                                                                                               EnumSet.of(MetadataType.VALIDATION, MetadataType.STATS));
+        ValidationMetadata validationMetadata = (ValidationMetadata) sstableMetadata.get(MetadataType.VALIDATION);
+        StatsMetadata statsMetadata = (StatsMetadata) sstableMetadata.get(MetadataType.STATS);
+
+        // Check whether the sstable was created with the same partitioner.
+        // The partitioner can be null, which indicates an older sstable version or no stats available;
+        // in that case, we skip the check.
+        String partitionerName = partitioner.getClass().getCanonicalName();
+        if (validationMetadata != null && !partitionerName.equals(validationMetadata.partitioner))
+        {
+            logger.error(String.format("Cannot open %s; partitioner %s does not match system partitioner %s.  Note that the default partitioner starting with Cassandra 1.2 is Murmur3Partitioner, so you will need to edit that to match your old partitioner if upgrading.",
+                                              descriptor, validationMetadata.partitioner, partitionerName));
+            System.exit(1);
+        }
 
+        logger.info("Opening {} ({} bytes)", descriptor, new File(descriptor.filenameFor(Component.DATA)).length());
         SSTableReader sstable = new SSTableReader(descriptor,
                                                   components,
                                                   metadata,
                                                   partitioner,
                                                   System.currentTimeMillis(),
-                                                  sstableMetadata);
+                                                  statsMetadata);
 
-        sstable.load();
+        // load index and filter
+        long start = System.nanoTime();
+        sstable.load(validationMetadata);
+        logger.debug("INDEX LOAD TIME for {}: {} ms.", descriptor, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
 
         if (validate)
             sstable.validate();
 
-        logger.debug("INDEX LOAD TIME for {}: {} ms.", descriptor, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
-
         if (sstable.getKeyCache() != null)
             logger.debug("key cache contains {}/{} keys", sstable.getKeyCache().size(), sstable.getKeyCache().getCapacity());
 
         return sstable;
     }
 
-    private static SSTableMetadata openMetadata(Descriptor descriptor, Set<Component> components, IPartitioner partitioner) throws IOException
-    {
-        assert partitioner != null;
-        // Minimum components without which we can't do anything
-        assert components.contains(Component.DATA) : "Data component is missing for sstable" + descriptor;
-        assert components.contains(Component.PRIMARY_INDEX) : "Primary index component is missing for sstable " + descriptor;
-
-        logger.info("Opening {} ({} bytes)", descriptor, new File(descriptor.filenameFor(Component.DATA)).length());
-
-        SSTableMetadata sstableMetadata = SSTableMetadata.serializer.deserialize(descriptor).left;
-
-        // Check if sstable is created using same partitioner.
-        // Partitioner can be null, which indicates older version of sstable or no stats available.
-        // In that case, we skip the check.
-        String partitionerName = partitioner.getClass().getCanonicalName();
-        if (sstableMetadata.partitioner != null && !partitionerName.equals(sstableMetadata.partitioner))
-        {
-            logger.error(String.format("Cannot open %s; partitioner %s does not match system partitioner %s.  Note that the default partitioner starting with Cassandra 1.2 is Murmur3Partitioner, so you will need to edit that to match your old partitioner if upgrading.",
-                                       descriptor, sstableMetadata.partitioner, partitionerName));
-            System.exit(1);
-        }
-        return sstableMetadata;
-    }
-
     public static void logOpenException(Descriptor descriptor, IOException e)
     {
         if (e instanceof FileNotFoundException)
@@ -323,7 +350,7 @@ public class SSTableReader extends SSTable implements Closeable
                                       IndexSummary isummary,
                                       IFilter bf,
                                       long maxDataAge,
-                                      SSTableMetadata sstableMetadata)
+                                      StatsMetadata sstableMetadata)
     {
         assert desc != null && partitioner != null && ifile != null && dfile != null && isummary != null && bf != null && sstableMetadata != null;
         return new SSTableReader(desc,
@@ -343,7 +370,7 @@ public class SSTableReader extends SSTable implements Closeable
                           CFMetaData metadata,
                           IPartitioner partitioner,
                           long maxDataAge,
-                          SSTableMetadata sstableMetadata)
+                          StatsMetadata sstableMetadata)
     {
         super(desc, components, metadata, partitioner);
         this.sstableMetadata = sstableMetadata;
@@ -384,7 +411,7 @@ public class SSTableReader extends SSTable implements Closeable
                           IndexSummary indexSummary,
                           IFilter bloomFilter,
                           long maxDataAge,
-                          SSTableMetadata sstableMetadata)
+                          StatsMetadata sstableMetadata)
     {
         this(desc, components, metadata, partitioner, maxDataAge, sstableMetadata);
 
@@ -436,7 +463,7 @@ public class SSTableReader extends SSTable implements Closeable
         keyCache = CacheService.instance.keyCache;
     }
 
-    private void load() throws IOException
+    private void load(ValidationMetadata validation) throws IOException
     {
         if (metadata.getBloomFilterFpChance() == 1.0)
         {
@@ -444,12 +471,12 @@ public class SSTableReader extends SSTable implements Closeable
             load(false, true);
             bf = FilterFactory.AlwaysPresent;
         }
-        else if (!components.contains(Component.FILTER))
+        else if (!components.contains(Component.FILTER) || validation == null)
         {
             // bf is enabled, but filter component is missing.
             load(true, true);
         }
-        else if (descriptor.version.hasBloomFilterFPChance && sstableMetadata.bloomFilterFPChance != metadata.getBloomFilterFpChance())
+        else if (descriptor.version.hasBloomFilterFPChance && validation.bloomFilterFPChance != metadata.getBloomFilterFpChance())
         {
             // bf fp chance recorded in the sstable metadata has changed since compaction, so rebuild the filter.
             load(true, true);
@@ -462,7 +489,12 @@ public class SSTableReader extends SSTable implements Closeable
         }
     }
 
-    void loadBloomFilter() throws IOException
+    /**
+     * Load bloom filter from Filter.db file.
+     *
+     * @throws IOException
+     */
+    private void loadBloomFilter() throws IOException
     {
         DataInputStream stream = null;
         try
@@ -488,7 +520,7 @@ public class SSTableReader extends SSTable implements Closeable
                                          ? SegmentedFile.getCompressedBuilder()
                                          : SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
 
-        boolean summaryLoaded = loadSummary(ibuilder, dbuilder, metadata);
+        boolean summaryLoaded = loadSummary(ibuilder, dbuilder);
         if (recreateBloomFilter || !summaryLoaded)
             buildSummary(recreateBloomFilter, ibuilder, dbuilder, summaryLoaded, Downsampling.BASE_SAMPLING_LEVEL);
 
@@ -498,6 +530,15 @@ public class SSTableReader extends SSTable implements Closeable
             saveSummary(ibuilder, dbuilder);
     }
 
+    /**
+     * Build the index summary (and optionally the bloom filter) by reading through the Index.db file.
+     *
+     * @param recreateBloomFilter true to recreate the bloom filter
+     * @param ibuilder
+     * @param dbuilder
+     * @param summaryLoaded true if the index summary is already loaded and does not need to be built again
+     * @throws IOException
+     */
     private void buildSummary(boolean recreateBloomFilter, SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder, boolean summaryLoaded, int samplingLevel) throws IOException
     {
         // we read the positions in a BRAF so we don't have to worry about an entry spanning a mmap boundary.
@@ -552,7 +593,17 @@ public class SSTableReader extends SSTable implements Closeable
         last = getMinimalKey(last);
     }
 
-    public boolean loadSummary(SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder, CFMetaData metadata)
+    /**
+     * Load the index summary from the Summary.db file if it exists.
+     *
+     * If the loaded index summary has a different index interval from the current value stored in the schema,
+     * the Summary.db file is deleted and this method returns false so that the summary can be rebuilt.
+     *
+     * @param ibuilder
+     * @param dbuilder
+     * @return true if index summary is loaded successfully from Summary.db file.
+     */
+    public boolean loadSummary(SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder)
     {
         File summariesFile = new File(descriptor.filenameFor(Component.SUMMARY));
         if (!descriptor.version.offHeapSummaries || !summariesFile.exists())
@@ -562,15 +613,7 @@ public class SSTableReader extends SSTable implements Closeable
         try
         {
             iStream = new DataInputStream(new FileInputStream(summariesFile));
-            indexSummary = IndexSummary.serializer.deserialize(iStream, partitioner, descriptor.version.hasSamplingLevel);
-            if (indexSummary.getIndexInterval() != metadata.getIndexInterval())
-            {
-                iStream.close();
-                logger.debug("Cannot read the saved summary for {} because Index Interval changed from {} to {}.",
-                             toString(), indexSummary.getIndexInterval(), metadata.getIndexInterval());
-                FileUtils.deleteWithConfirm(summariesFile);
-                return false;
-            }
+            indexSummary = IndexSummary.serializer.deserialize(iStream, partitioner, descriptor.version.hasSamplingLevel, metadata.getIndexInterval());
             first = partitioner.decorateKey(ByteBufferUtil.readWithLength(iStream));
             last = partitioner.decorateKey(ByteBufferUtil.readWithLength(iStream));
             ibuilder.deserializeBounds(iStream);
@@ -578,9 +621,10 @@ public class SSTableReader extends SSTable implements Closeable
         }
         catch (IOException e)
         {
-            logger.debug("Cannot deserialize SSTable Summary: ", e);
+            logger.debug("Cannot deserialize SSTable {} Summary: {}", toString(), e.getMessage());
             // corrupted; delete it and fall back to creating a new summary
             FileUtils.closeQuietly(iStream);
             FileUtils.deleteWithConfirm(summariesFile);
             return false;
         }
@@ -592,6 +636,12 @@ public class SSTableReader extends SSTable implements Closeable
         return true;
     }
 
+    /**
+     * Save index summary to Summary.db file.
+     *
+     * @param ibuilder
+     * @param dbuilder
+     */
     public void saveSummary(SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder)
     {
         saveSummary(ibuilder, dbuilder, indexSummary);
@@ -830,7 +880,7 @@ public class SSTableReader extends SSTable implements Closeable
     private static List<Pair<Integer,Integer>> getSampleIndexesForRanges(IndexSummary summary, Collection<Range<Token>> ranges)
     {
         // use the index to determine a minimal section for each range
-        List<Pair<Integer,Integer>> positions = new ArrayList<Pair<Integer,Integer>>();
+        List<Pair<Integer,Integer>> positions = new ArrayList<>();
 
         for (Range<Token> range : Range.normalize(ranges))
         {
@@ -864,7 +914,7 @@ public class SSTableReader extends SSTable implements Closeable
             if (left > right)
                 // empty range
                 continue;
-            positions.add(Pair.create(Integer.valueOf(left), Integer.valueOf(right)));
+            positions.add(Pair.create(left, right));
         }
         return positions;
     }
@@ -924,7 +974,7 @@ public class SSTableReader extends SSTable implements Closeable
     public List<Pair<Long,Long>> getPositionsForRanges(Collection<Range<Token>> ranges)
     {
         // use the index to determine a minimal section for each range
-        List<Pair<Long,Long>> positions = new ArrayList<Pair<Long,Long>>();
+        List<Pair<Long,Long>> positions = new ArrayList<>();
         for (Range<Token> range : Range.normalize(ranges))
         {
             AbstractBounds<RowPosition> keyRange = range.toRowBounds();
@@ -941,7 +991,7 @@ public class SSTableReader extends SSTable implements Closeable
             if (left == right)
                 // empty range
                 continue;
-            positions.add(Pair.create(Long.valueOf(left), Long.valueOf(right)));
+            positions.add(Pair.create(left, right));
         }
         return positions;
     }
@@ -1481,7 +1531,8 @@ public class SSTableReader extends SSTable implements Closeable
     {
         try
         {
-            return SSTableMetadata.serializer.deserialize(descriptor).right;
+            CompactionMetadata compactionMetadata = (CompactionMetadata) descriptor.getMetadataSerializer().deserialize(descriptor, MetadataType.COMPACTION);
+            return compactionMetadata.ancestors;
         }
         catch (IOException e)
         {
@@ -1506,10 +1557,10 @@ public class SSTableReader extends SSTable implements Closeable
      */
     public void reloadSSTableMetadata() throws IOException
     {
-        this.sstableMetadata = SSTableMetadata.serializer.deserialize(descriptor).left;
+        this.sstableMetadata = (StatsMetadata) descriptor.getMetadataSerializer().deserialize(descriptor, MetadataType.STATS);
     }
 
-    public SSTableMetadata getSSTableMetadata()
+    public StatsMetadata getSSTableMetadata()
     {
         return sstableMetadata;
     }

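The reader changes above replace the single SSTableMetadata load with typed, per-component access through the descriptor. A hedged sketch of a caller using the new API, assuming descriptor already points at an existing sstable (MetadataLoadSketch, loadStats, and expectedPartitioner are illustrative names, not part of the patch):

    import java.io.IOException;
    import java.util.EnumSet;
    import java.util.Map;

    import org.apache.cassandra.io.sstable.Descriptor;
    import org.apache.cassandra.io.sstable.metadata.MetadataComponent;
    import org.apache.cassandra.io.sstable.metadata.MetadataType;
    import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
    import org.apache.cassandra.io.sstable.metadata.ValidationMetadata;

    public class MetadataLoadSketch
    {
        // Loads only the components the caller needs instead of the whole Stats.db.
        public static StatsMetadata loadStats(Descriptor descriptor, String expectedPartitioner) throws IOException
        {
            Map<MetadataType, MetadataComponent> components =
                descriptor.getMetadataSerializer().deserialize(descriptor,
                                                               EnumSet.of(MetadataType.VALIDATION, MetadataType.STATS));
            // validation may be null for older sstables; the partitioner check is skipped in that case
            ValidationMetadata validation = (ValidationMetadata) components.get(MetadataType.VALIDATION);
            if (validation != null && !expectedPartitioner.equals(validation.partitioner))
                throw new IOException("partitioner mismatch: " + validation.partitioner);
            return (StatsMetadata) components.get(MetadataType.STATS);
        }
    }
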
http://git-wip-us.apache.org/repos/asf/cassandra/blob/74bf5aa1/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index 5b02d37..60bb8d1 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@ -33,6 +33,10 @@ import org.apache.cassandra.db.compaction.AbstractCompactedRow;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.io.compress.CompressedSequentialWriter;
+import org.apache.cassandra.io.sstable.metadata.MetadataComponent;
+import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
+import org.apache.cassandra.io.sstable.metadata.MetadataType;
+import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
 import org.apache.cassandra.io.util.*;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -53,7 +57,7 @@ public class SSTableWriter extends SSTable
     private final SequentialWriter dataFile;
     private DecoratedKey lastWrittenKey;
     private FileMark dataMark;
-    private final SSTableMetadata.Collector sstableMetadataCollector;
+    private final MetadataCollector sstableMetadataCollector;
 
     public SSTableWriter(String filename, long keyCount)
     {
@@ -61,7 +65,7 @@ public class SSTableWriter extends SSTable
              keyCount,
              Schema.instance.getCFMetaData(Descriptor.fromFilename(filename)),
              StorageService.getPartitioner(),
-             SSTableMetadata.createCollector(Schema.instance.getCFMetaData(Descriptor.fromFilename(filename)).comparator));
+             new MetadataCollector(Schema.instance.getCFMetaData(Descriptor.fromFilename(filename)).comparator));
     }
 
     private static Set<Component> components(CFMetaData metadata)
@@ -93,7 +97,7 @@ public class SSTableWriter extends SSTable
                          long keyCount,
                          CFMetaData metadata,
                          IPartitioner<?> partitioner,
-                         SSTableMetadata.Collector sstableMetadataCollector)
+                         MetadataCollector sstableMetadataCollector)
     {
         super(Descriptor.fromFilename(filename),
               components(metadata),
@@ -308,9 +312,9 @@ public class SSTableWriter extends SSTable
 
     public SSTableReader closeAndOpenReader(long maxDataAge)
     {
-        Pair<Descriptor, SSTableMetadata> p = close();
+        Pair<Descriptor, StatsMetadata> p = close();
         Descriptor newdesc = p.left;
-        SSTableMetadata sstableMetadata = p.right;
+        StatsMetadata sstableMetadata = p.right;
 
         // finalize in-memory state for the reader
         SegmentedFile ifile = iwriter.builder.complete(newdesc.filenameFor(Component.PRIMARY_INDEX));
@@ -335,36 +339,40 @@ public class SSTableWriter extends SSTable
     }
 
     // Close the writer and return the descriptor to the new sstable and its metadata
-    public Pair<Descriptor, SSTableMetadata> close()
+    public Pair<Descriptor, StatsMetadata> close()
     {
         // index and filter
         iwriter.close();
         // main data, close will truncate if necessary
         dataFile.close();
         // write sstable statistics
-        SSTableMetadata sstableMetadata = sstableMetadataCollector.finalizeMetadata(partitioner.getClass().getCanonicalName(),
+        Map<MetadataType, MetadataComponent> metadataComponents = sstableMetadataCollector.finalizeMetadata(
+                                                                                    partitioner.getClass().getCanonicalName(),
                                                                                     metadata.getBloomFilterFpChance());
-        writeMetadata(descriptor, sstableMetadata, sstableMetadataCollector.ancestors);
+        writeMetadata(descriptor, metadataComponents);
 
         // save the table of components
         SSTable.appendTOC(descriptor, components);
 
         // remove the 'tmp' marker from all components
-        return Pair.create(rename(descriptor, components), sstableMetadata);
+        return Pair.create(rename(descriptor, components), (StatsMetadata) metadataComponents.get(MetadataType.STATS));
     }
 
-    private static void writeMetadata(Descriptor desc, SSTableMetadata sstableMetadata,  Set<Integer> ancestors)
+    private static void writeMetadata(Descriptor desc, Map<MetadataType, MetadataComponent> components)
     {
         SequentialWriter out = SequentialWriter.open(new File(desc.filenameFor(Component.STATS)), true);
         try
         {
-            SSTableMetadata.serializer.serialize(sstableMetadata, ancestors, out.stream);
+            desc.getMetadataSerializer().serialize(components, out.stream);
         }
         catch (IOException e)
         {
             throw new FSWriteError(e, out.getPath());
         }
-        out.close();
+        finally
+        {
+            out.close();
+        }
     }
 
     static Descriptor rename(Descriptor tmpdesc, Set<Component> components)

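On the writer side, close() now finalizes the collector into a map of typed components and hands the whole map to the descriptor's serializer. A sketch of that write path, assuming the caller already holds the MetadataCollector, partitioner name, and bloom filter fp chance (StatsWriteSketch and writeStats are illustrative names):

    import java.io.File;
    import java.io.IOException;
    import java.util.Map;

    import org.apache.cassandra.io.sstable.Component;
    import org.apache.cassandra.io.sstable.Descriptor;
    import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
    import org.apache.cassandra.io.sstable.metadata.MetadataComponent;
    import org.apache.cassandra.io.sstable.metadata.MetadataType;
    import org.apache.cassandra.io.util.SequentialWriter;

    public class StatsWriteSketch
    {
        public static void writeStats(Descriptor desc, MetadataCollector collector,
                                      String partitionerName, double fpChance) throws IOException
        {
            // finalizeMetadata now returns one component per MetadataType
            Map<MetadataType, MetadataComponent> components =
                collector.finalizeMetadata(partitionerName, fpChance);

            SequentialWriter out = SequentialWriter.open(new File(desc.filenameFor(Component.STATS)), true);
            try
            {
                desc.getMetadataSerializer().serialize(components, out.stream);
            }
            finally
            {
                out.close(); // close in finally, as the patched writeMetadata does
            }
        }
    }
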
http://git-wip-us.apache.org/repos/asf/cassandra/blob/74bf5aa1/src/java/org/apache/cassandra/io/sstable/metadata/CompactionMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/CompactionMetadata.java b/src/java/org/apache/cassandra/io/sstable/metadata/CompactionMetadata.java
new file mode 100644
index 0000000..fd0e626
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/CompactionMetadata.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.metadata;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.sstable.Descriptor;
+
+/**
+ * Compaction related SSTable metadata.
+ *
+ * Only loaded for <b>compacting</b> SSTables at the time of compaction.
+ */
+public class CompactionMetadata extends MetadataComponent
+{
+    public static final IMetadataComponentSerializer serializer = new CompactionMetadataSerializer();
+
+    public final Set<Integer> ancestors;
+
+    public CompactionMetadata(Set<Integer> ancestors)
+    {
+        this.ancestors = ancestors;
+    }
+
+    public MetadataType getType()
+    {
+        return MetadataType.COMPACTION;
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        CompactionMetadata that = (CompactionMetadata) o;
+        return ancestors == null ? that.ancestors == null : ancestors.equals(that.ancestors);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return ancestors != null ? ancestors.hashCode() : 0;
+    }
+
+    public static class CompactionMetadataSerializer implements IMetadataComponentSerializer<CompactionMetadata>
+    {
+        public int serializedSize(CompactionMetadata component) throws IOException
+        {
+            int size = 0;
+            size += TypeSizes.NATIVE.sizeof(component.ancestors.size());
+            for (int g : component.ancestors)
+                size += TypeSizes.NATIVE.sizeof(g);
+            return size;
+        }
+
+        public void serialize(CompactionMetadata component, DataOutput out) throws IOException
+        {
+            out.writeInt(component.ancestors.size());
+            for (int g : component.ancestors)
+                out.writeInt(g);
+        }
+
+        public CompactionMetadata deserialize(Descriptor.Version version, DataInput in) throws IOException
+        {
+            int nbAncestors = in.readInt();
+            Set<Integer> ancestors = new HashSet<>(nbAncestors);
+            for (int i = 0; i < nbAncestors; i++)
+                ancestors.add(in.readInt());
+            return new CompactionMetadata(ancestors);
+        }
+    }
+}

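Because CompactionMetadataSerializer reads and writes nothing but ints, it can be exercised round-trip with plain JDK data streams. A small sketch (the version argument is unused by this particular deserializer, so null is passed here purely for illustration):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.util.HashSet;
    import java.util.Set;

    import org.apache.cassandra.io.sstable.metadata.CompactionMetadata;

    public class CompactionMetadataRoundTrip
    {
        public static void main(String[] args) throws IOException
        {
            Set<Integer> ancestors = new HashSet<>();
            ancestors.add(1);
            ancestors.add(42);
            CompactionMetadata original = new CompactionMetadata(ancestors);

            // serialize into a byte array
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            CompactionMetadata.serializer.serialize(original, new DataOutputStream(bytes));

            // this serializer ignores the version argument, hence null here
            CompactionMetadata roundTripped = (CompactionMetadata) CompactionMetadata.serializer.deserialize(
                    null, new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

            assert original.equals(roundTripped);
        }
    }
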
http://git-wip-us.apache.org/repos/asf/cassandra/blob/74bf5aa1/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataComponentSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataComponentSerializer.java b/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataComponentSerializer.java
new file mode 100644
index 0000000..53ca138
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataComponentSerializer.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.metadata;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.cassandra.io.sstable.Descriptor;
+
+/**
+ * Metadata component serializer
+ */
+public interface IMetadataComponentSerializer<T extends MetadataComponent>
+{
+    /**
+     * Calculate and return serialized size.
+     *
+     * @param component MetadataComponent to calculate serialized size
+     * @return serialized size of this component
+     * @throws IOException
+     */
+    int serializedSize(T component) throws IOException;
+
+    /**
+     * Serialize metadata component to given output.
+     *
+     * @param component MetadataComponent to serialize
+     * @param out serialization destination
+     * @throws IOException
+     */
+    void serialize(T component, DataOutput out) throws IOException;
+
+    /**
+     * Deserialize metadata component from given input.
+     *
+     * @param version serialization version
+     * @param in deserialize source
+     * @return Deserialized component
+     * @throws IOException
+     */
+    T deserialize(Descriptor.Version version, DataInput in) throws IOException;
+}

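Implementing this interface for a new component amounts to the three methods above plus getType() on the component itself. A hypothetical example carrying a single long (ExampleComponent is invented for illustration, MetadataType.STATS stands in as a placeholder type, and MetadataComponent is assumed to require only getType()):

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    import org.apache.cassandra.db.TypeSizes;
    import org.apache.cassandra.io.sstable.Descriptor;
    import org.apache.cassandra.io.sstable.metadata.IMetadataComponentSerializer;
    import org.apache.cassandra.io.sstable.metadata.MetadataComponent;
    import org.apache.cassandra.io.sstable.metadata.MetadataType;

    // Hypothetical component holding a single counter, to show the serializer contract.
    public class ExampleComponent extends MetadataComponent
    {
        public static final IMetadataComponentSerializer serializer = new ExampleSerializer();

        public final long counter;

        public ExampleComponent(long counter)
        {
            this.counter = counter;
        }

        public MetadataType getType()
        {
            return MetadataType.STATS; // placeholder; a real component defines its own type
        }

        public static class ExampleSerializer implements IMetadataComponentSerializer<ExampleComponent>
        {
            public int serializedSize(ExampleComponent component) throws IOException
            {
                return TypeSizes.NATIVE.sizeof(component.counter);
            }

            public void serialize(ExampleComponent component, DataOutput out) throws IOException
            {
                out.writeLong(component.counter);
            }

            public ExampleComponent deserialize(Descriptor.Version version, DataInput in) throws IOException
            {
                return new ExampleComponent(in.readLong());
            }
        }
    }
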
http://git-wip-us.apache.org/repos/asf/cassandra/blob/74bf5aa1/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataSerializer.java b/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataSerializer.java
new file mode 100644
index 0000000..0875e5d
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataSerializer.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.metadata;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.EnumSet;
+import java.util.Map;
+
+import org.apache.cassandra.io.sstable.Descriptor;
+
+/**
+ * Interface for SSTable metadata serializer
+ */
+public interface IMetadataSerializer
+{
+    /**
+     * Serialize given metadata components
+     *
+     * @param components Metadata components to serialize
+     * @throws IOException
+     */
+    void serialize(Map<MetadataType, MetadataComponent> components, DataOutput out) throws IOException;
+
+    /**
+     * Deserialize specified metadata components from given descriptor.
+     *
+     * @param descriptor SSTable descriptor
+     * @return Map of deserialized metadata components, keyed by their type.
+     * @throws IOException
+     */
+    Map<MetadataType, MetadataComponent> deserialize(Descriptor descriptor, EnumSet<MetadataType> types) throws IOException;
+
+    /**
+     * Deserialize only the metadata component of the specified type from the given descriptor.
+     *
+     * @param descriptor SSTable descriptor
+     * @param type Metadata component type to deserialize
+     * @return Deserialized metadata component, or null if the specified type does not exist.
+     * @throws IOException
+     */
+    MetadataComponent deserialize(Descriptor descriptor, MetadataType type) throws IOException;
+
+    /**
+     * Mutate SSTable level
+     *
+     * @param descriptor SSTable descriptor
+     * @param newLevel new SSTable level
+     * @throws IOException
+     */
+    void mutateLevel(Descriptor descriptor, int newLevel) throws IOException;
+}