You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2016/05/06 11:47:43 UTC
[5/9] cassandra git commit: Refactor MV code
http://git-wip-us.apache.org/repos/asf/cassandra/blob/86ba2274/src/java/org/apache/cassandra/db/view/View.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/View.java b/src/java/org/apache/cassandra/db/view/View.java
index 1b823aa..845a6ab 100644
--- a/src/java/org/apache/cassandra/db/view/View.java
+++ b/src/java/org/apache/cassandra/db/view/View.java
@@ -32,17 +32,15 @@ import org.apache.cassandra.cql3.statements.SelectStatement;
import org.apache.cassandra.db.*;
import org.apache.cassandra.config.*;
import org.apache.cassandra.cql3.ColumnIdentifier;
-import org.apache.cassandra.db.AbstractReadCommandBuilder.SinglePartitionSliceBuilder;
import org.apache.cassandra.db.compaction.CompactionManager;
-import org.apache.cassandra.db.partitions.AbstractBTreePartition;
-import org.apache.cassandra.db.partitions.PartitionIterator;
-import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.partitions.*;
import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.schema.KeyspaceMetadata;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.pager.QueryPager;
import org.apache.cassandra.transport.Server;
import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.btree.BTreeSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,46 +48,18 @@ import org.slf4j.LoggerFactory;
* A View copies data from a base table into a view table which can be queried independently from the
* base. Every update which targets the base table must be fed through the {@link ViewManager} to ensure
* that if a view needs to be updated, the updates are properly created and fed into the view.
- *
- * This class does the job of translating the base row to the view row.
- *
- * It handles reading existing state and figuring out what tombstones need to be generated.
- *
- * {@link View#createMutations(AbstractBTreePartition, TemporalRow.Set, boolean)} is the "main method"
- *
*/
public class View
{
private static final Logger logger = LoggerFactory.getLogger(View.class);
- /**
- * The columns should all be updated together, so we use this object as group.
- */
- private static class Columns
- {
- //These are the base column definitions in terms of the *views* partitioning.
- //Meaning we can see (for example) the partition key of the view contains a clustering key
- //from the base table.
- public final List<ColumnDefinition> partitionDefs;
- public final List<ColumnDefinition> primaryKeyDefs;
- public final List<ColumnDefinition> baseComplexColumns;
-
- private Columns(List<ColumnDefinition> partitionDefs, List<ColumnDefinition> primaryKeyDefs, List<ColumnDefinition> baseComplexColumns)
- {
- this.partitionDefs = partitionDefs;
- this.primaryKeyDefs = primaryKeyDefs;
- this.baseComplexColumns = baseComplexColumns;
- }
- }
-
public final String name;
private volatile ViewDefinition definition;
private final ColumnFamilyStore baseCfs;
- private Columns columns;
+ public volatile List<ColumnDefinition> baseNonPKColumnsInViewPK;
- private final boolean viewPKIncludesOnlyBasePKColumns;
private final boolean includeAllColumns;
private ViewBuilder builder;
@@ -104,12 +74,11 @@ public class View
ColumnFamilyStore baseCfs)
{
this.baseCfs = baseCfs;
-
- name = definition.viewName;
- includeAllColumns = definition.includeAllColumns;
-
- viewPKIncludesOnlyBasePKColumns = updateDefinition(definition);
+ this.name = definition.viewName;
+ this.includeAllColumns = definition.includeAllColumns;
this.rawSelect = definition.select;
+
+ updateDefinition(definition);
}
public ViewDefinition getDefinition()
@@ -118,513 +87,100 @@ public class View
}
/**
- * Lookup column definitions in the base table that correspond to the view columns (should be 1:1)
- *
- * Notify caller if all primary keys in the view are ALL primary keys in the base. We do this to simplify
- * tombstone checks.
- *
- * @param columns a list of columns to lookup in the base table
- * @param definitions lists to populate for the base table definitions
- * @return true if all view PKs are also Base PKs
- */
- private boolean resolveAndAddColumns(Iterable<ColumnIdentifier> columns, List<ColumnDefinition>... definitions)
- {
- boolean allArePrimaryKeys = true;
- for (ColumnIdentifier identifier : columns)
- {
- ColumnDefinition cdef = baseCfs.metadata.getColumnDefinition(identifier);
- assert cdef != null : "Could not resolve column " + identifier.toString();
-
- for (List<ColumnDefinition> list : definitions)
- {
- list.add(cdef);
- }
-
- allArePrimaryKeys = allArePrimaryKeys && cdef.isPrimaryKeyColumn();
- }
-
- return allArePrimaryKeys;
- }
-
- /**
* This updates the columns stored which are dependent on the base CFMetaData.
*
* @return true if the view contains only columns which are part of the base's primary key; false if there is at
* least one column which is not.
*/
- public boolean updateDefinition(ViewDefinition definition)
+ public void updateDefinition(ViewDefinition definition)
{
this.definition = definition;
CFMetaData viewCfm = definition.metadata;
- List<ColumnDefinition> partitionDefs = new ArrayList<>(viewCfm.partitionKeyColumns().size());
- List<ColumnDefinition> primaryKeyDefs = new ArrayList<>(viewCfm.partitionKeyColumns().size()
- + viewCfm.clusteringColumns().size());
- List<ColumnDefinition> baseComplexColumns = new ArrayList<>();
-
- // We only add the partition columns to the partitions list, but both partition columns and clustering
- // columns are added to the primary keys list
- boolean partitionAllPrimaryKeyColumns = resolveAndAddColumns(Iterables.transform(viewCfm.partitionKeyColumns(), cd -> cd.name), primaryKeyDefs, partitionDefs);
- boolean clusteringAllPrimaryKeyColumns = resolveAndAddColumns(Iterables.transform(viewCfm.clusteringColumns(), cd -> cd.name), primaryKeyDefs);
-
- for (ColumnDefinition cdef : baseCfs.metadata.allColumns())
+ List<ColumnDefinition> nonPKDefPartOfViewPK = new ArrayList<>();
+ for (ColumnDefinition baseColumn : baseCfs.metadata.allColumns())
{
- if (cdef.isComplex() && definition.includes(cdef.name))
- {
- baseComplexColumns.add(cdef);
- }
+ ColumnDefinition viewColumn = getViewColumn(baseColumn);
+ if (viewColumn != null && !baseColumn.isPrimaryKeyColumn() && viewColumn.isPrimaryKeyColumn())
+ nonPKDefPartOfViewPK.add(baseColumn);
}
-
- this.columns = new Columns(partitionDefs, primaryKeyDefs, baseComplexColumns);
-
- return partitionAllPrimaryKeyColumns && clusteringAllPrimaryKeyColumns;
+ this.baseNonPKColumnsInViewPK = nonPKDefPartOfViewPK;
}
/**
- * Check to see if the update could possibly modify a view. Cases where the view may be updated are:
- * <ul>
- * <li>View selects all columns</li>
- * <li>Update contains any range tombstones</li>
- * <li>Update touches one of the columns included in the view</li>
- * </ul>
- *
- * If the update contains any range tombstones, there is a possibility that it will not touch a range that is
- * currently included in the view.
- *
- * @return true if {@param partition} modifies a column included in the view
+ * The view column corresponding to the provided base column. This <b>can</b>
+ * return {@code null} if the column is denormalized in the view.
*/
- public boolean updateAffectsView(AbstractBTreePartition partition)
+ public ColumnDefinition getViewColumn(ColumnDefinition baseColumn)
{
- ReadQuery selectQuery = getReadQuery();
-
- if (!partition.metadata().cfId.equals(definition.baseTableId))
- return false;
-
- if (!selectQuery.selectsKey(partition.partitionKey()))
- return false;
-
- // If there are range tombstones, tombstones will also need to be generated for the view
- // This requires a query of the base rows and generating tombstones for all of those values
- if (!partition.deletionInfo().isLive())
- return true;
-
- // Check each row for deletion or update
- for (Row row : partition)
- {
- if (!selectQuery.selectsClustering(partition.partitionKey(), row.clustering()))
- continue;
-
- if (includeAllColumns || !row.deletion().isLive())
- return true;
-
- if (row.primaryKeyLivenessInfo().isLive(FBUtilities.nowInSeconds()))
- return true;
-
- for (ColumnData data : row)
- {
- if (definition.metadata.getColumnDefinition(data.column().name) != null)
- return true;
- }
- }
-
- return false;
- }
-
- /**
- * Creates the clustering columns for the view based on the specified row and resolver policy
- *
- * @param temporalRow The current row
- * @param resolver The policy to use when selecting versions of cells use
- * @return The clustering object to use for the view
- */
- private Clustering viewClustering(TemporalRow temporalRow, TemporalRow.Resolver resolver)
- {
- CFMetaData viewCfm = definition.metadata;
- int numViewClustering = viewCfm.clusteringColumns().size();
- CBuilder clustering = CBuilder.create(viewCfm.comparator);
- for (int i = 0; i < numViewClustering; i++)
- {
- ColumnDefinition definition = viewCfm.clusteringColumns().get(i);
- clustering.add(temporalRow.clusteringValue(definition, resolver));
- }
-
- return clustering.build();
- }
-
- /**
- * @return Mutation containing a range tombstone for a base partition key and TemporalRow.
- */
- private PartitionUpdate createTombstone(TemporalRow temporalRow,
- DecoratedKey partitionKey,
- Row.Deletion deletion,
- TemporalRow.Resolver resolver,
- int nowInSec)
- {
- CFMetaData viewCfm = definition.metadata;
- Row.Builder builder = BTreeRow.unsortedBuilder(nowInSec);
- builder.newRow(viewClustering(temporalRow, resolver));
- builder.addRowDeletion(deletion);
- return PartitionUpdate.singleRowUpdate(viewCfm, partitionKey, builder.build());
- }
-
- /**
- * @return PartitionUpdate containing a complex tombstone for a TemporalRow, and the collection's column identifier.
- */
- private PartitionUpdate createComplexTombstone(TemporalRow temporalRow,
- DecoratedKey partitionKey,
- ColumnDefinition deletedColumn,
- DeletionTime deletionTime,
- TemporalRow.Resolver resolver,
- int nowInSec)
- {
- CFMetaData viewCfm = definition.metadata;
- Row.Builder builder = BTreeRow.unsortedBuilder(nowInSec);
- builder.newRow(viewClustering(temporalRow, resolver));
- builder.addComplexDeletion(deletedColumn, deletionTime);
- return PartitionUpdate.singleRowUpdate(viewCfm, partitionKey, builder.build());
+ return definition.metadata.getColumnDefinition(baseColumn.name);
}
/**
- * @return View's DecoratedKey or null, if one of the view's primary key components has an invalid resolution from
- * the TemporalRow and its Resolver
+ * The base column corresponding to the provided view column. This should
+ * never return {@code null} since a view can't have its "own" columns.
*/
- private DecoratedKey viewPartitionKey(TemporalRow temporalRow, TemporalRow.Resolver resolver)
+ public ColumnDefinition getBaseColumn(ColumnDefinition viewColumn)
{
- List<ColumnDefinition> partitionDefs = this.columns.partitionDefs;
- Object[] partitionKey = new Object[partitionDefs.size()];
-
- for (int i = 0; i < partitionKey.length; i++)
- {
- ByteBuffer value = temporalRow.clusteringValue(partitionDefs.get(i), resolver);
-
- if (value == null)
- return null;
-
- partitionKey[i] = value;
- }
-
- CFMetaData metadata = definition.metadata;
- return metadata.decorateKey(CFMetaData.serializePartitionKey(metadata
- .getKeyValidatorAsClusteringComparator()
- .make(partitionKey)));
+ ColumnDefinition baseColumn = baseCfs.metadata.getColumnDefinition(viewColumn.name);
+ assert baseColumn != null;
+ return baseColumn;
}
/**
- * @return mutation which contains the tombstone for the referenced TemporalRow, or null if not necessary.
- * TemporalRow's can reference at most one view row; there will be at most one row to be tombstoned, so only one
- * mutation is necessary
- */
- private PartitionUpdate createRangeTombstoneForRow(TemporalRow temporalRow)
- {
- // Primary Key and Clustering columns do not generate tombstones
- if (viewPKIncludesOnlyBasePKColumns)
- return null;
-
- boolean hasUpdate = false;
- List<ColumnDefinition> primaryKeyDefs = this.columns.primaryKeyDefs;
- for (ColumnDefinition viewPartitionKeys : primaryKeyDefs)
- {
- if (!viewPartitionKeys.isPrimaryKeyColumn() && temporalRow.clusteringValue(viewPartitionKeys, TemporalRow.oldValueIfUpdated) != null)
- hasUpdate = true;
- }
-
- if (!hasUpdate)
- return null;
-
- TemporalRow.Resolver resolver = TemporalRow.earliest;
- return createTombstone(temporalRow,
- viewPartitionKey(temporalRow, resolver),
- Row.Deletion.shadowable(new DeletionTime(temporalRow.viewClusteringTimestamp(), temporalRow.nowInSec)),
- resolver,
- temporalRow.nowInSec);
- }
-
- /**
- * @return Mutation which is the transformed base table mutation for the view.
- */
- private PartitionUpdate createUpdatesForInserts(TemporalRow temporalRow)
- {
- TemporalRow.Resolver resolver = TemporalRow.latest;
-
- DecoratedKey partitionKey = viewPartitionKey(temporalRow, resolver);
- CFMetaData viewCfm = definition.metadata;
-
- if (partitionKey == null)
- {
- // Not having a partition key means we aren't updating anything
- return null;
- }
-
- Row.Builder regularBuilder = BTreeRow.unsortedBuilder(temporalRow.nowInSec);
-
- CBuilder clustering = CBuilder.create(viewCfm.comparator);
- for (int i = 0; i < viewCfm.clusteringColumns().size(); i++)
- {
- ColumnDefinition column = viewCfm.clusteringColumns().get(i);
- ByteBuffer value = temporalRow.clusteringValue(column, resolver);
-
- // handle single-column deletions correctly to avoid nulls in the view primary key
- if (value == null)
- return null;
-
- clustering.add(value);
- }
- regularBuilder.newRow(clustering.build());
- regularBuilder.addPrimaryKeyLivenessInfo(LivenessInfo.create(viewCfm,
- temporalRow.viewClusteringTimestamp(),
- temporalRow.viewClusteringTtl(),
- temporalRow.viewClusteringLocalDeletionTime()));
-
- for (ColumnDefinition columnDefinition : viewCfm.allColumns())
- {
- if (columnDefinition.isPrimaryKeyColumn())
- continue;
-
- for (Cell cell : temporalRow.values(columnDefinition, resolver))
- {
- regularBuilder.addCell(cell);
- }
- }
-
- Row row = regularBuilder.build();
-
- // although we check for empty rows in updateAppliesToView(), if there are any good rows in the PartitionUpdate,
- // all rows in the partition will be processed, and we need to handle empty/non-live rows here (CASSANDRA-10614)
- if (row.isEmpty())
- return null;
-
- return PartitionUpdate.singleRowUpdate(viewCfm, partitionKey, row);
- }
-
- /**
- * @param partition Update which possibly contains deletion info for which to generate view tombstones.
- * @return View Tombstones which delete all of the rows which have been removed from the base table with
- * {@param partition}
- */
- private Collection<Mutation> createForDeletionInfo(TemporalRow.Set rowSet, AbstractBTreePartition partition)
- {
- final TemporalRow.Resolver resolver = TemporalRow.earliest;
-
- DeletionInfo deletionInfo = partition.deletionInfo();
-
- List<Mutation> mutations = new ArrayList<>();
-
- // Check the complex columns to see if there are any which may have tombstones we need to create for the view
- if (!columns.baseComplexColumns.isEmpty())
- {
- for (Row row : partition)
- {
- if (!row.hasComplexDeletion())
- continue;
-
- TemporalRow temporalRow = rowSet.getClustering(row.clustering());
-
- assert temporalRow != null;
-
- for (ColumnDefinition definition : columns.baseComplexColumns)
- {
- ComplexColumnData columnData = row.getComplexColumnData(definition);
-
- if (columnData != null)
- {
- DeletionTime time = columnData.complexDeletion();
- if (!time.isLive())
- {
- DecoratedKey targetKey = viewPartitionKey(temporalRow, resolver);
- if (targetKey != null)
- mutations.add(new Mutation(createComplexTombstone(temporalRow, targetKey, definition, time, resolver, temporalRow.nowInSec)));
- }
- }
- }
- }
- }
-
- ReadCommand command = null;
-
- if (!deletionInfo.isLive())
- {
- // We have to generate tombstones for all of the affected rows, but we don't have the information in order
- // to create them. This requires that we perform a read for the entire range that is being tombstoned, and
- // generate a tombstone for each. This may be slow, because a single range tombstone can cover up to an
- // entire partition of data which is not distributed on a single partition node.
- DecoratedKey dk = rowSet.dk;
-
- if (!deletionInfo.getPartitionDeletion().isLive())
- {
- command = getSelectStatement().internalReadForView(dk, rowSet.nowInSec);
- }
- else
- {
- SinglePartitionSliceBuilder builder = new SinglePartitionSliceBuilder(baseCfs, dk);
- Iterator<RangeTombstone> tombstones = deletionInfo.rangeIterator(false);
- while (tombstones.hasNext())
- {
- RangeTombstone tombstone = tombstones.next();
-
- builder.addSlice(tombstone.deletedSlice());
- }
-
- command = builder.build();
- }
- }
-
- if (command == null)
- {
- ReadQuery selectQuery = getReadQuery();
- SinglePartitionSliceBuilder builder = null;
- for (Row row : partition)
- {
- if (!row.deletion().isLive())
- {
- if (!selectQuery.selectsClustering(rowSet.dk, row.clustering()))
- continue;
-
- if (builder == null)
- builder = new SinglePartitionSliceBuilder(baseCfs, rowSet.dk);
- builder.addSlice(Slice.make(row.clustering()));
- }
- }
-
- if (builder != null)
- command = builder.build();
- }
-
- if (command != null)
- {
- ReadQuery selectQuery = getReadQuery();
- assert selectQuery.selectsKey(rowSet.dk);
-
- // We may have already done this work for another MV update so check
- if (!rowSet.hasTombstonedExisting())
- {
- QueryPager pager = command.getPager(null, Server.CURRENT_VERSION);
-
- // Add all of the rows which were recovered from the query to the row set
- while (!pager.isExhausted())
- {
- try (ReadOrderGroup orderGroup = pager.startOrderGroup();
- PartitionIterator iter = pager.fetchPageInternal(128, orderGroup))
- {
- if (!iter.hasNext())
- break;
-
- try (RowIterator rowIterator = iter.next())
- {
- while (rowIterator.hasNext())
- {
- Row row = rowIterator.next();
- if (selectQuery.selectsClustering(rowSet.dk, row.clustering()))
- rowSet.addRow(row, false);
- }
- }
- }
- }
-
- //Incase we fetched nothing, avoid re checking on another MV update
- rowSet.setTombstonedExisting();
- }
-
- // If the temporal row has been deleted by the deletion info, we generate the corresponding range tombstone
- // for the view.
- for (TemporalRow temporalRow : rowSet)
- {
- DeletionTime deletionTime = temporalRow.deletionTime(partition);
- if (!deletionTime.isLive())
- {
- DecoratedKey value = viewPartitionKey(temporalRow, resolver);
- if (value != null)
- {
- PartitionUpdate update = createTombstone(temporalRow, value, Row.Deletion.regular(deletionTime), resolver, temporalRow.nowInSec);
- if (update != null)
- mutations.add(new Mutation(update));
- }
- }
- }
- }
-
- return !mutations.isEmpty() ? mutations : null;
- }
-
- /**
- * Read and update temporal rows in the set which have corresponding values stored on the local node
+ * Whether the view might be affected by the provided update.
+ * <p>
+ * Note that having this method return {@code true} is not an absolute guarantee that the view will be
+ * updated (just that it most likely will), but a {@code false} return guarantees it won't be affected.
+ *
+ * @param partitionKey the partition key that is updated.
+ * @param update the update being applied.
+ * @return {@code false} if we can guarantee that inserting {@code update} for key {@code partitionKey}
+ * won't affect the view in any way, {@code true} otherwise.
*/
- private void readLocalRows(TemporalRow.Set rowSet)
+ public boolean mayBeAffectedBy(DecoratedKey partitionKey, Row update)
{
- long start = System.currentTimeMillis();
- SinglePartitionSliceBuilder builder = new SinglePartitionSliceBuilder(baseCfs, rowSet.dk);
-
- for (TemporalRow temporalRow : rowSet)
- builder.addSlice(temporalRow.baseSlice());
+ // We can guarantee that the view won't be affected if:
+ // - the clustering is excluded by the view filter (note that this isn't true of the filter on regular columns:
+ // even if an update doesn't match a view condition on a regular column, that update can still invalidate a pre-existing
+ // entry).
+ // - or the update doesn't modify any of the columns impacting the view (where "impacting" the view means that column is
+ // either included in the view or used by the view filter).
+ if (!getReadQuery().selectsClustering(partitionKey, update.clustering()))
+ return false;
- QueryPager pager = builder.build().getPager(null, Server.CURRENT_VERSION);
+ // We want to find if the update modifies any of the columns that are part of the view (in which case the view is affected).
+ // But if the view includes all the base table columns, or the update has either a row deletion or a row liveness (note
+ // that for the liveness, it would be more "precise" to check if it's live, but pushing an update that is already expired
+ // is dumb so it's ok not to optimize for it and it saves us from having to pass nowInSec to the method), we know the view
+ // is affected right away.
+ if (includeAllColumns || !update.deletion().isLive() || !update.primaryKeyLivenessInfo().isEmpty())
+ return true;
- while (!pager.isExhausted())
+ for (ColumnData data : update)
{
- try (ReadOrderGroup orderGroup = pager.startOrderGroup();
- PartitionIterator iter = pager.fetchPageInternal(128, orderGroup))
- {
- while (iter.hasNext())
- {
- try (RowIterator rows = iter.next())
- {
- while (rows.hasNext())
- {
- rowSet.addRow(rows.next(), false);
- }
- }
- }
- }
+ if (definition.metadata.getColumnDefinition(data.column().name) != null)
+ return true;
}
- baseCfs.metric.viewReadTime.update(System.currentTimeMillis() - start, TimeUnit.MILLISECONDS);
- }
-
- /**
- * @return Set of rows which are contained in the partition update {@param partition}
- */
- private TemporalRow.Set separateRows(AbstractBTreePartition partition, Set<ColumnIdentifier> viewPrimaryKeyCols)
- {
-
- TemporalRow.Set rowSet = new TemporalRow.Set(baseCfs, viewPrimaryKeyCols, partition.partitionKey().getKey());
-
- for (Row row : partition)
- rowSet.addRow(row, true);
-
- return rowSet;
+ return false;
}
/**
- * Splits the partition update up and adds the existing state to each row.
- * This data can be reused for multiple MV updates on the same base table
+ * Whether a given base row matches the view filter (and thus if it should have a corresponding entry).
+ * <p>
+ * Note that this differs from {@link #mayBeAffectedBy} in that the provided row <b>must</b> be the current
+ * state of the base row, not just some updates to it. This method also has no false positives: a base
+ * row either does or doesn't match the view filter.
*
- * @param partition the mutation
- * @param isBuilding If the view is currently being built, we do not query the values which are already stored,
- * since all of the update will already be present in the base table.
- * @return The set of temoral rows contained in this update
+ * @param partitionKey the partition key that is updated.
+ * @param baseRow the current state of a particular base row.
+ * @param nowInSec the current time in seconds (to decide what is live and what isn't).
+ * @return {@code true} if {@code baseRow} matches the view filters, {@code false} otherwise.
*/
- public TemporalRow.Set getTemporalRowSet(AbstractBTreePartition partition, TemporalRow.Set existing, boolean isBuilding)
+ public boolean matchesViewFilter(DecoratedKey partitionKey, Row baseRow, int nowInSec)
{
- if (!updateAffectsView(partition))
- return existing;
-
- Set<ColumnIdentifier> columns = new HashSet<>(this.columns.primaryKeyDefs.size());
- for (ColumnDefinition def : this.columns.primaryKeyDefs)
- columns.add(def.name);
-
- TemporalRow.Set rowSet;
- if (existing == null)
- {
- rowSet = separateRows(partition, columns);
-
- // If we are building the view, we do not want to add old values; they will always be the same
- if (!isBuilding)
- readLocalRows(rowSet);
- }
- else
- {
- rowSet = existing.withNewViewPrimaryKey(columns);
- }
-
- return rowSet;
+ return getReadQuery().selectsClustering(partitionKey, baseRow.clustering())
+ && getSelectStatement().rowFilterForInternalCalls().isSatisfiedBy(baseCfs.metadata, partitionKey, baseRow, nowInSec);
}
/**
@@ -656,61 +212,6 @@ public class View
return query;
}
- /**
- * @param isBuilding If the view is currently being built, we do not query the values which are already stored,
- * since all of the update will already be present in the base table.
- * @return View mutations which represent the changes necessary as long as previously created mutations for the view
- * have been applied successfully. This is based solely on the changes that are necessary given the current
- * state of the base table and the newly applying partition data.
- */
- public Collection<Mutation> createMutations(AbstractBTreePartition partition, TemporalRow.Set rowSet, boolean isBuilding)
- {
- if (!updateAffectsView(partition))
- return null;
-
- ReadQuery selectQuery = getReadQuery();
- Collection<Mutation> mutations = null;
- for (TemporalRow temporalRow : rowSet)
- {
- // In updateAffectsView, we check the partition to see if there is at least one row that matches the
- // filters and is live. If there is more than one row in the partition, we need to re-check each one
- // individually.
- if (partition.rowCount() != 1 && !selectQuery.selectsClustering(partition.partitionKey(), temporalRow.baseClustering()))
- continue;
-
- // If we are building, there is no need to check for partition tombstones; those values will not be present
- // in the partition data
- if (!isBuilding)
- {
- PartitionUpdate partitionTombstone = createRangeTombstoneForRow(temporalRow);
- if (partitionTombstone != null)
- {
- if (mutations == null) mutations = new LinkedList<>();
- mutations.add(new Mutation(partitionTombstone));
- }
- }
-
- PartitionUpdate insert = createUpdatesForInserts(temporalRow);
- if (insert != null)
- {
- if (mutations == null) mutations = new LinkedList<>();
- mutations.add(new Mutation(insert));
- }
- }
-
- if (!isBuilding)
- {
- Collection<Mutation> deletion = createForDeletionInfo(rowSet, partition);
- if (deletion != null && !deletion.isEmpty())
- {
- if (mutations == null) mutations = new LinkedList<>();
- mutations.addAll(deletion);
- }
- }
-
- return mutations;
- }
-
public synchronized void build()
{
if (this.builder != null)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/86ba2274/src/java/org/apache/cassandra/db/view/ViewBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/ViewBuilder.java b/src/java/org/apache/cassandra/db/view/ViewBuilder.java
index 35b023b..b2b409b 100644
--- a/src/java/org/apache/cassandra/db/view/ViewBuilder.java
+++ b/src/java/org/apache/cassandra/db/view/ViewBuilder.java
@@ -19,6 +19,7 @@
package org.apache.cassandra.db.view;
import java.util.Collection;
+import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@@ -35,9 +36,8 @@ import org.apache.cassandra.db.compaction.CompactionInfo;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.db.lifecycle.SSTableSet;
-import org.apache.cassandra.db.partitions.FilteredPartition;
-import org.apache.cassandra.db.partitions.PartitionIterator;
-import org.apache.cassandra.db.rows.RowIterator;
+import org.apache.cassandra.db.partitions.*;
+import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.sstable.ReducingKeyIterator;
@@ -76,28 +76,22 @@ public class ViewBuilder extends CompactionInfo.Holder
if (!selectQuery.selectsKey(key))
return;
- QueryPager pager = view.getSelectStatement().internalReadForView(key, FBUtilities.nowInSeconds()).getPager(null, Server.CURRENT_VERSION);
+ int nowInSec = FBUtilities.nowInSeconds();
+ SinglePartitionReadCommand command = view.getSelectStatement().internalReadForView(key, nowInSec);
- while (!pager.isExhausted())
+ // We're rebuilding everything from what's on disk, so we read everything, consider that as new updates
+ // and pretend that there is nothing pre-existing.
+ UnfilteredRowIterator empty = UnfilteredRowIterators.noRowsIterator(baseCfs.metadata, key, Rows.EMPTY_STATIC_ROW, DeletionTime.LIVE, false);
+
+ Collection<Mutation> mutations;
+ try (ReadOrderGroup orderGroup = command.startOrderGroup();
+ UnfilteredRowIterator data = UnfilteredPartitionIterators.getOnlyElement(command.executeLocally(orderGroup), command))
{
- try (ReadOrderGroup orderGroup = pager.startOrderGroup();
- PartitionIterator partitionIterator = pager.fetchPageInternal(128, orderGroup))
- {
- if (!partitionIterator.hasNext())
- return;
-
- try (RowIterator rowIterator = partitionIterator.next())
- {
- FilteredPartition partition = FilteredPartition.create(rowIterator);
- TemporalRow.Set temporalRows = view.getTemporalRowSet(partition, null, true);
-
- Collection<Mutation> mutations = view.createMutations(partition, temporalRows, true);
-
- if (mutations != null)
- StorageProxy.mutateMV(key.getKey(), mutations, true, noBase);
- }
- }
+ mutations = baseCfs.keyspace.viewManager.forTable(baseCfs.metadata).generateViewUpdates(Collections.singleton(view), data, empty, nowInSec);
}
+
+ if (!mutations.isEmpty())
+ StorageProxy.mutateMV(key.getKey(), mutations, true, noBase);
}
public void run()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/86ba2274/src/java/org/apache/cassandra/db/view/ViewManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/ViewManager.java b/src/java/org/apache/cassandra/db/view/ViewManager.java
index 9fe0544..fd04b97 100644
--- a/src/java/org/apache/cassandra/db/view/ViewManager.java
+++ b/src/java/org/apache/cassandra/db/view/ViewManager.java
@@ -19,23 +19,21 @@ package org.apache.cassandra.db.view;
import java.nio.ByteBuffer;
import java.util.*;
-import java.util.concurrent.ConcurrentNavigableMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
-import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Striped;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.ViewDefinition;
import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.commitlog.ReplayPosition;
-import org.apache.cassandra.db.partitions.PartitionUpdate;
-import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.service.StorageProxy;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.partitions.*;
+
/**
* Manages {@link View}'s for a single {@link ColumnFamilyStore}. All of the views for that table are created when this
@@ -45,110 +43,30 @@ import org.slf4j.LoggerFactory;
* any views {@link ViewManager#updatesAffectView(Collection, boolean)}, provide locks to prevent multiple
* updates from creating incoherent updates in the view {@link ViewManager#acquireLockFor(ByteBuffer)}, and
* to affect change on the view.
+ *
+ * TODO: I think we can get rid of that class. For addition/removal of views by name, we could move it to Keyspace. And we're
+ * not sure it's even worth keeping viewsByName as none of the related operations are performance sensitive so we could
+ * find the view by iterating over the CFStore.viewManager directly.
+ * For the lock, it could move to Keyspace too, but I don't remember why it has to be at the keyspace level and if it
+ * can be at the table level, maybe that's where it should be.
*/
public class ViewManager
{
private static final Logger logger = LoggerFactory.getLogger(ViewManager.class);
- public class ForStore
- {
- private final ConcurrentNavigableMap<String, View> viewsByName;
-
- public ForStore()
- {
- this.viewsByName = new ConcurrentSkipListMap<>();
- }
-
- public Iterable<View> allViews()
- {
- return viewsByName.values();
- }
-
- public Iterable<ColumnFamilyStore> allViewsCfs()
- {
- List<ColumnFamilyStore> viewColumnFamilies = new ArrayList<>();
- for (View view : allViews())
- viewColumnFamilies.add(keyspace.getColumnFamilyStore(view.getDefinition().viewName));
- return viewColumnFamilies;
- }
-
- public void forceBlockingFlush()
- {
- for (ColumnFamilyStore viewCfs : allViewsCfs())
- viewCfs.forceBlockingFlush();
- }
-
- public void dumpMemtables()
- {
- for (ColumnFamilyStore viewCfs : allViewsCfs())
- viewCfs.dumpMemtable();
- }
-
- public void truncateBlocking(long truncatedAt)
- {
- for (ColumnFamilyStore viewCfs : allViewsCfs())
- {
- ReplayPosition replayAfter = viewCfs.discardSSTables(truncatedAt);
- SystemKeyspace.saveTruncationRecord(viewCfs, truncatedAt, replayAfter);
- }
- }
-
- public void addView(View view)
- {
- viewsByName.put(view.name, view);
- }
-
- public void removeView(String name)
- {
- viewsByName.remove(name);
- }
- }
-
private static final Striped<Lock> LOCKS = Striped.lazyWeakLock(DatabaseDescriptor.getConcurrentViewWriters() * 1024);
private static final boolean enableCoordinatorBatchlog = Boolean.getBoolean("cassandra.mv_enable_coordinator_batchlog");
- private final ConcurrentNavigableMap<UUID, ForStore> viewManagersByStore;
- private final ConcurrentNavigableMap<String, View> viewsByName;
+ private final ConcurrentMap<String, View> viewsByName = new ConcurrentHashMap<>();
+ private final ConcurrentMap<UUID, TableViews> viewsByBaseTable = new ConcurrentHashMap<>();
private final Keyspace keyspace;
public ViewManager(Keyspace keyspace)
{
- this.viewManagersByStore = new ConcurrentSkipListMap<>();
- this.viewsByName = new ConcurrentSkipListMap<>();
this.keyspace = keyspace;
}
- /**
- * Calculates and pushes updates to the views replicas. The replicas are determined by
- * {@link ViewUtils#getViewNaturalEndpoint(String, Token, Token)}.
- */
- public void pushViewReplicaUpdates(PartitionUpdate update, boolean writeCommitLog, AtomicLong baseComplete)
- {
- List<Mutation> mutations = null;
- TemporalRow.Set temporalRows = null;
- for (Map.Entry<String, View> view : viewsByName.entrySet())
- {
- // Make sure that we only get mutations from views which are affected since the set includes all views for a
- // keyspace. This will prevent calling getTemporalRowSet for the wrong base table.
- if (view.getValue().updateAffectsView(update))
- {
- temporalRows = view.getValue().getTemporalRowSet(update, temporalRows, false);
-
- Collection<Mutation> viewMutations = view.getValue().createMutations(update, temporalRows, false);
- if (viewMutations != null && !viewMutations.isEmpty())
- {
- if (mutations == null)
- mutations = Lists.newLinkedList();
- mutations.addAll(viewMutations);
- }
- }
- }
-
- if (mutations != null)
- StorageProxy.mutateMV(update.partitionKey().getKey(), mutations, writeCommitLog, baseComplete);
- }
-
public boolean updatesAffectView(Collection<? extends IMutation> mutations, boolean coordinatorBatchlog)
{
if (coordinatorBatchlog && !enableCoordinatorBatchlog)
@@ -156,25 +74,22 @@ public class ViewManager
for (IMutation mutation : mutations)
{
- for (PartitionUpdate cf : mutation.getPartitionUpdates())
+ for (PartitionUpdate update : mutation.getPartitionUpdates())
{
- assert keyspace.getName().equals(cf.metadata().ksName);
+ assert keyspace.getName().equals(update.metadata().ksName);
if (coordinatorBatchlog && keyspace.getReplicationStrategy().getReplicationFactor() == 1)
continue;
- for (View view : allViews())
- {
- if (view.updateAffectsView(cf))
- return true;
- }
+ if (!forTable(update.metadata()).updatedViews(update).isEmpty())
+ return true;
}
}
return false;
}
- public Iterable<View> allViews()
+ private Iterable<View> allViews()
{
return viewsByName.values();
}
@@ -222,7 +137,7 @@ public class ViewManager
public void addView(ViewDefinition definition)
{
View view = new View(definition, keyspace.getColumnFamilyStore(definition.baseTableId));
- forTable(view.getDefinition().baseTableId).addView(view);
+ forTable(view.getDefinition().baseTableMetadata()).add(view);
viewsByName.put(definition.viewName, view);
}
@@ -233,7 +148,7 @@ public class ViewManager
if (view == null)
return;
- forTable(view.getDefinition().baseTableId).removeView(name);
+ forTable(view.getDefinition().baseTableMetadata()).removeByName(name);
SystemKeyspace.setViewRemoved(keyspace.getName(), view.name);
}
@@ -243,17 +158,18 @@ public class ViewManager
view.build();
}
- public ForStore forTable(UUID baseId)
+ public TableViews forTable(CFMetaData metadata)
{
- ForStore forStore = viewManagersByStore.get(baseId);
- if (forStore == null)
+ UUID baseId = metadata.cfId;
+ TableViews views = viewsByBaseTable.get(baseId);
+ if (views == null)
{
- forStore = new ForStore();
- ForStore previous = viewManagersByStore.put(baseId, forStore);
+ views = new TableViews(metadata);
+ TableViews previous = viewsByBaseTable.putIfAbsent(baseId, views);
if (previous != null)
- forStore = previous;
+ views = previous;
}
- return forStore;
+ return views;
}
public static Lock acquireLockFor(ByteBuffer key)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/86ba2274/src/java/org/apache/cassandra/db/view/ViewUpdateGenerator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/ViewUpdateGenerator.java b/src/java/org/apache/cassandra/db/view/ViewUpdateGenerator.java
new file mode 100644
index 0000000..af025cb
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/view/ViewUpdateGenerator.java
@@ -0,0 +1,549 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.view;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import com.google.common.collect.Iterators;
+import com.google.common.collect.PeekingIterator;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.config.ViewDefinition;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.partitions.*;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.CompositeType;
+
+/**
+ * Creates the updates to apply to a view given the existing rows in the base
+ * table and the updates that we're applying to them (this handles updates
+ * on a single partition only).
+ *
+ * This class is used by passing the updates made to the base table to
+ * {@link #addBaseTableUpdate} and calling {@link #generateViewUpdates} once all updates have
+ * been handled to get the resulting view mutations.
+ */
+public class ViewUpdateGenerator
+{
+ private final View view;
+ private final int nowInSec;
+
+ private final CFMetaData baseMetadata;
+ private final DecoratedKey baseDecoratedKey;
+ private final ByteBuffer[] basePartitionKey;
+
+ private final CFMetaData viewMetadata;
+
+ private final Map<DecoratedKey, PartitionUpdate> updates = new HashMap<>();
+
+ // Reused internally to build a new entry
+ private final ByteBuffer[] currentViewEntryPartitionKey;
+ private final Row.Builder currentViewEntryBuilder;
+
+ /**
+ * The type of update action to perform on the view for a given base table
+ * update.
+ */
+ private enum UpdateAction
+ {
+ NONE, // There was no view entry and none should be added
+ NEW_ENTRY, // There was no entry but there is one post-update
+ DELETE_OLD, // There was an entry but there is nothing after update
+ UPDATE_EXISTING, // There was an entry and the update modifies it
+ SWITCH_ENTRY // There was an entry and there is still one after update,
+ // but they are not the same one.
+ };
+
+ /**
+  * Builds a {@code ViewUpdateGenerator} for one view and one base table partition.
+  *
+  * @param view the view whose updates this generator produces.
+  * @param basePartitionKey the key of the base table partition whose updates
+  * will be handled.
+  * @param nowInSec the current time in seconds. Used to decide whether data is live
+  * and as the time reference of the deletions this generator creates.
+  */
+ public ViewUpdateGenerator(View view, DecoratedKey basePartitionKey, int nowInSec)
+ {
+     CFMetaData baseTable = view.getDefinition().baseTableMetadata();
+     CFMetaData viewTable = view.getDefinition().metadata;
+
+     this.view = view;
+     this.nowInSec = nowInSec;
+
+     this.baseMetadata = baseTable;
+     this.baseDecoratedKey = basePartitionKey;
+     this.basePartitionKey = extractKeyComponents(basePartitionKey, baseTable.getKeyValidator());
+
+     this.viewMetadata = viewTable;
+
+     // Scratch state, reused for every view entry built by this generator.
+     this.currentViewEntryPartitionKey = new ByteBuffer[viewTable.partitionKeyColumns().size()];
+     this.currentViewEntryBuilder = BTreeRow.sortedBuilder();
+ }
+
+ /**
+  * Breaks a partition key into its individual components: the key is split when the
+  * provided type is a {@link CompositeType}, and returned as a single-element array otherwise.
+  */
+ private static ByteBuffer[] extractKeyComponents(DecoratedKey partitionKey, AbstractType<?> type)
+ {
+     ByteBuffer rawKey = partitionKey.getKey();
+     if (type instanceof CompositeType)
+         return ((CompositeType) type).split(rawKey);
+
+     return new ByteBuffer[]{ rawKey };
+ }
+
+ /**
+  * Records the view updates implied by the change of one base table row.
+  *
+  * @param existingBaseRow the base table row as it was before the update.
+  * @param mergedBaseRow the base table row once the update is applied (the resulting
+  * row, not merely the update itself).
+  */
+ public void addBaseTableUpdate(Row existingBaseRow, Row mergedBaseRow)
+ {
+     UpdateAction action = updateAction(existingBaseRow, mergedBaseRow);
+     switch (action)
+     {
+         case NEW_ENTRY:
+             createEntry(mergedBaseRow);
+             break;
+         case DELETE_OLD:
+             deleteOldEntry(existingBaseRow);
+             break;
+         case UPDATE_EXISTING:
+             updateEntry(existingBaseRow, mergedBaseRow);
+             break;
+         case SWITCH_ENTRY:
+             // The row moves from one view entry to another: add the new one, drop the old one.
+             createEntry(mergedBaseRow);
+             deleteOldEntry(existingBaseRow);
+             break;
+         case NONE:
+         default:
+             // Nothing to do for the view.
+             break;
+     }
+ }
+
+ /**
+ * Returns the updates that need to be made to the view given the base table updates
+ * passed to {@link #addBaseTableUpdate}.
+ * <p>
+ * The returned collection is the values view of this generator's internal map, not a copy.
+ *
+ * @return the updates to do to the view.
+ */
+ public Collection<PartitionUpdate> generateViewUpdates()
+ {
+ return updates.values();
+ }
+
+ /**
+ * Computes which type of action needs to be performed on the view for a base table row
+ * before and after an update.
+ *
+ * @param existingBaseRow the base table row before the update; may be {@code null}, which is treated as empty.
+ * @param mergedBaseRow the base table row after the update; must not be empty.
+ * @return the action to perform on the view.
+ */
+ private UpdateAction updateAction(Row existingBaseRow, Row mergedBaseRow)
+ {
+ // Having existing empty is useful, it just means we'll insert a brand new entry for mergedBaseRow,
+ // but if we have no update at all, we shouldn't get there.
+ assert !mergedBaseRow.isEmpty();
+
+ // Note that none of the base PK columns will differ since we're intrinsically dealing
+ // with the same base row. So we have to check 3 things:
+ // 1) that the clustering doesn't have a null, which can happen for compact tables. If that's the case,
+ // there are no corresponding entries.
+ // 2) if there is a column not part of the base PK in the view PK, whether it is changed by the update.
+ // 3) whether mergedBaseRow actually matches the view SELECT filter. This last check is not done in this
+ // method: createEntry(), updateEntry() and deleteOldEntry() each call matchesViewFilter() themselves.
+
+ if (baseMetadata.isCompactTable())
+ {
+ Clustering clustering = mergedBaseRow.clustering();
+ for (int i = 0; i < clustering.size(); i++)
+ {
+ if (clustering.get(i) == null)
+ return UpdateAction.NONE;
+ }
+ }
+
+ assert view.baseNonPKColumnsInViewPK.size() <= 1 : "We currently only support one base non-PK column in the view PK";
+ if (view.baseNonPKColumnsInViewPK.isEmpty())
+ {
+ // The view entry is necessarily the same pre and post update.
+
+ // Note that we allow existingBaseRow to be null and treat it as empty (see MultiViewUpdateBuilder.generateViewsMutations).
+ boolean existingHasLiveData = existingBaseRow != null && existingBaseRow.hasLiveData(nowInSec);
+ boolean mergedHasLiveData = mergedBaseRow.hasLiveData(nowInSec);
+ return existingHasLiveData
+ ? (mergedHasLiveData ? UpdateAction.UPDATE_EXISTING : UpdateAction.DELETE_OLD)
+ : (mergedHasLiveData ? UpdateAction.NEW_ENTRY : UpdateAction.NONE);
+ }
+
+ ColumnDefinition baseColumn = view.baseNonPKColumnsInViewPK.get(0);
+ assert !baseColumn.isComplex() : "A complex column couldn't be part of the view PK";
+ Cell before = existingBaseRow == null ? null : existingBaseRow.getCell(baseColumn);
+ Cell after = mergedBaseRow.getCell(baseColumn);
+
+ // If the update didn't modify this column, the cells will be the same object so it's worth checking
+ if (before == after)
+ return before == null ? UpdateAction.NONE : UpdateAction.UPDATE_EXISTING;
+
+ // From here on the cell for the view PK column differs before and after the update.
+ if (!isLive(before))
+ return isLive(after) ? UpdateAction.NEW_ENTRY : UpdateAction.NONE;
+ if (!isLive(after))
+ return UpdateAction.DELETE_OLD;
+
+ // Both cells are live: the entry only moves if the value of the view PK column changed.
+ return baseColumn.cellValueType().compare(before.value(), after.value()) == 0
+ ? UpdateAction.UPDATE_EXISTING
+ : UpdateAction.SWITCH_ENTRY;
+ }
+
+ /**
+ * Whether the provided base row matches the view filter, as decided by
+ * {@code View.matchesViewFilter} for this generator's base partition key and {@code nowInSec}.
+ */
+ private boolean matchesViewFilter(Row baseRow)
+ {
+ return view.matchesViewFilter(baseDecoratedKey, baseRow, nowInSec);
+ }
+
+ /**
+  * Whether the provided cell exists and is live at {@code nowInSec}; a {@code null} cell is not live.
+  */
+ private boolean isLive(Cell cell)
+ {
+     if (cell == null)
+         return false;
+
+     return cell.isLive(nowInSec);
+ }
+
+ /**
+  * Builds the view entry that corresponds to the provided base row and adds it to the
+  * generated updates.
+  * <p>
+  * Nothing is generated when the base row does not match the view filter.
+  */
+ private void createEntry(Row baseRow)
+ {
+     // A row rejected by the view filter has no entry in the view.
+     if (!matchesViewFilter(baseRow))
+         return;
+
+     startNewUpdate(baseRow);
+     currentViewEntryBuilder.addPrimaryKeyLivenessInfo(computeLivenessInfoForEntry(baseRow));
+     currentViewEntryBuilder.addRowDeletion(baseRow.deletion());
+
+     for (ColumnData baseData : baseRow)
+     {
+         ColumnDefinition target = view.getViewColumn(baseData.column());
+         // Only copy columns that are denormalized in the view. Columns that are part of the
+         // view PK have already been taken into account in the clustering.
+         if (target != null && !target.isPrimaryKeyColumn())
+             addColumnData(target, baseData);
+     }
+
+     submitUpdate();
+ }
+
+ /**
+  * Creates the updates to apply to the existing view entry given the base table row before
+  * and after the update, assuming that the update hasn't changed which view entry the
+  * row corresponds to (that is, we know the columns composing the view PK haven't changed).
+  * <p>
+  * This method checks that the base row (before and after) does match the view filter before
+  * applying anything: if the existing row did not match, this falls back to creating the entry
+  * for the merged row; if the merged row no longer matches, the old entry is deleted.
+  *
+  * @param existingBaseRow the base table row before the update.
+  * @param mergedBaseRow the base table row after the update is applied.
+  */
+ private void updateEntry(Row existingBaseRow, Row mergedBaseRow)
+ {
+     // While we know existingBaseRow and mergedBaseRow correspond to the same view entry,
+     // they may not match the view filter.
+     if (!matchesViewFilter(existingBaseRow))
+     {
+         createEntry(mergedBaseRow);
+         return;
+     }
+     if (!matchesViewFilter(mergedBaseRow))
+     {
+         deleteOldEntryInternal(existingBaseRow);
+         return;
+     }
+
+     startNewUpdate(mergedBaseRow);
+
+     // In theory, it may be that the PK liveness and row deletion haven't been changed by the update
+     // and we could condition the 2 additions below. In practice though, it's as fast (if not
+     // faster) to compute those info than to check if they have changed so we keep it simple.
+     currentViewEntryBuilder.addPrimaryKeyLivenessInfo(computeLivenessInfoForEntry(mergedBaseRow));
+     currentViewEntryBuilder.addRowDeletion(mergedBaseRow.deletion());
+
+     // We only add to the view update the cells from mergedBaseRow that differ from
+     // existingBaseRow. For that and for speed we can just use cell pointer equality: if the update
+     // hasn't touched a cell, we know it will be the same object in existingBaseRow and
+     // mergedBaseRow (note that including more cells than we strictly should isn't a problem
+     // for correctness, so even if the code changes and pointer equality doesn't work anymore, it'll
+     // only be a slight inefficiency which we can fix then).
+     // Note: we could alternatively use Rows.diff() for this, but because it is a bit more generic
+     // than what we need here, it's also a bit less efficient (it allocates more in particular),
+     // and this might be called a lot of times for view updates. So, given that this is not a whole
+     // lot of code anyway, it's probably worth doing the diff manually.
+     PeekingIterator<ColumnData> existingIter = Iterators.peekingIterator(existingBaseRow.iterator());
+     for (ColumnData mergedData : mergedBaseRow)
+     {
+         ColumnDefinition baseColumn = mergedData.column();
+         ColumnDefinition viewColumn = view.getViewColumn(baseColumn);
+         // If that base table column is not denormalized in the view, we have nothing to do.
+         // Also, if it's part of the view PK it's already been taken into account in the clustering.
+         if (viewColumn == null || viewColumn.isPrimaryKeyColumn())
+             continue;
+
+         ColumnData existingData = null;
+         // Find if there is data for that column in the existing row. This merge walk relies on
+         // both rows iterating their columns in the same order.
+         while (existingIter.hasNext())
+         {
+             int cmp = baseColumn.compareTo(existingIter.peek().column());
+             // The next existing column sorts after the merged one: no existing data for it.
+             if (cmp < 0)
+                 break;
+
+             ColumnData next = existingIter.next();
+             if (cmp == 0)
+             {
+                 existingData = next;
+                 break;
+             }
+         }
+
+         if (existingData == null)
+         {
+             addColumnData(viewColumn, mergedData);
+             continue;
+         }
+
+         if (mergedData == existingData)
+             continue;
+
+         if (baseColumn.isComplex())
+         {
+             ComplexColumnData mergedComplexData = (ComplexColumnData)mergedData;
+             ComplexColumnData existingComplexData = (ComplexColumnData)existingData;
+             if (mergedComplexData.complexDeletion().supersedes(existingComplexData.complexDeletion()))
+                 currentViewEntryBuilder.addComplexDeletion(viewColumn, mergedComplexData.complexDeletion());
+
+             PeekingIterator<Cell> existingCells = Iterators.peekingIterator(existingComplexData.iterator());
+             for (Cell mergedCell : mergedComplexData)
+             {
+                 Cell existingCell = null;
+                 // Find if there is a corresponding cell in the existing row. Same merge walk as for
+                 // columns above, relying on both cell iterations being in cell path order.
+                 while (existingCells.hasNext())
+                 {
+                     int cmp = baseColumn.cellPathComparator().compare(mergedCell.path(), existingCells.peek().path());
+                     // The next existing cell sorts after the merged one: there is no existing cell
+                     // for that path, and that existing cell must be kept for later merged cells.
+                     // (Breaking on cmp > 0 instead would never advance past smaller existing paths
+                     // and would consume larger ones, so unchanged cells would rarely be recognized.)
+                     if (cmp < 0)
+                         break;
+
+                     Cell next = existingCells.next();
+                     if (cmp == 0)
+                     {
+                         existingCell = next;
+                         break;
+                     }
+                 }
+
+                 if (mergedCell != existingCell)
+                     addCell(viewColumn, mergedCell);
+             }
+         }
+         else
+         {
+             // Note that we've already eliminated the case where merged == existing
+             addCell(viewColumn, (Cell)mergedData);
+         }
+     }
+
+     submitUpdate();
+ }
+
+ /**
+  * Removes the view entry that corresponds to the provided base row.
+  * <p>
+  * A base row that did not match the view filter had no entry, in which case there is
+  * nothing to delete and nothing is generated.
+  */
+ private void deleteOldEntry(Row existingBaseRow)
+ {
+     if (matchesViewFilter(existingBaseRow))
+         deleteOldEntryInternal(existingBaseRow);
+ }
+
+ /**
+  * Generates the deletion of the view entry for the provided base row, without checking
+  * the view filter: the entry is removed with a shadowable row deletion.
+  */
+ private void deleteOldEntryInternal(Row existingBaseRow)
+ {
+     startNewUpdate(existingBaseRow);
+     long deletionTimestamp = computeTimestampForEntryDeletion(existingBaseRow);
+     currentViewEntryBuilder.addRowDeletion(Row.Deletion.shadowable(new DeletionTime(deletionTimestamp, nowInSec)));
+     submitUpdate();
+ }
+
+ /**
+  * Works out the partition key and clustering of the view entry for the provided base row,
+  * and readies the internal row builder for that entry.
+  *
+  * This assumes that there is a corresponding entry, i.e. that no value for the partition key
+  * or the clustering is null (that case has been eliminated through updateAction).
+  */
+ private void startNewUpdate(Row baseRow)
+ {
+     ByteBuffer[] viewClustering = new ByteBuffer[viewMetadata.clusteringColumns().size()];
+     for (ColumnDefinition viewPkColumn : viewMetadata.primaryKeyColumns())
+     {
+         ByteBuffer pkValue = getValueForPK(view.getBaseColumn(viewPkColumn), baseRow);
+         // Partition key components go to the reused key buffer, the rest to the clustering.
+         ByteBuffer[] target = viewPkColumn.isPartitionKey() ? currentViewEntryPartitionKey : viewClustering;
+         target[viewPkColumn.position()] = pkValue;
+     }
+
+     currentViewEntryBuilder.newRow(new Clustering(viewClustering));
+ }
+
+ /**
+ * Computes the primary key liveness info (timestamp, TTL and expiration) of the view entry
+ * corresponding to the provided base row.
+ */
+ private LivenessInfo computeLivenessInfoForEntry(Row baseRow)
+ {
+ /*
+ * We need to compute both the timestamp and expiration.
+ *
+ * For the timestamp, it makes sense to use the bigger timestamp for all view PK columns.
+ *
+ * This is more complex for the expiration. We want to maintain consistency between the base and the view, so the
+ * entry should only exist as long as the base row exists _and_ has non-null values for all the columns that are part
+ * of the view PK.
+ * Which means we really have 2 cases:
+ * 1) either the columns for the base and view PKs are exactly the same: in that case, the view entry should live
+ * as long as the base row lives. This means the view entry should only expire once *everything* in the base row
+ * has expired. Which means the row TTL should be the max of any other TTL.
+ * 2) or there is a column that is not in the base PK but is in the view PK (we can only have one so far, we'll need
+ * to slightly adapt if we allow more later): in that case, as long as that column lives the entry does too, but
+ * as soon as it expires (or is deleted for that matter) the entry also should expire. So the expiration for the
+ * view is the one of that column, regardless of any other expiration.
+ * To take an example of that case, if you have:
+ * CREATE TABLE t (a int, b int, c int, PRIMARY KEY (a, b))
+ * CREATE MATERIALIZED VIEW mv AS SELECT * FROM t WHERE c IS NOT NULL AND a IS NOT NULL AND b IS NOT NULL PRIMARY KEY (c, a, b)
+ * INSERT INTO t(a, b) VALUES (0, 0) USING TTL 3;
+ * UPDATE t SET c = 0 WHERE a = 0 AND b = 0;
+ * then even after 3 seconds elapsed, the row will still exist (it just won't have a "row marker" anymore) and so
+ * the MV should still have a corresponding entry.
+ */
+ assert view.baseNonPKColumnsInViewPK.size() <= 1; // This may change, but is currently an enforced limitation
+
+ LivenessInfo baseLiveness = baseRow.primaryKeyLivenessInfo();
+
+ // Case 1: the view PK only contains base PK columns.
+ if (view.baseNonPKColumnsInViewPK.isEmpty())
+ {
+ // Start from the base row's own TTL/expiration and keep those of the cell with the largest TTL, if any is larger.
+ int ttl = baseLiveness.ttl();
+ int expirationTime = baseLiveness.localExpirationTime();
+ for (Cell cell : baseRow.cells())
+ {
+ if (cell.ttl() > ttl)
+ {
+ ttl = cell.ttl();
+ expirationTime = cell.localDeletionTime();
+ }
+ }
+ // Reuse the base liveness object when no cell had a larger TTL.
+ return ttl == baseLiveness.ttl()
+ ? baseLiveness
+ : LivenessInfo.create(baseLiveness.timestamp(), ttl, expirationTime);
+ }
+
+ // Case 2: the liveness of the entry follows the single base non-PK column that is part of the view PK.
+ ColumnDefinition baseColumn = view.baseNonPKColumnsInViewPK.get(0);
+ Cell cell = baseRow.getCell(baseColumn);
+ assert isLive(cell) : "We shouldn't have got there is the base row had no associated entry";
+
+ long timestamp = Math.max(baseLiveness.timestamp(), cell.timestamp());
+ return LivenessInfo.create(timestamp, cell.ttl(), cell.localDeletionTime());
+ }
+
+ /**
+  * Computes the timestamp to use when deleting the view entry of the provided base row:
+  * the largest of the row's primary key liveness timestamp and the max timestamps of the
+  * column data the view definition includes.
+  */
+ private long computeTimestampForEntryDeletion(Row baseRow)
+ {
+     // We delete the old row with its row entry timestamp using a shadowable deletion.
+     // We must make sure that the deletion deletes everything in the entry (or the entry will
+     // still show up), so we must use the biggest timestamp found in the existing row (for any
+     // column included in the view at least).
+     // TODO: We have a problem though: if the entry is "resurrected" by a later update, we would
+     // need to ensure that the timestamp for the entry then is bigger than the tombstone
+     // we're just inserting, which is not currently guaranteed.
+     // This is a bug for a separate ticket though.
+     long maxTimestamp = baseRow.primaryKeyLivenessInfo().timestamp();
+     for (ColumnData columnData : baseRow)
+     {
+         if (view.getDefinition().includes(columnData.column().name))
+             maxTimestamp = Math.max(maxTimestamp, columnData.maxTimestamp());
+     }
+     return maxTimestamp;
+ }
+
+ /**
+  * Copies the data of one base table column into the view entry being built, under the
+  * provided view column: a single cell for a simple column, the complex deletion followed
+  * by every cell for a complex one.
+  */
+ private void addColumnData(ColumnDefinition viewColumn, ColumnData baseTableData)
+ {
+     assert viewColumn.isComplex() == baseTableData.column().isComplex();
+     if (viewColumn.isComplex())
+     {
+         ComplexColumnData complex = (ComplexColumnData) baseTableData;
+         currentViewEntryBuilder.addComplexDeletion(viewColumn, complex.complexDeletion());
+         for (Cell complexCell : complex)
+             addCell(viewColumn, complexCell);
+     }
+     else
+     {
+         addCell(viewColumn, (Cell) baseTableData);
+     }
+ }
+
+ /**
+ * Adds the provided base table cell to the view entry being built, re-targeted to the
+ * provided view column (which must not be a primary key column).
+ */
+ private void addCell(ColumnDefinition viewColumn, Cell baseTableCell)
+ {
+ assert !viewColumn.isPrimaryKeyColumn();
+ currentViewEntryBuilder.addCell(baseTableCell.withUpdatedColumn(viewColumn));
+ }
+
+ /**
+  * Completes the view entry currently being built and files it under the update of its
+  * view partition, creating that partition update on first use.
+  */
+ private void submitUpdate()
+ {
+     Row viewRow = currentViewEntryBuilder.build();
+     // It's unclear whether we can get here with nothing updated, but adding an empty row breaks
+     // things and being prudent costs nothing.
+     if (viewRow.isEmpty())
+         return;
+
+     // We can't really know which columns of the view will be updated nor how many rows will be updated
+     // for this key, so we rely on hopefully sane defaults when creating the partition update.
+     updates.computeIfAbsent(makeCurrentPartitionKey(),
+                             key -> new PartitionUpdate(viewMetadata, key, viewMetadata.partitionColumns(), 4))
+            .add(viewRow);
+ }
+
+ /**
+  * Builds the decorated partition key of the view entry being built: the single component
+  * as-is when the view has one partition key column, a composite of all components otherwise.
+  */
+ private DecoratedKey makeCurrentPartitionKey()
+ {
+     ByteBuffer rawKey;
+     if (viewMetadata.partitionKeyColumns().size() == 1)
+         rawKey = currentViewEntryPartitionKey[0];
+     else
+         rawKey = CompositeType.build(currentViewEntryPartitionKey);
+
+     return viewMetadata.decorateKey(rawKey);
+ }
+
+ /**
+  * Returns the value of the provided base table column for the provided base row: taken from
+  * the base partition key, from the row clustering, or from the row cell depending on
+  * the kind of the column.
+  */
+ private ByteBuffer getValueForPK(ColumnDefinition column, Row row)
+ {
+     switch (column.kind)
+     {
+         case PARTITION_KEY:
+             return basePartitionKey[column.position()];
+         case CLUSTERING:
+             Clustering rowClustering = row.clustering();
+             return rowClustering.get(column.position());
+         default:
+             // This shouldn't NPE as we shouldn't get here if the value can be null (or there is a bug in updateAction())
+             Cell pkCell = row.getCell(column);
+             return pkCell.value();
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/86ba2274/test/unit/org/apache/cassandra/cql3/ViewTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/ViewTest.java b/test/unit/org/apache/cassandra/cql3/ViewTest.java
index ac4becb..4a1dc07 100644
--- a/test/unit/org/apache/cassandra/cql3/ViewTest.java
+++ b/test/unit/org/apache/cassandra/cql3/ViewTest.java
@@ -98,7 +98,7 @@ public class ViewTest extends CQLTester
@Test
public void testPartitionTombstone() throws Throwable
{
- createTable("CREATE TABLE %s (k1 int, c1 int , val int, PRIMARY KEY (k1))");
+ createTable("CREATE TABLE %s (k1 int, c1 int , val int, PRIMARY KEY (k1, c1))");
execute("USE " + keyspace());
executeNet(protocolVersion, "USE " + keyspace());
@@ -108,8 +108,8 @@ public class ViewTest extends CQLTester
updateView("INSERT INTO %s (k1, c1, val) VALUES (1, 2, 200)");
updateView("INSERT INTO %s (k1, c1, val) VALUES (1, 3, 300)");
- Assert.assertEquals(1, execute("select * from %s").size());
- Assert.assertEquals(1, execute("select * from view1").size());
+ Assert.assertEquals(2, execute("select * from %s").size());
+ Assert.assertEquals(2, execute("select * from view1").size());
updateView("DELETE FROM %s WHERE k1 = 1");
@@ -814,18 +814,58 @@ public class ViewTest extends CQLTester
createView("mv", "CREATE MATERIALIZED VIEW %s AS SELECT * FROM %%s WHERE c IS NOT NULL AND a IS NOT NULL AND b IS NOT NULL PRIMARY KEY (c, a, b)");
- updateView("INSERT INTO %s (a, b, c, d) VALUES (?, ?, ?, ?) USING TTL 5", 1, 1, 1, 1);
+ updateView("INSERT INTO %s (a, b, c, d) VALUES (?, ?, ?, ?) USING TTL 3", 1, 1, 1, 1);
- Thread.sleep(TimeUnit.SECONDS.toMillis(3));
+ Thread.sleep(TimeUnit.SECONDS.toMillis(1));
updateView("INSERT INTO %s (a, b, c) VALUES (?, ?, ?)", 1, 1, 2);
- Thread.sleep(TimeUnit.SECONDS.toMillis(3));
+ Thread.sleep(TimeUnit.SECONDS.toMillis(5));
List<Row> results = executeNet(protocolVersion, "SELECT d FROM mv WHERE c = 2 AND a = 1 AND b = 1").all();
Assert.assertEquals(1, results.size());
Assert.assertTrue("There should be a null result given back due to ttl expiry", results.get(0).isNull(0));
}
@Test
+ public void ttlExpirationTest() throws Throwable
+ {
+ createTable("CREATE TABLE %s (" +
+ "a int," +
+ "b int," +
+ "c int," +
+ "d int," +
+ "PRIMARY KEY (a, b))");
+
+ executeNet(protocolVersion, "USE " + keyspace());
+
+ createView("mv", "CREATE MATERIALIZED VIEW %s AS SELECT * FROM %%s WHERE c IS NOT NULL AND a IS NOT NULL AND b IS NOT NULL PRIMARY KEY (c, a, b)");
+
+ // The whole base row is written with a 3 second TTL...
+ updateView("INSERT INTO %s (a, b, c, d) VALUES (?, ?, ?, ?) USING TTL 3", 1, 1, 1, 1);
+
+ // ...so after waiting 4 seconds the view is expected to return no entry for it.
+ Thread.sleep(TimeUnit.SECONDS.toMillis(4));
+ Assert.assertEquals(0, executeNet(protocolVersion, "SELECT * FROM mv WHERE c = 1 AND a = 1 AND b = 1").all().size());
+ }
+
+ @Test
+ public void rowDeletionTest() throws Throwable
+ {
+ createTable("CREATE TABLE %s (" +
+ "a int," +
+ "b int," +
+ "c int," +
+ "d int," +
+ "PRIMARY KEY (a, b))");
+
+ executeNet(protocolVersion, "USE " + keyspace());
+
+ createView("mv", "CREATE MATERIALIZED VIEW %s AS SELECT * FROM %%s WHERE c IS NOT NULL AND a IS NOT NULL AND b IS NOT NULL PRIMARY KEY (c, a, b)");
+
+ String table = keyspace() + "." + currentTable();
+ // The row is deleted at timestamp 6 and then inserted at the older timestamp 3, so the
+ // view is expected to return no entry for it.
+ updateView("DELETE FROM " + table + " USING TIMESTAMP 6 WHERE a = 1 AND b = 1;");
+ updateView("INSERT INTO %s (a, b, c, d) VALUES (?, ?, ?, ?) USING TIMESTAMP 3", 1, 1, 1, 1);
+ Assert.assertEquals(0, executeNet(protocolVersion, "SELECT * FROM mv WHERE c = 1 AND a = 1 AND b = 1").all().size());
+ }
+
+ @Test
public void conflictingTimestampTest() throws Throwable
{
createTable("CREATE TABLE %s (" +
http://git-wip-us.apache.org/repos/asf/cassandra/blob/86ba2274/test/unit/org/apache/cassandra/db/rows/RowsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/rows/RowsTest.java b/test/unit/org/apache/cassandra/db/rows/RowsTest.java
index dede867..b47bea2 100644
--- a/test/unit/org/apache/cassandra/db/rows/RowsTest.java
+++ b/test/unit/org/apache/cassandra/db/rows/RowsTest.java
@@ -239,7 +239,8 @@ public class RowsTest
BufferCell.live(kcvm, m, secondToTs(now), BB1, CellPath.create(BB1)),
BufferCell.live(kcvm, m, secondToTs(now), BB2, CellPath.create(BB2)));
expectedCells.forEach(originalBuilder::addCell);
- Row.Deletion rowDeletion = new Row.Deletion(new DeletionTime(ts, now), false);
+ // We need to use ts-1 so the deletion doesn't shadow what we've created
+ Row.Deletion rowDeletion = new Row.Deletion(new DeletionTime(ts-1, now), false);
originalBuilder.addRowDeletion(rowDeletion);
RowBuilder builder = new RowBuilder();
@@ -267,7 +268,8 @@ public class RowsTest
BufferCell.live(kcvm, m, ts, BB1, CellPath.create(BB1)),
BufferCell.live(kcvm, m, ts, BB2, CellPath.create(BB2)));
expectedCells.forEach(builder::addCell);
- Row.Deletion rowDeletion = new Row.Deletion(new DeletionTime(ts, now), false);
+ // We need to use ts-1 so the deletion doesn't shadow what we've created
+ Row.Deletion rowDeletion = new Row.Deletion(new DeletionTime(ts-1, now), false);
builder.addRowDeletion(rowDeletion);
StatsCollector collector = new StatsCollector();