Posted to commits@cassandra.apache.org by sl...@apache.org on 2016/05/12 13:23:18 UTC
[13/20] cassandra git commit: Merge commit
'849a438690aa97a361227781108cc90355dcbcd9' into cassandra-3.0
http://git-wip-us.apache.org/repos/asf/cassandra/blob/78a3d2bb/src/java/org/apache/cassandra/db/lifecycle/View.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/lifecycle/View.java
index b62c7e3,fba1627..17062b4
--- a/src/java/org/apache/cassandra/db/lifecycle/View.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/View.java
@@@ -33,16 -33,14 +33,16 @@@ import org.apache.cassandra.io.sstable.
import org.apache.cassandra.utils.Interval;
import static com.google.common.base.Predicates.equalTo;
++import static com.google.common.base.Predicates.in;
import static com.google.common.base.Predicates.not;
import static com.google.common.collect.ImmutableList.copyOf;
import static com.google.common.collect.ImmutableList.of;
import static com.google.common.collect.Iterables.all;
import static com.google.common.collect.Iterables.concat;
import static com.google.common.collect.Iterables.filter;
-import static java.util.Collections.singleton;
+import static com.google.common.collect.Iterables.transform;
- import static java.util.Collections.singleton;
import static org.apache.cassandra.db.lifecycle.Helpers.emptySet;
+import static org.apache.cassandra.db.lifecycle.Helpers.filterOut;
import static org.apache.cassandra.db.lifecycle.Helpers.replace;
/**
@@@ -68,18 -66,18 +68,19 @@@ public class Vie
* flushed. In chronologically ascending order.
*/
public final List<Memtable> flushingMemtables;
- public final Set<SSTableReader> compacting;
- public final Set<SSTableReader> sstables;
- public final Set<SSTableReader> premature;
+ final Set<SSTableReader> compacting;
+ final Set<SSTableReader> sstables;
++ final Set<SSTableReader> premature;
// we use a Map here so that we can easily perform identity checks as well as equality checks.
// When marking compacting, we now indicate if we expect the sstables to be present (by default we do),
// and we then check that not only are they all present in the live set, but that the exact instance present is
// the one we made our decision to compact against.
- public final Map<SSTableReader, SSTableReader> sstablesMap;
+ final Map<SSTableReader, SSTableReader> sstablesMap;
+ final Map<SSTableReader, SSTableReader> compactingMap;
- public final SSTableIntervalTree intervalTree;
+ final SSTableIntervalTree intervalTree;
- View(List<Memtable> liveMemtables, List<Memtable> flushingMemtables, Map<SSTableReader, SSTableReader> sstables, Map<SSTableReader, SSTableReader> compacting, SSTableIntervalTree intervalTree)
- View(List<Memtable> liveMemtables, List<Memtable> flushingMemtables, Map<SSTableReader, SSTableReader> sstables, Set<SSTableReader> compacting, Set<SSTableReader> premature, SSTableIntervalTree intervalTree)
++ View(List<Memtable> liveMemtables, List<Memtable> flushingMemtables, Map<SSTableReader, SSTableReader> sstables, Map<SSTableReader, SSTableReader> compacting, Set<SSTableReader> premature, SSTableIntervalTree intervalTree)
{
assert liveMemtables != null;
assert flushingMemtables != null;
@@@ -92,8 -90,8 +93,9 @@@
this.sstablesMap = sstables;
this.sstables = sstablesMap.keySet();
- this.compacting = compacting;
+ this.compactingMap = compacting;
+ this.compacting = compactingMap.keySet();
+ this.premature = premature;
this.intervalTree = intervalTree;
}
@@@ -238,8 -158,8 +240,8 @@@
{
assert all(mark, Helpers.idIn(view.sstablesMap));
return new View(view.liveMemtables, view.flushingMemtables, view.sstablesMap,
- replace(view.compacting, unmark, mark),
+ replace(view.compactingMap, unmark, mark),
- view.intervalTree);
+ view.premature, view.intervalTree);
}
};
}
@@@ -270,7 -190,7 +272,7 @@@
public View apply(View view)
{
Map<SSTableReader, SSTableReader> sstableMap = replace(view.sstablesMap, remove, add);
- return new View(view.liveMemtables, view.flushingMemtables, sstableMap, view.compactingMap,
- return new View(view.liveMemtables, view.flushingMemtables, sstableMap, view.compacting, view.premature,
++ return new View(view.liveMemtables, view.flushingMemtables, sstableMap, view.compactingMap, view.premature,
SSTableIntervalTree.build(sstableMap.keySet()));
}
};
@@@ -285,7 -205,7 +287,7 @@@
{
List<Memtable> newLive = ImmutableList.<Memtable>builder().addAll(view.liveMemtables).add(newMemtable).build();
assert newLive.size() == view.liveMemtables.size() + 1;
- return new View(newLive, view.flushingMemtables, view.sstablesMap, view.compactingMap, view.intervalTree);
- return new View(newLive, view.flushingMemtables, view.sstablesMap, view.compacting, view.premature, view.intervalTree);
++ return new View(newLive, view.flushingMemtables, view.sstablesMap, view.compactingMap, view.premature, view.intervalTree);
}
};
}
@@@ -304,7 -224,7 +306,7 @@@
filter(flushing, not(lessThan(toFlush)))));
assert newLive.size() == live.size() - 1;
assert newFlushing.size() == flushing.size() + 1;
- return new View(newLive, newFlushing, view.sstablesMap, view.compactingMap, view.intervalTree);
- return new View(newLive, newFlushing, view.sstablesMap, view.compacting, view.premature, view.intervalTree);
++ return new View(newLive, newFlushing, view.sstablesMap, view.compactingMap, view.premature, view.intervalTree);
}
};
}
@@@ -319,17 -239,35 +321,34 @@@
List<Memtable> flushingMemtables = copyOf(filter(view.flushingMemtables, not(equalTo(memtable))));
assert flushingMemtables.size() == view.flushingMemtables.size() - 1;
-- if (flushed == null)
++ if (flushed == null || flushed.isEmpty())
return new View(view.liveMemtables, flushingMemtables, view.sstablesMap,
- view.compactingMap, view.intervalTree);
- view.compacting, view.premature, view.intervalTree);
++ view.compactingMap, view.premature, view.intervalTree);
- Map<SSTableReader, SSTableReader> sstableMap = replace(view.sstablesMap, emptySet(), singleton(flushed));
- Set<SSTableReader> compacting = replace(view.compacting, emptySet(), singleton(flushed));
- Set<SSTableReader> premature = replace(view.premature, emptySet(), singleton(flushed));
- return new View(view.liveMemtables, flushingMemtables, sstableMap, compacting, premature,
+ Map<SSTableReader, SSTableReader> sstableMap = replace(view.sstablesMap, emptySet(), flushed);
- return new View(view.liveMemtables, flushingMemtables, sstableMap, view.compactingMap,
++ Map<SSTableReader, SSTableReader> compactingMap = replace(view.compactingMap, emptySet(), flushed);
++ Set<SSTableReader> premature = replace(view.premature, emptySet(), flushed);
++ return new View(view.liveMemtables, flushingMemtables, sstableMap, compactingMap, premature,
SSTableIntervalTree.build(sstableMap.keySet()));
}
};
}
- static Function<View, View> permitCompactionOfFlushed(final SSTableReader reader)
++ static Function<View, View> permitCompactionOfFlushed(final Collection<SSTableReader> readers)
+ {
++ Set<SSTableReader> expectAndRemove = ImmutableSet.copyOf(readers);
+ return new Function<View, View>()
+ {
-
- @Nullable
+ public View apply(View view)
+ {
- Set<SSTableReader> premature = ImmutableSet.copyOf(filter(view.premature, not(equalTo(reader))));
- Set<SSTableReader> compacting = ImmutableSet.copyOf(filter(view.compacting, not(equalTo(reader))));
- return new View(view.liveMemtables, view.flushingMemtables, view.sstablesMap, compacting, premature, view.intervalTree);
++ Set<SSTableReader> premature = replace(view.premature, expectAndRemove, emptySet());
++ Map<SSTableReader, SSTableReader> compactingMap = replace(view.compactingMap, expectAndRemove, emptySet());
++ return new View(view.liveMemtables, view.flushingMemtables, view.sstablesMap, compactingMap, premature, view.intervalTree);
+ }
+ };
+ }
+
+
private static <T extends Comparable<T>> Predicate<T> lessThan(final T lessThan)
{
return new Predicate<T>()
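Every mutator on View above returns a Function<View, View> that builds a fresh immutable snapshot instead of modifying state in place; the tracker can then apply it atomically (e.g. via compare-and-swap) so readers never observe a partially updated view. Below is a minimal sketch of that pattern, using a hypothetical SimpleView rather than the real class:

    import java.util.function.Function;
    import com.google.common.collect.ImmutableSet;

    // Hypothetical stand-in for the immutable View: every "mutation" yields a new snapshot.
    final class SimpleView
    {
        final ImmutableSet<String> sstables;
        final ImmutableSet<String> compacting;

        SimpleView(ImmutableSet<String> sstables, ImmutableSet<String> compacting)
        {
            this.sstables = sstables;
            this.compacting = compacting;
        }

        // Mirrors the Function<View, View> transformations in the diff above:
        // the returned function copies the old snapshot plus the one change.
        static Function<SimpleView, SimpleView> markCompacting(String sstable)
        {
            return view -> new SimpleView(view.sstables,
                                          ImmutableSet.<String>builder()
                                                      .addAll(view.compacting)
                                                      .add(sstable)
                                                      .build());
        }
    }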
http://git-wip-us.apache.org/repos/asf/cassandra/blob/78a3d2bb/src/java/org/apache/cassandra/db/view/TableViews.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/view/TableViews.java
index 893bdd5,0000000..7feb67c
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/view/TableViews.java
+++ b/src/java/org/apache/cassandra/db/view/TableViews.java
@@@ -1,481 -1,0 +1,481 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.view;
+
+import java.util.*;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.PeekingIterator;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.commitlog.ReplayPosition;
+import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.partitions.*;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.service.StorageProxy;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.btree.BTreeSet;
+
+
+/**
+ * Groups all the views for a given table.
+ */
+public class TableViews extends AbstractCollection<View>
+{
+ private final CFMetaData baseTableMetadata;
+
+ // We need this to be thread-safe, but the number of times this is changed (when a view is created in the keyspace)
+ // is massively exceeded by the number of times it's read (for every mutation on the keyspace), so a copy-on-write list is the best option.
+ private final List<View> views = new CopyOnWriteArrayList();
+
+ public TableViews(CFMetaData baseTableMetadata)
+ {
+ this.baseTableMetadata = baseTableMetadata;
+ }
+
+ public int size()
+ {
+ return views.size();
+ }
+
+ public Iterator<View> iterator()
+ {
+ return views.iterator();
+ }
+
+ public boolean contains(String viewName)
+ {
+ return Iterables.any(views, view -> view.name.equals(viewName));
+ }
+
+ public boolean add(View view)
+ {
+ // We should have validated that there is no existing view with this name at this point
+ assert !contains(view.name);
+ return views.add(view);
+ }
+
+ public Iterable<ColumnFamilyStore> allViewsCfs()
+ {
+ Keyspace keyspace = Keyspace.open(baseTableMetadata.ksName);
+ return Iterables.transform(views, view -> keyspace.getColumnFamilyStore(view.getDefinition().viewName));
+ }
+
+ public void forceBlockingFlush()
+ {
+ for (ColumnFamilyStore viewCfs : allViewsCfs())
+ viewCfs.forceBlockingFlush();
+ }
+
+ public void dumpMemtables()
+ {
+ for (ColumnFamilyStore viewCfs : allViewsCfs())
+ viewCfs.dumpMemtable();
+ }
+
- public void truncateBlocking(long truncatedAt)
++ public void truncateBlocking(ReplayPosition replayAfter, long truncatedAt)
+ {
+ for (ColumnFamilyStore viewCfs : allViewsCfs())
+ {
- ReplayPosition replayAfter = viewCfs.discardSSTables(truncatedAt);
++ viewCfs.discardSSTables(truncatedAt);
+ SystemKeyspace.saveTruncationRecord(viewCfs, truncatedAt, replayAfter);
+ }
+ }
+
+ public void removeByName(String viewName)
+ {
+ views.removeIf(v -> v.name.equals(viewName));
+ }
+
+ /**
+ * Calculates and pushes updates to the view replicas. The replicas are determined by
+ * {@link ViewUtils#getViewNaturalEndpoint(String, Token, Token)}.
+ *
+ * @param update an update on the base table represented by this object.
+ * @param writeCommitLog whether we should write the commit log for the view updates.
+ * @param baseComplete time from epoch in ms that the local base mutation was (or will be) completed
+ */
+ public void pushViewReplicaUpdates(PartitionUpdate update, boolean writeCommitLog, AtomicLong baseComplete)
+ {
+ assert update.metadata().cfId.equals(baseTableMetadata.cfId);
+
+ Collection<View> views = updatedViews(update);
+ if (views.isEmpty())
+ return;
+
+ // Read modified rows
+ int nowInSec = FBUtilities.nowInSeconds();
+ SinglePartitionReadCommand command = readExistingRowsCommand(update, views, nowInSec);
+ if (command == null)
+ return;
+
+ ColumnFamilyStore cfs = Keyspace.openAndGetStore(update.metadata());
+ long start = System.nanoTime();
+ Collection<Mutation> mutations;
+ try (ReadOrderGroup orderGroup = command.startOrderGroup();
+ UnfilteredRowIterator existings = UnfilteredPartitionIterators.getOnlyElement(command.executeLocally(orderGroup), command);
+ UnfilteredRowIterator updates = update.unfilteredIterator())
+ {
+ mutations = generateViewUpdates(views, updates, existings, nowInSec);
+ }
+ Keyspace.openAndGetStore(update.metadata()).metric.viewReadTime.update(System.nanoTime() - start, TimeUnit.NANOSECONDS);
+
+ if (!mutations.isEmpty())
+ StorageProxy.mutateMV(update.partitionKey().getKey(), mutations, writeCommitLog, baseComplete);
+ }
+
+ /**
+ * Given some updates on the base table of this object and the existing values for the rows affected by those updates, generates the
+ * mutations to be applied to the provided views.
+ *
+ * @param views the views potentially affected by {@code updates}.
+ * @param updates the base table updates being applied.
+ * @param existings the existing values for the rows affected by {@code updates}. This is used to decide if a view is
+ * obsoleted by the update and should be removed, gather the values for columns that may not be part of the update if
+ * a new view entry needs to be created, and compute the minimal updates to be applied if the view entry isn't changed
+ * but has simply some updated values. This will be empty for view building as we want to assume anything we'll pass
+ * to {@code updates} is new.
+ * @param nowInSec the current time in seconds.
+ * @return the mutations to apply to the {@code views}. This can be empty.
+ */
+ public Collection<Mutation> generateViewUpdates(Collection<View> views, UnfilteredRowIterator updates, UnfilteredRowIterator existings, int nowInSec)
+ {
+ assert updates.metadata().cfId.equals(baseTableMetadata.cfId);
+
+ List<ViewUpdateGenerator> generators = new ArrayList<>(views.size());
+ for (View view : views)
+ generators.add(new ViewUpdateGenerator(view, updates.partitionKey(), nowInSec));
+
+ DeletionTracker existingsDeletion = new DeletionTracker(existings.partitionLevelDeletion());
+ DeletionTracker updatesDeletion = new DeletionTracker(updates.partitionLevelDeletion());
+
+ /*
+ * We iterate through the updates and the existing rows in parallel. This allows us to know the consequence
+ * on the view of each update.
+ */
+ PeekingIterator<Unfiltered> existingsIter = Iterators.peekingIterator(existings);
+ PeekingIterator<Unfiltered> updatesIter = Iterators.peekingIterator(updates);
+
+ while (existingsIter.hasNext() && updatesIter.hasNext())
+ {
+ Unfiltered existing = existingsIter.peek();
+ Unfiltered update = updatesIter.peek();
+
+ Row existingRow;
+ Row updateRow;
+ int cmp = baseTableMetadata.comparator.compare(update, existing);
+ if (cmp < 0)
+ {
+ // We have an update where there was nothing before
+ if (update.isRangeTombstoneMarker())
+ {
+ updatesDeletion.update(updatesIter.next());
+ continue;
+ }
+
+ updateRow = ((Row)updatesIter.next()).withRowDeletion(updatesDeletion.currentDeletion());
+ existingRow = emptyRow(updateRow.clustering(), existingsDeletion.currentDeletion());
+ }
+ else if (cmp > 0)
+ {
+ // We have something existing but no update (which will happen either because it's a range tombstone marker in
+ // existing, or because we've fetched the existing row due to some partition/range deletion in the updates)
+ if (existing.isRangeTombstoneMarker())
+ {
+ existingsDeletion.update(existingsIter.next());
+ continue;
+ }
+
+ existingRow = ((Row)existingsIter.next()).withRowDeletion(existingsDeletion.currentDeletion());
+ updateRow = emptyRow(existingRow.clustering(), updatesDeletion.currentDeletion());
+
+ // The way we build the read command used for existing rows, we should always have updatesDeletion.currentDeletion()
+ // that is not live, since we wouldn't have read the existing row otherwise. And we could assert that, but if we ever
+ // change the read method so that it can slightly over-read in some cases, that would leave an easily avoidable bug lurking,
+ // so we just handle the case.
+ if (updateRow == null)
+ continue;
+ }
+ else
+ {
+ // We're updating a row that had pre-existing data
+ if (update.isRangeTombstoneMarker())
+ {
+ assert existing.isRangeTombstoneMarker();
+ updatesDeletion.update(updatesIter.next());
+ existingsDeletion.update(existingsIter.next());
+ continue;
+ }
+
+ assert !existing.isRangeTombstoneMarker();
+ existingRow = ((Row)existingsIter.next()).withRowDeletion(existingsDeletion.currentDeletion());
+ updateRow = ((Row)updatesIter.next()).withRowDeletion(updatesDeletion.currentDeletion());
+ }
+
+ addToViewUpdateGenerators(existingRow, updateRow, generators, nowInSec);
+ }
+
+ // We only care about more existing rows if the update deletion isn't live, i.e. if we had a partition deletion
+ if (!updatesDeletion.currentDeletion().isLive())
+ {
+ while (existingsIter.hasNext())
+ {
+ Unfiltered existing = existingsIter.next();
+ // If it's a range tombstone, we don't care; we're only looking for existing entries that get deleted by
+ // the new partition deletion
+ if (existing.isRangeTombstoneMarker())
+ continue;
+
+ Row existingRow = (Row)existing;
+ addToViewUpdateGenerators(existingRow, emptyRow(existingRow.clustering(), updatesDeletion.currentDeletion()), generators, nowInSec);
+ }
+ }
+ while (updatesIter.hasNext())
+ {
+ Unfiltered update = updatesIter.next();
+ // If it's a range tombstone, it removes nothing pre-existing, so we can ignore it for view updates
+ if (update.isRangeTombstoneMarker())
+ continue;
+
+ Row updateRow = (Row)update;
+ addToViewUpdateGenerators(emptyRow(updateRow.clustering(), DeletionTime.LIVE), updateRow, generators, nowInSec);
+ }
+
+ return buildMutations(baseTableMetadata, generators);
+ }
+
+ /**
+ * Return the views that are potentially updated by the provided updates.
+ *
+ * @param updates the updates applied to the base table.
+ * @return the views affected by {@code updates}.
+ */
+ public Collection<View> updatedViews(PartitionUpdate updates)
+ {
+ List<View> matchingViews = new ArrayList<>(views.size());
+
+ for (View view : views)
+ {
+ ReadQuery selectQuery = view.getReadQuery();
+ if (!selectQuery.selectsKey(updates.partitionKey()))
+ continue;
+
+ matchingViews.add(view);
+ }
+ return matchingViews;
+ }
+
+ /**
+ * Returns the command to use to read the existing rows required to generate view updates for the provided base
+ * table updates.
+ *
+ * @param updates the base table updates being applied.
+ * @param views the views potentially affected by {@code updates}.
+ * @param nowInSec the current time in seconds.
+ * @return the command to use to read the base table rows required to generate view updates for {@code updates}.
+ */
+ private SinglePartitionReadCommand readExistingRowsCommand(PartitionUpdate updates, Collection<View> views, int nowInSec)
+ {
+ Slices.Builder sliceBuilder = null;
+ DeletionInfo deletionInfo = updates.deletionInfo();
+ CFMetaData metadata = updates.metadata();
+ DecoratedKey key = updates.partitionKey();
+ // TODO: This is subtle: we need to gather all the slices that we have to fetch between partition del, range tombstones and rows.
+ if (!deletionInfo.isLive())
+ {
+ sliceBuilder = new Slices.Builder(metadata.comparator);
+ // Everything covered by a deletion might invalidate an existing view entry, which means we must read it to know. In practice
+ // though, the views involved might filter some base table clustering columns, in which case we can restrict what we read
+ // using those restrictions.
+ // If there is a partition deletion, then we can simply take each slice from each view's select filter. They may overlap, but
+ // the Slices.Builder handles that for us. Note that in many cases this will just involve reading everything (as soon as any
+ // view involved has no clustering restrictions, for instance).
+ // For range tombstones, we should theoretically take the difference between the tombstoned range and the slices selected
+ // by every view, but as we don't have an easy way to compute that right now, we keep it simple and just use the tombstoned
+ // range.
+ // TODO: we should improve that latter part.
+ if (!deletionInfo.getPartitionDeletion().isLive())
+ {
+ for (View view : views)
+ sliceBuilder.addAll(view.getSelectStatement().clusteringIndexFilterAsSlices());
+ }
+ else
+ {
+ assert deletionInfo.hasRanges();
+ Iterator<RangeTombstone> iter = deletionInfo.rangeIterator(false);
+ while (iter.hasNext())
+ sliceBuilder.add(iter.next().deletedSlice());
+ }
+ }
+
+ // We need to read every row that is updated, unless we can prove that it has no impact on any view entries.
+
+ // If we had some slices from the deletions above, we'll continue using that. Otherwise, it's more efficient to build
+ // a names query.
+ BTreeSet.Builder<Clustering> namesBuilder = sliceBuilder == null ? BTreeSet.builder(metadata.comparator) : null;
+ for (Row row : updates)
+ {
+ // Don't read the existing state if we can prove the update won't affect any views
+ if (!affectsAnyViews(key, row, views))
+ continue;
+
+ if (namesBuilder == null)
+ sliceBuilder.add(Slice.make(row.clustering()));
+ else
+ namesBuilder.add(row.clustering());
+ }
+
+ NavigableSet<Clustering> names = namesBuilder == null ? null : namesBuilder.build();
+ // If we have a slice builder, it means we had some deletions and we have to read. But if we had
+ // only row updates, it's possible none of them affected the views, in which case we have nothing
+ // to do.
+ if (names != null && names.isEmpty())
+ return null;
+
+ ClusteringIndexFilter clusteringFilter = names == null
+ ? new ClusteringIndexSliceFilter(sliceBuilder.build(), false)
+ : new ClusteringIndexNamesFilter(names, false);
+ // If we have more than one view, we should merge the queried columns of each view, but to keep it simple we just
+ // include everything. We could change that in the future.
+ ColumnFilter queriedColumns = views.size() == 1
+ ? Iterables.getOnlyElement(views).getSelectStatement().queriedColumns()
+ : ColumnFilter.all(metadata);
+ // Note that the views could have restrictions on regular columns, but even if that's the case we shouldn't apply those
+ // when we read, because even if an existing row doesn't match the view filter, the update can change that in which
+ // case we'll need to know the existing content. There is also no easy way to merge those RowFilter when we have multiple views.
+ // TODO: it could still make sense to special-case when there is a single view and a small number of updates (and
+ // no deletions). Indeed, in that case we could check whether any of the updates modify any of the restricted regular
+ // columns, and if that's not the case we could use the view filter. We keep it simple for now though.
+ RowFilter rowFilter = RowFilter.NONE;
+ return SinglePartitionReadCommand.create(metadata, nowInSec, queriedColumns, rowFilter, DataLimits.NONE, key, clusteringFilter);
+ }
+
+ private boolean affectsAnyViews(DecoratedKey partitionKey, Row update, Collection<View> views)
+ {
+ for (View view : views)
+ {
+ if (view.mayBeAffectedBy(partitionKey, update))
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Given an existing base row and the update that we're going to apply to this row, generate the modifications
+ * to apply to MVs using the provided {@code ViewUpdateGenerator}s.
+ *
+ * @param existingBaseRow the base table row as it is before an update.
+ * @param updateBaseRow the new updates made to {@code existingBaseRow}.
+ * @param generators the view update generators to add the new changes to.
+ * @param nowInSec the current time in seconds. Used to decide if data is live or not.
+ */
+ private static void addToViewUpdateGenerators(Row existingBaseRow, Row updateBaseRow, Collection<ViewUpdateGenerator> generators, int nowInSec)
+ {
+ // Having an empty existing row is fine: it just means we'll insert a brand new entry for updateBaseRow,
+ // but if we have no update at all, we shouldn't get here.
+ assert !updateBaseRow.isEmpty();
+
+ // We allow existingBaseRow to be null, which we treat the same as being empty, as a small optimization
+ // to avoid allocating empty row objects when we know there was nothing existing.
+ Row mergedBaseRow = existingBaseRow == null ? updateBaseRow : Rows.merge(existingBaseRow, updateBaseRow, nowInSec);
+ for (ViewUpdateGenerator generator : generators)
+ generator.addBaseTableUpdate(existingBaseRow, mergedBaseRow);
+ }
+
+ private static Row emptyRow(Clustering clustering, DeletionTime deletion)
+ {
+ // Returning null for an empty row is slightly ugly, but the case where there is no pre-existing row is fairly common
+ // (especially when building the view), so we want to avoid a dummy allocation of an empty row every time.
+ // And MultiViewUpdateBuilder knows how to deal with that.
+ return deletion.isLive() ? null : BTreeRow.emptyDeletedRow(clustering, Row.Deletion.regular(deletion));
+ }
+
+ /**
+ * Extracts (and potentially groups) the mutations generated by the provided view update generators.
+ * Returns the mutations that need to be applied to the views given the base table updates
+ * passed to {@link #addBaseTableUpdate}.
+ *
+ * @param baseTableMetadata the metadata for the base table being updated.
+ * @param generators the generators from which to extract the view mutations.
+ * @return the mutations created by all the generators in {@code generators}.
+ */
+ private Collection<Mutation> buildMutations(CFMetaData baseTableMetadata, List<ViewUpdateGenerator> generators)
+ {
+ // One view is probably common enough and we can optimize a bit easily
+ if (generators.size() == 1)
+ {
+ Collection<PartitionUpdate> updates = generators.get(0).generateViewUpdates();
+ List<Mutation> mutations = new ArrayList<>(updates.size());
+ for (PartitionUpdate update : updates)
+ mutations.add(new Mutation(update));
+ return mutations;
+ }
+
+ Map<DecoratedKey, Mutation> mutations = new HashMap<>();
+ for (ViewUpdateGenerator generator : generators)
+ {
+ for (PartitionUpdate update : generator.generateViewUpdates())
+ {
+ DecoratedKey key = update.partitionKey();
+ Mutation mutation = mutations.get(key);
+ if (mutation == null)
+ {
+ mutation = new Mutation(baseTableMetadata.ksName, key);
+ mutations.put(key, mutation);
+ }
+ mutation.add(update);
+ }
+ }
+ return mutations.values();
+ }
+
+ /**
+ * A simple helper that tracks, for a given {@code UnfilteredRowIterator}, the current deletion at any point of the
+ * iteration. It will be the currently open range tombstone deletion if there is one, and the partition deletion otherwise.
+ */
+ private static class DeletionTracker
+ {
+ private final DeletionTime partitionDeletion;
+ private DeletionTime deletion;
+
+ public DeletionTracker(DeletionTime partitionDeletion)
+ {
+ this.partitionDeletion = partitionDeletion;
+ }
+
+ public void update(Unfiltered marker)
+ {
+ assert marker instanceof RangeTombstoneMarker;
+ RangeTombstoneMarker rtm = (RangeTombstoneMarker)marker;
+ this.deletion = rtm.isOpen(false)
+ ? rtm.openDeletionTime(false)
+ : null;
+ }
+
+ public DeletionTime currentDeletion()
+ {
+ return deletion == null ? partitionDeletion : deletion;
+ }
+ }
+}
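The heart of generateViewUpdates above is a two-way merge over the updates and the existing rows, both sorted by clustering: compare the heads, and the sign of the comparison tells you whether a row is brand new, only pre-existing, or an update to existing data. A self-contained sketch of that shape, using sorted integers in place of Unfiltered rows (illustrative only, not the Cassandra API):

    import java.util.Arrays;
    import com.google.common.collect.Iterators;
    import com.google.common.collect.PeekingIterator;

    public class MergeSketch
    {
        public static void main(String[] args)
        {
            PeekingIterator<Integer> existings = Iterators.peekingIterator(Arrays.asList(1, 3, 5).iterator());
            PeekingIterator<Integer> updates = Iterators.peekingIterator(Arrays.asList(2, 3, 6).iterator());

            while (existings.hasNext() && updates.hasNext())
            {
                int cmp = Integer.compare(updates.peek(), existings.peek());
                if (cmp < 0)
                    System.out.println("update with no prior row: " + updates.next());
                else if (cmp > 0)
                    System.out.println("existing row with no update: " + existings.next());
                else
                    System.out.println("merge update " + updates.next() + " onto existing " + existings.next());
            }
            // Drain whichever side remains, as generateViewUpdates does after its main loop.
            existings.forEachRemaining(e -> System.out.println("existing tail: " + e));
            updates.forEachRemaining(u -> System.out.println("update tail: " + u));
        }
    }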
http://git-wip-us.apache.org/repos/asf/cassandra/blob/78a3d2bb/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/78a3d2bb/src/java/org/apache/cassandra/io/sstable/format/Version.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/format/Version.java
index 16829ab,41a83e1..d9e289c
--- a/src/java/org/apache/cassandra/io/sstable/format/Version.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/Version.java
@@@ -60,16 -52,8 +60,18 @@@ public abstract class Versio
public abstract boolean hasNewFileName();
+ public abstract boolean storeRows();
+
+ public abstract int correspondingMessagingVersion(); // Only used by storage formats that 'storeRows' so far
+
+ public abstract boolean hasOldBfHashOrder();
+
+ public abstract boolean hasCompactionAncestors();
+
+ public abstract boolean hasBoundaries();
+
+ public abstract boolean hasCommitLogLowerBound();
+
public String getVersion()
{
return version;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/78a3d2bb/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
index 1f2a98f,9244bbb..e0fb3b1
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
@@@ -111,7 -125,7 +111,7 @@@ public class BigFormat implements SSTab
// we always incremented the major version.
static class BigVersion extends Version
{
- public static final String current_version = "ma";
- public static final String current_version = "lb";
++ public static final String current_version = "mb";
public static final String earliest_supported_version = "jb";
// jb (2.0.1): switch from crc32 to adler32 for compression checksums
@@@ -121,10 -135,7 +121,12 @@@
// switch uncompressed checksums to adler32
// tracks presence of legacy (local and remote) counter shards
// la (2.2.0): new file name format
+ // lb (2.2.7): commit log lower bound included
+ // ma (3.0.0): swap bf hash order
+ // store rows natively
++ // mb (3.0.6): commit log lower bound included
+ //
+ // NOTE: when adding a new version, please add that to LegacySSTableTest, too.
private final boolean isLatestVersion;
private final boolean hasSamplingLevel;
@@@ -134,53 -144,20 +136,56 @@@
private final boolean hasRepairedAt;
private final boolean tracksLegacyCounterShards;
private final boolean newFileName;
+ public final boolean storeRows;
+ public final int correspondingMessagingVersion; // Only used by storage formats that 'storeRows' so far
+ public final boolean hasBoundaries;
+ /**
+ * CASSANDRA-8413: the 3.0 bloom filter representation changed (two longs just swapped) so it has
+ * no 'static' bits caused by using the same upper bits for both the bloom filter and token distribution.
+ */
+ private final boolean hasOldBfHashOrder;
+ private final boolean hasCommitLogLowerBound;
- public BigVersion(String version)
+ /**
+ * CASSANDRA-7066: compaction ancestors are no longer used and have been removed.
+ */
+ private final boolean hasCompactionAncestors;
+
+ BigVersion(String version)
{
- super(instance,version);
+ super(instance, version);
isLatestVersion = version.compareTo(current_version) == 0;
hasSamplingLevel = version.compareTo("ka") >= 0;
newStatsFile = version.compareTo("ka") >= 0;
- hasAllAdlerChecksums = version.compareTo("ka") >= 0;
+
+ //For a while Adler32 was in use; now the CRC32 intrinsic is very good, especially after Haswell.
+ //PureJavaCRC32 was always faster than Adler32. See CASSANDRA-8684
+ ChecksumType checksumType = ChecksumType.CRC32;
+ if (version.compareTo("ka") >= 0 && version.compareTo("ma") < 0)
+ checksumType = ChecksumType.Adler32;
+ this.uncompressedChecksumType = checksumType;
+
+ checksumType = ChecksumType.CRC32;
+ if (version.compareTo("jb") >= 0 && version.compareTo("ma") < 0)
+ checksumType = ChecksumType.Adler32;
+ this.compressedChecksumType = checksumType;
+
hasRepairedAt = version.compareTo("ka") >= 0;
tracksLegacyCounterShards = version.compareTo("ka") >= 0;
+
newFileName = version.compareTo("la") >= 0;
- hasCommitLogLowerBound = version.compareTo("lb") >= 0;
+
+ hasOldBfHashOrder = version.compareTo("ma") < 0;
+ hasCompactionAncestors = version.compareTo("ma") < 0;
+ storeRows = version.compareTo("ma") >= 0;
+ correspondingMessagingVersion = storeRows
+ ? MessagingService.VERSION_30
+ : MessagingService.VERSION_21;
+
+ hasBoundaries = version.compareTo("ma") < 0;
++ hasCommitLogLowerBound = (version.compareTo("lb") >= 0 && version.compareTo("ma") < 0)
++ || version.compareTo("mb") >= 0;
}
@Override
@@@ -243,25 -202,12 +248,30 @@@
return newFileName;
}
+ public boolean hasCommitLogLowerBound()
+ {
+ return hasCommitLogLowerBound;
+ }
+
@Override
+ public boolean storeRows()
+ {
+ return storeRows;
+ }
+
+ @Override
+ public int correspondingMessagingVersion()
+ {
+ return correspondingMessagingVersion;
+ }
+
+ @Override
+ public boolean hasBoundaries()
+ {
+ return hasBoundaries;
+ }
+
+ @Override
public boolean isCompatible()
{
return version.compareTo(earliest_supported_version) >= 0 && version.charAt(0) <= current_version.charAt(0);
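All the capability flags in BigVersion above come from lexicographic comparisons on the two-character version string, and hasCommitLogLowerBound is deliberately non-contiguous: "lb" (2.2.7) has it, "ma" (3.0.0) lost it, and "mb" (3.0.6) reintroduces it. A standalone sketch of that gating, copying the expressions from the diff into a hypothetical class:

    public class VersionFlags
    {
        final boolean storeRows;
        final boolean hasCommitLogLowerBound;

        VersionFlags(String version)
        {
            // "ma" and later store rows natively (the 3.0 format).
            storeRows = version.compareTo("ma") >= 0;
            // Present in 2.2's "lb", absent again in 3.0's "ma", restored in "mb".
            hasCommitLogLowerBound = (version.compareTo("lb") >= 0 && version.compareTo("ma") < 0)
                                     || version.compareTo("mb") >= 0;
        }

        public static void main(String[] args)
        {
            for (String v : new String[]{ "ka", "lb", "ma", "mb" })
            {
                VersionFlags f = new VersionFlags(v);
                System.out.println(v + ": storeRows=" + f.storeRows
                                   + " hasCommitLogLowerBound=" + f.hasCommitLogLowerBound);
            }
        }
    }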
http://git-wip-us.apache.org/repos/asf/cassandra/blob/78a3d2bb/src/java/org/apache/cassandra/io/sstable/metadata/CompactionMetadata.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/metadata/CompactionMetadata.java
index 4f4c04b,c8e6ee8..ef3453a
--- a/src/java/org/apache/cassandra/io/sstable/metadata/CompactionMetadata.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/CompactionMetadata.java
@@@ -72,14 -72,22 +72,23 @@@ public class CompactionMetadata extend
public static class CompactionMetadataSerializer implements IMetadataComponentSerializer<CompactionMetadata>
{
- public int serializedSize(CompactionMetadata component, Version version) throws IOException
+ public int serializedSize(Version version, CompactionMetadata component) throws IOException
{
- int size = 0;
- size += TypeSizes.NATIVE.sizeof(component.ancestors.size());
- for (int g : component.ancestors)
- size += TypeSizes.NATIVE.sizeof(g);
++ int sz = 0;
++ if (version.hasCompactionAncestors())
++ { // account for the empty ancestor marker (one int)
++ sz = 4;
++ }
byte[] serializedCardinality = component.cardinalityEstimator.getBytes();
- return TypeSizes.sizeof(serializedCardinality.length) + serializedCardinality.length;
- size += TypeSizes.NATIVE.sizeof(serializedCardinality.length) + serializedCardinality.length;
- return size;
++ return TypeSizes.sizeof(serializedCardinality.length) + serializedCardinality.length + sz;
}
- public void serialize(CompactionMetadata component, Version version, DataOutputPlus out) throws IOException
+ public void serialize(Version version, CompactionMetadata component, DataOutputPlus out) throws IOException
{
- out.writeInt(component.ancestors.size());
- for (int g : component.ancestors)
- out.writeInt(g);
++ if (version.hasCompactionAncestors())
++ { // write empty ancestor marker
++ out.writeInt(0);
++ }
ByteBufferUtil.writeWithLength(component.cardinalityEstimator.getBytes(), out);
}
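The change above drops the now-unused ancestor list but still writes an int 0 where older versions expect it, and serializedSize accounts for exactly those bytes. A minimal sketch of keeping the two methods in lockstep, with hypothetical names rather than the real serializer interfaces:

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    public class MarkerSketch
    {
        // Size accounting must match the bytes serialize() actually writes.
        static int serializedSize(boolean hasCompactionAncestors, byte[] payload)
        {
            int size = hasCompactionAncestors ? 4 : 0; // the empty ancestor marker, one int
            return size + 4 + payload.length;          // length prefix + payload
        }

        static byte[] serialize(boolean hasCompactionAncestors, byte[] payload) throws IOException
        {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            if (hasCompactionAncestors)
                out.writeInt(0);                       // empty ancestor marker for legacy versions
            out.writeInt(payload.length);
            out.write(payload);
            return bytes.toByteArray();
        }

        public static void main(String[] args) throws IOException
        {
            byte[] payload = { 1, 2, 3 };
            for (boolean legacy : new boolean[]{ true, false })
                System.out.println("computed=" + serializedSize(legacy, payload)
                                   + " actual=" + serialize(legacy, payload).length);
        }
    }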
http://git-wip-us.apache.org/repos/asf/cassandra/blob/78a3d2bb/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java
index 2ed845c,bfeb930..4561520
--- a/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java
@@@ -53,24 -51,28 +53,26 @@@ public class LegacyMetadataSerializer e
assert validation != null && stats != null && compaction != null && validation.partitioner != null;
- EstimatedHistogram.serializer.serialize(stats.estimatedRowSize, out);
+ EstimatedHistogram.serializer.serialize(stats.estimatedPartitionSize, out);
EstimatedHistogram.serializer.serialize(stats.estimatedColumnCount, out);
- ReplayPosition.serializer.serialize(stats.replayPosition, out);
+ ReplayPosition.serializer.serialize(stats.commitLogUpperBound, out);
out.writeLong(stats.minTimestamp);
out.writeLong(stats.maxTimestamp);
out.writeInt(stats.maxLocalDeletionTime);
out.writeDouble(validation.bloomFilterFPChance);
out.writeDouble(stats.compressionRatio);
out.writeUTF(validation.partitioner);
- out.writeInt(compaction.ancestors.size());
- for (Integer g : compaction.ancestors)
- out.writeInt(g);
+ out.writeInt(0); // compaction ancestors
StreamingHistogram.serializer.serialize(stats.estimatedTombstoneDropTime, out);
out.writeInt(stats.sstableLevel);
- out.writeInt(stats.minColumnNames.size());
- for (ByteBuffer columnName : stats.minColumnNames)
- ByteBufferUtil.writeWithShortLength(columnName, out);
- out.writeInt(stats.maxColumnNames.size());
- for (ByteBuffer columnName : stats.maxColumnNames)
- ByteBufferUtil.writeWithShortLength(columnName, out);
+ out.writeInt(stats.minClusteringValues.size());
+ for (ByteBuffer value : stats.minClusteringValues)
+ ByteBufferUtil.writeWithShortLength(value, out);
+ out.writeInt(stats.maxClusteringValues.size());
+ for (ByteBuffer value : stats.maxClusteringValues)
+ ByteBufferUtil.writeWithShortLength(value, out);
+ if (version.hasCommitLogLowerBound())
+ ReplayPosition.serializer.serialize(stats.commitLogLowerBound, out);
}
/**
@@@ -88,11 -90,12 +90,12 @@@
}
else
{
- try (DataInputStream in = new DataInputStream(new BufferedInputStream(new FileInputStream(statsFile))))
+ try (DataInputStreamPlus in = new DataInputStreamPlus(new BufferedInputStream(new FileInputStream(statsFile))))
{
- EstimatedHistogram rowSizes = EstimatedHistogram.serializer.deserialize(in);
+ EstimatedHistogram partitionSizes = EstimatedHistogram.serializer.deserialize(in);
EstimatedHistogram columnCounts = EstimatedHistogram.serializer.deserialize(in);
- ReplayPosition replayPosition = ReplayPosition.serializer.deserialize(in);
+ ReplayPosition commitLogLowerBound = ReplayPosition.NONE;
+ ReplayPosition commitLogUpperBound = ReplayPosition.serializer.deserialize(in);
long minTimestamp = in.readLong();
long maxTimestamp = in.readLong();
int maxLocalDeletionTime = in.readInt();
@@@ -115,21 -120,21 +118,25 @@@
List<ByteBuffer> maxColumnNames = new ArrayList<>(colCount);
for (int i = 0; i < colCount; i++)
maxColumnNames.add(ByteBufferUtil.readWithShortLength(in));
+
+ if (descriptor.version.hasCommitLogLowerBound())
+ commitLogLowerBound = ReplayPosition.serializer.deserialize(in);
+
if (types.contains(MetadataType.VALIDATION))
components.put(MetadataType.VALIDATION,
new ValidationMetadata(partitioner, bloomFilterFPChance));
if (types.contains(MetadataType.STATS))
components.put(MetadataType.STATS,
- new StatsMetadata(rowSizes,
+ new StatsMetadata(partitionSizes,
columnCounts,
- replayPosition,
+ commitLogLowerBound,
+ commitLogUpperBound,
minTimestamp,
maxTimestamp,
+ Integer.MAX_VALUE,
maxLocalDeletionTime,
+ 0,
+ Integer.MAX_VALUE,
compressionRatio,
tombstoneHistogram,
sstableLevel,
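The deserialization path above shows the usual way to evolve an on-disk format: the new field (the commit log lower bound) is appended at the end of the record and only read when the version says it was written, with ReplayPosition.NONE as the sentinel otherwise. A compact sketch of that read path under those assumptions (plain longs stand in for ReplayPosition):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    public class TrailingFieldSketch
    {
        static String read(byte[] bytes, boolean hasLowerBound) throws IOException
        {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
            long upper = in.readLong();   // always present
            long lower = Long.MIN_VALUE;  // sentinel, standing in for ReplayPosition.NONE
            if (hasLowerBound)            // version-gated trailing field
                lower = in.readLong();
            return "upper=" + upper + " lower=" + (lower == Long.MIN_VALUE ? "NONE" : lower);
        }

        public static void main(String[] args) throws IOException
        {
            ByteArrayOutputStream oldFormat = new ByteArrayOutputStream();
            new DataOutputStream(oldFormat).writeLong(42L); // old writer: upper bound only

            ByteArrayOutputStream newFormat = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(newFormat);
            out.writeLong(42L);
            out.writeLong(7L);                              // new writer appends the lower bound

            System.out.println(read(oldFormat.toByteArray(), false));
            System.out.println(read(newFormat.toByteArray(), true));
        }
    }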
http://git-wip-us.apache.org/repos/asf/cassandra/blob/78a3d2bb/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
index c2b0caf,579ff7a..53cf0b0
--- a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
@@@ -26,16 -26,17 +26,16 @@@ import java.util.List
import java.util.Map;
import java.util.Set;
-import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
+ import com.google.common.collect.Ordering;
import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
import com.clearspring.analytics.stream.cardinality.ICardinality;
+import org.apache.cassandra.db.*;
import org.apache.cassandra.db.commitlog.ReplayPosition;
-import org.apache.cassandra.db.composites.CellNameType;
-import org.apache.cassandra.io.sstable.ColumnNameHelper;
-import org.apache.cassandra.io.sstable.ColumnStats;
-import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.partitions.PartitionStatisticsCollector;
+import org.apache.cassandra.db.rows.Cell;
- import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.SSTable;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.service.ActiveRepairService;
@@@ -67,9 -67,10 +67,10 @@@ public class MetadataCollector implemen
public static StatsMetadata defaultStatsMetadata()
{
- return new StatsMetadata(defaultRowSizeHistogram(),
- defaultColumnCountHistogram(),
+ return new StatsMetadata(defaultPartitionSizeHistogram(),
+ defaultCellPerPartitionCountHistogram(),
ReplayPosition.NONE,
+ ReplayPosition.NONE,
Long.MIN_VALUE,
Long.MAX_VALUE,
Integer.MAX_VALUE,
@@@ -82,26 -80,23 +83,27 @@@
Collections.<ByteBuffer>emptyList(),
Collections.<ByteBuffer>emptyList(),
true,
- ActiveRepairService.UNREPAIRED_SSTABLE);
+ ActiveRepairService.UNREPAIRED_SSTABLE,
+ -1,
+ -1);
}
- protected EstimatedHistogram estimatedRowSize = defaultRowSizeHistogram();
- protected EstimatedHistogram estimatedColumnCount = defaultColumnCountHistogram();
+ protected EstimatedHistogram estimatedPartitionSize = defaultPartitionSizeHistogram();
+ // TODO: count the number of rows per partition (either alongside the number of cells, or instead)
+ protected EstimatedHistogram estimatedCellPerPartitionCount = defaultCellPerPartitionCountHistogram();
- protected ReplayPosition replayPosition = ReplayPosition.NONE;
+ protected ReplayPosition commitLogLowerBound = ReplayPosition.NONE;
+ protected ReplayPosition commitLogUpperBound = ReplayPosition.NONE;
- protected long minTimestamp = Long.MAX_VALUE;
- protected long maxTimestamp = Long.MIN_VALUE;
- protected int maxLocalDeletionTime = Integer.MIN_VALUE;
+ protected final MinMaxLongTracker timestampTracker = new MinMaxLongTracker();
+ protected final MinMaxIntTracker localDeletionTimeTracker = new MinMaxIntTracker(Cell.NO_DELETION_TIME, Cell.NO_DELETION_TIME);
+ protected final MinMaxIntTracker ttlTracker = new MinMaxIntTracker(Cell.NO_TTL, Cell.NO_TTL);
protected double compressionRatio = NO_COMPRESSION_RATIO;
- protected Set<Integer> ancestors = new HashSet<>();
protected StreamingHistogram estimatedTombstoneDropTime = defaultTombstoneDropTimeHistogram();
protected int sstableLevel;
- protected List<ByteBuffer> minColumnNames = Collections.emptyList();
- protected List<ByteBuffer> maxColumnNames = Collections.emptyList();
+ protected ByteBuffer[] minClusteringValues;
+ protected ByteBuffer[] maxClusteringValues;
protected boolean hasLegacyCounterShards = false;
+ protected long totalColumnsSet;
+ protected long totalRows;
/**
* Default cardinality estimation method is to use HyperLogLog++.
@@@ -110,22 -105,52 +112,38 @@@
* See CASSANDRA-5906 for detail.
*/
protected ICardinality cardinality = new HyperLogLogPlus(13, 25);
- private final CellNameType columnNameComparator;
+ private final ClusteringComparator comparator;
- public MetadataCollector(CellNameType columnNameComparator)
+ public MetadataCollector(ClusteringComparator comparator)
{
- this.columnNameComparator = columnNameComparator;
+ this.comparator = comparator;
+
+ this.minClusteringValues = new ByteBuffer[comparator.size()];
+ this.maxClusteringValues = new ByteBuffer[comparator.size()];
}
- public MetadataCollector(Iterable<SSTableReader> sstables, CellNameType columnNameComparator, int level, boolean skipAncestors)
+ public MetadataCollector(Iterable<SSTableReader> sstables, ClusteringComparator comparator, int level)
{
- this(columnNameComparator);
+ this(comparator);
- replayPosition(ReplayPosition.getReplayPosition(sstables));
+ ReplayPosition min = null, max = null;
+ for (SSTableReader sstable : sstables)
+ {
+ if (min == null)
+ {
+ min = sstable.getSSTableMetadata().commitLogLowerBound;
+ max = sstable.getSSTableMetadata().commitLogUpperBound;
+ }
+ else
+ {
+ min = Ordering.natural().min(min, sstable.getSSTableMetadata().commitLogLowerBound);
+ max = Ordering.natural().max(max, sstable.getSSTableMetadata().commitLogUpperBound);
+ }
+ }
+
+ commitLogLowerBound(min);
+ commitLogUpperBound(max);
sstableLevel(level);
- // Get the max timestamp of the precompacted sstables
- // and adds generation of live ancestors
- if (!skipAncestors)
- {
- for (SSTableReader sstable : sstables)
- {
- addAncestor(sstable.descriptor.generation);
- for (Integer i : sstable.getAncestors())
- if (new File(sstable.descriptor.withGeneration(i).filenameFor(Component.DATA)).exists())
- addAncestor(i);
- }
- }
- }
-
- public MetadataCollector(Iterable<SSTableReader> sstables, CellNameType columnNameComparator, int level)
- {
- this(sstables, columnNameComparator, level, false);
}
public MetadataCollector addKey(ByteBuffer key)
@@@ -163,57 -188,45 +181,63 @@@
return this;
}
- public MetadataCollector updateMinTimestamp(long potentialMin)
+ public void update(LivenessInfo newInfo)
{
- minTimestamp = Math.min(minTimestamp, potentialMin);
- return this;
+ if (newInfo.isEmpty())
+ return;
+
+ updateTimestamp(newInfo.timestamp());
+ updateTTL(newInfo.ttl());
+ updateLocalDeletionTime(newInfo.localExpirationTime());
}
- public MetadataCollector updateMaxTimestamp(long potentialMax)
+ public void update(Cell cell)
{
- maxTimestamp = Math.max(maxTimestamp, potentialMax);
- return this;
+ updateTimestamp(cell.timestamp());
+ updateTTL(cell.ttl());
+ updateLocalDeletionTime(cell.localDeletionTime());
}
- public MetadataCollector updateMaxLocalDeletionTime(int maxLocalDeletionTime)
+ public void update(DeletionTime dt)
{
- this.maxLocalDeletionTime = Math.max(this.maxLocalDeletionTime, maxLocalDeletionTime);
- return this;
+ if (!dt.isLive())
+ {
+ updateTimestamp(dt.markedForDeleteAt());
+ updateLocalDeletionTime(dt.localDeletionTime());
+ }
}
- public MetadataCollector estimatedRowSize(EstimatedHistogram estimatedRowSize)
+ public void updateColumnSetPerRow(long columnSetInRow)
{
- this.estimatedRowSize = estimatedRowSize;
- return this;
+ totalColumnsSet += columnSetInRow;
+ ++totalRows;
}
- public MetadataCollector estimatedColumnCount(EstimatedHistogram estimatedColumnCount)
+ private void updateTimestamp(long newTimestamp)
{
- this.estimatedColumnCount = estimatedColumnCount;
- return this;
+ timestampTracker.update(newTimestamp);
+ }
+
+ private void updateLocalDeletionTime(int newLocalDeletionTime)
+ {
+ localDeletionTimeTracker.update(newLocalDeletionTime);
+ estimatedTombstoneDropTime.update(newLocalDeletionTime);
+ }
+
+ private void updateTTL(int newTTL)
+ {
+ ttlTracker.update(newTTL);
}
- public MetadataCollector replayPosition(ReplayPosition replayPosition)
+ public MetadataCollector commitLogLowerBound(ReplayPosition commitLogLowerBound)
+ {
+ this.commitLogLowerBound = commitLogLowerBound;
+ return this;
+ }
+
+ public MetadataCollector commitLogUpperBound(ReplayPosition commitLogUpperBound)
{
- this.replayPosition = replayPosition;
+ this.commitLogUpperBound = commitLogUpperBound;
return this;
}
@@@ -276,15 -280,13 +300,16 @@@
{
Map<MetadataType, MetadataComponent> components = Maps.newHashMap();
components.put(MetadataType.VALIDATION, new ValidationMetadata(partitioner, bloomFilterFPChance));
- components.put(MetadataType.STATS, new StatsMetadata(estimatedRowSize,
- estimatedColumnCount,
+ components.put(MetadataType.STATS, new StatsMetadata(estimatedPartitionSize,
+ estimatedCellPerPartitionCount,
- replayPosition,
+ commitLogLowerBound,
+ commitLogUpperBound,
- minTimestamp,
- maxTimestamp,
- maxLocalDeletionTime,
+ timestampTracker.min(),
+ timestampTracker.max(),
+ localDeletionTimeTracker.min(),
+ localDeletionTimeTracker.max(),
+ ttlTracker.min(),
+ ttlTracker.max(),
compressionRatio,
estimatedTombstoneDropTime,
sstableLevel,
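MetadataCollector above replaces the hand-rolled min/max fields with tracker objects; the constructor arguments (Cell.NO_DELETION_TIME, Cell.NO_TTL) suggest the trackers report those defaults until the first real update arrives. A plausible minimal implementation of such a tracker, offered as a sketch rather than the actual Cassandra class:

    public class MinMaxIntTracker
    {
        private final int defaultMin;
        private final int defaultMax;
        private boolean isSet;
        private int min;
        private int max;

        public MinMaxIntTracker(int defaultMin, int defaultMax)
        {
            this.defaultMin = defaultMin;
            this.defaultMax = defaultMax;
        }

        public void update(int value)
        {
            if (!isSet)
            {
                min = max = value;
                isSet = true;
            }
            else
            {
                min = Math.min(min, value);
                max = Math.max(max, value);
            }
        }

        // Until the first update, fall back to the configured "no value" defaults.
        public int min() { return isSet ? min : defaultMin; }
        public int max() { return isSet ? max : defaultMax; }
    }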
http://git-wip-us.apache.org/repos/asf/cassandra/blob/78a3d2bb/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
index 824bf8b,3d48e34..07e35bb
--- a/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
@@@ -40,53 -41,43 +40,56 @@@ public class StatsMetadata extends Meta
{
public static final IMetadataComponentSerializer serializer = new StatsMetadataSerializer();
- public final EstimatedHistogram estimatedRowSize;
+ public final EstimatedHistogram estimatedPartitionSize;
public final EstimatedHistogram estimatedColumnCount;
- public final ReplayPosition replayPosition;
+ public final ReplayPosition commitLogLowerBound;
+ public final ReplayPosition commitLogUpperBound;
public final long minTimestamp;
public final long maxTimestamp;
+ public final int minLocalDeletionTime;
public final int maxLocalDeletionTime;
+ public final int minTTL;
+ public final int maxTTL;
public final double compressionRatio;
public final StreamingHistogram estimatedTombstoneDropTime;
public final int sstableLevel;
- public final List<ByteBuffer> maxColumnNames;
- public final List<ByteBuffer> minColumnNames;
+ public final List<ByteBuffer> minClusteringValues;
+ public final List<ByteBuffer> maxClusteringValues;
public final boolean hasLegacyCounterShards;
public final long repairedAt;
+ public final long totalColumnsSet;
+ public final long totalRows;
- public StatsMetadata(EstimatedHistogram estimatedRowSize,
+ public StatsMetadata(EstimatedHistogram estimatedPartitionSize,
EstimatedHistogram estimatedColumnCount,
- ReplayPosition replayPosition,
+ ReplayPosition commitLogLowerBound,
+ ReplayPosition commitLogUpperBound,
long minTimestamp,
long maxTimestamp,
+ int minLocalDeletionTime,
int maxLocalDeletionTime,
+ int minTTL,
+ int maxTTL,
double compressionRatio,
StreamingHistogram estimatedTombstoneDropTime,
int sstableLevel,
- List<ByteBuffer> minColumnNames,
- List<ByteBuffer> maxColumnNames,
+ List<ByteBuffer> minClusteringValues,
+ List<ByteBuffer> maxClusteringValues,
boolean hasLegacyCounterShards,
- long repairedAt)
+ long repairedAt,
+ long totalColumnsSet,
+ long totalRows)
{
- this.estimatedRowSize = estimatedRowSize;
+ this.estimatedPartitionSize = estimatedPartitionSize;
this.estimatedColumnCount = estimatedColumnCount;
- this.replayPosition = replayPosition;
+ this.commitLogLowerBound = commitLogLowerBound;
+ this.commitLogUpperBound = commitLogUpperBound;
this.minTimestamp = minTimestamp;
this.maxTimestamp = maxTimestamp;
+ this.minLocalDeletionTime = minLocalDeletionTime;
this.maxLocalDeletionTime = maxLocalDeletionTime;
+ this.minTTL = minTTL;
+ this.maxTTL = maxTTL;
this.compressionRatio = compressionRatio;
this.estimatedTombstoneDropTime = estimatedTombstoneDropTime;
this.sstableLevel = sstableLevel;
@@@ -129,15 -118,13 +132,16 @@@
public StatsMetadata mutateLevel(int newLevel)
{
- return new StatsMetadata(estimatedRowSize,
+ return new StatsMetadata(estimatedPartitionSize,
estimatedColumnCount,
- replayPosition,
+ commitLogLowerBound,
+ commitLogUpperBound,
minTimestamp,
maxTimestamp,
+ minLocalDeletionTime,
maxLocalDeletionTime,
+ minTTL,
+ maxTTL,
compressionRatio,
estimatedTombstoneDropTime,
newLevel,
@@@ -151,15 -136,13 +155,16 @@@
public StatsMetadata mutateRepairedAt(long newRepairedAt)
{
- return new StatsMetadata(estimatedRowSize,
+ return new StatsMetadata(estimatedPartitionSize,
estimatedColumnCount,
- replayPosition,
+ commitLogLowerBound,
+ commitLogUpperBound,
minTimestamp,
maxTimestamp,
+ minLocalDeletionTime,
maxLocalDeletionTime,
+ minTTL,
+ maxTTL,
compressionRatio,
estimatedTombstoneDropTime,
sstableLevel,
@@@ -179,15 -160,13 +184,16 @@@
StatsMetadata that = (StatsMetadata) o;
return new EqualsBuilder()
- .append(estimatedRowSize, that.estimatedRowSize)
+ .append(estimatedPartitionSize, that.estimatedPartitionSize)
.append(estimatedColumnCount, that.estimatedColumnCount)
- .append(replayPosition, that.replayPosition)
+ .append(commitLogLowerBound, that.commitLogLowerBound)
+ .append(commitLogUpperBound, that.commitLogUpperBound)
.append(minTimestamp, that.minTimestamp)
.append(maxTimestamp, that.maxTimestamp)
+ .append(minLocalDeletionTime, that.minLocalDeletionTime)
.append(maxLocalDeletionTime, that.maxLocalDeletionTime)
+ .append(minTTL, that.minTTL)
+ .append(maxTTL, that.maxTTL)
.append(compressionRatio, that.compressionRatio)
.append(estimatedTombstoneDropTime, that.estimatedTombstoneDropTime)
.append(sstableLevel, that.sstableLevel)
@@@ -204,15 -181,13 +210,16 @@@
public int hashCode()
{
return new HashCodeBuilder()
- .append(estimatedRowSize)
+ .append(estimatedPartitionSize)
.append(estimatedColumnCount)
- .append(replayPosition)
+ .append(commitLogLowerBound)
+ .append(commitLogUpperBound)
.append(minTimestamp)
.append(maxTimestamp)
+ .append(minLocalDeletionTime)
.append(maxLocalDeletionTime)
+ .append(minTTL)
+ .append(maxTTL)
.append(compressionRatio)
.append(estimatedTombstoneDropTime)
.append(sstableLevel)
@@@ -227,78 -200,61 +234,84 @@@
public static class StatsMetadataSerializer implements IMetadataComponentSerializer<StatsMetadata>
{
- public int serializedSize(StatsMetadata component, Version version) throws IOException
+ public int serializedSize(Version version, StatsMetadata component) throws IOException
{
int size = 0;
- size += EstimatedHistogram.serializer.serializedSize(component.estimatedRowSize, TypeSizes.NATIVE);
- size += EstimatedHistogram.serializer.serializedSize(component.estimatedColumnCount, TypeSizes.NATIVE);
- size += ReplayPosition.serializer.serializedSize(component.commitLogUpperBound, TypeSizes.NATIVE);
- size += 8 + 8 + 4 + 8 + 8; // mix/max timestamp(long), maxLocalDeletionTime(int), compressionRatio(double), repairedAt (long)
- size += StreamingHistogram.serializer.serializedSize(component.estimatedTombstoneDropTime, TypeSizes.NATIVE);
- size += TypeSizes.NATIVE.sizeof(component.sstableLevel);
+ size += EstimatedHistogram.serializer.serializedSize(component.estimatedPartitionSize);
+ size += EstimatedHistogram.serializer.serializedSize(component.estimatedColumnCount);
- size += ReplayPosition.serializer.serializedSize(component.replayPosition);
++ size += ReplayPosition.serializer.serializedSize(component.commitLogUpperBound);
+ if (version.storeRows())
+ size += 8 + 8 + 4 + 4 + 4 + 4 + 8 + 8; // min/max timestamp(long), min/maxLocalDeletionTime(int), min/max TTL, compressionRatio(double), repairedAt (long)
+ else
+ size += 8 + 8 + 4 + 8 + 8; // min/max timestamp(long), maxLocalDeletionTime(int), compressionRatio(double), repairedAt (long)
+ size += StreamingHistogram.serializer.serializedSize(component.estimatedTombstoneDropTime);
+ size += TypeSizes.sizeof(component.sstableLevel);
// min column names
size += 4;
- for (ByteBuffer columnName : component.minColumnNames)
- size += 2 + columnName.remaining(); // with short length
+ for (ByteBuffer value : component.minClusteringValues)
+ size += 2 + value.remaining(); // with short length
// max column names
size += 4;
- for (ByteBuffer columnName : component.maxColumnNames)
- size += 2 + columnName.remaining(); // with short length
- size += TypeSizes.NATIVE.sizeof(component.hasLegacyCounterShards);
+ for (ByteBuffer value : component.maxClusteringValues)
+ size += 2 + value.remaining(); // with short length
+ size += TypeSizes.sizeof(component.hasLegacyCounterShards);
+ if (version.storeRows())
+ size += 8 + 8; // totalColumnsSet, totalRows
+ if (version.hasCommitLogLowerBound())
- size += ReplayPosition.serializer.serializedSize(component.commitLogLowerBound, TypeSizes.NATIVE);
++ size += ReplayPosition.serializer.serializedSize(component.commitLogLowerBound);
return size;
}
- public void serialize(StatsMetadata component, Version version, DataOutputPlus out) throws IOException
+ public void serialize(Version version, StatsMetadata component, DataOutputPlus out) throws IOException
{
- EstimatedHistogram.serializer.serialize(component.estimatedRowSize, out);
+ EstimatedHistogram.serializer.serialize(component.estimatedPartitionSize, out);
EstimatedHistogram.serializer.serialize(component.estimatedColumnCount, out);
- ReplayPosition.serializer.serialize(component.replayPosition, out);
+ ReplayPosition.serializer.serialize(component.commitLogUpperBound, out);
out.writeLong(component.minTimestamp);
out.writeLong(component.maxTimestamp);
+ if (version.storeRows())
+ out.writeInt(component.minLocalDeletionTime);
out.writeInt(component.maxLocalDeletionTime);
+ if (version.storeRows())
+ {
+ out.writeInt(component.minTTL);
+ out.writeInt(component.maxTTL);
+ }
out.writeDouble(component.compressionRatio);
StreamingHistogram.serializer.serialize(component.estimatedTombstoneDropTime, out);
out.writeInt(component.sstableLevel);
out.writeLong(component.repairedAt);
- out.writeInt(component.minColumnNames.size());
- for (ByteBuffer columnName : component.minColumnNames)
- ByteBufferUtil.writeWithShortLength(columnName, out);
- out.writeInt(component.maxColumnNames.size());
- for (ByteBuffer columnName : component.maxColumnNames)
- ByteBufferUtil.writeWithShortLength(columnName, out);
+ out.writeInt(component.minClusteringValues.size());
+ for (ByteBuffer value : component.minClusteringValues)
+ ByteBufferUtil.writeWithShortLength(value, out);
+ out.writeInt(component.maxClusteringValues.size());
+ for (ByteBuffer value : component.maxClusteringValues)
+ ByteBufferUtil.writeWithShortLength(value, out);
out.writeBoolean(component.hasLegacyCounterShards);
+
+ if (version.storeRows())
+ {
+ out.writeLong(component.totalColumnsSet);
+ out.writeLong(component.totalRows);
+ }
++
+ if (version.hasCommitLogLowerBound())
+ ReplayPosition.serializer.serialize(component.commitLogLowerBound, out);
}
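[Editor's note] serialize() and deserialize() must apply the same version predicates in the same order; if they ever diverge, the reader desynchronizes from the byte stream (note how minLocalDeletionTime is interleaved mid-stream while commitLogLowerBound is appended at the tail). A simple guard for that invariant is a per-version round-trip check. The sketch below uses plain java.io streams and a deliberately reduced write/read pair rather than Cassandra's DataOutputPlus/DataInputPlus, so it is an illustration under stated assumptions, not the project's test.

    import java.io.*;

    final class VersionGatedRoundTrip
    {
        // Reduced stand-ins for serialize()/deserialize(): one unconditional
        // field plus one field gated on the format version.
        static void write(DataOutputStream out, boolean storeRows, long minTimestamp, int minTTL) throws IOException
        {
            out.writeLong(minTimestamp);
            if (storeRows)
                out.writeInt(minTTL); // only newer formats carry this field
        }

        static long[] read(DataInputStream in, boolean storeRows) throws IOException
        {
            long minTimestamp = in.readLong();
            int minTTL = storeRows ? in.readInt() : 0; // same default as deserialize() above
            return new long[]{ minTimestamp, minTTL };
        }

        public static void main(String[] args) throws IOException
        {
            for (boolean storeRows : new boolean[]{ false, true })
            {
                ByteArrayOutputStream bytes = new ByteArrayOutputStream();
                write(new DataOutputStream(bytes), storeRows, 42L, 7);
                DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
                long[] back = read(in, storeRows);
                if (back[0] != 42L || back[1] != (storeRows ? 7 : 0))
                    throw new AssertionError("round trip mismatch for storeRows=" + storeRows);
            }
        }
    }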
- public StatsMetadata deserialize(Version version, DataInput in) throws IOException
+ public StatsMetadata deserialize(Version version, DataInputPlus in) throws IOException
{
- EstimatedHistogram rowSizes = EstimatedHistogram.serializer.deserialize(in);
+ EstimatedHistogram partitionSizes = EstimatedHistogram.serializer.deserialize(in);
EstimatedHistogram columnCounts = EstimatedHistogram.serializer.deserialize(in);
- ReplayPosition replayPosition = ReplayPosition.serializer.deserialize(in);
+ ReplayPosition commitLogLowerBound = ReplayPosition.NONE, commitLogUpperBound;
+ commitLogUpperBound = ReplayPosition.serializer.deserialize(in);
long minTimestamp = in.readLong();
long maxTimestamp = in.readLong();
+ // We use MAX_VALUE as that's the default value for "no deletion time"
+ int minLocalDeletionTime = version.storeRows() ? in.readInt() : Integer.MAX_VALUE;
int maxLocalDeletionTime = in.readInt();
+ int minTTL = version.storeRows() ? in.readInt() : 0;
+ int maxTTL = version.storeRows() ? in.readInt() : Integer.MAX_VALUE;
double compressionRatio = in.readDouble();
StreamingHistogram tombstoneHistogram = StreamingHistogram.serializer.deserialize(in);
int sstableLevel = in.readInt();
@@@ -320,18 -276,15 +333,22 @@@
if (version.tracksLegacyCounterShards())
hasLegacyCounterShards = in.readBoolean();
+ long totalColumnsSet = version.storeRows() ? in.readLong() : -1L;
+ long totalRows = version.storeRows() ? in.readLong() : -1L;
+
+ if (version.hasCommitLogLowerBound())
+ commitLogLowerBound = ReplayPosition.serializer.deserialize(in);
- return new StatsMetadata(rowSizes,
++
+ return new StatsMetadata(partitionSizes,
columnCounts,
- replayPosition,
+ commitLogLowerBound,
+ commitLogUpperBound,
minTimestamp,
maxTimestamp,
+ minLocalDeletionTime,
maxLocalDeletionTime,
+ minTTL,
+ maxTTL,
compressionRatio,
tombstoneHistogram,
sstableLevel,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/78a3d2bb/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/78a3d2bb/test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust/mb-1-big-CompressionInfo.db
----------------------------------------------------------------------
diff --cc test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust/mb-1-big-CompressionInfo.db
index 0000000,0000000..e32d12f
new file mode 100644
Binary files differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/78a3d2bb/test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust/mb-1-big-Data.db
----------------------------------------------------------------------
diff --cc test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust/mb-1-big-Data.db
index 0000000,0000000..790cbe1
new file mode 100644
Binary files differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/78a3d2bb/test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust/mb-1-big-Digest.crc32
----------------------------------------------------------------------
diff --cc test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust/mb-1-big-Digest.crc32
index 0000000,0000000..66d1d34
new file mode 100644
--- /dev/null
+++ b/test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust/mb-1-big-Digest.crc32
@@@ -1,0 -1,0 +1,1 @@@
++163939259
http://git-wip-us.apache.org/repos/asf/cassandra/blob/78a3d2bb/test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust/mb-1-big-Filter.db
----------------------------------------------------------------------
diff --cc test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust/mb-1-big-Filter.db
index 0000000,0000000..2e1d5d2
new file mode 100644
Binary files differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/78a3d2bb/test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust/mb-1-big-Index.db
----------------------------------------------------------------------
diff --cc test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust/mb-1-big-Index.db
index 0000000,0000000..38d0537
new file mode 100644
Binary files differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/78a3d2bb/test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust/mb-1-big-Statistics.db
----------------------------------------------------------------------
diff --cc test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust/mb-1-big-Statistics.db
index 0000000,0000000..a53b7c1
new file mode 100644
Binary files differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/78a3d2bb/test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust/mb-1-big-Summary.db
----------------------------------------------------------------------
diff --cc test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust/mb-1-big-Summary.db
index 0000000,0000000..9b24e04
new file mode 100644
Binary files differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/78a3d2bb/test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust/mb-1-big-TOC.txt
----------------------------------------------------------------------
diff --cc test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust/mb-1-big-TOC.txt
index 0000000,0000000..fe9581f
new file mode 100644
--- /dev/null
+++ b/test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust/mb-1-big-TOC.txt
@@@ -1,0 -1,0 +1,8 @@@
++CompressionInfo.db
++Summary.db
++TOC.txt
++Digest.crc32
++Filter.db
++Statistics.db
++Data.db
++Index.db
http://git-wip-us.apache.org/repos/asf/cassandra/blob/78a3d2bb/test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_compact/mb-1-big-CompressionInfo.db
----------------------------------------------------------------------
diff --cc test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_compact/mb-1-big-CompressionInfo.db
index 0000000,0000000..e3d4377
new file mode 100644
Binary files differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/78a3d2bb/test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_compact/mb-1-big-Data.db
----------------------------------------------------------------------
diff --cc test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_compact/mb-1-big-Data.db
index 0000000,0000000..c5aa7ce
new file mode 100644
Binary files differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/78a3d2bb/test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_compact/mb-1-big-Digest.crc32
----------------------------------------------------------------------
diff --cc test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_compact/mb-1-big-Digest.crc32
index 0000000,0000000..4e3bf89
new file mode 100644
--- /dev/null
+++ b/test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_compact/mb-1-big-Digest.crc32
@@@ -1,0 -1,0 +1,1 @@@
++3387396134
http://git-wip-us.apache.org/repos/asf/cassandra/blob/78a3d2bb/test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_compact/mb-1-big-Filter.db
----------------------------------------------------------------------
diff --cc test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_compact/mb-1-big-Filter.db
index 0000000,0000000..2e1d5d2
new file mode 100644
Binary files differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/78a3d2bb/test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_compact/mb-1-big-Index.db
----------------------------------------------------------------------
diff --cc test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_compact/mb-1-big-Index.db
index 0000000,0000000..54afd0c
new file mode 100644
Binary files differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/78a3d2bb/test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_compact/mb-1-big-Statistics.db
----------------------------------------------------------------------
diff --cc test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_compact/mb-1-big-Statistics.db
index 0000000,0000000..9d9b085
new file mode 100644
Binary files differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/78a3d2bb/test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_compact/mb-1-big-Summary.db
----------------------------------------------------------------------
diff --cc test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_compact/mb-1-big-Summary.db
index 0000000,0000000..9b24e04
new file mode 100644
Binary files differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/78a3d2bb/test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_compact/mb-1-big-TOC.txt
----------------------------------------------------------------------
diff --cc test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_compact/mb-1-big-TOC.txt
index 0000000,0000000..fe9581f
new file mode 100644
--- /dev/null
+++ b/test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_compact/mb-1-big-TOC.txt
@@@ -1,0 -1,0 +1,8 @@@
++CompressionInfo.db
++Summary.db
++TOC.txt
++Digest.crc32
++Filter.db
++Statistics.db
++Data.db
++Index.db
http://git-wip-us.apache.org/repos/asf/cassandra/blob/78a3d2bb/test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_counter/mb-1-big-CompressionInfo.db
----------------------------------------------------------------------
diff --cc test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_counter/mb-1-big-CompressionInfo.db
index 0000000,0000000..5a34d6e
new file mode 100644
Binary files differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/78a3d2bb/test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_counter/mb-1-big-Data.db
----------------------------------------------------------------------
diff --cc test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_counter/mb-1-big-Data.db
index 0000000,0000000..55ad88d
new file mode 100644
Binary files differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/78a3d2bb/test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_counter/mb-1-big-Digest.crc32
----------------------------------------------------------------------
diff --cc test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_counter/mb-1-big-Digest.crc32
index 0000000,0000000..86240b5
new file mode 100644
--- /dev/null
+++ b/test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_counter/mb-1-big-Digest.crc32
@@@ -1,0 -1,0 +1,1 @@@
++346435755
http://git-wip-us.apache.org/repos/asf/cassandra/blob/78a3d2bb/test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_counter/mb-1-big-Filter.db
----------------------------------------------------------------------
diff --cc test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_counter/mb-1-big-Filter.db
index 0000000,0000000..2e1d5d2
new file mode 100644
Binary files differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/78a3d2bb/test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_counter/mb-1-big-Index.db
----------------------------------------------------------------------
diff --cc test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_counter/mb-1-big-Index.db
index 0000000,0000000..d1a401b
new file mode 100644
Binary files differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/78a3d2bb/test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_counter/mb-1-big-Statistics.db
----------------------------------------------------------------------
diff --cc test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_counter/mb-1-big-Statistics.db
index 0000000,0000000..4f3f45a
new file mode 100644
Binary files differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/78a3d2bb/test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_counter/mb-1-big-Summary.db
----------------------------------------------------------------------
diff --cc test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_counter/mb-1-big-Summary.db
index 0000000,0000000..9b24e04
new file mode 100644
Binary files differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/78a3d2bb/test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_counter/mb-1-big-TOC.txt
----------------------------------------------------------------------
diff --cc test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_counter/mb-1-big-TOC.txt
index 0000000,0000000..fe9581f
new file mode 100644
--- /dev/null
+++ b/test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_counter/mb-1-big-TOC.txt
@@@ -1,0 -1,0 +1,8 @@@
++CompressionInfo.db
++Summary.db
++TOC.txt
++Digest.crc32
++Filter.db
++Statistics.db
++Data.db
++Index.db
http://git-wip-us.apache.org/repos/asf/cassandra/blob/78a3d2bb/test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_counter_compact/mb-1-big-CompressionInfo.db
----------------------------------------------------------------------
diff --cc test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_counter_compact/mb-1-big-CompressionInfo.db
index 0000000,0000000..32fa731
new file mode 100644
Binary files differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/78a3d2bb/test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_counter_compact/mb-1-big-Data.db
----------------------------------------------------------------------
diff --cc test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_counter_compact/mb-1-big-Data.db
index 0000000,0000000..ca10874
new file mode 100644
Binary files differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/78a3d2bb/test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_counter_compact/mb-1-big-Digest.crc32
----------------------------------------------------------------------
diff --cc test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_counter_compact/mb-1-big-Digest.crc32
index 0000000,0000000..98f5784
new file mode 100644
--- /dev/null
+++ b/test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_counter_compact/mb-1-big-Digest.crc32
@@@ -1,0 -1,0 +1,1 @@@
++3740016396
http://git-wip-us.apache.org/repos/asf/cassandra/blob/78a3d2bb/test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_counter_compact/mb-1-big-Filter.db
----------------------------------------------------------------------
diff --cc test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_counter_compact/mb-1-big-Filter.db
index 0000000,0000000..2e1d5d2
new file mode 100644
Binary files differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/78a3d2bb/test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_counter_compact/mb-1-big-Index.db
----------------------------------------------------------------------
diff --cc test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_counter_compact/mb-1-big-Index.db
index 0000000,0000000..3661d77
new file mode 100644
Binary files differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/78a3d2bb/test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_counter_compact/mb-1-big-Statistics.db
----------------------------------------------------------------------
diff --cc test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_counter_compact/mb-1-big-Statistics.db
index 0000000,0000000..bb1e3fb
new file mode 100644
Binary files differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/78a3d2bb/test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_counter_compact/mb-1-big-Summary.db
----------------------------------------------------------------------
diff --cc test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_counter_compact/mb-1-big-Summary.db
index 0000000,0000000..9b24e04
new file mode 100644
Binary files differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/78a3d2bb/test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_counter_compact/mb-1-big-TOC.txt
----------------------------------------------------------------------
diff --cc test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_counter_compact/mb-1-big-TOC.txt
index 0000000,0000000..fe9581f
new file mode 100644
--- /dev/null
+++ b/test/data/legacy-sstables/mb/legacy_tables/legacy_mb_clust_counter_compact/mb-1-big-TOC.txt
@@@ -1,0 -1,0 +1,8 @@@
++CompressionInfo.db
++Summary.db
++TOC.txt
++Digest.crc32
++Filter.db
++Statistics.db
++Data.db
++Index.db