Posted to commits@cassandra.apache.org by al...@apache.org on 2015/09/16 16:35:16 UTC

[5/6] cassandra git commit: Improve MV schema representation

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3a8dbca/src/java/org/apache/cassandra/cql3/statements/CreateViewStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateViewStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateViewStatement.java
new file mode 100644
index 0000000..1a020ce
--- /dev/null
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateViewStatement.java
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cql3.statements;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import com.google.common.collect.Iterables;
+
+import org.apache.cassandra.auth.Permission;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.config.ViewDefinition;
+import org.apache.cassandra.cql3.CFName;
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.cql3.selection.RawSelector;
+import org.apache.cassandra.cql3.selection.Selectable;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.ReversedType;
+import org.apache.cassandra.db.view.View;
+import org.apache.cassandra.exceptions.AlreadyExistsException;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.exceptions.RequestValidationException;
+import org.apache.cassandra.exceptions.UnauthorizedException;
+import org.apache.cassandra.schema.TableParams;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.ClientWarn;
+import org.apache.cassandra.service.MigrationManager;
+import org.apache.cassandra.thrift.ThriftValidation;
+import org.apache.cassandra.transport.Event;
+
+public class CreateViewStatement extends SchemaAlteringStatement
+{
+    private final CFName baseName;
+    private final List<RawSelector> selectClause;
+    private final List<ColumnIdentifier.Raw> notNullWhereClause;
+    private final List<ColumnIdentifier.Raw> partitionKeys;
+    private final List<ColumnIdentifier.Raw> clusteringKeys;
+    public final CFProperties properties = new CFProperties();
+    private final boolean ifNotExists;
+
+    public CreateViewStatement(CFName viewName,
+                               CFName baseName,
+                               List<RawSelector> selectClause,
+                               List<ColumnIdentifier.Raw> notNullWhereClause,
+                               List<ColumnIdentifier.Raw> partitionKeys,
+                               List<ColumnIdentifier.Raw> clusteringKeys,
+                               boolean ifNotExists)
+    {
+        super(viewName);
+        this.baseName = baseName;
+        this.selectClause = selectClause;
+        this.notNullWhereClause = notNullWhereClause;
+        this.partitionKeys = partitionKeys;
+        this.clusteringKeys = clusteringKeys;
+        this.ifNotExists = ifNotExists;
+    }
+
+
+    public void checkAccess(ClientState state) throws UnauthorizedException, InvalidRequestException
+    {
+        if (!baseName.hasKeyspace())
+            baseName.setKeyspace(keyspace(), true);
+        state.hasColumnFamilyAccess(keyspace(), baseName.getColumnFamily(), Permission.ALTER);
+    }
+
+    public void validate(ClientState state) throws RequestValidationException
+    {
+        // Validation is done in announceMigration() so the same work is not done twice
+    }
+
+    private interface AddColumn {
+        void add(ColumnIdentifier identifier, AbstractType<?> type);
+    }
+
+    private void add(CFMetaData baseCfm, Iterable<ColumnIdentifier> columns, AddColumn adder)
+    {
+        for (ColumnIdentifier column : columns)
+        {
+            AbstractType<?> type = baseCfm.getColumnDefinition(column).type;
+            if (properties.definedOrdering.containsKey(column))
+            {
+                boolean desc = properties.definedOrdering.get(column);
+                if (!desc && type.isReversed())
+                {
+                    type = ((ReversedType)type).baseType;
+                }
+                else if (desc && !type.isReversed())
+                {
+                    type = ReversedType.getInstance(type);
+                }
+            }
+            adder.add(column, type);
+        }
+    }
+
+    public boolean announceMigration(boolean isLocalOnly) throws RequestValidationException
+    {
+        // We need to make sure that:
+        //  - primary key includes all columns in base table's primary key
+        //  - make sure that the select statement does not have anything other than columns
+        //    and their names match the base table's names
+        //  - make sure that primary key does not include any collections
+        //  - make sure there is no where clause in the select statement
+        //  - make sure a table or view with the same name does not already exist
+        //  - make sure baseTable gcGraceSeconds > 0
+
+        properties.validate();
+
+        if (properties.useCompactStorage)
+            throw new InvalidRequestException("Cannot use 'COMPACT STORAGE' when defining a materialized view");
+
+        // We require the view to be in the same keyspace as the base table: if the replication factors
+        // differed, the logic that waits for a specific replica would break
+        if (!baseName.getKeyspace().equals(keyspace()))
+            throw new InvalidRequestException("Cannot create a materialized view on a table in a separate keyspace");
+
+        CFMetaData cfm = ThriftValidation.validateColumnFamily(baseName.getKeyspace(), baseName.getColumnFamily());
+
+        if (cfm.isCounter())
+            throw new InvalidRequestException("Materialized views are not supported on counter tables");
+        if (cfm.isView())
+            throw new InvalidRequestException("Materialized views cannot be created against other materialized views");
+
+        if (cfm.params.gcGraceSeconds == 0)
+        {
+            throw new InvalidRequestException(String.format("Cannot create materialized view '%s' for base table " +
+                                                            "'%s' with gc_grace_seconds of 0, since this value is " +
+                                                            "used to TTL undelivered updates. Setting gc_grace_seconds" +
+                                                            " too low might cause undelivered updates to expire " +
+                                                            "before being replayed.", cfName.getColumnFamily(),
+                                                            baseName.getColumnFamily()));
+        }
+
+        Set<ColumnIdentifier> included = new HashSet<>();
+        for (RawSelector selector : selectClause)
+        {
+            Selectable.Raw selectable = selector.selectable;
+            if (selectable instanceof Selectable.WithFieldSelection.Raw)
+                throw new InvalidRequestException("Cannot select out a part of a type when defining a materialized view");
+            if (selectable instanceof Selectable.WithFunction.Raw)
+                throw new InvalidRequestException("Cannot use function when defining a materialized view");
+            if (selectable instanceof Selectable.WritetimeOrTTL.Raw)
+                throw new InvalidRequestException("Cannot use function when defining a materialized view");
+            ColumnIdentifier identifier = (ColumnIdentifier) selectable.prepare(cfm);
+            if (selector.alias != null)
+                throw new InvalidRequestException(String.format("Cannot alias column '%s' as '%s' when defining a materialized view", identifier.toString(), selector.alias.toString()));
+
+            ColumnDefinition cdef = cfm.getColumnDefinition(identifier);
+
+            if (cdef == null)
+                throw new InvalidRequestException("Unknown column name detected in CREATE MATERIALIZED VIEW statement: " + identifier);
+
+            if (cdef.isStatic())
+                ClientWarn.warn(String.format("Unable to include static column '%s' in Materialized View SELECT statement", identifier));
+            else
+                included.add(identifier);
+        }
+
+        Set<ColumnIdentifier.Raw> targetPrimaryKeys = new HashSet<>();
+        for (ColumnIdentifier.Raw identifier : Iterables.concat(partitionKeys, clusteringKeys))
+        {
+            if (!targetPrimaryKeys.add(identifier))
+                throw new InvalidRequestException("Duplicate entry found in PRIMARY KEY: "+identifier);
+
+            ColumnDefinition cdef = cfm.getColumnDefinition(identifier.prepare(cfm));
+
+            if (cdef == null)
+                throw new InvalidRequestException("Unknown column name detected in CREATE MATERIALIZED VIEW statement: " + identifier);
+
+            if (cdef.type.isMultiCell())
+                throw new InvalidRequestException(String.format("Cannot use MultiCell column '%s' in PRIMARY KEY of materialized view", identifier));
+
+            if (cdef.isStatic())
+                throw new InvalidRequestException(String.format("Cannot use Static column '%s' in PRIMARY KEY of materialized view", identifier));
+        }
+
+        Set<ColumnIdentifier> basePrimaryKeyCols = new HashSet<>();
+        for (ColumnDefinition definition : Iterables.concat(cfm.partitionKeyColumns(), cfm.clusteringColumns()))
+            basePrimaryKeyCols.add(definition.name);
+
+        List<ColumnIdentifier> targetClusteringColumns = new ArrayList<>();
+        List<ColumnIdentifier> targetPartitionKeys = new ArrayList<>();
+        Set<ColumnIdentifier> notNullColumns = new HashSet<>();
+        if (notNullWhereClause != null)
+        {
+            for (ColumnIdentifier.Raw raw : notNullWhereClause)
+            {
+                notNullColumns.add(raw.prepare(cfm));
+            }
+        }
+
+        // Intermediate state used to detect whether more than one non-primary-key column is used in the view's key
+        boolean hasNonPKColumn = false;
+        for (ColumnIdentifier.Raw raw : partitionKeys)
+        {
+            hasNonPKColumn = getColumnIdentifier(cfm, basePrimaryKeyCols, hasNonPKColumn, raw, targetPartitionKeys, notNullColumns);
+        }
+
+        for (ColumnIdentifier.Raw raw : clusteringKeys)
+        {
+            hasNonPKColumn = getColumnIdentifier(cfm, basePrimaryKeyCols, hasNonPKColumn, raw, targetClusteringColumns, notNullColumns);
+        }
+
+        // We need to include all of the primary key columns from the base table to make sure that we do not
+        // overwrite values in the view. We cannot support "collapsing" the base table into a smaller number of rows in
+        // the view, because when a tombstone has to be generated we would have no way of knowing which value is
+        // currently used in the view. To avoid surprising our users, we require that they include all of the base
+        // primary key columns, and we report the columns that are still missing.
+        boolean missingClusteringColumns = false;
+        StringBuilder columnNames = new StringBuilder();
+        List<ColumnIdentifier> includedColumns = new ArrayList<>();
+        for (ColumnDefinition def : cfm.allColumns())
+        {
+            ColumnIdentifier identifier = def.name;
+
+            if ((included.isEmpty() || included.contains(identifier))
+                && !targetClusteringColumns.contains(identifier) && !targetPartitionKeys.contains(identifier)
+                && !def.isStatic())
+            {
+                includedColumns.add(identifier);
+            }
+            if (!def.isPrimaryKeyColumn()) continue;
+
+            if (!targetClusteringColumns.contains(identifier) && !targetPartitionKeys.contains(identifier))
+            {
+                if (missingClusteringColumns)
+                    columnNames.append(',');
+                else
+                    missingClusteringColumns = true;
+                columnNames.append(identifier);
+            }
+        }
+        if (missingClusteringColumns)
+            throw new InvalidRequestException(String.format("Cannot create Materialized View %s without primary key columns from base %s (%s)",
+                                                            columnFamily(), baseName.getColumnFamily(), columnNames.toString()));
+
+        if (targetPartitionKeys.isEmpty())
+            throw new InvalidRequestException("Must select at least one column for a Materialized View");
+
+        if (targetClusteringColumns.isEmpty())
+            throw new InvalidRequestException("No columns are defined for the Materialized View other than the primary key");
+
+        CFMetaData.Builder cfmBuilder = CFMetaData.Builder.createView(keyspace(), columnFamily());
+        add(cfm, targetPartitionKeys, cfmBuilder::addPartitionKey);
+        add(cfm, targetClusteringColumns, cfmBuilder::addClusteringColumn);
+        add(cfm, includedColumns, cfmBuilder::addRegularColumn);
+        TableParams params = properties.properties.asNewTableParams();
+        CFMetaData viewCfm = cfmBuilder.build().params(params);
+        ViewDefinition definition = new ViewDefinition(keyspace(),
+                                                       columnFamily(),
+                                                       Schema.instance.getId(keyspace(), baseName.getColumnFamily()),
+                                                       included.isEmpty(),
+                                                       viewCfm);
+
+        try
+        {
+            MigrationManager.announceNewView(definition, isLocalOnly);
+        }
+        catch (AlreadyExistsException e)
+        {
+            if (ifNotExists)
+                return false;
+            throw e;
+        }
+
+        return true;
+    }
+
+    private static boolean getColumnIdentifier(CFMetaData cfm,
+                                               Set<ColumnIdentifier> basePK,
+                                               boolean hasNonPKColumn,
+                                               ColumnIdentifier.Raw raw,
+                                               List<ColumnIdentifier> columns,
+                                               Set<ColumnIdentifier> allowedPKColumns)
+    {
+        ColumnIdentifier identifier = raw.prepare(cfm);
+
+        boolean isPk = basePK.contains(identifier);
+        if (!isPk && hasNonPKColumn)
+        {
+            throw new InvalidRequestException(String.format("Cannot include more than one non-primary key column '%s' in materialized view partition key", identifier));
+        }
+
+        // We don't need to include the "IS NOT NULL" filter on a non-composite partition key
+        // because we will never allow a single partition key to be NULL
+        boolean isSinglePartitionKey = cfm.getColumnDefinition(identifier).isPartitionKey()
+                                       && cfm.partitionKeyColumns().size() == 1;
+        if (!allowedPKColumns.remove(identifier) && !isSinglePartitionKey)
+        {
+            throw new InvalidRequestException(String.format("Primary key column '%s' is required to be filtered by 'IS NOT NULL'", identifier));
+        }
+
+        columns.add(identifier);
+        return !isPk;
+    }
+
+    public Event.SchemaChange changeEvent()
+    {
+        return new Event.SchemaChange(Event.SchemaChange.Change.CREATED, Event.SchemaChange.Target.TABLE, keyspace(), columnFamily());
+    }
+}
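
A minimal CQL sketch, using hypothetical keyspace/table/column names, of a view definition that
satisfies the checks implemented above: every base primary key column appears in the view's
primary key and carries an IS NOT NULL restriction, at most one non-primary-key column (score)
is promoted into the view key, and no functions, aliases or collection columns are selected.

    CREATE TABLE ks.scores (player text, game text, score int, PRIMARY KEY (player, game));

    CREATE MATERIALIZED VIEW ks.scores_by_game AS
        SELECT player, game, score FROM ks.scores
        WHERE game IS NOT NULL AND player IS NOT NULL AND score IS NOT NULL
        PRIMARY KEY (game, score, player);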

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3a8dbca/src/java/org/apache/cassandra/cql3/statements/DropMaterializedViewStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/DropMaterializedViewStatement.java b/src/java/org/apache/cassandra/cql3/statements/DropMaterializedViewStatement.java
deleted file mode 100644
index 8adba45..0000000
--- a/src/java/org/apache/cassandra/cql3/statements/DropMaterializedViewStatement.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.cql3.statements;
-
-import org.apache.cassandra.auth.Permission;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.cql3.CFName;
-import org.apache.cassandra.db.view.MaterializedView;
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.exceptions.InvalidRequestException;
-import org.apache.cassandra.exceptions.UnauthorizedException;
-import org.apache.cassandra.service.ClientState;
-import org.apache.cassandra.service.MigrationManager;
-import org.apache.cassandra.transport.Event;
-
-public class DropMaterializedViewStatement extends SchemaAlteringStatement
-{
-    public final boolean ifExists;
-
-    public DropMaterializedViewStatement(CFName cf, boolean ifExists)
-    {
-        super(cf);
-        this.ifExists = ifExists;
-    }
-
-    public void checkAccess(ClientState state) throws UnauthorizedException, InvalidRequestException
-    {
-        CFMetaData baseTable = MaterializedView.findBaseTable(keyspace(), columnFamily());
-        if (baseTable != null)
-            state.hasColumnFamilyAccess(keyspace(), baseTable.cfName, Permission.ALTER);
-    }
-
-    public void validate(ClientState state)
-    {
-        // validated in findIndexedCf()
-    }
-
-    public Event.SchemaChange changeEvent()
-    {
-        return new Event.SchemaChange(Event.SchemaChange.Change.DROPPED, Event.SchemaChange.Target.TABLE, keyspace(), columnFamily());
-    }
-
-    public boolean announceMigration(boolean isLocalOnly) throws InvalidRequestException, ConfigurationException
-    {
-        try
-        {
-            CFMetaData viewCfm = Schema.instance.getCFMetaData(keyspace(), columnFamily());
-            if (viewCfm == null)
-                throw new ConfigurationException(String.format("Cannot drop non existing materialized view '%s' in keyspace '%s'.", columnFamily(), keyspace()));
-            if (!viewCfm.isMaterializedView())
-                throw new ConfigurationException(String.format("Cannot drop non materialized view '%s' in keyspace '%s'", columnFamily(), keyspace()));
-
-            CFMetaData baseCfm = MaterializedView.findBaseTable(keyspace(), columnFamily());
-            if (baseCfm == null)
-            {
-                if (ifExists)
-                    throw new ConfigurationException(String.format("Cannot drop materialized view '%s' in keyspace '%s' without base CF.", columnFamily(), keyspace()));
-                else
-                    throw new InvalidRequestException(String.format("View '%s' could not be found in any of the tables of keyspace '%s'", cfName, keyspace()));
-            }
-
-            CFMetaData updatedCfm = baseCfm.copy();
-            updatedCfm.materializedViews(updatedCfm.getMaterializedViews().without(columnFamily()));
-            MigrationManager.announceColumnFamilyUpdate(updatedCfm, false, isLocalOnly);
-            MigrationManager.announceColumnFamilyDrop(keyspace(), columnFamily(), isLocalOnly);
-            return true;
-        }
-        catch (ConfigurationException e)
-        {
-            if (ifExists)
-                return false;
-            throw e;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3a8dbca/src/java/org/apache/cassandra/cql3/statements/DropTableStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/DropTableStatement.java b/src/java/org/apache/cassandra/cql3/statements/DropTableStatement.java
index 35dc947..14d89d9 100644
--- a/src/java/org/apache/cassandra/cql3/statements/DropTableStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/DropTableStatement.java
@@ -19,12 +19,13 @@ package org.apache.cassandra.cql3.statements;
 
 import org.apache.cassandra.auth.Permission;
 import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.MaterializedViewDefinition;
+import org.apache.cassandra.config.ViewDefinition;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.cql3.CFName;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.exceptions.UnauthorizedException;
+import org.apache.cassandra.schema.KeyspaceMetadata;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.MigrationManager;
 import org.apache.cassandra.transport.Event;
@@ -61,20 +62,24 @@ public class DropTableStatement extends SchemaAlteringStatement
     {
         try
         {
-            CFMetaData cfm = Schema.instance.getCFMetaData(keyspace(), columnFamily());
+            KeyspaceMetadata ksm = Schema.instance.getKSMetaData(keyspace());
+            CFMetaData cfm = ksm.tables.getNullable(columnFamily());
             if (cfm != null)
             {
-                if (cfm.isMaterializedView())
+                if (cfm.isView())
                     throw new InvalidRequestException("Cannot use DROP TABLE on Materialized View");
 
                 boolean rejectDrop = false;
                 StringBuilder messageBuilder = new StringBuilder();
-                for (MaterializedViewDefinition def : cfm.getMaterializedViews())
+                for (ViewDefinition def : ksm.views)
                 {
-                    if (rejectDrop)
-                        messageBuilder.append(',');
-                    rejectDrop = true;
-                    messageBuilder.append(def.viewName);
+                    if (def.baseTableId.equals(cfm.cfId))
+                    {
+                        if (rejectDrop)
+                            messageBuilder.append(',');
+                        rejectDrop = true;
+                        messageBuilder.append(def.viewName);
+                    }
                 }
                 if (rejectDrop)
                 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3a8dbca/src/java/org/apache/cassandra/cql3/statements/DropTypeStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/DropTypeStatement.java b/src/java/org/apache/cassandra/cql3/statements/DropTypeStatement.java
index 75f6200..74c8c36 100644
--- a/src/java/org/apache/cassandra/cql3/statements/DropTypeStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/DropTypeStatement.java
@@ -87,7 +87,7 @@ public class DropTypeStatement extends SchemaAlteringStatement
             if (!ut.name.equals(name.getUserTypeName()) && isUsedBy(ut))
                 throw new InvalidRequestException(String.format("Cannot drop user type %s as it is still used by user type %s", name, ut.asCQL3Type()));
 
-        for (CFMetaData cfm : ksm.tables)
+        for (CFMetaData cfm : ksm.tablesAndViews())
             for (ColumnDefinition def : cfm.allColumns())
                 if (isUsedBy(def.type))
                     throw new InvalidRequestException(String.format("Cannot drop user type %s as it is still used by table %s.%s", name, cfm.ksName, cfm.cfName));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3a8dbca/src/java/org/apache/cassandra/cql3/statements/DropViewStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/DropViewStatement.java b/src/java/org/apache/cassandra/cql3/statements/DropViewStatement.java
new file mode 100644
index 0000000..f2be370
--- /dev/null
+++ b/src/java/org/apache/cassandra/cql3/statements/DropViewStatement.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cql3.statements;
+
+import org.apache.cassandra.auth.Permission;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.config.ViewDefinition;
+import org.apache.cassandra.cql3.CFName;
+import org.apache.cassandra.db.view.View;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.exceptions.UnauthorizedException;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.MigrationManager;
+import org.apache.cassandra.transport.Event;
+
+public class DropViewStatement extends SchemaAlteringStatement
+{
+    public final boolean ifExists;
+
+    public DropViewStatement(CFName cf, boolean ifExists)
+    {
+        super(cf);
+        this.ifExists = ifExists;
+    }
+
+    public void checkAccess(ClientState state) throws UnauthorizedException, InvalidRequestException
+    {
+        CFMetaData baseTable = View.findBaseTable(keyspace(), columnFamily());
+        if (baseTable != null)
+            state.hasColumnFamilyAccess(keyspace(), baseTable.cfName, Permission.ALTER);
+    }
+
+    public void validate(ClientState state)
+    {
+        // validated in findIndexedCf()
+    }
+
+    public Event.SchemaChange changeEvent()
+    {
+        return new Event.SchemaChange(Event.SchemaChange.Change.DROPPED, Event.SchemaChange.Target.TABLE, keyspace(), columnFamily());
+    }
+
+    public boolean announceMigration(boolean isLocalOnly) throws InvalidRequestException, ConfigurationException
+    {
+        try
+        {
+//            ViewDefinition view = Schema.instance.getViewDefinition(keyspace(), columnFamily());
+//            if (view == null)
+//            {
+//                if (Schema.instance.getCFMetaData(keyspace(), columnFamily()) != null)
+//                    throw new ConfigurationException(String.format("Cannot drop table '%s' in keyspace '%s'.", columnFamily(), keyspace()));
+//
+//                throw new ConfigurationException(String.format("Cannot drop non existing materialized view '%s' in keyspace '%s'.", columnFamily(), keyspace()));
+//            }
+//
+//            CFMetaData baseCfm = Schema.instance.getCFMetaData(view.baseTableId);
+//            if (baseCfm == null)
+//            {
+//                if (ifExists)
+//                    throw new ConfigurationException(String.format("Cannot drop materialized view '%s' in keyspace '%s' without base CF.", columnFamily(), keyspace()));
+//                else
+//                    throw new InvalidRequestException(String.format("View '%s' could not be found in any of the tables of keyspace '%s'", cfName, keyspace()));
+//            }
+
+            MigrationManager.announceViewDrop(keyspace(), columnFamily(), isLocalOnly);
+            return true;
+        }
+        catch (ConfigurationException e)
+        {
+            if (ifExists)
+                return false;
+            throw e;
+        }
+    }
+}
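
Taken together with the DropTableStatement change above, dropping a base table is rejected while
views still reference it; the view has to be dropped first through this statement. A short CQL
sketch using the same hypothetical names:

    DROP MATERIALIZED VIEW IF EXISTS ks.scores_by_game;   -- handled by DropViewStatement
    DROP TABLE ks.scores;                                 -- only allowed once no views reference the table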

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3a8dbca/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 3855b6a..a04af4c 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -21,13 +21,13 @@ import java.nio.ByteBuffer;
 import java.util.*;
 
 import com.google.common.collect.Iterables;
-
-import static org.apache.cassandra.cql3.statements.RequestValidations.checkNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.auth.Permission;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.config.MaterializedViewDefinition;
+import org.apache.cassandra.config.ViewDefinition;
 import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.cql3.ColumnIdentifier.Raw;
 import org.apache.cassandra.cql3.functions.Function;
@@ -38,10 +38,8 @@ import org.apache.cassandra.db.filter.*;
 import org.apache.cassandra.db.marshal.BooleanType;
 import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.db.rows.RowIterator;
-import org.apache.cassandra.exceptions.InvalidRequestException;
-import org.apache.cassandra.exceptions.RequestExecutionException;
-import org.apache.cassandra.exceptions.RequestValidationException;
-import org.apache.cassandra.exceptions.UnauthorizedException;
+import org.apache.cassandra.db.view.View;
+import org.apache.cassandra.exceptions.*;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.service.StorageProxy;
@@ -52,9 +50,9 @@ import org.apache.cassandra.triggers.TriggerExecutor;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.UUIDGen;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
 import static org.apache.cassandra.cql3.statements.RequestValidations.checkFalse;
+import static org.apache.cassandra.cql3.statements.RequestValidations.checkNull;
 import static org.apache.cassandra.cql3.statements.RequestValidations.checkNotNull;
 
 /*
@@ -170,14 +168,9 @@ public abstract class ModificationStatement implements CQLStatement
         return cfm.isCounter();
     }
 
-    public boolean isMaterializedView()
-    {
-        return cfm.isMaterializedView();
-    }
-
-    public boolean hasMaterializedViews()
+    public boolean isView()
     {
-        return !cfm.getMaterializedViews().isEmpty();
+        return cfm.isView();
     }
 
     public long getTimestamp(long now, QueryOptions options) throws InvalidRequestException
@@ -203,13 +196,16 @@ public abstract class ModificationStatement implements CQLStatement
         if (hasConditions())
             state.hasColumnFamilyAccess(keyspace(), columnFamily(), Permission.SELECT);
 
-        // MV updates need to get the current state from the table, and might update the materialized views
+        // MV updates need to get the current state from the table, and might update the views
         // Require Permission.SELECT on the base table, and Permission.MODIFY on the views
-        if (hasMaterializedViews())
+        Iterator<ViewDefinition> views = View.findAll(keyspace(), columnFamily()).iterator();
+        if (views.hasNext())
         {
             state.hasColumnFamilyAccess(keyspace(), columnFamily(), Permission.SELECT);
-            for (MaterializedViewDefinition view : cfm.getMaterializedViews())
-                state.hasColumnFamilyAccess(keyspace(), view.viewName, Permission.MODIFY);
+            do
+            {
+                state.hasColumnFamilyAccess(keyspace(), views.next().viewName, Permission.MODIFY);
+            } while (views.hasNext());
         }
 
         for (Function function : getFunctions())
@@ -221,7 +217,7 @@ public abstract class ModificationStatement implements CQLStatement
         checkFalse(hasConditions() && attrs.isTimestampSet(), "Cannot provide custom timestamp for conditional updates");
         checkFalse(isCounter() && attrs.isTimestampSet(), "Cannot provide custom timestamp for counter updates");
         checkFalse(isCounter() && attrs.isTimeToLiveSet(), "Cannot provide custom TTL for counter updates");
-        checkFalse(isMaterializedView(), "Cannot directly modify a materialized view");
+        checkFalse(isView(), "Cannot directly modify a materialized view");
     }
 
     public PartitionColumns updatedColumns()
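
The checkFalse(isView(), ...) guard above keeps all writes flowing through the base table; a
direct write against a view is rejected. Illustrative CQL with hypothetical names:

    INSERT INTO ks.scores_by_game (game, score, player) VALUES ('chess', 10, 'alice');  -- rejected: cannot directly modify a materialized view
    INSERT INTO ks.scores (player, game, score) VALUES ('alice', 'chess', 10);          -- view rows are maintained from base-table writes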

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3a8dbca/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index 7ad6c09..18e402b 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -44,7 +44,7 @@ import org.apache.cassandra.db.partitions.PartitionIterator;
 import org.apache.cassandra.db.rows.ComplexColumnData;
 import org.apache.cassandra.db.rows.Row;
 import org.apache.cassandra.db.rows.RowIterator;
-import org.apache.cassandra.db.view.MaterializedView;
+import org.apache.cassandra.db.view.View;
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.exceptions.*;
 import org.apache.cassandra.index.Index;
@@ -175,9 +175,9 @@ public class SelectStatement implements CQLStatement
 
     public void checkAccess(ClientState state) throws InvalidRequestException, UnauthorizedException
     {
-        if (cfm.isMaterializedView())
+        if (cfm.isView())
         {
-            CFMetaData baseTable = MaterializedView.findBaseTable(keyspace(), columnFamily());
+            CFMetaData baseTable = View.findBaseTable(keyspace(), columnFamily());
             if (baseTable != null)
                 state.hasColumnFamilyAccess(keyspace(), baseTable.cfName, Permission.SELECT);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3a8dbca/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java b/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
index 5dd306a..66b3da0 100644
--- a/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
@@ -65,7 +65,7 @@ public class TruncateStatement extends CFStatement implements CQLStatement
         try
         {
             CFMetaData metaData = Schema.instance.getCFMetaData(keyspace(), columnFamily());
-            if (metaData.isMaterializedView())
+            if (metaData.isView())
                 throw new InvalidRequestException("Cannot TRUNCATE materialized view directly; must truncate base table instead");
 
             StorageProxy.truncateBlocking(keyspace(), columnFamily());
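
The TRUNCATE behaviour is unchanged apart from the rename: it must target the base table, and the
ColumnFamilyStore changes below flush and truncate the dependent views along with it. In CQL terms
(hypothetical names):

    TRUNCATE ks.scores;            -- allowed; dependent views are truncated with the base table
    TRUNCATE ks.scores_by_game;    -- rejected: truncate the base table instead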

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3a8dbca/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index c7d8926..0d6d801 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -48,11 +48,11 @@ import org.apache.cassandra.db.commitlog.ReplayPosition;
 import org.apache.cassandra.db.compaction.*;
 import org.apache.cassandra.db.filter.ClusteringIndexFilter;
 import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.view.ViewManager;
 import org.apache.cassandra.db.lifecycle.*;
 import org.apache.cassandra.db.partitions.CachedPartition;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.db.rows.CellPath;
-import org.apache.cassandra.db.view.MaterializedViewManager;
 import org.apache.cassandra.dht.*;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.exceptions.ConfigurationException;
@@ -193,7 +193,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     private final AtomicInteger fileIndexGenerator = new AtomicInteger(0);
 
     public final SecondaryIndexManager indexManager;
-    public final MaterializedViewManager materializedViewManager;
+    public final ViewManager.ForStore viewManager;
 
     /* These are locally held copies to be changed from the config during runtime */
     private volatile DefaultInteger minCompactionThreshold;
@@ -231,7 +231,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
         indexManager.reload();
 
-        materializedViewManager.reload();
         // If the CF comparator has changed, we need to change the memtable,
         // because the old one still aliases the previous comparator.
         if (data.getView().getCurrentMemtable().initialComparator != metadata.comparator)
@@ -377,7 +376,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         this.maxCompactionThreshold = new DefaultInteger(metadata.params.compaction.maxCompactionThreshold());
         this.directories = directories;
         this.indexManager = new SecondaryIndexManager(this);
-        this.materializedViewManager = new MaterializedViewManager(this);
+        this.viewManager = keyspace.viewManager.forTable(metadata.cfId);
         this.metric = new TableMetrics(this);
         fileIndexGenerator.set(generation);
         sampleLatencyNanos = DatabaseDescriptor.getReadRpcTimeout() / 2;
@@ -513,7 +512,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         data.dropSSTables();
         LifecycleTransaction.waitForDeletions();
         indexManager.invalidateAllIndexesBlocking();
-        materializedViewManager.invalidate();
 
         invalidateCaches();
     }
@@ -633,8 +631,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     // must be called after all sstables are loaded since row cache merges all row versions
     public void init()
     {
-        materializedViewManager.init();
-
         if (!isRowCacheEnabled())
             return;
 
@@ -1912,7 +1908,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             // flush the CF being truncated before forcing the new segment
             forceBlockingFlush();
 
-            materializedViewManager.forceBlockingFlush();
+            viewManager.forceBlockingFlush();
 
             // sleep a little to make sure that our truncatedAt comes after any sstable
             // that was part of the flushed we forced; otherwise on a tie, it won't get deleted.
@@ -1921,7 +1917,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         else
         {
             dumpMemtable();
-            materializedViewManager.dumpMemtables();
+            viewManager.dumpMemtables();
         }
 
         Runnable truncateRunnable = new Runnable()
@@ -1940,7 +1936,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
                 indexManager.truncateAllIndexesBlocking(truncatedAt);
 
-                materializedViewManager.truncateBlocking(truncatedAt);
+                viewManager.truncateBlocking(truncatedAt);
 
                 SystemKeyspace.saveTruncationRecord(ColumnFamilyStore.this, truncatedAt, replayAfter);
                 logger.debug("cleaning out row cache");
@@ -1974,7 +1970,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             logger.debug("Cancelling in-progress compactions for {}", metadata.cfName);
 
             Iterable<ColumnFamilyStore> selfWithAuxiliaryCfs = interruptViews
-                                                               ? Iterables.concat(concatWithIndexes(), materializedViewManager.allViewsCfs())
+                                                               ? Iterables.concat(concatWithIndexes(), viewManager.allViewsCfs())
                                                                : concatWithIndexes();
 
             for (ColumnFamilyStore cfs : selfWithAuxiliaryCfs)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3a8dbca/src/java/org/apache/cassandra/db/Keyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java
index 4661bae..1169b45 100644
--- a/src/java/org/apache/cassandra/db/Keyspace.java
+++ b/src/java/org/apache/cassandra/db/Keyspace.java
@@ -40,7 +40,7 @@ import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.lifecycle.SSTableSet;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
-import org.apache.cassandra.db.view.MaterializedViewManager;
+import org.apache.cassandra.db.view.ViewManager;
 import org.apache.cassandra.exceptions.WriteTimeoutException;
 import org.apache.cassandra.index.Index;
 import org.apache.cassandra.index.SecondaryIndexManager;
@@ -81,12 +81,13 @@ public class Keyspace
     private volatile KeyspaceMetadata metadata;
 
     //OpOrder is defined globally since we need to order writes across
-    //Keyspaces in the case of MaterializedViews (batchlog of MV mutations)
+    //Keyspaces in the case of Views (batchlog of view mutations)
     public static final OpOrder writeOrder = new OpOrder();
 
     /* ColumnFamilyStore per column family */
     private final ConcurrentMap<UUID, ColumnFamilyStore> columnFamilyStores = new ConcurrentHashMap<>();
     private volatile AbstractReplicationStrategy replicationStrategy;
+    public final ViewManager viewManager;
 
     public static final Function<String,Keyspace> keyspaceTransformer = new Function<String, Keyspace>()
     {
@@ -305,11 +306,13 @@ public class Keyspace
         createReplicationStrategy(metadata);
 
         this.metric = new KeyspaceMetrics(this);
-        for (CFMetaData cfm : metadata.tables)
+        this.viewManager = new ViewManager(this);
+        for (CFMetaData cfm : metadata.tablesAndViews())
         {
             logger.debug("Initializing {}.{}", getName(), cfm.cfName);
             initCf(cfm.cfId, cfm.cfName, loadSSTables);
         }
+        this.viewManager.reload();
     }
 
     private Keyspace(KeyspaceMetadata metadata)
@@ -317,6 +320,7 @@ public class Keyspace
         this.metadata = metadata;
         createReplicationStrategy(metadata);
         this.metric = new KeyspaceMetrics(this);
+        this.viewManager = new ViewManager(this);
     }
 
     public static Keyspace mockKS(KeyspaceMetadata metadata)
@@ -418,11 +422,11 @@ public class Keyspace
             throw new RuntimeException("Testing write failures");
 
         Lock lock = null;
-        boolean requiresViewUpdate = updateIndexes && MaterializedViewManager.updatesAffectView(Collections.singleton(mutation), false);
+        boolean requiresViewUpdate = updateIndexes && viewManager.updatesAffectView(Collections.singleton(mutation), false);
 
         if (requiresViewUpdate)
         {
-            lock = MaterializedViewManager.acquireLockFor(mutation.key().getKey());
+            lock = ViewManager.acquireLockFor(mutation.key().getKey());
 
             if (lock == null)
             {
@@ -430,11 +434,11 @@ public class Keyspace
                 {
                     logger.debug("Could not acquire lock for {}", ByteBufferUtil.bytesToHex(mutation.key().getKey()));
                     Tracing.trace("Could not acquire MV lock");
-                    throw new WriteTimeoutException(WriteType.MATERIALIZED_VIEW, ConsistencyLevel.LOCAL_ONE, 0, 1);
+                    throw new WriteTimeoutException(WriteType.VIEW, ConsistencyLevel.LOCAL_ONE, 0, 1);
                 }
                 else
                 {
-                    //This MV update can't happen right now. so rather than keep this thread busy
+                    // This view update can't happen right now, so rather than keep this thread busy
                     // we will re-apply ourself to the queue and try again later
                     StageManager.getStage(Stage.MUTATION).execute(() -> {
                         if (writeCommitLog)
@@ -472,7 +476,7 @@ public class Keyspace
                     try
                     {
                         Tracing.trace("Creating materialized view mutations from base table replica");
-                        cfs.materializedViewManager.pushViewReplicaUpdates(upd, !isClReplay);
+                        viewManager.pushViewReplicaUpdates(upd, !isClReplay);
                     }
                     catch (Throwable t)
                     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3a8dbca/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index d54ee8b..6b2585e 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -95,8 +95,8 @@ public final class SystemKeyspace
     public static final String SSTABLE_ACTIVITY = "sstable_activity";
     public static final String SIZE_ESTIMATES = "size_estimates";
     public static final String AVAILABLE_RANGES = "available_ranges";
-    public static final String MATERIALIZED_VIEWS_BUILDS_IN_PROGRESS = "materialized_views_builds_in_progress";
-    public static final String BUILT_MATERIALIZED_VIEWS = "built_materialized_views";
+    public static final String VIEWS_BUILDS_IN_PROGRESS = "views_builds_in_progress";
+    public static final String BUILT_VIEWS = "built_views";
 
     @Deprecated public static final String LEGACY_HINTS = "hints";
     @Deprecated public static final String LEGACY_BATCHLOG = "batchlog";
@@ -246,9 +246,9 @@ public final class SystemKeyspace
                 + "ranges set<blob>,"
                 + "PRIMARY KEY ((keyspace_name)))");
 
-    private static final CFMetaData MaterializedViewsBuildsInProgress =
-        compile(MATERIALIZED_VIEWS_BUILDS_IN_PROGRESS,
-                "materialized views builds current progress",
+    private static final CFMetaData ViewsBuildsInProgress =
+        compile(VIEWS_BUILDS_IN_PROGRESS,
+                "views builds current progress",
                 "CREATE TABLE %s ("
                 + "keyspace_name text,"
                 + "view_name text,"
@@ -256,9 +256,9 @@ public final class SystemKeyspace
                 + "generation_number int,"
                 + "PRIMARY KEY ((keyspace_name), view_name))");
 
-    private static final CFMetaData BuiltMaterializedViews =
-        compile(BUILT_MATERIALIZED_VIEWS,
-                "built materialized views",
+    private static final CFMetaData BuiltViews =
+        compile(BUILT_VIEWS,
+                "built views",
                 "CREATE TABLE %s ("
                 + "keyspace_name text,"
                 + "view_name text,"
@@ -414,7 +414,7 @@ public final class SystemKeyspace
 
     public static KeyspaceMetadata metadata()
     {
-        return KeyspaceMetadata.create(NAME, KeyspaceParams.local(), tables(), Types.none(), functions());
+        return KeyspaceMetadata.create(NAME, KeyspaceParams.local(), tables(), Views.none(), Types.none(), functions());
     }
 
     private static Tables tables()
@@ -430,8 +430,8 @@ public final class SystemKeyspace
                          SSTableActivity,
                          SizeEstimates,
                          AvailableRanges,
-                         MaterializedViewsBuildsInProgress,
-                         BuiltMaterializedViews,
+                         ViewsBuildsInProgress,
+                         BuiltViews,
                          LegacyHints,
                          LegacyBatchlog,
                          LegacyKeyspaces,
@@ -531,61 +531,61 @@ public final class SystemKeyspace
     public static boolean isViewBuilt(String keyspaceName, String viewName)
     {
         String req = "SELECT view_name FROM %s.\"%s\" WHERE keyspace_name=? AND view_name=?";
-        UntypedResultSet result = executeInternal(String.format(req, NAME, BUILT_MATERIALIZED_VIEWS), keyspaceName, viewName);
+        UntypedResultSet result = executeInternal(String.format(req, NAME, BUILT_VIEWS), keyspaceName, viewName);
         return !result.isEmpty();
     }
 
-    public static void setMaterializedViewBuilt(String keyspaceName, String viewName)
+    public static void setViewBuilt(String keyspaceName, String viewName)
     {
         String req = "INSERT INTO %s.\"%s\" (keyspace_name, view_name) VALUES (?, ?)";
-        executeInternal(String.format(req, NAME, BUILT_MATERIALIZED_VIEWS), keyspaceName, viewName);
-        forceBlockingFlush(BUILT_MATERIALIZED_VIEWS);
+        executeInternal(String.format(req, NAME, BUILT_VIEWS), keyspaceName, viewName);
+        forceBlockingFlush(BUILT_VIEWS);
     }
 
 
-    public static void setMaterializedViewRemoved(String keyspaceName, String viewName)
+    public static void setViewRemoved(String keyspaceName, String viewName)
     {
         String buildReq = "DELETE FROM %S.%s WHERE keyspace_name = ? AND view_name = ?";
-        executeInternal(String.format(buildReq, NAME, MATERIALIZED_VIEWS_BUILDS_IN_PROGRESS), keyspaceName, viewName);
-        forceBlockingFlush(MATERIALIZED_VIEWS_BUILDS_IN_PROGRESS);
+        executeInternal(String.format(buildReq, NAME, VIEWS_BUILDS_IN_PROGRESS), keyspaceName, viewName);
+        forceBlockingFlush(VIEWS_BUILDS_IN_PROGRESS);
 
         String builtReq = "DELETE FROM %s.\"%s\" WHERE keyspace_name = ? AND view_name = ?";
-        executeInternal(String.format(builtReq, NAME, BUILT_MATERIALIZED_VIEWS), keyspaceName, viewName);
-        forceBlockingFlush(BUILT_MATERIALIZED_VIEWS);
+        executeInternal(String.format(builtReq, NAME, BUILT_VIEWS), keyspaceName, viewName);
+        forceBlockingFlush(BUILT_VIEWS);
     }
 
-    public static void beginMaterializedViewBuild(String ksname, String viewName, int generationNumber)
+    public static void beginViewBuild(String ksname, String viewName, int generationNumber)
     {
-        executeInternal(String.format("INSERT INTO system.%s (keyspace_name, view_name, generation_number) VALUES (?, ?, ?)", MATERIALIZED_VIEWS_BUILDS_IN_PROGRESS),
+        executeInternal(String.format("INSERT INTO system.%s (keyspace_name, view_name, generation_number) VALUES (?, ?, ?)", VIEWS_BUILDS_IN_PROGRESS),
                         ksname,
                         viewName,
                         generationNumber);
     }
 
-    public static void finishMaterializedViewBuildStatus(String ksname, String viewName)
+    public static void finishViewBuildStatus(String ksname, String viewName)
     {
         // We flush the view built first, because if we fail now, we'll restart at the last place we checkpointed
-        // materialized view build.
+        // view build.
         // If we flush the delete first, we'll have to restart from the beginning.
-        // Also, if the build succeeded, but the materialized view build failed, we will be able to skip the
-        // materialized view build check next boot.
-        setMaterializedViewBuilt(ksname, viewName);
-        forceBlockingFlush(BUILT_MATERIALIZED_VIEWS);
-        executeInternal(String.format("DELETE FROM system.%s WHERE keyspace_name = ? AND view_name = ?", MATERIALIZED_VIEWS_BUILDS_IN_PROGRESS), ksname, viewName);
-        forceBlockingFlush(MATERIALIZED_VIEWS_BUILDS_IN_PROGRESS);
+        // Also, if the build succeeded, but the view build failed, we will be able to skip the view build check
+        // next boot.
+        setViewBuilt(ksname, viewName);
+        forceBlockingFlush(BUILT_VIEWS);
+        executeInternal(String.format("DELETE FROM system.%s WHERE keyspace_name = ? AND view_name = ?", VIEWS_BUILDS_IN_PROGRESS), ksname, viewName);
+        forceBlockingFlush(VIEWS_BUILDS_IN_PROGRESS);
     }
 
-    public static void updateMaterializedViewBuildStatus(String ksname, String viewName, Token token)
+    public static void updateViewBuildStatus(String ksname, String viewName, Token token)
     {
         String req = "INSERT INTO system.%s (keyspace_name, view_name, last_token) VALUES (?, ?, ?)";
-        Token.TokenFactory factory = MaterializedViewsBuildsInProgress.partitioner.getTokenFactory();
-        executeInternal(String.format(req, MATERIALIZED_VIEWS_BUILDS_IN_PROGRESS), ksname, viewName, factory.toString(token));
+        Token.TokenFactory factory = ViewsBuildsInProgress.partitioner.getTokenFactory();
+        executeInternal(String.format(req, VIEWS_BUILDS_IN_PROGRESS), ksname, viewName, factory.toString(token));
     }
 
-    public static Pair<Integer, Token> getMaterializedViewBuildStatus(String ksname, String viewName)
+    public static Pair<Integer, Token> getViewBuildStatus(String ksname, String viewName)
     {
         String req = "SELECT generation_number, last_token FROM system.%s WHERE keyspace_name = ? AND view_name = ?";
-        UntypedResultSet queryResultSet = executeInternal(String.format(req, MATERIALIZED_VIEWS_BUILDS_IN_PROGRESS), ksname, viewName);
+        UntypedResultSet queryResultSet = executeInternal(String.format(req, VIEWS_BUILDS_IN_PROGRESS), ksname, viewName);
         if (queryResultSet == null || queryResultSet.isEmpty())
             return null;
 
@@ -597,7 +597,7 @@ public final class SystemKeyspace
             generation = row.getInt("generation_number");
         if (row.has("last_key"))
         {
-            Token.TokenFactory factory = MaterializedViewsBuildsInProgress.partitioner.getTokenFactory();
+            Token.TokenFactory factory = ViewsBuildsInProgress.partitioner.getTokenFactory();
             lastKey = factory.fromString(row.getString("last_key"));
         }
 

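With the renamed progress tables, a view build can be inspected with plain CQL; the column names
below are taken from the CREATE TABLE strings earlier in this file, while the keyspace and view
names are hypothetical:

    SELECT generation_number, last_token
      FROM system.views_builds_in_progress
      WHERE keyspace_name = 'ks' AND view_name = 'scores_by_game';

    SELECT view_name FROM system.built_views WHERE keyspace_name = 'ks';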
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3a8dbca/src/java/org/apache/cassandra/db/WriteType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/WriteType.java b/src/java/org/apache/cassandra/db/WriteType.java
index 20fb6a9..fdbe97d 100644
--- a/src/java/org/apache/cassandra/db/WriteType.java
+++ b/src/java/org/apache/cassandra/db/WriteType.java
@@ -25,5 +25,5 @@ public enum WriteType
     COUNTER,
     BATCH_LOG,
     CAS,
-    MATERIALIZED_VIEW;
+    VIEW;
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3a8dbca/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 5207f49..75d50e7 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -45,7 +45,7 @@ import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.db.lifecycle.SSTableSet;
 import org.apache.cassandra.db.lifecycle.View;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
-import org.apache.cassandra.db.view.MaterializedViewBuilder;
+import org.apache.cassandra.db.view.ViewBuilder;
 import org.apache.cassandra.dht.Bounds;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
@@ -1334,7 +1334,7 @@ public class CompactionManager implements CompactionManagerMBean
         }
     }
 
-    public Future<?> submitMaterializedViewBuilder(final MaterializedViewBuilder builder)
+    public Future<?> submitViewBuilder(final ViewBuilder builder)
     {
         Runnable runnable = new Runnable()
         {

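A short usage sketch of the renamed entry point; only submitViewBuilder and its Future<?> return type are taken from the diff above, and the wrapper class and method are illustrative.

    import java.util.concurrent.Future;
    import org.apache.cassandra.db.compaction.CompactionManager;
    import org.apache.cassandra.db.view.ViewBuilder;

    final class ViewBuildSubmitExample                 // illustration only
    {
        static void buildAndWait(ViewBuilder builder) throws Exception
        {
            // Schedule the build on the compaction executor and wait for it to finish.
            Future<?> task = CompactionManager.instance.submitViewBuilder(builder);
            task.get();
        }
    }
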
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3a8dbca/src/java/org/apache/cassandra/db/compaction/OperationType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/OperationType.java b/src/java/org/apache/cassandra/db/compaction/OperationType.java
index f8f016c..a69622b 100644
--- a/src/java/org/apache/cassandra/db/compaction/OperationType.java
+++ b/src/java/org/apache/cassandra/db/compaction/OperationType.java
@@ -36,7 +36,7 @@ public enum OperationType
     FLUSH("Flush"),
     STREAM("Stream"),
     WRITE("Write"),
-    VIEW_BUILD("Materialized view build");
+    VIEW_BUILD("View build");
 
     public final String type;
     public final String fileName;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3a8dbca/src/java/org/apache/cassandra/db/view/MaterializedView.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/MaterializedView.java b/src/java/org/apache/cassandra/db/view/MaterializedView.java
deleted file mode 100644
index 52034cc..0000000
--- a/src/java/org/apache/cassandra/db/view/MaterializedView.java
+++ /dev/null
@@ -1,749 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.view;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
-
-import javax.annotation.Nullable;
-
-import com.google.common.collect.Iterables;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.config.MaterializedViewDefinition;
-import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.cql3.ColumnIdentifier;
-import org.apache.cassandra.cql3.statements.CFProperties;
-import org.apache.cassandra.db.AbstractReadCommandBuilder.SinglePartitionSliceBuilder;
-import org.apache.cassandra.db.CBuilder;
-import org.apache.cassandra.db.Clustering;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.DeletionInfo;
-import org.apache.cassandra.db.DeletionTime;
-import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.db.LivenessInfo;
-import org.apache.cassandra.db.Mutation;
-import org.apache.cassandra.db.RangeTombstone;
-import org.apache.cassandra.db.ReadCommand;
-import org.apache.cassandra.db.ReadOrderGroup;
-import org.apache.cassandra.db.SinglePartitionReadCommand;
-import org.apache.cassandra.db.Slice;
-import org.apache.cassandra.db.compaction.CompactionManager;
-import org.apache.cassandra.db.partitions.AbstractBTreePartition;
-import org.apache.cassandra.db.partitions.PartitionIterator;
-import org.apache.cassandra.db.partitions.PartitionUpdate;
-import org.apache.cassandra.db.rows.BTreeRow;
-import org.apache.cassandra.db.rows.Cell;
-import org.apache.cassandra.db.rows.ColumnData;
-import org.apache.cassandra.db.rows.ComplexColumnData;
-import org.apache.cassandra.db.rows.Row;
-import org.apache.cassandra.db.rows.RowIterator;
-import org.apache.cassandra.schema.KeyspaceMetadata;
-import org.apache.cassandra.service.pager.QueryPager;
-
-/**
- * A Materialized View copies data from a base table into a view table which can be queried independently from the
- * base. Every update which targets the base table must be fed through the {@link MaterializedViewManager} to ensure
- * that if a view needs to be updated, the updates are properly created and fed into the view.
- *
- * This class does the job of translating the base row to the view row.
- *
- * It handles reading existing state and figuring out what tombstones need to be generated.
- *
- * createMutations below is the "main method"
- *
- */
-public class MaterializedView
-{
-    /**
-     * The columns should all be updated together, so we use this object as group.
-     */
-    private static class MVColumns
-    {
-        // These are the base column definitions in terms of the *view's* partitioning.
-        // For example, the partition key of the view may contain a clustering key
-        // from the base table.
-        public final List<ColumnDefinition> partitionDefs;
-        public final List<ColumnDefinition> primaryKeyDefs;
-        public final List<ColumnDefinition> baseComplexColumns;
-
-        private MVColumns(List<ColumnDefinition> partitionDefs, List<ColumnDefinition> primaryKeyDefs, List<ColumnDefinition> baseComplexColumns)
-        {
-            this.partitionDefs = partitionDefs;
-            this.primaryKeyDefs = primaryKeyDefs;
-            this.baseComplexColumns = baseComplexColumns;
-        }
-    }
-
-    public final String name;
-
-    private final ColumnFamilyStore baseCfs;
-    private ColumnFamilyStore _viewCfs = null;
-
-    private MVColumns columns;
-
-    private final boolean viewHasAllPrimaryKeys;
-    private final boolean includeAll;
-    private MaterializedViewBuilder builder;
-
-    public MaterializedView(MaterializedViewDefinition definition,
-                            ColumnFamilyStore baseCfs)
-    {
-        this.baseCfs = baseCfs;
-
-        name = definition.viewName;
-        includeAll = definition.includeAll;
-
-        viewHasAllPrimaryKeys = updateDefinition(definition);
-    }
-
-    /**
-     * Lazily fetch the CFS instance for the view.
-     * We do this lazily to avoid initialization issues.
-     *
-     * @return The view's CFS instance
-     */
-    public ColumnFamilyStore getViewCfs()
-    {
-        if (_viewCfs == null)
-            _viewCfs = Keyspace.openAndGetStore(Schema.instance.getCFMetaData(baseCfs.keyspace.getName(), name));
-
-        return _viewCfs;
-    }
-
-
-    /**
-     * Look up the column definitions in the base table that correspond to the view columns (should be 1:1).
-     *
-     * Also tells the caller whether every primary key column in the view is also a primary key column in the
-     * base table; we track this to simplify tombstone checks.
-     *
-     * @param columns a list of columns to look up in the base table
-     * @param definitions lists to populate with the base table definitions
-     * @return true if all view PKs are also base PKs
-     */
-    private boolean resolveAndAddColumns(Iterable<ColumnIdentifier> columns, List<ColumnDefinition>... definitions)
-    {
-        boolean allArePrimaryKeys = true;
-        for (ColumnIdentifier identifier : columns)
-        {
-            ColumnDefinition cdef = baseCfs.metadata.getColumnDefinition(identifier);
-            assert cdef != null : "Could not resolve column " + identifier.toString();
-
-            for (List<ColumnDefinition> list : definitions)
-            {
-                list.add(cdef);
-            }
-
-            allArePrimaryKeys = allArePrimaryKeys && cdef.isPrimaryKeyColumn();
-        }
-
-        return allArePrimaryKeys;
-    }
-
-    /**
-     * This updates the columns stored which are dependent on the base CFMetaData.
-     *
-     * @return true if the view contains only columns which are part of the base's primary key; false if there is at
-     *         least one column which is not.
-     */
-    public boolean updateDefinition(MaterializedViewDefinition definition)
-    {
-        List<ColumnDefinition> partitionDefs = new ArrayList<>(definition.partitionColumns.size());
-        List<ColumnDefinition> primaryKeyDefs = new ArrayList<>(definition.partitionColumns.size()
-                                                                + definition.clusteringColumns.size());
-        List<ColumnDefinition> baseComplexColumns = new ArrayList<>();
-
-        // We only add the partition columns to the partitions list, but both partition columns and clustering
-        // columns are added to the primary keys list
-        boolean partitionAllPrimaryKeyColumns = resolveAndAddColumns(definition.partitionColumns, primaryKeyDefs, partitionDefs);
-        boolean clusteringAllPrimaryKeyColumns = resolveAndAddColumns(definition.clusteringColumns, primaryKeyDefs);
-
-        for (ColumnDefinition cdef : baseCfs.metadata.allColumns())
-        {
-            if (cdef.isComplex())
-            {
-                baseComplexColumns.add(cdef);
-            }
-        }
-
-        this.columns = new MVColumns(partitionDefs, primaryKeyDefs, baseComplexColumns);
-
-        return partitionAllPrimaryKeyColumns && clusteringAllPrimaryKeyColumns;
-    }
-
-    /**
-     * Check to see if the update could possibly modify a view. Cases where the view may be updated are:
-     * <ul>
-     *     <li>View selects all columns</li>
-     *     <li>Update contains any range tombstones</li>
-     *     <li>Update touches one of the columns included in the view</li>
-     * </ul>
-     *
-     * If the update contains any range tombstones, there is a possibility that it will not touch a range that is
-     * currently included in the view.
-     *
-     * @return true if {@param partition} modifies a column included in the view
-     */
-    public boolean updateAffectsView(AbstractBTreePartition partition)
-    {
-        // If we are including all of the columns, then any update will be included
-        if (includeAll)
-            return true;
-
-        // If there are range tombstones, tombstones will also need to be generated for the materialized view
-        // This requires a query of the base rows and generating tombstones for all of those values
-        if (!partition.deletionInfo().isLive())
-            return true;
-
-        // Check each row for deletion or update
-        for (Row row : partition)
-        {
-            if (row.hasComplexDeletion())
-                return true;
-            if (!row.deletion().isLive())
-                return true;
-
-            for (ColumnData data : row)
-            {
-                if (getViewCfs().metadata.getColumnDefinition(data.column().name) != null)
-                    return true;
-            }
-        }
-
-        return false;
-    }
-
-    /**
-     * Creates the clustering columns for the view based on the specified row and resolver policy
-     *
-     * @param temporalRow The current row
-     * @param resolver The policy to use when selecting which versions of cells to use
-     * @return The clustering object to use for the view
-     */
-    private Clustering viewClustering(TemporalRow temporalRow, TemporalRow.Resolver resolver)
-    {
-        CFMetaData viewCfm = getViewCfs().metadata;
-        int numViewClustering = viewCfm.clusteringColumns().size();
-        CBuilder clustering = CBuilder.create(getViewCfs().getComparator());
-        for (int i = 0; i < numViewClustering; i++)
-        {
-            ColumnDefinition definition = viewCfm.clusteringColumns().get(i);
-            clustering.add(temporalRow.clusteringValue(definition, resolver));
-        }
-
-        return clustering.build();
-    }
-
-    /**
-     * @return Mutation containing a range tombstone for a base partition key and TemporalRow.
-     */
-    private PartitionUpdate createTombstone(TemporalRow temporalRow,
-                                            DecoratedKey partitionKey,
-                                            Row.Deletion deletion,
-                                            TemporalRow.Resolver resolver,
-                                            int nowInSec)
-    {
-        CFMetaData viewCfm = getViewCfs().metadata;
-        Row.Builder builder = BTreeRow.unsortedBuilder(nowInSec);
-        builder.newRow(viewClustering(temporalRow, resolver));
-        builder.addRowDeletion(deletion);
-        return PartitionUpdate.singleRowUpdate(viewCfm, partitionKey, builder.build());
-    }
-
-    /**
-     * @return PartitionUpdate containing a complex tombstone for a TemporalRow, and the collection's column identifier.
-     */
-    private PartitionUpdate createComplexTombstone(TemporalRow temporalRow,
-                                                   DecoratedKey partitionKey,
-                                                   ColumnDefinition deletedColumn,
-                                                   DeletionTime deletionTime,
-                                                   TemporalRow.Resolver resolver,
-                                                   int nowInSec)
-    {
-
-        CFMetaData viewCfm = getViewCfs().metadata;
-        Row.Builder builder = BTreeRow.unsortedBuilder(nowInSec);
-        builder.newRow(viewClustering(temporalRow, resolver));
-        builder.addComplexDeletion(deletedColumn, deletionTime);
-        return PartitionUpdate.singleRowUpdate(viewCfm, partitionKey, builder.build());
-    }
-
-    /**
-     * @return View's DecoratedKey or null, if one of the view's primary key components has an invalid resolution from
-     *         the TemporalRow and its Resolver
-     */
-    private DecoratedKey viewPartitionKey(TemporalRow temporalRow, TemporalRow.Resolver resolver)
-    {
-        List<ColumnDefinition> partitionDefs = this.columns.partitionDefs;
-        Object[] partitionKey = new Object[partitionDefs.size()];
-
-        for (int i = 0; i < partitionKey.length; i++)
-        {
-            ByteBuffer value = temporalRow.clusteringValue(partitionDefs.get(i), resolver);
-
-            if (value == null)
-                return null;
-
-            partitionKey[i] = value;
-        }
-
-        CFMetaData metadata = getViewCfs().metadata;
-        return metadata.decorateKey(CFMetaData.serializePartitionKey(metadata
-                                                                     .getKeyValidatorAsClusteringComparator()
-                                                                     .make(partitionKey)));
-    }
-
-    /**
-     * @return mutation which contains the tombstone for the referenced TemporalRow, or null if not necessary.
-     * A TemporalRow can reference at most one view row; there will be at most one row to be tombstoned, so only one
-     * mutation is necessary
-     */
-    private PartitionUpdate createRangeTombstoneForRow(TemporalRow temporalRow)
-    {
-        // Primary Key and Clustering columns do not generate tombstones
-        if (viewHasAllPrimaryKeys)
-            return null;
-
-        boolean hasUpdate = false;
-        List<ColumnDefinition> primaryKeyDefs = this.columns.primaryKeyDefs;
-        for (ColumnDefinition viewPartitionKeys : primaryKeyDefs)
-        {
-            if (!viewPartitionKeys.isPrimaryKeyColumn() && temporalRow.clusteringValue(viewPartitionKeys, TemporalRow.oldValueIfUpdated) != null)
-                hasUpdate = true;
-        }
-
-        if (!hasUpdate)
-            return null;
-
-        TemporalRow.Resolver resolver = TemporalRow.earliest;
-        return createTombstone(temporalRow,
-                               viewPartitionKey(temporalRow, resolver),
-                               Row.Deletion.shadowable(new DeletionTime(temporalRow.viewClusteringTimestamp(), temporalRow.nowInSec)),
-                               resolver,
-                               temporalRow.nowInSec);
-    }
-
-    /**
-     * @return Mutation which is the transformed base table mutation for the materialized view.
-     */
-    private PartitionUpdate createUpdatesForInserts(TemporalRow temporalRow)
-    {
-        TemporalRow.Resolver resolver = TemporalRow.latest;
-
-        DecoratedKey partitionKey = viewPartitionKey(temporalRow, resolver);
-        ColumnFamilyStore viewCfs = getViewCfs();
-
-        if (partitionKey == null)
-        {
-            // Not having a partition key means we aren't updating anything
-            return null;
-        }
-
-        Row.Builder regularBuilder = BTreeRow.unsortedBuilder(temporalRow.nowInSec);
-
-        CBuilder clustering = CBuilder.create(viewCfs.getComparator());
-        for (int i = 0; i < viewCfs.metadata.clusteringColumns().size(); i++)
-        {
-            clustering.add(temporalRow.clusteringValue(viewCfs.metadata.clusteringColumns().get(i), resolver));
-        }
-        regularBuilder.newRow(clustering.build());
-        regularBuilder.addPrimaryKeyLivenessInfo(LivenessInfo.create(viewCfs.metadata,
-                                                                     temporalRow.viewClusteringTimestamp(),
-                                                                     temporalRow.viewClusteringTtl(),
-                                                                     temporalRow.viewClusteringLocalDeletionTime()));
-
-        for (ColumnDefinition columnDefinition : viewCfs.metadata.allColumns())
-        {
-            if (columnDefinition.isPrimaryKeyColumn())
-                continue;
-
-            for (Cell cell : temporalRow.values(columnDefinition, resolver))
-            {
-                regularBuilder.addCell(cell);
-            }
-        }
-
-        return PartitionUpdate.singleRowUpdate(viewCfs.metadata, partitionKey, regularBuilder.build());
-    }
-
-    /**
-     * @param partition Update which possibly contains deletion info for which to generate view tombstones.
-     * @return    View Tombstones which delete all of the rows which have been removed from the base table with
-     *            {@param partition}
-     */
-    private Collection<Mutation> createForDeletionInfo(TemporalRow.Set rowSet, AbstractBTreePartition partition)
-    {
-        final TemporalRow.Resolver resolver = TemporalRow.earliest;
-
-        DeletionInfo deletionInfo = partition.deletionInfo();
-
-        List<Mutation> mutations = new ArrayList<>();
-
-        // Check the complex columns to see if there are any which may have tombstones we need to create for the view
-        if (!columns.baseComplexColumns.isEmpty())
-        {
-            for (Row row : partition)
-            {
-                if (!row.hasComplexDeletion())
-                    continue;
-
-                TemporalRow temporalRow = rowSet.getClustering(row.clustering());
-
-                assert temporalRow != null;
-
-                for (ColumnDefinition definition : columns.baseComplexColumns)
-                {
-                    ComplexColumnData columnData = row.getComplexColumnData(definition);
-
-                    if (columnData != null)
-                    {
-                        DeletionTime time = columnData.complexDeletion();
-                        if (!time.isLive())
-                        {
-                            DecoratedKey targetKey = viewPartitionKey(temporalRow, resolver);
-                            if (targetKey != null)
-                                mutations.add(new Mutation(createComplexTombstone(temporalRow, targetKey, definition, time, resolver, temporalRow.nowInSec)));
-                        }
-                    }
-                }
-            }
-        }
-
-        ReadCommand command = null;
-
-        if (!deletionInfo.isLive())
-        {
-            // We have to generate tombstones for all of the affected rows, but we don't have the information needed
-            // to create them. This requires reading the entire range that is being tombstoned and generating a
-            // tombstone for each row. This may be slow, because a single range tombstone can cover up to an
-            // entire partition of data which is not distributed on a single partition node.
-            DecoratedKey dk = rowSet.dk;
-
-            if (deletionInfo.hasRanges())
-            {
-                SinglePartitionSliceBuilder builder = new SinglePartitionSliceBuilder(baseCfs, dk);
-                Iterator<RangeTombstone> tombstones = deletionInfo.rangeIterator(false);
-                while (tombstones.hasNext())
-                {
-                    RangeTombstone tombstone = tombstones.next();
-
-                    builder.addSlice(tombstone.deletedSlice());
-                }
-
-                command = builder.build();
-            }
-            else
-            {
-                command = SinglePartitionReadCommand.fullPartitionRead(baseCfs.metadata, rowSet.nowInSec, dk);
-            }
-        }
-
-        if (command == null)
-        {
-            SinglePartitionSliceBuilder builder = null;
-            for (Row row : partition)
-            {
-                if (!row.deletion().isLive())
-                {
-                    if (builder == null)
-                        builder = new SinglePartitionSliceBuilder(baseCfs, rowSet.dk);
-                    builder.addSlice(Slice.make(row.clustering()));
-                }
-            }
-
-            if (builder != null)
-                command = builder.build();
-        }
-
-        if (command != null)
-        {
-
-            // We may have already done this work for another MV update, so check first
-
-            if (!rowSet.hasTombstonedExisting())
-            {
-                QueryPager pager = command.getPager(null);
-
-                // Add all of the rows which were recovered from the query to the row set
-                while (!pager.isExhausted())
-                {
-                    try (ReadOrderGroup orderGroup = pager.startOrderGroup();
-                         PartitionIterator iter = pager.fetchPageInternal(128, orderGroup))
-                    {
-                        if (!iter.hasNext())
-                            break;
-
-                        try (RowIterator rowIterator = iter.next())
-                        {
-                            while (rowIterator.hasNext())
-                            {
-                                Row row = rowIterator.next();
-                                rowSet.addRow(row, false);
-                            }
-                        }
-                    }
-                }
-
-                // In case we fetched nothing, avoid re-checking on another MV update
-                rowSet.setTombstonedExisting();
-            }
-
-            // If the temporal row has been deleted by the deletion info, we generate the corresponding range tombstone
-            // for the view.
-            for (TemporalRow temporalRow : rowSet)
-            {
-                DeletionTime deletionTime = temporalRow.deletionTime(partition);
-                if (!deletionTime.isLive())
-                {
-                    DecoratedKey value = viewPartitionKey(temporalRow, resolver);
-                    if (value != null)
-                    {
-                        PartitionUpdate update = createTombstone(temporalRow, value, Row.Deletion.regular(deletionTime), resolver, temporalRow.nowInSec);
-                        if (update != null)
-                            mutations.add(new Mutation(update));
-                    }
-                }
-            }
-        }
-
-        return !mutations.isEmpty() ? mutations : null;
-    }
-
-    /**
-     * Read and update temporal rows in the set which have corresponding values stored on the local node
-     */
-    private void readLocalRows(TemporalRow.Set rowSet)
-    {
-        SinglePartitionSliceBuilder builder = new SinglePartitionSliceBuilder(baseCfs, rowSet.dk);
-
-        for (TemporalRow temporalRow : rowSet)
-            builder.addSlice(temporalRow.baseSlice());
-
-        QueryPager pager = builder.build().getPager(null);
-
-        while (!pager.isExhausted())
-        {
-            try (ReadOrderGroup orderGroup = pager.startOrderGroup();
-                 PartitionIterator iter = pager.fetchPageInternal(128, orderGroup))
-            {
-                while (iter.hasNext())
-                {
-                    try (RowIterator rows = iter.next())
-                    {
-                        while (rows.hasNext())
-                        {
-                            rowSet.addRow(rows.next(), false);
-                        }
-                    }
-                }
-            }
-        }
-    }
-
-    /**
-     * @return Set of rows which are contained in the partition update {@param partition}
-     */
-    private TemporalRow.Set separateRows(AbstractBTreePartition partition, Set<ColumnIdentifier> viewPrimaryKeyCols)
-    {
-
-        TemporalRow.Set rowSet = new TemporalRow.Set(baseCfs, viewPrimaryKeyCols, partition.partitionKey().getKey());
-
-        for (Row row : partition)
-            rowSet.addRow(row, true);
-
-        return rowSet;
-    }
-
-    /**
-     * Splits the partition update up and adds the existing state to each row.
-     * This data can be reused for multiple MV updates on the same base table
-     *
-     * @param partition the mutation
-     * @param isBuilding If the view is currently being built, we do not query the values which are already stored,
-     *                   since all of the update will already be present in the base table.
-     * @return The set of temporal rows contained in this update
-     */
-    public TemporalRow.Set getTemporalRowSet(AbstractBTreePartition partition, TemporalRow.Set existing, boolean isBuilding)
-    {
-        if (!updateAffectsView(partition))
-            return null;
-
-        Set<ColumnIdentifier> columns = new HashSet<>(this.columns.primaryKeyDefs.size());
-        for (ColumnDefinition def : this.columns.primaryKeyDefs)
-            columns.add(def.name);
-
-        TemporalRow.Set rowSet = null;
-        if (existing == null)
-        {
-            rowSet = separateRows(partition, columns);
-
-            // If we are building the view, we do not want to add old values; they will always be the same
-            if (!isBuilding)
-                readLocalRows(rowSet);
-        }
-        else
-        {
-            rowSet = existing.withNewViewPrimaryKey(columns);
-        }
-
-        return rowSet;
-    }
-
-
-    /**
-     * @param isBuilding If the view is currently being built, we do not query the values which are already stored,
-     *                   since all of the update will already be present in the base table.
-     * @return View mutations which represent the changes necessary as long as previously created mutations for the view
-     *         have been applied successfully. This is based solely on the changes that are necessary given the current
-     *         state of the base table and the newly applying partition data.
-     */
-    public Collection<Mutation> createMutations(AbstractBTreePartition partition, TemporalRow.Set rowSet, boolean isBuilding)
-    {
-        if (!updateAffectsView(partition))
-            return null;
-
-        Collection<Mutation> mutations = null;
-        for (TemporalRow temporalRow : rowSet)
-        {
-            // If we are building, there is no need to check for partition tombstones; those values will not be present
-            // in the partition data
-            if (!isBuilding)
-            {
-                PartitionUpdate partitionTombstone = createRangeTombstoneForRow(temporalRow);
-                if (partitionTombstone != null)
-                {
-                    if (mutations == null) mutations = new LinkedList<>();
-                    mutations.add(new Mutation(partitionTombstone));
-                }
-            }
-
-            PartitionUpdate insert = createUpdatesForInserts(temporalRow);
-            if (insert != null)
-            {
-                if (mutations == null) mutations = new LinkedList<>();
-                mutations.add(new Mutation(insert));
-            }
-        }
-
-        if (!isBuilding)
-        {
-            Collection<Mutation> deletion = createForDeletionInfo(rowSet, partition);
-            if (deletion != null && !deletion.isEmpty())
-            {
-                if (mutations == null) mutations = new LinkedList<>();
-                mutations.addAll(deletion);
-            }
-        }
-
-        return mutations;
-    }
-
-    public synchronized void build()
-    {
-        if (this.builder != null)
-        {
-            this.builder.stop();
-            this.builder = null;
-        }
-
-        this.builder = new MaterializedViewBuilder(baseCfs, this);
-        CompactionManager.instance.submitMaterializedViewBuilder(builder);
-    }
-
-    @Nullable
-    public static CFMetaData findBaseTable(String keyspace, String view)
-    {
-        KeyspaceMetadata ksm = Schema.instance.getKSMetaData(keyspace);
-        if (ksm == null)
-            return null;
-
-        for (CFMetaData cfm : ksm.tables)
-            if (cfm.getMaterializedViews().get(view).isPresent())
-                return cfm;
-
-        return null;
-    }
-
-    /**
-     * @return CFMetaData which represents the definition given
-     */
-    public static CFMetaData getCFMetaData(MaterializedViewDefinition definition,
-                                           CFMetaData baseCf,
-                                           CFProperties properties)
-    {
-        CFMetaData.Builder viewBuilder = CFMetaData.Builder
-                                         .createView(baseCf.ksName, definition.viewName);
-
-        ColumnDefinition nonPkTarget = null;
-
-        for (ColumnIdentifier targetIdentifier : definition.partitionColumns)
-        {
-            ColumnDefinition target = baseCf.getColumnDefinition(targetIdentifier);
-            if (!target.isPartitionKey())
-                nonPkTarget = target;
-
-            viewBuilder.addPartitionKey(target.name, properties.getReversableType(targetIdentifier, target.type));
-        }
-
-        Collection<ColumnDefinition> included = new ArrayList<>();
-        for(ColumnIdentifier identifier : definition.included)
-        {
-            ColumnDefinition cfDef = baseCf.getColumnDefinition(identifier);
-            assert cfDef != null;
-            included.add(cfDef);
-        }
-
-        boolean includeAll = included.isEmpty();
-
-        for (ColumnIdentifier ident : definition.clusteringColumns)
-        {
-            ColumnDefinition column = baseCf.getColumnDefinition(ident);
-            viewBuilder.addClusteringColumn(ident, properties.getReversableType(ident, column.type));
-        }
-
-        for (ColumnDefinition column : baseCf.partitionColumns().regulars)
-        {
-            if (column != nonPkTarget && (includeAll || included.contains(column)))
-            {
-                viewBuilder.addRegularColumn(column.name, column.type);
-            }
-        }
-
-        //Add any extra clustering columns
-        for (ColumnDefinition column : Iterables.concat(baseCf.partitionKeyColumns(), baseCf.clusteringColumns()))
-        {
-            if ( (!definition.partitionColumns.contains(column.name) && !definition.clusteringColumns.contains(column.name)) &&
-                 (includeAll || included.contains(column)) )
-            {
-                viewBuilder.addRegularColumn(column.name, column.type);
-            }
-        }
-
-        return viewBuilder.build().params(properties.properties.asNewTableParams());
-    }
-}
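
For readers tracking the rename, a sketch of how callers drove the class deleted above, using only the public methods visible in the removed file (updateAffectsView, getTemporalRowSet, createMutations). The wrapper class and method names are illustrative, TemporalRow.Set is assumed to be accessible from the caller, and the renamed replacement class is assumed to expose an equivalent flow.

    import java.util.Collection;

    import org.apache.cassandra.db.Mutation;
    import org.apache.cassandra.db.partitions.AbstractBTreePartition;
    import org.apache.cassandra.db.view.MaterializedView;    // the class removed above
    import org.apache.cassandra.db.view.TemporalRow;

    final class ViewUpdateExample                            // illustration only
    {
        static Collection<Mutation> viewUpdatesFor(MaterializedView view,
                                                   AbstractBTreePartition baseUpdate,
                                                   boolean isBuilding)
        {
            // Skip work when the update cannot touch any column the view includes.
            if (!view.updateAffectsView(baseUpdate))
                return null;

            // Split the base update into per-row state; unless building, existing values are read back.
            TemporalRow.Set rowSet = view.getTemporalRowSet(baseUpdate, null, isBuilding);

            // Translate the base rows into view-table inserts and tombstones.
            return view.createMutations(baseUpdate, rowSet, isBuilding);
        }
    }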