You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2015/07/30 00:19:47 UTC

[5/6] cassandra git commit: Revert "Materialized Views"

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24d185d7/src/java/org/apache/cassandra/db/AbstractReadCommandBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/AbstractReadCommandBuilder.java b/src/java/org/apache/cassandra/db/AbstractReadCommandBuilder.java
deleted file mode 100644
index 2ddc6ca..0000000
--- a/src/java/org/apache/cassandra/db/AbstractReadCommandBuilder.java
+++ /dev/null
@@ -1,347 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.db;
-
-import java.nio.ByteBuffer;
-import java.util.*;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.cql3.*;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.rows.Row;
-import org.apache.cassandra.db.rows.RowIterator;
-import org.apache.cassandra.db.filter.*;
-import org.apache.cassandra.db.partitions.*;
-import org.apache.cassandra.db.marshal.*;
-import org.apache.cassandra.dht.*;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.FBUtilities;
-
-public abstract class AbstractReadCommandBuilder
-{
-    protected final ColumnFamilyStore cfs;
-    protected int nowInSeconds;
-
-    private int cqlLimit = -1;
-    private int pagingLimit = -1;
-    protected boolean reversed = false;
-
-    protected Set<ColumnIdentifier> columns;
-    protected final RowFilter filter = RowFilter.create();
-
-    private Slice.Bound lowerClusteringBound;
-    private Slice.Bound upperClusteringBound;
-
-    private NavigableSet<Clustering> clusterings;
-
-    // Use Util.cmd() instead of this ctor directly
-    AbstractReadCommandBuilder(ColumnFamilyStore cfs)
-    {
-        this.cfs = cfs;
-        this.nowInSeconds = FBUtilities.nowInSeconds();
-    }
-
-    public AbstractReadCommandBuilder withNowInSeconds(int nowInSec)
-    {
-        this.nowInSeconds = nowInSec;
-        return this;
-    }
-
-    public AbstractReadCommandBuilder fromIncl(Object... values)
-    {
-        assert lowerClusteringBound == null && clusterings == null;
-        this.lowerClusteringBound = Slice.Bound.create(cfs.metadata.comparator, true, true, values);
-        return this;
-    }
-
-    public AbstractReadCommandBuilder fromExcl(Object... values)
-    {
-        assert lowerClusteringBound == null && clusterings == null;
-        this.lowerClusteringBound = Slice.Bound.create(cfs.metadata.comparator, true, false, values);
-        return this;
-    }
-
-    public AbstractReadCommandBuilder toIncl(Object... values)
-    {
-        assert upperClusteringBound == null && clusterings == null;
-        this.upperClusteringBound = Slice.Bound.create(cfs.metadata.comparator, false, true, values);
-        return this;
-    }
-
-    public AbstractReadCommandBuilder toExcl(Object... values)
-    {
-        assert upperClusteringBound == null && clusterings == null;
-        this.upperClusteringBound = Slice.Bound.create(cfs.metadata.comparator, false, false, values);
-        return this;
-    }
-
-    public AbstractReadCommandBuilder includeRow(Object... values)
-    {
-        assert lowerClusteringBound == null && upperClusteringBound == null;
-
-        if (this.clusterings == null)
-            this.clusterings = new TreeSet<>(cfs.metadata.comparator);
-
-        this.clusterings.add(cfs.metadata.comparator.make(values));
-        return this;
-    }
-
-    public AbstractReadCommandBuilder reverse()
-    {
-        this.reversed = true;
-        return this;
-    }
-
-    public AbstractReadCommandBuilder withLimit(int newLimit)
-    {
-        this.cqlLimit = newLimit;
-        return this;
-    }
-
-    public AbstractReadCommandBuilder withPagingLimit(int newLimit)
-    {
-        this.pagingLimit = newLimit;
-        return this;
-    }
-
-    public AbstractReadCommandBuilder columns(String... columns)
-    {
-        if (this.columns == null)
-            this.columns = new HashSet<>();
-
-        for (String column : columns)
-            this.columns.add(ColumnIdentifier.getInterned(column, true));
-        return this;
-    }
-
-    private ByteBuffer bb(Object value, AbstractType<?> type)
-    {
-        return value instanceof ByteBuffer ? (ByteBuffer)value : ((AbstractType)type).decompose(value);
-    }
-
-    private AbstractType<?> forValues(AbstractType<?> collectionType)
-    {
-        assert collectionType instanceof CollectionType;
-        CollectionType ct = (CollectionType)collectionType;
-        switch (ct.kind)
-        {
-            case LIST:
-            case MAP:
-                return ct.valueComparator();
-            case SET:
-                return ct.nameComparator();
-        }
-        throw new AssertionError();
-    }
-
-    private AbstractType<?> forKeys(AbstractType<?> collectionType)
-    {
-        assert collectionType instanceof CollectionType;
-        CollectionType ct = (CollectionType)collectionType;
-        switch (ct.kind)
-        {
-            case LIST:
-            case MAP:
-                return ct.nameComparator();
-        }
-        throw new AssertionError();
-    }
-
-    public AbstractReadCommandBuilder filterOn(String column, Operator op, Object value)
-    {
-        ColumnDefinition def = cfs.metadata.getColumnDefinition(ColumnIdentifier.getInterned(column, true));
-        assert def != null;
-
-        AbstractType<?> type = def.type;
-        if (op == Operator.CONTAINS)
-            type = forValues(type);
-        else if (op == Operator.CONTAINS_KEY)
-            type = forKeys(type);
-
-        this.filter.add(def, op, bb(value, type));
-        return this;
-    }
-
-    protected ColumnFilter makeColumnFilter()
-    {
-        if (columns == null || columns.isEmpty())
-            return ColumnFilter.all(cfs.metadata);
-
-        ColumnFilter.Builder filter = ColumnFilter.selectionBuilder();
-        for (ColumnIdentifier column : columns)
-            filter.add(cfs.metadata.getColumnDefinition(column));
-        return filter.build();
-    }
-
-    protected ClusteringIndexFilter makeFilter()
-    {
-        if (clusterings != null)
-        {
-            return new ClusteringIndexNamesFilter(clusterings, reversed);
-        }
-        else
-        {
-            Slice slice = Slice.make(lowerClusteringBound == null ? Slice.Bound.BOTTOM : lowerClusteringBound,
-                                     upperClusteringBound == null ? Slice.Bound.TOP : upperClusteringBound);
-            return new ClusteringIndexSliceFilter(Slices.with(cfs.metadata.comparator, slice), reversed);
-        }
-    }
-
-    protected DataLimits makeLimits()
-    {
-        DataLimits limits = cqlLimit < 0 ? DataLimits.NONE : DataLimits.cqlLimits(cqlLimit);
-        if (pagingLimit >= 0)
-            limits = limits.forPaging(pagingLimit);
-        return limits;
-    }
-
-    public abstract ReadCommand build();
-
-    public static class SinglePartitionBuilder extends AbstractReadCommandBuilder
-    {
-        private final DecoratedKey partitionKey;
-
-        public SinglePartitionBuilder(ColumnFamilyStore cfs, DecoratedKey key)
-        {
-            super(cfs);
-            this.partitionKey = key;
-        }
-
-        @Override
-        public ReadCommand build()
-        {
-            return SinglePartitionReadCommand.create(cfs.metadata, nowInSeconds, makeColumnFilter(), filter, makeLimits(), partitionKey, makeFilter());
-        }
-    }
-
-    public static class SinglePartitionSliceBuilder extends AbstractReadCommandBuilder
-    {
-        private final DecoratedKey partitionKey;
-        private Slices.Builder sliceBuilder;
-
-        public SinglePartitionSliceBuilder(ColumnFamilyStore cfs, DecoratedKey key)
-        {
-            super(cfs);
-            this.partitionKey = key;
-            sliceBuilder = new Slices.Builder(cfs.getComparator());
-        }
-
-        public SinglePartitionSliceBuilder addSlice(Slice slice)
-        {
-            sliceBuilder.add(slice);
-            return this;
-        }
-
-        @Override
-        protected ClusteringIndexFilter makeFilter()
-        {
-            return new ClusteringIndexSliceFilter(sliceBuilder.build(), reversed);
-        }
-
-        @Override
-        public ReadCommand build()
-        {
-            return SinglePartitionSliceCommand.create(cfs.metadata, nowInSeconds, makeColumnFilter(), filter, makeLimits(), partitionKey, makeFilter());
-        }
-    }
-
-    public static class PartitionRangeBuilder extends AbstractReadCommandBuilder
-    {
-        private DecoratedKey startKey;
-        private boolean startInclusive;
-        private DecoratedKey endKey;
-        private boolean endInclusive;
-
-        public PartitionRangeBuilder(ColumnFamilyStore cfs)
-        {
-            super(cfs);
-        }
-
-        public PartitionRangeBuilder fromKeyIncl(Object... values)
-        {
-            assert startKey == null;
-            this.startInclusive = true;
-            this.startKey = makeKey(cfs.metadata, values);
-            return this;
-        }
-
-        public PartitionRangeBuilder fromKeyExcl(Object... values)
-        {
-            assert startKey == null;
-            this.startInclusive = false;
-            this.startKey = makeKey(cfs.metadata, values);
-            return this;
-        }
-
-        public PartitionRangeBuilder toKeyIncl(Object... values)
-        {
-            assert endKey == null;
-            this.endInclusive = true;
-            this.endKey = makeKey(cfs.metadata, values);
-            return this;
-        }
-
-        public PartitionRangeBuilder toKeyExcl(Object... values)
-        {
-            assert endKey == null;
-            this.endInclusive = false;
-            this.endKey = makeKey(cfs.metadata, values);
-            return this;
-        }
-
-        @Override
-        public ReadCommand build()
-        {
-            PartitionPosition start = startKey;
-            if (start == null)
-            {
-                start = StorageService.getPartitioner().getMinimumToken().maxKeyBound();
-                startInclusive = false;
-            }
-            PartitionPosition end = endKey;
-            if (end == null)
-            {
-                end = StorageService.getPartitioner().getMinimumToken().maxKeyBound();
-                endInclusive = true;
-            }
-
-            AbstractBounds<PartitionPosition> bounds;
-            if (startInclusive && endInclusive)
-                bounds = new Bounds<>(start, end);
-            else if (startInclusive && !endInclusive)
-                bounds = new IncludingExcludingBounds<>(start, end);
-            else if (!startInclusive && endInclusive)
-                bounds = new Range<>(start, end);
-            else
-                bounds = new ExcludingBounds<>(start, end);
-
-            return new PartitionRangeReadCommand(cfs.metadata, nowInSeconds, makeColumnFilter(), filter, makeLimits(), new DataRange(bounds, makeFilter()));
-        }
-
-        static DecoratedKey makeKey(CFMetaData metadata, Object... partitionKey)
-        {
-            if (partitionKey.length == 1 && partitionKey[0] instanceof DecoratedKey)
-                return (DecoratedKey)partitionKey[0];
-
-            ByteBuffer key = CFMetaData.serializePartitionKey(metadata.getKeyValidatorAsClusteringComparator().make(partitionKey));
-            return StorageService.getPartitioner().decorateKey(key);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24d185d7/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 24da365..6ac132c 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -53,7 +53,6 @@ import org.apache.cassandra.db.compaction.*;
 import org.apache.cassandra.db.filter.*;
 import org.apache.cassandra.db.index.SecondaryIndex;
 import org.apache.cassandra.db.index.SecondaryIndexManager;
-import org.apache.cassandra.db.view.MaterializedViewManager;
 import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.dht.*;
 import org.apache.cassandra.dht.Range;
@@ -160,7 +159,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     private final AtomicInteger fileIndexGenerator = new AtomicInteger(0);
 
     public final SecondaryIndexManager indexManager;
-    public final MaterializedViewManager materializedViewManager;
 
     /* These are locally held copies to be changed from the config during runtime */
     private volatile DefaultInteger minCompactionThreshold;
@@ -197,7 +195,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
         indexManager.reload();
 
-        materializedViewManager.reload();
         // If the CF comparator has changed, we need to change the memtable,
         // because the old one still aliases the previous comparator.
         if (data.getView().getCurrentMemtable().initialComparator != metadata.comparator)
@@ -334,7 +331,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         this.partitioner = partitioner;
         this.directories = directories;
         this.indexManager = new SecondaryIndexManager(this);
-        this.materializedViewManager = new MaterializedViewManager(this);
         this.metric = new TableMetrics(this);
         fileIndexGenerator.set(generation);
         sampleLatencyNanos = DatabaseDescriptor.getReadRpcTimeout() / 2;
@@ -455,7 +451,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         SystemKeyspace.removeTruncationRecord(metadata.cfId);
         data.dropSSTables();
         indexManager.invalidate();
-        materializedViewManager.invalidate();
 
         invalidateCaches();
     }
@@ -577,10 +572,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     }
 
     // must be called after all sstables are loaded since row cache merges all row versions
-    public void init()
+    public void initRowCache()
     {
-        materializedViewManager.init();
-
         if (!isRowCacheEnabled())
             return;
 
@@ -1813,7 +1806,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
                     cfs.data.reset();
                     return null;
                 }
-            }, true, false);
+            }, true);
         }
     }
 
@@ -1841,16 +1834,19 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             // flush the CF being truncated before forcing the new segment
             forceBlockingFlush();
 
-            materializedViewManager.forceBlockingFlush();
-
             // sleep a little to make sure that our truncatedAt comes after any sstable
             // that was part of the flushed we forced; otherwise on a tie, it won't get deleted.
             Uninterruptibles.sleepUninterruptibly(1, TimeUnit.MILLISECONDS);
         }
         else
         {
-            dumpMemtable();
-            materializedViewManager.dumpMemtables();
+            // just nuke the memtable data w/o writing to disk first
+            synchronized (data)
+            {
+                final Flush flush = new Flush(true);
+                flushExecutor.execute(flush);
+                postFlushExecutor.submit(flush.postFlush);
+            }
         }
 
         Runnable truncateRunnable = new Runnable()
@@ -1870,32 +1866,17 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
                 for (SecondaryIndex index : indexManager.getIndexes())
                     index.truncateBlocking(truncatedAt);
 
-                materializedViewManager.truncateBlocking(truncatedAt);
-
                 SystemKeyspace.saveTruncationRecord(ColumnFamilyStore.this, truncatedAt, replayAfter);
                 logger.debug("cleaning out row cache");
                 invalidateCaches();
             }
         };
 
-        runWithCompactionsDisabled(Executors.callable(truncateRunnable), true, true);
+        runWithCompactionsDisabled(Executors.callable(truncateRunnable), true);
         logger.debug("truncate complete");
     }
 
-    /**
-     * Drops current memtable without flushing to disk. This should only be called when truncating a column family which is not durable.
-     */
-    public void dumpMemtable()
-    {
-        synchronized (data)
-        {
-            final Flush flush = new Flush(true);
-            flushExecutor.execute(flush);
-            postFlushExecutor.submit(flush.postFlush);
-        }
-    }
-
-    public <V> V runWithCompactionsDisabled(Callable<V> callable, boolean interruptValidation, boolean interruptViews)
+    public <V> V runWithCompactionsDisabled(Callable<V> callable, boolean interruptValidation)
     {
         // synchronize so that concurrent invocations don't re-enable compactions partway through unexpectedly,
         // and so we only run one major compaction at a time
@@ -1903,20 +1884,17 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         {
             logger.debug("Cancelling in-progress compactions for {}", metadata.cfName);
 
-            Iterable<ColumnFamilyStore> selfWithAuxiliaryCfs = interruptViews
-                                                               ? Iterables.concat(concatWithIndexes(), materializedViewManager.allViewsCfs())
-                                                               : concatWithIndexes();
-
-            for (ColumnFamilyStore cfs : selfWithAuxiliaryCfs)
+            Iterable<ColumnFamilyStore> selfWithIndexes = concatWithIndexes();
+            for (ColumnFamilyStore cfs : selfWithIndexes)
                 cfs.getCompactionStrategyManager().pause();
             try
             {
                 // interrupt in-progress compactions
-                CompactionManager.instance.interruptCompactionForCFs(selfWithAuxiliaryCfs, interruptValidation);
-                CompactionManager.instance.waitForCessation(selfWithAuxiliaryCfs);
+                CompactionManager.instance.interruptCompactionForCFs(selfWithIndexes, interruptValidation);
+                CompactionManager.instance.waitForCessation(selfWithIndexes);
 
                 // doublecheck that we finished, instead of timing out
-                for (ColumnFamilyStore cfs : selfWithAuxiliaryCfs)
+                for (ColumnFamilyStore cfs : selfWithIndexes)
                 {
                     if (!cfs.getTracker().getCompacting().isEmpty())
                     {
@@ -1938,7 +1916,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             }
             finally
             {
-                for (ColumnFamilyStore cfs : selfWithAuxiliaryCfs)
+                for (ColumnFamilyStore cfs : selfWithIndexes)
                     cfs.getCompactionStrategyManager().resume();
             }
         }
@@ -1958,7 +1936,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             }
         };
 
-        return runWithCompactionsDisabled(callable, false, false);
+        return runWithCompactionsDisabled(callable, false);
     }
 
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24d185d7/src/java/org/apache/cassandra/db/Keyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java
index 78b593b..f37ce66 100644
--- a/src/java/org/apache/cassandra/db/Keyspace.java
+++ b/src/java/org/apache/cassandra/db/Keyspace.java
@@ -23,15 +23,13 @@ import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Future;
-import java.util.concurrent.locks.Lock;
 
 import com.google.common.base.Function;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.concurrent.Stage;
-import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.*;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 import org.apache.cassandra.db.commitlog.CommitLog;
@@ -39,17 +37,14 @@ import org.apache.cassandra.db.commitlog.ReplayPosition;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.index.SecondaryIndex;
 import org.apache.cassandra.db.index.SecondaryIndexManager;
-import org.apache.cassandra.db.view.MaterializedViewManager;
-import org.apache.cassandra.exceptions.WriteTimeoutException;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.db.lifecycle.SSTableSet;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.schema.SchemaKeyspace;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.tracing.Tracing;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.JVMStabilityInspector;
 import org.apache.cassandra.utils.concurrent.OpOrder;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.metrics.KeyspaceMetrics;
@@ -75,10 +70,7 @@ public class Keyspace
     }
 
     private volatile KeyspaceMetadata metadata;
-
-    //OpOrder is defined globally since we need to order writes across
-    //Keyspaces in the case of MaterializedViews (batchlog of MV mutations)
-    public static final OpOrder writeOrder = new OpOrder();
+    public final OpOrder writeOrder = new OpOrder();
 
     /* ColumnFamilyStore per column family */
     private final ConcurrentMap<UUID, ColumnFamilyStore> columnFamilyStores = new ConcurrentHashMap<>();
@@ -130,7 +122,7 @@ public class Keyspace
 
                     // keyspace has to be constructed and in the cache before cacheRow can be called
                     for (ColumnFamilyStore cfs : keyspaceInstance.getColumnFamilyStores())
-                        cfs.init();
+                        cfs.initRowCache();
                 }
             }
         }
@@ -360,14 +352,10 @@ public class Keyspace
             // CFS being created for the first time, either on server startup or new CF being added.
             // We don't worry about races here; startup is safe, and adding multiple idential CFs
             // simultaneously is a "don't do that" scenario.
-            ColumnFamilyStore newCfs = ColumnFamilyStore.createColumnFamilyStore(this, cfName, loadSSTables);
-
-            ColumnFamilyStore oldCfs = columnFamilyStores.putIfAbsent(cfId, newCfs);
+            ColumnFamilyStore oldCfs = columnFamilyStores.putIfAbsent(cfId, ColumnFamilyStore.createColumnFamilyStore(this, cfName, loadSSTables));
             // CFS mbean instantiation will error out before we hit this, but in case that changes...
             if (oldCfs != null)
                 throw new IllegalStateException("added multiple mappings for cf id " + cfId);
-
-            newCfs.init();
         }
         else
         {
@@ -392,41 +380,11 @@ public class Keyspace
      * @param writeCommitLog false to disable commitlog append entirely
      * @param updateIndexes  false to disable index updates (used by CollationController "defragmenting")
      */
-    public void apply(final Mutation mutation, final boolean writeCommitLog, boolean updateIndexes)
+    public void apply(Mutation mutation, boolean writeCommitLog, boolean updateIndexes)
     {
         if (TEST_FAIL_WRITES && metadata.name.equals(TEST_FAIL_WRITES_KS))
             throw new RuntimeException("Testing write failures");
 
-        Lock lock = null;
-        boolean requiresViewUpdate = updateIndexes && MaterializedViewManager.updatesAffectView(Collections.singleton(mutation), false);
-
-        if (requiresViewUpdate)
-        {
-            lock = MaterializedViewManager.acquireLockFor(mutation.key().getKey());
-
-            if (lock == null)
-            {
-                if ((System.currentTimeMillis() - mutation.createdAt) > DatabaseDescriptor.getWriteRpcTimeout())
-                {
-                    logger.debug("Could not acquire lock for {}", ByteBufferUtil.bytesToHex(mutation.key().getKey()));
-                    Tracing.trace("Could not acquire MV lock");
-                    throw new WriteTimeoutException(WriteType.MATERIALIZED_VIEW, ConsistencyLevel.LOCAL_ONE, 0, 1);
-                }
-                else
-                {
-                    //This MV update can't happen right now. so rather than keep this thread busy
-                    // we will re-apply ourself to the queue and try again later
-                    StageManager.getStage(Stage.MUTATION).execute(() -> {
-                        if (writeCommitLog)
-                            mutation.apply();
-                        else
-                            mutation.applyUnsafe();
-                    });
-
-                    return;
-                }
-            }
-        }
         int nowInSec = FBUtilities.nowInSeconds();
         try (OpOrder.Group opGroup = writeOrder.start())
         {
@@ -443,26 +401,10 @@ public class Keyspace
                 ColumnFamilyStore cfs = columnFamilyStores.get(upd.metadata().cfId);
                 if (cfs == null)
                 {
-                    logger.error("Attempting to mutate non-existant table {} ({}.{})", upd.metadata().cfId, upd.metadata().ksName, upd.metadata().cfName);
+                    logger.error("Attempting to mutate non-existant table {}", upd.metadata().cfId);
                     continue;
                 }
 
-                if (requiresViewUpdate)
-                {
-                    try
-                    {
-                        Tracing.trace("Create materialized view mutations from replica");
-                        cfs.materializedViewManager.pushViewReplicaUpdates(upd.partitionKey().getKey(), upd);
-                    }
-                    catch (Exception e)
-                    {
-                        if (!(e instanceof WriteTimeoutException))
-                            logger.warn("Encountered exception when creating materialized view mutations", e);
-
-                        JVMStabilityInspector.inspectThrowable(e);
-                    }
-                }
-
                 Tracing.trace("Adding to {} memtable", upd.metadata().cfName);
                 SecondaryIndexManager.Updater updater = updateIndexes
                                                       ? cfs.indexManager.updaterFor(upd, opGroup, nowInSec)
@@ -470,11 +412,6 @@ public class Keyspace
                 cfs.apply(upd, updater, opGroup, replayPosition);
             }
         }
-        finally
-        {
-            if (lock != null)
-                lock.unlock();
-        }
     }
 
     public AbstractReplicationStrategy getReplicationStrategy()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24d185d7/src/java/org/apache/cassandra/db/Mutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Mutation.java b/src/java/org/apache/cassandra/db/Mutation.java
index ace114b..3d49ca6 100644
--- a/src/java/org/apache/cassandra/db/Mutation.java
+++ b/src/java/org/apache/cassandra/db/Mutation.java
@@ -54,8 +54,6 @@ public class Mutation implements IMutation
     // map of column family id to mutations for that column family.
     private final Map<UUID, PartitionUpdate> modifications;
 
-    // Time at which this mutation was instantiated
-    public final long createdAt = System.currentTimeMillis();
     public Mutation(String keyspaceName, DecoratedKey key)
     {
         this(keyspaceName, key, new HashMap<UUID, PartitionUpdate>());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24d185d7/src/java/org/apache/cassandra/db/MutationVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/MutationVerbHandler.java b/src/java/org/apache/cassandra/db/MutationVerbHandler.java
index 640e45f..3baa93e 100644
--- a/src/java/org/apache/cassandra/db/MutationVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/MutationVerbHandler.java
@@ -22,7 +22,6 @@ import java.io.IOError;
 import java.io.IOException;
 import java.net.InetAddress;
 
-import org.apache.cassandra.exceptions.WriteTimeoutException;
 import org.apache.cassandra.io.util.FastByteArrayInputStream;
 import org.apache.cassandra.net.*;
 import org.apache.cassandra.tracing.Tracing;
@@ -48,17 +47,10 @@ public class MutationVerbHandler implements IVerbHandler<Mutation>
                 replyTo = InetAddress.getByAddress(from);
             }
 
-        try
-        {
             message.payload.apply();
             WriteResponse response = new WriteResponse();
             Tracing.trace("Enqueuing response to {}", replyTo);
             MessagingService.instance().sendReply(response.createMessage(), id, replyTo);
-        }
-        catch (WriteTimeoutException wto)
-        {
-            Tracing.trace("Payload application resulted in WriteTimeout, not replying");
-        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24d185d7/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index 7ea946b..7bfd552 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -101,8 +101,6 @@ public final class SystemKeyspace
     public static final String SSTABLE_ACTIVITY = "sstable_activity";
     public static final String SIZE_ESTIMATES = "size_estimates";
     public static final String AVAILABLE_RANGES = "available_ranges";
-    public static final String MATERIALIZEDVIEWS_BUILDS_IN_PROGRESS = "materializedviews_builds_in_progress";
-    public static final String BUILT_MATERIALIZEDVIEWS = "built_materializedviews";
 
     @Deprecated public static final String LEGACY_KEYSPACES = "schema_keyspaces";
     @Deprecated public static final String LEGACY_COLUMNFAMILIES = "schema_columnfamilies";
@@ -263,24 +261,6 @@ public final class SystemKeyspace
                 + "ranges set<blob>,"
                 + "PRIMARY KEY ((keyspace_name)))");
 
-    public static final CFMetaData MaterializedViewsBuildsInProgress =
-        compile(MATERIALIZEDVIEWS_BUILDS_IN_PROGRESS,
-                "materialized views builds current progress",
-                "CREATE TABLE %s ("
-                + "keyspace_name text,"
-                + "view_name text,"
-                + "last_token varchar,"
-                + "generation_number int,"
-                + "PRIMARY KEY ((keyspace_name), view_name))");
-
-    public static final CFMetaData BuiltMaterializedViews =
-    compile(BUILT_MATERIALIZEDVIEWS,
-            "built materialized views",
-            "CREATE TABLE \"%s\" ("
-            + "keyspace_name text,"
-            + "view_name text,"
-            + "PRIMARY KEY ((keyspace_name), view_name))");
-
     @Deprecated
     public static final CFMetaData LegacyKeyspaces =
         compile(LEGACY_KEYSPACES,
@@ -421,8 +401,6 @@ public final class SystemKeyspace
                          SSTableActivity,
                          SizeEstimates,
                          AvailableRanges,
-                         MaterializedViewsBuildsInProgress,
-                         BuiltMaterializedViews,
                          LegacyKeyspaces,
                          LegacyColumnfamilies,
                          LegacyColumns,
@@ -515,82 +493,6 @@ public final class SystemKeyspace
         return CompactionHistoryTabularData.from(queryResultSet);
     }
 
-    public static boolean isViewBuilt(String keyspaceName, String viewName)
-    {
-        String req = "SELECT view_name FROM %s.\"%s\" WHERE keyspace_name=? AND view_name=?";
-        UntypedResultSet result = executeInternal(String.format(req, NAME, BUILT_MATERIALIZEDVIEWS), keyspaceName, viewName);
-        return !result.isEmpty();
-    }
-
-    public static void setMaterializedViewBuilt(String keyspaceName, String viewName)
-    {
-        String req = "INSERT INTO %s.\"%s\" (keyspace_name, view_name) VALUES (?, ?)";
-        executeInternal(String.format(req, NAME, BUILT_MATERIALIZEDVIEWS), keyspaceName, viewName);
-        forceBlockingFlush(BUILT_MATERIALIZEDVIEWS);
-    }
-
-
-    public static void setMaterializedViewRemoved(String keyspaceName, String viewName)
-    {
-        String buildReq = "DELETE FROM %S.%s WHERE keyspace_name = ? AND view_name = ?";
-        executeInternal(String.format(buildReq, NAME, MATERIALIZEDVIEWS_BUILDS_IN_PROGRESS), keyspaceName, viewName);
-        forceBlockingFlush(MATERIALIZEDVIEWS_BUILDS_IN_PROGRESS);
-
-        String builtReq = "DELETE FROM %s.\"%s\" WHERE keyspace_name = ? AND view_name = ?";
-        executeInternal(String.format(builtReq, NAME, BUILT_MATERIALIZEDVIEWS), keyspaceName, viewName);
-        forceBlockingFlush(BUILT_MATERIALIZEDVIEWS);
-    }
-
-    public static void beginMaterializedViewBuild(String ksname, String viewName, int generationNumber)
-    {
-        executeInternal(String.format("INSERT INTO system.%s (keyspace_name, view_name, generation_number) VALUES (?, ?, ?)", MATERIALIZEDVIEWS_BUILDS_IN_PROGRESS),
-                        ksname,
-                        viewName,
-                        generationNumber);
-    }
-
-    public static void finishMaterializedViewBuildStatus(String ksname, String viewName)
-    {
-        // We flush the view built first, because if we fail now, we'll restart at the last place we checkpointed
-        // materialized view build.
-        // If we flush the delete first, we'll have to restart from the beginning.
-        // Also, if the build succeeded, but the materialized view build failed, we will be able to skip the
-        // materialized view build check next boot.
-        setMaterializedViewBuilt(ksname, viewName);
-        forceBlockingFlush(BUILT_MATERIALIZEDVIEWS);
-        executeInternal(String.format("DELETE FROM system.%s WHERE keyspace_name = ? AND view_name = ?", MATERIALIZEDVIEWS_BUILDS_IN_PROGRESS), ksname, viewName);
-        forceBlockingFlush(MATERIALIZEDVIEWS_BUILDS_IN_PROGRESS);
-    }
-
-    public static void updateMaterializedViewBuildStatus(String ksname, String viewName, Token token)
-    {
-        String req = "INSERT INTO system.%s (keyspace_name, view_name, last_token) VALUES (?, ?, ?)";
-        Token.TokenFactory factory = StorageService.getPartitioner().getTokenFactory();
-        executeInternal(String.format(req, MATERIALIZEDVIEWS_BUILDS_IN_PROGRESS), ksname, viewName, factory.toString(token));
-    }
-
-    public static Pair<Integer, Token> getMaterializedViewBuildStatus(String ksname, String viewName)
-    {
-        String req = "SELECT generation_number, last_token FROM system.%s WHERE keyspace_name = ? AND view_name = ?";
-        UntypedResultSet queryResultSet = executeInternal(String.format(req, MATERIALIZEDVIEWS_BUILDS_IN_PROGRESS), ksname, viewName);
-        if (queryResultSet == null || queryResultSet.isEmpty())
-            return null;
-
-        UntypedResultSet.Row row = queryResultSet.one();
-
-        Integer generation = null;
-        Token lastKey = null;
-        if (row.has("generation_number"))
-            generation = row.getInt("generation_number");
-        if (row.has("last_key"))
-        {
-            Token.TokenFactory factory = StorageService.getPartitioner().getTokenFactory();
-            lastKey = factory.fromString(row.getString("last_key"));
-        }
-
-        return Pair.create(generation, lastKey);
-    }
-
     public static synchronized void saveTruncationRecord(ColumnFamilyStore cfs, long truncatedAt, ReplayPosition position)
     {
         String req = "UPDATE system.%s SET truncated_at = truncated_at + ? WHERE key = '%s'";

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24d185d7/src/java/org/apache/cassandra/db/WriteType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/WriteType.java b/src/java/org/apache/cassandra/db/WriteType.java
index 20fb6a9..4f4c88d 100644
--- a/src/java/org/apache/cassandra/db/WriteType.java
+++ b/src/java/org/apache/cassandra/db/WriteType.java
@@ -24,6 +24,5 @@ public enum WriteType
     UNLOGGED_BATCH,
     COUNTER,
     BATCH_LOG,
-    CAS,
-    MATERIALIZED_VIEW;
+    CAS;
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24d185d7/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 3dd6f38..bf412d8 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -58,7 +58,6 @@ import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.compaction.CompactionInfo.Holder;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.index.SecondaryIndexBuilder;
-import org.apache.cassandra.db.view.MaterializedViewBuilder;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.db.lifecycle.SSTableSet;
 import org.apache.cassandra.db.lifecycle.View;
@@ -1366,31 +1365,6 @@ public class CompactionManager implements CompactionManagerMBean
         }
     }
 
-    public Future<?> submitMaterializedViewBuilder(final MaterializedViewBuilder builder)
-    {
-        Runnable runnable = new Runnable()
-        {
-            public void run()
-            {
-                metrics.beginCompaction(builder);
-                try
-                {
-                    builder.run();
-                }
-                finally
-                {
-                    metrics.finishCompaction(builder);
-                }
-            }
-        };
-        if (executor.isShutdown())
-        {
-            logger.info("Compaction executor has shut down, not submitting index build");
-            return null;
-        }
-
-        return executor.submit(runnable);
-    }
     public int getActiveCompactions()
     {
         return CompactionMetrics.getCompactions().size();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24d185d7/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
index 5e15f33..766eb1b 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
@@ -426,7 +426,7 @@ public class CompactionStrategyManager implements INotificationConsumer
                     return tasks;
                 }
             }
-        }, false, false);
+        }, false);
     }
 
     public AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, int gcBefore)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24d185d7/src/java/org/apache/cassandra/db/compaction/OperationType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/OperationType.java b/src/java/org/apache/cassandra/db/compaction/OperationType.java
index f8f016c..5b6ce05 100644
--- a/src/java/org/apache/cassandra/db/compaction/OperationType.java
+++ b/src/java/org/apache/cassandra/db/compaction/OperationType.java
@@ -35,8 +35,7 @@ public enum OperationType
     VERIFY("Verify"),
     FLUSH("Flush"),
     STREAM("Stream"),
-    WRITE("Write"),
-    VIEW_BUILD("Materialized view build");
+    WRITE("Write");
 
     public final String type;
     public final String fileName;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24d185d7/src/java/org/apache/cassandra/db/view/MaterializedView.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/MaterializedView.java b/src/java/org/apache/cassandra/db/view/MaterializedView.java
deleted file mode 100644
index 082c71d..0000000
--- a/src/java/org/apache/cassandra/db/view/MaterializedView.java
+++ /dev/null
@@ -1,691 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.view;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
-
-import com.google.common.collect.Iterables;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.config.MaterializedViewDefinition;
-import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.cql3.ColumnIdentifier;
-import org.apache.cassandra.cql3.statements.CFProperties;
-import org.apache.cassandra.db.AbstractReadCommandBuilder.SinglePartitionSliceBuilder;
-import org.apache.cassandra.db.CBuilder;
-import org.apache.cassandra.db.Clustering;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.DeletionInfo;
-import org.apache.cassandra.db.DeletionTime;
-import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.db.LivenessInfo;
-import org.apache.cassandra.db.Mutation;
-import org.apache.cassandra.db.RangeTombstone;
-import org.apache.cassandra.db.ReadCommand;
-import org.apache.cassandra.db.ReadOrderGroup;
-import org.apache.cassandra.db.SinglePartitionReadCommand;
-import org.apache.cassandra.db.Slice;
-import org.apache.cassandra.db.compaction.CompactionManager;
-import org.apache.cassandra.db.partitions.AbstractThreadUnsafePartition;
-import org.apache.cassandra.db.partitions.PartitionIterator;
-import org.apache.cassandra.db.partitions.PartitionUpdate;
-import org.apache.cassandra.db.rows.BTreeBackedRow;
-import org.apache.cassandra.db.rows.Cell;
-import org.apache.cassandra.db.rows.ColumnData;
-import org.apache.cassandra.db.rows.ComplexColumnData;
-import org.apache.cassandra.db.rows.Row;
-import org.apache.cassandra.db.rows.RowIterator;
-import org.apache.cassandra.service.pager.QueryPager;
-
-/**
- * A Materialized View copies data from a base table into a view table which can be queried independently from the
- * base. Every update which targets the base table must be fed through the {@link MaterializedViewManager} to ensure
- * that if a view needs to be updated, the updates are properly created and fed into the view.
- *
- * This class does the job of translating the base row to the view row.
- *
- * It handles reading existing state and figuring out what tombstones need to be generated.
- *
- * createMutations below is the "main method"
- *
- */
-public class MaterializedView
-{
-    /**
-     * The columns should all be updated together, so we use this object as group.
-     */
-    private static class MVColumns
-    {
-        //These are the base column definitions in terms of the *views* partitioning.
-        //Meaning we can see (for example) the partition key of the view contains a clustering key
-        //from the base table.
-        public final List<ColumnDefinition> partitionDefs;
-        public final List<ColumnDefinition> primaryKeyDefs;
-        public final List<ColumnDefinition> baseComplexColumns;
-
-        private MVColumns(List<ColumnDefinition> partitionDefs, List<ColumnDefinition> primaryKeyDefs, List<ColumnDefinition> baseComplexColumns)
-        {
-            this.partitionDefs = partitionDefs;
-            this.primaryKeyDefs = primaryKeyDefs;
-            this.baseComplexColumns = baseComplexColumns;
-        }
-    }
-
-    public final String name;
-
-    private final ColumnFamilyStore baseCfs;
-    private ColumnFamilyStore _viewCfs = null;
-
-    private MVColumns columns;
-
-    private final boolean viewHasAllPrimaryKeys;
-    private final boolean includeAll;
-    private MaterializedViewBuilder builder;
-
-    public MaterializedView(MaterializedViewDefinition definition,
-                            ColumnFamilyStore baseCfs)
-    {
-        this.baseCfs = baseCfs;
-
-        name = definition.viewName;
-        includeAll = definition.includeAll;
-
-        viewHasAllPrimaryKeys = updateDefinition(definition);
-    }
-
-    /**
-     * Lazily fetch the CFS instance for the view.
-     * We do this lazily to avoid initilization issues.
-     *
-     * @return The views CFS instance
-     */
-    public ColumnFamilyStore getViewCfs()
-    {
-        if (_viewCfs == null)
-            _viewCfs = Keyspace.openAndGetStore(Schema.instance.getCFMetaData(baseCfs.keyspace.getName(), name));
-
-        return _viewCfs;
-    }
-
-
-    /**
-     * Lookup column definitions in the base table that correspond to the view columns (should be 1:1)
-     *
-     * Notify caller if all primary keys in the view are ALL primary keys in the base. We do this to simplify
-     * tombstone checks.
-     *
-     * @param columns a list of columns to lookup in the base table
-     * @param definitions lists to populate for the base table definitions
-     * @return true if all view PKs are also Base PKs
-     */
-    private boolean resolveAndAddColumns(Iterable<ColumnIdentifier> columns, List<ColumnDefinition>... definitions)
-    {
-        boolean allArePrimaryKeys = true;
-        for (ColumnIdentifier identifier : columns)
-        {
-            ColumnDefinition cdef = baseCfs.metadata.getColumnDefinition(identifier);
-            assert cdef != null : "Could not resolve column " + identifier.toString();
-
-            for (List<ColumnDefinition> list : definitions)
-            {
-                list.add(cdef);
-            }
-
-            allArePrimaryKeys = allArePrimaryKeys && cdef.isPrimaryKeyColumn();
-        }
-
-        return allArePrimaryKeys;
-    }
-
-    /**
-     * This updates the columns stored which are dependent on the base CFMetaData.
-     *
-     * @return true if the view contains only columns which are part of the base's primary key; false if there is at
-     *         least one column which is not.
-     */
-    public boolean updateDefinition(MaterializedViewDefinition definition)
-    {
-        List<ColumnDefinition> partitionDefs = new ArrayList<>(definition.partitionColumns.size());
-        List<ColumnDefinition> primaryKeyDefs = new ArrayList<>(definition.partitionColumns.size()
-                                                                + definition.clusteringColumns.size());
-        List<ColumnDefinition> baseComplexColumns = new ArrayList<>();
-
-        // We only add the partition columns to the partitions list, but both partition columns and clustering
-        // columns are added to the primary keys list
-        boolean partitionAllPrimaryKeyColumns = resolveAndAddColumns(definition.partitionColumns, primaryKeyDefs, partitionDefs);
-        boolean clusteringAllPrimaryKeyColumns = resolveAndAddColumns(definition.clusteringColumns, primaryKeyDefs);
-
-        for (ColumnDefinition cdef : baseCfs.metadata.allColumns())
-        {
-            if (cdef.isComplex())
-            {
-                baseComplexColumns.add(cdef);
-            }
-        }
-
-        this.columns = new MVColumns(partitionDefs, primaryKeyDefs, baseComplexColumns);
-
-        return partitionAllPrimaryKeyColumns && clusteringAllPrimaryKeyColumns;
-    }
-
-    /**
-     * Check to see if the update could possibly modify a view. Cases where the view may be updated are:
-     * <ul>
-     *     <li>View selects all columns</li>
-     *     <li>Update contains any range tombstones</li>
-     *     <li>Update touches one of the columns included in the view</li>
-     * </ul>
-     *
-     * If the update contains any range tombstones, there is a possibility that it will not touch a range that is
-     * currently included in the view.
-     *
-     * @return true if {@param partition} modifies a column included in the view
-     */
-    public boolean updateAffectsView(AbstractThreadUnsafePartition partition)
-    {
-        // If we are including all of the columns, then any update will be included
-        if (includeAll)
-            return true;
-
-        // If there are range tombstones, tombstones will also need to be generated for the materialized view
-        // This requires a query of the base rows and generating tombstones for all of those values
-        if (!partition.deletionInfo().isLive())
-            return true;
-
-        // Check whether the update touches any of the columns included in the view
-        for (Row row : partition)
-        {
-            for (ColumnData data : row)
-            {
-                if (getViewCfs().metadata.getColumnDefinition(data.column().name) != null)
-                    return true;
-            }
-        }
-
-        return false;
-    }
-
-    /**
-     * Creates the clustering columns for the view based on the specified row and resolver policy
-     *
-     * @param temporalRow The current row
-     * @param resolver The policy to use when selecting versions of cells use
-     * @return The clustering object to use for the view
-     */
-    private Clustering viewClustering(TemporalRow temporalRow, TemporalRow.Resolver resolver)
-    {
-        CFMetaData viewCfm = getViewCfs().metadata;
-        int numViewClustering = viewCfm.clusteringColumns().size();
-        CBuilder clustering = CBuilder.create(getViewCfs().getComparator());
-        for (int i = 0; i < numViewClustering; i++)
-        {
-            ColumnDefinition definition = viewCfm.clusteringColumns().get(i);
-            clustering.add(temporalRow.clusteringValue(definition, resolver));
-        }
-
-        return clustering.build();
-    }
-
-    /**
-     * @return Mutation containing a range tombstone for a base partition key and TemporalRow.
-     */
-    private PartitionUpdate createTombstone(TemporalRow temporalRow,
-                                            DecoratedKey partitionKey,
-                                            DeletionTime deletionTime,
-                                            TemporalRow.Resolver resolver,
-                                            int nowInSec)
-    {
-        CFMetaData viewCfm = getViewCfs().metadata;
-        Row.Builder builder = BTreeBackedRow.unsortedBuilder(viewCfm.partitionColumns().regulars, nowInSec);
-        builder.newRow(viewClustering(temporalRow, resolver));
-        builder.addRowDeletion(deletionTime);
-        return PartitionUpdate.singleRowUpdate(viewCfm, partitionKey, builder.build());
-    }
-
-    /**
-     * @return PartitionUpdate containing a complex tombstone for a TemporalRow, and the collection's column identifier.
-     */
-    private PartitionUpdate createComplexTombstone(TemporalRow temporalRow,
-                                                   DecoratedKey partitionKey,
-                                                   ColumnDefinition deletedColumn,
-                                                   DeletionTime deletionTime,
-                                                   TemporalRow.Resolver resolver,
-                                                   int nowInSec)
-    {
-
-        CFMetaData viewCfm = getViewCfs().metadata;
-        Row.Builder builder = BTreeBackedRow.unsortedBuilder(viewCfm.partitionColumns().regulars, nowInSec);
-        builder.newRow(viewClustering(temporalRow, resolver));
-        builder.addComplexDeletion(deletedColumn, deletionTime);
-        return PartitionUpdate.singleRowUpdate(viewCfm, partitionKey, builder.build());
-    }
-
-    /**
-     * @return View's DecoratedKey or null, if one of the view's primary key components has an invalid resolution from
-     *         the TemporalRow and its Resolver
-     */
-    private DecoratedKey viewPartitionKey(TemporalRow temporalRow, TemporalRow.Resolver resolver)
-    {
-        List<ColumnDefinition> partitionDefs = this.columns.partitionDefs;
-        Object[] partitionKey = new Object[partitionDefs.size()];
-
-        for (int i = 0; i < partitionKey.length; i++)
-        {
-            ByteBuffer value = temporalRow.clusteringValue(partitionDefs.get(i), resolver);
-
-            if (value == null)
-                return null;
-
-            partitionKey[i] = value;
-        }
-
-        return getViewCfs().partitioner.decorateKey(CFMetaData.serializePartitionKey(getViewCfs().metadata
-                                                                                     .getKeyValidatorAsClusteringComparator()
-                                                                                     .make(partitionKey)));
-    }
-
-    /**
-     * @return mutation which contains the tombstone for the referenced TemporalRow, or null if not necessary.
-     * TemporalRow's can reference at most one view row; there will be at most one row to be tombstoned, so only one
-     * mutation is necessary
-     */
-    private PartitionUpdate createRangeTombstoneForRow(TemporalRow temporalRow)
-    {
-        // Primary Key and Clustering columns do not generate tombstones
-        if (viewHasAllPrimaryKeys)
-            return null;
-
-        boolean hasUpdate = false;
-        List<ColumnDefinition> primaryKeyDefs = this.columns.primaryKeyDefs;
-        for (ColumnDefinition viewPartitionKeys : primaryKeyDefs)
-        {
-            if (!viewPartitionKeys.isPrimaryKeyColumn() && temporalRow.clusteringValue(viewPartitionKeys, TemporalRow.oldValueIfUpdated) != null)
-                hasUpdate = true;
-        }
-
-        if (!hasUpdate)
-            return null;
-
-        TemporalRow.Resolver resolver = TemporalRow.earliest;
-        return createTombstone(temporalRow,
-                               viewPartitionKey(temporalRow, resolver),
-                               new DeletionTime(temporalRow.viewClusteringTimestamp(), temporalRow.nowInSec),
-                               resolver,
-                               temporalRow.nowInSec);
-    }
-
-    /**
-     * @return Mutation which is the transformed base table mutation for the materialized view.
-     */
-    private PartitionUpdate createUpdatesForInserts(TemporalRow temporalRow)
-    {
-        TemporalRow.Resolver resolver = TemporalRow.latest;
-
-        DecoratedKey partitionKey = viewPartitionKey(temporalRow, resolver);
-        ColumnFamilyStore viewCfs = getViewCfs();
-
-        if (partitionKey == null)
-        {
-            // Not having a partition key means we aren't updating anything
-            return null;
-        }
-
-        Row.Builder regularBuilder = BTreeBackedRow.unsortedBuilder(viewCfs.metadata.partitionColumns().regulars, temporalRow.nowInSec);
-
-        CBuilder clustering = CBuilder.create(viewCfs.getComparator());
-        for (int i = 0; i < viewCfs.metadata.clusteringColumns().size(); i++)
-        {
-            clustering.add(temporalRow.clusteringValue(viewCfs.metadata.clusteringColumns().get(i), resolver));
-        }
-        regularBuilder.newRow(clustering.build());
-        regularBuilder.addPrimaryKeyLivenessInfo(LivenessInfo.create(viewCfs.metadata,
-                                                                     temporalRow.viewClusteringTimestamp(),
-                                                                     temporalRow.viewClusteringTtl(),
-                                                                     temporalRow.viewClusteringLocalDeletionTime()));
-
-        for (ColumnDefinition columnDefinition : viewCfs.metadata.allColumns())
-        {
-            if (columnDefinition.isPrimaryKeyColumn())
-                continue;
-
-            for (Cell cell : temporalRow.values(columnDefinition, resolver))
-            {
-                regularBuilder.addCell(cell);
-            }
-        }
-
-        return PartitionUpdate.singleRowUpdate(viewCfs.metadata, partitionKey, regularBuilder.build());
-    }
-
-    /**
-     * @param partition Update which possibly contains deletion info for which to generate view tombstones.
-     * @return    View Tombstones which delete all of the rows which have been removed from the base table with
-     *            {@param partition}
-     */
-    private Collection<Mutation> createForDeletionInfo(TemporalRow.Set rowSet, AbstractThreadUnsafePartition partition)
-    {
-        final TemporalRow.Resolver resolver = TemporalRow.earliest;
-
-        DeletionInfo deletionInfo = partition.deletionInfo();
-
-        List<Mutation> mutations = new ArrayList<>();
-
-        // Check the complex columns to see if there are any which may have tombstones we need to create for the view
-        if (!columns.baseComplexColumns.isEmpty())
-        {
-            for (Row row : partition)
-            {
-                if (!row.hasComplexDeletion())
-                    continue;
-
-                TemporalRow temporalRow = rowSet.getClustering(row.clustering());
-
-                assert temporalRow != null;
-
-                for (ColumnDefinition definition : columns.baseComplexColumns)
-                {
-                    ComplexColumnData columnData = row.getComplexColumnData(definition);
-
-                    if (columnData != null)
-                    {
-                        DeletionTime time = columnData.complexDeletion();
-                        if (!time.isLive())
-                        {
-                            DecoratedKey targetKey = viewPartitionKey(temporalRow, resolver);
-                            if (targetKey != null)
-                                mutations.add(new Mutation(createComplexTombstone(temporalRow, targetKey, definition, time, resolver, temporalRow.nowInSec)));
-                        }
-                    }
-                }
-            }
-        }
-
-        ReadCommand command = null;
-
-        if (!deletionInfo.isLive())
-        {
-            // We have to generate tombstones for all of the affected rows, but we don't have the information in order
-            // to create them. This requires that we perform a read for the entire range that is being tombstoned, and
-            // generate a tombstone for each. This may be slow, because a single range tombstone can cover up to an
-            // entire partition of data which is not distributed on a single partition node.
-            DecoratedKey dk = rowSet.dk;
-
-            if (deletionInfo.hasRanges())
-            {
-                SinglePartitionSliceBuilder builder = new SinglePartitionSliceBuilder(baseCfs, dk);
-                Iterator<RangeTombstone> tombstones = deletionInfo.rangeIterator(false);
-                while (tombstones.hasNext())
-                {
-                    RangeTombstone tombstone = tombstones.next();
-
-                    builder.addSlice(tombstone.deletedSlice());
-                }
-
-                command = builder.build();
-            }
-            else
-            {
-                command = SinglePartitionReadCommand.fullPartitionRead(baseCfs.metadata, rowSet.nowInSec, dk);
-            }
-        }
-
-        if (command == null)
-        {
-            SinglePartitionSliceBuilder builder = null;
-            for (Row row : partition)
-            {
-                if (!row.deletion().isLive())
-                {
-                    if (builder == null)
-                        builder = new SinglePartitionSliceBuilder(baseCfs, rowSet.dk);
-                    builder.addSlice(Slice.make(row.clustering()));
-                }
-            }
-
-            if (builder != null)
-                command = builder.build();
-        }
-
-        if (command != null)
-        {
-            QueryPager pager = command.getPager(null);
-
-            // Add all of the rows which were recovered from the query to the row set
-            while (!pager.isExhausted())
-            {
-                try (ReadOrderGroup orderGroup = pager.startOrderGroup();
-                     PartitionIterator iter = pager.fetchPageInternal(128, orderGroup))
-                {
-                    if (!iter.hasNext())
-                        break;
-
-                    try (RowIterator rowIterator = iter.next())
-                    {
-                        while (rowIterator.hasNext())
-                        {
-                            Row row = rowIterator.next();
-                            rowSet.addRow(row, false);
-                        }
-                    }
-                }
-            }
-
-            // If the temporal row has been deleted by the deletion info, we generate the corresponding range tombstone
-            // for the view.
-            for (TemporalRow temporalRow : rowSet)
-            {
-                DeletionTime deletionTime = temporalRow.deletionTime(partition);
-                if (!deletionTime.isLive())
-                {
-                    DecoratedKey value = viewPartitionKey(temporalRow, resolver);
-                    if (value != null)
-                    {
-                        PartitionUpdate update = createTombstone(temporalRow, value, deletionTime, resolver, temporalRow.nowInSec);
-                        if (update != null)
-                            mutations.add(new Mutation(update));
-                    }
-                }
-            }
-        }
-
-        return !mutations.isEmpty() ? mutations : null;
-    }
-
-    /**
-     * Reads back from the base table the slices covered by the temporal rows of {@code rowSet} and feeds every row
-     * that is found into the set, so that the set also carries the values which are currently stored locally.
-     *
-     * @param rowSet the temporal rows to complete; its partition key ({@code rowSet.dk}) selects the partition read
-     */
-    private void readLocalRows(TemporalRow.Set rowSet)
-    {
-        SinglePartitionSliceBuilder sliceBuilder = new SinglePartitionSliceBuilder(baseCfs, rowSet.dk);
-        for (TemporalRow pending : rowSet)
-            sliceBuilder.addSlice(pending.baseSlice());
-
-        // Each page is fetched under its own read order group, which is released together with the page iterator.
-        for (QueryPager pager = sliceBuilder.build().getPager(null); !pager.isExhausted(); )
-        {
-            try (ReadOrderGroup opGroup = pager.startOrderGroup();
-                 PartitionIterator page = pager.fetchPageInternal(128, opGroup))
-            {
-                while (page.hasNext())
-                {
-                    try (RowIterator existing = page.next())
-                    {
-                        // 'false' here, as opposed to the 'true' used by separateRows for rows of the update itself
-                        while (existing.hasNext())
-                            rowSet.addRow(existing.next(), false);
-                    }
-                }
-            }
-        }
-    }
-
-    /**
-     * Collects the rows of a partition update into a new {@link TemporalRow.Set}.
-     *
-     * @param key       partition key handed to the temporal row set
-     * @param partition the partition update whose rows are collected
-     * @return Set of rows which are contained in the partition update {@code partition}
-     */
-    private TemporalRow.Set separateRows(ByteBuffer key, AbstractThreadUnsafePartition partition)
-    {
-        // The set is created with the names of the view's primary key columns
-        Set<ColumnIdentifier> primaryKeyNames = new HashSet<>();
-        for (ColumnDefinition pkColumn : this.columns.primaryKeyDefs)
-            primaryKeyNames.add(pkColumn.name);
-
-        TemporalRow.Set separated = new TemporalRow.Set(baseCfs, primaryKeyNames, key);
-        for (Row updated : partition)
-            separated.addRow(updated, true);
-        return separated;
-    }
-
-    /**
-     * Computes the view mutations implied by a base table partition update.
-     *
-     * @param key        partition key of the base table update
-     * @param partition  the base table data being applied
-     * @param isBuilding If the view is currently being built, we do not query the values which are already stored,
-     *                   since all of the update will already be present in the base table. Tombstone and deletion
-     *                   handling is skipped as well in that case.
-     * @return View mutations which represent the changes necessary as long as previously created mutations for the view
-     *         have been applied successfully, or {@code null} when the update does not affect the view or no
-     *         mutation had to be generated. This is based solely on the changes that are necessary given the current
-     *         state of the base table and the newly applying partition data.
-     */
-    public Collection<Mutation> createMutations(ByteBuffer key, AbstractThreadUnsafePartition partition, boolean isBuilding)
-    {
-        if (!updateAffectsView(partition))
-            return null;
-
-        final boolean liveUpdate = !isBuilding;
-        TemporalRow.Set rowSet = separateRows(key, partition);
-
-        // Old values are only needed for live updates; during a build they would always equal the new ones
-        if (liveUpdate)
-            readLocalRows(rowSet);
-
-        // Allocated lazily so that "nothing to do" is reported as null
-        Collection<Mutation> result = null;
-        for (TemporalRow current : rowSet)
-        {
-            // While building there are no partition tombstones to account for in the partition data
-            PartitionUpdate tombstone = liveUpdate ? createRangeTombstoneForRow(current) : null;
-            if (tombstone != null)
-            {
-                if (result == null)
-                    result = new LinkedList<>();
-                result.add(new Mutation(tombstone));
-            }
-
-            PartitionUpdate inserted = createUpdatesForInserts(current);
-            if (inserted != null)
-            {
-                if (result == null)
-                    result = new LinkedList<>();
-                result.add(new Mutation(inserted));
-            }
-        }
-
-        if (liveUpdate)
-        {
-            Collection<Mutation> deletions = createForDeletionInfo(rowSet, partition);
-            if (deletions != null && !deletions.isEmpty())
-            {
-                if (result == null)
-                    result = new LinkedList<>();
-                result.addAll(deletions);
-            }
-        }
-
-        return result;
-    }
-
-    /**
-     * Stops the builder currently attached to this view, if there is one, and submits a freshly created
-     * {@link MaterializedViewBuilder} to the {@link CompactionManager}.
-     */
-    public synchronized void build()
-    {
-        MaterializedViewBuilder running = this.builder;
-        if (running != null)
-        {
-            running.stop();
-            this.builder = null;
-        }
-
-        MaterializedViewBuilder replacement = new MaterializedViewBuilder(baseCfs, this);
-        this.builder = replacement;
-        CompactionManager.instance.submitMaterializedViewBuilder(replacement);
-    }
-
-    /**
-     * Builds the table metadata of the view described by {@code definition}.
-     *
-     * The view's partition key and clustering columns are the ones named by the definition, typed through
-     * {@code properties.getReversableType}. Base regular columns, and base primary key columns which are not part
-     * of the view's primary key, are added as regular columns when the definition selects them; an empty
-     * {@code definition.included} selects every column.
-     *
-     * @param definition the view definition to translate
-     * @param baseCf     metadata of the base table in which the columns are looked up
-     * @param properties table properties, applied to the metadata once it is built
-     * @return CFMetaData which represents the definition given
-     */
-    public static CFMetaData getCFMetaData(MaterializedViewDefinition definition,
-                                           CFMetaData baseCf,
-                                           CFProperties properties)
-    {
-        CFMetaData.Builder metadataBuilder = CFMetaData.Builder.createView(baseCf.ksName, definition.viewName);
-
-        // Base column used in the view's partition key without being part of the base partition key (the last one
-        // wins if there are several); it is excluded from the regular columns added further down.
-        ColumnDefinition promotedColumn = null;
-        for (ColumnIdentifier partitionName : definition.partitionColumns)
-        {
-            ColumnDefinition baseColumn = baseCf.getColumnDefinition(partitionName);
-            if (!baseColumn.isPartitionKey())
-                promotedColumn = baseColumn;
-
-            metadataBuilder.addPartitionKey(baseColumn.name, properties.getReversableType(partitionName, baseColumn.type));
-        }
-
-        Collection<ColumnDefinition> selected = new ArrayList<>();
-        for (ColumnIdentifier selectedName : definition.included)
-        {
-            ColumnDefinition selectedColumn = baseCf.getColumnDefinition(selectedName);
-            assert selectedColumn != null;
-            selected.add(selectedColumn);
-        }
-
-        // No explicit selection means that every base column is part of the view
-        final boolean selectAll = selected.isEmpty();
-
-        for (ColumnIdentifier clusteringName : definition.clusteringColumns)
-        {
-            ColumnDefinition baseColumn = baseCf.getColumnDefinition(clusteringName);
-            metadataBuilder.addClusteringColumn(clusteringName, properties.getReversableType(clusteringName, baseColumn.type));
-        }
-
-        for (ColumnDefinition regular : baseCf.partitionColumns().regulars.columns)
-        {
-            if (regular == promotedColumn)
-                continue;
-
-            if (selectAll || selected.contains(regular))
-                metadataBuilder.addRegularColumn(regular.name, regular.type);
-        }
-
-        // Base primary key columns which the view does not use in its own primary key
-        for (ColumnDefinition baseKeyColumn : Iterables.concat(baseCf.partitionKeyColumns(), baseCf.clusteringColumns()))
-        {
-            boolean inViewKey = definition.partitionColumns.contains(baseKeyColumn.name)
-                                || definition.clusteringColumns.contains(baseKeyColumn.name);
-
-            if (!inViewKey && (selectAll || selected.contains(baseKeyColumn)))
-                metadataBuilder.addRegularColumn(baseKeyColumn.name, baseKeyColumn.type);
-        }
-
-        CFMetaData viewMetadata = metadataBuilder.build();
-        properties.properties.applyToCFMetadata(viewMetadata);
-        return viewMetadata;
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24d185d7/src/java/org/apache/cassandra/db/view/MaterializedViewBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/MaterializedViewBuilder.java b/src/java/org/apache/cassandra/db/view/MaterializedViewBuilder.java
deleted file mode 100644
index e8842ed..0000000
--- a/src/java/org/apache/cassandra/db/view/MaterializedViewBuilder.java
+++ /dev/null
@@ -1,222 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.db.view;
-
-import java.util.Collection;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import javax.annotation.Nullable;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Iterables;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.concurrent.ScheduledExecutors;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.Mutation;
-import org.apache.cassandra.db.ReadOrderGroup;
-import org.apache.cassandra.db.SinglePartitionReadCommand;
-import org.apache.cassandra.db.SystemKeyspace;
-import org.apache.cassandra.db.compaction.CompactionInfo;
-import org.apache.cassandra.db.compaction.CompactionManager;
-import org.apache.cassandra.db.compaction.OperationType;
-import org.apache.cassandra.db.lifecycle.SSTableSet;
-import org.apache.cassandra.db.lifecycle.View;
-import org.apache.cassandra.db.partitions.FilteredPartition;
-import org.apache.cassandra.db.partitions.PartitionIterator;
-import org.apache.cassandra.db.rows.RowIterator;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.exceptions.WriteTimeoutException;
-import org.apache.cassandra.io.sstable.ReducingKeyIterator;
-import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.service.StorageProxy;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.service.pager.QueryPager;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.NoSpamLogger;
-import org.apache.cassandra.utils.Pair;
-import org.apache.cassandra.utils.UUIDGen;
-import org.apache.cassandra.utils.concurrent.Refs;
-
-/**
- * Task that populates a {@link MaterializedView} from the data which is already present in its base table.
- *
- * {@link #run()} walks the keys of the base table's sstables, generates and applies the view mutations for every
- * locally owned partition and records the token it last handled in the system keyspace, which is where a later run
- * picks the build up again.
- */
-public class MaterializedViewBuilder extends CompactionInfo.Holder
-{
-    private final ColumnFamilyStore baseCfs;
-    private final MaterializedView view;
-    private final UUID compactionId;
-    // Token most recently recorded as processed; read by getCompactionInfo() to report progress
-    private volatile Token prevToken = null;
-
-    private static final Logger logger = LoggerFactory.getLogger(MaterializedViewBuilder.class);
-
-    // Set by stop(); checked between keys in run()
-    private volatile boolean isStopped = false;
-
-    /**
-     * @param baseCfs the base table whose data is turned into view mutations
-     * @param view    the view being built
-     */
-    public MaterializedViewBuilder(ColumnFamilyStore baseCfs, MaterializedView view)
-    {
-        this.baseCfs = baseCfs;
-        this.view = view;
-        compactionId = UUIDGen.getTimeUUID();
-    }
-
-    /**
-     * Reads the base partition {@code key} page by page and applies the view mutations generated for each page.
-     *
-     * A write timeout is only logged (the mutations are reported to be in the batchlog in that case) and the
-     * remaining pages are still processed.
-     */
-    private void buildKey(DecoratedKey key)
-    {
-        QueryPager pager = SinglePartitionReadCommand.fullPartitionRead(baseCfs.metadata, FBUtilities.nowInSeconds(), key).getPager(null);
-
-        while (!pager.isExhausted())
-        {
-            try (ReadOrderGroup orderGroup = pager.startOrderGroup();
-                 PartitionIterator partitionIterator = pager.fetchPageInternal(128, orderGroup))
-            {
-                if (!partitionIterator.hasNext())
-                    return;
-
-                try (RowIterator rowIterator = partitionIterator.next())
-                {
-                    Collection<Mutation> mutations = view.createMutations(key.getKey(), FilteredPartition.create(rowIterator), true);
-
-                    if (mutations != null)
-                    {
-                        try
-                        {
-                            // Keep paging after a successful write: leaving the loop here would drop every page
-                            // of the partition after the first one from the view.
-                            StorageProxy.mutateMV(key.getKey(), mutations);
-                        }
-                        catch (WriteTimeoutException ex)
-                        {
-                            NoSpamLogger.getLogger(logger, 1, TimeUnit.MINUTES)
-                                        .warn("Encountered write timeout when building materialized view {}, the entries were stored in the batchlog and will be replayed at another time", view.name);
-                        }
-                    }
-                }
-            }
-        }
-    }
-
-    /**
-     * Builds the view, or resumes an earlier build, unless the system keyspace already reports it as built.
-     *
-     * The view is only marked as built when the key iteration ran to completion; a build interrupted through
-     * {@link #stop()} keeps its recorded progress and is left unfinished. Any exception causes a new builder to be
-     * scheduled five minutes later.
-     */
-    public void run()
-    {
-        String ksname = baseCfs.metadata.ksName, viewName = view.name;
-
-        if (SystemKeyspace.isViewBuilt(ksname, viewName))
-            return;
-
-        Iterable<Range<Token>> ranges = StorageService.instance.getLocalRanges(baseCfs.metadata.ksName);
-        final Pair<Integer, Token> buildStatus = SystemKeyspace.getMaterializedViewBuildStatus(ksname, viewName);
-        Token lastToken;
-        Function<View, Iterable<SSTableReader>> function;
-        if (buildStatus == null)
-        {
-            // Fresh build: flush first, then record the highest sstable generation present at this point
-            baseCfs.forceBlockingFlush();
-            function = View.select(SSTableSet.CANONICAL);
-            int generation = Integer.MIN_VALUE;
-
-            try (Refs<SSTableReader> temp = baseCfs.selectAndReference(function).refs)
-            {
-                for (SSTableReader reader : temp)
-                {
-                    generation = Math.max(reader.descriptor.generation, generation);
-                }
-            }
-
-            SystemKeyspace.beginMaterializedViewBuild(ksname, viewName, generation);
-            lastToken = null;
-        }
-        else
-        {
-            // Resumed build: only sstables whose generation does not exceed buildStatus.left are considered
-            function = new Function<View, Iterable<SSTableReader>>()
-            {
-                @Nullable
-                public Iterable<SSTableReader> apply(View view)
-                {
-                    Iterable<SSTableReader> readers = View.select(SSTableSet.CANONICAL).apply(view);
-                    if (readers != null)
-                        return Iterables.filter(readers, ssTableReader -> ssTableReader.descriptor.generation <= buildStatus.left);
-                    return null;
-                }
-            };
-            lastToken = buildStatus.right;
-        }
-
-        prevToken = lastToken;
-        try (Refs<SSTableReader> sstables = baseCfs.selectAndReference(function).refs;
-             ReducingKeyIterator iter = new ReducingKeyIterator(sstables))
-        {
-            while (!isStopped && iter.hasNext())
-            {
-                DecoratedKey key = iter.next();
-                Token token = key.getToken();
-                // Skip keys up to and including the token recorded by the previous run
-                if (lastToken == null || lastToken.compareTo(token) < 0)
-                {
-                    for (Range<Token> range : ranges)
-                    {
-                        if (range.contains(token))
-                        {
-                            buildKey(key);
-
-                            if (prevToken == null || prevToken.compareTo(token) != 0)
-                            {
-                                SystemKeyspace.updateMaterializedViewBuildStatus(ksname, viewName, key.getToken());
-                                prevToken = token;
-                            }
-                        }
-                    }
-                    // The resume point has been passed; no further comparison is needed for the following keys
-                    lastToken = null;
-                }
-            }
-
-            // The loop also ends when stop() was called; in that case keys are still missing from the view, so
-            // the build must not be recorded as finished.
-            if (!isStopped)
-                SystemKeyspace.finishMaterializedViewBuildStatus(ksname, viewName);
-
-        }
-        catch (Exception e)
-        {
-            final MaterializedViewBuilder builder = new MaterializedViewBuilder(baseCfs, view);
-            ScheduledExecutors.nonPeriodicTasks.schedule(() -> CompactionManager.instance.submitMaterializedViewBuilder(builder),
-                                                         5,
-                                                         TimeUnit.MINUTES);
-            logger.warn("Materialized View failed to complete, sleeping 5 minutes before restarting", e);
-        }
-    }
-
-    /**
-     * @return progress of the build, expressed as a number of local ranges out of the total number of local ranges
-     */
-    public CompactionInfo getCompactionInfo()
-    {
-        long rangesLeft = 0, rangesTotal = 0;
-        Token lastToken = prevToken;
-
-        // This approximation is not very accurate, but since we do not have a method which allows us to calculate the
-        // percentage of a range covered by a second range, this is the best approximation that we can calculate.
-        // Instead, we just count the total number of ranges that haven't been seen by the node (we use the order of
-        // the tokens to determine whether they have been seen yet or not), and the total number of ranges that a node
-        // has.
-        for (Range<Token> range : StorageService.instance.getLocalRanges(baseCfs.keyspace.getName()))
-        {
-            rangesLeft++;
-            rangesTotal++;
-            // This will reset rangesLeft, so that the number of ranges left will be less than the total ranges at the
-            // end of the method.
-            if (lastToken == null || range.contains(lastToken))
-                rangesLeft = 0;
-        }
-        return new CompactionInfo(baseCfs.metadata, OperationType.VIEW_BUILD, rangesLeft, rangesTotal, "ranges", compactionId);
-    }
-
-    /**
-     * Asks {@link #run()} to stop before it moves on to the next key.
-     */
-    public void stop()
-    {
-        isStopped = true;
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24d185d7/src/java/org/apache/cassandra/db/view/MaterializedViewManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/MaterializedViewManager.java b/src/java/org/apache/cassandra/db/view/MaterializedViewManager.java
deleted file mode 100644
index 7f97728..0000000
--- a/src/java/org/apache/cassandra/db/view/MaterializedViewManager.java
+++ /dev/null
@@ -1,237 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.db.view;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentNavigableMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.locks.Lock;
-
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.Striped;
-
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.MaterializedViewDefinition;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.IMutation;
-import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.db.Mutation;
-import org.apache.cassandra.db.SystemKeyspace;
-import org.apache.cassandra.db.commitlog.ReplayPosition;
-import org.apache.cassandra.db.partitions.PartitionUpdate;
-import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.exceptions.OverloadedException;
-import org.apache.cassandra.exceptions.UnavailableException;
-import org.apache.cassandra.exceptions.WriteTimeoutException;
-import org.apache.cassandra.service.StorageProxy;
-import org.apache.cassandra.service.StorageService;
-
-/**
- * Manages the {@link MaterializedView}s of a single {@link ColumnFamilyStore}. The views of that table are created
- * from the table's metadata when this manager is initialized.
- *
- * The manager is the single place where updates are vetted to see whether they touch any view
- * ({@link MaterializedViewManager#updateAffectsView(PartitionUpdate)}), where the locks that keep concurrent
- * updates from producing incoherent view updates are handed out
- * ({@link MaterializedViewManager#acquireLockFor(ByteBuffer)}), and from which changes are pushed to the views.
- */
-public class MaterializedViewManager
-{
-    private static final Striped<Lock> LOCKS = Striped.lazyWeakLock(DatabaseDescriptor.getConcurrentWriters() * 1024);
-
-    private final ConcurrentNavigableMap<String, MaterializedView> viewsByName;
-
-    private final ColumnFamilyStore baseCfs;
-
-    public MaterializedViewManager(ColumnFamilyStore baseCfs)
-    {
-        this.viewsByName = new ConcurrentSkipListMap<>();
-
-        this.baseCfs = baseCfs;
-    }
-
-    /**
-     * @return the views currently registered with this manager
-     */
-    public Iterable<MaterializedView> allViews()
-    {
-        return viewsByName.values();
-    }
-
-    /**
-     * @return the column family stores backing the registered views
-     */
-    public Iterable<ColumnFamilyStore> allViewsCfs()
-    {
-        List<ColumnFamilyStore> stores = new ArrayList<>();
-        for (MaterializedView registered : allViews())
-            stores.add(registered.getViewCfs());
-        return stores;
-    }
-
-    public void init()
-    {
-        reload();
-    }
-
-    /**
-     * Unregisters every view of this manager.
-     */
-    public void invalidate()
-    {
-        for (MaterializedView registered : allViews())
-            removeMaterializedView(registered.name);
-    }
-
-    /**
-     * Brings the registered views in line with the definitions found in the base table's metadata: views without a
-     * definition are removed, missing ones are added, and every view is then built and given its definition.
-     */
-    public void reload()
-    {
-        Map<String, MaterializedViewDefinition> definitions = new HashMap<>();
-        for (MaterializedViewDefinition declared : baseCfs.metadata.getMaterializedViews())
-            definitions.put(declared.viewName, declared);
-
-        for (String registeredName : viewsByName.keySet())
-        {
-            if (definitions.containsKey(registeredName))
-                continue;
-            removeMaterializedView(registeredName);
-        }
-
-        for (Map.Entry<String, MaterializedViewDefinition> declared : definitions.entrySet())
-        {
-            if (viewsByName.containsKey(declared.getKey()))
-                continue;
-            addMaterializedView(declared.getValue());
-        }
-
-        for (MaterializedView registered : allViews())
-        {
-            registered.build();
-            // We provide the new definition from the base metadata
-            registered.updateDefinition(definitions.get(registered.name));
-        }
-    }
-
-    public void buildAllViews()
-    {
-        for (MaterializedView registered : allViews())
-            registered.build();
-    }
-
-    /**
-     * Unregisters the view called {@code name}; when it was registered, its removal is also recorded in the system
-     * keyspace.
-     */
-    public void removeMaterializedView(String name)
-    {
-        MaterializedView removed = viewsByName.remove(name);
-        if (removed != null)
-            SystemKeyspace.setMaterializedViewRemoved(baseCfs.metadata.ksName, removed.name);
-    }
-
-    /**
-     * Creates the view for {@code definition} and registers it under the definition's view name.
-     */
-    public void addMaterializedView(MaterializedViewDefinition definition)
-    {
-        MaterializedView created = new MaterializedView(definition, baseCfs);
-
-        viewsByName.put(definition.viewName, created);
-    }
-
-    /**
-     * Calculates and pushes updates to the views replicas. The replicas are determined by
-     * {@link MaterializedViewUtils#getViewNaturalEndpoint(String, Token, Token)}.
-     */
-    public void pushViewReplicaUpdates(ByteBuffer key, PartitionUpdate update) throws UnavailableException, OverloadedException, WriteTimeoutException
-    {
-        // This happens when we are replaying from commitlog. In that case, we have already sent this commit off to the
-        // view node.
-        if (!StorageService.instance.isJoined())
-            return;
-
-        // Only allocated once a view actually produces mutations
-        List<Mutation> collected = null;
-        for (MaterializedView registered : viewsByName.values())
-        {
-            Collection<Mutation> forView = registered.createMutations(key, update, false);
-            if (forView == null || forView.isEmpty())
-                continue;
-
-            if (collected == null)
-                collected = Lists.newLinkedList();
-            collected.addAll(forView);
-        }
-
-        if (collected != null)
-            StorageProxy.mutateMV(key, collected);
-    }
-
-    /**
-     * @return true if at least one registered view reports being affected by {@code upd}
-     */
-    public boolean updateAffectsView(PartitionUpdate upd)
-    {
-        for (MaterializedView registered : allViews())
-        {
-            if (registered.updateAffectsView(upd))
-                return true;
-        }
-        return false;
-    }
-
-    /**
-     * Tries, without blocking, to take the striped lock associated with {@code key}.
-     *
-     * @return the lock, held by the caller, or null if it could not be acquired immediately
-     */
-    public static Lock acquireLockFor(ByteBuffer key)
-    {
-        Lock candidate = LOCKS.get(key);
-        return candidate.tryLock() ? candidate : null;
-    }
-
-    /**
-     * @param ignoreRf1 when true, updates to keyspaces with a replication factor of 1 are not considered
-     * @return true if any partition update of {@code mutations} affects a view of its table
-     */
-    public static boolean updatesAffectView(Collection<? extends IMutation> mutations, boolean ignoreRf1)
-    {
-        for (IMutation mutation : mutations)
-        {
-            for (PartitionUpdate update : mutation.getPartitionUpdates())
-            {
-                Keyspace keyspace = Keyspace.open(update.metadata().ksName);
-
-                boolean skipped = ignoreRf1 && keyspace.getReplicationStrategy().getReplicationFactor() == 1;
-                if (!skipped && keyspace.getColumnFamilyStore(update.metadata().cfId).materializedViewManager.updateAffectsView(update))
-                    return true;
-            }
-        }
-
-        return false;
-    }
-
-
-    public void forceBlockingFlush()
-    {
-        for (ColumnFamilyStore store : allViewsCfs())
-            store.forceBlockingFlush();
-    }
-
-    public void dumpMemtables()
-    {
-        for (ColumnFamilyStore store : allViewsCfs())
-            store.dumpMemtable();
-    }
-
-    /**
-     * Discards the sstables of every view as of {@code truncatedAt} and saves a truncation record for each of them.
-     */
-    public void truncateBlocking(long truncatedAt)
-    {
-        for (ColumnFamilyStore store : allViewsCfs())
-        {
-            ReplayPosition replayAfter = store.discardSSTables(truncatedAt);
-            SystemKeyspace.saveTruncationRecord(store, truncatedAt, replayAfter);
-        }
-    }
-}