Posted to commits@cassandra.apache.org by sl...@apache.org on 2016/05/12 13:23:18 UTC

[13/20] cassandra git commit: Merge commit '849a438690aa97a361227781108cc90355dcbcd9' into cassandra-3.0

http://git-wip-us.apache.org/repos/asf/cassandra/blob/78a3d2bb/src/java/org/apache/cassandra/db/lifecycle/View.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/lifecycle/View.java
index b62c7e3,fba1627..17062b4
--- a/src/java/org/apache/cassandra/db/lifecycle/View.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/View.java
@@@ -33,16 -33,14 +33,16 @@@ import org.apache.cassandra.io.sstable.
  import org.apache.cassandra.utils.Interval;
  
  import static com.google.common.base.Predicates.equalTo;
++import static com.google.common.base.Predicates.in;
  import static com.google.common.base.Predicates.not;
  import static com.google.common.collect.ImmutableList.copyOf;
  import static com.google.common.collect.ImmutableList.of;
  import static com.google.common.collect.Iterables.all;
  import static com.google.common.collect.Iterables.concat;
  import static com.google.common.collect.Iterables.filter;
 -import static java.util.Collections.singleton;
 +import static com.google.common.collect.Iterables.transform;
- import static java.util.Collections.singleton;
  import static org.apache.cassandra.db.lifecycle.Helpers.emptySet;
 +import static org.apache.cassandra.db.lifecycle.Helpers.filterOut;
  import static org.apache.cassandra.db.lifecycle.Helpers.replace;
  
  /**
@@@ -68,18 -66,18 +68,19 @@@ public class Vie
       * flushed. In chronologically ascending order.
       */
      public final List<Memtable> flushingMemtables;
 -    public final Set<SSTableReader> compacting;
 -    public final Set<SSTableReader> sstables;
 -    public final Set<SSTableReader> premature;
 +    final Set<SSTableReader> compacting;
 +    final Set<SSTableReader> sstables;
++    final Set<SSTableReader> premature;
      // we use a Map here so that we can easily perform identity checks as well as equality checks.
      // When marking compacting, we now indicate if we expect the sstables to be present (by default we do),
      // and we then check that not only are they all present in the live set, but that the exact instance present is
      // the one we made our decision to compact against.
 -    public final Map<SSTableReader, SSTableReader> sstablesMap;
 +    final Map<SSTableReader, SSTableReader> sstablesMap;
 +    final Map<SSTableReader, SSTableReader> compactingMap;
  
 -    public final SSTableIntervalTree intervalTree;
 +    final SSTableIntervalTree intervalTree;
  
-     View(List<Memtable> liveMemtables, List<Memtable> flushingMemtables, Map<SSTableReader, SSTableReader> sstables, Map<SSTableReader, SSTableReader> compacting, SSTableIntervalTree intervalTree)
 -    View(List<Memtable> liveMemtables, List<Memtable> flushingMemtables, Map<SSTableReader, SSTableReader> sstables, Set<SSTableReader> compacting, Set<SSTableReader> premature, SSTableIntervalTree intervalTree)
++    View(List<Memtable> liveMemtables, List<Memtable> flushingMemtables, Map<SSTableReader, SSTableReader> sstables, Map<SSTableReader, SSTableReader> compacting, Set<SSTableReader> premature, SSTableIntervalTree intervalTree)
      {
          assert liveMemtables != null;
          assert flushingMemtables != null;
@@@ -92,8 -90,8 +93,9 @@@
  
          this.sstablesMap = sstables;
          this.sstables = sstablesMap.keySet();
 -        this.compacting = compacting;
 +        this.compactingMap = compacting;
 +        this.compacting = compactingMap.keySet();
+         this.premature = premature;
          this.intervalTree = intervalTree;
      }
  
@@@ -238,8 -158,8 +240,8 @@@
              {
                  assert all(mark, Helpers.idIn(view.sstablesMap));
                  return new View(view.liveMemtables, view.flushingMemtables, view.sstablesMap,
 -                                replace(view.compacting, unmark, mark),
 +                                replace(view.compactingMap, unmark, mark),
-                                 view.intervalTree);
+                                 view.premature, view.intervalTree);
              }
          };
      }
@@@ -270,7 -190,7 +272,7 @@@
              public View apply(View view)
              {
                  Map<SSTableReader, SSTableReader> sstableMap = replace(view.sstablesMap, remove, add);
-                 return new View(view.liveMemtables, view.flushingMemtables, sstableMap, view.compactingMap,
 -                return new View(view.liveMemtables, view.flushingMemtables, sstableMap, view.compacting, view.premature,
++                return new View(view.liveMemtables, view.flushingMemtables, sstableMap, view.compactingMap, view.premature,
                                  SSTableIntervalTree.build(sstableMap.keySet()));
              }
          };
@@@ -285,7 -205,7 +287,7 @@@
              {
                  List<Memtable> newLive = ImmutableList.<Memtable>builder().addAll(view.liveMemtables).add(newMemtable).build();
                  assert newLive.size() == view.liveMemtables.size() + 1;
-                 return new View(newLive, view.flushingMemtables, view.sstablesMap, view.compactingMap, view.intervalTree);
 -                return new View(newLive, view.flushingMemtables, view.sstablesMap, view.compacting, view.premature, view.intervalTree);
++                return new View(newLive, view.flushingMemtables, view.sstablesMap, view.compactingMap, view.premature, view.intervalTree);
              }
          };
      }
@@@ -304,7 -224,7 +306,7 @@@
                                                             filter(flushing, not(lessThan(toFlush)))));
                  assert newLive.size() == live.size() - 1;
                  assert newFlushing.size() == flushing.size() + 1;
-                 return new View(newLive, newFlushing, view.sstablesMap, view.compactingMap, view.intervalTree);
 -                return new View(newLive, newFlushing, view.sstablesMap, view.compacting, view.premature, view.intervalTree);
++                return new View(newLive, newFlushing, view.sstablesMap, view.compactingMap, view.premature, view.intervalTree);
              }
          };
      }
@@@ -319,17 -239,35 +321,34 @@@
                  List<Memtable> flushingMemtables = copyOf(filter(view.flushingMemtables, not(equalTo(memtable))));
                  assert flushingMemtables.size() == view.flushingMemtables.size() - 1;
  
--                if (flushed == null)
++                if (flushed == null || flushed.isEmpty())
                      return new View(view.liveMemtables, flushingMemtables, view.sstablesMap,
-                                     view.compactingMap, view.intervalTree);
 -                                    view.compacting, view.premature, view.intervalTree);
++                                    view.compactingMap, view.premature, view.intervalTree);
  
 -                Map<SSTableReader, SSTableReader> sstableMap = replace(view.sstablesMap, emptySet(), singleton(flushed));
 -                Set<SSTableReader> compacting = replace(view.compacting, emptySet(), singleton(flushed));
 -                Set<SSTableReader> premature = replace(view.premature, emptySet(), singleton(flushed));
 -                return new View(view.liveMemtables, flushingMemtables, sstableMap, compacting, premature,
 +                Map<SSTableReader, SSTableReader> sstableMap = replace(view.sstablesMap, emptySet(), flushed);
-                 return new View(view.liveMemtables, flushingMemtables, sstableMap, view.compactingMap,
++                Map<SSTableReader, SSTableReader> compactingMap = replace(view.compactingMap, emptySet(), flushed);
++                Set<SSTableReader> premature = replace(view.premature, emptySet(), flushed);
++                return new View(view.liveMemtables, flushingMemtables, sstableMap, compactingMap, premature,
                                  SSTableIntervalTree.build(sstableMap.keySet()));
              }
          };
      }
  
 -    static Function<View, View> permitCompactionOfFlushed(final SSTableReader reader)
++    static Function<View, View> permitCompactionOfFlushed(final Collection<SSTableReader> readers)
+     {
++        Set<SSTableReader> expectAndRemove = ImmutableSet.copyOf(readers);
+         return new Function<View, View>()
+         {
 -
 -            @Nullable
+             public View apply(View view)
+             {
 -                Set<SSTableReader> premature = ImmutableSet.copyOf(filter(view.premature, not(equalTo(reader))));
 -                Set<SSTableReader> compacting = ImmutableSet.copyOf(filter(view.compacting, not(equalTo(reader))));
 -                return new View(view.liveMemtables, view.flushingMemtables, view.sstablesMap, compacting, premature, view.intervalTree);
++                Set<SSTableReader> premature = replace(view.premature, expectAndRemove, emptySet());
++                Map<SSTableReader, SSTableReader> compactingMap = replace(view.compactingMap, expectAndRemove, emptySet());
++                return new View(view.liveMemtables, view.flushingMemtables, view.sstablesMap, compactingMap, premature, view.intervalTree);
+             }
+         };
+     }
+ 
+ 
      private static <T extends Comparable<T>> Predicate<T> lessThan(final T lessThan)
      {
          return new Predicate<T>()
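
A side note on the View changes above: compacting sstables are now tracked in a Map<SSTableReader, SSTableReader> (compactingMap) rather than a plain Set, for the same reason given for sstablesMap in the hunk: the map lets callers check not only that an equal reader is present, but that the exact instance they decided to compact against is still the live one. A minimal, self-contained sketch of that identity-check idea (idIn here is a simplified stand-in for Helpers.idIn, not the actual implementation):

import java.util.HashMap;
import java.util.Map;

public class IdentityCheckSketch
{
    // The map is keyed on equality, but the value is the exact instance that was
    // live when the map was built. get(candidate) == candidate therefore verifies
    // that the very object we decided on is still the one in the live set.
    static <T> boolean idIn(Map<T, T> liveByEquality, T candidate)
    {
        return liveByEquality.get(candidate) == candidate;
    }

    public static void main(String[] args)
    {
        Map<String, String> live = new HashMap<>();
        String original = new String("sstable-1");
        live.put(original, original);

        System.out.println(idIn(live, original));                 // true: same instance
        System.out.println(idIn(live, new String("sstable-1")));  // false: equal, but a different instance
    }
}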

http://git-wip-us.apache.org/repos/asf/cassandra/blob/78a3d2bb/src/java/org/apache/cassandra/db/view/TableViews.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/view/TableViews.java
index 893bdd5,0000000..7feb67c
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/view/TableViews.java
+++ b/src/java/org/apache/cassandra/db/view/TableViews.java
@@@ -1,481 -1,0 +1,481 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.db.view;
 +
 +import java.util.*;
 +import java.util.concurrent.CopyOnWriteArrayList;
 +import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.atomic.AtomicLong;
 +
 +import com.google.common.collect.Iterables;
 +import com.google.common.collect.Iterators;
 +import com.google.common.collect.PeekingIterator;
 +
 +import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.db.*;
 +import org.apache.cassandra.db.commitlog.ReplayPosition;
 +import org.apache.cassandra.db.filter.*;
 +import org.apache.cassandra.db.rows.*;
 +import org.apache.cassandra.db.partitions.*;
 +import org.apache.cassandra.dht.Token;
 +import org.apache.cassandra.service.StorageProxy;
 +import org.apache.cassandra.utils.FBUtilities;
 +import org.apache.cassandra.utils.btree.BTreeSet;
 +
 +
 +/**
 + * Groups all the views for a given table.
 + */
 +public class TableViews extends AbstractCollection<View>
 +{
 +    private final CFMetaData baseTableMetadata;
 +
 +    // We need this to be thread-safe, but the number of times this is changed (when a view is created in the keyspace)
 +    // is massively exceeded by the number of times it's read (for every mutation on the keyspace), so a copy-on-write list is the best option.
 +    private final List<View> views = new CopyOnWriteArrayList();
 +
 +    public TableViews(CFMetaData baseTableMetadata)
 +    {
 +        this.baseTableMetadata = baseTableMetadata;
 +    }
 +
 +    public int size()
 +    {
 +        return views.size();
 +    }
 +
 +    public Iterator<View> iterator()
 +    {
 +        return views.iterator();
 +    }
 +
 +    public boolean contains(String viewName)
 +    {
 +        return Iterables.any(views, view -> view.name.equals(viewName));
 +    }
 +
 +    public boolean add(View view)
 +    {
 +        // We should have validated that there is no existing view with this name at this point
 +        assert !contains(view.name);
 +        return views.add(view);
 +    }
 +
 +    public Iterable<ColumnFamilyStore> allViewsCfs()
 +    {
 +        Keyspace keyspace = Keyspace.open(baseTableMetadata.ksName);
 +        return Iterables.transform(views, view -> keyspace.getColumnFamilyStore(view.getDefinition().viewName));
 +    }
 +
 +    public void forceBlockingFlush()
 +    {
 +        for (ColumnFamilyStore viewCfs : allViewsCfs())
 +            viewCfs.forceBlockingFlush();
 +    }
 +
 +    public void dumpMemtables()
 +    {
 +        for (ColumnFamilyStore viewCfs : allViewsCfs())
 +            viewCfs.dumpMemtable();
 +    }
 +
-     public void truncateBlocking(long truncatedAt)
++    public void truncateBlocking(ReplayPosition replayAfter, long truncatedAt)
 +    {
 +        for (ColumnFamilyStore viewCfs : allViewsCfs())
 +        {
-             ReplayPosition replayAfter = viewCfs.discardSSTables(truncatedAt);
++            viewCfs.discardSSTables(truncatedAt);
 +            SystemKeyspace.saveTruncationRecord(viewCfs, truncatedAt, replayAfter);
 +        }
 +    }
 +
 +    public void removeByName(String viewName)
 +    {
 +        views.removeIf(v -> v.name.equals(viewName));
 +    }
 +
 +    /**
 +     * Calculates and pushes updates to the view replicas. The replicas are determined by
 +     * {@link ViewUtils#getViewNaturalEndpoint(String, Token, Token)}.
 +     *
 +     * @param update an update on the base table represented by this object.
 +     * @param writeCommitLog whether we should write the commit log for the view updates.
 +     * @param baseComplete time from epoch in ms that the local base mutation was (or will be) completed
 +     */
 +    public void pushViewReplicaUpdates(PartitionUpdate update, boolean writeCommitLog, AtomicLong baseComplete)
 +    {
 +        assert update.metadata().cfId.equals(baseTableMetadata.cfId);
 +
 +        Collection<View> views = updatedViews(update);
 +        if (views.isEmpty())
 +            return;
 +
 +        // Read modified rows
 +        int nowInSec = FBUtilities.nowInSeconds();
 +        SinglePartitionReadCommand command = readExistingRowsCommand(update, views, nowInSec);
 +        if (command == null)
 +            return;
 +
 +        ColumnFamilyStore cfs = Keyspace.openAndGetStore(update.metadata());
 +        long start = System.nanoTime();
 +        Collection<Mutation> mutations;
 +        try (ReadOrderGroup orderGroup = command.startOrderGroup();
 +             UnfilteredRowIterator existings = UnfilteredPartitionIterators.getOnlyElement(command.executeLocally(orderGroup), command);
 +             UnfilteredRowIterator updates = update.unfilteredIterator())
 +        {
 +            mutations = generateViewUpdates(views, updates, existings, nowInSec);
 +        }
 +        Keyspace.openAndGetStore(update.metadata()).metric.viewReadTime.update(System.nanoTime() - start, TimeUnit.NANOSECONDS);
 +
 +        if (!mutations.isEmpty())
 +            StorageProxy.mutateMV(update.partitionKey().getKey(), mutations, writeCommitLog, baseComplete);
 +    }
 +
 +    /**
 +     * Given some updates on the base table of this object and the existing values for the rows affected by that update, generates the
 +     * mutation to be applied to the provided views.
 +     *
 +     * @param views the views potentially affected by {@code updates}.
 +     * @param updates the base table updates being applied.
 +     * @param existings the existing values for the rows affected by {@code updates}. This is used to decide if a view is
 +     * obsoleted by the update and should be removed, gather the values for columns that may not be part of the update if
 +     * a new view entry needs to be created, and compute the minimal updates to be applied if the view entry isn't changed
 +     * but has simply some updated values. This will be empty for view building as we want to assume anything we'll pass
 +     * to {@code updates} is new.
 +     * @param nowInSec the current time in seconds.
 +     * @return the mutations to apply to the {@code views}. This can be empty.
 +     */
 +    public Collection<Mutation> generateViewUpdates(Collection<View> views, UnfilteredRowIterator updates, UnfilteredRowIterator existings, int nowInSec)
 +    {
 +        assert updates.metadata().cfId.equals(baseTableMetadata.cfId);
 +
 +        List<ViewUpdateGenerator> generators = new ArrayList<>(views.size());
 +        for (View view : views)
 +            generators.add(new ViewUpdateGenerator(view, updates.partitionKey(), nowInSec));
 +
 +        DeletionTracker existingsDeletion = new DeletionTracker(existings.partitionLevelDeletion());
 +        DeletionTracker updatesDeletion = new DeletionTracker(updates.partitionLevelDeletion());
 +
 +        /*
 +         * We iterate through the updates and the existing rows in parallel. This allows us to know the consequence
 +         * on the view of each update.
 +         */
 +        PeekingIterator<Unfiltered> existingsIter = Iterators.peekingIterator(existings);
 +        PeekingIterator<Unfiltered> updatesIter = Iterators.peekingIterator(updates);
 +
 +        while (existingsIter.hasNext() && updatesIter.hasNext())
 +        {
 +            Unfiltered existing = existingsIter.peek();
 +            Unfiltered update = updatesIter.peek();
 +
 +            Row existingRow;
 +            Row updateRow;
 +            int cmp = baseTableMetadata.comparator.compare(update, existing);
 +            if (cmp < 0)
 +            {
 +                // We have an update where there was nothing before
 +                if (update.isRangeTombstoneMarker())
 +                {
 +                    updatesDeletion.update(updatesIter.next());
 +                    continue;
 +                }
 +
 +                updateRow = ((Row)updatesIter.next()).withRowDeletion(updatesDeletion.currentDeletion());
 +                existingRow = emptyRow(updateRow.clustering(), existingsDeletion.currentDeletion());
 +            }
 +            else if (cmp > 0)
 +            {
 +                // We have something existing but no update (which will happen either because it's a range tombstone marker in
 +                // existing, or because we've fetched the existing row due to some partition/range deletion in the updates)
 +                if (existing.isRangeTombstoneMarker())
 +                {
 +                    existingsDeletion.update(existingsIter.next());
 +                    continue;
 +                }
 +
 +                existingRow = ((Row)existingsIter.next()).withRowDeletion(existingsDeletion.currentDeletion());
 +                updateRow = emptyRow(existingRow.clustering(), updatesDeletion.currentDeletion());
 +
 +                // Given the way we build the read command used for existing rows, updatesDeletion.currentDeletion() should always
 +                // be non-live here, since we wouldn't have read the existing row otherwise. We could assert that, but if we ever
 +                // change the read method so that it can slightly over-read in some cases, that would be an easily avoidable bug left
 +                // lurking, so we just handle the case.
 +                if (updateRow == null)
 +                    continue;
 +            }
 +            else
 +            {
 +                // We're updating a row that had pre-existing data
 +                if (update.isRangeTombstoneMarker())
 +                {
 +                    assert existing.isRangeTombstoneMarker();
 +                    updatesDeletion.update(updatesIter.next());
 +                    existingsDeletion.update(existingsIter.next());
 +                    continue;
 +                }
 +
 +                assert !existing.isRangeTombstoneMarker();
 +                existingRow = ((Row)existingsIter.next()).withRowDeletion(existingsDeletion.currentDeletion());
 +                updateRow = ((Row)updatesIter.next()).withRowDeletion(updatesDeletion.currentDeletion());
 +            }
 +
 +            addToViewUpdateGenerators(existingRow, updateRow, generators, nowInSec);
 +        }
 +
 +        // We only care about more existing rows if the update deletion isn't live, i.e. if we had a partition deletion
 +        if (!updatesDeletion.currentDeletion().isLive())
 +        {
 +            while (existingsIter.hasNext())
 +            {
 +                Unfiltered existing = existingsIter.next();
 +                // If it's a range tombstone, we don't care; we're only looking for existing entries that get deleted by
 +                // the new partition deletion
 +                if (existing.isRangeTombstoneMarker())
 +                    continue;
 +
 +                Row existingRow = (Row)existing;
 +                addToViewUpdateGenerators(existingRow, emptyRow(existingRow.clustering(), updatesDeletion.currentDeletion()), generators, nowInSec);
 +            }
 +        }
 +        while (updatesIter.hasNext())
 +        {
 +            Unfiltered update = updatesIter.next();
 +            // If it's a range tombstone, it removes nothing pre-existing, so we can ignore it for view updates
 +            if (update.isRangeTombstoneMarker())
 +                continue;
 +
 +            Row updateRow = (Row)update;
 +            addToViewUpdateGenerators(emptyRow(updateRow.clustering(), DeletionTime.LIVE), updateRow, generators, nowInSec);
 +        }
 +
 +        return buildMutations(baseTableMetadata, generators);
 +    }
 +
 +    /**
 +     * Return the views that are potentially updated by the provided updates.
 +     *
 +     * @param updates the updates applied to the base table.
 +     * @return the views affected by {@code updates}.
 +     */
 +    public Collection<View> updatedViews(PartitionUpdate updates)
 +    {
 +        List<View> matchingViews = new ArrayList<>(views.size());
 +
 +        for (View view : views)
 +        {
 +            ReadQuery selectQuery = view.getReadQuery();
 +            if (!selectQuery.selectsKey(updates.partitionKey()))
 +                continue;
 +
 +            matchingViews.add(view);
 +        }
 +        return matchingViews;
 +    }
 +
 +    /**
 +     * Returns the command to use to read the existing rows required to generate view updates for the provided
 +     * base table updates.
 +     *
 +     * @param updates the base table updates being applied.
 +     * @param views the views potentially affected by {@code updates}.
 +     * @param nowInSec the current time in seconds.
 +     * @return the command to use to read the base table rows required to generate view updates for {@code updates}.
 +     */
 +    private SinglePartitionReadCommand readExistingRowsCommand(PartitionUpdate updates, Collection<View> views, int nowInSec)
 +    {
 +        Slices.Builder sliceBuilder = null;
 +        DeletionInfo deletionInfo = updates.deletionInfo();
 +        CFMetaData metadata = updates.metadata();
 +        DecoratedKey key = updates.partitionKey();
 +        // TODO: This is subtle: we need to gather all the slices that we have to fetch between the partition deletion, range tombstones and rows.
 +        if (!deletionInfo.isLive())
 +        {
 +            sliceBuilder = new Slices.Builder(metadata.comparator);
 +            // Everything covered by a deletion might invalidate an existing view entry, which means we must read it to know. In practice
 +            // though, the views involved might filter some base table clustering columns, in which case we can restrict what we read
 +            // using those restrictions.
 +            // If there is a partition deletion, then we can simply take the slices from each view's select filter. They may overlap, but
 +            // the Slices.Builder handles that for us. Note that in many cases this will just involve reading everything (as soon as any
 +            // view involved has no clustering restrictions, for instance).
 +            // For range tombstones, we should theoretically take the difference between the tombstoned range and the slices selected
 +            // by every view, but as we don't have an easy way to compute that right now, we keep it simple and just use the tombstoned
 +            // range.
 +            // TODO: we should improve that latter part.
 +            if (!deletionInfo.getPartitionDeletion().isLive())
 +            {
 +                for (View view : views)
 +                    sliceBuilder.addAll(view.getSelectStatement().clusteringIndexFilterAsSlices());
 +            }
 +            else
 +            {
 +                assert deletionInfo.hasRanges();
 +                Iterator<RangeTombstone> iter = deletionInfo.rangeIterator(false);
 +                while (iter.hasNext())
 +                    sliceBuilder.add(iter.next().deletedSlice());
 +            }
 +        }
 +
 +        // We need to read every row that is updated, unless we can prove that it has no impact on any view entries.
 +
 +        // If we had some slices from the deletions above, we'll continue using that. Otherwise, it's more efficient to build
 +        // a names query.
 +        BTreeSet.Builder<Clustering> namesBuilder = sliceBuilder == null ? BTreeSet.builder(metadata.comparator) : null;
 +        for (Row row : updates)
 +        {
 +            // Don't read the existing state if we can prove the update won't affect any views
 +            if (!affectsAnyViews(key, row, views))
 +                continue;
 +
 +            if (namesBuilder == null)
 +                sliceBuilder.add(Slice.make(row.clustering()));
 +            else
 +                namesBuilder.add(row.clustering());
 +        }
 +
 +        NavigableSet<Clustering> names = namesBuilder == null ? null : namesBuilder.build();
 +        // If we have a slice builder, it means we had some deletions and we have to read. But if we had
 +        // only row updates, it's possible none of them affected the views, in which case we have nothing
 +        // to do.
 +        if (names != null && names.isEmpty())
 +            return null;
 +
 +        ClusteringIndexFilter clusteringFilter = names == null
 +                                               ? new ClusteringIndexSliceFilter(sliceBuilder.build(), false)
 +                                               : new ClusteringIndexNamesFilter(names, false);
 +        // If we have more than one view, we should merge the columns queried by each view, but to keep it simple we just
 +        // include everything. We could change that in the future.
 +        ColumnFilter queriedColumns = views.size() == 1
 +                                    ? Iterables.getOnlyElement(views).getSelectStatement().queriedColumns()
 +                                    : ColumnFilter.all(metadata);
 +        // Note that the views could have restrictions on regular columns, but even if that's the case we shouldn't apply those
 +        // when we read, because even if an existing row doesn't match the view filter, the update can change that in which
 +        // case we'll need to know the existing content. There is also no easy way to merge those RowFilters when we have multiple views.
 +        // TODO: it could still make sense to special-case when there is a single view and a small number of updates (and
 +        // no deletions). Indeed, in that case we could check whether any of the updates modify any of the restricted regular
 +        // columns, and if not we could use the view filter. We keep it simple for now though.
 +        RowFilter rowFilter = RowFilter.NONE;
 +        return SinglePartitionReadCommand.create(metadata, nowInSec, queriedColumns, rowFilter, DataLimits.NONE, key, clusteringFilter);
 +    }
 +
 +    private boolean affectsAnyViews(DecoratedKey partitionKey, Row update, Collection<View> views)
 +    {
 +        for (View view : views)
 +        {
 +            if (view.mayBeAffectedBy(partitionKey, update))
 +                return true;
 +        }
 +        return false;
 +    }
 +
 +    /**
 +     * Given an existing base row and the update that we're going to apply to this row, generate the modifications
 +     * to apply to MVs using the provided {@code ViewUpdateGenerator}s.
 +     *
 +     * @param existingBaseRow the base table row as it is before an update.
 +     * @param updateBaseRow the new updates made to {@code existingBaseRow}.
 +     * @param generators the view update generators to add the new changes to.
 +     * @param nowInSec the current time in seconds. Used to decide if data is live or not.
 +     */
 +    private static void addToViewUpdateGenerators(Row existingBaseRow, Row updateBaseRow, Collection<ViewUpdateGenerator> generators, int nowInSec)
 +    {
 +        // Having an empty existing row is fine: it just means we'll insert a brand new entry for updateBaseRow,
 +        // but if we have no update at all, we shouldn't get here.
 +        assert !updateBaseRow.isEmpty();
 +
 +        // We allow existingBaseRow to be null, which we treat the same as being empty, as a small optimization
 +        // to avoid allocating empty row objects when we know there was nothing existing.
 +        Row mergedBaseRow = existingBaseRow == null ? updateBaseRow : Rows.merge(existingBaseRow, updateBaseRow, nowInSec);
 +        for (ViewUpdateGenerator generator : generators)
 +            generator.addBaseTableUpdate(existingBaseRow, mergedBaseRow);
 +    }
 +
 +    private static Row emptyRow(Clustering clustering, DeletionTime deletion)
 +    {
 +        // Returning null for an empty row is slightly ugly, but the case where there is no pre-existing row is fairly common
 +        // (especially when building the view), so we want to avoid a dummy allocation of an empty row every time.
 +        // And MultiViewUpdateBuilder knows how to deal with that.
 +        return deletion.isLive() ? null : BTreeRow.emptyDeletedRow(clustering, Row.Deletion.regular(deletion));
 +    }
 +
 +    /**
 +     * Extracts (and potentially groups) the mutations generated by the provided view update generators.
 +     * Returns the mutations that need to be applied to the views given the base table updates
 +     * passed to {@link #addBaseTableUpdate}.
 +     *
 +     * @param baseTableMetadata the metadata for the base table being updated.
 +     * @param generators the generators from which to extract the view mutations.
 +     * @return the mutations created by all the generators in {@code generators}.
 +     */
 +    private Collection<Mutation> buildMutations(CFMetaData baseTableMetadata, List<ViewUpdateGenerator> generators)
 +    {
 +        // A single view is probably common enough that it's worth optimizing for
 +        if (generators.size() == 1)
 +        {
 +            Collection<PartitionUpdate> updates = generators.get(0).generateViewUpdates();
 +            List<Mutation> mutations = new ArrayList<>(updates.size());
 +            for (PartitionUpdate update : updates)
 +                mutations.add(new Mutation(update));
 +            return mutations;
 +        }
 +
 +        Map<DecoratedKey, Mutation> mutations = new HashMap<>();
 +        for (ViewUpdateGenerator generator : generators)
 +        {
 +            for (PartitionUpdate update : generator.generateViewUpdates())
 +            {
 +                DecoratedKey key = update.partitionKey();
 +                Mutation mutation = mutations.get(key);
 +                if (mutation == null)
 +                {
 +                    mutation = new Mutation(baseTableMetadata.ksName, key);
 +                    mutations.put(key, mutation);
 +                }
 +                mutation.add(update);
 +            }
 +        }
 +        return mutations.values();
 +    }
 +
 +    /**
 +     * A simple helper that tracks, for a given {@code UnfilteredRowIterator}, the current deletion at any point of the
 +     * iteration. It will be the currently open range tombstone deletion if there is one, and the partition deletion otherwise.
 +     */
 +    private static class DeletionTracker
 +    {
 +        private final DeletionTime partitionDeletion;
 +        private DeletionTime deletion;
 +
 +        public DeletionTracker(DeletionTime partitionDeletion)
 +        {
 +            this.partitionDeletion = partitionDeletion;
 +        }
 +
 +        public void update(Unfiltered marker)
 +        {
 +            assert marker instanceof RangeTombstoneMarker;
 +            RangeTombstoneMarker rtm = (RangeTombstoneMarker)marker;
 +            this.deletion = rtm.isOpen(false)
 +                          ? rtm.openDeletionTime(false)
 +                          : null;
 +        }
 +
 +        public DeletionTime currentDeletion()
 +        {
 +            return deletion == null ? partitionDeletion : deletion;
 +        }
 +    }
 +}
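
The core of generateViewUpdates above is a merge-join over two clustering-ordered streams, the base table updates and the pre-existing rows, with the three cmp branches handling "new row", "existing only" and "updated row". A simplified, self-contained illustration of that iteration pattern, using sorted integers in place of clusterings (an analogy only, not the Cassandra code):

import java.util.Arrays;
import java.util.List;

public class MergeJoinSketch
{
    // Walk two sorted streams in parallel and classify each position, mirroring
    // the cmp < 0 / cmp > 0 / cmp == 0 branches in generateViewUpdates.
    static void walk(List<Integer> existing, List<Integer> updates)
    {
        int e = 0, u = 0;
        while (e < existing.size() && u < updates.size())
        {
            int cmp = Integer.compare(updates.get(u), existing.get(e));
            if (cmp < 0)
                System.out.println("update with nothing before: " + updates.get(u++));
            else if (cmp > 0)
                System.out.println("existing row with no update: " + existing.get(e++));
            else
                System.out.println("update to pre-existing row: " + updates.get(u++) + " / " + existing.get(e++));
        }
        while (u < updates.size())  // leftover updates are brand new rows
            System.out.println("update with nothing before: " + updates.get(u++));
    }

    public static void main(String[] args)
    {
        walk(Arrays.asList(1, 3, 5), Arrays.asList(2, 3, 6));
    }
}

The real method also walks any leftover existing rows, but only when the updates carry a partition deletion that could remove them, as the comments in the hunk explain.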

http://git-wip-us.apache.org/repos/asf/cassandra/blob/78a3d2bb/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/78a3d2bb/src/java/org/apache/cassandra/io/sstable/format/Version.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/format/Version.java
index 16829ab,41a83e1..d9e289c
--- a/src/java/org/apache/cassandra/io/sstable/format/Version.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/Version.java
@@@ -60,16 -52,8 +60,18 @@@ public abstract class Versio
  
      public abstract boolean hasNewFileName();
  
 +    public abstract boolean storeRows();
 +
 +    public abstract int correspondingMessagingVersion(); // Only used by storage formats that 'storeRows' so far
 +
 +    public abstract boolean hasOldBfHashOrder();
 +
 +    public abstract boolean hasCompactionAncestors();
 +
 +    public abstract boolean hasBoundaries();
 +
+     public abstract boolean hasCommitLogLowerBound();
+ 
      public String getVersion()
      {
          return version;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/78a3d2bb/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
index 1f2a98f,9244bbb..e0fb3b1
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
@@@ -111,7 -125,7 +111,7 @@@ public class BigFormat implements SSTab
      // we always incremented the major version.
      static class BigVersion extends Version
      {
-         public static final String current_version = "ma";
 -        public static final String current_version = "lb";
++        public static final String current_version = "mb";
          public static final String earliest_supported_version = "jb";
  
          // jb (2.0.1): switch from crc32 to adler32 for compression checksums
@@@ -121,10 -135,7 +121,12 @@@
          //             switch uncompressed checksums to adler32
          //             tracks presence of legacy (local and remote) counter shards
          // la (2.2.0): new file name format
+         // lb (2.2.7): commit log lower bound included
 +        // ma (3.0.0): swap bf hash order
 +        //             store rows natively
++        // mb (3.0.6): commit log lower bound included
 +        //
 +        // NOTE: when adding a new version, please add that to LegacySSTableTest, too.
  
          private final boolean isLatestVersion;
          private final boolean hasSamplingLevel;
@@@ -134,53 -144,20 +136,56 @@@
          private final boolean hasRepairedAt;
          private final boolean tracksLegacyCounterShards;
          private final boolean newFileName;
 +        public final boolean storeRows;
 +        public final int correspondingMessagingVersion; // Only used by storage formats that 'storeRows' so far
 +        public final boolean hasBoundaries;
 +        /**
 +         * CASSANDRA-8413: the 3.0 bloom filter representation changed (two longs just swapped), so it no longer
 +         * has 'static' bits caused by using the same upper bits for both the bloom filter and token distribution.
 +         */
 +        private final boolean hasOldBfHashOrder;
+         private final boolean hasCommitLogLowerBound;
  
 -        public BigVersion(String version)
 +        /**
 +         * CASSANDRA-7066: compaction ancestors are no longer used and have been removed.
 +         */
 +        private final boolean hasCompactionAncestors;
 +
 +        BigVersion(String version)
          {
 -            super(instance,version);
 +            super(instance, version);
  
              isLatestVersion = version.compareTo(current_version) == 0;
              hasSamplingLevel = version.compareTo("ka") >= 0;
              newStatsFile = version.compareTo("ka") >= 0;
 -            hasAllAdlerChecksums = version.compareTo("ka") >= 0;
 +
 +            // For a while Adler32 was in use; now the CRC32 intrinsic is very good, especially after Haswell.
 +            // PureJavaCRC32 was always faster than Adler32. See CASSANDRA-8684.
 +            ChecksumType checksumType = ChecksumType.CRC32;
 +            if (version.compareTo("ka") >= 0 && version.compareTo("ma") < 0)
 +                checksumType = ChecksumType.Adler32;
 +            this.uncompressedChecksumType = checksumType;
 +
 +            checksumType = ChecksumType.CRC32;
 +            if (version.compareTo("jb") >= 0 && version.compareTo("ma") < 0)
 +                checksumType = ChecksumType.Adler32;
 +            this.compressedChecksumType = checksumType;
 +
              hasRepairedAt = version.compareTo("ka") >= 0;
              tracksLegacyCounterShards = version.compareTo("ka") >= 0;
 +
              newFileName = version.compareTo("la") >= 0;
 -            hasCommitLogLowerBound = version.compareTo("lb") >= 0;
 +
 +            hasOldBfHashOrder = version.compareTo("ma") < 0;
 +            hasCompactionAncestors = version.compareTo("ma") < 0;
 +            storeRows = version.compareTo("ma") >= 0;
 +            correspondingMessagingVersion = storeRows
 +                                          ? MessagingService.VERSION_30
 +                                          : MessagingService.VERSION_21;
 +
 +            hasBoundaries = version.compareTo("ma") < 0;
++            hasCommitLogLowerBound = (version.compareTo("lb") >= 0 && version.compareTo("ma") < 0)
++                                     || version.compareTo("mb") >= 0;
          }
  
          @Override
@@@ -243,25 -202,12 +248,30 @@@
              return newFileName;
          }
  
+         public boolean hasCommitLogLowerBound()
+         {
+             return hasCommitLogLowerBound;
+         }
+ 
          @Override
 +        public boolean storeRows()
 +        {
 +            return storeRows;
 +        }
 +
 +        @Override
 +        public int correspondingMessagingVersion()
 +        {
 +            return correspondingMessagingVersion;
 +        }
 +
 +        @Override
 +        public boolean hasBoundaries()
 +        {
 +            return hasBoundaries;
 +        }
 +
 +        @Override
          public boolean isCompatible()
          {
              return version.compareTo(earliest_supported_version) >= 0 && version.charAt(0) <= current_version.charAt(0);
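
The feature flags in BigVersion reduce to lexicographic comparisons on the two-character sstable version string. For the new commit log lower bound, the hunk above makes it present for "lb" (2.2.7) and again from "mb" (3.0.6) onward, but absent for "ma" (3.0.0 - 3.0.5). A standalone restatement of that predicate, shown only to make the version ranges explicit:

public class VersionGateSketch
{
    // True from "lb" (2.2.7) up to but not including "ma", and again from "mb" (3.0.6) onward.
    static boolean hasCommitLogLowerBound(String version)
    {
        return (version.compareTo("lb") >= 0 && version.compareTo("ma") < 0)
               || version.compareTo("mb") >= 0;
    }

    public static void main(String[] args)
    {
        System.out.println(hasCommitLogLowerBound("la")); // false (2.2.0)
        System.out.println(hasCommitLogLowerBound("lb")); // true  (2.2.7)
        System.out.println(hasCommitLogLowerBound("ma")); // false (3.0.0 - 3.0.5)
        System.out.println(hasCommitLogLowerBound("mb")); // true  (3.0.6+)
    }
}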

http://git-wip-us.apache.org/repos/asf/cassandra/blob/78a3d2bb/src/java/org/apache/cassandra/io/sstable/metadata/CompactionMetadata.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/metadata/CompactionMetadata.java
index 4f4c04b,c8e6ee8..ef3453a
--- a/src/java/org/apache/cassandra/io/sstable/metadata/CompactionMetadata.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/CompactionMetadata.java
@@@ -72,14 -72,22 +72,23 @@@ public class CompactionMetadata extend
  
      public static class CompactionMetadataSerializer implements IMetadataComponentSerializer<CompactionMetadata>
      {
 -        public int serializedSize(CompactionMetadata component, Version version) throws IOException
 +        public int serializedSize(Version version, CompactionMetadata component) throws IOException
          {
 -            int size = 0;
 -            size += TypeSizes.NATIVE.sizeof(component.ancestors.size());
 -            for (int g : component.ancestors)
 -                size += TypeSizes.NATIVE.sizeof(g);
++            int sz = 0;
++            if (version.hasCompactionAncestors())
++            {   // write empty ancestor marker
++                sz = 4;
++            }
              byte[] serializedCardinality = component.cardinalityEstimator.getBytes();
-             return TypeSizes.sizeof(serializedCardinality.length) + serializedCardinality.length;
 -            size += TypeSizes.NATIVE.sizeof(serializedCardinality.length) + serializedCardinality.length;
 -            return size;
++            return TypeSizes.sizeof(serializedCardinality.length) + serializedCardinality.length + sz;
          }
  
 -        public void serialize(CompactionMetadata component, Version version, DataOutputPlus out) throws IOException
 +        public void serialize(Version version, CompactionMetadata component, DataOutputPlus out) throws IOException
          {
 -            out.writeInt(component.ancestors.size());
 -            for (int g : component.ancestors)
 -                out.writeInt(g);
++            if (version.hasCompactionAncestors())
++            {   // write empty ancestor marker
++                out.writeInt(0);
++            }
              ByteBufferUtil.writeWithLength(component.cardinalityEstimator.getBytes(), out);
          }
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/78a3d2bb/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java
index 2ed845c,bfeb930..4561520
--- a/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java
@@@ -53,24 -51,28 +53,26 @@@ public class LegacyMetadataSerializer e
  
          assert validation != null && stats != null && compaction != null && validation.partitioner != null;
  
 -        EstimatedHistogram.serializer.serialize(stats.estimatedRowSize, out);
 +        EstimatedHistogram.serializer.serialize(stats.estimatedPartitionSize, out);
          EstimatedHistogram.serializer.serialize(stats.estimatedColumnCount, out);
-         ReplayPosition.serializer.serialize(stats.replayPosition, out);
+         ReplayPosition.serializer.serialize(stats.commitLogUpperBound, out);
          out.writeLong(stats.minTimestamp);
          out.writeLong(stats.maxTimestamp);
          out.writeInt(stats.maxLocalDeletionTime);
          out.writeDouble(validation.bloomFilterFPChance);
          out.writeDouble(stats.compressionRatio);
          out.writeUTF(validation.partitioner);
 -        out.writeInt(compaction.ancestors.size());
 -        for (Integer g : compaction.ancestors)
 -            out.writeInt(g);
 +        out.writeInt(0); // compaction ancestors
          StreamingHistogram.serializer.serialize(stats.estimatedTombstoneDropTime, out);
          out.writeInt(stats.sstableLevel);
 -        out.writeInt(stats.minColumnNames.size());
 -        for (ByteBuffer columnName : stats.minColumnNames)
 -            ByteBufferUtil.writeWithShortLength(columnName, out);
 -        out.writeInt(stats.maxColumnNames.size());
 -        for (ByteBuffer columnName : stats.maxColumnNames)
 -            ByteBufferUtil.writeWithShortLength(columnName, out);
 +        out.writeInt(stats.minClusteringValues.size());
 +        for (ByteBuffer value : stats.minClusteringValues)
 +            ByteBufferUtil.writeWithShortLength(value, out);
 +        out.writeInt(stats.maxClusteringValues.size());
 +        for (ByteBuffer value : stats.maxClusteringValues)
 +            ByteBufferUtil.writeWithShortLength(value, out);
+         if (version.hasCommitLogLowerBound())
+             ReplayPosition.serializer.serialize(stats.commitLogLowerBound, out);
      }
  
      /**
@@@ -88,11 -90,12 +90,12 @@@
          }
          else
          {
 -            try (DataInputStream in = new DataInputStream(new BufferedInputStream(new FileInputStream(statsFile))))
 +            try (DataInputStreamPlus in = new DataInputStreamPlus(new BufferedInputStream(new FileInputStream(statsFile))))
              {
 -                EstimatedHistogram rowSizes = EstimatedHistogram.serializer.deserialize(in);
 +                EstimatedHistogram partitionSizes = EstimatedHistogram.serializer.deserialize(in);
                  EstimatedHistogram columnCounts = EstimatedHistogram.serializer.deserialize(in);
-                 ReplayPosition replayPosition = ReplayPosition.serializer.deserialize(in);
+                 ReplayPosition commitLogLowerBound = ReplayPosition.NONE;
+                 ReplayPosition commitLogUpperBound = ReplayPosition.serializer.deserialize(in);
                  long minTimestamp = in.readLong();
                  long maxTimestamp = in.readLong();
                  int maxLocalDeletionTime = in.readInt();
@@@ -115,21 -120,21 +118,25 @@@
                  List<ByteBuffer> maxColumnNames = new ArrayList<>(colCount);
                  for (int i = 0; i < colCount; i++)
                      maxColumnNames.add(ByteBufferUtil.readWithShortLength(in));
 +
+                 if (descriptor.version.hasCommitLogLowerBound())
+                     commitLogLowerBound = ReplayPosition.serializer.deserialize(in);
+ 
                  if (types.contains(MetadataType.VALIDATION))
                      components.put(MetadataType.VALIDATION,
                                     new ValidationMetadata(partitioner, bloomFilterFPChance));
                  if (types.contains(MetadataType.STATS))
                      components.put(MetadataType.STATS,
 -                                   new StatsMetadata(rowSizes,
 +                                   new StatsMetadata(partitionSizes,
                                                       columnCounts,
-                                                      replayPosition,
+                                                      commitLogLowerBound,
+                                                      commitLogUpperBound,
                                                       minTimestamp,
                                                       maxTimestamp,
 +                                                     Integer.MAX_VALUE,
                                                       maxLocalDeletionTime,
 +                                                     0,
 +                                                     Integer.MAX_VALUE,
                                                       compressionRatio,
                                                       tombstoneHistogram,
                                                       sstableLevel,
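
The deserialization change above follows the usual compatibility pattern: the new commitLogLowerBound field is appended after the legacy stats layout and is only read back when the sstable's version says it was written, with ReplayPosition.NONE as the fallback for older files. A self-contained sketch of that "append a trailer, gate the read on a version flag" pattern, using plain ints as hypothetical stand-ins for the real fields:

import java.io.*;

public class VersionedTrailerSketch
{
    // Write: legacy fields first, then (optionally) the new trailer field.
    static byte[] write(int legacyValue, Integer newField) throws IOException
    {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeInt(legacyValue);
        if (newField != null)
            out.writeInt(newField);  // appended at the very end, like commitLogLowerBound
        return bytes.toByteArray();
    }

    // Read: only consume the trailer when the format version says it is there.
    static int readTrailer(byte[] data, boolean hasTrailer) throws IOException
    {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
        in.readInt();                           // legacy field
        return hasTrailer ? in.readInt() : -1;  // -1 standing in for the ReplayPosition.NONE fallback
    }

    public static void main(String[] args) throws IOException
    {
        System.out.println(readTrailer(write(42, 7), true));     // 7
        System.out.println(readTrailer(write(42, null), false)); // -1 (old file, sentinel)
    }
}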

http://git-wip-us.apache.org/repos/asf/cassandra/blob/78a3d2bb/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
index c2b0caf,579ff7a..53cf0b0
--- a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
@@@ -26,16 -26,17 +26,16 @@@ import java.util.List
  import java.util.Map;
  import java.util.Set;
  
 -import com.google.common.collect.ImmutableList;
  import com.google.common.collect.Maps;
+ import com.google.common.collect.Ordering;
  
  import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
  import com.clearspring.analytics.stream.cardinality.ICardinality;
 +import org.apache.cassandra.db.*;
  import org.apache.cassandra.db.commitlog.ReplayPosition;
 -import org.apache.cassandra.db.composites.CellNameType;
 -import org.apache.cassandra.io.sstable.ColumnNameHelper;
 -import org.apache.cassandra.io.sstable.ColumnStats;
 -import org.apache.cassandra.io.sstable.Component;
 +import org.apache.cassandra.db.marshal.AbstractType;
 +import org.apache.cassandra.db.partitions.PartitionStatisticsCollector;
 +import org.apache.cassandra.db.rows.Cell;
- import org.apache.cassandra.io.sstable.Component;
  import org.apache.cassandra.io.sstable.SSTable;
  import org.apache.cassandra.io.sstable.format.SSTableReader;
  import org.apache.cassandra.service.ActiveRepairService;
@@@ -67,9 -67,10 +67,10 @@@ public class MetadataCollector implemen
  
      public static StatsMetadata defaultStatsMetadata()
      {
 -        return new StatsMetadata(defaultRowSizeHistogram(),
 -                                 defaultColumnCountHistogram(),
 +        return new StatsMetadata(defaultPartitionSizeHistogram(),
 +                                 defaultCellPerPartitionCountHistogram(),
                                   ReplayPosition.NONE,
+                                  ReplayPosition.NONE,
                                   Long.MIN_VALUE,
                                   Long.MAX_VALUE,
                                   Integer.MAX_VALUE,
@@@ -82,26 -80,23 +83,27 @@@
                                   Collections.<ByteBuffer>emptyList(),
                                   Collections.<ByteBuffer>emptyList(),
                                   true,
 -                                 ActiveRepairService.UNREPAIRED_SSTABLE);
 +                                 ActiveRepairService.UNREPAIRED_SSTABLE,
 +                                 -1,
 +                                 -1);
      }
  
 -    protected EstimatedHistogram estimatedRowSize = defaultRowSizeHistogram();
 -    protected EstimatedHistogram estimatedColumnCount = defaultColumnCountHistogram();
 +    protected EstimatedHistogram estimatedPartitionSize = defaultPartitionSizeHistogram();
 +    // TODO: count the number of rows per partition (either with the number of cells, or instead)
 +    protected EstimatedHistogram estimatedCellPerPartitionCount = defaultCellPerPartitionCountHistogram();
-     protected ReplayPosition replayPosition = ReplayPosition.NONE;
+     protected ReplayPosition commitLogLowerBound = ReplayPosition.NONE;
+     protected ReplayPosition commitLogUpperBound = ReplayPosition.NONE;
 -    protected long minTimestamp = Long.MAX_VALUE;
 -    protected long maxTimestamp = Long.MIN_VALUE;
 -    protected int maxLocalDeletionTime = Integer.MIN_VALUE;
 +    protected final MinMaxLongTracker timestampTracker = new MinMaxLongTracker();
 +    protected final MinMaxIntTracker localDeletionTimeTracker = new MinMaxIntTracker(Cell.NO_DELETION_TIME, Cell.NO_DELETION_TIME);
 +    protected final MinMaxIntTracker ttlTracker = new MinMaxIntTracker(Cell.NO_TTL, Cell.NO_TTL);
      protected double compressionRatio = NO_COMPRESSION_RATIO;
 -    protected Set<Integer> ancestors = new HashSet<>();
      protected StreamingHistogram estimatedTombstoneDropTime = defaultTombstoneDropTimeHistogram();
      protected int sstableLevel;
 -    protected List<ByteBuffer> minColumnNames = Collections.emptyList();
 -    protected List<ByteBuffer> maxColumnNames = Collections.emptyList();
 +    protected ByteBuffer[] minClusteringValues;
 +    protected ByteBuffer[] maxClusteringValues;
      protected boolean hasLegacyCounterShards = false;
 +    protected long totalColumnsSet;
 +    protected long totalRows;
  
      /**
       * Default cardinality estimation method is to use HyperLogLog++.
@@@ -110,22 -105,52 +112,38 @@@
       * See CASSANDRA-5906 for detail.
       */
      protected ICardinality cardinality = new HyperLogLogPlus(13, 25);
 -    private final CellNameType columnNameComparator;
 +    private final ClusteringComparator comparator;
  
 -    public MetadataCollector(CellNameType columnNameComparator)
 +    public MetadataCollector(ClusteringComparator comparator)
      {
 -        this.columnNameComparator = columnNameComparator;
 +        this.comparator = comparator;
 +
 +        this.minClusteringValues = new ByteBuffer[comparator.size()];
 +        this.maxClusteringValues = new ByteBuffer[comparator.size()];
      }
  
 -    public MetadataCollector(Iterable<SSTableReader> sstables, CellNameType columnNameComparator, int level, boolean skipAncestors)
 +    public MetadataCollector(Iterable<SSTableReader> sstables, ClusteringComparator comparator, int level)
      {
 -        this(columnNameComparator);
 +        this(comparator);
  
-         replayPosition(ReplayPosition.getReplayPosition(sstables));
+         ReplayPosition min = null, max = null;
+         for (SSTableReader sstable : sstables)
+         {
+             if (min == null)
+             {
+                 min = sstable.getSSTableMetadata().commitLogLowerBound;
+                 max = sstable.getSSTableMetadata().commitLogUpperBound;
+             }
+             else
+             {
+                 min = Ordering.natural().min(min, sstable.getSSTableMetadata().commitLogLowerBound);
+                 max = Ordering.natural().max(max, sstable.getSSTableMetadata().commitLogUpperBound);
+             }
+         }
+ 
+         commitLogLowerBound(min);
+         commitLogUpperBound(max);
          sstableLevel(level);
 -        // Get the max timestamp of the precompacted sstables
 -        // and adds generation of live ancestors
 -        if (!skipAncestors)
 -        {
 -            for (SSTableReader sstable : sstables)
 -            {
 -                addAncestor(sstable.descriptor.generation);
 -                for (Integer i : sstable.getAncestors())
 -                    if (new File(sstable.descriptor.withGeneration(i).filenameFor(Component.DATA)).exists())
 -                        addAncestor(i);
 -            }
 -        }
 -    }
 -
 -    public MetadataCollector(Iterable<SSTableReader> sstables, CellNameType columnNameComparator, int level)
 -    {
 -        this(sstables, columnNameComparator, level, false);
      }
  
      public MetadataCollector addKey(ByteBuffer key)
@@@ -163,57 -188,45 +181,63 @@@
          return this;
      }
  
 -    public MetadataCollector updateMinTimestamp(long potentialMin)
 +    public void update(LivenessInfo newInfo)
      {
 -        minTimestamp = Math.min(minTimestamp, potentialMin);
 -        return this;
 +        if (newInfo.isEmpty())
 +            return;
 +
 +        updateTimestamp(newInfo.timestamp());
 +        updateTTL(newInfo.ttl());
 +        updateLocalDeletionTime(newInfo.localExpirationTime());
      }
  
 -    public MetadataCollector updateMaxTimestamp(long potentialMax)
 +    public void update(Cell cell)
      {
 -        maxTimestamp = Math.max(maxTimestamp, potentialMax);
 -        return this;
 +        updateTimestamp(cell.timestamp());
 +        updateTTL(cell.ttl());
 +        updateLocalDeletionTime(cell.localDeletionTime());
      }
  
 -    public MetadataCollector updateMaxLocalDeletionTime(int maxLocalDeletionTime)
 +    public void update(DeletionTime dt)
      {
 -        this.maxLocalDeletionTime = Math.max(this.maxLocalDeletionTime, maxLocalDeletionTime);
 -        return this;
 +        if (!dt.isLive())
 +        {
 +            updateTimestamp(dt.markedForDeleteAt());
 +            updateLocalDeletionTime(dt.localDeletionTime());
 +        }
      }
  
 -    public MetadataCollector estimatedRowSize(EstimatedHistogram estimatedRowSize)
 +    public void updateColumnSetPerRow(long columnSetInRow)
      {
 -        this.estimatedRowSize = estimatedRowSize;
 -        return this;
 +        totalColumnsSet += columnSetInRow;
 +        ++totalRows;
      }
  
 -    public MetadataCollector estimatedColumnCount(EstimatedHistogram estimatedColumnCount)
 +    private void updateTimestamp(long newTimestamp)
      {
 -        this.estimatedColumnCount = estimatedColumnCount;
 -        return this;
 +        timestampTracker.update(newTimestamp);
 +    }
 +
 +    private void updateLocalDeletionTime(int newLocalDeletionTime)
 +    {
 +        localDeletionTimeTracker.update(newLocalDeletionTime);
 +        estimatedTombstoneDropTime.update(newLocalDeletionTime);
 +    }
 +
 +    private void updateTTL(int newTTL)
 +    {
 +        ttlTracker.update(newTTL);
      }
  
-     public MetadataCollector replayPosition(ReplayPosition replayPosition)
+     public MetadataCollector commitLogLowerBound(ReplayPosition commitLogLowerBound)
+     {
+         this.commitLogLowerBound = commitLogLowerBound;
+         return this;
+     }
+ 
+     public MetadataCollector commitLogUpperBound(ReplayPosition commitLogUpperBound)
      {
-         this.replayPosition = replayPosition;
+         this.commitLogUpperBound = commitLogUpperBound;
          return this;
      }
  
@@@ -276,15 -280,13 +300,16 @@@
      {
          Map<MetadataType, MetadataComponent> components = Maps.newHashMap();
          components.put(MetadataType.VALIDATION, new ValidationMetadata(partitioner, bloomFilterFPChance));
 -        components.put(MetadataType.STATS, new StatsMetadata(estimatedRowSize,
 -                                                             estimatedColumnCount,
 +        components.put(MetadataType.STATS, new StatsMetadata(estimatedPartitionSize,
 +                                                             estimatedCellPerPartitionCount,
-                                                              replayPosition,
+                                                              commitLogLowerBound,
+                                                              commitLogUpperBound,
 -                                                             minTimestamp,
 -                                                             maxTimestamp,
 -                                                             maxLocalDeletionTime,
 +                                                             timestampTracker.min(),
 +                                                             timestampTracker.max(),
 +                                                             localDeletionTimeTracker.min(),
 +                                                             localDeletionTimeTracker.max(),
 +                                                             ttlTracker.min(),
 +                                                             ttlTracker.max(),
                                                               compressionRatio,
                                                               estimatedTombstoneDropTime,
                                                               sstableLevel,
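
Note on the MetadataCollector change above: the merged constructor replaces the single replayPosition taken from the pre-compaction sstables with a [commitLogLowerBound, commitLogUpperBound] pair, computed as the natural-order minimum of the inputs' lower bounds and maximum of their upper bounds. A minimal, self-contained sketch of that merge rule follows; the Position and Bounds types and the sample values are hypothetical stand-ins for ReplayPosition and the StatsMetadata bound fields, not Cassandra classes.

    import java.util.List;

    public class CommitLogBoundsSketch
    {
        // Hypothetical stand-in for ReplayPosition: ordered by (segmentId, position).
        record Position(long segmentId, int position) implements Comparable<Position>
        {
            public int compareTo(Position other)
            {
                int cmp = Long.compare(segmentId, other.segmentId);
                return cmp != 0 ? cmp : Integer.compare(position, other.position);
            }
        }

        // Hypothetical stand-in for the commitLogLowerBound/commitLogUpperBound pair.
        record Bounds(Position lower, Position upper) {}

        // Mirrors the constructor loop: the merged output's bounds are the minimum lower
        // bound and the maximum upper bound across every input sstable.
        static Bounds merge(List<Bounds> inputs)
        {
            Position min = null, max = null;
            for (Bounds b : inputs)
            {
                min = (min == null || b.lower().compareTo(min) < 0) ? b.lower() : min;
                max = (max == null || b.upper().compareTo(max) > 0) ? b.upper() : max;
            }
            return new Bounds(min, max);
        }

        public static void main(String[] args)
        {
            Bounds merged = merge(List.of(
                new Bounds(new Position(1, 100), new Position(2, 50)),
                new Bounds(new Position(0, 900), new Position(3, 10))));
            // Widest covering interval: lower = (0, 900), upper = (3, 10)
            System.out.println(merged);
        }
    }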

http://git-wip-us.apache.org/repos/asf/cassandra/blob/78a3d2bb/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
index 824bf8b,3d48e34..07e35bb
--- a/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
@@@ -40,53 -41,43 +40,56 @@@ public class StatsMetadata extends Meta
  {
      public static final IMetadataComponentSerializer serializer = new StatsMetadataSerializer();
  
 -    public final EstimatedHistogram estimatedRowSize;
 +    public final EstimatedHistogram estimatedPartitionSize;
      public final EstimatedHistogram estimatedColumnCount;
-     public final ReplayPosition replayPosition;
+     public final ReplayPosition commitLogLowerBound;
+     public final ReplayPosition commitLogUpperBound;
      public final long minTimestamp;
      public final long maxTimestamp;
 +    public final int minLocalDeletionTime;
      public final int maxLocalDeletionTime;
 +    public final int minTTL;
 +    public final int maxTTL;
      public final double compressionRatio;
      public final StreamingHistogram estimatedTombstoneDropTime;
      public final int sstableLevel;
 -    public final List<ByteBuffer> maxColumnNames;
 -    public final List<ByteBuffer> minColumnNames;
 +    public final List<ByteBuffer> minClusteringValues;
 +    public final List<ByteBuffer> maxClusteringValues;
      public final boolean hasLegacyCounterShards;
      public final long repairedAt;
 +    public final long totalColumnsSet;
 +    public final long totalRows;
  
 -    public StatsMetadata(EstimatedHistogram estimatedRowSize,
 +    public StatsMetadata(EstimatedHistogram estimatedPartitionSize,
                           EstimatedHistogram estimatedColumnCount,
-                          ReplayPosition replayPosition,
+                          ReplayPosition commitLogLowerBound,
+                          ReplayPosition commitLogUpperBound,
                           long minTimestamp,
                           long maxTimestamp,
 +                         int minLocalDeletionTime,
                           int maxLocalDeletionTime,
 +                         int minTTL,
 +                         int maxTTL,
                           double compressionRatio,
                           StreamingHistogram estimatedTombstoneDropTime,
                           int sstableLevel,
 -                         List<ByteBuffer> minColumnNames,
 -                         List<ByteBuffer> maxColumnNames,
 +                         List<ByteBuffer> minClusteringValues,
 +                         List<ByteBuffer> maxClusteringValues,
                           boolean hasLegacyCounterShards,
 -                         long repairedAt)
 +                         long repairedAt,
 +                         long totalColumnsSet,
 +                         long totalRows)
      {
 -        this.estimatedRowSize = estimatedRowSize;
 +        this.estimatedPartitionSize = estimatedPartitionSize;
          this.estimatedColumnCount = estimatedColumnCount;
-         this.replayPosition = replayPosition;
+         this.commitLogLowerBound = commitLogLowerBound;
+         this.commitLogUpperBound = commitLogUpperBound;
          this.minTimestamp = minTimestamp;
          this.maxTimestamp = maxTimestamp;
 +        this.minLocalDeletionTime = minLocalDeletionTime;
          this.maxLocalDeletionTime = maxLocalDeletionTime;
 +        this.minTTL = minTTL;
 +        this.maxTTL = maxTTL;
          this.compressionRatio = compressionRatio;
          this.estimatedTombstoneDropTime = estimatedTombstoneDropTime;
          this.sstableLevel = sstableLevel;
@@@ -129,15 -118,13 +132,16 @@@
  
      public StatsMetadata mutateLevel(int newLevel)
      {
 -        return new StatsMetadata(estimatedRowSize,
 +        return new StatsMetadata(estimatedPartitionSize,
                                   estimatedColumnCount,
-                                  replayPosition,
+                                  commitLogLowerBound,
+                                  commitLogUpperBound,
                                   minTimestamp,
                                   maxTimestamp,
 +                                 minLocalDeletionTime,
                                   maxLocalDeletionTime,
 +                                 minTTL,
 +                                 maxTTL,
                                   compressionRatio,
                                   estimatedTombstoneDropTime,
                                   newLevel,
@@@ -151,15 -136,13 +155,16 @@@
  
      public StatsMetadata mutateRepairedAt(long newRepairedAt)
      {
 -        return new StatsMetadata(estimatedRowSize,
 +        return new StatsMetadata(estimatedPartitionSize,
                                   estimatedColumnCount,
-                                  replayPosition,
+                                  commitLogLowerBound,
+                                  commitLogUpperBound,
                                   minTimestamp,
                                   maxTimestamp,
 +                                 minLocalDeletionTime,
                                   maxLocalDeletionTime,
 +                                 minTTL,
 +                                 maxTTL,
                                   compressionRatio,
                                   estimatedTombstoneDropTime,
                                   sstableLevel,
@@@ -179,15 -160,13 +184,16 @@@
  
          StatsMetadata that = (StatsMetadata) o;
          return new EqualsBuilder()
 -                       .append(estimatedRowSize, that.estimatedRowSize)
 +                       .append(estimatedPartitionSize, that.estimatedPartitionSize)
                         .append(estimatedColumnCount, that.estimatedColumnCount)
-                        .append(replayPosition, that.replayPosition)
+                        .append(commitLogLowerBound, that.commitLogLowerBound)
+                        .append(commitLogUpperBound, that.commitLogUpperBound)
                         .append(minTimestamp, that.minTimestamp)
                         .append(maxTimestamp, that.maxTimestamp)
 +                       .append(minLocalDeletionTime, that.minLocalDeletionTime)
                         .append(maxLocalDeletionTime, that.maxLocalDeletionTime)
 +                       .append(minTTL, that.minTTL)
 +                       .append(maxTTL, that.maxTTL)
                         .append(compressionRatio, that.compressionRatio)
                         .append(estimatedTombstoneDropTime, that.estimatedTombstoneDropTime)
                         .append(sstableLevel, that.sstableLevel)
@@@ -204,15 -181,13 +210,16 @@@
      public int hashCode()
      {
          return new HashCodeBuilder()
 -                       .append(estimatedRowSize)
 +                       .append(estimatedPartitionSize)
                         .append(estimatedColumnCount)
-                        .append(replayPosition)
+                        .append(commitLogLowerBound)
+                        .append(commitLogUpperBound)
                         .append(minTimestamp)
                         .append(maxTimestamp)
 +                       .append(minLocalDeletionTime)
                         .append(maxLocalDeletionTime)
 +                       .append(minTTL)
 +                       .append(maxTTL)
                         .append(compressionRatio)
                         .append(estimatedTombstoneDropTime)
                         .append(sstableLevel)
@@@ -227,78 -200,61 +234,84 @@@
  
      public static class StatsMetadataSerializer implements IMetadataComponentSerializer<StatsMetadata>
      {
 -        public int serializedSize(StatsMetadata component, Version version) throws IOException
 +        public int serializedSize(Version version, StatsMetadata component) throws IOException
          {
              int size = 0;
 -            size += EstimatedHistogram.serializer.serializedSize(component.estimatedRowSize, TypeSizes.NATIVE);
 -            size += EstimatedHistogram.serializer.serializedSize(component.estimatedColumnCount, TypeSizes.NATIVE);
 -            size += ReplayPosition.serializer.serializedSize(component.commitLogUpperBound, TypeSizes.NATIVE);
 -            size += 8 + 8 + 4 + 8 + 8; // mix/max timestamp(long), maxLocalDeletionTime(int), compressionRatio(double), repairedAt (long)
 -            size += StreamingHistogram.serializer.serializedSize(component.estimatedTombstoneDropTime, TypeSizes.NATIVE);
 -            size += TypeSizes.NATIVE.sizeof(component.sstableLevel);
 +            size += EstimatedHistogram.serializer.serializedSize(component.estimatedPartitionSize);
 +            size += EstimatedHistogram.serializer.serializedSize(component.estimatedColumnCount);
-             size += ReplayPosition.serializer.serializedSize(component.replayPosition);
++            size += ReplayPosition.serializer.serializedSize(component.commitLogUpperBound);
 +            if (version.storeRows())
 +                size += 8 + 8 + 4 + 4 + 4 + 4 + 8 + 8; // min/max timestamp(long), min/maxLocalDeletionTime(int), min/max TTL, compressionRatio(double), repairedAt (long)
 +            else
 +                size += 8 + 8 + 4 + 8 + 8; // min/max timestamp(long), maxLocalDeletionTime(int), compressionRatio(double), repairedAt (long)
 +            size += StreamingHistogram.serializer.serializedSize(component.estimatedTombstoneDropTime);
 +            size += TypeSizes.sizeof(component.sstableLevel);
              // min column names
              size += 4;
 -            for (ByteBuffer columnName : component.minColumnNames)
 -                size += 2 + columnName.remaining(); // with short length
 +            for (ByteBuffer value : component.minClusteringValues)
 +                size += 2 + value.remaining(); // with short length
              // max column names
              size += 4;
 -            for (ByteBuffer columnName : component.maxColumnNames)
 -                size += 2 + columnName.remaining(); // with short length
 -            size += TypeSizes.NATIVE.sizeof(component.hasLegacyCounterShards);
 +            for (ByteBuffer value : component.maxClusteringValues)
 +                size += 2 + value.remaining(); // with short length
 +            size += TypeSizes.sizeof(component.hasLegacyCounterShards);
 +            if (version.storeRows())
 +                size += 8 + 8; // totalColumnsSet, totalRows
+             if (version.hasCommitLogLowerBound())
 -                size += ReplayPosition.serializer.serializedSize(component.commitLogLowerBound, TypeSizes.NATIVE);
++                size += ReplayPosition.serializer.serializedSize(component.commitLogLowerBound);
              return size;
          }
  
 -        public void serialize(StatsMetadata component, Version version, DataOutputPlus out) throws IOException
 +        public void serialize(Version version, StatsMetadata component, DataOutputPlus out) throws IOException
          {
 -            EstimatedHistogram.serializer.serialize(component.estimatedRowSize, out);
 +            EstimatedHistogram.serializer.serialize(component.estimatedPartitionSize, out);
              EstimatedHistogram.serializer.serialize(component.estimatedColumnCount, out);
-             ReplayPosition.serializer.serialize(component.replayPosition, out);
+             ReplayPosition.serializer.serialize(component.commitLogUpperBound, out);
              out.writeLong(component.minTimestamp);
              out.writeLong(component.maxTimestamp);
 +            if (version.storeRows())
 +                out.writeInt(component.minLocalDeletionTime);
              out.writeInt(component.maxLocalDeletionTime);
 +            if (version.storeRows())
 +            {
 +                out.writeInt(component.minTTL);
 +                out.writeInt(component.maxTTL);
 +            }
              out.writeDouble(component.compressionRatio);
              StreamingHistogram.serializer.serialize(component.estimatedTombstoneDropTime, out);
              out.writeInt(component.sstableLevel);
              out.writeLong(component.repairedAt);
 -            out.writeInt(component.minColumnNames.size());
 -            for (ByteBuffer columnName : component.minColumnNames)
 -                ByteBufferUtil.writeWithShortLength(columnName, out);
 -            out.writeInt(component.maxColumnNames.size());
 -            for (ByteBuffer columnName : component.maxColumnNames)
 -                ByteBufferUtil.writeWithShortLength(columnName, out);
 +            out.writeInt(component.minClusteringValues.size());
 +            for (ByteBuffer value : component.minClusteringValues)
 +                ByteBufferUtil.writeWithShortLength(value, out);
 +            out.writeInt(component.maxClusteringValues.size());
 +            for (ByteBuffer value : component.maxClusteringValues)
 +                ByteBufferUtil.writeWithShortLength(value, out);
              out.writeBoolean(component.hasLegacyCounterShards);
 +
 +            if (version.storeRows())
 +            {
 +                out.writeLong(component.totalColumnsSet);
 +                out.writeLong(component.totalRows);
 +            }
++
+             if (version.hasCommitLogLowerBound())
+                 ReplayPosition.serializer.serialize(component.commitLogLowerBound, out);
          }
  
 -        public StatsMetadata deserialize(Version version, DataInput in) throws IOException
 +        public StatsMetadata deserialize(Version version, DataInputPlus in) throws IOException
          {
 -            EstimatedHistogram rowSizes = EstimatedHistogram.serializer.deserialize(in);
 +            EstimatedHistogram partitionSizes = EstimatedHistogram.serializer.deserialize(in);
              EstimatedHistogram columnCounts = EstimatedHistogram.serializer.deserialize(in);
-             ReplayPosition replayPosition = ReplayPosition.serializer.deserialize(in);
+             ReplayPosition commitLogLowerBound = ReplayPosition.NONE, commitLogUpperBound;
+             commitLogUpperBound = ReplayPosition.serializer.deserialize(in);
              long minTimestamp = in.readLong();
              long maxTimestamp = in.readLong();
 +            // We use MAX_VALUE as that's the default value for "no deletion time"
 +            int minLocalDeletionTime = version.storeRows() ? in.readInt() : Integer.MAX_VALUE;
              int maxLocalDeletionTime = in.readInt();
 +            int minTTL = version.storeRows() ? in.readInt() : 0;
 +            int maxTTL = version.storeRows() ? in.readInt() : Integer.MAX_VALUE;
              double compressionRatio = in.readDouble();
              StreamingHistogram tombstoneHistogram = StreamingHistogram.serializer.deserialize(in);
              int sstableLevel = in.readInt();
@@@ -320,18 -276,15 +333,22 @@@
              if (version.tracksLegacyCounterShards())
                  hasLegacyCounterShards = in.readBoolean();
  
 +            long totalColumnsSet = version.storeRows() ? in.readLong() : -1L;
 +            long totalRows = version.storeRows() ? in.readLong() : -1L;
 +
+             if (version.hasCommitLogLowerBound())
+                 commitLogLowerBound = ReplayPosition.serializer.deserialize(in);
 -            return new StatsMetadata(rowSizes,
++
 +            return new StatsMetadata(partitionSizes,
                                       columnCounts,
-                                      replayPosition,
+                                      commitLogLowerBound,
+                                      commitLogUpperBound,
                                       minTimestamp,
                                       maxTimestamp,
 +                                     minLocalDeletionTime,
                                       maxLocalDeletionTime,
 +                                     minTTL,
 +                                     maxTTL,
                                       compressionRatio,
                                       tombstoneHistogram,
                                       sstableLevel,
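
Note on the StatsMetadataSerializer change above: every field added in this merge is gated on the sstable format version. The min/max localDeletionTime, min/max TTL and totalColumnsSet/totalRows are written and read only when version.storeRows(), commitLogLowerBound is appended at the end only when version.hasCommitLogLowerBound(), and deserialization substitutes defaults (for example Integer.MAX_VALUE for "no deletion time") when reading older sstables. A minimal, self-contained sketch of that pattern follows, using a simplified hypothetical Version interface and a reduced field set rather than the real classes.

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInput;
    import java.io.DataInputStream;
    import java.io.DataOutput;
    import java.io.DataOutputStream;
    import java.io.IOException;

    public class VersionGatedStatsSketch
    {
        // Simplified, hypothetical stand-in for the sstable format version flags.
        interface Version
        {
            boolean storeRows();              // newer formats carry the added row-level stats
            boolean hasCommitLogLowerBound(); // still newer formats also carry the lower bound
        }

        // A few representative fields; the real StatsMetadata carries many more.
        record Stats(long minTimestamp, long maxTimestamp,
                     int minLocalDeletionTime, int maxLocalDeletionTime,
                     long commitLogLowerBound) {}

        static void serialize(Version version, Stats s, DataOutput out) throws IOException
        {
            out.writeLong(s.minTimestamp());
            out.writeLong(s.maxTimestamp());
            if (version.storeRows())
                out.writeInt(s.minLocalDeletionTime());  // new field: written only for new formats
            out.writeInt(s.maxLocalDeletionTime());
            if (version.hasCommitLogLowerBound())
                out.writeLong(s.commitLogLowerBound());  // appended last for compatibility
        }

        static Stats deserialize(Version version, DataInput in) throws IOException
        {
            long minTimestamp = in.readLong();
            long maxTimestamp = in.readLong();
            // Older formats never wrote a minimum; substitute the "no deletion time" default.
            int minLocalDeletionTime = version.storeRows() ? in.readInt() : Integer.MAX_VALUE;
            int maxLocalDeletionTime = in.readInt();
            long commitLogLowerBound = version.hasCommitLogLowerBound() ? in.readLong() : 0L;
            return new Stats(minTimestamp, maxTimestamp, minLocalDeletionTime, maxLocalDeletionTime, commitLogLowerBound);
        }

        public static void main(String[] args) throws IOException
        {
            Version legacy = new Version()
            {
                public boolean storeRows() { return false; }
                public boolean hasCommitLogLowerBound() { return false; }
            };
            Stats written = new Stats(1L, 9L, 0, 5, 0L);
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            serialize(legacy, written, new DataOutputStream(buf));
            Stats read = deserialize(legacy, new DataInputStream(new ByteArrayInputStream(buf.toByteArray())));
            // minLocalDeletionTime reads back as Integer.MAX_VALUE: legacy formats never stored it.
            System.out.println(read);
        }
    }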

http://git-wip-us.apache.org/repos/asf/cassandra/blob/78a3d2bb/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/78a3d2bb/test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust/mb-1-big-CompressionInfo.db
----------------------------------------------------------------------
diff --cc test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust/mb-1-big-CompressionInfo.db
index 0000000,0000000..e32d12f
new file mode 100644
Binary files differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/78a3d2bb/test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust/mb-1-big-Data.db
----------------------------------------------------------------------
diff --cc test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust/mb-1-big-Data.db
index 0000000,0000000..790cbe1
new file mode 100644
Binary files differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/78a3d2bb/test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust/mb-1-big-Digest.crc32
----------------------------------------------------------------------
diff --cc test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust/mb-1-big-Digest.crc32
index 0000000,0000000..66d1d34
new file mode 100644
--- /dev/null
+++ b/test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust/mb-1-big-Digest.crc32
@@@ -1,0 -1,0 +1,1 @@@
++163939259

http://git-wip-us.apache.org/repos/asf/cassandra/blob/78a3d2bb/test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust/mb-1-big-Filter.db
----------------------------------------------------------------------
diff --cc test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust/mb-1-big-Filter.db
index 0000000,0000000..2e1d5d2
new file mode 100644
Binary files differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/78a3d2bb/test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust/mb-1-big-Index.db
----------------------------------------------------------------------
diff --cc test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust/mb-1-big-Index.db
index 0000000,0000000..38d0537
new file mode 100644
Binary files differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/78a3d2bb/test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust/mb-1-big-Statistics.db
----------------------------------------------------------------------
diff --cc test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust/mb-1-big-Statistics.db
index 0000000,0000000..a53b7c1
new file mode 100644
Binary files differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/78a3d2bb/test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust/mb-1-big-Summary.db
----------------------------------------------------------------------
diff --cc test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust/mb-1-big-Summary.db
index 0000000,0000000..9b24e04
new file mode 100644
Binary files differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/78a3d2bb/test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust/mb-1-big-TOC.txt
----------------------------------------------------------------------
diff --cc test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust/mb-1-big-TOC.txt
index 0000000,0000000..fe9581f
new file mode 100644
--- /dev/null
+++ b/test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust/mb-1-big-TOC.txt
@@@ -1,0 -1,0 +1,8 @@@
++CompressionInfo.db
++Summary.db
++TOC.txt
++Digest.crc32
++Filter.db
++Statistics.db
++Data.db
++Index.db

http://git-wip-us.apache.org/repos/asf/cassandra/blob/78a3d2bb/test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_compact/mb-1-big-CompressionInfo.db
----------------------------------------------------------------------
diff --cc test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_compact/mb-1-big-CompressionInfo.db
index 0000000,0000000..e3d4377
new file mode 100644
Binary files differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/78a3d2bb/test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_compact/mb-1-big-Data.db
----------------------------------------------------------------------
diff --cc test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_compact/mb-1-big-Data.db
index 0000000,0000000..c5aa7ce
new file mode 100644
Binary files differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/78a3d2bb/test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_compact/mb-1-big-Digest.crc32
----------------------------------------------------------------------
diff --cc test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_compact/mb-1-big-Digest.crc32
index 0000000,0000000..4e3bf89
new file mode 100644
--- /dev/null
+++ b/test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_compact/mb-1-big-Digest.crc32
@@@ -1,0 -1,0 +1,1 @@@
++3387396134

http://git-wip-us.apache.org/repos/asf/cassandra/blob/78a3d2bb/test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_compact/mb-1-big-Filter.db
----------------------------------------------------------------------
diff --cc test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_compact/mb-1-big-Filter.db
index 0000000,0000000..2e1d5d2
new file mode 100644
Binary files differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/78a3d2bb/test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_compact/mb-1-big-Index.db
----------------------------------------------------------------------
diff --cc test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_compact/mb-1-big-Index.db
index 0000000,0000000..54afd0c
new file mode 100644
Binary files differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/78a3d2bb/test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_compact/mb-1-big-Statistics.db
----------------------------------------------------------------------
diff --cc test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_compact/mb-1-big-Statistics.db
index 0000000,0000000..9d9b085
new file mode 100644
Binary files differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/78a3d2bb/test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_compact/mb-1-big-Summary.db
----------------------------------------------------------------------
diff --cc test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_compact/mb-1-big-Summary.db
index 0000000,0000000..9b24e04
new file mode 100644
Binary files differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/78a3d2bb/test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_compact/mb-1-big-TOC.txt
----------------------------------------------------------------------
diff --cc test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_compact/mb-1-big-TOC.txt
index 0000000,0000000..fe9581f
new file mode 100644
--- /dev/null
+++ b/test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_compact/mb-1-big-TOC.txt
@@@ -1,0 -1,0 +1,8 @@@
++CompressionInfo.db
++Summary.db
++TOC.txt
++Digest.crc32
++Filter.db
++Statistics.db
++Data.db
++Index.db

http://git-wip-us.apache.org/repos/asf/cassandra/blob/78a3d2bb/test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_counter/mb-1-big-CompressionInfo.db
----------------------------------------------------------------------
diff --cc test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_counter/mb-1-big-CompressionInfo.db
index 0000000,0000000..5a34d6e
new file mode 100644
Binary files differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/78a3d2bb/test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_counter/mb-1-big-Data.db
----------------------------------------------------------------------
diff --cc test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_counter/mb-1-big-Data.db
index 0000000,0000000..55ad88d
new file mode 100644
Binary files differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/78a3d2bb/test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_counter/mb-1-big-Digest.crc32
----------------------------------------------------------------------
diff --cc test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_counter/mb-1-big-Digest.crc32
index 0000000,0000000..86240b5
new file mode 100644
--- /dev/null
+++ b/test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_counter/mb-1-big-Digest.crc32
@@@ -1,0 -1,0 +1,1 @@@
++346435755

http://git-wip-us.apache.org/repos/asf/cassandra/blob/78a3d2bb/test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_counter/mb-1-big-Filter.db
----------------------------------------------------------------------
diff --cc test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_counter/mb-1-big-Filter.db
index 0000000,0000000..2e1d5d2
new file mode 100644
Binary files differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/78a3d2bb/test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_counter/mb-1-big-Index.db
----------------------------------------------------------------------
diff --cc test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_counter/mb-1-big-Index.db
index 0000000,0000000..d1a401b
new file mode 100644
Binary files differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/78a3d2bb/test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_counter/mb-1-big-Statistics.db
----------------------------------------------------------------------
diff --cc test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_counter/mb-1-big-Statistics.db
index 0000000,0000000..4f3f45a
new file mode 100644
Binary files differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/78a3d2bb/test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_counter/mb-1-big-Summary.db
----------------------------------------------------------------------
diff --cc test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_counter/mb-1-big-Summary.db
index 0000000,0000000..9b24e04
new file mode 100644
Binary files differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/78a3d2bb/test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_counter/mb-1-big-TOC.txt
----------------------------------------------------------------------
diff --cc test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_counter/mb-1-big-TOC.txt
index 0000000,0000000..fe9581f
new file mode 100644
--- /dev/null
+++ b/test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_counter/mb-1-big-TOC.txt
@@@ -1,0 -1,0 +1,8 @@@
++CompressionInfo.db
++Summary.db
++TOC.txt
++Digest.crc32
++Filter.db
++Statistics.db
++Data.db
++Index.db

http://git-wip-us.apache.org/repos/asf/cassandra/blob/78a3d2bb/test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_counter_compact/mb-1-big-CompressionInfo.db
----------------------------------------------------------------------
diff --cc test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_counter_compact/mb-1-big-CompressionInfo.db
index 0000000,0000000..32fa731
new file mode 100644
Binary files differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/78a3d2bb/test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_counter_compact/mb-1-big-Data.db
----------------------------------------------------------------------
diff --cc test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_counter_compact/mb-1-big-Data.db
index 0000000,0000000..ca10874
new file mode 100644
Binary files differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/78a3d2bb/test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_counter_compact/mb-1-big-Digest.crc32
----------------------------------------------------------------------
diff --cc test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_counter_compact/mb-1-big-Digest.crc32
index 0000000,0000000..98f5784
new file mode 100644
--- /dev/null
+++ b/test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_counter_compact/mb-1-big-Digest.crc32
@@@ -1,0 -1,0 +1,1 @@@
++3740016396

http://git-wip-us.apache.org/repos/asf/cassandra/blob/78a3d2bb/test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_counter_compact/mb-1-big-Filter.db
----------------------------------------------------------------------
diff --cc test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_counter_compact/mb-1-big-Filter.db
index 0000000,0000000..2e1d5d2
new file mode 100644
Binary files differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/78a3d2bb/test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_counter_compact/mb-1-big-Index.db
----------------------------------------------------------------------
diff --cc test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_counter_compact/mb-1-big-Index.db
index 0000000,0000000..3661d77
new file mode 100644
Binary files differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/78a3d2bb/test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_counter_compact/mb-1-big-Statistics.db
----------------------------------------------------------------------
diff --cc test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_counter_compact/mb-1-big-Statistics.db
index 0000000,0000000..bb1e3fb
new file mode 100644
Binary files differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/78a3d2bb/test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_counter_compact/mb-1-big-Summary.db
----------------------------------------------------------------------
diff --cc test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_counter_compact/mb-1-big-Summary.db
index 0000000,0000000..9b24e04
new file mode 100644
Binary files differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/78a3d2bb/test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_counter_compact/mb-1-big-TOC.txt
----------------------------------------------------------------------
diff --cc test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_counter_compact/mb-1-big-TOC.txt
index 0000000,0000000..fe9581f
new file mode 100644
--- /dev/null
+++ b/test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_counter_compact/mb-1-big-TOC.txt
@@@ -1,0 -1,0 +1,8 @@@
++CompressionInfo.db
++Summary.db
++TOC.txt
++Digest.crc32
++Filter.db
++Statistics.db
++Data.db
++Index.db