You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2015/06/30 12:47:38 UTC

[14/51] [partial] cassandra git commit: Storage engine refactor, a.k.a CASSANDRA-8099

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/repair/Validator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/Validator.java b/src/java/org/apache/cassandra/repair/Validator.java
index 4db1cfb..87d186c 100644
--- a/src/java/org/apache/cassandra/repair/Validator.java
+++ b/src/java/org/apache/cassandra/repair/Validator.java
@@ -31,7 +31,8 @@ import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.compaction.AbstractCompactedRow;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.db.rows.UnfilteredRowIterators;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.repair.messages.ValidationComplete;
 import org.apache.cassandra.tracing.Tracing;
@@ -121,18 +122,18 @@ public class Validator implements Runnable
      *
      * @param row Row to add hash
      */
-    public void add(AbstractCompactedRow row)
+    public void add(UnfilteredRowIterator partition)
     {
-        assert desc.range.contains(row.key.getToken()) : row.key.getToken() + " is not contained in " + desc.range;
-        assert lastKey == null || lastKey.compareTo(row.key) < 0
-               : "row " + row.key + " received out of order wrt " + lastKey;
-        lastKey = row.key;
+        assert desc.range.contains(partition.partitionKey().getToken()) : partition.partitionKey().getToken() + " is not contained in " + desc.range;
+        assert lastKey == null || lastKey.compareTo(partition.partitionKey()) < 0
+               : "partition " + partition.partitionKey() + " received out of order wrt " + lastKey;
+        lastKey = partition.partitionKey();
 
         if (range == null)
             range = ranges.next();
 
         // generate new ranges as long as case 1 is true
-        while (!range.contains(row.key.getToken()))
+        while (!range.contains(lastKey.getToken()))
         {
             // add the empty hash, and move to the next range
             range.ensureHashInitialised();
@@ -140,7 +141,7 @@ public class Validator implements Runnable
         }
 
         // case 3 must be true: mix in the hashed row
-        RowHash rowHash = rowHash(row);
+        RowHash rowHash = rowHash(partition);
         if (rowHash != null)
         {
             range.addHash(rowHash);
@@ -186,21 +187,16 @@ public class Validator implements Runnable
 
     }
 
-    private MerkleTree.RowHash rowHash(AbstractCompactedRow row)
+    private MerkleTree.RowHash rowHash(UnfilteredRowIterator partition)
     {
         validated++;
         // MerkleTree uses XOR internally, so we want lots of output bits here
         CountingDigest digest = new CountingDigest(FBUtilities.newMessageDigest("SHA-256"));
-        row.update(digest);
+        UnfilteredRowIterators.digest(partition, digest);
         // only return new hash for merkle tree in case digest was updated - see CASSANDRA-8979
-        if (digest.count > 0)
-        {
-            return new MerkleTree.RowHash(row.key.getToken(), digest.digest(), digest.count);
-        }
-        else
-        {
-            return null;
-        }
+        return digest.count > 0
+             ? new MerkleTree.RowHash(partition.partitionKey().getToken(), digest.digest(), digest.count)
+             : null;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/schema/LegacySchemaTables.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/LegacySchemaTables.java b/src/java/org/apache/cassandra/schema/LegacySchemaTables.java
index b8f6421..1348d12 100644
--- a/src/java/org/apache/cassandra/schema/LegacySchemaTables.java
+++ b/src/java/org/apache/cassandra/schema/LegacySchemaTables.java
@@ -23,9 +23,8 @@ import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.util.*;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
 
-import com.google.common.base.Function;
-import com.google.common.collect.Iterables;
 import com.google.common.collect.MapDifference;
 import com.google.common.collect.Maps;
 import org.slf4j.Logger;
@@ -34,17 +33,14 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.cache.CachingOptions;
 import org.apache.cassandra.config.*;
 import org.apache.cassandra.cql3.*;
-import org.apache.cassandra.cql3.functions.*;
+import org.apache.cassandra.cql3.functions.AbstractFunction;
+import org.apache.cassandra.cql3.functions.FunctionName;
+import org.apache.cassandra.cql3.functions.UDFunction;
+import org.apache.cassandra.cql3.functions.UDAggregate;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
-import org.apache.cassandra.db.composites.CellNameType;
-import org.apache.cassandra.db.composites.CellNames;
-import org.apache.cassandra.db.composites.Composite;
-import org.apache.cassandra.db.filter.QueryFilter;
-import org.apache.cassandra.db.index.SecondaryIndexManager;
+import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.marshal.*;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.io.compress.CompressionParameters;
@@ -52,6 +48,7 @@ import org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.concurrent.OpOrder;
 
 import static org.apache.cassandra.cql3.QueryProcessor.executeOnceInternal;
 import static org.apache.cassandra.utils.FBUtilities.fromJsonMap;
@@ -100,6 +97,7 @@ public class LegacySchemaTables
                 + "default_time_to_live int,"
                 + "default_validator text,"
                 + "dropped_columns map<text, bigint>,"
+                + "dropped_columns_types map<text, text>,"
                 + "gc_grace_seconds int,"
                 + "is_dense boolean,"
                 + "key_validator text,"
@@ -207,29 +205,37 @@ public class LegacySchemaTables
 
     public static Collection<KSMetaData> readSchemaFromSystemTables()
     {
-        List<Row> serializedSchema = getSchemaPartitionsForTable(KEYSPACES);
+        ReadCommand cmd = getReadCommandForTableSchema(KEYSPACES);
+        try (ReadOrderGroup orderGroup = cmd.startOrderGroup(); PartitionIterator schema = cmd.executeInternal(orderGroup))
+        {
+            List<KSMetaData> keyspaces = new ArrayList<>();
 
-        List<KSMetaData> keyspaces = new ArrayList<>(serializedSchema.size());
+            while (schema.hasNext())
+            {
+                try (RowIterator partition = schema.next())
+                {
+                    if (isSystemKeyspaceSchemaPartition(partition.partitionKey()))
+                        continue;
 
-        for (Row partition : serializedSchema)
-        {
-            if (isEmptySchemaPartition(partition) || isSystemKeyspaceSchemaPartition(partition))
-                continue;
+                    DecoratedKey key = partition.partitionKey();
 
-            keyspaces.add(createKeyspaceFromSchemaPartitions(partition,
-                                                             readSchemaPartitionForKeyspace(COLUMNFAMILIES, partition.key),
-                                                             readSchemaPartitionForKeyspace(USERTYPES, partition.key)));
+                    readSchemaPartitionForKeyspaceAndApply(USERTYPES, key,
+                        types -> readSchemaPartitionForKeyspaceAndApply(COLUMNFAMILIES, key, tables -> keyspaces.add(createKeyspaceFromSchemaPartitions(partition, tables, types)))
+                    );
 
-            // Will be moved away in #6717
-            for (UDFunction function : createFunctionsFromFunctionsPartition(readSchemaPartitionForKeyspace(FUNCTIONS, partition.key)).values())
-                org.apache.cassandra.cql3.functions.Functions.addOrReplaceFunction(function);
+                    // Will be moved away in #6717
+                    readSchemaPartitionForKeyspaceAndApply(FUNCTIONS, key,
+                        functions -> { createFunctionsFromFunctionsPartition(functions).forEach(function -> org.apache.cassandra.cql3.functions.Functions.addOrReplaceFunction(function)); return null; }
+                    );
 
-            // Will be moved away in #6717
-            for (UDAggregate aggregate : createAggregatesFromAggregatesPartition(readSchemaPartitionForKeyspace(AGGREGATES, partition.key)).values())
-                org.apache.cassandra.cql3.functions.Functions.addOrReplaceFunction(aggregate);
+                    // Will be moved away in #6717
+                    readSchemaPartitionForKeyspaceAndApply(AGGREGATES, key,
+                        aggregates -> { createAggregatesFromAggregatesPartition(aggregates).forEach(aggregate -> org.apache.cassandra.cql3.functions.Functions.addOrReplaceFunction(aggregate)); return null; }
+                    );
+                }
+            }
+            return keyspaces;
         }
-
-        return keyspaces;
     }
 
     public static void truncateSchemaTables()
@@ -262,18 +268,19 @@ public class LegacySchemaTables
 
         for (String table : ALL)
         {
-            for (Row partition : getSchemaPartitionsForTable(table))
+            ReadCommand cmd = getReadCommandForTableSchema(table);
+            try (ReadOrderGroup orderGroup = cmd.startOrderGroup(); PartitionIterator schema = cmd.executeInternal(orderGroup))
             {
-                if (isEmptySchemaPartition(partition) || isSystemKeyspaceSchemaPartition(partition))
-                    continue;
-
-                // we want to digest only live columns
-                ColumnFamilyStore.removeDeletedColumnsOnly(partition.cf, Integer.MAX_VALUE, SecondaryIndexManager.nullUpdater);
-                partition.cf.purgeTombstones(Integer.MAX_VALUE);
-                partition.cf.updateDigest(digest);
+                while (schema.hasNext())
+                {
+                    try (RowIterator partition = schema.next())
+                    {
+                        if (!isSystemKeyspaceSchemaPartition(partition.partitionKey()))
+                            RowIterators.digest(partition, digest);
+                    }
+                }
             }
         }
-
         return UUID.nameUUIDFromBytes(digest.digest());
     }
 
@@ -290,14 +297,10 @@ public class LegacySchemaTables
      * @param schemaTableName The name of the table responsible for part of the schema.
      * @return low-level schema representation
      */
-    private static List<Row> getSchemaPartitionsForTable(String schemaTableName)
+    private static ReadCommand getReadCommandForTableSchema(String schemaTableName)
     {
-        Token minToken = StorageService.getPartitioner().getMinimumToken();
-        return getSchemaCFS(schemaTableName).getRangeSlice(new Range<RowPosition>(minToken.minKeyBound(), minToken.maxKeyBound()),
-                                                           null,
-                                                           new IdentityQueryFilter(),
-                                                           Integer.MAX_VALUE,
-                                                           System.currentTimeMillis());
+        ColumnFamilyStore cfs = getSchemaCFS(schemaTableName);
+        return PartitionRangeReadCommand.allDataRead(cfs.metadata, FBUtilities.nowInSeconds());
     }
 
     public static Collection<Mutation> convertSchemaToMutations()
@@ -312,31 +315,45 @@ public class LegacySchemaTables
 
     private static void convertSchemaToMutations(Map<DecoratedKey, Mutation> mutationMap, String schemaTableName)
     {
-        for (Row partition : getSchemaPartitionsForTable(schemaTableName))
+        ReadCommand cmd = getReadCommandForTableSchema(schemaTableName);
+        try (ReadOrderGroup orderGroup = cmd.startOrderGroup(); UnfilteredPartitionIterator iter = cmd.executeLocally(orderGroup))
         {
-            if (isSystemKeyspaceSchemaPartition(partition))
-                continue;
-
-            Mutation mutation = mutationMap.get(partition.key);
-            if (mutation == null)
+            while (iter.hasNext())
             {
-                mutation = new Mutation(SystemKeyspace.NAME, partition.key.getKey());
-                mutationMap.put(partition.key, mutation);
-            }
+                try (UnfilteredRowIterator partition = iter.next())
+                {
+                    if (isSystemKeyspaceSchemaPartition(partition.partitionKey()))
+                        continue;
+
+                    DecoratedKey key = partition.partitionKey();
+                    Mutation mutation = mutationMap.get(key);
+                    if (mutation == null)
+                    {
+                        mutation = new Mutation(SystemKeyspace.NAME, key);
+                        mutationMap.put(key, mutation);
+                    }
 
-            mutation.add(partition.cf);
+                    mutation.add(UnfilteredRowIterators.toUpdate(partition));
+                }
+            }
         }
     }
 
-    private static Map<DecoratedKey, ColumnFamily> readSchemaForKeyspaces(String schemaTableName, Set<String> keyspaceNames)
+    private static Map<DecoratedKey, FilteredPartition> readSchemaForKeyspaces(String schemaTableName, Set<String> keyspaceNames)
     {
-        Map<DecoratedKey, ColumnFamily> schema = new HashMap<>();
+        Map<DecoratedKey, FilteredPartition> schema = new HashMap<>();
 
         for (String keyspaceName : keyspaceNames)
         {
-            Row schemaEntity = readSchemaPartitionForKeyspace(schemaTableName, keyspaceName);
-            if (schemaEntity.cf != null)
-                schema.put(schemaEntity.key, schemaEntity.cf);
+            // We don't to return the RowIterator directly because we should guarantee that this iterator
+            // will be closed, and putting it in a Map make that harder/more awkward.
+            readSchemaPartitionForKeyspaceAndApply(schemaTableName, keyspaceName,
+                partition -> {
+                    if (!partition.isEmpty())
+                        schema.put(partition.partitionKey(), FilteredPartition.create(partition));
+                    return null;
+                }
+            );
         }
 
         return schema;
@@ -347,35 +364,46 @@ public class LegacySchemaTables
         return AsciiType.instance.fromString(ksName);
     }
 
-    private static Row readSchemaPartitionForKeyspace(String schemaTableName, String keyspaceName)
+    private static DecoratedKey getSchemaKSDecoratedKey(String ksName)
     {
-        DecoratedKey keyspaceKey = StorageService.getPartitioner().decorateKey(getSchemaKSKey(keyspaceName));
-        return readSchemaPartitionForKeyspace(schemaTableName, keyspaceKey);
+        return StorageService.getPartitioner().decorateKey(getSchemaKSKey(ksName));
     }
 
-    private static Row readSchemaPartitionForKeyspace(String schemaTableName, DecoratedKey keyspaceKey)
+    private static <T> T readSchemaPartitionForKeyspaceAndApply(String schemaTableName, String keyspaceName, Function<RowIterator, T> fct)
     {
-        QueryFilter filter = QueryFilter.getIdentityFilter(keyspaceKey, schemaTableName, System.currentTimeMillis());
-        return new Row(keyspaceKey, getSchemaCFS(schemaTableName).getColumnFamily(filter));
+        return readSchemaPartitionForKeyspaceAndApply(schemaTableName, getSchemaKSDecoratedKey(keyspaceName), fct);
     }
 
-    private static Row readSchemaPartitionForTable(String schemaTableName, String keyspaceName, String tableName)
+    private static <T> T readSchemaPartitionForKeyspaceAndApply(String schemaTableName, DecoratedKey keyspaceKey, Function<RowIterator, T> fct)
     {
-        DecoratedKey key = StorageService.getPartitioner().decorateKey(getSchemaKSKey(keyspaceName));
         ColumnFamilyStore store = getSchemaCFS(schemaTableName);
-        Composite prefix = store.getComparator().make(tableName);
-        ColumnFamily cells = store.getColumnFamily(key, prefix, prefix.end(), false, Integer.MAX_VALUE, System.currentTimeMillis());
-        return new Row(key, cells);
+        int nowInSec = FBUtilities.nowInSeconds();
+        try (OpOrder.Group op = store.readOrdering.start();
+             RowIterator partition = UnfilteredRowIterators.filter(SinglePartitionReadCommand.fullPartitionRead(store.metadata, nowInSec, keyspaceKey)
+                                                                                             .queryMemtableAndDisk(store, op), nowInSec))
+        {
+            return fct.apply(partition);
+        }
     }
 
-    private static boolean isEmptySchemaPartition(Row partition)
+    private static <T> T readSchemaPartitionForTableAndApply(String schemaTableName, String keyspaceName, String tableName, Function<RowIterator, T> fct)
     {
-        return partition.cf == null || (partition.cf.isMarkedForDelete() && !partition.cf.hasColumns());
+        ColumnFamilyStore store = getSchemaCFS(schemaTableName);
+
+        ClusteringComparator comparator = store.metadata.comparator;
+        Slices slices = Slices.with(comparator, Slice.make(comparator, tableName));
+        int nowInSec = FBUtilities.nowInSeconds();
+        try (OpOrder.Group op = store.readOrdering.start();
+             RowIterator partition =  UnfilteredRowIterators.filter(SinglePartitionSliceCommand.create(store.metadata, nowInSec, getSchemaKSDecoratedKey(keyspaceName), slices)
+                                                                                               .queryMemtableAndDisk(store, op), nowInSec))
+        {
+            return fct.apply(partition);
+        }
     }
 
-    private static boolean isSystemKeyspaceSchemaPartition(Row partition)
+    private static boolean isSystemKeyspaceSchemaPartition(DecoratedKey partitionKey)
     {
-        return getSchemaKSKey(SystemKeyspace.NAME).equals(partition.key.getKey());
+        return getSchemaKSKey(SystemKeyspace.NAME).equals(partitionKey.getKey());
     }
 
     /**
@@ -398,14 +426,14 @@ public class LegacySchemaTables
         // compare before/after schemas of the affected keyspaces only
         Set<String> keyspaces = new HashSet<>(mutations.size());
         for (Mutation mutation : mutations)
-            keyspaces.add(ByteBufferUtil.string(mutation.key()));
+            keyspaces.add(ByteBufferUtil.string(mutation.key().getKey()));
 
         // current state of the schema
-        Map<DecoratedKey, ColumnFamily> oldKeyspaces = readSchemaForKeyspaces(KEYSPACES, keyspaces);
-        Map<DecoratedKey, ColumnFamily> oldColumnFamilies = readSchemaForKeyspaces(COLUMNFAMILIES, keyspaces);
-        Map<DecoratedKey, ColumnFamily> oldTypes = readSchemaForKeyspaces(USERTYPES, keyspaces);
-        Map<DecoratedKey, ColumnFamily> oldFunctions = readSchemaForKeyspaces(FUNCTIONS, keyspaces);
-        Map<DecoratedKey, ColumnFamily> oldAggregates = readSchemaForKeyspaces(AGGREGATES, keyspaces);
+        Map<DecoratedKey, FilteredPartition> oldKeyspaces = readSchemaForKeyspaces(KEYSPACES, keyspaces);
+        Map<DecoratedKey, FilteredPartition> oldColumnFamilies = readSchemaForKeyspaces(COLUMNFAMILIES, keyspaces);
+        Map<DecoratedKey, FilteredPartition> oldTypes = readSchemaForKeyspaces(USERTYPES, keyspaces);
+        Map<DecoratedKey, FilteredPartition> oldFunctions = readSchemaForKeyspaces(FUNCTIONS, keyspaces);
+        Map<DecoratedKey, FilteredPartition> oldAggregates = readSchemaForKeyspaces(AGGREGATES, keyspaces);
 
         for (Mutation mutation : mutations)
             mutation.apply();
@@ -414,11 +442,11 @@ public class LegacySchemaTables
             flushSchemaTables();
 
         // with new data applied
-        Map<DecoratedKey, ColumnFamily> newKeyspaces = readSchemaForKeyspaces(KEYSPACES, keyspaces);
-        Map<DecoratedKey, ColumnFamily> newColumnFamilies = readSchemaForKeyspaces(COLUMNFAMILIES, keyspaces);
-        Map<DecoratedKey, ColumnFamily> newTypes = readSchemaForKeyspaces(USERTYPES, keyspaces);
-        Map<DecoratedKey, ColumnFamily> newFunctions = readSchemaForKeyspaces(FUNCTIONS, keyspaces);
-        Map<DecoratedKey, ColumnFamily> newAggregates = readSchemaForKeyspaces(AGGREGATES, keyspaces);
+        Map<DecoratedKey, FilteredPartition> newKeyspaces = readSchemaForKeyspaces(KEYSPACES, keyspaces);
+        Map<DecoratedKey, FilteredPartition> newColumnFamilies = readSchemaForKeyspaces(COLUMNFAMILIES, keyspaces);
+        Map<DecoratedKey, FilteredPartition> newTypes = readSchemaForKeyspaces(USERTYPES, keyspaces);
+        Map<DecoratedKey, FilteredPartition> newFunctions = readSchemaForKeyspaces(FUNCTIONS, keyspaces);
+        Map<DecoratedKey, FilteredPartition> newAggregates = readSchemaForKeyspaces(AGGREGATES, keyspaces);
 
         Set<String> keyspacesToDrop = mergeKeyspaces(oldKeyspaces, newKeyspaces);
         mergeTables(oldColumnFamilies, newColumnFamilies);
@@ -431,263 +459,187 @@ public class LegacySchemaTables
             Schema.instance.dropKeyspace(keyspaceToDrop);
     }
 
-    private static Set<String> mergeKeyspaces(Map<DecoratedKey, ColumnFamily> before, Map<DecoratedKey, ColumnFamily> after)
+    private static Set<String> mergeKeyspaces(Map<DecoratedKey, FilteredPartition> before, Map<DecoratedKey, FilteredPartition> after)
     {
-        List<Row> created = new ArrayList<>();
-        List<String> altered = new ArrayList<>();
-        Set<String> dropped = new HashSet<>();
-
-        /*
-         * - we don't care about entriesOnlyOnLeft() or entriesInCommon(), because only the changes are of interest to us
-         * - of all entriesOnlyOnRight(), we only care about ones that have live columns; it's possible to have a ColumnFamily
-         *   there that only has the top-level deletion, if:
-         *      a) a pushed DROP KEYSPACE change for a keyspace hadn't ever made it to this node in the first place
-         *      b) a pulled dropped keyspace that got dropped before it could find a way to this node
-         * - of entriesDiffering(), we don't care about the scenario where both pre and post-values have zero live columns:
-         *   that means that a keyspace had been recreated and dropped, and the recreated keyspace had never found a way
-         *   to this node
-         */
-        MapDifference<DecoratedKey, ColumnFamily> diff = Maps.difference(before, after);
-
-        for (Map.Entry<DecoratedKey, ColumnFamily> entry : diff.entriesOnlyOnRight().entrySet())
-            if (entry.getValue().hasColumns())
-                created.add(new Row(entry.getKey(), entry.getValue()));
-
-        for (Map.Entry<DecoratedKey, MapDifference.ValueDifference<ColumnFamily>> entry : diff.entriesDiffering().entrySet())
+        for (FilteredPartition newPartition : after.values())
         {
-            String keyspaceName = AsciiType.instance.compose(entry.getKey().getKey());
-
-            ColumnFamily pre  = entry.getValue().leftValue();
-            ColumnFamily post = entry.getValue().rightValue();
-
-            if (pre.hasColumns() && post.hasColumns())
-                altered.add(keyspaceName);
-            else if (pre.hasColumns())
-                dropped.add(keyspaceName);
-            else if (post.hasColumns()) // a (re)created keyspace
-                created.add(new Row(entry.getKey(), post));
+            FilteredPartition oldPartition = before.remove(newPartition.partitionKey());
+            if (oldPartition == null || oldPartition.isEmpty())
+            {
+                Schema.instance.addKeyspace(createKeyspaceFromSchemaPartition(newPartition.rowIterator()));
+            }
+            else
+            {
+                String name = AsciiType.instance.compose(newPartition.partitionKey().getKey());
+                Schema.instance.updateKeyspace(name);
+            }
         }
 
-        for (Row row : created)
-            Schema.instance.addKeyspace(createKeyspaceFromSchemaPartition(row));
-        for (String name : altered)
-            Schema.instance.updateKeyspace(name);
-        return dropped;
+        // What's remain in old is those keyspace that are not in updated, i.e. the dropped ones.
+        return asKeyspaceNamesSet(before.keySet());
     }
 
-    // see the comments for mergeKeyspaces()
-    private static void mergeTables(Map<DecoratedKey, ColumnFamily> before, Map<DecoratedKey, ColumnFamily> after)
+    private static Set<String> asKeyspaceNamesSet(Set<DecoratedKey> keys)
     {
-        List<CFMetaData> created = new ArrayList<>();
-        List<CFMetaData> altered = new ArrayList<>();
-        List<CFMetaData> dropped = new ArrayList<>();
-
-        MapDifference<DecoratedKey, ColumnFamily> diff = Maps.difference(before, after);
-
-        for (Map.Entry<DecoratedKey, ColumnFamily> entry : diff.entriesOnlyOnRight().entrySet())
-            if (entry.getValue().hasColumns())
-                created.addAll(createTablesFromTablesPartition(new Row(entry.getKey(), entry.getValue())).values());
+        Set<String> names = new HashSet(keys.size());
+        for (DecoratedKey key : keys)
+            names.add(AsciiType.instance.compose(key.getKey()));
+        return names;
+    }
 
-        for (Map.Entry<DecoratedKey, MapDifference.ValueDifference<ColumnFamily>> entry : diff.entriesDiffering().entrySet())
+    private static void mergeTables(Map<DecoratedKey, FilteredPartition> before, Map<DecoratedKey, FilteredPartition> after)
+    {
+        diffSchema(before, after, new Differ()
         {
-            String keyspaceName = AsciiType.instance.compose(entry.getKey().getKey());
-
-            ColumnFamily pre  = entry.getValue().leftValue();
-            ColumnFamily post = entry.getValue().rightValue();
-
-            if (pre.hasColumns() && post.hasColumns())
+            public void onDropped(UntypedResultSet.Row oldRow)
             {
-                MapDifference<String, CFMetaData> delta =
-                    Maps.difference(Schema.instance.getKSMetaData(keyspaceName).cfMetaData(),
-                                    createTablesFromTablesPartition(new Row(entry.getKey(), post)));
-
-                dropped.addAll(delta.entriesOnlyOnLeft().values());
-                created.addAll(delta.entriesOnlyOnRight().values());
-                Iterables.addAll(altered, Iterables.transform(delta.entriesDiffering().values(), new Function<MapDifference.ValueDifference<CFMetaData>, CFMetaData>()
-                {
-                    public CFMetaData apply(MapDifference.ValueDifference<CFMetaData> pair)
-                    {
-                        return pair.rightValue();
-                    }
-                }));
+                Schema.instance.dropTable(oldRow.getString("keyspace_name"), oldRow.getString("columnfamily_name"));
             }
-            else if (pre.hasColumns())
+
+            public void onAdded(UntypedResultSet.Row newRow)
             {
-                dropped.addAll(Schema.instance.getKSMetaData(keyspaceName).cfMetaData().values());
+                Schema.instance.addTable(createTableFromTableRow(newRow));
             }
-            else if (post.hasColumns())
+
+            public void onUpdated(UntypedResultSet.Row oldRow, UntypedResultSet.Row newRow)
             {
-                created.addAll(createTablesFromTablesPartition(new Row(entry.getKey(), post)).values());
+                Schema.instance.updateTable(newRow.getString("keyspace_name"), newRow.getString("columnfamily_name"));
             }
-        }
-
-        for (CFMetaData cfm : created)
-            Schema.instance.addTable(cfm);
-        for (CFMetaData cfm : altered)
-            Schema.instance.updateTable(cfm.ksName, cfm.cfName);
-        for (CFMetaData cfm : dropped)
-            Schema.instance.dropTable(cfm.ksName, cfm.cfName);
+        });
     }
 
-    // see the comments for mergeKeyspaces()
-    private static void mergeTypes(Map<DecoratedKey, ColumnFamily> before, Map<DecoratedKey, ColumnFamily> after)
+    private static void mergeTypes(Map<DecoratedKey, FilteredPartition> before, Map<DecoratedKey, FilteredPartition> after)
     {
-        List<UserType> created = new ArrayList<>();
-        List<UserType> altered = new ArrayList<>();
-        List<UserType> dropped = new ArrayList<>();
-
-        MapDifference<DecoratedKey, ColumnFamily> diff = Maps.difference(before, after);
-
-        // New keyspace with types
-        for (Map.Entry<DecoratedKey, ColumnFamily> entry : diff.entriesOnlyOnRight().entrySet())
-            if (entry.getValue().hasColumns())
-                created.addAll(createTypesFromPartition(new Row(entry.getKey(), entry.getValue())).values());
-
-        for (Map.Entry<DecoratedKey, MapDifference.ValueDifference<ColumnFamily>> entry : diff.entriesDiffering().entrySet())
+        diffSchema(before, after, new Differ()
         {
-            String keyspaceName = AsciiType.instance.compose(entry.getKey().getKey());
-
-            ColumnFamily pre  = entry.getValue().leftValue();
-            ColumnFamily post = entry.getValue().rightValue();
-
-            if (pre.hasColumns() && post.hasColumns())
+            public void onDropped(UntypedResultSet.Row oldRow)
             {
-                MapDifference<ByteBuffer, UserType> delta =
-                    Maps.difference(Schema.instance.getKSMetaData(keyspaceName).userTypes.getAllTypes(),
-                                    createTypesFromPartition(new Row(entry.getKey(), post)));
-
-                dropped.addAll(delta.entriesOnlyOnLeft().values());
-                created.addAll(delta.entriesOnlyOnRight().values());
-                Iterables.addAll(altered, Iterables.transform(delta.entriesDiffering().values(), new Function<MapDifference.ValueDifference<UserType>, UserType>()
-                {
-                    public UserType apply(MapDifference.ValueDifference<UserType> pair)
-                    {
-                        return pair.rightValue();
-                    }
-                }));
+                Schema.instance.dropType(createTypeFromRow(oldRow));
             }
-            else if (pre.hasColumns())
+
+            public void onAdded(UntypedResultSet.Row newRow)
             {
-                dropped.addAll(Schema.instance.getKSMetaData(keyspaceName).userTypes.getAllTypes().values());
+                Schema.instance.addType(createTypeFromRow(newRow));
             }
-            else if (post.hasColumns())
+
+            public void onUpdated(UntypedResultSet.Row oldRow, UntypedResultSet.Row newRow)
             {
-                created.addAll(createTypesFromPartition(new Row(entry.getKey(), post)).values());
+                Schema.instance.updateType(createTypeFromRow(newRow));
             }
-        }
-
-        for (UserType type : created)
-            Schema.instance.addType(type);
-        for (UserType type : altered)
-            Schema.instance.updateType(type);
-        for (UserType type : dropped)
-            Schema.instance.dropType(type);
+        });
     }
 
-    // see the comments for mergeKeyspaces()
-    private static void mergeFunctions(Map<DecoratedKey, ColumnFamily> before, Map<DecoratedKey, ColumnFamily> after)
+    private static void mergeFunctions(Map<DecoratedKey, FilteredPartition> before, Map<DecoratedKey, FilteredPartition> after)
     {
-        List<UDFunction> created = new ArrayList<>();
-        List<UDFunction> altered = new ArrayList<>();
-        List<UDFunction> dropped = new ArrayList<>();
+        diffSchema(before, after, new Differ()
+        {
+            public void onDropped(UntypedResultSet.Row oldRow)
+            {
+                Schema.instance.dropFunction(createFunctionFromFunctionRow(oldRow));
+            }
 
-        MapDifference<DecoratedKey, ColumnFamily> diff = Maps.difference(before, after);
+            public void onAdded(UntypedResultSet.Row newRow)
+            {
+                Schema.instance.addFunction(createFunctionFromFunctionRow(newRow));
+            }
 
-        // New keyspace with functions
-        for (Map.Entry<DecoratedKey, ColumnFamily> entry : diff.entriesOnlyOnRight().entrySet())
-            if (entry.getValue().hasColumns())
-                created.addAll(createFunctionsFromFunctionsPartition(new Row(entry.getKey(), entry.getValue())).values());
+            public void onUpdated(UntypedResultSet.Row oldRow, UntypedResultSet.Row newRow)
+            {
+                Schema.instance.updateFunction(createFunctionFromFunctionRow(newRow));
+            }
+        });
+    }
 
-        for (Map.Entry<DecoratedKey, MapDifference.ValueDifference<ColumnFamily>> entry : diff.entriesDiffering().entrySet())
+    private static void mergeAggregates(Map<DecoratedKey, FilteredPartition> before, Map<DecoratedKey, FilteredPartition> after)
+    {
+        diffSchema(before, after, new Differ()
         {
-            ColumnFamily pre = entry.getValue().leftValue();
-            ColumnFamily post = entry.getValue().rightValue();
-
-            if (pre.hasColumns() && post.hasColumns())
+            public void onDropped(UntypedResultSet.Row oldRow)
             {
-                MapDifference<ByteBuffer, UDFunction> delta =
-                    Maps.difference(createFunctionsFromFunctionsPartition(new Row(entry.getKey(), pre)),
-                                    createFunctionsFromFunctionsPartition(new Row(entry.getKey(), post)));
-
-                dropped.addAll(delta.entriesOnlyOnLeft().values());
-                created.addAll(delta.entriesOnlyOnRight().values());
-                Iterables.addAll(altered, Iterables.transform(delta.entriesDiffering().values(), new Function<MapDifference.ValueDifference<UDFunction>, UDFunction>()
-                {
-                    public UDFunction apply(MapDifference.ValueDifference<UDFunction> pair)
-                    {
-                        return pair.rightValue();
-                    }
-                }));
+                Schema.instance.dropAggregate(createAggregateFromAggregateRow(oldRow));
             }
-            else if (pre.hasColumns())
+
+            public void onAdded(UntypedResultSet.Row newRow)
             {
-                dropped.addAll(createFunctionsFromFunctionsPartition(new Row(entry.getKey(), pre)).values());
+                Schema.instance.addAggregate(createAggregateFromAggregateRow(newRow));
             }
-            else if (post.hasColumns())
+
+            public void onUpdated(UntypedResultSet.Row oldRow, UntypedResultSet.Row newRow)
             {
-                created.addAll(createFunctionsFromFunctionsPartition(new Row(entry.getKey(), post)).values());
+                Schema.instance.updateAggregate(createAggregateFromAggregateRow(newRow));
             }
-        }
+        });
+    }
 
-        for (UDFunction udf : created)
-            Schema.instance.addFunction(udf);
-        for (UDFunction udf : altered)
-            Schema.instance.updateFunction(udf);
-        for (UDFunction udf : dropped)
-            Schema.instance.dropFunction(udf);
+    public interface Differ
+    {
+        public void onDropped(UntypedResultSet.Row oldRow);
+        public void onAdded(UntypedResultSet.Row newRow);
+        public void onUpdated(UntypedResultSet.Row oldRow, UntypedResultSet.Row newRow);
     }
 
-    // see the comments for mergeKeyspaces()
-    private static void mergeAggregates(Map<DecoratedKey, ColumnFamily> before, Map<DecoratedKey, ColumnFamily> after)
+    private static void diffSchema(Map<DecoratedKey, FilteredPartition> before, Map<DecoratedKey, FilteredPartition> after, Differ differ)
     {
-        List<UDAggregate> created = new ArrayList<>();
-        List<UDAggregate> altered = new ArrayList<>();
-        List<UDAggregate> dropped = new ArrayList<>();
+        for (FilteredPartition newPartition : after.values())
+        {
+            CFMetaData metadata = newPartition.metadata();
+            DecoratedKey key = newPartition.partitionKey();
 
-        MapDifference<DecoratedKey, ColumnFamily> diff = Maps.difference(before, after);
+            FilteredPartition oldPartition = before.remove(key);
 
-        // New keyspace with functions
-        for (Map.Entry<DecoratedKey, ColumnFamily> entry : diff.entriesOnlyOnRight().entrySet())
-            if (entry.getValue().hasColumns())
-                created.addAll(createAggregatesFromAggregatesPartition(new Row(entry.getKey(), entry.getValue())).values());
+            if (oldPartition == null || oldPartition.isEmpty())
+            {
+                // Means everything is to be added
+                for (Row row : newPartition)
+                    differ.onAdded(UntypedResultSet.Row.fromInternalRow(metadata, key, row));
+                continue;
+            }
 
-        for (Map.Entry<DecoratedKey, MapDifference.ValueDifference<ColumnFamily>> entry : diff.entriesDiffering().entrySet())
-        {
-            ColumnFamily pre = entry.getValue().leftValue();
-            ColumnFamily post = entry.getValue().rightValue();
+            Iterator<Row> oldIter = oldPartition.iterator();
+            Iterator<Row> newIter = newPartition.iterator();
 
-            if (pre.hasColumns() && post.hasColumns())
+            Row oldRow = oldIter.hasNext() ? oldIter.next() : null;
+            Row newRow = newIter.hasNext() ? newIter.next() : null;
+            while (oldRow != null && newRow != null)
             {
-                MapDifference<ByteBuffer, UDAggregate> delta =
-                    Maps.difference(createAggregatesFromAggregatesPartition(new Row(entry.getKey(), pre)),
-                                    createAggregatesFromAggregatesPartition(new Row(entry.getKey(), post)));
+                int cmp = metadata.comparator.compare(oldRow.clustering(), newRow.clustering());
+                if (cmp < 0)
+                {
+                    differ.onDropped(UntypedResultSet.Row.fromInternalRow(metadata, key, oldRow));
+                    oldRow = oldIter.hasNext() ? oldIter.next() : null;
+                }
+                else if (cmp > 0)
+                {
 
-                dropped.addAll(delta.entriesOnlyOnLeft().values());
-                created.addAll(delta.entriesOnlyOnRight().values());
-                Iterables.addAll(altered, Iterables.transform(delta.entriesDiffering().values(), new Function<MapDifference.ValueDifference<UDAggregate>, UDAggregate>()
+                    differ.onAdded(UntypedResultSet.Row.fromInternalRow(metadata, key, newRow));
+                    newRow = newIter.hasNext() ? newIter.next() : null;
+                }
+                else
                 {
-                    public UDAggregate apply(MapDifference.ValueDifference<UDAggregate> pair)
-                    {
-                        return pair.rightValue();
-                    }
-                }));
+                    if (!oldRow.equals(newRow))
+                        differ.onUpdated(UntypedResultSet.Row.fromInternalRow(metadata, key, oldRow), UntypedResultSet.Row.fromInternalRow(metadata, key, newRow));
+
+                    oldRow = oldIter.hasNext() ? oldIter.next() : null;
+                    newRow = newIter.hasNext() ? newIter.next() : null;
+                }
             }
-            else if (pre.hasColumns())
+
+            while (oldRow != null)
             {
-                dropped.addAll(createAggregatesFromAggregatesPartition(new Row(entry.getKey(), pre)).values());
+                differ.onDropped(UntypedResultSet.Row.fromInternalRow(metadata, key, oldRow));
+                oldRow = oldIter.hasNext() ? oldIter.next() : null;
             }
-            else if (post.hasColumns())
+            while (newRow != null)
             {
-                created.addAll(createAggregatesFromAggregatesPartition(new Row(entry.getKey(), post)).values());
+                differ.onAdded(UntypedResultSet.Row.fromInternalRow(metadata, key, newRow));
+                newRow = newIter.hasNext() ? newIter.next() : null;
             }
         }
 
-        for (UDAggregate udf : created)
-            Schema.instance.addAggregate(udf);
-        for (UDAggregate udf : altered)
-            Schema.instance.updateAggregate(udf);
-        for (UDAggregate udf : dropped)
-            Schema.instance.dropAggregate(udf);
+        // What remains is those keys that were only in before.
+        for (FilteredPartition partition : before.values())
+            for (Row row : partition)
+                differ.onDropped(UntypedResultSet.Row.fromInternalRow(partition.metadata(), partition.partitionKey(), row));
     }
 
     /*
@@ -701,14 +653,15 @@ public class LegacySchemaTables
 
     private static Mutation makeCreateKeyspaceMutation(KSMetaData keyspace, long timestamp, boolean withTablesAndTypesAndFunctions)
     {
-        Mutation mutation = new Mutation(SystemKeyspace.NAME, getSchemaKSKey(keyspace.name));
-        ColumnFamily cells = mutation.addOrGet(Keyspaces);
-        CFRowAdder adder = new CFRowAdder(cells, Keyspaces.comparator.builder().build(), timestamp);
+        // Note that because Keyspaces is a COMPACT TABLE, we're really only setting static columns internally and shouldn't set any clustering.
+        RowUpdateBuilder adder = new RowUpdateBuilder(Keyspaces, timestamp, keyspace.name);
 
         adder.add("durable_writes", keyspace.durableWrites);
         adder.add("strategy_class", keyspace.strategyClass.getName());
         adder.add("strategy_options", json(keyspace.strategyOptions));
 
+        Mutation mutation = adder.build();
+
         if (withTablesAndTypesAndFunctions)
         {
             for (UserType type : keyspace.userTypes.getAllTypes().values())
@@ -723,36 +676,39 @@ public class LegacySchemaTables
 
     public static Mutation makeDropKeyspaceMutation(KSMetaData keyspace, long timestamp)
     {
-        Mutation mutation = new Mutation(SystemKeyspace.NAME, getSchemaKSKey(keyspace.name));
-        for (String schemaTable : ALL)
-            mutation.delete(schemaTable, timestamp);
-        mutation.delete(SystemKeyspace.BUILT_INDEXES, timestamp);
+        int nowInSec = FBUtilities.nowInSeconds();
+        Mutation mutation = new Mutation(SystemKeyspace.NAME, getSchemaKSDecoratedKey(keyspace.name));
+        for (CFMetaData schemaTable : All)
+            mutation.add(PartitionUpdate.fullPartitionDelete(schemaTable, mutation.key(), timestamp, nowInSec));
+        mutation.add(PartitionUpdate.fullPartitionDelete(SystemKeyspace.BuiltIndexes, mutation.key(), timestamp, nowInSec));
         return mutation;
     }
 
-    private static KSMetaData createKeyspaceFromSchemaPartitions(Row serializedKeyspace, Row serializedTables, Row serializedTypes)
+    private static KSMetaData createKeyspaceFromSchemaPartitions(RowIterator serializedKeyspace, RowIterator serializedTables, RowIterator serializedTypes)
     {
-        Collection<CFMetaData> tables = createTablesFromTablesPartition(serializedTables).values();
+        Collection<CFMetaData> tables = createTablesFromTablesPartition(serializedTables);
         UTMetaData types = new UTMetaData(createTypesFromPartition(serializedTypes));
         return createKeyspaceFromSchemaPartition(serializedKeyspace).cloneWith(tables, types);
     }
 
     public static KSMetaData createKeyspaceFromName(String keyspace)
     {
-        Row partition = readSchemaPartitionForKeyspace(KEYSPACES, keyspace);
-
-        if (isEmptySchemaPartition(partition))
-            throw new RuntimeException(String.format("%s not found in the schema definitions keyspaceName (%s).", keyspace, KEYSPACES));
+        return readSchemaPartitionForKeyspaceAndApply(KEYSPACES, keyspace, partition ->
+        {
+            if (partition.isEmpty())
+                throw new RuntimeException(String.format("%s not found in the schema definitions keyspaceName (%s).", keyspace, KEYSPACES));
 
-        return createKeyspaceFromSchemaPartition(partition);
+            return createKeyspaceFromSchemaPartition(partition);
+        });
     }
 
+
     /**
      * Deserialize only Keyspace attributes without nested tables or types
      *
      * @param partition Keyspace attributes in serialized form
      */
-    private static KSMetaData createKeyspaceFromSchemaPartition(Row partition)
+    private static KSMetaData createKeyspaceFromSchemaPartition(RowIterator partition)
     {
         String query = String.format("SELECT * FROM %s.%s", SystemKeyspace.NAME, KEYSPACES);
         UntypedResultSet.Row row = QueryProcessor.resultify(query, partition).one();
@@ -776,10 +732,8 @@ public class LegacySchemaTables
 
     private static void addTypeToSchemaMutation(UserType type, long timestamp, Mutation mutation)
     {
-        ColumnFamily cells = mutation.addOrGet(Usertypes);
-
-        Composite prefix = Usertypes.comparator.make(type.name);
-        CFRowAdder adder = new CFRowAdder(cells, prefix, timestamp);
+        RowUpdateBuilder adder = new RowUpdateBuilder(Usertypes, timestamp, mutation)
+                                 .clustering(type.name);
 
         adder.resetCollection("field_names");
         adder.resetCollection("field_types");
@@ -789,23 +743,18 @@ public class LegacySchemaTables
             adder.addListEntry("field_names", type.fieldName(i));
             adder.addListEntry("field_types", type.fieldType(i).toString());
         }
+
+        adder.build();
     }
 
     public static Mutation dropTypeFromSchemaMutation(KSMetaData keyspace, UserType type, long timestamp)
     {
         // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
         Mutation mutation = makeCreateKeyspaceMutation(keyspace, timestamp, false);
-
-        ColumnFamily cells = mutation.addOrGet(Usertypes);
-        int ldt = (int) (System.currentTimeMillis() / 1000);
-
-        Composite prefix = Usertypes.comparator.make(type.name);
-        cells.addAtom(new RangeTombstone(prefix, prefix.end(), timestamp, ldt));
-
-        return mutation;
+        return RowUpdateBuilder.deleteRow(Usertypes, timestamp, mutation, type.name);
     }
 
-    private static Map<ByteBuffer, UserType> createTypesFromPartition(Row partition)
+    private static Map<ByteBuffer, UserType> createTypesFromPartition(RowIterator partition)
     {
         String query = String.format("SELECT * FROM %s.%s", SystemKeyspace.NAME, USERTYPES);
         Map<ByteBuffer, UserType> types = new HashMap<>();
@@ -851,12 +800,11 @@ public class LegacySchemaTables
     {
         // For property that can be null (and can be changed), we insert tombstones, to make sure
         // we don't keep a property the user has removed
-        ColumnFamily cells = mutation.addOrGet(Columnfamilies);
-        Composite prefix = Columnfamilies.comparator.make(table.cfName);
-        CFRowAdder adder = new CFRowAdder(cells, prefix, timestamp);
+        RowUpdateBuilder adder = new RowUpdateBuilder(Columnfamilies, timestamp, mutation)
+                                 .clustering(table.cfName);
 
         adder.add("cf_id", table.cfId);
-        adder.add("type", table.cfType.toString());
+        adder.add("type", table.isSuper() ? "Super" : "Standard");
 
         if (table.isSuper())
         {
@@ -864,11 +812,11 @@ public class LegacySchemaTables
             // we won't know at deserialization if the subcomparator should be taken into account
             // TODO: we should implement an on-start migration if we want to get rid of that.
             adder.add("comparator", table.comparator.subtype(0).toString());
-            adder.add("subcomparator", table.comparator.subtype(1).toString());
+            adder.add("subcomparator", ((MapType)table.compactValueColumn().type).getKeysType().toString());
         }
         else
         {
-            adder.add("comparator", table.comparator.toString());
+            adder.add("comparator", LegacyLayout.makeLegacyComparator(table).toString());
         }
 
         adder.add("bloom_filter_fp_chance", table.getBloomFilterFpChance());
@@ -878,7 +826,6 @@ public class LegacySchemaTables
         adder.add("compaction_strategy_options", json(table.compactionStrategyOptions));
         adder.add("compression_parameters", json(table.compressionParameters.asThriftOptions()));
         adder.add("default_time_to_live", table.getDefaultTimeToLive());
-        adder.add("default_validator", table.getDefaultValidator().toString());
         adder.add("gc_grace_seconds", table.getGcGraceSeconds());
         adder.add("key_validator", table.getKeyValidator().toString());
         adder.add("local_read_repair_chance", table.getDcLocalReadRepairChance());
@@ -890,10 +837,18 @@ public class LegacySchemaTables
         adder.add("read_repair_chance", table.getReadRepairChance());
         adder.add("speculative_retry", table.getSpeculativeRetry().toString());
 
-        for (Map.Entry<ColumnIdentifier, Long> entry : table.getDroppedColumns().entrySet())
-            adder.addMapEntry("dropped_columns", entry.getKey().toString(), entry.getValue());
+        for (Map.Entry<ColumnIdentifier, CFMetaData.DroppedColumn> entry : table.getDroppedColumns().entrySet())
+        {
+            String name = entry.getKey().toString();
+            CFMetaData.DroppedColumn column = entry.getValue();
+            adder.addMapEntry("dropped_columns", name, column.droppedTime);
+            if (column.type != null)
+                adder.addMapEntry("dropped_columns_types", name, column.type.toString());
+        }
+
+        adder.add("is_dense", table.isDense());
 
-        adder.add("is_dense", table.getIsDense());
+        adder.add("default_validator", table.makeLegacyDefaultValidator().toString());
 
         if (withColumnsAndTriggers)
         {
@@ -903,6 +858,8 @@ public class LegacySchemaTables
             for (TriggerDefinition trigger : table.getTriggers().values())
                 addTriggerToSchemaMutation(table, trigger, timestamp, mutation);
         }
+
+        adder.build();
     }
 
     public static Mutation makeUpdateTableMutation(KSMetaData keyspace,
@@ -955,11 +912,7 @@ public class LegacySchemaTables
         // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
         Mutation mutation = makeCreateKeyspaceMutation(keyspace, timestamp, false);
 
-        ColumnFamily cells = mutation.addOrGet(Columnfamilies);
-        int ldt = (int) (System.currentTimeMillis() / 1000);
-
-        Composite prefix = Columnfamilies.comparator.make(table.cfName);
-        cells.addAtom(new RangeTombstone(prefix, prefix.end(), timestamp, ldt));
+        RowUpdateBuilder.deleteRow(Columnfamilies, timestamp, mutation, table.cfName);
 
         for (ColumnDefinition column : table.allColumns())
             dropColumnFromSchemaMutation(table, column, timestamp, mutation);
@@ -968,21 +921,21 @@ public class LegacySchemaTables
             dropTriggerFromSchemaMutation(table, trigger, timestamp, mutation);
 
         // TODO: get rid of in #6717
-        ColumnFamily indexCells = mutation.addOrGet(SystemKeyspace.BuiltIndexes);
         for (String indexName : Keyspace.open(keyspace.name).getColumnFamilyStore(table.cfName).getBuiltIndexes())
-            indexCells.addTombstone(indexCells.getComparator().makeCellName(indexName), ldt, timestamp);
+            RowUpdateBuilder.deleteRow(SystemKeyspace.BuiltIndexes, timestamp, mutation, indexName);
 
         return mutation;
     }
 
     public static CFMetaData createTableFromName(String keyspace, String table)
     {
-        Row partition = readSchemaPartitionForTable(COLUMNFAMILIES, keyspace, table);
-
-        if (isEmptySchemaPartition(partition))
-            throw new RuntimeException(String.format("%s:%s not found in the schema definitions keyspace.", keyspace, table));
+        return readSchemaPartitionForTableAndApply(COLUMNFAMILIES, keyspace, table, partition ->
+        {
+            if (partition.isEmpty())
+                throw new RuntimeException(String.format("%s:%s not found in the schema definitions keyspace.", keyspace, table));
 
-        return createTableFromTablePartition(partition);
+            return createTableFromTablePartition(partition);
+        });
     }
 
     /**
@@ -990,37 +943,34 @@ public class LegacySchemaTables
      *
      * @return map containing name of the table and its metadata for faster lookup
      */
-    private static Map<String, CFMetaData> createTablesFromTablesPartition(Row partition)
+    private static Collection<CFMetaData> createTablesFromTablesPartition(RowIterator partition)
     {
-        if (partition.cf == null)
-            return Collections.emptyMap();
+        if (partition.isEmpty())
+            return Collections.emptyList();
 
         String query = String.format("SELECT * FROM %s.%s", SystemKeyspace.NAME, COLUMNFAMILIES);
-        Map<String, CFMetaData> tables = new HashMap<>();
+        List<CFMetaData> tables = new ArrayList<>();
         for (UntypedResultSet.Row row : QueryProcessor.resultify(query, partition))
-        {
-            CFMetaData cfm = createTableFromTableRow(row);
-            tables.put(cfm.cfName, cfm);
-        }
+            tables.add(createTableFromTableRow(row));
         return tables;
     }
 
-    public static CFMetaData createTableFromTablePartitionAndColumnsPartition(Row serializedTable, Row serializedColumns)
+    public static CFMetaData createTableFromTablePartitionAndColumnsPartition(RowIterator serializedTable, RowIterator serializedColumns)
     {
         String query = String.format("SELECT * FROM %s.%s", SystemKeyspace.NAME, COLUMNFAMILIES);
         return createTableFromTableRowAndColumnsPartition(QueryProcessor.resultify(query, serializedTable).one(), serializedColumns);
     }
 
-    private static CFMetaData createTableFromTableRowAndColumnsPartition(UntypedResultSet.Row tableRow, Row serializedColumns)
+    private static CFMetaData createTableFromTableRowAndColumnsPartition(UntypedResultSet.Row tableRow, RowIterator serializedColumns)
     {
         String query = String.format("SELECT * FROM %s.%s", SystemKeyspace.NAME, COLUMNS);
         return createTableFromTableRowAndColumnRows(tableRow, QueryProcessor.resultify(query, serializedColumns));
     }
 
-    private static CFMetaData createTableFromTablePartition(Row row)
+    private static CFMetaData createTableFromTablePartition(RowIterator partition)
     {
         String query = String.format("SELECT * FROM %s.%s", SystemKeyspace.NAME, COLUMNFAMILIES);
-        return createTableFromTableRow(QueryProcessor.resultify(query, row).one());
+        return createTableFromTableRow(QueryProcessor.resultify(query, partition).one());
     }
 
     /**
@@ -1033,12 +983,11 @@ public class LegacySchemaTables
         String ksName = result.getString("keyspace_name");
         String cfName = result.getString("columnfamily_name");
 
-        Row serializedColumns = readSchemaPartitionForTable(COLUMNS, ksName, cfName);
-        CFMetaData cfm = createTableFromTableRowAndColumnsPartition(result, serializedColumns);
+        CFMetaData cfm = readSchemaPartitionForTableAndApply(COLUMNS, ksName, cfName, partition -> createTableFromTableRowAndColumnsPartition(result, partition));
 
-        Row serializedTriggers = readSchemaPartitionForTable(TRIGGERS, ksName, cfName);
-        for (TriggerDefinition trigger : createTriggersFromTriggersPartition(serializedTriggers))
-            cfm.addTriggerDefinition(trigger);
+        readSchemaPartitionForTableAndApply(TRIGGERS, ksName, cfName,
+            partition -> { createTriggersFromTriggersPartition(partition).forEach(trigger -> cfm.addTriggerDefinition(trigger)); return null; }
+        );
 
         return cfm;
     }
@@ -1051,35 +1000,47 @@ public class LegacySchemaTables
 
         AbstractType<?> rawComparator = TypeParser.parse(result.getString("comparator"));
         AbstractType<?> subComparator = result.has("subcomparator") ? TypeParser.parse(result.getString("subcomparator")) : null;
-        ColumnFamilyType cfType = ColumnFamilyType.valueOf(result.getString("type"));
-
-        AbstractType<?> fullRawComparator = CFMetaData.makeRawAbstractType(rawComparator, subComparator);
-
-        List<ColumnDefinition> columnDefs = createColumnsFromColumnRows(serializedColumnDefinitions,
-                                                                        ksName,
-                                                                        cfName,
-                                                                        fullRawComparator,
-                                                                        cfType == ColumnFamilyType.Super);
 
-        boolean isDense = result.has("is_dense")
-                        ? result.getBoolean("is_dense")
-                        : CFMetaData.calculateIsDense(fullRawComparator, columnDefs);
+        boolean isSuper = result.getString("type").toLowerCase().equals("super");
+        boolean isDense = result.getBoolean("is_dense");
+        boolean isCompound = rawComparator instanceof CompositeType;
 
-        CellNameType comparator = CellNames.fromAbstractType(fullRawComparator, isDense);
+        // We don't really use the default validator but as we have it for backward compatibility, we use it to know if it's a counter table
+        AbstractType<?> defaultValidator = TypeParser.parse(result.getString("default_validator"));
+        boolean isCounter =  defaultValidator instanceof CounterColumnType;
 
         // if we are upgrading, we use id generated from names initially
         UUID cfId = result.has("cf_id")
                   ? result.getUUID("cf_id")
                   : CFMetaData.generateLegacyCfId(ksName, cfName);
 
-        CFMetaData cfm = new CFMetaData(ksName, cfName, cfType, comparator, cfId);
-        cfm.isDense(isDense);
+        boolean isCQLTable = !isSuper && !isDense && isCompound;
+        boolean isStaticCompactTable = !isDense && !isCompound;
+
+        // Internally, compact tables have a specific layout, see CompactTables. But when upgrading from
+        // previous versions, they may not have the expected schema, so detect if we need to upgrade and do
+        // it in createColumnsFromColumnRows.
+        // We can remove this once we don't support upgrade from versions < 3.0.
+        boolean needsUpgrade = isCQLTable ? false : checkNeedsUpgrade(serializedColumnDefinitions, isSuper, isStaticCompactTable);
+
+        List<ColumnDefinition> columnDefs = createColumnsFromColumnRows(serializedColumnDefinitions,
+                                                                        ksName,
+                                                                        cfName,
+                                                                        rawComparator,
+                                                                        subComparator,
+                                                                        isSuper,
+                                                                        isCQLTable,
+                                                                        isStaticCompactTable,
+                                                                        needsUpgrade);
+
+        if (needsUpgrade)
+            addDefinitionForUpgrade(columnDefs, ksName, cfName, isStaticCompactTable, isSuper, rawComparator, subComparator, defaultValidator);
+
+        CFMetaData cfm = CFMetaData.create(ksName, cfName, cfId, isDense, isCompound, isSuper, isCounter, columnDefs);
 
         cfm.readRepairChance(result.getDouble("read_repair_chance"));
         cfm.dcLocalReadRepairChance(result.getDouble("local_read_repair_chance"));
         cfm.gcGraceSeconds(result.getInt("gc_grace_seconds"));
-        cfm.defaultValidator(TypeParser.parse(result.getString("default_validator")));
-        cfm.keyValidator(TypeParser.parse(result.getString("key_validator")));
         cfm.minCompactionThreshold(result.getInt("min_compaction_threshold"));
         cfm.maxCompactionThreshold(result.getInt("max_compaction_threshold"));
         if (result.has("comment"))
@@ -1107,20 +1068,86 @@ public class LegacySchemaTables
             cfm.bloomFilterFpChance(cfm.getBloomFilterFpChance());
 
         if (result.has("dropped_columns"))
-            cfm.droppedColumns(convertDroppedColumns(result.getMap("dropped_columns", UTF8Type.instance, LongType.instance)));
+        {
+            Map<String, String> types = result.has("dropped_columns_types")
+                                      ? result.getMap("dropped_columns_types", UTF8Type.instance, UTF8Type.instance) 
+                                      : Collections.<String, String>emptyMap();
+            addDroppedColumns(cfm, result.getMap("dropped_columns", UTF8Type.instance, LongType.instance), types);
+        }
+
+        return cfm;
+    }
+
+    // Should only be called on compact tables
+    private static boolean checkNeedsUpgrade(UntypedResultSet defs, boolean isSuper, boolean isStaticCompactTable)
+    {
+        if (isSuper)
+        {
+            // Check if we've added the "supercolumn map" column yet or not
+            for (UntypedResultSet.Row row : defs)
+            {
+                if (row.getString("column_name").isEmpty())
+                    return false;
+            }
+            return true;
+        }
+
+        // For static compact tables, we need to upgrade if the regular definitions haven't been converted to static yet,
+        // i.e. if we don't have a static definition yet.
+        if (isStaticCompactTable)
+            return !hasKind(defs, ColumnDefinition.Kind.STATIC);
+
+        // For dense compact tables, we need to upgrade if we don't have a compact value definition
+        return !hasKind(defs, ColumnDefinition.Kind.REGULAR);
+    }
+
+    private static void addDefinitionForUpgrade(List<ColumnDefinition> defs,
+                                                String ksName,
+                                                String cfName,
+                                                boolean isStaticCompactTable,
+                                                boolean isSuper,
+                                                AbstractType<?> rawComparator,
+                                                AbstractType<?> subComparator,
+                                                AbstractType<?> defaultValidator)
+    {
+        CompactTables.DefaultNames names = CompactTables.defaultNameGenerator(defs);
 
-        for (ColumnDefinition cd : columnDefs)
-            cfm.addOrReplaceColumnDefinition(cd);
+        if (isSuper)
+        {
+            defs.add(ColumnDefinition.regularDef(ksName, cfName, CompactTables.SUPER_COLUMN_MAP_COLUMN_STR, MapType.getInstance(subComparator, defaultValidator, true), null));
+        }
+        else if (isStaticCompactTable)
+        {
+            defs.add(ColumnDefinition.clusteringKeyDef(ksName, cfName, names.defaultClusteringName(), rawComparator, null));
+            defs.add(ColumnDefinition.regularDef(ksName, cfName, names.defaultCompactValueName(), defaultValidator, null));
+        }
+        else
+        {
+            // For dense compact tables, we get here if we don't have a compact value column, in which case we should add it
+            // (we use EmptyType to recognize that the compact value was not declared by the use (see CreateTableStatement too))
+            defs.add(ColumnDefinition.regularDef(ksName, cfName, names.defaultCompactValueName(), EmptyType.instance, null));
+        }
+    }
 
-        return cfm.rebuild();
+    private static boolean hasKind(UntypedResultSet defs, ColumnDefinition.Kind kind)
+    {
+        for (UntypedResultSet.Row row : defs)
+        {
+            if (deserializeKind(row.getString("type")) == kind)
+                return true;
+        }
+        return false;
     }
 
-    private static Map<ColumnIdentifier, Long> convertDroppedColumns(Map<String, Long> raw)
+    private static void addDroppedColumns(CFMetaData cfm, Map<String, Long> droppedTimes, Map<String, String> types)
     {
-        Map<ColumnIdentifier, Long> converted = Maps.newHashMap();
-        for (Map.Entry<String, Long> entry : raw.entrySet())
-            converted.put(new ColumnIdentifier(entry.getKey(), true), entry.getValue());
-        return converted;
+        for (Map.Entry<String, Long> entry : droppedTimes.entrySet())
+        {
+            String name = entry.getKey();
+            long time = entry.getValue();
+            AbstractType<?> type = types.containsKey(name) ? TypeParser.parse(types.get(name)) : null;
+            cfm.getDroppedColumns().put(ColumnIdentifier.getInterned(name, true), new CFMetaData.DroppedColumn(type, time));
+        }
     }
 
     /*
@@ -1129,50 +1156,59 @@ public class LegacySchemaTables
 
     private static void addColumnToSchemaMutation(CFMetaData table, ColumnDefinition column, long timestamp, Mutation mutation)
     {
-        ColumnFamily cells = mutation.addOrGet(Columns);
-        Composite prefix = Columns.comparator.make(table.cfName, column.name.toString());
-        CFRowAdder adder = new CFRowAdder(cells, prefix, timestamp);
+        RowUpdateBuilder adder = new RowUpdateBuilder(Columns, timestamp, mutation)
+                                 .clustering(table.cfName, column.name.toString());
 
         adder.add("validator", column.type.toString());
-        adder.add("type", serializeKind(column.kind));
+        adder.add("type", serializeKind(column.kind, table.isDense()));
         adder.add("component_index", column.isOnAllComponents() ? null : column.position());
         adder.add("index_name", column.getIndexName());
         adder.add("index_type", column.getIndexType() == null ? null : column.getIndexType().toString());
         adder.add("index_options", json(column.getIndexOptions()));
+
+        adder.build();
     }
 
-    private static String serializeKind(ColumnDefinition.Kind kind)
+    private static String serializeKind(ColumnDefinition.Kind kind, boolean isDense)
     {
-        // For backward compatibility we need to special case CLUSTERING_COLUMN
-        return kind == ColumnDefinition.Kind.CLUSTERING_COLUMN ? "clustering_key" : kind.toString().toLowerCase();
+        // For backward compatibility, we special case CLUSTERING_COLUMN and the case where the table is dense.
+        if (kind == ColumnDefinition.Kind.CLUSTERING_COLUMN)
+            return "clustering_key";
+
+        if (kind == ColumnDefinition.Kind.REGULAR && isDense)
+            return "compact_value";
+
+        return kind.toString().toLowerCase();
     }
 
-    private static ColumnDefinition.Kind deserializeKind(String kind)
+    public static ColumnDefinition.Kind deserializeKind(String kind)
     {
         if (kind.equalsIgnoreCase("clustering_key"))
             return ColumnDefinition.Kind.CLUSTERING_COLUMN;
+        if (kind.equalsIgnoreCase("compact_value"))
+            return ColumnDefinition.Kind.REGULAR;
         return Enum.valueOf(ColumnDefinition.Kind.class, kind.toUpperCase());
     }
 
     private static void dropColumnFromSchemaMutation(CFMetaData table, ColumnDefinition column, long timestamp, Mutation mutation)
     {
-        ColumnFamily cells = mutation.addOrGet(Columns);
-        int ldt = (int) (System.currentTimeMillis() / 1000);
-
         // Note: we do want to use name.toString(), not name.bytes directly for backward compatibility (For CQL3, this won't make a difference).
-        Composite prefix = Columns.comparator.make(table.cfName, column.name.toString());
-        cells.addAtom(new RangeTombstone(prefix, prefix.end(), timestamp, ldt));
+        RowUpdateBuilder.deleteRow(Columns, timestamp, mutation, table.cfName, column.name.toString());
     }
 
     private static List<ColumnDefinition> createColumnsFromColumnRows(UntypedResultSet rows,
                                                                       String keyspace,
                                                                       String table,
                                                                       AbstractType<?> rawComparator,
-                                                                      boolean isSuper)
+                                                                      AbstractType<?> rawSubComparator,
+                                                                      boolean isSuper,
+                                                                      boolean isCQLTable,
+                                                                      boolean isStaticCompactTable,
+                                                                      boolean needsUpgrade)
     {
         List<ColumnDefinition> columns = new ArrayList<>();
         for (UntypedResultSet.Row row : rows)
-            columns.add(createColumnFromColumnRow(row, keyspace, table, rawComparator, isSuper));
+            columns.add(createColumnFromColumnRow(row, keyspace, table, rawComparator, rawSubComparator, isSuper, isCQLTable, isStaticCompactTable, needsUpgrade));
         return columns;
     }
 
@@ -1180,22 +1216,26 @@ public class LegacySchemaTables
                                                               String keyspace,
                                                               String table,
                                                               AbstractType<?> rawComparator,
-                                                              boolean isSuper)
+                                                              AbstractType<?> rawSubComparator,
+                                                              boolean isSuper,
+                                                              boolean isCQLTable,
+                                                              boolean isStaticCompactTable,
+                                                              boolean needsUpgrade)
     {
         ColumnDefinition.Kind kind = deserializeKind(row.getString("type"));
+        if (needsUpgrade && isStaticCompactTable && kind == ColumnDefinition.Kind.REGULAR)
+            kind = ColumnDefinition.Kind.STATIC;
 
         Integer componentIndex = null;
         if (row.has("component_index"))
             componentIndex = row.getInt("component_index");
-        else if (kind == ColumnDefinition.Kind.CLUSTERING_COLUMN && isSuper)
-            componentIndex = 1; // A ColumnDefinition for super columns applies to the column component
 
         // Note: we save the column name as string, but we should not assume that it is an UTF8 name, we
         // we need to use the comparator fromString method
-        AbstractType<?> comparator = kind == ColumnDefinition.Kind.REGULAR
-                                   ? getComponentComparator(rawComparator, componentIndex)
-                                   : UTF8Type.instance;
-        ColumnIdentifier name = new ColumnIdentifier(comparator.fromString(row.getString("column_name")), comparator);
+        AbstractType<?> comparator = isCQLTable
+                                   ? UTF8Type.instance
+                                   : CompactTables.columnDefinitionComparator(kind, isSuper, rawComparator, rawSubComparator);
+        ColumnIdentifier name = ColumnIdentifier.getInterned(comparator.fromString(row.getString("column_name")), comparator);
 
         AbstractType<?> validator = parseType(row.getString("validator"));
 
@@ -1214,32 +1254,21 @@ public class LegacySchemaTables
         return new ColumnDefinition(keyspace, table, name, validator, indexType, indexOptions, indexName, componentIndex, kind);
     }
 
-    private static AbstractType<?> getComponentComparator(AbstractType<?> rawComparator, Integer componentIndex)
-    {
-        return (componentIndex == null || (componentIndex == 0 && !(rawComparator instanceof CompositeType)))
-               ? rawComparator
-               : ((CompositeType)rawComparator).types.get(componentIndex);
-    }
-
     /*
      * Trigger metadata serialization/deserialization.
      */
 
     private static void addTriggerToSchemaMutation(CFMetaData table, TriggerDefinition trigger, long timestamp, Mutation mutation)
     {
-        ColumnFamily cells = mutation.addOrGet(Triggers);
-        Composite prefix = Triggers.comparator.make(table.cfName, trigger.name);
-        CFRowAdder adder = new CFRowAdder(cells, prefix, timestamp);
-        adder.addMapEntry("trigger_options", "class", trigger.classOption);
+        new RowUpdateBuilder(Triggers, timestamp, mutation)
+            .clustering(table.cfName, trigger.name)
+            .addMapEntry("trigger_options", "class", trigger.classOption)
+            .build();
     }
 
     private static void dropTriggerFromSchemaMutation(CFMetaData table, TriggerDefinition trigger, long timestamp, Mutation mutation)
     {
-        ColumnFamily cells = mutation.addOrGet(Triggers);
-        int ldt = (int) (System.currentTimeMillis() / 1000);
-
-        Composite prefix = Triggers.comparator.make(table.cfName, trigger.name);
-        cells.addAtom(new RangeTombstone(prefix, prefix.end(), timestamp, ldt));
+        RowUpdateBuilder.deleteRow(Triggers, timestamp, mutation, table.cfName, trigger.name);
     }
 
     /**
@@ -1248,7 +1277,7 @@ public class LegacySchemaTables
      * @param partition storage-level partition containing the trigger definitions
      * @return the list of processed TriggerDefinitions
      */
-    private static List<TriggerDefinition> createTriggersFromTriggersPartition(Row partition)
+    private static List<TriggerDefinition> createTriggersFromTriggersPartition(RowIterator partition)
     {
         List<TriggerDefinition> triggers = new ArrayList<>();
         String query = String.format("SELECT * FROM %s.%s", SystemKeyspace.NAME, TRIGGERS);
@@ -1275,48 +1304,37 @@ public class LegacySchemaTables
 
     private static void addFunctionToSchemaMutation(UDFunction function, long timestamp, Mutation mutation)
     {
-        ColumnFamily cells = mutation.addOrGet(Functions);
-        Composite prefix = Functions.comparator.make(function.name().name, functionSignatureWithTypes(function));
-        CFRowAdder adder = new CFRowAdder(cells, prefix, timestamp);
+        RowUpdateBuilder adder = new RowUpdateBuilder(Functions, timestamp, mutation)
+                                 .clustering(function.name().name, functionSignatureWithTypes(function));
+
+        adder.add("body", function.body());
+        adder.add("language", function.language());
+        adder.add("return_type", function.returnType().toString());
+        adder.add("called_on_null_input", function.isCalledOnNullInput());
 
         adder.resetCollection("argument_names");
         adder.resetCollection("argument_types");
-
         for (int i = 0; i < function.argNames().size(); i++)
         {
             adder.addListEntry("argument_names", function.argNames().get(i).bytes);
             adder.addListEntry("argument_types", function.argTypes().get(i).toString());
         }
-
-        adder.add("body", function.body());
-        adder.add("language", function.language());
-        adder.add("return_type", function.returnType().toString());
-        adder.add("called_on_null_input", function.isCalledOnNullInput());
+        adder.build();
     }
 
     public static Mutation makeDropFunctionMutation(KSMetaData keyspace, UDFunction function, long timestamp)
     {
         // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
         Mutation mutation = makeCreateKeyspaceMutation(keyspace, timestamp, false);
-
-        ColumnFamily cells = mutation.addOrGet(Functions);
-        int ldt = (int) (System.currentTimeMillis() / 1000);
-
-        Composite prefix = Functions.comparator.make(function.name().name, functionSignatureWithTypes(function));
-        cells.addAtom(new RangeTombstone(prefix, prefix.end(), timestamp, ldt));
-
-        return mutation;
+        return RowUpdateBuilder.deleteRow(Functions, timestamp, mutation, function.name().name, functionSignatureWithTypes(function));
     }
 
-    private static Map<ByteBuffer, UDFunction> createFunctionsFromFunctionsPartition(Row partition)
+    private static Collection<UDFunction> createFunctionsFromFunctionsPartition(RowIterator partition)
     {
-        Map<ByteBuffer, UDFunction> functions = new HashMap<>();
+        List<UDFunction> functions = new ArrayList<>();
         String query = String.format("SELECT * FROM %s.%s", SystemKeyspace.NAME, FUNCTIONS);
         for (UntypedResultSet.Row row : QueryProcessor.resultify(query, partition))
-        {
-            UDFunction function = createFunctionFromFunctionRow(row);
-            functions.put(functionSignatureWithNameAndTypes(function), function);
-        }
+            functions.add(createFunctionFromFunctionRow(row));
         return functions;
     }
 
@@ -1387,9 +1405,8 @@ public class LegacySchemaTables
 
     private static void addAggregateToSchemaMutation(UDAggregate aggregate, long timestamp, Mutation mutation)
     {
-        ColumnFamily cells = mutation.addOrGet(Aggregates);
-        Composite prefix = Aggregates.comparator.make(aggregate.name().name, functionSignatureWithTypes(aggregate));
-        CFRowAdder adder = new CFRowAdder(cells, prefix, timestamp);
+        RowUpdateBuilder adder = new RowUpdateBuilder(Aggregates, timestamp, mutation)
+                                 .clustering(aggregate.name().name, functionSignatureWithTypes(aggregate));
 
         adder.resetCollection("argument_types");
         adder.add("return_type", aggregate.returnType().toString());
@@ -1403,17 +1420,16 @@ public class LegacySchemaTables
 
         for (AbstractType<?> argType : aggregate.argTypes())
             adder.addListEntry("argument_types", argType.toString());
+
+        adder.build();
     }
 
-    private static Map<ByteBuffer, UDAggregate> createAggregatesFromAggregatesPartition(Row partition)
+    private static Collection<UDAggregate> createAggregatesFromAggregatesPartition(RowIterator partition)
     {
-        Map<ByteBuffer, UDAggregate> aggregates = new HashMap<>();
+        List<UDAggregate> aggregates = new ArrayList<>();
         String query = String.format("SELECT * FROM %s.%s", SystemKeyspace.NAME, AGGREGATES);
         for (UntypedResultSet.Row row : QueryProcessor.resultify(query, partition))
-        {
-            UDAggregate aggregate = createAggregateFromAggregateRow(row);
-            aggregates.put(functionSignatureWithNameAndTypes(aggregate), aggregate);
-        }
+            aggregates.add(createAggregateFromAggregateRow(row));
         return aggregates;
     }
 
@@ -1475,14 +1491,7 @@ public class LegacySchemaTables
     {
         // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
         Mutation mutation = makeCreateKeyspaceMutation(keyspace, timestamp, false);
-
-        ColumnFamily cells = mutation.addOrGet(Aggregates);
-        int ldt = (int) (System.currentTimeMillis() / 1000);
-
-        Composite prefix = Aggregates.comparator.make(aggregate.name().name, functionSignatureWithTypes(aggregate));
-        cells.addAtom(new RangeTombstone(prefix, prefix.end(), timestamp, ldt));
-
-        return mutation;
+        return RowUpdateBuilder.deleteRow(Aggregates, timestamp, mutation, aggregate.name().name, functionSignatureWithTypes(aggregate));
     }
 
     private static AbstractType<?> parseType(String str)
@@ -1515,5 +1524,4 @@ public class LegacySchemaTables
             strList.add(argType.asCQL3Type().toString());
         return list.decompose(strList);
     }
-
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/serializers/ListSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/serializers/ListSerializer.java b/src/java/org/apache/cassandra/serializers/ListSerializer.java
index aeee2b9..b1c5508 100644
--- a/src/java/org/apache/cassandra/serializers/ListSerializer.java
+++ b/src/java/org/apache/cassandra/serializers/ListSerializer.java
@@ -143,14 +143,16 @@ public class ListSerializer<T> extends CollectionSerializer<List<T>>
     {
         StringBuilder sb = new StringBuilder();
         boolean isFirst = true;
+        sb.append('[');
         for (T element : value)
         {
             if (isFirst)
                 isFirst = false;
             else
-                sb.append("; ");
+                sb.append(", ");
             sb.append(elements.toString(element));
         }
+        sb.append(']');
         return sb.toString();
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/serializers/MapSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/serializers/MapSerializer.java b/src/java/org/apache/cassandra/serializers/MapSerializer.java
index 8350f66..7d81598 100644
--- a/src/java/org/apache/cassandra/serializers/MapSerializer.java
+++ b/src/java/org/apache/cassandra/serializers/MapSerializer.java
@@ -84,7 +84,7 @@ public class MapSerializer<K, V> extends CollectionSerializer<Map<K, V>>
         }
         catch (BufferUnderflowException e)
         {
-            throw new MarshalException("Not enough bytes to read a set");
+            throw new MarshalException("Not enough bytes to read a map");
         }
     }
 
@@ -150,19 +150,19 @@ public class MapSerializer<K, V> extends CollectionSerializer<Map<K, V>>
     public String toString(Map<K, V> value)
     {
         StringBuilder sb = new StringBuilder();
+        sb.append('{');
         boolean isFirst = true;
         for (Map.Entry<K, V> element : value.entrySet())
         {
             if (isFirst)
                 isFirst = false;
             else
-                sb.append("; ");
-            sb.append('(');
+                sb.append(", ");
             sb.append(keys.toString(element.getKey()));
-            sb.append(", ");
+            sb.append(": ");
             sb.append(values.toString(element.getValue()));
-            sb.append(')');
         }
+        sb.append('}');
         return sb.toString();
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/serializers/SetSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/serializers/SetSerializer.java b/src/java/org/apache/cassandra/serializers/SetSerializer.java
index 21f5075..7108630 100644
--- a/src/java/org/apache/cassandra/serializers/SetSerializer.java
+++ b/src/java/org/apache/cassandra/serializers/SetSerializer.java
@@ -94,13 +94,14 @@ public class SetSerializer<T> extends CollectionSerializer<Set<T>>
         }
         catch (BufferUnderflowException e)
         {
-            throw new MarshalException("Not enough bytes to read a list");
+            throw new MarshalException("Not enough bytes to read a set");
         }
     }
 
     public String toString(Set<T> value)
     {
         StringBuilder sb = new StringBuilder();
+        sb.append('{');
         boolean isFirst = true;
         for (T element : value)
         {
@@ -110,10 +111,11 @@ public class SetSerializer<T> extends CollectionSerializer<Set<T>>
             }
             else
             {
-                sb.append("; ");
+                sb.append(", ");
             }
             sb.append(elements.toString(element));
         }
+        sb.append('}');
         return sb.toString();
     }