Posted to commits@cassandra.apache.org by al...@apache.org on 2014/12/17 00:08:11 UTC

[2/5] cassandra git commit: Isolate schema serialization code

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e9d345f/src/java/org/apache/cassandra/schema/LegacySchemaTables.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/LegacySchemaTables.java b/src/java/org/apache/cassandra/schema/LegacySchemaTables.java
new file mode 100644
index 0000000..047698c
--- /dev/null
+++ b/src/java/org/apache/cassandra/schema/LegacySchemaTables.java
@@ -0,0 +1,1480 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.schema;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.MapDifference;
+import com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.cache.CachingOptions;
+import org.apache.cassandra.config.*;
+import org.apache.cassandra.cql3.*;
+import org.apache.cassandra.cql3.functions.*;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
+import org.apache.cassandra.db.composites.CellNameType;
+import org.apache.cassandra.db.composites.CellNames;
+import org.apache.cassandra.db.composites.Composite;
+import org.apache.cassandra.db.filter.QueryFilter;
+import org.apache.cassandra.db.index.SecondaryIndexManager;
+import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.exceptions.SyntaxException;
+import org.apache.cassandra.io.compress.CompressionParameters;
+import org.apache.cassandra.locator.AbstractReplicationStrategy;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.apache.cassandra.cql3.QueryProcessor.executeOnceInternal;
+import static org.apache.cassandra.utils.FBUtilities.fromJsonMap;
+import static org.apache.cassandra.utils.FBUtilities.json;
+
+/** system.schema_* tables used to store keyspace/table/type attributes prior to C* 3.0 */
+public class LegacySchemaTables
+{
+    private static final Logger logger = LoggerFactory.getLogger(LegacySchemaTables.class);
+
+    public static final String KEYSPACES = "schema_keyspaces";
+    public static final String COLUMNFAMILIES = "schema_columnfamilies";
+    public static final String COLUMNS = "schema_columns";
+    public static final String TRIGGERS = "schema_triggers";
+    public static final String USERTYPES = "schema_usertypes";
+    public static final String FUNCTIONS = "schema_functions";
+    public static final String AGGREGATES = "schema_aggregates";
+
+    public static final List<String> ALL = Arrays.asList(KEYSPACES, COLUMNFAMILIES, COLUMNS, TRIGGERS, USERTYPES, FUNCTIONS, AGGREGATES);
+
+    private static final CFMetaData Keyspaces =
+        compile(KEYSPACES,
+                "keyspace definitions",
+                "CREATE TABLE %s ("
+                + "keyspace_name text,"
+                + "durable_writes boolean,"
+                + "strategy_class text,"
+                + "strategy_options text,"
+                + "PRIMARY KEY ((keyspace_name))) "
+                + "WITH COMPACT STORAGE");
+
+    private static final CFMetaData Columnfamilies =
+        compile(COLUMNFAMILIES,
+                "table definitions",
+                "CREATE TABLE %s ("
+                + "keyspace_name text,"
+                + "columnfamily_name text,"
+                + "bloom_filter_fp_chance double,"
+                + "caching text,"
+                + "cf_id uuid," // post-2.1 UUID cfid
+                + "comment text,"
+                + "compaction_strategy_class text,"
+                + "compaction_strategy_options text,"
+                + "comparator text,"
+                + "compression_parameters text,"
+                + "default_time_to_live int,"
+                + "default_validator text,"
+                + "dropped_columns map<text, bigint>,"
+                + "gc_grace_seconds int,"
+                + "is_dense boolean,"
+                + "key_validator text,"
+                + "local_read_repair_chance double,"
+                + "max_compaction_threshold int,"
+                + "max_index_interval int,"
+                + "memtable_flush_period_in_ms int,"
+                + "min_compaction_threshold int,"
+                + "min_index_interval int,"
+                + "read_repair_chance double,"
+                + "speculative_retry text,"
+                + "subcomparator text,"
+                + "type text,"
+                + "PRIMARY KEY ((keyspace_name), columnfamily_name))");
+
+    private static final CFMetaData Columns =
+        compile(COLUMNS,
+                "column definitions",
+                "CREATE TABLE %s ("
+                + "keyspace_name text,"
+                + "columnfamily_name text,"
+                + "column_name text,"
+                + "component_index int,"
+                + "index_name text,"
+                + "index_options text,"
+                + "index_type text,"
+                + "type text,"
+                + "validator text,"
+                + "PRIMARY KEY ((keyspace_name), columnfamily_name, column_name))");
+
+    private static final CFMetaData Triggers =
+        compile(TRIGGERS,
+                "trigger definitions",
+                "CREATE TABLE %s ("
+                + "keyspace_name text,"
+                + "columnfamily_name text,"
+                + "trigger_name text,"
+                + "trigger_options map<text, text>,"
+                + "PRIMARY KEY ((keyspace_name), columnfamily_name, trigger_name))");
+
+    private static final CFMetaData Usertypes =
+        compile(USERTYPES,
+                "user defined type definitions",
+                "CREATE TABLE %s ("
+                + "keyspace_name text,"
+                + "type_name text,"
+                + "field_names list<text>,"
+                + "field_types list<text>,"
+                + "PRIMARY KEY ((keyspace_name), type_name))");
+
+    private static final CFMetaData Functions =
+        compile(FUNCTIONS,
+                "user defined function definitions",
+                "CREATE TABLE %s ("
+                + "keyspace_name text,"
+                + "function_name text,"
+                + "signature blob,"
+                + "argument_names list<text>,"
+                + "argument_types list<text>,"
+                + "body text,"
+                + "is_deterministic boolean,"
+                + "language text,"
+                + "return_type text,"
+                + "PRIMARY KEY ((keyspace_name), function_name, signature))");
+
+    private static final CFMetaData Aggregates =
+        compile(AGGREGATES,
+                "user defined aggregate definitions",
+                "CREATE TABLE %s ("
+                + "keyspace_name text,"
+                + "aggregate_name text,"
+                + "signature blob,"
+                + "argument_types list<text>,"
+                + "final_func text,"
+                + "initcond blob,"
+                + "return_type text,"
+                + "state_func text,"
+                + "state_type text,"
+                + "PRIMARY KEY ((keyspace_name), aggregate_name, signature))");
+
+    public static final List<CFMetaData> All = Arrays.asList(Keyspaces, Columnfamilies, Columns, Triggers, Usertypes, Functions, Aggregates);
+
+    private static CFMetaData compile(String name, String description, String schema)
+    {
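+        // A week of gc grace gives schema tombstones time to propagate to every node before being purged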
+        return CFMetaData.compile(String.format(schema, name), SystemKeyspace.NAME)
+                         .comment(description)
+                         .gcGraceSeconds((int) TimeUnit.DAYS.toSeconds(7));
+    }
+
+    /** add entries to system.schema_* for the hardcoded system definitions */
+    public static void saveSystemKeyspaceSchema()
+    {
+        KSMetaData keyspace = Schema.instance.getKSMetaData(SystemKeyspace.NAME);
+        // delete old, possibly obsolete entries in schema tables
+        for (String table : ALL)
+            executeOnceInternal(String.format("DELETE FROM system.%s WHERE keyspace_name = ?", table), keyspace.name);
+        // (+1 to timestamp to make sure we don't get shadowed by the tombstones we just added)
+        makeCreateKeyspaceMutation(keyspace, FBUtilities.timestampMicros() + 1).apply();
+    }
+
+    public static Collection<KSMetaData> readSchemaFromSystemTables()
+    {
+        List<Row> serializedSchema = getSchemaPartitionsForTable(KEYSPACES);
+
+        List<KSMetaData> keyspaces = new ArrayList<>(serializedSchema.size());
+
+        for (Row partition : serializedSchema)
+        {
+            if (isEmptySchemaPartition(partition) || isSystemKeyspaceSchemaPartition(partition))
+                continue;
+
+            keyspaces.add(createKeyspaceFromSchemaPartitions(partition,
+                                                             readSchemaPartitionForKeyspace(COLUMNFAMILIES, partition.key),
+                                                             readSchemaPartitionForKeyspace(USERTYPES, partition.key)));
+
+            // Will be moved away in #6717
+            for (UDFunction function : createFunctionsFromFunctionsPartition(readSchemaPartitionForKeyspace(FUNCTIONS, partition.key)).values())
+                org.apache.cassandra.cql3.functions.Functions.addFunction(function);
+
+            // Will be moved away in #6717
+            for (UDAggregate aggregate : createAggregatesFromAggregatesPartition(readSchemaPartitionForKeyspace(AGGREGATES, partition.key)).values())
+                org.apache.cassandra.cql3.functions.Functions.addFunction(aggregate);
+        }
+
+        return keyspaces;
+    }
+
+    public static void truncateSchemaTables()
+    {
+        for (String table : ALL)
+            getSchemaCFS(table).truncateBlocking();
+    }
+
+    private static void flushSchemaTables()
+    {
+        for (String table : ALL)
+            SystemKeyspace.forceBlockingFlush(table);
+    }
+
+    /**
+     * Read schema from the system keyspace and calculate the MD5 digest of every partition; the resulting
+     * digest is converted into a UUID that acts as a content-based version of the schema.
+     */
+    public static UUID calculateSchemaDigest()
+    {
+        MessageDigest digest;
+        try
+        {
+            digest = MessageDigest.getInstance("MD5");
+        }
+        catch (NoSuchAlgorithmException e)
+        {
+            throw new RuntimeException(e);
+        }
+
+        for (String table : ALL)
+        {
+            for (Row partition : getSchemaPartitionsForTable(table))
+            {
+                if (isEmptySchemaPartition(partition) || isSystemKeyspaceSchemaPartition(partition))
+                    continue;
+
+                // we want to digest only live columns
+                ColumnFamilyStore.removeDeletedColumnsOnly(partition.cf, Integer.MAX_VALUE, SecondaryIndexManager.nullUpdater);
+                partition.cf.purgeTombstones(Integer.MAX_VALUE);
+                partition.cf.updateDigest(digest);
+            }
+        }
+
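+        // nameUUIDFromBytes produces a deterministic, name-based (version 3) UUID from the digest bytes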
+        return UUID.nameUUIDFromBytes(digest.digest());
+    }
+
+    /**
+     * @param schemaTableName The name of the table responsible for part of the schema
+     * @return the CFS holding the low-level serialized schema
+     */
+    private static ColumnFamilyStore getSchemaCFS(String schemaTableName)
+    {
+        return Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(schemaTableName);
+    }
+
+    /**
+     * @param schemaTableName The name of the table responsible for part of the schema.
+     * @return low-level schema representation
+     */
+    private static List<Row> getSchemaPartitionsForTable(String schemaTableName)
+    {
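+        // scan the full token range; schema tables are node-local and keyed by keyspace name,
+        // so this returns one partition per keyspace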
+        Token minToken = StorageService.getPartitioner().getMinimumToken();
+        return getSchemaCFS(schemaTableName).getRangeSlice(new Range<RowPosition>(minToken.minKeyBound(), minToken.maxKeyBound()),
+                                                           null,
+                                                           new IdentityQueryFilter(),
+                                                           Integer.MAX_VALUE,
+                                                           System.currentTimeMillis());
+    }
+
+    public static Collection<Mutation> convertSchemaToMutations()
+    {
+        Map<DecoratedKey, Mutation> mutationMap = new HashMap<>();
+
+        for (String table : ALL)
+            convertSchemaToMutations(mutationMap, table);
+
+        return mutationMap.values();
+    }
+
+    private static void convertSchemaToMutations(Map<DecoratedKey, Mutation> mutationMap, String schemaTableName)
+    {
+        for (Row partition : getSchemaPartitionsForTable(schemaTableName))
+        {
+            if (isSystemKeyspaceSchemaPartition(partition))
+                continue;
+
+            Mutation mutation = mutationMap.get(partition.key);
+            if (mutation == null)
+            {
+                mutation = new Mutation(SystemKeyspace.NAME, partition.key.getKey());
+                mutationMap.put(partition.key, mutation);
+            }
+
+            mutation.add(partition.cf);
+        }
+    }
+
+    private static Map<DecoratedKey, ColumnFamily> readSchemaForKeyspaces(String schemaTableName, Set<String> keyspaceNames)
+    {
+        Map<DecoratedKey, ColumnFamily> schema = new HashMap<>();
+
+        for (String keyspaceName : keyspaceNames)
+        {
+            Row schemaEntity = readSchemaPartitionForKeyspace(schemaTableName, keyspaceName);
+            if (schemaEntity.cf != null)
+                schema.put(schemaEntity.key, schemaEntity.cf);
+        }
+
+        return schema;
+    }
+
+    private static ByteBuffer getSchemaKSKey(String ksName)
+    {
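+        // keyspace names are ascii-only, so AsciiType encoding yields the partition key bytes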
+        return AsciiType.instance.fromString(ksName);
+    }
+
+    private static Row readSchemaPartitionForKeyspace(String schemaTableName, String keyspaceName)
+    {
+        DecoratedKey keyspaceKey = StorageService.getPartitioner().decorateKey(getSchemaKSKey(keyspaceName));
+        return readSchemaPartitionForKeyspace(schemaTableName, keyspaceKey);
+    }
+
+    private static Row readSchemaPartitionForKeyspace(String schemaTableName, DecoratedKey keyspaceKey)
+    {
+        QueryFilter filter = QueryFilter.getIdentityFilter(keyspaceKey, schemaTableName, System.currentTimeMillis());
+        return new Row(keyspaceKey, getSchemaCFS(schemaTableName).getColumnFamily(filter));
+    }
+
+    private static Row readSchemaPartitionForTable(String schemaTableName, String keyspaceName, String tableName)
+    {
+        DecoratedKey key = StorageService.getPartitioner().decorateKey(getSchemaKSKey(keyspaceName));
+        ColumnFamilyStore store = getSchemaCFS(schemaTableName);
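+        // slice a single table's rows out of the keyspace partition: the clustering prefix is the table name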
+        Composite prefix = store.getComparator().make(tableName);
+        ColumnFamily cells = store.getColumnFamily(key, prefix, prefix.end(), false, Integer.MAX_VALUE, System.currentTimeMillis());
+        return new Row(key, cells);
+    }
+
+    private static boolean isEmptySchemaPartition(Row partition)
+    {
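+        // a missing partition, or one carrying only a top-level deletion and no cells, counts as empty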
+        return partition.cf == null || (partition.cf.isMarkedForDelete() && !partition.cf.hasColumns());
+    }
+
+    private static boolean isSystemKeyspaceSchemaPartition(Row partition)
+    {
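+        // the hardcoded system keyspace is defined in code and never exchanged between nodes, so it is skipped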
+        return getSchemaKSKey(SystemKeyspace.NAME).equals(partition.key.getKey());
+    }
+
+    /**
+     * Merge remote schema, in the form of mutations, with the local schema, and mutate the ks/cf metadata
+     * objects accordingly (which also involves fs operations on ks/cf add/drop)
+     *
+     * @param mutations the schema changes to apply
+     *
+     * @throws ConfigurationException If one of the metadata attributes has an invalid value
+     * @throws IOException If data was corrupted in transit or an fs operation failed to apply
+     */
+    public static synchronized void mergeSchema(Collection<Mutation> mutations) throws ConfigurationException, IOException
+    {
+        mergeSchema(mutations, true);
+        Schema.instance.updateVersionAndAnnounce();
+    }
+
+    public static synchronized void mergeSchema(Collection<Mutation> mutations, boolean doFlush) throws IOException
+    {
+        // compare before/after schemas of the affected keyspaces only
+        Set<String> keyspaces = new HashSet<>(mutations.size());
+        for (Mutation mutation : mutations)
+            keyspaces.add(ByteBufferUtil.string(mutation.key()));
+
+        // current state of the schema
+        Map<DecoratedKey, ColumnFamily> oldKeyspaces = readSchemaForKeyspaces(KEYSPACES, keyspaces);
+        Map<DecoratedKey, ColumnFamily> oldColumnFamilies = readSchemaForKeyspaces(COLUMNFAMILIES, keyspaces);
+        Map<DecoratedKey, ColumnFamily> oldTypes = readSchemaForKeyspaces(USERTYPES, keyspaces);
+        Map<DecoratedKey, ColumnFamily> oldFunctions = readSchemaForKeyspaces(FUNCTIONS, keyspaces);
+        Map<DecoratedKey, ColumnFamily> oldAggregates = readSchemaForKeyspaces(AGGREGATES, keyspaces);
+
+        for (Mutation mutation : mutations)
+            mutation.apply();
+
+        if (doFlush)
+            flushSchemaTables();
+
+        // with new data applied
+        Map<DecoratedKey, ColumnFamily> newKeyspaces = readSchemaForKeyspaces(KEYSPACES, keyspaces);
+        Map<DecoratedKey, ColumnFamily> newColumnFamilies = readSchemaForKeyspaces(COLUMNFAMILIES, keyspaces);
+        Map<DecoratedKey, ColumnFamily> newTypes = readSchemaForKeyspaces(USERTYPES, keyspaces);
+        Map<DecoratedKey, ColumnFamily> newFunctions = readSchemaForKeyspaces(FUNCTIONS, keyspaces);
+        Map<DecoratedKey, ColumnFamily> newAggregates = readSchemaForKeyspaces(AGGREGATES, keyspaces);
+
+        Set<String> keyspacesToDrop = mergeKeyspaces(oldKeyspaces, newKeyspaces);
+        mergeTables(oldColumnFamilies, newColumnFamilies);
+        mergeTypes(oldTypes, newTypes);
+        mergeFunctions(oldFunctions, newFunctions);
+        mergeAggregates(oldAggregates, newAggregates);
+
+        // it is safe to drop a keyspace only when all nested ColumnFamilies were deleted
+        for (String keyspaceToDrop : keyspacesToDrop)
+            Schema.instance.dropKeyspace(keyspaceToDrop);
+    }
+
+    private static Set<String> mergeKeyspaces(Map<DecoratedKey, ColumnFamily> before, Map<DecoratedKey, ColumnFamily> after)
+    {
+        List<Row> created = new ArrayList<>();
+        List<String> altered = new ArrayList<>();
+        Set<String> dropped = new HashSet<>();
+
+        /*
+         * - we don't care about entriesOnlyOnLeft() or entriesInCommon(), because only the changes are of interest to us
+         * - of all entriesOnlyOnRight(), we only care about ones that have live columns; it's possible to have a ColumnFamily
+         *   there that only has the top-level deletion, if:
+         *      a) a pushed DROP KEYSPACE change for a keyspace hadn't ever made it to this node in the first place
+         *      b) a pulled dropped keyspace that got dropped before it could find a way to this node
+         * - of entriesDiffering(), we don't care about the scenario where both pre and post-values have zero live columns:
+         *   that means that a keyspace had been recreated and dropped, and the recreated keyspace had never found a way
+         *   to this node
+         */
+        MapDifference<DecoratedKey, ColumnFamily> diff = Maps.difference(before, after);
+
+        for (Map.Entry<DecoratedKey, ColumnFamily> entry : diff.entriesOnlyOnRight().entrySet())
+            if (entry.getValue().hasColumns())
+                created.add(new Row(entry.getKey(), entry.getValue()));
+
+        for (Map.Entry<DecoratedKey, MapDifference.ValueDifference<ColumnFamily>> entry : diff.entriesDiffering().entrySet())
+        {
+            String keyspaceName = AsciiType.instance.compose(entry.getKey().getKey());
+
+            ColumnFamily pre  = entry.getValue().leftValue();
+            ColumnFamily post = entry.getValue().rightValue();
+
+            if (pre.hasColumns() && post.hasColumns())
+                altered.add(keyspaceName);
+            else if (pre.hasColumns())
+                dropped.add(keyspaceName);
+            else if (post.hasColumns()) // a (re)created keyspace
+                created.add(new Row(entry.getKey(), post));
+        }
+
+        for (Row row : created)
+            Schema.instance.addKeyspace(createKeyspaceFromSchemaPartition(row));
+        for (String name : altered)
+            Schema.instance.updateKeyspace(name);
+        return dropped;
+    }
+
+    // see the comments for mergeKeyspaces()
+    private static void mergeTables(Map<DecoratedKey, ColumnFamily> before, Map<DecoratedKey, ColumnFamily> after)
+    {
+        List<CFMetaData> created = new ArrayList<>();
+        List<CFMetaData> altered = new ArrayList<>();
+        List<CFMetaData> dropped = new ArrayList<>();
+
+        MapDifference<DecoratedKey, ColumnFamily> diff = Maps.difference(before, after);
+
+        for (Map.Entry<DecoratedKey, ColumnFamily> entry : diff.entriesOnlyOnRight().entrySet())
+            if (entry.getValue().hasColumns())
+                created.addAll(createTablesFromTablesPartition(new Row(entry.getKey(), entry.getValue())).values());
+
+        for (Map.Entry<DecoratedKey, MapDifference.ValueDifference<ColumnFamily>> entry : diff.entriesDiffering().entrySet())
+        {
+            String keyspaceName = AsciiType.instance.compose(entry.getKey().getKey());
+
+            ColumnFamily pre  = entry.getValue().leftValue();
+            ColumnFamily post = entry.getValue().rightValue();
+
+            if (pre.hasColumns() && post.hasColumns())
+            {
+                MapDifference<String, CFMetaData> delta =
+                    Maps.difference(Schema.instance.getKSMetaData(keyspaceName).cfMetaData(),
+                                    createTablesFromTablesPartition(new Row(entry.getKey(), post)));
+
+                dropped.addAll(delta.entriesOnlyOnLeft().values());
+                created.addAll(delta.entriesOnlyOnRight().values());
+                Iterables.addAll(altered, Iterables.transform(delta.entriesDiffering().values(), new Function<MapDifference.ValueDifference<CFMetaData>, CFMetaData>()
+                {
+                    public CFMetaData apply(MapDifference.ValueDifference<CFMetaData> pair)
+                    {
+                        return pair.rightValue();
+                    }
+                }));
+            }
+            else if (pre.hasColumns())
+            {
+                dropped.addAll(Schema.instance.getKSMetaData(keyspaceName).cfMetaData().values());
+            }
+            else if (post.hasColumns())
+            {
+                created.addAll(createTablesFromTablesPartition(new Row(entry.getKey(), post)).values());
+            }
+        }
+
+        for (CFMetaData cfm : created)
+            Schema.instance.addTable(cfm);
+        for (CFMetaData cfm : altered)
+            Schema.instance.updateTable(cfm.ksName, cfm.cfName);
+        for (CFMetaData cfm : dropped)
+            Schema.instance.dropTable(cfm.ksName, cfm.cfName);
+    }
+
+    // see the comments for mergeKeyspaces()
+    private static void mergeTypes(Map<DecoratedKey, ColumnFamily> before, Map<DecoratedKey, ColumnFamily> after)
+    {
+        List<UserType> created = new ArrayList<>();
+        List<UserType> altered = new ArrayList<>();
+        List<UserType> dropped = new ArrayList<>();
+
+        MapDifference<DecoratedKey, ColumnFamily> diff = Maps.difference(before, after);
+
+        // New keyspace with types
+        for (Map.Entry<DecoratedKey, ColumnFamily> entry : diff.entriesOnlyOnRight().entrySet())
+            if (entry.getValue().hasColumns())
+                created.addAll(createTypesFromPartition(new Row(entry.getKey(), entry.getValue())).values());
+
+        for (Map.Entry<DecoratedKey, MapDifference.ValueDifference<ColumnFamily>> entry : diff.entriesDiffering().entrySet())
+        {
+            String keyspaceName = AsciiType.instance.compose(entry.getKey().getKey());
+
+            ColumnFamily pre  = entry.getValue().leftValue();
+            ColumnFamily post = entry.getValue().rightValue();
+
+            if (pre.hasColumns() && post.hasColumns())
+            {
+                MapDifference<ByteBuffer, UserType> delta =
+                    Maps.difference(Schema.instance.getKSMetaData(keyspaceName).userTypes.getAllTypes(),
+                                    createTypesFromPartition(new Row(entry.getKey(), post)));
+
+                dropped.addAll(delta.entriesOnlyOnLeft().values());
+                created.addAll(delta.entriesOnlyOnRight().values());
+                Iterables.addAll(altered, Iterables.transform(delta.entriesDiffering().values(), new Function<MapDifference.ValueDifference<UserType>, UserType>()
+                {
+                    public UserType apply(MapDifference.ValueDifference<UserType> pair)
+                    {
+                        return pair.rightValue();
+                    }
+                }));
+            }
+            else if (pre.hasColumns())
+            {
+                dropped.addAll(Schema.instance.getKSMetaData(keyspaceName).userTypes.getAllTypes().values());
+            }
+            else if (post.hasColumns())
+            {
+                created.addAll(createTypesFromPartition(new Row(entry.getKey(), post)).values());
+            }
+        }
+
+        for (UserType type : created)
+            Schema.instance.addType(type);
+        for (UserType type : altered)
+            Schema.instance.updateType(type);
+        for (UserType type : dropped)
+            Schema.instance.dropType(type);
+    }
+
+    // see the comments for mergeKeyspaces()
+    private static void mergeFunctions(Map<DecoratedKey, ColumnFamily> before, Map<DecoratedKey, ColumnFamily> after)
+    {
+        List<UDFunction> created = new ArrayList<>();
+        List<UDFunction> altered = new ArrayList<>();
+        List<UDFunction> dropped = new ArrayList<>();
+
+        MapDifference<DecoratedKey, ColumnFamily> diff = Maps.difference(before, after);
+
+        // New keyspace with functions
+        for (Map.Entry<DecoratedKey, ColumnFamily> entry : diff.entriesOnlyOnRight().entrySet())
+            if (entry.getValue().hasColumns())
+                created.addAll(createFunctionsFromFunctionsPartition(new Row(entry.getKey(), entry.getValue())).values());
+
+        for (Map.Entry<DecoratedKey, MapDifference.ValueDifference<ColumnFamily>> entry : diff.entriesDiffering().entrySet())
+        {
+            ColumnFamily pre = entry.getValue().leftValue();
+            ColumnFamily post = entry.getValue().rightValue();
+
+            if (pre.hasColumns() && post.hasColumns())
+            {
+                MapDifference<ByteBuffer, UDFunction> delta =
+                    Maps.difference(createFunctionsFromFunctionsPartition(new Row(entry.getKey(), pre)),
+                                    createFunctionsFromFunctionsPartition(new Row(entry.getKey(), post)));
+
+                dropped.addAll(delta.entriesOnlyOnLeft().values());
+                created.addAll(delta.entriesOnlyOnRight().values());
+                Iterables.addAll(altered, Iterables.transform(delta.entriesDiffering().values(), new Function<MapDifference.ValueDifference<UDFunction>, UDFunction>()
+                {
+                    public UDFunction apply(MapDifference.ValueDifference<UDFunction> pair)
+                    {
+                        return pair.rightValue();
+                    }
+                }));
+            }
+            else if (pre.hasColumns())
+            {
+                dropped.addAll(createFunctionsFromFunctionsPartition(new Row(entry.getKey(), pre)).values());
+            }
+            else if (post.hasColumns())
+            {
+                created.addAll(createFunctionsFromFunctionsPartition(new Row(entry.getKey(), post)).values());
+            }
+        }
+
+        for (UDFunction udf : created)
+            Schema.instance.addFunction(udf);
+        for (UDFunction udf : altered)
+            Schema.instance.updateFunction(udf);
+        for (UDFunction udf : dropped)
+            Schema.instance.dropFunction(udf);
+    }
+
+    // see the comments for mergeKeyspaces()
+    private static void mergeAggregates(Map<DecoratedKey, ColumnFamily> before, Map<DecoratedKey, ColumnFamily> after)
+    {
+        List<UDAggregate> created = new ArrayList<>();
+        List<UDAggregate> altered = new ArrayList<>();
+        List<UDAggregate> dropped = new ArrayList<>();
+
+        MapDifference<DecoratedKey, ColumnFamily> diff = Maps.difference(before, after);
+
+        // New keyspace with functions
+        for (Map.Entry<DecoratedKey, ColumnFamily> entry : diff.entriesOnlyOnRight().entrySet())
+            if (entry.getValue().hasColumns())
+                created.addAll(createAggregatesFromAggregatesPartition(new Row(entry.getKey(), entry.getValue())).values());
+
+        for (Map.Entry<DecoratedKey, MapDifference.ValueDifference<ColumnFamily>> entry : diff.entriesDiffering().entrySet())
+        {
+            ColumnFamily pre = entry.getValue().leftValue();
+            ColumnFamily post = entry.getValue().rightValue();
+
+            if (pre.hasColumns() && post.hasColumns())
+            {
+                MapDifference<ByteBuffer, UDAggregate> delta =
+                    Maps.difference(createAggregatesFromAggregatesPartition(new Row(entry.getKey(), pre)),
+                                    createAggregatesFromAggregatesPartition(new Row(entry.getKey(), post)));
+
+                dropped.addAll(delta.entriesOnlyOnLeft().values());
+                created.addAll(delta.entriesOnlyOnRight().values());
+                Iterables.addAll(altered, Iterables.transform(delta.entriesDiffering().values(), new Function<MapDifference.ValueDifference<UDAggregate>, UDAggregate>()
+                {
+                    public UDAggregate apply(MapDifference.ValueDifference<UDAggregate> pair)
+                    {
+                        return pair.rightValue();
+                    }
+                }));
+            }
+            else if (pre.hasColumns())
+            {
+                dropped.addAll(createAggregatesFromAggregatesPartition(new Row(entry.getKey(), pre)).values());
+            }
+            else if (post.hasColumns())
+            {
+                created.addAll(createAggregatesFromAggregatesPartition(new Row(entry.getKey(), post)).values());
+            }
+        }
+
+        for (UDAggregate udf : created)
+            Schema.instance.addAggregate(udf);
+        for (UDAggregate udf : altered)
+            Schema.instance.updateAggregate(udf);
+        for (UDAggregate udf : dropped)
+            Schema.instance.dropAggregate(udf);
+    }
+
+    /*
+     * Keyspace metadata serialization/deserialization.
+     */
+
+    public static Mutation makeCreateKeyspaceMutation(KSMetaData keyspace, long timestamp)
+    {
+        return makeCreateKeyspaceMutation(keyspace, timestamp, true);
+    }
+
+    private static Mutation makeCreateKeyspaceMutation(KSMetaData keyspace, long timestamp, boolean withTablesAndTypesAndFunctions)
+    {
+        Mutation mutation = new Mutation(SystemKeyspace.NAME, getSchemaKSKey(keyspace.name));
+        ColumnFamily cells = mutation.addOrGet(Keyspaces);
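+        // schema_keyspaces is compact with no clustering columns, so the row prefix is the empty composite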
+        CFRowAdder adder = new CFRowAdder(cells, Keyspaces.comparator.builder().build(), timestamp);
+
+        adder.add("durable_writes", keyspace.durableWrites);
+        adder.add("strategy_class", keyspace.strategyClass.getName());
+        adder.add("strategy_options", json(keyspace.strategyOptions));
+
+        if (withTablesAndTypesAndFunctions)
+        {
+            for (UserType type : keyspace.userTypes.getAllTypes().values())
+                addTypeToSchemaMutation(type, timestamp, mutation);
+
+            for (CFMetaData table : keyspace.cfMetaData().values())
+                addTableToSchemaMutation(table, timestamp, true, mutation);
+        }
+
+        return mutation;
+    }
+
+    public static Mutation makeDropKeyspaceMutation(KSMetaData keyspace, long timestamp)
+    {
+        Mutation mutation = new Mutation(SystemKeyspace.NAME, getSchemaKSKey(keyspace.name));
+        for (String schemaTable : ALL)
+            mutation.delete(schemaTable, timestamp);
+        mutation.delete(SystemKeyspace.BUILT_INDEXES, timestamp);
+        return mutation;
+    }
+
+    private static KSMetaData createKeyspaceFromSchemaPartitions(Row serializedKeyspace, Row serializedTables, Row serializedTypes)
+    {
+        Collection<CFMetaData> tables = createTablesFromTablesPartition(serializedTables).values();
+        UTMetaData types = new UTMetaData(createTypesFromPartition(serializedTypes));
+        return createKeyspaceFromSchemaPartition(serializedKeyspace).cloneWith(tables, types);
+    }
+
+    public static KSMetaData createKeyspaceFromName(String keyspace)
+    {
+        Row partition = readSchemaPartitionForKeyspace(KEYSPACES, keyspace);
+
+        if (isEmptySchemaPartition(partition))
+            throw new RuntimeException(String.format("%s not found in the schema definitions keyspaceName (%s).", keyspace, KEYSPACES));
+
+        return createKeyspaceFromSchemaPartition(partition);
+    }
+
+    /**
+     * Deserialize only the keyspace attributes, without nested tables or types
+     *
+     * @param partition Keyspace attributes in serialized form
+     */
+    private static KSMetaData createKeyspaceFromSchemaPartition(Row partition)
+    {
+        String query = String.format("SELECT * FROM %s.%s", SystemKeyspace.NAME, KEYSPACES);
+        UntypedResultSet.Row row = QueryProcessor.resultify(query, partition).one();
+        try
+        {
+            return new KSMetaData(row.getString("keyspace_name"),
+                                  AbstractReplicationStrategy.getClass(row.getString("strategy_class")),
+                                  fromJsonMap(row.getString("strategy_options")),
+                                  row.getBoolean("durable_writes"));
+        }
+        catch (ConfigurationException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /*
+     * User type metadata serialization/deserialization.
+     */
+
+    public static Mutation makeCreateTypeMutation(KSMetaData keyspace, UserType type, long timestamp)
+    {
+        // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
+        Mutation mutation = makeCreateKeyspaceMutation(keyspace, timestamp, false);
+        addTypeToSchemaMutation(type, timestamp, mutation);
+        return mutation;
+    }
+
+    private static void addTypeToSchemaMutation(UserType type, long timestamp, Mutation mutation)
+    {
+        ColumnFamily cells = mutation.addOrGet(Usertypes);
+
+        Composite prefix = Usertypes.comparator.make(type.name);
+        CFRowAdder adder = new CFRowAdder(cells, prefix, timestamp);
+
+        adder.resetCollection("field_names");
+        adder.resetCollection("field_types");
+
+        for (int i = 0; i < type.size(); i++)
+        {
+            adder.addListEntry("field_names", type.fieldName(i));
+            adder.addListEntry("field_types", type.fieldType(i).toString());
+        }
+    }
+
+    public static Mutation dropTypeFromSchemaMutation(KSMetaData keyspace, UserType type, long timestamp)
+    {
+        // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
+        Mutation mutation = makeCreateKeyspaceMutation(keyspace, timestamp, false);
+
+        ColumnFamily cells = mutation.addOrGet(Usertypes);
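+        // ldt is the range tombstone's local deletion time, in seconds since the epoch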
+        int ldt = (int) (System.currentTimeMillis() / 1000);
+
+        Composite prefix = Usertypes.comparator.make(type.name);
+        cells.addAtom(new RangeTombstone(prefix, prefix.end(), timestamp, ldt));
+
+        return mutation;
+    }
+
+    private static Map<ByteBuffer, UserType> createTypesFromPartition(Row partition)
+    {
+        String query = String.format("SELECT * FROM %s.%s", SystemKeyspace.NAME, USERTYPES);
+        Map<ByteBuffer, UserType> types = new HashMap<>();
+        for (UntypedResultSet.Row row : QueryProcessor.resultify(query, partition))
+        {
+            UserType type = createTypeFromRow(row);
+            types.put(type.name, type);
+        }
+        return types;
+    }
+
+    private static UserType createTypeFromRow(UntypedResultSet.Row row)
+    {
+        String keyspace = row.getString("keyspace_name");
+        ByteBuffer name = ByteBufferUtil.bytes(row.getString("type_name"));
+        List<String> rawColumns = row.getList("field_names", UTF8Type.instance);
+        List<String> rawTypes = row.getList("field_types", UTF8Type.instance);
+
+        List<ByteBuffer> columns = new ArrayList<>(rawColumns.size());
+        for (String rawColumn : rawColumns)
+            columns.add(ByteBufferUtil.bytes(rawColumn));
+
+        List<AbstractType<?>> types = new ArrayList<>(rawTypes.size());
+        for (String rawType : rawTypes)
+            types.add(parseType(rawType));
+
+        return new UserType(keyspace, name, columns, types);
+    }
+
+    /*
+     * Table metadata serialization/deserialization.
+     */
+
+    public static Mutation makeCreateTableMutation(KSMetaData keyspace, CFMetaData table, long timestamp)
+    {
+        // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
+        Mutation mutation = makeCreateKeyspaceMutation(keyspace, timestamp, false);
+        addTableToSchemaMutation(table, timestamp, true, mutation);
+        return mutation;
+    }
+
+    private static void addTableToSchemaMutation(CFMetaData table, long timestamp, boolean withColumnsAndTriggers, Mutation mutation)
+    {
+        // For properties that can be null (and can be changed), we insert tombstones, to make sure
+        // we don't keep a property the user has removed
+        ColumnFamily cells = mutation.addOrGet(Columnfamilies);
+        Composite prefix = Columnfamilies.comparator.make(table.cfName);
+        CFRowAdder adder = new CFRowAdder(cells, prefix, timestamp);
+
+        adder.add("cf_id", table.cfId);
+        adder.add("type", table.cfType.toString());
+
+        if (table.isSuper())
+        {
+            // We need to continue saving the comparator and subcomparator separately, otherwise
+            // we won't know at deserialization if the subcomparator should be taken into account
+            // TODO: we should implement an on-start migration if we want to get rid of that.
+            adder.add("comparator", table.comparator.subtype(0).toString());
+            adder.add("subcomparator", table.comparator.subtype(1).toString());
+        }
+        else
+        {
+            adder.add("comparator", table.comparator.toString());
+        }
+
+        adder.add("bloom_filter_fp_chance", table.getBloomFilterFpChance());
+        adder.add("caching", table.getCaching().toString());
+        adder.add("comment", table.getComment());
+        adder.add("compaction_strategy_class", table.compactionStrategyClass.getName());
+        adder.add("compaction_strategy_options", json(table.compactionStrategyOptions));
+        adder.add("compression_parameters", json(table.compressionParameters.asThriftOptions()));
+        adder.add("default_time_to_live", table.getDefaultTimeToLive());
+        adder.add("default_validator", table.getDefaultValidator().toString());
+        adder.add("gc_grace_seconds", table.getGcGraceSeconds());
+        adder.add("key_validator", table.getKeyValidator().toString());
+        adder.add("local_read_repair_chance", table.getDcLocalReadRepairChance());
+        adder.add("max_compaction_threshold", table.getMaxCompactionThreshold());
+        adder.add("max_index_interval", table.getMaxIndexInterval());
+        adder.add("memtable_flush_period_in_ms", table.getMemtableFlushPeriod());
+        adder.add("min_compaction_threshold", table.getMinCompactionThreshold());
+        adder.add("min_index_interval", table.getMinIndexInterval());
+        adder.add("read_repair_chance", table.getReadRepairChance());
+        adder.add("speculative_retry", table.getSpeculativeRetry().toString());
+
+        for (Map.Entry<ColumnIdentifier, Long> entry : table.getDroppedColumns().entrySet())
+            adder.addMapEntry("dropped_columns", entry.getKey().toString(), entry.getValue());
+
+        adder.add("is_dense", table.getIsDense());
+
+        if (withColumnsAndTriggers)
+        {
+            for (ColumnDefinition column : table.allColumns())
+                addColumnToSchemaMutation(table, column, timestamp, mutation);
+
+            for (TriggerDefinition trigger : table.getTriggers().values())
+                addTriggerToSchemaMutation(table, trigger, timestamp, mutation);
+        }
+    }
+
+    public static Mutation makeUpdateTableMutation(KSMetaData keyspace,
+                                                   CFMetaData oldTable,
+                                                   CFMetaData newTable,
+                                                   long timestamp,
+                                                   boolean fromThrift)
+    {
+        Mutation mutation = makeCreateKeyspaceMutation(keyspace, timestamp, false);
+
+        addTableToSchemaMutation(newTable, timestamp, false, mutation);
+
+        MapDifference<ByteBuffer, ColumnDefinition> columnDiff = Maps.difference(oldTable.getColumnMetadata(),
+                                                                                 newTable.getColumnMetadata());
+
+        // columns that are no longer needed
+        for (ColumnDefinition column : columnDiff.entriesOnlyOnLeft().values())
+        {
+            // Thrift only knows about the REGULAR ColumnDefinition type, so don't consider other types
+            // to be deleted just because they are not here.
+            if (fromThrift && column.kind != ColumnDefinition.Kind.REGULAR)
+                continue;
+
+            dropColumnFromSchemaMutation(oldTable, column, timestamp, mutation);
+        }
+
+        // newly added columns
+        for (ColumnDefinition column : columnDiff.entriesOnlyOnRight().values())
+            addColumnToSchemaMutation(newTable, column, timestamp, mutation);
+
+        // old columns with updated attributes
+        for (ByteBuffer name : columnDiff.entriesDiffering().keySet())
+            addColumnToSchemaMutation(newTable, newTable.getColumnDefinition(name), timestamp, mutation);
+
+        MapDifference<String, TriggerDefinition> triggerDiff = Maps.difference(oldTable.getTriggers(), newTable.getTriggers());
+
+        // dropped triggers
+        for (TriggerDefinition trigger : triggerDiff.entriesOnlyOnLeft().values())
+            dropTriggerFromSchemaMutation(oldTable, trigger, timestamp, mutation);
+
+        // newly created triggers
+        for (TriggerDefinition trigger : triggerDiff.entriesOnlyOnRight().values())
+            addTriggerToSchemaMutation(newTable, trigger, timestamp, mutation);
+
+        return mutation;
+    }
+
+    public static Mutation makeDropTableMutation(KSMetaData keyspace, CFMetaData table, long timestamp)
+    {
+        // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
+        Mutation mutation = makeCreateKeyspaceMutation(keyspace, timestamp, false);
+
+        ColumnFamily cells = mutation.addOrGet(Columnfamilies);
+        int ldt = (int) (System.currentTimeMillis() / 1000);
+
+        Composite prefix = Columnfamilies.comparator.make(table.cfName);
+        cells.addAtom(new RangeTombstone(prefix, prefix.end(), timestamp, ldt));
+
+        for (ColumnDefinition column : table.allColumns())
+            dropColumnFromSchemaMutation(table, column, timestamp, mutation);
+
+        for (TriggerDefinition trigger : table.getTriggers().values())
+            dropTriggerFromSchemaMutation(table, trigger, timestamp, mutation);
+
+        // TODO: get rid of in #6717
+        ColumnFamily indexCells = mutation.addOrGet(SystemKeyspace.BuiltIndexes);
+        for (String indexName : Keyspace.open(keyspace.name).getColumnFamilyStore(table.cfName).getBuiltIndexes())
+            indexCells.addTombstone(indexCells.getComparator().makeCellName(indexName), ldt, timestamp);
+
+        return mutation;
+    }
+
+    public static CFMetaData createTableFromName(String keyspace, String table)
+    {
+        Row partition = readSchemaPartitionForTable(COLUMNFAMILIES, keyspace, table);
+
+        if (isEmptySchemaPartition(partition))
+            throw new RuntimeException(String.format("%s:%s not found in the schema definitions keyspace.", keyspace, table));
+
+        return createTableFromTablePartition(partition);
+    }
+
+    /**
+     * Deserialize tables from their low-level schema representation; all of them belong to the same keyspace
+     *
+     * @return a map from table name to table metadata, for faster lookup
+     */
+    private static Map<String, CFMetaData> createTablesFromTablesPartition(Row partition)
+    {
+        if (partition.cf == null)
+            return Collections.emptyMap();
+
+        String query = String.format("SELECT * FROM %s.%s", SystemKeyspace.NAME, COLUMNFAMILIES);
+        Map<String, CFMetaData> tables = new HashMap<>();
+        for (UntypedResultSet.Row row : QueryProcessor.resultify(query, partition))
+        {
+            CFMetaData cfm = createTableFromTableRow(row);
+            tables.put(cfm.cfName, cfm);
+        }
+        return tables;
+    }
+
+    public static CFMetaData createTableFromTablePartitionAndColumnsPartition(Row serializedTable, Row serializedColumns)
+    {
+        String query = String.format("SELECT * FROM %s.%s", SystemKeyspace.NAME, COLUMNFAMILIES);
+        return createTableFromTableRowAndColumnsPartition(QueryProcessor.resultify(query, serializedTable).one(), serializedColumns);
+    }
+
+    private static CFMetaData createTableFromTableRowAndColumnsPartition(UntypedResultSet.Row tableRow, Row serializedColumns)
+    {
+        String query = String.format("SELECT * FROM %s.%s", SystemKeyspace.NAME, COLUMNS);
+        return createTableFromTableRowAndColumnRows(tableRow, QueryProcessor.resultify(query, serializedColumns));
+    }
+
+    private static CFMetaData createTableFromTablePartition(Row row)
+    {
+        String query = String.format("SELECT * FROM %s.%s", SystemKeyspace.NAME, COLUMNFAMILIES);
+        return createTableFromTableRow(QueryProcessor.resultify(query, row).one());
+    }
+
+    /**
+     * Deserialize table metadata from low-level representation
+     *
+     * @return Metadata deserialized from schema
+     */
+    private static CFMetaData createTableFromTableRow(UntypedResultSet.Row result)
+    {
+        String ksName = result.getString("keyspace_name");
+        String cfName = result.getString("columnfamily_name");
+
+        Row serializedColumns = readSchemaPartitionForTable(COLUMNS, ksName, cfName);
+        CFMetaData cfm = createTableFromTableRowAndColumnsPartition(result, serializedColumns);
+
+        Row serializedTriggers = readSchemaPartitionForTable(TRIGGERS, ksName, cfName);
+        try
+        {
+            for (TriggerDefinition trigger : createTriggersFromTriggersPartition(serializedTriggers))
+                cfm.addTriggerDefinition(trigger);
+        }
+        catch (InvalidRequestException e)
+        {
+            throw new RuntimeException(e);
+        }
+
+        return cfm;
+    }
+
+    public static CFMetaData createTableFromTableRowAndColumnRows(UntypedResultSet.Row result,
+                                                                  UntypedResultSet serializedColumnDefinitions)
+    {
+        try
+        {
+            String ksName = result.getString("keyspace_name");
+            String cfName = result.getString("columnfamily_name");
+
+            AbstractType<?> rawComparator = TypeParser.parse(result.getString("comparator"));
+            AbstractType<?> subComparator = result.has("subcomparator") ? TypeParser.parse(result.getString("subcomparator")) : null;
+            ColumnFamilyType cfType = ColumnFamilyType.valueOf(result.getString("type"));
+
+            AbstractType<?> fullRawComparator = CFMetaData.makeRawAbstractType(rawComparator, subComparator);
+
+            List<ColumnDefinition> columnDefs = createColumnsFromColumnRows(serializedColumnDefinitions,
+                                                                            ksName,
+                                                                            cfName,
+                                                                            fullRawComparator,
+                                                                            cfType == ColumnFamilyType.Super);
+
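+            // schemas written by older versions lack is_dense, in which case it is
+            // inferred from the comparator and the defined columns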
+            boolean isDense = result.has("is_dense")
+                            ? result.getBoolean("is_dense")
+                            : CFMetaData.calculateIsDense(fullRawComparator, columnDefs);
+
+            CellNameType comparator = CellNames.fromAbstractType(fullRawComparator, isDense);
+
+            // if we are upgrading, we use an id generated from the names initially
+            UUID cfId = result.has("cf_id")
+                      ? result.getUUID("cf_id")
+                      : CFMetaData.generateLegacyCfId(ksName, cfName);
+
+            CFMetaData cfm = new CFMetaData(ksName, cfName, cfType, comparator, cfId);
+            cfm.isDense(isDense);
+
+            cfm.readRepairChance(result.getDouble("read_repair_chance"));
+            cfm.dcLocalReadRepairChance(result.getDouble("local_read_repair_chance"));
+            cfm.gcGraceSeconds(result.getInt("gc_grace_seconds"));
+            cfm.defaultValidator(TypeParser.parse(result.getString("default_validator")));
+            cfm.keyValidator(TypeParser.parse(result.getString("key_validator")));
+            cfm.minCompactionThreshold(result.getInt("min_compaction_threshold"));
+            cfm.maxCompactionThreshold(result.getInt("max_compaction_threshold"));
+            if (result.has("comment"))
+                cfm.comment(result.getString("comment"));
+            if (result.has("memtable_flush_period_in_ms"))
+                cfm.memtableFlushPeriod(result.getInt("memtable_flush_period_in_ms"));
+            cfm.caching(CachingOptions.fromString(result.getString("caching")));
+            if (result.has("default_time_to_live"))
+                cfm.defaultTimeToLive(result.getInt("default_time_to_live"));
+            if (result.has("speculative_retry"))
+                cfm.speculativeRetry(CFMetaData.SpeculativeRetry.fromString(result.getString("speculative_retry")));
+            cfm.compactionStrategyClass(CFMetaData.createCompactionStrategy(result.getString("compaction_strategy_class")));
+            cfm.compressionParameters(CompressionParameters.create(fromJsonMap(result.getString("compression_parameters"))));
+            cfm.compactionStrategyOptions(fromJsonMap(result.getString("compaction_strategy_options")));
+
+            if (result.has("min_index_interval"))
+                cfm.minIndexInterval(result.getInt("min_index_interval"));
+
+            if (result.has("max_index_interval"))
+                cfm.maxIndexInterval(result.getInt("max_index_interval"));
+
+            if (result.has("bloom_filter_fp_chance"))
+                cfm.bloomFilterFpChance(result.getDouble("bloom_filter_fp_chance"));
+            else
+                cfm.bloomFilterFpChance(cfm.getBloomFilterFpChance());
+
+            if (result.has("dropped_columns"))
+                cfm.droppedColumns(convertDroppedColumns(result.getMap("dropped_columns", UTF8Type.instance, LongType.instance)));
+
+            for (ColumnDefinition cd : columnDefs)
+                cfm.addOrReplaceColumnDefinition(cd);
+
+            return cfm.rebuild();
+        }
+        catch (SyntaxException | ConfigurationException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static Map<ColumnIdentifier, Long> convertDroppedColumns(Map<String, Long> raw)
+    {
+        Map<ColumnIdentifier, Long> converted = Maps.newHashMap();
+        for (Map.Entry<String, Long> entry : raw.entrySet())
+            converted.put(new ColumnIdentifier(entry.getKey(), true), entry.getValue());
+        return converted;
+    }
+
+    /*
+     * Column metadata serialization/deserialization.
+     */
+
+    private static void addColumnToSchemaMutation(CFMetaData table, ColumnDefinition column, long timestamp, Mutation mutation)
+    {
+        ColumnFamily cells = mutation.addOrGet(Columns);
+        Composite prefix = Columns.comparator.make(table.cfName, column.name.toString());
+        CFRowAdder adder = new CFRowAdder(cells, prefix, timestamp);
+
+        adder.add("validator", column.type.toString());
+        adder.add("type", serializeKind(column.kind));
+        adder.add("component_index", column.isOnAllComponents() ? null : column.position());
+        adder.add("index_name", column.getIndexName());
+        adder.add("index_type", column.getIndexType() == null ? null : column.getIndexType().toString());
+        adder.add("index_options", json(column.getIndexOptions()));
+    }
+
+    private static String serializeKind(ColumnDefinition.Kind kind)
+    {
+        // For backward compatibility we need to special-case CLUSTERING_COLUMN
+        return kind == ColumnDefinition.Kind.CLUSTERING_COLUMN ? "clustering_key" : kind.toString().toLowerCase();
+    }
+
+    private static ColumnDefinition.Kind deserializeKind(String kind)
+    {
+        if (kind.equalsIgnoreCase("clustering_key"))
+            return ColumnDefinition.Kind.CLUSTERING_COLUMN;
+        return Enum.valueOf(ColumnDefinition.Kind.class, kind.toUpperCase());
+    }
+
+    private static void dropColumnFromSchemaMutation(CFMetaData table, ColumnDefinition column, long timestamp, Mutation mutation)
+    {
+        ColumnFamily cells = mutation.addOrGet(Columns);
+        int ldt = (int) (System.currentTimeMillis() / 1000);
+
+        // Note: we do want to use name.toString(), not name.bytes directly, for backward compatibility (for CQL3, this won't make a difference).
+        Composite prefix = Columns.comparator.make(table.cfName, column.name.toString());
+        cells.addAtom(new RangeTombstone(prefix, prefix.end(), timestamp, ldt));
+    }
+
+    private static List<ColumnDefinition> createColumnsFromColumnRows(UntypedResultSet rows,
+                                                                      String keyspace,
+                                                                      String table,
+                                                                      AbstractType<?> rawComparator,
+                                                                      boolean isSuper)
+    {
+        List<ColumnDefinition> columns = new ArrayList<>();
+        for (UntypedResultSet.Row row : rows)
+            columns.add(createColumnFromColumnRow(row, keyspace, table, rawComparator, isSuper));
+        return columns;
+    }
+
+    private static ColumnDefinition createColumnFromColumnRow(UntypedResultSet.Row row,
+                                                              String keyspace,
+                                                              String table,
+                                                              AbstractType<?> rawComparator,
+                                                              boolean isSuper)
+    {
+        ColumnDefinition.Kind kind = deserializeKind(row.getString("type"));
+
+        Integer componentIndex = null;
+        if (row.has("component_index"))
+            componentIndex = row.getInt("component_index");
+        else if (kind == ColumnDefinition.Kind.CLUSTERING_COLUMN && isSuper)
+            componentIndex = 1; // A ColumnDefinition for super columns applies to the column component
+
+        // Note: we save the column name as a string, but we should not assume that it is a UTF8 name, so
+        // we need to use the comparator's fromString method
+        AbstractType<?> comparator = kind == ColumnDefinition.Kind.REGULAR
+                                   ? getComponentComparator(rawComparator, componentIndex)
+                                   : UTF8Type.instance;
+        ColumnIdentifier name = new ColumnIdentifier(comparator.fromString(row.getString("column_name")), comparator);
+
+        AbstractType<?> validator = parseType(row.getString("validator"));
+
+        IndexType indexType = null;
+        if (row.has("index_type"))
+            indexType = IndexType.valueOf(row.getString("index_type"));
+
+        Map<String, String> indexOptions = null;
+        if (row.has("index_options"))
+            indexOptions = fromJsonMap(row.getString("index_options"));
+
+        String indexName = null;
+        if (row.has("index_name"))
+            indexName = row.getString("index_name");
+
+        return new ColumnDefinition(keyspace, table, name, validator, indexType, indexOptions, indexName, componentIndex, kind);
+    }
+
+    private static AbstractType<?> getComponentComparator(AbstractType<?> rawComparator, Integer componentIndex)
+    {
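+        // A null componentIndex (or index 0 over a non-composite comparator) means the raw comparator
+        // types the whole column name; otherwise use the matching component of the CompositeType.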
+        return (componentIndex == null || (componentIndex == 0 && !(rawComparator instanceof CompositeType)))
+               ? rawComparator
+               : ((CompositeType)rawComparator).types.get(componentIndex);
+    }
+
+    /*
+     * Trigger metadata serialization/deserialization.
+     */
+
+    private static void addTriggerToSchemaMutation(CFMetaData table, TriggerDefinition trigger, long timestamp, Mutation mutation)
+    {
+        ColumnFamily cells = mutation.addOrGet(Triggers);
+        Composite prefix = Triggers.comparator.make(table.cfName, trigger.name);
+        CFRowAdder adder = new CFRowAdder(cells, prefix, timestamp);
+        adder.addMapEntry("trigger_options", "class", trigger.classOption);
+    }
+
+    private static void dropTriggerFromSchemaMutation(CFMetaData table, TriggerDefinition trigger, long timestamp, Mutation mutation)
+    {
+        ColumnFamily cells = mutation.addOrGet(Triggers);
+        int ldt = (int) (System.currentTimeMillis() / 1000);
+
+        Composite prefix = Triggers.comparator.make(table.cfName, trigger.name);
+        cells.addAtom(new RangeTombstone(prefix, prefix.end(), timestamp, ldt));
+    }
+
+    /**
+     * Deserialize triggers from storage-level representation.
+     *
+     * @param partition storage-level partition containing the trigger definitions
+     * @return the list of processed TriggerDefinitions
+     */
+    private static List<TriggerDefinition> createTriggersFromTriggersPartition(Row partition)
+    {
+        List<TriggerDefinition> triggers = new ArrayList<>();
+        String query = String.format("SELECT * FROM %s.%s", SystemKeyspace.NAME, TRIGGERS);
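+        // resultify doesn't hit storage; it only types the given partition's cells using the SELECT's metadata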
+        for (UntypedResultSet.Row row : QueryProcessor.resultify(query, partition))
+        {
+            String name = row.getString("trigger_name");
+            String classOption = row.getMap("trigger_options", UTF8Type.instance, UTF8Type.instance).get("class");
+            triggers.add(new TriggerDefinition(name, classOption));
+        }
+        return triggers;
+    }
+
+    /*
+     * UDF metadata serialization/deserialization.
+     */
+
+    public static Mutation makeCreateFunctionMutation(KSMetaData keyspace, UDFunction function, long timestamp)
+    {
+        // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
+        Mutation mutation = makeCreateKeyspaceMutation(keyspace, timestamp, false);
+        addFunctionToSchemaMutation(function, timestamp, mutation);
+        return mutation;
+    }
+
+    private static void addFunctionToSchemaMutation(UDFunction function, long timestamp, Mutation mutation)
+    {
+        ColumnFamily cells = mutation.addOrGet(Functions);
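+        // clustering by (function name, argument signature) keeps each overload in its own row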
+        Composite prefix = Functions.comparator.make(function.name().name, UDHelper.calculateSignature(function));
+        CFRowAdder adder = new CFRowAdder(cells, prefix, timestamp);
+
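+        // Tombstone the lists first so a replaced function doesn't merge with stale argument entries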
+        adder.resetCollection("argument_names");
+        adder.resetCollection("argument_types");
+
+        for (int i = 0; i < function.argNames().size(); i++)
+        {
+            adder.addListEntry("argument_names", function.argNames().get(i).bytes);
+            adder.addListEntry("argument_types", function.argTypes().get(i).toString());
+        }
+
+        adder.add("body", function.body());
+        adder.add("is_deterministic", function.isDeterministic());
+        adder.add("language", function.language());
+        adder.add("return_type", function.returnType().toString());
+    }
+
+    public static Mutation makeDropFunctionMutation(KSMetaData keyspace, UDFunction function, long timestamp)
+    {
+        // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
+        Mutation mutation = makeCreateKeyspaceMutation(keyspace, timestamp, false);
+
+        ColumnFamily cells = mutation.addOrGet(Functions);
+        int ldt = (int) (System.currentTimeMillis() / 1000);
+
+        Composite prefix = Functions.comparator.make(function.name().name, UDHelper.calculateSignature(function));
+        cells.addAtom(new RangeTombstone(prefix, prefix.end(), timestamp, ldt));
+
+        return mutation;
+    }
+
+    private static Map<ByteBuffer, UDFunction> createFunctionsFromFunctionsPartition(Row partition)
+    {
+        Map<ByteBuffer, UDFunction> functions = new HashMap<>();
+        String query = String.format("SELECT * FROM %s.%s", SystemKeyspace.NAME, FUNCTIONS);
+        for (UntypedResultSet.Row row : QueryProcessor.resultify(query, partition))
+        {
+            UDFunction function = createFunctionFromFunctionRow(row);
+            functions.put(UDHelper.calculateSignature(function), function);
+        }
+        return functions;
+    }
+
+    private static UDFunction createFunctionFromFunctionRow(UntypedResultSet.Row row)
+    {
+        String ksName = row.getString("keyspace_name");
+        String functionName = row.getString("function_name");
+        FunctionName name = new FunctionName(ksName, functionName);
+
+        List<ColumnIdentifier> argNames = new ArrayList<>();
+        if (row.has("argument_names"))
+            for (String arg : row.getList("argument_names", UTF8Type.instance))
+                argNames.add(new ColumnIdentifier(arg, true));
+
+        List<AbstractType<?>> argTypes = new ArrayList<>();
+        if (row.has("argument_types"))
+            for (String type : row.getList("argument_types", UTF8Type.instance))
+                argTypes.add(parseType(type));
+
+        AbstractType<?> returnType = parseType(row.getString("return_type"));
+
+        boolean isDeterministic = row.getBoolean("is_deterministic");
+        String language = row.getString("language");
+        String body = row.getString("body");
+
+        try
+        {
+            return UDFunction.create(name, argNames, argTypes, returnType, language, body, isDeterministic);
+        }
+        catch (InvalidRequestException e)
+        {
+            logger.error(String.format("Cannot load function '%s' from schema: this function won't be available (on this node)", name), e);
+            return UDFunction.createBrokenFunction(name, argNames, argTypes, returnType, language, body, e);
+        }
+    }
+
+    /*
+     * Aggregate UDF metadata serialization/deserialization.
+     */
+
+    public static Mutation makeCreateAggregateMutation(KSMetaData keyspace, UDAggregate aggregate, long timestamp)
+    {
+        // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
+        Mutation mutation = makeCreateKeyspaceMutation(keyspace, timestamp, false);
+        addAggregateToSchemaMutation(aggregate, timestamp, mutation);
+        return mutation;
+    }
+
+    private static void addAggregateToSchemaMutation(UDAggregate aggregate, long timestamp, Mutation mutation)
+    {
+        ColumnFamily cells = mutation.addOrGet(Aggregates);
+        Composite prefix = Aggregates.comparator.make(aggregate.name().name, UDHelper.calculateSignature(aggregate));
+        CFRowAdder adder = new CFRowAdder(cells, prefix, timestamp);
+
+        adder.resetCollection("argument_types");
+        adder.add("return_type", aggregate.returnType().toString());
+        adder.add("state_func", aggregate.stateFunction().name().name);
+        if (aggregate.stateType() != null)
+            adder.add("state_type", aggregate.stateType().toString());
+        if (aggregate.finalFunction() != null)
+            adder.add("final_func", aggregate.finalFunction().name().name);
+        if (aggregate.initialCondition() != null)
+            adder.add("initcond", aggregate.initialCondition());
+
+        for (AbstractType<?> argType : aggregate.argTypes())
+            adder.addListEntry("argument_types", argType.toString());
+    }
+
+    private static Map<ByteBuffer, UDAggregate> createAggregatesFromAggregatesPartition(Row partition)
+    {
+        Map<ByteBuffer, UDAggregate> aggregates = new HashMap<>();
+        String query = String.format("SELECT * FROM %s.%s", SystemKeyspace.NAME, AGGREGATES);
+        for (UntypedResultSet.Row row : QueryProcessor.resultify(query, partition))
+        {
+            UDAggregate aggregate = createAggregateFromAggregateRow(row);
+            aggregates.put(UDHelper.calculateSignature(aggregate), aggregate);
+        }
+        return aggregates;
+    }
+
+    private static UDAggregate createAggregateFromAggregateRow(UntypedResultSet.Row row)
+    {
+        String ksName = row.getString("keyspace_name");
+        String functionName = row.getString("aggregate_name");
+        FunctionName name = new FunctionName(ksName, functionName);
+
+        List<String> types = row.getList("argument_types", UTF8Type.instance);
+
+        List<AbstractType<?>> argTypes;
+        if (types == null)
+        {
+            argTypes = Collections.emptyList();
+        }
+        else
+        {
+            argTypes = new ArrayList<>(types.size());
+            for (String type : types)
+                argTypes.add(parseType(type));
+        }
+
+        AbstractType<?> returnType = parseType(row.getString("return_type"));
+
+        FunctionName stateFunc = new FunctionName(ksName, row.getString("state_func"));
+        FunctionName finalFunc = row.has("final_func") ? new FunctionName(ksName, row.getString("final_func")) : null;
+        AbstractType<?> stateType = row.has("state_type") ? parseType(row.getString("state_type")) : null;
+        ByteBuffer initcond = row.has("initcond") ? row.getBytes("initcond") : null;
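+        // initcond is stored as a blob: the initial state value serialized with the aggregate's state type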
+
+        try
+        {
+            return UDAggregate.create(name, argTypes, returnType, stateFunc, finalFunc, stateType, initcond);
+        }
+        catch (InvalidRequestException reason)
+        {
+            return UDAggregate.createBroken(name, argTypes, returnType, initcond, reason);
+        }
+    }
+
+    public static Mutation makeDropAggregateMutation(KSMetaData keyspace, UDAggregate aggregate, long timestamp)
+    {
+        // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
+        Mutation mutation = makeCreateKeyspaceMutation(keyspace, timestamp, false);
+
+        ColumnFamily cells = mutation.addOrGet(Aggregates);
+        int ldt = (int) (System.currentTimeMillis() / 1000);
+
+        Composite prefix = Aggregates.comparator.make(aggregate.name().name, UDHelper.calculateSignature(aggregate));
+        cells.addAtom(new RangeTombstone(prefix, prefix.end(), timestamp, ldt));
+
+        return mutation;
+    }
+
+    private static AbstractType<?> parseType(String str)
+    {
+        try
+        {
+            return TypeParser.parse(str);
+        }
+        catch (SyntaxException | ConfigurationException e)
+        {
+            // We only use this when reading the schema, in which case a parse error should be impossible
+            throw new RuntimeException(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e9d345f/src/java/org/apache/cassandra/service/CassandraDaemon.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index 99a2d87..edc91ab 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -34,7 +34,6 @@ import javax.management.StandardMBean;
 
 import com.google.common.collect.Iterables;
 import com.google.common.util.concurrent.Uninterruptibles;
-import org.hyperic.sigar.SigarException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,7 +45,6 @@ import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.cql3.functions.Functions;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Directories;
 import org.apache.cassandra.db.Keyspace;
@@ -253,10 +251,8 @@ public class CassandraDaemon
             exitOrFail(100, "Fatal exception during initialization", e);
         }
 
-
-        // load keyspace && function descriptions.
-        DatabaseDescriptor.loadSchemas();
-        Functions.loadUDFFromSchema();
+        // load schema from disk
+        Schema.instance.loadFromDisk();
 
         // clean up compaction leftovers
         Map<Pair<String, String>, Map<Integer, UUID>> unfinishedCompactions = SystemKeyspace.getUnfinishedCompactions();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e9d345f/src/java/org/apache/cassandra/service/ClientState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ClientState.java b/src/java/org/apache/cassandra/service/ClientState.java
index 9c8d7d8..dcdf838 100644
--- a/src/java/org/apache/cassandra/service/ClientState.java
+++ b/src/java/org/apache/cassandra/service/ClientState.java
@@ -32,6 +32,7 @@ import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.cql3.QueryHandler;
 import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.schema.LegacySchemaTables;
 import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.exceptions.AuthenticationException;
 import org.apache.cassandra.exceptions.InvalidRequestException;
@@ -58,7 +59,7 @@ public class ClientState
     {
         // We want these system cfs to be always readable to authenticated users since many tools rely on them
         // (nodetool, cqlsh, bulkloader, etc.)
-        for (String cf : Iterables.concat(Arrays.asList(SystemKeyspace.LOCAL_TABLE, SystemKeyspace.PEERS_TABLE), SystemKeyspace.ALL_SCHEMA_TABLES))
+        for (String cf : Iterables.concat(Arrays.asList(SystemKeyspace.LOCAL, SystemKeyspace.PEERS), LegacySchemaTables.ALL))
             READABLE_SYSTEM_RESOURCES.add(DataResource.columnFamily(SystemKeyspace.NAME, cf));
 
         PROTECTED_AUTH_RESOURCES.addAll(DatabaseDescriptor.getAuthenticator().protectedResources());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e9d345f/src/java/org/apache/cassandra/service/MigrationManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/MigrationManager.java b/src/java/org/apache/cassandra/service/MigrationManager.java
index c3fe1fa..fe32559 100644
--- a/src/java/org/apache/cassandra/service/MigrationManager.java
+++ b/src/java/org/apache/cassandra/service/MigrationManager.java
@@ -37,7 +37,6 @@ import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.KSMetaData;
-import org.apache.cassandra.config.UTMetaData;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.cql3.functions.UDAggregate;
 import org.apache.cassandra.cql3.functions.UDFunction;
@@ -50,6 +49,7 @@ import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.LegacySchemaTables;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.WrappedRunnable;
 
@@ -63,7 +63,7 @@ public class MigrationManager
 
     public static final int MIGRATION_DELAY_IN_MS = 60000;
 
-    private final List<IMigrationListener> listeners = new CopyOnWriteArrayList<IMigrationListener>();
+    private final List<IMigrationListener> listeners = new CopyOnWriteArrayList<>();
     
     private MigrationManager() {}
 
@@ -182,52 +182,40 @@ public class MigrationManager
             listener.onCreateFunction(udf.name().keyspace, udf.name().name);
     }
 
-    public void notifyUpdateFunction(UDFunction udf)
-    {
-        for (IMigrationListener listener : listeners)
-            listener.onUpdateFunction(udf.name().keyspace, udf.name().name);
-    }
-
-    public void notifyDropFunction(UDFunction udf)
-    {
-        for (IMigrationListener listener : listeners)
-            listener.onDropFunction(udf.name().keyspace, udf.name().name);
-    }
-
     public void notifyCreateAggregate(UDAggregate udf)
     {
         for (IMigrationListener listener : listeners)
             listener.onCreateAggregate(udf.name().keyspace, udf.name().name);
     }
 
-    public void notifyUpdateAggregate(UDAggregate udf)
+    public void notifyUpdateKeyspace(KSMetaData ksm)
     {
         for (IMigrationListener listener : listeners)
-            listener.onUpdateAggregate(udf.name().keyspace, udf.name().name);
+            listener.onUpdateKeyspace(ksm.name);
     }
 
-    public void notifyDropAggregate(UDAggregate udf)
+    public void notifyUpdateColumnFamily(CFMetaData cfm)
     {
         for (IMigrationListener listener : listeners)
-            listener.onDropAggregate(udf.name().keyspace, udf.name().name);
+            listener.onUpdateColumnFamily(cfm.ksName, cfm.cfName);
     }
 
-    public void notifyUpdateKeyspace(KSMetaData ksm)
+    public void notifyUpdateUserType(UserType ut)
     {
         for (IMigrationListener listener : listeners)
-            listener.onUpdateKeyspace(ksm.name);
+            listener.onUpdateUserType(ut.keyspace, ut.getNameAsString());
     }
 
-    public void notifyUpdateColumnFamily(CFMetaData cfm)
+    public void notifyUpdateFunction(UDFunction udf)
     {
         for (IMigrationListener listener : listeners)
-            listener.onUpdateColumnFamily(cfm.ksName, cfm.cfName);
+            listener.onUpdateFunction(udf.name().keyspace, udf.name().name);
     }
 
-    public void notifyUpdateUserType(UserType ut)
+    public void notifyUpdateAggregate(UDAggregate udf)
     {
         for (IMigrationListener listener : listeners)
-            listener.onUpdateUserType(ut.keyspace, ut.getNameAsString());
+            listener.onUpdateAggregate(udf.name().keyspace, udf.name().name);
     }
 
     public void notifyDropKeyspace(KSMetaData ksm)
@@ -248,6 +236,18 @@ public class MigrationManager
             listener.onDropUserType(ut.keyspace, ut.getNameAsString());
     }
 
+    public void notifyDropFunction(UDFunction udf)
+    {
+        for (IMigrationListener listener : listeners)
+            listener.onDropFunction(udf.name().keyspace, udf.name().name);
+    }
+
+    public void notifyDropAggregate(UDAggregate udf)
+    {
+        for (IMigrationListener listener : listeners)
+            listener.onDropAggregate(udf.name().keyspace, udf.name().name);
+    }
+
     public static void announceNewKeyspace(KSMetaData ksm) throws ConfigurationException
     {
         announceNewKeyspace(ksm, false);
@@ -266,7 +266,7 @@ public class MigrationManager
             throw new AlreadyExistsException(ksm.name);
 
         logger.info(String.format("Create new Keyspace: %s", ksm));
-        announce(ksm.toSchema(timestamp), announceLocally);
+        announce(LegacySchemaTables.makeCreateKeyspaceMutation(ksm, timestamp), announceLocally);
     }
 
     public static void announceNewColumnFamily(CFMetaData cfm) throws ConfigurationException
@@ -285,17 +285,27 @@ public class MigrationManager
             throw new AlreadyExistsException(cfm.ksName, cfm.cfName);
 
         logger.info(String.format("Create new table: %s", cfm));
-        announce(addSerializedKeyspace(cfm.toSchema(FBUtilities.timestampMicros()), cfm.ksName), announceLocally);
+        announce(LegacySchemaTables.makeCreateTableMutation(ksm, cfm, FBUtilities.timestampMicros()), announceLocally);
     }
 
-    public static void announceNewType(UserType newType)
+    public static void announceNewType(UserType newType, boolean announceLocally)
     {
-        announceNewType(newType, false);
+        KSMetaData ksm = Schema.instance.getKSMetaData(newType.keyspace);
+        announce(LegacySchemaTables.makeCreateTypeMutation(ksm, newType, FBUtilities.timestampMicros()), announceLocally);
     }
 
-    public static void announceNewType(UserType newType, boolean announceLocally)
+    public static void announceNewFunction(UDFunction udf, boolean announceLocally)
     {
-        announce(addSerializedKeyspace(UTMetaData.toSchema(newType, FBUtilities.timestampMicros()), newType.keyspace), announceLocally);
+        logger.info(String.format("Create scalar function '%s'", udf.name()));
+        KSMetaData ksm = Schema.instance.getKSMetaData(udf.name().keyspace);
+        announce(LegacySchemaTables.makeCreateFunctionMutation(ksm, udf, FBUtilities.timestampMicros()), announceLocally);
+    }
+
+    public static void announceNewAggregate(UDAggregate udf, boolean announceLocally)
+    {
+        logger.info(String.format("Create aggregate function '%s'", udf.name()));
+        KSMetaData ksm = Schema.instance.getKSMetaData(udf.name().keyspace);
+        announce(LegacySchemaTables.makeCreateAggregateMutation(ksm, udf, FBUtilities.timestampMicros()), announceLocally);
     }
 
     public static void announceKeyspaceUpdate(KSMetaData ksm) throws ConfigurationException
@@ -312,7 +322,7 @@ public class MigrationManager
             throw new ConfigurationException(String.format("Cannot update non existing keyspace '%s'.", ksm.name));
 
         logger.info(String.format("Update Keyspace '%s' From %s To %s", ksm.name, oldKsm, ksm));
-        announce(oldKsm.toSchemaUpdate(ksm, FBUtilities.timestampMicros()), announceLocally);
+        announce(LegacySchemaTables.makeCreateKeyspaceMutation(ksm, FBUtilities.timestampMicros()), announceLocally);
     }
 
     public static void announceColumnFamilyUpdate(CFMetaData cfm, boolean fromThrift) throws ConfigurationException
@@ -327,16 +337,12 @@ public class MigrationManager
         CFMetaData oldCfm = Schema.instance.getCFMetaData(cfm.ksName, cfm.cfName);
         if (oldCfm == null)
             throw new ConfigurationException(String.format("Cannot update non existing table '%s' in keyspace '%s'.", cfm.cfName, cfm.ksName));
+        KSMetaData ksm = Schema.instance.getKSMetaData(cfm.ksName);
 
         oldCfm.validateCompatility(cfm);
 
         logger.info(String.format("Update table '%s/%s' From %s To %s", cfm.ksName, cfm.cfName, oldCfm, cfm));
-        announce(addSerializedKeyspace(oldCfm.toSchemaUpdate(cfm, FBUtilities.timestampMicros(), fromThrift), cfm.ksName), announceLocally);
-    }
-
-    public static void announceTypeUpdate(UserType updatedType)
-    {
-        announceTypeUpdate(updatedType, false);
+        announce(LegacySchemaTables.makeUpdateTableMutation(ksm, oldCfm, cfm, FBUtilities.timestampMicros(), fromThrift), announceLocally);
     }
 
     public static void announceTypeUpdate(UserType updatedType, boolean announceLocally)
@@ -356,7 +362,7 @@ public class MigrationManager
             throw new ConfigurationException(String.format("Cannot drop non existing keyspace '%s'.", ksName));
 
         logger.info(String.format("Drop Keyspace '%s'", oldKsm.name));
-        announce(oldKsm.dropFromSchema(FBUtilities.timestampMicros()), announceLocally);
+        announce(LegacySchemaTables.makeDropKeyspaceMutation(oldKsm, FBUtilities.timestampMicros()), announceLocally);
     }
 
     public static void announceColumnFamilyDrop(String ksName, String cfName) throws ConfigurationException
@@ -369,16 +375,10 @@ public class MigrationManager
         CFMetaData oldCfm = Schema.instance.getCFMetaData(ksName, cfName);
         if (oldCfm == null)
             throw new ConfigurationException(String.format("Cannot drop non existing table '%s' in keyspace '%s'.", cfName, ksName));
+        KSMetaData ksm = Schema.instance.getKSMetaData(ksName);
 
         logger.info(String.format("Drop table '%s/%s'", oldCfm.ksName, oldCfm.cfName));
-        announce(addSerializedKeyspace(oldCfm.dropFromSchema(FBUtilities.timestampMicros()), ksName), announceLocally);
-    }
-
-    // Include the serialized keyspace for when a target node missed the CREATE KEYSPACE migration (see #5631).
-    private static Mutation addSerializedKeyspace(Mutation migration, String ksName)
-    {
-        migration.add(SystemKeyspace.readSchemaRow(SystemKeyspace.SCHEMA_KEYSPACES_TABLE, ksName).cf);
-        return migration;
+        announce(LegacySchemaTables.makeDropTableMutation(ksm, oldCfm, FBUtilities.timestampMicros()), announceLocally);
     }
 
     public static void announceTypeDrop(UserType droppedType)
@@ -388,35 +388,22 @@ public class MigrationManager
 
     public static void announceTypeDrop(UserType droppedType, boolean announceLocally)
     {
-        announce(addSerializedKeyspace(UTMetaData.dropFromSchema(droppedType, FBUtilities.timestampMicros()), droppedType.keyspace), announceLocally);
+        KSMetaData ksm = Schema.instance.getKSMetaData(droppedType.keyspace);
+        announce(LegacySchemaTables.dropTypeFromSchemaMutation(ksm, droppedType, FBUtilities.timestampMicros()), announceLocally);
     }
 
     public static void announceFunctionDrop(UDFunction udf, boolean announceLocally)
     {
-        Mutation mutation = udf.toSchemaDrop(FBUtilities.timestampMicros());
         logger.info(String.format("Drop scalar function overload '%s' args '%s'", udf.name(), udf.argTypes()));
-        announce(mutation, announceLocally);
+        KSMetaData ksm = Schema.instance.getKSMetaData(udf.name().keyspace);
+        announce(LegacySchemaTables.makeDropFunctionMutation(ksm, udf, FBUtilities.timestampMicros()), announceLocally);
     }
 
     public static void announceAggregateDrop(UDAggregate udf, boolean announceLocally)
     {
-        Mutation mutation = udf.toSchemaDrop(FBUtilities.timestampMicros());
         logger.info(String.format("Drop aggregate function overload '%s' args '%s'", udf.name(), udf.argTypes()));
-        announce(mutation, announceLocally);
-    }
-
-    public static void announceNewFunction(UDFunction udf, boolean announceLocally)
-    {
-        Mutation mutation = udf.toSchemaUpdate(FBUtilities.timestampMicros());
-        logger.info(String.format("Create scalar function '%s'", udf.name()));
-        announce(mutation, announceLocally);
-    }
-
-    public static void announceNewAggregate(UDAggregate udf, boolean announceLocally)
-    {
-        Mutation mutation = udf.toSchemaUpdate(FBUtilities.timestampMicros());
-        logger.info(String.format("Create aggregate function '%s'", udf.name()));
-        announce(mutation, announceLocally);
+        KSMetaData ksm = Schema.instance.getKSMetaData(udf.name().keyspace);
+        announce(LegacySchemaTables.makeDropAggregateMutation(ksm, udf, FBUtilities.timestampMicros()), announceLocally);
     }
 
     /**
@@ -429,7 +416,7 @@ public class MigrationManager
         {
             try
             {
-                DefsTables.mergeSchemaInternal(Collections.singletonList(schema), false);
+                LegacySchemaTables.mergeSchema(Collections.singletonList(schema), false);
             }
             catch (IOException e)
             {
@@ -457,7 +444,7 @@ public class MigrationManager
         {
             protected void runMayThrow() throws IOException, ConfigurationException
             {
-                DefsTables.mergeSchema(schema);
+                LegacySchemaTables.mergeSchema(schema);
             }
         });
 
@@ -497,9 +484,7 @@ public class MigrationManager
 
         logger.debug("Truncating schema tables...");
 
-        // truncate schema tables
-        for (String cf : SystemKeyspace.ALL_SCHEMA_TABLES)
-            SystemKeyspace.schemaCFS(cf).truncateBlocking();
+        LegacySchemaTables.truncateSchemaTables();
 
         logger.debug("Clearing local schema keyspace definitions...");
 
@@ -536,7 +521,7 @@ public class MigrationManager
         public Collection<Mutation> deserialize(DataInput in, int version) throws IOException
         {
             int count = in.readInt();
-            Collection<Mutation> schema = new ArrayList<Mutation>(count);
+            Collection<Mutation> schema = new ArrayList<>(count);
 
             for (int i = 0; i < count; i++)
                 schema.add(Mutation.serializer.deserialize(in, version));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e9d345f/src/java/org/apache/cassandra/service/MigrationTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/MigrationTask.java b/src/java/org/apache/cassandra/service/MigrationTask.java
index 9fdbff4..2a67e6d 100644
--- a/src/java/org/apache/cassandra/service/MigrationTask.java
+++ b/src/java/org/apache/cassandra/service/MigrationTask.java
@@ -26,7 +26,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.db.DefsTables;
+import org.apache.cassandra.schema.LegacySchemaTables;
 import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.net.IAsyncCallback;
 import org.apache.cassandra.net.MessageIn;
@@ -63,7 +63,7 @@ class MigrationTask extends WrappedRunnable
             {
                 try
                 {
-                    DefsTables.mergeSchema(message.payload);
+                    LegacySchemaTables.mergeSchema(message.payload);
                 }
                 catch (IOException e)
                 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e9d345f/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 9c57946..d6eb317 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -713,7 +713,7 @@ public class StorageProxy implements StorageProxyMBean
                                                                         null,
                                                                         WriteType.SIMPLE);
         Mutation mutation = new Mutation(SystemKeyspace.NAME, UUIDType.instance.decompose(uuid));
-        mutation.delete(SystemKeyspace.BATCHLOG_TABLE, FBUtilities.timestampMicros());
+        mutation.delete(SystemKeyspace.BATCHLOG, FBUtilities.timestampMicros());
         MessageOut<Mutation> message = mutation.createMessage();
         for (InetAddress target : endpoints)
         {