You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2015/06/30 12:47:39 UTC

[15/51] [partial] cassandra git commit: Storage engine refactor, a.k.a CASSANDRA-8099

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/io/sstable/format/big/IndexedSliceReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/IndexedSliceReader.java b/src/java/org/apache/cassandra/io/sstable/format/big/IndexedSliceReader.java
deleted file mode 100644
index 6db9c3d..0000000
--- a/src/java/org/apache/cassandra/io/sstable/format/big/IndexedSliceReader.java
+++ /dev/null
@@ -1,542 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.io.sstable.format.big;
-
-import java.io.IOException;
-import java.util.ArrayDeque;
-import java.util.Deque;
-import java.util.List;
-
-import com.google.common.collect.AbstractIterator;
-
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
-import org.apache.cassandra.db.composites.CellNameType;
-import org.apache.cassandra.db.composites.Composite;
-import org.apache.cassandra.db.filter.ColumnSlice;
-import org.apache.cassandra.io.sstable.CorruptSSTableException;
-import org.apache.cassandra.io.sstable.IndexHelper;
-import org.apache.cassandra.io.sstable.IndexHelper.IndexInfo;
-import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.io.util.FileDataInput;
-import org.apache.cassandra.io.util.FileMark;
-import org.apache.cassandra.tracing.Tracing;
-import org.apache.cassandra.utils.ByteBufferUtil;
-
-/**
- * This is a reader that finds the block for a starting column and returns blocks before/after it for each next call.
- * This reader assumes that the CF is sorted by name and exploits the name index.
- */
-class IndexedSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskAtomIterator
-{
-    private final ColumnFamily emptyColumnFamily;
-
-    private final SSTableReader sstable;
-    private final List<IndexHelper.IndexInfo> indexes;
-    private final FileDataInput originalInput;
-    private FileDataInput file;
-    private final boolean reversed;
-    private final ColumnSlice[] slices;
-    private final BlockFetcher fetcher;
-    private final Deque<OnDiskAtom> blockColumns = new ArrayDeque<OnDiskAtom>();
-    private final CellNameType comparator;
-
-    // Holds range tombstones for reversed queries. See addColumn().
-    private final Deque<OnDiskAtom> rangeTombstonesReversed;
-
-    /**
-     * This slice reader assumes that slices are sorted correctly, e.g. that for forward lookup slices are in
-     * lexicographic order of start elements and that for reverse lookup they are in reverse lexicographic order of
-     * finish (reverse start) elements. i.e. forward: [a,b],[d,e],[g,h] reverse: [h,g],[e,d],[b,a]. This reader also
-     * assumes that validation has been performed in terms of intervals (no overlapping intervals).
-     */
-    IndexedSliceReader(SSTableReader sstable, RowIndexEntry indexEntry, FileDataInput input, ColumnSlice[] slices, boolean reversed)
-    {
-        Tracing.trace("Seeking to partition indexed section in data file");
-        this.sstable = sstable;
-        this.originalInput = input;
-        this.reversed = reversed;
-        this.slices = slices;
-        this.comparator = sstable.metadata.comparator;
-        this.rangeTombstonesReversed = reversed ? new ArrayDeque<OnDiskAtom>() : null;
-
-        try
-        {
-            this.indexes = indexEntry.columnsIndex();
-            emptyColumnFamily = ArrayBackedSortedColumns.factory.create(sstable.metadata);
-            if (indexes.isEmpty())
-            {
-                setToRowStart(indexEntry, input);
-                emptyColumnFamily.delete(DeletionTime.serializer.deserialize(file));
-                fetcher = new SimpleBlockFetcher();
-            }
-            else
-            {
-                emptyColumnFamily.delete(indexEntry.deletionTime());
-                fetcher = new IndexedBlockFetcher(indexEntry.position);
-            }
-        }
-        catch (IOException e)
-        {
-            sstable.markSuspect();
-            throw new CorruptSSTableException(e, file.getPath());
-        }
-    }
-
-    /**
-     * Sets the seek position to the start of the row for column scanning.
-     */
-    private void setToRowStart(RowIndexEntry rowEntry, FileDataInput in) throws IOException
-    {
-        if (in == null)
-        {
-            this.file = sstable.getFileDataInput(rowEntry.position);
-        }
-        else
-        {
-            this.file = in;
-            in.seek(rowEntry.position);
-        }
-        sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(file));
-    }
-
-    public ColumnFamily getColumnFamily()
-    {
-        return emptyColumnFamily;
-    }
-
-    public DecoratedKey getKey()
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    protected OnDiskAtom computeNext()
-    {
-        while (true)
-        {
-            if (reversed)
-            {
-                // Return all range tombstones for the block first (see addColumn() below)
-                OnDiskAtom column = rangeTombstonesReversed.poll();
-                if (column != null)
-                    return column;
-            }
-
-            OnDiskAtom column = blockColumns.poll();
-            if (column == null)
-            {
-                if (!fetcher.fetchMoreData())
-                    return endOfData();
-            }
-            else
-            {
-                return column;
-            }
-        }
-    }
-
-    public void close() throws IOException
-    {
-        if (originalInput == null && file != null)
-            file.close();
-    }
-
-    protected void addColumn(OnDiskAtom col)
-    {
-        if (reversed)
-        {
-            /*
-             * We put range tombstone markers at the beginning of the range they delete. But for reversed queries,
-             * the caller still needs to know about a RangeTombstone before it sees any column that it covers.
-             * To make that simple, we keep said tombstones separate and return them all before any column for
-             * a given block.
-             */
-            if (col instanceof RangeTombstone)
-                rangeTombstonesReversed.addFirst(col);
-            else
-                blockColumns.addFirst(col);
-        }
-        else
-        {
-            blockColumns.addLast(col);
-        }
-    }
-
-    private abstract class BlockFetcher
-    {
-        protected int currentSliceIdx;
-
-        protected BlockFetcher(int sliceIdx)
-        {
-            this.currentSliceIdx = sliceIdx;
-        }
-
-        /*
-         * Return the smallest key selected by the current ColumnSlice.
-         */
-        protected Composite currentStart()
-        {
-            return reversed ? slices[currentSliceIdx].finish : slices[currentSliceIdx].start;
-        }
-
-        /*
-         * Return the biggest key selected by the current ColumnSlice.
-         */
-        protected Composite currentFinish()
-        {
-            return reversed ? slices[currentSliceIdx].start : slices[currentSliceIdx].finish;
-        }
-
-        protected abstract boolean setNextSlice();
-
-        protected abstract boolean fetchMoreData();
-
-        protected boolean isColumnBeforeSliceStart(OnDiskAtom column)
-        {
-            return isBeforeSliceStart(column.name());
-        }
-
-        protected boolean isBeforeSliceStart(Composite name)
-        {
-            Composite start = currentStart();
-            return !start.isEmpty() && comparator.compare(name, start) < 0;
-        }
-
-        protected boolean isColumnBeforeSliceFinish(OnDiskAtom column)
-        {
-            Composite finish = currentFinish();
-            return finish.isEmpty() || comparator.compare(column.name(), finish) <= 0;
-        }
-
-        protected boolean isAfterSliceFinish(Composite name)
-        {
-            Composite finish = currentFinish();
-            return !finish.isEmpty() && comparator.compare(name, finish) > 0;
-        }
-    }
-
-    private class IndexedBlockFetcher extends BlockFetcher
-    {
-        // where this row starts
-        private final long columnsStart;
-
-        // the index entry for the next block to deserialize
-        private int nextIndexIdx = -1;
-
-        // index of the last block we've read from disk;
-        private int lastDeserializedBlock = -1;
-
-        // For reversed, keep columns at the beginning of the last deserialized block that
-        // may still match a slice
-        private final Deque<OnDiskAtom> prefetched;
-
-        public IndexedBlockFetcher(long columnsStart)
-        {
-            super(-1);
-            this.columnsStart = columnsStart;
-            this.prefetched = reversed ? new ArrayDeque<OnDiskAtom>() : null;
-            setNextSlice();
-        }
-
-        protected boolean setNextSlice()
-        {
-            while (++currentSliceIdx < slices.length)
-            {
-                nextIndexIdx = IndexHelper.indexFor(slices[currentSliceIdx].start, indexes, comparator, reversed, nextIndexIdx);
-                if (nextIndexIdx < 0 || nextIndexIdx >= indexes.size())
-                    // no index block for that slice
-                    continue;
-
-                // Check if we can exclude this slice entirely from the index
-                IndexInfo info = indexes.get(nextIndexIdx);
-                if (reversed)
-                {
-                    if (!isBeforeSliceStart(info.lastName))
-                        return true;
-                }
-                else
-                {
-                    if (!isAfterSliceFinish(info.firstName))
-                        return true;
-                }
-            }
-            nextIndexIdx = -1;
-            return false;
-        }
-
-        protected boolean hasMoreSlice()
-        {
-            return currentSliceIdx < slices.length;
-        }
-
-        protected boolean fetchMoreData()
-        {
-            if (!hasMoreSlice())
-                return false;
-
-            // If we read blocks in reversed disk order, we may have columns from the previous block to handle.
-            // Note that prefetched keeps columns in reversed disk order.
-            // Also note that Range Tombstone handling is a bit tricky, because we may run into range tombstones
-            // that cover a slice *after* we've moved to the previous slice. To keep it simple, we simply include
-            // every RT in prefetched: it's only slightly inefficient to do so and there are only so many RTs that
-            // can be mistakenly added this way.
-            if (reversed && !prefetched.isEmpty())
-            {
-                // Avoids some comparison when we know it's not useful
-                boolean inSlice = false;
-
-                OnDiskAtom prefetchedCol;
-                while ((prefetchedCol = prefetched.peek()) != null)
-                {
-                    // col is before slice, we update the slice
-                    if (isColumnBeforeSliceStart(prefetchedCol))
-                    {
-                        inSlice = false;
-
-                        // As explained above, we add RT unconditionally
-                        if (prefetchedCol instanceof RangeTombstone)
-                        {
-                            blockColumns.addLast(prefetched.poll());
-                            continue;
-                        }
-
-                        // Otherwise, we move to the next slice. If there are no more slices, then
-                        // simply unwind prefetched entirely and add all remaining RTs.
-                        if (!setNextSlice())
-                        {
-                            while ((prefetchedCol = prefetched.poll()) != null)
-                                if (prefetchedCol instanceof RangeTombstone)
-                                    blockColumns.addLast(prefetchedCol);
-                            break;
-                        }
-
-                    }
-                    // col is within slice, add it
-                    // (we go in reverse, so as soon as we are in a slice, no need to check
-                    // we're after the slice until we change slice)
-                    else if (inSlice || isColumnBeforeSliceFinish(prefetchedCol))
-                    {
-                        blockColumns.addLast(prefetched.poll());
-                        inSlice = true;
-                    }
-                    // if col is after slice, ignore
-                    else
-                    {
-                        prefetched.poll();
-                    }
-                }
-
-                if (!blockColumns.isEmpty())
-                    return true;
-                else if (!hasMoreSlice())
-                    return false;
-            }
-            try
-            {
-                return getNextBlock();
-            }
-            catch (IOException e)
-            {
-                throw new CorruptSSTableException(e, file.getPath());
-            }
-        }
-
-        private boolean getNextBlock() throws IOException
-        {
-            if (lastDeserializedBlock == nextIndexIdx)
-            {
-                if (reversed)
-                    nextIndexIdx--;
-                else
-                    nextIndexIdx++;
-            }
-            lastDeserializedBlock = nextIndexIdx;
-
-            // Are we done?
-            if (lastDeserializedBlock < 0 || lastDeserializedBlock >= indexes.size())
-                return false;
-
-            IndexInfo currentIndex = indexes.get(lastDeserializedBlock);
-
-            /* seek to the correct offset to the data, and calculate the data size */
-            long positionToSeek = columnsStart + currentIndex.offset;
-
-            // With new promoted indexes, our first seek in the data file will happen at that point.
-            if (file == null)
-                file = originalInput == null ? sstable.getFileDataInput(positionToSeek) : originalInput;
-
-            AtomDeserializer deserializer = emptyColumnFamily.metadata().getOnDiskDeserializer(file, sstable.descriptor.version);
-
-            file.seek(positionToSeek);
-            FileMark mark = file.mark();
-
-            // We remember when we are within a slice to avoid some comparisons
-            boolean inSlice = false;
-
-            // scan from index start
-            while (file.bytesPastMark(mark) < currentIndex.width || deserializer.hasUnprocessed())
-            {
-                // col is before slice
-                // (If in slice, don't bother checking that until we change slice)
-                Composite start = currentStart();
-                if (!inSlice && !start.isEmpty() && deserializer.compareNextTo(start) < 0)
-                {
-                    // If it's a rangeTombstone, then we need to read it and include it unless its end
-                    // stops before our slice start.
-                    if (deserializer.nextIsRangeTombstone())
-                    {
-                        RangeTombstone rt = (RangeTombstone)deserializer.readNext();
-                        if (comparator.compare(rt.max, start) >= 0)
-                            addColumn(rt);
-                        continue;
-                    }
-
-                    if (reversed)
-                    {
-                        // the next slice selects columns that are before the current one, so it may
-                        // match this column, so keep it around.
-                        prefetched.addFirst(deserializer.readNext());
-                    }
-                    else
-                    {
-                        deserializer.skipNext();
-                    }
-                }
-                // col is within slice
-                else
-                {
-                    Composite finish = currentFinish();
-                    if (finish.isEmpty() || deserializer.compareNextTo(finish) <= 0)
-                    {
-                        inSlice = true;
-                        addColumn(deserializer.readNext());
-                    }
-                    // col is after slice.
-                    else
-                    {
-                        // When reading forward, if we hit a column that sorts after the current slice, it means we're done with this slice.
-                        // For reversed, this may either mean that we're done with the current slice, or that we need to read the previous
-                        // index block. However, we can be sure that we are in the first case though (the current slice is done) if the first
-                        // columns of the block were not part of the current slice, i.e. if we have columns in prefetched.
-                        if (reversed && prefetched.isEmpty())
-                            break;
-
-                        if (!setNextSlice())
-                            break;
-
-                        inSlice = false;
-
-                        // The next index block now corresponds to the first block that may have columns for the newly set slice.
-                        // So if it's different from the current block, we're done with this block. And in that case, we know
-                        // that our prefetched columns won't match.
-                        if (nextIndexIdx != lastDeserializedBlock)
-                        {
-                            if (reversed)
-                                prefetched.clear();
-                            break;
-                        }
-
-                        // Even if the next slice may have columns in this block, if we're reversed, those columns have been
-                        // prefetched and we're done with that block
-                        if (reversed)
-                            break;
-
-                        // otherwise, we will deal with that column at the next iteration
-                    }
-                }
-            }
-            return true;
-        }
-    }
-
-    private class SimpleBlockFetcher extends BlockFetcher
-    {
-        public SimpleBlockFetcher() throws IOException
-        {
-            // Since we have to deserialize in order and will read all slices, we might as well reverse the slices and
-            // behave as if it was not reversed
-            super(reversed ? slices.length - 1 : 0);
-
-            // We remember when we are within a slice to avoid some comparisons
-            boolean inSlice = false;
-
-            AtomDeserializer deserializer = emptyColumnFamily.metadata().getOnDiskDeserializer(file, sstable.descriptor.version);
-            while (deserializer.hasNext())
-            {
-                // col is before slice
-                // (If in slice, don't bother checking that until we change slice)
-                Composite start = currentStart();
-                if (!inSlice && !start.isEmpty() && deserializer.compareNextTo(start) < 0)
-                {
-                    // If it's a rangeTombstone, then we need to read it and include it unless its end
-                    // stops before our slice start. Otherwise, we can skip it.
-                    if (deserializer.nextIsRangeTombstone())
-                    {
-                        RangeTombstone rt = (RangeTombstone)deserializer.readNext();
-                        if (comparator.compare(rt.max, start) >= 0)
-                            addColumn(rt);
-                    }
-                    else
-                    {
-                        deserializer.skipNext();
-                    }
-                    continue;
-                }
-
-                // col is within slice
-                Composite finish = currentFinish();
-                if (finish.isEmpty() || deserializer.compareNextTo(finish) <= 0)
-                {
-                    inSlice = true;
-                    addColumn(deserializer.readNext());
-                }
-                // col is after slice. more slices?
-                else
-                {
-                    inSlice = false;
-                    if (!setNextSlice())
-                        break;
-                }
-            }
-        }
-
-        protected boolean setNextSlice()
-        {
-            if (reversed)
-            {
-                if (currentSliceIdx <= 0)
-                    return false;
-
-                currentSliceIdx--;
-            }
-            else
-            {
-                if (currentSliceIdx >= slices.length - 1)
-                    return false;
-
-                currentSliceIdx++;
-            }
-            return true;
-        }
-
-        protected boolean fetchMoreData()
-        {
-            return false;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/io/sstable/format/big/SSTableNamesIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/SSTableNamesIterator.java b/src/java/org/apache/cassandra/io/sstable/format/big/SSTableNamesIterator.java
deleted file mode 100644
index b8910c7..0000000
--- a/src/java/org/apache/cassandra/io/sstable/format/big/SSTableNamesIterator.java
+++ /dev/null
@@ -1,264 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.io.sstable.format.big;
-
-import java.io.IOException;
-import java.util.*;
-
-import com.google.common.collect.AbstractIterator;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
-import org.apache.cassandra.db.composites.CellName;
-import org.apache.cassandra.db.composites.CellNameType;
-import org.apache.cassandra.io.sstable.CorruptSSTableException;
-import org.apache.cassandra.io.sstable.IndexHelper;
-import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.io.util.FileDataInput;
-import org.apache.cassandra.io.util.FileMark;
-import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.utils.ByteBufferUtil;
-
-class SSTableNamesIterator extends AbstractIterator<OnDiskAtom> implements OnDiskAtomIterator
-{
-    private ColumnFamily cf;
-    private final SSTableReader sstable;
-    private FileDataInput fileToClose;
-    private Iterator<OnDiskAtom> iter;
-    public final SortedSet<CellName> columns;
-    public final DecoratedKey key;
-
-    public SSTableNamesIterator(SSTableReader sstable, DecoratedKey key, SortedSet<CellName> columns)
-    {
-        assert columns != null;
-        this.sstable = sstable;
-        this.columns = columns;
-        this.key = key;
-
-        RowIndexEntry indexEntry = sstable.getPosition(key, SSTableReader.Operator.EQ);
-        if (indexEntry == null)
-            return;
-
-        try
-        {
-            read(sstable, null, indexEntry);
-        }
-        catch (IOException e)
-        {
-            sstable.markSuspect();
-            throw new CorruptSSTableException(e, sstable.getFilename());
-        }
-        finally
-        {
-            if (fileToClose != null)
-                FileUtils.closeQuietly(fileToClose);
-        }
-    }
-
-    public SSTableNamesIterator(SSTableReader sstable, FileDataInput file, DecoratedKey key, SortedSet<CellName> columns, RowIndexEntry indexEntry)
-    {
-        assert columns != null;
-        this.sstable = sstable;
-        this.columns = columns;
-        this.key = key;
-
-        try
-        {
-            read(sstable, file, indexEntry);
-        }
-        catch (IOException e)
-        {
-            sstable.markSuspect();
-            throw new CorruptSSTableException(e, sstable.getFilename());
-        }
-    }
-
-    private FileDataInput createFileDataInput(long position)
-    {
-        fileToClose = sstable.getFileDataInput(position);
-        return fileToClose;
-    }
-
-    @SuppressWarnings("resource")
-    private void read(SSTableReader sstable, FileDataInput file, RowIndexEntry indexEntry)
-    throws IOException
-    {
-        List<IndexHelper.IndexInfo> indexList;
-
-        // If the entry is not indexed or the index is not promoted, read from the row start
-        if (!indexEntry.isIndexed())
-        {
-            if (file == null)
-                file = createFileDataInput(indexEntry.position);
-            else
-                file.seek(indexEntry.position);
-
-            DecoratedKey keyInDisk = sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(file));
-            assert keyInDisk.equals(key) : String.format("%s != %s in %s", keyInDisk, key, file.getPath());
-        }
-
-        indexList = indexEntry.columnsIndex();
-
-        if (!indexEntry.isIndexed())
-        {
-            ColumnFamilySerializer serializer = ColumnFamily.serializer;
-            try
-            {
-                cf = ArrayBackedSortedColumns.factory.create(sstable.metadata);
-                cf.delete(DeletionTime.serializer.deserialize(file));
-            }
-            catch (Exception e)
-            {
-                throw new IOException(serializer + " failed to deserialize " + sstable.getColumnFamilyName() + " with " + sstable.metadata + " from " + file, e);
-            }
-        }
-        else
-        {
-            cf = ArrayBackedSortedColumns.factory.create(sstable.metadata);
-            cf.delete(indexEntry.deletionTime());
-        }
-
-        List<OnDiskAtom> result = new ArrayList<OnDiskAtom>();
-        if (indexList.isEmpty())
-        {
-            readSimpleColumns(file, columns, result);
-        }
-        else
-        {
-            readIndexedColumns(sstable.metadata, file, columns, indexList, indexEntry.position, result);
-        }
-
-        // create an iterator view of the columns we read
-        iter = result.iterator();
-    }
-
-    private void readSimpleColumns(FileDataInput file, SortedSet<CellName> columnNames, List<OnDiskAtom> result)
-    {
-        Iterator<OnDiskAtom> atomIterator = cf.metadata().getOnDiskIterator(file, sstable.descriptor.version);
-        int n = 0;
-        while (atomIterator.hasNext())
-        {
-            OnDiskAtom column = atomIterator.next();
-            if (column instanceof Cell)
-            {
-                if (columnNames.contains(column.name()))
-                {
-                    result.add(column);
-                    if (++n >= columns.size())
-                        break;
-                }
-            }
-            else
-            {
-                result.add(column);
-            }
-        }
-    }
-
-    @SuppressWarnings("resource")
-    private void readIndexedColumns(CFMetaData metadata,
-                                    FileDataInput file,
-                                    SortedSet<CellName> columnNames,
-                                    List<IndexHelper.IndexInfo> indexList,
-                                    long basePosition,
-                                    List<OnDiskAtom> result)
-    throws IOException
-    {
-        /* get the various column ranges we have to read */
-        CellNameType comparator = metadata.comparator;
-        List<IndexHelper.IndexInfo> ranges = new ArrayList<IndexHelper.IndexInfo>();
-        int lastIndexIdx = -1;
-        for (CellName name : columnNames)
-        {
-            int index = IndexHelper.indexFor(name, indexList, comparator, false, lastIndexIdx);
-            if (index < 0 || index == indexList.size())
-                continue;
-            IndexHelper.IndexInfo indexInfo = indexList.get(index);
-            // Check the index block does contain the column names and that we haven't inserted this block yet.
-            if (comparator.compare(name, indexInfo.firstName) < 0 || index == lastIndexIdx)
-                continue;
-
-            ranges.add(indexInfo);
-            lastIndexIdx = index;
-        }
-
-        if (ranges.isEmpty())
-            return;
-
-        Iterator<CellName> toFetch = columnNames.iterator();
-        CellName nextToFetch = toFetch.next();
-        for (IndexHelper.IndexInfo indexInfo : ranges)
-        {
-            long positionToSeek = basePosition + indexInfo.offset;
-
-            // With new promoted indexes, our first seek in the data file will happen at that point.
-            if (file == null)
-                file = createFileDataInput(positionToSeek);
-
-            AtomDeserializer deserializer = cf.metadata().getOnDiskDeserializer(file, sstable.descriptor.version);
-            file.seek(positionToSeek);
-            FileMark mark = file.mark();
-            while (file.bytesPastMark(mark) < indexInfo.width && nextToFetch != null)
-            {
-                int cmp = deserializer.compareNextTo(nextToFetch);
-                if (cmp < 0)
-                {
-                    // If it's a rangeTombstone, then we need to read it and include
-                    // it if it includes our target. Otherwise, we can skip it.
-                    if (deserializer.nextIsRangeTombstone())
-                    {
-                        RangeTombstone rt = (RangeTombstone)deserializer.readNext();
-                        if (comparator.compare(rt.max, nextToFetch) >= 0)
-                            result.add(rt);
-                    }
-                    else
-                    {
-                        deserializer.skipNext();
-                    }
-                }
-                else if (cmp == 0)
-                {
-                    nextToFetch = toFetch.hasNext() ? toFetch.next() : null;
-                    result.add(deserializer.readNext());
-                }
-                else
-                    nextToFetch = toFetch.hasNext() ? toFetch.next() : null;
-            }
-        }
-    }
-
-    public DecoratedKey getKey()
-    {
-        return key;
-    }
-
-    public ColumnFamily getColumnFamily()
-    {
-        return cf;
-    }
-
-    protected OnDiskAtom computeNext()
-    {
-        if (iter == null || !iter.hasNext())
-            return endOfData();
-        return iter.next();
-    }
-
-    public void close() throws IOException { }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/io/sstable/format/big/SSTableSliceIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/SSTableSliceIterator.java b/src/java/org/apache/cassandra/io/sstable/format/big/SSTableSliceIterator.java
deleted file mode 100644
index 07d867d..0000000
--- a/src/java/org/apache/cassandra/io/sstable/format/big/SSTableSliceIterator.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.io.sstable.format.big;
-
-import java.io.IOException;
-
-import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.OnDiskAtom;
-import org.apache.cassandra.db.RowIndexEntry;
-import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
-import org.apache.cassandra.db.filter.ColumnSlice;
-import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.io.util.FileDataInput;
-
-/**
- *  A Cell Iterator over SSTable
- */
-class SSTableSliceIterator implements OnDiskAtomIterator
-{
-    private final OnDiskAtomIterator reader;
-    private final DecoratedKey key;
-
-    public SSTableSliceIterator(SSTableReader sstable, DecoratedKey key, ColumnSlice[] slices, boolean reversed)
-    {
-        this.key = key;
-        RowIndexEntry indexEntry = sstable.getPosition(key, SSTableReader.Operator.EQ);
-        this.reader = indexEntry == null ? null : createReader(sstable, indexEntry, null, slices, reversed);
-    }
-
-    /**
-     * An iterator for a slice within an SSTable
-     * @param sstable Keyspace for the CFS we are reading from
-     * @param file Optional parameter that input is read from.  If null is passed, this class creates an appropriate one automatically.
-     * If this class creates, it will close the underlying file when #close() is called.
-     * If a caller passes a non-null argument, this class will NOT close the underlying file when the iterator is closed (i.e. the caller is responsible for closing the file)
-     * In all cases the caller should explicitly #close() this iterator.
-     * @param key The key the requested slice resides under
-     * @param slices the column slices
-     * @param reversed Results are returned in reverse order iff reversed is true.
-     * @param indexEntry position of the row
-     */
-    public SSTableSliceIterator(SSTableReader sstable, FileDataInput file, DecoratedKey key, ColumnSlice[] slices, boolean reversed, RowIndexEntry indexEntry)
-    {
-        this.key = key;
-        reader = createReader(sstable, indexEntry, file, slices, reversed);
-    }
-
-    private static OnDiskAtomIterator createReader(SSTableReader sstable, RowIndexEntry indexEntry, FileDataInput file, ColumnSlice[] slices, boolean reversed)
-    {
-        return slices.length == 1 && slices[0].start.isEmpty() && !reversed
-             ? new SimpleSliceReader(sstable, indexEntry, file, slices[0].finish)
-             : new IndexedSliceReader(sstable, indexEntry, file, slices, reversed);
-    }
-
-    public DecoratedKey getKey()
-    {
-        return key;
-    }
-
-    public ColumnFamily getColumnFamily()
-    {
-        return reader == null ? null : reader.getColumnFamily();
-    }
-
-    public boolean hasNext()
-    {
-        return reader != null && reader.hasNext();
-    }
-
-    public OnDiskAtom next()
-    {
-        return reader.next();
-    }
-
-    public void remove()
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    public void close() throws IOException
-    {
-        if (reader != null)
-            reader.close();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/io/sstable/format/big/SimpleSliceReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/SimpleSliceReader.java b/src/java/org/apache/cassandra/io/sstable/format/big/SimpleSliceReader.java
deleted file mode 100644
index 9fec303..0000000
--- a/src/java/org/apache/cassandra/io/sstable/format/big/SimpleSliceReader.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.io.sstable.format.big;
-
-import java.io.IOException;
-import java.util.Iterator;
-
-import com.google.common.collect.AbstractIterator;
-import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
-import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.composites.CellNameType;
-import org.apache.cassandra.db.composites.Composite;
-import org.apache.cassandra.io.sstable.CorruptSSTableException;
-import org.apache.cassandra.io.util.FileDataInput;
-import org.apache.cassandra.tracing.Tracing;
-import org.apache.cassandra.utils.ByteBufferUtil;
-
-class SimpleSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskAtomIterator
-{
-    private static final Logger logger = LoggerFactory.getLogger(SimpleSliceReader.class);
-
-    private final FileDataInput file;
-    private final boolean needsClosing;
-    private final Composite finishColumn;
-    private final CellNameType comparator;
-    private final ColumnFamily emptyColumnFamily;
-    private final Iterator<OnDiskAtom> atomIterator;
-
-    SimpleSliceReader(SSTableReader sstable, RowIndexEntry indexEntry, FileDataInput input, Composite finishColumn)
-    {
-        Tracing.trace("Seeking to partition beginning in data file");
-        this.finishColumn = finishColumn;
-        this.comparator = sstable.metadata.comparator;
-        try
-        {
-            if (input == null)
-            {
-                this.file = sstable.getFileDataInput(indexEntry.position);
-                this.needsClosing = true;
-            }
-            else
-            {
-                this.file = input;
-                input.seek(indexEntry.position);
-                this.needsClosing = false;
-            }
-
-            // Skip key and data size
-            ByteBufferUtil.skipShortLength(file);
-
-            emptyColumnFamily = ArrayBackedSortedColumns.factory.create(sstable.metadata);
-            emptyColumnFamily.delete(DeletionTime.serializer.deserialize(file));
-            atomIterator = emptyColumnFamily.metadata().getOnDiskIterator(file, sstable.descriptor.version);
-        }
-        catch (IOException e)
-        {
-            sstable.markSuspect();
-            throw new CorruptSSTableException(e, sstable.getFilename());
-        }
-    }
-
-    protected OnDiskAtom computeNext()
-    {
-        if (!atomIterator.hasNext())
-            return endOfData();
-
-        OnDiskAtom column = atomIterator.next();
-        if (!finishColumn.isEmpty() && comparator.compare(column.name(), finishColumn) > 0)
-            return endOfData();
-
-        return column;
-    }
-
-    public ColumnFamily getColumnFamily()
-    {
-        return emptyColumnFamily;
-    }
-
-    public void close() throws IOException
-    {
-        if (needsClosing)
-            file.close();
-    }
-
-    public DecoratedKey getKey()
-    {
-        throw new UnsupportedOperationException();
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java b/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java
index 4bd060e..90a9f24 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java
@@ -23,6 +23,7 @@ import java.util.*;
 
 import com.google.common.collect.Maps;
 
+import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.commitlog.ReplayPosition;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.Descriptor;
@@ -64,12 +65,12 @@ public class LegacyMetadataSerializer extends MetadataSerializer
             out.writeInt(g);
         StreamingHistogram.serializer.serialize(stats.estimatedTombstoneDropTime, out);
         out.writeInt(stats.sstableLevel);
-        out.writeInt(stats.minColumnNames.size());
-        for (ByteBuffer columnName : stats.minColumnNames)
-            ByteBufferUtil.writeWithShortLength(columnName, out);
-        out.writeInt(stats.maxColumnNames.size());
-        for (ByteBuffer columnName : stats.maxColumnNames)
-            ByteBufferUtil.writeWithShortLength(columnName, out);
+        out.writeInt(stats.minClusteringValues.size());
+        for (ByteBuffer value : stats.minClusteringValues)
+            ByteBufferUtil.writeWithShortLength(value, out);
+        out.writeInt(stats.maxClusteringValues.size());
+        for (ByteBuffer value : stats.maxClusteringValues)
+            ByteBufferUtil.writeWithShortLength(value, out);
     }
 
     /**
@@ -127,14 +128,19 @@ public class LegacyMetadataSerializer extends MetadataSerializer
                                                      replayPosition,
                                                      minTimestamp,
                                                      maxTimestamp,
+                                                     Integer.MAX_VALUE,
                                                      maxLocalDeletionTime,
+                                                     0,
+                                                     Integer.MAX_VALUE,
                                                      compressionRatio,
                                                      tombstoneHistogram,
                                                      sstableLevel,
                                                      minColumnNames,
                                                      maxColumnNames,
                                                      true,
-                                                     ActiveRepairService.UNREPAIRED_SSTABLE));
+                                                     ActiveRepairService.UNREPAIRED_SSTABLE,
+                                                     -1,
+                                                     -1));
                 if (types.contains(MetadataType.COMPACTION))
                     components.put(MetadataType.COMPACTION,
                                    new CompactionMetadata(ancestors, null));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
index 5962a46..2574c62 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
@@ -19,22 +19,20 @@ package org.apache.cassandra.io.sstable.metadata;
 
 import java.io.File;
 import java.nio.ByteBuffer;
-import java.util.Collection;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Maps;
 
 import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
 import com.clearspring.analytics.stream.cardinality.ICardinality;
+import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.commitlog.ReplayPosition;
-import org.apache.cassandra.db.composites.CellNameType;
-import org.apache.cassandra.io.sstable.ColumnNameHelper;
-import org.apache.cassandra.io.sstable.ColumnStats;
+import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.SSTable;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
@@ -47,13 +45,13 @@ public class MetadataCollector
 {
     public static final double NO_COMPRESSION_RATIO = -1.0;
 
-    static EstimatedHistogram defaultColumnCountHistogram()
+    static EstimatedHistogram defaultCellPerPartitionCountHistogram()
     {
         // EH of 114 can track a max value of 2395318855, i.e., > 2B columns
         return new EstimatedHistogram(114);
     }
 
-    static EstimatedHistogram defaultRowSizeHistogram()
+    static EstimatedHistogram defaultPartitionSizeHistogram()
     {
         // EH of 150 can track a max value of 1697806495183, i.e., > 1.5PB
         return new EstimatedHistogram(150);
@@ -66,34 +64,42 @@ public class MetadataCollector
 
     public static StatsMetadata defaultStatsMetadata()
     {
-        return new StatsMetadata(defaultRowSizeHistogram(),
-                                 defaultColumnCountHistogram(),
+        return new StatsMetadata(defaultPartitionSizeHistogram(),
+                                 defaultCellPerPartitionCountHistogram(),
                                  ReplayPosition.NONE,
                                  Long.MIN_VALUE,
                                  Long.MAX_VALUE,
                                  Integer.MAX_VALUE,
+                                 Integer.MAX_VALUE,
+                                 0,
+                                 Integer.MAX_VALUE,
                                  NO_COMPRESSION_RATIO,
                                  defaultTombstoneDropTimeHistogram(),
                                  0,
                                  Collections.<ByteBuffer>emptyList(),
                                  Collections.<ByteBuffer>emptyList(),
                                  true,
-                                 ActiveRepairService.UNREPAIRED_SSTABLE);
+                                 ActiveRepairService.UNREPAIRED_SSTABLE,
+                                 -1,
+                                 -1);
     }
 
-    protected EstimatedHistogram estimatedRowSize = defaultRowSizeHistogram();
-    protected EstimatedHistogram estimatedColumnCount = defaultColumnCountHistogram();
+    protected EstimatedHistogram estimatedPartitionSize = defaultPartitionSizeHistogram();
+    // TODO: count the number of rows per partition (either along with the number of cells, or instead of it)
+    protected EstimatedHistogram estimatedCellPerPartitionCount = defaultCellPerPartitionCountHistogram();
     protected ReplayPosition replayPosition = ReplayPosition.NONE;
-    protected long minTimestamp = Long.MAX_VALUE;
-    protected long maxTimestamp = Long.MIN_VALUE;
-    protected int maxLocalDeletionTime = Integer.MIN_VALUE;
+    protected final MinMaxLongTracker timestampTracker = new MinMaxLongTracker();
+    protected final MinMaxIntTracker localDeletionTimeTracker = new MinMaxIntTracker(LivenessInfo.NO_DELETION_TIME, LivenessInfo.NO_DELETION_TIME);
+    protected final MinMaxIntTracker ttlTracker = new MinMaxIntTracker(LivenessInfo.NO_TTL, LivenessInfo.NO_TTL);
     protected double compressionRatio = NO_COMPRESSION_RATIO;
     protected Set<Integer> ancestors = new HashSet<>();
     protected StreamingHistogram estimatedTombstoneDropTime = defaultTombstoneDropTimeHistogram();
     protected int sstableLevel;
-    protected List<ByteBuffer> minColumnNames = Collections.emptyList();
-    protected List<ByteBuffer> maxColumnNames = Collections.emptyList();
+    protected ByteBuffer[] minClusteringValues;
+    protected ByteBuffer[] maxClusteringValues;
     protected boolean hasLegacyCounterShards = false;
+    protected long totalColumnsSet;
+    protected long totalRows;
 
     /**
      * Default cardinality estimation method is to use HyperLogLog++.
@@ -102,16 +108,19 @@ public class MetadataCollector
      * See CASSANDRA-5906 for detail.
      */
     protected ICardinality cardinality = new HyperLogLogPlus(13, 25);
-    private final CellNameType columnNameComparator;
+    private final ClusteringComparator comparator;
 
-    public MetadataCollector(CellNameType columnNameComparator)
+    public MetadataCollector(ClusteringComparator comparator)
     {
-        this.columnNameComparator = columnNameComparator;
+        this.comparator = comparator;
+
+        this.minClusteringValues = new ByteBuffer[comparator.size()];
+        this.maxClusteringValues = new ByteBuffer[comparator.size()];
     }
 
-    public MetadataCollector(Iterable<SSTableReader> sstables, CellNameType columnNameComparator, int level, boolean skipAncestors)
+    public MetadataCollector(Iterable<SSTableReader> sstables, ClusteringComparator comparator, int level, boolean skipAncestors)
     {
-        this(columnNameComparator);
+        this(comparator);
 
         replayPosition(ReplayPosition.getReplayPosition(sstables));
         sstableLevel(level);
@@ -129,9 +138,9 @@ public class MetadataCollector
         }
     }
 
-    public MetadataCollector(Iterable<SSTableReader> sstables, CellNameType columnNameComparator, int level)
+    public MetadataCollector(Iterable<SSTableReader> sstables, ClusteringComparator comparator, int level)
     {
-        this(sstables, columnNameComparator, level, false);
+        this(sstables, comparator, level, false);
     }
 
     public MetadataCollector addKey(ByteBuffer key)
@@ -141,15 +150,15 @@ public class MetadataCollector
         return this;
     }
 
-    public MetadataCollector addRowSize(long rowSize)
+    public MetadataCollector addPartitionSizeInBytes(long partitionSize)
     {
-        estimatedRowSize.add(rowSize);
+        estimatedPartitionSize.add(partitionSize);
         return this;
     }
 
-    public MetadataCollector addColumnCount(long columnCount)
+    public MetadataCollector addCellPerPartitionCount(long cellCount)
     {
-        estimatedColumnCount.add(columnCount);
+        estimatedCellPerPartitionCount.add(cellCount);
         return this;
     }
 
@@ -169,34 +178,50 @@ public class MetadataCollector
         return this;
     }
 
-    public MetadataCollector updateMinTimestamp(long potentialMin)
+    public MetadataCollector update(LivenessInfo newInfo)
     {
-        minTimestamp = Math.min(minTimestamp, potentialMin);
+        // If the info doesn't have a timestamp, the info is essentially irrelevant here (it's a row
+        // update for which the only information we care about is that of its cells).
+        if (newInfo.hasTimestamp())
+        {
+            updateTimestamp(newInfo.timestamp());
+            updateTTL(newInfo.ttl());
+            updateLocalDeletionTime(newInfo.localDeletionTime());
+        }
         return this;
     }
 
-    public MetadataCollector updateMaxTimestamp(long potentialMax)
+    public MetadataCollector update(DeletionTime dt)
     {
-        maxTimestamp = Math.max(maxTimestamp, potentialMax);
+        if (!dt.isLive())
+        {
+            updateTimestamp(dt.markedForDeleteAt());
+            updateLocalDeletionTime(dt.localDeletionTime());
+        }
         return this;
     }
 
-    public MetadataCollector updateMaxLocalDeletionTime(int maxLocalDeletionTime)
+    public MetadataCollector updateColumnSetPerRow(long columnSetInRow)
     {
-        this.maxLocalDeletionTime = Math.max(this.maxLocalDeletionTime, maxLocalDeletionTime);
+        totalColumnsSet += columnSetInRow;
+        ++totalRows;
         return this;
     }
 
-    public MetadataCollector estimatedRowSize(EstimatedHistogram estimatedRowSize)
+    private void updateTimestamp(long newTimestamp)
     {
-        this.estimatedRowSize = estimatedRowSize;
-        return this;
+        timestampTracker.update(newTimestamp);
     }
 
-    public MetadataCollector estimatedColumnCount(EstimatedHistogram estimatedColumnCount)
+    private void updateLocalDeletionTime(int newLocalDeletionTime)
     {
-        this.estimatedColumnCount = estimatedColumnCount;
-        return this;
+        localDeletionTimeTracker.update(newLocalDeletionTime);
+        estimatedTombstoneDropTime.update(newLocalDeletionTime);
+    }
+
+    private void updateTTL(int newTTL)
+    {
+        ttlTracker.update(newTTL);
     }
 
     public MetadataCollector replayPosition(ReplayPosition replayPosition)
@@ -217,58 +242,179 @@ public class MetadataCollector
         return this;
     }
 
-    public MetadataCollector updateMinColumnNames(List<ByteBuffer> minColumnNames)
+    public MetadataCollector updateClusteringValues(ClusteringPrefix clustering)
     {
-        if (minColumnNames.size() > 0)
-            this.minColumnNames = ColumnNameHelper.mergeMin(this.minColumnNames, minColumnNames, columnNameComparator);
+        int size = clustering.size();
+        for (int i = 0; i < size; i++)
+        {
+            AbstractType<?> type = comparator.subtype(i);
+            ByteBuffer newValue = clustering.get(i);
+            minClusteringValues[i] = min(minClusteringValues[i], newValue, type);
+            maxClusteringValues[i] = max(maxClusteringValues[i], newValue, type);
+        }
         return this;
     }
 
-    public MetadataCollector updateMaxColumnNames(List<ByteBuffer> maxColumnNames)
+    private static ByteBuffer min(ByteBuffer b1, ByteBuffer b2, AbstractType<?> comparator)
     {
-        if (maxColumnNames.size() > 0)
-            this.maxColumnNames = ColumnNameHelper.mergeMax(this.maxColumnNames, maxColumnNames, columnNameComparator);
-        return this;
+        if (b1 == null)
+            return b2;
+        if (b2 == null)
+            return b1;
+
+        if (comparator.compare(b1, b2) >= 0)
+            return b2;
+        return b1;
     }
 
-    public MetadataCollector updateHasLegacyCounterShards(boolean hasLegacyCounterShards)
+    private static ByteBuffer max(ByteBuffer b1, ByteBuffer b2, AbstractType<?> comparator)
     {
-        this.hasLegacyCounterShards = this.hasLegacyCounterShards || hasLegacyCounterShards;
-        return this;
+        if (b1 == null)
+            return b2;
+        if (b2 == null)
+            return b1;
+
+        if (comparator.compare(b1, b2) >= 0)
+            return b1;
+        return b2;
     }
 
-    public MetadataCollector update(long rowSize, ColumnStats stats)
+    public MetadataCollector updateHasLegacyCounterShards(boolean hasLegacyCounterShards)
     {
-        updateMinTimestamp(stats.minTimestamp);
-        updateMaxTimestamp(stats.maxTimestamp);
-        updateMaxLocalDeletionTime(stats.maxLocalDeletionTime);
-        addRowSize(rowSize);
-        addColumnCount(stats.columnCount);
-        mergeTombstoneHistogram(stats.tombstoneHistogram);
-        updateMinColumnNames(stats.minColumnNames);
-        updateMaxColumnNames(stats.maxColumnNames);
-        updateHasLegacyCounterShards(stats.hasLegacyCounterShards);
+        this.hasLegacyCounterShards = this.hasLegacyCounterShards || hasLegacyCounterShards;
         return this;
     }
 
-    public Map<MetadataType, MetadataComponent> finalizeMetadata(String partitioner, double bloomFilterFPChance, long repairedAt)
+    public Map<MetadataType, MetadataComponent> finalizeMetadata(String partitioner, double bloomFilterFPChance, long repairedAt, SerializationHeader header)
     {
         Map<MetadataType, MetadataComponent> components = Maps.newHashMap();
         components.put(MetadataType.VALIDATION, new ValidationMetadata(partitioner, bloomFilterFPChance));
-        components.put(MetadataType.STATS, new StatsMetadata(estimatedRowSize,
-                                                             estimatedColumnCount,
+        components.put(MetadataType.STATS, new StatsMetadata(estimatedPartitionSize,
+                                                             estimatedCellPerPartitionCount,
                                                              replayPosition,
-                                                             minTimestamp,
-                                                             maxTimestamp,
-                                                             maxLocalDeletionTime,
+                                                             timestampTracker.min(),
+                                                             timestampTracker.max(),
+                                                             localDeletionTimeTracker.min(),
+                                                             localDeletionTimeTracker.max(),
+                                                             ttlTracker.min(),
+                                                             ttlTracker.max(),
                                                              compressionRatio,
                                                              estimatedTombstoneDropTime,
                                                              sstableLevel,
-                                                             ImmutableList.copyOf(minColumnNames),
-                                                             ImmutableList.copyOf(maxColumnNames),
+                                                             makeList(minClusteringValues),
+                                                             makeList(maxClusteringValues),
                                                              hasLegacyCounterShards,
-                                                             repairedAt));
+                                                             repairedAt,
+                                                             totalColumnsSet,
+                                                             totalRows));
         components.put(MetadataType.COMPACTION, new CompactionMetadata(ancestors, cardinality));
+        components.put(MetadataType.HEADER, header.toComponent());
         return components;
     }
+
+    private static List<ByteBuffer> makeList(ByteBuffer[] values)
+    {
+        // In most cases, l will be the same size as values, but it's possible for it to be smaller
+        List<ByteBuffer> l = new ArrayList<ByteBuffer>(values.length);
+        for (int i = 0; i < values.length; i++)
+            if (values[i] == null)
+                break;
+            else
+                l.add(values[i]);
+        return l;
+    }
+
+    public static class MinMaxLongTracker
+    {
+        private final long defaultMin;
+        private final long defaultMax;
+
+        private boolean isSet = false;
+        private long min;
+        private long max;
+
+        public MinMaxLongTracker()
+        {
+            this(Long.MIN_VALUE, Long.MAX_VALUE);
+        }
+
+        public MinMaxLongTracker(long defaultMin, long defaultMax)
+        {
+            this.defaultMin = defaultMin;
+            this.defaultMax = defaultMax;
+        }
+
+        public void update(long value)
+        {
+            if (!isSet)
+            {
+                min = max = value;
+                isSet = true;
+            }
+            else
+            {
+                if (value < min)
+                    min = value;
+                if (value > max)
+                    max = value;
+            }
+        }
+
+        public long min()
+        {
+            return isSet ? min : defaultMin;
+        }
+
+        public long max()
+        {
+            return isSet ? max : defaultMax;
+        }
+    }
+
+    public static class MinMaxIntTracker
+    {
+        private final int defaultMin;
+        private final int defaultMax;
+
+        private boolean isSet = false;
+        private int min;
+        private int max;
+
+        public MinMaxIntTracker()
+        {
+            this(Integer.MIN_VALUE, Integer.MAX_VALUE);
+        }
+
+        public MinMaxIntTracker(int defaultMin, int defaultMax)
+        {
+            this.defaultMin = defaultMin;
+            this.defaultMax = defaultMax;
+        }
+
+        public void update(int value)
+        {
+            if (!isSet)
+            {
+                min = max = value;
+                isSet = true;
+            }
+            else
+            {
+                if (value < min)
+                    min = value;
+                if (value > max)
+                    max = value;
+            }
+        }
+
+        public int min()
+        {
+            return isSet ? min : defaultMin;
+        }
+
+        public int max()
+        {
+            return isSet ? max : defaultMax;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
index 8a65d8d..fcdf57a 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
@@ -75,7 +75,7 @@ public class MetadataSerializer implements IMetadataSerializer
         }
     }
 
-    public Map<MetadataType, MetadataComponent> deserialize(Descriptor descriptor, EnumSet<MetadataType> types) throws IOException
+    public Map<MetadataType, MetadataComponent> deserialize( Descriptor descriptor, EnumSet<MetadataType> types) throws IOException
     {
         Map<MetadataType, MetadataComponent> components;
         logger.debug("Load metadata for {}", descriptor);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/io/sstable/metadata/MetadataType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataType.java b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataType.java
index 9717da1..875cec4 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataType.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataType.java
@@ -17,6 +17,8 @@
  */
 package org.apache.cassandra.io.sstable.metadata;
 
+import org.apache.cassandra.db.SerializationHeader;
+
 /**
  * Defines Metadata component type.
  */
@@ -27,7 +29,9 @@ public enum MetadataType
     /** Metadata only used at compaction */
     COMPACTION(CompactionMetadata.serializer),
     /** Metadata always keep in memory */
-    STATS(StatsMetadata.serializer);
+    STATS(StatsMetadata.serializer),
+    /** Serialization header */
+    HEADER((IMetadataComponentSerializer)SerializationHeader.serializer);
 
     public final IMetadataComponentSerializer<MetadataComponent> serializer;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java b/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
index f2eb1af..809d6b3 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
@@ -46,42 +46,57 @@ public class StatsMetadata extends MetadataComponent
     public final ReplayPosition replayPosition;
     public final long minTimestamp;
     public final long maxTimestamp;
+    public final int minLocalDeletionTime;
     public final int maxLocalDeletionTime;
+    public final int minTTL;
+    public final int maxTTL;
     public final double compressionRatio;
     public final StreamingHistogram estimatedTombstoneDropTime;
     public final int sstableLevel;
-    public final List<ByteBuffer> maxColumnNames;
-    public final List<ByteBuffer> minColumnNames;
+    public final List<ByteBuffer> minClusteringValues;
+    public final List<ByteBuffer> maxClusteringValues;
     public final boolean hasLegacyCounterShards;
     public final long repairedAt;
+    public final long totalColumnsSet;
+    public final long totalRows;
 
     public StatsMetadata(EstimatedHistogram estimatedRowSize,
                          EstimatedHistogram estimatedColumnCount,
                          ReplayPosition replayPosition,
                          long minTimestamp,
                          long maxTimestamp,
+                         int minLocalDeletionTime,
                          int maxLocalDeletionTime,
+                         int minTTL,
+                         int maxTTL,
                          double compressionRatio,
                          StreamingHistogram estimatedTombstoneDropTime,
                          int sstableLevel,
-                         List<ByteBuffer> minColumnNames,
-                         List<ByteBuffer> maxColumnNames,
+                         List<ByteBuffer> minClusteringValues,
+                         List<ByteBuffer> maxClusteringValues,
                          boolean hasLegacyCounterShards,
-                         long repairedAt)
+                         long repairedAt,
+                         long totalColumnsSet,
+                         long totalRows)
     {
         this.estimatedRowSize = estimatedRowSize;
         this.estimatedColumnCount = estimatedColumnCount;
         this.replayPosition = replayPosition;
         this.minTimestamp = minTimestamp;
         this.maxTimestamp = maxTimestamp;
+        this.minLocalDeletionTime = minLocalDeletionTime;
         this.maxLocalDeletionTime = maxLocalDeletionTime;
+        this.minTTL = minTTL;
+        this.maxTTL = maxTTL;
         this.compressionRatio = compressionRatio;
         this.estimatedTombstoneDropTime = estimatedTombstoneDropTime;
         this.sstableLevel = sstableLevel;
-        this.minColumnNames = minColumnNames;
-        this.maxColumnNames = maxColumnNames;
+        this.minClusteringValues = minClusteringValues;
+        this.maxClusteringValues = maxClusteringValues;
         this.hasLegacyCounterShards = hasLegacyCounterShards;
         this.repairedAt = repairedAt;
+        this.totalColumnsSet = totalColumnsSet;
+        this.totalRows = totalRows;
     }
 
     public MetadataType getType()
@@ -120,14 +135,19 @@ public class StatsMetadata extends MetadataComponent
                                  replayPosition,
                                  minTimestamp,
                                  maxTimestamp,
+                                 minLocalDeletionTime,
                                  maxLocalDeletionTime,
+                                 minTTL,
+                                 maxTTL,
                                  compressionRatio,
                                  estimatedTombstoneDropTime,
                                  newLevel,
-                                 minColumnNames,
-                                 maxColumnNames,
+                                 minClusteringValues,
+                                 maxClusteringValues,
                                  hasLegacyCounterShards,
-                                 repairedAt);
+                                 repairedAt,
+                                 totalColumnsSet,
+                                 totalRows);
     }
 
     public StatsMetadata mutateRepairedAt(long newRepairedAt)
@@ -137,14 +157,19 @@ public class StatsMetadata extends MetadataComponent
                                  replayPosition,
                                  minTimestamp,
                                  maxTimestamp,
+                                 minLocalDeletionTime,
                                  maxLocalDeletionTime,
+                                 minTTL,
+                                 maxTTL,
                                  compressionRatio,
                                  estimatedTombstoneDropTime,
                                  sstableLevel,
-                                 minColumnNames,
-                                 maxColumnNames,
+                                 minClusteringValues,
+                                 maxClusteringValues,
                                  hasLegacyCounterShards,
-                                 newRepairedAt);
+                                 newRepairedAt,
+                                 totalColumnsSet,
+                                 totalRows);
     }
 
     @Override
@@ -160,14 +185,19 @@ public class StatsMetadata extends MetadataComponent
                        .append(replayPosition, that.replayPosition)
                        .append(minTimestamp, that.minTimestamp)
                        .append(maxTimestamp, that.maxTimestamp)
+                       .append(minLocalDeletionTime, that.minLocalDeletionTime)
                        .append(maxLocalDeletionTime, that.maxLocalDeletionTime)
+                       .append(minTTL, that.minTTL)
+                       .append(maxTTL, that.maxTTL)
                        .append(compressionRatio, that.compressionRatio)
                        .append(estimatedTombstoneDropTime, that.estimatedTombstoneDropTime)
                        .append(sstableLevel, that.sstableLevel)
                        .append(repairedAt, that.repairedAt)
-                       .append(maxColumnNames, that.maxColumnNames)
-                       .append(minColumnNames, that.minColumnNames)
+                       .append(maxClusteringValues, that.maxClusteringValues)
+                       .append(minClusteringValues, that.minClusteringValues)
                        .append(hasLegacyCounterShards, that.hasLegacyCounterShards)
+                       .append(totalColumnsSet, that.totalColumnsSet)
+                       .append(totalRows, that.totalRows)
                        .build();
     }
 
@@ -180,14 +210,19 @@ public class StatsMetadata extends MetadataComponent
                        .append(replayPosition)
                        .append(minTimestamp)
                        .append(maxTimestamp)
+                       .append(minLocalDeletionTime)
                        .append(maxLocalDeletionTime)
+                       .append(minTTL)
+                       .append(maxTTL)
                        .append(compressionRatio)
                        .append(estimatedTombstoneDropTime)
                        .append(sstableLevel)
                        .append(repairedAt)
-                       .append(maxColumnNames)
-                       .append(minColumnNames)
+                       .append(maxClusteringValues)
+                       .append(minClusteringValues)
                        .append(hasLegacyCounterShards)
+                       .append(totalColumnsSet)
+                       .append(totalRows)
                        .build();
     }
 
@@ -199,18 +234,19 @@ public class StatsMetadata extends MetadataComponent
             size += EstimatedHistogram.serializer.serializedSize(component.estimatedRowSize, TypeSizes.NATIVE);
             size += EstimatedHistogram.serializer.serializedSize(component.estimatedColumnCount, TypeSizes.NATIVE);
             size += ReplayPosition.serializer.serializedSize(component.replayPosition, TypeSizes.NATIVE);
-            size += 8 + 8 + 4 + 8 + 8; // mix/max timestamp(long), maxLocalDeletionTime(int), compressionRatio(double), repairedAt (long)
+            size += 8 + 8 + 4 + 4 + 4 + 4 + 8 + 8; // min/max timestamp(long), min/maxLocalDeletionTime(int), min/max TTL(int), compressionRatio(double), repairedAt (long)
             size += StreamingHistogram.serializer.serializedSize(component.estimatedTombstoneDropTime, TypeSizes.NATIVE);
             size += TypeSizes.NATIVE.sizeof(component.sstableLevel);
             // min column names
             size += 4;
-            for (ByteBuffer columnName : component.minColumnNames)
-                size += 2 + columnName.remaining(); // with short length
+            for (ByteBuffer value : component.minClusteringValues)
+                size += 2 + value.remaining(); // with short length
             // max column names
             size += 4;
-            for (ByteBuffer columnName : component.maxColumnNames)
-                size += 2 + columnName.remaining(); // with short length
+            for (ByteBuffer value : component.maxClusteringValues)
+                size += 2 + value.remaining(); // with short length
             size += TypeSizes.NATIVE.sizeof(component.hasLegacyCounterShards);
+            size += 8 + 8; // totalColumnsSet, totalRows
             return size;
         }
 
@@ -221,18 +257,24 @@ public class StatsMetadata extends MetadataComponent
             ReplayPosition.serializer.serialize(component.replayPosition, out);
             out.writeLong(component.minTimestamp);
             out.writeLong(component.maxTimestamp);
+            out.writeInt(component.minLocalDeletionTime);
             out.writeInt(component.maxLocalDeletionTime);
+            out.writeInt(component.minTTL);
+            out.writeInt(component.maxTTL);
             out.writeDouble(component.compressionRatio);
             StreamingHistogram.serializer.serialize(component.estimatedTombstoneDropTime, out);
             out.writeInt(component.sstableLevel);
             out.writeLong(component.repairedAt);
-            out.writeInt(component.minColumnNames.size());
-            for (ByteBuffer columnName : component.minColumnNames)
-                ByteBufferUtil.writeWithShortLength(columnName, out);
-            out.writeInt(component.maxColumnNames.size());
-            for (ByteBuffer columnName : component.maxColumnNames)
-                ByteBufferUtil.writeWithShortLength(columnName, out);
+            out.writeInt(component.minClusteringValues.size());
+            for (ByteBuffer value : component.minClusteringValues)
+                ByteBufferUtil.writeWithShortLength(value, out);
+            out.writeInt(component.maxClusteringValues.size());
+            for (ByteBuffer value : component.maxClusteringValues)
+                ByteBufferUtil.writeWithShortLength(value, out);
             out.writeBoolean(component.hasLegacyCounterShards);
+
+            out.writeLong(component.totalColumnsSet);
+            out.writeLong(component.totalRows);
         }
 
         public StatsMetadata deserialize(Version version, DataInput in) throws IOException
@@ -242,7 +284,11 @@ public class StatsMetadata extends MetadataComponent
             ReplayPosition replayPosition = ReplayPosition.serializer.deserialize(in);
             long minTimestamp = in.readLong();
             long maxTimestamp = in.readLong();
+            // We use MAX_VALUE as that's the default value for "no deletion time"
+            int minLocalDeletionTime = version.storeRows() ? in.readInt() : Integer.MAX_VALUE;
             int maxLocalDeletionTime = in.readInt();
+            int minTTL = version.storeRows() ? in.readInt() : 0;
+            int maxTTL = version.storeRows() ? in.readInt() : Integer.MAX_VALUE;
             double compressionRatio = in.readDouble();
             StreamingHistogram tombstoneHistogram = StreamingHistogram.serializer.deserialize(in);
             int sstableLevel = in.readInt();
@@ -251,32 +297,40 @@ public class StatsMetadata extends MetadataComponent
                 repairedAt = in.readLong();
 
             int colCount = in.readInt();
-            List<ByteBuffer> minColumnNames = new ArrayList<>(colCount);
+            List<ByteBuffer> minClusteringValues = new ArrayList<>(colCount);
             for (int i = 0; i < colCount; i++)
-                minColumnNames.add(ByteBufferUtil.readWithShortLength(in));
+                minClusteringValues.add(ByteBufferUtil.readWithShortLength(in));
 
             colCount = in.readInt();
-            List<ByteBuffer> maxColumnNames = new ArrayList<>(colCount);
+            List<ByteBuffer> maxClusteringValues = new ArrayList<>(colCount);
             for (int i = 0; i < colCount; i++)
-                maxColumnNames.add(ByteBufferUtil.readWithShortLength(in));
+                maxClusteringValues.add(ByteBufferUtil.readWithShortLength(in));
 
             boolean hasLegacyCounterShards = true;
             if (version.tracksLegacyCounterShards())
                 hasLegacyCounterShards = in.readBoolean();
 
+            long totalColumnsSet = version.storeRows() ? in.readLong() : -1L;
+            long totalRows = version.storeRows() ? in.readLong() : -1L;
+
             return new StatsMetadata(rowSizes,
                                      columnCounts,
                                      replayPosition,
                                      minTimestamp,
                                      maxTimestamp,
+                                     minLocalDeletionTime,
                                      maxLocalDeletionTime,
+                                     minTTL,
+                                     maxTTL,
                                      compressionRatio,
                                      tombstoneHistogram,
                                      sstableLevel,
-                                     minColumnNames,
-                                     maxColumnNames,
+                                     minClusteringValues,
+                                     maxClusteringValues,
                                      hasLegacyCounterShards,
-                                     repairedAt);
+                                     repairedAt,
+                                     totalColumnsSet,
+                                     totalRows);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java b/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
index 4362cee..c3a7f98 100644
--- a/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
+++ b/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
@@ -112,10 +112,10 @@ public class DataIntegrityMetadata
             }
             catch (Exception e)
             {
+                close();
                 // Attempting to create a FileDigestValidator without a DIGEST file will fail
                 throw new IOException("Corrupted SSTable : " + descriptor.filenameFor(Component.DATA));
             }
-
         }
 
         // Validate the entire file
@@ -133,7 +133,14 @@ public class DataIntegrityMetadata
 
         public void close()
         {
-            this.digestReader.close();
+            try
+            {
+                this.digestReader.close();
+            }
+            finally
+            {
+                this.dataReader.close();
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/io/util/FileUtils.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/FileUtils.java b/src/java/org/apache/cassandra/io/util/FileUtils.java
index 35e1419..c182e58 100644
--- a/src/java/org/apache/cassandra/io/util/FileUtils.java
+++ b/src/java/org/apache/cassandra/io/util/FileUtils.java
@@ -227,6 +227,19 @@ public class FileUtils
         }
     }
 
+    public static void closeQuietly(AutoCloseable c)
+    {
+        try
+        {
+            if (c != null)
+                c.close();
+        }
+        catch (Exception e)
+        {
+            logger.warn("Failed closing {}", c, e);
+        }
+    }
+
     public static void close(Closeable... cs) throws IOException
     {
         close(Arrays.asList(cs));
@@ -252,6 +265,22 @@ public class FileUtils
             throw e;
     }
 
+    public static void closeQuietly(Iterable<? extends AutoCloseable> cs)
+    {
+        for (AutoCloseable c : cs)
+        {
+            try
+            {
+                if (c != null)
+                    c.close();
+            }
+            catch (Exception ex)
+            {
+                logger.warn("Failed closing {}", c, ex);
+            }
+        }
+    }
+
     public static String getCanonicalPath(String filename)
     {
         try

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 83bc337..3f2160f 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -81,7 +81,8 @@ public final class MessagingService implements MessagingServiceMBean
     public static final int VERSION_20 = 7;
     public static final int VERSION_21 = 8;
     public static final int VERSION_22 = 9;
-    public static final int current_version = VERSION_22;
+    public static final int VERSION_30 = 10;
+    public static final int current_version = VERSION_30;
 
     public static final String FAILURE_CALLBACK_PARAM = "CAL_BAC";
     public static final byte[] ONE_BYTE = new byte[1];
@@ -104,7 +105,7 @@ public final class MessagingService implements MessagingServiceMBean
         @Deprecated STREAM_INITIATE_DONE,
         @Deprecated STREAM_REPLY,
         @Deprecated STREAM_REQUEST,
-        RANGE_SLICE,
+        @Deprecated RANGE_SLICE,
         @Deprecated BOOTSTRAP_TOKEN,
         @Deprecated TREE_REQUEST,
         @Deprecated TREE_RESPONSE,
@@ -132,7 +133,7 @@ public final class MessagingService implements MessagingServiceMBean
         PAXOS_PREPARE,
         PAXOS_PROPOSE,
         PAXOS_COMMIT,
-        PAGED_RANGE,
+        @Deprecated PAGED_RANGE,
         // remember to add new verbs at the end, since we serialize by ordinal
         UNUSED_1,
         UNUSED_2,
@@ -204,8 +205,8 @@ public final class MessagingService implements MessagingServiceMBean
         put(Verb.MUTATION, Mutation.serializer);
         put(Verb.READ_REPAIR, Mutation.serializer);
         put(Verb.READ, ReadCommand.serializer);
-        put(Verb.RANGE_SLICE, RangeSliceCommand.serializer);
-        put(Verb.PAGED_RANGE, PagedRangeCommand.serializer);
+        //put(Verb.RANGE_SLICE, ReadCommand.legacyRangeSliceCommandSerializer);
+        //put(Verb.PAGED_RANGE, ReadCommand.legacyPagedRangeCommandSerializer);
         put(Verb.BOOTSTRAP_TOKEN, BootStrapper.StringSerializer.instance);
         put(Verb.REPAIR_MESSAGE, RepairMessage.serializer);
         put(Verb.GOSSIP_DIGEST_ACK, GossipDigestAck.serializer);
@@ -230,8 +231,8 @@ public final class MessagingService implements MessagingServiceMBean
         put(Verb.MUTATION, WriteResponse.serializer);
         put(Verb.READ_REPAIR, WriteResponse.serializer);
         put(Verb.COUNTER_MUTATION, WriteResponse.serializer);
-        put(Verb.RANGE_SLICE, RangeSliceReply.serializer);
-        put(Verb.PAGED_RANGE, RangeSliceReply.serializer);
+        put(Verb.RANGE_SLICE, ReadResponse.legacyRangeSliceReplySerializer);
+        put(Verb.PAGED_RANGE, ReadResponse.legacyRangeSliceReplySerializer);
         put(Verb.READ, ReadResponse.serializer);
         put(Verb.TRUNCATE, TruncateResponse.serializer);
         put(Verb.SNAPSHOT, null);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/repair/RepairJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairJob.java b/src/java/org/apache/cassandra/repair/RepairJob.java
index 754e26f..ac20428 100644
--- a/src/java/org/apache/cassandra/repair/RepairJob.java
+++ b/src/java/org/apache/cassandra/repair/RepairJob.java
@@ -180,7 +180,7 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
         String message = String.format("Requesting merkle trees for %s (to %s)", desc.columnFamily, endpoints);
         logger.info("[repair #{}] {}", desc.sessionId, message);
         Tracing.traceRepair(message);
-        int gcBefore = Keyspace.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily).gcBefore(System.currentTimeMillis());
+        int gcBefore = Keyspace.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily).gcBefore(FBUtilities.nowInSeconds());
         List<ListenableFuture<TreeResponse>> tasks = new ArrayList<>(endpoints.size());
         for (InetAddress endpoint : endpoints)
         {
@@ -197,7 +197,7 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
      */
     private ListenableFuture<List<TreeResponse>> sendSequentialValidationRequest(Collection<InetAddress> endpoints)
     {
-        int gcBefore = Keyspace.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily).gcBefore(System.currentTimeMillis());
+        int gcBefore = Keyspace.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily).gcBefore(FBUtilities.nowInSeconds());
         List<ListenableFuture<TreeResponse>> tasks = new ArrayList<>(endpoints.size());
 
         Queue<InetAddress> requests = new LinkedList<>(endpoints);
@@ -236,7 +236,7 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
      */
     private ListenableFuture<List<TreeResponse>> sendDCAwareValidationRequest(Collection<InetAddress> endpoints)
     {
-        int gcBefore = Keyspace.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily).gcBefore(System.currentTimeMillis());
+        int gcBefore = Keyspace.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily).gcBefore(FBUtilities.nowInSeconds());
         List<ListenableFuture<TreeResponse>> tasks = new ArrayList<>(endpoints.size());
 
         Map<String, Queue<InetAddress>> requestsByDatacenter = new HashMap<>();