You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2018/07/17 15:40:26 UTC

[04/13] cassandra git commit: Make all DDL statements idempotent and not dependent on global state

http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/schema/Schema.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/Schema.java b/src/java/org/apache/cassandra/schema/Schema.java
index 09ec62a..fc09c24 100644
--- a/src/java/org/apache/cassandra/schema/Schema.java
+++ b/src/java/org/apache/cassandra/schema/Schema.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.schema;
 
-import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.stream.Collectors;
@@ -38,8 +37,13 @@ import org.apache.cassandra.db.virtual.VirtualKeyspaceRegistry;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.exceptions.UnknownTableException;
+import org.apache.cassandra.gms.ApplicationState;
+import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.locator.LocalStrategy;
+import org.apache.cassandra.schema.KeyspaceMetadata.KeyspaceDiff;
+import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff;
+import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.Pair;
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
 
@@ -79,20 +83,6 @@ public final class Schema
     }
 
     /**
-     * Validates that the provided keyspace is not one of the system keyspace.
-     *
-     * @param keyspace the keyspace name to validate.
-     *
-     * @throws InvalidRequestException if {@code keyspace} is the name of a
-     * system keyspace.
-     */
-    public static void validateKeyspaceNotSystem(String keyspace)
-    {
-        if (SchemaConstants.isLocalSystemKeyspace(keyspace))
-            throw new InvalidRequestException(format("%s keyspace is not user-modifiable", keyspace));
-    }
-
-    /**
      * load keyspace (keyspace) definitions, but do not initialize the keyspace instances.
      * Schema version may be updated as the result.
      */
@@ -108,22 +98,12 @@ public final class Schema
      */
     public void loadFromDisk(boolean updateVersion)
     {
-        load(SchemaKeyspace.fetchNonSystemKeyspaces());
+        SchemaKeyspace.fetchNonSystemKeyspaces().forEach(this::load);
         if (updateVersion)
             updateVersion();
     }
 
     /**
-     * Load up non-system keyspaces
-     *
-     * @param keyspaceDefs The non-system keyspace definitions
-     */
-    private void load(Iterable<KeyspaceMetadata> keyspaceDefs)
-    {
-        keyspaceDefs.forEach(this::load);
-    }
-
-    /**
      * Update (or insert) new keyspace definition
      *
      * @param ksm The metadata about keyspace
@@ -153,50 +133,33 @@ public final class Schema
     private void reload(KeyspaceMetadata previous, KeyspaceMetadata updated)
     {
         Keyspace keyspace = getKeyspaceInstance(updated.name);
-        if (keyspace != null)
+        if (null != keyspace)
             keyspace.setMetadata(updated);
 
-        MapDifference<TableId, TableMetadata> tablesDiff = previous.tables.diff(updated.tables);
-        MapDifference<TableId, ViewMetadata> viewsDiff = previous.views.diff(updated.views);
+        Tables.TablesDiff tablesDiff = Tables.diff(previous.tables, updated.tables);
+        Views.ViewsDiff viewsDiff = Views.diff(previous.views, updated.views);
+
         MapDifference<String, TableMetadata> indexesDiff = previous.tables.indexesDiff(updated.tables);
 
         // clean up after removed entries
-
-        tablesDiff.entriesOnlyOnLeft()
-                  .values()
-                  .forEach(table -> metadataRefs.remove(table.id));
-
-        viewsDiff.entriesOnlyOnLeft()
-                 .values()
-                 .forEach(view -> metadataRefs.remove(view.metadata.id));
+        tablesDiff.dropped.forEach(table -> metadataRefs.remove(table.id));
+        viewsDiff.dropped.forEach(view -> metadataRefs.remove(view.metadata.id));
 
         indexesDiff.entriesOnlyOnLeft()
                    .values()
                    .forEach(indexTable -> indexMetadataRefs.remove(Pair.create(indexTable.keyspace, indexTable.indexName().get())));
 
         // load up new entries
-
-        tablesDiff.entriesOnlyOnRight()
-                  .values()
-                  .forEach(table -> metadataRefs.put(table.id, new TableMetadataRef(table)));
-
-        viewsDiff.entriesOnlyOnRight()
-                 .values()
-                 .forEach(view -> metadataRefs.put(view.metadata.id, new TableMetadataRef(view.metadata)));
+        tablesDiff.created.forEach(table -> metadataRefs.put(table.id, new TableMetadataRef(table)));
+        viewsDiff.created.forEach(view -> metadataRefs.put(view.metadata.id, new TableMetadataRef(view.metadata)));
 
         indexesDiff.entriesOnlyOnRight()
                    .values()
                    .forEach(indexTable -> indexMetadataRefs.put(Pair.create(indexTable.keyspace, indexTable.indexName().get()), new TableMetadataRef(indexTable)));
 
         // refresh refs to updated ones
-
-        tablesDiff.entriesDiffering()
-                  .values()
-                  .forEach(diff -> metadataRefs.get(diff.rightValue().id).set(diff.rightValue()));
-
-        viewsDiff.entriesDiffering()
-                 .values()
-                 .forEach(diff -> metadataRefs.get(diff.rightValue().metadata.id).set(diff.rightValue().metadata));
+        tablesDiff.altered.forEach(diff -> metadataRefs.get(diff.after.id).set(diff.after));
+        viewsDiff.altered.forEach(diff -> metadataRefs.get(diff.after.metadata.id).set(diff.after.metadata));
 
         indexesDiff.entriesDiffering()
                    .values()
@@ -556,7 +519,16 @@ public final class Schema
     public void updateVersionAndAnnounce()
     {
         updateVersion();
-        MigrationManager.passiveAnnounce(version);
+        passiveAnnounceVersion();
+    }
+
+    /**
+     * Announce my version passively over gossip.
+     * Used to notify nodes as they arrive in the cluster.
+     */
+    private void passiveAnnounceVersion()
+    {
+        Gossiper.instance.addLocalApplicationState(ApplicationState.SCHEMA, StorageService.instance.valueFactory.schema(version));
     }
 
     /**
@@ -576,7 +548,7 @@ public final class Schema
     {
         Keyspaces before = keyspaces.filter(k -> !SchemaConstants.isLocalSystemKeyspace(k.name));
         Keyspaces after = SchemaKeyspace.fetchNonSystemKeyspaces();
-        merge(before, after);
+        merge(Keyspaces.diff(before, after));
         updateVersionAndAnnounce();
     }
 
@@ -594,6 +566,60 @@ public final class Schema
         updateVersionAndAnnounce();
     }
 
+    public synchronized TransformationResult transform(SchemaTransformation transformation, boolean locally, long now)
+    {
+        KeyspacesDiff diff;
+        try
+        {
+            Keyspaces before = keyspaces;
+            Keyspaces after = transformation.apply(before);
+            diff = Keyspaces.diff(before, after);
+        }
+        catch (RuntimeException e)
+        {
+            return new TransformationResult(e);
+        }
+
+        if (diff.isEmpty())
+            return new TransformationResult(diff, Collections.emptyList());
+
+        Collection<Mutation> mutations = SchemaKeyspace.convertSchemaDiffToMutations(diff, now);
+        SchemaKeyspace.applyChanges(mutations);
+
+        merge(diff);
+        updateVersion();
+        if (!locally)
+            passiveAnnounceVersion();
+
+        return new TransformationResult(diff, mutations);
+    }
+
+    public static final class TransformationResult
+    {
+        public final boolean success;
+        public final RuntimeException exception;
+        public final KeyspacesDiff diff;
+        public final Collection<Mutation> mutations;
+
+        private TransformationResult(boolean success, RuntimeException exception, KeyspacesDiff diff, Collection<Mutation> mutations)
+        {
+            this.success = success;
+            this.exception = exception;
+            this.diff = diff;
+            this.mutations = mutations;
+        }
+
+        TransformationResult(RuntimeException exception)
+        {
+            this(false, exception, null, null);
+        }
+
+        TransformationResult(KeyspacesDiff diff, Collection<Mutation> mutations)
+        {
+            this(true, null, diff, mutations);
+        }
+    }
+
     synchronized void merge(Collection<Mutation> mutations)
     {
         // only compare the keyspaces affected by this set of schema mutations
@@ -608,71 +634,57 @@ public final class Schema
         // apply the schema mutations and fetch the new versions of the altered keyspaces
         Keyspaces after = SchemaKeyspace.fetchKeyspaces(affectedKeyspaces);
 
-        merge(before, after);
+        merge(Keyspaces.diff(before, after));
     }
 
-    private synchronized void merge(Keyspaces before, Keyspaces after)
+    private void merge(KeyspacesDiff diff)
     {
-        MapDifference<String, KeyspaceMetadata> keyspacesDiff = before.diff(after);
-
-        // dropped keyspaces
-        keyspacesDiff.entriesOnlyOnLeft().values().forEach(this::dropKeyspace);
-
-        // new keyspaces
-        keyspacesDiff.entriesOnlyOnRight().values().forEach(this::createKeyspace);
-
-        // updated keyspaces
-        keyspacesDiff.entriesDiffering().entrySet().forEach(diff -> alterKeyspace(diff.getValue().leftValue(), diff.getValue().rightValue()));
+        diff.dropped.forEach(this::dropKeyspace);
+        diff.created.forEach(this::createKeyspace);
+        diff.altered.forEach(this::alterKeyspace);
     }
 
-    private void alterKeyspace(KeyspaceMetadata before, KeyspaceMetadata after)
+    private void alterKeyspace(KeyspaceDiff delta)
     {
-        // calculate the deltas
-        MapDifference<TableId, TableMetadata> tablesDiff = before.tables.diff(after.tables);
-        MapDifference<TableId, ViewMetadata> viewsDiff = before.views.diff(after.views);
-        MapDifference<ByteBuffer, UserType> typesDiff = before.types.diff(after.types);
-        MapDifference<Pair<FunctionName, List<String>>, UDFunction> udfsDiff = before.functions.udfsDiff(after.functions);
-        MapDifference<Pair<FunctionName, List<String>>, UDAggregate> udasDiff = before.functions.udasDiff(after.functions);
-
         // drop tables and views
-        viewsDiff.entriesOnlyOnLeft().values().forEach(this::dropView);
-        tablesDiff.entriesOnlyOnLeft().values().forEach(this::dropTable);
+        delta.views.dropped.forEach(this::dropView);
+        delta.tables.dropped.forEach(this::dropTable);
 
-        load(after);
+        load(delta.after);
 
         // add tables and views
-        tablesDiff.entriesOnlyOnRight().values().forEach(this::createTable);
-        viewsDiff.entriesOnlyOnRight().values().forEach(this::createView);
+        delta.tables.created.forEach(this::createTable);
+        delta.views.created.forEach(this::createView);
 
         // update tables and views
-        tablesDiff.entriesDiffering().values().forEach(diff -> alterTable(diff.rightValue()));
-        viewsDiff.entriesDiffering().values().forEach(diff -> alterView(diff.rightValue()));
+        delta.tables.altered.forEach(diff -> alterTable(diff.after));
+        delta.views.altered.forEach(diff -> alterView(diff.after));
 
         // deal with all removed, added, and altered views
-        Keyspace.open(before.name).viewManager.reload(true);
+        Keyspace.open(delta.after.name).viewManager.reload(true);
 
         // notify on everything dropped
-        udasDiff.entriesOnlyOnLeft().values().forEach(this::notifyDropAggregate);
-        udfsDiff.entriesOnlyOnLeft().values().forEach(this::notifyDropFunction);
-        viewsDiff.entriesOnlyOnLeft().values().forEach(this::notifyDropView);
-        tablesDiff.entriesOnlyOnLeft().values().forEach(this::notifyDropTable);
-        typesDiff.entriesOnlyOnLeft().values().forEach(this::notifyDropType);
+        delta.udas.dropped.forEach(uda -> notifyDropAggregate((UDAggregate) uda));
+        delta.udfs.dropped.forEach(udf -> notifyDropFunction((UDFunction) udf));
+        delta.views.dropped.forEach(this::notifyDropView);
+        delta.tables.dropped.forEach(this::notifyDropTable);
+        delta.types.dropped.forEach(this::notifyDropType);
 
         // notify on everything created
-        typesDiff.entriesOnlyOnRight().values().forEach(this::notifyCreateType);
-        tablesDiff.entriesOnlyOnRight().values().forEach(this::notifyCreateTable);
-        viewsDiff.entriesOnlyOnRight().values().forEach(this::notifyCreateView);
-        udfsDiff.entriesOnlyOnRight().values().forEach(this::notifyCreateFunction);
-        udasDiff.entriesOnlyOnRight().values().forEach(this::notifyCreateAggregate);
+        delta.types.created.forEach(this::notifyCreateType);
+        delta.tables.created.forEach(this::notifyCreateTable);
+        delta.views.created.forEach(this::notifyCreateView);
+        delta.udfs.created.forEach(udf -> notifyCreateFunction((UDFunction) udf));
+        delta.udas.created.forEach(uda -> notifyCreateAggregate((UDAggregate) uda));
 
         // notify on everything altered
-        if (!before.params.equals(after.params))
-            notifyAlterKeyspace(after);
-        typesDiff.entriesDiffering().values().forEach(diff -> notifyAlterType(diff.rightValue()));
-        tablesDiff.entriesDiffering().values().forEach(diff -> notifyAlterTable(diff.leftValue(), diff.rightValue()));
-        viewsDiff.entriesDiffering().values().forEach(diff -> notifyAlterView(diff.leftValue(), diff.rightValue()));
-        udfsDiff.entriesDiffering().values().forEach(diff -> notifyAlterFunction(diff.rightValue()));
-        udasDiff.entriesDiffering().values().forEach(diff -> notifyAlterAggregate(diff.rightValue()));
+        if (!delta.before.params.equals(delta.after.params))
+            notifyAlterKeyspace(delta.before, delta.after);
+        delta.types.altered.forEach(diff -> notifyAlterType(diff.before, diff.after));
+        delta.tables.altered.forEach(diff -> notifyAlterTable(diff.before, diff.after));
+        delta.views.altered.forEach(diff -> notifyAlterView(diff.before, diff.after));
+        delta.udfs.altered.forEach(diff -> notifyAlterFunction(diff.before, diff.after));
+        delta.udas.altered.forEach(diff -> notifyAlterAggregate(diff.before, diff.after));
     }
 
     private void createKeyspace(KeyspaceMetadata keyspace)
@@ -708,7 +720,7 @@ public final class Schema
 
     private void dropView(ViewMetadata metadata)
     {
-        Keyspace.open(metadata.keyspace).viewManager.stopBuild(metadata.name);
+        Keyspace.open(metadata.keyspace()).viewManager.stopBuild(metadata.name());
         dropTable(metadata.metadata);
     }
 
@@ -732,7 +744,7 @@ public final class Schema
 
     private void createView(ViewMetadata view)
     {
-        Keyspace.open(view.keyspace).initCf(metadataRefs.get(view.metadata.id), true);
+        Keyspace.open(view.keyspace()).initCf(metadataRefs.get(view.metadata.id), true);
     }
 
     private void alterTable(TableMetadata updated)
@@ -742,7 +754,7 @@ public final class Schema
 
     private void alterView(ViewMetadata updated)
     {
-        Keyspace.open(updated.keyspace).getColumnFamilyStore(updated.name).reload();
+        Keyspace.open(updated.keyspace()).getColumnFamilyStore(updated.name()).reload();
     }
 
     private void notifyCreateKeyspace(KeyspaceMetadata ksm)
@@ -757,7 +769,7 @@ public final class Schema
 
     private void notifyCreateView(ViewMetadata view)
     {
-        changeListeners.forEach(l -> l.onCreateView(view.keyspace, view.name));
+        changeListeners.forEach(l -> l.onCreateView(view.keyspace(), view.name()));
     }
 
     private void notifyCreateType(UserType ut)
@@ -775,36 +787,36 @@ public final class Schema
         changeListeners.forEach(l -> l.onCreateAggregate(udf.name().keyspace, udf.name().name, udf.argTypes()));
     }
 
-    private void notifyAlterKeyspace(KeyspaceMetadata ksm)
+    private void notifyAlterKeyspace(KeyspaceMetadata before, KeyspaceMetadata after)
     {
-        changeListeners.forEach(l -> l.onAlterKeyspace(ksm.name));
+        changeListeners.forEach(l -> l.onAlterKeyspace(after.name));
     }
 
-    private void notifyAlterTable(TableMetadata current, TableMetadata updated)
+    private void notifyAlterTable(TableMetadata before, TableMetadata after)
     {
-        boolean changeAffectedPreparedStatements = current.changeAffectsPreparedStatements(updated);
-        changeListeners.forEach(l -> l.onAlterTable(updated.keyspace, updated.name, changeAffectedPreparedStatements));
+        boolean changeAffectedPreparedStatements = before.changeAffectsPreparedStatements(after);
+        changeListeners.forEach(l -> l.onAlterTable(after.keyspace, after.name, changeAffectedPreparedStatements));
     }
 
-    private void notifyAlterView(ViewMetadata current, ViewMetadata updated)
+    private void notifyAlterView(ViewMetadata before, ViewMetadata after)
     {
-        boolean changeAffectedPreparedStatements = current.metadata.changeAffectsPreparedStatements(updated.metadata);
-        changeListeners.forEach(l ->l.onAlterView(updated.keyspace, updated.name, changeAffectedPreparedStatements));
+        boolean changeAffectedPreparedStatements = before.metadata.changeAffectsPreparedStatements(after.metadata);
+        changeListeners.forEach(l ->l.onAlterView(after.keyspace(), after.name(), changeAffectedPreparedStatements));
     }
 
-    private void notifyAlterType(UserType ut)
+    private void notifyAlterType(UserType before, UserType after)
     {
-        changeListeners.forEach(l -> l.onAlterType(ut.keyspace, ut.getNameAsString()));
+        changeListeners.forEach(l -> l.onAlterType(after.keyspace, after.getNameAsString()));
     }
 
-    private void notifyAlterFunction(UDFunction udf)
+    private void notifyAlterFunction(UDFunction before, UDFunction after)
     {
-        changeListeners.forEach(l -> l.onAlterFunction(udf.name().keyspace, udf.name().name, udf.argTypes()));
+        changeListeners.forEach(l -> l.onAlterFunction(after.name().keyspace, after.name().name, after.argTypes()));
     }
 
-    private void notifyAlterAggregate(UDAggregate udf)
+    private void notifyAlterAggregate(UDAggregate before, UDAggregate after)
     {
-        changeListeners.forEach(l -> l.onAlterAggregate(udf.name().keyspace, udf.name().name, udf.argTypes()));
+        changeListeners.forEach(l -> l.onAlterAggregate(after.name().keyspace, after.name().name, after.argTypes()));
     }
 
     private void notifyDropKeyspace(KeyspaceMetadata ksm)
@@ -819,7 +831,7 @@ public final class Schema
 
     private void notifyDropView(ViewMetadata view)
     {
-        changeListeners.forEach(l -> l.onDropView(view.keyspace, view.name));
+        changeListeners.forEach(l -> l.onDropView(view.keyspace(), view.name()));
     }
 
     private void notifyDropType(UserType ut)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
index 4945fc2..553ccdc 100644
--- a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
+++ b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
@@ -30,20 +30,20 @@ import com.google.common.hash.Hasher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.antlr.runtime.RecognitionException;
 import org.apache.cassandra.config.*;
-import org.apache.cassandra.cql3.statements.CreateTableStatement;
-import org.apache.cassandra.schema.ColumnMetadata.ClusteringOrder;
 import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.cql3.functions.*;
-import org.apache.cassandra.cql3.statements.SelectStatement;
+import org.apache.cassandra.cql3.statements.schema.CreateTableStatement;
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.db.filter.ColumnFilter;
-import org.apache.cassandra.db.view.View;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.service.reads.SpeculativeRetryPolicy;
+import org.apache.cassandra.schema.ColumnMetadata.ClusteringOrder;
+import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff;
 import org.apache.cassandra.transport.ProtocolVersion;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
@@ -53,6 +53,7 @@ import static java.lang.String.format;
 
 import static java.util.stream.Collectors.toList;
 import static java.util.stream.Collectors.toSet;
+
 import static org.apache.cassandra.cql3.QueryProcessor.executeInternal;
 import static org.apache.cassandra.cql3.QueryProcessor.executeOnceInternal;
 
@@ -270,6 +271,44 @@ public final class SchemaKeyspace
         return KeyspaceMetadata.create(SchemaConstants.SCHEMA_KEYSPACE_NAME, KeyspaceParams.local(), org.apache.cassandra.schema.Tables.of(ALL_TABLE_METADATA));
     }
 
+    static Collection<Mutation> convertSchemaDiffToMutations(KeyspacesDiff diff, long timestamp)
+    {
+        Map<String, Mutation> mutations = new HashMap<>();
+
+        diff.created.forEach(k -> mutations.put(k.name, makeCreateKeyspaceMutation(k, timestamp).build()));
+        diff.dropped.forEach(k -> mutations.put(k.name, makeDropKeyspaceMutation(k, timestamp).build()));
+        diff.altered.forEach(kd ->
+        {
+            KeyspaceMetadata ks = kd.after;
+
+            Mutation.SimpleBuilder builder = makeCreateKeyspaceMutation(ks.name, ks.params, timestamp);
+
+            kd.types.created.forEach(t -> addTypeToSchemaMutation(t, builder));
+            kd.types.dropped.forEach(t -> addDropTypeToSchemaMutation(t, builder));
+            kd.types.altered(Difference.SHALLOW).forEach(td -> addTypeToSchemaMutation(td.after, builder));
+
+            kd.tables.created.forEach(t -> addTableToSchemaMutation(t, true, builder));
+            kd.tables.dropped.forEach(t -> addDropTableToSchemaMutation(t, builder));
+            kd.tables.altered(Difference.SHALLOW).forEach(td -> addAlterTableToSchemaMutation(td.before, td.after, builder));
+
+            kd.views.created.forEach(v -> addViewToSchemaMutation(v, true, builder));
+            kd.views.dropped.forEach(v -> addDropViewToSchemaMutation(v, builder));
+            kd.views.altered(Difference.SHALLOW).forEach(vd -> addAlterViewToSchemaMutation(vd.before, vd.after, builder));
+
+            kd.udfs.created.forEach(f -> addFunctionToSchemaMutation((UDFunction) f, builder));
+            kd.udfs.dropped.forEach(f -> addDropFunctionToSchemaMutation((UDFunction) f, builder));
+            kd.udfs.altered(Difference.SHALLOW).forEach(fd -> addFunctionToSchemaMutation(fd.after, builder));
+
+            kd.udas.created.forEach(a -> addAggregateToSchemaMutation((UDAggregate) a, builder));
+            kd.udas.dropped.forEach(a -> addDropAggregateToSchemaMutation((UDAggregate) a, builder));
+            kd.udas.altered(Difference.SHALLOW).forEach(ad -> addAggregateToSchemaMutation(ad.after, builder));
+
+            mutations.put(ks.name, builder.build());
+        });
+
+        return mutations.values();
+    }
+
     /**
      * Add entries to system_schema.* for the hardcoded system keyspaces
      */
@@ -298,7 +337,7 @@ public final class SchemaKeyspace
         ALL.reverse().forEach(table -> getSchemaCFS(table).truncateBlocking());
     }
 
-    static void flush()
+    private static void flush()
     {
         if (!DatabaseDescriptor.isUnsafeSystem())
             ALL.forEach(table -> FBUtilities.waitOnFuture(getSchemaCFS(table).forceFlush()));
@@ -463,15 +502,7 @@ public final class SchemaKeyspace
         return builder;
     }
 
-    static Mutation.SimpleBuilder makeCreateTypeMutation(KeyspaceMetadata keyspace, UserType type, long timestamp)
-    {
-        // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
-        Mutation.SimpleBuilder builder = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp);
-        addTypeToSchemaMutation(type, builder);
-        return builder;
-    }
-
-    static void addTypeToSchemaMutation(UserType type, Mutation.SimpleBuilder mutation)
+    private static void addTypeToSchemaMutation(UserType type, Mutation.SimpleBuilder mutation)
     {
         mutation.update(Types)
                 .row(type.getNameAsString())
@@ -479,12 +510,9 @@ public final class SchemaKeyspace
                 .add("field_types", type.fieldTypes().stream().map(AbstractType::asCQL3Type).map(CQL3Type::toString).collect(toList()));
     }
 
-    static Mutation.SimpleBuilder dropTypeFromSchemaMutation(KeyspaceMetadata keyspace, UserType type, long timestamp)
+    private static void addDropTypeToSchemaMutation(UserType type, Mutation.SimpleBuilder builder)
     {
-        // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
-        Mutation.SimpleBuilder builder = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp);
         builder.update(Types).row(type.name).delete();
-        return builder;
     }
 
     static Mutation.SimpleBuilder makeCreateTableMutation(KeyspaceMetadata keyspace, TableMetadata table, long timestamp)
@@ -495,7 +523,7 @@ public final class SchemaKeyspace
         return builder;
     }
 
-    static void addTableToSchemaMutation(TableMetadata table, boolean withColumnsAndTriggers, Mutation.SimpleBuilder builder)
+    private static void addTableToSchemaMutation(TableMetadata table, boolean withColumnsAndTriggers, Mutation.SimpleBuilder builder)
     {
         Row.SimpleBuilder rowBuilder = builder.update(Tables)
                                               .row(table.name)
@@ -544,13 +572,8 @@ public final class SchemaKeyspace
             builder.add("cdc", params.cdc);
     }
 
-    static Mutation.SimpleBuilder makeUpdateTableMutation(KeyspaceMetadata keyspace,
-                                                                 TableMetadata oldTable,
-                                                                 TableMetadata newTable,
-                                                                 long timestamp)
+    private static void addAlterTableToSchemaMutation(TableMetadata oldTable, TableMetadata newTable, Mutation.SimpleBuilder builder)
     {
-        Mutation.SimpleBuilder builder = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp);
-
         addTableToSchemaMutation(newTable, false, builder);
 
         MapDifference<ByteBuffer, ColumnMetadata> columnDiff = Maps.difference(oldTable.columns, newTable.columns);
@@ -602,7 +625,15 @@ public final class SchemaKeyspace
         // updated indexes need to be updated
         for (MapDifference.ValueDifference<IndexMetadata> diff : indexesDiff.entriesDiffering().values())
             addUpdatedIndexToSchemaMutation(newTable, diff.rightValue(), builder);
+    }
 
+    static Mutation.SimpleBuilder makeUpdateTableMutation(KeyspaceMetadata keyspace,
+                                                          TableMetadata oldTable,
+                                                          TableMetadata newTable,
+                                                          long timestamp)
+    {
+        Mutation.SimpleBuilder builder = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp);
+        addAlterTableToSchemaMutation(oldTable, newTable, builder);
         return builder;
     }
 
@@ -632,7 +663,12 @@ public final class SchemaKeyspace
     {
         // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
         Mutation.SimpleBuilder builder = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp);
+        addDropTableToSchemaMutation(table, builder);
+        return builder;
+    }
 
+    private static void addDropTableToSchemaMutation(TableMetadata table, Mutation.SimpleBuilder builder)
+    {
         builder.update(Tables).row(table.name).delete();
 
         for (ColumnMetadata column : table.columns())
@@ -646,8 +682,6 @@ public final class SchemaKeyspace
 
         for (IndexMetadata index : table.indexes)
             dropIndexFromSchemaMutation(table, index, builder);
-
-        return builder;
     }
 
     private static void addColumnToSchemaMutation(TableMetadata table, ColumnMetadata column, Mutation.SimpleBuilder builder)
@@ -676,7 +710,7 @@ public final class SchemaKeyspace
         builder.update(DroppedColumns)
                .row(table.name, column.column.name.toString())
                .add("dropped_time", new Date(TimeUnit.MICROSECONDS.toMillis(column.droppedTime)))
-               .add("type", expandUserTypes(column.column.type).asCQL3Type().toString())
+               .add("type", column.column.type.asCQL3Type().toString())
                .add("kind", column.column.kind.toString().toLowerCase());
     }
 
@@ -697,23 +731,15 @@ public final class SchemaKeyspace
         builder.update(Triggers).row(table.name, trigger.name).delete();
     }
 
-    static Mutation.SimpleBuilder makeCreateViewMutation(KeyspaceMetadata keyspace, ViewMetadata view, long timestamp)
-    {
-        // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
-        Mutation.SimpleBuilder builder = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp);
-        addViewToSchemaMutation(view, true, builder);
-        return builder;
-    }
-
     private static void addViewToSchemaMutation(ViewMetadata view, boolean includeColumns, Mutation.SimpleBuilder builder)
     {
         TableMetadata table = view.metadata;
         Row.SimpleBuilder rowBuilder = builder.update(Views)
-                                              .row(view.name)
+                                              .row(view.name())
                                               .add("include_all_columns", view.includeAllColumns)
                                               .add("base_table_id", view.baseTableId.asUUID())
                                               .add("base_table_name", view.baseTableName)
-                                              .add("where_clause", view.whereClause)
+                                              .add("where_clause", view.whereClause.toString())
                                               .add("id", table.id.asUUID());
 
         addTableParamsToRowBuilder(table.params, rowBuilder);
@@ -728,57 +754,32 @@ public final class SchemaKeyspace
         }
     }
 
-    static Mutation.SimpleBuilder makeDropViewMutation(KeyspaceMetadata keyspace, ViewMetadata view, long timestamp)
+    private static void addDropViewToSchemaMutation(ViewMetadata view, Mutation.SimpleBuilder builder)
     {
-        // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
-        Mutation.SimpleBuilder builder = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp);
-
-        builder.update(Views).row(view.name).delete();
+        builder.update(Views).row(view.name()).delete();
 
         TableMetadata table = view.metadata;
         for (ColumnMetadata column : table.columns())
             dropColumnFromSchemaMutation(table, column, builder);
-
-        for (IndexMetadata index : table.indexes)
-            dropIndexFromSchemaMutation(table, index, builder);
-
-        return builder;
     }
 
-    public static Mutation.SimpleBuilder makeUpdateViewMutation(Mutation.SimpleBuilder builder,
-                                                                ViewMetadata oldView,
-                                                                ViewMetadata newView)
+    private static void addAlterViewToSchemaMutation(ViewMetadata before, ViewMetadata after, Mutation.SimpleBuilder builder)
     {
-        addViewToSchemaMutation(newView, false, builder);
+        addViewToSchemaMutation(after, false, builder);
 
-        MapDifference<ByteBuffer, ColumnMetadata> columnDiff = Maps.difference(oldView.metadata.columns,
-                                                                               newView.metadata.columns);
+        MapDifference<ByteBuffer, ColumnMetadata> columnDiff = Maps.difference(before.metadata.columns, after.metadata.columns);
 
         // columns that are no longer needed
         for (ColumnMetadata column : columnDiff.entriesOnlyOnLeft().values())
-            dropColumnFromSchemaMutation(oldView.metadata, column, builder);
+            dropColumnFromSchemaMutation(before.metadata, column, builder);
 
         // newly added columns
         for (ColumnMetadata column : columnDiff.entriesOnlyOnRight().values())
-            addColumnToSchemaMutation(newView.metadata, column, builder);
+            addColumnToSchemaMutation(after.metadata, column, builder);
 
         // old columns with updated attributes
         for (ByteBuffer name : columnDiff.entriesDiffering().keySet())
-            addColumnToSchemaMutation(newView.metadata, newView.metadata.getColumn(name), builder);
-
-        // dropped columns
-        MapDifference<ByteBuffer, DroppedColumn> droppedColumnDiff =
-        Maps.difference(oldView.metadata.droppedColumns, oldView.metadata.droppedColumns);
-
-        // newly dropped columns
-        for (DroppedColumn column : droppedColumnDiff.entriesOnlyOnRight().values())
-            addDroppedColumnToSchemaMutation(oldView.metadata, column, builder);
-
-        // columns added then dropped again
-        for (ByteBuffer name : droppedColumnDiff.entriesDiffering().keySet())
-            addDroppedColumnToSchemaMutation(newView.metadata, newView.metadata.droppedColumns.get(name), builder);
-
-        return builder;
+            addColumnToSchemaMutation(after.metadata, after.metadata.getColumn(name), builder);
     }
 
     private static void addIndexToSchemaMutation(TableMetadata table, IndexMetadata index, Mutation.SimpleBuilder builder)
@@ -801,15 +802,7 @@ public final class SchemaKeyspace
         addIndexToSchemaMutation(table, index, builder);
     }
 
-    static Mutation.SimpleBuilder makeCreateFunctionMutation(KeyspaceMetadata keyspace, UDFunction function, long timestamp)
-    {
-        // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
-        Mutation.SimpleBuilder builder = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp);
-        addFunctionToSchemaMutation(function, builder);
-        return builder;
-    }
-
-    static void addFunctionToSchemaMutation(UDFunction function, Mutation.SimpleBuilder builder)
+    private static void addFunctionToSchemaMutation(UDFunction function, Mutation.SimpleBuilder builder)
     {
         builder.update(Functions)
                .row(function.name().name, function.argumentsList())
@@ -832,23 +825,12 @@ public final class SchemaKeyspace
         }
     }
 
-    static Mutation.SimpleBuilder makeDropFunctionMutation(KeyspaceMetadata keyspace, UDFunction function, long timestamp)
+    private static void addDropFunctionToSchemaMutation(UDFunction function, Mutation.SimpleBuilder builder)
     {
-        // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
-        Mutation.SimpleBuilder builder = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp);
         builder.update(Functions).row(function.name().name, function.argumentsList()).delete();
-        return builder;
     }
 
-    static Mutation.SimpleBuilder makeCreateAggregateMutation(KeyspaceMetadata keyspace, UDAggregate aggregate, long timestamp)
-    {
-        // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
-        Mutation.SimpleBuilder builder = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp);
-        addAggregateToSchemaMutation(aggregate, builder);
-        return builder;
-    }
-
-    static void addAggregateToSchemaMutation(UDAggregate aggregate, Mutation.SimpleBuilder builder)
+    private static void addAggregateToSchemaMutation(UDAggregate aggregate, Mutation.SimpleBuilder builder)
     {
         builder.update(Aggregates)
                .row(aggregate.name().name, aggregate.argumentsList())
@@ -862,12 +844,9 @@ public final class SchemaKeyspace
                                 : null);
     }
 
-    static Mutation.SimpleBuilder makeDropAggregateMutation(KeyspaceMetadata keyspace, UDAggregate aggregate, long timestamp)
+    private static void addDropAggregateToSchemaMutation(UDAggregate aggregate, Mutation.SimpleBuilder builder)
     {
-        // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
-        Mutation.SimpleBuilder builder = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp);
         builder.update(Aggregates).row(aggregate.name().name, aggregate.argumentsList()).delete();
-        return builder;
     }
 
     /*
@@ -1017,7 +996,7 @@ public final class SchemaKeyspace
         String query = format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND table_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, COLUMNS);
         UntypedResultSet columnRows = query(query, keyspace, table);
         if (columnRows.isEmpty())
-            throw new MissingColumns("Columns not found in schema table for " + keyspace + "." + table);
+            throw new MissingColumns("Columns not found in schema table for " + keyspace + '.' + table);
 
         List<ColumnMetadata> columns = new ArrayList<>();
         columnRows.forEach(row -> columns.add(createColumnFromRow(row, types)));
@@ -1120,7 +1099,7 @@ public final class SchemaKeyspace
 
         Views.Builder views = org.apache.cassandra.schema.Views.builder();
         for (UntypedResultSet.Row row : query(query, keyspaceName))
-            views.add(fetchView(keyspaceName, row.getString("view_name"), types));
+            views.put(fetchView(keyspaceName, row.getString("view_name"), types));
         return views.build();
     }
 
@@ -1135,7 +1114,7 @@ public final class SchemaKeyspace
         TableId baseTableId = TableId.fromUUID(row.getUUID("base_table_id"));
         String baseTableName = row.getString("base_table_name");
         boolean includeAll = row.getBoolean("include_all_columns");
-        String whereClause = row.getString("where_clause");
+        String whereClauseString = row.getString("where_clause");
 
         List<ColumnMetadata> columns = fetchColumns(keyspaceName, viewName, types);
 
@@ -1147,31 +1126,36 @@ public final class SchemaKeyspace
                          .params(createTableParamsFromRow(row))
                          .build();
 
-        String rawSelect = View.buildSelectStatement(baseTableName, columns, whereClause);
-        SelectStatement.RawStatement rawStatement = (SelectStatement.RawStatement) QueryProcessor.parseStatement(rawSelect);
+        WhereClause whereClause;
 
-        return new ViewMetadata(keyspaceName, viewName, baseTableId, baseTableName, includeAll, rawStatement, whereClause, metadata);
+        try
+        {
+            whereClause = WhereClause.parse(whereClauseString);
+        }
+        catch (RecognitionException e)
+        {
+            throw new RuntimeException(format("Unexpected error while parsing materialized view's where clause for '%s' (got %s)", viewName, whereClauseString));
+        }
+
+        return new ViewMetadata(baseTableId, baseTableName, includeAll, whereClause, metadata);
     }
 
     private static Functions fetchFunctions(String keyspaceName, Types types)
     {
-        Functions udfs = fetchUDFs(keyspaceName, types);
-        Functions udas = fetchUDAs(keyspaceName, udfs, types);
+        Collection<UDFunction> udfs = fetchUDFs(keyspaceName, types);
+        Collection<UDAggregate> udas = fetchUDAs(keyspaceName, udfs, types);
 
-        return org.apache.cassandra.schema.Functions.builder()
-                                                    .add(udfs)
-                                                    .add(udas)
-                                                    .build();
+        return org.apache.cassandra.schema.Functions.builder().add(udfs).add(udas).build();
     }
 
-    private static Functions fetchUDFs(String keyspaceName, Types types)
+    private static Collection<UDFunction> fetchUDFs(String keyspaceName, Types types)
     {
         String query = format("SELECT * FROM %s.%s WHERE keyspace_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, FUNCTIONS);
 
-        Functions.Builder functions = org.apache.cassandra.schema.Functions.builder();
+        Collection<UDFunction> functions = new ArrayList<>();
         for (UntypedResultSet.Row row : query(query, keyspaceName))
             functions.add(createUDFFromRow(row, types));
-        return functions.build();
+        return functions;
     }
 
     private static UDFunction createUDFFromRow(UntypedResultSet.Row row, Types types)
@@ -1230,17 +1214,16 @@ public final class SchemaKeyspace
         }
     }
 
-    private static Functions fetchUDAs(String keyspaceName, Functions udfs, Types types)
+    private static Collection<UDAggregate> fetchUDAs(String keyspaceName, Collection<UDFunction> udfs, Types types)
     {
         String query = format("SELECT * FROM %s.%s WHERE keyspace_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, AGGREGATES);
 
-        Functions.Builder aggregates = org.apache.cassandra.schema.Functions.builder();
-        for (UntypedResultSet.Row row : query(query, keyspaceName))
-            aggregates.add(createUDAFromRow(row, udfs, types));
-        return aggregates.build();
+        Collection<UDAggregate> aggregates = new ArrayList<>();
+        query(query, keyspaceName).forEach(row -> aggregates.add(createUDAFromRow(row, udfs, types)));
+        return aggregates;
     }
 
-    private static UDAggregate createUDAFromRow(UntypedResultSet.Row row, Functions functions, Types types)
+    private static UDAggregate createUDAFromRow(UntypedResultSet.Row row, Collection<UDFunction> functions, Types types)
     {
         String ksName = row.getString("keyspace_name");
         String functionName = row.getString("aggregate_name");
@@ -1255,18 +1238,12 @@ public final class SchemaKeyspace
         AbstractType<?> returnType = CQLTypeParser.parse(ksName, row.getString("return_type"), types);
 
         FunctionName stateFunc = new FunctionName(ksName, (row.getString("state_func")));
+
         FunctionName finalFunc = row.has("final_func") ? new FunctionName(ksName, row.getString("final_func")) : null;
         AbstractType<?> stateType = row.has("state_type") ? CQLTypeParser.parse(ksName, row.getString("state_type"), types) : null;
         ByteBuffer initcond = row.has("initcond") ? Terms.asBytes(ksName, row.getString("initcond"), stateType) : null;
 
-        try
-        {
-            return UDAggregate.create(functions, name, argTypes, returnType, stateFunc, finalFunc, stateType, initcond);
-        }
-        catch (InvalidRequestException reason)
-        {
-            return UDAggregate.createBroken(name, argTypes, returnType, initcond, reason);
-        }
+        return UDAggregate.create(functions, name, argTypes, returnType, stateFunc, finalFunc, stateType, initcond);
     }
 
     private static UntypedResultSet query(String query, Object... variables)
@@ -1310,52 +1287,6 @@ public final class SchemaKeyspace
         return keyspaces.build();
     }
 
-    /*
-     * Type parsing and transformation
-     */
-
-    /*
-     * Recursively replaces any instances of UserType with an equivalent TupleType.
-     * We do it for dropped_columns, to allow safely dropping unused user types without retaining any references
-     * in dropped_columns.
-     */
-    private static AbstractType<?> expandUserTypes(AbstractType<?> original)
-    {
-        if (original instanceof UserType)
-            return new TupleType(expandUserTypes(((UserType) original).fieldTypes()));
-
-        if (original instanceof TupleType)
-            return new TupleType(expandUserTypes(((TupleType) original).allTypes()));
-
-        if (original instanceof ListType<?>)
-            return ListType.getInstance(expandUserTypes(((ListType<?>) original).getElementsType()), original.isMultiCell());
-
-        if (original instanceof MapType<?,?>)
-        {
-            MapType<?, ?> mt = (MapType<?, ?>) original;
-            return MapType.getInstance(expandUserTypes(mt.getKeysType()), expandUserTypes(mt.getValuesType()), mt.isMultiCell());
-        }
-
-        if (original instanceof SetType<?>)
-            return SetType.getInstance(expandUserTypes(((SetType<?>) original).getElementsType()), original.isMultiCell());
-
-        // this is very unlikely to ever happen, but it's better to be safe than sorry
-        if (original instanceof ReversedType<?>)
-            return ReversedType.getInstance(expandUserTypes(((ReversedType) original).baseType));
-
-        if (original instanceof CompositeType)
-            return CompositeType.getInstance(expandUserTypes(original.getComponents()));
-
-        return original;
-    }
-
-    private static List<AbstractType<?>> expandUserTypes(List<AbstractType<?>> types)
-    {
-        return types.stream()
-                    .map(SchemaKeyspace::expandUserTypes)
-                    .collect(toList());
-    }
-
     @VisibleForTesting
     static class MissingColumns extends RuntimeException
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/schema/SchemaTransformation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/SchemaTransformation.java b/src/java/org/apache/cassandra/schema/SchemaTransformation.java
new file mode 100644
index 0000000..c19ac7c
--- /dev/null
+++ b/src/java/org/apache/cassandra/schema/SchemaTransformation.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.schema;
+
+public interface SchemaTransformation
+{
+    /**
+     * Apply a statement transformation to a schema snapshot.
+     *
+     * Implementing methods should be side-effect free.
+     *
+     * @param schema Keyspaces to base the transformation on
+     * @return Keyspaces transformed by the statement
+     */
+    Keyspaces apply(Keyspaces schema);
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/schema/TableId.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/TableId.java b/src/java/org/apache/cassandra/schema/TableId.java
index 95256fe..695147f 100644
--- a/src/java/org/apache/cassandra/schema/TableId.java
+++ b/src/java/org/apache/cassandra/schema/TableId.java
@@ -30,7 +30,7 @@ import org.apache.cassandra.utils.UUIDGen;
 /**
  * The unique identifier of a table.
  * <p>
- * This is essentially a UUID, but we wrap it as it's used quite a bit in the code and having a nicely name class make
+ * This is essentially a UUID, but we wrap it as it's used quite a bit in the code and having a nicely named class make
  * the code more readable.
  */
 public class TableId

http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/schema/TableMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/TableMetadata.java b/src/java/org/apache/cassandra/schema/TableMetadata.java
index 47e5b47..6466e2e 100644
--- a/src/java/org/apache/cassandra/schema/TableMetadata.java
+++ b/src/java/org/apache/cassandra/schema/TableMetadata.java
@@ -44,6 +44,7 @@ import static java.lang.String.format;
 import static java.util.stream.Collectors.toList;
 import static java.util.stream.Collectors.toSet;
 
+import static com.google.common.collect.Iterables.any;
 import static com.google.common.collect.Iterables.transform;
 import static org.apache.cassandra.schema.IndexMetadata.isNameValid;
 
@@ -204,6 +205,21 @@ public final class TableMetadata
         return kind == Kind.INDEX;
     }
 
+    public TableMetadata withSwapped(TableParams params)
+    {
+        return unbuild().params(params).build();
+    }
+
+    public TableMetadata withSwapped(Triggers triggers)
+    {
+        return unbuild().triggers(triggers).build();
+    }
+
+    public TableMetadata withSwapped(Indexes indexes)
+    {
+        return unbuild().indexes(indexes).build();
+    }
+
     public boolean isView()
     {
         return kind == Kind.VIEW;
@@ -418,42 +434,62 @@ public final class TableMetadata
         indexes.validate(this);
     }
 
-    void validateCompatibility(TableMetadata other)
+    void validateCompatibility(TableMetadata previous)
     {
         if (isIndex())
             return;
 
-        if (!other.keyspace.equals(keyspace))
-            except("Keyspace mismatch (found %s; expected %s)", other.keyspace, keyspace);
+        if (!previous.keyspace.equals(keyspace))
+            except("Keyspace mismatch (found %s; expected %s)", keyspace, previous.keyspace);
 
-        if (!other.name.equals(name))
-            except("Table mismatch (found %s; expected %s)", other.name, name);
+        if (!previous.name.equals(name))
+            except("Table mismatch (found %s; expected %s)", name, previous.name);
 
-        if (!other.id.equals(id))
-            except("Table ID mismatch (found %s; expected %s)", other.id, id);
+        if (!previous.id.equals(id))
+            except("Table ID mismatch (found %s; expected %s)", id, previous.id);
 
-        if (!other.flags.equals(flags))
-            except("Table type mismatch (found %s; expected %s)", other.flags, flags);
+        if (!previous.flags.equals(flags))
+            except("Table type mismatch (found %s; expected %s)", flags, previous.flags);
 
-        if (other.partitionKeyColumns.size() != partitionKeyColumns.size())
-            except("Partition keys of different length (found %s; expected %s)", other.partitionKeyColumns.size(), partitionKeyColumns.size());
+        if (previous.partitionKeyColumns.size() != partitionKeyColumns.size())
+        {
+            except("Partition keys of different length (found %s; expected %s)",
+                   partitionKeyColumns.size(),
+                   previous.partitionKeyColumns.size());
+        }
 
         for (int i = 0; i < partitionKeyColumns.size(); i++)
-            if (!other.partitionKeyColumns.get(i).type.isCompatibleWith(partitionKeyColumns.get(i).type))
-                except("Partition key column mismatch (found %s; expected %s)", other.partitionKeyColumns.get(i).type, partitionKeyColumns.get(i).type);
+        {
+            if (!partitionKeyColumns.get(i).type.isCompatibleWith(previous.partitionKeyColumns.get(i).type))
+            {
+                except("Partition key column mismatch (found %s; expected %s)",
+                       partitionKeyColumns.get(i).type,
+                       previous.partitionKeyColumns.get(i).type);
+            }
+        }
 
-        if (other.clusteringColumns.size() != clusteringColumns.size())
-            except("Clustering columns of different length (found %s; expected %s)", other.clusteringColumns.size(), clusteringColumns.size());
+        if (previous.clusteringColumns.size() != clusteringColumns.size())
+        {
+            except("Clustering columns of different length (found %s; expected %s)",
+                   clusteringColumns.size(),
+                   previous.clusteringColumns.size());
+        }
 
         for (int i = 0; i < clusteringColumns.size(); i++)
-            if (!other.clusteringColumns.get(i).type.isCompatibleWith(clusteringColumns.get(i).type))
-                except("Clustering column mismatch (found %s; expected %s)", other.clusteringColumns.get(i).type, clusteringColumns.get(i).type);
+        {
+            if (!clusteringColumns.get(i).type.isCompatibleWith(previous.clusteringColumns.get(i).type))
+            {
+                except("Clustering column mismatch (found %s; expected %s)",
+                       clusteringColumns.get(i).type,
+                       previous.clusteringColumns.get(i).type);
+            }
+        }
 
-        for (ColumnMetadata otherColumn : other.regularAndStaticColumns)
+        for (ColumnMetadata previousColumn : previous.regularAndStaticColumns)
         {
-            ColumnMetadata column = getColumn(otherColumn.name);
-            if (column != null && !otherColumn.type.isCompatibleWith(column.type))
-                except("Column mismatch (found %s; expected %s", otherColumn, column);
+            ColumnMetadata column = getColumn(previousColumn.name);
+            if (column != null && !column.type.isCompatibleWith(previousColumn.type))
+                except("Column mismatch (found %s; expected %s)", column, previousColumn);
         }
     }
 
@@ -472,7 +508,7 @@ public final class TableMetadata
      * This method should only be called for superColumn tables and "static
      * compact" ones. For any other table, all column names are UTF8.
      */
-    public AbstractType<?> staticCompactOrSuperTableColumnNameType()
+    AbstractType<?> staticCompactOrSuperTableColumnNameType()
     {
         if (isSuper())
         {
@@ -543,6 +579,22 @@ public final class TableMetadata
         return unbuild().params(builder.build()).build();
     }
 
+    boolean referencesUserType(ByteBuffer name)
+    {
+        return any(columns(), c -> c.type.referencesUserType(name));
+    }
+
+    public TableMetadata withUpdatedUserType(UserType udt)
+    {
+        if (!referencesUserType(udt.name))
+            return this;
+
+        Builder builder = unbuild();
+        columns().forEach(c -> builder.alterColumnType(c.name, c.type.withUpdatedUserType(udt)));
+
+        return builder.build();
+    }
+
     private void except(String format, Object... args)
     {
         throw new ConfigurationException(keyspace + "." + name + ": " + format(format, args));
@@ -559,6 +611,11 @@ public final class TableMetadata
 
         TableMetadata tm = (TableMetadata) o;
 
+        return equalsWithoutColumns(tm) && columns.equals(tm.columns);
+    }
+
+    private boolean equalsWithoutColumns(TableMetadata tm)
+    {
         return keyspace.equals(tm.keyspace)
             && name.equals(tm.name)
             && id.equals(tm.id)
@@ -566,12 +623,46 @@ public final class TableMetadata
             && kind == tm.kind
             && params.equals(tm.params)
             && flags.equals(tm.flags)
-            && columns.equals(tm.columns)
             && droppedColumns.equals(tm.droppedColumns)
             && indexes.equals(tm.indexes)
             && triggers.equals(tm.triggers);
     }
 
+    Optional<Difference> compare(TableMetadata other)
+    {
+        return equalsWithoutColumns(other)
+             ? compareColumns(other.columns)
+             : Optional.of(Difference.SHALLOW);
+    }
+
+    private Optional<Difference> compareColumns(Map<ByteBuffer, ColumnMetadata> other)
+    {
+        if (!columns.keySet().equals(other.keySet()))
+            return Optional.of(Difference.SHALLOW);
+
+        boolean differsDeeply = false;
+
+        for (Map.Entry<ByteBuffer, ColumnMetadata> entry : columns.entrySet())
+        {
+            ColumnMetadata thisColumn = entry.getValue();
+            ColumnMetadata thatColumn = other.get(entry.getKey());
+
+            Optional<Difference> difference = thisColumn.compare(thatColumn);
+            if (difference.isPresent())
+            {
+                switch (difference.get())
+                {
+                    case SHALLOW:
+                        return difference;
+                    case DEEP:
+                        differsDeeply = true;
+                }
+            }
+        }
+
+        return differsDeeply ? Optional.of(Difference.DEEP) : Optional.empty();
+    }
+
     @Override
     public int hashCode()
     {
@@ -858,7 +949,7 @@ public final class TableMetadata
             return this;
         }
 
-        public Builder addColumns(Iterable<ColumnMetadata> columns)
+        Builder addColumns(Iterable<ColumnMetadata> columns)
         {
             columns.forEach(this::addColumn);
             return this;
@@ -884,7 +975,7 @@ public final class TableMetadata
 
         public Builder recordColumnDrop(ColumnMetadata column, long timeMicros)
         {
-            droppedColumns.put(column.name.bytes, new DroppedColumn(column, timeMicros));
+            droppedColumns.put(column.name.bytes, new DroppedColumn(column.withNewType(column.type.expandUserTypes()), timeMicros));
             return this;
         }
 
@@ -950,7 +1041,7 @@ public final class TableMetadata
             return this;
         }
 
-        public Builder alterColumnType(ColumnIdentifier name, AbstractType<?> type)
+        Builder alterColumnType(ColumnIdentifier name, AbstractType<?> type)
         {
             ColumnMetadata column = columns.get(name.bytes);
             if (column == null)
@@ -986,6 +1077,8 @@ public final class TableMetadata
      * Currently this is only used by views with normal base column as PK column
      * so updates to other columns do not make the row live when the base column
      * is not live. See CASSANDRA-11500.
+     *
+     * TODO: does not belong here, should be gone
      */
     public boolean enforceStrictLiveness()
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/schema/TableMetadataRef.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/TableMetadataRef.java b/src/java/org/apache/cassandra/schema/TableMetadataRef.java
index 5ff9d5b..3c45594 100644
--- a/src/java/org/apache/cassandra/schema/TableMetadataRef.java
+++ b/src/java/org/apache/cassandra/schema/TableMetadataRef.java
@@ -66,7 +66,7 @@ public final class TableMetadataRef
      */
     void set(TableMetadata metadata)
     {
-        get().validateCompatibility(metadata);
+        metadata.validateCompatibility(get());
         this.metadata = metadata;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/schema/TableParams.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/TableParams.java b/src/java/org/apache/cassandra/schema/TableParams.java
index 78dc894..f5b3c89 100644
--- a/src/java/org/apache/cassandra/schema/TableParams.java
+++ b/src/java/org/apache/cassandra/schema/TableParams.java
@@ -245,7 +245,7 @@ public final class TableParams
     {
         private String comment = "";
         private Double bloomFilterFpChance;
-        public Double crcCheckChance = 1.0;
+        private double crcCheckChance = 1.0;
         private int gcGraceSeconds = 864000; // 10 days
         private int defaultTimeToLive = 0;
         private int memtableFlushPeriodInMs = 0;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/schema/Tables.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/Tables.java b/src/java/org/apache/cassandra/schema/Tables.java
index a83c061..0320440 100644
--- a/src/java/org/apache/cassandra/schema/Tables.java
+++ b/src/java/org/apache/cassandra/schema/Tables.java
@@ -17,32 +17,38 @@
  */
 package org.apache.cassandra.schema;
 
+import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Optional;
+import java.util.function.Predicate;
 
 import javax.annotation.Nullable;
 
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.MapDifference;
-import com.google.common.collect.Maps;
+import com.google.common.collect.*;
 
+import org.apache.cassandra.db.marshal.UserType;
 import org.apache.cassandra.index.internal.CassandraIndex;
 
-import static com.google.common.collect.Iterables.filter;
+import static com.google.common.collect.Iterables.any;
+import static com.google.common.collect.Iterables.transform;
 
 /**
  * An immutable container for a keyspace's Tables.
  */
 public final class Tables implements Iterable<TableMetadata>
 {
+    private static final Tables NONE = builder().build();
+
     private final ImmutableMap<String, TableMetadata> tables;
+    private final ImmutableMap<TableId, TableMetadata> tablesById;
     private final ImmutableMap<String, TableMetadata> indexTables;
 
     private Tables(Builder builder)
     {
         tables = builder.tables.build();
+        tablesById = builder.tablesById.build();
         indexTables = builder.indexTables.build();
     }
 
@@ -53,7 +59,7 @@ public final class Tables implements Iterable<TableMetadata>
 
     public static Tables none()
     {
-        return builder().build();
+        return NONE;
     }
 
     public static Tables of(TableMetadata... tables)
@@ -71,6 +77,11 @@ public final class Tables implements Iterable<TableMetadata>
         return tables.values().iterator();
     }
 
+    public Iterable<TableMetadata> referencingUserType(ByteBuffer name)
+    {
+        return Iterables.filter(tables.values(), t -> t.referencesUserType(name));
+    }
+
     ImmutableMap<String, TableMetadata> indexTables()
     {
         return indexTables;
@@ -105,9 +116,21 @@ public final class Tables implements Iterable<TableMetadata>
     }
 
     @Nullable
-    public TableMetadata getIndexTableNullable(String name)
+    TableMetadata getNullable(TableId id)
     {
-        return indexTables.get(name);
+        return tablesById.get(id);
+    }
+
+    boolean containsTable(TableId id)
+    {
+        return tablesById.containsKey(id);
+    }
+
+    public Tables filter(Predicate<TableMetadata> predicate)
+    {
+        Builder builder = builder();
+        tables.values().stream().filter(predicate).forEach(builder::add);
+        return builder.build();
     }
 
     /**
@@ -134,18 +157,19 @@ public final class Tables implements Iterable<TableMetadata>
         TableMetadata table =
             get(name).orElseThrow(() -> new IllegalStateException(String.format("Table %s doesn't exists", name)));
 
-        return builder().add(filter(this, t -> t != table)).build();
+        return without(table);
     }
 
-    MapDifference<TableId, TableMetadata> diff(Tables other)
+    public Tables without(TableMetadata table)
     {
-        Map<TableId, TableMetadata> thisTables = new HashMap<>();
-        this.forEach(t -> thisTables.put(t.id, t));
-
-        Map<TableId, TableMetadata> otherTables = new HashMap<>();
-        other.forEach(t -> otherTables.put(t.id, t));
+        return filter(t -> t != table);
+    }
 
-        return Maps.difference(thisTables, otherTables);
+    public Tables withUpdatedUserType(UserType udt)
+    {
+        return any(this, t -> t.referencesUserType(udt.name))
+             ? builder().add(transform(this, t -> t.withUpdatedUserType(udt))).build()
+             : this;
     }
 
     MapDifference<String, TableMetadata> indexesDiff(Tables other)
@@ -180,6 +204,7 @@ public final class Tables implements Iterable<TableMetadata>
     public static final class Builder
     {
         final ImmutableMap.Builder<String, TableMetadata> tables = new ImmutableMap.Builder<>();
+        final ImmutableMap.Builder<TableId, TableMetadata> tablesById = new ImmutableMap.Builder<>();
         final ImmutableMap.Builder<String, TableMetadata> indexTables = new ImmutableMap.Builder<>();
 
         private Builder()
@@ -195,6 +220,8 @@ public final class Tables implements Iterable<TableMetadata>
         {
             tables.put(table.name, table);
 
+            tablesById.put(table.id, table);
+
             table.indexes
                  .stream()
                  .filter(i -> !i.isCustom())
@@ -217,4 +244,38 @@ public final class Tables implements Iterable<TableMetadata>
             return this;
         }
     }
+
+    static TablesDiff diff(Tables before, Tables after)
+    {
+        return TablesDiff.diff(before, after);
+    }
+
+    public static final class TablesDiff extends Diff<Tables, TableMetadata>
+    {
+        private final static TablesDiff NONE = new TablesDiff(Tables.none(), Tables.none(), ImmutableList.of());
+
+        private TablesDiff(Tables created, Tables dropped, ImmutableCollection<Altered<TableMetadata>> altered)
+        {
+            super(created, dropped, altered);
+        }
+
+        private static TablesDiff diff(Tables before, Tables after)
+        {
+            if (before == after)
+                return NONE;
+
+            Tables created = after.filter(t -> !before.containsTable(t.id));
+            Tables dropped = before.filter(t -> !after.containsTable(t.id));
+
+            ImmutableList.Builder<Altered<TableMetadata>> altered = ImmutableList.builder();
+            before.forEach(tableBefore ->
+            {
+                TableMetadata tableAfter = after.getNullable(tableBefore.id);
+                if (null != tableAfter)
+                    tableBefore.compare(tableAfter).ifPresent(kind -> altered.add(new Altered<>(tableBefore, tableAfter, kind)));
+            });
+
+            return new TablesDiff(created, dropped, altered.build());
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/schema/Types.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/Types.java b/src/java/org/apache/cassandra/schema/Types.java
index 0bdf7cf..64aeead 100644
--- a/src/java/org/apache/cassandra/schema/Types.java
+++ b/src/java/org/apache/cassandra/schema/Types.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.schema;
 
 import java.nio.ByteBuffer;
 import java.util.*;
+import java.util.function.Predicate;
 
 import javax.annotation.Nullable;
 
@@ -31,8 +32,11 @@ import org.apache.cassandra.db.marshal.UserType;
 import org.apache.cassandra.exceptions.ConfigurationException;
 
 import static java.lang.String.format;
-import static com.google.common.collect.Iterables.filter;
 import static java.util.stream.Collectors.toList;
+
+import static com.google.common.collect.Iterables.any;
+import static com.google.common.collect.Iterables.transform;
+
 import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
 
 /**
@@ -82,6 +86,11 @@ public final class Types implements Iterable<UserType>
         return types.values().iterator();
     }
 
+    public Iterable<UserType> referencingUserType(ByteBuffer name)
+    {
+        return Iterables.filter(types.values(), t -> t.referencesUserType(name) && !t.name.equals(name));
+    }
+
     /**
      * Get the type with the specified name
      *
@@ -105,6 +114,18 @@ public final class Types implements Iterable<UserType>
         return types.get(name);
     }
 
+    boolean containsType(ByteBuffer name)
+    {
+        return types.containsKey(name);
+    }
+
+    Types filter(Predicate<UserType> predicate)
+    {
+        Builder builder = builder();
+        types.values().stream().filter(predicate).forEach(builder::add);
+        return builder.build();
+    }
+
     /**
      * Create a Types instance with the provided type added
      */
@@ -124,12 +145,19 @@ public final class Types implements Iterable<UserType>
         UserType type =
             get(name).orElseThrow(() -> new IllegalStateException(format("Type %s doesn't exists", name)));
 
-        return builder().add(filter(this, t -> t != type)).build();
+        return without(type);
     }
 
-    MapDifference<ByteBuffer, UserType> diff(Types other)
+    public Types without(UserType type)
     {
-        return Maps.difference(types, other.types);
+        return filter(t -> t != type);
+    }
+
+    public Types withUpdatedUserType(UserType udt)
+    {
+        return any(this, t -> t.referencesUserType(udt.name))
+             ? builder().add(transform(this, t -> t.withUpdatedUserType(udt))).build()
+             : this;
     }
 
     @Override
@@ -155,7 +183,7 @@ public final class Types implements Iterable<UserType>
             if (!thisNext.getKey().equals(otherNext.getKey()))
                 return false;
 
-            if (!thisNext.getValue().equals(otherNext.getValue(), true))  // ignore freezing
+            if (!thisNext.getValue().equals(otherNext.getValue()))
                 return false;
         }
         return true;
@@ -305,7 +333,7 @@ public final class Types implements Iterable<UserType>
             {
                 List<FieldIdentifier> preparedFieldNames =
                     fieldNames.stream()
-                              .map(t -> FieldIdentifier.forInternalString(t))
+                              .map(FieldIdentifier::forInternalString)
                               .collect(toList());
 
                 List<AbstractType<?>> preparedFieldTypes =
@@ -329,4 +357,38 @@ public final class Types implements Iterable<UserType>
             }
         }
     }
+
+    static TypesDiff diff(Types before, Types after)
+    {
+        return TypesDiff.diff(before, after);
+    }
+
+    static final class TypesDiff extends Diff<Types, UserType>
+    {
+        private static final TypesDiff NONE = new TypesDiff(Types.none(), Types.none(), ImmutableList.of());
+
+        private TypesDiff(Types created, Types dropped, ImmutableCollection<Altered<UserType>> altered)
+        {
+            super(created, dropped, altered);
+        }
+
+        private static TypesDiff diff(Types before, Types after)
+        {
+            if (before == after)
+                return NONE;
+
+            Types created = after.filter(t -> !before.containsType(t.name));
+            Types dropped = before.filter(t -> !after.containsType(t.name));
+
+            ImmutableList.Builder<Altered<UserType>> altered = ImmutableList.builder();
+            before.forEach(typeBefore ->
+            {
+                UserType typeAfter = after.getNullable(typeBefore.name);
+                if (null != typeAfter)
+                    typeBefore.compare(typeAfter).ifPresent(kind -> altered.add(new Altered<>(typeBefore, typeAfter, kind)));
+            });
+
+            return new TypesDiff(created, dropped, altered.build());
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/schema/ViewMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/ViewMetadata.java b/src/java/org/apache/cassandra/schema/ViewMetadata.java
index 57f4092..66360bf 100644
--- a/src/java/org/apache/cassandra/schema/ViewMetadata.java
+++ b/src/java/org/apache/cassandra/schema/ViewMetadata.java
@@ -17,56 +17,52 @@
  */
 package org.apache.cassandra.schema;
 
-import java.util.List;
-import java.util.Objects;
-import java.util.stream.Collectors;
-
-import org.antlr.runtime.*;
-import org.apache.cassandra.cql3.*;
-import org.apache.cassandra.cql3.statements.SelectStatement;
-import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.db.view.View;
-import org.apache.cassandra.exceptions.SyntaxException;
+import java.nio.ByteBuffer;
+import java.util.Optional;
 
 import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.apache.commons.lang3.builder.ToStringBuilder;
 
+import org.apache.cassandra.cql3.*;
+import org.apache.cassandra.db.marshal.UserType;
+
 public final class ViewMetadata
 {
-    public final String keyspace;
-    public final String name;
     public final TableId baseTableId;
     public final String baseTableName;
+
     public final boolean includeAllColumns;
     public final TableMetadata metadata;
 
-    public final SelectStatement.RawStatement select;
-    public final String whereClause;
+    public final WhereClause whereClause;
 
     /**
-     * @param name              Name of the view
      * @param baseTableId       Internal ID of the table which this view is based off of
      * @param includeAllColumns Whether to include all columns or not
      */
-    public ViewMetadata(String keyspace,
-                        String name,
-                        TableId baseTableId,
+    public ViewMetadata(TableId baseTableId,
                         String baseTableName,
                         boolean includeAllColumns,
-                        SelectStatement.RawStatement select,
-                        String whereClause,
+                        WhereClause whereClause,
                         TableMetadata metadata)
     {
-        this.keyspace = keyspace;
-        this.name = name;
         this.baseTableId = baseTableId;
         this.baseTableName = baseTableName;
         this.includeAllColumns = includeAllColumns;
-        this.select = select;
         this.whereClause = whereClause;
         this.metadata = metadata;
     }
 
+    public String keyspace()
+    {
+        return metadata.keyspace;
+    }
+
+    public String name()
+    {
+        return metadata.name;
+    }
+
     /**
      * @return true if the view specified by this definition will include the column, false otherwise
      */
@@ -77,7 +73,7 @@ public final class ViewMetadata
 
     public ViewMetadata copy(TableMetadata newMetadata)
     {
-        return new ViewMetadata(keyspace, name, baseTableId, baseTableName, includeAllColumns, select, whereClause, newMetadata);
+        return new ViewMetadata(baseTableId, baseTableName, includeAllColumns, whereClause, newMetadata);
     }
 
     public TableMetadata baseTableMetadata()
@@ -95,20 +91,24 @@ public final class ViewMetadata
             return false;
 
         ViewMetadata other = (ViewMetadata) o;
-        return Objects.equals(keyspace, other.keyspace)
-               && Objects.equals(name, other.name)
-               && Objects.equals(baseTableId, other.baseTableId)
-               && Objects.equals(includeAllColumns, other.includeAllColumns)
-               && Objects.equals(whereClause, other.whereClause)
-               && Objects.equals(metadata, other.metadata);
+        return baseTableId.equals(other.baseTableId)
+            && includeAllColumns == other.includeAllColumns
+            && whereClause.equals(other.whereClause)
+            && metadata.equals(other.metadata);
+    }
+
+    Optional<Difference> compare(ViewMetadata other)
+    {
+        if (!baseTableId.equals(other.baseTableId) || includeAllColumns != other.includeAllColumns || !whereClause.equals(other.whereClause))
+            return Optional.of(Difference.SHALLOW);
+
+        return metadata.compare(other.metadata);
     }
 
     @Override
     public int hashCode()
     {
         return new HashCodeBuilder(29, 1597)
-               .append(keyspace)
-               .append(name)
                .append(baseTableId)
                .append(includeAllColumns)
                .append(whereClause)
@@ -120,8 +120,6 @@ public final class ViewMetadata
     public String toString()
     {
         return new ToStringBuilder(this)
-               .append("keyspace", keyspace)
-               .append("name", name)
                .append("baseTableId", baseTableId)
                .append("baseTableName", baseTableName)
                .append("includeAllColumns", includeAllColumns)
@@ -130,68 +128,37 @@ public final class ViewMetadata
                .toString();
     }
 
-    /**
-     * Replace the column 'from' with 'to' in this materialized view definition's partition,
-     * clustering, or included columns.
-     * @param from the existing column
-     * @param to the new column
-     */
-    public ViewMetadata renamePrimaryKeyColumn(ColumnIdentifier from, ColumnIdentifier to)
+    public boolean referencesUserType(ByteBuffer name)
+    {
+        return metadata.referencesUserType(name);
+    }
+
+    public ViewMetadata withUpdatedUserType(UserType udt)
+    {
+        return referencesUserType(udt.name)
+             ? copy(metadata.withUpdatedUserType(udt))
+             : this;
+    }
+
+    public ViewMetadata withRenamedPrimaryKeyColumn(ColumnIdentifier from, ColumnIdentifier to)
     {
         // convert whereClause to Relations, rename ids in Relations, then convert back to whereClause
-        List<Relation> relations = whereClauseToRelations(whereClause);
-        ColumnMetadata.Raw fromRaw = ColumnMetadata.Raw.forQuoted(from.toString());
-        ColumnMetadata.Raw toRaw = ColumnMetadata.Raw.forQuoted(to.toString());
-        List<Relation> newRelations =
-            relations.stream()
-                     .map(r -> r.renameIdentifier(fromRaw, toRaw))
-                     .collect(Collectors.toList());
-
-        String rawSelect = View.buildSelectStatement(baseTableName, metadata.columns(), whereClause);
-
-        return new ViewMetadata(keyspace,
-                                name,
-                                baseTableId,
+        ColumnMetadata.Raw rawFrom = ColumnMetadata.Raw.forQuoted(from.toString());
+        ColumnMetadata.Raw rawTo = ColumnMetadata.Raw.forQuoted(to.toString());
+
+        return new ViewMetadata(baseTableId,
                                 baseTableName,
                                 includeAllColumns,
-                                (SelectStatement.RawStatement) QueryProcessor.parseStatement(rawSelect),
-                                View.relationsToWhereClause(newRelations),
+                                whereClause.renameIdentifier(rawFrom, rawTo),
                                 metadata.unbuild().renamePrimaryKeyColumn(from, to).build());
     }
 
     public ViewMetadata withAddedRegularColumn(ColumnMetadata column)
     {
-        return new ViewMetadata(keyspace,
-                                name,
-                                baseTableId,
+        return new ViewMetadata(baseTableId,
                                 baseTableName,
                                 includeAllColumns,
-                                select,
                                 whereClause,
                                 metadata.unbuild().addColumn(column).build());
     }
-
-    public ViewMetadata withAlteredColumnType(ColumnIdentifier name, AbstractType<?> type)
-    {
-        return new ViewMetadata(keyspace,
-                                this.name,
-                                baseTableId,
-                                baseTableName,
-                                includeAllColumns,
-                                select,
-                                whereClause,
-                                metadata.unbuild().alterColumnType(name, type).build());
-    }
-
-    private static List<Relation> whereClauseToRelations(String whereClause)
-    {
-        try
-        {
-            return CQLFragmentParser.parseAnyUnhandled(CqlParser::whereClause, whereClause).build().relations;
-        }
-        catch (RecognitionException | SyntaxException exc)
-        {
-            throw new RuntimeException("Unexpected error parsing materialized view's where clause while handling column rename: ", exc);
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/schema/Views.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/Views.java b/src/java/org/apache/cassandra/schema/Views.java
index 5765433..07cd8f2 100644
--- a/src/java/org/apache/cassandra/schema/Views.java
+++ b/src/java/org/apache/cassandra/schema/Views.java
@@ -21,24 +21,26 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Optional;
-import java.util.UUID;
+import java.util.function.Predicate;
 
 import javax.annotation.Nullable;
 
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.MapDifference;
-import com.google.common.collect.Maps;
+import com.google.common.collect.*;
 
-import static com.google.common.collect.Iterables.filter;
+import org.apache.cassandra.db.marshal.UserType;
+
+import static com.google.common.collect.Iterables.any;
+import static com.google.common.collect.Iterables.transform;
 
 public final class Views implements Iterable<ViewMetadata>
 {
+    private static final Views NONE = builder().build();
+
     private final ImmutableMap<String, ViewMetadata> views;
 
     private Views(Builder builder)
     {
-        views = builder.views.build();
+        views = ImmutableMap.copyOf(builder.views);
     }
 
     public static Builder builder()
@@ -46,9 +48,14 @@ public final class Views implements Iterable<ViewMetadata>
         return new Builder();
     }
 
+    public Builder unbuild()
+    {
+        return builder().put(this);
+    }
+
     public static Views none()
     {
-        return builder().build();
+        return NONE;
     }
 
     public Iterator<ViewMetadata> iterator()
@@ -56,7 +63,7 @@ public final class Views implements Iterable<ViewMetadata>
         return views.values().iterator();
     }
 
-    public Iterable<TableMetadata> metadatas()
+    Iterable<TableMetadata> allTableMetadata()
     {
         return Iterables.transform(views.values(), view -> view.metadata);
     }
@@ -71,9 +78,9 @@ public final class Views implements Iterable<ViewMetadata>
         return views.isEmpty();
     }
 
-    public Iterable<ViewMetadata> forTable(UUID tableId)
+    public Iterable<ViewMetadata> forTable(TableId tableId)
     {
-        return Iterables.filter(this, v -> v.baseTableId.asUUID().equals(tableId));
+        return Iterables.filter(this, v -> v.baseTableId.equals(tableId));
     }
 
     /**
@@ -99,20 +106,32 @@ public final class Views implements Iterable<ViewMetadata>
         return views.get(name);
     }
 
+    boolean containsView(String name)
+    {
+        return views.containsKey(name);
+    }
+
+    Views filter(Predicate<ViewMetadata> predicate)
+    {
+        Builder builder = builder();
+        views.values().stream().filter(predicate).forEach(builder::put);
+        return builder.build();
+    }
+
     /**
      * Create a MaterializedViews instance with the provided materialized view added
      */
     public Views with(ViewMetadata view)
     {
-        if (get(view.name).isPresent())
-            throw new IllegalStateException(String.format("Materialized View %s already exists", view.name));
+        if (get(view.name()).isPresent())
+            throw new IllegalStateException(String.format("Materialized View %s already exists", view.name()));
 
-        return builder().add(this).add(view).build();
+        return builder().put(this).put(view).build();
     }
 
     public Views withSwapped(ViewMetadata view)
     {
-        return without(view.name).with(view);
+        return without(view.name()).with(view);
     }
 
     /**
@@ -123,18 +142,14 @@ public final class Views implements Iterable<ViewMetadata>
         ViewMetadata materializedView =
             get(name).orElseThrow(() -> new IllegalStateException(String.format("Materialized View %s doesn't exists", name)));
 
-        return builder().add(filter(this, v -> v != materializedView)).build();
+        return filter(v -> v != materializedView);
     }
 
-    MapDifference<TableId, ViewMetadata> diff(Views other)
+    Views withUpdatedUserTypes(UserType udt)
     {
-        Map<TableId, ViewMetadata> thisViews = new HashMap<>();
-        this.forEach(v -> thisViews.put(v.metadata.id, v));
-
-        Map<TableId, ViewMetadata> otherViews = new HashMap<>();
-        other.forEach(v -> otherViews.put(v.metadata.id, v));
-
-        return Maps.difference(thisViews, otherViews);
+        return any(this, v -> v.referencesUserType(udt.name))
+             ? builder().put(transform(this, v -> v.withUpdatedUserType(udt))).build()
+             : this;
     }
 
     @Override
@@ -157,7 +172,7 @@ public final class Views implements Iterable<ViewMetadata>
 
     public static final class Builder
     {
-        final ImmutableMap.Builder<String, ViewMetadata> views = new ImmutableMap.Builder<>();
+        final Map<String, ViewMetadata> views = new HashMap<>();
 
         private Builder()
         {
@@ -168,17 +183,61 @@ public final class Views implements Iterable<ViewMetadata>
             return new Views(this);
         }
 
+        public ViewMetadata get(String name)
+        {
+            return views.get(name);
+        }
+
+        public Builder put(ViewMetadata view)
+        {
+            views.put(view.name(), view);
+            return this;
+        }
 
-        public Builder add(ViewMetadata view)
+        public Builder remove(String name)
         {
-            views.put(view.name, view);
+            views.remove(name);
             return this;
         }
 
-        public Builder add(Iterable<ViewMetadata> views)
+        public Builder put(Iterable<ViewMetadata> views)
         {
-            views.forEach(this::add);
+            views.forEach(this::put);
             return this;
         }
     }
+
+    static ViewsDiff diff(Views before, Views after)
+    {
+        return ViewsDiff.diff(before, after);
+    }
+
+    static final class ViewsDiff extends Diff<Views, ViewMetadata>
+    {
+        private static final ViewsDiff NONE = new ViewsDiff(Views.none(), Views.none(), ImmutableList.of());
+
+        private ViewsDiff(Views created, Views dropped, ImmutableCollection<Altered<ViewMetadata>> altered)
+        {
+            super(created, dropped, altered);
+        }
+
+        private static ViewsDiff diff(Views before, Views after)
+        {
+            if (before == after)
+                return NONE;
+
+            Views created = after.filter(v -> !before.containsView(v.name()));
+            Views dropped = before.filter(v -> !after.containsView(v.name()));
+
+            ImmutableList.Builder<Altered<ViewMetadata>> altered = ImmutableList.builder();
+            before.forEach(viewBefore ->
+            {
+                ViewMetadata viewAfter = after.getNullable(viewBefore.name());
+                if (null != viewAfter)
+                    viewBefore.compare(viewAfter).ifPresent(kind -> altered.add(new Altered<>(viewBefore, viewAfter, kind)));
+            });
+
+            return new ViewsDiff(created, dropped, altered.build());
+        }
+    }
 }
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org