You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2015/09/16 16:35:38 UTC

[3/7] cassandra git commit: Improve MV schema representation

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3a8dbca/src/java/org/apache/cassandra/schema/MaterializedViews.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/MaterializedViews.java b/src/java/org/apache/cassandra/schema/MaterializedViews.java
deleted file mode 100644
index 1c55736..0000000
--- a/src/java/org/apache/cassandra/schema/MaterializedViews.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.schema;
-
-
-import java.util.Iterator;
-import java.util.Optional;
-
-import com.google.common.collect.ImmutableMap;
-
-import org.apache.cassandra.config.MaterializedViewDefinition;
-
-import static com.google.common.collect.Iterables.filter;
-
-public final class MaterializedViews implements Iterable<MaterializedViewDefinition>
-{
-    private final ImmutableMap<String, MaterializedViewDefinition> materializedViews;
-
-    private MaterializedViews(Builder builder)
-    {
-        materializedViews = builder.materializedViews.build();
-    }
-
-    public static Builder builder()
-    {
-        return new Builder();
-    }
-
-    public static MaterializedViews none()
-    {
-        return builder().build();
-    }
-
-    public Iterator<MaterializedViewDefinition> iterator()
-    {
-        return materializedViews.values().iterator();
-    }
-
-    public int size()
-    {
-        return materializedViews.size();
-    }
-
-    public boolean isEmpty()
-    {
-        return materializedViews.isEmpty();
-    }
-
-    /**
-     * Get the materialized view with the specified name
-     *
-     * @param name a non-qualified materialized view name
-     * @return an empty {@link Optional} if the materialized view name is not found; a non-empty optional of {@link MaterializedViewDefinition} otherwise
-     */
-    public Optional<MaterializedViewDefinition> get(String name)
-    {
-        return Optional.ofNullable(materializedViews.get(name));
-    }
-
-    /**
-     * Create a MaterializedViews instance with the provided materialized view added
-     */
-    public MaterializedViews with(MaterializedViewDefinition materializedView)
-    {
-        if (get(materializedView.viewName).isPresent())
-            throw new IllegalStateException(String.format("Materialized View %s already exists", materializedView.viewName));
-
-        return builder().add(this).add(materializedView).build();
-    }
-
-    /**
-     * Creates a MaterializedViews instance with the materializedView with the provided name removed
-     */
-    public MaterializedViews without(String name)
-    {
-        MaterializedViewDefinition materializedView =
-        get(name).orElseThrow(() -> new IllegalStateException(String.format("Materialized View %s doesn't exists", name)));
-
-        return builder().add(filter(this, v -> v != materializedView)).build();
-    }
-
-    /**
-     * Creates a MaterializedViews instance which contains an updated materialized view
-     */
-    public MaterializedViews replace(MaterializedViewDefinition materializedView)
-    {
-        return without(materializedView.viewName).with(materializedView);
-    }
-
-    @Override
-    public boolean equals(Object o)
-    {
-        return this == o || (o instanceof MaterializedViews && materializedViews.equals(((MaterializedViews) o).materializedViews));
-    }
-
-    @Override
-    public int hashCode()
-    {
-        return materializedViews.hashCode();
-    }
-
-    @Override
-    public String toString()
-    {
-        return materializedViews.values().toString();
-    }
-
-    public static final class Builder
-    {
-        final ImmutableMap.Builder<String, MaterializedViewDefinition> materializedViews = new ImmutableMap.Builder<>();
-
-        private Builder()
-        {
-        }
-
-        public MaterializedViews build()
-        {
-            return new MaterializedViews(this);
-        }
-
-        public Builder add(MaterializedViewDefinition materializedView)
-        {
-            materializedViews.put(materializedView.viewName, materializedView);
-            return this;
-        }
-
-        public Builder add(Iterable<MaterializedViewDefinition> materializedViews)
-        {
-            materializedViews.forEach(this::add);
-            return this;
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3a8dbca/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
index 55b841b..c922612 100644
--- a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
+++ b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
@@ -68,14 +68,14 @@ public final class SchemaKeyspace
     public static final String COLUMNS = "columns";
     public static final String DROPPED_COLUMNS = "dropped_columns";
     public static final String TRIGGERS = "triggers";
-    public static final String MATERIALIZED_VIEWS = "materialized_views";
+    public static final String VIEWS = "views";
     public static final String TYPES = "types";
     public static final String FUNCTIONS = "functions";
     public static final String AGGREGATES = "aggregates";
     public static final String INDEXES = "indexes";
 
     public static final List<String> ALL =
-        ImmutableList.of(KEYSPACES, TABLES, COLUMNS, TRIGGERS, MATERIALIZED_VIEWS, TYPES, FUNCTIONS, AGGREGATES, INDEXES);
+        ImmutableList.of(KEYSPACES, TABLES, COLUMNS, TRIGGERS, VIEWS, TYPES, FUNCTIONS, AGGREGATES, INDEXES);
 
     private static final CFMetaData Keyspaces =
         compile(KEYSPACES,
@@ -144,17 +144,31 @@ public final class SchemaKeyspace
                 + "options frozen<map<text, text>>,"
                 + "PRIMARY KEY ((keyspace_name), table_name, trigger_name))");
 
-    private static final CFMetaData MaterializedViews =
-        compile(MATERIALIZED_VIEWS,
-                "materialized views definitions",
+    private static final CFMetaData Views =
+        compile(VIEWS,
+                "view definitions",
                 "CREATE TABLE %s ("
                 + "keyspace_name text,"
-                + "table_name text,"
                 + "view_name text,"
-                + "target_columns frozen<list<text>>,"
-                + "clustering_columns frozen<list<text>>,"
-                + "included_columns frozen<list<text>>,"
-                + "PRIMARY KEY ((keyspace_name), table_name, view_name))");
+                + "base_table_id uuid,"
+                + "base_table_name text,"
+                + "bloom_filter_fp_chance double,"
+                + "caching frozen<map<text, text>>,"
+                + "comment text,"
+                + "compaction frozen<map<text, text>>,"
+                + "compression frozen<map<text, text>>,"
+                + "dclocal_read_repair_chance double,"
+                + "default_time_to_live int,"
+                + "extensions frozen<map<text, blob>>,"
+                + "gc_grace_seconds int,"
+                + "id uuid,"
+                + "include_all_columns boolean,"
+                + "max_index_interval int,"
+                + "memtable_flush_period_in_ms int,"
+                + "min_index_interval int,"
+                + "read_repair_chance double,"
+                + "speculative_retry text,"
+                + "PRIMARY KEY ((keyspace_name), view_name))");
 
     private static final CFMetaData Indexes =
         compile(INDEXES,
@@ -210,7 +224,7 @@ public final class SchemaKeyspace
                 + "PRIMARY KEY ((keyspace_name), aggregate_name, signature))");
 
     public static final List<CFMetaData> ALL_TABLE_METADATA =
-        ImmutableList.of(Keyspaces, Tables, Columns, Triggers, DroppedColumns, MaterializedViews, Types, Functions, Aggregates, Indexes);
+        ImmutableList.of(Keyspaces, Tables, Columns, Triggers, DroppedColumns, Views, Types, Functions, Aggregates, Indexes);
 
     private static CFMetaData compile(String name, String description, String schema)
     {
@@ -265,9 +279,10 @@ public final class SchemaKeyspace
 
                     readSchemaPartitionForKeyspaceAndApply(TYPES, key,
                         types -> readSchemaPartitionForKeyspaceAndApply(TABLES, key,
-                        tables -> readSchemaPartitionForKeyspaceAndApply(FUNCTIONS, key,
+                        tables -> readSchemaPartitionForKeyspaceAndApply(VIEWS, key,
+                        views -> readSchemaPartitionForKeyspaceAndApply(FUNCTIONS, key,
                         functions -> readSchemaPartitionForKeyspaceAndApply(AGGREGATES, key,
-                        aggregates -> keyspaces.add(createKeyspaceFromSchemaPartitions(partition, tables, types, functions, aggregates)))))
+                        aggregates -> keyspaces.add(createKeyspaceFromSchemaPartitions(partition, tables, views, types, functions, aggregates))))))
                     );
                 }
             }
@@ -473,6 +488,7 @@ public final class SchemaKeyspace
         // current state of the schema
         Map<DecoratedKey, FilteredPartition> oldKeyspaces = readSchemaForKeyspaces(KEYSPACES, keyspaces);
         Map<DecoratedKey, FilteredPartition> oldColumnFamilies = readSchemaForKeyspaces(TABLES, keyspaces);
+        Map<DecoratedKey, FilteredPartition> oldViews = readSchemaForKeyspaces(VIEWS, keyspaces);
         Map<DecoratedKey, FilteredPartition> oldTypes = readSchemaForKeyspaces(TYPES, keyspaces);
         Map<DecoratedKey, FilteredPartition> oldFunctions = readSchemaForKeyspaces(FUNCTIONS, keyspaces);
         Map<DecoratedKey, FilteredPartition> oldAggregates = readSchemaForKeyspaces(AGGREGATES, keyspaces);
@@ -485,12 +501,14 @@ public final class SchemaKeyspace
         // with new data applied
         Map<DecoratedKey, FilteredPartition> newKeyspaces = readSchemaForKeyspaces(KEYSPACES, keyspaces);
         Map<DecoratedKey, FilteredPartition> newColumnFamilies = readSchemaForKeyspaces(TABLES, keyspaces);
+        Map<DecoratedKey, FilteredPartition> newViews = readSchemaForKeyspaces(VIEWS, keyspaces);
         Map<DecoratedKey, FilteredPartition> newTypes = readSchemaForKeyspaces(TYPES, keyspaces);
         Map<DecoratedKey, FilteredPartition> newFunctions = readSchemaForKeyspaces(FUNCTIONS, keyspaces);
         Map<DecoratedKey, FilteredPartition> newAggregates = readSchemaForKeyspaces(AGGREGATES, keyspaces);
 
         Set<String> keyspacesToDrop = mergeKeyspaces(oldKeyspaces, newKeyspaces);
         mergeTables(oldColumnFamilies, newColumnFamilies);
+        mergeViews(oldViews, newViews);
         mergeTypes(oldTypes, newTypes);
         mergeFunctions(oldFunctions, newFunctions);
         mergeAggregates(oldAggregates, newAggregates);
@@ -546,6 +564,27 @@ public final class SchemaKeyspace
         });
     }
 
+    private static void mergeViews(Map<DecoratedKey, FilteredPartition> before, Map<DecoratedKey, FilteredPartition> after)
+    {
+        diffSchema(before, after, new Differ()
+        {
+            public void onDropped(UntypedResultSet.Row oldRow)
+            {
+                Schema.instance.dropView(oldRow.getString("keyspace_name"), oldRow.getString("view_name"));
+            }
+
+            public void onAdded(UntypedResultSet.Row newRow)
+            {
+                Schema.instance.addView(createViewFromViewRow(newRow));
+            }
+
+            public void onUpdated(UntypedResultSet.Row oldRow, UntypedResultSet.Row newRow)
+            {
+                Schema.instance.updateView(newRow.getString("keyspace_name"), newRow.getString("view_name"));
+            }
+        });
+    }
+
     private static void mergeTypes(Map<DecoratedKey, FilteredPartition> before, Map<DecoratedKey, FilteredPartition> after)
     {
         diffSchema(before, after, new Differ()
@@ -697,6 +736,7 @@ public final class SchemaKeyspace
         Mutation mutation = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp);
 
         keyspace.tables.forEach(table -> addTableToSchemaMutation(table, timestamp, true, mutation));
+        keyspace.views.forEach(view -> addViewToSchemaMutation(view, timestamp, true, mutation));
         keyspace.types.forEach(type -> addTypeToSchemaMutation(type, timestamp, mutation));
         keyspace.functions.udfs().forEach(udf -> addFunctionToSchemaMutation(udf, timestamp, mutation));
         keyspace.functions.udas().forEach(uda -> addAggregateToSchemaMutation(uda, timestamp, mutation));
@@ -717,6 +757,7 @@ public final class SchemaKeyspace
 
     private static KeyspaceMetadata createKeyspaceFromSchemaPartitions(RowIterator serializedParams,
                                                                        RowIterator serializedTables,
+                                                                       RowIterator serializedViews,
                                                                        RowIterator serializedTypes,
                                                                        RowIterator serializedFunctions,
                                                                        RowIterator serializedAggregates)
@@ -725,13 +766,14 @@ public final class SchemaKeyspace
 
         KeyspaceParams params = createKeyspaceParamsFromSchemaPartition(serializedParams);
         Tables tables = createTablesFromTablesPartition(serializedTables);
+        Views views = createViewsFromViewsPartition(serializedViews);
         Types types = createTypesFromPartition(serializedTypes);
 
         Collection<UDFunction> udfs = createFunctionsFromFunctionsPartition(serializedFunctions);
         Collection<UDAggregate> udas = createAggregatesFromAggregatesPartition(serializedAggregates);
         Functions functions = org.apache.cassandra.schema.Functions.builder().add(udfs).add(udas).build();
 
-        return KeyspaceMetadata.create(name, params, tables, types, functions);
+        return KeyspaceMetadata.create(name, params, tables, views, types, functions);
     }
 
     /**
@@ -849,9 +891,6 @@ public final class SchemaKeyspace
             for (TriggerMetadata trigger : table.getTriggers())
                 addTriggerToSchemaMutation(table, trigger, timestamp, mutation);
 
-            for (MaterializedViewDefinition materializedView: table.getMaterializedViews())
-                addMaterializedViewToSchemaMutation(table, materializedView, timestamp, mutation);
-
             for (IndexMetadata index : table.getIndexes())
                 addIndexToSchemaMutation(table, index, timestamp, mutation);
         }
@@ -931,22 +970,6 @@ public final class SchemaKeyspace
         for (TriggerMetadata trigger : triggerDiff.entriesOnlyOnRight().values())
             addTriggerToSchemaMutation(newTable, trigger, timestamp, mutation);
 
-        MapDifference<String, MaterializedViewDefinition> materializedViewDiff = materializedViewsDiff(oldTable.getMaterializedViews(), newTable.getMaterializedViews());
-
-        // dropped materialized views
-        for (MaterializedViewDefinition materializedView : materializedViewDiff.entriesOnlyOnLeft().values())
-            dropMaterializedViewFromSchemaMutation(oldTable, materializedView, timestamp, mutation);
-
-        // newly created materialized views
-        for (MaterializedViewDefinition materializedView : materializedViewDiff.entriesOnlyOnRight().values())
-            addMaterializedViewToSchemaMutation(newTable, materializedView, timestamp, mutation);
-
-        // updated materialized views need to be updated
-        for (MapDifference.ValueDifference<MaterializedViewDefinition> diff : materializedViewDiff.entriesDiffering().values())
-        {
-            addUpdatedMaterializedViewDefinitionToSchemaMutation(newTable, diff.rightValue(), timestamp, mutation);
-        }
-
         MapDifference<String, IndexMetadata> indexesDiff = indexesDiff(oldTable.getIndexes(),
                                                                        newTable.getIndexes());
 
@@ -989,17 +1012,6 @@ public final class SchemaKeyspace
         return Maps.difference(beforeMap, afterMap);
     }
 
-    private static MapDifference<String, MaterializedViewDefinition> materializedViewsDiff(MaterializedViews before, MaterializedViews after)
-    {
-        Map<String, MaterializedViewDefinition> beforeMap = new HashMap<>();
-        before.forEach(v -> beforeMap.put(v.viewName, v));
-
-        Map<String, MaterializedViewDefinition> afterMap = new HashMap<>();
-        after.forEach(v -> afterMap.put(v.viewName, v));
-
-        return Maps.difference(beforeMap, afterMap);
-    }
-
     public static Mutation makeDropTableMutation(KeyspaceMetadata keyspace, CFMetaData table, long timestamp)
     {
         // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
@@ -1013,9 +1025,6 @@ public final class SchemaKeyspace
         for (TriggerMetadata trigger : table.getTriggers())
             dropTriggerFromSchemaMutation(table, trigger, timestamp, mutation);
 
-        for (MaterializedViewDefinition materializedView : table.getMaterializedViews())
-            dropMaterializedViewFromSchemaMutation(table, materializedView, timestamp, mutation);
-
         for (IndexMetadata index : table.getIndexes())
             dropIndexFromSchemaMutation(table, index, timestamp, mutation);
 
@@ -1083,12 +1092,8 @@ public final class SchemaKeyspace
         Triggers triggers =
             readSchemaPartitionForTableAndApply(TRIGGERS, keyspace, table, SchemaKeyspace::createTriggersFromTriggersPartition);
 
-        MaterializedViews views =
-            readSchemaPartitionForTableAndApply(MATERIALIZED_VIEWS, keyspace, table, SchemaKeyspace::createMaterializedViewsFromMaterializedViewsPartition);
-
         CFMetaData cfm = createTableFromTableRowAndColumns(row, columns).droppedColumns(droppedColumns)
-                                                                        .triggers(triggers)
-                                                                        .materializedViews(views);
+                                                                        .triggers(triggers);
 
         // the CFMetaData itself is required to build the collection of indexes as
         // the column definitions are needed because we store only the name each
@@ -1114,7 +1119,6 @@ public final class SchemaKeyspace
         boolean isCounter = flags.contains(CFMetaData.Flag.COUNTER);
         boolean isDense = flags.contains(CFMetaData.Flag.DENSE);
         boolean isCompound = flags.contains(CFMetaData.Flag.COMPOUND);
-        boolean isMaterializedView = flags.contains(CFMetaData.Flag.VIEW);
 
         return CFMetaData.create(keyspace,
                                  table,
@@ -1123,7 +1127,7 @@ public final class SchemaKeyspace
                                  isCompound,
                                  isSuper,
                                  isCounter,
-                                 isMaterializedView,
+                                 false,
                                  columns,
                                  DatabaseDescriptor.getPartitioner())
                          .params(createTableParamsFromRow(row));
@@ -1274,82 +1278,164 @@ public final class SchemaKeyspace
     }
 
     /*
-     * Materialized View metadata serialization/deserialization.
+     * View metadata serialization/deserialization.
      */
 
-    private static void addMaterializedViewToSchemaMutation(CFMetaData table, MaterializedViewDefinition materializedView, long timestamp, Mutation mutation)
+    public static Mutation makeCreateViewMutation(KeyspaceMetadata keyspace, ViewDefinition view, long timestamp)
+    {
+        // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
+        Mutation mutation = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp);
+        addViewToSchemaMutation(view, timestamp, true, mutation);
+        return mutation;
+    }
+
+    private static void addViewToSchemaMutation(ViewDefinition view, long timestamp, boolean includeColumns, Mutation mutation)
     {
-        RowUpdateBuilder builder = new RowUpdateBuilder(MaterializedViews, timestamp, mutation)
-            .clustering(table.cfName, materializedView.viewName);
+        RowUpdateBuilder builder = new RowUpdateBuilder(Views, timestamp, mutation)
+            .clustering(view.viewName);
+
+        CFMetaData table = view.metadata;
 
-        builder.frozenList("target_columns", materializedView.partitionColumns.stream().map(ColumnIdentifier::toString).collect(Collectors.toList()));
-        builder.frozenList("clustering_columns", materializedView.clusteringColumns.stream().map(ColumnIdentifier::toString).collect(Collectors.toList()));
-        builder.frozenList("included_columns", materializedView.included.stream().map(ColumnIdentifier::toString).collect(Collectors.toList()));
+        builder.add("include_all_columns", view.includeAllColumns)
+               .add("base_table_id", view.baseTableId)
+               .add("base_table_name", view.baseTableMetadata().cfName)
+               .add("id", table.cfId);
+
+        addTableParamsToSchemaMutation(table.params, builder);
+
+        if (includeColumns)
+        {
+            for (ColumnDefinition column : table.allColumns())
+                addColumnToSchemaMutation(table, column, timestamp, mutation);
+
+            for (CFMetaData.DroppedColumn column : table.getDroppedColumns().values())
+                addDroppedColumnToSchemaMutation(table, column, timestamp, mutation);
+        }
 
         builder.build();
     }
 
-    private static void dropMaterializedViewFromSchemaMutation(CFMetaData table, MaterializedViewDefinition materializedView, long timestamp, Mutation mutation)
+    public static Mutation makeDropViewMutation(KeyspaceMetadata keyspace, ViewDefinition view, long timestamp)
     {
-        RowUpdateBuilder.deleteRow(MaterializedViews, timestamp, mutation, table.cfName, materializedView.viewName);
+        // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
+        Mutation mutation = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp);
+
+        RowUpdateBuilder.deleteRow(Views, timestamp, mutation, view.viewName);
+
+        CFMetaData table = view.metadata;
+        for (ColumnDefinition column : table.allColumns())
+            dropColumnFromSchemaMutation(table, column, timestamp, mutation);
+
+        for (IndexMetadata index : table.getIndexes())
+            dropIndexFromSchemaMutation(table, index, timestamp, mutation);
+
+        return mutation;
     }
 
-    private static void addUpdatedMaterializedViewDefinitionToSchemaMutation(CFMetaData table, MaterializedViewDefinition materializedView, long timestamp, Mutation mutation)
+    public static Mutation makeUpdateViewMutation(KeyspaceMetadata keyspace,
+                                                  ViewDefinition oldView,
+                                                  ViewDefinition newView,
+                                                  long timestamp)
     {
-        addMaterializedViewToSchemaMutation(table, materializedView, timestamp, mutation);
+        Mutation mutation = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp);
+
+        addViewToSchemaMutation(newView, timestamp, false, mutation);
+
+        MapDifference<ByteBuffer, ColumnDefinition> columnDiff = Maps.difference(oldView.metadata.getColumnMetadata(),
+                                                                                 newView.metadata.getColumnMetadata());
+
+        // columns that are no longer needed
+        for (ColumnDefinition column : columnDiff.entriesOnlyOnLeft().values())
+        {
+            dropColumnFromSchemaMutation(oldView.metadata, column, timestamp, mutation);
+        }
+
+        // newly added columns
+        for (ColumnDefinition column : columnDiff.entriesOnlyOnRight().values())
+            addColumnToSchemaMutation(newView.metadata, column, timestamp, mutation);
+
+        // old columns with updated attributes
+        for (ByteBuffer name : columnDiff.entriesDiffering().keySet())
+            addColumnToSchemaMutation(newView.metadata, newView.metadata.getColumnDefinition(name), timestamp, mutation);
+
+        // dropped columns
+        MapDifference<ByteBuffer, CFMetaData.DroppedColumn> droppedColumnDiff =
+        Maps.difference(oldView.metadata.getDroppedColumns(), oldView.metadata.getDroppedColumns());
+
+        // newly dropped columns
+        for (CFMetaData.DroppedColumn column : droppedColumnDiff.entriesOnlyOnRight().values())
+            addDroppedColumnToSchemaMutation(oldView.metadata, column, timestamp, mutation);
+
+        // columns added then dropped again
+        for (ByteBuffer name : droppedColumnDiff.entriesDiffering().keySet())
+            addDroppedColumnToSchemaMutation(newView.metadata, newView.metadata.getDroppedColumns().get(name), timestamp, mutation);
+
+        return mutation;
+    }
+
+    public static ViewDefinition createViewFromName(String keyspace, String view)
+    {
+        return readSchemaPartitionForTableAndApply(VIEWS, keyspace, view, partition ->
+        {
+            if (partition.isEmpty())
+                throw new RuntimeException(String.format("%s:%s not found in the schema definitions keyspace.", keyspace, view));
+
+            return createViewFromViewPartition(partition);
+        });
+    }
+
+    private static ViewDefinition createViewFromViewPartition(RowIterator partition)
+    {
+        String query = String.format("SELECT * FROM %s.%s", NAME, VIEWS);
+        return createViewFromViewRow(QueryProcessor.resultify(query, partition).one());
     }
 
     /**
-     * Deserialize materialized views from storage-level representation.
+     * Deserialize views from storage-level representation.
      *
-     * @param partition storage-level partition containing the materialized view definitions
-     * @return the list of processed MaterializedViewDefinitions
+     * @param partition storage-level partition containing the view definitions
+     * @return the list of processed ViewDefinitions
      */
-    private static MaterializedViews createMaterializedViewsFromMaterializedViewsPartition(RowIterator partition)
+    private static Views createViewsFromViewsPartition(RowIterator partition)
     {
-        MaterializedViews.Builder views = org.apache.cassandra.schema.MaterializedViews.builder();
-        String query = String.format("SELECT * FROM %s.%s", NAME, MATERIALIZED_VIEWS);
+        Views.Builder views = org.apache.cassandra.schema.Views.builder();
+        String query = String.format("SELECT * FROM %s.%s", NAME, VIEWS);
         for (UntypedResultSet.Row row : QueryProcessor.resultify(query, partition))
         {
-            MaterializedViewDefinition mv = createMaterializedViewFromMaterializedViewRow(row);
-            views.add(mv);
+            ViewDefinition view = createViewFromViewRow(row);
+            views.add(view);
         }
         return views.build();
     }
 
-    private static MaterializedViewDefinition createMaterializedViewFromMaterializedViewRow(UntypedResultSet.Row row)
+    private static ViewDefinition createViewFromViewRow(UntypedResultSet.Row row)
     {
-        String name = row.getString("view_name");
-        List<String> partitionColumnNames = row.getFrozenList("target_columns", UTF8Type.instance);
-
-        String cfName = row.getString("table_name");
-        List<String> clusteringColumnNames = row.getFrozenList("clustering_columns", UTF8Type.instance);
-
-        List<ColumnIdentifier> partitionColumns = new ArrayList<>();
-        for (String columnName : partitionColumnNames)
-        {
-            partitionColumns.add(ColumnIdentifier.getInterned(columnName, true));
-        }
-
-        List<ColumnIdentifier> clusteringColumns = new ArrayList<>();
-        for (String columnName : clusteringColumnNames)
-        {
-            clusteringColumns.add(ColumnIdentifier.getInterned(columnName, true));
-        }
+        String keyspace = row.getString("keyspace_name");
+        String view = row.getString("view_name");
+        UUID id = row.getUUID("id");
+        UUID baseTableId = row.getUUID("base_table_id");
+        boolean includeAll = row.getBoolean("include_all_columns");
 
-        List<String> includedColumnNames = row.getFrozenList("included_columns", UTF8Type.instance);
-        Set<ColumnIdentifier> includedColumns = new HashSet<>();
-        if (includedColumnNames != null)
-        {
-            for (String columnName : includedColumnNames)
-                includedColumns.add(ColumnIdentifier.getInterned(columnName, true));
-        }
+        List<ColumnDefinition> columns =
+            readSchemaPartitionForTableAndApply(COLUMNS, keyspace, view, SchemaKeyspace::createColumnsFromColumnsPartition);
 
-        return new MaterializedViewDefinition(cfName,
-                                              name,
-                                              partitionColumns,
-                                              clusteringColumns,
-                                              includedColumns);
+        Map<ByteBuffer, CFMetaData.DroppedColumn> droppedColumns =
+            readSchemaPartitionForTableAndApply(DROPPED_COLUMNS, keyspace, view, SchemaKeyspace::createDroppedColumnsFromDroppedColumnsPartition);
+
+        CFMetaData cfm = CFMetaData.create(keyspace,
+                                           view,
+                                           id,
+                                           false,
+                                           true,
+                                           false,
+                                           false,
+                                           true,
+                                           columns,
+                                           DatabaseDescriptor.getPartitioner())
+                                   .params(createTableParamsFromRow(row))
+                                   .droppedColumns(droppedColumns);
+
+        return new ViewDefinition(keyspace, view, baseTableId, includeAll, cfm);
     }
 
     /*

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3a8dbca/src/java/org/apache/cassandra/schema/Views.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/Views.java b/src/java/org/apache/cassandra/schema/Views.java
new file mode 100644
index 0000000..5888b9d
--- /dev/null
+++ b/src/java/org/apache/cassandra/schema/Views.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.schema;
+
+
+import java.util.Iterator;
+import java.util.Optional;
+
+import javax.annotation.Nullable;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ViewDefinition;
+
+import static com.google.common.collect.Iterables.filter;
+
+public final class Views implements Iterable<ViewDefinition>
+{
+    private final ImmutableMap<String, ViewDefinition> views;
+
+    private Views(Builder builder)
+    {
+        views = builder.views.build();
+    }
+
+    public static Builder builder()
+    {
+        return new Builder();
+    }
+
+    public static Views none()
+    {
+        return builder().build();
+    }
+
+    public Iterator<ViewDefinition> iterator()
+    {
+        return views.values().iterator();
+    }
+
+    public Iterable<CFMetaData> metadatas()
+    {
+        return Iterables.transform(views.values(), view -> view.metadata);
+    }
+
+    public int size()
+    {
+        return views.size();
+    }
+
+    public boolean isEmpty()
+    {
+        return views.isEmpty();
+    }
+
+    /**
+     * Get the materialized view with the specified name
+     *
+     * @param name a non-qualified materialized view name
+     * @return an empty {@link Optional} if the materialized view name is not found; a non-empty optional of {@link ViewDefinition} otherwise
+     */
+    public Optional<ViewDefinition> get(String name)
+    {
+        return Optional.ofNullable(views.get(name));
+    }
+
+    /**
+     * Get the view with the specified name
+     *
+     * @param name a non-qualified view name
+     * @return null if the view name is not found; the found {@link ViewDefinition} otherwise
+     */
+    @Nullable
+    public ViewDefinition getNullable(String name)
+    {
+        return views.get(name);
+    }
+
+    /**
+     * Create a MaterializedViews instance with the provided materialized view added
+     */
+    public Views with(ViewDefinition view)
+    {
+        if (get(view.viewName).isPresent())
+            throw new IllegalStateException(String.format("Materialized View %s already exists", view.viewName));
+
+        return builder().add(this).add(view).build();
+    }
+
+    /**
+     * Creates a MaterializedViews instance with the materializedView with the provided name removed
+     */
+    public Views without(String name)
+    {
+        ViewDefinition materializedView =
+            get(name).orElseThrow(() -> new IllegalStateException(String.format("Materialized View %s doesn't exists", name)));
+
+        return builder().add(filter(this, v -> v != materializedView)).build();
+    }
+
+    /**
+     * Creates a MaterializedViews instance which contains an updated materialized view
+     */
+    public Views replace(ViewDefinition view, CFMetaData cfm)
+    {
+        return without(view.viewName).with(view);
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        return this == o || (o instanceof Views && views.equals(((Views) o).views));
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return views.hashCode();
+    }
+
+    @Override
+    public String toString()
+    {
+        return views.values().toString();
+    }
+
+    public static final class Builder
+    {
+        final ImmutableMap.Builder<String, ViewDefinition> views = new ImmutableMap.Builder<>();
+
+        private Builder()
+        {
+        }
+
+        public Views build()
+        {
+            return new Views(this);
+        }
+
+
+        public Builder add(ViewDefinition view)
+        {
+            views.put(view.viewName, view);
+            return this;
+        }
+
+        public Builder add(Iterable<ViewDefinition> views)
+        {
+            views.forEach(this::add);
+            return this;
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3a8dbca/src/java/org/apache/cassandra/service/CassandraDaemon.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index 230b46a..1408a70 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -230,7 +230,7 @@ public class CassandraDaemon
             if (keyspaceName.equals(SystemKeyspace.NAME))
                 continue;
 
-            for (CFMetaData cfm : Schema.instance.getTables(keyspaceName))
+            for (CFMetaData cfm : Schema.instance.getTablesAndViews(keyspaceName))
                 ColumnFamilyStore.scrubDataDirectories(cfm);
         }
 
@@ -295,22 +295,19 @@ public class CassandraDaemon
             }
         }
 
-        Runnable indexRebuild = new Runnable()
+        Runnable viewRebuild = new Runnable()
         {
             @Override
             public void run()
             {
                 for (Keyspace keyspace : Keyspace.all())
                 {
-                    for (ColumnFamilyStore cf: keyspace.getColumnFamilyStores())
-                    {
-                        cf.materializedViewManager.buildAllViews();
-                    }
+                    keyspace.viewManager.buildAllViews();
                 }
             }
         };
 
-        ScheduledExecutors.optionalTasks.schedule(indexRebuild, StorageService.RING_DELAY, TimeUnit.MILLISECONDS);
+        ScheduledExecutors.optionalTasks.schedule(viewRebuild, StorageService.RING_DELAY, TimeUnit.MILLISECONDS);
 
 
         SystemKeyspace.finishStartup();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3a8dbca/src/java/org/apache/cassandra/service/MigrationListener.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/MigrationListener.java b/src/java/org/apache/cassandra/service/MigrationListener.java
index 358b236..f4b3e7c 100644
--- a/src/java/org/apache/cassandra/service/MigrationListener.java
+++ b/src/java/org/apache/cassandra/service/MigrationListener.java
@@ -31,6 +31,11 @@ public abstract class MigrationListener
     {
     }
 
+    public void onCreateView(String ksName, String viewName)
+    {
+        onCreateColumnFamily(ksName, viewName);
+    }
+
     public void onCreateUserType(String ksName, String typeName)
     {
     }
@@ -51,6 +56,11 @@ public abstract class MigrationListener
     {
     }
 
+    public void onUpdateView(String ksName, String viewName, boolean columnsDidChange)
+    {
+        onUpdateColumnFamily(ksName, viewName, columnsDidChange);
+    }
+
     public void onUpdateUserType(String ksName, String typeName)
     {
     }
@@ -71,6 +81,10 @@ public abstract class MigrationListener
     {
     }
 
+    public void onDropView(String ksName, String viewName)
+    {
+    }
+
     public void onDropUserType(String ksName, String typeName)
     {
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3a8dbca/src/java/org/apache/cassandra/service/MigrationManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/MigrationManager.java b/src/java/org/apache/cassandra/service/MigrationManager.java
index dad6aa7..c820f18 100644
--- a/src/java/org/apache/cassandra/service/MigrationManager.java
+++ b/src/java/org/apache/cassandra/service/MigrationManager.java
@@ -31,6 +31,7 @@ import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.config.ViewDefinition;
 import org.apache.cassandra.cql3.functions.UDAggregate;
 import org.apache.cassandra.cql3.functions.UDFunction;
 import org.apache.cassandra.db.*;
@@ -162,6 +163,12 @@ public class MigrationManager
             listener.onCreateColumnFamily(cfm.ksName, cfm.cfName);
     }
 
+    public void notifyCreateView(ViewDefinition view)
+    {
+        for (MigrationListener listener : listeners)
+            listener.onCreateView(view.ksName, view.viewName);
+    }
+
     public void notifyCreateUserType(UserType ut)
     {
         for (MigrationListener listener : listeners)
@@ -192,6 +199,12 @@ public class MigrationManager
             listener.onUpdateColumnFamily(cfm.ksName, cfm.cfName, columnsDidChange);
     }
 
+    public void notifyUpdateView(ViewDefinition view, boolean columnsDidChange)
+    {
+        for (MigrationListener listener : listeners)
+            listener.onUpdateView(view.ksName, view.viewName, columnsDidChange);
+    }
+
     public void notifyUpdateUserType(UserType ut)
     {
         for (MigrationListener listener : listeners)
@@ -225,6 +238,12 @@ public class MigrationManager
             listener.onDropColumnFamily(cfm.ksName, cfm.cfName);
     }
 
+    public void notifyDropView(ViewDefinition view)
+    {
+        for (MigrationListener listener : listeners)
+            listener.onDropView(view.ksName, view.viewName);
+    }
+
     public void notifyDropUserType(UserType ut)
     {
         for (MigrationListener listener : listeners)
@@ -276,13 +295,28 @@ public class MigrationManager
         KeyspaceMetadata ksm = Schema.instance.getKSMetaData(cfm.ksName);
         if (ksm == null)
             throw new ConfigurationException(String.format("Cannot add table '%s' to non existing keyspace '%s'.", cfm.cfName, cfm.ksName));
-        else if (ksm.tables.get(cfm.cfName).isPresent())
+        // If we have a table or a view which has the same name, we can't add a new one
+        else if (ksm.getTableOrViewNullable(cfm.cfName) != null)
             throw new AlreadyExistsException(cfm.ksName, cfm.cfName);
 
         logger.info(String.format("Create new table: %s", cfm));
         announce(SchemaKeyspace.makeCreateTableMutation(ksm, cfm, FBUtilities.timestampMicros()), announceLocally);
     }
 
+    public static void announceNewView(ViewDefinition view, boolean announceLocally) throws ConfigurationException
+    {
+        view.metadata.validate();
+
+        KeyspaceMetadata ksm = Schema.instance.getKSMetaData(view.ksName);
+        if (ksm == null)
+            throw new ConfigurationException(String.format("Cannot add table '%s' to non existing keyspace '%s'.", view.viewName, view.ksName));
+        else if (ksm.getTableOrViewNullable(view.viewName) != null)
+            throw new AlreadyExistsException(view.ksName, view.viewName);
+
+        logger.info(String.format("Create new view: %s", view));
+        announce(SchemaKeyspace.makeCreateViewMutation(ksm, view, FBUtilities.timestampMicros()), announceLocally);
+    }
+
     public static void announceNewType(UserType newType, boolean announceLocally)
     {
         KeyspaceMetadata ksm = Schema.instance.getKSMetaData(newType.keyspace);
@@ -340,6 +374,21 @@ public class MigrationManager
         announce(SchemaKeyspace.makeUpdateTableMutation(ksm, oldCfm, cfm, FBUtilities.timestampMicros(), fromThrift), announceLocally);
     }
 
+    public static void announceViewUpdate(ViewDefinition view, boolean announceLocally) throws ConfigurationException
+    {
+        view.metadata.validate();
+
+        ViewDefinition oldView = Schema.instance.getView(view.ksName, view.viewName);
+        if (oldView == null)
+            throw new ConfigurationException(String.format("Cannot update non existing materialized view '%s' in keyspace '%s'.", view.viewName, view.ksName));
+        KeyspaceMetadata ksm = Schema.instance.getKSMetaData(view.ksName);
+
+        oldView.metadata.validateCompatility(view.metadata);
+
+        logger.info(String.format("Update view '%s/%s' From %s To %s", view.ksName, view.viewName, oldView, view));
+        announce(SchemaKeyspace.makeUpdateViewMutation(ksm, oldView, view, FBUtilities.timestampMicros()), announceLocally);
+    }
+
     public static void announceTypeUpdate(UserType updatedType, boolean announceLocally)
     {
         announceNewType(updatedType, announceLocally);
@@ -376,6 +425,17 @@ public class MigrationManager
         announce(SchemaKeyspace.makeDropTableMutation(ksm, oldCfm, FBUtilities.timestampMicros()), announceLocally);
     }
 
+    public static void announceViewDrop(String ksName, String viewName, boolean announceLocally) throws ConfigurationException
+    {
+        ViewDefinition view = Schema.instance.getView(ksName, viewName);
+        if (view == null)
+            throw new ConfigurationException(String.format("Cannot drop non existing materialized view '%s' in keyspace '%s'.", viewName, ksName));
+        KeyspaceMetadata ksm = Schema.instance.getKSMetaData(ksName);
+
+        logger.info(String.format("Drop table '%s/%s'", view.ksName, view.viewName));
+        announce(SchemaKeyspace.makeDropViewMutation(ksm, view, FBUtilities.timestampMicros()), announceLocally);
+    }
+
     public static void announceTypeDrop(UserType droppedType)
     {
         announceTypeDrop(droppedType, false);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3a8dbca/src/java/org/apache/cassandra/service/StartupChecks.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StartupChecks.java b/src/java/org/apache/cassandra/service/StartupChecks.java
index eca0c7e..16ff488 100644
--- a/src/java/org/apache/cassandra/service/StartupChecks.java
+++ b/src/java/org/apache/cassandra/service/StartupChecks.java
@@ -270,7 +270,7 @@ public class StartupChecks
             // we do a one-off scrub of the system keyspace first; we can't load the list of the rest of the keyspaces,
             // until system keyspace is opened.
 
-            for (CFMetaData cfm : Schema.instance.getTables(SystemKeyspace.NAME))
+            for (CFMetaData cfm : Schema.instance.getTablesAndViews(SystemKeyspace.NAME))
                 ColumnFamilyStore.scrubDataDirectories(cfm);
 
             try

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3a8dbca/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index e3b884e..d209af6 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -48,12 +48,9 @@ import org.apache.cassandra.db.filter.DataLimits;
 import org.apache.cassandra.db.filter.TombstoneOverwhelmingException;
 import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.db.rows.RowIterator;
-import org.apache.cassandra.db.view.MaterializedViewManager;
-import org.apache.cassandra.db.view.MaterializedViewUtils;
-import org.apache.cassandra.dht.AbstractBounds;
-import org.apache.cassandra.dht.Bounds;
-import org.apache.cassandra.dht.RingPosition;
-import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.db.view.ViewManager;
+import org.apache.cassandra.db.view.ViewUtils;
+import org.apache.cassandra.dht.*;
 import org.apache.cassandra.exceptions.*;
 import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.gms.Gossiper;
@@ -98,7 +95,7 @@ public class StorageProxy implements StorageProxyMBean
     private static final ClientRequestMetrics writeMetrics = new ClientRequestMetrics("Write");
     private static final CASClientRequestMetrics casWriteMetrics = new CASClientRequestMetrics("CASWrite");
     private static final CASClientRequestMetrics casReadMetrics = new CASClientRequestMetrics("CASRead");
-    private static final MVWriteMetrics mvWriteMetrics = new MVWriteMetrics("MVWrite");
+    private static final ViewWriteMetrics viewWriteMetrics = new ViewWriteMetrics("ViewWrite");
 
     private static final double CONCURRENT_SUBREQUESTS_MARGIN = 0.10;
 
@@ -655,6 +652,7 @@ public class StorageProxy implements StorageProxyMBean
      * @param mutations the mutations to be applied across the replicas
      */
     public static void mutateMV(ByteBuffer dataKey, Collection<Mutation> mutations, boolean writeCommitLog)
+    throws UnavailableException, OverloadedException, WriteTimeoutException
     {
         Tracing.trace("Determining replicas for mutation");
         final String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress());
@@ -679,15 +677,15 @@ public class StorageProxy implements StorageProxyMBean
             {
                 String keyspaceName = mutation.getKeyspaceName();
                 Token tk = mutation.key().getToken();
-                InetAddress pairedEndpoint = MaterializedViewUtils.getViewNaturalEndpoint(keyspaceName, baseToken, tk);
+                InetAddress pairedEndpoint = ViewUtils.getViewNaturalEndpoint(keyspaceName, baseToken, tk);
                 List<InetAddress> naturalEndpoints = Lists.newArrayList(pairedEndpoint);
 
-                WriteResponseHandlerWrapper wrapper = wrapMVBatchResponseHandler(mutation,
-                                                                                 consistencyLevel,
-                                                                                 consistencyLevel,
-                                                                                 naturalEndpoints,
-                                                                                 WriteType.BATCH,
-                                                                                 cleanup);
+                WriteResponseHandlerWrapper wrapper = wrapViewBatchResponseHandler(mutation,
+                                                                                   consistencyLevel,
+                                                                                   consistencyLevel,
+                                                                                   naturalEndpoints,
+                                                                                   WriteType.BATCH,
+                                                                                   cleanup);
 
                 // When local node is the endpoint and there are no pending nodes we can
                 // Just apply the mutation locally.
@@ -704,12 +702,12 @@ public class StorageProxy implements StorageProxyMBean
                                       writeCommitLog);
 
                 // now actually perform the writes and wait for them to complete
-                asyncWriteBatchedMutations(wrappers, localDataCenter, Stage.MATERIALIZED_VIEW_MUTATION);
+                asyncWriteBatchedMutations(wrappers, localDataCenter, Stage.VIEW_MUTATION);
             }
         }
         finally
         {
-            mvWriteMetrics.addNano(System.nanoTime() - startTime);
+            viewWriteMetrics.addNano(System.nanoTime() - startTime);
         }
     }
 
@@ -721,7 +719,9 @@ public class StorageProxy implements StorageProxyMBean
     {
         Collection<Mutation> augmented = TriggerExecutor.instance.execute(mutations);
 
-        boolean updatesView = MaterializedViewManager.updatesAffectView(mutations, true);
+        boolean updatesView = Keyspace.open(mutations.iterator().next().getKeyspaceName())
+                              .viewManager
+                              .updatesAffectView(mutations, true);
 
         if (augmented != null)
             mutateAtomically(augmented, consistencyLevel, updatesView);
@@ -974,14 +974,14 @@ public class StorageProxy implements StorageProxyMBean
 
     /**
      * Same as performWrites except does not initiate writes (but does perform availability checks).
-     * Keeps track of MVWriteMetrics
+     * Keeps track of ViewWriteMetrics
      */
-    private static WriteResponseHandlerWrapper wrapMVBatchResponseHandler(Mutation mutation,
-                                                                          ConsistencyLevel consistency_level,
-                                                                          ConsistencyLevel batchConsistencyLevel,
-                                                                          List<InetAddress> naturalEndpoints,
-                                                                          WriteType writeType,
-                                                                          BatchlogResponseHandler.BatchlogCleanup cleanup)
+    private static WriteResponseHandlerWrapper wrapViewBatchResponseHandler(Mutation mutation,
+                                                                            ConsistencyLevel consistency_level,
+                                                                            ConsistencyLevel batchConsistencyLevel,
+                                                                            List<InetAddress> naturalEndpoints,
+                                                                            WriteType writeType,
+                                                                            BatchlogResponseHandler.BatchlogCleanup cleanup)
     {
         Keyspace keyspace = Keyspace.open(mutation.getKeyspaceName());
         AbstractReplicationStrategy rs = keyspace.getReplicationStrategy();
@@ -989,7 +989,7 @@ public class StorageProxy implements StorageProxyMBean
         Token tk = mutation.key().getToken();
         Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, keyspaceName);
         AbstractWriteResponseHandler<IMutation> writeHandler = rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, null, writeType);
-        BatchlogResponseHandler<IMutation> batchHandler = new MVWriteMetricsWrapped(writeHandler, batchConsistencyLevel.blockFor(keyspace), cleanup);
+        BatchlogResponseHandler<IMutation> batchHandler = new ViewWriteMetricsWrapped(writeHandler, batchConsistencyLevel.blockFor(keyspace), cleanup);
         return new WriteResponseHandlerWrapper(batchHandler, mutation);
     }
 
@@ -2282,20 +2282,20 @@ public class StorageProxy implements StorageProxyMBean
     }
 
     /**
-     * This class captures metrics for materialized views writes.
+     * This class captures metrics for views writes.
      */
-    private static class MVWriteMetricsWrapped extends BatchlogResponseHandler<IMutation>
+    private static class ViewWriteMetricsWrapped extends BatchlogResponseHandler<IMutation>
     {
-        public MVWriteMetricsWrapped(AbstractWriteResponseHandler<IMutation> writeHandler, int i, BatchlogCleanup cleanup)
+        public ViewWriteMetricsWrapped(AbstractWriteResponseHandler<IMutation> writeHandler, int i, BatchlogCleanup cleanup)
         {
             super(writeHandler, i, cleanup);
-            mvWriteMetrics.viewReplicasAttempted.inc(totalEndpoints());
+            viewWriteMetrics.viewReplicasAttempted.inc(totalEndpoints());
         }
 
         public void response(MessageIn<IMutation> msg)
         {
             super.response(msg);
-            mvWriteMetrics.viewReplicasSuccess.inc();
+            viewWriteMetrics.viewReplicasSuccess.inc();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3a8dbca/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index f095630..f0ad46f 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -630,12 +630,12 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
             public void runMayThrow() throws InterruptedException
             {
                 inShutdownHook = true;
-                ExecutorService materializedViewMutationStage = StageManager.getStage(Stage.MATERIALIZED_VIEW_MUTATION);
+                ExecutorService viewMutationStage = StageManager.getStage(Stage.VIEW_MUTATION);
                 ExecutorService counterMutationStage = StageManager.getStage(Stage.COUNTER_MUTATION);
                 ExecutorService mutationStage = StageManager.getStage(Stage.MUTATION);
                 if (mutationStage.isShutdown()
                     && counterMutationStage.isShutdown()
-                    && materializedViewMutationStage.isShutdown())
+                    && viewMutationStage.isShutdown())
                     return; // drained already
 
                 if (daemon != null)
@@ -646,11 +646,11 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                 // In-progress writes originating here could generate hints to be written, so shut down MessagingService
                 // before mutation stage, so we can get all the hints saved before shutting down
                 MessagingService.instance().shutdown();
-                materializedViewMutationStage.shutdown();
+                viewMutationStage.shutdown();
                 HintsService.instance.pauseDispatch();
                 counterMutationStage.shutdown();
                 mutationStage.shutdown();
-                materializedViewMutationStage.awaitTermination(3600, TimeUnit.SECONDS);
+                viewMutationStage.awaitTermination(3600, TimeUnit.SECONDS);
                 counterMutationStage.awaitTermination(3600, TimeUnit.SECONDS);
                 mutationStage.awaitTermination(3600, TimeUnit.SECONDS);
                 StorageProxy.instance.verifyNoHintsInProgress();
@@ -3179,7 +3179,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         if (ksMetaData == null)
             throw new IllegalArgumentException("Unknown keyspace '" + keyspaceName + "'");
 
-        CFMetaData cfMetaData = ksMetaData.tables.getNullable(cf);
+        CFMetaData cfMetaData = ksMetaData.getTableOrViewNullable(cf);
         if (cfMetaData == null)
             throw new IllegalArgumentException("Unknown table '" + cf + "' in keyspace '" + keyspaceName + "'");
 
@@ -3876,11 +3876,11 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         HintsService.instance.pauseDispatch();
 
         ExecutorService counterMutationStage = StageManager.getStage(Stage.COUNTER_MUTATION);
-        ExecutorService materializedViewMutationStage = StageManager.getStage(Stage.MATERIALIZED_VIEW_MUTATION);
+        ExecutorService viewMutationStage = StageManager.getStage(Stage.VIEW_MUTATION);
         ExecutorService mutationStage = StageManager.getStage(Stage.MUTATION);
         if (mutationStage.isTerminated()
             && counterMutationStage.isTerminated()
-            && materializedViewMutationStage.isTerminated())
+            && viewMutationStage.isTerminated())
         {
             logger.warn("Cannot drain node (did it already happen?)");
             return;
@@ -3894,10 +3894,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         MessagingService.instance().shutdown();
 
         setMode(Mode.DRAINING, "clearing mutation stage", false);
-        materializedViewMutationStage.shutdown();
+        viewMutationStage.shutdown();
         counterMutationStage.shutdown();
         mutationStage.shutdown();
-        materializedViewMutationStage.awaitTermination(3600, TimeUnit.SECONDS);
+        viewMutationStage.awaitTermination(3600, TimeUnit.SECONDS);
         counterMutationStage.awaitTermination(3600, TimeUnit.SECONDS);
         mutationStage.awaitTermination(3600, TimeUnit.SECONDS);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3a8dbca/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index cb99654..f261954 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@ -21,6 +21,7 @@ import java.util.*;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
+import com.google.common.collect.Iterables;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -33,10 +34,10 @@ import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.db.view.View;
 import org.apache.cassandra.io.sstable.ISSTableScanner;
 import org.apache.cassandra.io.sstable.SSTableMultiWriter;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.io.sstable.format.SSTableWriter;
 import org.apache.cassandra.utils.JVMStabilityInspector;
 import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.concurrent.Refs;
@@ -127,7 +128,7 @@ public class StreamReceiveTask extends StreamTask
                 return;
             }
             ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
-            boolean hasMaterializedViews = cfs.materializedViewManager.allViews().iterator().hasNext();
+            boolean hasViews = !Iterables.isEmpty(View.findAll(kscf.left, kscf.right));
 
             try
             {
@@ -143,11 +144,11 @@ public class StreamReceiveTask extends StreamTask
 
                 try (Refs<SSTableReader> refs = Refs.ref(readers))
                 {
-                    //We have a special path for Materialized view.
-                    //Since the MV requires cleaning up any pre-existing state, we must put
+                    //We have a special path for views.
+                    //Since the view requires cleaning up any pre-existing state, we must put
                     //all partitions through the same write path as normal mutations.
-                    //This also ensures any 2i's are also updated
-                    if (hasMaterializedViews)
+                    //This also ensures any 2is are also updated
+                    if (hasViews)
                     {
                         for (SSTableReader reader : readers)
                         {
@@ -183,7 +184,7 @@ public class StreamReceiveTask extends StreamTask
                 {
                     //We don't keep the streamed sstables since we've applied them manually
                     //So we abort the txn and delete the streamed sstables
-                    if (hasMaterializedViews)
+                    if (hasViews)
                     {
                         cfs.forceBlockingFlush();
                         task.txn.abort();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3a8dbca/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java
index 207bb6f..7017bc1 100644
--- a/src/java/org/apache/cassandra/thrift/CassandraServer.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java
@@ -44,6 +44,7 @@ import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.TimeUUIDType;
 import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.view.View;
 import org.apache.cassandra.dht.*;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.exceptions.*;
@@ -815,7 +816,7 @@ public class CassandraServer implements Cassandra.Iface
         cState.hasColumnFamilyAccess(keyspace, column_parent.column_family, Permission.MODIFY);
 
         CFMetaData metadata = ThriftValidation.validateColumnFamily(keyspace, column_parent.column_family, false);
-        if (metadata.isMaterializedView())
+        if (metadata.isView())
             throw new org.apache.cassandra.exceptions.InvalidRequestException("Cannot modify Materialized Views directly");
 
         ThriftValidation.validateKey(metadata, key);
@@ -912,7 +913,7 @@ public class CassandraServer implements Cassandra.Iface
             cState.hasColumnFamilyAccess(keyspace, column_family, Permission.SELECT);
 
             CFMetaData metadata = ThriftValidation.validateColumnFamily(keyspace, column_family, false);
-            if (metadata.isMaterializedView())
+            if (metadata.isView())
                 throw new org.apache.cassandra.exceptions.InvalidRequestException("Cannot modify Materialized Views directly");
 
             ThriftValidation.validateKey(metadata, key);
@@ -1108,7 +1109,7 @@ public class CassandraServer implements Cassandra.Iface
                 cState.hasColumnFamilyAccess(keyspace, cfName, Permission.MODIFY);
 
                 CFMetaData metadata = ThriftValidation.validateColumnFamily(keyspace, cfName);
-                if (metadata.isMaterializedView())
+                if (metadata.isView())
                     throw new org.apache.cassandra.exceptions.InvalidRequestException("Cannot modify Materialized Views directly");
 
                 ThriftValidation.validateKey(metadata, key);
@@ -1325,7 +1326,7 @@ public class CassandraServer implements Cassandra.Iface
         cState.hasColumnFamilyAccess(keyspace, column_path.column_family, Permission.MODIFY);
 
         CFMetaData metadata = ThriftValidation.validateColumnFamily(keyspace, column_path.column_family, isCommutativeOp);
-        if (metadata.isMaterializedView())
+        if (metadata.isView())
             throw new org.apache.cassandra.exceptions.InvalidRequestException("Cannot modify Materialized Views directly");
 
         ThriftValidation.validateKey(metadata, key);
@@ -1903,7 +1904,7 @@ public class CassandraServer implements Cassandra.Iface
             cState.hasColumnFamilyAccess(keyspace, column_family, Permission.DROP);
 
             CFMetaData metadata = ThriftValidation.validateColumnFamily(keyspace, column_family);
-            if (metadata.isMaterializedView())
+            if (metadata.isView())
                 throw new org.apache.cassandra.exceptions.InvalidRequestException("Cannot drop Materialized Views from Thrift");
 
             MigrationManager.announceColumnFamilyDrop(keyspace, column_family);
@@ -2015,9 +2016,9 @@ public class CassandraServer implements Cassandra.Iface
             if (oldCfm == null)
                 throw new InvalidRequestException("Could not find table definition to modify.");
 
-            if (oldCfm.isMaterializedView())
+            if (oldCfm.isView())
                 throw new InvalidRequestException("Cannot modify Materialized View table " + oldCfm.cfName + " as it may break the schema. You should use cqlsh to modify Materialized View tables instead.");
-            if (!oldCfm.getMaterializedViews().isEmpty())
+            if (!Iterables.isEmpty(View.findAll(cf_def.keyspace, cf_def.name)))
                 throw new InvalidRequestException("Cannot modify table with Materialized View " + oldCfm.cfName + " as it may break the schema. You should use cqlsh to modify tables with Materialized Views instead.");
 
             if (!oldCfm.isThriftCompatible())
@@ -2047,7 +2048,7 @@ public class CassandraServer implements Cassandra.Iface
             String keyspace = cState.getKeyspace();
             cState.hasColumnFamilyAccess(keyspace, cfname, Permission.MODIFY);
             CFMetaData metadata = ThriftValidation.validateColumnFamily(keyspace, cfname, false);
-            if (metadata.isMaterializedView())
+            if (metadata.isView())
                 throw new org.apache.cassandra.exceptions.InvalidRequestException("Cannot truncate Materialized Views");
 
             if (startSessionIfRequested())
@@ -2134,7 +2135,7 @@ public class CassandraServer implements Cassandra.Iface
             cState.hasColumnFamilyAccess(keyspace, column_parent.column_family, Permission.MODIFY);
 
             CFMetaData metadata = ThriftValidation.validateColumnFamily(keyspace, column_parent.column_family, true);
-            if (metadata.isMaterializedView())
+            if (metadata.isView())
                 throw new org.apache.cassandra.exceptions.InvalidRequestException("Cannot modify Materialized Views directly");
 
             ThriftValidation.validateKey(metadata, key);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3a8dbca/src/java/org/apache/cassandra/thrift/ThriftConversion.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/ThriftConversion.java b/src/java/org/apache/cassandra/thrift/ThriftConversion.java
index 86cfe42..b721226 100644
--- a/src/java/org/apache/cassandra/thrift/ThriftConversion.java
+++ b/src/java/org/apache/cassandra/thrift/ThriftConversion.java
@@ -162,7 +162,7 @@ public class ThriftConversion
     public static KsDef toThrift(KeyspaceMetadata ksm)
     {
         List<CfDef> cfDefs = new ArrayList<>();
-        for (CFMetaData cfm : ksm.tables)
+        for (CFMetaData cfm : ksm.tables) // do not include views
             if (cfm.isThriftCompatible()) // Don't expose CF that cannot be correctly handle by thrift; see CASSANDRA-4377 for further details
                 cfDefs.add(toThrift(cfm));
 
@@ -272,8 +272,8 @@ public class ThriftConversion
                                       defaultValidator);
             }
 
-            // We do not allow Thrift materialized views, so we always set it to false
-            boolean isMaterializedView = false;
+            // We do not allow Thrift views, so we always set it to false
+            boolean isView = false;
 
             CFMetaData newCFMD = CFMetaData.create(cf_def.keyspace,
                                                    cf_def.name,
@@ -282,7 +282,7 @@ public class ThriftConversion
                                                    isCompound,
                                                    isSuper,
                                                    isCounter,
-                                                   isMaterializedView,
+                                                   isView,
                                                    defs,
                                                    DatabaseDescriptor.getPartitioner());
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3a8dbca/src/java/org/apache/cassandra/utils/NativeSSTableLoaderClient.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/NativeSSTableLoaderClient.java b/src/java/org/apache/cassandra/utils/NativeSSTableLoaderClient.java
index d602076..8dff532 100644
--- a/src/java/org/apache/cassandra/utils/NativeSSTableLoaderClient.java
+++ b/src/java/org/apache/cassandra/utils/NativeSSTableLoaderClient.java
@@ -80,6 +80,8 @@ public class NativeSSTableLoaderClient extends SSTableLoader.Client
             }
 
             tables.putAll(fetchTablesMetadata(keyspace, session, partitioner));
+            // We only need the CFMetaData for the views, so we only load that.
+            tables.putAll(fetchViewMetadata(keyspace, session, partitioner));
         }
     }
 
@@ -111,41 +113,61 @@ public class NativeSSTableLoaderClient extends SSTableLoader.Client
         for (Row row : session.execute(query, keyspace))
         {
             String name = row.getString("table_name");
-            UUID id = row.getUUID("id");
-
-            Set<CFMetaData.Flag> flags = row.isNull("flags")
-                                       ? Collections.emptySet()
-                                       : CFMetaData.flagsFromStrings(row.getSet("flags", String.class));
-
-            boolean isSuper = flags.contains(CFMetaData.Flag.SUPER);
-            boolean isCounter = flags.contains(CFMetaData.Flag.COUNTER);
-            boolean isDense = flags.contains(CFMetaData.Flag.DENSE);
-            boolean isCompound = flags.contains(CFMetaData.Flag.COMPOUND);
-            boolean isMaterializedView = flags.contains(CFMetaData.Flag.VIEW);
-
-            String columnsQuery = String.format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND table_name = ?",
-                                                SchemaKeyspace.NAME,
-                                                SchemaKeyspace.COLUMNS);
-
-            List<ColumnDefinition> defs = new ArrayList<>();
-            for (Row colRow : session.execute(columnsQuery, keyspace, name))
-                defs.add(createDefinitionFromRow(colRow, keyspace, name));
-
-            tables.put(name, CFMetaData.create(keyspace,
-                                               name,
-                                               id,
-                                               isDense,
-                                               isCompound,
-                                               isSuper,
-                                               isCounter,
-                                               isMaterializedView,
-                                               defs,
-                                               partitioner));
+            tables.put(name, createTableMetadata(keyspace, session, partitioner, false, row, name));
         }
 
         return tables;
     }
 
+    /*
+     * In the case where we are creating View CFMetaDatas, we
+     */
+    private static Map<String, CFMetaData> fetchViewMetadata(String keyspace, Session session, IPartitioner partitioner)
+    {
+        Map<String, CFMetaData> tables = new HashMap<>();
+        String query = String.format("SELECT * FROM %s.%s WHERE keyspace_name = ?", SchemaKeyspace.NAME, SchemaKeyspace.VIEWS);
+
+        for (Row row : session.execute(query, keyspace))
+        {
+            String name = row.getString("view_name");
+            tables.put(name, createTableMetadata(keyspace, session, partitioner, true, row, name));
+        }
+
+        return tables;
+    }
+
+    private static CFMetaData createTableMetadata(String keyspace, Session session, IPartitioner partitioner, boolean isView, Row row, String name)
+    {
+        UUID id = row.getUUID("id");
+        Set<CFMetaData.Flag> flags = row.isNull("flags")
+                                     ? Collections.emptySet()
+                                     : CFMetaData.flagsFromStrings(row.getSet("flags", String.class));
+
+        boolean isSuper = flags.contains(CFMetaData.Flag.SUPER);
+        boolean isCounter = flags.contains(CFMetaData.Flag.COUNTER);
+        boolean isDense = flags.contains(CFMetaData.Flag.DENSE);
+        boolean isCompound = flags.contains(CFMetaData.Flag.COMPOUND);
+
+        String columnsQuery = String.format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND table_name = ?",
+                                            SchemaKeyspace.NAME,
+                                            SchemaKeyspace.COLUMNS);
+
+        List<ColumnDefinition> defs = new ArrayList<>();
+        for (Row colRow : session.execute(columnsQuery, keyspace, name))
+            defs.add(createDefinitionFromRow(colRow, keyspace, name));
+
+        return CFMetaData.create(keyspace,
+                                 name,
+                                 id,
+                                 isDense,
+                                 isCompound,
+                                 isSuper,
+                                 isCounter,
+                                 isView,
+                                 defs,
+                                 partitioner);
+    }
+
     private static ColumnDefinition createDefinitionFromRow(Row row, String keyspace, String table)
     {
         ColumnIdentifier name = ColumnIdentifier.getInterned(row.getBytes("column_name_bytes"), row.getString("column_name"));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3a8dbca/test/long/org/apache/cassandra/cql3/MaterializedViewLongTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/cql3/MaterializedViewLongTest.java b/test/long/org/apache/cassandra/cql3/MaterializedViewLongTest.java
deleted file mode 100644
index b833e60..0000000
--- a/test/long/org/apache/cassandra/cql3/MaterializedViewLongTest.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.cql3;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CyclicBarrier;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import com.datastax.driver.core.Row;
-import com.datastax.driver.core.exceptions.NoHostAvailableException;
-import com.datastax.driver.core.exceptions.WriteTimeoutException;
-import org.apache.cassandra.concurrent.SEPExecutor;
-import org.apache.cassandra.concurrent.Stage;
-import org.apache.cassandra.concurrent.StageManager;
-import org.apache.cassandra.batchlog.BatchlogManager;
-import org.apache.cassandra.utils.WrappedRunnable;
-
-public class MaterializedViewLongTest extends CQLTester
-{
-    int protocolVersion = 4;
-    private final List<String> materializedViews = new ArrayList<>();
-
-    @BeforeClass
-    public static void startup()
-    {
-        requireNetwork();
-    }
-    @Before
-    public void begin()
-    {
-        materializedViews.clear();
-    }
-
-    @After
-    public void end() throws Throwable
-    {
-        for (String viewName : materializedViews)
-            executeNet(protocolVersion, "DROP MATERIALIZED VIEW " + viewName);
-    }
-
-    private void createView(String name, String query) throws Throwable
-    {
-        executeNet(protocolVersion, String.format(query, name));
-        // If exception is thrown, the view will not be added to the list; since it shouldn't have been created, this is
-        // the desired behavior
-        materializedViews.add(name);
-    }
-
-    @Test
-    public void testConflictResolution() throws Throwable
-    {
-        final int writers = 96;
-        final int insertsPerWriter = 50;
-        final Map<Integer, Exception> failedWrites = new ConcurrentHashMap<>();
-
-        createTable("CREATE TABLE %s (" +
-                    "a int," +
-                    "b int," +
-                    "c int," +
-                    "PRIMARY KEY (a, b))");
-
-        executeNet(protocolVersion, "USE " + keyspace());
-
-        createView("mv", "CREATE MATERIALIZED VIEW %s AS SELECT * FROM %%s WHERE c IS NOT NULL AND a IS NOT NULL AND b IS NOT NULL PRIMARY KEY (c, a, b)");
-
-        CyclicBarrier semaphore = new CyclicBarrier(writers);
-
-        Thread[] threads = new Thread[writers];
-        for (int i = 0; i < writers; i++)
-        {
-            final int writer = i;
-            Thread t = new Thread(new WrappedRunnable()
-            {
-                public void runMayThrow()
-                {
-                    try
-                    {
-                        int writerOffset = writer * insertsPerWriter;
-                        semaphore.await();
-                        for (int i = 0; i < insertsPerWriter; i++)
-                        {
-                            try
-                            {
-                                executeNet(protocolVersion, "INSERT INTO %s (a, b, c) VALUES (?, ?, ?) USING TIMESTAMP 1",
-                                           1,
-                                           1,
-                                           i + writerOffset);
-                            }
-                            catch (NoHostAvailableException|WriteTimeoutException e)
-                            {
-                                failedWrites.put(i + writerOffset, e);
-                            }
-                        }
-                    }
-                    catch (Throwable e)
-                    {
-                        throw new RuntimeException(e);
-                    }
-                }
-            });
-            t.start();
-            threads[i] = t;
-        }
-
-        for (int i = 0; i < writers; i++)
-            threads[i].join();
-
-        for (int i = 0; i < writers * insertsPerWriter; i++)
-        {
-            if (executeNet(protocolVersion, "SELECT COUNT(*) FROM system.batchlog").one().getLong(0) == 0)
-                break;
-            try
-            {
-                // This will throw exceptions whenever there are exceptions trying to push the materialized view values
-                // out, caused by the view becoming overwhelmed.
-                BatchlogManager.instance.startBatchlogReplay().get();
-            }
-            catch (Throwable ignore)
-            {
-
-            }
-        }
-
-        int value = executeNet(protocolVersion, "SELECT c FROM %s WHERE a = 1 AND b = 1").one().getInt("c");
-
-        List<Row> rows = executeNet(protocolVersion, "SELECT c FROM " + keyspace() + ".mv").all();
-
-        boolean containsC = false;
-        StringBuilder others = new StringBuilder();
-        StringBuilder overlappingFailedWrites = new StringBuilder();
-        for (Row row : rows)
-        {
-            int c = row.getInt("c");
-            if (c == value)
-                containsC = true;
-            else
-            {
-                if (others.length() != 0)
-                    others.append(' ');
-                others.append(c);
-                if (failedWrites.containsKey(c))
-                {
-                    if (overlappingFailedWrites.length() != 0)
-                        overlappingFailedWrites.append(' ');
-                    overlappingFailedWrites.append(c)
-                                           .append(':')
-                                           .append(failedWrites.get(c).getMessage());
-                }
-            }
-        }
-
-        if (rows.size() > 1)
-        {
-            throw new AssertionError(String.format("Expected 1 row, but found %d; %s c = %d, and (%s) of which (%s) failed to insert", rows.size(), containsC ? "found row with" : "no rows contained", value, others, overlappingFailedWrites));
-        }
-        else if (rows.isEmpty())
-        {
-            throw new AssertionError(String.format("Could not find row with c = %d", value));
-        }
-        else if (rows.size() == 1 && !containsC)
-        {
-            throw new AssertionError(String.format("Single row had c = %d, expected %d", rows.get(0).getInt("c"), value));
-        }
-    }
-}