You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2012/03/13 14:16:57 UTC

[3/17] git commit: Add type information to new schema_ columnfamilies and remove thrift validation

Add type information to new schema_ columnfamilies and remove thrift validation

patch by jbellis and slebresne; reviewed by jbellis and slebresne for CASSANDRA-3792


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ccb00289
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ccb00289
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ccb00289

Branch: refs/heads/trunk
Commit: ccb0028931c9de665d521b4798a9f63f7dee7497
Parents: ac8d60b
Author: Jonathan Ellis <jb...@apache.org>
Authored: Wed Feb 29 11:50:27 2012 -0600
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Tue Mar 13 14:04:25 2012 +0100

----------------------------------------------------------------------
 CHANGES.txt                                        |    2 +
 NEWS.txt                                           |    2 +
 src/java/org/apache/cassandra/config/Avro.java     |  193 ++++
 .../org/apache/cassandra/config/CFMetaData.java    |  727 +++++++--------
 .../apache/cassandra/config/ColumnDefinition.java  |  220 ++---
 .../org/apache/cassandra/config/KSMetaData.java    |  242 ++----
 .../cassandra/cql/CreateColumnFamilyStatement.java |    2 +-
 .../apache/cassandra/cql/DropIndexStatement.java   |    2 +-
 .../org/apache/cassandra/cql/QueryProcessor.java   |    4 +-
 .../org/apache/cassandra/cql3/QueryProcessor.java  |   29 +
 .../apache/cassandra/cql3/UntypedResultSet.java    |  110 +++
 .../cql3/statements/AlterTableStatement.java       |    2 +-
 .../statements/CreateColumnFamilyStatement.java    |    2 +-
 .../cql3/statements/CreateIndexStatement.java      |    2 +-
 .../cql3/statements/DropIndexStatement.java        |    2 +-
 .../cql3/statements/ModificationStatement.java     |    1 -
 .../cassandra/cql3/statements/SelectStatement.java |   19 +-
 .../cassandra/cql3/statements/UpdateStatement.java |   29 +-
 src/java/org/apache/cassandra/db/Column.java       |   48 +-
 src/java/org/apache/cassandra/db/DefsTable.java    |   71 +-
 .../org/apache/cassandra/db/DeletedColumn.java     |    5 +
 src/java/org/apache/cassandra/db/RowMutation.java  |   19 +
 .../apache/cassandra/db/marshal/AbstractType.java  |    8 +-
 .../cassandra/db/marshal/DynamicCompositeType.java |    5 +
 .../cassandra/db/migration/AddColumnFamily.java    |    4 +-
 .../apache/cassandra/db/migration/AddKeyspace.java |    4 +-
 .../cassandra/db/migration/DropColumnFamily.java   |    4 +-
 .../cassandra/db/migration/DropKeyspace.java       |    4 +-
 .../apache/cassandra/db/migration/Migration.java   |   12 +-
 .../cassandra/db/migration/MigrationHelper.java    |  258 +-----
 .../cassandra/db/migration/UpdateColumnFamily.java |   14 +-
 .../cassandra/db/migration/UpdateKeyspace.java     |   13 +-
 .../io/compress/CompressionParameters.java         |   15 -
 .../apache/cassandra/service/MigrationManager.java |    2 +-
 .../apache/cassandra/thrift/CassandraServer.java   |    4 +-
 .../org/apache/cassandra/utils/ByteBufferUtil.java |    5 +-
 .../org/apache/cassandra/utils/FBUtilities.java    |   41 +
 .../apache/cassandra/config/CFMetaDataTest.java    |   15 +-
 .../unit/org/apache/cassandra/config/DefsTest.java |  590 ++++++++++++
 test/unit/org/apache/cassandra/db/DefsTest.java    |  622 ------------
 40 files changed, 1678 insertions(+), 1675 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/ccb00289/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 77111c4..65008d1 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -19,6 +19,8 @@
  * support trickling fsync() on writes (CASSANDRA-3950)
  * expose counters for unavailable/timeout exceptions given to thrift clients (CASSANDRA-3671)
  * avoid quadratic startup time in LeveledManifest (CASSANDRA-3952)
+ * Add type information to new schema_ columnfamilies and remove thrift
+   serialization for schema (CASSANDRA-3792)
 Merged from 1.0:
  * remove the wait on hint future during write (CASSANDRA-3870)
  * (cqlsh) ignore missing CfDef opts (CASSANDRA-3933)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ccb00289/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index 03b34be..592458d 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -14,6 +14,8 @@ by version X, but the inverse is not necessarily the case.)
 
 Upgrading
 ---------
+    - Non-string column aliases are no longer supported; that is, the
+      column names for defined metadata must be strings.
     - The KsDef.replication_factor field (deprecated since 0.8) has
       been removed.  Older clients will need to be updated to be able
       to continue to create and update keyspaces.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ccb00289/src/java/org/apache/cassandra/config/Avro.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Avro.java b/src/java/org/apache/cassandra/config/Avro.java
new file mode 100644
index 0000000..b34d37a
--- /dev/null
+++ b/src/java/org/apache/cassandra/config/Avro.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cassandra.config;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import org.apache.cassandra.db.ColumnFamilyType;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.db.marshal.TypeParser;
+import org.apache.cassandra.db.migration.avro.CfDef;
+import org.apache.cassandra.io.compress.CompressionParameters;
+import org.apache.cassandra.locator.AbstractReplicationStrategy;
+import org.apache.cassandra.locator.NetworkTopologyStrategy;
+import org.apache.cassandra.thrift.IndexType;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+/**
+ * methods to load schema definitions from old-style Avro serialization
+ */
+public class Avro
+{
+    @Deprecated
+    public static KSMetaData ksFromAvro(org.apache.cassandra.db.migration.avro.KsDef ks)
+    {
+        Class<? extends AbstractReplicationStrategy> repStratClass;
+        try
+        {
+            String strategyClassName = KSMetaData.convertOldStrategyName(ks.strategy_class.toString());
+            repStratClass = (Class<AbstractReplicationStrategy>)Class.forName(strategyClassName);
+        }
+        catch (Exception ex)
+        {
+            throw new RuntimeException("Could not create ReplicationStrategy of type " + ks.strategy_class, ex);
+        }
+
+        Map<String, String> strategyOptions = new HashMap<String, String>();
+        if (ks.strategy_options != null)
+        {
+            for (Map.Entry<CharSequence, CharSequence> e : ks.strategy_options.entrySet())
+            {
+                String name = e.getKey().toString();
+                // Silently discard a replication_factor option to NTS.
+                // The history is, people were creating CFs with the default settings (which in the CLI is NTS) and then
+                // giving it a replication_factor option, which is nonsensical.  Initially our strategy was to silently
+                // ignore this option, but that turned out to confuse people more.  So in 0.8.2 we switched to throwing
+                // an exception in the NTS constructor, which would be turned into an InvalidRequestException for the
+                // client.  But, it also prevented startup for anyone upgrading without first cleaning that option out.
+                if (repStratClass == NetworkTopologyStrategy.class && name.trim().toLowerCase().equals("replication_factor"))
+                    continue;
+                strategyOptions.put(name, e.getValue().toString());
+            }
+        }
+
+        int cfsz = ks.cf_defs.size();
+        List<CFMetaData> cfMetaData = new ArrayList<CFMetaData>(cfsz);
+        Iterator<CfDef> cfiter = ks.cf_defs.iterator();
+        for (CfDef cf_def : ks.cf_defs)
+            cfMetaData.add(cfFromAvro(cfiter.next()));
+
+        return new KSMetaData(ks.name.toString(), repStratClass, strategyOptions, ks.durable_writes, cfMetaData);
+    }
+
+    @Deprecated
+    public static CFMetaData cfFromAvro(CfDef cf)
+    {
+        AbstractType<?> comparator;
+        AbstractType<?> subcolumnComparator = null;
+        AbstractType<?> validator;
+        AbstractType<?> keyValidator;
+
+        try
+        {
+            comparator = TypeParser.parse(cf.comparator_type.toString());
+            if (cf.subcomparator_type != null)
+                subcolumnComparator = TypeParser.parse(cf.subcomparator_type);
+            validator = TypeParser.parse(cf.default_validation_class);
+            keyValidator = TypeParser.parse(cf.key_validation_class);
+        }
+        catch (Exception ex)
+        {
+            throw new RuntimeException("Could not inflate CFMetaData for " + cf, ex);
+        }
+        Map<ByteBuffer, ColumnDefinition> column_metadata = new TreeMap<ByteBuffer, ColumnDefinition>(BytesType.instance);
+        for (org.apache.cassandra.db.migration.avro.ColumnDef aColumn_metadata : cf.column_metadata)
+        {
+            ColumnDefinition cd = columnFromAvro(aColumn_metadata);
+            if (cd.getIndexType() != null && cd.getIndexName() == null)
+                cd.setIndexName(CFMetaData.getDefaultIndexName(cf.name.toString(), comparator, cd.name));
+            column_metadata.put(cd.name, cd);
+        }
+
+        CFMetaData newCFMD = new CFMetaData(cf.keyspace.toString(),
+                                            cf.name.toString(),
+                                            ColumnFamilyType.create(cf.column_type.toString()),
+                                            comparator,
+                                            subcolumnComparator,
+                                            cf.id);
+
+        // When we pull up an old avro CfDef which doesn't have these arguments,
+        //  it doesn't default them correctly. Without explicit defaulting,
+        //  grandfathered metadata becomes wrong or causes crashes.
+        //  Isn't AVRO supposed to handle stuff like this?
+        if (cf.min_compaction_threshold != null) { newCFMD.minCompactionThreshold(cf.min_compaction_threshold); }
+        if (cf.max_compaction_threshold != null) { newCFMD.maxCompactionThreshold(cf.max_compaction_threshold); }
+        if (cf.key_alias != null) { newCFMD.keyAlias(cf.key_alias); }
+        if (cf.column_aliases != null)
+            newCFMD.columnAliases(new ArrayList<ByteBuffer>(cf.column_aliases)); // fix avro stupidity
+        if (cf.value_alias != null) { newCFMD.valueAlias(cf.value_alias); }
+        if (cf.compaction_strategy != null)
+        {
+            try
+            {
+                newCFMD.compactionStrategyClass = CFMetaData.createCompactionStrategy(cf.compaction_strategy.toString());
+            }
+            catch (ConfigurationException e)
+            {
+                throw new RuntimeException(e);
+            }
+        }
+        if (cf.compaction_strategy_options != null)
+        {
+            for (Map.Entry<CharSequence, CharSequence> e : cf.compaction_strategy_options.entrySet())
+                newCFMD.compactionStrategyOptions.put(e.getKey().toString(), e.getValue().toString());
+        }
+
+        CompressionParameters cp;
+        try
+        {
+            cp = CompressionParameters.create(cf.compression_options);
+        }
+        catch (ConfigurationException e)
+        {
+            throw new RuntimeException(e);
+        }
+
+        CFMetaData.Caching caching;
+
+        try
+        {
+            caching = cf.caching == null ? CFMetaData.Caching.KEYS_ONLY : CFMetaData.Caching.fromString(cf.caching.toString());
+        }
+        catch (ConfigurationException e)
+        {
+            throw new RuntimeException(e);
+        }
+
+        return newCFMD.comment(cf.comment.toString())
+                      .readRepairChance(cf.read_repair_chance)
+                      .dcLocalReadRepairChance(cf.dclocal_read_repair_chance)
+                      .replicateOnWrite(cf.replicate_on_write)
+                      .gcGraceSeconds(cf.gc_grace_seconds)
+                      .defaultValidator(validator)
+                      .keyValidator(keyValidator)
+                      .columnMetadata(column_metadata)
+                      .compressionParameters(cp)
+                      .bloomFilterFpChance(cf.bloom_filter_fp_chance)
+                      .caching(caching);
+    }
+
+    @Deprecated
+    public static ColumnDefinition columnFromAvro(org.apache.cassandra.db.migration.avro.ColumnDef cd)
+    {
+        IndexType index_type = cd.index_type == null ? null : Enum.valueOf(IndexType.class, cd.index_type.name());
+        String index_name = cd.index_name == null ? null : cd.index_name.toString();
+        try
+        {
+            AbstractType<?> validatorType = TypeParser.parse(cd.validation_class);
+            return new ColumnDefinition(ByteBufferUtil.clone(cd.name), validatorType, index_type, ColumnDefinition.getStringMap(cd.index_options), index_name);
+        }
+        catch (ConfigurationException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ccb00289/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java
index fb77366..8198f76 100644
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@ -24,17 +24,19 @@ import java.lang.reflect.InvocationTargetException;
 import java.nio.ByteBuffer;
 import java.util.*;
 
-import com.google.common.base.Objects;
 import com.google.common.collect.MapDifference;
 import com.google.common.collect.Maps;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.builder.EqualsBuilder;
 import org.apache.commons.lang.builder.HashCodeBuilder;
 import org.apache.commons.lang.builder.ToStringBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.cql3.CFDefinition;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.UntypedResultSet;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.db.compaction.AbstractCompactionStrategy;
 import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.db.migration.Migration;
@@ -42,15 +44,11 @@ import org.apache.cassandra.io.IColumnSerializer;
 import org.apache.cassandra.io.compress.CompressionParameters;
 import org.apache.cassandra.io.compress.SnappyCompressor;
 import org.apache.cassandra.thrift.CfDef;
-import org.apache.cassandra.thrift.ColumnDef;
-import org.apache.cassandra.thrift.IndexType;
 import org.apache.cassandra.thrift.InvalidRequestException;
 import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.cassandra.db.migration.MigrationHelper.*;
+import static org.apache.cassandra.utils.FBUtilities.*;
 
 public final class CFMetaData
 {
@@ -89,35 +87,69 @@ public final class CFMetaData
                                                                           null,
                                                                           null,
                                                                           null)));
-    public static final CFMetaData SchemaKeyspacesCf = schemaCFDefinition(SystemTable.SCHEMA_KEYSPACES_CF, 8, "keyspace attributes of the schema", AsciiType.instance, 1);
-    public static final CFMetaData SchemaColumnFamiliesCf = schemaCFDefinition(SystemTable.SCHEMA_COLUMNFAMILIES_CF, 9, "ColumnFamily attributes of the schema", AsciiType.instance, 2);
-    public static final CFMetaData SchemaColumnsCf = schemaCFDefinition(SystemTable.SCHEMA_COLUMNS_CF, 10, "ColumnFamily column attributes of the schema", AsciiType.instance, 3);
 
-    private static CFMetaData schemaCFDefinition(String name, int index, String comment, AbstractType<?> comp, int nestingLevel)
+    // new-style schema
+    public static final CFMetaData SchemaKeyspacesCf;
+    public static final CFMetaData SchemaColumnFamiliesCf;
+    public static final CFMetaData SchemaColumnsCf;
+    static
     {
-        AbstractType<?> comparator;
-
-        if (nestingLevel == 1)
-        {
-            comparator = comp;
-        }
-        else
-        {
-            List<AbstractType<?>> composite = new ArrayList<AbstractType<?>>(nestingLevel);
-
-            for (int i = 0; i < nestingLevel; i++)
-                composite.add(comp);
-
-            comparator = CompositeType.getInstance(composite);
-        }
-
-        return newSystemMetadata(name,
-                                 index,
-                                 comment,
-                                 comparator,
-                                 null)
+        SchemaKeyspacesCf = newSchemaMetadata(SystemTable.SCHEMA_KEYSPACES_CF,
+                                              8,
+                                              "Keyspace definitions",
+                                              AsciiType.instance,
+                                              null)
+                            .keyValidator(AsciiType.instance)
+                            .keyAlias("keyspace");
+        SchemaKeyspacesCf.columnMetadata(ColumnDefinition.utf8("name"),
+                                         ColumnDefinition.bool("durable_writes"),
+                                         ColumnDefinition.ascii("strategy_class"),
+                                         ColumnDefinition.ascii("strategy_options"));
+
+        SchemaColumnFamiliesCf = newSchemaMetadata(SystemTable.SCHEMA_COLUMNFAMILIES_CF,
+                                                   9,
+                                                   "ColumnFamily definitions",
+                                                   CompositeType.getInstance(Arrays.<AbstractType<?>>asList(AsciiType.instance, AsciiType.instance)),
+                                                   null)
                                  .keyValidator(AsciiType.instance)
-                                 .defaultValidator(UTF8Type.instance);
+                                 .keyAlias("keyspace")
+                                 .columnAliases(Arrays.asList(ByteBufferUtil.bytes("columnfamily")))
+                                 .columnMetadata(ColumnDefinition.int32("id"),
+                                                 ColumnDefinition.ascii("type"),
+                                                 ColumnDefinition.ascii("comparator"),
+                                                 ColumnDefinition.ascii("subcomparator"),
+                                                 ColumnDefinition.utf8("comment"),
+                                                 ColumnDefinition.double_("read_repair_chance"),
+                                                 ColumnDefinition.double_("local_read_repair_chance"),
+                                                 ColumnDefinition.bool("replicate_on_write"),
+                                                 ColumnDefinition.int32("gc_grace_seconds"),
+                                                 ColumnDefinition.ascii("default_validator"),
+                                                 ColumnDefinition.ascii("key_validator"),
+                                                 ColumnDefinition.int32("min_compaction_threshold"),
+                                                 ColumnDefinition.int32("max_compaction_threshold"),
+                                                 ColumnDefinition.ascii("key_alias"),
+                                                 ColumnDefinition.double_("bloom_filter_fp_chance"),
+                                                 ColumnDefinition.ascii("caching"),
+                                                 ColumnDefinition.ascii("compaction_strategy_class"),
+                                                 ColumnDefinition.ascii("compression_parameters"),
+                                                 ColumnDefinition.utf8("value_alias"),
+                                                 ColumnDefinition.utf8("column_aliases"),
+                                                 ColumnDefinition.ascii("compaction_strategy_options"));
+
+        SchemaColumnsCf = newSchemaMetadata(SystemTable.SCHEMA_COLUMNS_CF,
+                                            10,
+                                            "ColumnFamily column attributes",
+                                            CompositeType.getInstance(Arrays.<AbstractType<?>>asList(AsciiType.instance,
+                                                                                                     AsciiType.instance,
+                                                                                                     UTF8Type.instance)),
+                                            null)
+                          .keyValidator(AsciiType.instance)
+                          .keyAlias("keyspace")
+                          .columnAliases(Arrays.asList(ByteBufferUtil.bytes("columnfamily"), ByteBufferUtil.bytes("column")))
+                          .columnMetadata(ColumnDefinition.ascii("validator"),
+                                          ColumnDefinition.ascii("index_type"),
+                                          ColumnDefinition.ascii("index_options"),
+                                          ColumnDefinition.ascii("index_name"));
     }
 
     public enum Caching
@@ -161,7 +193,7 @@ public final class CFMetaData
     private Double bloomFilterFpChance;               // default NULL
     private Caching caching;                          // default KEYS_ONLY (possible: all, key_only, row_only, none)
 
-    private Map<ByteBuffer, ColumnDefinition> column_metadata;
+    Map<ByteBuffer, ColumnDefinition> column_metadata;
     public Class<? extends AbstractCompactionStrategy> compactionStrategyClass;
     public Map<String, String> compactionStrategyOptions;
 
@@ -174,7 +206,7 @@ public final class CFMetaData
 
     public CFMetaData comment(String prop) { comment = enforceCommentNotNull(prop); return this;}
     public CFMetaData readRepairChance(double prop) {readRepairChance = prop; return this;}
-    public CFMetaData dclocalReadRepairChance(double prop) {dcLocalReadRepairChance = prop; return this;}
+    public CFMetaData dcLocalReadRepairChance(double prop) {dcLocalReadRepairChance = prop; return this;}
     public CFMetaData replicateOnWrite(boolean prop) {replicateOnWrite = prop; return this;}
     public CFMetaData gcGraceSeconds(int prop) {gcGraceSeconds = prop; return this;}
     public CFMetaData defaultValidator(AbstractType<?> prop) {defaultValidator = prop; updateCfDef(); return this;}
@@ -182,9 +214,18 @@ public final class CFMetaData
     public CFMetaData minCompactionThreshold(int prop) {minCompactionThreshold = prop; return this;}
     public CFMetaData maxCompactionThreshold(int prop) {maxCompactionThreshold = prop; return this;}
     public CFMetaData keyAlias(ByteBuffer prop) {keyAlias = prop; updateCfDef(); return this;}
+    public CFMetaData keyAlias(String alias) { return keyAlias(ByteBufferUtil.bytes(alias)); }
     public CFMetaData columnAliases(List<ByteBuffer> prop) {columnAliases = prop; updateCfDef(); return this;}
     public CFMetaData valueAlias(ByteBuffer prop) {valueAlias = prop; updateCfDef(); return this;}
     public CFMetaData columnMetadata(Map<ByteBuffer,ColumnDefinition> prop) {column_metadata = prop; updateCfDef(); return this;}
+    private CFMetaData columnMetadata(ColumnDefinition... cds)
+    {
+        Map<ByteBuffer, ColumnDefinition> map = new HashMap<ByteBuffer, ColumnDefinition>();
+        for (ColumnDefinition cd : cds)
+            map.put(cd.name, cd);
+
+        return columnMetadata(map);
+    }
     public CFMetaData compactionStrategyClass(Class<? extends AbstractCompactionStrategy> prop) {compactionStrategyClass = prop; return this;}
     public CFMetaData compactionStrategyOptions(Map<String, String> prop) {compactionStrategyOptions = prop; return this;}
     public CFMetaData compressionParameters(CompressionParameters prop) {compressionParameters = prop; return this;}
@@ -196,7 +237,7 @@ public final class CFMetaData
         this(keyspace, name, type, comp, subcc, Schema.instance.nextCFId());
     }
 
-    private CFMetaData(String keyspace, String name, ColumnFamilyType type, AbstractType<?> comp, AbstractType<?> subcc, int id)
+    CFMetaData(String keyspace, String name, ColumnFamilyType type, AbstractType<?> comp, AbstractType<?> subcc, int id)
     {
         // Final fields must be set in constructor
         ksName = keyspace;
@@ -262,16 +303,27 @@ public final class CFMetaData
 
         return newCFMD.comment(comment)
                       .readRepairChance(0)
-                      .dclocalReadRepairChance(0)
+                      .dcLocalReadRepairChance(0)
                       .gcGraceSeconds(0);
     }
 
+    private static CFMetaData newSchemaMetadata(String cfName, int cfId, String comment, AbstractType<?> comparator, AbstractType<?> subcc)
+    {
+        /*
+         * Schema column families need a gc_grace (since they are replicated
+         * on every node). That gc_grace should be high enough that no node
+         * could be dead for that long a time.
+         */
+        int gcGrace = 120 * 24 * 3600; // 3 months
+        return newSystemMetadata(cfName, cfId, comment, comparator, subcc).gcGraceSeconds(gcGrace);
+    }
+
     public static CFMetaData newIndexMetadata(CFMetaData parent, ColumnDefinition info, AbstractType<?> columnComparator)
     {
         return new CFMetaData(parent.ksName, parent.indexColumnFamilyName(info), ColumnFamilyType.Standard, columnComparator, null)
                              .keyValidator(info.getValidator())
                              .readRepairChance(0.0)
-                             .dclocalReadRepairChance(0.0)
+                             .dcLocalReadRepairChance(0.0)
                              .reloadSecondaryIndexMetadata(parent);
     }
 
@@ -282,27 +334,26 @@ public final class CFMetaData
         maxCompactionThreshold(parent.maxCompactionThreshold);
         compactionStrategyClass(parent.compactionStrategyClass);
         compactionStrategyOptions(parent.compactionStrategyOptions);
-        compressionParameters(parent.compressionParameters);;
+        compressionParameters(parent.compressionParameters);
         return this;
     }
 
-    // Create a new CFMD by changing just the cfName
-    public static CFMetaData rename(CFMetaData cfm, String newName)
+    public CFMetaData clone()
     {
-        return copyOpts(new CFMetaData(cfm.ksName, newName, cfm.cfType, cfm.comparator, cfm.subcolumnComparator, cfm.cfId), cfm);
+        return copyOpts(new CFMetaData(ksName, cfName, cfType, comparator, subcolumnComparator, cfId), this);
     }
 
-    // Create a new CFMD by changing just the ksName
-    public static CFMetaData renameTable(CFMetaData cfm, String ksName)
+    // Create a new CFMD by changing just the cfName
+    public static CFMetaData rename(CFMetaData cfm, String newName)
     {
-        return copyOpts(new CFMetaData(ksName, cfm.cfName, cfm.cfType, cfm.comparator, cfm.subcolumnComparator, cfm.cfId), cfm);
+        return copyOpts(new CFMetaData(cfm.ksName, newName, cfm.cfType, cfm.comparator, cfm.subcolumnComparator, cfm.cfId), cfm);
     }
 
-    private static CFMetaData copyOpts(CFMetaData newCFMD, CFMetaData oldCFMD)
+    static CFMetaData copyOpts(CFMetaData newCFMD, CFMetaData oldCFMD)
     {
         return newCFMD.comment(oldCFMD.comment)
                       .readRepairChance(oldCFMD.readRepairChance)
-                      .dclocalReadRepairChance(oldCFMD.dcLocalReadRepairChance)
+                      .dcLocalReadRepairChance(oldCFMD.dcLocalReadRepairChance)
                       .replicateOnWrite(oldCFMD.replicateOnWrite)
                       .gcGraceSeconds(oldCFMD.gcGraceSeconds)
                       .defaultValidator(oldCFMD.defaultValidator)
@@ -330,114 +381,6 @@ public final class CFMetaData
         return cfName + Directories.SECONDARY_INDEX_NAME_SEPARATOR + (info.getIndexName() == null ? ByteBufferUtil.bytesToHex(info.name) : info.getIndexName());
     }
 
-    @Deprecated
-    public static CFMetaData fromAvro(org.apache.cassandra.db.migration.avro.CfDef cf)
-    {
-        AbstractType<?> comparator;
-        AbstractType<?> subcolumnComparator = null;
-        AbstractType<?> validator;
-        AbstractType<?> keyValidator;
-
-        try
-        {
-            comparator = TypeParser.parse(cf.comparator_type.toString());
-            if (cf.subcomparator_type != null)
-                subcolumnComparator = TypeParser.parse(cf.subcomparator_type);
-            validator = TypeParser.parse(cf.default_validation_class);
-            keyValidator = TypeParser.parse(cf.key_validation_class);
-        }
-        catch (Exception ex)
-        {
-            throw new RuntimeException("Could not inflate CFMetaData for " + cf, ex);
-        }
-        Map<ByteBuffer, ColumnDefinition> column_metadata = new TreeMap<ByteBuffer, ColumnDefinition>(BytesType.instance);
-        for (org.apache.cassandra.db.migration.avro.ColumnDef aColumn_metadata : cf.column_metadata)
-        {
-            ColumnDefinition cd = ColumnDefinition.fromAvro(aColumn_metadata);
-            if (cd.getIndexType() != null && cd.getIndexName() == null)
-                cd.setIndexName(getDefaultIndexName(cf.name.toString(), comparator, cd.name));
-            column_metadata.put(cd.name, cd);
-        }
-
-        CFMetaData newCFMD = new CFMetaData(cf.keyspace.toString(),
-                                            cf.name.toString(),
-                                            ColumnFamilyType.create(cf.column_type.toString()),
-                                            comparator,
-                                            subcolumnComparator,
-                                            cf.id);
-
-        // When we pull up an old avro CfDef which doesn't have these arguments,
-        //  it doesn't default them correctly. Without explicit defaulting,
-        //  grandfathered metadata becomes wrong or causes crashes.
-        //  Isn't AVRO supposed to handle stuff like this?
-        if (cf.min_compaction_threshold != null) { newCFMD.minCompactionThreshold(cf.min_compaction_threshold); }
-        if (cf.max_compaction_threshold != null) { newCFMD.maxCompactionThreshold(cf.max_compaction_threshold); }
-        if (cf.key_alias != null) { newCFMD.keyAlias(cf.key_alias); }
-        if (cf.column_aliases != null) { newCFMD.columnAliases(fixAvroRetardation(cf.column_aliases)); }
-        if (cf.value_alias != null) { newCFMD.valueAlias(cf.value_alias); }
-        if (cf.compaction_strategy != null)
-        {
-            try
-            {
-                newCFMD.compactionStrategyClass = createCompactionStrategy(cf.compaction_strategy.toString());
-            }
-            catch (ConfigurationException e)
-            {
-                throw new RuntimeException(e);
-            }
-        }
-        if (cf.compaction_strategy_options != null)
-        {
-            for (Map.Entry<CharSequence, CharSequence> e : cf.compaction_strategy_options.entrySet())
-                newCFMD.compactionStrategyOptions.put(e.getKey().toString(), e.getValue().toString());
-        }
-
-        CompressionParameters cp;
-        try
-        {
-            cp = CompressionParameters.create(cf.compression_options);
-        }
-        catch (ConfigurationException e)
-        {
-            throw new RuntimeException(e);
-        }
-
-        Caching caching;
-
-        try
-        {
-            caching = cf.caching == null ? Caching.KEYS_ONLY : Caching.fromString(cf.caching.toString());
-        }
-        catch (ConfigurationException e)
-        {
-            throw new RuntimeException(e);
-        }
-
-        return newCFMD.comment(cf.comment.toString())
-                      .readRepairChance(cf.read_repair_chance)
-                      .dclocalReadRepairChance(cf.dclocal_read_repair_chance)
-                      .replicateOnWrite(cf.replicate_on_write)
-                      .gcGraceSeconds(cf.gc_grace_seconds)
-                      .defaultValidator(validator)
-                      .keyValidator(keyValidator)
-                      .columnMetadata(column_metadata)
-                      .compressionParameters(cp)
-                      .bloomFilterFpChance(cf.bloom_filter_fp_chance)
-                      .caching(caching);
-    }
-
-    /*
-     * Avro handles array with it's own class, GenericArray, that extends
-     * AbstractList but redefine equals() in a way that violate List.equals()
-     * specification (basically only a GenericArray can ever be equal to a
-     * GenericArray).
-     * (Concretely, keeping the list returned by avro breaks DefsTest.saveAndRestore())
-     */
-    private static <T> List<T> fixAvroRetardation(List<T> array)
-    {
-        return new ArrayList<T>(array);
-    }
-
     public String getComment()
     {
         return comment;
@@ -669,7 +612,7 @@ public final class CFMetaData
         if (cf_def.isSetCaching())
             newCFMD.caching(Caching.fromString(cf_def.caching));
         if (cf_def.isSetDclocal_read_repair_chance())
-            newCFMD.dclocalReadRepairChance(cf_def.dclocal_read_repair_chance);
+            newCFMD.dcLocalReadRepairChance(cf_def.dclocal_read_repair_chance);
 
         CompressionParameters cp = CompressionParameters.create(cf_def.compression_options);
 
@@ -692,7 +635,7 @@ public final class CFMetaData
 
         try
         {
-            apply(fromSchema(cfDefRow.cf));
+            apply(fromSchema(cfDefRow));
         }
         catch (ConfigurationException e)
         {
@@ -707,135 +650,86 @@ public final class CFMetaData
      *
      * @throws ConfigurationException if ks/cf names or cf ids didn't match
      */
-    public void apply(CfDef cf_def) throws ConfigurationException
+    public void apply(CFMetaData cfm) throws ConfigurationException
     {
-        logger.debug("applying {} to {}", cf_def, this);
+        logger.debug("applying {} to {}", cfm, this);
         // validate
-        if (!cf_def.keyspace.equals(ksName))
+        if (!cfm.ksName.equals(ksName))
             throw new ConfigurationException(String.format("Keyspace mismatch (found %s; expected %s)",
-                                                           cf_def.keyspace, ksName));
-        if (!cf_def.name.equals(cfName))
+                                                           cfm.ksName, ksName));
+        if (!cfm.cfName.equals(cfName))
             throw new ConfigurationException(String.format("Column family mismatch (found %s; expected %s)",
-                                                           cf_def.name, cfName));
-        if (cf_def.id != cfId)
+                                                           cfm.cfName, cfName));
+        if (!cfm.cfId.equals(cfId))
             throw new ConfigurationException(String.format("Column family ID mismatch (found %s; expected %s)",
-                                                           cf_def.id, cfId));
+                                                           cfm.cfId, cfId));
 
-        if (!cf_def.column_type.equals(cfType.name()))
+        if (!cfm.cfType.equals(cfType))
             throw new ConfigurationException("types do not match.");
 
-        AbstractType<?> newComparator = TypeParser.parse(cf_def.comparator_type);
-        AbstractType<?> newSubComparator = (cf_def.subcomparator_type == null || cf_def.subcomparator_type.equals(""))
-                                         ? null
-                                         : TypeParser.parse(cf_def.subcomparator_type);
-
-        if (!newComparator.isCompatibleWith(comparator))
+        if (!cfm.comparator.isCompatibleWith(comparator))
             throw new ConfigurationException("comparators do not match or are not compatible.");
-        if (newSubComparator == null)
+        if (cfm.subcolumnComparator == null)
         {
             if (subcolumnComparator != null)
                 throw new ConfigurationException("subcolumncomparators do not match.");
             // else, it's null and we're good.
         }
-        else if (!newSubComparator.isCompatibleWith(subcolumnComparator))
+        else if (!cfm.subcolumnComparator.isCompatibleWith(subcolumnComparator))
             throw new ConfigurationException("subcolumncomparators do not match or are note compatible.");
 
         // TODO: this method should probably return a new CFMetaData so that
         // 1) we can keep comparator and subcolumnComparator final
         // 2) updates are applied atomically
-        comparator = newComparator;
-        subcolumnComparator = newSubComparator;
-
-        validateMinMaxCompactionThresholds(cf_def);
-
-        comment = enforceCommentNotNull(cf_def.comment);
-        readRepairChance = cf_def.read_repair_chance;
-        if (cf_def.isSetDclocal_read_repair_chance())
-            dcLocalReadRepairChance = cf_def.dclocal_read_repair_chance;
-        replicateOnWrite = cf_def.replicate_on_write;
-        gcGraceSeconds = cf_def.gc_grace_seconds;
-        defaultValidator = TypeParser.parse(cf_def.default_validation_class);
-        keyValidator = TypeParser.parse(cf_def.key_validation_class);
-        minCompactionThreshold = cf_def.min_compaction_threshold;
-        maxCompactionThreshold = cf_def.max_compaction_threshold;
-        keyAlias = cf_def.key_alias;
-        columnAliases = cf_def.column_aliases;
-        valueAlias = cf_def.value_alias;
-        if (cf_def.isSetBloom_filter_fp_chance())
-            bloomFilterFpChance = cf_def.bloom_filter_fp_chance;
-        caching = Caching.fromString(cf_def.caching);
-
-        if (!cf_def.isSetColumn_metadata())
-            cf_def.setColumn_metadata(new ArrayList<ColumnDef>());
-
-        // adjust column definitions. figure out who is coming and going.
-        Set<ByteBuffer> toRemove = new HashSet<ByteBuffer>();
-        Set<ByteBuffer> newColumns = new HashSet<ByteBuffer>();
-        Set<ColumnDef> toAdd = new HashSet<ColumnDef>();
-        for (ColumnDef def : cf_def.column_metadata)
-        {
-            newColumns.add(def.name);
-            if (!column_metadata.containsKey(def.name))
-                toAdd.add(def);
-        }
-        for (ByteBuffer name : column_metadata.keySet())
-            if (!newColumns.contains(name))
-                toRemove.add(name);
-
-        // remove the ones leaving.
-        for (ByteBuffer indexName : toRemove)
-        {
-            column_metadata.remove(indexName);
-        }
-        // update the ones staying
-        for (ColumnDef def : cf_def.column_metadata)
-        {
-            ColumnDefinition oldDef = column_metadata.get(def.name);
-            if (oldDef == null)
-                continue;
-            oldDef.setValidator(TypeParser.parse(def.validation_class));
-            oldDef.setIndexType(def.index_type == null ? null : IndexType.valueOf(def.index_type.name()),
-                                def.index_options);
-            oldDef.setIndexName(def.index_name == null ? null : def.index_name);
-        }
-        // add the new ones coming in.
-        for (ColumnDef def : toAdd)
-        {
-            AbstractType<?> dValidClass = TypeParser.parse(def.validation_class);
-            ColumnDefinition cd = new ColumnDefinition(def.name,
-                                                       dValidClass,
-                                                       def.index_type == null ? null : IndexType.valueOf(def.index_type.name()),
-                                                       def.index_options,
-                                                       def.index_name == null ? null : def.index_name);
+        comparator = cfm.comparator;
+        subcolumnComparator = cfm.subcolumnComparator;
+
+        // compaction thresholds are checked by ThriftValidation. We shouldn't be doing
+        // validation on the apply path; it's too late for that.
+
+        comment = enforceCommentNotNull(cfm.comment);
+        readRepairChance = cfm.readRepairChance;
+        dcLocalReadRepairChance = cfm.dcLocalReadRepairChance;
+        replicateOnWrite = cfm.replicateOnWrite;
+        gcGraceSeconds = cfm.gcGraceSeconds;
+        defaultValidator = cfm.defaultValidator;
+        keyValidator = cfm.keyValidator;
+        minCompactionThreshold = cfm.minCompactionThreshold;
+        maxCompactionThreshold = cfm.maxCompactionThreshold;
+        keyAlias = cfm.keyAlias;
+        columnAliases = cfm.columnAliases;
+        valueAlias = cfm.valueAlias;
+        bloomFilterFpChance = cfm.bloomFilterFpChance;
+        caching = cfm.caching;
+
+        MapDifference<ByteBuffer, ColumnDefinition> columnDiff = Maps.difference(column_metadata, cfm.column_metadata);
+        // columns that are no longer needed
+        for (ColumnDefinition cd : columnDiff.entriesOnlyOnLeft().values())
+            column_metadata.remove(cd.name);
+        // newly added columns
+        for (ColumnDefinition cd : columnDiff.entriesOnlyOnRight().values())
             column_metadata.put(cd.name, cd);
-        }
-
-        if (cf_def.compaction_strategy != null)
-            compactionStrategyClass = createCompactionStrategy(cf_def.compaction_strategy);
-
-        if (null != cf_def.compaction_strategy_options)
+        // old columns with updated attributes
+        for (ByteBuffer name : columnDiff.entriesDiffering().keySet())
         {
-            compactionStrategyOptions = new HashMap<String, String>();
-            for (Map.Entry<String, String> e : cf_def.compaction_strategy_options.entrySet())
-                compactionStrategyOptions.put(e.getKey(), e.getValue());
+            ColumnDefinition oldDef = column_metadata.get(name);
+            ColumnDefinition def = cfm.column_metadata.get(name);
+            oldDef.apply(def, comparator);
         }
 
-        compressionParameters = CompressionParameters.create(cf_def.compression_options);
+        compactionStrategyClass = cfm.compactionStrategyClass;
+        compactionStrategyOptions = cfm.compactionStrategyOptions;
 
+        compressionParameters = cfm.compressionParameters();
+
+        updateCfDef();
         logger.debug("application result is {}", this);
     }
 
     public static Class<? extends AbstractCompactionStrategy> createCompactionStrategy(String className) throws ConfigurationException
     {
         className = className.contains(".") ? className : "org.apache.cassandra.db.compaction." + className;
-        try
-        {
-            return (Class<? extends AbstractCompactionStrategy>) Class.forName(className);
-        }
-        catch (Exception e)
-        {
-            throw new ConfigurationException("Could not create Compaction Strategy of type " + className, e);
-        }
+        return FBUtilities.classForName(className, "compaction strategy");
     }
 
     public AbstractCompactionStrategy createCompactionStrategyInstance(ColumnFamilyStore cfs)
@@ -846,7 +740,7 @@ public final class CFMetaData
                 ColumnFamilyStore.class,
                 Map.class // options
             });
-            return (AbstractCompactionStrategy)constructor.newInstance(cfs, compactionStrategyOptions);
+            return constructor.newInstance(cfs, compactionStrategyOptions);
         }
         catch (NoSuchMethodException e)
         {
@@ -910,36 +804,6 @@ public final class CFMetaData
         return def;
     }
 
-    public static void validateMinMaxCompactionThresholds(CfDef cf_def) throws ConfigurationException
-    {
-        if (cf_def.isSetMin_compaction_threshold() && cf_def.isSetMax_compaction_threshold())
-        {
-            if ((cf_def.min_compaction_threshold > cf_def.max_compaction_threshold) &&
-                    cf_def.max_compaction_threshold != 0)
-            {
-                throw new ConfigurationException("min_compaction_threshold cannot be greater than max_compaction_threshold");
-            }
-        }
-        else if (cf_def.isSetMin_compaction_threshold())
-        {
-            if (cf_def.min_compaction_threshold > DEFAULT_MAX_COMPACTION_THRESHOLD)
-            {
-                throw new ConfigurationException("min_compaction_threshold cannot be greater than max_compaction_threshold (default " +
-                                                  DEFAULT_MAX_COMPACTION_THRESHOLD + ")");
-            }
-        }
-        else if (cf_def.isSetMax_compaction_threshold())
-        {
-            if (cf_def.max_compaction_threshold < DEFAULT_MIN_COMPACTION_THRESHOLD && cf_def.max_compaction_threshold != 0) {
-                throw new ConfigurationException("max_compaction_threshold cannot be less than min_compaction_threshold");
-            }
-        }
-        else
-        {
-            //Defaults are valid.
-        }
-    }
-
     public ColumnDefinition getColumnDefinition(ByteBuffer name)
     {
         return column_metadata.get(name);
@@ -1028,47 +892,30 @@ public final class CFMetaData
      *
      * @throws ConfigurationException if any of the attributes didn't pass validation
      */
-    public RowMutation diff(CfDef newState, long modificationTimestamp) throws ConfigurationException
+    public RowMutation diff(CFMetaData newState, long modificationTimestamp) throws ConfigurationException
     {
-        CfDef curState = toThrift();
-        RowMutation m = new RowMutation(Table.SYSTEM_TABLE, SystemTable.getSchemaKSKey(ksName));
-
-        for (CfDef._Fields field : CfDef._Fields.values())
-        {
-            if (field.equals(CfDef._Fields.COLUMN_METADATA))
-                continue; // deal with columns after main attributes
+        RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, SystemTable.getSchemaKSKey(ksName));
 
-            Object curValue = curState.isSet(field) ? curState.getFieldValue(field) : null;
-            Object newValue = newState.isSet(field) ? newState.getFieldValue(field) : null;
+        newState.toSchemaNoColumns(rm, modificationTimestamp);
 
-            if (Objects.equal(curValue, newValue))
-                continue;
-
-            m.add(new QueryPath(SystemTable.SCHEMA_COLUMNFAMILIES_CF, null, compositeNameFor(curState.name, field.getFieldName())),
-                  valueAsBytes(newValue),
-                  modificationTimestamp);
-        }
-
-        AbstractType nameComparator = cfType.equals(ColumnFamilyType.Super)
-                                        ? subcolumnComparator
-                                        : comparator;
-
-        MapDifference<ByteBuffer, ColumnDefinition> columnDiff = Maps.difference(column_metadata, ColumnDefinition.fromThrift(newState.column_metadata));
-        Map<ByteBuffer, ColumnDef> columnDefMap = ColumnDefinition.toMap(newState.column_metadata);
+        MapDifference<ByteBuffer, ColumnDefinition> columnDiff = Maps.difference(column_metadata, newState.column_metadata);
 
         // columns that are no longer needed
-        for (ByteBuffer name : columnDiff.entriesOnlyOnLeft().keySet())
-            ColumnDefinition.deleteFromSchema(m, curState.name, nameComparator, name, modificationTimestamp);
+        for (ColumnDefinition cd : columnDiff.entriesOnlyOnLeft().values())
+            cd.deleteFromSchema(rm, cfName, comparator, modificationTimestamp);
 
         // newly added columns
-        for (ByteBuffer name : columnDiff.entriesOnlyOnRight().keySet())
-            ColumnDefinition.addToSchema(m, curState.name, nameComparator, columnDefMap.get(name), modificationTimestamp);
+        for (ColumnDefinition cd : columnDiff.entriesOnlyOnRight().values())
+            cd.toSchema(rm, cfName, comparator, modificationTimestamp);
 
         // old columns with updated attributes
         for (ByteBuffer name : columnDiff.entriesDiffering().keySet())
-            ColumnDefinition.addToSchema(m, curState.name, nameComparator, columnDefMap.get(name), modificationTimestamp);
+        {
+            ColumnDefinition cd = newState.getColumnDefinition(name);
+            cd.toSchema(rm, cfName, comparator, modificationTimestamp);
+        }
 
-        return m;
+        return rm;
     }
 
     /**
@@ -1080,73 +927,180 @@ public final class CFMetaData
      */
     public RowMutation dropFromSchema(long timestamp)
     {
-        RowMutation m = new RowMutation(Table.SYSTEM_TABLE, SystemTable.getSchemaKSKey(ksName));
+        RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, SystemTable.getSchemaKSKey(ksName));
+        ColumnFamily cf = rm.addOrGet(SystemTable.SCHEMA_COLUMNFAMILIES_CF);
+        int ldt = (int) (System.currentTimeMillis() / 1000);
+
+        cf.addColumn(DeletedColumn.create(ldt, timestamp, cfName, "id"));
+        cf.addColumn(DeletedColumn.create(ldt, timestamp, cfName, "type"));
+        cf.addColumn(DeletedColumn.create(ldt, timestamp, cfName, "comparator"));
+        cf.addColumn(DeletedColumn.create(ldt, timestamp, cfName, "subcomparator"));
+        cf.addColumn(DeletedColumn.create(ldt, timestamp, cfName, "comment"));
+        cf.addColumn(DeletedColumn.create(ldt, timestamp, cfName, "read_repair_chance"));
+        cf.addColumn(DeletedColumn.create(ldt, timestamp, cfName, "local_read_repair_chance"));
+        cf.addColumn(DeletedColumn.create(ldt, timestamp, cfName, "replicate_on_write"));
+        cf.addColumn(DeletedColumn.create(ldt, timestamp, cfName, "gc_grace_seconds"));
+        cf.addColumn(DeletedColumn.create(ldt, timestamp, cfName, "default_validator"));
+        cf.addColumn(DeletedColumn.create(ldt, timestamp, cfName, "key_validator"));
+        cf.addColumn(DeletedColumn.create(ldt, timestamp, cfName, "min_compaction_threshold"));
+        cf.addColumn(DeletedColumn.create(ldt, timestamp, cfName, "max_compaction_threshold"));
+        cf.addColumn(DeletedColumn.create(ldt, timestamp, cfName, "key_alias"));
+        cf.addColumn(DeletedColumn.create(ldt, timestamp, cfName, "bloom_filter_fp_chance"));
+        cf.addColumn(DeletedColumn.create(ldt, timestamp, cfName, "caching"));
+        cf.addColumn(DeletedColumn.create(ldt, timestamp, cfName, "compaction_strategy_class"));
+        cf.addColumn(DeletedColumn.create(ldt, timestamp, cfName, "compression_parameters"));
+        cf.addColumn(DeletedColumn.create(ldt, timestamp, cfName, "value_alias"));
+        cf.addColumn(DeletedColumn.create(ldt, timestamp, cfName, "column_aliases"));
+        cf.addColumn(DeletedColumn.create(ldt, timestamp, cfName, "compaction_strategy_options"));
 
-        for (CfDef._Fields field : CfDef._Fields.values())
-            m.delete(new QueryPath(SystemTable.SCHEMA_COLUMNFAMILIES_CF, null, compositeNameFor(cfName, field.getFieldName())), timestamp);
+        for (ColumnDefinition cd : column_metadata.values())
+            cd.deleteFromSchema(rm, cfName, comparator, timestamp);
+
+        return rm;
+    }
 
-        for (ColumnDefinition columnDefinition : column_metadata.values())
-            ColumnDefinition.deleteFromSchema(m, cfName, comparator, columnDefinition.name, timestamp);
+    public void toSchema(RowMutation rm, long timestamp)
+    {
+        toSchemaNoColumns(rm, timestamp);
 
-        return m;
+        for (ColumnDefinition cd : column_metadata.values())
+            cd.toSchema(rm, cfName, comparator, timestamp);
+    }
+
+    private void toSchemaNoColumns(RowMutation rm, long timestamp)
+    {
+        // Nullable, user-changeable properties are written as tombstones when unset, so that a value
+        // the user has removed does not survive (a tombstone must use the live column's exact name).
+        ColumnFamily cf = rm.addOrGet(SystemTable.SCHEMA_COLUMNFAMILIES_CF);
+        int ldt = (int) (System.currentTimeMillis() / 1000);
+
+        cf.addColumn(Column.create(cfId, timestamp, cfName, "id"));
+        cf.addColumn(Column.create(cfType.toString(), timestamp, cfName, "type"));
+        cf.addColumn(Column.create(comparator.toString(), timestamp, cfName, "comparator"));
+        if (subcolumnComparator != null)
+            cf.addColumn(Column.create(subcolumnComparator.toString(), timestamp, cfName, "subcomparator"));
+        cf.addColumn(comment == null ? DeletedColumn.create(ldt, timestamp, cfName, "comment")
+                                     : Column.create(comment, timestamp, cfName, "comment"));
+        cf.addColumn(Column.create(readRepairChance, timestamp, cfName, "read_repair_chance"));
+        cf.addColumn(Column.create(dcLocalReadRepairChance, timestamp, cfName, "local_read_repair_chance"));
+        cf.addColumn(Column.create(replicateOnWrite, timestamp, cfName, "replicate_on_write"));
+        cf.addColumn(Column.create(gcGraceSeconds, timestamp, cfName, "gc_grace_seconds"));
+        cf.addColumn(Column.create(defaultValidator.toString(), timestamp, cfName, "default_validator"));
+        cf.addColumn(Column.create(keyValidator.toString(), timestamp, cfName, "key_validator"));
+        cf.addColumn(Column.create(minCompactionThreshold, timestamp, cfName, "min_compaction_threshold"));
+        cf.addColumn(Column.create(maxCompactionThreshold, timestamp, cfName, "max_compaction_threshold"));
+        cf.addColumn(keyAlias == null ? DeletedColumn.create(ldt, timestamp, cfName, "key_alias")
+                                      : Column.create(keyAlias, timestamp, cfName, "key_alias"));
+        cf.addColumn(bloomFilterFpChance == null ? DeletedColumn.create(ldt, timestamp, cfName, "bloom_filter_fp_chance")
+                                                 : Column.create(bloomFilterFpChance, timestamp, cfName, "bloom_filter_fp_chance"));
+        cf.addColumn(Column.create(caching.toString(), timestamp, cfName, "caching"));
+        cf.addColumn(Column.create(compactionStrategyClass.getName(), timestamp, cfName, "compaction_strategy_class"));
+        cf.addColumn(Column.create(json(compressionParameters.asThriftOptions()), timestamp, cfName, "compression_parameters"));
+        cf.addColumn(valueAlias == null ? DeletedColumn.create(ldt, timestamp, cfName, "value_alias")
+                                        : Column.create(valueAlias, timestamp, cfName, "value_alias"));
+        cf.addColumn(Column.create(json(columnAliasesAsStrings()), timestamp, cfName, "column_aliases"));
+        cf.addColumn(Column.create(json(compactionStrategyOptions), timestamp, cfName, "compaction_strategy_options"));
+    }
+
+    // Package protected for use by tests
+    static CFMetaData fromSchemaNoColumns(UntypedResultSet.Row result)
+    {
+        try
+        {
+            CFMetaData cfm = new CFMetaData(result.getString("keyspace"),
+                                            result.getString("columnfamily"),
+                                            ColumnFamilyType.valueOf(result.getString("type")),
+                                            TypeParser.parse(result.getString("comparator")),
+                                            result.has("subcomparator") ? TypeParser.parse(result.getString("subcomparator")) : null,
+                                            result.getInt("id"));
+            cfm.readRepairChance(result.getDouble("read_repair_chance"));
+            cfm.dcLocalReadRepairChance(result.getDouble("local_read_repair_chance"));
+            cfm.replicateOnWrite(result.getBoolean("replicate_on_write"));
+            cfm.gcGraceSeconds(result.getInt("gc_grace_seconds"));
+            cfm.defaultValidator(TypeParser.parse(result.getString("default_validator")));
+            cfm.keyValidator(TypeParser.parse(result.getString("key_validator")));
+            cfm.minCompactionThreshold(result.getInt("min_compaction_threshold"));
+            cfm.maxCompactionThreshold(result.getInt("max_compaction_threshold"));
+            if (result.has("comment"))
+                cfm.comment(result.getString("comment"));
+            if (result.has("key_alias"))
+                cfm.keyAlias(result.getBytes("key_alias"));
+            if (result.has("bloom_filter_fp_chance"))
+                cfm.bloomFilterFpChance(result.getDouble("bloom_filter_fp_chance"));
+            cfm.caching(Caching.valueOf(result.getString("caching")));
+            cfm.compactionStrategyClass(createCompactionStrategy(result.getString("compaction_strategy_class")));
+            cfm.compressionParameters(CompressionParameters.create(fromJsonMap(result.getString("compression_parameters"))));
+            if (result.has("value_alias"))
+                cfm.valueAlias(result.getBytes("value_alias"));
+            cfm.columnAliases(columnAliasesFromStrings(fromJsonList(result.getString("column_aliases"))));
+            cfm.compactionStrategyOptions(fromJsonMap(result.getString("compaction_strategy_options")));
+
+            return cfm;
+        }
+        catch (ConfigurationException e)
+        {
+            throw new RuntimeException(e);
+        }
     }
 
     /**
-     * Convert current metadata into schema mutation
-     *
-     * @param timestamp Timestamp to use
+     * Deserialize CF metadata from low-level representation
      *
-     * @return Low-level representation of the CF
+     * @return CFMetaData deserialized from the schema row, including its column definitions
      *
-     * @throws ConfigurationException if any of the attributes didn't pass validation
+     * @throws RuntimeException if the stored definition is rejected with a ConfigurationException
      */
-    public RowMutation toSchema(long timestamp) throws ConfigurationException
+    public static CFMetaData fromSchema(UntypedResultSet.Row result)
     {
-        RowMutation mutation = new RowMutation(Table.SYSTEM_TABLE, SystemTable.getSchemaKSKey(ksName));
+        CFMetaData cfDef = fromSchemaNoColumns(result);
 
-        toSchema(mutation, toThrift(), timestamp);
+        Row serializedColumnDefinitions = ColumnDefinition.readSchema(cfDef.ksName, cfDef.cfName);
+        return addColumnDefinitionSchema(cfDef, serializedColumnDefinitions);
+    }
 
-        return mutation;
+    private static CFMetaData fromSchema(Row row)
+    {
+        UntypedResultSet.Row result = QueryProcessor.resultify("SELECT * FROM system.schema_columnfamilies", row).one();
+        return fromSchema(result);
+    }
+
+    private List<String> columnAliasesAsStrings()
+    {
+        List<String> aliases = new ArrayList<String>(columnAliases.size());
+        for (ByteBuffer rawAlias : columnAliases)
+            aliases.add(UTF8Type.instance.compose(rawAlias));
+        return aliases;
+    }
+
+    private static List<ByteBuffer> columnAliasesFromStrings(List<String> aliases)
+    {
+        List<ByteBuffer> rawAliases = new ArrayList<ByteBuffer>(aliases.size());
+        for (String alias : aliases)
+            rawAliases.add(UTF8Type.instance.decompose(alias));
+        return rawAliases;
     }
 
     /**
-     * Convert given Thrift-serialized metadata into schema mutation
+     * Convert current metadata into schema mutation
      *
-     * @param mutation The mutation to include ColumnFamily attributes into (can contain keyspace attributes already)
-     * @param cfDef Thrift-serialized metadata to use as source for schema mutation
      * @param timestamp Timestamp to use
      *
+     * @return Low-level representation of the CF
+     *
      * @throws ConfigurationException if any of the attributes didn't pass validation
      */
-    public static void toSchema(RowMutation mutation, CfDef cfDef, long timestamp) throws ConfigurationException
+    public RowMutation toSchema(long timestamp) throws ConfigurationException
     {
-        applyImplicitDefaults(cfDef);
-
-        for (CfDef._Fields field : CfDef._Fields.values())
-        {
-            if (field.equals(CfDef._Fields.COLUMN_METADATA))
-                continue;
-
-            Object value = cfDef.isSet(field) ? cfDef.getFieldValue(field) : null;
-            mutation.add(new QueryPath(SystemTable.SCHEMA_COLUMNFAMILIES_CF, null, compositeNameFor(cfDef.name, field.getFieldName())),
-                         valueAsBytes(value),
-                         timestamp);
-        }
-
-        if (!cfDef.isSetColumn_metadata())
-            return;
-
-        AbstractType comparator = getColumnDefinitionComparator(cfDef);
-
-        for (ColumnDef columnDef : cfDef.column_metadata)
-            ColumnDefinition.addToSchema(mutation, cfDef.name, comparator, columnDef, timestamp);
+        RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, SystemTable.getSchemaKSKey(ksName));
+        toSchema(rm, timestamp);
+        return rm;
     }
 
     public static AbstractType<?> getColumnDefinitionComparator(CfDef cfDef) throws ConfigurationException
     {
         AbstractType<?> cfComparator = TypeParser.parse(cfDef.column_type.equals("Super")
-                                     ? cfDef.subcomparator_type
-                                     : cfDef.comparator_type);
+                                                        ? cfDef.subcomparator_type
+                                                        : cfDef.comparator_type);
 
         if (cfComparator instanceof CompositeType)
         {
@@ -1159,59 +1113,22 @@ public final class CFMetaData
         }
     }
 
-    /**
-     * Deserialize CF metadata from low-level representation
-     *
-     * @param serializedCfDef The data to use for deserialization
-     *
-     * @return Thrift-based metadata deserialized from schema
-     *
-     * @throws IOException on any I/O related error
-     */
-    public static CfDef fromSchema(ColumnFamily serializedCfDef) throws IOException
+    // Package protected for use by tests
+    static CFMetaData addColumnDefinitionSchema(CFMetaData cfDef, Row serializedColumnDefinitions)
     {
-        CfDef cfDef = fromSchemaNoColumnDefinition(serializedCfDef);
-
-        ColumnFamily serializedColumnDefinitions = ColumnDefinition.readSchema(cfDef.keyspace, cfDef.name);
-        return addColumnDefinitionSchema(cfDef, serializedColumnDefinitions);
+        for (ColumnDefinition cd : ColumnDefinition.fromSchema(serializedColumnDefinitions, cfDef.comparator))
+            cfDef.column_metadata.put(cd.name, cd);
+        return cfDef;
     }
 
-    // Package protected for use by tests
-    static CfDef fromSchemaNoColumnDefinition(ColumnFamily serializedCfDef)
+    public void addColumnDefinition(ColumnDefinition def)
     {
-        assert serializedCfDef != null;
-
-        CfDef cfDef = new CfDef();
-
-        AbstractType sysComparator = serializedCfDef.getComparator();
-
-        for (IColumn cfAttr : serializedCfDef.getSortedColumns())
-        {
-            if (cfAttr == null || cfAttr.isMarkedForDelete())
-                continue;
-
-            // column name format is <cf>:<attribute name>
-            String[] attr = sysComparator.getString(cfAttr.name()).split(":");
-            assert attr.length == 2;
-
-            CfDef._Fields field = CfDef._Fields.findByName(attr[1]);
-
-            // this means that given field was deprecated
-            // but still exists in the serialized schema
-            if (field == null)
-                continue;
-
-            cfDef.setFieldValue(field, deserializeValue(cfAttr.value(), getValueClass(CfDef.class, field.getFieldName())));
-        }
-        return cfDef;
+        column_metadata.put(def.name, def);
     }
 
-    // Package protected for use by tests
-    static CfDef addColumnDefinitionSchema(CfDef cfDef, ColumnFamily serializedColumnDefinitions)
+    public boolean removeColumnDefinition(ColumnDefinition def)
     {
-        for (ColumnDef columnDef : ColumnDefinition.fromSchema(serializedColumnDefinitions))
-            cfDef.addToColumn_metadata(columnDef);
-        return cfDef;
+        return column_metadata.remove(def.name) != null;
     }
 
     private void updateCfDef()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ccb00289/src/java/org/apache/cassandra/config/ColumnDefinition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/ColumnDefinition.java b/src/java/org/apache/cassandra/config/ColumnDefinition.java
index da006c2..e47ac07 100644
--- a/src/java/org/apache/cassandra/config/ColumnDefinition.java
+++ b/src/java/org/apache/cassandra/config/ColumnDefinition.java
@@ -22,21 +22,25 @@ package org.apache.cassandra.config;
 
 
 import java.nio.ByteBuffer;
+import java.nio.charset.CharacterCodingException;
 import java.util.*;
 
 import com.google.common.collect.Maps;
 
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.UntypedResultSet;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.QueryPath;
-import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.db.marshal.TypeParser;
+import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.db.migration.MigrationHelper;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.thrift.ColumnDef;
 import org.apache.cassandra.thrift.IndexType;
 import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
 
 import static org.apache.cassandra.db.migration.MigrationHelper.*;
+import static org.apache.cassandra.utils.FBUtilities.json;
 
 public class ColumnDefinition
 {
@@ -48,6 +52,7 @@ public class ColumnDefinition
 
     public ColumnDefinition(ByteBuffer name, AbstractType<?> validator, IndexType index_type, Map<String, String> index_options, String index_name)
     {
+        assert name != null && validator != null;
         this.name = name;
         this.index_name = index_name;
         this.validator = validator;
@@ -55,6 +60,31 @@ public class ColumnDefinition
         this.setIndexType(index_type, index_options);
     }
 
+    public static ColumnDefinition ascii(String name)
+    {
+        return new ColumnDefinition(ByteBufferUtil.bytes(name), AsciiType.instance, null, null, null);
+    }
+
+    public static ColumnDefinition bool(String name)
+    {
+        return new ColumnDefinition(ByteBufferUtil.bytes(name), BooleanType.instance, null, null, null);
+    }
+
+    public static ColumnDefinition utf8(String name)
+    {
+        return new ColumnDefinition(ByteBufferUtil.bytes(name), UTF8Type.instance, null, null, null);
+    }
+
+    public static ColumnDefinition int32(String name)
+    {
+        return new ColumnDefinition(ByteBufferUtil.bytes(name), Int32Type.instance, null, null, null);
+    }
+
+    public static ColumnDefinition double_(String name)
+    {
+        return new ColumnDefinition(ByteBufferUtil.bytes(name), DoubleType.instance, null, null, null);
+    }
+
     @Override
     public boolean equals(Object o)
     {
@@ -86,22 +116,6 @@ public class ColumnDefinition
         return result;
     }
 
-    @Deprecated
-    public static ColumnDefinition fromAvro(org.apache.cassandra.db.migration.avro.ColumnDef cd)
-    {
-        IndexType index_type = cd.index_type == null ? null : Enum.valueOf(IndexType.class, cd.index_type.name());
-        String index_name = cd.index_name == null ? null : cd.index_name.toString();
-        try
-        {
-            AbstractType<?> validatorType = TypeParser.parse(cd.validation_class);
-            return new ColumnDefinition(ByteBufferUtil.clone(cd.name), validatorType, index_type, getStringMap(cd.index_options), index_name);
-        }
-        catch (ConfigurationException e)
-        {
-            throw new RuntimeException(e);
-        }
-    }
-
     public ColumnDef toThrift()
     {
         ColumnDef cd = new ColumnDef();
@@ -139,138 +153,102 @@ public class ColumnDefinition
         return cds;
     }
 
-    public static Map<ByteBuffer, ColumnDef> toMap(List<ColumnDef> columnDefs)
-    {
-        Map<ByteBuffer, ColumnDef> map = new HashMap<ByteBuffer, ColumnDef>();
-
-        if (columnDefs == null)
-            return map;
-
-        for (ColumnDef columnDef : columnDefs)
-            map.put(columnDef.name, columnDef);
-
-        return map;
-    }
-
     /**
-     * Drop specified column from the schema using given row mutation.
+     * Drop specified column from the schema using given row mutation.
      *
-     * @param mutation   The schema row mutation
+     * @param rm         The schema row mutation
      * @param cfName     The name of the parent ColumnFamily
-     * @param comparator The comparator to serialize column name in human-readable format
-     * @param columnName The column name as String
      * @param timestamp  The timestamp to use for column modification
      */
-    public static void deleteFromSchema(RowMutation mutation, String cfName, AbstractType comparator, ByteBuffer columnName, long timestamp)
+    public void deleteFromSchema(RowMutation rm, String cfName, AbstractType<?> comparator, long timestamp)
     {
-        toSchema(mutation, comparator, cfName, columnName, null, timestamp, true);
-    }
+        ColumnFamily cf = rm.addOrGet(SystemTable.SCHEMA_COLUMNS_CF);
+        int ldt = (int) (System.currentTimeMillis() / 1000);
 
-    /**
-     * Add new/update column to/in the schema.
-     *
-     * @param mutation   The schema row mutation
-     * @param cfName     The name of the parent ColumnFamily
-     * @param comparator The comparator to serialize column name in human-readable format
-     * @param columnDef  The Thrift-based column definition that contains all attributes
-     * @param timestamp  The timestamp to use for column modification
-     */
-    public static void addToSchema(RowMutation mutation, String cfName, AbstractType comparator, ColumnDef columnDef, long timestamp)
-    {
-        toSchema(mutation, comparator, cfName, columnDef.name, columnDef, timestamp, false);
+        cf.addColumn(DeletedColumn.create(ldt, timestamp, cfName, comparator.getString(name), "validator"));
+        cf.addColumn(DeletedColumn.create(ldt, timestamp, cfName, comparator.getString(name), "index_type"));
+        cf.addColumn(DeletedColumn.create(ldt, timestamp, cfName, comparator.getString(name), "index_options"));
+        cf.addColumn(DeletedColumn.create(ldt, timestamp, cfName, comparator.getString(name), "index_name"));
     }
 
-    /**
-     * Serialize given ColumnDef into given schema row mutation to add or drop it.
-     *
-     * @param mutation   The mutation to use for serialization
-     * @param comparator The comparator to serialize column name in human-readable format
-     * @param cfName     The name of the parent ColumnFamily
-     * @param columnName The column name as String
-     * @param columnDef  The Thrift-based column definition that contains all attributes
-     * @param timestamp  The timestamp to use for column modification
-     * @param delete     The flag which indicates if column should be deleted or added to the schema
-     */
-    private static void toSchema(RowMutation mutation, AbstractType comparator, String cfName, ByteBuffer columnName, ColumnDef columnDef, long timestamp, boolean delete)
+    public void toSchema(RowMutation rm, String cfName, AbstractType<?> comparator, long timestamp)
     {
-        for (ColumnDef._Fields field : ColumnDef._Fields.values())
-        {
-            QueryPath path = new QueryPath(SystemTable.SCHEMA_COLUMNS_CF,
-                                           null,
-                                           compositeNameFor(cfName,
-                                                            readableColumnName(columnName, comparator),
-                                                            field.getFieldName()));
-
-            if (delete)
-                mutation.delete(path, timestamp);
-            else
-                mutation.add(path, valueAsBytes(columnDef.getFieldValue(field)), timestamp);
-        }
+        ColumnFamily cf = rm.addOrGet(SystemTable.SCHEMA_COLUMNS_CF);
+        int ldt = (int) (System.currentTimeMillis() / 1000);
+
+        cf.addColumn(Column.create(validator.toString(), timestamp, cfName, comparator.getString(name), "validator"));
+        cf.addColumn(index_type == null ? DeletedColumn.create(ldt, timestamp, cfName, comparator.getString(name), "index_type")
+                                        : Column.create(index_type.toString(), timestamp, cfName, comparator.getString(name), "index_type"));
+        cf.addColumn(index_options == null ? DeletedColumn.create(ldt, timestamp, cfName, comparator.getString(name), "index_options")
+                                           : Column.create(json(index_options), timestamp, cfName, comparator.getString(name), "index_options"));
+        cf.addColumn(index_name == null ? DeletedColumn.create(ldt, timestamp, cfName, comparator.getString(name), "index_name")
+                                        : Column.create(index_name, timestamp, cfName, comparator.getString(name), "index_name"));
     }
 
-    public static ColumnFamily readSchema(String ksName, String cfName)
+    public void apply(ColumnDefinition def, AbstractType<?> comparator)  throws ConfigurationException
     {
-        DecoratedKey key = StorageService.getPartitioner().decorateKey(SystemTable.getSchemaKSKey(ksName));
-        ColumnFamilyStore columnsStore = SystemTable.schemaCFS(SystemTable.SCHEMA_COLUMNS_CF);
-        return columnsStore.getColumnFamily(key,
-                                            new QueryPath(SystemTable.SCHEMA_COLUMNS_CF),
-                                            MigrationHelper.searchComposite(cfName, true),
-                                            MigrationHelper.searchComposite(cfName, false),
-                                            false,
-                                            Integer.MAX_VALUE);
+        // If an index is set (and not dropped by this update), the validator shouldn't be changed to a non-compatible one
+        if (getIndexType() != null && def.getIndexType() != null && !def.validator.isCompatibleWith(validator))
+            throw new ConfigurationException(String.format("Cannot modify validator to a non-compatible one for column %s since an index is set", comparator.getString(name)));
+
+        setValidator(def.getValidator());
+        setIndexType(def.getIndexType(), def.getIndexOptions());
+        setIndexName(def.getIndexName());
     }
 
     /**
      * Deserialize columns from low-level representation
      *
      * @return Thrift-based deserialized representation of the column
+     * @param row the system.schema_columns row whose column definitions are deserialized
      */
-    public static List<ColumnDef> fromSchema(ColumnFamily columns)
+    public static List<ColumnDefinition> fromSchema(Row row, AbstractType<?> comparator)
     {
-
-        if (columns == null || columns.isEmpty())
+        if (row.cf == null)
             return Collections.emptyList();
 
-        // contenders to be a valid columns, re-check is done after all attributes
-        // were read from serialized state, if ColumnDef has all required fields it gets promoted to be returned
-        Map<String, ColumnDef> contenders = new HashMap<String, ColumnDef>();
-
-        for (IColumn column : columns.getSortedColumns())
+        List<ColumnDefinition> cds = new ArrayList<ColumnDefinition>();
+        for (UntypedResultSet.Row result : QueryProcessor.resultify("SELECT * FROM system.schema_columns", row))
         {
-            if (column.isMarkedForDelete())
-                continue;
-
-            // column name format <cf>:<column name>:<attribute name>
-            String[] components = columns.getComparator().getString(column.name()).split(":");
-            assert components.length == 3;
-
-            ColumnDef columnDef = contenders.get(components[1]);
-
-            if (columnDef == null)
+            try
             {
-                columnDef = new ColumnDef();
-                contenders.put(components[1], columnDef);
+                IndexType index_type = null;
+                Map<String,String> index_options = null;
+                String index_name = null;
+
+                if (result.has("index_type"))
+                    index_type = IndexType.valueOf(result.getString("index_type"));
+                if (result.has("index_options"))
+                    index_options = FBUtilities.fromJsonMap(result.getString("index_options"));
+                if (result.has("index_name"))
+                    index_name = result.getString("index_name");
+
+                cds.add(new ColumnDefinition(comparator.fromString(result.getString("column")),
+                                             TypeParser.parse(result.getString("validator")),
+                                             index_type,
+                                             index_options,
+                                             index_name));
+            }
+            catch (ConfigurationException e)
+            {
+                throw new RuntimeException(e);
             }
-
-            ColumnDef._Fields field = ColumnDef._Fields.findByName(components[2]);
-
-            // this means that given field was deprecated
-            // but still exists in the serialized schema
-            if (field == null)
-                continue;
-
-            columnDef.setFieldValue(field, deserializeValue(column.value(), getValueClass(ColumnDef.class, field.getFieldName())));
         }
 
-        List<ColumnDef> columnDefs = new ArrayList<ColumnDef>();
-
-        for (ColumnDef columnDef : contenders.values())
-        {
-            if (columnDef.isSetName() && columnDef.isSetValidation_class())
-                columnDefs.add(columnDef);
-        }
+        return cds;
+    }
 
-        return columnDefs;
+    public static Row readSchema(String ksName, String cfName)
+    {
+        DecoratedKey key = StorageService.getPartitioner().decorateKey(SystemTable.getSchemaKSKey(ksName));
+        ColumnFamilyStore columnsStore = SystemTable.schemaCFS(SystemTable.SCHEMA_COLUMNS_CF);
+        ColumnFamily cf = columnsStore.getColumnFamily(key,
+                                                       new QueryPath(SystemTable.SCHEMA_COLUMNS_CF),
+                                                       MigrationHelper.searchComposite(cfName, true),
+                                                       MigrationHelper.searchComposite(cfName, false),
+                                                       false,
+                                                       Integer.MAX_VALUE);
+        return new Row(key, cf);
     }
 
     @Override