You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ja...@apache.org on 2015/07/29 22:16:18 UTC

[5/5] cassandra git commit: Materialized Views

Materialized Views

patch by carl; reviewed by jmckenzie for CASSANDRA-6477


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3bdcaa33
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3bdcaa33
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3bdcaa33

Branch: refs/heads/trunk
Commit: 3bdcaa336a6e6a9727c333b433bb9f5d3afc0fb1
Parents: 38ff10d
Author: Carl Yeksigian <ca...@apache.org>
Authored: Wed Jul 29 16:06:08 2015 -0400
Committer: T Jake Luciani <ja...@apache.org>
Committed: Wed Jul 29 16:15:53 2015 -0400

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 NEWS.txt                                        |  10 +
 conf/cassandra.yaml                             |   5 +
 doc/cql3/CQL.textile                            |  55 ++
 pylib/cqlshlib/cql3handling.py                  |  13 +-
 .../org/apache/cassandra/concurrent/Stage.java  |   4 +
 .../cassandra/concurrent/StageManager.java      |   2 +
 .../org/apache/cassandra/config/CFMetaData.java |  61 +-
 .../org/apache/cassandra/config/Config.java     |   2 +
 .../cassandra/config/DatabaseDescriptor.java    |  11 +
 .../config/MaterializedViewDefinition.java      |  93 ++
 src/java/org/apache/cassandra/cql3/Cql.g        | 143 ++-
 .../AlterMaterializedViewStatement.java         |  83 ++
 .../cql3/statements/AlterTableStatement.java    |  85 ++
 .../cassandra/cql3/statements/CFProperties.java |  58 ++
 .../CreateMaterializedViewStatement.java        | 268 ++++++
 .../cql3/statements/CreateTableStatement.java   |  29 +-
 .../cql3/statements/CreateTriggerStatement.java |   5 +-
 .../DropMaterializedViewStatement.java          | 105 ++
 .../cql3/statements/DropTableStatement.java     |  25 +
 .../cql3/statements/ModificationStatement.java  |  23 +
 .../cql3/statements/SelectStatement.java        |   9 +-
 .../cql3/statements/TruncateStatement.java      |   6 +
 .../db/AbstractReadCommandBuilder.java          | 347 +++++++
 .../apache/cassandra/db/ColumnFamilyStore.java  |  58 +-
 src/java/org/apache/cassandra/db/Keyspace.java  |  77 +-
 src/java/org/apache/cassandra/db/Mutation.java  |   2 +
 .../cassandra/db/MutationVerbHandler.java       |   8 +
 .../org/apache/cassandra/db/SystemKeyspace.java |  98 ++
 src/java/org/apache/cassandra/db/WriteType.java |   3 +-
 .../db/compaction/CompactionManager.java        |  26 +
 .../compaction/CompactionStrategyManager.java   |   2 +-
 .../cassandra/db/compaction/OperationType.java  |   3 +-
 .../cassandra/db/view/MaterializedView.java     | 691 ++++++++++++++
 .../db/view/MaterializedViewBuilder.java        | 222 +++++
 .../db/view/MaterializedViewManager.java        | 237 +++++
 .../db/view/MaterializedViewUtils.java          |  95 ++
 .../apache/cassandra/db/view/TemporalRow.java   | 414 ++++++++
 .../cassandra/metrics/KeyspaceMetrics.java      |   2 +-
 .../cassandra/metrics/MVWriteMetrics.java       |  42 +
 .../apache/cassandra/net/MessagingService.java  |  15 +-
 .../cassandra/schema/LegacySchemaMigrator.java  |   4 +-
 .../cassandra/schema/MaterializedViews.java     | 149 +++
 .../apache/cassandra/schema/SchemaKeyspace.java | 158 ++-
 .../cassandra/service/AbstractReadExecutor.java |   2 +-
 .../service/AbstractWriteResponseHandler.java   |   2 +-
 .../service/BatchlogResponseHandler.java        | 121 +++
 .../cassandra/service/CassandraDaemon.java      |  18 +
 .../apache/cassandra/service/StorageProxy.java  | 244 ++++-
 .../cassandra/service/StorageService.java       |  24 +-
 .../cassandra/streaming/StreamReceiveTask.java  |  83 +-
 .../cassandra/thrift/CassandraServer.java       |  28 +
 .../cassandra/thrift/ThriftConversion.java      |   5 +-
 .../utils/NativeSSTableLoaderClient.java        |   3 +-
 .../cql3/MaterializedViewLongTest.java          | 194 ++++
 .../cassandra/AbstractReadCommandBuilder.java   | 327 -------
 .../org/apache/cassandra/cql3/CQLTester.java    |   4 +-
 .../cassandra/cql3/MaterializedViewTest.java    | 955 +++++++++++++++++++
 .../apache/cassandra/db/RangeTombstoneTest.java |   1 -
 .../db/view/MaterializedViewUtilsTest.java      | 115 +++
 60 files changed, 5383 insertions(+), 492 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/3bdcaa33/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 71d5a2d..d8b83bf 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -21,6 +21,7 @@
  * Populate TokenMetadata early during startup (CASSANDRA-9317)
  * Undeprecate cache recentHitRate (CASSANDRA-6591)
  * Add support for selectively varint encoding fields (CASSANDRA-9499, 9865)
+ * Materialized Views (CASSANDRA-6477)
 
 
 2.2.1

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3bdcaa33/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index ba81d6e..392a581 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -16,8 +16,18 @@ using the provided 'sstableupgrade' tool.
 3.0
 ===
 
+New features
+------------
+   - Materialized Views, which allow for server-side denormalization, are now
+     available. Materialized views provide an alternative to secondary indexes
+     for non-primary key queries, and perform much better for indexing high
+     cardinality columns. 
+     See http://www.datastax.com/dev/blog/new-in-cassandra-3-0-materialized-views 
+   
 Upgrading
 ---------
+   - New write stages have been added for batchlog and materialized view mutations;
+     you can set their size in cassandra.yaml.
    - User defined functions are now executed in a sandbox.
      To use UDFs and UDAs, you have to enable them in cassandra.yaml.
    - New SSTable version 'la' with improved bloom-filter false-positive handling

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3bdcaa33/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 7ce36af..0e20095 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -342,6 +342,11 @@ seed_provider:
 concurrent_reads: 32
 concurrent_writes: 32
 concurrent_counter_writes: 32
+concurrent_batchlog_writes: 32
+
+# Materialized view writes involve a read, so this should
+# be limited by the lesser of concurrent reads or concurrent writes.
+concurrent_materialized_view_writes: 32
 
 # Maximum memory to use for pooling sstable buffers. Defaults to the smaller
 # of 1/4 of heap or 512MB. This pool is allocated off-heap, so is in addition

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3bdcaa33/doc/cql3/CQL.textile
----------------------------------------------------------------------
diff --git a/doc/cql3/CQL.textile b/doc/cql3/CQL.textile
index bc7c385..d5c5684 100644
--- a/doc/cql3/CQL.textile
+++ b/doc/cql3/CQL.textile
@@ -479,6 +479,61 @@ The @DROP INDEX@ statement is used to drop an existing secondary index. The argu
 
 If the index does not exists, the statement will return an error, unless @IF EXISTS@ is used in which case the operation is a no-op.
 
+
+h3(#createMVStmt). CREATE MATERIALIZED VIEW
+
+__Syntax:__
+
+bc(syntax)..
+<create-materialized-view-stmt> ::= CREATE MATERIALIZED VIEW ( IF NOT EXISTS )? <viewname> AS
+                                      SELECT ( '(' <identifier> ( ',' <identifier> ) * ')' | '*' )
+                                      FROM <tablename>
+                                      WHERE ( <identifier> IS NOT NULL ( AND <identifier> IS NOT NULL )* )?
+                                      PRIMARY KEY '(' <partition-key> ( ',' <identifier> )* ')'
+                                      ( WITH <option> ( AND <option>)* )?
+__Sample:__
+
+bc(sample)..
+CREATE MATERIALIZED VIEW monkeySpecies_by_population AS
+    SELECT *
+    FROM monkeySpecies
+    WHERE population IS NOT NULL AND species IS NOT NULL
+    PRIMARY KEY (population, species)
+    WITH comment='Allow query by population instead of species';
+p.
+The @CREATE MATERIALIZED VIEW@ statement creates a new materialized view. Each such view is a set of _rows_ which corresponds to rows which are present in the underlying, or base, table specified in the @SELECT@ statement. A materialized view cannot be directly updated, but updates to the base table will cause corresponding updates in the view.
+
+Attempting to create an already existing materialized view will return an error unless the @IF NOT EXISTS@ option is used. If it is used, the statement will be a no-op if the materialized view already exists.
+
+h4(#createMVWhere). @WHERE <identifier> IS NOT NULL@
+
+The where clause is required to explicitly exclude all primary key columns' null values. Any row which contains null values in the primary key will not be present in the materialized view.
+
+h3(#alterMVStmt). ALTER MATERIALIZED VIEW
+
+__Syntax:__
+
+bc(syntax). <alter-materialized-view-stmt> ::= ALTER MATERIALIZED VIEW <viewname>
+                                                 WITH <option> ( AND <option> )*
+
+p.
+The @ALTER MATERIALIZED VIEW@ statement allows options to be updated; these options are the same as <a href="#createTableOptions">@CREATE TABLE@'s options</a>.
+
+
+h3(#dropMVStmt). DROP MATERIALIZED VIEW
+
+__Syntax:__
+
+bc(syntax). <drop-materialized-stmt> ::= DROP MATERIALIZED VIEW ( IF EXISTS )? <tablename>
+
+__Sample:__
+
+bc(sample). DROP MATERIALIZED VIEW monkeySpecies_by_population;
+
+The @DROP MATERIALIZED VIEW@ statement is used to drop an existing materialized view.
+
+If the materialized view does not exist, the statement will return an error, unless @IF EXISTS@ is used in which case the operation is a no-op.
+
 h3(#createTypeStmt). CREATE TYPE
 
 __Syntax:__

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3bdcaa33/pylib/cqlshlib/cql3handling.py
----------------------------------------------------------------------
diff --git a/pylib/cqlshlib/cql3handling.py b/pylib/cqlshlib/cql3handling.py
index 2767fb1..a46da91 100644
--- a/pylib/cqlshlib/cql3handling.py
+++ b/pylib/cqlshlib/cql3handling.py
@@ -44,7 +44,8 @@ class Cql3ParsingRuleSet(CqlParsingRuleSet):
         'function', 'aggregate', 'keyspace', 'schema', 'columnfamily', 'table', 'index', 'on', 'drop',
         'primary', 'into', 'values', 'date', 'time', 'timestamp', 'ttl', 'alter', 'add', 'type',
         'compact', 'storage', 'order', 'by', 'asc', 'desc', 'clustering',
-        'token', 'writetime', 'map', 'list', 'to', 'custom', 'if', 'not'
+        'token', 'writetime', 'map', 'list', 'to', 'custom', 'if', 'not',
+        'materialized', 'view'
     ))
 
     unreserved_keywords = set((
@@ -244,6 +245,7 @@ JUNK ::= /([ \t\r\f\v]+|(--|[/][/])[^\n\r]*([\n\r]|$)|[/][*].*?[*][/])/ ;
 <schemaChangeStatement> ::= <createKeyspaceStatement>
                           | <createColumnFamilyStatement>
                           | <createIndexStatement>
+                          | <createMaterializedViewStatement>
                           | <createUserTypeStatement>
                           | <createFunctionStatement>
                           | <createAggregateStatement>
@@ -251,6 +253,7 @@ JUNK ::= /([ \t\r\f\v]+|(--|[/][/])[^\n\r]*([\n\r]|$)|[/][*].*?[*][/])/ ;
                           | <dropKeyspaceStatement>
                           | <dropColumnFamilyStatement>
                           | <dropIndexStatement>
+                          | <dropMaterializedViewStatement>
                           | <dropUserTypeStatement>
                           | <dropFunctionStatement>
                           | <dropAggregateStatement>
@@ -1090,6 +1093,11 @@ syntax_rules += r'''
                                ( "USING" <stringLiteral> ( "WITH" "OPTIONS" "=" <mapLiteral> )? )?
                          ;
 
+<createMaterializedViewStatement> ::= "CREATE" "MATERIALIZED" "VIEW" ("IF" "NOT" "EXISTS")? <columnFamilyName>?
+                                      "AS" <selectStatement>
+                                      "PRIMARY" "KEY" <pkDef>
+                                    ;
+
 <createUserTypeStatement> ::= "CREATE" "TYPE" ( ks=<nonSystemKeyspaceName> dot="." )? typename=<cfOrKsName> "(" newcol=<cident> <storageType>
                                 ( "," [newcolname]=<cident> <storageType> )*
                             ")"
@@ -1146,6 +1154,9 @@ syntax_rules += r'''
 <dropIndexStatement> ::= "DROP" "INDEX" ("IF" "EXISTS")? idx=<indexName>
                        ;
 
+<dropMaterializedViewStatement> ::= "DROP" "MATERIALIZED" "VIEW" ("IF" "EXISTS")? mv=<columnFamilyName>
+                                  ;
+
 <dropUserTypeStatement> ::= "DROP" "TYPE" ut=<userTypeName>
                           ;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3bdcaa33/src/java/org/apache/cassandra/concurrent/Stage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/Stage.java b/src/java/org/apache/cassandra/concurrent/Stage.java
index 4e3700f..e91c515 100644
--- a/src/java/org/apache/cassandra/concurrent/Stage.java
+++ b/src/java/org/apache/cassandra/concurrent/Stage.java
@@ -27,6 +27,8 @@ public enum Stage
     READ,
     MUTATION,
     COUNTER_MUTATION,
+    BATCHLOG_MUTATION,
+    MATERIALIZED_VIEW_MUTATION,
     GOSSIP,
     REQUEST_RESPONSE,
     ANTI_ENTROPY,
@@ -60,6 +62,8 @@ public enum Stage
                 return "internal";
             case MUTATION:
             case COUNTER_MUTATION:
+            case BATCHLOG_MUTATION:
+            case MATERIALIZED_VIEW_MUTATION:
             case READ:
             case REQUEST_RESPONSE:
             case READ_REPAIR:

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3bdcaa33/src/java/org/apache/cassandra/concurrent/StageManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/StageManager.java b/src/java/org/apache/cassandra/concurrent/StageManager.java
index 4f03fd5..ca83829 100644
--- a/src/java/org/apache/cassandra/concurrent/StageManager.java
+++ b/src/java/org/apache/cassandra/concurrent/StageManager.java
@@ -47,6 +47,8 @@ public class StageManager
     {
         stages.put(Stage.MUTATION, multiThreadedLowSignalStage(Stage.MUTATION, getConcurrentWriters()));
         stages.put(Stage.COUNTER_MUTATION, multiThreadedLowSignalStage(Stage.COUNTER_MUTATION, getConcurrentCounterWriters()));
+        stages.put(Stage.BATCHLOG_MUTATION, multiThreadedLowSignalStage(Stage.BATCHLOG_MUTATION, getConcurrentBatchlogWriters()));
+        stages.put(Stage.MATERIALIZED_VIEW_MUTATION, multiThreadedLowSignalStage(Stage.MATERIALIZED_VIEW_MUTATION, getConcurrentMaterializedViewWriters()));
         stages.put(Stage.READ, multiThreadedLowSignalStage(Stage.READ, getConcurrentReaders()));
         stages.put(Stage.REQUEST_RESPONSE, multiThreadedLowSignalStage(Stage.REQUEST_RESPONSE, FBUtilities.getAvailableProcessors()));
         stages.put(Stage.INTERNAL_RESPONSE, multiThreadedStage(Stage.INTERNAL_RESPONSE, FBUtilities.getAvailableProcessors()));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3bdcaa33/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java
index fbfb7fa..902b1d2 100644
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@ -50,6 +50,7 @@ import org.apache.cassandra.io.compress.CompressionParameters;
 import org.apache.cassandra.io.compress.LZ4Compressor;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.schema.MaterializedViews;
 import org.apache.cassandra.schema.SchemaKeyspace;
 import org.apache.cassandra.schema.Triggers;
 import org.apache.cassandra.utils.*;
@@ -63,7 +64,7 @@ public final class CFMetaData
 {
     public enum Flag
     {
-        SUPER, COUNTER, DENSE, COMPOUND
+        SUPER, COUNTER, DENSE, COMPOUND, MATERIALIZEDVIEW
     }
 
     private static final Logger logger = LoggerFactory.getLogger(CFMetaData.class);
@@ -85,6 +86,15 @@ public final class CFMetaData
     // Note that this is the default only for user created tables
     public final static String DEFAULT_COMPRESSOR = LZ4Compressor.class.getCanonicalName();
 
+    // Note that this needs to come *before* any CFMetaData is defined so before the compile below.
+    private static final Comparator<ColumnDefinition> regularColumnComparator = new Comparator<ColumnDefinition>()
+    {
+        public int compare(ColumnDefinition def1, ColumnDefinition def2)
+        {
+            return ByteBufferUtil.compareUnsigned(def1.name.bytes, def2.name.bytes);
+        }
+    };
+
     public static class SpeculativeRetry
     {
         public enum RetryType
@@ -171,6 +181,7 @@ public final class CFMetaData
     private final boolean isCompound;
     private final boolean isSuper;
     private final boolean isCounter;
+    private final boolean isMaterializedView;
 
     public volatile ClusteringComparator comparator;  // bytes, long, timeuuid, utf8, etc. This is built directly from clusteringColumns
 
@@ -193,6 +204,7 @@ public final class CFMetaData
     private volatile SpeculativeRetry speculativeRetry = DEFAULT_SPECULATIVE_RETRY;
     private volatile Map<ByteBuffer, DroppedColumn> droppedColumns = new HashMap<>();
     private volatile Triggers triggers = Triggers.none();
+    private volatile MaterializedViews materializedViews = MaterializedViews.none();
 
     /*
      * All CQL3 columns definition are stored in the columnMetadata map.
@@ -235,6 +247,7 @@ public final class CFMetaData
     public CFMetaData speculativeRetry(SpeculativeRetry prop) {speculativeRetry = prop; return this;}
     public CFMetaData droppedColumns(Map<ByteBuffer, DroppedColumn> cols) {droppedColumns = cols; return this;}
     public CFMetaData triggers(Triggers prop) {triggers = prop; return this;}
+    public CFMetaData materializedViews(MaterializedViews prop) {materializedViews = prop; return this;}
 
     private CFMetaData(String keyspace,
                        String name,
@@ -243,6 +256,7 @@ public final class CFMetaData
                        boolean isCounter,
                        boolean isDense,
                        boolean isCompound,
+                       boolean isMaterializedView,
                        List<ColumnDefinition> partitionKeyColumns,
                        List<ColumnDefinition> clusteringColumns,
                        PartitionColumns partitionColumns)
@@ -255,6 +269,7 @@ public final class CFMetaData
         this.isCompound = isCompound;
         this.isSuper = isSuper;
         this.isCounter = isCounter;
+        this.isMaterializedView = isMaterializedView;
 
         EnumSet<Flag> flags = EnumSet.noneOf(Flag.class);
         if (isSuper)
@@ -265,6 +280,8 @@ public final class CFMetaData
             flags.add(Flag.DENSE);
         if (isCompound)
             flags.add(Flag.COMPOUND);
+        if (isMaterializedView)
+            flags.add(Flag.MATERIALIZEDVIEW);
         this.flags = Sets.immutableEnumSet(flags);
 
         // A compact table should always have a clustering
@@ -299,6 +316,11 @@ public final class CFMetaData
             this.compactValueColumn = CompactTables.getCompactValueColumn(partitionColumns, isSuper());
     }
 
+    public MaterializedViews getMaterializedViews()
+    {
+        return materializedViews;
+    }
+
     public static CFMetaData create(String ksName,
                                     String name,
                                     UUID cfId,
@@ -306,6 +328,7 @@ public final class CFMetaData
                                     boolean isCompound,
                                     boolean isSuper,
                                     boolean isCounter,
+                                    boolean isMaterializedView,
                                     List<ColumnDefinition> columns)
     {
         List<ColumnDefinition> partitions = new ArrayList<>();
@@ -338,6 +361,7 @@ public final class CFMetaData
                               isCounter,
                               isDense,
                               isCompound,
+                              isMaterializedView,
                               partitions,
                               clusterings,
                               builder.build());
@@ -439,6 +463,7 @@ public final class CFMetaData
                                        isCounter(),
                                        isDense(),
                                        isCompound(),
+                                       isMaterializedView(),
                                        copy(partitionKeyColumns),
                                        copy(clusteringColumns),
                                        copy(partitionColumns)),
@@ -481,7 +506,8 @@ public final class CFMetaData
                       .speculativeRetry(oldCFMD.speculativeRetry)
                       .memtableFlushPeriod(oldCFMD.memtableFlushPeriod)
                       .droppedColumns(new HashMap<>(oldCFMD.droppedColumns))
-                      .triggers(oldCFMD.triggers);
+                      .triggers(oldCFMD.triggers)
+                      .materializedViews(oldCFMD.materializedViews);
     }
 
     /**
@@ -753,7 +779,8 @@ public final class CFMetaData
             && Objects.equal(maxIndexInterval, other.maxIndexInterval)
             && Objects.equal(speculativeRetry, other.speculativeRetry)
             && Objects.equal(droppedColumns, other.droppedColumns)
-            && Objects.equal(triggers, other.triggers);
+            && Objects.equal(triggers, other.triggers)
+            && Objects.equal(materializedViews, other.materializedViews);
     }
 
     @Override
@@ -785,6 +812,7 @@ public final class CFMetaData
             .append(speculativeRetry)
             .append(droppedColumns)
             .append(triggers)
+            .append(materializedViews)
             .toHashCode();
     }
 
@@ -846,6 +874,7 @@ public final class CFMetaData
         compressionParameters = cfm.compressionParameters;
 
         triggers = cfm.triggers;
+        materializedViews = cfm.materializedViews;
 
         logger.debug("application result is {}", this);
 
@@ -1262,6 +1291,14 @@ public final class CFMetaData
         return false;
     }
 
+    public boolean hasComplexColumns()
+    {
+        for (ColumnDefinition def : partitionColumns())
+            if (def.isComplex())
+                return true;
+        return false;
+    }
+
     public boolean hasDroppedCollectionColumns()
     {
         for (DroppedColumn def : getDroppedColumns().values())
@@ -1293,6 +1330,11 @@ public final class CFMetaData
         return isCompound;
     }
 
+    public boolean isMaterializedView()
+    {
+        return isMaterializedView;
+    }
+
     public Serializers serializers()
     {
         return serializers;
@@ -1337,6 +1379,7 @@ public final class CFMetaData
             .append("speculativeRetry", speculativeRetry)
             .append("droppedColumns", droppedColumns)
             .append("triggers", triggers)
+            .append("materializedViews", materializedViews)
             .toString();
     }
 
@@ -1348,6 +1391,7 @@ public final class CFMetaData
         private final boolean isCompound;
         private final boolean isSuper;
         private final boolean isCounter;
+        private final boolean isMaterializedView;
 
         private UUID tableId;
 
@@ -1356,7 +1400,7 @@ public final class CFMetaData
         private final List<Pair<ColumnIdentifier, AbstractType>> staticColumns = new ArrayList<>();
         private final List<Pair<ColumnIdentifier, AbstractType>> regularColumns = new ArrayList<>();
 
-        private Builder(String keyspace, String table, boolean isDense, boolean isCompound, boolean isSuper, boolean isCounter)
+        private Builder(String keyspace, String table, boolean isDense, boolean isCompound, boolean isSuper, boolean isCounter, boolean isMaterializedView)
         {
             this.keyspace = keyspace;
             this.table = table;
@@ -1364,6 +1408,7 @@ public final class CFMetaData
             this.isCompound = isCompound;
             this.isSuper = isSuper;
             this.isCounter = isCounter;
+            this.isMaterializedView = isMaterializedView;
         }
 
         public static Builder create(String keyspace, String table)
@@ -1378,7 +1423,12 @@ public final class CFMetaData
 
         public static Builder create(String keyspace, String table, boolean isDense, boolean isCompound, boolean isSuper, boolean isCounter)
         {
-            return new Builder(keyspace, table, isDense, isCompound, isSuper, isCounter);
+            return new Builder(keyspace, table, isDense, isCompound, isSuper, isCounter, false);
+        }
+
+        public static Builder createView(String keyspace, String table)
+        {
+            return new Builder(keyspace, table, false, true, false, false, true);
         }
 
         public static Builder createDense(String keyspace, String table, boolean isCompound, boolean isCounter)
@@ -1501,6 +1551,7 @@ public final class CFMetaData
                                   isCounter,
                                   isDense,
                                   isCompound,
+                                  isMaterializedView,
                                   partitions,
                                   clusterings,
                                   builder.build());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3bdcaa33/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 64b23dd..8d22d82 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -92,6 +92,8 @@ public class Config
     public Integer concurrent_reads = 32;
     public Integer concurrent_writes = 32;
     public Integer concurrent_counter_writes = 32;
+    public Integer concurrent_batchlog_writes = 32;
+    public Integer concurrent_materialized_view_writes = 32;
 
     @Deprecated
     public Integer concurrent_replicates = null;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3bdcaa33/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index f1369d1..d32af4d 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -990,6 +990,8 @@ public class DatabaseDescriptor
             case PAXOS_COMMIT:
             case PAXOS_PREPARE:
             case PAXOS_PROPOSE:
+            case BATCHLOG_MUTATION:
+            case MATERIALIZED_VIEW_MUTATION:
                 return getWriteRpcTimeout();
             case COUNTER_MUTATION:
                 return getCounterWriteRpcTimeout();
@@ -1036,6 +1038,15 @@ public class DatabaseDescriptor
         return conf.concurrent_counter_writes;
     }
 
+    public static int getConcurrentBatchlogWriters()
+    {
+        return conf.concurrent_batchlog_writes;
+    }
+    public static int getConcurrentMaterializedViewWriters()
+    {
+        return conf.concurrent_materialized_view_writes;
+    }
+
     public static int getFlushWriters()
     {
             return conf.memtable_flush_writers;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3bdcaa33/src/java/org/apache/cassandra/config/MaterializedViewDefinition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/MaterializedViewDefinition.java b/src/java/org/apache/cassandra/config/MaterializedViewDefinition.java
new file mode 100644
index 0000000..90fa35c
--- /dev/null
+++ b/src/java/org/apache/cassandra/config/MaterializedViewDefinition.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.config;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.cassandra.cql3.ColumnIdentifier;
+
+public class MaterializedViewDefinition
+{
+    public final String baseCfName;
+    public final String viewName;
+    // The order of partition columns and clustering columns is important, so we cannot switch these two to sets
+    public final List<ColumnIdentifier> partitionColumns;
+    public final List<ColumnIdentifier> clusteringColumns;
+    public final Set<ColumnIdentifier> included;
+    public final boolean includeAll;
+
+    public MaterializedViewDefinition(MaterializedViewDefinition def)
+    {
+        this(def.baseCfName, def.viewName, new ArrayList<>(def.partitionColumns), new ArrayList<>(def.clusteringColumns), new HashSet<>(def.included));
+    }
+
+    /**
+     * @param baseCfName        Name of the column family from which this view is based
+     * @param viewName          Name of the view
+     * @param partitionColumns  List of all of the partition columns, in the order they are defined
+     * @param clusteringColumns List of all of the clustering columns, in the order they are defined
+     * @param included          Set of columns explicitly included in the view; an empty set means all columns are included
+     */
+    public MaterializedViewDefinition(String baseCfName, String viewName, List<ColumnIdentifier> partitionColumns, List<ColumnIdentifier> clusteringColumns, Set<ColumnIdentifier> included)
+    {
+        assert partitionColumns != null && !partitionColumns.isEmpty();
+        assert included != null;
+        this.baseCfName = baseCfName;
+        this.viewName = viewName;
+        this.partitionColumns = partitionColumns;
+        this.clusteringColumns = clusteringColumns;
+        this.includeAll = included.isEmpty();
+        this.included = included;
+    }
+
+    /**
+     * @return true if the view specified by this definition will include the column, false otherwise
+     */
+    public boolean includes(ColumnIdentifier column)
+    {
+        return includeAll
+               || partitionColumns.contains(column)
+               || clusteringColumns.contains(column)
+               || included.contains(column);
+    }
+
+    /**
+     * Replace the column {@code from} with {@code to} in this materialized view definition's partition,
+     * clustering, or included columns.
+     */
+    public void renameColumn(ColumnIdentifier from, ColumnIdentifier to)
+    {
+        if (!includeAll && included.contains(from))
+        {
+            included.remove(from);
+            included.add(to);
+        }
+
+        int partitionIndex = partitionColumns.indexOf(from);
+        if (partitionIndex >= 0)
+            partitionColumns.set(partitionIndex, to);
+
+        int clusteringIndex = clusteringColumns.indexOf(from);
+        if (clusteringIndex >= 0)
+            clusteringColumns.set(clusteringIndex, to);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3bdcaa33/src/java/org/apache/cassandra/cql3/Cql.g
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Cql.g b/src/java/org/apache/cassandra/cql3/Cql.g
index df4623d..0eadaee 100644
--- a/src/java/org/apache/cassandra/cql3/Cql.g
+++ b/src/java/org/apache/cassandra/cql3/Cql.g
@@ -233,43 +233,46 @@ query returns [ParsedStatement stmnt]
 
 cqlStatement returns [ParsedStatement stmt]
     @after{ if (stmt != null) stmt.setBoundVariables(bindVariables); }
-    : st1= selectStatement             { $stmt = st1; }
-    | st2= insertStatement             { $stmt = st2; }
-    | st3= updateStatement             { $stmt = st3; }
-    | st4= batchStatement              { $stmt = st4; }
-    | st5= deleteStatement             { $stmt = st5; }
-    | st6= useStatement                { $stmt = st6; }
-    | st7= truncateStatement           { $stmt = st7; }
-    | st8= createKeyspaceStatement     { $stmt = st8; }
-    | st9= createTableStatement        { $stmt = st9; }
-    | st10=createIndexStatement        { $stmt = st10; }
-    | st11=dropKeyspaceStatement       { $stmt = st11; }
-    | st12=dropTableStatement          { $stmt = st12; }
-    | st13=dropIndexStatement          { $stmt = st13; }
-    | st14=alterTableStatement         { $stmt = st14; }
-    | st15=alterKeyspaceStatement      { $stmt = st15; }
-    | st16=grantPermissionsStatement   { $stmt = st16; }
-    | st17=revokePermissionsStatement  { $stmt = st17; }
-    | st18=listPermissionsStatement    { $stmt = st18; }
-    | st19=createUserStatement         { $stmt = st19; }
-    | st20=alterUserStatement          { $stmt = st20; }
-    | st21=dropUserStatement           { $stmt = st21; }
-    | st22=listUsersStatement          { $stmt = st22; }
-    | st23=createTriggerStatement      { $stmt = st23; }
-    | st24=dropTriggerStatement        { $stmt = st24; }
-    | st25=createTypeStatement         { $stmt = st25; }
-    | st26=alterTypeStatement          { $stmt = st26; }
-    | st27=dropTypeStatement           { $stmt = st27; }
-    | st28=createFunctionStatement     { $stmt = st28; }
-    | st29=dropFunctionStatement       { $stmt = st29; }
-    | st30=createAggregateStatement    { $stmt = st30; }
-    | st31=dropAggregateStatement      { $stmt = st31; }
-    | st32=createRoleStatement         { $stmt = st32; }
-    | st33=alterRoleStatement          { $stmt = st33; }
-    | st34=dropRoleStatement           { $stmt = st34; }
-    | st35=listRolesStatement          { $stmt = st35; }
-    | st36=grantRoleStatement          { $stmt = st36; }
-    | st37=revokeRoleStatement         { $stmt = st37; }
+    : st1= selectStatement                 { $stmt = st1; }
+    | st2= insertStatement                 { $stmt = st2; }
+    | st3= updateStatement                 { $stmt = st3; }
+    | st4= batchStatement                  { $stmt = st4; }
+    | st5= deleteStatement                 { $stmt = st5; }
+    | st6= useStatement                    { $stmt = st6; }
+    | st7= truncateStatement               { $stmt = st7; }
+    | st8= createKeyspaceStatement         { $stmt = st8; }
+    | st9= createTableStatement            { $stmt = st9; }
+    | st10=createIndexStatement            { $stmt = st10; }
+    | st11=dropKeyspaceStatement           { $stmt = st11; }
+    | st12=dropTableStatement              { $stmt = st12; }
+    | st13=dropIndexStatement              { $stmt = st13; }
+    | st14=alterTableStatement             { $stmt = st14; }
+    | st15=alterKeyspaceStatement          { $stmt = st15; }
+    | st16=grantPermissionsStatement       { $stmt = st16; }
+    | st17=revokePermissionsStatement      { $stmt = st17; }
+    | st18=listPermissionsStatement        { $stmt = st18; }
+    | st19=createUserStatement             { $stmt = st19; }
+    | st20=alterUserStatement              { $stmt = st20; }
+    | st21=dropUserStatement               { $stmt = st21; }
+    | st22=listUsersStatement              { $stmt = st22; }
+    | st23=createTriggerStatement          { $stmt = st23; }
+    | st24=dropTriggerStatement            { $stmt = st24; }
+    | st25=createTypeStatement             { $stmt = st25; }
+    | st26=alterTypeStatement              { $stmt = st26; }
+    | st27=dropTypeStatement               { $stmt = st27; }
+    | st28=createFunctionStatement         { $stmt = st28; }
+    | st29=dropFunctionStatement           { $stmt = st29; }
+    | st30=createAggregateStatement        { $stmt = st30; }
+    | st31=dropAggregateStatement          { $stmt = st31; }
+    | st32=createRoleStatement             { $stmt = st32; }
+    | st33=alterRoleStatement              { $stmt = st33; }
+    | st34=dropRoleStatement               { $stmt = st34; }
+    | st35=listRolesStatement              { $stmt = st35; }
+    | st36=grantRoleStatement              { $stmt = st36; }
+    | st37=revokeRoleStatement             { $stmt = st37; }
+    | st38=createMaterializedViewStatement { $stmt = st38; }
+    | st39=dropMaterializedViewStatement   { $stmt = st39; }
+    | st40=alterMaterializedViewStatement  { $stmt = st40; }
     ;
 
 /*
@@ -665,7 +668,7 @@ createTableStatement returns [CreateTableStatement.RawStatement expr]
 
 cfamDefinition[CreateTableStatement.RawStatement expr]
     : '(' cfamColumns[expr] ( ',' cfamColumns[expr]? )* ')'
-      ( K_WITH cfamProperty[expr] ( K_AND cfamProperty[expr] )*)?
+      ( K_WITH cfamProperty[expr.properties] ( K_AND cfamProperty[expr.properties] )*)?
     ;
 
 cfamColumns[CreateTableStatement.RawStatement expr]
@@ -679,15 +682,15 @@ pkDef[CreateTableStatement.RawStatement expr]
     | '(' { List<ColumnIdentifier> l = new ArrayList<ColumnIdentifier>(); } k1=ident { l.add(k1); } ( ',' kn=ident { l.add(kn); } )* ')' { $expr.addKeyAliases(l); }
     ;
 
-cfamProperty[CreateTableStatement.RawStatement expr]
-    : property[expr.properties]
-    | K_COMPACT K_STORAGE { $expr.setCompactStorage(); }
-    | K_CLUSTERING K_ORDER K_BY '(' cfamOrdering[expr] (',' cfamOrdering[expr])* ')'
+cfamProperty[CFProperties props]
+    : property[props.properties]
+    | K_COMPACT K_STORAGE { $props.setCompactStorage(); }
+    | K_CLUSTERING K_ORDER K_BY '(' cfamOrdering[props] (',' cfamOrdering[props])* ')'
     ;
 
-cfamOrdering[CreateTableStatement.RawStatement expr]
+cfamOrdering[CFProperties props]
     @init{ boolean reversed=false; }
-    : k=ident (K_ASC | K_DESC { reversed=true;} ) { $expr.setOrdering(k, reversed); }
+    : k=ident (K_ASC | K_DESC { reversed=true;} ) { $props.setOrdering(k, reversed); }
     ;
 
 
@@ -734,6 +737,34 @@ indexIdent returns [IndexTarget.Raw id]
     | K_FULL '(' c=cident ')'    { $id = IndexTarget.Raw.fullCollection(c); }
     ;
 
+/**
+ * CREATE MATERIALIZED VIEW [IF NOT EXISTS] <viewName> AS
+ *  SELECT <columns>
+ *  FROM <CF>
+ *  [WHERE <column> IS NOT NULL [AND <column> IS NOT NULL ...]]
+ *  PRIMARY KEY (<pkColumns>)
+ *  [WITH <property> = <value> AND ...];
+ */
+createMaterializedViewStatement returns [CreateMaterializedViewStatement expr]
+    @init {
+        boolean ifNotExists = false;
+        List<ColumnIdentifier.Raw> partitionKeys = new ArrayList<>();
+        List<ColumnIdentifier.Raw> compositeKeys = new ArrayList<>();
+    }
+    : K_CREATE K_MATERIALIZED K_VIEW (K_IF K_NOT K_EXISTS { ifNotExists = true; })? cf=columnFamilyName K_AS
+        K_SELECT sclause=selectClause K_FROM basecf=columnFamilyName
+        (K_WHERE wclause=mvWhereClause)?
+        K_PRIMARY K_KEY (
+        '(' '(' k1=cident { partitionKeys.add(k1); } ( ',' kn=cident { partitionKeys.add(kn); } )* ')' ( ',' c1=cident { compositeKeys.add(c1); } )* ')'
+    |   '(' k1=cident { partitionKeys.add(k1); } ( ',' cn=cident { compositeKeys.add(cn); } )* ')'
+        )
+        { $expr = new CreateMaterializedViewStatement(cf, basecf, sclause, wclause, partitionKeys, compositeKeys, ifNotExists); }
+        ( K_WITH cfamProperty[expr.properties] ( K_AND cfamProperty[expr.properties] )*)?
+    ;
+
+mvWhereClause returns [List<ColumnIdentifier.Raw> expr]
+    : t1=cident { $expr = new ArrayList<ColumnIdentifier.Raw>(); $expr.add(t1); } K_IS K_NOT K_NULL (K_AND tN=cident { $expr.add(tN); } K_IS K_NOT K_NULL)*
+    ; // returns, in order, every column named in "<column> IS NOT NULL [AND <column> IS NOT NULL ...]"
 
 /**
  * CREATE TRIGGER triggerName ON columnFamily USING 'triggerClass';
@@ -794,6 +825,18 @@ alterTableStatement returns [AlterTableStatement expr]
     }
     ;
 
+/** ALTER MATERIALIZED VIEW <viewName> WITH <properties>; */
+alterMaterializedViewStatement returns [AlterMaterializedViewStatement expr]
+    @init {
+        CFPropDefs props = new CFPropDefs();
+    }
+    : K_ALTER K_MATERIALIZED K_VIEW name=columnFamilyName
+          K_WITH properties[props]
+    {
+        $expr = new AlterMaterializedViewStatement(name, props);
+    }
+    ;
+
 /**
  * ALTER TYPE <name> ALTER <field> TYPE <newtype>;
  * ALTER TYPE <name> ADD <field> <newtype>;
@@ -846,6 +889,15 @@ dropIndexStatement returns [DropIndexStatement expr]
     ;
 
 /**
+ * DROP MATERIALIZED VIEW [IF EXISTS] <view_name>
+ */
+dropMaterializedViewStatement returns [DropMaterializedViewStatement expr]
+    @init { boolean ifExists = false; }
+    : K_DROP K_MATERIALIZED K_VIEW (K_IF K_EXISTS { ifExists = true; } )? cf=columnFamilyName
+      { $expr = new DropMaterializedViewStatement(cf, ifExists); } // ifExists is true only when IF EXISTS was parsed
+    ;
+
+/**
   * TRUNCATE <CF>;
   */
 truncateStatement returns [TruncateStatement stmt]
@@ -1599,6 +1651,8 @@ K_KEYSPACE:    ( K E Y S P A C E
 K_KEYSPACES:   K E Y S P A C E S;
 K_COLUMNFAMILY:( C O L U M N F A M I L Y
                  | T A B L E );
+K_MATERIALIZED:M A T E R I A L I Z E D;
+K_VIEW:        V I E W;
 K_INDEX:       I N D E X;
 K_CUSTOM:      C U S T O M;
 K_ON:          O N;
@@ -1622,6 +1676,7 @@ K_DESC:        D E S C;
 K_ALLOW:       A L L O W;
 K_FILTERING:   F I L T E R I N G;
 K_IF:          I F;
+K_IS:          I S;
 K_CONTAINS:    C O N T A I N S;
 
 K_GRANT:       G R A N T;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3bdcaa33/src/java/org/apache/cassandra/cql3/statements/AlterMaterializedViewStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/AlterMaterializedViewStatement.java b/src/java/org/apache/cassandra/cql3/statements/AlterMaterializedViewStatement.java
new file mode 100644
index 0000000..d2b1d13
--- /dev/null
+++ b/src/java/org/apache/cassandra/cql3/statements/AlterMaterializedViewStatement.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cql3.statements;
+
+import org.apache.cassandra.auth.Permission;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.cql3.CFName;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.exceptions.RequestValidationException;
+import org.apache.cassandra.exceptions.UnauthorizedException;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.MigrationManager;
+import org.apache.cassandra.transport.Event;
+
+import static org.apache.cassandra.thrift.ThriftValidation.validateColumnFamily;
+
+/** Statement backing ALTER MATERIALIZED VIEW: applies new table properties to an existing view. */
+public class AlterMaterializedViewStatement extends SchemaAlteringStatement
+{
+    private final CFPropDefs cfProps;
+
+    public AlterMaterializedViewStatement(CFName name, CFPropDefs cfProps)
+    {
+        super(name);
+        this.cfProps = cfProps;
+    }
+
+    public void checkAccess(ClientState state) throws UnauthorizedException, InvalidRequestException
+    {
+        state.hasColumnFamilyAccess(keyspace(), columnFamily(), Permission.ALTER);
+    }
+
+    public void validate(ClientState state)
+    {
+        // Nothing to do here: every check is performed by announceMigration().
+    }
+
+    public boolean announceMigration(boolean isLocalOnly) throws RequestValidationException
+    {
+        CFMetaData current = validateColumnFamily(keyspace(), columnFamily());
+        if (!current.isMaterializedView())
+            throw new InvalidRequestException("Cannot use ALTER MATERIALIZED VIEW on Table");
+
+        // The properties are applied to a copy, which is then announced as the schema update.
+        CFMetaData updated = current.copy();
+
+        if (cfProps == null)
+            throw new InvalidRequestException("ALTER MATERIALIZED VIEW WITH invoked, but no parameters found");
+
+        cfProps.validate();
+        cfProps.applyToCFMetadata(updated);
+
+        MigrationManager.announceColumnFamilyUpdate(updated, false, isLocalOnly);
+        return true;
+    }
+
+    public String toString()
+    {
+        return String.format("AlterMaterializedViewStatement(name=%s)", cfName);
+    }
+
+    public Event.SchemaChange changeEvent()
+    {
+        return new Event.SchemaChange(Event.SchemaChange.Change.UPDATED, Event.SchemaChange.Target.TABLE, keyspace(), columnFamily());
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3bdcaa33/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java b/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
index c6ac7cc..1495f2d 100644
--- a/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
@@ -17,11 +17,15 @@
  */
 package org.apache.cassandra.cql3.statements;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.cassandra.auth.Permission;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.config.MaterializedViewDefinition;
+import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.exceptions.*;
@@ -75,6 +79,9 @@ public class AlterTableStatement extends SchemaAlteringStatement
     public boolean announceMigration(boolean isLocalOnly) throws RequestValidationException
     {
         CFMetaData meta = validateColumnFamily(keyspace(), columnFamily());
+        if (meta.isMaterializedView())
+            throw new InvalidRequestException("Cannot use ALTER TABLE on Materialized View");
+
         CFMetaData cfm = meta.copy();
 
         CQL3Type validator = this.validator == null ? null : this.validator.prepare(keyspace());
@@ -86,6 +93,8 @@ public class AlterTableStatement extends SchemaAlteringStatement
             def = cfm.getColumnDefinition(columnName);
         }
 
+        List<CFMetaData> materializedViewUpdates = null;
+
         switch (oType)
         {
             case ADD:
@@ -144,6 +153,22 @@ public class AlterTableStatement extends SchemaAlteringStatement
                 cfm.addColumnDefinition(isStatic
                                         ? ColumnDefinition.staticDef(cfm, columnName.bytes, type)
                                         : ColumnDefinition.regularDef(cfm, columnName.bytes, type));
+
+                // Adding a column to a table which has an include all materialized view requires the column to be added
+                // to the materialized view as well
+                for (MaterializedViewDefinition mv : cfm.getMaterializedViews())
+                {
+                    if (mv.includeAll)
+                    {
+                        CFMetaData indexCfm = Schema.instance.getCFMetaData(keyspace(), mv.viewName).copy();
+                        indexCfm.addColumnDefinition(isStatic
+                                                     ? ColumnDefinition.staticDef(indexCfm, columnName.bytes, type)
+                                                     : ColumnDefinition.regularDef(indexCfm, columnName.bytes, type));
+                        if (materializedViewUpdates == null)
+                            materializedViewUpdates = new ArrayList<>();
+                        materializedViewUpdates.add(indexCfm);
+                    }
+                }
                 break;
 
             case ALTER:
@@ -193,6 +218,19 @@ public class AlterTableStatement extends SchemaAlteringStatement
                 }
                 // In any case, we update the column definition
                 cfm.addOrReplaceColumnDefinition(def.withNewType(validatorType));
+
+                // We have to alter the schema of the materialized view table as well; it doesn't affect the definition however
+                for (MaterializedViewDefinition mv : cfm.getMaterializedViews())
+                {
+                    if (!mv.includes(columnName)) continue;
+                    // We have to use the pre-adjusted CFM, otherwise we can't resolve the Index
+                    CFMetaData indexCfm = Schema.instance.getCFMetaData(keyspace(), mv.viewName).copy();
+                    indexCfm.addOrReplaceColumnDefinition(def.withNewType(validatorType));
+
+                    if (materializedViewUpdates == null)
+                        materializedViewUpdates = new ArrayList<>();
+                    materializedViewUpdates.add(indexCfm);
+                }
                 break;
 
             case DROP:
@@ -223,6 +261,27 @@ public class AlterTableStatement extends SchemaAlteringStatement
                         cfm.recordColumnDrop(toDelete);
                         break;
                 }
+
+                // A column that is included in a materialized view (in its primary key,
+                // among its selected columns, or because the view selects all columns)
+                // cannot be dropped: rather than altering or dropping the view, we
+                // collect the names of every dependent view and reject the ALTER with
+                // a single error listing them.
+                boolean rejectAlter = false;
+                StringBuilder builder = new StringBuilder();
+                for (MaterializedViewDefinition mv : cfm.getMaterializedViews())
+                {
+                    if (!mv.includes(columnName)) continue;
+                    if (rejectAlter)
+                        builder.append(',');
+                    rejectAlter = true;
+                    builder.append(mv.viewName);
+                }
+                if (rejectAlter)
+                    throw new InvalidRequestException(String.format("Cannot drop column %s, depended on by materialized views (%s.{%s})",
+                                                                    columnName.toString(),
+                                                                    keyspace(),
+                                                                    builder.toString()));
                 break;
             case OPTS:
                 if (cfProps == null)
@@ -241,10 +300,36 @@ public class AlterTableStatement extends SchemaAlteringStatement
                     ColumnIdentifier from = entry.getKey().prepare(cfm);
                     ColumnIdentifier to = entry.getValue().prepare(cfm);
                     cfm.renameColumn(from, to);
+
+                    // If the materialized view includes a renamed column, it must be renamed in the index table and the definition.
+                    for (MaterializedViewDefinition mv : cfm.getMaterializedViews())
+                    {
+                        if (!mv.includes(from)) continue;
+
+                        CFMetaData indexCfm = Schema.instance.getCFMetaData(keyspace(), mv.viewName).copy();
+                        ColumnIdentifier indexFrom = entry.getKey().prepare(indexCfm);
+                        ColumnIdentifier indexTo = entry.getValue().prepare(indexCfm);
+                        indexCfm.renameColumn(indexFrom, indexTo);
+
+                        MaterializedViewDefinition mvCopy = new MaterializedViewDefinition(mv);
+                        mvCopy.renameColumn(from, to);
+
+                        cfm.materializedViews(cfm.getMaterializedViews().replace(mvCopy));
+
+                        if (materializedViewUpdates == null)
+                            materializedViewUpdates = new ArrayList<>();
+                        materializedViewUpdates.add(indexCfm);
+                    }
                 }
                 break;
         }
 
+        if (materializedViewUpdates != null)
+        {
+            for (CFMetaData mvUpdates : materializedViewUpdates)
+                MigrationManager.announceColumnFamilyUpdate(mvUpdates, false, isLocalOnly);
+        }
+
         MigrationManager.announceColumnFamilyUpdate(cfm, false, isLocalOnly);
         return true;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3bdcaa33/src/java/org/apache/cassandra/cql3/statements/CFProperties.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CFProperties.java b/src/java/org/apache/cassandra/cql3/statements/CFProperties.java
new file mode 100644
index 0000000..50ec360
--- /dev/null
+++ b/src/java/org/apache/cassandra/cql3/statements/CFProperties.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cql3.statements;
+
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.ReversedType;
+
+public class CFProperties
+{
+    public final CFPropDefs properties = new CFPropDefs();
+    final Map<ColumnIdentifier, Boolean> definedOrdering = new LinkedHashMap<>(); // Insertion ordering is important
+    boolean useCompactStorage = false;
+
+    public void validate()
+    {
+        properties.validate();
+    }
+
+    public void setOrdering(ColumnIdentifier alias, boolean reversed)
+    {
+        definedOrdering.put(alias, reversed);
+    }
+
+    public void setCompactStorage()
+    {
+        useCompactStorage = true;
+    }
+
+    public AbstractType getReversableType(ColumnIdentifier targetIdentifier, AbstractType<?> type)
+    {
+        if (!definedOrdering.containsKey(targetIdentifier))
+        {
+            return type;
+        }
+        return definedOrdering.get(targetIdentifier) ? ReversedType.getInstance(type) : type;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3bdcaa33/src/java/org/apache/cassandra/cql3/statements/CreateMaterializedViewStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateMaterializedViewStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateMaterializedViewStatement.java
new file mode 100644
index 0000000..380b068
--- /dev/null
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateMaterializedViewStatement.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cql3.statements;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import com.google.common.collect.Iterables;
+
+import org.apache.cassandra.auth.Permission;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.config.MaterializedViewDefinition;
+import org.apache.cassandra.cql3.CFName;
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.cql3.selection.RawSelector;
+import org.apache.cassandra.cql3.selection.Selectable;
+import org.apache.cassandra.db.view.MaterializedView;
+import org.apache.cassandra.exceptions.AlreadyExistsException;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.exceptions.RequestValidationException;
+import org.apache.cassandra.exceptions.UnauthorizedException;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.ClientWarn;
+import org.apache.cassandra.service.MigrationManager;
+import org.apache.cassandra.thrift.ThriftValidation;
+import org.apache.cassandra.transport.Event;
+
+public class CreateMaterializedViewStatement extends SchemaAlteringStatement
+{
+    private final CFName baseName;
+    private final List<RawSelector> selectClause;
+    private final List<ColumnIdentifier.Raw> notNullWhereClause;
+    private final List<ColumnIdentifier.Raw> partitionKeys;
+    private final List<ColumnIdentifier.Raw> clusteringKeys;
+    public final CFProperties properties = new CFProperties();
+    private final boolean ifNotExists;
+
+    /**
+     * Captures the raw, unvalidated pieces of a CREATE MATERIALIZED VIEW statement; resolving them against
+     * the base table is left to announceMigration(). notNullWhereClause may be null when the statement has
+     * no WHERE clause.
+     */
+    public CreateMaterializedViewStatement(CFName viewName, CFName baseName, List<RawSelector> selectClause,
+                                           List<ColumnIdentifier.Raw> notNullWhereClause, List<ColumnIdentifier.Raw> partitionKeys,
+                                           List<ColumnIdentifier.Raw> clusteringKeys, boolean ifNotExists)
+    {
+        super(viewName);
+        this.ifNotExists = ifNotExists;
+        this.baseName = baseName;
+        this.selectClause = selectClause;
+        this.notNullWhereClause = notNullWhereClause;
+        this.partitionKeys = partitionKeys;
+        this.clusteringKeys = clusteringKeys;
+    }
+
+    public void checkAccess(ClientState state) throws UnauthorizedException, InvalidRequestException
+    {
+        // An unqualified base table name is resolved against the keyspace of the view itself.
+        if (!baseName.hasKeyspace()) baseName.setKeyspace(keyspace(), true);
+        state.hasKeyspaceAccess(keyspace(), Permission.CREATE);
+    }
+
+    public void validate(ClientState state) throws RequestValidationException
+    {
+        // Intentionally empty: announceMigration() validates the statement, so the checks are not run twice
+    }
+
+    public boolean announceMigration(boolean isLocalOnly) throws RequestValidationException
+    {
+        // We need to make sure that:
+        //  - primary key includes all columns in base table's primary key
+        //  - make sure that the select statement does not have anything other than columns
+        //    and their names match the base table's names
+        //  - make sure that primary key does not include any collections
+        //  - make sure there is no where clause in the select statement
+        //  - make sure there is not currently a table or view
+
+        properties.validate();
+
+        if (properties.useCompactStorage)
+            throw new InvalidRequestException("Cannot use 'COMPACT STORAGE' when defining a materialized view");
+
+        // We enforce the keyspace because if the RF is different, the logic to wait for a
+        // specific replica would break
+        if (!baseName.getKeyspace().equals(keyspace()))
+            throw new InvalidRequestException("Cannot create a materialized view on a table in a separate keyspace");
+
+        CFMetaData cfm = ThriftValidation.validateColumnFamily(baseName.getKeyspace(), baseName.getColumnFamily());
+        if (cfm.isCounter())
+            throw new InvalidRequestException("Materialized views are not supported on counter tables");
+        if (cfm.isMaterializedView())
+            throw new InvalidRequestException("Materialized views cannot be created against other materialized views");
+
+        Set<ColumnIdentifier> included = new HashSet<>();
+        for (RawSelector selector : selectClause)
+        {
+            Selectable.Raw selectable = selector.selectable;
+            if (selectable instanceof Selectable.WithFieldSelection.Raw)
+                throw new InvalidRequestException("Cannot select out a part of type when defining a materialized view");
+            if (selectable instanceof Selectable.WithFunction.Raw)
+                throw new InvalidRequestException("Cannot use function when defining a materialized view");
+            if (selectable instanceof Selectable.WritetimeOrTTL.Raw)
+                throw new InvalidRequestException("Cannot use function when defining a materialized view");
+            ColumnIdentifier identifier = (ColumnIdentifier) selectable.prepare(cfm);
+            if (selector.alias != null)
+                throw new InvalidRequestException(String.format("Cannot alias column '%s' as '%s' when defining a materialized view", identifier.toString(), selector.alias.toString()));
+
+            ColumnDefinition cdef = cfm.getColumnDefinition(identifier);
+
+            if (cdef == null)
+                throw new InvalidRequestException("Unknown column name detected in CREATE MATERIALIZED VIEW statement : "+identifier);
+
+            if (cdef.isStatic())
+                ClientWarn.warn(String.format("Unable to include static column '%s' in Materialized View SELECT statement", identifier));
+            else
+                included.add(identifier);
+        }
+
+        Set<ColumnIdentifier.Raw> targetPrimaryKeys = new HashSet<>();
+        for (ColumnIdentifier.Raw identifier : Iterables.concat(partitionKeys, clusteringKeys))
+        {
+            if (!targetPrimaryKeys.add(identifier))
+                throw new InvalidRequestException("Duplicate entry found in PRIMARY KEY: "+identifier);
+
+            ColumnDefinition cdef = cfm.getColumnDefinition(identifier.prepare(cfm));
+
+            if (cdef == null)
+                throw new InvalidRequestException("Unknown column name detected in CREATE MATERIALIZED VIEW statement : "+identifier);
+
+            if (cfm.getColumnDefinition(identifier.prepare(cfm)).type.isMultiCell())
+                throw new InvalidRequestException(String.format("Cannot use MultiCell column '%s' in PRIMARY KEY of materialized view", identifier));
+
+            if (cdef.isStatic())
+                throw new InvalidRequestException(String.format("Cannot use Static column '%s' in PRIMARY KEY of materialized view", identifier));
+        }
+
+        Set<ColumnIdentifier> basePrimaryKeyCols = new HashSet<>();
+        for (ColumnDefinition definition : Iterables.concat(cfm.partitionKeyColumns(), cfm.clusteringColumns()))
+            basePrimaryKeyCols.add(definition.name);
+
+        List<ColumnIdentifier> targetClusteringColumns = new ArrayList<>();
+        List<ColumnIdentifier> targetPartitionKeys = new ArrayList<>();
+        Set<ColumnIdentifier> notNullColumns = new HashSet<>();
+        if (notNullWhereClause != null)
+        {
+            for (ColumnIdentifier.Raw raw : notNullWhereClause)
+            {
+                notNullColumns.add(raw.prepare(cfm));
+            }
+        }
+
+        // This is only used as an intermediate state; this is to catch whether multiple non-PK columns are used
+        boolean hasNonPKColumn = false;
+        for (ColumnIdentifier.Raw raw : partitionKeys)
+        {
+            hasNonPKColumn = getColumnIdentifier(cfm, basePrimaryKeyCols, hasNonPKColumn, raw, targetPartitionKeys, notNullColumns);
+        }
+
+        for (ColumnIdentifier.Raw raw : clusteringKeys)
+        {
+            hasNonPKColumn = getColumnIdentifier(cfm, basePrimaryKeyCols, hasNonPKColumn, raw, targetClusteringColumns, notNullColumns);
+        }
+
+        // We need to include all of the primary key columns from the base table in order to make sure that we do not
+        // overwrite values in the materialized view. We cannot support "collapsing" the base table into a smaller
+        // number of rows in the view because if we need to generate a tombstone, we have no way of knowing which value
+        // is currently being used in the view and whether or not to generate a tombstone.
+        // In order to not surprise our users, we require that they include all of the columns. We provide them with
+        // a list of all of the columns left to include.
+        boolean missingClusteringColumns = false;
+        StringBuilder columnNames = new StringBuilder();
+        for (ColumnDefinition def : cfm.allColumns())
+        {
+            if (!def.isPrimaryKeyColumn()) continue;
+
+            ColumnIdentifier identifier = def.name;
+            if (!targetClusteringColumns.contains(identifier) && !targetPartitionKeys.contains(identifier))
+            {
+                if (missingClusteringColumns)
+                    columnNames.append(',');
+                else
+                    missingClusteringColumns = true;
+                columnNames.append(identifier);
+            }
+        }
+        if (missingClusteringColumns)
+            throw new InvalidRequestException(String.format("Cannot create Materialized View %s without primary key columns from base %s (%s)",
+                                                            columnFamily(), baseName.getColumnFamily(), columnNames.toString()));
+
+        if (targetPartitionKeys.isEmpty())
+            throw new InvalidRequestException("Must select at least a column for a Materialized View");
+
+        if (targetClusteringColumns.isEmpty())
+            throw new InvalidRequestException("No columns are defined for Materialized View other than primary key");
+
+        MaterializedViewDefinition definition = new MaterializedViewDefinition(baseName.getColumnFamily(),
+                                                                               columnFamily(),
+                                                                               targetPartitionKeys,
+                                                                               targetClusteringColumns,
+                                                                               included);
+
+        CFMetaData indexCf = MaterializedView.getCFMetaData(definition, cfm, properties);
+        try
+        {
+            MigrationManager.announceNewColumnFamily(indexCf, isLocalOnly);
+        }
+        catch (AlreadyExistsException e)
+        {
+            if (ifNotExists)
+                return false;
+            throw e;
+        }
+
+        CFMetaData newCfm = cfm.copy();
+        newCfm.materializedViews(newCfm.getMaterializedViews().with(definition));
+
+        MigrationManager.announceColumnFamilyUpdate(newCfm, false, isLocalOnly);
+
+        return true;
+    }
+
+    /** Validates one target primary key column and appends it to {@code columns}; returns the updated
+     *  "a non-primary-key base column has been used" flag, which stays true once it has been set. */
+    private static boolean getColumnIdentifier(CFMetaData cfm,
+                                               Set<ColumnIdentifier> basePK,
+                                               boolean hasNonPKColumn,
+                                               ColumnIdentifier.Raw raw,
+                                               List<ColumnIdentifier> columns,
+                                               Set<ColumnIdentifier> allowedPKColumns)
+    {
+        ColumnIdentifier identifier = raw.prepare(cfm);
+
+        boolean isPk = basePK.contains(identifier);
+        if (!isPk && hasNonPKColumn)
+            throw new InvalidRequestException(String.format("Cannot include more than one non-primary key column '%s' in materialized view partition key", identifier));
+        // every column of the view's primary key must be listed in the IS NOT NULL filter
+        if (!allowedPKColumns.contains(identifier))
+            throw new InvalidRequestException(String.format("Primary key column '%s' is required to be filtered by 'IS NOT NULL'", identifier));
+
+        columns.add(identifier);
+        // callers overwrite their flag with this result: returning only !isPk let a later base PK column reset it
+        return hasNonPKColumn || !isPk;
+    }
+
+    public Event.SchemaChange changeEvent() // announces a CREATED change on a TABLE target for keyspace()/columnFamily()
+    {
+        return new Event.SchemaChange(Event.SchemaChange.Change.CREATED, Event.SchemaChange.Target.TABLE, keyspace(), columnFamily());
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3bdcaa33/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java
index 8cd6bc8..6e28f8c 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java
@@ -188,14 +188,12 @@ public class CreateTableStatement extends SchemaAlteringStatement
     public static class RawStatement extends CFStatement
     {
         private final Map<ColumnIdentifier, CQL3Type.Raw> definitions = new HashMap<>();
-        public final CFPropDefs properties = new CFPropDefs();
+        public final CFProperties properties = new CFProperties();
 
         private final List<List<ColumnIdentifier>> keyAliases = new ArrayList<List<ColumnIdentifier>>();
         private final List<ColumnIdentifier> columnAliases = new ArrayList<ColumnIdentifier>();
-        private final Map<ColumnIdentifier, Boolean> definedOrdering = new LinkedHashMap<ColumnIdentifier, Boolean>(); // Insertion ordering is important
         private final Set<ColumnIdentifier> staticColumns = new HashSet<ColumnIdentifier>();
 
-        private boolean useCompactStorage;
         private final Multiset<ColumnIdentifier> definedNames = HashMultiset.create(1);
 
         private final boolean ifNotExists;
@@ -223,7 +221,7 @@ public class CreateTableStatement extends SchemaAlteringStatement
 
             properties.validate();
 
-            CreateTableStatement stmt = new CreateTableStatement(cfName, properties, ifNotExists, staticColumns);
+            CreateTableStatement stmt = new CreateTableStatement(cfName, properties.properties, ifNotExists, staticColumns);
 
             for (Map.Entry<ColumnIdentifier, CQL3Type.Raw> entry : definitions.entrySet())
             {
@@ -240,7 +238,7 @@ public class CreateTableStatement extends SchemaAlteringStatement
                 throw new InvalidRequestException("No PRIMARY KEY specifed (exactly one required)");
             if (keyAliases.size() > 1)
                 throw new InvalidRequestException("Multiple PRIMARY KEYs specifed (exactly one required)");
-            if (stmt.hasCounters && properties.getDefaultTimeToLive() > 0)
+            if (stmt.hasCounters && properties.properties.getDefaultTimeToLive() > 0)
                 throw new InvalidRequestException("Cannot set default_time_to_live on a table with counters");
 
             List<ColumnIdentifier> kAliases = keyAliases.get(0);
@@ -279,6 +277,7 @@ public class CreateTableStatement extends SchemaAlteringStatement
                         throw new InvalidRequestException("Cannot mix counter and non counter columns in the same table");
             }
 
+            boolean useCompactStorage = properties.useCompactStorage;
             // Dense means that on the thrift side, no part of the "thrift column name" stores a "CQL/metadata column name".
             // This means COMPACT STORAGE with at least one clustering type (otherwise it's a thrift "static" CF).
             stmt.isDense = useCompactStorage && !stmt.clusteringTypes.isEmpty();
@@ -327,18 +326,18 @@ public class CreateTableStatement extends SchemaAlteringStatement
             }
 
             // If we give a clustering order, we must explicitly do so for all aliases and in the order of the PK
-            if (!definedOrdering.isEmpty())
+            if (!properties.definedOrdering.isEmpty())
             {
-                if (definedOrdering.size() > columnAliases.size())
+                if (properties.definedOrdering.size() > columnAliases.size())
                     throw new InvalidRequestException("Only clustering key columns can be defined in CLUSTERING ORDER directive");
 
                 int i = 0;
-                for (ColumnIdentifier id : definedOrdering.keySet())
+                for (ColumnIdentifier id : properties.definedOrdering.keySet())
                 {
                     ColumnIdentifier c = columnAliases.get(i);
                     if (!id.equals(c))
                     {
-                        if (definedOrdering.containsKey(c))
+                        if (properties.definedOrdering.containsKey(c))
                             throw new InvalidRequestException(String.format("The order of columns in the CLUSTERING ORDER directive must be the one of the clustering key (%s must appear before %s)", c, id));
                         else
                             throw new InvalidRequestException(String.format("Missing CLUSTERING ORDER for column %s", c));
@@ -359,7 +358,7 @@ public class CreateTableStatement extends SchemaAlteringStatement
                 throw new InvalidRequestException(String.format("Invalid collection type for PRIMARY KEY component %s", t));
 
             columns.remove(t);
-            Boolean isReversed = definedOrdering.get(t);
+            Boolean isReversed = properties.definedOrdering.get(t);
             return isReversed != null && isReversed ? ReversedType.getInstance(type) : type;
         }
 
@@ -380,15 +379,5 @@ public class CreateTableStatement extends SchemaAlteringStatement
         {
             columnAliases.add(alias);
         }
-
-        public void setOrdering(ColumnIdentifier alias, boolean reversed)
-        {
-            definedOrdering.put(alias, reversed);
-        }
-
-        public void setCompactStorage()
-        {
-            useCompactStorage = true;
-        }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3bdcaa33/src/java/org/apache/cassandra/cql3/statements/CreateTriggerStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateTriggerStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateTriggerStatement.java
index c2d5616..706cfea 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CreateTriggerStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateTriggerStatement.java
@@ -58,7 +58,10 @@ public class CreateTriggerStatement extends SchemaAlteringStatement
 
     public void validate(ClientState state) throws RequestValidationException
     {
-        ThriftValidation.validateColumnFamily(keyspace(), columnFamily());
+        CFMetaData cfm = ThriftValidation.validateColumnFamily(keyspace(), columnFamily());
+        if (cfm.isMaterializedView())
+            throw new InvalidRequestException("Cannot CREATE TRIGGER against a materialized view");
+
         try
         {
             TriggerExecutor.instance.loadTriggerInstance(triggerClass);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3bdcaa33/src/java/org/apache/cassandra/cql3/statements/DropMaterializedViewStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/DropMaterializedViewStatement.java b/src/java/org/apache/cassandra/cql3/statements/DropMaterializedViewStatement.java
new file mode 100644
index 0000000..01d138c
--- /dev/null
+++ b/src/java/org/apache/cassandra/cql3/statements/DropMaterializedViewStatement.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cql3.statements;
+
+import org.apache.cassandra.auth.Permission;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.MaterializedViewDefinition;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.cql3.CFName;
+import org.apache.cassandra.db.KeyspaceNotDefinedException;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.exceptions.UnauthorizedException;
+import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.MigrationManager;
+import org.apache.cassandra.transport.Event;
+
+/** Schema statement that drops a materialized view: unregisters it from its base table, then drops the view's table. */
+public class DropMaterializedViewStatement extends SchemaAlteringStatement
+{
+    public final boolean ifExists;
+
+    public DropMaterializedViewStatement(CFName cf, boolean ifExists)
+    {
+        super(cf);
+        this.ifExists = ifExists;
+    }
+
+    // Checks DROP permission on the table named by this statement.
+    public void checkAccess(ClientState state) throws UnauthorizedException, InvalidRequestException
+    {
+        state.hasColumnFamilyAccess(keyspace(), columnFamily(), Permission.DROP);
+    }
+
+    public void validate(ClientState state)
+    {
+        // nothing to validate up front; the view and its base table are looked up in announceMigration()
+    }
+
+    public Event.SchemaChange changeEvent()
+    {
+        return new Event.SchemaChange(Event.SchemaChange.Change.DROPPED, Event.SchemaChange.Target.TABLE, keyspace(), columnFamily());
+    }
+
+    // Returns true once both schema changes were announced, false when IF EXISTS suppressed a ConfigurationException.
+    public boolean announceMigration(boolean isLocalOnly) throws InvalidRequestException, ConfigurationException
+    {
+        try
+        {
+            CFMetaData target = Schema.instance.getCFMetaData(keyspace(), columnFamily());
+            if (target == null)
+                throw new ConfigurationException(String.format("Cannot drop non existing materialized view '%s' in keyspace '%s'.", columnFamily(), keyspace()));
+            if (!target.isMaterializedView())
+                throw new ConfigurationException(String.format("Cannot drop non materialized view '%s' in keyspace '%s'", columnFamily(), keyspace()));
+            CFMetaData base = findBaseCf();
+            if (base == null)
+                throw new ConfigurationException(String.format("Cannot drop materialized view '%s' in keyspace '%s' without base CF.", columnFamily(), keyspace()));
+            // first announce a copy of the base table without this view, then drop the view's own table
+            CFMetaData detached = base.copy();
+            detached.materializedViews(detached.getMaterializedViews().without(columnFamily()));
+            MigrationManager.announceColumnFamilyUpdate(detached, false, isLocalOnly);
+            MigrationManager.announceColumnFamilyDrop(keyspace(), columnFamily(), isLocalOnly);
+            return true;
+        }
+        catch (ConfigurationException e)
+        {
+            if (!ifExists)
+                throw e;
+            return false;
+        }
+    }
+
+    // Finds the table in this keyspace whose view definitions contain this view; returns null only under IF EXISTS.
+    private CFMetaData findBaseCf() throws InvalidRequestException
+    {
+        KeyspaceMetadata keyspaceMeta = Schema.instance.getKSMetaData(keyspace());
+        if (keyspaceMeta == null)
+            throw new KeyspaceNotDefinedException("Keyspace " + keyspace() + " does not exist");
+
+        for (CFMetaData candidate : keyspaceMeta.tables)
+            if (candidate.getMaterializedViews().get(columnFamily()).isPresent())
+                return candidate;
+
+        if (!ifExists)
+            throw new InvalidRequestException("View '" + cfName + "' could not be found in any of the tables of keyspace '" + keyspace() + '\'');
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3bdcaa33/src/java/org/apache/cassandra/cql3/statements/DropTableStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/DropTableStatement.java b/src/java/org/apache/cassandra/cql3/statements/DropTableStatement.java
index e690c3e..35dc947 100644
--- a/src/java/org/apache/cassandra/cql3/statements/DropTableStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/DropTableStatement.java
@@ -18,6 +18,9 @@
 package org.apache.cassandra.cql3.statements;
 
 import org.apache.cassandra.auth.Permission;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.MaterializedViewDefinition;
+import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.cql3.CFName;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.exceptions.InvalidRequestException;
@@ -58,6 +61,28 @@ public class DropTableStatement extends SchemaAlteringStatement
     {
         try
         {
+            CFMetaData cfm = Schema.instance.getCFMetaData(keyspace(), columnFamily());
+            if (cfm != null)
+            {
+                if (cfm.isMaterializedView())
+                    throw new InvalidRequestException("Cannot use DROP TABLE on Materialized View");
+
+                boolean rejectDrop = false;
+                StringBuilder messageBuilder = new StringBuilder();
+                for (MaterializedViewDefinition def : cfm.getMaterializedViews())
+                {
+                    if (rejectDrop)
+                        messageBuilder.append(',');
+                    rejectDrop = true;
+                    messageBuilder.append(def.viewName);
+                }
+                if (rejectDrop)
+                {
+                    throw new InvalidRequestException(String.format("Cannot drop table when materialized views still depend on it (%s.{%s})",
+                                                                    keyspace(),
+                                                                    messageBuilder.toString()));
+                }
+            }
             MigrationManager.announceColumnFamilyDrop(keyspace(), columnFamily(), isLocalOnly);
             return true;
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3bdcaa33/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index e827309..2f3de4c 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -27,6 +27,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.auth.Permission;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.config.MaterializedViewDefinition;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.cql3.functions.Function;
@@ -155,6 +156,16 @@ public abstract class ModificationStatement implements CQLStatement
         return cfm.isCounter();
     }
 
+    public boolean isMaterializedView()
+    {
+        return cfm.isMaterializedView(); // true when the metadata held in cfm is that of a materialized view
+    }
+
+    public boolean hasMaterializedViews()
+    {
+        return !cfm.getMaterializedViews().isEmpty(); // true when cfm lists at least one materialized view definition
+    }
+
     public long getTimestamp(long now, QueryOptions options) throws InvalidRequestException
     {
         return attrs.getTimestamp(now, options);
@@ -178,6 +189,15 @@ public abstract class ModificationStatement implements CQLStatement
         if (hasConditions())
             state.hasColumnFamilyAccess(keyspace(), columnFamily(), Permission.SELECT);
 
+        // MV updates need to get the current state from the table, and might update the materialized views
+        // Require Permission.SELECT on the base table, and Permission.MODIFY on the views
+        if (hasMaterializedViews())
+        {
+            state.hasColumnFamilyAccess(keyspace(), columnFamily(), Permission.SELECT);
+            for (MaterializedViewDefinition view : cfm.getMaterializedViews())
+                state.hasColumnFamilyAccess(keyspace(), view.viewName, Permission.MODIFY);
+        }
+
         for (Function function : getFunctions())
             state.ensureHasPermission(Permission.EXECUTE, function);
     }
@@ -192,6 +212,9 @@ public abstract class ModificationStatement implements CQLStatement
 
         if (isCounter() && attrs.isTimeToLiveSet())
             throw new InvalidRequestException("Cannot provide custom TTL for counter updates");
+
+        if (isMaterializedView())
+            throw new InvalidRequestException("Cannot directly modify a materialized view");
     }
 
     public void addOperation(Operation op)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3bdcaa33/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index 1a7de37..84d621b 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -29,6 +29,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.auth.Permission;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.cql3.functions.Function;
 import org.apache.cassandra.cql3.restrictions.StatementRestrictions;
@@ -684,10 +685,10 @@ public class SelectStatement implements CQLStatement
 
     public static class RawStatement extends CFStatement
     {
-        private final Parameters parameters;
-        private final List<RawSelector> selectClause;
-        private final List<Relation> whereClause;
-        private final Term.Raw limit;
+        public final Parameters parameters;
+        public final List<RawSelector> selectClause;
+        public final List<Relation> whereClause;
+        public final Term.Raw limit;
 
         public RawStatement(CFName cfName, Parameters parameters, List<RawSelector> selectClause, List<Relation> whereClause, Term.Raw limit)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3bdcaa33/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java b/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
index 9234a79..5dd306a 100644
--- a/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
@@ -21,6 +21,8 @@ import java.io.IOException;
 import java.util.concurrent.TimeoutException;
 
 import org.apache.cassandra.auth.Permission;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
@@ -62,6 +64,10 @@ public class TruncateStatement extends CFStatement implements CQLStatement
     {
         try
         {
+            CFMetaData metaData = Schema.instance.getCFMetaData(keyspace(), columnFamily());
+            if (metaData.isMaterializedView())
+                throw new InvalidRequestException("Cannot TRUNCATE materialized view directly; must truncate base table instead");
+
             StorageProxy.truncateBlocking(keyspace(), columnFamily());
         }
         catch (UnavailableException | TimeoutException | IOException e)