Posted to commits@cassandra.apache.org by al...@apache.org on 2015/11/04 18:08:45 UTC

[1/4] cassandra git commit: Use CQL type names in schema metadata tables

Repository: cassandra
Updated Branches:
  refs/heads/trunk fd6e27a43 -> 1b876bc60


http://git-wip-us.apache.org/repos/asf/cassandra/blob/340df43f/src/java/org/apache/cassandra/schema/Types.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/Types.java b/src/java/org/apache/cassandra/schema/Types.java
index 0a7bb4f..0d6e36d 100644
--- a/src/java/org/apache/cassandra/schema/Types.java
+++ b/src/java/org/apache/cassandra/schema/Types.java
@@ -18,37 +18,61 @@
 package org.apache.cassandra.schema;
 
 import java.nio.ByteBuffer;
-import java.util.Iterator;
-import java.util.Optional;
+import java.util.*;
 
 import javax.annotation.Nullable;
 
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.MapDifference;
+import com.google.common.collect.Maps;
 
+import org.apache.cassandra.cql3.CQL3Type;
+import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.UserType;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.jgrapht.graph.DefaultDirectedGraph;
+import org.jgrapht.graph.DefaultEdge;
+import org.jgrapht.traverse.TopologicalOrderIterator;
 
 import static com.google.common.collect.Iterables.filter;
+import static java.util.stream.Collectors.toList;
+import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
 
 /**
  * An immutable container for a keyspace's UDTs.
  */
 public final class Types implements Iterable<UserType>
 {
-    private final ImmutableMap<ByteBuffer, UserType> types;
+    private static final Types NONE = new Types(ImmutableMap.of());
+
+    private final Map<ByteBuffer, UserType> types;
 
     private Types(Builder builder)
     {
         types = builder.types.build();
     }
 
+    /*
+     * For use in RawBuilder::build only.
+     */
+    private Types(Map<ByteBuffer, UserType> types)
+    {
+        this.types = types;
+    }
+
     public static Builder builder()
     {
         return new Builder();
     }
 
+    public static RawBuilder rawBuilder(String keyspace)
+    {
+        return new RawBuilder(keyspace);
+    }
+
     public static Types none()
     {
-        return builder().build();
+        return NONE;
     }
 
     public static Types of(UserType... types)
@@ -106,6 +130,11 @@ public final class Types implements Iterable<UserType>
         return builder().add(filter(this, t -> t != type)).build();
     }
 
+    MapDifference<ByteBuffer, UserType> diff(Types other)
+    {
+        return Maps.difference(types, other.types);
+    }
+
     @Override
     public boolean equals(Object o)
     {
@@ -126,7 +155,7 @@ public final class Types implements Iterable<UserType>
 
     public static final class Builder
     {
-        final ImmutableMap.Builder<ByteBuffer, UserType> types = new ImmutableMap.Builder<>();
+        final ImmutableMap.Builder<ByteBuffer, UserType> types = ImmutableMap.builder();
 
         private Builder()
         {
@@ -156,4 +185,100 @@ public final class Types implements Iterable<UserType>
             return this;
         }
     }
+
+    public static final class RawBuilder
+    {
+        final String keyspace;
+        final List<RawUDT> definitions;
+
+        private RawBuilder(String keyspace)
+        {
+            this.keyspace = keyspace;
+            this.definitions = new ArrayList<>();
+        }
+
+        /**
+         * Build a Types instance from Raw definitions.
+         *
+         * Constructs a DAG of UDT dependencies and resolves them one by one in topological order.
+         */
+        public Types build()
+        {
+            if (definitions.isEmpty())
+                return Types.none();
+
+            /*
+             * build a DAG of UDT dependencies
+             */
+            DefaultDirectedGraph<RawUDT, DefaultEdge> graph = new DefaultDirectedGraph<>(DefaultEdge.class);
+
+            definitions.forEach(graph::addVertex);
+
+            for (RawUDT udt1 : definitions)
+                for (RawUDT udt2 : definitions)
+                    if (udt1 != udt2 && udt1.referencesUserType(udt2.name))
+                        graph.addEdge(udt2, udt1);
+
+            /*
+             * iterate in topological order, resolving each type in turn
+             */
+            Types types = new Types(new HashMap<>());
+
+            TopologicalOrderIterator<RawUDT, DefaultEdge> iterator = new TopologicalOrderIterator<>(graph);
+            while (iterator.hasNext())
+            {
+                UserType udt = iterator.next().prepare(keyspace, types); // will throw InvalidRequestException if it references an unknown type
+                types.types.put(udt.name, udt);
+            }
+
+            /*
+             * return an immutable copy
+             */
+            return Types.builder().add(types).build();
+        }
+
+        void add(String name, List<String> fieldNames, List<String> fieldTypes)
+        {
+            List<CQL3Type.Raw> rawFieldTypes =
+                fieldTypes.stream()
+                          .map(CQLTypeParser::parseRaw)
+                          .collect(toList());
+
+            definitions.add(new RawUDT(name, fieldNames, rawFieldTypes));
+        }
+
+        private static final class RawUDT
+        {
+            final String name;
+            final List<String> fieldNames;
+            final List<CQL3Type.Raw> fieldTypes;
+
+            RawUDT(String name, List<String> fieldNames, List<CQL3Type.Raw> fieldTypes)
+            {
+                this.name = name;
+                this.fieldNames = fieldNames;
+                this.fieldTypes = fieldTypes;
+            }
+
+            boolean referencesUserType(String typeName)
+            {
+                return fieldTypes.stream().anyMatch(t -> t.referencesUserType(typeName));
+            }
+
+            UserType prepare(String keyspace, Types types)
+            {
+                List<ByteBuffer> preparedFieldNames =
+                    fieldNames.stream()
+                              .map(ByteBufferUtil::bytes)
+                              .collect(toList());
+
+                List<AbstractType<?>> preparedFieldTypes =
+                    fieldTypes.stream()
+                              .map(t -> t.prepare(keyspace, types).getType())
+                              .collect(toList());
+
+                return new UserType(keyspace, bytes(name), preparedFieldNames, preparedFieldTypes);
+            }
+        }
+    }
 }
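
As a side note on the RawBuilder.build() method above: a minimal, standalone sketch of the same dependency-ordering idea, using the jgrapht-core 0.9.1 API that this patch pulls in. The UDT names here (coords, address, person) are purely hypothetical; the real code orders RawUDT vertices the same way before preparing each type.

import org.jgrapht.graph.DefaultDirectedGraph;
import org.jgrapht.graph.DefaultEdge;
import org.jgrapht.traverse.TopologicalOrderIterator;

public class UdtOrderSketch
{
    public static void main(String[] args)
    {
        DefaultDirectedGraph<String, DefaultEdge> graph = new DefaultDirectedGraph<>(DefaultEdge.class);

        // Hypothetical UDTs: address references coords, person references address.
        graph.addVertex("coords");
        graph.addVertex("address");
        graph.addVertex("person");

        // Edge from the referenced type to the referencing type, mirroring RawBuilder.build().
        graph.addEdge("coords", "address");
        graph.addEdge("address", "person");

        // Topological order visits every type after the types it depends on.
        TopologicalOrderIterator<String, DefaultEdge> iterator = new TopologicalOrderIterator<>(graph);
        while (iterator.hasNext())
            System.out.println(iterator.next()); // coords, address, person
    }
}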

http://git-wip-us.apache.org/repos/asf/cassandra/blob/340df43f/src/java/org/apache/cassandra/schema/Views.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/Views.java b/src/java/org/apache/cassandra/schema/Views.java
index 5888b9d..b8fdd4b 100644
--- a/src/java/org/apache/cassandra/schema/Views.java
+++ b/src/java/org/apache/cassandra/schema/Views.java
@@ -26,6 +26,8 @@ import javax.annotation.Nullable;
 
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
+import com.google.common.collect.MapDifference;
+import com.google.common.collect.Maps;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ViewDefinition;
@@ -124,6 +126,11 @@ public final class Views implements Iterable<ViewDefinition>
         return without(view.viewName).with(view);
     }
 
+    MapDifference<String, ViewDefinition> diff(Views other)
+    {
+        return Maps.difference(views, other.views);
+    }
+
     @Override
     public boolean equals(Object o)
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/340df43f/src/java/org/apache/cassandra/service/MigrationManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/MigrationManager.java b/src/java/org/apache/cassandra/service/MigrationManager.java
index de6c7f7..6a21f91 100644
--- a/src/java/org/apache/cassandra/service/MigrationManager.java
+++ b/src/java/org/apache/cassandra/service/MigrationManager.java
@@ -468,20 +468,9 @@ public class MigrationManager
     private static void announce(Mutation schema, boolean announceLocally)
     {
         if (announceLocally)
-        {
-            try
-            {
-                SchemaKeyspace.mergeSchema(Collections.singletonList(schema));
-            }
-            catch (IOException e)
-            {
-                throw new RuntimeException(e);
-            }
-        }
+            SchemaKeyspace.mergeSchema(Collections.singletonList(schema));
         else
-        {
             FBUtilities.waitOnFuture(announce(Collections.singletonList(schema)));
-        }
     }
 
     private static void pushSchemaMutation(InetAddress endpoint, Collection<Mutation> schema)
@@ -497,7 +486,7 @@ public class MigrationManager
     {
         Future<?> f = StageManager.getStage(Stage.MIGRATION).submit(new WrappedRunnable()
         {
-            protected void runMayThrow() throws IOException, ConfigurationException
+            protected void runMayThrow() throws ConfigurationException
             {
                 SchemaKeyspace.mergeSchemaAndAnnounceVersion(schema);
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/340df43f/src/java/org/apache/cassandra/service/MigrationTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/MigrationTask.java b/src/java/org/apache/cassandra/service/MigrationTask.java
index 4e3fac3..8a1b858 100644
--- a/src/java/org/apache/cassandra/service/MigrationTask.java
+++ b/src/java/org/apache/cassandra/service/MigrationTask.java
@@ -74,10 +74,6 @@ class MigrationTask extends WrappedRunnable
                 {
                     SchemaKeyspace.mergeSchemaAndAnnounceVersion(message.payload);
                 }
-                catch (IOException e)
-                {
-                    logger.error("IOException merging remote schema", e);
-                }
                 catch (ConfigurationException e)
                 {
                     logger.error("Configuration exception merging remote schema", e);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/340df43f/test/unit/org/apache/cassandra/config/CFMetaDataTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/config/CFMetaDataTest.java b/test/unit/org/apache/cassandra/config/CFMetaDataTest.java
index 3deb5fa..9d91df3 100644
--- a/test/unit/org/apache/cassandra/config/CFMetaDataTest.java
+++ b/test/unit/org/apache/cassandra/config/CFMetaDataTest.java
@@ -21,18 +21,23 @@ package org.apache.cassandra.config;
 import java.util.*;
 
 import org.apache.cassandra.SchemaLoader;
-import org.apache.cassandra.db.*;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.UntypedResultSet;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.db.marshal.AsciiType;
 import org.apache.cassandra.db.marshal.Int32Type;
 import org.apache.cassandra.db.marshal.UTF8Type;
-import org.apache.cassandra.db.rows.UnfilteredRowIterators;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.rows.UnfilteredRowIterators;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.schema.CompressionParams;
 import org.apache.cassandra.schema.KeyspaceMetadata;
 import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.schema.SchemaKeyspace;
-import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.schema.TableParams;
+import org.apache.cassandra.schema.Types;
 import org.apache.cassandra.thrift.CfDef;
 import org.apache.cassandra.thrift.ColumnDef;
 import org.apache.cassandra.thrift.IndexType;
@@ -154,10 +159,18 @@ public class CFMetaDataTest
         PartitionUpdate cfU = rm.getPartitionUpdate(Schema.instance.getId(SchemaKeyspace.NAME, SchemaKeyspace.TABLES));
         PartitionUpdate cdU = rm.getPartitionUpdate(Schema.instance.getId(SchemaKeyspace.NAME, SchemaKeyspace.COLUMNS));
 
-        CFMetaData newCfm = SchemaKeyspace.createTableFromTablePartitionAndColumnsPartition(
-                UnfilteredRowIterators.filter(cfU.unfilteredIterator(), FBUtilities.nowInSeconds()),
-                UnfilteredRowIterators.filter(cdU.unfilteredIterator(), FBUtilities.nowInSeconds())
-        );
-        assert cfm.equals(newCfm) : String.format("%n%s%n!=%n%s", cfm, newCfm);
+        UntypedResultSet.Row tableRow = QueryProcessor.resultify(String.format("SELECT * FROM %s.%s", SchemaKeyspace.NAME, SchemaKeyspace.TABLES),
+                                                                 UnfilteredRowIterators.filter(cfU.unfilteredIterator(), FBUtilities.nowInSeconds()))
+                                                      .one();
+        TableParams params = SchemaKeyspace.createTableParamsFromRow(tableRow);
+
+        UntypedResultSet columnsRows = QueryProcessor.resultify(String.format("SELECT * FROM %s.%s", SchemaKeyspace.NAME, SchemaKeyspace.COLUMNS),
+                                                                UnfilteredRowIterators.filter(cdU.unfilteredIterator(), FBUtilities.nowInSeconds()));
+        Set<ColumnDefinition> columns = new HashSet<>();
+        for (UntypedResultSet.Row row : columnsRows)
+            columns.add(SchemaKeyspace.createColumnFromRow(row, Types.none()));
+
+        assertEquals(cfm.params, params);
+        assertEquals(new HashSet<>(cfm.allColumns()), columns);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/340df43f/test/unit/org/apache/cassandra/cql3/validation/entities/TupleTypeTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/TupleTypeTest.java b/test/unit/org/apache/cassandra/cql3/validation/entities/TupleTypeTest.java
index 0e7084f..9e3c51d 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/entities/TupleTypeTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/entities/TupleTypeTest.java
@@ -110,7 +110,7 @@ public class TupleTypeTest extends CQLTester
             row(0, 4, tuple(null, "1"))
         );
 
-        assertInvalidMessage("Invalid tuple literal: too many elements. Type tuple<int, text> expects 2 but got 3",
+        assertInvalidMessage("Invalid tuple literal: too many elements. Type frozen<tuple<int, text>> expects 2 but got 3",
                              "INSERT INTO %s(k, t) VALUES (1,'1:2:3')");
     }
 
@@ -121,7 +121,7 @@ public class TupleTypeTest extends CQLTester
 
         assertInvalidSyntax("INSERT INTO %s (k, t) VALUES (0, ())");
 
-        assertInvalidMessage("Invalid tuple literal for t: too many elements. Type tuple<int, text, double> expects 3 but got 4",
+        assertInvalidMessage("Invalid tuple literal for t: too many elements. Type frozen<tuple<int, text, double>> expects 3 but got 4",
                              "INSERT INTO %s (k, t) VALUES (0, (2, 'foo', 3.1, 'bar'))");
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/340df43f/test/unit/org/apache/cassandra/cql3/validation/entities/UFTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/UFTest.java b/test/unit/org/apache/cassandra/cql3/validation/entities/UFTest.java
index bf3d33f..78e85dc 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/entities/UFTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/entities/UFTest.java
@@ -797,7 +797,7 @@ public class UFTest extends CQLTester
     @Test
     public void testFunctionNonExistingKeyspace() throws Throwable
     {
-        assertInvalidMessage("to non existing keyspace",
+        assertInvalidMessage("Keyspace this_ks_does_not_exist doesn't exist",
                              "CREATE OR REPLACE FUNCTION this_ks_does_not_exist.jnft(val double) " +
                              "RETURNS NULL ON NULL INPUT " +
                              "RETURNS double " +
@@ -810,7 +810,7 @@ public class UFTest extends CQLTester
     {
         dropPerTestKeyspace();
 
-        assertInvalidMessage("to non existing keyspace",
+        assertInvalidMessage("Keyspace " + KEYSPACE_PER_TEST + " doesn't exist",
                              "CREATE OR REPLACE FUNCTION " + KEYSPACE_PER_TEST + ".jnft(val double) " +
                              "RETURNS NULL ON NULL INPUT " +
                              "RETURNS double " +

http://git-wip-us.apache.org/repos/asf/cassandra/blob/340df43f/test/unit/org/apache/cassandra/schema/LegacySchemaMigratorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/schema/LegacySchemaMigratorTest.java b/test/unit/org/apache/cassandra/schema/LegacySchemaMigratorTest.java
index d069d56..bc382fb 100644
--- a/test/unit/org/apache/cassandra/schema/LegacySchemaMigratorTest.java
+++ b/test/unit/org/apache/cassandra/schema/LegacySchemaMigratorTest.java
@@ -63,8 +63,7 @@ public class LegacySchemaMigratorTest
     {
         CQLTester.cleanupAndLeaveDirs();
 
-        List<KeyspaceMetadata> expected = keyspaceToMigrate();
-        expected.sort((k1, k2) -> k1.name.compareTo(k2.name));
+        Keyspaces expected = keyspacesToMigrate();
 
         // write the keyspaces into the legacy tables
         expected.forEach(LegacySchemaMigratorTest::legacySerializeKeyspace);
@@ -73,8 +72,7 @@ public class LegacySchemaMigratorTest
         LegacySchemaMigrator.migrate();
 
         // read back all the metadata from the new schema tables
-        List<KeyspaceMetadata> actual = SchemaKeyspace.readSchemaFromSystemTables();
-        actual.sort((k1, k2) -> k1.name.compareTo(k2.name));
+        Keyspaces actual = SchemaKeyspace.fetchNonSystemKeyspaces();
 
         // need to load back CFMetaData of those tables (CFS instances will still be loaded)
         loadLegacySchemaTables();
@@ -104,9 +102,9 @@ public class LegacySchemaMigratorTest
         Schema.instance.setKeyspaceMetadata(systemKeyspace.withSwapped(systemTables));
     }
 
-    private static List<KeyspaceMetadata> keyspaceToMigrate()
+    private static Keyspaces keyspacesToMigrate()
     {
-        List<KeyspaceMetadata> keyspaces = new ArrayList<>();
+        Keyspaces.Builder keyspaces = Keyspaces.builder();
 
         // A whole bucket of shorthand
         String ks1 = KEYSPACE_PREFIX + "Keyspace1";
@@ -255,7 +253,7 @@ public class LegacySchemaMigratorTest
         keyspaces.add(keyspaceWithUDAs());
         keyspaces.add(keyspaceWithUDAsAndUDTs());
 
-        return keyspaces;
+        return keyspaces.build();
     }
 
     private static KeyspaceMetadata keyspaceWithDroppedCollections()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/340df43f/test/unit/org/apache/cassandra/schema/SchemaKeyspaceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/schema/SchemaKeyspaceTest.java b/test/unit/org/apache/cassandra/schema/SchemaKeyspaceTest.java
index 3eb4faf..a1b7ad3 100644
--- a/test/unit/org/apache/cassandra/schema/SchemaKeyspaceTest.java
+++ b/test/unit/org/apache/cassandra/schema/SchemaKeyspaceTest.java
@@ -24,6 +24,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
 import com.google.common.collect.ImmutableMap;
 
@@ -32,11 +33,13 @@ import org.junit.Test;
 
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.UntypedResultSet;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.Mutation;
-import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.db.marshal.AsciiType;
 import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
@@ -158,14 +161,14 @@ public class SchemaKeyspaceTest
         assertEquals(extensions, metadata.params.extensions);
     }
 
-    private static void updateTable(String keyspace, CFMetaData oldTable, CFMetaData newTable) throws IOException
+    private static void updateTable(String keyspace, CFMetaData oldTable, CFMetaData newTable)
     {
         KeyspaceMetadata ksm = Schema.instance.getKeyspaceInstance(keyspace).getMetadata();
         Mutation mutation = SchemaKeyspace.makeUpdateTableMutation(ksm, oldTable, newTable, FBUtilities.timestampMicros(), false);
         SchemaKeyspace.mergeSchema(Collections.singleton(mutation));
     }
 
-    private static void createTable(String keyspace, String cql) throws IOException
+    private static void createTable(String keyspace, String cql)
     {
         CFMetaData table = CFMetaData.compile(cql, keyspace);
 
@@ -185,10 +188,21 @@ public class SchemaKeyspaceTest
 
         // Test schema conversion
         Mutation rm = SchemaKeyspace.makeCreateTableMutation(keyspace, cfm, FBUtilities.timestampMicros());
-        PartitionUpdate serializedCf = rm.getPartitionUpdate(Schema.instance.getId(SystemKeyspace.NAME, SchemaKeyspace.TABLES));
-        PartitionUpdate serializedCD = rm.getPartitionUpdate(Schema.instance.getId(SystemKeyspace.NAME, SchemaKeyspace.COLUMNS));
-        CFMetaData newCfm = SchemaKeyspace.createTableFromTablePartitionAndColumnsPartition(UnfilteredRowIterators.filter(serializedCf.unfilteredIterator(), FBUtilities.nowInSeconds()),
-                                                                                            UnfilteredRowIterators.filter(serializedCD.unfilteredIterator(), FBUtilities.nowInSeconds()));
-        assert cfm.equals(newCfm) : String.format("%n%s%n!=%n%s", cfm, newCfm);
+        PartitionUpdate serializedCf = rm.getPartitionUpdate(Schema.instance.getId(SchemaKeyspace.NAME, SchemaKeyspace.TABLES));
+        PartitionUpdate serializedCD = rm.getPartitionUpdate(Schema.instance.getId(SchemaKeyspace.NAME, SchemaKeyspace.COLUMNS));
+
+        UntypedResultSet.Row tableRow = QueryProcessor.resultify(String.format("SELECT * FROM %s.%s", SchemaKeyspace.NAME, SchemaKeyspace.TABLES),
+                                                                 UnfilteredRowIterators.filter(serializedCf.unfilteredIterator(), FBUtilities.nowInSeconds()))
+                                                      .one();
+        TableParams params = SchemaKeyspace.createTableParamsFromRow(tableRow);
+
+        UntypedResultSet columnsRows = QueryProcessor.resultify(String.format("SELECT * FROM %s.%s", SchemaKeyspace.NAME, SchemaKeyspace.COLUMNS),
+                                                                UnfilteredRowIterators.filter(serializedCD.unfilteredIterator(), FBUtilities.nowInSeconds()));
+        Set<ColumnDefinition> columns = new HashSet<>();
+        for (UntypedResultSet.Row row : columnsRows)
+            columns.add(SchemaKeyspace.createColumnFromRow(row, Types.none()));
+
+        assertEquals(cfm.params, params);
+        assertEquals(new HashSet<>(cfm.allColumns()), columns);
     }
 }


[4/4] cassandra git commit: Merge branch 'cassandra-3.0' into trunk

Posted by al...@apache.org.
Merge branch 'cassandra-3.0' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1b876bc6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1b876bc6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1b876bc6

Branch: refs/heads/trunk
Commit: 1b876bc6004356295ad64257f63718d099a3aaf7
Parents: fd6e27a 340df43
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Wed Nov 4 17:08:43 2015 +0000
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Wed Nov 4 17:08:43 2015 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |    1 +
 NOTICE.txt                                      |    4 +
 build.xml                                       |    9 +-
 ...assandra-driver-core-3.0.0-alpha4-shaded.jar |  Bin 2275541 -> 0 bytes
 ...core-3.0.0-beta1-92c4c80-SNAPSHOT-shaded.jar |  Bin 0 -> 2284484 bytes
 ...iver-internal-only-3.0.0a2.post0-95c6008.zip |  Bin 233564 -> 0 bytes
 ...iver-internal-only-3.0.0a3.post0-a983923.zip |  Bin 0 -> 229492 bytes
 lib/jgrapht-core-0.9.1.jar                      |  Bin 0 -> 351208 bytes
 lib/licenses/jgrapht-core-0.9.1.txt             |  227 +++
 .../org/apache/cassandra/config/CFMetaData.java |   12 +-
 .../org/apache/cassandra/config/Schema.java     |   44 +-
 .../org/apache/cassandra/cql3/CQL3Type.java     |   77 +-
 .../apache/cassandra/cql3/ColumnIdentifier.java |    2 +-
 .../cql3/statements/CreateTableStatement.java   |   12 +-
 .../db/DefinitionsUpdateVerbHandler.java        |    3 +-
 src/java/org/apache/cassandra/db/Keyspace.java  |    1 -
 .../apache/cassandra/db/marshal/EmptyType.java  |    7 +
 .../apache/cassandra/db/marshal/TypeParser.java |   45 -
 .../cassandra/io/sstable/CQLSSTableWriter.java  |   14 +-
 .../apache/cassandra/schema/CQLTypeParser.java  |   93 ++
 .../org/apache/cassandra/schema/Keyspaces.java  |  124 ++
 .../apache/cassandra/schema/SchemaKeyspace.java | 1422 +++++++-----------
 .../org/apache/cassandra/schema/Tables.java     |    7 +
 src/java/org/apache/cassandra/schema/Types.java |  135 +-
 src/java/org/apache/cassandra/schema/Views.java |    7 +
 .../cassandra/service/MigrationManager.java     |   15 +-
 .../apache/cassandra/service/MigrationTask.java |    4 -
 .../apache/cassandra/config/CFMetaDataTest.java |   29 +-
 .../cql3/validation/entities/TupleTypeTest.java |    4 +-
 .../cql3/validation/entities/UFTest.java        |    4 +-
 .../schema/LegacySchemaMigratorTest.java        |   12 +-
 .../cassandra/schema/SchemaKeyspaceTest.java    |   30 +-
 32 files changed, 1318 insertions(+), 1026 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/1b876bc6/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index b5ded10,1914fa1..5e2826c
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,10 -1,5 +1,11 @@@
 +3.2
 + * Added graphing option to cassandra-stress (CASSANDRA-7918)
 + * Abort in-progress queries that time out (CASSANDRA-7392)
 + * Add transparent data encryption core classes (CASSANDRA-9945)
 +
 +
  3.0
+  * Use CQL type names in schema metadata tables (CASSANDRA-10365)
   * Guard batchlog replay against integer division by zero (CASSANDRA-9223)
   * Fix bug when adding a column to thrift with the same name than a primary key (CASSANDRA-10608)
   * Add client address argument to IAuthenticator::newSaslNegotiator (CASSANDRA-8068)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1b876bc6/build.xml
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1b876bc6/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1b876bc6/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
----------------------------------------------------------------------


[3/4] cassandra git commit: Use CQL type names in schema metadata tables

Posted by al...@apache.org.
Use CQL type names in schema metadata tables

patch by Aleksey Yeschenko; reviewed by Sylvain Lebresne for
CASSANDRA-10365
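
For a rough feel of what the change means for stored type names, a small illustration (not part of the patch, and assuming a simple marshal type rendered both ways):

import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.Int32Type;

public class TypeNameSketch
{
    public static void main(String[] args)
    {
        AbstractType<?> t = Int32Type.instance;

        // Internal marshal class name, roughly what the legacy schema tables recorded.
        System.out.println(t);                         // org.apache.cassandra.db.marshal.Int32Type

        // CQL type name, which the schema metadata tables use after this patch.
        System.out.println(t.asCQL3Type().toString()); // int
    }
}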


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/340df43f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/340df43f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/340df43f

Branch: refs/heads/trunk
Commit: 340df43fb74f7f3ef021d10ad1b4510636ee3f14
Parents: f4af154
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Mon Sep 21 12:02:53 2015 -0700
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Wed Nov 4 17:06:07 2015 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |    1 +
 NOTICE.txt                                      |    4 +
 build.xml                                       |    9 +-
 ...assandra-driver-core-3.0.0-alpha4-shaded.jar |  Bin 2275541 -> 0 bytes
 ...core-3.0.0-beta1-92c4c80-SNAPSHOT-shaded.jar |  Bin 0 -> 2284484 bytes
 ...iver-internal-only-3.0.0a2.post0-95c6008.zip |  Bin 233564 -> 0 bytes
 ...iver-internal-only-3.0.0a3.post0-a983923.zip |  Bin 0 -> 229492 bytes
 lib/jgrapht-core-0.9.1.jar                      |  Bin 0 -> 351208 bytes
 lib/licenses/jgrapht-core-0.9.1.txt             |  227 +++
 .../org/apache/cassandra/config/CFMetaData.java |   12 +-
 .../org/apache/cassandra/config/Schema.java     |   44 +-
 .../org/apache/cassandra/cql3/CQL3Type.java     |   77 +-
 .../apache/cassandra/cql3/ColumnIdentifier.java |    2 +-
 .../cql3/statements/CreateTableStatement.java   |   12 +-
 .../db/DefinitionsUpdateVerbHandler.java        |    3 +-
 src/java/org/apache/cassandra/db/Keyspace.java  |    1 -
 .../apache/cassandra/db/marshal/EmptyType.java  |    7 +
 .../apache/cassandra/db/marshal/TypeParser.java |   45 -
 .../cassandra/io/sstable/CQLSSTableWriter.java  |   14 +-
 .../apache/cassandra/schema/CQLTypeParser.java  |   93 ++
 .../org/apache/cassandra/schema/Keyspaces.java  |  124 ++
 .../apache/cassandra/schema/SchemaKeyspace.java | 1421 +++++++-----------
 .../org/apache/cassandra/schema/Tables.java     |    7 +
 src/java/org/apache/cassandra/schema/Types.java |  135 +-
 src/java/org/apache/cassandra/schema/Views.java |    7 +
 .../cassandra/service/MigrationManager.java     |   15 +-
 .../apache/cassandra/service/MigrationTask.java |    4 -
 .../apache/cassandra/config/CFMetaDataTest.java |   29 +-
 .../cql3/validation/entities/TupleTypeTest.java |    4 +-
 .../cql3/validation/entities/UFTest.java        |    4 +-
 .../schema/LegacySchemaMigratorTest.java        |   12 +-
 .../cassandra/schema/SchemaKeyspaceTest.java    |   30 +-
 32 files changed, 1318 insertions(+), 1025 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/340df43f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index eb575e8..1914fa1 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0
+ * Use CQL type names in schema metadata tables (CASSANDRA-10365)
  * Guard batchlog replay against integer division by zero (CASSANDRA-9223)
  * Fix bug when adding a column to thrift with the same name than a primary key (CASSANDRA-10608)
  * Add client address argument to IAuthenticator::newSaslNegotiator (CASSANDRA-8068)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/340df43f/NOTICE.txt
----------------------------------------------------------------------
diff --git a/NOTICE.txt b/NOTICE.txt
index a20994f..b880183 100644
--- a/NOTICE.txt
+++ b/NOTICE.txt
@@ -83,3 +83,7 @@ BSD 3-clause
 ASM
 (http://asm.ow2.org/)
 Copyright (c) 2000-2011 INRIA, France Telecom
+
+JGraphT
+(http://jgrapht.org)
+Copyright 2003-2015, by Barak Naveh and Contributors.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/340df43f/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index c8707cd..261d0e6 100644
--- a/build.xml
+++ b/build.xml
@@ -406,6 +406,7 @@
           <dependency groupId="org.mindrot" artifactId="jbcrypt" version="0.3m" />
           <dependency groupId="io.airlift" artifactId="airline" version="0.6" />
           <dependency groupId="io.netty" artifactId="netty-all" version="4.0.23.Final" />
+          <dependency groupId="org.jgrapht" artifactId="jgrapht-core" version="0.9.1" />
           <dependency groupId="com.google.code.findbugs" artifactId="jsr305" version="2.0.2" />
           <dependency groupId="com.clearspring.analytics" artifactId="stream" version="2.5.2" />
           <!-- TODO CASSANDRA-9543
@@ -561,16 +562,14 @@
 
         <!-- don't need jna to run, but nice to have -->
         <dependency groupId="net.java.dev.jna" artifactId="jna"/>
-        
+
         <!-- don't need jamm unless running a server in which case it needs to be a -javagent to be used anyway -->
         <dependency groupId="com.github.jbellis" artifactId="jamm"/>
 
         <dependency groupId="io.netty" artifactId="netty-all"/>
-      	
-      	<dependency groupId="joda-time" artifactId="joda-time"/>
-
+        <dependency groupId="org.jgrapht" artifactId="jgrapht-core"/>
+        <dependency groupId="joda-time" artifactId="joda-time"/>
         <dependency groupId="org.fusesource" artifactId="sigar"/>
-      	
         <dependency groupId="org.eclipse.jdt.core.compiler" artifactId="ecj" />
       </artifact:pom>
       <artifact:pom id="thrift-pom"

http://git-wip-us.apache.org/repos/asf/cassandra/blob/340df43f/lib/cassandra-driver-core-3.0.0-alpha4-shaded.jar
----------------------------------------------------------------------
diff --git a/lib/cassandra-driver-core-3.0.0-alpha4-shaded.jar b/lib/cassandra-driver-core-3.0.0-alpha4-shaded.jar
deleted file mode 100644
index 9a4921e..0000000
Binary files a/lib/cassandra-driver-core-3.0.0-alpha4-shaded.jar and /dev/null differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/340df43f/lib/cassandra-driver-core-3.0.0-beta1-92c4c80-SNAPSHOT-shaded.jar
----------------------------------------------------------------------
diff --git a/lib/cassandra-driver-core-3.0.0-beta1-92c4c80-SNAPSHOT-shaded.jar b/lib/cassandra-driver-core-3.0.0-beta1-92c4c80-SNAPSHOT-shaded.jar
new file mode 100644
index 0000000..2026c52
Binary files /dev/null and b/lib/cassandra-driver-core-3.0.0-beta1-92c4c80-SNAPSHOT-shaded.jar differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/340df43f/lib/cassandra-driver-internal-only-3.0.0a2.post0-95c6008.zip
----------------------------------------------------------------------
diff --git a/lib/cassandra-driver-internal-only-3.0.0a2.post0-95c6008.zip b/lib/cassandra-driver-internal-only-3.0.0a2.post0-95c6008.zip
deleted file mode 100644
index da7fa0d..0000000
Binary files a/lib/cassandra-driver-internal-only-3.0.0a2.post0-95c6008.zip and /dev/null differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/340df43f/lib/cassandra-driver-internal-only-3.0.0a3.post0-a983923.zip
----------------------------------------------------------------------
diff --git a/lib/cassandra-driver-internal-only-3.0.0a3.post0-a983923.zip b/lib/cassandra-driver-internal-only-3.0.0a3.post0-a983923.zip
new file mode 100644
index 0000000..66ec36e
Binary files /dev/null and b/lib/cassandra-driver-internal-only-3.0.0a3.post0-a983923.zip differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/340df43f/lib/jgrapht-core-0.9.1.jar
----------------------------------------------------------------------
diff --git a/lib/jgrapht-core-0.9.1.jar b/lib/jgrapht-core-0.9.1.jar
new file mode 100644
index 0000000..f491e25
Binary files /dev/null and b/lib/jgrapht-core-0.9.1.jar differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/340df43f/lib/licenses/jgrapht-core-0.9.1.txt
----------------------------------------------------------------------
diff --git a/lib/licenses/jgrapht-core-0.9.1.txt b/lib/licenses/jgrapht-core-0.9.1.txt
new file mode 100644
index 0000000..5d80026
--- /dev/null
+++ b/lib/licenses/jgrapht-core-0.9.1.txt
@@ -0,0 +1,227 @@
+Eclipse Public License - v 1.0
+
+   THE ACCOMPANYING PROGRAM IS PROVIDED UNDER THE TERMS OF THIS ECLIPSE
+   PUBLIC LICENSE ("AGREEMENT"). ANY USE, REPRODUCTION OR DISTRIBUTION OF
+   THE PROGRAM CONSTITUTES RECIPIENT'S ACCEPTANCE OF THIS AGREEMENT.
+
+   1. DEFINITIONS
+
+   "Contribution" means:
+
+   a) in the case of the initial Contributor, the initial code and
+   documentation distributed under this Agreement, and
+
+   b) in the case of each subsequent Contributor:
+
+   i) changes to the Program, and
+
+   ii) additions to the Program;
+
+   where such changes and/or additions to the Program originate from and
+   are distributed by that particular Contributor. A Contribution
+   'originates' from a Contributor if it was added to the Program by such
+   Contributor itself or anyone acting on such Contributor's behalf.
+   Contributions do not include additions to the Program which: (i) are
+   separate modules of software distributed in conjunction with the
+   Program under their own license agreement, and (ii) are not derivative
+   works of the Program.
+
+   "Contributor" means any person or entity that distributes the Program.
+
+   "Licensed Patents" mean patent claims licensable by a Contributor which
+   are necessarily infringed by the use or sale of its Contribution alone
+   or when combined with the Program.
+
+   "Program" means the Contributions distributed in accordance with this
+   Agreement.
+
+   "Recipient" means anyone who receives the Program under this Agreement,
+   including all Contributors.
+
+   2. GRANT OF RIGHTS
+
+   a) Subject to the terms of this Agreement, each Contributor hereby
+   grants Recipient a non-exclusive, worldwide, royalty-free copyright
+   license to reproduce, prepare derivative works of, publicly display,
+   publicly perform, distribute and sublicense the Contribution of such
+   Contributor, if any, and such derivative works, in source code and
+   object code form.
+
+   b) Subject to the terms of this Agreement, each Contributor hereby
+   grants Recipient a non-exclusive, worldwide, royalty-free patent
+   license under Licensed Patents to make, use, sell, offer to sell,
+   import and otherwise transfer the Contribution of such Contributor, if
+   any, in source code and object code form. This patent license shall
+   apply to the combination of the Contribution and the Program if, at the
+   time the Contribution is added by the Contributor, such addition of the
+   Contribution causes such combination to be covered by the Licensed
+   Patents. The patent license shall not apply to any other combinations
+   which include the Contribution. No hardware per se is licensed
+   hereunder.
+
+   c) Recipient understands that although each Contributor grants the
+   licenses to its Contributions set forth herein, no assurances are
+   provided by any Contributor that the Program does not infringe the
+   patent or other intellectual property rights of any other entity. Each
+   Contributor disclaims any liability to Recipient for claims brought by
+   any other entity based on infringement of intellectual property rights
+   or otherwise. As a condition to exercising the rights and licenses
+   granted hereunder, each Recipient hereby assumes sole responsibility to
+   secure any other intellectual property rights needed, if any. For
+   example, if a third party patent license is required to allow Recipient
+   to distribute the Program, it is Recipient's responsibility to acquire
+   that license before distributing the Program.
+
+   d) Each Contributor represents that to its knowledge it has sufficient
+   copyright rights in its Contribution, if any, to grant the copyright
+   license set forth in this Agreement.
+
+   3. REQUIREMENTS
+
+   A Contributor may choose to distribute the Program in object code form
+   under its own license agreement, provided that:
+
+   a) it complies with the terms and conditions of this Agreement; and
+
+   b) its license agreement:
+
+   i) effectively disclaims on behalf of all Contributors all warranties
+   and conditions, express and implied, including warranties or conditions
+   of title and non-infringement, and implied warranties or conditions of
+   merchantability and fitness for a particular purpose;
+
+   ii) effectively excludes on behalf of all Contributors all liability
+   for damages, including direct, indirect, special, incidental and
+   consequential damages, such as lost profits;
+
+   iii) states that any provisions which differ from this Agreement are
+   offered by that Contributor alone and not by any other party; and
+
+   iv) states that source code for the Program is available from such
+   Contributor, and informs licensees how to obtain it in a reasonable
+   manner on or through a medium customarily used for software exchange.
+
+   When the Program is made available in source code form:
+
+   a) it must be made available under this Agreement; and
+
+   b) a copy of this Agreement must be included with each copy of the
+   Program.
+
+   Contributors may not remove or alter any copyright notices contained
+   within the Program.
+
+   Each Contributor must identify itself as the originator of its
+   Contribution, if any, in a manner that reasonably allows subsequent
+   Recipients to identify the originator of the Contribution.
+
+   4. COMMERCIAL DISTRIBUTION
+
+   Commercial distributors of software may accept certain responsibilities
+   with respect to end users, business partners and the like. While this
+   license is intended to facilitate the commercial use of the Program,
+   the Contributor who includes the Program in a commercial product
+   offering should do so in a manner which does not create potential
+   liability for other Contributors. Therefore, if a Contributor includes
+   the Program in a commercial product offering, such Contributor
+   ("Commercial Contributor") hereby agrees to defend and indemnify every
+   other Contributor ("Indemnified Contributor") against any losses,
+   damages and costs (collectively "Losses") arising from claims, lawsuits
+   and other legal actions brought by a third party against the
+   Indemnified Contributor to the extent caused by the acts or omissions
+   of such Commercial Contributor in connection with its distribution of
+   the Program in a commercial product offering. The obligations in this
+   section do not apply to any claims or Losses relating to any actual or
+   alleged intellectual property infringement. In order to qualify, an
+   Indemnified Contributor must: a) promptly notify the Commercial
+   Contributor in writing of such claim, and b) allow the Commercial
+   Contributor to control, and cooperate with the Commercial Contributor
+   in, the defense and any related settlement negotiations. The
+   Indemnified Contributor may participate in any such claim at its own
+   expense.
+
+   For example, a Contributor might include the Program in a commercial
+   product offering, Product X. That Contributor is then a Commercial
+   Contributor. If that Commercial Contributor then makes performance
+   claims, or offers warranties related to Product X, those performance
+   claims and warranties are such Commercial Contributor's responsibility
+   alone. Under this section, the Commercial Contributor would have to
+   defend claims against the other Contributors related to those
+   performance claims and warranties, and if a court requires any other
+   Contributor to pay any damages as a result, the Commercial Contributor
+   must pay those damages.
+
+   5. NO WARRANTY
+
+   EXCEPT AS EXPRESSLY SET FORTH IN THIS AGREEMENT, THE PROGRAM IS
+   PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+   KIND, EITHER EXPRESS OR IMPLIED INCLUDING, WITHOUT LIMITATION, ANY
+   WARRANTIES OR CONDITIONS OF TITLE, NON-INFRINGEMENT, MERCHANTABILITY OR
+   FITNESS FOR A PARTICULAR PURPOSE. Each Recipient is solely responsible
+   for determining the appropriateness of using and distributing the
+   Program and assumes all risks associated with its exercise of rights
+   under this Agreement , including but not limited to the risks and costs
+   of program errors, compliance with applicable laws, damage to or loss
+   of data, programs or equipment, and unavailability or interruption of
+   operations.
+
+   6. DISCLAIMER OF LIABILITY
+
+   EXCEPT AS EXPRESSLY SET FORTH IN THIS AGREEMENT, NEITHER RECIPIENT NOR
+   ANY CONTRIBUTORS SHALL HAVE ANY LIABILITY FOR ANY DIRECT, INDIRECT,
+   INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING
+   WITHOUT LIMITATION LOST PROFITS), HOWEVER CAUSED AND ON ANY THEORY OF
+   LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+   NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OR
+   DISTRIBUTION OF THE PROGRAM OR THE EXERCISE OF ANY RIGHTS GRANTED
+   HEREUNDER, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGES.
+
+   7. GENERAL
+
+   If any provision of this Agreement is invalid or unenforceable under
+   applicable law, it shall not affect the validity or enforceability of
+   the remainder of the terms of this Agreement, and without further
+   action by the parties hereto, such provision shall be reformed to the
+   minimum extent necessary to make such provision valid and enforceable.
+
+   If Recipient institutes patent litigation against any entity (including
+   a cross-claim or counterclaim in a lawsuit) alleging that the Program
+   itself (excluding combinations of the Program with other software or
+   hardware) infringes such Recipient's patent(s), then such Recipient's
+   rights granted under Section 2(b) shall terminate as of the date such
+   litigation is filed.
+
+   All Recipient's rights under this Agreement shall terminate if it fails
+   to comply with any of the material terms or conditions of this
+   Agreement and does not cure such failure in a reasonable period of time
+   after becoming aware of such noncompliance. If all Recipient's rights
+   under this Agreement terminate, Recipient agrees to cease use and
+   distribution of the Program as soon as reasonably practicable. However,
+   Recipient's obligations under this Agreement and any licenses granted
+   by Recipient relating to the Program shall continue and survive.
+
+   Everyone is permitted to copy and distribute copies of this Agreement,
+   but in order to avoid inconsistency the Agreement is copyrighted and
+   may only be modified in the following manner. The Agreement Steward
+   reserves the right to publish new versions (including revisions) of
+   this Agreement from time to time. No one other than the Agreement
+   Steward has the right to modify this Agreement. The Eclipse Foundation
+   is the initial Agreement Steward. The Eclipse Foundation may assign the
+   responsibility to serve as the Agreement Steward to a suitable separate
+   entity. Each new version of the Agreement will be given a
+   distinguishing version number. The Program (including Contributions)
+   may always be distributed subject to the version of the Agreement under
+   which it was received. In addition, after a new version of the
+   Agreement is published, Contributor may elect to distribute the Program
+   (including its Contributions) under the new version. Except as
+   expressly stated in Sections 2(a) and 2(b) above, Recipient receives no
+   rights or licenses to the intellectual property of any Contributor
+   under this Agreement, whether expressly, by implication, estoppel or
+   otherwise. All rights in the Program not expressly granted under this
+   Agreement are reserved.
+
+   This Agreement is governed by the laws of the State of New York and the
+   intellectual property laws of the United States of America. No party to
+   this Agreement will bring a legal action under this Agreement more than
+   one year after the cause of action arose. Each party waives its rights
+   to a jury trial in any resulting litigation.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/340df43f/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java
index 0387060..86f78eb 100644
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@ -401,7 +401,7 @@ public final class CFMetaData
     {
         CFStatement parsed = (CFStatement)QueryProcessor.parseStatement(cql);
         parsed.prepareKeyspace(keyspace);
-        CreateTableStatement statement = (CreateTableStatement) parsed.prepare().statement;
+        CreateTableStatement statement = (CreateTableStatement) ((CreateTableStatement.RawStatement) parsed).prepare(Types.none()).statement;
 
         return statement.metadataBuilder()
                         .withId(generateLegacyCfId(keyspace, statement.columnFamily()))
@@ -721,16 +721,6 @@ public final class CFMetaData
     }
 
     /**
-     * Updates this object in place to match the definition in the system schema tables.
-     * @return true if any columns were added, removed, or altered; otherwise, false is returned
-     */
-    public boolean reload()
-    {
-        return apply(isView ? SchemaKeyspace.createViewFromName(ksName, cfName).metadata
-                            : SchemaKeyspace.createTableFromName(ksName, cfName));
-    }
-
-    /**
      * Updates CFMetaData in-place to match cfm
      *
      * @return true if any columns were added, removed, or altered; otherwise, false is returned

http://git-wip-us.apache.org/repos/asf/cassandra/blob/340df43f/src/java/org/apache/cassandra/config/Schema.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Schema.java b/src/java/org/apache/cassandra/config/Schema.java
index df4e984..117c5cd 100644
--- a/src/java/org/apache/cassandra/config/Schema.java
+++ b/src/java/org/apache/cassandra/config/Schema.java
@@ -122,7 +122,7 @@ public class Schema
      */
     public Schema loadFromDisk(boolean updateVersion)
     {
-        load(SchemaKeyspace.readSchemaFromSystemTables());
+        load(SchemaKeyspace.fetchNonSystemKeyspaces());
         if (updateVersion)
             updateVersion();
         return this;
@@ -135,7 +135,7 @@ public class Schema
      *
      * @return self to support chaining calls
      */
-    public Schema load(Collection<KeyspaceMetadata> keyspaceDefs)
+    public Schema load(Iterable<KeyspaceMetadata> keyspaceDefs)
     {
         keyspaceDefs.forEach(this::load);
         return this;
@@ -354,6 +354,16 @@ public class Schema
         return keyspaces.keySet();
     }
 
+    public Keyspaces getKeyspaces(Set<String> includedKeyspaceNames)
+    {
+        Keyspaces.Builder builder = Keyspaces.builder();
+        keyspaces.values()
+                 .stream()
+                 .filter(k -> includedKeyspaceNames.contains(k.name))
+                 .forEach(builder::add);
+        return builder.build();
+    }
+
     /**
      * Update (or insert) new keyspace definition
      *
@@ -611,15 +621,15 @@ public class Schema
         MigrationManager.instance.notifyCreateColumnFamily(cfm);
     }
 
-    public void updateTable(String ksName, String tableName)
+    public void updateTable(CFMetaData table)
     {
-        CFMetaData cfm = getCFMetaData(ksName, tableName);
-        assert cfm != null;
-        boolean columnsDidChange = cfm.reload();
+        CFMetaData current = getCFMetaData(table.ksName, table.cfName);
+        assert current != null;
+        boolean columnsDidChange = current.apply(table);
 
-        Keyspace keyspace = Keyspace.open(cfm.ksName);
-        keyspace.getColumnFamilyStore(cfm.cfName).reload();
-        MigrationManager.instance.notifyUpdateColumnFamily(cfm, columnsDidChange);
+        Keyspace keyspace = Keyspace.open(current.ksName);
+        keyspace.getColumnFamilyStore(current.cfName).reload();
+        MigrationManager.instance.notifyUpdateColumnFamily(current, columnsDidChange);
     }
 
     public void dropTable(String ksName, String tableName)
@@ -669,17 +679,15 @@ public class Schema
         MigrationManager.instance.notifyCreateView(view);
     }
 
-    public void updateView(String ksName, String viewName)
+    public void updateView(ViewDefinition view)
     {
-        Optional<ViewDefinition> optView = getKSMetaData(ksName).views.get(viewName);
-        assert optView.isPresent();
-        ViewDefinition view = optView.get();
-        boolean columnsDidChange = view.metadata.reload();
+        ViewDefinition current = getKSMetaData(view.ksName).views.get(view.viewName).get();
+        boolean columnsDidChange = current.metadata.apply(view.metadata);
 
-        Keyspace keyspace = Keyspace.open(view.ksName);
-        keyspace.getColumnFamilyStore(view.viewName).reload();
-        Keyspace.open(view.ksName).viewManager.update(view.viewName);
-        MigrationManager.instance.notifyUpdateView(view, columnsDidChange);
+        Keyspace keyspace = Keyspace.open(current.ksName);
+        keyspace.getColumnFamilyStore(current.viewName).reload();
+        Keyspace.open(current.ksName).viewManager.update(current.viewName);
+        MigrationManager.instance.notifyUpdateView(current, columnsDidChange);
     }
 
     public void dropView(String ksName, String viewName)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/340df43f/src/java/org/apache/cassandra/cql3/CQL3Type.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/CQL3Type.java b/src/java/org/apache/cassandra/cql3/CQL3Type.java
index 749b989..7f5afa6 100644
--- a/src/java/org/apache/cassandra/cql3/CQL3Type.java
+++ b/src/java/org/apache/cassandra/cql3/CQL3Type.java
@@ -17,19 +17,21 @@
  */
 package org.apache.cassandra.cql3;
 
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Set;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.exceptions.SyntaxException;
 import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.schema.Types;
 
 public interface CQL3Type
 {
@@ -45,21 +47,22 @@ public interface CQL3Type
         BLOB        (BytesType.instance),
         BOOLEAN     (BooleanType.instance),
         COUNTER     (CounterColumnType.instance),
+        DATE        (SimpleDateType.instance),
         DECIMAL     (DecimalType.instance),
         DOUBLE      (DoubleType.instance),
+        EMPTY       (EmptyType.instance),
         FLOAT       (FloatType.instance),
         INET        (InetAddressType.instance),
         INT         (Int32Type.instance),
         SMALLINT    (ShortType.instance),
         TEXT        (UTF8Type.instance),
+        TIME        (TimeType.instance),
         TIMESTAMP   (TimestampType.instance),
+        TIMEUUID    (TimeUUIDType.instance),
         TINYINT     (ByteType.instance),
         UUID        (UUIDType.instance),
         VARCHAR     (UTF8Type.instance),
-        VARINT      (IntegerType.instance),
-        TIMEUUID    (TimeUUIDType.instance),
-        DATE        (SimpleDateType.instance),
-        TIME        (TimeType.instance);
+        VARINT      (IntegerType.instance);
 
         private final AbstractType<?> type;
 
@@ -243,7 +246,7 @@ public interface CQL3Type
         @Override
         public String toString()
         {
-            return name;
+            return "frozen<" + ColumnIdentifier.maybeQuote(name) + '>';
         }
     }
 
@@ -291,14 +294,14 @@ public interface CQL3Type
         public String toString()
         {
             StringBuilder sb = new StringBuilder();
-            sb.append("tuple<");
+            sb.append("frozen<tuple<");
             for (int i = 0; i < type.size(); i++)
             {
                 if (i > 0)
                     sb.append(", ");
                 sb.append(type.type(i).asCQL3Type());
             }
-            sb.append(">");
+            sb.append(">>");
             return sb.toString();
         }
     }
@@ -342,7 +345,20 @@ public interface CQL3Type
             throw new InvalidRequestException(message);
         }
 
-        public abstract CQL3Type prepare(String keyspace) throws InvalidRequestException;
+        public CQL3Type prepare(String keyspace)
+        {
+            KeyspaceMetadata ksm = Schema.instance.getKSMetaData(keyspace);
+            if (ksm == null)
+                throw new ConfigurationException(String.format("Keyspace %s doesn't exist", keyspace));
+            return prepare(keyspace, ksm.types);
+        }
+
+        public abstract CQL3Type prepare(String keyspace, Types udts) throws InvalidRequestException;
+
+        public boolean referencesUserType(String name)
+        {
+            return false;
+        }
 
         public static Raw from(CQL3Type type)
         {
@@ -382,14 +398,14 @@ public interface CQL3Type
 
         private static class RawType extends Raw
         {
-            private CQL3Type type;
+            private final CQL3Type type;
 
             private RawType(CQL3Type type)
             {
                 this.type = type;
             }
 
-            public CQL3Type prepare(String keyspace) throws InvalidRequestException
+            public CQL3Type prepare(String keyspace, Types udts) throws InvalidRequestException
             {
                 return type;
             }
@@ -443,7 +459,7 @@ public interface CQL3Type
                 return true;
             }
 
-            public CQL3Type prepare(String keyspace) throws InvalidRequestException
+            public CQL3Type prepare(String keyspace, Types udts) throws InvalidRequestException
             {
                 assert values != null : "Got null values type for a collection";
 
@@ -461,16 +477,21 @@ public interface CQL3Type
                 switch (kind)
                 {
                     case LIST:
-                        return new Collection(ListType.getInstance(values.prepare(keyspace).getType(), !frozen));
+                        return new Collection(ListType.getInstance(values.prepare(keyspace, udts).getType(), !frozen));
                     case SET:
-                        return new Collection(SetType.getInstance(values.prepare(keyspace).getType(), !frozen));
+                        return new Collection(SetType.getInstance(values.prepare(keyspace, udts).getType(), !frozen));
                     case MAP:
                         assert keys != null : "Got null keys type for a collection";
-                        return new Collection(MapType.getInstance(keys.prepare(keyspace).getType(), values.prepare(keyspace).getType(), !frozen));
+                        return new Collection(MapType.getInstance(keys.prepare(keyspace, udts).getType(), values.prepare(keyspace, udts).getType(), !frozen));
                 }
                 throw new AssertionError();
             }
 
+            public boolean referencesUserType(String name)
+            {
+                return (keys != null && keys.referencesUserType(name)) || values.referencesUserType(name);
+            }
+
             @Override
             public String toString()
             {
@@ -510,13 +531,13 @@ public interface CQL3Type
                 return false;
             }
 
-            public CQL3Type prepare(String keyspace) throws InvalidRequestException
+            public CQL3Type prepare(String keyspace, Types udts) throws InvalidRequestException
             {
                 if (name.hasKeyspace())
                 {
                     // The provided keyspace is the one of the current statement this is part of. If it's different from the keyspace of
                     // the UTName, we reject since we want to limit user types to their own keyspace (see #6643)
-                    if (keyspace != null && !SystemKeyspace.NAME.equals(name.getKeyspace()) && !keyspace.equals(name.getKeyspace()))
+                    if (!keyspace.equals(name.getKeyspace()))
                         throw new InvalidRequestException(String.format("Statement on keyspace %s cannot refer to a user type in keyspace %s; "
                                                                         + "user types can only be used in the keyspace they are defined in",
                                                                         keyspace, name.getKeyspace()));
@@ -526,10 +547,7 @@ public interface CQL3Type
                     name.setKeyspace(keyspace);
                 }
 
-                KeyspaceMetadata ksm = Schema.instance.getKSMetaData(name.getKeyspace());
-                if (ksm == null)
-                    throw new InvalidRequestException("Unknown keyspace " + name.getKeyspace());
-                UserType type = ksm.types.getNullable(name.getUserTypeName());
+                UserType type = udts.getNullable(name.getUserTypeName());
                 if (type == null)
                     throw new InvalidRequestException("Unknown type " + name);
 
@@ -539,6 +557,11 @@ public interface CQL3Type
                 return new UserDefined(name.toString(), type);
             }
 
+            public boolean referencesUserType(String name)
+            {
+                return this.name.getStringTypeName().equals(name);
+            }
+
             protected boolean supportsFreezing()
             {
                 return true;
@@ -573,14 +596,13 @@ public interface CQL3Type
             public void freeze() throws InvalidRequestException
             {
                 for (CQL3Type.Raw t : types)
-                {
                     if (t.supportsFreezing())
                         t.freeze();
-                }
+
                 frozen = true;
             }
 
-            public CQL3Type prepare(String keyspace) throws InvalidRequestException
+            public CQL3Type prepare(String keyspace, Types udts) throws InvalidRequestException
             {
                 if (!frozen)
                     freeze();
@@ -591,11 +613,16 @@ public interface CQL3Type
                     if (t.isCounter())
                         throw new InvalidRequestException("Counters are not allowed inside tuples");
 
-                    ts.add(t.prepare(keyspace).getType());
+                    ts.add(t.prepare(keyspace, udts).getType());
                 }
                 return new Tuple(new TupleType(ts));
             }
 
+            public boolean referencesUserType(String name)
+            {
+                return types.stream().anyMatch(t -> t.referencesUserType(name));
+            }
+
             @Override
             public String toString()
             {

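As a minimal sketch (not part of the patch), the effect of the reworked Tuple/UserDefined toString() is that these CQL3Types now render as valid, frozen-wrapped CQL. The class below is illustrative only and assumes it runs inside the Cassandra tree:

    import java.util.Arrays;

    import org.apache.cassandra.db.marshal.AbstractType;
    import org.apache.cassandra.db.marshal.Int32Type;
    import org.apache.cassandra.db.marshal.TupleType;
    import org.apache.cassandra.db.marshal.UTF8Type;

    public class TupleToStringExample
    {
        public static void main(String[] args)
        {
            // With this patch the tuple's CQL3Type prints as frozen-wrapped, parseable CQL
            TupleType tuple = new TupleType(Arrays.<AbstractType<?>>asList(Int32Type.instance, UTF8Type.instance));
            System.out.println(tuple.asCQL3Type()); // frozen<tuple<int, text>> (previously tuple<int, text>)
        }
    }

The same applies to user types: UserDefined.toString() now emits frozen<name>, quoting the name via ColumnIdentifier.maybeQuote when it is case-sensitive.
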
http://git-wip-us.apache.org/repos/asf/cassandra/blob/340df43f/src/java/org/apache/cassandra/cql3/ColumnIdentifier.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/ColumnIdentifier.java b/src/java/org/apache/cassandra/cql3/ColumnIdentifier.java
index eb16f93..745e4f0 100644
--- a/src/java/org/apache/cassandra/cql3/ColumnIdentifier.java
+++ b/src/java/org/apache/cassandra/cql3/ColumnIdentifier.java
@@ -328,7 +328,7 @@ public class ColumnIdentifier extends org.apache.cassandra.cql3.selection.Select
         }
     }
 
-    private static String maybeQuote(String text)
+    static String maybeQuote(String text)
     {
         if (UNQUOTED_IDENTIFIER.matcher(text).matches())
             return text;

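The widened visibility exists so that CQL3Type.UserDefined.toString() above can quote case-sensitive type names. Roughly, and assuming the existing UNQUOTED_IDENTIFIER pattern (bare lowercase identifiers are left untouched); the example class has to live in the same package since the method stays package-private:

    package org.apache.cassandra.cql3;

    public class MaybeQuoteExample
    {
        public static void main(String[] args)
        {
            System.out.println(ColumnIdentifier.maybeQuote("address"));     // address
            System.out.println(ColumnIdentifier.maybeQuote("HomeAddress")); // "HomeAddress"
        }
    }
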
http://git-wip-us.apache.org/repos/asf/cassandra/blob/340df43f/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java
index 8e1d6c6..a1947df 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java
@@ -30,7 +30,9 @@ import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.exceptions.*;
+import org.apache.cassandra.schema.KeyspaceMetadata;
 import org.apache.cassandra.schema.TableParams;
+import org.apache.cassandra.schema.Types;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.MigrationManager;
 import org.apache.cassandra.service.QueryState;
@@ -193,6 +195,14 @@ public class CreateTableStatement extends SchemaAlteringStatement
          */
         public ParsedStatement.Prepared prepare() throws RequestValidationException
         {
+            KeyspaceMetadata ksm = Schema.instance.getKSMetaData(keyspace());
+            if (ksm == null)
+                throw new ConfigurationException(String.format("Keyspace %s doesn't exist", keyspace()));
+            return prepare(ksm.types);
+        }
+
+        public ParsedStatement.Prepared prepare(Types udts) throws RequestValidationException
+        {
             // Column family name
             if (!columnFamily().matches("\\w+"))
                 throw new InvalidRequestException(String.format("\"%s\" is not a valid table name (must be alphanumeric character or underscore only: [a-zA-Z_0-9]+)", columnFamily()));
@@ -212,7 +222,7 @@ public class CreateTableStatement extends SchemaAlteringStatement
             for (Map.Entry<ColumnIdentifier, CQL3Type.Raw> entry : definitions.entrySet())
             {
                 ColumnIdentifier id = entry.getKey();
-                CQL3Type pt = entry.getValue().prepare(keyspace());
+                CQL3Type pt = entry.getValue().prepare(keyspace(), udts);
                 if (pt.isCollection() && ((CollectionType)pt.getType()).isMultiCell())
                     stmt.collections.put(id.bytes, (CollectionType)pt.getType());
                 if (entry.getValue().isCounter())

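The new prepare(Types) overload lets callers that do not have a live Schema (offline tools, schema loading) hand the keyspace's user types to column preparation directly. A hypothetical sketch; the UDT and table below are made up for illustration:

    import java.util.Arrays;

    import org.apache.cassandra.cql3.QueryProcessor;
    import org.apache.cassandra.cql3.statements.CFStatement;
    import org.apache.cassandra.cql3.statements.CreateTableStatement;
    import org.apache.cassandra.db.marshal.AbstractType;
    import org.apache.cassandra.db.marshal.UTF8Type;
    import org.apache.cassandra.db.marshal.UserType;
    import org.apache.cassandra.schema.Types;
    import org.apache.cassandra.utils.ByteBufferUtil;

    public class PrepareWithTypesExample
    {
        public static void main(String[] args)
        {
            // A hand-built UDT standing in for one normally loaded from the schema tables
            UserType address = new UserType("ks",
                                            ByteBufferUtil.bytes("address"),
                                            Arrays.asList(ByteBufferUtil.bytes("street"), ByteBufferUtil.bytes("city")),
                                            Arrays.<AbstractType<?>>asList(UTF8Type.instance, UTF8Type.instance));

            // The UDT reference in the column definition is resolved from the supplied Types,
            // not from Schema.instance
            CFStatement parsed = (CFStatement) QueryProcessor.parseStatement(
                "CREATE TABLE ks.users (id int PRIMARY KEY, home frozen<address>)");
            CreateTableStatement stmt = (CreateTableStatement)
                ((CreateTableStatement.RawStatement) parsed).prepare(Types.of(address)).statement;
        }
    }
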
http://git-wip-us.apache.org/repos/asf/cassandra/blob/340df43f/src/java/org/apache/cassandra/db/DefinitionsUpdateVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DefinitionsUpdateVerbHandler.java b/src/java/org/apache/cassandra/db/DefinitionsUpdateVerbHandler.java
index b849f95..8b3e121 100644
--- a/src/java/org/apache/cassandra/db/DefinitionsUpdateVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/DefinitionsUpdateVerbHandler.java
@@ -24,6 +24,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.schema.SchemaKeyspace;
@@ -45,7 +46,7 @@ public class DefinitionsUpdateVerbHandler implements IVerbHandler<Collection<Mut
 
         StageManager.getStage(Stage.MIGRATION).submit(new WrappedRunnable()
         {
-            public void runMayThrow() throws Exception
+            public void runMayThrow() throws ConfigurationException
             {
                 SchemaKeyspace.mergeSchemaAndAnnounceVersion(message.payload);
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/340df43f/src/java/org/apache/cassandra/db/Keyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java
index 293f8a3..eb28b2c 100644
--- a/src/java/org/apache/cassandra/db/Keyspace.java
+++ b/src/java/org/apache/cassandra/db/Keyspace.java
@@ -377,7 +377,6 @@ public class Keyspace
             // re-initializing an existing CF.  This will happen if you cleared the schema
             // on this node and it's getting repopulated from the rest of the cluster.
             assert cfs.name.equals(cfName);
-            cfs.metadata.reload();
             cfs.reload();
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/340df43f/src/java/org/apache/cassandra/db/marshal/EmptyType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/EmptyType.java b/src/java/org/apache/cassandra/db/marshal/EmptyType.java
index 9cd7226..c653084 100644
--- a/src/java/org/apache/cassandra/db/marshal/EmptyType.java
+++ b/src/java/org/apache/cassandra/db/marshal/EmptyType.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.db.marshal;
 
 import java.nio.ByteBuffer;
 
+import org.apache.cassandra.cql3.CQL3Type;
 import org.apache.cassandra.cql3.Constants;
 import org.apache.cassandra.cql3.Term;
 import org.apache.cassandra.serializers.TypeSerializer;
@@ -65,6 +66,12 @@ public class EmptyType extends AbstractType<Void>
         return new Constants.Value(ByteBufferUtil.EMPTY_BYTE_BUFFER);
     }
 
+    @Override
+    public CQL3Type asCQL3Type()
+    {
+        return CQL3Type.Native.EMPTY;
+    }
+
     public TypeSerializer<Void> getSerializer()
     {
         return EmptySerializer.instance;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/340df43f/src/java/org/apache/cassandra/db/marshal/TypeParser.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/TypeParser.java b/src/java/org/apache/cassandra/db/marshal/TypeParser.java
index faa678e..35d15ab 100644
--- a/src/java/org/apache/cassandra/db/marshal/TypeParser.java
+++ b/src/java/org/apache/cassandra/db/marshal/TypeParser.java
@@ -23,7 +23,6 @@ import java.lang.reflect.Method;
 import java.nio.ByteBuffer;
 import java.util.*;
 
-import org.apache.cassandra.cql3.CQL3Type;
 import org.apache.cassandra.exceptions.*;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
@@ -94,50 +93,6 @@ public class TypeParser
         return parse(compareWith == null ? null : compareWith.toString());
     }
 
-    public static String parseCqlNativeType(String str)
-    {
-        return CQL3Type.Native.valueOf(str.trim().toUpperCase(Locale.ENGLISH)).getType().toString();
-    }
-
-    public static String parseCqlCollectionOrFrozenType(String str) throws SyntaxException
-    {
-        str = str.trim().toLowerCase();
-        switch (str)
-        {
-            case "map": return "MapType";
-            case "set": return "SetType";
-            case "list": return "ListType";
-            case "frozen": return "FrozenType";
-            default: throw new SyntaxException("Invalid type name" + str);
-        }
-    }
-
-    /**
-     * Turns user facing type names into Abstract Types, 'text' -> UTF8Type
-     */
-    public static AbstractType<?> parseCqlName(String str) throws SyntaxException, ConfigurationException
-    {
-        return parse(parseCqlNameRecurse(str));
-    }
-
-    private static String parseCqlNameRecurse(String str) throws SyntaxException
-    {
-        if (str.indexOf(',') >= 0 && (!str.contains("<") || (str.indexOf(',') < str.indexOf('<'))))
-        {
-            String[] parseString = str.split(",", 2);
-            return parseCqlNameRecurse(parseString[0]) + "," + parseCqlNameRecurse(parseString[1]);
-        }
-        else if (str.contains("<"))
-        {
-            String[] parseString = str.trim().split("<", 2);
-            return parseCqlCollectionOrFrozenType(parseString[0]) + "(" + parseCqlNameRecurse(parseString[1].substring(0, parseString[1].length()-1)) + ")";
-        }
-        else
-        {
-            return parseCqlNativeType(str);
-        }
-    }
-
     /**
      * Parse an AbstractType from current position of this parser.
      */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/340df43f/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
index 70380f4..0006a81 100644
--- a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
@@ -25,6 +25,7 @@ import java.util.*;
 
 import org.apache.cassandra.config.*;
 import org.apache.cassandra.cql3.*;
+import org.apache.cassandra.cql3.statements.CFStatement;
 import org.apache.cassandra.cql3.statements.CreateTableStatement;
 import org.apache.cassandra.cql3.statements.ParsedStatement;
 import org.apache.cassandra.cql3.statements.UpdateStatement;
@@ -40,6 +41,7 @@ import org.apache.cassandra.io.sstable.format.SSTableFormat;
 import org.apache.cassandra.schema.KeyspaceMetadata;
 import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.schema.Tables;
+import org.apache.cassandra.schema.Types;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.utils.Pair;
 
@@ -348,7 +350,7 @@ public class CQLSSTableWriter implements Closeable
             {
                 synchronized (CQLSSTableWriter.class)
                 {
-                    this.schema = getStatement(schema, CreateTableStatement.class, "CREATE TABLE").left.getCFMetaData();
+                    this.schema = getTableMetadata(schema);
 
                     // We need to register the keyspace/table metadata through Schema, otherwise we won't be able to properly
                     // build the insert statement in using().
@@ -482,6 +484,16 @@ public class CQLSSTableWriter implements Closeable
             return this;
         }
 
+        private static CFMetaData getTableMetadata(String schema)
+        {
+            CFStatement parsed = (CFStatement)QueryProcessor.parseStatement(schema);
+            // tables with UDTs are currently not supported by CQLSSTableWriter, so we just use Types.none() for now
+            // see CASSANDRA-10624 for more details
+            CreateTableStatement statement = (CreateTableStatement) ((CreateTableStatement.RawStatement) parsed).prepare(Types.none()).statement;
+            statement.validate(ClientState.forInternalCalls());
+            return statement.getCFMetaData();
+        }
+
         private static <T extends CQLStatement> Pair<T, List<ColumnSpecification>> getStatement(String query, Class<T> klass, String type)
         {
             try

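For context, the public builder flow that getTableMetadata() serves looks roughly like the sketch below (directory, keyspace, and table are illustrative); per the comment above, tables that use UDTs are not supported on this path yet (CASSANDRA-10624):

    import java.io.File;

    import org.apache.cassandra.io.sstable.CQLSSTableWriter;

    public class OfflineWriterExample
    {
        public static void main(String[] args) throws Exception
        {
            String schema = "CREATE TABLE ks.users (id int PRIMARY KEY, name text)";
            String insert = "INSERT INTO ks.users (id, name) VALUES (?, ?)";

            CQLSSTableWriter writer = CQLSSTableWriter.builder()
                                                      .inDirectory(new File("/tmp/ks/users"))
                                                      .forTable(schema)
                                                      .using(insert)
                                                      .build();

            writer.addRow(1, "alice");
            writer.close();
        }
    }
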
http://git-wip-us.apache.org/repos/asf/cassandra/blob/340df43f/src/java/org/apache/cassandra/schema/CQLTypeParser.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/CQLTypeParser.java b/src/java/org/apache/cassandra/schema/CQLTypeParser.java
new file mode 100644
index 0000000..87eebd7
--- /dev/null
+++ b/src/java/org/apache/cassandra/schema/CQLTypeParser.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.schema;
+
+import com.google.common.collect.ImmutableSet;
+
+import org.antlr.runtime.*;
+import org.apache.cassandra.cql3.*;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.UserType;
+import org.apache.cassandra.exceptions.SyntaxException;
+
+import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
+
+public final class CQLTypeParser
+{
+    private static final ImmutableSet<String> PRIMITIVE_TYPES;
+
+    static
+    {
+        ImmutableSet.Builder<String> builder = ImmutableSet.builder();
+        for (CQL3Type.Native primitive : CQL3Type.Native.values())
+            builder.add(primitive.name().toLowerCase());
+        PRIMITIVE_TYPES = builder.build();
+    }
+
+    public static AbstractType<?> parse(String keyspace, String unparsed, Types userTypes)
+    {
+        String lowercased = unparsed.toLowerCase();
+
+        // fast path for the common case of a primitive type
+        if (PRIMITIVE_TYPES.contains(lowercased))
+            return CQL3Type.Native.valueOf(unparsed.toUpperCase()).getType();
+
+        // special-case top-level UDTs
+        UserType udt = userTypes.getNullable(bytes(lowercased));
+        if (udt != null)
+            return udt;
+
+        return parseRaw(unparsed).prepare(keyspace, userTypes).getType();
+    }
+
+    static CQL3Type.Raw parseRaw(String type)
+    {
+        try
+        {
+            // Lexer and parser
+            ErrorCollector errorCollector = new ErrorCollector(type);
+            CharStream stream = new ANTLRStringStream(type);
+            CqlLexer lexer = new CqlLexer(stream);
+            lexer.addErrorListener(errorCollector);
+
+            TokenStream tokenStream = new CommonTokenStream(lexer);
+            CqlParser parser = new CqlParser(tokenStream);
+            parser.addErrorListener(errorCollector);
+
+            // Parse the type string into a raw CQL3 type
+            CQL3Type.Raw rawType = parser.comparatorType();
+
+            // The errorCollector has queued up any errors that the lexer and parser may have encountered
+            // along the way; if necessary, we turn the first error into an exception here.
+            errorCollector.throwFirstSyntaxError();
+
+            return rawType;
+        }
+        catch (RuntimeException re)
+        {
+            throw new SyntaxException(String.format("Failed parsing statement: [%s] reason: %s %s",
+                                                    type,
+                                                    re.getClass().getSimpleName(),
+                                                    re.getMessage()));
+        }
+        catch (RecognitionException e)
+        {
+            throw new SyntaxException("Invalid or malformed CQL type: " + e.getMessage());
+        }
+    }
+}

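A quick usage sketch (not in the patch) of the new entry point; the keyspace name is illustrative and no UDTs are involved, so Types.none() is enough:

    import org.apache.cassandra.db.marshal.AbstractType;
    import org.apache.cassandra.schema.CQLTypeParser;
    import org.apache.cassandra.schema.Types;

    public class CQLTypeParserExample
    {
        public static void main(String[] args)
        {
            // Fast path: a bare primitive name never touches the ANTLR parser
            AbstractType<?> text = CQLTypeParser.parse("ks", "text", Types.none());

            // Everything else goes through CqlParser.comparatorType() and Raw.prepare(keyspace, udts)
            AbstractType<?> map = CQLTypeParser.parse("ks", "map<uuid, frozen<list<int>>>", Types.none());

            System.out.println(text.asCQL3Type()); // text
            System.out.println(map.asCQL3Type());  // roughly: map<uuid, frozen<list<int>>>
        }
    }
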
http://git-wip-us.apache.org/repos/asf/cassandra/blob/340df43f/src/java/org/apache/cassandra/schema/Keyspaces.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/Keyspaces.java b/src/java/org/apache/cassandra/schema/Keyspaces.java
new file mode 100644
index 0000000..8c0a63e
--- /dev/null
+++ b/src/java/org/apache/cassandra/schema/Keyspaces.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.schema;
+
+import java.util.Iterator;
+import java.util.function.Predicate;
+import java.util.stream.Stream;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.MapDifference;
+import com.google.common.collect.Maps;
+
+public final class Keyspaces implements Iterable<KeyspaceMetadata>
+{
+    private final ImmutableMap<String, KeyspaceMetadata> keyspaces;
+
+    private Keyspaces(Builder builder)
+    {
+        keyspaces = builder.keyspaces.build();
+    }
+
+    public static Builder builder()
+    {
+        return new Builder();
+    }
+
+    public static Keyspaces none()
+    {
+        return builder().build();
+    }
+
+    public static Keyspaces of(KeyspaceMetadata... keyspaces)
+    {
+        return builder().add(keyspaces).build();
+    }
+
+    public Iterator<KeyspaceMetadata> iterator()
+    {
+        return keyspaces.values().iterator();
+    }
+
+    public Stream<KeyspaceMetadata> stream()
+    {
+        return keyspaces.values().stream();
+    }
+
+    public Keyspaces filter(Predicate<KeyspaceMetadata> predicate)
+    {
+        Builder builder = builder();
+        stream().filter(predicate).forEach(builder::add);
+        return builder.build();
+    }
+
+    MapDifference<String, KeyspaceMetadata> diff(Keyspaces other)
+    {
+        return Maps.difference(keyspaces, other.keyspaces);
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        return this == o || (o instanceof Keyspaces && keyspaces.equals(((Keyspaces) o).keyspaces));
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return keyspaces.hashCode();
+    }
+
+    @Override
+    public String toString()
+    {
+        return keyspaces.values().toString();
+    }
+
+    public static final class Builder
+    {
+        private final ImmutableMap.Builder<String, KeyspaceMetadata> keyspaces = new ImmutableMap.Builder<>();
+
+        private Builder()
+        {
+        }
+
+        public Keyspaces build()
+        {
+            return new Keyspaces(this);
+        }
+
+        public Builder add(KeyspaceMetadata keyspace)
+        {
+            keyspaces.put(keyspace.name, keyspace);
+            return this;
+        }
+
+        public Builder add(KeyspaceMetadata... keyspaces)
+        {
+            for (KeyspaceMetadata keyspace : keyspaces)
+                add(keyspace);
+            return this;
+        }
+
+        public Builder add(Iterable<KeyspaceMetadata> keyspaces)
+        {
+            keyspaces.forEach(this::add);
+            return this;
+        }
+    }
+}

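A small usage sketch of the new container; the keyspace definitions are made up, and the KeyspaceParams.simple(rf) factory is assumed to be available:

    import org.apache.cassandra.schema.KeyspaceMetadata;
    import org.apache.cassandra.schema.KeyspaceParams;
    import org.apache.cassandra.schema.Keyspaces;

    public class KeyspacesExample
    {
        public static void main(String[] args)
        {
            KeyspaceMetadata ks1 = KeyspaceMetadata.create("ks1", KeyspaceParams.simple(1));
            KeyspaceMetadata ks2 = KeyspaceMetadata.create("ks2", KeyspaceParams.simple(3));

            // Immutable, name-keyed container with stream/filter/diff helpers
            Keyspaces all = Keyspaces.of(ks1, ks2);
            Keyspaces justKs1 = all.filter(ksm -> "ks1".equals(ksm.name));

            System.out.println(all);
            System.out.println(justKs1);
        }
    }
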

[2/4] cassandra git commit: Use CQL type names in schema metadata tables

Posted by al...@apache.org.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/340df43f/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
index 4ff8a23..e4e50a0 100644
--- a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
+++ b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
@@ -17,14 +17,12 @@
  */
 package org.apache.cassandra.schema;
 
-import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.charset.CharacterCodingException;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.util.*;
 import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import com.google.common.collect.ImmutableList;
@@ -47,9 +45,14 @@ import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.concurrent.OpOrder;
+import org.apache.cassandra.utils.Pair;
 
+import static java.lang.String.format;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.cassandra.cql3.QueryProcessor.executeInternal;
 import static org.apache.cassandra.cql3.QueryProcessor.executeOnceInternal;
+import static org.apache.cassandra.schema.CQLTypeParser.parse;
 
 /**
  * system_schema.* tables and methods for manipulating them.
@@ -77,7 +80,6 @@ public final class SchemaKeyspace
     public static final String AGGREGATES = "aggregates";
     public static final String INDEXES = "indexes";
 
-
     public static final List<String> ALL =
         ImmutableList.of(KEYSPACES, TABLES, COLUMNS, TRIGGERS, VIEWS, TYPES, FUNCTIONS, AGGREGATES, INDEXES);
 
@@ -205,14 +207,13 @@ public final class SchemaKeyspace
                 "CREATE TABLE %s ("
                 + "keyspace_name text,"
                 + "function_name text,"
-                + "signature frozen<list<text>>,"
-                + "argument_names frozen<list<text>>,"
                 + "argument_types frozen<list<text>>,"
+                + "argument_names frozen<list<text>>,"
                 + "body text,"
                 + "language text,"
                 + "return_type text,"
                 + "called_on_null_input boolean,"
-                + "PRIMARY KEY ((keyspace_name), function_name, signature))");
+                + "PRIMARY KEY ((keyspace_name), function_name, argument_types))");
 
     private static final CFMetaData Aggregates =
         compile(AGGREGATES,
@@ -220,14 +221,13 @@ public final class SchemaKeyspace
                 "CREATE TABLE %s ("
                 + "keyspace_name text,"
                 + "aggregate_name text,"
-                + "signature frozen<list<text>>,"
                 + "argument_types frozen<list<text>>,"
                 + "final_func text,"
                 + "initcond blob,"
                 + "return_type text,"
                 + "state_func text,"
                 + "state_type text,"
-                + "PRIMARY KEY ((keyspace_name), aggregate_name, signature))");
+                + "PRIMARY KEY ((keyspace_name), aggregate_name, argument_types))");
 
     public static final List<CFMetaData> ALL_TABLE_METADATA =
         ImmutableList.of(Keyspaces, Tables, Columns, Triggers, DroppedColumns, Views, Types, Functions, Aggregates, Indexes);
@@ -267,35 +267,6 @@ public final class SchemaKeyspace
         makeCreateKeyspaceMutation(schema, timestamp + 1).apply();
     }
 
-    public static List<KeyspaceMetadata> readSchemaFromSystemTables()
-    {
-        ReadCommand cmd = getReadCommandForTableSchema(KEYSPACES);
-        try (ReadOrderGroup orderGroup = cmd.startOrderGroup(); PartitionIterator schema = cmd.executeInternal(orderGroup))
-        {
-            List<KeyspaceMetadata> keyspaces = new ArrayList<>();
-
-            while (schema.hasNext())
-            {
-                try (RowIterator partition = schema.next())
-                {
-                    if (isSystemKeyspaceSchemaPartition(partition.partitionKey()))
-                        continue;
-
-                    DecoratedKey key = partition.partitionKey();
-
-                    readSchemaPartitionForKeyspaceAndApply(TYPES, key,
-                        types -> readSchemaPartitionForKeyspaceAndApply(TABLES, key,
-                        tables -> readSchemaPartitionForKeyspaceAndApply(VIEWS, key,
-                        views -> readSchemaPartitionForKeyspaceAndApply(FUNCTIONS, key,
-                        functions -> readSchemaPartitionForKeyspaceAndApply(AGGREGATES, key,
-                        aggregates -> keyspaces.add(createKeyspaceFromSchemaPartitions(partition, tables, views, types, functions, aggregates))))))
-                    );
-                }
-            }
-            return keyspaces;
-        }
-    }
-
     public static void truncate()
     {
         ALL.forEach(table -> getSchemaCFS(table).truncateBlocking());
@@ -397,336 +368,18 @@ public final class SchemaKeyspace
         }
     }
 
-    private static Map<DecoratedKey, FilteredPartition> readSchemaForKeyspaces(String schemaTableName, Set<String> keyspaceNames)
-    {
-        Map<DecoratedKey, FilteredPartition> schema = new HashMap<>();
-
-        for (String keyspaceName : keyspaceNames)
-        {
-            // We don't to return the RowIterator directly because we should guarantee that this iterator
-            // will be closed, and putting it in a Map make that harder/more awkward.
-            readSchemaPartitionForKeyspaceAndApply(schemaTableName, keyspaceName,
-                partition -> {
-                    if (!partition.isEmpty())
-                        schema.put(partition.partitionKey(), FilteredPartition.create(partition));
-                    return null;
-                }
-            );
-        }
-
-        return schema;
-    }
-
     private static ByteBuffer getSchemaKSKey(String ksName)
     {
         return AsciiType.instance.fromString(ksName);
     }
 
-    private static <T> T readSchemaPartitionForKeyspaceAndApply(String schemaTableName, String keyspaceName, Function<RowIterator, T> fct)
-    {
-        return readSchemaPartitionForKeyspaceAndApply(schemaTableName, getSchemaKSKey(keyspaceName), fct);
-    }
-
-    private static <T> T readSchemaPartitionForKeyspaceAndApply(String schemaTableName, ByteBuffer keyspaceKey, Function<RowIterator, T> fct)
-    {
-        ColumnFamilyStore store = getSchemaCFS(schemaTableName);
-        return readSchemaPartitionForKeyspaceAndApply(store, store.decorateKey(keyspaceKey), fct);
-    }
-
-    private static <T> T readSchemaPartitionForKeyspaceAndApply(String schemaTableName, DecoratedKey keyspaceKey, Function<RowIterator, T> fct)
-    {
-        return readSchemaPartitionForKeyspaceAndApply(getSchemaCFS(schemaTableName), keyspaceKey, fct);
-    }
-
-    private static <T> T readSchemaPartitionForKeyspaceAndApply(ColumnFamilyStore store, DecoratedKey keyspaceKey, Function<RowIterator, T> fct)
-    {
-        int nowInSec = FBUtilities.nowInSeconds();
-        try (OpOrder.Group op = store.readOrdering.start();
-             RowIterator partition = UnfilteredRowIterators.filter(SinglePartitionReadCommand.fullPartitionRead(store.metadata, nowInSec, keyspaceKey)
-                                                                                             .queryMemtableAndDisk(store, op), nowInSec))
-        {
-            return fct.apply(partition);
-        }
-    }
-
-    private static <T> T readSchemaPartitionForTableAndApply(String schemaTableName, String keyspaceName, String tableName, Function<RowIterator, T> fct)
-    {
-        ColumnFamilyStore store = getSchemaCFS(schemaTableName);
-
-        ClusteringComparator comparator = store.metadata.comparator;
-        Slices slices = Slices.with(comparator, Slice.make(comparator, tableName));
-        int nowInSec = FBUtilities.nowInSeconds();
-        try (OpOrder.Group op = store.readOrdering.start();
-             RowIterator partition =  UnfilteredRowIterators.filter(SinglePartitionReadCommand.create(store.metadata, nowInSec, getSchemaKSKey(keyspaceName), slices)
-                                                                                              .queryMemtableAndDisk(store, op), nowInSec))
-        {
-            return fct.apply(partition);
-        }
-    }
-
     private static boolean isSystemKeyspaceSchemaPartition(DecoratedKey partitionKey)
     {
         return Schema.isSystemKeyspace(UTF8Type.instance.compose(partitionKey.getKey()));
     }
 
-    /**
-     * Merge remote schema in form of mutations with local and mutate ks/cf metadata objects
-     * (which also involves fs operations on add/drop ks/cf)
-     *
-     * @param mutations the schema changes to apply
-     *
-     * @throws ConfigurationException If one of metadata attributes has invalid value
-     * @throws IOException If data was corrupted during transportation or failed to apply fs operations
-     */
-    public static synchronized void mergeSchemaAndAnnounceVersion(Collection<Mutation> mutations) throws ConfigurationException, IOException
-    {
-        mergeSchema(mutations);
-        Schema.instance.updateVersionAndAnnounce();
-    }
-
-    public static synchronized void mergeSchema(Collection<Mutation> mutations) throws IOException
-    {
-        // compare before/after schemas of the affected keyspaces only
-        Set<String> keyspaces = new HashSet<>(mutations.size());
-        for (Mutation mutation : mutations)
-            keyspaces.add(ByteBufferUtil.string(mutation.key().getKey()));
-
-        // current state of the schema
-        Map<DecoratedKey, FilteredPartition> oldKeyspaces = readSchemaForKeyspaces(KEYSPACES, keyspaces);
-        Map<DecoratedKey, FilteredPartition> oldColumnFamilies = readSchemaForKeyspaces(TABLES, keyspaces);
-        Map<DecoratedKey, FilteredPartition> oldViews = readSchemaForKeyspaces(VIEWS, keyspaces);
-        Map<DecoratedKey, FilteredPartition> oldTypes = readSchemaForKeyspaces(TYPES, keyspaces);
-        Map<DecoratedKey, FilteredPartition> oldFunctions = readSchemaForKeyspaces(FUNCTIONS, keyspaces);
-        Map<DecoratedKey, FilteredPartition> oldAggregates = readSchemaForKeyspaces(AGGREGATES, keyspaces);
-
-        mutations.forEach(Mutation::apply);
-
-        if (FLUSH_SCHEMA_TABLES)
-            flush();
-
-        // with new data applied
-        Map<DecoratedKey, FilteredPartition> newKeyspaces = readSchemaForKeyspaces(KEYSPACES, keyspaces);
-        Map<DecoratedKey, FilteredPartition> newColumnFamilies = readSchemaForKeyspaces(TABLES, keyspaces);
-        Map<DecoratedKey, FilteredPartition> newViews = readSchemaForKeyspaces(VIEWS, keyspaces);
-        Map<DecoratedKey, FilteredPartition> newTypes = readSchemaForKeyspaces(TYPES, keyspaces);
-        Map<DecoratedKey, FilteredPartition> newFunctions = readSchemaForKeyspaces(FUNCTIONS, keyspaces);
-        Map<DecoratedKey, FilteredPartition> newAggregates = readSchemaForKeyspaces(AGGREGATES, keyspaces);
-
-        Set<String> keyspacesToDrop = mergeKeyspaces(oldKeyspaces, newKeyspaces);
-        mergeTables(oldColumnFamilies, newColumnFamilies);
-        mergeViews(oldViews, newViews);
-        mergeTypes(oldTypes, newTypes);
-        mergeFunctions(oldFunctions, newFunctions);
-        mergeAggregates(oldAggregates, newAggregates);
-
-        // it is safe to drop a keyspace only when all nested ColumnFamilies where deleted
-        keyspacesToDrop.forEach(Schema.instance::dropKeyspace);
-    }
-
-    private static Set<String> mergeKeyspaces(Map<DecoratedKey, FilteredPartition> before, Map<DecoratedKey, FilteredPartition> after)
-    {
-        for (FilteredPartition newPartition : after.values())
-        {
-            String name = AsciiType.instance.compose(newPartition.partitionKey().getKey());
-            KeyspaceParams params = createKeyspaceParamsFromSchemaPartition(newPartition.rowIterator());
-
-            FilteredPartition oldPartition = before.remove(newPartition.partitionKey());
-            if (oldPartition == null || oldPartition.isEmpty())
-                Schema.instance.addKeyspace(KeyspaceMetadata.create(name, params));
-            else
-                Schema.instance.updateKeyspace(name, params);
-        }
-
-        // What's remain in old is those keyspace that are not in updated, i.e. the dropped ones.
-        return asKeyspaceNamesSet(before.keySet());
-    }
-
-    private static Set<String> asKeyspaceNamesSet(Set<DecoratedKey> keys)
-    {
-        Set<String> names = new HashSet<>(keys.size());
-        for (DecoratedKey key : keys)
-            names.add(AsciiType.instance.compose(key.getKey()));
-        return names;
-    }
-
-    private static void mergeTables(Map<DecoratedKey, FilteredPartition> before, Map<DecoratedKey, FilteredPartition> after)
-    {
-        diffSchema(before, after, new Differ()
-        {
-            public void onDropped(UntypedResultSet.Row oldRow)
-            {
-                Schema.instance.dropTable(oldRow.getString("keyspace_name"), oldRow.getString("table_name"));
-            }
-
-            public void onAdded(UntypedResultSet.Row newRow)
-            {
-                Schema.instance.addTable(createTableFromTableRow(newRow));
-            }
-
-            public void onUpdated(UntypedResultSet.Row oldRow, UntypedResultSet.Row newRow)
-            {
-                Schema.instance.updateTable(newRow.getString("keyspace_name"), newRow.getString("table_name"));
-            }
-        });
-    }
-
-    private static void mergeViews(Map<DecoratedKey, FilteredPartition> before, Map<DecoratedKey, FilteredPartition> after)
-    {
-        diffSchema(before, after, new Differ()
-        {
-            public void onDropped(UntypedResultSet.Row oldRow)
-            {
-                Schema.instance.dropView(oldRow.getString("keyspace_name"), oldRow.getString("view_name"));
-            }
-
-            public void onAdded(UntypedResultSet.Row newRow)
-            {
-                Schema.instance.addView(createViewFromViewRow(newRow));
-            }
-
-            public void onUpdated(UntypedResultSet.Row oldRow, UntypedResultSet.Row newRow)
-            {
-                Schema.instance.updateView(newRow.getString("keyspace_name"), newRow.getString("view_name"));
-            }
-        });
-    }
-
-    private static void mergeTypes(Map<DecoratedKey, FilteredPartition> before, Map<DecoratedKey, FilteredPartition> after)
-    {
-        diffSchema(before, after, new Differ()
-        {
-            public void onDropped(UntypedResultSet.Row oldRow)
-            {
-                Schema.instance.dropType(createTypeFromRow(oldRow));
-            }
-
-            public void onAdded(UntypedResultSet.Row newRow)
-            {
-                Schema.instance.addType(createTypeFromRow(newRow));
-            }
-
-            public void onUpdated(UntypedResultSet.Row oldRow, UntypedResultSet.Row newRow)
-            {
-                Schema.instance.updateType(createTypeFromRow(newRow));
-            }
-        });
-    }
-
-    private static void mergeFunctions(Map<DecoratedKey, FilteredPartition> before, Map<DecoratedKey, FilteredPartition> after)
-    {
-        diffSchema(before, after, new Differ()
-        {
-            public void onDropped(UntypedResultSet.Row oldRow)
-            {
-                Schema.instance.dropFunction(createFunctionFromFunctionRow(oldRow));
-            }
-
-            public void onAdded(UntypedResultSet.Row newRow)
-            {
-                Schema.instance.addFunction(createFunctionFromFunctionRow(newRow));
-            }
-
-            public void onUpdated(UntypedResultSet.Row oldRow, UntypedResultSet.Row newRow)
-            {
-                Schema.instance.updateFunction(createFunctionFromFunctionRow(newRow));
-            }
-        });
-    }
-
-    private static void mergeAggregates(Map<DecoratedKey, FilteredPartition> before, Map<DecoratedKey, FilteredPartition> after)
-    {
-        diffSchema(before, after, new Differ()
-        {
-            public void onDropped(UntypedResultSet.Row oldRow)
-            {
-                Schema.instance.dropAggregate(createAggregateFromAggregateRow(oldRow));
-            }
-
-            public void onAdded(UntypedResultSet.Row newRow)
-            {
-                Schema.instance.addAggregate(createAggregateFromAggregateRow(newRow));
-            }
-
-            public void onUpdated(UntypedResultSet.Row oldRow, UntypedResultSet.Row newRow)
-            {
-                Schema.instance.updateAggregate(createAggregateFromAggregateRow(newRow));
-            }
-        });
-    }
-
-    public interface Differ
-    {
-        void onDropped(UntypedResultSet.Row oldRow);
-        void onAdded(UntypedResultSet.Row newRow);
-        void onUpdated(UntypedResultSet.Row oldRow, UntypedResultSet.Row newRow);
-    }
-
-    private static void diffSchema(Map<DecoratedKey, FilteredPartition> before, Map<DecoratedKey, FilteredPartition> after, Differ differ)
-    {
-        for (FilteredPartition newPartition : after.values())
-        {
-            CFMetaData metadata = newPartition.metadata();
-            DecoratedKey key = newPartition.partitionKey();
-
-            FilteredPartition oldPartition = before.remove(key);
-
-            if (oldPartition == null || oldPartition.isEmpty())
-            {
-                // Means everything is to be added
-                for (Row row : newPartition)
-                    differ.onAdded(UntypedResultSet.Row.fromInternalRow(metadata, key, row));
-                continue;
-            }
-
-            Iterator<Row> oldIter = oldPartition.iterator();
-            Iterator<Row> newIter = newPartition.iterator();
-
-            Row oldRow = oldIter.hasNext() ? oldIter.next() : null;
-            Row newRow = newIter.hasNext() ? newIter.next() : null;
-            while (oldRow != null && newRow != null)
-            {
-                int cmp = metadata.comparator.compare(oldRow.clustering(), newRow.clustering());
-                if (cmp < 0)
-                {
-                    differ.onDropped(UntypedResultSet.Row.fromInternalRow(metadata, key, oldRow));
-                    oldRow = oldIter.hasNext() ? oldIter.next() : null;
-                }
-                else if (cmp > 0)
-                {
-
-                    differ.onAdded(UntypedResultSet.Row.fromInternalRow(metadata, key, newRow));
-                    newRow = newIter.hasNext() ? newIter.next() : null;
-                }
-                else
-                {
-                    if (!oldRow.equals(newRow))
-                        differ.onUpdated(UntypedResultSet.Row.fromInternalRow(metadata, key, oldRow), UntypedResultSet.Row.fromInternalRow(metadata, key, newRow));
-
-                    oldRow = oldIter.hasNext() ? oldIter.next() : null;
-                    newRow = newIter.hasNext() ? newIter.next() : null;
-                }
-            }
-
-            while (oldRow != null)
-            {
-                differ.onDropped(UntypedResultSet.Row.fromInternalRow(metadata, key, oldRow));
-                oldRow = oldIter.hasNext() ? oldIter.next() : null;
-            }
-            while (newRow != null)
-            {
-                differ.onAdded(UntypedResultSet.Row.fromInternalRow(metadata, key, newRow));
-                newRow = newIter.hasNext() ? newIter.next() : null;
-            }
-        }
-
-        // What remains is those keys that were only in before.
-        for (FilteredPartition partition : before.values())
-            for (Row row : partition)
-                differ.onDropped(UntypedResultSet.Row.fromInternalRow(partition.metadata(), partition.partitionKey(), row));
-    }
-
     /*
-     * Keyspace metadata serialization/deserialization.
+     * Schema entities to mutations
      */
 
     public static Mutation makeCreateKeyspaceMutation(String name, KeyspaceParams params, long timestamp)
@@ -761,46 +414,6 @@ public final class SchemaKeyspace
         return mutation;
     }
 
-    private static KeyspaceMetadata createKeyspaceFromSchemaPartitions(RowIterator serializedParams,
-                                                                       RowIterator serializedTables,
-                                                                       RowIterator serializedViews,
-                                                                       RowIterator serializedTypes,
-                                                                       RowIterator serializedFunctions,
-                                                                       RowIterator serializedAggregates)
-    {
-        String name = AsciiType.instance.compose(serializedParams.partitionKey().getKey());
-
-        KeyspaceParams params = createKeyspaceParamsFromSchemaPartition(serializedParams);
-        Tables tables = createTablesFromTablesPartition(serializedTables);
-        Views views = createViewsFromViewsPartition(serializedViews);
-        Types types = createTypesFromPartition(serializedTypes);
-
-        Collection<UDFunction> udfs = createFunctionsFromFunctionsPartition(serializedFunctions);
-        Functions functions = org.apache.cassandra.schema.Functions.builder().add(udfs).build();
-        functions = createAggregatesFromAggregatesPartition(functions, serializedAggregates);
-
-        return KeyspaceMetadata.create(name, params, tables, views, types, functions);
-    }
-
-    /**
-     * Deserialize only Keyspace attributes without nested tables or types
-     *
-     * @param partition Keyspace attributes in serialized form
-     */
-
-    private static KeyspaceParams createKeyspaceParamsFromSchemaPartition(RowIterator partition)
-    {
-        String query = String.format("SELECT * FROM %s.%s", NAME, KEYSPACES);
-        UntypedResultSet.Row row = QueryProcessor.resultify(query, partition).one();
-
-        return KeyspaceParams.create(row.getBoolean(KeyspaceParams.Option.DURABLE_WRITES.toString()),
-                                     row.getFrozenTextMap(KeyspaceParams.Option.REPLICATION.toString()));
-    }
-
-    /*
-     * User type metadata serialization/deserialization.
-     */
-
     public static Mutation makeCreateTypeMutation(KeyspaceMetadata keyspace, UserType type, long timestamp)
     {
         // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
@@ -813,8 +426,8 @@ public final class SchemaKeyspace
     {
         RowUpdateBuilder adder = new RowUpdateBuilder(Types, timestamp, mutation)
                                  .clustering(type.getNameAsString())
-                                 .frozenList("field_names", type.fieldNames().stream().map(SchemaKeyspace::bbToString).collect(Collectors.toList()))
-                                 .frozenList("field_types", type.fieldTypes().stream().map(AbstractType::toString).collect(Collectors.toList()));
+                                 .frozenList("field_names", type.fieldNames().stream().map(SchemaKeyspace::bbToString).collect(toList()))
+                                 .frozenList("field_types", type.fieldTypes().stream().map(AbstractType::asCQL3Type).map(CQL3Type::toString).collect(toList()));
 
         adder.build();
     }
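
To make the change above concrete, a sketch (not part of the patch) of what switching from AbstractType::toString to the CQL rendering means for the strings stored in field_types; the old representation is abbreviated in the comment:

    import org.apache.cassandra.db.marshal.AbstractType;
    import org.apache.cassandra.db.marshal.ListType;
    import org.apache.cassandra.db.marshal.UTF8Type;

    public class StoredTypeNameExample
    {
        public static void main(String[] args)
        {
            AbstractType<?> listOfText = ListType.getInstance(UTF8Type.instance, true);

            // Before: the internal marshal class string, roughly
            // org.apache.cassandra.db.marshal.ListType(org.apache.cassandra.db.marshal.UTF8Type)
            System.out.println(listOfText.toString());

            // After: the CQL name users actually write
            System.out.println(listOfText.asCQL3Type()); // list<text>
        }
    }
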
@@ -838,36 +451,6 @@ public final class SchemaKeyspace
         return RowUpdateBuilder.deleteRow(Types, timestamp, mutation, type.name);
     }
 
-    private static Types createTypesFromPartition(RowIterator partition)
-    {
-        String query = String.format("SELECT * FROM %s.%s", NAME, TYPES);
-        Types.Builder types = org.apache.cassandra.schema.Types.builder();
-        QueryProcessor.resultify(query, partition).forEach(row -> types.add(createTypeFromRow(row)));
-        return types.build();
-    }
-
-    private static UserType createTypeFromRow(UntypedResultSet.Row row)
-    {
-        String keyspace = row.getString("keyspace_name");
-        ByteBuffer name = ByteBufferUtil.bytes(row.getString("type_name"));
-        List<String> rawColumns = row.getFrozenList("field_names", UTF8Type.instance);
-        List<String> rawTypes = row.getFrozenList("field_types", UTF8Type.instance);
-
-        List<ByteBuffer> columns = new ArrayList<>(rawColumns.size());
-        for (String rawColumn : rawColumns)
-            columns.add(ByteBufferUtil.bytes(rawColumn));
-
-        List<AbstractType<?>> types = new ArrayList<>(rawTypes.size());
-        for (String rawType : rawTypes)
-            types.add(parseType(rawType));
-
-        return new UserType(keyspace, name, columns, types);
-    }
-
-    /*
-     * Table metadata serialization/deserialization.
-     */
-
     public static Mutation makeCreateTableMutation(KeyspaceMetadata keyspace, CFMetaData table, long timestamp)
     {
         // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
@@ -990,9 +573,7 @@ public final class SchemaKeyspace
 
         // updated indexes need to be updated
         for (MapDifference.ValueDifference<IndexMetadata> diff : indexesDiff.entriesDiffering().values())
-        {
             addUpdatedIndexToSchemaMutation(newTable, diff.rightValue(), timestamp, mutation);
-        }
 
         return mutation;
     }
@@ -1038,274 +619,59 @@ public final class SchemaKeyspace
         return mutation;
     }
 
-    public static CFMetaData createTableFromName(String keyspace, String table)
+    private static void addColumnToSchemaMutation(CFMetaData table, ColumnDefinition column, long timestamp, Mutation mutation)
     {
-        return readSchemaPartitionForTableAndApply(TABLES, keyspace, table, partition ->
-        {
-            if (partition.isEmpty())
-                throw new RuntimeException(String.format("%s:%s not found in the schema definitions keyspace.", keyspace, table));
+        RowUpdateBuilder adder = new RowUpdateBuilder(Columns, timestamp, mutation).clustering(table.cfName, column.name.toString());
+
+        AbstractType<?> type = column.type;
+        if (type instanceof ReversedType)
+            type = ((ReversedType) type).baseType;
 
-            return createTableFromTablePartition(partition);
-        });
+        adder.add("column_name_bytes", column.name.bytes)
+             .add("kind", column.kind.toString().toLowerCase())
+             .add("position", column.isOnAllComponents() ? ColumnDefinition.NO_POSITION : column.position())
+             .add("clustering_order", column.clusteringOrder().toString().toLowerCase())
+             .add("type", type.asCQL3Type().toString())
+             .build();
     }
 
-    /**
-     * Deserialize tables from low-level schema representation, all of them belong to the same keyspace
-     */
-    private static Tables createTablesFromTablesPartition(RowIterator partition)
+    private static void dropColumnFromSchemaMutation(CFMetaData table, ColumnDefinition column, long timestamp, Mutation mutation)
     {
-        String query = String.format("SELECT * FROM %s.%s", NAME, TABLES);
-        Tables.Builder tables = org.apache.cassandra.schema.Tables.builder();
-        QueryProcessor.resultify(query, partition).forEach(row -> tables.add(createTableFromTableRow(row)));
-        return tables.build();
+        // Note: we do want to use name.toString(), not name.bytes directly for backward compatibility (For CQL3, this won't make a difference).
+        RowUpdateBuilder.deleteRow(Columns, timestamp, mutation, table.cfName, column.name.toString());
     }
 
-    private static List<ColumnDefinition> createColumnsFromColumnsPartition(RowIterator serializedColumns)
+    private static void addDroppedColumnToSchemaMutation(CFMetaData table, CFMetaData.DroppedColumn column, long timestamp, Mutation mutation)
     {
-        String query = String.format("SELECT * FROM %s.%s", NAME, COLUMNS);
-        return createColumnsFromColumnRows(QueryProcessor.resultify(query, serializedColumns));
+        RowUpdateBuilder adder = new RowUpdateBuilder(DroppedColumns, timestamp, mutation).clustering(table.cfName, column.name);
+
+        adder.add("dropped_time", new Date(TimeUnit.MICROSECONDS.toMillis(column.droppedTime)))
+             .add("type", expandUserTypes(column.type).asCQL3Type().toString())
+             .build();
     }
 
-    private static CFMetaData createTableFromTablePartition(RowIterator partition)
+    private static void addTriggerToSchemaMutation(CFMetaData table, TriggerMetadata trigger, long timestamp, Mutation mutation)
     {
-        String query = String.format("SELECT * FROM %s.%s", NAME, TABLES);
-        return createTableFromTableRow(QueryProcessor.resultify(query, partition).one());
+        new RowUpdateBuilder(Triggers, timestamp, mutation)
+            .clustering(table.cfName, trigger.name)
+            .frozenMap("options", Collections.singletonMap("class", trigger.classOption))
+            .build();
     }
 
-    public static CFMetaData createTableFromTablePartitionAndColumnsPartition(RowIterator tablePartition,
-                                                                              RowIterator columnsPartition)
+    private static void dropTriggerFromSchemaMutation(CFMetaData table, TriggerMetadata trigger, long timestamp, Mutation mutation)
     {
-        List<ColumnDefinition> columns = createColumnsFromColumnsPartition(columnsPartition);
-        String query = String.format("SELECT * FROM %s.%s", NAME, TABLES);
-        return createTableFromTableRowAndColumns(QueryProcessor.resultify(query, tablePartition).one(), columns);
+        RowUpdateBuilder.deleteRow(Triggers, timestamp, mutation, table.cfName, trigger.name);
     }
 
-    /**
-     * Deserialize table metadata from low-level representation
-     *
-     * @return Metadata deserialized from schema
-     */
-    private static CFMetaData createTableFromTableRow(UntypedResultSet.Row row)
+    public static Mutation makeCreateViewMutation(KeyspaceMetadata keyspace, ViewDefinition view, long timestamp)
     {
-        String keyspace = row.getString("keyspace_name");
-        String table = row.getString("table_name");
-
-        List<ColumnDefinition> columns =
-            readSchemaPartitionForTableAndApply(COLUMNS, keyspace, table, SchemaKeyspace::createColumnsFromColumnsPartition);
-
-        Map<ByteBuffer, CFMetaData.DroppedColumn> droppedColumns =
-            readSchemaPartitionForTableAndApply(DROPPED_COLUMNS, keyspace, table, SchemaKeyspace::createDroppedColumnsFromDroppedColumnsPartition);
-
-        Triggers triggers =
-            readSchemaPartitionForTableAndApply(TRIGGERS, keyspace, table, SchemaKeyspace::createTriggersFromTriggersPartition);
-
-        CFMetaData cfm = createTableFromTableRowAndColumns(row, columns).droppedColumns(droppedColumns)
-                                                                        .triggers(triggers);
-
-        // the CFMetaData itself is required to build the collection of indexes as
-        // the column definitions are needed because we store only the name of each
-        // index's target columns, and this is not enough to reconstruct a ColumnIdentifier
-        org.apache.cassandra.schema.Indexes indexes =
-            readSchemaPartitionForTableAndApply(INDEXES, keyspace, table, rowIterator -> createIndexesFromIndexesPartition(cfm, rowIterator));
-        cfm.indexes(indexes);
-
-        return cfm;
+        // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
+        Mutation mutation = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp);
+        addViewToSchemaMutation(view, timestamp, true, mutation);
+        return mutation;
     }
 
-    public static CFMetaData createTableFromTableRowAndColumns(UntypedResultSet.Row row, List<ColumnDefinition> columns)
-    {
-        String keyspace = row.getString("keyspace_name");
-        String table = row.getString("table_name");
-        UUID id = row.getUUID("id");
-
-        Set<CFMetaData.Flag> flags = row.has("flags")
-                                   ? CFMetaData.flagsFromStrings(row.getFrozenSet("flags", UTF8Type.instance))
-                                   : Collections.emptySet();
-
-        boolean isSuper = flags.contains(CFMetaData.Flag.SUPER);
-        boolean isCounter = flags.contains(CFMetaData.Flag.COUNTER);
-        boolean isDense = flags.contains(CFMetaData.Flag.DENSE);
-        boolean isCompound = flags.contains(CFMetaData.Flag.COMPOUND);
-
-        return CFMetaData.create(keyspace,
-                                 table,
-                                 id,
-                                 isDense,
-                                 isCompound,
-                                 isSuper,
-                                 isCounter,
-                                 false,
-                                 columns,
-                                 DatabaseDescriptor.getPartitioner())
-                         .params(createTableParamsFromRow(row));
-    }
-
-    private static TableParams createTableParamsFromRow(UntypedResultSet.Row row)
-    {
-        TableParams.Builder builder = TableParams.builder();
-
-        builder.bloomFilterFpChance(row.getDouble("bloom_filter_fp_chance"))
-               .caching(CachingParams.fromMap(row.getFrozenTextMap("caching")))
-               .comment(row.getString("comment"))
-               .compaction(CompactionParams.fromMap(row.getFrozenTextMap("compaction")))
-               .compression(CompressionParams.fromMap(row.getFrozenTextMap("compression")))
-               .dcLocalReadRepairChance(row.getDouble("dclocal_read_repair_chance"))
-               .defaultTimeToLive(row.getInt("default_time_to_live"))
-               .gcGraceSeconds(row.getInt("gc_grace_seconds"))
-               .maxIndexInterval(row.getInt("max_index_interval"))
-               .memtableFlushPeriodInMs(row.getInt("memtable_flush_period_in_ms"))
-               .minIndexInterval(row.getInt("min_index_interval"))
-               .readRepairChance(row.getDouble("read_repair_chance"))
-               .crcCheckChance(row.getDouble("crc_check_chance"))
-               .speculativeRetry(SpeculativeRetryParam.fromString(row.getString("speculative_retry")));
-
-        if (row.has("extensions"))
-            builder.extensions(row.getFrozenMap("extensions", UTF8Type.instance, BytesType.instance));
-
-        return builder.build();
-    }
-
-    /*
-     * Column metadata serialization/deserialization.
-     */
-
-    private static void addColumnToSchemaMutation(CFMetaData table, ColumnDefinition column, long timestamp, Mutation mutation)
-    {
-        RowUpdateBuilder adder = new RowUpdateBuilder(Columns, timestamp, mutation).clustering(table.cfName, column.name.toString());
-
-        AbstractType<?> type = column.type;
-        if (type instanceof ReversedType)
-            type = ((ReversedType) type).baseType;
-
-        adder.add("column_name_bytes", column.name.bytes)
-             .add("kind", column.kind.toString().toLowerCase())
-             .add("position", column.isOnAllComponents() ? ColumnDefinition.NO_POSITION : column.position())
-             .add("clustering_order", column.clusteringOrder().toString().toLowerCase())
-             .add("type", type.toString())
-             .build();
-    }
-
-    private static void dropColumnFromSchemaMutation(CFMetaData table, ColumnDefinition column, long timestamp, Mutation mutation)
-    {
-        // Note: we do want to use name.toString(), not name.bytes directly for backward compatibility (For CQL3, this won't make a difference).
-        RowUpdateBuilder.deleteRow(Columns, timestamp, mutation, table.cfName, column.name.toString());
-    }
-
-    private static List<ColumnDefinition> createColumnsFromColumnRows(UntypedResultSet rows)
-{
-        List<ColumnDefinition> columns = new ArrayList<>(rows.size());
-        rows.forEach(row -> columns.add(createColumnFromColumnRow(row)));
-        return columns;
-    }
-
-    private static ColumnDefinition createColumnFromColumnRow(UntypedResultSet.Row row)
-    {
-        String keyspace = row.getString("keyspace_name");
-        String table = row.getString("table_name");
-
-        ColumnIdentifier name = ColumnIdentifier.getInterned(row.getBytes("column_name_bytes"), row.getString("column_name"));
-
-        ColumnDefinition.Kind kind = ColumnDefinition.Kind.valueOf(row.getString("kind").toUpperCase());
-
-        int position = row.getInt("position");
-        ClusteringOrder order = ClusteringOrder.valueOf(row.getString("clustering_order").toUpperCase());
-
-        AbstractType<?> type = parseType(row.getString("type"));
-        if (order == ClusteringOrder.DESC)
-            type = ReversedType.getInstance(type);
-
-        return new ColumnDefinition(keyspace, table, name, type, position, kind);
-    }
-
-    /*
-     * Dropped column metadata serialization/deserialization.
-     */
-
-    private static void addDroppedColumnToSchemaMutation(CFMetaData table, CFMetaData.DroppedColumn column, long timestamp, Mutation mutation)
-    {
-        RowUpdateBuilder adder = new RowUpdateBuilder(DroppedColumns, timestamp, mutation).clustering(table.cfName, column.name);
-
-        adder.add("dropped_time", new Date(TimeUnit.MICROSECONDS.toMillis(column.droppedTime)))
-             .add("type", column.type.toString())
-             .build();
-    }
-
-    private static Map<ByteBuffer, CFMetaData.DroppedColumn> createDroppedColumnsFromDroppedColumnsPartition(RowIterator serializedColumns)
-    {
-        String query = String.format("SELECT * FROM %s.%s", NAME, DROPPED_COLUMNS);
-        Map<ByteBuffer, CFMetaData.DroppedColumn> columns = new HashMap<>();
-        for (CFMetaData.DroppedColumn column : createDroppedColumnsFromDroppedColumnRows(QueryProcessor.resultify(query, serializedColumns)))
-            columns.put(UTF8Type.instance.decompose(column.name), column);
-        return columns;
-    }
-
-    private static List<CFMetaData.DroppedColumn> createDroppedColumnsFromDroppedColumnRows(UntypedResultSet rows)
-    {
-        List<CFMetaData.DroppedColumn> columns = new ArrayList<>(rows.size());
-        rows.forEach(row -> columns.add(createDroppedColumnFromDroppedColumnRow(row)));
-        return columns;
-    }
-
-    private static CFMetaData.DroppedColumn createDroppedColumnFromDroppedColumnRow(UntypedResultSet.Row row)
-    {
-        String name = row.getString("column_name");
-        AbstractType<?> type = TypeParser.parse(row.getString("type"));
-        long droppedTime = TimeUnit.MILLISECONDS.toMicros(row.getLong("dropped_time"));
-
-        return new CFMetaData.DroppedColumn(name, type, droppedTime);
-    }
-
-    /*
-     * Trigger metadata serialization/deserialization.
-     */
-
-    private static void addTriggerToSchemaMutation(CFMetaData table, TriggerMetadata trigger, long timestamp, Mutation mutation)
-    {
-        new RowUpdateBuilder(Triggers, timestamp, mutation)
-            .clustering(table.cfName, trigger.name)
-            .frozenMap("options", Collections.singletonMap("class", trigger.classOption))
-            .build();
-    }
-
-    private static void dropTriggerFromSchemaMutation(CFMetaData table, TriggerMetadata trigger, long timestamp, Mutation mutation)
-    {
-        RowUpdateBuilder.deleteRow(Triggers, timestamp, mutation, table.cfName, trigger.name);
-    }
-
-    /**
-     * Deserialize triggers from storage-level representation.
-     *
-     * @param partition storage-level partition containing the trigger definitions
-     * @return the list of processed TriggerDefinitions
-     */
-    private static Triggers createTriggersFromTriggersPartition(RowIterator partition)
-    {
-        Triggers.Builder triggers = org.apache.cassandra.schema.Triggers.builder();
-        String query = String.format("SELECT * FROM %s.%s", NAME, TRIGGERS);
-        QueryProcessor.resultify(query, partition).forEach(row -> triggers.add(createTriggerFromTriggerRow(row)));
-        return triggers.build();
-    }
-
-    private static TriggerMetadata createTriggerFromTriggerRow(UntypedResultSet.Row row)
-    {
-        String name = row.getString("trigger_name");
-        String classOption = row.getFrozenTextMap("options").get("class");
-        return new TriggerMetadata(name, classOption);
-    }
-
-    /*
-     * View metadata serialization/deserialization.
-     */
-
-    public static Mutation makeCreateViewMutation(KeyspaceMetadata keyspace, ViewDefinition view, long timestamp)
-    {
-        // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
-        Mutation mutation = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp);
-        addViewToSchemaMutation(view, timestamp, true, mutation);
-        return mutation;
-    }
-
-    private static void addViewToSchemaMutation(ViewDefinition view, long timestamp, boolean includeColumns, Mutation mutation)
+    private static void addViewToSchemaMutation(ViewDefinition view, long timestamp, boolean includeColumns, Mutation mutation)
     {
         RowUpdateBuilder builder = new RowUpdateBuilder(Views, timestamp, mutation)
             .clustering(view.viewName);
@@ -1363,9 +729,7 @@ public final class SchemaKeyspace
 
         // columns that are no longer needed
         for (ColumnDefinition column : columnDiff.entriesOnlyOnLeft().values())
-        {
             dropColumnFromSchemaMutation(oldView.metadata, column, timestamp, mutation);
-        }
 
         // newly added columns
         for (ColumnDefinition column : columnDiff.entriesOnlyOnRight().values())
@@ -1390,122 +754,303 @@ public final class SchemaKeyspace
         return mutation;
     }
 
-    public static ViewDefinition createViewFromName(String keyspace, String view)
+    private static void addIndexToSchemaMutation(CFMetaData table,
+                                                 IndexMetadata index,
+                                                 long timestamp,
+                                                 Mutation mutation)
     {
-        return readSchemaPartitionForTableAndApply(VIEWS, keyspace, view, partition ->
-        {
-            if (partition.isEmpty())
-                throw new RuntimeException(String.format("%s:%s not found in the schema definitions keyspace.", keyspace, view));
+        RowUpdateBuilder builder = new RowUpdateBuilder(Indexes, timestamp, mutation).clustering(table.cfName, index.name);
 
-            return createViewFromViewPartition(partition);
-        });
+        builder.add("kind", index.kind.toString());
+        builder.frozenMap("options", index.options);
+        builder.build();
+    }
+
+    private static void dropIndexFromSchemaMutation(CFMetaData table,
+                                                    IndexMetadata index,
+                                                    long timestamp,
+                                                    Mutation mutation)
+    {
+        RowUpdateBuilder.deleteRow(Indexes, timestamp, mutation, table.cfName, index.name);
     }
 
-    private static ViewDefinition createViewFromViewPartition(RowIterator partition)
+    private static void addUpdatedIndexToSchemaMutation(CFMetaData table,
+                                                        IndexMetadata index,
+                                                        long timestamp,
+                                                        Mutation mutation)
     {
-        String query = String.format("SELECT * FROM %s.%s", NAME, VIEWS);
-        return createViewFromViewRow(QueryProcessor.resultify(query, partition).one());
+        addIndexToSchemaMutation(table, index, timestamp, mutation);
     }
 
-    /**
-     * Deserialize views from storage-level representation.
-     *
-     * @param partition storage-level partition containing the view definitions
-     * @return the list of processed ViewDefinitions
+    public static Mutation makeCreateFunctionMutation(KeyspaceMetadata keyspace, UDFunction function, long timestamp)
+    {
+        // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
+        Mutation mutation = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp);
+        addFunctionToSchemaMutation(function, timestamp, mutation);
+        return mutation;
+    }
+
+    static void addFunctionToSchemaMutation(UDFunction function, long timestamp, Mutation mutation)
+    {
+        RowUpdateBuilder adder =
+            new RowUpdateBuilder(Functions, timestamp, mutation).clustering(function.name().name, functionArgumentsList(function));
+
+        adder.add("body", function.body())
+             .add("language", function.language())
+             .add("return_type", function.returnType().asCQL3Type().toString())
+             .add("called_on_null_input", function.isCalledOnNullInput())
+             .frozenList("argument_names", function.argNames().stream().map((c) -> bbToString(c.bytes)).collect(toList()));
+
+        adder.build();
+    }
+
+    private static List<String> functionArgumentsList(AbstractFunction fun)
+    {
+        return fun.argTypes()
+                  .stream()
+                  .map(AbstractType::asCQL3Type)
+                  .map(CQL3Type::toString)
+                  .collect(toList());
+    }
+
+    public static Mutation makeDropFunctionMutation(KeyspaceMetadata keyspace, UDFunction function, long timestamp)
+    {
+        // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
+        Mutation mutation = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp);
+        return RowUpdateBuilder.deleteRow(Functions, timestamp, mutation, function.name().name, functionArgumentsList(function));
+    }
+
+    public static Mutation makeCreateAggregateMutation(KeyspaceMetadata keyspace, UDAggregate aggregate, long timestamp)
+    {
+        // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
+        Mutation mutation = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp);
+        addAggregateToSchemaMutation(aggregate, timestamp, mutation);
+        return mutation;
+    }
+
+    static void addAggregateToSchemaMutation(UDAggregate aggregate, long timestamp, Mutation mutation)
+    {
+        RowUpdateBuilder adder =
+            new RowUpdateBuilder(Aggregates, timestamp, mutation).clustering(aggregate.name().name, functionArgumentsList(aggregate));
+
+        adder.add("return_type", aggregate.returnType().asCQL3Type().toString())
+             .add("state_func", aggregate.stateFunction().name().name)
+             .add("state_type", aggregate.stateType() != null ? aggregate.stateType().asCQL3Type().toString() : null)
+             .add("final_func", aggregate.finalFunction() != null ? aggregate.finalFunction().name().name : null)
+             .add("initcond", aggregate.initialCondition())
+             .build();
+    }
+
+    public static Mutation makeDropAggregateMutation(KeyspaceMetadata keyspace, UDAggregate aggregate, long timestamp)
+    {
+        // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
+        Mutation mutation = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp);
+        return RowUpdateBuilder.deleteRow(Aggregates, timestamp, mutation, aggregate.name().name, functionArgumentsList(aggregate));
+    }
+
+    /*
+     * Fetching schema
      */
-    private static Views createViewsFromViewsPartition(RowIterator partition)
+
+    public static Keyspaces fetchNonSystemKeyspaces()
     {
-        Views.Builder views = org.apache.cassandra.schema.Views.builder();
-        String query = String.format("SELECT * FROM %s.%s", NAME, VIEWS);
-        for (UntypedResultSet.Row row : QueryProcessor.resultify(query, partition))
+        return fetchKeyspacesWithout(Schema.SYSTEM_KEYSPACE_NAMES);
+    }
+
+    private static Keyspaces fetchKeyspacesWithout(Set<String> excludedKeyspaceNames)
+    {
+        String query = format("SELECT keyspace_name FROM %s.%s", NAME, KEYSPACES);
+
+        Keyspaces.Builder keyspaces = org.apache.cassandra.schema.Keyspaces.builder();
+        for (UntypedResultSet.Row row : query(query))
         {
-            ViewDefinition view = createViewFromViewRow(row);
-            views.add(view);
+            String keyspaceName = row.getString("keyspace_name");
+            if (!excludedKeyspaceNames.contains(keyspaceName))
+                keyspaces.add(fetchKeyspace(keyspaceName));
         }
-        return views.build();
+        return keyspaces.build();
     }
 
-    private static ViewDefinition createViewFromViewRow(UntypedResultSet.Row row)
+    private static Keyspaces fetchKeyspacesOnly(Set<String> includedKeyspaceNames)
     {
-        String keyspace = row.getString("keyspace_name");
-        String view = row.getString("view_name");
-        UUID id = row.getUUID("id");
-        UUID baseTableId = row.getUUID("base_table_id");
-        String baseTableName = row.getString("base_table_name");
-        boolean includeAll = row.getBoolean("include_all_columns");
-        String whereClause = row.getString("where_clause");
+        /*
+         * We know the keyspace names we are going to query, but we still want to run the SELECT IN
+         * query to filter out any keyspaces that were dropped by the applied mutation set.
+         */
+        String query = format("SELECT keyspace_name FROM %s.%s WHERE keyspace_name IN ?", NAME, KEYSPACES);
 
-        List<ColumnDefinition> columns =
-            readSchemaPartitionForTableAndApply(COLUMNS, keyspace, view, SchemaKeyspace::createColumnsFromColumnsPartition);
+        Keyspaces.Builder keyspaces = org.apache.cassandra.schema.Keyspaces.builder();
+        for (UntypedResultSet.Row row : query(query, new ArrayList<>(includedKeyspaceNames)))
+            keyspaces.add(fetchKeyspace(row.getString("keyspace_name")));
+        return keyspaces.build();
+    }
 
-        Map<ByteBuffer, CFMetaData.DroppedColumn> droppedColumns =
-            readSchemaPartitionForTableAndApply(DROPPED_COLUMNS, keyspace, view, SchemaKeyspace::createDroppedColumnsFromDroppedColumnsPartition);
+    private static KeyspaceMetadata fetchKeyspace(String keyspaceName)
+    {
+        KeyspaceParams params = fetchKeyspaceParams(keyspaceName);
+        Types types = fetchTypes(keyspaceName);
+        Tables tables = fetchTables(keyspaceName, types);
+        Views views = fetchViews(keyspaceName, types);
+        Functions functions = fetchFunctions(keyspaceName, types);
+        return KeyspaceMetadata.create(keyspaceName, params, tables, views, types, functions);
+    }
 
-        CFMetaData cfm = CFMetaData.create(keyspace,
-                                           view,
-                                           id,
-                                           false,
-                                           true,
-                                           false,
-                                           false,
-                                           true,
-                                           columns,
-                                           DatabaseDescriptor.getPartitioner())
-                                   .params(createTableParamsFromRow(row))
-                                   .droppedColumns(droppedColumns);
+    private static KeyspaceParams fetchKeyspaceParams(String keyspaceName)
+    {
+        String query = format("SELECT * FROM %s.%s WHERE keyspace_name = ?", NAME, KEYSPACES);
+
+        UntypedResultSet.Row row = query(query, keyspaceName).one();
+        boolean durableWrites = row.getBoolean(KeyspaceParams.Option.DURABLE_WRITES.toString());
+        Map<String, String> replication = row.getFrozenTextMap(KeyspaceParams.Option.REPLICATION.toString());
+        return KeyspaceParams.create(durableWrites, replication);
+    }
 
-        String rawSelect = View.buildSelectStatement(baseTableName, columns, whereClause);
-        SelectStatement.RawStatement rawStatement = (SelectStatement.RawStatement) QueryProcessor.parseStatement(rawSelect);
+    private static Types fetchTypes(String keyspaceName)
+    {
+        String query = format("SELECT * FROM %s.%s WHERE keyspace_name = ?", NAME, TYPES);
 
-        return new ViewDefinition(keyspace, view, baseTableId, baseTableName, includeAll, rawStatement, whereClause, cfm);
+        Types.RawBuilder types = org.apache.cassandra.schema.Types.rawBuilder(keyspaceName);
+        for (UntypedResultSet.Row row : query(query, keyspaceName))
+        {
+            String name = row.getString("type_name");
+            List<String> fieldNames = row.getFrozenList("field_names", UTF8Type.instance);
+            List<String> fieldTypes = row.getFrozenList("field_types", UTF8Type.instance);
+            types.add(name, fieldNames, fieldTypes);
+        }
+        return types.build();
     }
 
-    /*
-     * Secondary Index metadata serialization/deserialization.
-     */
+    private static Tables fetchTables(String keyspaceName, Types types)
+    {
+        String query = format("SELECT table_name FROM %s.%s WHERE keyspace_name = ?", NAME, TABLES);
 
-    private static void addIndexToSchemaMutation(CFMetaData table,
-                                                 IndexMetadata index,
-                                                 long timestamp,
-                                                 Mutation mutation)
+        Tables.Builder tables = org.apache.cassandra.schema.Tables.builder();
+        for (UntypedResultSet.Row row : query(query, keyspaceName))
+            tables.add(fetchTable(keyspaceName, row.getString("table_name"), types));
+        return tables.build();
+    }
+
+    private static CFMetaData fetchTable(String keyspaceName, String tableName, Types types)
     {
-        RowUpdateBuilder builder = new RowUpdateBuilder(Indexes, timestamp, mutation).clustering(table.cfName, index.name);
+        String query = String.format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND table_name = ?", NAME, TABLES);
+        UntypedResultSet rows = query(query, keyspaceName, tableName);
+        if (rows.isEmpty())
+            throw new RuntimeException(String.format("%s:%s not found in the schema definitions keyspace.", keyspaceName, tableName));
+        UntypedResultSet.Row row = rows.one();
 
-        builder.add("kind", index.kind.toString());
-        builder.frozenMap("options", index.options);
-        builder.build();
+        UUID id = row.getUUID("id");
+
+        Set<CFMetaData.Flag> flags = CFMetaData.flagsFromStrings(row.getFrozenSet("flags", UTF8Type.instance));
+
+        boolean isSuper = flags.contains(CFMetaData.Flag.SUPER);
+        boolean isCounter = flags.contains(CFMetaData.Flag.COUNTER);
+        boolean isDense = flags.contains(CFMetaData.Flag.DENSE);
+        boolean isCompound = flags.contains(CFMetaData.Flag.COMPOUND);
+
+        List<ColumnDefinition> columns = fetchColumns(keyspaceName, tableName, types);
+        Map<ByteBuffer, CFMetaData.DroppedColumn> droppedColumns = fetchDroppedColumns(keyspaceName, tableName);
+        Indexes indexes = fetchIndexes(keyspaceName, tableName);
+        Triggers triggers = fetchTriggers(keyspaceName, tableName);
+
+        return CFMetaData.create(keyspaceName,
+                                 tableName,
+                                 id,
+                                 isDense,
+                                 isCompound,
+                                 isSuper,
+                                 isCounter,
+                                 false,
+                                 columns,
+                                 DatabaseDescriptor.getPartitioner())
+                         .params(createTableParamsFromRow(row))
+                         .droppedColumns(droppedColumns)
+                         .indexes(indexes)
+                         .triggers(triggers);
+    }
+
+    public static TableParams createTableParamsFromRow(UntypedResultSet.Row row)
+    {
+        return TableParams.builder()
+                          .bloomFilterFpChance(row.getDouble("bloom_filter_fp_chance"))
+                          .caching(CachingParams.fromMap(row.getFrozenTextMap("caching")))
+                          .comment(row.getString("comment"))
+                          .compaction(CompactionParams.fromMap(row.getFrozenTextMap("compaction")))
+                          .compression(CompressionParams.fromMap(row.getFrozenTextMap("compression")))
+                          .dcLocalReadRepairChance(row.getDouble("dclocal_read_repair_chance"))
+                          .defaultTimeToLive(row.getInt("default_time_to_live"))
+                          .extensions(row.getFrozenMap("extensions", UTF8Type.instance, BytesType.instance))
+                          .gcGraceSeconds(row.getInt("gc_grace_seconds"))
+                          .maxIndexInterval(row.getInt("max_index_interval"))
+                          .memtableFlushPeriodInMs(row.getInt("memtable_flush_period_in_ms"))
+                          .minIndexInterval(row.getInt("min_index_interval"))
+                          .readRepairChance(row.getDouble("read_repair_chance"))
+                          .crcCheckChance(row.getDouble("crc_check_chance"))
+                          .speculativeRetry(SpeculativeRetryParam.fromString(row.getString("speculative_retry")))
+                          .build();
+    }
+
+    private static List<ColumnDefinition> fetchColumns(String keyspace, String table, Types types)
+    {
+        String query = format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND table_name = ?", NAME, COLUMNS);
+        List<ColumnDefinition> columns = new ArrayList<>();
+        query(query, keyspace, table).forEach(row -> columns.add(createColumnFromRow(row, types)));
+        return columns;
     }
 
-    private static void dropIndexFromSchemaMutation(CFMetaData table,
-                                                    IndexMetadata index,
-                                                    long timestamp,
-                                                    Mutation mutation)
+    public static ColumnDefinition createColumnFromRow(UntypedResultSet.Row row, Types types)
     {
-        RowUpdateBuilder.deleteRow(Indexes, timestamp, mutation, table.cfName, index.name);
+        String keyspace = row.getString("keyspace_name");
+        String table = row.getString("table_name");
+
+        ColumnIdentifier name = ColumnIdentifier.getInterned(row.getBytes("column_name_bytes"), row.getString("column_name"));
+
+        ColumnDefinition.Kind kind = ColumnDefinition.Kind.valueOf(row.getString("kind").toUpperCase());
+
+        int position = row.getInt("position");
+        ClusteringOrder order = ClusteringOrder.valueOf(row.getString("clustering_order").toUpperCase());
+
+        AbstractType<?> type = parse(keyspace, row.getString("type"), types);
+        if (order == ClusteringOrder.DESC)
+            type = ReversedType.getInstance(type);
+
+        return new ColumnDefinition(keyspace, table, name, type, position, kind);
     }
 
-    private static void addUpdatedIndexToSchemaMutation(CFMetaData table,
-                                                        IndexMetadata index,
-                                                        long timestamp,
-                                                        Mutation mutation)
+    private static Map<ByteBuffer, CFMetaData.DroppedColumn> fetchDroppedColumns(String keyspace, String table)
     {
-        addIndexToSchemaMutation(table, index, timestamp, mutation);
+        String query = format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND table_name = ?", NAME, DROPPED_COLUMNS);
+        Map<ByteBuffer, CFMetaData.DroppedColumn> columns = new HashMap<>();
+        for (UntypedResultSet.Row row : query(query, keyspace, table))
+        {
+            CFMetaData.DroppedColumn column = createDroppedColumnFromRow(row);
+            columns.put(UTF8Type.instance.decompose(column.name), column);
+        }
+        return columns;
     }
-    /**
-     * Deserialize secondary indexes from storage-level representation.
-     *
-     * @param partition storage-level partition containing the index definitions
-     * @return the list of processed IndexMetadata
-     */
-    private static Indexes createIndexesFromIndexesPartition(CFMetaData cfm, RowIterator partition)
+
+    private static CFMetaData.DroppedColumn createDroppedColumnFromRow(UntypedResultSet.Row row)
+    {
+        String keyspace = row.getString("keyspace_name");
+        String name = row.getString("column_name");
+        /*
+         * We never store actual UDT names in dropped column types (so that we can safely drop types if nothing refers to
+         * them anymore); before storing dropped columns in schema we expand UDTs to tuples (see the expandUserTypes method).
+         * Because of that, we can safely pass Types.none() to parse().
+         */
+        AbstractType<?> type = parse(keyspace, row.getString("type"), org.apache.cassandra.schema.Types.none());
+        long droppedTime = TimeUnit.MILLISECONDS.toMicros(row.getLong("dropped_time"));
+        return new CFMetaData.DroppedColumn(name, type, droppedTime);
+    }
+
+    private static Indexes fetchIndexes(String keyspace, String table)
     {
+        String query = String.format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND table_name = ?", NAME, INDEXES);
         Indexes.Builder indexes = org.apache.cassandra.schema.Indexes.builder();
-        String query = String.format("SELECT * FROM %s.%s", NAME, INDEXES);
-        QueryProcessor.resultify(query, partition).forEach(row -> indexes.add(createIndexMetadataFromIndexesRow(row)));
+        query(query, keyspace, table).forEach(row -> indexes.add(createIndexMetadataFromRow(row)));
         return indexes.build();
     }
 
-    private static IndexMetadata createIndexMetadataFromIndexesRow(UntypedResultSet.Row row)
+    private static IndexMetadata createIndexMetadataFromRow(UntypedResultSet.Row row)
     {
         String name = row.getString("index_name");
         IndexMetadata.Kind type = IndexMetadata.Kind.valueOf(row.getString("kind"));
@@ -1513,66 +1058,104 @@ public final class SchemaKeyspace
         return IndexMetadata.fromSchemaMetadata(name, type, options);
     }
 
-    /*
-     * UDF metadata serialization/deserialization.
-     */
+    private static Triggers fetchTriggers(String keyspace, String table)
+    {
+        String query = String.format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND table_name = ?", NAME, TRIGGERS);
+        Triggers.Builder triggers = org.apache.cassandra.schema.Triggers.builder();
+        query(query, keyspace, table).forEach(row -> triggers.add(createTriggerFromRow(row)));
+        return triggers.build();
+    }
 
-    public static Mutation makeCreateFunctionMutation(KeyspaceMetadata keyspace, UDFunction function, long timestamp)
+    private static TriggerMetadata createTriggerFromRow(UntypedResultSet.Row row)
     {
-        // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
-        Mutation mutation = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp);
-        addFunctionToSchemaMutation(function, timestamp, mutation);
-        return mutation;
+        String name = row.getString("trigger_name");
+        String classOption = row.getFrozenTextMap("options").get("class");
+        return new TriggerMetadata(name, classOption);
     }
 
-    static void addFunctionToSchemaMutation(UDFunction function, long timestamp, Mutation mutation)
+    private static Views fetchViews(String keyspaceName, Types types)
     {
-        RowUpdateBuilder adder = new RowUpdateBuilder(Functions, timestamp, mutation)
-                                 .clustering(function.name().name, functionSignatureWithTypes(function));
+        String query = format("SELECT view_name FROM %s.%s WHERE keyspace_name = ?", NAME, VIEWS);
 
-        adder.add("body", function.body())
-             .add("language", function.language())
-             .add("return_type", function.returnType().toString())
-             .add("called_on_null_input", function.isCalledOnNullInput())
-             .frozenList("argument_names", function.argNames().stream().map((c) -> bbToString(c.bytes)).collect(Collectors.toList()))
-             .frozenList("argument_types", function.argTypes().stream().map(AbstractType::toString).collect(Collectors.toList()));
+        Views.Builder views = org.apache.cassandra.schema.Views.builder();
+        for (UntypedResultSet.Row row : query(query, keyspaceName))
+            views.add(fetchView(keyspaceName, row.getString("view_name"), types));
+        return views.build();
+    }
 
-        adder.build();
+    private static ViewDefinition fetchView(String keyspaceName, String viewName, Types types)
+    {
+        String query = String.format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND view_name = ?", NAME, VIEWS);
+        UntypedResultSet rows = query(query, keyspaceName, viewName);
+        if (rows.isEmpty())
+            throw new RuntimeException(String.format("%s:%s not found in the schema definitions keyspace.", keyspaceName, viewName));
+        UntypedResultSet.Row row = rows.one();
+
+        UUID id = row.getUUID("id");
+        UUID baseTableId = row.getUUID("base_table_id");
+        String baseTableName = row.getString("base_table_name");
+        boolean includeAll = row.getBoolean("include_all_columns");
+        String whereClause = row.getString("where_clause");
+
+        List<ColumnDefinition> columns = fetchColumns(keyspaceName, viewName, types);
+
+        Map<ByteBuffer, CFMetaData.DroppedColumn> droppedColumns = fetchDroppedColumns(keyspaceName, viewName);
+
+        CFMetaData cfm = CFMetaData.create(keyspaceName,
+                                           viewName,
+                                           id,
+                                           false,
+                                           true,
+                                           false,
+                                           false,
+                                           true,
+                                           columns,
+                                           DatabaseDescriptor.getPartitioner())
+                                   .params(createTableParamsFromRow(row))
+                                   .droppedColumns(droppedColumns);
+
+        String rawSelect = View.buildSelectStatement(baseTableName, columns, whereClause);
+        SelectStatement.RawStatement rawStatement = (SelectStatement.RawStatement) QueryProcessor.parseStatement(rawSelect);
+
+        return new ViewDefinition(keyspaceName, viewName, baseTableId, baseTableName, includeAll, rawStatement, whereClause, cfm);
     }
 
-    public static Mutation makeDropFunctionMutation(KeyspaceMetadata keyspace, UDFunction function, long timestamp)
+    private static Functions fetchFunctions(String keyspaceName, Types types)
     {
-        // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
-        Mutation mutation = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp);
-        return RowUpdateBuilder.deleteRow(Functions, timestamp, mutation, function.name().name, functionSignatureWithTypes(function));
+        Functions udfs = fetchUDFs(keyspaceName, types);
+        Functions udas = fetchUDAs(keyspaceName, udfs, types);
+
+        return org.apache.cassandra.schema.Functions.builder()
+                                                    .add(udfs)
+                                                    .add(udas)
+                                                    .build();
     }
 
-    private static Collection<UDFunction> createFunctionsFromFunctionsPartition(RowIterator partition)
+    private static Functions fetchUDFs(String keyspaceName, Types types)
     {
-        List<UDFunction> functions = new ArrayList<>();
-        String query = String.format("SELECT * FROM %s.%s", NAME, FUNCTIONS);
-        for (UntypedResultSet.Row row : QueryProcessor.resultify(query, partition))
-            functions.add(createFunctionFromFunctionRow(row));
-        return functions;
+        String query = format("SELECT * FROM %s.%s WHERE keyspace_name = ?", NAME, FUNCTIONS);
+
+        Functions.Builder functions = org.apache.cassandra.schema.Functions.builder();
+        for (UntypedResultSet.Row row : query(query, keyspaceName))
+            functions.add(createUDFFromRow(row, types));
+        return functions.build();
     }
 
-    private static UDFunction createFunctionFromFunctionRow(UntypedResultSet.Row row)
+    private static UDFunction createUDFFromRow(UntypedResultSet.Row row, Types types)
     {
         String ksName = row.getString("keyspace_name");
         String functionName = row.getString("function_name");
         FunctionName name = new FunctionName(ksName, functionName);
 
         List<ColumnIdentifier> argNames = new ArrayList<>();
-        if (row.has("argument_names"))
-            for (String arg : row.getFrozenList("argument_names", UTF8Type.instance))
-                argNames.add(new ColumnIdentifier(arg, true));
+        for (String arg : row.getFrozenList("argument_names", UTF8Type.instance))
+            argNames.add(new ColumnIdentifier(arg, true));
 
         List<AbstractType<?>> argTypes = new ArrayList<>();
-        if (row.has("argument_types"))
-            for (String type : row.getFrozenList("argument_types", UTF8Type.instance))
-                argTypes.add(parseType(type));
+        for (String type : row.getFrozenList("argument_types", UTF8Type.instance))
+            argTypes.add(parse(ksName, type, types));
 
-        AbstractType<?> returnType = parseType(row.getString("return_type"));
+        AbstractType<?> returnType = parse(ksName, row.getString("return_type"), types);
 
         String language = row.getString("language");
         String body = row.getString("body");
@@ -1609,70 +1192,33 @@ public final class SchemaKeyspace
         }
     }
 
-    /*
-     * Aggregate UDF metadata serialization/deserialization.
-     */
-
-    public static Mutation makeCreateAggregateMutation(KeyspaceMetadata keyspace, UDAggregate aggregate, long timestamp)
-    {
-        // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
-        Mutation mutation = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp);
-        addAggregateToSchemaMutation(aggregate, timestamp, mutation);
-        return mutation;
-    }
-
-    static void addAggregateToSchemaMutation(UDAggregate aggregate, long timestamp, Mutation mutation)
-    {
-        RowUpdateBuilder adder = new RowUpdateBuilder(Aggregates, timestamp, mutation)
-                                 .clustering(aggregate.name().name, functionSignatureWithTypes(aggregate));
-
-        adder.add("return_type", aggregate.returnType().toString())
-             .add("state_func", aggregate.stateFunction().name().name)
-             .add("state_type", aggregate.stateType() != null ? aggregate.stateType().toString() : null)
-             .add("final_func", aggregate.finalFunction() != null ? aggregate.finalFunction().name().name : null)
-             .add("initcond", aggregate.initialCondition())
-             .frozenList("argument_types", aggregate.argTypes().stream().map(AbstractType::toString).collect(Collectors.toList()))
-             .build();
-    }
-
-    private static Functions createAggregatesFromAggregatesPartition(Functions functions, RowIterator partition)
+    private static Functions fetchUDAs(String keyspaceName, Functions udfs, Types types)
     {
-        String query = String.format("SELECT * FROM %s.%s", NAME, AGGREGATES);
-        for (UntypedResultSet.Row row : QueryProcessor.resultify(query, partition))
-            functions = functions.with(createAggregateFromAggregateRow(functions, row));
-        return functions;
-    }
+        String query = format("SELECT * FROM %s.%s WHERE keyspace_name = ?", NAME, AGGREGATES);
 
-    private static UDAggregate createAggregateFromAggregateRow(UntypedResultSet.Row row)
-    {
-        return createAggregateFromAggregateRow(Schema.instance.getKSMetaData(row.getString("keyspace_name")).functions, row);
+        Functions.Builder aggregates = org.apache.cassandra.schema.Functions.builder();
+        for (UntypedResultSet.Row row : query(query, keyspaceName))
+            aggregates.add(createUDAFromRow(row, udfs, types));
+        return aggregates.build();
     }
 
-    private static UDAggregate createAggregateFromAggregateRow(Functions functions, UntypedResultSet.Row row)
+    private static UDAggregate createUDAFromRow(UntypedResultSet.Row row, Functions functions, Types types)
     {
         String ksName = row.getString("keyspace_name");
         String functionName = row.getString("aggregate_name");
         FunctionName name = new FunctionName(ksName, functionName);
 
-        List<String> types = row.getFrozenList("argument_types", UTF8Type.instance);
+        List<AbstractType<?>> argTypes =
+            row.getFrozenList("argument_types", UTF8Type.instance)
+               .stream()
+               .map(t -> parse(ksName, t, types))
+               .collect(toList());
 
-        List<AbstractType<?>> argTypes;
-        if (types == null)
-        {
-            argTypes = Collections.emptyList();
-        }
-        else
-        {
-            argTypes = new ArrayList<>(types.size());
-            for (String type : types)
-                argTypes.add(parseType(type));
-        }
-
-        AbstractType<?> returnType = parseType(row.getString("return_type"));
+        AbstractType<?> returnType = parse(ksName, row.getString("return_type"), types);
 
         FunctionName stateFunc = new FunctionName(ksName, (row.getString("state_func")));
         FunctionName finalFunc = row.has("final_func") ? new FunctionName(ksName, row.getString("final_func")) : null;
-        AbstractType<?> stateType = row.has("state_type") ? parseType(row.getString("state_type")) : null;
+        AbstractType<?> stateType = row.has("state_type") ? parse(ksName, row.getString("state_type"), types) : null;
         ByteBuffer initcond = row.has("initcond") ? row.getBytes("initcond") : null;
 
         try
@@ -1685,30 +1231,171 @@ public final class SchemaKeyspace
         }
     }
 
-    public static Mutation makeDropAggregateMutation(KeyspaceMetadata keyspace, UDAggregate aggregate, long timestamp)
+    private static UntypedResultSet query(String query, Object... variables)
     {
-        // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
-        Mutation mutation = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp);
-        return RowUpdateBuilder.deleteRow(Aggregates, timestamp, mutation, aggregate.name().name, functionSignatureWithTypes(aggregate));
+        return executeInternal(query, variables);
+    }
+
+    /*
+     * Merging schema
+     */
+
+    /**
+     * Merge the remote schema, received as a set of mutations, with the local schema and update the
+     * corresponding keyspace/table metadata objects (which also involves filesystem operations on keyspace/table add/drop).
+     *
+     * @param mutations the schema changes to apply
+     *
+     * @throws ConfigurationException if one of the metadata attributes has an invalid value
+     */
+    public static synchronized void mergeSchemaAndAnnounceVersion(Collection<Mutation> mutations) throws ConfigurationException
+    {
+        mergeSchema(mutations);
+        Schema.instance.updateVersionAndAnnounce();
+    }
+
+    public static synchronized void mergeSchema(Collection<Mutation> mutations)
+    {
+        // only compare the keyspaces affected by this set of schema mutations
+        Set<String> affectedKeyspaces =
+        mutations.stream()
+                 .map(m -> UTF8Type.instance.compose(m.key().getKey()))
+                 .collect(Collectors.toSet());
+
+        // fetch the current state of schema for the affected keyspaces only
+        Keyspaces before = Schema.instance.getKeyspaces(affectedKeyspaces);
+
+        // apply the schema mutations and flush
+        mutations.forEach(Mutation::apply);
+        if (FLUSH_SCHEMA_TABLES)
+            flush();
+
+        // fetch the new state of schema from schema tables (not applied to Schema.instance yet)
+        Keyspaces after = fetchKeyspacesOnly(affectedKeyspaces);
+
+        // deal with the diff
+        MapDifference<String, KeyspaceMetadata> keyspacesDiff = before.diff(after);
+
+        // dropped keyspaces
+        for (KeyspaceMetadata keyspace : keyspacesDiff.entriesOnlyOnLeft().values())
+        {
+            keyspace.functions.udas().forEach(Schema.instance::dropAggregate);
+            keyspace.functions.udfs().forEach(Schema.instance::dropFunction);
+            keyspace.views.forEach(v -> Schema.instance.dropView(v.ksName, v.viewName));
+            keyspace.tables.forEach(t -> Schema.instance.dropTable(t.ksName, t.cfName));
+            keyspace.types.forEach(Schema.instance::dropType);
+            Schema.instance.dropKeyspace(keyspace.name);
+        }
+
+        // new keyspaces
+        for (KeyspaceMetadata keyspace : keyspacesDiff.entriesOnlyOnRight().values())
+        {
+            Schema.instance.addKeyspace(KeyspaceMetadata.create(keyspace.name, keyspace.params));
+            keyspace.types.forEach(Schema.instance::addType);
+            keyspace.tables.forEach(Schema.instance::addTable);
+            keyspace.views.forEach(Schema.instance::addView);
+            keyspace.functions.udfs().forEach(Schema.instance::addFunction);
+            keyspace.functions.udas().forEach(Schema.instance::addAggregate);
+        }
+
+        // updated keyspaces
+        for (Map.Entry<String, MapDifference.ValueDifference<KeyspaceMetadata>> diff : keyspacesDiff.entriesDiffering().entrySet())
+            updateKeyspace(diff.getKey(), diff.getValue().leftValue(), diff.getValue().rightValue());
+    }
+
+    private static void updateKeyspace(String keyspaceName, KeyspaceMetadata keyspaceBefore, KeyspaceMetadata keyspaceAfter)
+    {
+        // calculate the deltas
+        MapDifference<String, CFMetaData> tablesDiff = keyspaceBefore.tables.diff(keyspaceAfter.tables);
+        MapDifference<String, ViewDefinition> viewsDiff = keyspaceBefore.views.diff(keyspaceAfter.views);
+        MapDifference<ByteBuffer, UserType> typesDiff = keyspaceBefore.types.diff(keyspaceAfter.types);
+
+        Map<Pair<FunctionName, List<String>>, UDFunction> udfsBefore = new HashMap<>();
+        keyspaceBefore.functions.udfs().forEach(f -> udfsBefore.put(Pair.create(f.name(), functionArgumentsList(f)), f));
+        Map<Pair<FunctionName, List<String>>, UDFunction> udfsAfter = new HashMap<>();
+        keyspaceAfter.functions.udfs().forEach(f -> udfsAfter.put(Pair.create(f.name(), functionArgumentsList(f)), f));
+        MapDifference<Pair<FunctionName, List<String>>, UDFunction> udfsDiff = Maps.difference(udfsBefore, udfsAfter);
+
+        Map<Pair<FunctionName, List<String>>, UDAggregate> udasBefore = new HashMap<>();
+        keyspaceBefore.functions.udas().forEach(f -> udasBefore.put(Pair.create(f.name(), functionArgumentsList(f)), f));
+        Map<Pair<FunctionName, List<String>>, UDAggregate> udasAfter = new HashMap<>();
+        keyspaceAfter.functions.udas().forEach(f -> udasAfter.put(Pair.create(f.name(), functionArgumentsList(f)), f));
+        MapDifference<Pair<FunctionName, List<String>>, UDAggregate> udasDiff = Maps.difference(udasBefore, udasAfter);
+
+        // update keyspace params, if changed
+        if (!keyspaceBefore.params.equals(keyspaceAfter.params))
+            Schema.instance.updateKeyspace(keyspaceName, keyspaceAfter.params);
+
+        // drop everything removed
+        udasDiff.entriesOnlyOnLeft().values().forEach(Schema.instance::dropAggregate);
+        udfsDiff.entriesOnlyOnLeft().values().forEach(Schema.instance::dropFunction);
+        viewsDiff.entriesOnlyOnLeft().values().forEach(v -> Schema.instance.dropView(v.ksName, v.viewName));
+        tablesDiff.entriesOnlyOnLeft().values().forEach(t -> Schema.instance.dropTable(t.ksName, t.cfName));
+        typesDiff.entriesOnlyOnLeft().values().forEach(Schema.instance::dropType);
+
+        // add everything created
+        typesDiff.entriesOnlyOnRight().values().forEach(Schema.instance::addType);
+        tablesDiff.entriesOnlyOnRight().values().forEach(Schema.instance::addTable);
+        viewsDiff.entriesOnlyOnRight().values().forEach(Schema.instance::addView);
+        udfsDiff.entriesOnlyOnRight().values().forEach(Schema.instance::addFunction);
+        udasDiff.entriesOnlyOnRight().values().forEach(Schema.instance::addAggregate);
+
+        // update everything altered
+        for (MapDifference.ValueDifference<UserType> diff : typesDiff.entriesDiffering().values())
+            Schema.instance.updateType(diff.rightValue());
+        for (MapDifference.ValueDifference<CFMetaData> diff : tablesDiff.entriesDiffering().values())
+            Schema.instance.updateTable(diff.rightValue());
+        for (MapDifference.ValueDifference<ViewDefinition> diff : viewsDiff.entriesDiffering().values())
+            Schema.instance.updateView(diff.rightValue());
+        for (MapDifference.ValueDifference<UDFunction> diff : udfsDiff.entriesDiffering().values())
+            Schema.instance.updateFunction(diff.rightValue());
+        for (MapDifference.ValueDifference<UDAggregate> diff : udasDiff.entriesDiffering().values())
+            Schema.instance.updateAggregate(diff.rightValue());
     }
 
-    private static AbstractType<?> parseType(String str)
+    /*
+     * Type parsing and transformation
+     */
+
+    /*
+     * Recursively replaces any instances of UserType with an equivalent TupleType.
+     * We do this for dropped_columns so that unused user types can be dropped safely, without
+     * dropped_columns retaining any references to them.
+     */
+    private static AbstractType<?> expandUserTypes(AbstractType<?> original)
     {
-        return TypeParser.parse(str);
+        if (original instanceof UserType)
+            return new TupleType(expandUserTypes(((UserType) original).fieldTypes()));
+
+        if (original instanceof TupleType)
+            return new TupleType(expandUserTypes(((TupleType) original).allTypes()));
+
+        if (original instanceof ListType<?>)
+            return ListType.getInstance(expandUserTypes(((ListType<?>) original).getElementsType()), original.isMultiCell());
+
+        if (original instanceof MapType<?,?>)
+        {
+            MapType<?, ?> mt = (MapType<?, ?>) original;
+            return MapType.getInstance(expandUserTypes(mt.getKeysType()), expandUserTypes(mt.getValuesType()), mt.isMultiCell());
+        }
+
+        if (original instanceof SetType<?>)
+            return SetType.getInstance(expandUserTypes(((SetType<?>) original).getElementsType()), original.isMultiCell());
+
+        // this is very unlikely to ever happen, but it's better to be safe than sorry
+        if (original instanceof ReversedType<?>)
+            return ReversedType.getInstance(expandUserTypes(((ReversedType) original).baseType));
+
+        if (original instanceof CompositeType)
+            return CompositeType.getInstance(expandUserTypes(original.getComponents()));
+
+        return original;
     }
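
Because expandUserTypes() replaces every UserType with a TupleType over the same field types, the string stored for a dropped UDT column parses without any UDT definitions, which is why createDroppedColumnFromRow above can pass Types.none(). A small sketch, assuming cassandra-all is on the classpath (class name is illustrative only; the printed string is indicative):

import java.util.Arrays;

import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.Int32Type;
import org.apache.cassandra.db.marshal.TupleType;
import org.apache.cassandra.db.marshal.UTF8Type;

public class DroppedUdtSketch
{
    public static void main(String[] args)
    {
        // A UDT with fields (text, int) is recorded in dropped_columns as the equivalent tuple,
        // so the type itself can later be dropped without leaving a dangling reference.
        TupleType asTuple = new TupleType(Arrays.<AbstractType<?>>asList(UTF8Type.instance, Int32Type.instance));
        System.out.println(asTuple.asCQL3Type()); // e.g. frozen<tuple<text, int>>
    }
}
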
 
-    // We allow method overloads, so a function is not uniquely identified by its name only, but
-    // also by its argument types. To distinguish overloads of a given function name in the schema
-    // we use a "signature", which is just a list of its CQL argument types (we could replace that by
-    // using a "signature" UDT that would comprise the function name and argument types,
-    // which we could then use as a clustering column. But as we haven't yet used UDTs in system tables,
-    // we'll leave that decision to #6717).
-    public static ByteBuffer functionSignatureWithTypes(AbstractFunction fun)
+    private static List<AbstractType<?>> expandUserTypes(List<AbstractType<?>> types)
     {
-        ListType<String> list = ListType.getInstance(UTF8Type.instance, false);
-        List<String> strList = new ArrayList<>(fun.argTypes().size());
-        for (AbstractType<?> argType : fun.argTypes())
-            strList.add(argType.asCQL3Type().toString());
-        return list.decompose(strList);
+        return types.stream()
+                    .map(SchemaKeyspace::expandUserTypes)
+                    .collect(toList());
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/340df43f/src/java/org/apache/cassandra/schema/Tables.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/Tables.java b/src/java/org/apache/cassandra/schema/Tables.java
index 151697d..4f728d4 100644
--- a/src/java/org/apache/cassandra/schema/Tables.java
+++ b/src/java/org/apache/cassandra/schema/Tables.java
@@ -23,6 +23,8 @@ import java.util.Optional;
 import javax.annotation.Nullable;
 
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.MapDifference;
+import com.google.common.collect.Maps;
 
 import org.apache.cassandra.config.CFMetaData;
 
@@ -115,6 +117,11 @@ public final class Tables implements Iterable<CFMetaData>
         return builder().add(filter(this, t -> t != table)).build();
     }
 
+    MapDifference<String, CFMetaData> diff(Tables other)
+    {
+        return Maps.difference(tables, other.tables);
+    }
+
     @Override
     public boolean equals(Object o)
     {