You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2016/05/06 11:47:45 UTC

[7/9] cassandra git commit: Merge commit '86ba227477b9f8595eb610ecaf950cfbc29dd36b' into cassandra-3.7

Merge commit '86ba227477b9f8595eb610ecaf950cfbc29dd36b' into cassandra-3.7


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a87fd715
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a87fd715
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a87fd715

Branch: refs/heads/trunk
Commit: a87fd715d6b26128603a404074ec3df42a595b2e
Parents: 4e364d7 86ba227
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Fri May 6 13:43:44 2016 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Fri May 6 13:44:12 2016 +0200

----------------------------------------------------------------------
 .../org/apache/cassandra/config/CFMetaData.java |   6 +
 .../apache/cassandra/config/ViewDefinition.java |   1 -
 .../cql3/statements/CreateViewStatement.java    |   4 +-
 .../cql3/statements/SelectStatement.java        |  41 +-
 .../apache/cassandra/db/ColumnFamilyStore.java  |   6 +-
 src/java/org/apache/cassandra/db/Keyspace.java  |   2 +-
 .../db/SinglePartitionReadCommand.java          |  33 +
 src/java/org/apache/cassandra/db/Slices.java    |   7 +
 .../apache/cassandra/db/filter/RowFilter.java   |  24 +
 .../SingletonUnfilteredPartitionIterator.java   |   3 +-
 .../apache/cassandra/db/rows/AbstractCell.java  |   5 +
 .../org/apache/cassandra/db/rows/BTreeRow.java  |  34 +-
 .../apache/cassandra/db/rows/BufferCell.java    |   5 +
 src/java/org/apache/cassandra/db/rows/Cell.java |   2 +
 .../apache/cassandra/db/rows/ColumnData.java    |   2 +
 .../cassandra/db/rows/ComplexColumnData.java    |   8 +
 .../apache/cassandra/db/rows/NativeCell.java    |   5 +
 src/java/org/apache/cassandra/db/rows/Row.java  |  35 +-
 .../cassandra/db/rows/RowDiffListener.java      |   2 +-
 .../db/rows/UnfilteredRowIterators.java         |   2 +-
 .../apache/cassandra/db/view/TableViews.java    | 481 ++++++++++++++
 .../apache/cassandra/db/view/TemporalRow.java   | 601 ------------------
 src/java/org/apache/cassandra/db/view/View.java | 629 ++-----------------
 .../apache/cassandra/db/view/ViewBuilder.java   |  38 +-
 .../apache/cassandra/db/view/ViewManager.java   | 146 +----
 .../cassandra/db/view/ViewUpdateGenerator.java  | 549 ++++++++++++++++
 .../org/apache/cassandra/cql3/ViewTest.java     |  52 +-
 .../org/apache/cassandra/db/rows/RowsTest.java  |   6 +-
 28 files changed, 1400 insertions(+), 1329 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/a87fd715/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a87fd715/src/java/org/apache/cassandra/config/ViewDefinition.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a87fd715/src/java/org/apache/cassandra/cql3/statements/CreateViewStatement.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a87fd715/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a87fd715/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a87fd715/src/java/org/apache/cassandra/db/Keyspace.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a87fd715/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a87fd715/src/java/org/apache/cassandra/db/Slices.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a87fd715/src/java/org/apache/cassandra/db/filter/RowFilter.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a87fd715/src/java/org/apache/cassandra/db/rows/AbstractCell.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a87fd715/src/java/org/apache/cassandra/db/rows/BTreeRow.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a87fd715/src/java/org/apache/cassandra/db/rows/BufferCell.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a87fd715/src/java/org/apache/cassandra/db/rows/Cell.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a87fd715/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a87fd715/src/java/org/apache/cassandra/db/rows/NativeCell.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/rows/NativeCell.java
index 9d816f3,0000000..5930332
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/rows/NativeCell.java
+++ b/src/java/org/apache/cassandra/db/rows/NativeCell.java
@@@ -1,151 -1,0 +1,156 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.db.rows;
 +
 +import java.nio.ByteBuffer;
 +import java.nio.ByteOrder;
 +
 +import org.apache.cassandra.config.ColumnDefinition;
 +import org.apache.cassandra.utils.ObjectSizes;
 +import org.apache.cassandra.utils.concurrent.OpOrder;
 +import org.apache.cassandra.utils.memory.MemoryUtil;
 +import org.apache.cassandra.utils.memory.NativeAllocator;
 +
 +public class NativeCell extends AbstractCell
 +{
 +    private static final long EMPTY_SIZE = ObjectSizes.measure(new NativeCell()); // measured once, on an instance built by the private no-arg constructor
 +
 +    private static final long HAS_CELLPATH = 0; // offset from peer of the 1-byte flag: 1 if a cell path is stored, 0 otherwise
 +    private static final long TIMESTAMP = 1; // offset from peer of the timestamp (written with setLong)
 +    private static final long TTL = 9; // offset from peer of the ttl (written with setInt)
 +    private static final long DELETION = 13; // offset from peer of the local deletion time (written with setInt)
 +    private static final long LENGTH = 17; // offset from peer of the value length in bytes (written with setInt)
 +    private static final long VALUE = 21; // offset from peer of the first value byte; the optional cell path follows the value
 +
 +    private final long peer; // address returned by allocator.allocate() that every accessor reads from; 0 only for the EMPTY_SIZE instance
 +
 +    private NativeCell() // only used above to compute EMPTY_SIZE
 +    {
 +        super(null);
 +        this.peer = 0;
 +    }
 +
 +    public NativeCell(NativeAllocator allocator,
 +                      OpOrder.Group writeOp,
 +                      Cell cell) // convenience overload: forwards every field of the given cell to the main constructor
 +    {
 +        this(allocator,
 +             writeOp,
 +             cell.column(),
 +             cell.timestamp(),
 +             cell.ttl(),
 +             cell.localDeletionTime(),
 +             cell.value(),
 +             cell.path());
 +    }
 +
 +    public NativeCell(NativeAllocator allocator,
 +                      OpOrder.Group writeOp,
 +                      ColumnDefinition column,
 +                      long timestamp,
 +                      int ttl,
 +                      int localDeletionTime,
 +                      ByteBuffer value,
 +                      CellPath path)
 +    {
 +        super(column);
 +        long size = simpleSize(value.remaining()); // fixed header plus the value bytes
 +
 +        assert value.order() == ByteOrder.BIG_ENDIAN;
 +        assert column.isComplex() == (path != null);
 +        if (path != null)
 +        {
 +            assert path.size() == 1; // only path.get(0) is stored below
 +            size += 4 + path.get(0).remaining(); // 4-byte path length followed by the path bytes
 +        }
 +
 +        if (size > Integer.MAX_VALUE) // the size is narrowed to an int for allocate() below
 +            throw new IllegalStateException();
 +
 +        // layout: cellpath? (1 byte) : timestamp (8) : ttl (4) : localDeletionTime (4) : length (4) : <data> : [cell path length (4)] : [<cell path data>]
 +        peer = allocator.allocate((int) size, writeOp);
 +        MemoryUtil.setByte(peer + HAS_CELLPATH, (byte)(path == null ? 0 : 1));
 +        MemoryUtil.setLong(peer + TIMESTAMP, timestamp);
 +        MemoryUtil.setInt(peer + TTL, ttl);
 +        MemoryUtil.setInt(peer + DELETION, localDeletionTime);
 +        MemoryUtil.setInt(peer + LENGTH, value.remaining());
 +        MemoryUtil.setBytes(peer + VALUE, value);
 +
 +        if (path != null)
 +        {
 +            ByteBuffer pathbuffer = path.get(0);
 +            assert pathbuffer.order() == ByteOrder.BIG_ENDIAN;
 +
 +            long offset = peer + VALUE + value.remaining(); // the path is written right after the value bytes
 +            MemoryUtil.setInt(offset, pathbuffer.remaining());
 +            MemoryUtil.setBytes(offset + 4, pathbuffer);
 +        }
 +    }
 +
 +    private static long simpleSize(int length) // bytes needed for a cell without a path whose value is length bytes long
 +    {
 +        return VALUE + length;
 +    }
 +
 +    public long timestamp()
 +    {
 +        return MemoryUtil.getLong(peer + TIMESTAMP);
 +    }
 +
 +    public int ttl()
 +    {
 +        return MemoryUtil.getInt(peer + TTL);
 +    }
 +
 +    public int localDeletionTime()
 +    {
 +        return MemoryUtil.getInt(peer + DELETION);
 +    }
 +
 +    public ByteBuffer value()
 +    {
 +        int length = MemoryUtil.getInt(peer + LENGTH);
 +        return MemoryUtil.getByteBuffer(peer + VALUE, length, ByteOrder.BIG_ENDIAN); // NOTE(review): presumably a buffer over the stored bytes rather than a copy - confirm against MemoryUtil
 +    }
 +
 +    public CellPath path()
 +    {
 +        if (MemoryUtil.getByte(peer+ HAS_CELLPATH) == 0) // flag written by the constructor: 0 means no path was stored
 +            return null;
 +
 +        long offset = peer + VALUE + MemoryUtil.getInt(peer + LENGTH); // skip the header and the value to reach the path length
 +        int size = MemoryUtil.getInt(offset);
 +        return CellPath.create(MemoryUtil.getByteBuffer(offset + 4, size, ByteOrder.BIG_ENDIAN));
 +    }
 +
 +    public Cell withUpdatedValue(ByteBuffer newValue)
 +    {
 +        throw new UnsupportedOperationException();
 +    }
 +
++    public Cell withUpdatedColumn(ColumnDefinition column)
++    {
++        return new BufferCell(column, timestamp(), ttl(), localDeletionTime(), value(), path()); // builds a BufferCell for the given column from this cell's timestamp, ttl, deletion time, value and path
++    }
++
 +    public long unsharedHeapSizeExcludingData()
 +    {
 +        return EMPTY_SIZE;
 +    }
 +
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a87fd715/src/java/org/apache/cassandra/db/rows/Row.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a87fd715/src/java/org/apache/cassandra/db/rows/RowDiffListener.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a87fd715/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a87fd715/src/java/org/apache/cassandra/db/view/TableViews.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/view/TableViews.java
index 0000000,893bdd5..e97e01c
mode 000000,100644..100644
--- a/src/java/org/apache/cassandra/db/view/TableViews.java
+++ b/src/java/org/apache/cassandra/db/view/TableViews.java
@@@ -1,0 -1,481 +1,481 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.cassandra.db.view;
+ 
+ import java.util.*;
+ import java.util.concurrent.CopyOnWriteArrayList;
+ import java.util.concurrent.TimeUnit;
+ import java.util.concurrent.atomic.AtomicLong;
+ 
+ import com.google.common.collect.Iterables;
+ import com.google.common.collect.Iterators;
+ import com.google.common.collect.PeekingIterator;
+ 
+ import org.apache.cassandra.config.CFMetaData;
+ import org.apache.cassandra.db.*;
+ import org.apache.cassandra.db.commitlog.ReplayPosition;
+ import org.apache.cassandra.db.filter.*;
+ import org.apache.cassandra.db.rows.*;
+ import org.apache.cassandra.db.partitions.*;
+ import org.apache.cassandra.dht.Token;
+ import org.apache.cassandra.service.StorageProxy;
+ import org.apache.cassandra.utils.FBUtilities;
+ import org.apache.cassandra.utils.btree.BTreeSet;
+ 
+ 
+ /**
+  * Groups all the views for a given table.
+  */
+ public class TableViews extends AbstractCollection<View>
+ {
+     // Metadata of the base table whose views are grouped by this object.
+     private final CFMetaData baseTableMetadata;
+ 
+     // We need this to be thread-safe, but the number of times this is read (for every mutation on the keyspace)
+     // massively exceeds the number of times it's changed (when a view is added or removed), so a copy-on-write
+     // list is the best option.
+     private final List<View> views = new CopyOnWriteArrayList<>();
+ 
+     /**
+      * Creates an empty group of views for the given base table.
+      *
+      * @param baseTableMetadata the metadata of the base table the views belong to.
+      */
+     public TableViews(CFMetaData baseTableMetadata)
+     {
+         this.baseTableMetadata = baseTableMetadata;
+     }
+ 
+     /**
+      * Returns how many views are currently held for the base table.
+      *
+      * @return the number of views in this collection.
+      */
+     public int size()
+     {
+         return this.views.size();
+     }
+ 
+     /**
+      * Returns an iterator over the views held for the base table.
+      *
+      * @return the iterator of the underlying list of views.
+      */
+     public Iterator<View> iterator()
+     {
+         return this.views.iterator();
+     }
+ 
+     /**
+      * Checks whether one of the views held by this object has the given name.
+      *
+      * @param viewName the view name to look for.
+      * @return {@code true} if a view whose name equals {@code viewName} is present, {@code false} otherwise.
+      */
+     public boolean contains(String viewName)
+     {
+         for (View candidate : views)
+         {
+             if (candidate.name.equals(viewName))
+                 return true;
+         }
+         return false;
+     }
+ 
+     /**
+      * Adds a view to this group.
+      *
+      * @param view the view to add; no view with the same name is expected to be present already.
+      * @return the result of adding {@code view} to the underlying list.
+      */
+     public boolean add(View view)
+     {
+         // Name uniqueness is expected to have been validated by the caller; only double-check it through an assertion
+         assert !contains(view.name);
+         return this.views.add(view);
+     }
+ 
+     /**
+      * Returns the column family stores backing the views of this group, looked up in the keyspace of the base table.
+      * The keyspace is opened when this method is called; each store is resolved as the result is iterated.
+      *
+      * @return an iterable over the store of each view, in the order of the underlying list of views.
+      */
+     public Iterable<ColumnFamilyStore> allViewsCfs()
+     {
+         final Keyspace baseKeyspace = Keyspace.open(baseTableMetadata.ksName);
+         return Iterables.transform(views, v -> baseKeyspace.getColumnFamilyStore(v.getDefinition().viewName));
+     }
+ 
+     /**
+      * Calls {@code forceBlockingFlush()} on the store of every view of this group, one after the other.
+      */
+     public void forceBlockingFlush()
+     {
+         allViewsCfs().forEach(viewStore -> viewStore.forceBlockingFlush());
+     }
+ 
+     /**
+      * Calls {@code dumpMemtable()} on the store of every view of this group, one after the other.
+      */
+     public void dumpMemtables()
+     {
+         allViewsCfs().forEach(viewStore -> viewStore.dumpMemtable());
+     }
+ 
+     /**
+      * Truncates every view of this group: for each view store, discards its sstables for the given truncation time
+      * and then saves a truncation record with the replay position returned by that discard.
+      *
+      * @param truncatedAt the truncation time passed to both the discard and the truncation record.
+      */
+     public void truncateBlocking(long truncatedAt)
+     {
+         allViewsCfs().forEach(viewStore ->
+         {
+             ReplayPosition discardedUpTo = viewStore.discardSSTables(truncatedAt);
+             SystemKeyspace.saveTruncationRecord(viewStore, truncatedAt, discardedUpTo);
+         });
+     }
+ 
+     /**
+      * Removes from this group every view whose name equals the given one. Does nothing if there is no such view.
+      *
+      * @param viewName the name of the view(s) to remove.
+      */
+     public void removeByName(String viewName)
+     {
+         this.views.removeIf(candidate -> candidate.name.equals(viewName));
+     }
+ 
+     /**
+      * Calculates and pushes updates to the views replicas. The replicas are determined by
+      * {@link ViewUtils#getViewNaturalEndpoint(String, Token, Token)}.
+      * <p>
+      * Nothing is read or pushed if no view selects the partition key of {@code update}, or if
+      * {@code readExistingRowsCommand} returns {@code null} (no deletion and no updated row that may affect a view).
+      *
+      * @param update an update on the base table represented by this object.
+      * @param writeCommitLog whether we should write the commit log for the view updates.
+      * @param baseComplete time from epoch in ms that the local base mutation was (or will be) completed
+      */
+     public void pushViewReplicaUpdates(PartitionUpdate update, boolean writeCommitLog, AtomicLong baseComplete)
+     {
+         assert update.metadata().cfId.equals(baseTableMetadata.cfId);
+ 
+         // Note: this local deliberately shadows the field, it only holds the views affected by this update
+         Collection<View> views = updatedViews(update);
+         if (views.isEmpty())
+             return;
+ 
+         // Read modified rows
+         int nowInSec = FBUtilities.nowInSeconds();
+         SinglePartitionReadCommand command = readExistingRowsCommand(update, views, nowInSec);
+         if (command == null)
+             return;
+ 
+         ColumnFamilyStore cfs = Keyspace.openAndGetStore(update.metadata());
+         long start = System.nanoTime();
+         Collection<Mutation> mutations;
 -        try (ReadOrderGroup orderGroup = command.startOrderGroup();
++        try (ReadExecutionController orderGroup = command.executionController();
+              UnfilteredRowIterator existings = UnfilteredPartitionIterators.getOnlyElement(command.executeLocally(orderGroup), command);
+              UnfilteredRowIterator updates = update.unfilteredIterator())
+         {
+             mutations = generateViewUpdates(views, updates, existings, nowInSec);
+         }
+         // Reuse the store opened above rather than looking it up a second time; the recorded time covers the
+         // local read and the generation of the view updates
+         cfs.metric.viewReadTime.update(System.nanoTime() - start, TimeUnit.NANOSECONDS);
+ 
+         if (!mutations.isEmpty())
+             StorageProxy.mutateMV(update.partitionKey().getKey(), mutations, writeCommitLog, baseComplete);
+     }
+ 
+     /**
+      * Given some updates on the base table of this object and the existing values for the rows affected by that update, generates the
+      * mutation to be applied to the provided views.
+      *
+      * @param views the views potentially affected by {@code updates}.
+      * @param updates the base table updates being applied.
+      * @param existings the existing values for the rows affected by {@code updates}. This is used to decide if a view is
+      * obsoleted by the update and should be removed, gather the values for columns that may not be part of the update if
+      * a new view entry needs to be created, and compute the minimal updates to be applied if the view entry isn't changed
+      * but has simply some updated values. This will be empty for view building as we want to assume anything we'll pass
+      * to {@code updates} is new.
+      * @param nowInSec the current time in seconds.
+      * @return the mutations to apply to the {@code views}. This can be empty.
+      */
+     public Collection<Mutation> generateViewUpdates(Collection<View> views, UnfilteredRowIterator updates, UnfilteredRowIterator existings, int nowInSec)
+     {
+         assert updates.metadata().cfId.equals(baseTableMetadata.cfId);
+ 
+         List<ViewUpdateGenerator> generators = new ArrayList<>(views.size());
+         for (View view : views)
+             generators.add(new ViewUpdateGenerator(view, updates.partitionKey(), nowInSec));
+ 
+         DeletionTracker existingsDeletion = new DeletionTracker(existings.partitionLevelDeletion());
+         DeletionTracker updatesDeletion = new DeletionTracker(updates.partitionLevelDeletion());
+ 
+         /*
+          * We iterate through the updates and the existing rows in parallel. This allows us to know the consequence
+          * on the view of each update.
+          */
+         PeekingIterator<Unfiltered> existingsIter = Iterators.peekingIterator(existings);
+         PeekingIterator<Unfiltered> updatesIter = Iterators.peekingIterator(updates);
+ 
+         while (existingsIter.hasNext() && updatesIter.hasNext())
+         {
+             Unfiltered existing = existingsIter.peek();
+             Unfiltered update = updatesIter.peek();
+ 
+             Row existingRow;
+             Row updateRow;
+             int cmp = baseTableMetadata.comparator.compare(update, existing);
+             if (cmp < 0)
+             {
+                 // We have an update where there was nothing before
+                 if (update.isRangeTombstoneMarker())
+                 {
+                     updatesDeletion.update(updatesIter.next());
+                     continue;
+                 }
+ 
+                 updateRow = ((Row)updatesIter.next()).withRowDeletion(updatesDeletion.currentDeletion());
+                 existingRow = emptyRow(updateRow.clustering(), existingsDeletion.currentDeletion()); // null when existingsDeletion.currentDeletion() is live (see emptyRow)
+             }
+             else if (cmp > 0)
+             {
+                 // We have something existing but no update (which will happen either because it's a range tombstone marker in
+                 // existing, or because we've fetched the existing row due to some partition/range deletion in the updates)
+                 if (existing.isRangeTombstoneMarker())
+                 {
+                     existingsDeletion.update(existingsIter.next());
+                     continue;
+                 }
+ 
+                 existingRow = ((Row)existingsIter.next()).withRowDeletion(existingsDeletion.currentDeletion());
+                 updateRow = emptyRow(existingRow.clustering(), updatesDeletion.currentDeletion());
+ 
+                 // The way we build the read command used for existing rows, we should always have updatesDeletion.currentDeletion()
+                 // that is not live, since we wouldn't have read the existing row otherwise. And we could assert that, but if we ever
+                 // change the read method so that it can slightly over-read in some cases, that would be an easily avoidable bug lurking,
+                 // so we just handle the case.
+                 if (updateRow == null)
+                     continue;
+             }
+             else
+             {
+                 // We're updating a row that had pre-existing data
+                 if (update.isRangeTombstoneMarker())
+                 {
+                     assert existing.isRangeTombstoneMarker();
+                     updatesDeletion.update(updatesIter.next());
+                     existingsDeletion.update(existingsIter.next());
+                     continue;
+                 }
+ 
+                 assert !existing.isRangeTombstoneMarker();
+                 existingRow = ((Row)existingsIter.next()).withRowDeletion(existingsDeletion.currentDeletion());
+                 updateRow = ((Row)updatesIter.next()).withRowDeletion(updatesDeletion.currentDeletion());
+             }
+ 
+             addToViewUpdateGenerators(existingRow, updateRow, generators, nowInSec);
+         }
+ 
+         // We only care about more existing rows if the update deletion isn't live, i.e. if we had a partition deletion
+         if (!updatesDeletion.currentDeletion().isLive())
+         {
+             while (existingsIter.hasNext())
+             {
+                 Unfiltered existing = existingsIter.next();
+                 // If it's a range tombstone, we don't care, we're only looking for existing entries that get deleted by
+                 // the new partition deletion
+                 if (existing.isRangeTombstoneMarker())
+                     continue;
+ 
+                 Row existingRow = (Row)existing;
+                 addToViewUpdateGenerators(existingRow, emptyRow(existingRow.clustering(), updatesDeletion.currentDeletion()), generators, nowInSec);
+             }
+         }
+         while (updatesIter.hasNext())
+         {
+             Unfiltered update = updatesIter.next();
+             // If it's a range tombstone, it removes nothing pre-existing, so we can ignore it for view updates
+             if (update.isRangeTombstoneMarker())
+                 continue;
+ 
+             Row updateRow = (Row)update;
+             addToViewUpdateGenerators(emptyRow(updateRow.clustering(), DeletionTime.LIVE), updateRow, generators, nowInSec);
+         }
+ 
+         return buildMutations(baseTableMetadata, generators);
+     }
+ 
+     /**
+      * Selects, among the views of this group, those whose read query selects the partition key of the
+      * provided base table updates.
+      *
+      * @param updates the updates applied to the base table.
+      * @return the views potentially affected by {@code updates}, in the order they are held by this group.
+      * This can be empty.
+      */
+     public Collection<View> updatedViews(PartitionUpdate updates)
+     {
+         List<View> affected = new ArrayList<>(views.size());
+ 
+         for (View candidate : views)
+         {
+             // Only keep the views whose select query covers the partition being updated
+             if (candidate.getReadQuery().selectsKey(updates.partitionKey()))
+                 affected.add(candidate);
+         }
+         return affected;
+     }
+ 
+     /**
+      * Returns the command to use to read the existing rows required to generate view updates for the provided
+      * base updates.
+      *
+      * @param updates the base table updates being applied.
+      * @param views the views potentially affected by {@code updates}.
+      * @param nowInSec the current time in seconds.
+      * @return the command to use to read the base table rows required to generate view updates for {@code updates},
+      * or {@code null} if {@code updates} has no deletion and none of its rows may affect any of {@code views}.
+      */
+     private SinglePartitionReadCommand readExistingRowsCommand(PartitionUpdate updates, Collection<View> views, int nowInSec)
+     {
+         Slices.Builder sliceBuilder = null;
+         DeletionInfo deletionInfo = updates.deletionInfo();
+         CFMetaData metadata = updates.metadata();
+         DecoratedKey key = updates.partitionKey();
+         // TODO: This is subtle: we need to gather all the slices that we have to fetch between partition del, range tombstones and rows.
+         if (!deletionInfo.isLive())
+         {
+             sliceBuilder = new Slices.Builder(metadata.comparator);
+             // Everything covered by a deletion might invalidate an existing view entry, which means we must read it to know. In practice
+             // though, the views involved might filter some base table clustering columns, in which case we can restrict what we read
+             // using those restrictions.
+             // If there is a partition deletion, then we can simply take the slices from each view's select filter. They may overlap but
+             // the Slices.Builder handles that for us. Note that in many cases this will just involve reading everything (as soon as any
+             // view involved has no clustering restrictions for instance).
+             // For range tombstones, we should theoretically take the difference between the tombstoned ranges and the slices selected
+             // by every view, but as we don't have an easy way to compute that right now, we keep it simple and just use the tombstoned
+             // range.
+             // TODO: we should improve that latter part.
+             if (!deletionInfo.getPartitionDeletion().isLive())
+             {
+                 for (View view : views)
+                     sliceBuilder.addAll(view.getSelectStatement().clusteringIndexFilterAsSlices());
+             }
+             else
+             {
+                 assert deletionInfo.hasRanges();
+                 Iterator<RangeTombstone> iter = deletionInfo.rangeIterator(false);
+                 while (iter.hasNext())
+                     sliceBuilder.add(iter.next().deletedSlice());
+             }
+         }
+ 
+         // We need to read every row that is updated, unless we can prove that it has no impact on any view entries.
+ 
+         // If we had some slices from the deletions above, we'll continue using that. Otherwise, it's more efficient to build
+         // a names query.
+         BTreeSet.Builder<Clustering> namesBuilder = sliceBuilder == null ? BTreeSet.builder(metadata.comparator) : null;
+         for (Row row : updates)
+         {
+             // Don't read the existing state if we can prove the update won't affect any views
+             if (!affectsAnyViews(key, row, views))
+                 continue;
+ 
+             if (namesBuilder == null)
+                 sliceBuilder.add(Slice.make(row.clustering()));
+             else
+                 namesBuilder.add(row.clustering());
+         }
+ 
+         NavigableSet<Clustering> names = namesBuilder == null ? null : namesBuilder.build();
+         // If we have a slice builder, it means we had some deletions and we have to read. But if we had
+         // only row updates, it's possible none of them affected the views, in which case we have nothing
+         // to do.
+         if (names != null && names.isEmpty())
+             return null;
+ 
+         ClusteringIndexFilter clusteringFilter = names == null
+                                                ? new ClusteringIndexSliceFilter(sliceBuilder.build(), false)
+                                                : new ClusteringIndexNamesFilter(names, false);
+         // If we have more than one view, we should merge the columns queried by each view but to keep it simple we just
+         // include everything. We could change that in the future.
+         ColumnFilter queriedColumns = views.size() == 1
+                                     ? Iterables.getOnlyElement(views).getSelectStatement().queriedColumns()
+                                     : ColumnFilter.all(metadata);
+         // Note that the views could have restrictions on regular columns, but even if that's the case we shouldn't apply those
+         // when we read, because even if an existing row doesn't match the view filter, the update can change that in which
+         // case we'll need to know the existing content. There is also no easy way to merge those RowFilter when we have multiple views.
+         // TODO: it could still make sense to special case for when there is a single view and a small number of updates (and
+         // no deletions). Indeed, in that case we could check whether any of the updates modifies any of the restricted regular
+         // columns, and if that's not the case we could use the view filter. We keep it simple for now though.
+         RowFilter rowFilter = RowFilter.NONE;
+         return SinglePartitionReadCommand.create(metadata, nowInSec, queriedColumns, rowFilter, DataLimits.NONE, key, clusteringFilter);
+     }
+ 
+     /**
+      * Checks whether the given base table row update may affect at least one of the provided views.
+      * Views are consulted in iteration order and the check stops at the first view that may be affected.
+      *
+      * @param partitionKey the partition key of the updated row.
+      * @param update the base table row update.
+      * @param views the views to check against.
+      * @return {@code true} if any view reports that it may be affected by {@code update}, {@code false} otherwise.
+      */
+     private boolean affectsAnyViews(DecoratedKey partitionKey, Row update, Collection<View> views)
+     {
+         return Iterables.any(views, candidate -> candidate.mayBeAffectedBy(partitionKey, update));
+     }
+ 
+     /**
+      * Given an existing base row and the update that we're going to apply to this row, generate the modifications
+      * to apply to MVs using the provided {@code ViewUpdateGenerator}s.
+      *
+      * @param existingBaseRow the base table row as it is before an update. May be {@code null}, which is treated as empty.
+      * @param updateBaseRow the new updates made to {@code existingBaseRow}. Must not be empty.
+      * @param generators the view update generators to add the new changes to.
+      * @param nowInSec the current time in seconds. Used to decide if data is live or not.
+      */
+     private static void addToViewUpdateGenerators(Row existingBaseRow, Row updateBaseRow, Collection<ViewUpdateGenerator> generators, int nowInSec)
+     {
+         // Having existing empty is useful, it just means we'll insert a brand new entry for updateBaseRow,
+         // but if we have no update at all, we shouldn't get there.
+         assert !updateBaseRow.isEmpty();
+ 
+         // We allow existingBaseRow to be null, which we treat the same as being empty as a small optimization
+         // to avoid allocating empty row objects when we know there was nothing existing.
+         Row mergedBaseRow = existingBaseRow == null ? updateBaseRow : Rows.merge(existingBaseRow, updateBaseRow, nowInSec);
+         for (ViewUpdateGenerator generator : generators)
+             generator.addBaseTableUpdate(existingBaseRow, mergedBaseRow);
+     }
+ 
+     /**
+      * Builds the row to use when there is no row data at the given clustering.
+      *
+      * @param clustering the clustering of the row.
+      * @param deletion the deletion applying at that clustering.
+      * @return {@code null} if {@code deletion} is live, otherwise an empty row carrying {@code deletion}
+      * as a regular row deletion.
+      */
+     private static Row emptyRow(Clustering clustering, DeletionTime deletion)
+     {
+         // Returning null for an empty row is slightly ugly, but the case where there is no pre-existing row is fairly common
+         // (especially when building the view), so we want to avoid a dummy allocation of an empty row every time.
+         // And the code consuming the result knows how to deal with that (a null existing row is treated as empty).
+         return deletion.isLive() ? null : BTreeRow.emptyDeletedRow(clustering, Row.Deletion.regular(deletion));
+     }
+ 
+     /**
+      * Extracts (and potentially groups) the mutations generated by the provided view update generators.
+      * Returns the mutations that need to be done to the views given the base table updates
+      * passed to {@link ViewUpdateGenerator#addBaseTableUpdate}.
+      *
+      * @param baseTableMetadata the metadata for the base table being updated.
+      * @param generators the generators from which to extract the view mutations.
+      * @return the mutations created by all the generators in {@code generators}.
+      */
+     private Collection<Mutation> buildMutations(CFMetaData baseTableMetadata, List<ViewUpdateGenerator> generators)
+     {
+         // One view is probably common enough and we can optimize a bit easily
+         // (no grouping needed: each partition update simply becomes its own mutation).
+         if (generators.size() == 1)
+         {
+             Collection<PartitionUpdate> updates = generators.get(0).generateViewUpdates();
+             List<Mutation> mutations = new ArrayList<>(updates.size());
+             for (PartitionUpdate update : updates)
+                 mutations.add(new Mutation(update));
+             return mutations;
+         }
+ 
+         // With multiple views, group the updates of all generators that share a partition key into a single mutation.
+         Map<DecoratedKey, Mutation> mutations = new HashMap<>();
+         for (ViewUpdateGenerator generator : generators)
+         {
+             for (PartitionUpdate update : generator.generateViewUpdates())
+             {
+                 DecoratedKey key = update.partitionKey();
+                 Mutation mutation = mutations.get(key);
+                 if (mutation == null)
+                 {
+                     mutation = new Mutation(baseTableMetadata.ksName, key);
+                     mutations.put(key, mutation);
+                 }
+                 mutation.add(update);
+             }
+         }
+         return mutations.values();
+     }
+ 
+     /**
+      * A simple helper that tracks for a given {@code UnfilteredRowIterator} what is the current deletion at any time of the
+      * iteration. It will be the currently open range tombstone deletion if there is one and the partition deletion otherwise.
+      */
+     private static class DeletionTracker
+     {
+         private final DeletionTime partitionDeletion;
+         // The deletion of the currently open range tombstone, or null if no range tombstone is open.
+         private DeletionTime deletion;
+ 
+         public DeletionTracker(DeletionTime partitionDeletion)
+         {
+             this.partitionDeletion = partitionDeletion;
+         }
+ 
+         /**
+          * Updates the tracked deletion from a range tombstone marker met during iteration.
+          *
+          * @param marker the marker just iterated over; it must be a {@code RangeTombstoneMarker}.
+          */
+         public void update(Unfiltered marker)
+         {
+             assert marker instanceof RangeTombstoneMarker;
+             RangeTombstoneMarker rtm = (RangeTombstoneMarker)marker;
+             // NOTE(review): the 'false' argument is presumably the "reversed" flag (i.e. forward iteration) -- confirm.
+             this.deletion = rtm.isOpen(false)
+                           ? rtm.openDeletionTime(false)
+                           : null;
+         }
+ 
+         /**
+          * Returns the deletion currently in effect: the open range tombstone deletion if there is one,
+          * the partition deletion otherwise.
+          */
+         public DeletionTime currentDeletion()
+         {
+             return deletion == null ? partitionDeletion : deletion;
+         }
+     }
+ }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a87fd715/src/java/org/apache/cassandra/db/view/ViewBuilder.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/view/ViewBuilder.java
index 8944122,b2b409b..65e26e2
--- a/src/java/org/apache/cassandra/db/view/ViewBuilder.java
+++ b/src/java/org/apache/cassandra/db/view/ViewBuilder.java
@@@ -77,28 -76,22 +77,22 @@@ public class ViewBuilder extends Compac
          if (!selectQuery.selectsKey(key))
              return;
  
-         QueryPager pager = view.getSelectStatement().internalReadForView(key, FBUtilities.nowInSeconds()).getPager(null, Server.CURRENT_VERSION);
+         int nowInSec = FBUtilities.nowInSeconds();
+         SinglePartitionReadCommand command = view.getSelectStatement().internalReadForView(key, nowInSec);
  
-         while (!pager.isExhausted())
+         // We're rebuilding everything from what's on disk, so we read everything, consider that as new updates
+         // and pretend that there is nothing pre-existing.
+         UnfilteredRowIterator empty = UnfilteredRowIterators.noRowsIterator(baseCfs.metadata, key, Rows.EMPTY_STATIC_ROW, DeletionTime.LIVE, false);
+ 
+         Collection<Mutation> mutations;
 -        try (ReadOrderGroup orderGroup = command.startOrderGroup();
++        try (ReadExecutionController orderGroup = command.executionController();
+              UnfilteredRowIterator data = UnfilteredPartitionIterators.getOnlyElement(command.executeLocally(orderGroup), command))
          {
-            try (ReadExecutionController executionController = pager.executionController();
-                 PartitionIterator partitionIterator = pager.fetchPageInternal(128, executionController))
-            {
-                if (!partitionIterator.hasNext())
-                    return;
- 
-                try (RowIterator rowIterator = partitionIterator.next())
-                {
-                    FilteredPartition partition = FilteredPartition.create(rowIterator);
-                    TemporalRow.Set temporalRows = view.getTemporalRowSet(partition, null, true);
- 
-                    Collection<Mutation> mutations = view.createMutations(partition, temporalRows, true);
- 
-                    if (mutations != null)
-                        StorageProxy.mutateMV(key.getKey(), mutations, true, noBase);
-                }
-            }
+             mutations = baseCfs.keyspace.viewManager.forTable(baseCfs.metadata).generateViewUpdates(Collections.singleton(view), data, empty, nowInSec);
          }
+ 
+         if (!mutations.isEmpty())
+             StorageProxy.mutateMV(key.getKey(), mutations, true, noBase);
      }
  
      public void run()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a87fd715/src/java/org/apache/cassandra/db/view/ViewManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/view/ViewManager.java
index 6b8fe0d,fd04b97..bd73733
--- a/src/java/org/apache/cassandra/db/view/ViewManager.java
+++ b/src/java/org/apache/cassandra/db/view/ViewManager.java
@@@ -30,24 -31,24 +31,25 @@@ import org.apache.cassandra.config.CFMe
  import org.apache.cassandra.config.DatabaseDescriptor;
  import org.apache.cassandra.config.ViewDefinition;
  import org.apache.cassandra.db.*;
- import org.apache.cassandra.db.commitlog.ReplayPosition;
- import org.apache.cassandra.db.partitions.PartitionUpdate;
- import org.apache.cassandra.dht.Token;
+ import org.apache.cassandra.db.rows.*;
+ import org.apache.cassandra.db.partitions.*;
 -
 +import org.apache.cassandra.repair.SystemDistributedKeyspace;
- import org.apache.cassandra.service.StorageProxy;
 +import org.apache.cassandra.service.StorageService;
  
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- 
  /**
   * Manages {@link View}'s for a single {@link ColumnFamilyStore}. All of the views for that table are created when this
   * manager is initialized.
   *
   * The main purposes of the manager are to provide a single location for updates to be vetted to see whether they update
 - * any views {@link ViewManager#updatesAffectView(Collection, boolean)}, provide locks to prevent multiple
 - * updates from creating incoherent updates in the view {@link ViewManager#acquireLockFor(ByteBuffer)}, and
 + * any views {@link #updatesAffectView(Collection, boolean)}, provide locks to prevent multiple
 + * updates from creating incoherent updates in the view {@link #acquireLockFor(int)}, and
   * to affect change on the view.
+  *
+  * TODO: I think we can get rid of that class. For addition/removal of view by names, we could move it to Keyspace. And I'm
+  * not sure it's even worth keeping viewsByName as none of the related operations are performance sensitive so we could
+  * find the view by iterating over the CFStore.viewManager directly.
+  * For the lock, it could move to Keyspace too, but I don't remember why it has to be at the keyspace level and if it
+  * can be at the table level, maybe that's where it should be.
   */
  public class ViewManager
  {
@@@ -250,9 -148,8 +163,9 @@@
          if (view == null)
              return;
  
-         forTable(view.getDefinition().baseTableId).removeView(name);
+         forTable(view.getDefinition().baseTableMetadata()).removeByName(name);
          SystemKeyspace.setViewRemoved(keyspace.getName(), view.name);
 +        SystemDistributedKeyspace.setViewRemoved(keyspace.getName(), view.name);
      }
  
      public void buildAllViews()
@@@ -261,22 -158,23 +174,23 @@@
              view.build();
      }
  
-     public ForStore forTable(UUID baseId)
+     public TableViews forTable(CFMetaData metadata)
      {
-         ForStore forStore = viewManagersByStore.get(baseId);
-         if (forStore == null)
+         UUID baseId = metadata.cfId;
+         TableViews views = viewsByBaseTable.get(baseId);
+         if (views == null)
          {
-             forStore = new ForStore();
-             ForStore previous = viewManagersByStore.put(baseId, forStore);
+             views = new TableViews(metadata);
+             TableViews previous = viewsByBaseTable.putIfAbsent(baseId, views);
              if (previous != null)
-                 forStore = previous;
+                 views = previous;
          }
-         return forStore;
+         return views;
      }
  
 -    public static Lock acquireLockFor(ByteBuffer key)
 +    public static Lock acquireLockFor(int keyAndCfidHash)
      {
 -        Lock lock = LOCKS.get(key);
 +        Lock lock = LOCKS.get(keyAndCfidHash);
  
          if (lock.tryLock())
              return lock;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a87fd715/src/java/org/apache/cassandra/db/view/ViewUpdateGenerator.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/view/ViewUpdateGenerator.java
index 0000000,af025cb..4c6dbb7
mode 000000,100644..100644
--- a/src/java/org/apache/cassandra/db/view/ViewUpdateGenerator.java
+++ b/src/java/org/apache/cassandra/db/view/ViewUpdateGenerator.java
@@@ -1,0 -1,549 +1,549 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.cassandra.db.view;
+ 
+ import java.nio.ByteBuffer;
+ import java.util.*;
+ 
+ import com.google.common.collect.Iterators;
+ import com.google.common.collect.PeekingIterator;
+ 
+ import org.apache.cassandra.config.CFMetaData;
+ import org.apache.cassandra.config.ColumnDefinition;
+ import org.apache.cassandra.config.ViewDefinition;
+ import org.apache.cassandra.db.*;
+ import org.apache.cassandra.db.rows.*;
+ import org.apache.cassandra.db.partitions.*;
+ import org.apache.cassandra.db.marshal.AbstractType;
+ import org.apache.cassandra.db.marshal.CompositeType;
+ 
+ /**
+  * Creates the updates to apply to a view given the existing rows in the base
+  * table and the updates that we're applying to them (this handles updates
+  * on a single partition only).
+  *
+  * This class is used by passing the updates made to the base table to
+  * {@link #addBaseTableUpdate} and calling {@link #generateViewUpdates} once all updates have
+  * been handled to get the resulting view mutations.
+  */
+ public class ViewUpdateGenerator
+ {
+     private final View view;
+     private final int nowInSec;
+ 
+     private final CFMetaData baseMetadata;
+     private final DecoratedKey baseDecoratedKey;
+     private final ByteBuffer[] basePartitionKey;
+ 
+     private final CFMetaData viewMetadata;
+ 
+     private final Map<DecoratedKey, PartitionUpdate> updates = new HashMap<>();
+ 
+     // Reused internally to build a new entry
+     private final ByteBuffer[] currentViewEntryPartitionKey;
+     private final Row.Builder currentViewEntryBuilder;
+ 
+     /**
+      * The type of update action to perform to the view for a given base table
+      * update.
+      */
+     private enum UpdateAction
+     {
+         NONE,            // There was no view entry and none should be added
+         NEW_ENTRY,       // There was no entry but there is one post-update
+         DELETE_OLD,      // There was an entry but there is nothing after update
+         UPDATE_EXISTING, // There was an entry and the update modifies it
+         SWITCH_ENTRY     // There was an entry and there is still one after update,
+                          // but they are not the same one.
+     };
+ 
+     /**
+      * Creates a new {@code ViewUpdateGenerator}.
+      *
+      * @param view the view for which this will be building updates.
+      * @param basePartitionKey the partition key for the base table partition for which
+      * we'll handle updates.
+      * @param nowInSec the current time in seconds. Used to decide if data are live or not
+      * and as base reference for new deletions.
+      */
+     public ViewUpdateGenerator(View view, DecoratedKey basePartitionKey, int nowInSec)
+     {
+         this.view = view;
+         this.nowInSec = nowInSec;
+ 
+         this.baseMetadata = view.getDefinition().baseTableMetadata();
+         this.baseDecoratedKey = basePartitionKey;
+         this.basePartitionKey = extractKeyComponents(basePartitionKey, baseMetadata.getKeyValidator());
+ 
+         this.viewMetadata = view.getDefinition().metadata;
+ 
+         this.currentViewEntryPartitionKey = new ByteBuffer[viewMetadata.partitionKeyColumns().size()];
+         this.currentViewEntryBuilder = BTreeRow.sortedBuilder();
+     }
+ 
+     /**
+      * Splits a partition key into its component values: a key of {@code CompositeType} is split
+      * using {@code CompositeType#split}, while any other key is returned as a single-element array.
+      */
+     private static ByteBuffer[] extractKeyComponents(DecoratedKey partitionKey, AbstractType<?> type)
+     {
+         return type instanceof CompositeType
+              ? ((CompositeType)type).split(partitionKey.getKey())
+              : new ByteBuffer[]{ partitionKey.getKey() };
+     }
+ 
+     /**
+      * Adds to this generator the updates to be made to the view given a base table row
+      * before and after an update.
+      *
+      * @param existingBaseRow the base table row as it is before an update. This may be
+      * {@code null}, which is treated as an empty row.
+      * @param mergedBaseRow the base table row after the update is applied (note that
+      * this is not just the new update, but rather the resulting row).
+      */
+     public void addBaseTableUpdate(Row existingBaseRow, Row mergedBaseRow)
+     {
+         switch (updateAction(existingBaseRow, mergedBaseRow))
+         {
+             case NONE:
+                 return;
+             case NEW_ENTRY:
+                 createEntry(mergedBaseRow);
+                 return;
+             case DELETE_OLD:
+                 deleteOldEntry(existingBaseRow);
+                 return;
+             case UPDATE_EXISTING:
+                 updateEntry(existingBaseRow, mergedBaseRow);
+                 return;
+             case SWITCH_ENTRY:
+                 // The view entry changed: write the new entry and delete the old one.
+                 createEntry(mergedBaseRow);
+                 deleteOldEntry(existingBaseRow);
+                 return;
+         }
+     }
+ 
+     /**
+      * Returns the updates that need to be done to the view given the base table updates
+      * passed to {@link #addBaseTableUpdate}.
+      *
+      * @return the updates to do to the view.
+      */
+     public Collection<PartitionUpdate> generateViewUpdates()
+     {
+         return updates.values();
+     }
+ 
+     /**
+      * Compute which type of action needs to be performed to the view for a base table row
+      * before and after an update.
+      */
+     private UpdateAction updateAction(Row existingBaseRow, Row mergedBaseRow)
+     {
+         // Having existing empty is useful, it just means we'll insert a brand new entry for mergedBaseRow,
+         // but if we have no update at all, we shouldn't get there.
+         assert !mergedBaseRow.isEmpty();
+ 
+         // Note that none of the base PK columns will differ since we're intrinsically dealing
+         // with the same base row. So we have to check 3 things:
+         //   1) that the clustering doesn't have a null, which can happen for compact tables. If that's the case,
+         //      there is no corresponding entries.
+         //   2) if there is a column not part of the base PK in the view PK, whether it is changed by the update.
+         //   3) whether mergedBaseRow actually match the view SELECT filter (that last check is not done in this
+         //      method: createEntry, updateEntry and deleteOldEntry perform it before writing anything).
+ 
+         if (baseMetadata.isCompactTable())
+         {
+             Clustering clustering = mergedBaseRow.clustering();
+             for (int i = 0; i < clustering.size(); i++)
+             {
+                 if (clustering.get(i) == null)
+                     return UpdateAction.NONE;
+             }
+         }
+ 
+         assert view.baseNonPKColumnsInViewPK.size() <= 1 : "We currently only support one base non-PK column in the view PK";
+         if (view.baseNonPKColumnsInViewPK.isEmpty())
+         {
+             // The view entry is necessarily the same pre and post update.
+ 
+             // Note that we allow existingBaseRow to be null and treat it as empty (callers may pass null rather than allocate an empty row).
+             boolean existingHasLiveData = existingBaseRow != null && existingBaseRow.hasLiveData(nowInSec);
+             boolean mergedHasLiveData = mergedBaseRow.hasLiveData(nowInSec);
+             return existingHasLiveData
+                  ? (mergedHasLiveData ? UpdateAction.UPDATE_EXISTING : UpdateAction.DELETE_OLD)
+                  : (mergedHasLiveData ? UpdateAction.NEW_ENTRY : UpdateAction.NONE);
+         }
+ 
+         ColumnDefinition baseColumn = view.baseNonPKColumnsInViewPK.get(0);
+         assert !baseColumn.isComplex() : "A complex column couldn't be part of the view PK";
+         Cell before = existingBaseRow == null ? null : existingBaseRow.getCell(baseColumn);
+         Cell after = mergedBaseRow.getCell(baseColumn);
+ 
+         // If the update didn't modify this column, the cells will be the same object so it's worth checking
+         if (before == after)
+             return before == null ? UpdateAction.NONE : UpdateAction.UPDATE_EXISTING;
+ 
+         if (!isLive(before))
+             return isLive(after) ? UpdateAction.NEW_ENTRY : UpdateAction.NONE;
+         if (!isLive(after))
+             return UpdateAction.DELETE_OLD;
+ 
+         // Both cells are live: the entry is unchanged only if the view PK column kept the same value.
+         return baseColumn.cellValueType().compare(before.value(), after.value()) == 0
+              ? UpdateAction.UPDATE_EXISTING
+              : UpdateAction.SWITCH_ENTRY;
+     }
+ 
+     /**
+      * Returns whether the provided base row (of the base partition this generator handles) matches
+      * the view filter, as decided by {@code View#matchesViewFilter}.
+      */
+     private boolean matchesViewFilter(Row baseRow)
+     {
+         return view.matchesViewFilter(baseDecoratedKey, baseRow, nowInSec);
+     }
+ 
+     /**
+      * Returns whether the provided cell is non-null and live at {@code nowInSec}.
+      */
+     private boolean isLive(Cell cell)
+     {
+         return cell != null && cell.isLive(nowInSec);
+     }
+ 
+     /**
+      * Creates a view entry corresponding to the provided base row.
+      * <p>
+      * This method checks that the base row does match the view filter before applying it.
+      */
+     private void createEntry(Row baseRow)
+     {
+         // Before creating a new entry, make sure it matches the view filter
+         if (!matchesViewFilter(baseRow))
+             return;
+ 
+         startNewUpdate(baseRow);
+         currentViewEntryBuilder.addPrimaryKeyLivenessInfo(computeLivenessInfoForEntry(baseRow));
+         currentViewEntryBuilder.addRowDeletion(baseRow.deletion());
+ 
+         for (ColumnData data : baseRow)
+         {
+             ColumnDefinition viewColumn = view.getViewColumn(data.column());
+             // If that base table column is not denormalized in the view, we have nothing to do.
+             // Also, if it's part of the view PK it's already been taken into account in the clustering.
+             if (viewColumn == null || viewColumn.isPrimaryKeyColumn())
+                 continue;
+ 
+             addColumnData(viewColumn, data);
+         }
+ 
+         submitUpdate();
+     }
+ 
+     /**
+      * Creates the updates to apply to the existing view entry given the base table row before
+      * and after the update, assuming that the update hasn't changed to which view entry the
+      * row corresponds (that is, we know the columns composing the view PK haven't changed).
+      * <p>
+      * This method checks that the base row (before and after) does match the view filter before
+      * applying anything.
+      *
+      * @param existingBaseRow the base table row as it is before the update.
+      * @param mergedBaseRow the base table row after the update is applied.
+      */
+     private void updateEntry(Row existingBaseRow, Row mergedBaseRow)
+     {
+         // While we know existingBaseRow and mergedBaseRow are corresponding to the same view entry,
+         // they may not match the view filter.
+         if (!matchesViewFilter(existingBaseRow))
+         {
+             createEntry(mergedBaseRow);
+             return;
+         }
+         if (!matchesViewFilter(mergedBaseRow))
+         {
+             deleteOldEntryInternal(existingBaseRow);
+             return;
+         }
+ 
+         startNewUpdate(mergedBaseRow);
+ 
+         // In theory, it may be the PK liveness and row deletion haven't been changed by the update
+         // and we could condition the 2 additions below. In practice though, it's as fast (if not
+         // faster) to compute those info than to check if they have changed so we keep it simple.
+         currentViewEntryBuilder.addPrimaryKeyLivenessInfo(computeLivenessInfoForEntry(mergedBaseRow));
+         currentViewEntryBuilder.addRowDeletion(mergedBaseRow.deletion());
+ 
+         // We only add to the view update the cells from mergedBaseRow that differ from
+         // existingBaseRow. For that and for speed we can just use cell pointer equality: if the update
+         // hasn't touched a cell, we know it will be the same object in existingBaseRow and
+         // mergedBaseRow (note that including more cells than we strictly should isn't a problem
+         // for correctness, so even if the code changes and pointer equality doesn't work anymore, it'll
+         // only be a slight inefficiency which we can fix then).
+         // Note: we could alternatively use Rows.diff() for this, but because it is a bit more generic
+         // than what we need here, it's also a bit less efficient (it allocates more in particular),
+         // and this might be called a lot of time for view updates. So, given that this is not a whole
+         // lot of code anyway, it's probably worth doing the diff manually.
+         PeekingIterator<ColumnData> existingIter = Iterators.peekingIterator(existingBaseRow.iterator());
+         for (ColumnData mergedData : mergedBaseRow)
+         {
+             ColumnDefinition baseColumn = mergedData.column();
+             ColumnDefinition viewColumn = view.getViewColumn(baseColumn);
+             // If that base table column is not denormalized in the view, we have nothing to do.
+             // Also, if it's part of the view PK it's already been taken into account in the clustering.
+             if (viewColumn == null || viewColumn.isPrimaryKeyColumn())
+                 continue;
+ 
+             ColumnData existingData = null;
+             // Find if there is data for that column in the existing row
+             while (existingIter.hasNext())
+             {
+                 int cmp = baseColumn.compareTo(existingIter.peek().column());
+                 if (cmp < 0)
+                     break;
+ 
+                 ColumnData next = existingIter.next();
+                 if (cmp == 0)
+                 {
+                     existingData = next;
+                     break;
+                 }
+             }
+ 
+             if (existingData == null)
+             {
+                 addColumnData(viewColumn, mergedData);
+                 continue;
+             }
+ 
+             if (mergedData == existingData)
+                 continue;
+ 
+             if (baseColumn.isComplex())
+             {
+                 ComplexColumnData mergedComplexData = (ComplexColumnData)mergedData;
+                 ComplexColumnData existingComplexData = (ComplexColumnData)existingData;
+                 if (mergedComplexData.complexDeletion().supersedes(existingComplexData.complexDeletion()))
+                     currentViewEntryBuilder.addComplexDeletion(viewColumn, mergedComplexData.complexDeletion());
+ 
+                 PeekingIterator<Cell> existingCells = Iterators.peekingIterator(existingComplexData.iterator());
+                 for (Cell mergedCell : mergedComplexData)
+                 {
+                     Cell existingCell = null;
+                     // Find if there is corresponding cell in the existing row
+                     while (existingCells.hasNext())
+                     {
+                         int cmp = baseColumn.cellPathComparator().compare(mergedCell.path(), existingCells.peek().path());
+                         // Same merge logic as for columns above: if the merged cell sorts before the next existing
+                         // cell, it has no counterpart and that existing cell must be kept for a later merged cell.
+                         // Otherwise the existing cell is either the match or is behind and can be skipped.
+                         if (cmp < 0)
+                             break;
+ 
+                         Cell next = existingCells.next();
+                         if (cmp == 0)
+                         {
+                             existingCell = next;
+                             break;
+                         }
+                     }
+ 
+                     if (mergedCell != existingCell)
+                         addCell(viewColumn, mergedCell);
+                 }
+             }
+             else
+             {
+                 // Note that we've already eliminated the case where merged == existing
+                 addCell(viewColumn, (Cell)mergedData);
+             }
+         }
+ 
+         submitUpdate();
+     }
+ 
+     /**
+      * Deletes the view entry corresponding to the provided base row.
+      * <p>
+      * This method checks that the base row does match the view filter before bothering.
+      *
+      * @param existingBaseRow the base table row (as it was before the update) whose view entry should be deleted.
+      */
+     private void deleteOldEntry(Row existingBaseRow)
+     {
+         // Before deleting an old entry, make sure it was matching the view filter (otherwise there is nothing to delete)
+         if (!matchesViewFilter(existingBaseRow))
+             return;
+ 
+         deleteOldEntryInternal(existingBaseRow);
+     }
+ 
+     /**
+      * Deletes the view entry corresponding to the provided base row, without checking the view filter.
+      * <p>
+      * The entry is removed with a shadowable row deletion whose timestamp is the one computed by
+      * {@link #computeTimestampForEntryDeletion}.
+      */
+     private void deleteOldEntryInternal(Row existingBaseRow)
+     {
+         startNewUpdate(existingBaseRow);
+         DeletionTime dt = new DeletionTime(computeTimestampForEntryDeletion(existingBaseRow), nowInSec);
+         currentViewEntryBuilder.addRowDeletion(Row.Deletion.shadowable(dt));
+         submitUpdate();
+     }
+ 
+     /**
+      * Computes the partition key and clustering for a new view entry, and sets up the internal
+      * row builder for the new row.
+      *
+      * This assumes that there is a corresponding entry, i.e. no values for the partition key and
+      * clustering are null (since we have eliminated that case through updateAction).
+      */
+     private void startNewUpdate(Row baseRow)
+     {
+         ByteBuffer[] clusteringValues = new ByteBuffer[viewMetadata.clusteringColumns().size()];
+         for (ColumnDefinition viewColumn : viewMetadata.primaryKeyColumns())
+         {
+             ColumnDefinition baseColumn = view.getBaseColumn(viewColumn);
+             ByteBuffer value = getValueForPK(baseColumn, baseRow);
+             if (viewColumn.isPartitionKey())
+                 currentViewEntryPartitionKey[viewColumn.position()] = value;
+             else
+                 clusteringValues[viewColumn.position()] = value;
+         }
+ 
 -        currentViewEntryBuilder.newRow(new Clustering(clusteringValues));
++        currentViewEntryBuilder.newRow(Clustering.make(clusteringValues));
+     }
+ 
+     /**
+      * Computes the primary key liveness info (timestamp, TTL and expiration) to use for the view entry
+      * corresponding to the provided base row.
+      */
+     private LivenessInfo computeLivenessInfoForEntry(Row baseRow)
+     {
+         /*
+          * We need to compute both the timestamp and expiration.
+          *
+          * For the timestamp, it makes sense to use the bigger timestamp for all view PK columns.
+          *
+          * This is more complex for the expiration. We want to maintain consistency between the base and the view, so the
+          * entry should only exist as long as the base row exists _and_ has non-null values for all the columns that are part
+          * of the view PK.
+          * Which means we really have 2 cases:
+          *   1) either the columns for the base and view PKs are exactly the same: in that case, the view entry should live
+          *      as long as the base row lives. This means the view entry should only expire once *everything* in the base row
+          *      has expired. Which means the row TTL should be the max of any other TTL.
+          *   2) or there is a column that is not in the base PK but is in the view PK (we can only have one so far, we'll need
+          *      to slightly adapt if we allow more later): in that case, as long as that column lives the entry does too, but
+          *      as soon as it expires (or is deleted for that matter) the entry also should expire. So the expiration for the
+          *      view is the one of that column, regardless of any other expiration.
+          *      To take an example of that case, if you have:
+          *        CREATE TABLE t (a int, b int, c int, PRIMARY KEY (a, b))
+          *        CREATE MATERIALIZED VIEW mv AS SELECT * FROM t WHERE c IS NOT NULL AND a IS NOT NULL AND b IS NOT NULL PRIMARY KEY (c, a, b)
+          *        INSERT INTO t(a, b) VALUES (0, 0) USING TTL 3;
+          *        UPDATE t SET c = 0 WHERE a = 0 AND b = 0;
+          *      then even after 3 seconds elapsed, the row will still exist (it just won't have a "row marker" anymore) and so
+          *      the MV should still have a corresponding entry.
+          */
+         assert view.baseNonPKColumnsInViewPK.size() <= 1; // This may change, but is currently an enforced limitation
+ 
+         LivenessInfo baseLiveness = baseRow.primaryKeyLivenessInfo();
+ 
+         if (view.baseNonPKColumnsInViewPK.isEmpty())
+         {
+             // Case 1: keep the biggest TTL found among the row marker and the cells of the row.
+             int ttl = baseLiveness.ttl();
+             int expirationTime = baseLiveness.localExpirationTime();
+             for (Cell cell : baseRow.cells())
+             {
+                 if (cell.ttl() > ttl)
+                 {
+                     ttl = cell.ttl();
+                     expirationTime = cell.localDeletionTime();
+                 }
+             }
+             return ttl == baseLiveness.ttl()
+                  ? baseLiveness
 -                 : LivenessInfo.create(baseLiveness.timestamp(), ttl, expirationTime);
++                 : LivenessInfo.withExpirationTime(baseLiveness.timestamp(), ttl, expirationTime);
+         }
+ 
+         // Case 2: the TTL and expiration are those of the base non-PK column that is part of the view PK.
+         ColumnDefinition baseColumn = view.baseNonPKColumnsInViewPK.get(0);
+         Cell cell = baseRow.getCell(baseColumn);
+         assert isLive(cell) : "We shouldn't have got there is the base row had no associated entry";
+ 
+         long timestamp = Math.max(baseLiveness.timestamp(), cell.timestamp());
 -        return LivenessInfo.create(timestamp, cell.ttl(), cell.localDeletionTime());
++        return LivenessInfo.withExpirationTime(timestamp, cell.ttl(), cell.localDeletionTime());
+     }
+ 
+     private long computeTimestampForEntryDeletion(Row baseRow)
+     {
+         // We delete the old row with its row entry timestamp using a shadowable deletion.
+         // We must make sure that the deletion deletes everything in the entry (or the entry will
+         // still show up), so we must use the bigger timestamp found in the existing row (for any
+         // column included in the view at least).
+         // TODO: We have a problem though: if the entry is "resurrected" by a later update, we would
+         // need to ensure that the timestamp for the entry then is bigger than the tombstone
+         // we're just inserting, which is not currently guaranteed.
+         // This is a bug for a separate ticket though.
+         long timestamp = baseRow.primaryKeyLivenessInfo().timestamp();
+         for (ColumnData data : baseRow)
+         {
+             if (!view.getDefinition().includes(data.column().name))
+                 continue;
+ 
+             timestamp = Math.max(timestamp, data.maxTimestamp());
+         }
+         return timestamp;
+     }
+ 
+     private void addColumnData(ColumnDefinition viewColumn, ColumnData baseTableData)
+     {
+         assert viewColumn.isComplex() == baseTableData.column().isComplex();
+         if (!viewColumn.isComplex())
+         {
+             addCell(viewColumn, (Cell)baseTableData);
+             return;
+         }
+ 
+         ComplexColumnData complexData = (ComplexColumnData)baseTableData;
+         currentViewEntryBuilder.addComplexDeletion(viewColumn, complexData.complexDeletion());
+         for (Cell cell : complexData)
+             addCell(viewColumn, cell);
+     }
+ 
+     private void addCell(ColumnDefinition viewColumn, Cell baseTableCell)
+     {
+         assert !viewColumn.isPrimaryKeyColumn();
+         currentViewEntryBuilder.addCell(baseTableCell.withUpdatedColumn(viewColumn));
+     }
+ 
+     /**
+      * Finish building the currently updated view entry and add it to the other built
+      * updates.
+      */
+     private void submitUpdate()
+     {
+         Row row = currentViewEntryBuilder.build();
+         // I'm not sure we can reach here if nothing is updated, but adding an empty row breaks things
+         // and it costs us nothing to be prudent here.
+         if (row.isEmpty())
+             return;
+ 
+         DecoratedKey partitionKey = makeCurrentPartitionKey();
+         PartitionUpdate update = updates.get(partitionKey);
+         if (update == null)
+         {
+             // We can't really know which columns of the view will be updated nor how many rows will be updated for this key
+             // so we rely on hopefully sane defaults.
+             update = new PartitionUpdate(viewMetadata, partitionKey, viewMetadata.partitionColumns(), 4);
+             updates.put(partitionKey, update);
+         }
+         update.add(row);
+     }
+ 
+     private DecoratedKey makeCurrentPartitionKey()
+     {
+         ByteBuffer rawKey = viewMetadata.partitionKeyColumns().size() == 1
+                           ? currentViewEntryPartitionKey[0]
+                           : CompositeType.build(currentViewEntryPartitionKey);
+ 
+         return viewMetadata.decorateKey(rawKey);
+     }
+ 
+     private ByteBuffer getValueForPK(ColumnDefinition column, Row row)
+     {
+         switch (column.kind)
+         {
+             case PARTITION_KEY:
+                 return basePartitionKey[column.position()];
+             case CLUSTERING:
+                 return row.clustering().get(column.position());
+             default:
+                 // This shouldn't NPE as we shouldn't get there if the value can be null (or there is a bug in updateAction())
+                 return row.getCell(column).value();
+         }
+     }
+ }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a87fd715/test/unit/org/apache/cassandra/db/rows/RowsTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/rows/RowsTest.java
index 00ab6ca,b47bea2..ba03478
--- a/test/unit/org/apache/cassandra/db/rows/RowsTest.java
+++ b/test/unit/org/apache/cassandra/db/rows/RowsTest.java
@@@ -235,11 -235,12 +235,12 @@@ public class RowsTes
          originalBuilder.addPrimaryKeyLivenessInfo(liveness);
          DeletionTime complexDeletion = new DeletionTime(ts-1, now);
          originalBuilder.addComplexDeletion(m, complexDeletion);
 -        List<Cell> expectedCells = Lists.newArrayList(BufferCell.live(kcvm, v, secondToTs(now), BB1),
 -                                                      BufferCell.live(kcvm, m, secondToTs(now), BB1, CellPath.create(BB1)),
 -                                                      BufferCell.live(kcvm, m, secondToTs(now), BB2, CellPath.create(BB2)));
 +        List<Cell> expectedCells = Lists.newArrayList(BufferCell.live(v, secondToTs(now), BB1),
 +                                                      BufferCell.live(m, secondToTs(now), BB1, CellPath.create(BB1)),
 +                                                      BufferCell.live(m, secondToTs(now), BB2, CellPath.create(BB2)));
          expectedCells.forEach(originalBuilder::addCell);
-         Row.Deletion rowDeletion = new Row.Deletion(new DeletionTime(ts, now), false);
+         // We need to use ts-1 so the deletion doesn't shadow what we've created
+         Row.Deletion rowDeletion = new Row.Deletion(new DeletionTime(ts-1, now), false);
          originalBuilder.addRowDeletion(rowDeletion);
  
          RowBuilder builder = new RowBuilder();
@@@ -263,11 -264,12 +264,12 @@@
          builder.addPrimaryKeyLivenessInfo(liveness);
          DeletionTime complexDeletion = new DeletionTime(ts-1, now);
          builder.addComplexDeletion(m, complexDeletion);
 -        List<Cell> expectedCells = Lists.newArrayList(BufferCell.live(kcvm, v, ts, BB1),
 -                                                      BufferCell.live(kcvm, m, ts, BB1, CellPath.create(BB1)),
 -                                                      BufferCell.live(kcvm, m, ts, BB2, CellPath.create(BB2)));
 +        List<Cell> expectedCells = Lists.newArrayList(BufferCell.live(v, ts, BB1),
 +                                                      BufferCell.live(m, ts, BB1, CellPath.create(BB1)),
 +                                                      BufferCell.live(m, ts, BB2, CellPath.create(BB2)));
          expectedCells.forEach(builder::addCell);
-         Row.Deletion rowDeletion = new Row.Deletion(new DeletionTime(ts, now), false);
+         // We need to use ts-1 so the deletion doesn't shadow what we've created
+         Row.Deletion rowDeletion = new Row.Deletion(new DeletionTime(ts-1, now), false);
          builder.addRowDeletion(rowDeletion);
  
          StatsCollector collector = new StatsCollector();