You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2015/07/17 23:13:34 UTC

[2/2] cassandra git commit: Modernize system_schema.tables table

Modernize system_schema.tables table

patch by Aleksey Yeschenko; reviewed by Tyler Hobbs and Robert Stupp for
CASSANDRA-6717


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/dc852381
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/dc852381
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/dc852381

Branch: refs/heads/trunk
Commit: dc8523819ff549acd0c902dc1d118cc404718003
Parents: 8d7c608
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Fri Jul 17 23:27:14 2015 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Sat Jul 18 00:14:08 2015 +0300

----------------------------------------------------------------------
 ...ra-driver-core-2.2.0-rc2-SNAPSHOT-shaded.jar | Bin 2162223 -> 2163222 bytes
 lib/cassandra-driver-internal-only-2.6.0c2.zip  | Bin 203206 -> 194427 bytes
 .../apache/cassandra/cache/CachingOptions.java  |  16 +-
 .../org/apache/cassandra/config/CFMetaData.java | 124 +++----
 .../apache/cassandra/cql3/UntypedResultSet.java |   5 +
 .../cql3/statements/AlterTableStatement.java    |   2 -
 .../apache/cassandra/db/RowUpdateBuilder.java   |  39 +-
 .../cassandra/schema/LegacySchemaMigrator.java  |   2 +-
 .../apache/cassandra/schema/SchemaKeyspace.java | 355 +++++++++----------
 .../cassandra/schema/SchemaKeyspace.java.rej    |  80 +++++
 .../cassandra/thrift/ThriftConversion.java      |  43 +--
 .../utils/NativeSSTableLoaderClient.java        |  72 ++--
 .../org/apache/cassandra/UpdateBuilder.java     |   6 +-
 .../apache/cassandra/config/CFMetaDataTest.java |   2 +-
 .../cql3/validation/operations/AlterTest.java   |  38 +-
 .../cql3/validation/operations/CreateTest.java  |  24 +-
 .../schema/LegacySchemaMigratorTest.java        |  12 +-
 17 files changed, 422 insertions(+), 398 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc852381/lib/cassandra-driver-core-2.2.0-rc2-SNAPSHOT-shaded.jar
----------------------------------------------------------------------
diff --git a/lib/cassandra-driver-core-2.2.0-rc2-SNAPSHOT-shaded.jar b/lib/cassandra-driver-core-2.2.0-rc2-SNAPSHOT-shaded.jar
index 9051202..0d626f5 100644
Binary files a/lib/cassandra-driver-core-2.2.0-rc2-SNAPSHOT-shaded.jar and b/lib/cassandra-driver-core-2.2.0-rc2-SNAPSHOT-shaded.jar differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc852381/lib/cassandra-driver-internal-only-2.6.0c2.zip
----------------------------------------------------------------------
diff --git a/lib/cassandra-driver-internal-only-2.6.0c2.zip b/lib/cassandra-driver-internal-only-2.6.0c2.zip
index be9f162..ce91907 100644
Binary files a/lib/cassandra-driver-internal-only-2.6.0c2.zip and b/lib/cassandra-driver-internal-only-2.6.0c2.zip differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc852381/src/java/org/apache/cassandra/cache/CachingOptions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/CachingOptions.java b/src/java/org/apache/cassandra/cache/CachingOptions.java
index 1c82f55..686f365 100644
--- a/src/java/org/apache/cassandra/cache/CachingOptions.java
+++ b/src/java/org/apache/cassandra/cache/CachingOptions.java
@@ -17,18 +17,16 @@
  */
 package org.apache.cassandra.cache;
 
+import java.util.*;
 
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
 import org.apache.commons.lang3.StringUtils;
 
 import org.apache.cassandra.exceptions.ConfigurationException;
+
 import static org.apache.cassandra.utils.FBUtilities.fromJsonMap;
 
 /*
-CQL: { 'keys' : 'ALL|NONE', 'rows_per_partition': '200|NONE|ALL' }
+ * CQL: { 'keys' : 'ALL|NONE', 'rows_per_partition': '200|NONE|ALL' }
  */
 public class CachingOptions
 {
@@ -67,6 +65,14 @@ public class CachingOptions
         return new CachingOptions(KeyCache.fromString(cacheConfig.get("keys")), RowCache.fromString(cacheConfig.get("rows_per_partition")));
     }
 
+    public Map<String, String> asMap()
+    {
+        Map<String, String> map = new HashMap<>(2);
+        map.put("keys", keyCache.toString());
+        map.put("rows_per_partition", rowCache.toString());
+        return map;
+    }
+
     private static void validateCacheConfig(Map<String, String> cacheConfig) throws ConfigurationException
     {
         for (Map.Entry<String, String> entry : cacheConfig.entrySet())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc852381/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java
index 4505c6d..81ef217 100644
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@ -23,6 +23,7 @@ import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.nio.ByteBuffer;
 import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ThreadLocalRandom;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -50,10 +51,7 @@ import org.apache.cassandra.io.compress.LZ4Compressor;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.schema.SchemaKeyspace;
 import org.apache.cassandra.schema.Triggers;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.UUIDGen;
-import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.*;
 import org.github.jamm.Unmetered;
 
 /**
@@ -62,6 +60,11 @@ import org.github.jamm.Unmetered;
 @Unmetered
 public final class CFMetaData
 {
+    public enum Flag
+    {
+        SUPER, COUNTER, DENSE, COMPOUND
+    }
+
     private static final Logger logger = LoggerFactory.getLogger(CFMetaData.class);
 
     public static final Serializer serializer = new Serializer();
@@ -81,15 +84,6 @@ public final class CFMetaData
     // Note that this is the default only for user created tables
     public final static String DEFAULT_COMPRESSOR = LZ4Compressor.class.getCanonicalName();
 
-    // Note that this need to come *before* any CFMetaData is defined so before the compile below.
-    private static final Comparator<ColumnDefinition> regularColumnComparator = new Comparator<ColumnDefinition>()
-    {
-        public int compare(ColumnDefinition def1, ColumnDefinition def2)
-        {
-            return ByteBufferUtil.compareUnsigned(def1.name.bytes, def2.name.bytes);
-        }
-    };
-
     public static class SpeculativeRetry
     {
         public enum RetryType
@@ -170,8 +164,13 @@ public final class CFMetaData
     public final UUID cfId;                           // internal id, never exposed to user
     public final String ksName;                       // name of keyspace
     public final String cfName;                       // name of this column family
-    public final boolean isSuper;                     // is a thrift super column family
-    public final boolean isCounter;                   // is a counter table
+
+    private final ImmutableSet<Flag> flags;
+    private final boolean isDense;
+    private final boolean isCompound;
+    private final boolean isSuper;
+    private final boolean isCounter;
+
     public volatile ClusteringComparator comparator;  // bytes, long, timeuuid, utf8, etc. This is built directly from clusteringColumns
 
     private final Serializers serializers;
@@ -201,14 +200,11 @@ public final class CFMetaData
      * clustering key ones, those list are ordered by the "component index" of the
      * elements.
      */
-    private volatile Map<ByteBuffer, ColumnDefinition> columnMetadata = new HashMap<>();
+    private final Map<ByteBuffer, ColumnDefinition> columnMetadata = new ConcurrentHashMap<>(); // not on any hot path
     private volatile List<ColumnDefinition> partitionKeyColumns;  // Always of size keyValidator.componentsCount, null padded if necessary
     private volatile List<ColumnDefinition> clusteringColumns;    // Of size comparator.componentsCount or comparator.componentsCount -1, null padded if necessary
     private volatile PartitionColumns partitionColumns;
 
-    private final boolean isDense;
-    private final boolean isCompound;
-
     // For dense tables, this alias the single non-PK column the table contains (since it can only have one). We keep
     // that as convenience to access that column more easily (but we could replace calls by partitionColumns().iterator().next()
     // for those tables in practice).
@@ -253,11 +249,23 @@ public final class CFMetaData
         this.cfId = cfId;
         this.ksName = keyspace;
         this.cfName = name;
+
         this.isDense = isDense;
         this.isCompound = isCompound;
         this.isSuper = isSuper;
         this.isCounter = isCounter;
 
+        EnumSet<Flag> flags = EnumSet.noneOf(Flag.class);
+        if (isSuper)
+            flags.add(Flag.SUPER);
+        if (isCounter)
+            flags.add(Flag.COUNTER);
+        if (isDense)
+            flags.add(Flag.DENSE);
+        if (isCompound)
+            flags.add(Flag.COMPOUND);
+        this.flags = Sets.immutableEnumSet(flags);
+
         // A compact table should always have a clustering
         assert isCQLTable() || !clusteringColumns.isEmpty() : String.format("For table %s.%s, isDense=%b, isCompound=%b, clustering=%s", ksName, cfName, isDense, isCompound, clusteringColumns);
 
@@ -342,6 +350,11 @@ public final class CFMetaData
         return types;
     }
 
+    public Set<Flag> flags()
+    {
+        return flags;
+    }
+
     /**
      * There is a couple of places in the code where we need a CFMetaData object and don't have one readily available
      * and know that only the keyspace and name matter. This creates such "fake" metadata. Use only if you know what
@@ -421,10 +434,10 @@ public final class CFMetaData
         return copyOpts(new CFMetaData(ksName,
                                        cfName,
                                        newCfId,
-                                       isSuper,
-                                       isCounter,
-                                       isDense,
-                                       isCompound,
+                                       isSuper(),
+                                       isCounter(),
+                                       isDense(),
+                                       isCompound(),
                                        copy(partitionKeyColumns),
                                        copy(clusteringColumns),
                                        copy(partitionColumns)),
@@ -489,11 +502,6 @@ public final class CFMetaData
         return comment;
     }
 
-    public boolean isSuper()
-    {
-        return isSuper;
-    }
-
     /**
      * The '.' char is the only way to identify if the CFMetadata is for a secondary index
      */
@@ -555,12 +563,12 @@ public final class CFMetaData
         return keyValidator;
     }
 
-    public Integer getMinCompactionThreshold()
+    public int getMinCompactionThreshold()
     {
         return minCompactionThreshold;
     }
 
-    public Integer getMaxCompactionThreshold()
+    public int getMaxCompactionThreshold()
     {
         return maxCompactionThreshold;
     }
@@ -721,12 +729,9 @@ public final class CFMetaData
         CFMetaData other = (CFMetaData) o;
 
         return Objects.equal(cfId, other.cfId)
+            && Objects.equal(flags, other.flags)
             && Objects.equal(ksName, other.ksName)
             && Objects.equal(cfName, other.cfName)
-            && Objects.equal(isDense, other.isDense)
-            && Objects.equal(isCompound, other.isCompound)
-            && Objects.equal(isSuper, other.isSuper)
-            && Objects.equal(isCounter, other.isCounter)
             && Objects.equal(comparator, other.comparator)
             && Objects.equal(comment, other.comment)
             && Objects.equal(readRepairChance, other.readRepairChance)
@@ -757,10 +762,7 @@ public final class CFMetaData
             .append(cfId)
             .append(ksName)
             .append(cfName)
-            .append(isDense)
-            .append(isCompound)
-            .append(isSuper)
-            .append(isCounter)
+            .append(flags)
             .append(comparator)
             .append(comment)
             .append(readRepairChance)
@@ -862,7 +864,7 @@ public final class CFMetaData
             throw new ConfigurationException(String.format("Column family ID mismatch (found %s; expected %s)",
                                                            cfm.cfId, cfId));
 
-        if (cfm.isDense != isDense || cfm.isCompound != isCompound || cfm.isCounter != isCounter || cfm.isSuper != isSuper)
+        if (!cfm.flags.equals(flags))
             throw new ConfigurationException("types do not match.");
 
         if (!cfm.comparator.isCompatibleWith(comparator))
@@ -1027,7 +1029,7 @@ public final class CFMetaData
             throw new ConfigurationException("CounterColumnType is not a valid key validator");
 
         // Mixing counter with non counter columns is not supported (#2614)
-        if (isCounter)
+        if (isCounter())
         {
             for (ColumnDefinition def : partitionColumns())
                 if (!(def.type instanceof CounterColumnType) && !CompactTables.isSuperColumnMapColumn(def))
@@ -1051,7 +1053,7 @@ public final class CFMetaData
             }
             else
             {
-                if (isSuper)
+                if (isSuper())
                     throw new ConfigurationException("Secondary indexes are not supported on super column families");
                 if (!isIndexNameValid(c.getIndexName()))
                     throw new ConfigurationException("Illegal index name " + c.getIndexName());
@@ -1172,7 +1174,7 @@ public final class CFMetaData
                 builder.add(def);
                 partitionColumns = builder.build();
                 // If dense, we must have modified the compact value since that's the only one we can have.
-                if (isDense)
+                if (isDense())
                     this.compactValueColumn = def;
                 break;
         }
@@ -1235,23 +1237,7 @@ public final class CFMetaData
 
     public boolean isStaticCompactTable()
     {
-        return !isSuper && !isDense() && !isCompound();
-    }
-
-    private static <T> boolean hasNoNulls(List<T> l)
-    {
-        for (T t : l)
-            if (t == null)
-                return false;
-        return true;
-    }
-
-    private static <T> List<T> nullInitializedList(int size)
-    {
-        List<T> l = new ArrayList<>(size);
-        for (int i = 0; i < size; ++i)
-            l.add(null);
-        return l;
+        return !isSuper() && !isDense() && !isCompound();
     }
 
     /**
@@ -1262,11 +1248,6 @@ public final class CFMetaData
         return isCompactTable();
     }
 
-    public boolean isCounter()
-    {
-        return isCounter;
-    }
-
     public boolean hasStaticColumns()
     {
         return !partitionColumns.statics.isEmpty();
@@ -1280,6 +1261,16 @@ public final class CFMetaData
         return false;
     }
 
+    public boolean isSuper()
+    {
+        return isSuper;
+    }
+
+    public boolean isCounter()
+    {
+        return isCounter;
+    }
+
     // We call dense a CF for which each component of the comparator is a clustering column, i.e. no
     // component is used to store a regular column names. In other words, non-composite static "thrift"
     // and CQL3 CF are *not* dense.
@@ -1312,10 +1303,7 @@ public final class CFMetaData
             .append("cfId", cfId)
             .append("ksName", ksName)
             .append("cfName", cfName)
-            .append("isDense", isDense)
-            .append("isCompound", isCompound)
-            .append("isSuper", isSuper)
-            .append("isCounter", isCounter)
+            .append("flags", flags)
             .append("comparator", comparator)
             .append("partitionColumns", partitionColumns)
             .append("partitionKeyColumns", partitionKeyColumns)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc852381/src/java/org/apache/cassandra/cql3/UntypedResultSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/UntypedResultSet.java b/src/java/org/apache/cassandra/cql3/UntypedResultSet.java
index f481f5c..978cb96 100644
--- a/src/java/org/apache/cassandra/cql3/UntypedResultSet.java
+++ b/src/java/org/apache/cassandra/cql3/UntypedResultSet.java
@@ -335,6 +335,11 @@ public abstract class UntypedResultSet implements Iterable<UntypedResultSet.Row>
             return raw == null ? null : MapType.getInstance(keyType, valueType, true).compose(raw);
         }
 
+        public Map<String, String> getTextMap(String column)
+        {
+            return getMap(column, UTF8Type.instance, UTF8Type.instance);
+        }
+
         public List<ColumnSpecification> getColumns()
         {
             return columns;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc852381/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java b/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
index d1888bc..e0c5f4e 100644
--- a/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
@@ -17,8 +17,6 @@
  */
 package org.apache.cassandra.cql3.statements;
 
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Map;
 
 import org.apache.cassandra.auth.Permission;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc852381/src/java/org/apache/cassandra/db/RowUpdateBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RowUpdateBuilder.java b/src/java/org/apache/cassandra/db/RowUpdateBuilder.java
index c3f3d29..627321e 100644
--- a/src/java/org/apache/cassandra/db/RowUpdateBuilder.java
+++ b/src/java/org/apache/cassandra/db/RowUpdateBuilder.java
@@ -18,10 +18,13 @@
 package org.apache.cassandra.db;
 
 import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.Set;
 
 import org.apache.cassandra.cql3.ColumnIdentifier;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.marshal.SetType;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.context.CounterContext;
 import org.apache.cassandra.db.partitions.*;
@@ -216,7 +219,7 @@ public class RowUpdateBuilder
     {
         ColumnDefinition c = getDefinition(columnName);
         assert c != null : "Cannot find column " + columnName;
-        assert c.isStatic() || update.metadata().comparator.size() == 0 || hasSetClustering : "Cannot set non static column " + c + " since no clustering hasn't been provided";
+        assert c.isStatic() || update.metadata().comparator.size() == 0 || hasSetClustering : "Cannot set non static column " + c + " since no clustering has been provided";
         assert c.type.isCollection() && c.type.isMultiCell();
         writer(c).writeComplexDeletion(c, new SimpleDeletionTime(defaultLiveness.timestamp() - 1, deletionTime.localDeletionTime()));
         return this;
@@ -284,11 +287,27 @@ public class RowUpdateBuilder
         return ((AbstractType)type).decompose(value);
     }
 
+    public RowUpdateBuilder map(String columnName, Map<?, ?> map)
+    {
+        resetCollection(columnName);
+        for (Map.Entry<?, ?> entry : map.entrySet())
+            addMapEntry(columnName, entry.getKey(), entry.getValue());
+        return this;
+    }
+
+    public RowUpdateBuilder set(String columnName, Set<?> set)
+    {
+        resetCollection(columnName);
+        for (Object element : set)
+            addSetEntry(columnName, element);
+        return this;
+    }
+
     public RowUpdateBuilder addMapEntry(String columnName, Object key, Object value)
     {
         ColumnDefinition c = getDefinition(columnName);
-        assert c.isStatic() || update.metadata().comparator.size() == 0 || hasSetClustering : "Cannot set non static column " + c + " since no clustering hasn't been provided";
-        assert c.type instanceof MapType;
+        assert c.isStatic() || update.metadata().comparator.size() == 0 || hasSetClustering : "Cannot set non static column " + c + " since no clustering has been provided";
+        assert c.type instanceof MapType && c.type.isMultiCell();
         MapType mt = (MapType)c.type;
         writer(c).writeCell(c, false, bb(value, mt.getValuesType()), defaultLiveness, CellPath.create(bb(key, mt.getKeysType())));
         return this;
@@ -297,13 +316,23 @@ public class RowUpdateBuilder
     public RowUpdateBuilder addListEntry(String columnName, Object value)
     {
         ColumnDefinition c = getDefinition(columnName);
-        assert c.isStatic() || hasSetClustering : "Cannot set non static column " + c + " since no clustering hasn't been provided";
-        assert c.type instanceof ListType;
+        assert c.isStatic() || hasSetClustering : "Cannot set non static column " + c + " since no clustering has been provided";
+        assert c.type instanceof ListType && c.type.isMultiCell();
         ListType lt = (ListType)c.type;
         writer(c).writeCell(c, false, bb(value, lt.getElementsType()), defaultLiveness, CellPath.create(ByteBuffer.wrap(UUIDGen.getTimeUUIDBytes())));
         return this;
     }
 
+    public RowUpdateBuilder addSetEntry(String columnName, Object value)
+    {
+        ColumnDefinition c = getDefinition(columnName);
+        assert c.isStatic() || hasSetClustering : "Cannot set non static column " + c + " since no clustering has been provided";
+        assert c.type instanceof SetType && c.type.isMultiCell();
+        SetType st = (SetType)c.type;
+        writer(c).writeCell(c, false, ByteBufferUtil.EMPTY_BYTE_BUFFER, defaultLiveness, CellPath.create(bb(value, st.getElementsType())));
+        return this;
+    }
+
     private ColumnDefinition getDefinition(String name)
     {
         return update.metadata().getColumnDefinition(new ColumnIdentifier(name, true));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc852381/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java b/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java
index e8f8222..159396b 100644
--- a/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java
+++ b/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java
@@ -484,7 +484,7 @@ public final class LegacySchemaMigrator
     private static TriggerMetadata createTriggerFromTriggerRow(UntypedResultSet.Row row)
     {
         String name = row.getString("trigger_name");
-        String classOption = row.getMap("trigger_options", UTF8Type.instance, UTF8Type.instance).get("class");
+        String classOption = row.getTextMap("trigger_options").get("class");
         return new TriggerMetadata(name, classOption);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc852381/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
index 90755fb..739d8a3 100644
--- a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
+++ b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
@@ -37,8 +37,10 @@ import org.apache.cassandra.cql3.ColumnIdentifier;
 import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.cql3.UntypedResultSet;
 import org.apache.cassandra.cql3.functions.*;
+import org.apache.cassandra.cql3.statements.CFPropDefs;
 import org.apache.cassandra.db.ClusteringComparator;
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.compaction.AbstractCompactionStrategy;
 import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.db.rows.*;
@@ -50,6 +52,8 @@ import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.concurrent.OpOrder;
 
+import static java.util.stream.Collectors.toSet;
+
 import static org.apache.cassandra.cql3.QueryProcessor.executeOnceInternal;
 import static org.apache.cassandra.utils.FBUtilities.fromJsonMap;
 import static org.apache.cassandra.utils.FBUtilities.json;
@@ -93,31 +97,26 @@ public final class SchemaKeyspace
                 "CREATE TABLE %s ("
                 + "keyspace_name text,"
                 + "table_name text,"
+                + "id uuid,"
                 + "bloom_filter_fp_chance double,"
-                + "caching text,"
-                + "cf_id uuid," // post-2.1 UUID cfid
+                + "caching map<text, text>,"
                 + "comment text,"
-                + "compaction_strategy_class text,"
-                + "compaction_strategy_options text,"
-                + "comparator text,"
-                + "compression_parameters text,"
+                + "compaction map<text, text>,"
+                + "compression map<text, text>,"
+                + "dclocal_read_repair_chance double,"
                 + "default_time_to_live int,"
-                + "default_validator text,"
-                + "dropped_columns map<text, bigint>,"
-                + "dropped_columns_types map<text, text>,"
+                + "flags set<text>," // SUPER, COUNTER, DENSE, COMPOUND
                 + "gc_grace_seconds int,"
-                + "is_dense boolean,"
-                + "key_validator text,"
-                + "local_read_repair_chance double,"
-                + "max_compaction_threshold int,"
                 + "max_index_interval int,"
                 + "memtable_flush_period_in_ms int,"
-                + "min_compaction_threshold int,"
                 + "min_index_interval int,"
                 + "read_repair_chance double,"
                 + "speculative_retry text,"
-                + "subcomparator text,"
-                + "type text,"
+
+                // TODO: move into a separate table
+                + "dropped_columns map<text, bigint>,"
+                + "dropped_columns_types map<text, text>,"
+
                 + "PRIMARY KEY ((keyspace_name), table_name))");
 
     private static final CFMetaData Columns =
@@ -127,6 +126,7 @@ public final class SchemaKeyspace
                 + "keyspace_name text,"
                 + "table_name text,"
                 + "column_name text,"
+                + "column_name_bytes blob,"
                 + "component_index int,"
                 + "index_name text,"
                 + "index_options text,"
@@ -416,8 +416,7 @@ public final class SchemaKeyspace
 
     private static boolean isSystemKeyspaceSchemaPartition(DecoratedKey partitionKey)
     {
-        return getSchemaKSKey(SystemKeyspace.NAME).equals(partitionKey.getKey()) ||
-               getSchemaKSKey(NAME).equals(partitionKey.getKey());
+        return Schema.isSystemKeyspace(UTF8Type.instance.compose(partitionKey.getKey()));
     }
 
     /**
@@ -659,14 +658,9 @@ public final class SchemaKeyspace
     public static Mutation makeCreateKeyspaceMutation(String name, KeyspaceParams params, long timestamp)
     {
         RowUpdateBuilder adder = new RowUpdateBuilder(Keyspaces, timestamp, name).clustering();
-
-        adder.add("durable_writes", params.durableWrites);
-
-        adder.resetCollection("replication");
-        for (Map.Entry<String, String> option : params.replication.asMap().entrySet())
-            adder.addMapEntry("replication", option.getKey(), option.getValue());
-
-        return adder.build();
+        return adder.add(KeyspaceParams.Option.DURABLE_WRITES.toString(), params.durableWrites)
+                    .map(KeyspaceParams.Option.REPLICATION.toString(), params.replication.asMap())
+                    .build();
     }
 
     public static Mutation makeCreateKeyspaceMutation(KeyspaceMetadata keyspace, long timestamp)
@@ -720,10 +714,8 @@ public final class SchemaKeyspace
         String query = String.format("SELECT * FROM %s.%s", NAME, KEYSPACES);
         UntypedResultSet.Row row = QueryProcessor.resultify(query, partition).one();
 
-        boolean durableWrites = row.getBoolean("durable_writes");
-        Map<String, String> replication= row.getMap("replication", UTF8Type.instance, UTF8Type.instance);
-
-        return KeyspaceParams.create(durableWrites, replication);
+        return KeyspaceParams.create(row.getBoolean(KeyspaceParams.Option.DURABLE_WRITES.toString()),
+                                     row.getTextMap(KeyspaceParams.Option.REPLICATION.toString()));
     }
 
     /*
@@ -802,44 +794,23 @@ public final class SchemaKeyspace
 
     static void addTableToSchemaMutation(CFMetaData table, long timestamp, boolean withColumnsAndTriggers, Mutation mutation)
     {
-        // For property that can be null (and can be changed), we insert tombstones, to make sure
-        // we don't keep a property the user has removed
-        RowUpdateBuilder adder = new RowUpdateBuilder(Tables, timestamp, mutation)
-                                 .clustering(table.cfName);
-
-        adder.add("cf_id", table.cfId)
-             .add("type", table.isSuper() ? "Super" : "Standard");
-
-        if (table.isSuper())
-        {
-            // We need to continue saving the comparator and subcomparator separatly, otherwise
-            // we won't know at deserialization if the subcomparator should be taken into account
-            // TODO: we should implement an on-start migration if we want to get rid of that.
-            adder.add("comparator", table.comparator.subtype(0).toString())
-                 .add("subcomparator", ((MapType)table.compactValueColumn().type).getKeysType().toString());
-        }
-        else
-        {
-            adder.add("comparator", LegacyLayout.makeLegacyComparator(table).toString());
-        }
+        RowUpdateBuilder adder = new RowUpdateBuilder(Tables, timestamp, mutation).clustering(table.cfName);
 
         adder.add("bloom_filter_fp_chance", table.getBloomFilterFpChance())
-             .add("caching", table.getCaching().toString())
              .add("comment", table.getComment())
-             .add("compaction_strategy_class", table.compactionStrategyClass.getName())
-             .add("compaction_strategy_options", json(table.compactionStrategyOptions))
-             .add("compression_parameters", json(table.compressionParameters.asMap()))
+             .add("dclocal_read_repair_chance", table.getDcLocalReadRepairChance())
              .add("default_time_to_live", table.getDefaultTimeToLive())
              .add("gc_grace_seconds", table.getGcGraceSeconds())
-             .add("key_validator", table.getKeyValidator().toString())
-             .add("local_read_repair_chance", table.getDcLocalReadRepairChance())
-             .add("max_compaction_threshold", table.getMaxCompactionThreshold())
+             .add("id", table.cfId)
              .add("max_index_interval", table.getMaxIndexInterval())
              .add("memtable_flush_period_in_ms", table.getMemtableFlushPeriod())
-             .add("min_compaction_threshold", table.getMinCompactionThreshold())
              .add("min_index_interval", table.getMinIndexInterval())
              .add("read_repair_chance", table.getReadRepairChance())
-             .add("speculative_retry", table.getSpeculativeRetry().toString());
+             .add("speculative_retry", table.getSpeculativeRetry().toString())
+             .map("caching", table.getCaching().asMap())
+             .map("compaction", buildCompactionMap(table))
+             .map("compression", table.compressionParameters().asMap())
+             .set("flags", flagsToStrings(table.flags()));
 
         for (Map.Entry<ByteBuffer, CFMetaData.DroppedColumn> entry : table.getDroppedColumns().entrySet())
         {
@@ -850,10 +821,6 @@ public final class SchemaKeyspace
                 adder.addMapEntry("dropped_columns_types", name, column.type.toString());
         }
 
-        adder.add("is_dense", table.isDense());
-
-        adder.add("default_validator", table.makeLegacyDefaultValidator().toString());
-
         if (withColumnsAndTriggers)
         {
             for (ColumnDefinition column : table.allColumns())
@@ -866,6 +833,40 @@ public final class SchemaKeyspace
         adder.build();
     }
 
+    /*
+     * The method is needed - temporarily - to migrate max_compaction_threshold and min_compaction_threshold
+     * to the compaction map, where they belong.
+     *
+     * We must use reflection to validate the options because not every compaction strategy respects and supports
+     * the threshold params (LCS doesn't, STCS and DTCS don't).
+     */
+    @SuppressWarnings("unchecked")
+    private static Map<String, String> buildCompactionMap(CFMetaData cfm)
+    {
+        Map<String, String> options = new HashMap<>(cfm.compactionStrategyOptions);
+
+        Map<String, String> optionsWithThresholds = new HashMap<>(options);
+        options.putIfAbsent(CFPropDefs.KW_MINCOMPACTIONTHRESHOLD, Integer.toString(cfm.getMinCompactionThreshold()));
+        options.putIfAbsent(CFPropDefs.KW_MAXCOMPACTIONTHRESHOLD, Integer.toString(cfm.getMaxCompactionThreshold()));
+
+        try
+        {
+            Map<String, String> unrecognizedOptions = (Map<String, String>) cfm.compactionStrategyClass
+                                                                               .getMethod("validateOptions", Map.class)
+                                                                               .invoke(null, optionsWithThresholds);
+            if (unrecognizedOptions.isEmpty())
+                options = optionsWithThresholds;
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException(e);
+        }
+
+        options.put("class", cfm.compactionStrategyClass.getName());
+
+        return options;
+    }
+
     public static Mutation makeUpdateTableMutation(KeyspaceMetadata keyspace,
                                                    CFMetaData oldTable,
                                                    CFMetaData newTable,
@@ -960,16 +961,10 @@ public final class SchemaKeyspace
         return tables.build();
     }
 
-    public static CFMetaData createTableFromTablePartitionAndColumnsPartition(RowIterator serializedTable, RowIterator serializedColumns)
-    {
-        String query = String.format("SELECT * FROM %s.%s", NAME, TABLES);
-        return createTableFromTableRowAndColumnsPartition(QueryProcessor.resultify(query, serializedTable).one(), serializedColumns);
-    }
-
-    private static CFMetaData createTableFromTableRowAndColumnsPartition(UntypedResultSet.Row tableRow, RowIterator serializedColumns)
+    private static List<ColumnDefinition> createColumnsFromColumnsPartition(RowIterator serializedColumns)
     {
         String query = String.format("SELECT * FROM %s.%s", NAME, COLUMNS);
-        return createTableFromTableRowAndColumnRows(tableRow, QueryProcessor.resultify(query, serializedColumns));
+        return createColumnsFromColumnRows(QueryProcessor.resultify(query, serializedColumns));
     }
 
     private static CFMetaData createTableFromTablePartition(RowIterator partition)
@@ -978,94 +973,106 @@ public final class SchemaKeyspace
         return createTableFromTableRow(QueryProcessor.resultify(query, partition).one());
     }
 
+    public static CFMetaData createTableFromTablePartitionAndColumnsPartition(RowIterator tablePartition,
+                                                                              RowIterator columnsPartition)
+    {
+        List<ColumnDefinition> columns = createColumnsFromColumnsPartition(columnsPartition);
+        String query = String.format("SELECT * FROM %s.%s", NAME, TABLES);
+        return createTableFromTableRowAndColumns(QueryProcessor.resultify(query, tablePartition).one(), columns);
+    }
+
     /**
      * Deserialize table metadata from low-level representation
      *
      * @return Metadata deserialized from schema
      */
-    private static CFMetaData createTableFromTableRow(UntypedResultSet.Row result)
+    private static CFMetaData createTableFromTableRow(UntypedResultSet.Row row)
     {
-        String ksName = result.getString("keyspace_name");
-        String cfName = result.getString("table_name");
+        String keyspace = row.getString("keyspace_name");
+        String table = row.getString("table_name");
 
-        CFMetaData cfm = readSchemaPartitionForTableAndApply(COLUMNS, ksName, cfName, partition -> createTableFromTableRowAndColumnsPartition(result, partition));
+        List<ColumnDefinition> columns =
+            readSchemaPartitionForTableAndApply(COLUMNS, keyspace, table, SchemaKeyspace::createColumnsFromColumnsPartition);
 
-        readSchemaPartitionForTableAndApply(TRIGGERS, ksName, cfName, partition -> cfm.triggers(createTriggersFromTriggersPartition(partition)));
+        Triggers triggers =
+            readSchemaPartitionForTableAndApply(TRIGGERS, keyspace, table, SchemaKeyspace::createTriggersFromTriggersPartition);
 
-        return cfm;
+        return createTableFromTableRowAndColumns(row, columns).triggers(triggers);
     }
 
-    public static CFMetaData createTableFromTableRowAndColumnRows(UntypedResultSet.Row result,
-                                                                  UntypedResultSet serializedColumnDefinitions)
+    public static CFMetaData createTableFromTableRowAndColumns(UntypedResultSet.Row row, List<ColumnDefinition> columns)
     {
-        String ksName = result.getString("keyspace_name");
-        String cfName = result.getString("table_name");
-
-        AbstractType<?> rawComparator = TypeParser.parse(result.getString("comparator"));
-        AbstractType<?> subComparator = result.has("subcomparator") ? TypeParser.parse(result.getString("subcomparator")) : null;
-
-        boolean isSuper = "super".equals(result.getString("type").toLowerCase());
-        boolean isDense = result.getBoolean("is_dense");
-        boolean isCompound = rawComparator instanceof CompositeType;
-
-        // We don't really use the default validator but as we have it for backward compatibility, we use it to know if it's a counter table
-        AbstractType<?> defaultValidator = TypeParser.parse(result.getString("default_validator"));
-        boolean isCounter = defaultValidator instanceof CounterColumnType;
-
-        UUID cfId = result.getUUID("cf_id");
-
-        boolean isCQLTable = !isSuper && !isDense && isCompound;
-
-        List<ColumnDefinition> columnDefs = createColumnsFromColumnRows(serializedColumnDefinitions,
-                                                                        ksName,
-                                                                        cfName,
-                                                                        rawComparator,
-                                                                        subComparator,
-                                                                        isSuper,
-                                                                        isCQLTable);
-
-        CFMetaData cfm = CFMetaData.create(ksName, cfName, cfId, isDense, isCompound, isSuper, isCounter, columnDefs);
-
-        cfm.readRepairChance(result.getDouble("read_repair_chance"));
-        cfm.dcLocalReadRepairChance(result.getDouble("local_read_repair_chance"));
-        cfm.gcGraceSeconds(result.getInt("gc_grace_seconds"));
-        cfm.minCompactionThreshold(result.getInt("min_compaction_threshold"));
-        cfm.maxCompactionThreshold(result.getInt("max_compaction_threshold"));
-        if (result.has("comment"))
-            cfm.comment(result.getString("comment"));
-        if (result.has("memtable_flush_period_in_ms"))
-            cfm.memtableFlushPeriod(result.getInt("memtable_flush_period_in_ms"));
-        cfm.caching(CachingOptions.fromString(result.getString("caching")));
-        if (result.has("default_time_to_live"))
-            cfm.defaultTimeToLive(result.getInt("default_time_to_live"));
-        if (result.has("speculative_retry"))
-            cfm.speculativeRetry(CFMetaData.SpeculativeRetry.fromString(result.getString("speculative_retry")));
-        cfm.compactionStrategyClass(CFMetaData.createCompactionStrategy(result.getString("compaction_strategy_class")));
-        cfm.compressionParameters(CompressionParameters.fromMap(fromJsonMap(result.getString("compression_parameters"))));
-        cfm.compactionStrategyOptions(fromJsonMap(result.getString("compaction_strategy_options")));
-
-        if (result.has("min_index_interval"))
-            cfm.minIndexInterval(result.getInt("min_index_interval"));
-
-        if (result.has("max_index_interval"))
-            cfm.maxIndexInterval(result.getInt("max_index_interval"));
-
-        if (result.has("bloom_filter_fp_chance"))
-            cfm.bloomFilterFpChance(result.getDouble("bloom_filter_fp_chance"));
-        else
-            cfm.bloomFilterFpChance(cfm.getBloomFilterFpChance());
-
-        if (result.has("dropped_columns"))
+        String keyspace = row.getString("keyspace_name");
+        String table = row.getString("table_name");
+        UUID id = row.getUUID("id");
+
+        Set<CFMetaData.Flag> flags = row.has("flags")
+                                   ? flagsFromStrings(row.getSet("flags", UTF8Type.instance))
+                                   : Collections.emptySet();
+
+        boolean isSuper = flags.contains(CFMetaData.Flag.SUPER);
+        boolean isCounter = flags.contains(CFMetaData.Flag.COUNTER);
+        boolean isDense = flags.contains(CFMetaData.Flag.DENSE);
+        boolean isCompound = flags.contains(CFMetaData.Flag.COMPOUND);
+
+        CFMetaData cfm = CFMetaData.create(keyspace, table, id, isDense, isCompound, isSuper, isCounter, columns);
+
+        Map<String, String> compaction = new HashMap<>(row.getTextMap("compaction"));
+        Class<? extends AbstractCompactionStrategy> compactionStrategyClass =
+            CFMetaData.createCompactionStrategy(compaction.remove("class"));
+
+        int minCompactionThreshold = compaction.containsKey(CFPropDefs.KW_MINCOMPACTIONTHRESHOLD)
+                                   ? Integer.parseInt(compaction.get(CFPropDefs.KW_MINCOMPACTIONTHRESHOLD))
+                                   : CFMetaData.DEFAULT_MIN_COMPACTION_THRESHOLD;
+
+        int maxCompactionThreshold = compaction.containsKey(CFPropDefs.KW_MAXCOMPACTIONTHRESHOLD)
+                                   ? Integer.parseInt(compaction.get(CFPropDefs.KW_MAXCOMPACTIONTHRESHOLD))
+                                   : CFMetaData.DEFAULT_MAX_COMPACTION_THRESHOLD;
+
+        cfm.bloomFilterFpChance(row.getDouble("bloom_filter_fp_chance"))
+           .caching(CachingOptions.fromMap(row.getTextMap("caching")))
+           .comment(row.getString("comment"))
+           .compactionStrategyClass(compactionStrategyClass)
+           .compactionStrategyOptions(compaction)
+           .compressionParameters(CompressionParameters.fromMap(row.getTextMap("compression")))
+           .dcLocalReadRepairChance(row.getDouble("dclocal_read_repair_chance"))
+           .defaultTimeToLive(row.getInt("default_time_to_live"))
+           .gcGraceSeconds(row.getInt("gc_grace_seconds"))
+           .maxCompactionThreshold(maxCompactionThreshold)
+           .maxIndexInterval(row.getInt("max_index_interval"))
+           .memtableFlushPeriod(row.getInt("memtable_flush_period_in_ms"))
+           .minCompactionThreshold(minCompactionThreshold)
+           .minIndexInterval(row.getInt("min_index_interval"))
+           .readRepairChance(row.getDouble("read_repair_chance"))
+           .speculativeRetry(CFMetaData.SpeculativeRetry.fromString(row.getString("speculative_retry")));
+
+        if (row.has("dropped_columns"))
         {
-            Map<String, String> types = result.has("dropped_columns_types")
-                                      ? result.getMap("dropped_columns_types", UTF8Type.instance, UTF8Type.instance) 
+            Map<String, String> types = row.has("dropped_columns_types")
+                                      ? row.getTextMap("dropped_columns_types")
                                       : Collections.<String, String>emptyMap();
-            addDroppedColumns(cfm, result.getMap("dropped_columns", UTF8Type.instance, LongType.instance), types);
+            addDroppedColumns(cfm, row.getMap("dropped_columns", UTF8Type.instance, LongType.instance), types);
         }
 
         return cfm;
     }
 
+    public static Set<CFMetaData.Flag> flagsFromStrings(Set<String> strings)
+    {
+        return strings.stream()
+                      .map(String::toUpperCase)
+                      .map(CFMetaData.Flag::valueOf)
+                      .collect(toSet());
+    }
+
+    private static Set<String> flagsToStrings(Set<CFMetaData.Flag> flags)
+    {
+        return flags.stream()
+                    .map(CFMetaData.Flag::toString)
+                    .map(String::toLowerCase)
+                    .collect(toSet());
+    }
+
     private static void addDroppedColumns(CFMetaData cfm, Map<String, Long> droppedTimes, Map<String, String> types)
     {
         for (Map.Entry<String, Long> entry : droppedTimes.entrySet())
@@ -1083,11 +1090,11 @@ public final class SchemaKeyspace
 
     private static void addColumnToSchemaMutation(CFMetaData table, ColumnDefinition column, long timestamp, Mutation mutation)
     {
-        RowUpdateBuilder adder = new RowUpdateBuilder(Columns, timestamp, mutation)
-                                 .clustering(table.cfName, column.name.toString());
+        RowUpdateBuilder adder = new RowUpdateBuilder(Columns, timestamp, mutation).clustering(table.cfName, column.name.toString());
 
-        adder.add("validator", column.type.toString())
-             .add("type", serializeKind(column.kind, table.isDense()))
+        adder.add("column_name_bytes", column.name.bytes)
+             .add("validator", column.type.toString())
+             .add("type", column.kind.toString().toLowerCase())
              .add("component_index", column.isOnAllComponents() ? null : column.position())
              .add("index_name", column.getIndexName())
              .add("index_type", column.getIndexType() == null ? null : column.getIndexType().toString())
@@ -1095,68 +1102,32 @@ public final class SchemaKeyspace
              .build();
     }
 
-    private static String serializeKind(ColumnDefinition.Kind kind, boolean isDense)
-    {
-        // For backward compatibility, we special case CLUSTERING and the case where the table is dense.
-        if (kind == ColumnDefinition.Kind.CLUSTERING)
-            return "clustering_key";
-
-        if (kind == ColumnDefinition.Kind.REGULAR && isDense)
-            return "compact_value";
-
-        return kind.toString().toLowerCase();
-    }
-
-    public static ColumnDefinition.Kind deserializeKind(String kind)
-    {
-        if ("clustering_key".equalsIgnoreCase(kind))
-            return ColumnDefinition.Kind.CLUSTERING;
-        if ("compact_value".equalsIgnoreCase(kind))
-            return ColumnDefinition.Kind.REGULAR;
-        return Enum.valueOf(ColumnDefinition.Kind.class, kind.toUpperCase());
-    }
-
     private static void dropColumnFromSchemaMutation(CFMetaData table, ColumnDefinition column, long timestamp, Mutation mutation)
     {
         // Note: we do want to use name.toString(), not name.bytes directly for backward compatibility (For CQL3, this won't make a difference).
         RowUpdateBuilder.deleteRow(Columns, timestamp, mutation, table.cfName, column.name.toString());
     }
 
-    private static List<ColumnDefinition> createColumnsFromColumnRows(UntypedResultSet rows,
-                                                                      String keyspace,
-                                                                      String table,
-                                                                      AbstractType<?> rawComparator,
-                                                                      AbstractType<?> rawSubComparator,
-                                                                      boolean isSuper,
-                                                                      boolean isCQLTable)
-    {
-        List<ColumnDefinition> columns = new ArrayList<>();
-        for (UntypedResultSet.Row row : rows)
-            columns.add(createColumnFromColumnRow(row, keyspace, table, rawComparator, rawSubComparator, isSuper, isCQLTable));
+    private static List<ColumnDefinition> createColumnsFromColumnRows(UntypedResultSet rows)
+{
+        List<ColumnDefinition> columns = new ArrayList<>(rows.size());
+        rows.forEach(row -> columns.add(createColumnFromColumnRow(row)));
         return columns;
     }
 
-    private static ColumnDefinition createColumnFromColumnRow(UntypedResultSet.Row row,
-                                                              String keyspace,
-                                                              String table,
-                                                              AbstractType<?> rawComparator,
-                                                              AbstractType<?> rawSubComparator,
-                                                              boolean isSuper,
-                                                              boolean isCQLTable)
+    private static ColumnDefinition createColumnFromColumnRow(UntypedResultSet.Row row)
     {
-        ColumnDefinition.Kind kind = deserializeKind(row.getString("type"));
+        String keyspace = row.getString("keyspace_name");
+        String table = row.getString("table_name");
+
+        ColumnIdentifier name = ColumnIdentifier.getInterned(row.getBytes("column_name_bytes"), row.getString("column_name"));
+
+        ColumnDefinition.Kind kind = ColumnDefinition.Kind.valueOf(row.getString("type").toUpperCase());
 
         Integer componentIndex = null;
         if (row.has("component_index"))
             componentIndex = row.getInt("component_index");
 
-        // Note: we save the column name as string, but we should not assume that it is an UTF8 name, we
-        // we need to use the comparator fromString method
-        AbstractType<?> comparator = isCQLTable
-                                   ? UTF8Type.instance
-                                   : CompactTables.columnDefinitionComparator(kind, isSuper, rawComparator, rawSubComparator);
-        ColumnIdentifier name = ColumnIdentifier.getInterned(comparator.fromString(row.getString("column_name")), comparator);
-
         AbstractType<?> validator = parseType(row.getString("validator"));
 
         IndexType indexType = null;
@@ -1208,7 +1179,7 @@ public final class SchemaKeyspace
     private static TriggerMetadata createTriggerFromTriggerRow(UntypedResultSet.Row row)
     {
         String name = row.getString("trigger_name");
-        String classOption = row.getMap("trigger_options", UTF8Type.instance, UTF8Type.instance).get("class");
+        String classOption = row.getTextMap("trigger_options").get("class");
         return new TriggerMetadata(name, classOption);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc852381/src/java/org/apache/cassandra/schema/SchemaKeyspace.java.rej
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java.rej b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java.rej
new file mode 100644
index 0000000..460fc3a
--- /dev/null
+++ b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java.rej
@@ -0,0 +1,80 @@
+diff a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java	(rejected hunks)
+@@ -1095,68 +1095,32 @@ public final class SchemaKeyspace
+              .build();
+     }
+ 
+-    private static String serializeKind(ColumnDefinition.Kind kind, boolean isDense)
+-    {
+-        // For backward compatibility, we special case CLUSTERING_COLUMN and the case where the table is dense.
+-        if (kind == ColumnDefinition.Kind.CLUSTERING_COLUMN)
+-            return "clustering_key";
+-
+-        if (kind == ColumnDefinition.Kind.REGULAR && isDense)
+-            return "compact_value";
+-
+-        return kind.toString().toLowerCase();
+-    }
+-
+-    public static ColumnDefinition.Kind deserializeKind(String kind)
+-    {
+-        if ("clustering_key".equalsIgnoreCase(kind))
+-            return ColumnDefinition.Kind.CLUSTERING_COLUMN;
+-        if ("compact_value".equalsIgnoreCase(kind))
+-            return ColumnDefinition.Kind.REGULAR;
+-        return Enum.valueOf(ColumnDefinition.Kind.class, kind.toUpperCase());
+-    }
+-
+     private static void dropColumnFromSchemaMutation(CFMetaData table, ColumnDefinition column, long timestamp, Mutation mutation)
+     {
+         // Note: we do want to use name.toString(), not name.bytes directly for backward compatibility (For CQL3, this won't make a difference).
+         RowUpdateBuilder.deleteRow(Columns, timestamp, mutation, table.cfName, column.name.toString());
+     }
+ 
+-    private static List<ColumnDefinition> createColumnsFromColumnRows(UntypedResultSet rows,
+-                                                                      String keyspace,
+-                                                                      String table,
+-                                                                      AbstractType<?> rawComparator,
+-                                                                      AbstractType<?> rawSubComparator,
+-                                                                      boolean isSuper,
+-                                                                      boolean isCQLTable)
++    private static List<ColumnDefinition> createColumnsFromColumnRows(UntypedResultSet rows)
+     {
+-        List<ColumnDefinition> columns = new ArrayList<>();
+-        for (UntypedResultSet.Row row : rows)
+-            columns.add(createColumnFromColumnRow(row, keyspace, table, rawComparator, rawSubComparator, isSuper, isCQLTable));
++        List<ColumnDefinition> columns = new ArrayList<>(rows.size());
++        rows.forEach(row -> columns.add(createColumnFromColumnRow(row)));
+         return columns;
+     }
+ 
+-    private static ColumnDefinition createColumnFromColumnRow(UntypedResultSet.Row row,
+-                                                              String keyspace,
+-                                                              String table,
+-                                                              AbstractType<?> rawComparator,
+-                                                              AbstractType<?> rawSubComparator,
+-                                                              boolean isSuper,
+-                                                              boolean isCQLTable)
++    private static ColumnDefinition createColumnFromColumnRow(UntypedResultSet.Row row)
+     {
+-        ColumnDefinition.Kind kind = deserializeKind(row.getString("type"));
++        String keyspace = row.getString("keyspace_name");
++        String table = row.getString("table_name");
++
++        ColumnIdentifier name = ColumnIdentifier.getInterned(row.getBytes("column_name_bytes"), row.getString("column_name"));
++
++        ColumnDefinition.Kind kind = ColumnDefinition.Kind.valueOf(row.getString("type").toUpperCase());
+ 
+         Integer componentIndex = null;
+         if (row.has("component_index"))
+             componentIndex = row.getInt("component_index");
+ 
+-        // Note: we save the column name as string, but we should not assume that it is an UTF8 name, we
+-        // we need to use the comparator fromString method
+-        AbstractType<?> comparator = isCQLTable
+-                                   ? UTF8Type.instance
+-                                   : CompactTables.columnDefinitionComparator(kind, isSuper, rawComparator, rawSubComparator);
+-        ColumnIdentifier name = ColumnIdentifier.getInterned(comparator.fromString(row.getString("column_name")), comparator);
+-
+         AbstractType<?> validator = parseType(row.getString("validator"));
+ 
+         IndexType indexType = null;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc852381/src/java/org/apache/cassandra/thrift/ThriftConversion.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/ThriftConversion.java b/src/java/org/apache/cassandra/thrift/ThriftConversion.java
index 0afc778..d99217d 100644
--- a/src/java/org/apache/cassandra/thrift/ThriftConversion.java
+++ b/src/java/org/apache/cassandra/thrift/ThriftConversion.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.thrift;
 
-import java.nio.ByteBuffer;
 import java.util.*;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -32,7 +31,6 @@ import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.cql3.ColumnIdentifier;
 import org.apache.cassandra.cql3.Operator;
-import org.apache.cassandra.cql3.UntypedResultSet;
 import org.apache.cassandra.db.CompactTables;
 import org.apache.cassandra.db.LegacyLayout;
 import org.apache.cassandra.db.WriteType;
@@ -42,20 +40,13 @@ import org.apache.cassandra.exceptions.*;
 import org.apache.cassandra.io.compress.CompressionParameters;
 import org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.apache.cassandra.locator.LocalStrategy;
-import org.apache.cassandra.schema.KeyspaceMetadata;
-import org.apache.cassandra.schema.KeyspaceParams;
-import org.apache.cassandra.schema.SchemaKeyspace;
-import org.apache.cassandra.schema.Tables;
-import org.apache.cassandra.schema.TriggerMetadata;
-import org.apache.cassandra.schema.Triggers;
+import org.apache.cassandra.schema.*;
 import org.apache.cassandra.serializers.MarshalException;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.UUIDGen;
 
 /**
  * Static utility methods to convert internal structure to and from thrift ones.
- *
- * 
  */
 public class ThriftConversion
 {
@@ -270,7 +261,7 @@ public class ThriftConversion
             if (cfId == null)
                 cfId = UUIDGen.getTimeUUID();
 
-            boolean isCompound = isSuper ? false : (rawComparator instanceof CompositeType);
+            boolean isCompound = !isSuper && (rawComparator instanceof CompositeType);
             boolean isCounter = defaultValidator instanceof CounterColumnType;
 
             // If it's a thrift table creation, adds the default CQL metadata for the new table
@@ -374,7 +365,8 @@ public class ThriftConversion
         }
     }
 
-    /** applies implicit defaults to cf definition. useful in updates */
+    /* applies implicit defaults to cf definition. useful in updates */
+    @SuppressWarnings("deprecation")
     private static void applyImplicitDefaults(org.apache.cassandra.thrift.CfDef cf_def)
     {
         if (!cf_def.isSetComment())
@@ -410,32 +402,6 @@ public class ThriftConversion
         }
     }
 
-    /**
-     * Create CFMetaData from thrift {@link CqlRow} that contains columns from schema_columnfamilies.
-     *
-     * @param columnsRes CqlRow containing columns from schema_columnfamilies.
-     * @return CFMetaData derived from CqlRow
-     */
-    public static CFMetaData fromThriftCqlRow(CqlRow cf, CqlResult columnsRes)
-    {
-        UntypedResultSet.Row cfRow = new UntypedResultSet.Row(convertThriftCqlRow(cf));
-
-        List<Map<String, ByteBuffer>> cols = new ArrayList<>(columnsRes.rows.size());
-        for (CqlRow row : columnsRes.rows)
-            cols.add(convertThriftCqlRow(row));
-        UntypedResultSet colsRows = UntypedResultSet.create(cols);
-
-        return SchemaKeyspace.createTableFromTableRowAndColumnRows(cfRow, colsRows);
-    }
-
-    private static Map<String, ByteBuffer> convertThriftCqlRow(CqlRow row)
-    {
-        Map<String, ByteBuffer> m = new HashMap<>();
-        for (org.apache.cassandra.thrift.Column column : row.getColumns())
-            m.put(UTF8Type.instance.getString(column.bufferForName()), column.value);
-        return m;
-    }
-
     public static CfDef toThrift(CFMetaData cfm)
     {
         CfDef def = new CfDef(cfm.ksName, cfm.cfName);
@@ -574,6 +540,7 @@ public class ThriftConversion
         return thriftDefs;
     }
 
+    @SuppressWarnings("deprecation")
     public static Map<String, String> compressionParametersToThrift(CompressionParameters parameters)
     {
         if (!parameters.isEnabled())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc852381/src/java/org/apache/cassandra/utils/NativeSSTableLoaderClient.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/NativeSSTableLoaderClient.java b/src/java/org/apache/cassandra/utils/NativeSSTableLoaderClient.java
index fa415ae..d9c268b 100644
--- a/src/java/org/apache/cassandra/utils/NativeSSTableLoaderClient.java
+++ b/src/java/org/apache/cassandra/utils/NativeSSTableLoaderClient.java
@@ -25,7 +25,6 @@ import com.datastax.driver.core.*;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.cql3.ColumnIdentifier;
-import org.apache.cassandra.db.CompactTables;
 import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.dht.*;
 import org.apache.cassandra.dht.Token;
@@ -95,43 +94,38 @@ public class NativeSSTableLoaderClient extends SSTableLoader.Client
         tables.put(cfm.cfName, cfm);
     }
 
+    /*
+     * The following is a slightly simplified but otherwise duplicated version of
+     * SchemaKeyspace.createTableFromTableRowAndColumnRows().
+     * It might be safer to have a simple wrapper of the driver ResultSet/Row implementing
+     * UntypedResultSet/UntypedResultSet.Row and reuse the original method.
+     */
     private static Map<String, CFMetaData> fetchTablesMetadata(String keyspace, Session session)
     {
         Map<String, CFMetaData> tables = new HashMap<>();
+        String query = String.format("SELECT * FROM %s.%s WHERE keyspace_name = ?", SchemaKeyspace.NAME, SchemaKeyspace.TABLES);
 
-        String query = String.format("SELECT table_name, cf_id, type, comparator, subcomparator, is_dense, default_validator FROM %s.%s WHERE keyspace_name = '%s'",
-                                     SchemaKeyspace.NAME,
-                                     SchemaKeyspace.TABLES,
-                                     keyspace);
-
-
-        // The following is a slightly simplified but otherwise duplicated version of LegacySchemaTables.createTableFromTableRowAndColumnRows. It might
-        // be safer to have a simple wrapper of the driver ResultSet/Row implementing UntypedResultSet/UntypedResultSet.Row and reuse the original method.
-        for (Row row : session.execute(query))
+        for (Row row : session.execute(query, keyspace))
         {
             String name = row.getString("table_name");
-            UUID id = row.getUUID("cf_id");
-            boolean isSuper = row.getString("type").toLowerCase().equals("super");
-            AbstractType rawComparator = TypeParser.parse(row.getString("comparator"));
-            AbstractType subComparator = row.isNull("subcomparator")
-                                       ? null
-                                       : TypeParser.parse(row.getString("subcomparator"));
-            boolean isDense = row.getBool("is_dense");
-            boolean isCompound = rawComparator instanceof CompositeType;
-
-            AbstractType<?> defaultValidator = TypeParser.parse(row.getString("default_validator"));
-            boolean isCounter =  defaultValidator instanceof CounterColumnType;
-            boolean isCQLTable = !isSuper && !isDense && isCompound;
-
-            String columnsQuery = String.format("SELECT column_name, component_index, type, validator FROM %s.%s WHERE keyspace_name='%s' AND table_name='%s'",
+            UUID id = row.getUUID("id");
+
+            Set<CFMetaData.Flag> flags = row.isNull("flags")
+                                       ? Collections.emptySet()
+                                       : SchemaKeyspace.flagsFromStrings(row.getSet("flags", String.class));
+
+            boolean isSuper = flags.contains(CFMetaData.Flag.SUPER);
+            boolean isCounter = flags.contains(CFMetaData.Flag.COUNTER);
+            boolean isDense = flags.contains(CFMetaData.Flag.DENSE);
+            boolean isCompound = flags.contains(CFMetaData.Flag.COMPOUND);
+
+            String columnsQuery = String.format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND table_name = ?",
                                                 SchemaKeyspace.NAME,
-                                                SchemaKeyspace.COLUMNS,
-                                                keyspace,
-                                                name);
+                                                SchemaKeyspace.COLUMNS);
 
             List<ColumnDefinition> defs = new ArrayList<>();
-            for (Row colRow : session.execute(columnsQuery))
-                defs.add(createDefinitionFromRow(colRow, keyspace, name, rawComparator, subComparator, isSuper, isCQLTable));
+            for (Row colRow : session.execute(columnsQuery, keyspace, name))
+                defs.add(createDefinitionFromRow(colRow, keyspace, name));
 
             tables.put(name, CFMetaData.create(keyspace, name, id, isDense, isCompound, isSuper, isCounter, defs));
         }
@@ -139,28 +133,16 @@ public class NativeSSTableLoaderClient extends SSTableLoader.Client
         return tables;
     }
 
-    // A slightly simplified version of LegacySchemaTables.
-    private static ColumnDefinition createDefinitionFromRow(Row row,
-                                                            String keyspace,
-                                                            String table,
-                                                            AbstractType<?> rawComparator,
-                                                            AbstractType<?> rawSubComparator,
-                                                            boolean isSuper,
-                                                            boolean isCQLTable)
+    private static ColumnDefinition createDefinitionFromRow(Row row, String keyspace, String table)
     {
-        ColumnDefinition.Kind kind = SchemaKeyspace.deserializeKind(row.getString("type"));
+        ColumnIdentifier name = ColumnIdentifier.getInterned(row.getBytes("column_name_bytes"), row.getString("column_name"));
+
+        ColumnDefinition.Kind kind = ColumnDefinition.Kind.valueOf(row.getString("type").toUpperCase());
 
         Integer componentIndex = null;
         if (!row.isNull("component_index"))
             componentIndex = row.getInt("component_index");
 
-        // Note: we save the column name as string, but we should not assume that it is an UTF8 name, we
-        // we need to use the comparator fromString method
-        AbstractType<?> comparator = isCQLTable
-                                   ? UTF8Type.instance
-                                   : CompactTables.columnDefinitionComparator(kind, isSuper, rawComparator, rawSubComparator);
-        ColumnIdentifier name = ColumnIdentifier.getInterned(comparator.fromString(row.getString("column_name")), comparator);
-
         AbstractType<?> validator = TypeParser.parse(row.getString("validator"));
 
         return new ColumnDefinition(keyspace, table, name, validator, null, null, null, componentIndex, kind);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc852381/test/unit/org/apache/cassandra/UpdateBuilder.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/UpdateBuilder.java b/test/unit/org/apache/cassandra/UpdateBuilder.java
index dc6b859..b2d1d7f 100644
--- a/test/unit/org/apache/cassandra/UpdateBuilder.java
+++ b/test/unit/org/apache/cassandra/UpdateBuilder.java
@@ -79,7 +79,7 @@ public class UpdateBuilder
     public IMutation makeMutation()
     {
         Mutation m = new Mutation(build());
-        return update.metadata().isCounter
+        return update.metadata().isCounter()
              ? new CounterMutation(m, ConsistencyLevel.ONE)
              : m;
     }
@@ -87,7 +87,7 @@ public class UpdateBuilder
     public void apply()
     {
         Mutation m = new Mutation(build());
-        if (update.metadata().isCounter)
+        if (update.metadata().isCounter())
             new CounterMutation(m, ConsistencyLevel.ONE).apply();
         else
             m.apply();
@@ -95,7 +95,7 @@ public class UpdateBuilder
 
     public void applyUnsafe()
     {
-        assert !update.metadata().isCounter : "Counters have currently no applyUnsafe() option";
+        assert !update.metadata().isCounter() : "Counters have currently no applyUnsafe() option";
         new Mutation(build()).applyUnsafe();
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc852381/test/unit/org/apache/cassandra/config/CFMetaDataTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/config/CFMetaDataTest.java b/test/unit/org/apache/cassandra/config/CFMetaDataTest.java
index fdbf132..cf05fe8 100644
--- a/test/unit/org/apache/cassandra/config/CFMetaDataTest.java
+++ b/test/unit/org/apache/cassandra/config/CFMetaDataTest.java
@@ -20,7 +20,6 @@ package org.apache.cassandra.config;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.HashMap;
 import java.util.HashSet;
 
 import org.apache.cassandra.SchemaLoader;
@@ -146,6 +145,7 @@ public class CFMetaDataTest
         Mutation rm = SchemaKeyspace.makeCreateTableMutation(keyspace, cfm, FBUtilities.timestampMicros());
         PartitionUpdate cfU = rm.getPartitionUpdate(Schema.instance.getId(SchemaKeyspace.NAME, SchemaKeyspace.TABLES));
         PartitionUpdate cdU = rm.getPartitionUpdate(Schema.instance.getId(SchemaKeyspace.NAME, SchemaKeyspace.COLUMNS));
+
         CFMetaData newCfm = SchemaKeyspace.createTableFromTablePartitionAndColumnsPartition(
                 UnfilteredRowIterators.filter(cfU.unfilteredIterator(), FBUtilities.nowInSeconds()),
                 UnfilteredRowIterators.filter(cdU.unfilteredIterator(), FBUtilities.nowInSeconds())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc852381/test/unit/org/apache/cassandra/cql3/validation/operations/AlterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/operations/AlterTest.java b/test/unit/org/apache/cassandra/cql3/validation/operations/AlterTest.java
index e12794c..93f1973 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/operations/AlterTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/operations/AlterTest.java
@@ -17,8 +17,6 @@
  */
 package org.apache.cassandra.cql3.validation.operations;
 
-import com.google.common.collect.ImmutableMap;
-
 import org.apache.cassandra.cql3.CQLTester;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
@@ -144,22 +142,18 @@ public class AlterTest extends CQLTester
         execute("ALTER KEYSPACE ks2 WITH durable_writes=true");
 
         assertRows(execute("SELECT keyspace_name, durable_writes, replication FROM system_schema.keyspaces"),
-                   row("ks1", false, ImmutableMap.of("class", "org.apache.cassandra.locator.NetworkTopologyStrategy",
-                                                     "dc1", "1")),
-                   row(KEYSPACE, true, ImmutableMap.of("class", "org.apache.cassandra.locator.SimpleStrategy",
-                                                       "replication_factor", "1")),
-                   row(KEYSPACE_PER_TEST, true, ImmutableMap.of("class", "org.apache.cassandra.locator.SimpleStrategy",
-                                                                "replication_factor", "1")),
-                   row("ks2", true, ImmutableMap.of("class", "org.apache.cassandra.locator.SimpleStrategy",
-                                                    "replication_factor", "1")));
+                   row("ks1", false, map("class", "org.apache.cassandra.locator.NetworkTopologyStrategy", "dc1", "1")),
+                   row(KEYSPACE, true, map("class", "org.apache.cassandra.locator.SimpleStrategy", "replication_factor", "1")),
+                   row(KEYSPACE_PER_TEST, true, map("class", "org.apache.cassandra.locator.SimpleStrategy", "replication_factor", "1")),
+                   row("ks2", true, map("class", "org.apache.cassandra.locator.SimpleStrategy", "replication_factor", "1")));
 
         execute("USE ks1");
 
         assertInvalidThrow(ConfigurationException.class, "CREATE TABLE cf1 (a int PRIMARY KEY, b int) WITH compaction = { 'min_threshold' : 4 }");
 
         execute("CREATE TABLE cf1 (a int PRIMARY KEY, b int) WITH compaction = { 'class' : 'SizeTieredCompactionStrategy', 'min_threshold' : 7 }");
-        assertRows(execute("SELECT table_name, min_compaction_threshold FROM system_schema.tables WHERE keyspace_name='ks1'"),
-                   row("cf1", 7));
+        assertRows(execute("SELECT table_name, compaction FROM system_schema.tables WHERE keyspace_name='ks1'"),
+                   row("cf1", map("class", "org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy", "min_threshold", "7")));
 
         // clean-up
         execute("DROP KEYSPACE ks1");
@@ -217,49 +211,49 @@ public class AlterTest extends CQLTester
     {
         createTable("CREATE TABLE %s (a text, b int, c int, primary key (a, b))");
 
-        assertRows(execute(format("SELECT compression_parameters FROM %s.%s WHERE keyspace_name = ? and table_name = ?;",
+        assertRows(execute(format("SELECT compression FROM %s.%s WHERE keyspace_name = ? and table_name = ?;",
                                   SchemaKeyspace.NAME,
                                   SchemaKeyspace.TABLES),
                            KEYSPACE,
                            currentTable()),
-                   row("{\"chunk_length_in_kb\":\"64\",\"class\":\"org.apache.cassandra.io.compress.LZ4Compressor\"}"));
+                   row(map("chunk_length_in_kb", "64", "class", "org.apache.cassandra.io.compress.LZ4Compressor")));
 
         execute("ALTER TABLE %s WITH compression = { 'class' : 'SnappyCompressor', 'chunk_length_in_kb' : 32 };");
 
-        assertRows(execute(format("SELECT compression_parameters FROM %s.%s WHERE keyspace_name = ? and table_name = ?;",
+        assertRows(execute(format("SELECT compression FROM %s.%s WHERE keyspace_name = ? and table_name = ?;",
                                   SchemaKeyspace.NAME,
                                   SchemaKeyspace.TABLES),
                            KEYSPACE,
                            currentTable()),
-                   row("{\"chunk_length_in_kb\":\"32\",\"class\":\"org.apache.cassandra.io.compress.SnappyCompressor\"}"));
+                   row(map("chunk_length_in_kb", "32", "class", "org.apache.cassandra.io.compress.SnappyCompressor")));
 
         execute("ALTER TABLE %s WITH compression = { 'sstable_compression' : 'LZ4Compressor', 'chunk_length_kb' : 64 };");
 
-        assertRows(execute(format("SELECT compression_parameters FROM %s.%s WHERE keyspace_name = ? and table_name = ?;",
+        assertRows(execute(format("SELECT compression FROM %s.%s WHERE keyspace_name = ? and table_name = ?;",
                                   SchemaKeyspace.NAME,
                                   SchemaKeyspace.TABLES),
                            KEYSPACE,
                            currentTable()),
-                   row("{\"chunk_length_in_kb\":\"64\",\"class\":\"org.apache.cassandra.io.compress.LZ4Compressor\"}"));
+                   row(map("chunk_length_in_kb", "64", "class", "org.apache.cassandra.io.compress.LZ4Compressor")));
 
         execute("ALTER TABLE %s WITH compression = { 'sstable_compression' : '', 'chunk_length_kb' : 32 };");
 
-        assertRows(execute(format("SELECT compression_parameters FROM %s.%s WHERE keyspace_name = ? and table_name = ?;",
+        assertRows(execute(format("SELECT compression FROM %s.%s WHERE keyspace_name = ? and table_name = ?;",
                                   SchemaKeyspace.NAME,
                                   SchemaKeyspace.TABLES),
                            KEYSPACE,
                            currentTable()),
-                   row("{\"enabled\":\"false\"}"));
+                   row(map("enabled", "false")));
 
         execute("ALTER TABLE %s WITH compression = { 'class' : 'SnappyCompressor', 'chunk_length_in_kb' : 32 };");
         execute("ALTER TABLE %s WITH compression = { 'enabled' : 'false'};");
 
-        assertRows(execute(format("SELECT compression_parameters FROM %s.%s WHERE keyspace_name = ? and table_name = ?;",
+        assertRows(execute(format("SELECT compression FROM %s.%s WHERE keyspace_name = ? and table_name = ?;",
                                   SchemaKeyspace.NAME,
                                   SchemaKeyspace.TABLES),
                            KEYSPACE,
                            currentTable()),
-                   row("{\"enabled\":\"false\"}"));
+                   row(map("enabled", "false")));
 
         assertThrowsConfigurationException("Missing sub-option 'class' for the 'compression' option.",
                                            "ALTER TABLE %s WITH  compression = {'chunk_length_in_kb' : 32};");

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc852381/test/unit/org/apache/cassandra/cql3/validation/operations/CreateTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/operations/CreateTest.java b/test/unit/org/apache/cassandra/cql3/validation/operations/CreateTest.java
index 5143480..33a41d8 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/operations/CreateTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/operations/CreateTest.java
@@ -510,62 +510,62 @@ public class CreateTest extends CQLTester
     {
         createTable("CREATE TABLE %s (a text, b int, c int, primary key (a, b))");
 
-        assertRows(execute(format("SELECT compression_parameters FROM %s.%s WHERE keyspace_name = ? and table_name = ?;",
+        assertRows(execute(format("SELECT compression FROM %s.%s WHERE keyspace_name = ? and table_name = ?;",
                                   SchemaKeyspace.NAME,
                                   SchemaKeyspace.TABLES),
                            KEYSPACE,
                            currentTable()),
-                   row("{\"chunk_length_in_kb\":\"64\",\"class\":\"org.apache.cassandra.io.compress.LZ4Compressor\"}"));
+                   row(map("chunk_length_in_kb", "64", "class", "org.apache.cassandra.io.compress.LZ4Compressor")));
 
         createTable("CREATE TABLE %s (a text, b int, c int, primary key (a, b))"
                 + " WITH compression = { 'class' : 'SnappyCompressor', 'chunk_length_in_kb' : 32 };");
 
-        assertRows(execute(format("SELECT compression_parameters FROM %s.%s WHERE keyspace_name = ? and table_name = ?;",
+        assertRows(execute(format("SELECT compression FROM %s.%s WHERE keyspace_name = ? and table_name = ?;",
                                   SchemaKeyspace.NAME,
                                   SchemaKeyspace.TABLES),
                            KEYSPACE,
                            currentTable()),
-                   row("{\"chunk_length_in_kb\":\"32\",\"class\":\"org.apache.cassandra.io.compress.SnappyCompressor\"}"));
+                   row(map("chunk_length_in_kb", "32", "class", "org.apache.cassandra.io.compress.SnappyCompressor")));
 
         createTable("CREATE TABLE %s (a text, b int, c int, primary key (a, b))"
                 + " WITH compression = { 'class' : 'SnappyCompressor', 'chunk_length_in_kb' : 32, 'enabled' : true };");
 
-        assertRows(execute(format("SELECT compression_parameters FROM %s.%s WHERE keyspace_name = ? and table_name = ?;",
+        assertRows(execute(format("SELECT compression FROM %s.%s WHERE keyspace_name = ? and table_name = ?;",
                                   SchemaKeyspace.NAME,
                                   SchemaKeyspace.TABLES),
                            KEYSPACE,
                            currentTable()),
-                   row("{\"chunk_length_in_kb\":\"32\",\"class\":\"org.apache.cassandra.io.compress.SnappyCompressor\"}"));
+                   row(map("chunk_length_in_kb", "32", "class", "org.apache.cassandra.io.compress.SnappyCompressor")));
 
         createTable("CREATE TABLE %s (a text, b int, c int, primary key (a, b))"
                 + " WITH compression = { 'sstable_compression' : 'SnappyCompressor', 'chunk_length_kb' : 32 };");
 
-        assertRows(execute(format("SELECT compression_parameters FROM %s.%s WHERE keyspace_name = ? and table_name = ?;",
+        assertRows(execute(format("SELECT compression FROM %s.%s WHERE keyspace_name = ? and table_name = ?;",
                                   SchemaKeyspace.NAME,
                                   SchemaKeyspace.TABLES),
                            KEYSPACE,
                            currentTable()),
-                   row("{\"chunk_length_in_kb\":\"32\",\"class\":\"org.apache.cassandra.io.compress.SnappyCompressor\"}"));
+                   row(map("chunk_length_in_kb", "32", "class", "org.apache.cassandra.io.compress.SnappyCompressor")));
 
         createTable("CREATE TABLE %s (a text, b int, c int, primary key (a, b))"
                 + " WITH compression = { 'sstable_compression' : '', 'chunk_length_kb' : 32 };");
 
-        assertRows(execute(format("SELECT compression_parameters FROM %s.%s WHERE keyspace_name = ? and table_name = ?;",
+        assertRows(execute(format("SELECT compression FROM %s.%s WHERE keyspace_name = ? and table_name = ?;",
                                   SchemaKeyspace.NAME,
                                   SchemaKeyspace.TABLES),
                            KEYSPACE,
                            currentTable()),
-                   row("{\"enabled\":\"false\"}"));
+                   row(map("enabled", "false")));
 
         createTable("CREATE TABLE %s (a text, b int, c int, primary key (a, b))"
                 + " WITH compression = { 'enabled' : 'false'};");
 
-        assertRows(execute(format("SELECT compression_parameters FROM %s.%s WHERE keyspace_name = ? and table_name = ?;",
+        assertRows(execute(format("SELECT compression FROM %s.%s WHERE keyspace_name = ? and table_name = ?;",
                                   SchemaKeyspace.NAME,
                                   SchemaKeyspace.TABLES),
                            KEYSPACE,
                            currentTable()),
-                   row("{\"enabled\":\"false\"}"));
+                   row(map("enabled", "false")));
 
         assertThrowsConfigurationException("Missing sub-option 'class' for the 'compression' option.",
                                            "CREATE TABLE %s (a text, b int, c int, primary key (a, b))"

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc852381/test/unit/org/apache/cassandra/schema/LegacySchemaMigratorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/schema/LegacySchemaMigratorTest.java b/test/unit/org/apache/cassandra/schema/LegacySchemaMigratorTest.java
index df76eb5..659b6c6 100644
--- a/test/unit/org/apache/cassandra/schema/LegacySchemaMigratorTest.java
+++ b/test/unit/org/apache/cassandra/schema/LegacySchemaMigratorTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.schema;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.stream.Collectors;
@@ -27,6 +28,7 @@ import org.junit.Test;
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.cache.CachingOptions;
 import org.apache.cassandra.config.*;
+import org.apache.cassandra.cql3.CQLTester;
 import org.apache.cassandra.cql3.ColumnIdentifier;
 import org.apache.cassandra.cql3.functions.*;
 import org.apache.cassandra.db.*;
@@ -56,8 +58,10 @@ public class LegacySchemaMigratorTest
      * 5. Validate that the legacy schema tables are now empty
      */
     @Test
-    public void testMigrate()
+    public void testMigrate() throws IOException
     {
+        CQLTester.cleanupAndLeaveDirs();
+
         List<KeyspaceMetadata> expected = keyspaceToMigrate();
         expected.sort((k1, k2) -> k1.name.compareTo(k2.name));
 
@@ -71,9 +75,6 @@ public class LegacySchemaMigratorTest
         List<KeyspaceMetadata> actual = SchemaKeyspace.readSchemaFromSystemTables();
         actual.sort((k1, k2) -> k1.name.compareTo(k2.name));
 
-        // make sure that we've read *exactly* the same set of keyspaces/tables/types/functions
-        assertEquals(expected, actual);
-
         // need to load back CFMetaData of those tables (CFS instances will still be loaded)
         loadLegacySchemaTables();
 
@@ -84,6 +85,9 @@ public class LegacySchemaMigratorTest
             //noinspection ConstantConditions
             assertTrue(executeOnceInternal(query).isEmpty());
         }
+
+        // make sure that we've read *exactly* the same set of keyspaces/tables/types/functions
+        assertEquals(expected, actual);
     }
 
     private static void loadLegacySchemaTables()