Posted to commits@cassandra.apache.org by ja...@apache.org on 2015/07/30 02:55:22 UTC

[4/7] cassandra git commit: Revert "Revert "Materialized Views""

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3c43775c/src/java/org/apache/cassandra/db/AbstractReadCommandBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/AbstractReadCommandBuilder.java b/src/java/org/apache/cassandra/db/AbstractReadCommandBuilder.java
new file mode 100644
index 0000000..2ddc6ca
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/AbstractReadCommandBuilder.java
@@ -0,0 +1,347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.cql3.*;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.RowIterator;
+import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.db.partitions.*;
+import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.dht.*;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
+
+public abstract class AbstractReadCommandBuilder
+{
+    protected final ColumnFamilyStore cfs;
+    protected int nowInSeconds;
+
+    private int cqlLimit = -1;
+    private int pagingLimit = -1;
+    protected boolean reversed = false;
+
+    protected Set<ColumnIdentifier> columns;
+    protected final RowFilter filter = RowFilter.create();
+
+    private Slice.Bound lowerClusteringBound;
+    private Slice.Bound upperClusteringBound;
+
+    private NavigableSet<Clustering> clusterings;
+
+    // Use Util.cmd() instead of this ctor directly
+    AbstractReadCommandBuilder(ColumnFamilyStore cfs)
+    {
+        this.cfs = cfs;
+        this.nowInSeconds = FBUtilities.nowInSeconds();
+    }
+
+    public AbstractReadCommandBuilder withNowInSeconds(int nowInSec)
+    {
+        this.nowInSeconds = nowInSec;
+        return this;
+    }
+
+    public AbstractReadCommandBuilder fromIncl(Object... values)
+    {
+        assert lowerClusteringBound == null && clusterings == null;
+        this.lowerClusteringBound = Slice.Bound.create(cfs.metadata.comparator, true, true, values);
+        return this;
+    }
+
+    public AbstractReadCommandBuilder fromExcl(Object... values)
+    {
+        assert lowerClusteringBound == null && clusterings == null;
+        this.lowerClusteringBound = Slice.Bound.create(cfs.metadata.comparator, true, false, values);
+        return this;
+    }
+
+    public AbstractReadCommandBuilder toIncl(Object... values)
+    {
+        assert upperClusteringBound == null && clusterings == null;
+        this.upperClusteringBound = Slice.Bound.create(cfs.metadata.comparator, false, true, values);
+        return this;
+    }
+
+    public AbstractReadCommandBuilder toExcl(Object... values)
+    {
+        assert upperClusteringBound == null && clusterings == null;
+        this.upperClusteringBound = Slice.Bound.create(cfs.metadata.comparator, false, false, values);
+        return this;
+    }
+
+    public AbstractReadCommandBuilder includeRow(Object... values)
+    {
+        assert lowerClusteringBound == null && upperClusteringBound == null;
+
+        if (this.clusterings == null)
+            this.clusterings = new TreeSet<>(cfs.metadata.comparator);
+
+        this.clusterings.add(cfs.metadata.comparator.make(values));
+        return this;
+    }
+
+    public AbstractReadCommandBuilder reverse()
+    {
+        this.reversed = true;
+        return this;
+    }
+
+    public AbstractReadCommandBuilder withLimit(int newLimit)
+    {
+        this.cqlLimit = newLimit;
+        return this;
+    }
+
+    public AbstractReadCommandBuilder withPagingLimit(int newLimit)
+    {
+        this.pagingLimit = newLimit;
+        return this;
+    }
+
+    public AbstractReadCommandBuilder columns(String... columns)
+    {
+        if (this.columns == null)
+            this.columns = new HashSet<>();
+
+        for (String column : columns)
+            this.columns.add(ColumnIdentifier.getInterned(column, true));
+        return this;
+    }
+
+    private ByteBuffer bb(Object value, AbstractType<?> type)
+    {
+        return value instanceof ByteBuffer ? (ByteBuffer)value : ((AbstractType)type).decompose(value);
+    }
+
+    private AbstractType<?> forValues(AbstractType<?> collectionType)
+    {
+        assert collectionType instanceof CollectionType;
+        CollectionType ct = (CollectionType)collectionType;
+        switch (ct.kind)
+        {
+            case LIST:
+            case MAP:
+                return ct.valueComparator();
+            case SET:
+                return ct.nameComparator();
+        }
+        throw new AssertionError();
+    }
+
+    private AbstractType<?> forKeys(AbstractType<?> collectionType)
+    {
+        assert collectionType instanceof CollectionType;
+        CollectionType ct = (CollectionType)collectionType;
+        switch (ct.kind)
+        {
+            case LIST:
+            case MAP:
+                return ct.nameComparator();
+        }
+        throw new AssertionError();
+    }
+
+    public AbstractReadCommandBuilder filterOn(String column, Operator op, Object value)
+    {
+        ColumnDefinition def = cfs.metadata.getColumnDefinition(ColumnIdentifier.getInterned(column, true));
+        assert def != null;
+
+        AbstractType<?> type = def.type;
+        if (op == Operator.CONTAINS)
+            type = forValues(type);
+        else if (op == Operator.CONTAINS_KEY)
+            type = forKeys(type);
+
+        this.filter.add(def, op, bb(value, type));
+        return this;
+    }
+
+    protected ColumnFilter makeColumnFilter()
+    {
+        if (columns == null || columns.isEmpty())
+            return ColumnFilter.all(cfs.metadata);
+
+        ColumnFilter.Builder filter = ColumnFilter.selectionBuilder();
+        for (ColumnIdentifier column : columns)
+            filter.add(cfs.metadata.getColumnDefinition(column));
+        return filter.build();
+    }
+
+    protected ClusteringIndexFilter makeFilter()
+    {
+        if (clusterings != null)
+        {
+            return new ClusteringIndexNamesFilter(clusterings, reversed);
+        }
+        else
+        {
+            Slice slice = Slice.make(lowerClusteringBound == null ? Slice.Bound.BOTTOM : lowerClusteringBound,
+                                     upperClusteringBound == null ? Slice.Bound.TOP : upperClusteringBound);
+            return new ClusteringIndexSliceFilter(Slices.with(cfs.metadata.comparator, slice), reversed);
+        }
+    }
+
+    protected DataLimits makeLimits()
+    {
+        DataLimits limits = cqlLimit < 0 ? DataLimits.NONE : DataLimits.cqlLimits(cqlLimit);
+        if (pagingLimit >= 0)
+            limits = limits.forPaging(pagingLimit);
+        return limits;
+    }
+
+    public abstract ReadCommand build();
+
+    public static class SinglePartitionBuilder extends AbstractReadCommandBuilder
+    {
+        private final DecoratedKey partitionKey;
+
+        public SinglePartitionBuilder(ColumnFamilyStore cfs, DecoratedKey key)
+        {
+            super(cfs);
+            this.partitionKey = key;
+        }
+
+        @Override
+        public ReadCommand build()
+        {
+            return SinglePartitionReadCommand.create(cfs.metadata, nowInSeconds, makeColumnFilter(), filter, makeLimits(), partitionKey, makeFilter());
+        }
+    }
+
+    public static class SinglePartitionSliceBuilder extends AbstractReadCommandBuilder
+    {
+        private final DecoratedKey partitionKey;
+        private Slices.Builder sliceBuilder;
+
+        public SinglePartitionSliceBuilder(ColumnFamilyStore cfs, DecoratedKey key)
+        {
+            super(cfs);
+            this.partitionKey = key;
+            sliceBuilder = new Slices.Builder(cfs.getComparator());
+        }
+
+        public SinglePartitionSliceBuilder addSlice(Slice slice)
+        {
+            sliceBuilder.add(slice);
+            return this;
+        }
+
+        @Override
+        protected ClusteringIndexFilter makeFilter()
+        {
+            return new ClusteringIndexSliceFilter(sliceBuilder.build(), reversed);
+        }
+
+        @Override
+        public ReadCommand build()
+        {
+            return SinglePartitionSliceCommand.create(cfs.metadata, nowInSeconds, makeColumnFilter(), filter, makeLimits(), partitionKey, makeFilter());
+        }
+    }
+
+    public static class PartitionRangeBuilder extends AbstractReadCommandBuilder
+    {
+        private DecoratedKey startKey;
+        private boolean startInclusive;
+        private DecoratedKey endKey;
+        private boolean endInclusive;
+
+        public PartitionRangeBuilder(ColumnFamilyStore cfs)
+        {
+            super(cfs);
+        }
+
+        public PartitionRangeBuilder fromKeyIncl(Object... values)
+        {
+            assert startKey == null;
+            this.startInclusive = true;
+            this.startKey = makeKey(cfs.metadata, values);
+            return this;
+        }
+
+        public PartitionRangeBuilder fromKeyExcl(Object... values)
+        {
+            assert startKey == null;
+            this.startInclusive = false;
+            this.startKey = makeKey(cfs.metadata, values);
+            return this;
+        }
+
+        public PartitionRangeBuilder toKeyIncl(Object... values)
+        {
+            assert endKey == null;
+            this.endInclusive = true;
+            this.endKey = makeKey(cfs.metadata, values);
+            return this;
+        }
+
+        public PartitionRangeBuilder toKeyExcl(Object... values)
+        {
+            assert endKey == null;
+            this.endInclusive = false;
+            this.endKey = makeKey(cfs.metadata, values);
+            return this;
+        }
+
+        @Override
+        public ReadCommand build()
+        {
+            PartitionPosition start = startKey;
+            if (start == null)
+            {
+                start = StorageService.getPartitioner().getMinimumToken().maxKeyBound();
+                startInclusive = false;
+            }
+            PartitionPosition end = endKey;
+            if (end == null)
+            {
+                end = StorageService.getPartitioner().getMinimumToken().maxKeyBound();
+                endInclusive = true;
+            }
+
+            AbstractBounds<PartitionPosition> bounds;
+            if (startInclusive && endInclusive)
+                bounds = new Bounds<>(start, end);
+            else if (startInclusive && !endInclusive)
+                bounds = new IncludingExcludingBounds<>(start, end);
+            else if (!startInclusive && endInclusive)
+                bounds = new Range<>(start, end);
+            else
+                bounds = new ExcludingBounds<>(start, end);
+
+            return new PartitionRangeReadCommand(cfs.metadata, nowInSeconds, makeColumnFilter(), filter, makeLimits(), new DataRange(bounds, makeFilter()));
+        }
+
+        static DecoratedKey makeKey(CFMetaData metadata, Object... partitionKey)
+        {
+            if (partitionKey.length == 1 && partitionKey[0] instanceof DecoratedKey)
+                return (DecoratedKey)partitionKey[0];
+
+            ByteBuffer key = CFMetaData.serializePartitionKey(metadata.getKeyValidatorAsClusteringComparator().make(partitionKey));
+            return StorageService.getPartitioner().decorateKey(key);
+        }
+    }
+}

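As a rough illustration (not part of the patch): the builder is meant to be driven
through a fluent chain and finished with build(). Assuming a hypothetical
Util.cmd(cfs) test helper that returns a builder, per the constructor comment above,
a slice read over one partition might look like:

    ReadCommand cmd = Util.cmd(cfs)              // hypothetical factory, see ctor comment
                          .fromIncl("a")         // inclusive lower clustering bound
                          .toExcl("z")           // exclusive upper clustering bound
                          .columns("v1", "v2")   // restrict the column filter
                          .withLimit(100)        // CQL LIMIT
                          .build();
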
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3c43775c/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 6ac132c..24da365 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -53,6 +53,7 @@ import org.apache.cassandra.db.compaction.*;
 import org.apache.cassandra.db.filter.*;
 import org.apache.cassandra.db.index.SecondaryIndex;
 import org.apache.cassandra.db.index.SecondaryIndexManager;
+import org.apache.cassandra.db.view.MaterializedViewManager;
 import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.dht.*;
 import org.apache.cassandra.dht.Range;
@@ -159,6 +160,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     private final AtomicInteger fileIndexGenerator = new AtomicInteger(0);
 
     public final SecondaryIndexManager indexManager;
+    public final MaterializedViewManager materializedViewManager;
 
     /* These are locally held copies to be changed from the config during runtime */
     private volatile DefaultInteger minCompactionThreshold;
@@ -195,6 +197,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
         indexManager.reload();
 
+        materializedViewManager.reload();
         // If the CF comparator has changed, we need to change the memtable,
         // because the old one still aliases the previous comparator.
         if (data.getView().getCurrentMemtable().initialComparator != metadata.comparator)
@@ -331,6 +334,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         this.partitioner = partitioner;
         this.directories = directories;
         this.indexManager = new SecondaryIndexManager(this);
+        this.materializedViewManager = new MaterializedViewManager(this);
         this.metric = new TableMetrics(this);
         fileIndexGenerator.set(generation);
         sampleLatencyNanos = DatabaseDescriptor.getReadRpcTimeout() / 2;
@@ -451,6 +455,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         SystemKeyspace.removeTruncationRecord(metadata.cfId);
         data.dropSSTables();
         indexManager.invalidate();
+        materializedViewManager.invalidate();
 
         invalidateCaches();
     }
@@ -572,8 +577,10 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     }
 
     // must be called after all sstables are loaded since row cache merges all row versions
-    public void initRowCache()
+    public void init()
     {
+        materializedViewManager.init();
+
         if (!isRowCacheEnabled())
             return;
 
@@ -1806,7 +1813,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
                     cfs.data.reset();
                     return null;
                 }
-            }, true);
+            }, true, false);
         }
     }
 
@@ -1834,19 +1841,16 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             // flush the CF being truncated before forcing the new segment
             forceBlockingFlush();
 
+            materializedViewManager.forceBlockingFlush();
+
             // sleep a little to make sure that our truncatedAt comes after any sstable
             // that was part of the flush we forced; otherwise on a tie, it won't get deleted.
             Uninterruptibles.sleepUninterruptibly(1, TimeUnit.MILLISECONDS);
         }
         else
         {
-            // just nuke the memtable data w/o writing to disk first
-            synchronized (data)
-            {
-                final Flush flush = new Flush(true);
-                flushExecutor.execute(flush);
-                postFlushExecutor.submit(flush.postFlush);
-            }
+            dumpMemtable();
+            materializedViewManager.dumpMemtables();
         }
 
         Runnable truncateRunnable = new Runnable()
@@ -1866,17 +1870,32 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
                 for (SecondaryIndex index : indexManager.getIndexes())
                     index.truncateBlocking(truncatedAt);
 
+                materializedViewManager.truncateBlocking(truncatedAt);
+
                 SystemKeyspace.saveTruncationRecord(ColumnFamilyStore.this, truncatedAt, replayAfter);
                 logger.debug("cleaning out row cache");
                 invalidateCaches();
             }
         };
 
-        runWithCompactionsDisabled(Executors.callable(truncateRunnable), true);
+        runWithCompactionsDisabled(Executors.callable(truncateRunnable), true, true);
         logger.debug("truncate complete");
     }
 
-    public <V> V runWithCompactionsDisabled(Callable<V> callable, boolean interruptValidation)
+    /**
+     * Drops current memtable without flushing to disk. This should only be called when truncating a column family which is not durable.
+     */
+    public void dumpMemtable()
+    {
+        synchronized (data)
+        {
+            final Flush flush = new Flush(true);
+            flushExecutor.execute(flush);
+            postFlushExecutor.submit(flush.postFlush);
+        }
+    }
+
+    public <V> V runWithCompactionsDisabled(Callable<V> callable, boolean interruptValidation, boolean interruptViews)
     {
         // synchronize so that concurrent invocations don't re-enable compactions partway through unexpectedly,
         // and so we only run one major compaction at a time
@@ -1884,17 +1903,20 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         {
             logger.debug("Cancelling in-progress compactions for {}", metadata.cfName);
 
-            Iterable<ColumnFamilyStore> selfWithIndexes = concatWithIndexes();
-            for (ColumnFamilyStore cfs : selfWithIndexes)
+            Iterable<ColumnFamilyStore> selfWithAuxiliaryCfs = interruptViews
+                                                               ? Iterables.concat(concatWithIndexes(), materializedViewManager.allViewsCfs())
+                                                               : concatWithIndexes();
+
+            for (ColumnFamilyStore cfs : selfWithAuxiliaryCfs)
                 cfs.getCompactionStrategyManager().pause();
             try
             {
                 // interrupt in-progress compactions
-                CompactionManager.instance.interruptCompactionForCFs(selfWithIndexes, interruptValidation);
-                CompactionManager.instance.waitForCessation(selfWithIndexes);
+                CompactionManager.instance.interruptCompactionForCFs(selfWithAuxiliaryCfs, interruptValidation);
+                CompactionManager.instance.waitForCessation(selfWithAuxiliaryCfs);
 
                 // doublecheck that we finished, instead of timing out
-                for (ColumnFamilyStore cfs : selfWithIndexes)
+                for (ColumnFamilyStore cfs : selfWithAuxiliaryCfs)
                 {
                     if (!cfs.getTracker().getCompacting().isEmpty())
                     {
@@ -1916,7 +1938,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             }
             finally
             {
-                for (ColumnFamilyStore cfs : selfWithIndexes)
+                for (ColumnFamilyStore cfs : selfWithAuxiliaryCfs)
                     cfs.getCompactionStrategyManager().resume();
             }
         }
@@ -1936,7 +1958,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             }
         };
 
-        return runWithCompactionsDisabled(callable, false);
+        return runWithCompactionsDisabled(callable, false, false);
     }
 
 

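The interesting change above is that runWithCompactionsDisabled() now takes an
interruptViews flag, so operations such as truncate can quiesce compactions on the
view tables as well as on the base table and its indexes. A minimal, self-contained
sketch of the pause/run/resume pattern it follows (names here are illustrative, not
Cassandra's API):

    import java.util.concurrent.Callable;

    interface Pausable { void pause(); void resume(); }

    final class Quiesce
    {
        // Pause compactions on every store, run the task, then always resume.
        static <V> V run(Callable<V> task, Iterable<Pausable> stores) throws Exception
        {
            for (Pausable s : stores)
                s.pause();
            try
            {
                return task.call();          // e.g. the truncate runnable
            }
            finally
            {
                for (Pausable s : stores)
                    s.resume();
            }
        }
    }
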
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3c43775c/src/java/org/apache/cassandra/db/Keyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java
index f37ce66..78b593b 100644
--- a/src/java/org/apache/cassandra/db/Keyspace.java
+++ b/src/java/org/apache/cassandra/db/Keyspace.java
@@ -23,13 +23,15 @@ import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Future;
+import java.util.concurrent.locks.Lock;
 
 import com.google.common.base.Function;
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.concurrent.Stage;
+import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.*;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 import org.apache.cassandra.db.commitlog.CommitLog;
@@ -37,14 +39,17 @@ import org.apache.cassandra.db.commitlog.ReplayPosition;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.index.SecondaryIndex;
 import org.apache.cassandra.db.index.SecondaryIndexManager;
+import org.apache.cassandra.db.view.MaterializedViewManager;
+import org.apache.cassandra.exceptions.WriteTimeoutException;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.db.lifecycle.SSTableSet;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.apache.cassandra.schema.KeyspaceMetadata;
-import org.apache.cassandra.schema.SchemaKeyspace;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.JVMStabilityInspector;
 import org.apache.cassandra.utils.concurrent.OpOrder;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.metrics.KeyspaceMetrics;
@@ -70,7 +75,10 @@ public class Keyspace
     }
 
     private volatile KeyspaceMetadata metadata;
-    public final OpOrder writeOrder = new OpOrder();
+
+    // OpOrder is defined globally since we need to order writes across
+    // Keyspaces in the case of MaterializedViews (batchlog of MV mutations)
+    public static final OpOrder writeOrder = new OpOrder();
 
     /* ColumnFamilyStore per column family */
     private final ConcurrentMap<UUID, ColumnFamilyStore> columnFamilyStores = new ConcurrentHashMap<>();
@@ -122,7 +130,7 @@ public class Keyspace
 
                     // keyspace has to be constructed and in the cache before cacheRow can be called
                     for (ColumnFamilyStore cfs : keyspaceInstance.getColumnFamilyStores())
-                        cfs.initRowCache();
+                        cfs.init();
                 }
             }
         }
@@ -352,10 +360,14 @@ public class Keyspace
             // CFS being created for the first time, either on server startup or new CF being added.
             // We don't worry about races here; startup is safe, and adding multiple identical CFs
             // simultaneously is a "don't do that" scenario.
-            ColumnFamilyStore oldCfs = columnFamilyStores.putIfAbsent(cfId, ColumnFamilyStore.createColumnFamilyStore(this, cfName, loadSSTables));
+            ColumnFamilyStore newCfs = ColumnFamilyStore.createColumnFamilyStore(this, cfName, loadSSTables);
+
+            ColumnFamilyStore oldCfs = columnFamilyStores.putIfAbsent(cfId, newCfs);
             // CFS mbean instantiation will error out before we hit this, but in case that changes...
             if (oldCfs != null)
                 throw new IllegalStateException("added multiple mappings for cf id " + cfId);
+
+            newCfs.init();
         }
         else
         {
@@ -380,11 +392,41 @@ public class Keyspace
      * @param writeCommitLog false to disable commitlog append entirely
      * @param updateIndexes  false to disable index updates (used by CollationController "defragmenting")
      */
-    public void apply(Mutation mutation, boolean writeCommitLog, boolean updateIndexes)
+    public void apply(final Mutation mutation, final boolean writeCommitLog, boolean updateIndexes)
     {
         if (TEST_FAIL_WRITES && metadata.name.equals(TEST_FAIL_WRITES_KS))
             throw new RuntimeException("Testing write failures");
 
+        Lock lock = null;
+        boolean requiresViewUpdate = updateIndexes && MaterializedViewManager.updatesAffectView(Collections.singleton(mutation), false);
+
+        if (requiresViewUpdate)
+        {
+            lock = MaterializedViewManager.acquireLockFor(mutation.key().getKey());
+
+            if (lock == null)
+            {
+                if ((System.currentTimeMillis() - mutation.createdAt) > DatabaseDescriptor.getWriteRpcTimeout())
+                {
+                    logger.debug("Could not acquire lock for {}", ByteBufferUtil.bytesToHex(mutation.key().getKey()));
+                    Tracing.trace("Could not acquire MV lock");
+                    throw new WriteTimeoutException(WriteType.MATERIALIZED_VIEW, ConsistencyLevel.LOCAL_ONE, 0, 1);
+                }
+                else
+                {
+                    // This MV update can't happen right now, so rather than keep this thread busy
+                    // we will re-apply ourselves to the queue and try again later
+                    StageManager.getStage(Stage.MUTATION).execute(() -> {
+                        if (writeCommitLog)
+                            mutation.apply();
+                        else
+                            mutation.applyUnsafe();
+                    });
+
+                    return;
+                }
+            }
+        }
         int nowInSec = FBUtilities.nowInSeconds();
         try (OpOrder.Group opGroup = writeOrder.start())
         {
@@ -401,10 +443,26 @@ public class Keyspace
                 ColumnFamilyStore cfs = columnFamilyStores.get(upd.metadata().cfId);
                 if (cfs == null)
                 {
-                    logger.error("Attempting to mutate non-existant table {}", upd.metadata().cfId);
+                    logger.error("Attempting to mutate non-existent table {} ({}.{})", upd.metadata().cfId, upd.metadata().ksName, upd.metadata().cfName);
                     continue;
                 }
 
+                if (requiresViewUpdate)
+                {
+                    try
+                    {
+                        Tracing.trace("Create materialized view mutations from replica");
+                        cfs.materializedViewManager.pushViewReplicaUpdates(upd.partitionKey().getKey(), upd);
+                    }
+                    catch (Exception e)
+                    {
+                        if (!(e instanceof WriteTimeoutException))
+                            logger.warn("Encountered exception when creating materialized view mutations", e);
+
+                        JVMStabilityInspector.inspectThrowable(e);
+                    }
+                }
+
                 Tracing.trace("Adding to {} memtable", upd.metadata().cfName);
                 SecondaryIndexManager.Updater updater = updateIndexes
                                                       ? cfs.indexManager.updaterFor(upd, opGroup, nowInSec)
@@ -412,6 +470,11 @@ public class Keyspace
                 cfs.apply(upd, updater, opGroup, replayPosition);
             }
         }
+        finally
+        {
+            if (lock != null)
+                lock.unlock();
+        }
     }
 
     public AbstractReplicationStrategy getReplicationStrategy()

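The new locking in apply() implements a simple back-pressure scheme: a write that
affects a view must take a per-partition-key lock; if it cannot, it either fails
with a timeout once the mutation is older than the write RPC timeout, or re-enqueues
itself on the MUTATION stage rather than blocking the thread on the lock. A
self-contained sketch of the same pattern, with illustrative names:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.locks.Lock;

    final class ViewWriteBackPressure
    {
        static void apply(Lock lock, long createdAtMillis, long timeoutMillis,
                          ExecutorService mutationStage, Runnable write)
        {
            if (lock.tryLock())
            {
                try { write.run(); }                 // base update + view updates
                finally { lock.unlock(); }
            }
            else if (System.currentTimeMillis() - createdAtMillis > timeoutMillis)
            {
                // the mutation has already waited longer than the write timeout
                throw new RuntimeException("timed out waiting for the view lock");
            }
            else
            {
                // re-enqueue instead of busy-waiting on the lock
                mutationStage.execute(() -> apply(lock, createdAtMillis, timeoutMillis, mutationStage, write));
            }
        }
    }
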
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3c43775c/src/java/org/apache/cassandra/db/Mutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Mutation.java b/src/java/org/apache/cassandra/db/Mutation.java
index 3d49ca6..ace114b 100644
--- a/src/java/org/apache/cassandra/db/Mutation.java
+++ b/src/java/org/apache/cassandra/db/Mutation.java
@@ -54,6 +54,8 @@ public class Mutation implements IMutation
     // map of column family id to mutations for that column family.
     private final Map<UUID, PartitionUpdate> modifications;
 
+    // Time at which this mutation was instantiated
+    public final long createdAt = System.currentTimeMillis();
     public Mutation(String keyspaceName, DecoratedKey key)
     {
         this(keyspaceName, key, new HashMap<UUID, PartitionUpdate>());

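This timestamp is what Keyspace.apply() above compares against the write RPC
timeout to decide whether a re-queued mutation has waited too long, roughly:

    boolean expired = System.currentTimeMillis() - mutation.createdAt
                      > DatabaseDescriptor.getWriteRpcTimeout();
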
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3c43775c/src/java/org/apache/cassandra/db/MutationVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/MutationVerbHandler.java b/src/java/org/apache/cassandra/db/MutationVerbHandler.java
index 3baa93e..640e45f 100644
--- a/src/java/org/apache/cassandra/db/MutationVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/MutationVerbHandler.java
@@ -22,6 +22,7 @@ import java.io.IOError;
 import java.io.IOException;
 import java.net.InetAddress;
 
+import org.apache.cassandra.exceptions.WriteTimeoutException;
 import org.apache.cassandra.io.util.FastByteArrayInputStream;
 import org.apache.cassandra.net.*;
 import org.apache.cassandra.tracing.Tracing;
@@ -47,10 +48,17 @@ public class MutationVerbHandler implements IVerbHandler<Mutation>
                 replyTo = InetAddress.getByAddress(from);
             }
 
+        try
+        {
             message.payload.apply();
             WriteResponse response = new WriteResponse();
             Tracing.trace("Enqueuing response to {}", replyTo);
             MessagingService.instance().sendReply(response.createMessage(), id, replyTo);
+        }
+        catch (WriteTimeoutException wto)
+        {
+            Tracing.trace("Payload application resulted in WriteTimeout, not replying");
+        }
     }
 
     /**

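The effect of the new catch block is that a replica which times out while creating
view updates simply stays silent instead of acking, leaving the coordinator's own
timeout and hinting machinery to handle the failure. In outline (illustrative
names, not the handler's real signature):

    static void handle(Mutation mutation, Runnable sendReply)
    {
        try
        {
            mutation.apply();
            sendReply.run();                 // ack only when the apply succeeded
        }
        catch (WriteTimeoutException wte)
        {
            // stay silent: the coordinator will time out and may write a hint
        }
    }
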
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3c43775c/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index 7bfd552..7ea946b 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -101,6 +101,8 @@ public final class SystemKeyspace
     public static final String SSTABLE_ACTIVITY = "sstable_activity";
     public static final String SIZE_ESTIMATES = "size_estimates";
     public static final String AVAILABLE_RANGES = "available_ranges";
+    public static final String MATERIALIZEDVIEWS_BUILDS_IN_PROGRESS = "materializedviews_builds_in_progress";
+    public static final String BUILT_MATERIALIZEDVIEWS = "built_materializedviews";
 
     @Deprecated public static final String LEGACY_KEYSPACES = "schema_keyspaces";
     @Deprecated public static final String LEGACY_COLUMNFAMILIES = "schema_columnfamilies";
@@ -261,6 +263,24 @@ public final class SystemKeyspace
                 + "ranges set<blob>,"
                 + "PRIMARY KEY ((keyspace_name)))");
 
+    public static final CFMetaData MaterializedViewsBuildsInProgress =
+        compile(MATERIALIZEDVIEWS_BUILDS_IN_PROGRESS,
+                "materialized views builds current progress",
+                "CREATE TABLE %s ("
+                + "keyspace_name text,"
+                + "view_name text,"
+                + "last_token varchar,"
+                + "generation_number int,"
+                + "PRIMARY KEY ((keyspace_name), view_name))");
+
+    public static final CFMetaData BuiltMaterializedViews =
+        compile(BUILT_MATERIALIZEDVIEWS,
+                "built materialized views",
+                "CREATE TABLE \"%s\" ("
+                + "keyspace_name text,"
+                + "view_name text,"
+                + "PRIMARY KEY ((keyspace_name), view_name))");
+
     @Deprecated
     public static final CFMetaData LegacyKeyspaces =
         compile(LEGACY_KEYSPACES,
@@ -401,6 +421,8 @@ public final class SystemKeyspace
                          SSTableActivity,
                          SizeEstimates,
                          AvailableRanges,
+                         MaterializedViewsBuildsInProgress,
+                         BuiltMaterializedViews,
                          LegacyKeyspaces,
                          LegacyColumnfamilies,
                          LegacyColumns,
@@ -493,6 +515,82 @@ public final class SystemKeyspace
         return CompactionHistoryTabularData.from(queryResultSet);
     }
 
+    public static boolean isViewBuilt(String keyspaceName, String viewName)
+    {
+        String req = "SELECT view_name FROM %s.\"%s\" WHERE keyspace_name=? AND view_name=?";
+        UntypedResultSet result = executeInternal(String.format(req, NAME, BUILT_MATERIALIZEDVIEWS), keyspaceName, viewName);
+        return !result.isEmpty();
+    }
+
+    public static void setMaterializedViewBuilt(String keyspaceName, String viewName)
+    {
+        String req = "INSERT INTO %s.\"%s\" (keyspace_name, view_name) VALUES (?, ?)";
+        executeInternal(String.format(req, NAME, BUILT_MATERIALIZEDVIEWS), keyspaceName, viewName);
+        forceBlockingFlush(BUILT_MATERIALIZEDVIEWS);
+    }
+
+
+    public static void setMaterializedViewRemoved(String keyspaceName, String viewName)
+    {
+        String buildReq = "DELETE FROM %s.%s WHERE keyspace_name = ? AND view_name = ?";
+        executeInternal(String.format(buildReq, NAME, MATERIALIZEDVIEWS_BUILDS_IN_PROGRESS), keyspaceName, viewName);
+        forceBlockingFlush(MATERIALIZEDVIEWS_BUILDS_IN_PROGRESS);
+
+        String builtReq = "DELETE FROM %s.\"%s\" WHERE keyspace_name = ? AND view_name = ?";
+        executeInternal(String.format(builtReq, NAME, BUILT_MATERIALIZEDVIEWS), keyspaceName, viewName);
+        forceBlockingFlush(BUILT_MATERIALIZEDVIEWS);
+    }
+
+    public static void beginMaterializedViewBuild(String ksname, String viewName, int generationNumber)
+    {
+        executeInternal(String.format("INSERT INTO system.%s (keyspace_name, view_name, generation_number) VALUES (?, ?, ?)", MATERIALIZEDVIEWS_BUILDS_IN_PROGRESS),
+                        ksname,
+                        viewName,
+                        generationNumber);
+    }
+
+    public static void finishMaterializedViewBuildStatus(String ksname, String viewName)
+    {
+        // We flush the "view built" marker first: if we fail at this point, the next restart
+        // resumes the materialized view build from the last checkpoint rather than from scratch.
+        // If we flushed the delete first and then failed, we would have to restart from the beginning.
+        // Also, if marking the view built succeeds but deleting the in-progress entry fails, we can
+        // still skip the materialized view build check on the next boot.
+        setMaterializedViewBuilt(ksname, viewName);
+        forceBlockingFlush(BUILT_MATERIALIZEDVIEWS);
+        executeInternal(String.format("DELETE FROM system.%s WHERE keyspace_name = ? AND view_name = ?", MATERIALIZEDVIEWS_BUILDS_IN_PROGRESS), ksname, viewName);
+        forceBlockingFlush(MATERIALIZEDVIEWS_BUILDS_IN_PROGRESS);
+    }
+
+    public static void updateMaterializedViewBuildStatus(String ksname, String viewName, Token token)
+    {
+        String req = "INSERT INTO system.%s (keyspace_name, view_name, last_token) VALUES (?, ?, ?)";
+        Token.TokenFactory factory = StorageService.getPartitioner().getTokenFactory();
+        executeInternal(String.format(req, MATERIALIZEDVIEWS_BUILDS_IN_PROGRESS), ksname, viewName, factory.toString(token));
+    }
+
+    public static Pair<Integer, Token> getMaterializedViewBuildStatus(String ksname, String viewName)
+    {
+        String req = "SELECT generation_number, last_token FROM system.%s WHERE keyspace_name = ? AND view_name = ?";
+        UntypedResultSet queryResultSet = executeInternal(String.format(req, MATERIALIZEDVIEWS_BUILDS_IN_PROGRESS), ksname, viewName);
+        if (queryResultSet == null || queryResultSet.isEmpty())
+            return null;
+
+        UntypedResultSet.Row row = queryResultSet.one();
+
+        Integer generation = null;
+        Token lastKey = null;
+        if (row.has("generation_number"))
+            generation = row.getInt("generation_number");
+        if (row.has("last_token"))
+        {
+            Token.TokenFactory factory = StorageService.getPartitioner().getTokenFactory();
+            lastKey = factory.fromString(row.getString("last_token"));
+        }
+
+        return Pair.create(generation, lastKey);
+    }
+
     public static synchronized void saveTruncationRecord(ColumnFamilyStore cfs, long truncatedAt, ReplayPosition position)
     {
         String req = "UPDATE system.%s SET truncated_at = truncated_at + ? WHERE key = '%s'";

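Taken together, these helpers form a checkpointing protocol for long-running view
builds. A hypothetical driver (the loop body and token source are assumptions, not
part of this patch) would call them in this order:

    void buildView(String ks, String view, int generation, Iterable<Token> tokens)
    {
        SystemKeyspace.beginMaterializedViewBuild(ks, view, generation);
        for (Token t : tokens)
        {
            // ... materialize the view rows for the range ending at t ...
            SystemKeyspace.updateMaterializedViewBuildStatus(ks, view, t); // checkpoint
        }
        // marks the view built, then clears the in-progress record
        SystemKeyspace.finishMaterializedViewBuildStatus(ks, view);
    }

On restart, getMaterializedViewBuildStatus() returns the stored generation and last
token, so the build can resume from the last checkpoint instead of from the beginning.
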
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3c43775c/src/java/org/apache/cassandra/db/WriteType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/WriteType.java b/src/java/org/apache/cassandra/db/WriteType.java
index 4f4c88d..20fb6a9 100644
--- a/src/java/org/apache/cassandra/db/WriteType.java
+++ b/src/java/org/apache/cassandra/db/WriteType.java
@@ -24,5 +24,6 @@ public enum WriteType
     UNLOGGED_BATCH,
     COUNTER,
     BATCH_LOG,
-    CAS;
+    CAS,
+    MATERIALIZED_VIEW;
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3c43775c/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index bf412d8..3dd6f38 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -58,6 +58,7 @@ import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.compaction.CompactionInfo.Holder;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.index.SecondaryIndexBuilder;
+import org.apache.cassandra.db.view.MaterializedViewBuilder;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.db.lifecycle.SSTableSet;
 import org.apache.cassandra.db.lifecycle.View;
@@ -1365,6 +1366,31 @@ public class CompactionManager implements CompactionManagerMBean
         }
     }
 
+    public Future<?> submitMaterializedViewBuilder(final MaterializedViewBuilder builder)
+    {
+        Runnable runnable = new Runnable()
+        {
+            public void run()
+            {
+                metrics.beginCompaction(builder);
+                try
+                {
+                    builder.run();
+                }
+                finally
+                {
+                    metrics.finishCompaction(builder);
+                }
+            }
+        };
+        if (executor.isShutdown())
+        {
+            logger.info("Compaction executor has shut down, not submitting materialized view build");
+            return null;
+        }
+
+        return executor.submit(runnable);
+    }
     public int getActiveCompactions()
     {
         return CompactionMetrics.getCompactions().size();

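Callers receive a Future they can block on; a null return means the compaction
executor was already shut down. A sketch of how a caller might use it (whether
callers block like this is an assumption, not shown in the diff):

    Future<?> build = CompactionManager.instance.submitMaterializedViewBuilder(builder);
    if (build != null)
        build.get();    // wait for the view build to finish (or propagate its failure)
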
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3c43775c/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
index 766eb1b..5e15f33 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
@@ -426,7 +426,7 @@ public class CompactionStrategyManager implements INotificationConsumer
                     return tasks;
                 }
             }
-        }, false);
+        }, false, false);
     }
 
     public AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, int gcBefore)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3c43775c/src/java/org/apache/cassandra/db/compaction/OperationType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/OperationType.java b/src/java/org/apache/cassandra/db/compaction/OperationType.java
index 5b6ce05..f8f016c 100644
--- a/src/java/org/apache/cassandra/db/compaction/OperationType.java
+++ b/src/java/org/apache/cassandra/db/compaction/OperationType.java
@@ -35,7 +35,8 @@ public enum OperationType
     VERIFY("Verify"),
     FLUSH("Flush"),
     STREAM("Stream"),
-    WRITE("Write");
+    WRITE("Write"),
+    VIEW_BUILD("Materialized view build");
 
     public final String type;
     public final String fileName;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3c43775c/src/java/org/apache/cassandra/db/view/MaterializedView.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/MaterializedView.java b/src/java/org/apache/cassandra/db/view/MaterializedView.java
new file mode 100644
index 0000000..082c71d
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/view/MaterializedView.java
@@ -0,0 +1,691 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.view;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+
+import com.google.common.collect.Iterables;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.config.MaterializedViewDefinition;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.cql3.statements.CFProperties;
+import org.apache.cassandra.db.AbstractReadCommandBuilder.SinglePartitionSliceBuilder;
+import org.apache.cassandra.db.CBuilder;
+import org.apache.cassandra.db.Clustering;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.DeletionInfo;
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.LivenessInfo;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.RangeTombstone;
+import org.apache.cassandra.db.ReadCommand;
+import org.apache.cassandra.db.ReadOrderGroup;
+import org.apache.cassandra.db.SinglePartitionReadCommand;
+import org.apache.cassandra.db.Slice;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.partitions.AbstractThreadUnsafePartition;
+import org.apache.cassandra.db.partitions.PartitionIterator;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.rows.BTreeBackedRow;
+import org.apache.cassandra.db.rows.Cell;
+import org.apache.cassandra.db.rows.ColumnData;
+import org.apache.cassandra.db.rows.ComplexColumnData;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.RowIterator;
+import org.apache.cassandra.service.pager.QueryPager;
+
+/**
+ * A Materialized View copies data from a base table into a view table which can be queried independently from the
+ * base. Every update which targets the base table must be fed through the {@link MaterializedViewManager} to ensure
+ * that if a view needs to be updated, the updates are properly created and fed into the view.
+ *
+ * This class does the job of translating the base row to the view row.
+ *
+ * It handles reading existing state and figuring out what tombstones need to be generated.
+ *
+ * createMutations below is the "main method"
+ *
+ */
+public class MaterializedView
+{
+    /**
+     * The columns should all be updated together, so we use this object as a group.
+     */
+    private static class MVColumns
+    {
+        // These are the base column definitions in terms of the *view's* partitioning.
+        // For example, the partition key of the view may contain a clustering column
+        // from the base table.
+        public final List<ColumnDefinition> partitionDefs;
+        public final List<ColumnDefinition> primaryKeyDefs;
+        public final List<ColumnDefinition> baseComplexColumns;
+
+        private MVColumns(List<ColumnDefinition> partitionDefs, List<ColumnDefinition> primaryKeyDefs, List<ColumnDefinition> baseComplexColumns)
+        {
+            this.partitionDefs = partitionDefs;
+            this.primaryKeyDefs = primaryKeyDefs;
+            this.baseComplexColumns = baseComplexColumns;
+        }
+    }
+
+    public final String name;
+
+    private final ColumnFamilyStore baseCfs;
+    private ColumnFamilyStore _viewCfs = null;
+
+    private MVColumns columns;
+
+    private final boolean viewHasAllPrimaryKeys;
+    private final boolean includeAll;
+    private MaterializedViewBuilder builder;
+
+    public MaterializedView(MaterializedViewDefinition definition,
+                            ColumnFamilyStore baseCfs)
+    {
+        this.baseCfs = baseCfs;
+
+        name = definition.viewName;
+        includeAll = definition.includeAll;
+
+        viewHasAllPrimaryKeys = updateDefinition(definition);
+    }
+
+    /**
+     * Lazily fetch the CFS instance for the view.
+     * We do this lazily to avoid initialization issues.
+     *
+     * @return The view's CFS instance
+     */
+    public ColumnFamilyStore getViewCfs()
+    {
+        if (_viewCfs == null)
+            _viewCfs = Keyspace.openAndGetStore(Schema.instance.getCFMetaData(baseCfs.keyspace.getName(), name));
+
+        return _viewCfs;
+    }
+
+
+    /**
+     * Look up the column definitions in the base table that correspond to the view columns (should be 1:1).
+     *
+     * Also tells the caller whether every primary key column in the view is also a primary key column in the
+     * base. We do this to simplify tombstone checks.
+     *
+     * @param columns a list of columns to lookup in the base table
+     * @param definitions lists to populate for the base table definitions
+     * @return true if all view PKs are also Base PKs
+     */
+    private boolean resolveAndAddColumns(Iterable<ColumnIdentifier> columns, List<ColumnDefinition>... definitions)
+    {
+        boolean allArePrimaryKeys = true;
+        for (ColumnIdentifier identifier : columns)
+        {
+            ColumnDefinition cdef = baseCfs.metadata.getColumnDefinition(identifier);
+            assert cdef != null : "Could not resolve column " + identifier.toString();
+
+            for (List<ColumnDefinition> list : definitions)
+            {
+                list.add(cdef);
+            }
+
+            allArePrimaryKeys = allArePrimaryKeys && cdef.isPrimaryKeyColumn();
+        }
+
+        return allArePrimaryKeys;
+    }
+
+    /**
+     * This updates the columns stored which are dependent on the base CFMetaData.
+     *
+     * @return true if the view contains only columns which are part of the base's primary key; false if there is at
+     *         least one column which is not.
+     */
+    public boolean updateDefinition(MaterializedViewDefinition definition)
+    {
+        List<ColumnDefinition> partitionDefs = new ArrayList<>(definition.partitionColumns.size());
+        List<ColumnDefinition> primaryKeyDefs = new ArrayList<>(definition.partitionColumns.size()
+                                                                + definition.clusteringColumns.size());
+        List<ColumnDefinition> baseComplexColumns = new ArrayList<>();
+
+        // We only add the partition columns to the partitions list, but both partition columns and clustering
+        // columns are added to the primary keys list
+        boolean partitionAllPrimaryKeyColumns = resolveAndAddColumns(definition.partitionColumns, primaryKeyDefs, partitionDefs);
+        boolean clusteringAllPrimaryKeyColumns = resolveAndAddColumns(definition.clusteringColumns, primaryKeyDefs);
+
+        for (ColumnDefinition cdef : baseCfs.metadata.allColumns())
+        {
+            if (cdef.isComplex())
+            {
+                baseComplexColumns.add(cdef);
+            }
+        }
+
+        this.columns = new MVColumns(partitionDefs, primaryKeyDefs, baseComplexColumns);
+
+        return partitionAllPrimaryKeyColumns && clusteringAllPrimaryKeyColumns;
+    }
+
+    /**
+     * Check to see if the update could possibly modify a view. Cases where the view may be updated are:
+     * <ul>
+     *     <li>View selects all columns</li>
+     *     <li>Update contains any range tombstones</li>
+     *     <li>Update touches one of the columns included in the view</li>
+     * </ul>
+     *
+     * If the update contains any range tombstones, there is a possibility that it will not touch a range that is
+     * currently included in the view.
+     *
+     * @return true if {@code partition} modifies a column included in the view
+     */
+    public boolean updateAffectsView(AbstractThreadUnsafePartition partition)
+    {
+        // If we are including all of the columns, then any update will be included
+        if (includeAll)
+            return true;
+
+        // If there are range tombstones, tombstones will also need to be generated for the materialized view
+        // This requires a query of the base rows and generating tombstones for all of those values
+        if (!partition.deletionInfo().isLive())
+            return true;
+
+        // Check whether the update touches any of the columns included in the view
+        for (Row row : partition)
+        {
+            for (ColumnData data : row)
+            {
+                if (getViewCfs().metadata.getColumnDefinition(data.column().name) != null)
+                    return true;
+            }
+        }
+
+        return false;
+    }
+
+    /**
+     * Creates the clustering columns for the view based on the specified row and resolver policy
+     *
+     * @param temporalRow The current row
+     * @param resolver The policy to use when selecting which versions of cells to use
+     * @return The clustering object to use for the view
+     */
+    private Clustering viewClustering(TemporalRow temporalRow, TemporalRow.Resolver resolver)
+    {
+        CFMetaData viewCfm = getViewCfs().metadata;
+        int numViewClustering = viewCfm.clusteringColumns().size();
+        CBuilder clustering = CBuilder.create(getViewCfs().getComparator());
+        for (int i = 0; i < numViewClustering; i++)
+        {
+            ColumnDefinition definition = viewCfm.clusteringColumns().get(i);
+            clustering.add(temporalRow.clusteringValue(definition, resolver));
+        }
+
+        return clustering.build();
+    }
+
+    /**
+     * @return Mutation containing a range tombstone for a base partition key and TemporalRow.
+     */
+    private PartitionUpdate createTombstone(TemporalRow temporalRow,
+                                            DecoratedKey partitionKey,
+                                            DeletionTime deletionTime,
+                                            TemporalRow.Resolver resolver,
+                                            int nowInSec)
+    {
+        CFMetaData viewCfm = getViewCfs().metadata;
+        Row.Builder builder = BTreeBackedRow.unsortedBuilder(viewCfm.partitionColumns().regulars, nowInSec);
+        builder.newRow(viewClustering(temporalRow, resolver));
+        builder.addRowDeletion(deletionTime);
+        return PartitionUpdate.singleRowUpdate(viewCfm, partitionKey, builder.build());
+    }
+
+    /**
+     * @return PartitionUpdate containing a complex tombstone for a TemporalRow, and the collection's column identifier.
+     */
+    private PartitionUpdate createComplexTombstone(TemporalRow temporalRow,
+                                                   DecoratedKey partitionKey,
+                                                   ColumnDefinition deletedColumn,
+                                                   DeletionTime deletionTime,
+                                                   TemporalRow.Resolver resolver,
+                                                   int nowInSec)
+    {
+
+        CFMetaData viewCfm = getViewCfs().metadata;
+        Row.Builder builder = BTreeBackedRow.unsortedBuilder(viewCfm.partitionColumns().regulars, nowInSec);
+        builder.newRow(viewClustering(temporalRow, resolver));
+        builder.addComplexDeletion(deletedColumn, deletionTime);
+        return PartitionUpdate.singleRowUpdate(viewCfm, partitionKey, builder.build());
+    }
+
+    /**
+     * @return View's DecoratedKey or null, if one of the view's primary key components has an invalid resolution from
+     *         the TemporalRow and its Resolver
+     */
+    private DecoratedKey viewPartitionKey(TemporalRow temporalRow, TemporalRow.Resolver resolver)
+    {
+        List<ColumnDefinition> partitionDefs = this.columns.partitionDefs;
+        Object[] partitionKey = new Object[partitionDefs.size()];
+
+        for (int i = 0; i < partitionKey.length; i++)
+        {
+            ByteBuffer value = temporalRow.clusteringValue(partitionDefs.get(i), resolver);
+
+            if (value == null)
+                return null;
+
+            partitionKey[i] = value;
+        }
+
+        return getViewCfs().partitioner.decorateKey(CFMetaData.serializePartitionKey(getViewCfs().metadata
+                                                                                     .getKeyValidatorAsClusteringComparator()
+                                                                                     .make(partitionKey)));
+    }
+
+    /**
+     * @return mutation which contains the tombstone for the referenced TemporalRow, or null if not necessary.
+     * A TemporalRow can reference at most one view row; there will be at most one row to tombstone, so only one
+     * mutation is necessary.
+     */
+    private PartitionUpdate createRangeTombstoneForRow(TemporalRow temporalRow)
+    {
+        // Primary Key and Clustering columns do not generate tombstones
+        if (viewHasAllPrimaryKeys)
+            return null;
+
+        boolean hasUpdate = false;
+        List<ColumnDefinition> primaryKeyDefs = this.columns.primaryKeyDefs;
+        for (ColumnDefinition viewPartitionKeys : primaryKeyDefs)
+        {
+            if (!viewPartitionKeys.isPrimaryKeyColumn() && temporalRow.clusteringValue(viewPartitionKeys, TemporalRow.oldValueIfUpdated) != null)
+                hasUpdate = true;
+        }
+
+        if (!hasUpdate)
+            return null;
+
+        TemporalRow.Resolver resolver = TemporalRow.earliest;
+        return createTombstone(temporalRow,
+                               viewPartitionKey(temporalRow, resolver),
+                               new DeletionTime(temporalRow.viewClusteringTimestamp(), temporalRow.nowInSec),
+                               resolver,
+                               temporalRow.nowInSec);
+    }
+
+    /**
+     * @return Mutation which is the transformed base table mutation for the materialized view.
+     */
+    private PartitionUpdate createUpdatesForInserts(TemporalRow temporalRow)
+    {
+        TemporalRow.Resolver resolver = TemporalRow.latest;
+
+        DecoratedKey partitionKey = viewPartitionKey(temporalRow, resolver);
+        ColumnFamilyStore viewCfs = getViewCfs();
+
+        if (partitionKey == null)
+        {
+            // Not having a partition key means we aren't updating anything
+            return null;
+        }
+
+        Row.Builder regularBuilder = BTreeBackedRow.unsortedBuilder(viewCfs.metadata.partitionColumns().regulars, temporalRow.nowInSec);
+
+        CBuilder clustering = CBuilder.create(viewCfs.getComparator());
+        for (int i = 0; i < viewCfs.metadata.clusteringColumns().size(); i++)
+        {
+            clustering.add(temporalRow.clusteringValue(viewCfs.metadata.clusteringColumns().get(i), resolver));
+        }
+        regularBuilder.newRow(clustering.build());
+        regularBuilder.addPrimaryKeyLivenessInfo(LivenessInfo.create(viewCfs.metadata,
+                                                                     temporalRow.viewClusteringTimestamp(),
+                                                                     temporalRow.viewClusteringTtl(),
+                                                                     temporalRow.viewClusteringLocalDeletionTime()));
+
+        for (ColumnDefinition columnDefinition : viewCfs.metadata.allColumns())
+        {
+            if (columnDefinition.isPrimaryKeyColumn())
+                continue;
+
+            for (Cell cell : temporalRow.values(columnDefinition, resolver))
+            {
+                regularBuilder.addCell(cell);
+            }
+        }
+
+        return PartitionUpdate.singleRowUpdate(viewCfs.metadata, partitionKey, regularBuilder.build());
+    }
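+    // In effect, this method re-keys the base row for the view: the view clustering is rebuilt from resolved base
+    // values, the primary key liveness carries the view clustering timestamp/TTL, and every non-primary-key cell the
+    // view schema retains is copied across unchanged.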
+
+    /**
+     * @param partition Update which possibly contains deletion info for which to generate view tombstones.
+     * @return    View tombstones which delete all of the rows that have been removed from the base table by
+     *            {@code partition}
+     */
+    private Collection<Mutation> createForDeletionInfo(TemporalRow.Set rowSet, AbstractThreadUnsafePartition partition)
+    {
+        final TemporalRow.Resolver resolver = TemporalRow.earliest;
+
+        DeletionInfo deletionInfo = partition.deletionInfo();
+
+        List<Mutation> mutations = new ArrayList<>();
+
+        // Check the complex columns to see if there are any which may have tombstones we need to create for the view
+        if (!columns.baseComplexColumns.isEmpty())
+        {
+            for (Row row : partition)
+            {
+                if (!row.hasComplexDeletion())
+                    continue;
+
+                TemporalRow temporalRow = rowSet.getClustering(row.clustering());
+
+                assert temporalRow != null;
+
+                for (ColumnDefinition definition : columns.baseComplexColumns)
+                {
+                    ComplexColumnData columnData = row.getComplexColumnData(definition);
+
+                    if (columnData != null)
+                    {
+                        DeletionTime time = columnData.complexDeletion();
+                        if (!time.isLive())
+                        {
+                            DecoratedKey targetKey = viewPartitionKey(temporalRow, resolver);
+                            if (targetKey != null)
+                                mutations.add(new Mutation(createComplexTombstone(temporalRow, targetKey, definition, time, resolver, temporalRow.nowInSec)));
+                        }
+                    }
+                }
+            }
+        }
+
+        ReadCommand command = null;
+
+        if (!deletionInfo.isLive())
+        {
+            // We have to generate tombstones for all of the affected rows, but we don't have the information needed
+            // to create them. This requires a read over the entire range that is being tombstoned, generating a
+            // tombstone for each row returned. This may be slow, because a single range tombstone can cover up to an
+            // entire partition of data.
+            DecoratedKey dk = rowSet.dk;
+
+            if (deletionInfo.hasRanges())
+            {
+                SinglePartitionSliceBuilder builder = new SinglePartitionSliceBuilder(baseCfs, dk);
+                Iterator<RangeTombstone> tombstones = deletionInfo.rangeIterator(false);
+                while (tombstones.hasNext())
+                {
+                    RangeTombstone tombstone = tombstones.next();
+
+                    builder.addSlice(tombstone.deletedSlice());
+                }
+
+                command = builder.build();
+            }
+            else
+            {
+                command = SinglePartitionReadCommand.fullPartitionRead(baseCfs.metadata, rowSet.nowInSec, dk);
+            }
+        }
+
+        if (command == null)
+        {
+            SinglePartitionSliceBuilder builder = null;
+            for (Row row : partition)
+            {
+                if (!row.deletion().isLive())
+                {
+                    if (builder == null)
+                        builder = new SinglePartitionSliceBuilder(baseCfs, rowSet.dk);
+                    builder.addSlice(Slice.make(row.clustering()));
+                }
+            }
+
+            if (builder != null)
+                command = builder.build();
+        }
+
+        if (command != null)
+        {
+            QueryPager pager = command.getPager(null);
+
+            // Add all of the rows which were recovered from the query to the row set
+            while (!pager.isExhausted())
+            {
+                try (ReadOrderGroup orderGroup = pager.startOrderGroup();
+                     PartitionIterator iter = pager.fetchPageInternal(128, orderGroup))
+                {
+                    if (!iter.hasNext())
+                        break;
+
+                    try (RowIterator rowIterator = iter.next())
+                    {
+                        while (rowIterator.hasNext())
+                        {
+                            Row row = rowIterator.next();
+                            rowSet.addRow(row, false);
+                        }
+                    }
+                }
+            }
+
+            // If the temporal row has been deleted by the deletion info, we generate the corresponding range tombstone
+            // for the view.
+            for (TemporalRow temporalRow : rowSet)
+            {
+                DeletionTime deletionTime = temporalRow.deletionTime(partition);
+                if (!deletionTime.isLive())
+                {
+                    DecoratedKey value = viewPartitionKey(temporalRow, resolver);
+                    if (value != null)
+                    {
+                        PartitionUpdate update = createTombstone(temporalRow, value, deletionTime, resolver, temporalRow.nowInSec);
+                        if (update != null)
+                            mutations.add(new Mutation(update));
+                    }
+                }
+            }
+        }
+
+        return !mutations.isEmpty() ? mutations : null;
+    }
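+    // A rough sketch of the flow above, for orientation:
+    //   1. complex column deletions        -> one view tombstone per deleted collection (createComplexTombstone)
+    //   2. partition/range deletion info   -> read back the covered slices and fold the rows into rowSet
+    //   3. rows with a non-live deletion   -> one view tombstone per deleted row (createTombstone)
+    // Step 2 is the expensive part: it pages through everything the tombstone shadows, 128 rows at a time.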
+
+    /**
+     * Read and update temporal rows in the set which have corresponding values stored on the local node
+     */
+    private void readLocalRows(TemporalRow.Set rowSet)
+    {
+        SinglePartitionSliceBuilder builder = new SinglePartitionSliceBuilder(baseCfs, rowSet.dk);
+
+        for (TemporalRow temporalRow : rowSet)
+            builder.addSlice(temporalRow.baseSlice());
+
+        QueryPager pager = builder.build().getPager(null);
+
+        while (!pager.isExhausted())
+        {
+            try (ReadOrderGroup orderGroup = pager.startOrderGroup();
+                 PartitionIterator iter = pager.fetchPageInternal(128, orderGroup))
+            {
+                while (iter.hasNext())
+                {
+                    try (RowIterator rows = iter.next())
+                    {
+                        while (rows.hasNext())
+                        {
+                            rowSet.addRow(rows.next(), false);
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * @return Set of rows which are contained in the partition update {@code partition}
+     */
+    private TemporalRow.Set separateRows(ByteBuffer key, AbstractThreadUnsafePartition partition)
+    {
+        Set<ColumnIdentifier> columnNames = new HashSet<>();
+        for (ColumnDefinition def : this.columns.primaryKeyDefs)
+            columnNames.add(def.name);
+
+        TemporalRow.Set rowSet = new TemporalRow.Set(baseCfs, columnNames, key);
+        for (Row row : partition)
+            rowSet.addRow(row, true);
+
+        return rowSet;
+    }
+
+    /**
+     * @param isBuilding If the view is currently being built, we do not query the values which are already stored,
+     *                   since all of the updates will already be present in the base table.
+     * @return View mutations which represent the changes necessary, provided that previously created mutations for the
+     *         view have been applied successfully. This is based solely on the current state of the base table and the
+     *         partition data being applied.
+     */
+    public Collection<Mutation> createMutations(ByteBuffer key, AbstractThreadUnsafePartition partition, boolean isBuilding)
+    {
+        if (!updateAffectsView(partition))
+            return null;
+
+        TemporalRow.Set rowSet = separateRows(key, partition);
+
+        // If we are building the view, we do not want to add old values; they will always be the same
+        if (!isBuilding)
+            readLocalRows(rowSet);
+
+        Collection<Mutation> mutations = null;
+        for (TemporalRow temporalRow : rowSet)
+        {
+            // If we are building, there is no need to check for partition tombstones; those values will not be present
+            // in the partition data
+            if (!isBuilding)
+            {
+                PartitionUpdate partitionTombstone = createRangeTombstoneForRow(temporalRow);
+                if (partitionTombstone != null)
+                {
+                    if (mutations == null) mutations = new LinkedList<>();
+                    mutations.add(new Mutation(partitionTombstone));
+                }
+            }
+
+            PartitionUpdate insert = createUpdatesForInserts(temporalRow);
+            if (insert != null)
+            {
+                if (mutations == null) mutations = new LinkedList<>();
+                mutations.add(new Mutation(insert));
+            }
+        }
+
+        if (!isBuilding)
+        {
+            Collection<Mutation> deletion = createForDeletionInfo(rowSet, partition);
+            if (deletion != null && !deletion.isEmpty())
+            {
+                if (mutations == null) mutations = new LinkedList<>();
+                mutations.addAll(deletion);
+            }
+        }
+
+        return mutations;
+    }
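+    // A minimal usage sketch (hedged; this mirrors how MaterializedViewManager drives the class):
+    //
+    //     Collection<Mutation> viewMutations = view.createMutations(key, update, false);
+    //     if (viewMutations != null && !viewMutations.isEmpty())
+    //         StorageProxy.mutateMV(key, viewMutations);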
+
+    public synchronized void build()
+    {
+        if (this.builder != null)
+        {
+            this.builder.stop();
+            this.builder = null;
+        }
+
+        this.builder = new MaterializedViewBuilder(baseCfs, this);
+        CompactionManager.instance.submitMaterializedViewBuilder(builder);
+    }
+
+    /**
+     * @return CFMetaData which represents the given materialized view definition
+     */
+    public static CFMetaData getCFMetaData(MaterializedViewDefinition definition,
+                                           CFMetaData baseCf,
+                                           CFProperties properties)
+    {
+        CFMetaData.Builder viewBuilder = CFMetaData.Builder
+                                         .createView(baseCf.ksName, definition.viewName);
+
+        ColumnDefinition nonPkTarget = null;
+
+        for (ColumnIdentifier targetIdentifier : definition.partitionColumns)
+        {
+            ColumnDefinition target = baseCf.getColumnDefinition(targetIdentifier);
+            if (!target.isPartitionKey())
+                nonPkTarget = target;
+
+            viewBuilder.addPartitionKey(target.name, properties.getReversableType(targetIdentifier, target.type));
+        }
+
+        Collection<ColumnDefinition> included = new ArrayList<>();
+        for (ColumnIdentifier identifier : definition.included)
+        {
+            ColumnDefinition cfDef = baseCf.getColumnDefinition(identifier);
+            assert cfDef != null;
+            included.add(cfDef);
+        }
+
+        boolean includeAll = included.isEmpty();
+
+        for (ColumnIdentifier ident : definition.clusteringColumns)
+        {
+            ColumnDefinition column = baseCf.getColumnDefinition(ident);
+            viewBuilder.addClusteringColumn(ident, properties.getReversableType(ident, column.type));
+        }
+
+        for (ColumnDefinition column : baseCf.partitionColumns().regulars.columns)
+        {
+            if (column != nonPkTarget && (includeAll || included.contains(column)))
+            {
+                viewBuilder.addRegularColumn(column.name, column.type);
+            }
+        }
+
+        // Add base primary key columns which are not part of the view's primary key as regular columns
+        for (ColumnDefinition column : Iterables.concat(baseCf.partitionKeyColumns(), baseCf.clusteringColumns()))
+        {
+            if ( (!definition.partitionColumns.contains(column.name) && !definition.clusteringColumns.contains(column.name)) &&
+                 (includeAll || included.contains(column)) )
+            {
+                viewBuilder.addRegularColumn(column.name, column.type);
+            }
+        }
+
+        CFMetaData cfm = viewBuilder.build();
+        properties.properties.applyToCFMetadata(cfm);
+
+        return cfm;
+    }
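+    // Worked example (hypothetical CQL, for illustration only): given
+    //     CREATE TABLE base (a int, b int, c int, d int, PRIMARY KEY (a, b));
+    //     CREATE MATERIALIZED VIEW mv AS SELECT * FROM base ... PRIMARY KEY (c, a, b);
+    // the builder above makes c the view partition key and (a, b) the view clustering. c is dropped from the regular
+    // columns (it is the non-PK target), d stays regular because no explicit include list was given, and the final
+    // loop adds nothing since both base primary key columns already appear in the view's primary key.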
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3c43775c/src/java/org/apache/cassandra/db/view/MaterializedViewBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/MaterializedViewBuilder.java b/src/java/org/apache/cassandra/db/view/MaterializedViewBuilder.java
new file mode 100644
index 0000000..e8842ed
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/view/MaterializedViewBuilder.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.view;
+
+import java.util.Collection;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.concurrent.ScheduledExecutors;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.ReadOrderGroup;
+import org.apache.cassandra.db.SinglePartitionReadCommand;
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.db.compaction.CompactionInfo;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.db.lifecycle.SSTableSet;
+import org.apache.cassandra.db.lifecycle.View;
+import org.apache.cassandra.db.partitions.FilteredPartition;
+import org.apache.cassandra.db.partitions.PartitionIterator;
+import org.apache.cassandra.db.rows.RowIterator;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.WriteTimeoutException;
+import org.apache.cassandra.io.sstable.ReducingKeyIterator;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.service.StorageProxy;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.service.pager.QueryPager;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.NoSpamLogger;
+import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.UUIDGen;
+import org.apache.cassandra.utils.concurrent.Refs;
+
+public class MaterializedViewBuilder extends CompactionInfo.Holder
+{
+    private final ColumnFamilyStore baseCfs;
+    private final MaterializedView view;
+    private final UUID compactionId;
+    private volatile Token prevToken = null;
+
+    private static final Logger logger = LoggerFactory.getLogger(MaterializedViewBuilder.class);
+
+    private volatile boolean isStopped = false;
+
+    public MaterializedViewBuilder(ColumnFamilyStore baseCfs, MaterializedView view)
+    {
+        this.baseCfs = baseCfs;
+        this.view = view;
+        compactionId = UUIDGen.getTimeUUID();
+    }
+
+    private void buildKey(DecoratedKey key)
+    {
+        QueryPager pager = SinglePartitionReadCommand.fullPartitionRead(baseCfs.metadata, FBUtilities.nowInSeconds(), key).getPager(null);
+
+        while (!pager.isExhausted())
+        {
+            try (ReadOrderGroup orderGroup = pager.startOrderGroup();
+                 PartitionIterator partitionIterator = pager.fetchPageInternal(128, orderGroup))
+            {
+                if (!partitionIterator.hasNext())
+                    return;
+
+                try (RowIterator rowIterator = partitionIterator.next())
+                {
+                    Collection<Mutation> mutations = view.createMutations(key.getKey(), FilteredPartition.create(rowIterator), true);
+
+                    if (mutations != null)
+                    {
+                        try
+                        {
+                            StorageProxy.mutateMV(key.getKey(), mutations);
+                            break;
+                        }
+                        catch (WriteTimeoutException ex)
+                        {
+                            NoSpamLogger.getLogger(logger, 1, TimeUnit.MINUTES)
+                                        .warn("Encountered write timeout when building materialized view {}; the entries were stored in the batchlog and will be replayed at another time", view.name);
+                        }
+                    }
+                }
+            }
+        }
+    }
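+    // Note on the loop above: a successful mutateMV breaks out of the paging loop, while a WriteTimeoutException is
+    // only logged (the batchlog is relied upon to replay the timed-out entries) and the pager advances to the next
+    // page.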
+
+    public void run()
+    {
+        String ksname = baseCfs.metadata.ksName, viewName = view.name;
+
+        if (SystemKeyspace.isViewBuilt(ksname, viewName))
+            return;
+
+        Iterable<Range<Token>> ranges = StorageService.instance.getLocalRanges(baseCfs.metadata.ksName);
+        final Pair<Integer, Token> buildStatus = SystemKeyspace.getMaterializedViewBuildStatus(ksname, viewName);
+        Token lastToken;
+        Function<View, Iterable<SSTableReader>> function;
+        if (buildStatus == null)
+        {
+            baseCfs.forceBlockingFlush();
+            function = View.select(SSTableSet.CANONICAL);
+            int generation = Integer.MIN_VALUE;
+
+            try (Refs<SSTableReader> temp = baseCfs.selectAndReference(function).refs)
+            {
+                for (SSTableReader reader : temp)
+                {
+                    generation = Math.max(reader.descriptor.generation, generation);
+                }
+            }
+
+            SystemKeyspace.beginMaterializedViewBuild(ksname, viewName, generation);
+            lastToken = null;
+        }
+        else
+        {
+            function = new Function<View, Iterable<SSTableReader>>()
+            {
+                @Nullable
+                public Iterable<SSTableReader> apply(View view)
+                {
+                    Iterable<SSTableReader> readers = View.select(SSTableSet.CANONICAL).apply(view);
+                    if (readers != null)
+                        return Iterables.filter(readers, ssTableReader -> ssTableReader.descriptor.generation <= buildStatus.left);
+                    return null;
+                }
+            };
+            lastToken = buildStatus.right;
+        }
+
+        prevToken = lastToken;
+        try (Refs<SSTableReader> sstables = baseCfs.selectAndReference(function).refs;
+             ReducingKeyIterator iter = new ReducingKeyIterator(sstables))
+        {
+            while (!isStopped && iter.hasNext())
+            {
+                DecoratedKey key = iter.next();
+                Token token = key.getToken();
+                if (lastToken == null || lastToken.compareTo(token) < 0)
+                {
+                    for (Range<Token> range : ranges)
+                    {
+                        if (range.contains(token))
+                        {
+                            buildKey(key);
+
+                            if (prevToken == null || prevToken.compareTo(token) != 0)
+                            {
+                                SystemKeyspace.updateMaterializedViewBuildStatus(ksname, viewName, key.getToken());
+                                prevToken = token;
+                            }
+                        }
+                    }
+                    lastToken = null;
+                }
+            }
+
+            SystemKeyspace.finishMaterializedViewBuildStatus(ksname, viewName);
+        }
+        catch (Exception e)
+        {
+            final MaterializedViewBuilder builder = new MaterializedViewBuilder(baseCfs, view);
+            ScheduledExecutors.nonPeriodicTasks.schedule(() -> CompactionManager.instance.submitMaterializedViewBuilder(builder),
+                                                         5,
+                                                         TimeUnit.MINUTES);
+            logger.warn("Materialized View failed to complete, sleeping 5 minutes before restarting", e);
+        }
+    }
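+    // Resumption sketch: the (sstable generation, last token) pair persisted through SystemKeyspace lets a restarted
+    // build skip work already done. Only sstables with generation <= the recorded generation are scanned, and keys
+    // whose tokens are <= the recorded token are ignored, so a build interrupted mid-ring resumes where it stopped.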
+
+    public CompactionInfo getCompactionInfo()
+    {
+        long rangesLeft = 0, rangesTotal = 0;
+        Token lastToken = prevToken;
+
+        // This approximation is not very accurate, but since we do not have a way to calculate the fraction of one
+        // range covered by another, it is the best estimate available. We simply count, out of the total number of
+        // local ranges, how many ranges the build has not yet reached (using token order to decide whether a range
+        // has been processed).
+        for (Range<Token> range : StorageService.instance.getLocalRanges(baseCfs.keyspace.getName()))
+        {
+            rangesLeft++;
+            rangesTotal++;
+            // Reset the count when we reach the range containing the last processed token, so that at the end of the
+            // loop rangesLeft only counts the ranges that come after it.
+            if (lastToken == null || range.contains(lastToken))
+                rangesLeft = 0;
+        }
+        return new CompactionInfo(baseCfs.metadata, OperationType.VIEW_BUILD, rangesLeft, rangesTotal, "ranges", compactionId);
+    }
+
+    public void stop()
+    {
+        isStopped = true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3c43775c/src/java/org/apache/cassandra/db/view/MaterializedViewManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/MaterializedViewManager.java b/src/java/org/apache/cassandra/db/view/MaterializedViewManager.java
new file mode 100644
index 0000000..7f97728
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/view/MaterializedViewManager.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.view;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.locks.Lock;
+
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Striped;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.MaterializedViewDefinition;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.IMutation;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.db.commitlog.ReplayPosition;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.OverloadedException;
+import org.apache.cassandra.exceptions.UnavailableException;
+import org.apache.cassandra.exceptions.WriteTimeoutException;
+import org.apache.cassandra.service.StorageProxy;
+import org.apache.cassandra.service.StorageService;
+
+/**
+ * Manages the {@link MaterializedView}s for a single {@link ColumnFamilyStore}. All of the materialized views for that
+ * table are created when this manager is initialized.
+ *
+ * The main purposes of the manager are to provide a single point at which updates are vetted to determine whether they
+ * affect any views ({@link MaterializedViewManager#updateAffectsView(PartitionUpdate)}), to provide locks that prevent
+ * concurrent updates from producing incoherent view state ({@link MaterializedViewManager#acquireLockFor(ByteBuffer)}),
+ * and to apply changes to the views.
+ */
+public class MaterializedViewManager
+{
+    private static final Striped<Lock> LOCKS = Striped.lazyWeakLock(DatabaseDescriptor.getConcurrentWriters() * 1024);
+
+    private final ConcurrentNavigableMap<String, MaterializedView> viewsByName;
+
+    private final ColumnFamilyStore baseCfs;
+
+    public MaterializedViewManager(ColumnFamilyStore baseCfs)
+    {
+        this.viewsByName = new ConcurrentSkipListMap<>();
+
+        this.baseCfs = baseCfs;
+    }
+
+    public Iterable<MaterializedView> allViews()
+    {
+        return viewsByName.values();
+    }
+
+    public Iterable<ColumnFamilyStore> allViewsCfs()
+    {
+        List<ColumnFamilyStore> viewColumnFamilies = new ArrayList<>();
+        for (MaterializedView view : allViews())
+            viewColumnFamilies.add(view.getViewCfs());
+        return viewColumnFamilies;
+    }
+
+    public void init()
+    {
+        reload();
+    }
+
+    public void invalidate()
+    {
+        for (MaterializedView view : allViews())
+            removeMaterializedView(view.name);
+    }
+
+    public void reload()
+    {
+        Map<String, MaterializedViewDefinition> newViewsByName = new HashMap<>();
+        for (MaterializedViewDefinition definition : baseCfs.metadata.getMaterializedViews())
+        {
+            newViewsByName.put(definition.viewName, definition);
+        }
+
+        for (String viewName : viewsByName.keySet())
+        {
+            if (!newViewsByName.containsKey(viewName))
+                removeMaterializedView(viewName);
+        }
+
+        for (Map.Entry<String, MaterializedViewDefinition> entry : newViewsByName.entrySet())
+        {
+            if (!viewsByName.containsKey(entry.getKey()))
+                addMaterializedView(entry.getValue());
+        }
+
+        for (MaterializedView view : allViews())
+        {
+            view.build();
+            // We provide the new definition from the base metadata
+            view.updateDefinition(newViewsByName.get(view.name));
+        }
+    }
+
+    public void buildAllViews()
+    {
+        for (MaterializedView view : allViews())
+            view.build();
+    }
+
+    public void removeMaterializedView(String name)
+    {
+        MaterializedView view = viewsByName.remove(name);
+
+        if (view == null)
+            return;
+
+        SystemKeyspace.setMaterializedViewRemoved(baseCfs.metadata.ksName, view.name);
+    }
+
+    public void addMaterializedView(MaterializedViewDefinition definition)
+    {
+        MaterializedView view = new MaterializedView(definition, baseCfs);
+
+        viewsByName.put(definition.viewName, view);
+    }
+
+    /**
+     * Calculates and pushes updates to the view replicas. The replicas are determined by
+     * {@link MaterializedViewUtils#getViewNaturalEndpoint(String, Token, Token)}.
+     */
+    public void pushViewReplicaUpdates(ByteBuffer key, PartitionUpdate update) throws UnavailableException, OverloadedException, WriteTimeoutException
+    {
+        // This happens when we are replaying the commitlog. In that case, we have already sent this commit off to
+        // the view replicas, so there is nothing more to do.
+        if (!StorageService.instance.isJoined()) return;
+
+        List<Mutation> mutations = null;
+        for (Map.Entry<String, MaterializedView> view : viewsByName.entrySet())
+        {
+            Collection<Mutation> viewMutations = view.getValue().createMutations(key, update, false);
+            if (viewMutations != null && !viewMutations.isEmpty())
+            {
+                if (mutations == null)
+                    mutations = Lists.newLinkedList();
+                mutations.addAll(viewMutations);
+            }
+        }
+        if (mutations != null)
+        {
+            StorageProxy.mutateMV(key, mutations);
+        }
+    }
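+    // Note: mutations produced by every view on this table are accumulated into a single list and handed to
+    // StorageProxy.mutateMV in one call, so a base write touching several views still takes one batched write path.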
+
+    public boolean updateAffectsView(PartitionUpdate upd)
+    {
+        for (MaterializedView view : allViews())
+        {
+            if (view.updateAffectsView(upd))
+                return true;
+        }
+        return false;
+    }
+
+    public static Lock acquireLockFor(ByteBuffer key)
+    {
+        Lock lock = LOCKS.get(key);
+
+        if (lock.tryLock())
+            return lock;
+
+        return null;
+    }
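+    // A minimal caller sketch (hedged; the retry policy is the caller's concern, not this class's):
+    //
+    //     Lock lock = MaterializedViewManager.acquireLockFor(key);
+    //     if (lock == null)
+    //         return; // contended: back off and retry the write later
+    //     try { /* apply the base update and push view updates */ } finally { lock.unlock(); }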
+
+    public static boolean updatesAffectView(Collection<? extends IMutation> mutations, boolean ignoreRf1)
+    {
+        for (IMutation mutation : mutations)
+        {
+            for (PartitionUpdate cf : mutation.getPartitionUpdates())
+            {
+                Keyspace keyspace = Keyspace.open(cf.metadata().ksName);
+
+                if (ignoreRf1 && keyspace.getReplicationStrategy().getReplicationFactor() == 1)
+                    continue;
+
+                MaterializedViewManager viewManager = keyspace.getColumnFamilyStore(cf.metadata().cfId).materializedViewManager;
+                if (viewManager.updateAffectsView(cf))
+                    return true;
+            }
+        }
+
+        return false;
+    }
+
+    public void forceBlockingFlush()
+    {
+        for (ColumnFamilyStore viewCfs : allViewsCfs())
+            viewCfs.forceBlockingFlush();
+    }
+
+    public void dumpMemtables()
+    {
+        for (ColumnFamilyStore viewCfs : allViewsCfs())
+            viewCfs.dumpMemtable();
+    }
+
+    public void truncateBlocking(long truncatedAt)
+    {
+        for (ColumnFamilyStore viewCfs : allViewsCfs())
+        {
+            ReplayPosition replayAfter = viewCfs.discardSSTables(truncatedAt);
+            SystemKeyspace.saveTruncationRecord(viewCfs, truncatedAt, replayAfter);
+        }
+    }
+}