You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2017/01/27 22:18:24 UTC

[18/37] cassandra git commit: Make TableMetadata immutable, optimize Schema

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
index ee0974f..6716652 100644
--- a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
+++ b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
@@ -23,7 +23,6 @@ import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.util.*;
 import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.MapDifference;
@@ -32,7 +31,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.*;
-import org.apache.cassandra.config.ColumnDefinition.ClusteringOrder;
+import org.apache.cassandra.cql3.statements.CreateTableStatement;
+import org.apache.cassandra.schema.ColumnMetadata.ClusteringOrder;
 import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.cql3.functions.*;
 import org.apache.cassandra.cql3.statements.SelectStatement;
@@ -42,19 +42,17 @@ import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.db.view.View;
-import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.transport.ProtocolVersion;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.Pair;
 
 import static java.lang.String.format;
 
 import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toSet;
 import static org.apache.cassandra.cql3.QueryProcessor.executeInternal;
 import static org.apache.cassandra.cql3.QueryProcessor.executeOnceInternal;
-import static org.apache.cassandra.schema.CQLTypeParser.parse;
 
 /**
  * system_schema.* tables and methods for manipulating them.
@@ -83,163 +81,167 @@ public final class SchemaKeyspace
     public static final List<String> ALL =
         ImmutableList.of(KEYSPACES, TABLES, COLUMNS, DROPPED_COLUMNS, TRIGGERS, VIEWS, TYPES, FUNCTIONS, AGGREGATES, INDEXES);
 
-    private static final CFMetaData Keyspaces =
-        compile(KEYSPACES,
-                "keyspace definitions",
-                "CREATE TABLE %s ("
-                + "keyspace_name text,"
-                + "durable_writes boolean,"
-                + "replication frozen<map<text, text>>,"
-                + "PRIMARY KEY ((keyspace_name)))");
-
-    private static final CFMetaData Tables =
-        compile(TABLES,
-                "table definitions",
-                "CREATE TABLE %s ("
-                + "keyspace_name text,"
-                + "table_name text,"
-                + "bloom_filter_fp_chance double,"
-                + "caching frozen<map<text, text>>,"
-                + "comment text,"
-                + "compaction frozen<map<text, text>>,"
-                + "compression frozen<map<text, text>>,"
-                + "crc_check_chance double,"
-                + "dclocal_read_repair_chance double,"
-                + "default_time_to_live int,"
-                + "extensions frozen<map<text, blob>>,"
-                + "flags frozen<set<text>>," // SUPER, COUNTER, DENSE, COMPOUND
-                + "gc_grace_seconds int,"
-                + "id uuid,"
-                + "max_index_interval int,"
-                + "memtable_flush_period_in_ms int,"
-                + "min_index_interval int,"
-                + "read_repair_chance double,"
-                + "speculative_retry text,"
-                + "cdc boolean,"
-                + "PRIMARY KEY ((keyspace_name), table_name))");
-
-    private static final CFMetaData Columns =
-        compile(COLUMNS,
-                "column definitions",
-                "CREATE TABLE %s ("
-                + "keyspace_name text,"
-                + "table_name text,"
-                + "column_name text,"
-                + "clustering_order text,"
-                + "column_name_bytes blob,"
-                + "kind text,"
-                + "position int,"
-                + "type text,"
-                + "PRIMARY KEY ((keyspace_name), table_name, column_name))");
-
-    private static final CFMetaData DroppedColumns =
-        compile(DROPPED_COLUMNS,
-                "dropped column registry",
-                "CREATE TABLE %s ("
-                + "keyspace_name text,"
-                + "table_name text,"
-                + "column_name text,"
-                + "dropped_time timestamp,"
-                + "type text,"
-                + "kind text,"
-                + "PRIMARY KEY ((keyspace_name), table_name, column_name))");
-
-    private static final CFMetaData Triggers =
-        compile(TRIGGERS,
-                "trigger definitions",
-                "CREATE TABLE %s ("
-                + "keyspace_name text,"
-                + "table_name text,"
-                + "trigger_name text,"
-                + "options frozen<map<text, text>>,"
-                + "PRIMARY KEY ((keyspace_name), table_name, trigger_name))");
-
-    private static final CFMetaData Views =
-        compile(VIEWS,
-                "view definitions",
-                "CREATE TABLE %s ("
-                + "keyspace_name text,"
-                + "view_name text,"
-                + "base_table_id uuid,"
-                + "base_table_name text,"
-                + "where_clause text,"
-                + "bloom_filter_fp_chance double,"
-                + "caching frozen<map<text, text>>,"
-                + "comment text,"
-                + "compaction frozen<map<text, text>>,"
-                + "compression frozen<map<text, text>>,"
-                + "crc_check_chance double,"
-                + "dclocal_read_repair_chance double,"
-                + "default_time_to_live int,"
-                + "extensions frozen<map<text, blob>>,"
-                + "gc_grace_seconds int,"
-                + "id uuid,"
-                + "include_all_columns boolean,"
-                + "max_index_interval int,"
-                + "memtable_flush_period_in_ms int,"
-                + "min_index_interval int,"
-                + "read_repair_chance double,"
-                + "speculative_retry text,"
-                + "cdc boolean,"
-                + "PRIMARY KEY ((keyspace_name), view_name))");
-
-    private static final CFMetaData Indexes =
-        compile(INDEXES,
-                "secondary index definitions",
-                "CREATE TABLE %s ("
-                + "keyspace_name text,"
-                + "table_name text,"
-                + "index_name text,"
-                + "kind text,"
-                + "options frozen<map<text, text>>,"
-                + "PRIMARY KEY ((keyspace_name), table_name, index_name))");
-
-    private static final CFMetaData Types =
-        compile(TYPES,
-                "user defined type definitions",
-                "CREATE TABLE %s ("
-                + "keyspace_name text,"
-                + "type_name text,"
-                + "field_names frozen<list<text>>,"
-                + "field_types frozen<list<text>>,"
-                + "PRIMARY KEY ((keyspace_name), type_name))");
-
-    private static final CFMetaData Functions =
-        compile(FUNCTIONS,
-                "user defined function definitions",
-                "CREATE TABLE %s ("
-                + "keyspace_name text,"
-                + "function_name text,"
-                + "argument_types frozen<list<text>>,"
-                + "argument_names frozen<list<text>>,"
-                + "body text,"
-                + "language text,"
-                + "return_type text,"
-                + "called_on_null_input boolean,"
-                + "PRIMARY KEY ((keyspace_name), function_name, argument_types))");
-
-    private static final CFMetaData Aggregates =
-        compile(AGGREGATES,
-                "user defined aggregate definitions",
-                "CREATE TABLE %s ("
-                + "keyspace_name text,"
-                + "aggregate_name text,"
-                + "argument_types frozen<list<text>>,"
-                + "final_func text,"
-                + "initcond text,"
-                + "return_type text,"
-                + "state_func text,"
-                + "state_type text,"
-                + "PRIMARY KEY ((keyspace_name), aggregate_name, argument_types))");
-
-    public static final List<CFMetaData> ALL_TABLE_METADATA =
+    private static final TableMetadata Keyspaces =
+        parse(KEYSPACES,
+              "keyspace definitions",
+              "CREATE TABLE %s ("
+              + "keyspace_name text,"
+              + "durable_writes boolean,"
+              + "replication frozen<map<text, text>>,"
+              + "PRIMARY KEY ((keyspace_name)))");
+
+    private static final TableMetadata Tables =
+        parse(TABLES,
+              "table definitions",
+              "CREATE TABLE %s ("
+              + "keyspace_name text,"
+              + "table_name text,"
+              + "bloom_filter_fp_chance double,"
+              + "caching frozen<map<text, text>>,"
+              + "comment text,"
+              + "compaction frozen<map<text, text>>,"
+              + "compression frozen<map<text, text>>,"
+              + "crc_check_chance double,"
+              + "dclocal_read_repair_chance double,"
+              + "default_time_to_live int,"
+              + "extensions frozen<map<text, blob>>,"
+              + "flags frozen<set<text>>," // SUPER, COUNTER, DENSE, COMPOUND
+              + "gc_grace_seconds int,"
+              + "id uuid,"
+              + "max_index_interval int,"
+              + "memtable_flush_period_in_ms int,"
+              + "min_index_interval int,"
+              + "read_repair_chance double,"
+              + "speculative_retry text,"
+              + "cdc boolean,"
+              + "PRIMARY KEY ((keyspace_name), table_name))");
+
+    private static final TableMetadata Columns =
+        parse(COLUMNS,
+              "column definitions",
+              "CREATE TABLE %s ("
+              + "keyspace_name text,"
+              + "table_name text,"
+              + "column_name text,"
+              + "clustering_order text,"
+              + "column_name_bytes blob,"
+              + "kind text,"
+              + "position int,"
+              + "type text,"
+              + "PRIMARY KEY ((keyspace_name), table_name, column_name))");
+
+    private static final TableMetadata DroppedColumns =
+        parse(DROPPED_COLUMNS,
+              "dropped column registry",
+              "CREATE TABLE %s ("
+              + "keyspace_name text,"
+              + "table_name text,"
+              + "column_name text,"
+              + "dropped_time timestamp,"
+              + "type text,"
+              + "kind text,"
+              + "PRIMARY KEY ((keyspace_name), table_name, column_name))");
+
+    private static final TableMetadata Triggers =
+        parse(TRIGGERS,
+              "trigger definitions",
+              "CREATE TABLE %s ("
+              + "keyspace_name text,"
+              + "table_name text,"
+              + "trigger_name text,"
+              + "options frozen<map<text, text>>,"
+              + "PRIMARY KEY ((keyspace_name), table_name, trigger_name))");
+
+    private static final TableMetadata Views =
+        parse(VIEWS,
+              "view definitions",
+              "CREATE TABLE %s ("
+              + "keyspace_name text,"
+              + "view_name text,"
+              + "base_table_id uuid,"
+              + "base_table_name text,"
+              + "where_clause text,"
+              + "bloom_filter_fp_chance double,"
+              + "caching frozen<map<text, text>>,"
+              + "comment text,"
+              + "compaction frozen<map<text, text>>,"
+              + "compression frozen<map<text, text>>,"
+              + "crc_check_chance double,"
+              + "dclocal_read_repair_chance double,"
+              + "default_time_to_live int,"
+              + "extensions frozen<map<text, blob>>,"
+              + "gc_grace_seconds int,"
+              + "id uuid,"
+              + "include_all_columns boolean,"
+              + "max_index_interval int,"
+              + "memtable_flush_period_in_ms int,"
+              + "min_index_interval int,"
+              + "read_repair_chance double,"
+              + "speculative_retry text,"
+              + "cdc boolean,"
+              + "PRIMARY KEY ((keyspace_name), view_name))");
+
+    private static final TableMetadata Indexes =
+        parse(INDEXES,
+              "secondary index definitions",
+              "CREATE TABLE %s ("
+              + "keyspace_name text,"
+              + "table_name text,"
+              + "index_name text,"
+              + "kind text,"
+              + "options frozen<map<text, text>>,"
+              + "PRIMARY KEY ((keyspace_name), table_name, index_name))");
+
+    private static final TableMetadata Types =
+        parse(TYPES,
+              "user defined type definitions",
+              "CREATE TABLE %s ("
+              + "keyspace_name text,"
+              + "type_name text,"
+              + "field_names frozen<list<text>>,"
+              + "field_types frozen<list<text>>,"
+              + "PRIMARY KEY ((keyspace_name), type_name))");
+
+    private static final TableMetadata Functions =
+        parse(FUNCTIONS,
+              "user defined function definitions",
+              "CREATE TABLE %s ("
+              + "keyspace_name text,"
+              + "function_name text,"
+              + "argument_types frozen<list<text>>,"
+              + "argument_names frozen<list<text>>,"
+              + "body text,"
+              + "language text,"
+              + "return_type text,"
+              + "called_on_null_input boolean,"
+              + "PRIMARY KEY ((keyspace_name), function_name, argument_types))");
+
+    private static final TableMetadata Aggregates =
+        parse(AGGREGATES,
+              "user defined aggregate definitions",
+              "CREATE TABLE %s ("
+              + "keyspace_name text,"
+              + "aggregate_name text,"
+              + "argument_types frozen<list<text>>,"
+              + "final_func text,"
+              + "initcond text,"
+              + "return_type text,"
+              + "state_func text,"
+              + "state_type text,"
+              + "PRIMARY KEY ((keyspace_name), aggregate_name, argument_types))");
+
+    private static final List<TableMetadata> ALL_TABLE_METADATA =
         ImmutableList.of(Keyspaces, Tables, Columns, Triggers, DroppedColumns, Views, Types, Functions, Aggregates, Indexes);
 
-    private static CFMetaData compile(String name, String description, String schema)
+    private static TableMetadata parse(String name, String description, String cql)
     {
-        return CFMetaData.compile(String.format(schema, name), SchemaConstants.SCHEMA_KEYSPACE_NAME)
-                         .comment(description)
-                         .gcGraceSeconds((int) TimeUnit.DAYS.toSeconds(7));
+        return CreateTableStatement.parse(format(cql, name), SchemaConstants.SCHEMA_KEYSPACE_NAME)
+                                   .id(TableId.forSystemTable(SchemaConstants.SCHEMA_KEYSPACE_NAME, name))
+                                   .dcLocalReadRepairChance(0.0)
+                                   .gcGraceSeconds((int) TimeUnit.DAYS.toSeconds(7))
+                                   .memtableFlushPeriod((int) TimeUnit.HOURS.toMillis(1))
+                                   .comment(description)
+                                   .build();
     }
 
     public static KeyspaceMetadata metadata()
@@ -252,8 +254,8 @@ public final class SchemaKeyspace
      */
     public static void saveSystemKeyspacesSchema()
     {
-        KeyspaceMetadata system = Schema.instance.getKSMetaData(SchemaConstants.SYSTEM_KEYSPACE_NAME);
-        KeyspaceMetadata schema = Schema.instance.getKSMetaData(SchemaConstants.SCHEMA_KEYSPACE_NAME);
+        KeyspaceMetadata system = Schema.instance.getKeyspaceMetadata(SchemaConstants.SYSTEM_KEYSPACE_NAME);
+        KeyspaceMetadata schema = Schema.instance.getKeyspaceMetadata(SchemaConstants.SCHEMA_KEYSPACE_NAME);
 
         long timestamp = FBUtilities.timestampMicros();
 
@@ -285,7 +287,7 @@ public final class SchemaKeyspace
      * Read schema from system keyspace and calculate MD5 digest of every row, resulting digest
      * will be converted into UUID which would act as content-based version of the schema.
      */
-    public static UUID calculateSchemaDigest()
+    static UUID calculateSchemaDigest()
     {
         MessageDigest digest;
         try
@@ -337,10 +339,10 @@ public final class SchemaKeyspace
     private static ReadCommand getReadCommandForTableSchema(String schemaTableName)
     {
         ColumnFamilyStore cfs = getSchemaCFS(schemaTableName);
-        return PartitionRangeReadCommand.allDataRead(cfs.metadata, FBUtilities.nowInSeconds());
+        return PartitionRangeReadCommand.allDataRead(cfs.metadata(), FBUtilities.nowInSeconds());
     }
 
-    public static Collection<Mutation> convertSchemaToMutations()
+    static Collection<Mutation> convertSchemaToMutations()
     {
         Map<DecoratedKey, Mutation> mutationMap = new HashMap<>();
 
@@ -364,12 +366,7 @@ public final class SchemaKeyspace
                         continue;
 
                     DecoratedKey key = partition.partitionKey();
-                    Mutation mutation = mutationMap.get(key);
-                    if (mutation == null)
-                    {
-                        mutation = new Mutation(SchemaConstants.SCHEMA_KEYSPACE_NAME, key);
-                        mutationMap.put(key, mutation);
-                    }
+                    Mutation mutation = mutationMap.computeIfAbsent(key, k -> new Mutation(SchemaConstants.SCHEMA_KEYSPACE_NAME, key));
 
                     mutation.add(makeUpdateForSchema(partition, cmd.columnFilter()));
                 }
@@ -387,13 +384,13 @@ public final class SchemaKeyspace
         // This method is used during schema migration tasks, and if cdc is disabled, we want to force excluding the
         // 'cdc' column from the TABLES schema table because it is problematic if received by older nodes (see #12236
         // and #12697). Otherwise though, we just simply "buffer" the content of the partition into a PartitionUpdate.
-        if (DatabaseDescriptor.isCDCEnabled() || !partition.metadata().cfName.equals(TABLES))
+        if (DatabaseDescriptor.isCDCEnabled() || !partition.metadata().name.equals(TABLES))
             return PartitionUpdate.fromIterator(partition, filter);
 
         // We want to skip the 'cdc' column. A simple solution for that is based on the fact that
         // 'PartitionUpdate.fromIterator()' will ignore any columns that are marked as 'fetched' but not 'queried'.
         ColumnFilter.Builder builder = ColumnFilter.allRegularColumnsBuilder(partition.metadata());
-        for (ColumnDefinition column : filter.fetchedColumns())
+        for (ColumnMetadata column : filter.fetchedColumns())
         {
             if (!column.name.toString().equals("cdc"))
                 builder.add(column);
@@ -411,14 +408,15 @@ public final class SchemaKeyspace
      * Schema entities to mutations
      */
 
-    private static DecoratedKey decorate(CFMetaData metadata, Object value)
+    @SuppressWarnings("unchecked")
+    private static DecoratedKey decorate(TableMetadata metadata, Object value)
     {
-        return metadata.decorateKey(((AbstractType)metadata.getKeyValidator()).decompose(value));
+        return metadata.partitioner.decorateKey(((AbstractType)metadata.partitionKeyType).decompose(value));
     }
 
-    public static Mutation.SimpleBuilder makeCreateKeyspaceMutation(String name, KeyspaceParams params, long timestamp)
+    static Mutation.SimpleBuilder makeCreateKeyspaceMutation(String name, KeyspaceParams params, long timestamp)
     {
-        Mutation.SimpleBuilder builder = Mutation.simpleBuilder(Keyspaces.ksName, decorate(Keyspaces, name))
+        Mutation.SimpleBuilder builder = Mutation.simpleBuilder(Keyspaces.keyspace, decorate(Keyspaces, name))
                                                  .timestamp(timestamp);
 
         builder.update(Keyspaces)
@@ -429,7 +427,7 @@ public final class SchemaKeyspace
         return builder;
     }
 
-    public static Mutation.SimpleBuilder makeCreateKeyspaceMutation(KeyspaceMetadata keyspace, long timestamp)
+    static Mutation.SimpleBuilder makeCreateKeyspaceMutation(KeyspaceMetadata keyspace, long timestamp)
     {
         Mutation.SimpleBuilder builder = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp);
 
@@ -442,18 +440,18 @@ public final class SchemaKeyspace
         return builder;
     }
 
-    public static Mutation.SimpleBuilder makeDropKeyspaceMutation(KeyspaceMetadata keyspace, long timestamp)
+    static Mutation.SimpleBuilder makeDropKeyspaceMutation(KeyspaceMetadata keyspace, long timestamp)
     {
         Mutation.SimpleBuilder builder = Mutation.simpleBuilder(SchemaConstants.SCHEMA_KEYSPACE_NAME, decorate(Keyspaces, keyspace.name))
                                                  .timestamp(timestamp);
 
-        for (CFMetaData schemaTable : ALL_TABLE_METADATA)
+        for (TableMetadata schemaTable : ALL_TABLE_METADATA)
             builder.update(schemaTable).delete();
 
         return builder;
     }
 
-    public static Mutation.SimpleBuilder makeCreateTypeMutation(KeyspaceMetadata keyspace, UserType type, long timestamp)
+    static Mutation.SimpleBuilder makeCreateTypeMutation(KeyspaceMetadata keyspace, UserType type, long timestamp)
     {
         // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
         Mutation.SimpleBuilder builder = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp);
@@ -469,7 +467,7 @@ public final class SchemaKeyspace
                 .add("field_types", type.fieldTypes().stream().map(AbstractType::asCQL3Type).map(CQL3Type::toString).collect(toList()));
     }
 
-    public static Mutation.SimpleBuilder dropTypeFromSchemaMutation(KeyspaceMetadata keyspace, UserType type, long timestamp)
+    static Mutation.SimpleBuilder dropTypeFromSchemaMutation(KeyspaceMetadata keyspace, UserType type, long timestamp)
     {
         // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
         Mutation.SimpleBuilder builder = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp);
@@ -477,7 +475,7 @@ public final class SchemaKeyspace
         return builder;
     }
 
-    public static Mutation.SimpleBuilder makeCreateTableMutation(KeyspaceMetadata keyspace, CFMetaData table, long timestamp)
+    static Mutation.SimpleBuilder makeCreateTableMutation(KeyspaceMetadata keyspace, TableMetadata table, long timestamp)
     {
         // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
         Mutation.SimpleBuilder builder = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp);
@@ -485,27 +483,27 @@ public final class SchemaKeyspace
         return builder;
     }
 
-    static void addTableToSchemaMutation(CFMetaData table, boolean withColumnsAndTriggers, Mutation.SimpleBuilder builder)
+    static void addTableToSchemaMutation(TableMetadata table, boolean withColumnsAndTriggers, Mutation.SimpleBuilder builder)
     {
         Row.SimpleBuilder rowBuilder = builder.update(Tables)
-                                              .row(table.cfName)
-                                              .add("id", table.cfId)
-                                              .add("flags", CFMetaData.flagsToStrings(table.flags()));
+                                              .row(table.name)
+                                              .add("id", table.id.asUUID())
+                                              .add("flags", TableMetadata.Flag.toStringSet(table.flags));
 
         addTableParamsToRowBuilder(table.params, rowBuilder);
 
         if (withColumnsAndTriggers)
         {
-            for (ColumnDefinition column : table.allColumns())
+            for (ColumnMetadata column : table.columns())
                 addColumnToSchemaMutation(table, column, builder);
 
-            for (CFMetaData.DroppedColumn column : table.getDroppedColumns().values())
+            for (DroppedColumn column : table.droppedColumns.values())
                 addDroppedColumnToSchemaMutation(table, column, builder);
 
-            for (TriggerMetadata trigger : table.getTriggers())
+            for (TriggerMetadata trigger : table.triggers)
                 addTriggerToSchemaMutation(table, trigger, builder);
 
-            for (IndexMetadata index : table.getIndexes())
+            for (IndexMetadata index : table.indexes)
                 addIndexToSchemaMutation(table, index, builder);
         }
     }
@@ -534,43 +532,42 @@ public final class SchemaKeyspace
             builder.add("cdc", params.cdc);
     }
 
-    public static Mutation.SimpleBuilder makeUpdateTableMutation(KeyspaceMetadata keyspace,
-                                                                 CFMetaData oldTable,
-                                                                 CFMetaData newTable,
+    static Mutation.SimpleBuilder makeUpdateTableMutation(KeyspaceMetadata keyspace,
+                                                                 TableMetadata oldTable,
+                                                                 TableMetadata newTable,
                                                                  long timestamp)
     {
         Mutation.SimpleBuilder builder = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp);
 
         addTableToSchemaMutation(newTable, false, builder);
 
-        MapDifference<ByteBuffer, ColumnDefinition> columnDiff = Maps.difference(oldTable.getColumnMetadata(),
-                                                                                 newTable.getColumnMetadata());
+        MapDifference<ByteBuffer, ColumnMetadata> columnDiff = Maps.difference(oldTable.columns, newTable.columns);
 
         // columns that are no longer needed
-        for (ColumnDefinition column : columnDiff.entriesOnlyOnLeft().values())
+        for (ColumnMetadata column : columnDiff.entriesOnlyOnLeft().values())
             dropColumnFromSchemaMutation(oldTable, column, builder);
 
         // newly added columns
-        for (ColumnDefinition column : columnDiff.entriesOnlyOnRight().values())
+        for (ColumnMetadata column : columnDiff.entriesOnlyOnRight().values())
             addColumnToSchemaMutation(newTable, column, builder);
 
         // old columns with updated attributes
         for (ByteBuffer name : columnDiff.entriesDiffering().keySet())
-            addColumnToSchemaMutation(newTable, newTable.getColumnDefinition(name), builder);
+            addColumnToSchemaMutation(newTable, newTable.getColumn(name), builder);
 
         // dropped columns
-        MapDifference<ByteBuffer, CFMetaData.DroppedColumn> droppedColumnDiff =
-            Maps.difference(oldTable.getDroppedColumns(), newTable.getDroppedColumns());
+        MapDifference<ByteBuffer, DroppedColumn> droppedColumnDiff =
+            Maps.difference(oldTable.droppedColumns, newTable.droppedColumns);
 
         // newly dropped columns
-        for (CFMetaData.DroppedColumn column : droppedColumnDiff.entriesOnlyOnRight().values())
+        for (DroppedColumn column : droppedColumnDiff.entriesOnlyOnRight().values())
             addDroppedColumnToSchemaMutation(newTable, column, builder);
 
         // columns added then dropped again
         for (ByteBuffer name : droppedColumnDiff.entriesDiffering().keySet())
-            addDroppedColumnToSchemaMutation(newTable, newTable.getDroppedColumns().get(name), builder);
+            addDroppedColumnToSchemaMutation(newTable, newTable.droppedColumns.get(name), builder);
 
-        MapDifference<String, TriggerMetadata> triggerDiff = triggersDiff(oldTable.getTriggers(), newTable.getTriggers());
+        MapDifference<String, TriggerMetadata> triggerDiff = triggersDiff(oldTable.triggers, newTable.triggers);
 
         // dropped triggers
         for (TriggerMetadata trigger : triggerDiff.entriesOnlyOnLeft().values())
@@ -580,8 +577,7 @@ public final class SchemaKeyspace
         for (TriggerMetadata trigger : triggerDiff.entriesOnlyOnRight().values())
             addTriggerToSchemaMutation(newTable, trigger, builder);
 
-        MapDifference<String, IndexMetadata> indexesDiff = indexesDiff(oldTable.getIndexes(),
-                                                                       newTable.getIndexes());
+        MapDifference<String, IndexMetadata> indexesDiff = indexesDiff(oldTable.indexes, newTable.indexes);
 
         // dropped indexes
         for (IndexMetadata index : indexesDiff.entriesOnlyOnLeft().values())
@@ -620,33 +616,33 @@ public final class SchemaKeyspace
         return Maps.difference(beforeMap, afterMap);
     }
 
-    public static Mutation.SimpleBuilder makeDropTableMutation(KeyspaceMetadata keyspace, CFMetaData table, long timestamp)
+    static Mutation.SimpleBuilder makeDropTableMutation(KeyspaceMetadata keyspace, TableMetadata table, long timestamp)
     {
         // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
         Mutation.SimpleBuilder builder = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp);
 
-        builder.update(Tables).row(table.cfName).delete();
+        builder.update(Tables).row(table.name).delete();
 
-        for (ColumnDefinition column : table.allColumns())
+        for (ColumnMetadata column : table.columns())
             dropColumnFromSchemaMutation(table, column, builder);
 
-        for (TriggerMetadata trigger : table.getTriggers())
+        for (TriggerMetadata trigger : table.triggers)
             dropTriggerFromSchemaMutation(table, trigger, builder);
 
-        for (IndexMetadata index : table.getIndexes())
+        for (IndexMetadata index : table.indexes)
             dropIndexFromSchemaMutation(table, index, builder);
 
         return builder;
     }
 
-    private static void addColumnToSchemaMutation(CFMetaData table, ColumnDefinition column, Mutation.SimpleBuilder builder)
+    private static void addColumnToSchemaMutation(TableMetadata table, ColumnMetadata column, Mutation.SimpleBuilder builder)
     {
         AbstractType<?> type = column.type;
         if (type instanceof ReversedType)
             type = ((ReversedType) type).baseType;
 
         builder.update(Columns)
-               .row(table.cfName, column.name.toString())
+               .row(table.name, column.name.toString())
                .add("column_name_bytes", column.name.bytes)
                .add("kind", column.kind.toString().toLowerCase())
                .add("position", column.position())
@@ -654,34 +650,34 @@ public final class SchemaKeyspace
                .add("type", type.asCQL3Type().toString());
     }
 
-    private static void dropColumnFromSchemaMutation(CFMetaData table, ColumnDefinition column, Mutation.SimpleBuilder builder)
+    private static void dropColumnFromSchemaMutation(TableMetadata table, ColumnMetadata column, Mutation.SimpleBuilder builder)
     {
         // Note: we do want to use name.toString(), not name.bytes directly for backward compatibility (For CQL3, this won't make a difference).
-        builder.update(Columns).row(table.cfName, column.name.toString()).delete();
+        builder.update(Columns).row(table.name, column.name.toString()).delete();
     }
 
-    private static void addDroppedColumnToSchemaMutation(CFMetaData table, CFMetaData.DroppedColumn column, Mutation.SimpleBuilder builder)
+    private static void addDroppedColumnToSchemaMutation(TableMetadata table, DroppedColumn column, Mutation.SimpleBuilder builder)
     {
         builder.update(DroppedColumns)
-               .row(table.cfName, column.name)
+               .row(table.name, column.column.name.toString())
                .add("dropped_time", new Date(TimeUnit.MICROSECONDS.toMillis(column.droppedTime)))
-               .add("type", expandUserTypes(column.type).asCQL3Type().toString())
-               .add("kind", column.kind.toString().toLowerCase());
+               .add("type", expandUserTypes(column.column.type).asCQL3Type().toString())
+               .add("kind", column.column.kind.toString().toLowerCase());
     }
 
-    private static void addTriggerToSchemaMutation(CFMetaData table, TriggerMetadata trigger, Mutation.SimpleBuilder builder)
+    private static void addTriggerToSchemaMutation(TableMetadata table, TriggerMetadata trigger, Mutation.SimpleBuilder builder)
     {
         builder.update(Triggers)
-               .row(table.cfName, trigger.name)
+               .row(table.name, trigger.name)
                .add("options", Collections.singletonMap("class", trigger.classOption));
     }
 
-    private static void dropTriggerFromSchemaMutation(CFMetaData table, TriggerMetadata trigger, Mutation.SimpleBuilder builder)
+    private static void dropTriggerFromSchemaMutation(TableMetadata table, TriggerMetadata trigger, Mutation.SimpleBuilder builder)
     {
-        builder.update(Triggers).row(table.cfName, trigger.name).delete();
+        builder.update(Triggers).row(table.name, trigger.name).delete();
     }
 
-    public static Mutation.SimpleBuilder makeCreateViewMutation(KeyspaceMetadata keyspace, ViewDefinition view, long timestamp)
+    static Mutation.SimpleBuilder makeCreateViewMutation(KeyspaceMetadata keyspace, ViewMetadata view, long timestamp)
     {
         // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
         Mutation.SimpleBuilder builder = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp);
@@ -689,110 +685,106 @@ public final class SchemaKeyspace
         return builder;
     }
 
-    private static void addViewToSchemaMutation(ViewDefinition view, boolean includeColumns, Mutation.SimpleBuilder builder)
+    private static void addViewToSchemaMutation(ViewMetadata view, boolean includeColumns, Mutation.SimpleBuilder builder)
     {
-        CFMetaData table = view.metadata;
+        TableMetadata table = view.metadata;
         Row.SimpleBuilder rowBuilder = builder.update(Views)
-                                              .row(view.viewName)
+                                              .row(view.name)
                                               .add("include_all_columns", view.includeAllColumns)
-                                              .add("base_table_id", view.baseTableId)
-                                              .add("base_table_name", view.baseTableMetadata().cfName)
+                                              .add("base_table_id", view.baseTableId.asUUID())
+                                              .add("base_table_name", view.baseTableName)
                                               .add("where_clause", view.whereClause)
-                                              .add("id", table.cfId);
+                                              .add("id", table.id.asUUID());
 
         addTableParamsToRowBuilder(table.params, rowBuilder);
 
         if (includeColumns)
         {
-            for (ColumnDefinition column : table.allColumns())
+            for (ColumnMetadata column : table.columns())
                 addColumnToSchemaMutation(table, column, builder);
 
-            for (CFMetaData.DroppedColumn column : table.getDroppedColumns().values())
+            for (DroppedColumn column : table.droppedColumns.values())
                 addDroppedColumnToSchemaMutation(table, column, builder);
         }
     }
 
-    public static Mutation.SimpleBuilder makeDropViewMutation(KeyspaceMetadata keyspace, ViewDefinition view, long timestamp)
+    static Mutation.SimpleBuilder makeDropViewMutation(KeyspaceMetadata keyspace, ViewMetadata view, long timestamp)
     {
         // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
         Mutation.SimpleBuilder builder = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp);
 
-        builder.update(Views).row(view.viewName).delete();
+        builder.update(Views).row(view.name).delete();
 
-        CFMetaData table = view.metadata;
-        for (ColumnDefinition column : table.allColumns())
+        TableMetadata table = view.metadata;
+        for (ColumnMetadata column : table.columns())
             dropColumnFromSchemaMutation(table, column, builder);
 
-        for (IndexMetadata index : table.getIndexes())
+        for (IndexMetadata index : table.indexes)
             dropIndexFromSchemaMutation(table, index, builder);
 
         return builder;
     }
 
-    public static Mutation.SimpleBuilder makeUpdateViewMutation(KeyspaceMetadata keyspace,
-                                                                ViewDefinition oldView,
-                                                                ViewDefinition newView,
+    static Mutation.SimpleBuilder makeUpdateViewMutation(KeyspaceMetadata keyspace,
+                                                                ViewMetadata oldView,
+                                                                ViewMetadata newView,
                                                                 long timestamp)
     {
         Mutation.SimpleBuilder builder = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp);
 
         addViewToSchemaMutation(newView, false, builder);
 
-        MapDifference<ByteBuffer, ColumnDefinition> columnDiff = Maps.difference(oldView.metadata.getColumnMetadata(),
-                                                                                 newView.metadata.getColumnMetadata());
+        MapDifference<ByteBuffer, ColumnMetadata> columnDiff = Maps.difference(oldView.metadata.columns,
+                                                                               newView.metadata.columns);
 
         // columns that are no longer needed
-        for (ColumnDefinition column : columnDiff.entriesOnlyOnLeft().values())
+        for (ColumnMetadata column : columnDiff.entriesOnlyOnLeft().values())
             dropColumnFromSchemaMutation(oldView.metadata, column, builder);
 
         // newly added columns
-        for (ColumnDefinition column : columnDiff.entriesOnlyOnRight().values())
+        for (ColumnMetadata column : columnDiff.entriesOnlyOnRight().values())
             addColumnToSchemaMutation(newView.metadata, column, builder);
 
         // old columns with updated attributes
         for (ByteBuffer name : columnDiff.entriesDiffering().keySet())
-            addColumnToSchemaMutation(newView.metadata, newView.metadata.getColumnDefinition(name), builder);
+            addColumnToSchemaMutation(newView.metadata, newView.metadata.getColumn(name), builder);
 
         // dropped columns
-        MapDifference<ByteBuffer, CFMetaData.DroppedColumn> droppedColumnDiff =
-        Maps.difference(oldView.metadata.getDroppedColumns(), oldView.metadata.getDroppedColumns());
+        MapDifference<ByteBuffer, DroppedColumn> droppedColumnDiff =
+        Maps.difference(oldView.metadata.droppedColumns, oldView.metadata.droppedColumns);
 
         // newly dropped columns
-        for (CFMetaData.DroppedColumn column : droppedColumnDiff.entriesOnlyOnRight().values())
+        for (DroppedColumn column : droppedColumnDiff.entriesOnlyOnRight().values())
             addDroppedColumnToSchemaMutation(oldView.metadata, column, builder);
 
         // columns added then dropped again
         for (ByteBuffer name : droppedColumnDiff.entriesDiffering().keySet())
-            addDroppedColumnToSchemaMutation(newView.metadata, newView.metadata.getDroppedColumns().get(name), builder);
+            addDroppedColumnToSchemaMutation(newView.metadata, newView.metadata.droppedColumns.get(name), builder);
 
         return builder;
     }
 
-    private static void addIndexToSchemaMutation(CFMetaData table,
-                                                 IndexMetadata index,
-                                                 Mutation.SimpleBuilder builder)
+    private static void addIndexToSchemaMutation(TableMetadata table, IndexMetadata index, Mutation.SimpleBuilder builder)
     {
         builder.update(Indexes)
-               .row(table.cfName, index.name)
+               .row(table.name, index.name)
                .add("kind", index.kind.toString())
                .add("options", index.options);
     }
 
-    private static void dropIndexFromSchemaMutation(CFMetaData table,
-                                                    IndexMetadata index,
-                                                    Mutation.SimpleBuilder builder)
+    private static void dropIndexFromSchemaMutation(TableMetadata table, IndexMetadata index, Mutation.SimpleBuilder builder)
     {
-        builder.update(Indexes).row(table.cfName, index.name).delete();
+        builder.update(Indexes).row(table.name, index.name).delete();
     }
 
-    private static void addUpdatedIndexToSchemaMutation(CFMetaData table,
+    private static void addUpdatedIndexToSchemaMutation(TableMetadata table,
                                                         IndexMetadata index,
                                                         Mutation.SimpleBuilder builder)
     {
         addIndexToSchemaMutation(table, index, builder);
     }
 
-    public static Mutation.SimpleBuilder makeCreateFunctionMutation(KeyspaceMetadata keyspace, UDFunction function, long timestamp)
+    static Mutation.SimpleBuilder makeCreateFunctionMutation(KeyspaceMetadata keyspace, UDFunction function, long timestamp)
     {
         // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
         Mutation.SimpleBuilder builder = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp);
@@ -803,7 +795,7 @@ public final class SchemaKeyspace
     static void addFunctionToSchemaMutation(UDFunction function, Mutation.SimpleBuilder builder)
     {
         builder.update(Functions)
-               .row(function.name().name, functionArgumentsList(function))
+               .row(function.name().name, function.argumentsList())
                .add("body", function.body())
                .add("language", function.language())
                .add("return_type", function.returnType().asCQL3Type().toString())
@@ -823,24 +815,15 @@ public final class SchemaKeyspace
         }
     }
 
-    private static List<String> functionArgumentsList(AbstractFunction fun)
-    {
-        return fun.argTypes()
-                  .stream()
-                  .map(AbstractType::asCQL3Type)
-                  .map(CQL3Type::toString)
-                  .collect(toList());
-    }
-
-    public static Mutation.SimpleBuilder makeDropFunctionMutation(KeyspaceMetadata keyspace, UDFunction function, long timestamp)
+    static Mutation.SimpleBuilder makeDropFunctionMutation(KeyspaceMetadata keyspace, UDFunction function, long timestamp)
     {
         // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
         Mutation.SimpleBuilder builder = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp);
-        builder.update(Functions).row(function.name().name, functionArgumentsList(function)).delete();
+        builder.update(Functions).row(function.name().name, function.argumentsList()).delete();
         return builder;
     }
 
-    public static Mutation.SimpleBuilder makeCreateAggregateMutation(KeyspaceMetadata keyspace, UDAggregate aggregate, long timestamp)
+    static Mutation.SimpleBuilder makeCreateAggregateMutation(KeyspaceMetadata keyspace, UDAggregate aggregate, long timestamp)
     {
         // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
         Mutation.SimpleBuilder builder = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp);
@@ -851,7 +834,7 @@ public final class SchemaKeyspace
     static void addAggregateToSchemaMutation(UDAggregate aggregate, Mutation.SimpleBuilder builder)
     {
         builder.update(Aggregates)
-               .row(aggregate.name().name, functionArgumentsList(aggregate))
+               .row(aggregate.name().name, aggregate.argumentsList())
                .add("return_type", aggregate.returnType().asCQL3Type().toString())
                .add("state_func", aggregate.stateFunction().name().name)
                .add("state_type", aggregate.stateType().asCQL3Type().toString())
@@ -862,11 +845,11 @@ public final class SchemaKeyspace
                                 : null);
     }
 
-    public static Mutation.SimpleBuilder makeDropAggregateMutation(KeyspaceMetadata keyspace, UDAggregate aggregate, long timestamp)
+    static Mutation.SimpleBuilder makeDropAggregateMutation(KeyspaceMetadata keyspace, UDAggregate aggregate, long timestamp)
     {
         // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
         Mutation.SimpleBuilder builder = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp);
-        builder.update(Aggregates).row(aggregate.name().name, functionArgumentsList(aggregate)).delete();
+        builder.update(Aggregates).row(aggregate.name().name, aggregate.argumentsList()).delete();
         return builder;
     }
 
@@ -874,7 +857,7 @@ public final class SchemaKeyspace
      * Fetching schema
      */
 
-    public static Keyspaces fetchNonSystemKeyspaces()
+    static Keyspaces fetchNonSystemKeyspaces()
     {
         return fetchKeyspacesWithout(SchemaConstants.SYSTEM_KEYSPACE_NAMES);
     }
@@ -893,20 +876,6 @@ public final class SchemaKeyspace
         return keyspaces.build();
     }
 
-    private static Keyspaces fetchKeyspacesOnly(Set<String> includedKeyspaceNames)
-    {
-        /*
-         * We know the keyspace names we are going to query, but we still want to run the SELECT IN
-         * query, to filter out the keyspaces that had been dropped by the applied mutation set.
-         */
-        String query = format("SELECT keyspace_name FROM %s.%s WHERE keyspace_name IN ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, KEYSPACES);
-
-        Keyspaces.Builder keyspaces = org.apache.cassandra.schema.Keyspaces.builder();
-        for (UntypedResultSet.Row row : query(query, new ArrayList<>(includedKeyspaceNames)))
-            keyspaces.add(fetchKeyspace(row.getString("keyspace_name")));
-        return keyspaces.build();
-    }
-
     private static KeyspaceMetadata fetchKeyspace(String keyspaceName)
     {
         KeyspaceParams params = fetchKeyspaceParams(keyspaceName);
@@ -952,7 +921,7 @@ public final class SchemaKeyspace
         return tables.build();
     }
 
-    private static CFMetaData fetchTable(String keyspaceName, String tableName, Types types)
+    private static TableMetadata fetchTable(String keyspaceName, String tableName, Types types)
     {
         String query = String.format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND table_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, TABLES);
         UntypedResultSet rows = query(query, keyspaceName, tableName);
@@ -960,43 +929,17 @@ public final class SchemaKeyspace
             throw new RuntimeException(String.format("%s:%s not found in the schema definitions keyspace.", keyspaceName, tableName));
         UntypedResultSet.Row row = rows.one();
 
-        UUID id = row.getUUID("id");
-
-        Set<CFMetaData.Flag> flags = CFMetaData.flagsFromStrings(row.getFrozenSet("flags", UTF8Type.instance));
-
-        boolean isSuper = flags.contains(CFMetaData.Flag.SUPER);
-        boolean isCounter = flags.contains(CFMetaData.Flag.COUNTER);
-        boolean isDense = flags.contains(CFMetaData.Flag.DENSE);
-        boolean isCompound = flags.contains(CFMetaData.Flag.COMPOUND);
-
-        List<ColumnDefinition> columns = fetchColumns(keyspaceName, tableName, types);
-        if (!columns.stream().anyMatch(ColumnDefinition::isPartitionKey))
-        {
-            String msg = String.format("Table %s.%s did not have any partition key columns in the schema tables", keyspaceName, tableName);
-            throw new AssertionError(msg);
-        }
-
-        Map<ByteBuffer, CFMetaData.DroppedColumn> droppedColumns = fetchDroppedColumns(keyspaceName, tableName);
-        Indexes indexes = fetchIndexes(keyspaceName, tableName);
-        Triggers triggers = fetchTriggers(keyspaceName, tableName);
-
-        return CFMetaData.create(keyspaceName,
-                                 tableName,
-                                 id,
-                                 isDense,
-                                 isCompound,
-                                 isSuper,
-                                 isCounter,
-                                 false,
-                                 columns,
-                                 DatabaseDescriptor.getPartitioner())
-                         .params(createTableParamsFromRow(row))
-                         .droppedColumns(droppedColumns)
-                         .indexes(indexes)
-                         .triggers(triggers);
+        return TableMetadata.builder(keyspaceName, tableName, TableId.fromUUID(row.getUUID("id")))
+                            .flags(TableMetadata.Flag.fromStringSet(row.getFrozenSet("flags", UTF8Type.instance)))
+                            .params(createTableParamsFromRow(row))
+                            .addColumns(fetchColumns(keyspaceName, tableName, types))
+                            .droppedColumns(fetchDroppedColumns(keyspaceName, tableName))
+                            .indexes(fetchIndexes(keyspaceName, tableName))
+                            .triggers(fetchTriggers(keyspaceName, tableName))
+                            .build();
     }
 
-    public static TableParams createTableParamsFromRow(UntypedResultSet.Row row)
+    static TableParams createTableParamsFromRow(UntypedResultSet.Row row)
     {
         return TableParams.builder()
                           .bloomFilterFpChance(row.getDouble("bloom_filter_fp_chance"))
@@ -1014,29 +957,29 @@ public final class SchemaKeyspace
                           .readRepairChance(row.getDouble("read_repair_chance"))
                           .crcCheckChance(row.getDouble("crc_check_chance"))
                           .speculativeRetry(SpeculativeRetryParam.fromString(row.getString("speculative_retry")))
-                          .cdc(row.has("cdc") ? row.getBoolean("cdc") : false)
+                          .cdc(row.has("cdc") && row.getBoolean("cdc"))
                           .build();
     }
 
-    private static List<ColumnDefinition> fetchColumns(String keyspace, String table, Types types)
+    private static List<ColumnMetadata> fetchColumns(String keyspace, String table, Types types)
     {
         String query = format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND table_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, COLUMNS);
-        List<ColumnDefinition> columns = new ArrayList<>();
+        List<ColumnMetadata> columns = new ArrayList<>();
         query(query, keyspace, table).forEach(row -> columns.add(createColumnFromRow(row, types)));
         return columns;
     }
 
-    public static ColumnDefinition createColumnFromRow(UntypedResultSet.Row row, Types types)
+    static ColumnMetadata createColumnFromRow(UntypedResultSet.Row row, Types types)
     {
         String keyspace = row.getString("keyspace_name");
         String table = row.getString("table_name");
 
-        ColumnDefinition.Kind kind = ColumnDefinition.Kind.valueOf(row.getString("kind").toUpperCase());
+        ColumnMetadata.Kind kind = ColumnMetadata.Kind.valueOf(row.getString("kind").toUpperCase());
 
         int position = row.getInt("position");
         ClusteringOrder order = ClusteringOrder.valueOf(row.getString("clustering_order").toUpperCase());
 
-        AbstractType<?> type = parse(keyspace, row.getString("type"), types);
+        AbstractType<?> type = CQLTypeParser.parse(keyspace, row.getString("type"), types);
         if (order == ClusteringOrder.DESC)
             type = ReversedType.getInstance(type);
 
@@ -1044,38 +987,41 @@ public final class SchemaKeyspace
                                                              row.getBytes("column_name_bytes"),
                                                              row.getString("column_name"));
 
-        return new ColumnDefinition(keyspace, table, name, type, position, kind);
+        return new ColumnMetadata(keyspace, table, name, type, position, kind);
     }
 
-    private static Map<ByteBuffer, CFMetaData.DroppedColumn> fetchDroppedColumns(String keyspace, String table)
+    private static Map<ByteBuffer, DroppedColumn> fetchDroppedColumns(String keyspace, String table)
     {
         String query = format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND table_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, DROPPED_COLUMNS);
-        Map<ByteBuffer, CFMetaData.DroppedColumn> columns = new HashMap<>();
+        Map<ByteBuffer, DroppedColumn> columns = new HashMap<>();
         for (UntypedResultSet.Row row : query(query, keyspace, table))
         {
-            CFMetaData.DroppedColumn column = createDroppedColumnFromRow(row);
-            columns.put(UTF8Type.instance.decompose(column.name), column);
+            DroppedColumn column = createDroppedColumnFromRow(row);
+            columns.put(column.column.name.bytes, column);
         }
         return columns;
     }
 
-    private static CFMetaData.DroppedColumn createDroppedColumnFromRow(UntypedResultSet.Row row)
+    private static DroppedColumn createDroppedColumnFromRow(UntypedResultSet.Row row)
     {
         String keyspace = row.getString("keyspace_name");
+        String table = row.getString("table_name");
         String name = row.getString("column_name");
         /*
          * we never store actual UDT names in dropped column types (so that we can safely drop types if nothing refers to
          * them anymore), so before storing dropped columns in schema we expand UDTs to tuples. See expandUserTypes method.
          * Because of that, we can safely pass Types.none() to parse()
          */
-        AbstractType<?> type = parse(keyspace, row.getString("type"), org.apache.cassandra.schema.Types.none());
+        AbstractType<?> type = CQLTypeParser.parse(keyspace, row.getString("type"), org.apache.cassandra.schema.Types.none());
+        ColumnMetadata.Kind kind = row.has("kind")
+                                 ? ColumnMetadata.Kind.valueOf(row.getString("kind").toUpperCase())
+                                 : ColumnMetadata.Kind.REGULAR;
+        assert kind == ColumnMetadata.Kind.REGULAR || kind == ColumnMetadata.Kind.STATIC
+            : "Unexpected dropped column kind: " + kind.toString();
+
+        ColumnMetadata column = new ColumnMetadata(keyspace, table, ColumnIdentifier.getInterned(name, true), type, ColumnMetadata.NO_POSITION, kind);
         long droppedTime = TimeUnit.MILLISECONDS.toMicros(row.getLong("dropped_time"));
-        ColumnDefinition.Kind kind = row.has("kind")
-                                     ? ColumnDefinition.Kind.valueOf(row.getString("kind").toUpperCase())
-                                     : ColumnDefinition.Kind.REGULAR;
-        assert kind == ColumnDefinition.Kind.REGULAR || kind == ColumnDefinition.Kind.STATIC
-            : "Unexpected dropped column kind: " + kind;
-        return new CFMetaData.DroppedColumn(name, type, droppedTime, kind);
+        return new DroppedColumn(column, droppedTime);
     }
 
     private static Indexes fetchIndexes(String keyspace, String table)
@@ -1119,7 +1065,7 @@ public final class SchemaKeyspace
         return views.build();
     }
 
-    private static ViewDefinition fetchView(String keyspaceName, String viewName, Types types)
+    private static ViewMetadata fetchView(String keyspaceName, String viewName, Types types)
     {
         String query = String.format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND view_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, VIEWS);
         UntypedResultSet rows = query(query, keyspaceName, viewName);
@@ -1127,33 +1073,25 @@ public final class SchemaKeyspace
             throw new RuntimeException(String.format("%s:%s not found in the schema definitions keyspace.", keyspaceName, viewName));
         UntypedResultSet.Row row = rows.one();
 
-        UUID id = row.getUUID("id");
-        UUID baseTableId = row.getUUID("base_table_id");
+        TableId baseTableId = TableId.fromUUID(row.getUUID("base_table_id"));
         String baseTableName = row.getString("base_table_name");
         boolean includeAll = row.getBoolean("include_all_columns");
         String whereClause = row.getString("where_clause");
 
-        List<ColumnDefinition> columns = fetchColumns(keyspaceName, viewName, types);
+        List<ColumnMetadata> columns = fetchColumns(keyspaceName, viewName, types);
 
-        Map<ByteBuffer, CFMetaData.DroppedColumn> droppedColumns = fetchDroppedColumns(keyspaceName, viewName);
-
-        CFMetaData cfm = CFMetaData.create(keyspaceName,
-                                           viewName,
-                                           id,
-                                           false,
-                                           true,
-                                           false,
-                                           false,
-                                           true,
-                                           columns,
-                                           DatabaseDescriptor.getPartitioner())
-                                   .params(createTableParamsFromRow(row))
-                                   .droppedColumns(droppedColumns);
+        TableMetadata metadata =
+            TableMetadata.builder(keyspaceName, viewName, TableId.fromUUID(row.getUUID("id")))
+                         .isView(true)
+                         .addColumns(columns)
+                         .droppedColumns(fetchDroppedColumns(keyspaceName, viewName))
+                         .params(createTableParamsFromRow(row))
+                         .build();
 
-            String rawSelect = View.buildSelectStatement(baseTableName, columns, whereClause);
-            SelectStatement.RawStatement rawStatement = (SelectStatement.RawStatement) QueryProcessor.parseStatement(rawSelect);
+        String rawSelect = View.buildSelectStatement(baseTableName, columns, whereClause);
+        SelectStatement.RawStatement rawStatement = (SelectStatement.RawStatement) QueryProcessor.parseStatement(rawSelect);
 
-            return new ViewDefinition(keyspaceName, viewName, baseTableId, baseTableName, includeAll, rawStatement, whereClause, cfm);
+        return new ViewMetadata(keyspaceName, viewName, baseTableId, baseTableName, includeAll, rawStatement, whereClause, metadata);
     }
 
     private static Functions fetchFunctions(String keyspaceName, Types types)
@@ -1189,14 +1127,18 @@ public final class SchemaKeyspace
 
         List<AbstractType<?>> argTypes = new ArrayList<>();
         for (String type : row.getFrozenList("argument_types", UTF8Type.instance))
-            argTypes.add(parse(ksName, type, types));
+            argTypes.add(CQLTypeParser.parse(ksName, type, types));
 
-        AbstractType<?> returnType = parse(ksName, row.getString("return_type"), types);
+        AbstractType<?> returnType = CQLTypeParser.parse(ksName, row.getString("return_type"), types);
 
         String language = row.getString("language");
         String body = row.getString("body");
         boolean calledOnNullInput = row.getBoolean("called_on_null_input");
 
+        /*
+         * TODO: find a way to get rid of Schema.instance dependency; evaluate if the opimisation below makes a difference
+         * in the first place. Remove if it isn't.
+         */
         org.apache.cassandra.cql3.functions.Function existing = Schema.instance.findFunction(name, argTypes).orElse(null);
         if (existing instanceof UDFunction)
         {
@@ -1205,7 +1147,8 @@ public final class SchemaKeyspace
             // statement, since CreateFunctionStatement needs to execute UDFunction.create but schema migration
             // also needs that (since it needs to handle its own change).
             UDFunction udf = (UDFunction) existing;
-            if (udf.argNames().equals(argNames) && // arg types checked in Functions.find call
+            if (udf.argNames().equals(argNames) &&
+                udf.argTypes().equals(argTypes) &&
                 udf.returnType().equals(returnType) &&
                 !udf.isAggregate() &&
                 udf.language().equals(language) &&
@@ -1247,14 +1190,14 @@ public final class SchemaKeyspace
         List<AbstractType<?>> argTypes =
             row.getFrozenList("argument_types", UTF8Type.instance)
                .stream()
-               .map(t -> parse(ksName, t, types))
+               .map(t -> CQLTypeParser.parse(ksName, t, types))
                .collect(toList());
 
-        AbstractType<?> returnType = parse(ksName, row.getString("return_type"), types);
+        AbstractType<?> returnType = CQLTypeParser.parse(ksName, row.getString("return_type"), types);
 
         FunctionName stateFunc = new FunctionName(ksName, (row.getString("state_func")));
         FunctionName finalFunc = row.has("final_func") ? new FunctionName(ksName, row.getString("final_func")) : null;
-        AbstractType<?> stateType = row.has("state_type") ? parse(ksName, row.getString("state_type"), types) : null;
+        AbstractType<?> stateType = row.has("state_type") ? CQLTypeParser.parse(ksName, row.getString("state_type"), types) : null;
         ByteBuffer initcond = row.has("initcond") ? Terms.asBytes(ksName, row.getString("initcond"), stateType) : null;
 
         try
@@ -1277,116 +1220,35 @@ public final class SchemaKeyspace
      */
 
     /**
-     * Merge remote schema in form of mutations with local and mutate ks/cf metadata objects
-     * (which also involves fs operations on add/drop ks/cf)
-     *
-     * @param mutations the schema changes to apply
-     *
-     * @throws ConfigurationException If one of metadata attributes has invalid value
+     * Computes the set of names of keyspaces affected by the provided schema mutations.
      */
-    public static synchronized void mergeSchemaAndAnnounceVersion(Collection<Mutation> mutations) throws ConfigurationException
+    static Set<String> affectedKeyspaces(Collection<Mutation> mutations)
     {
-        mergeSchema(mutations);
-        Schema.instance.updateVersionAndAnnounce();
+        // only compare the keyspaces affected by this set of schema mutations
+        return mutations.stream()
+                        .map(m -> UTF8Type.instance.compose(m.key().getKey()))
+                        .collect(toSet());
     }
 
-    public static synchronized void mergeSchema(Collection<Mutation> mutations)
+    static void applyChanges(Collection<Mutation> mutations)
     {
-        // only compare the keyspaces affected by this set of schema mutations
-        Set<String> affectedKeyspaces =
-        mutations.stream()
-                 .map(m -> UTF8Type.instance.compose(m.key().getKey()))
-                 .collect(Collectors.toSet());
-
-        // fetch the current state of schema for the affected keyspaces only
-        Keyspaces before = Schema.instance.getKeyspaces(affectedKeyspaces);
-
-        // apply the schema mutations and flush
         mutations.forEach(Mutation::apply);
-        if (FLUSH_SCHEMA_TABLES)
-            flush();
-
-        // fetch the new state of schema from schema tables (not applied to Schema.instance yet)
-        Keyspaces after = fetchKeyspacesOnly(affectedKeyspaces);
-
-        // deal with the diff
-        MapDifference<String, KeyspaceMetadata> keyspacesDiff = before.diff(after);
-
-        // dropped keyspaces
-        for (KeyspaceMetadata keyspace : keyspacesDiff.entriesOnlyOnLeft().values())
-        {
-            keyspace.functions.udas().forEach(Schema.instance::dropAggregate);
-            keyspace.functions.udfs().forEach(Schema.instance::dropFunction);
-            keyspace.views.forEach(v -> Schema.instance.dropView(v.ksName, v.viewName));
-            keyspace.tables.forEach(t -> Schema.instance.dropTable(t.ksName, t.cfName));
-            keyspace.types.forEach(Schema.instance::dropType);
-            Schema.instance.dropKeyspace(keyspace.name);
-        }
+        if (SchemaKeyspace.FLUSH_SCHEMA_TABLES)
+            SchemaKeyspace.flush();
+    }
 
-        // new keyspaces
-        for (KeyspaceMetadata keyspace : keyspacesDiff.entriesOnlyOnRight().values())
-        {
-            Schema.instance.addKeyspace(KeyspaceMetadata.create(keyspace.name, keyspace.params));
-            keyspace.types.forEach(Schema.instance::addType);
-            keyspace.tables.forEach(Schema.instance::addTable);
-            keyspace.views.forEach(Schema.instance::addView);
-            keyspace.functions.udfs().forEach(Schema.instance::addFunction);
-            keyspace.functions.udas().forEach(Schema.instance::addAggregate);
-        }
+    static Keyspaces fetchKeyspaces(Set<String> toFetch)
+    {
+        /*
+         * We know the keyspace names we are going to query, but we still want to run the SELECT IN
+         * query, to filter out the keyspaces that had been dropped by the applied mutation set.
+         */
+        String query = format("SELECT keyspace_name FROM %s.%s WHERE keyspace_name IN ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, KEYSPACES);
 
-        // updated keyspaces
-        for (Map.Entry<String, MapDifference.ValueDifference<KeyspaceMetadata>> diff : keyspacesDiff.entriesDiffering().entrySet())
-            updateKeyspace(diff.getKey(), diff.getValue().leftValue(), diff.getValue().rightValue());
-    }
-
-    private static void updateKeyspace(String keyspaceName, KeyspaceMetadata keyspaceBefore, KeyspaceMetadata keyspaceAfter)
-    {
-        // calculate the deltas
-        MapDifference<String, CFMetaData> tablesDiff = keyspaceBefore.tables.diff(keyspaceAfter.tables);
-        MapDifference<String, ViewDefinition> viewsDiff = keyspaceBefore.views.diff(keyspaceAfter.views);
-        MapDifference<ByteBuffer, UserType> typesDiff = keyspaceBefore.types.diff(keyspaceAfter.types);
-
-        Map<Pair<FunctionName, List<String>>, UDFunction> udfsBefore = new HashMap<>();
-        keyspaceBefore.functions.udfs().forEach(f -> udfsBefore.put(Pair.create(f.name(), functionArgumentsList(f)), f));
-        Map<Pair<FunctionName, List<String>>, UDFunction> udfsAfter = new HashMap<>();
-        keyspaceAfter.functions.udfs().forEach(f -> udfsAfter.put(Pair.create(f.name(), functionArgumentsList(f)), f));
-        MapDifference<Pair<FunctionName, List<String>>, UDFunction> udfsDiff = Maps.difference(udfsBefore, udfsAfter);
-
-        Map<Pair<FunctionName, List<String>>, UDAggregate> udasBefore = new HashMap<>();
-        keyspaceBefore.functions.udas().forEach(f -> udasBefore.put(Pair.create(f.name(), functionArgumentsList(f)), f));
-        Map<Pair<FunctionName, List<String>>, UDAggregate> udasAfter = new HashMap<>();
-        keyspaceAfter.functions.udas().forEach(f -> udasAfter.put(Pair.create(f.name(), functionArgumentsList(f)), f));
-        MapDifference<Pair<FunctionName, List<String>>, UDAggregate> udasDiff = Maps.difference(udasBefore, udasAfter);
-
-        // update keyspace params, if changed
-        if (!keyspaceBefore.params.equals(keyspaceAfter.params))
-            Schema.instance.updateKeyspace(keyspaceName, keyspaceAfter.params);
-
-        // drop everything removed
-        udasDiff.entriesOnlyOnLeft().values().forEach(Schema.instance::dropAggregate);
-        udfsDiff.entriesOnlyOnLeft().values().forEach(Schema.instance::dropFunction);
-        viewsDiff.entriesOnlyOnLeft().values().forEach(v -> Schema.instance.dropView(v.ksName, v.viewName));
-        tablesDiff.entriesOnlyOnLeft().values().forEach(t -> Schema.instance.dropTable(t.ksName, t.cfName));
-        typesDiff.entriesOnlyOnLeft().values().forEach(Schema.instance::dropType);
-
-        // add everything created
-        typesDiff.entriesOnlyOnRight().values().forEach(Schema.instance::addType);
-        tablesDiff.entriesOnlyOnRight().values().forEach(Schema.instance::addTable);
-        viewsDiff.entriesOnlyOnRight().values().forEach(Schema.instance::addView);
-        udfsDiff.entriesOnlyOnRight().values().forEach(Schema.instance::addFunction);
-        udasDiff.entriesOnlyOnRight().values().forEach(Schema.instance::addAggregate);
-
-        // update everything altered
-        for (MapDifference.ValueDifference<UserType> diff : typesDiff.entriesDiffering().values())
-            Schema.instance.updateType(diff.rightValue());
-        for (MapDifference.ValueDifference<CFMetaData> diff : tablesDiff.entriesDiffering().values())
-            Schema.instance.updateTable(diff.rightValue());
-        for (MapDifference.ValueDifference<ViewDefinition> diff : viewsDiff.entriesDiffering().values())
-            Schema.instance.updateView(diff.rightValue());
-        for (MapDifference.ValueDifference<UDFunction> diff : udfsDiff.entriesDiffering().values())
-            Schema.instance.updateFunction(diff.rightValue());
-        for (MapDifference.ValueDifference<UDAggregate> diff : udasDiff.entriesDiffering().values())
-            Schema.instance.updateAggregate(diff.rightValue());
+        Keyspaces.Builder keyspaces = org.apache.cassandra.schema.Keyspaces.builder();
+        for (UntypedResultSet.Row row : query(query, new ArrayList<>(toFetch)))
+            keyspaces.add(fetchKeyspace(row.getString("keyspace_name")));
+        return keyspaces.build();
     }
 
     /*

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/schema/SchemaPullVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/SchemaPullVerbHandler.java b/src/java/org/apache/cassandra/schema/SchemaPullVerbHandler.java
new file mode 100644
index 0000000..45cf365
--- /dev/null
+++ b/src/java/org/apache/cassandra/schema/SchemaPullVerbHandler.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.schema;
+
+import java.util.Collection;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.net.MessageOut;
+import org.apache.cassandra.net.MessagingService;
+
+/**
+ * Sends it's current schema state in form of mutations in reply to the remote node's request.
+ * Such a request is made when one of the nodes, by means of Gossip, detects schema disagreement in the ring.
+ */
+public final class SchemaPullVerbHandler implements IVerbHandler
+{
+    private static final Logger logger = LoggerFactory.getLogger(SchemaPullVerbHandler.class);
+
+    public void doVerb(MessageIn message, int id)
+    {
+        logger.trace("Received schema pull request from {}", message.from);
+
+        MessageOut<Collection<Mutation>> response =
+            new MessageOut<>(MessagingService.Verb.INTERNAL_RESPONSE,
+                             SchemaKeyspace.convertSchemaToMutations(),
+                             MigrationManager.MigrationsSerializer.instance);
+
+        MessagingService.instance().sendReply(response, id, message.from);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/schema/SchemaPushVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/SchemaPushVerbHandler.java b/src/java/org/apache/cassandra/schema/SchemaPushVerbHandler.java
new file mode 100644
index 0000000..f939cda
--- /dev/null
+++ b/src/java/org/apache/cassandra/schema/SchemaPushVerbHandler.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.schema;
+
+import java.util.Collection;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.concurrent.Stage;
+import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.MessageIn;
+
+/**
+ * Called when node receives updated schema state from the schema migration coordinator node.
+ * Such happens when user makes local schema migration on one of the nodes in the ring
+ * (which is going to act as coordinator) and that node sends (pushes) it's updated schema state
+ * (in form of mutations) to all the alive nodes in the cluster.
+ */
+public final class SchemaPushVerbHandler implements IVerbHandler<Collection<Mutation>>
+{
+    private static final Logger logger = LoggerFactory.getLogger(SchemaPushVerbHandler.class);
+
+    public void doVerb(final MessageIn<Collection<Mutation>> message, int id)
+    {
+        logger.trace("Received schema push request from {}", message.from);
+
+        StageManager.getStage(Stage.MIGRATION).submit(() -> Schema.instance.mergeAndAnnounceVersion(message.payload));
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/schema/SchemaVersionVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/SchemaVersionVerbHandler.java b/src/java/org/apache/cassandra/schema/SchemaVersionVerbHandler.java
new file mode 100644
index 0000000..0a506e3
--- /dev/null
+++ b/src/java/org/apache/cassandra/schema/SchemaVersionVerbHandler.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.schema;
+
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.net.MessageOut;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.UUIDSerializer;
+
+public final class SchemaVersionVerbHandler implements IVerbHandler
+{
+    private final Logger logger = LoggerFactory.getLogger(SchemaVersionVerbHandler.class);
+
+    public void doVerb(MessageIn message, int id)
+    {
+        logger.trace("Received schema version request from {}", message.from);
+
+        MessageOut<UUID> response =
+            new MessageOut<>(MessagingService.Verb.INTERNAL_RESPONSE,
+                             Schema.instance.getVersion(),
+                             UUIDSerializer.serializer);
+
+        MessagingService.instance().sendReply(response, id, message.from);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/schema/TableId.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/TableId.java b/src/java/org/apache/cassandra/schema/TableId.java
new file mode 100644
index 0000000..4b2592e
--- /dev/null
+++ b/src/java/org/apache/cassandra/schema/TableId.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.schema;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.UUID;
+
+import org.apache.commons.lang3.ArrayUtils;
+
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.UUIDGen;
+
+/**
+ * The unique identifier of a table.
+ * <p>
+ * This is essentially a UUID, but we wrap it as it's used quite a bit in the code and having a nicely name class make
+ * the code more readable.
+ */
+public class TableId
+{
+    private final UUID id;
+
+    private TableId(UUID id)
+    {
+        this.id = id;
+    }
+
+    public static TableId fromUUID(UUID id)
+    {
+        return new TableId(id);
+    }
+
+    public static TableId generate()
+    {
+        return new TableId(UUIDGen.getTimeUUID());
+    }
+
+    public static TableId fromString(String idString)
+    {
+        return new TableId(UUID.fromString(idString));
+    }
+
+    /**
+     * Creates the UUID of a system table.
+     *
+     * This is deterministically based on the table name as system tables are hardcoded and initialized independently
+     * on each node (they don't go through a CREATE), but we still want them to have the same ID everywhere.
+     *
+     * We shouldn't use this for any other table.
+     */
+    public static TableId forSystemTable(String keyspace, String table)
+    {
+        assert SchemaConstants.SYSTEM_KEYSPACE_NAMES.contains(keyspace)
+               || SchemaConstants.REPLICATED_SYSTEM_KEYSPACE_NAMES.contains(keyspace);
+        return new TableId(UUID.nameUUIDFromBytes(ArrayUtils.addAll(keyspace.getBytes(), table.getBytes())));
+    }
+
+    public String toHexString()
+    {
+        return ByteBufferUtil.bytesToHex(ByteBufferUtil.bytes(id));
+    }
+
+    public UUID asUUID()
+    {
+        return id;
+    }
+
+    @Override
+    public final int hashCode()
+    {
+        return id.hashCode();
+    }
+
+    @Override
+    public final boolean equals(Object o)
+    {
+        return this == o || (o instanceof TableId && this.id.equals(((TableId) o).id));
+    }
+
+    @Override
+    public String toString()
+    {
+        return id.toString();
+    }
+
+    public void serialize(DataOutput out) throws IOException
+    {
+        out.writeLong(id.getMostSignificantBits());
+        out.writeLong(id.getLeastSignificantBits());
+    }
+
+    public int serializedSize()
+    {
+        return 16;
+    }
+
+    public static TableId deserialize(DataInput in) throws IOException
+    {
+        return new TableId(new UUID(in.readLong(), in.readLong()));
+    }
+}