You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2015/06/30 12:48:14 UTC

[50/51] [partial] cassandra git commit: Storage engine refactor, a.k.a CASSANDRA-8099

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java
index 6bff44d..4db53e7 100644
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@ -18,6 +18,7 @@
 package org.apache.cassandra.config;
 
 import java.io.DataInput;
+import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.nio.ByteBuffer;
@@ -33,6 +34,7 @@ import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.apache.commons.lang3.builder.ToStringBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.github.jamm.Unmetered;
 
 import org.apache.cassandra.cache.CachingOptions;
 import org.apache.cassandra.cql3.ColumnIdentifier;
@@ -41,19 +43,17 @@ import org.apache.cassandra.cql3.statements.CFStatement;
 import org.apache.cassandra.cql3.statements.CreateTableStatement;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.compaction.*;
-import org.apache.cassandra.db.composites.*;
 import org.apache.cassandra.db.index.SecondaryIndex;
 import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.exceptions.*;
 import org.apache.cassandra.io.compress.CompressionParameters;
 import org.apache.cassandra.io.compress.LZ4Compressor;
-import org.apache.cassandra.io.sstable.format.Version;
-import org.apache.cassandra.io.util.FileDataInput;
+import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.schema.LegacySchemaTables;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.UUIDGen;
-import org.github.jamm.Unmetered;
+import org.apache.cassandra.utils.Pair;
 
 /**
  * This class can be tricky to modify. Please read http://wiki.apache.org/cassandra/ConfigurationNotes for how to do so safely.
@@ -63,6 +63,8 @@ public final class CFMetaData
 {
     private static final Logger logger = LoggerFactory.getLogger(CFMetaData.class);
 
+    public static final Serializer serializer = new Serializer();
+
     public final static double DEFAULT_READ_REPAIR_CHANCE = 0.0;
     public final static double DEFAULT_DCLOCAL_READ_REPAIR_CHANCE = 0.1;
     public final static int DEFAULT_GC_GRACE_SECONDS = 864000;
@@ -167,15 +169,17 @@ public final class CFMetaData
     public final UUID cfId;                           // internal id, never exposed to user
     public final String ksName;                       // name of keyspace
     public final String cfName;                       // name of this column family
-    public final ColumnFamilyType cfType;             // standard, super
-    public volatile CellNameType comparator;          // bytes, long, timeuuid, utf8, etc.
+    public final boolean isSuper;                     // is a thrift super column family
+    public final boolean isCounter;                   // is a counter table
+    public volatile ClusteringComparator comparator;  // bytes, long, timeuuid, utf8, etc. This is built directly from clusteringColumns
+
+    private final Serializers serializers;
 
     //OPTIONAL
     private volatile String comment = "";
     private volatile double readRepairChance = DEFAULT_READ_REPAIR_CHANCE;
     private volatile double dcLocalReadRepairChance = DEFAULT_DCLOCAL_READ_REPAIR_CHANCE;
     private volatile int gcGraceSeconds = DEFAULT_GC_GRACE_SECONDS;
-    private volatile AbstractType<?> defaultValidator = BytesType.instance;
     private volatile AbstractType<?> keyValidator = BytesType.instance;
     private volatile int minCompactionThreshold = DEFAULT_MIN_COMPACTION_THRESHOLD;
     private volatile int maxCompactionThreshold = DEFAULT_MAX_COMPACTION_THRESHOLD;
@@ -186,7 +190,7 @@ public final class CFMetaData
     private volatile int memtableFlushPeriod = 0;
     private volatile int defaultTimeToLive = DEFAULT_DEFAULT_TIME_TO_LIVE;
     private volatile SpeculativeRetry speculativeRetry = DEFAULT_SPECULATIVE_RETRY;
-    private volatile Map<ColumnIdentifier, Long> droppedColumns = new HashMap<>();
+    private volatile Map<ColumnIdentifier, DroppedColumn> droppedColumns = new HashMap();
     private volatile Map<String, TriggerDefinition> triggers = new HashMap<>();
     private volatile boolean isPurged = false;
     /*
@@ -196,20 +200,18 @@ public final class CFMetaData
      * clustering key ones, those list are ordered by the "component index" of the
      * elements.
      */
-    public static final String DEFAULT_KEY_ALIAS = "key";
-    public static final String DEFAULT_COLUMN_ALIAS = "column";
-    public static final String DEFAULT_VALUE_ALIAS = "value";
-
-    // We call dense a CF for which each component of the comparator is a clustering column, i.e. no
-    // component is used to store a regular column names. In other words, non-composite static "thrift"
-    // and CQL3 CF are *not* dense.
-    private volatile Boolean isDense; // null means "we don't know and need to infer from other data"
 
     private volatile Map<ByteBuffer, ColumnDefinition> columnMetadata = new HashMap<>();
     private volatile List<ColumnDefinition> partitionKeyColumns;  // Always of size keyValidator.componentsCount, null padded if necessary
     private volatile List<ColumnDefinition> clusteringColumns;    // Of size comparator.componentsCount or comparator.componentsCount -1, null padded if necessary
-    private volatile SortedSet<ColumnDefinition> regularColumns;  // We use a sorted set so iteration is of predictable order (for SELECT for instance)
-    private volatile SortedSet<ColumnDefinition> staticColumns;   // Same as above
+    private volatile PartitionColumns partitionColumns;
+
+    private final boolean isDense;
+    private final boolean isCompound;
+
+    // For dense tables, this alias the single non-PK column the table contains (since it can only have one). We keep
+    // that as convenience to access that column more easily (but we could replace calls by partitionColumns().iterator().next()
+    // for those tables in practice).
     private volatile ColumnDefinition compactValueColumn;
 
     public volatile Class<? extends AbstractCompactionStrategy> compactionStrategyClass = DEFAULT_COMPACTION_STRATEGY_CLASS;
@@ -222,8 +224,6 @@ public final class CFMetaData
     public CFMetaData readRepairChance(double prop) {readRepairChance = prop; return this;}
     public CFMetaData dcLocalReadRepairChance(double prop) {dcLocalReadRepairChance = prop; return this;}
     public CFMetaData gcGraceSeconds(int prop) {gcGraceSeconds = prop; return this;}
-    public CFMetaData defaultValidator(AbstractType<?> prop) {defaultValidator = prop; return this;}
-    public CFMetaData keyValidator(AbstractType<?> prop) {keyValidator = prop; return this;}
     public CFMetaData minCompactionThreshold(int prop) {minCompactionThreshold = prop; return this;}
     public CFMetaData maxCompactionThreshold(int prop) {maxCompactionThreshold = prop; return this;}
     public CFMetaData compactionStrategyClass(Class<? extends AbstractCompactionStrategy> prop) {compactionStrategyClass = prop; return this;}
@@ -236,52 +236,120 @@ public final class CFMetaData
     public CFMetaData memtableFlushPeriod(int prop) {memtableFlushPeriod = prop; return this;}
     public CFMetaData defaultTimeToLive(int prop) {defaultTimeToLive = prop; return this;}
     public CFMetaData speculativeRetry(SpeculativeRetry prop) {speculativeRetry = prop; return this;}
-    public CFMetaData droppedColumns(Map<ColumnIdentifier, Long> cols) {droppedColumns = cols; return this;}
+    public CFMetaData droppedColumns(Map<ColumnIdentifier, DroppedColumn> cols) {droppedColumns = cols; return this;}
     public CFMetaData triggers(Map<String, TriggerDefinition> prop) {triggers = prop; return this;}
-    public CFMetaData isDense(Boolean prop) {isDense = prop; return this;}
 
-    /**
-     * Create new ColumnFamily metadata with generated random ID.
-     * When loading from existing schema, use CFMetaData
-     *
-     * @param keyspace keyspace name
-     * @param name column family name
-     * @param comp default comparator
-     */
-    public CFMetaData(String keyspace, String name, ColumnFamilyType type, CellNameType comp)
-    {
-        this(keyspace, name, type, comp, UUIDGen.getTimeUUID());
+    private CFMetaData(String keyspace,
+                       String name,
+                       UUID cfId,
+                       boolean isSuper,
+                       boolean isCounter,
+                       boolean isDense,
+                       boolean isCompound,
+                       List<ColumnDefinition> partitionKeyColumns,
+                       List<ColumnDefinition> clusteringColumns,
+                       PartitionColumns partitionColumns)
+    {
+        this.cfId = cfId;
+        this.ksName = keyspace;
+        this.cfName = name;
+        this.isDense = isDense;
+        this.isCompound = isCompound;
+        this.isSuper = isSuper;
+        this.isCounter = isCounter;
+
+        // A compact table should always have a clustering
+        assert isCQLTable() || !clusteringColumns.isEmpty() : String.format("For table %s.%s, isDense=%b, isCompound=%b, clustering=%s", ksName, cfName, isDense, isCompound, clusteringColumns);
+
+        this.partitionKeyColumns = partitionKeyColumns;
+        this.clusteringColumns = clusteringColumns;
+        this.partitionColumns = partitionColumns;
+
+        this.serializers = new Serializers(this);
+        rebuild();
     }
 
-    public CFMetaData(String keyspace, String name, ColumnFamilyType type, CellNameType comp, UUID id)
+    // This rebuild informations that are intrinsically duplicate of the table definition but
+    // are kept because they are often useful in a different format.
+    private void rebuild()
     {
-        cfId = id;
-        ksName = keyspace;
-        cfName = name;
-        cfType = type;
-        comparator = comp;
-    }
+        this.comparator = new ClusteringComparator(extractTypes(clusteringColumns));
 
-    public static CFMetaData denseCFMetaData(String keyspace, String name, AbstractType<?> comp, AbstractType<?> subcc)
-    {
-        CellNameType cellNameType = CellNames.fromAbstractType(makeRawAbstractType(comp, subcc), true);
-        return new CFMetaData(keyspace, name, subcc == null ? ColumnFamilyType.Standard : ColumnFamilyType.Super, cellNameType);
+        this.columnMetadata.clear();
+        for (ColumnDefinition def : partitionKeyColumns)
+            this.columnMetadata.put(def.name.bytes, def);
+        for (ColumnDefinition def : clusteringColumns)
+            this.columnMetadata.put(def.name.bytes, def);
+        for (ColumnDefinition def : partitionColumns)
+            this.columnMetadata.put(def.name.bytes, def);
+
+        List<AbstractType<?>> keyTypes = extractTypes(partitionKeyColumns);
+        this.keyValidator = keyTypes.size() == 1 ? keyTypes.get(0) : CompositeType.getInstance(keyTypes);
+
+        if (isCompactTable())
+            this.compactValueColumn = CompactTables.getCompactValueColumn(partitionColumns, isSuper());
     }
 
-    public static CFMetaData sparseCFMetaData(String keyspace, String name, AbstractType<?> comp)
+    public static CFMetaData create(String ksName,
+                                    String name,
+                                    UUID cfId,
+                                    boolean isDense,
+                                    boolean isCompound,
+                                    boolean isSuper,
+                                    boolean isCounter,
+                                    List<ColumnDefinition> columns)
     {
-        CellNameType cellNameType = CellNames.fromAbstractType(comp, false);
-        return new CFMetaData(keyspace, name, ColumnFamilyType.Standard, cellNameType);
+        List<ColumnDefinition> partitions = new ArrayList<>();
+        List<ColumnDefinition> clusterings = new ArrayList<>();
+        PartitionColumns.Builder builder = PartitionColumns.builder();
+
+        for (ColumnDefinition column : columns)
+        {
+            switch (column.kind)
+            {
+                case PARTITION_KEY:
+                    partitions.add(column);
+                    break;
+                case CLUSTERING_COLUMN:
+                    clusterings.add(column);
+                    break;
+                default:
+                    builder.add(column);
+                    break;
+            }
+        }
+
+        Collections.sort(partitions);
+        Collections.sort(clusterings);
+
+        return new CFMetaData(ksName,
+                              name,
+                              cfId,
+                              isSuper,
+                              isCounter,
+                              isDense,
+                              isCompound,
+                              partitions,
+                              clusterings,
+                              builder.build());
     }
 
-    public static CFMetaData denseCFMetaData(String keyspace, String name, AbstractType<?> comp)
+    private static List<AbstractType<?>> extractTypes(List<ColumnDefinition> clusteringColumns)
     {
-        return denseCFMetaData(keyspace, name, comp, null);
+        List<AbstractType<?>> types = new ArrayList<>(clusteringColumns.size());
+        for (ColumnDefinition def : clusteringColumns)
+            types.add(def.type);
+        return types;
     }
 
-    public static AbstractType<?> makeRawAbstractType(AbstractType<?> comparator, AbstractType<?> subComparator)
+    /**
+     * There is a couple of places in the code where we need a CFMetaData object and don't have one readily available
+     * and know that only the keyspace and name matter. This creates such "fake" metadata. Use only if you know what
+     * you're doing.
+     */
+    public static CFMetaData createFake(String keyspace, String name)
     {
-        return subComparator == null ? comparator : CompositeType.getInstance(Arrays.asList(comparator, subComparator));
+        return CFMetaData.Builder.create(keyspace, name).addPartitionKey("key", BytesType.instance).build();
     }
 
     public Map<String, TriggerDefinition> getTriggers()
@@ -289,14 +357,21 @@ public final class CFMetaData
         return triggers;
     }
 
+    // Compiles a system metadata
     public static CFMetaData compile(String cql, String keyspace)
     {
         CFStatement parsed = (CFStatement)QueryProcessor.parseStatement(cql);
         parsed.prepareKeyspace(keyspace);
         CreateTableStatement statement = (CreateTableStatement) parsed.prepare().statement;
-        CFMetaData cfm = newSystemMetadata(keyspace, statement.columnFamily(), "", statement.comparator);
+        CFMetaData.Builder builder = statement.metadataBuilder();
+        builder.withId(generateLegacyCfId(keyspace, statement.columnFamily()));
+        CFMetaData cfm = builder.build();
         statement.applyPropertiesTo(cfm);
-        return cfm.rebuild();
+
+        return cfm.readRepairChance(0)
+                  .dcLocalReadRepairChance(0)
+                  .gcGraceSeconds(0)
+                  .memtableFlushPeriod(3600 * 1000);
     }
 
     /**
@@ -310,26 +385,7 @@ public final class CFMetaData
         return UUID.nameUUIDFromBytes(ArrayUtils.addAll(ksName.getBytes(), cfName.getBytes()));
     }
 
-    private static CFMetaData newSystemMetadata(String keyspace, String cfName, String comment, CellNameType comparator)
-    {
-        return new CFMetaData(keyspace, cfName, ColumnFamilyType.Standard, comparator, generateLegacyCfId(keyspace, cfName))
-                             .comment(comment)
-                             .readRepairChance(0)
-                             .dcLocalReadRepairChance(0)
-                             .gcGraceSeconds(0)
-                             .memtableFlushPeriod(3600 * 1000);
-    }
-
-    /**
-     * Creates CFMetaData for secondary index CF.
-     * Secondary index CF has the same CF ID as parent's.
-     *
-     * @param parent Parent CF where secondary index is created
-     * @param info Column definition containing secondary index definition
-     * @param indexComparator Comparator for secondary index
-     * @return CFMetaData for secondary index
-     */
-    public static CFMetaData newIndexMetadata(CFMetaData parent, ColumnDefinition info, CellNameType indexComparator)
+    public CFMetaData reloadIndexMetadataProperties(CFMetaData parent)
     {
         // Depends on parent's cache setting, turn on its index CF's cache.
         // Row caching is never enabled; see CASSANDRA-5732
@@ -337,32 +393,21 @@ public final class CFMetaData
                                     ? CachingOptions.KEYS_ONLY
                                     : CachingOptions.NONE;
 
-        return new CFMetaData(parent.ksName, parent.indexColumnFamilyName(info), ColumnFamilyType.Standard, indexComparator, parent.cfId)
-                             .keyValidator(info.type)
-                             .readRepairChance(0.0)
-                             .dcLocalReadRepairChance(0.0)
-                             .gcGraceSeconds(0)
-                             .caching(indexCaching)
-                             .speculativeRetry(parent.speculativeRetry)
-                             .compactionStrategyClass(parent.compactionStrategyClass)
-                             .compactionStrategyOptions(parent.compactionStrategyOptions)
-                             .reloadSecondaryIndexMetadata(parent)
-                             .rebuild();
-    }
-
-    public CFMetaData reloadSecondaryIndexMetadata(CFMetaData parent)
-    {
-        minCompactionThreshold(parent.minCompactionThreshold);
-        maxCompactionThreshold(parent.maxCompactionThreshold);
-        compactionStrategyClass(parent.compactionStrategyClass);
-        compactionStrategyOptions(parent.compactionStrategyOptions);
-        compressionParameters(parent.compressionParameters);
-        return this;
+        return this.readRepairChance(0.0)
+                   .dcLocalReadRepairChance(0.0)
+                   .gcGraceSeconds(0)
+                   .caching(indexCaching)
+                   .speculativeRetry(parent.speculativeRetry)
+                   .minCompactionThreshold(parent.minCompactionThreshold)
+                   .maxCompactionThreshold(parent.maxCompactionThreshold)
+                   .compactionStrategyClass(parent.compactionStrategyClass)
+                   .compactionStrategyOptions(parent.compactionStrategyOptions)
+                   .compressionParameters(parent.compressionParameters);
     }
 
     public CFMetaData copy()
     {
-        return copyOpts(new CFMetaData(ksName, cfName, cfType, comparator, cfId), this);
+        return copy(cfId);
     }
 
     /**
@@ -373,23 +418,42 @@ public final class CFMetaData
      */
     public CFMetaData copy(UUID newCfId)
     {
-        return copyOpts(new CFMetaData(ksName, cfName, cfType, comparator, newCfId), this);
+        return copyOpts(new CFMetaData(ksName,
+                                       cfName,
+                                       newCfId,
+                                       isSuper,
+                                       isCounter,
+                                       isDense,
+                                       isCompound,
+                                       copy(partitionKeyColumns),
+                                       copy(clusteringColumns),
+                                       copy(partitionColumns)),
+                        this);
+    }
+
+    private static List<ColumnDefinition> copy(List<ColumnDefinition> l)
+    {
+        List<ColumnDefinition> copied = new ArrayList<>(l.size());
+        for (ColumnDefinition cd : l)
+            copied.add(cd.copy());
+        return copied;
+    }
+
+    private static PartitionColumns copy(PartitionColumns columns)
+    {
+        PartitionColumns.Builder newColumns = PartitionColumns.builder();
+        for (ColumnDefinition cd : columns)
+            newColumns.add(cd.copy());
+        return newColumns.build();
     }
 
     @VisibleForTesting
     public static CFMetaData copyOpts(CFMetaData newCFMD, CFMetaData oldCFMD)
     {
-        List<ColumnDefinition> clonedColumns = new ArrayList<>(oldCFMD.allColumns().size());
-        for (ColumnDefinition cd : oldCFMD.allColumns())
-            clonedColumns.add(cd.copy());
-
-        return newCFMD.addAllColumnDefinitions(clonedColumns)
-                      .comment(oldCFMD.comment)
+        return newCFMD.comment(oldCFMD.comment)
                       .readRepairChance(oldCFMD.readRepairChance)
                       .dcLocalReadRepairChance(oldCFMD.dcLocalReadRepairChance)
                       .gcGraceSeconds(oldCFMD.gcGraceSeconds)
-                      .defaultValidator(oldCFMD.defaultValidator)
-                      .keyValidator(oldCFMD.keyValidator)
                       .minCompactionThreshold(oldCFMD.minCompactionThreshold)
                       .maxCompactionThreshold(oldCFMD.maxCompactionThreshold)
                       .compactionStrategyClass(oldCFMD.compactionStrategyClass)
@@ -403,9 +467,7 @@ public final class CFMetaData
                       .speculativeRetry(oldCFMD.speculativeRetry)
                       .memtableFlushPeriod(oldCFMD.memtableFlushPeriod)
                       .droppedColumns(new HashMap<>(oldCFMD.droppedColumns))
-                      .triggers(new HashMap<>(oldCFMD.triggers))
-                      .isDense(oldCFMD.isDense)
-                      .rebuild();
+                      .triggers(new HashMap<>(oldCFMD.triggers));
     }
 
     /**
@@ -429,7 +491,7 @@ public final class CFMetaData
 
     public boolean isSuper()
     {
-        return cfType == ColumnFamilyType.Super;
+        return isSuper;
     }
 
     /**
@@ -476,14 +538,16 @@ public final class CFMetaData
         return ReadRepairDecision.NONE;
     }
 
-    public int getGcGraceSeconds()
+    public AbstractType<?> getColumnDefinitionNameComparator(ColumnDefinition.Kind kind)
     {
-        return gcGraceSeconds;
+        return (isSuper() && kind == ColumnDefinition.Kind.REGULAR) || (isStaticCompactTable() && kind == ColumnDefinition.Kind.STATIC)
+             ? thriftColumnNameType()
+             : UTF8Type.instance;
     }
 
-    public AbstractType<?> getDefaultValidator()
+    public int getGcGraceSeconds()
     {
-        return defaultValidator;
+        return gcGraceSeconds;
     }
 
     public AbstractType<?> getKeyValidator()
@@ -512,15 +576,21 @@ public final class CFMetaData
     }
 
     // An iterator over all column definitions but that respect the order of a SELECT *.
+    // This also "hide" the clustering/regular columns for a non-CQL3 non-dense table for backward compatibility
+    // sake (those are accessible through thrift but not through CQL currently).
     public Iterator<ColumnDefinition> allColumnsInSelectOrder()
     {
+        final boolean isStaticCompactTable = isStaticCompactTable();
+        final boolean noNonPkColumns = isCompactTable() && CompactTables.hasEmptyCompactValue(this);
         return new AbstractIterator<ColumnDefinition>()
         {
             private final Iterator<ColumnDefinition> partitionKeyIter = partitionKeyColumns.iterator();
-            private final Iterator<ColumnDefinition> clusteringIter = clusteringColumns.iterator();
-            private boolean valueDone;
-            private final Iterator<ColumnDefinition> staticIter = staticColumns.iterator();
-            private final Iterator<ColumnDefinition> regularIter = regularColumns.iterator();
+            private final Iterator<ColumnDefinition> clusteringIter = isStaticCompactTable ? Collections.<ColumnDefinition>emptyIterator() : clusteringColumns.iterator();
+            private final Iterator<ColumnDefinition> otherColumns = noNonPkColumns
+                                                                  ? Collections.<ColumnDefinition>emptyIterator()
+                                                                  : (isStaticCompactTable
+                                                                     ?  partitionColumns.statics.selectOrderIterator()
+                                                                     :  partitionColumns.selectOrderIterator());
 
             protected ColumnDefinition computeNext()
             {
@@ -530,22 +600,7 @@ public final class CFMetaData
                 if (clusteringIter.hasNext())
                     return clusteringIter.next();
 
-                if (staticIter.hasNext())
-                    return staticIter.next();
-
-                if (compactValueColumn != null && !valueDone)
-                {
-                    valueDone = true;
-                    // If the compactValueColumn is empty, this means we have a dense table but
-                    // with only a PK. As far as selects are concerned, we should ignore the value.
-                    if (compactValueColumn.name.bytes.hasRemaining())
-                        return compactValueColumn;
-                }
-
-                if (regularIter.hasNext())
-                    return regularIter.next();
-
-                return endOfData();
+                return otherColumns.hasNext() ? otherColumns.next() : endOfData();
             }
         };
     }
@@ -560,33 +615,38 @@ public final class CFMetaData
         return clusteringColumns;
     }
 
-    public Set<ColumnDefinition> regularColumns()
+    public PartitionColumns partitionColumns()
     {
-        return regularColumns;
+        return partitionColumns;
     }
 
-    public Set<ColumnDefinition> staticColumns()
+    public ColumnDefinition compactValueColumn()
     {
-        return staticColumns;
+        return compactValueColumn;
     }
 
-    public Iterable<ColumnDefinition> regularAndStaticColumns()
+    public ClusteringComparator getKeyValidatorAsClusteringComparator()
     {
-        return Iterables.concat(staticColumns, regularColumns);
+        boolean isCompound = keyValidator instanceof CompositeType;
+        List<AbstractType<?>> types = isCompound
+                                    ? ((CompositeType) keyValidator).types
+                                    : Collections.<AbstractType<?>>singletonList(keyValidator);
+        return new ClusteringComparator(types);
     }
 
-    public ColumnDefinition compactValueColumn()
+    public static ByteBuffer serializePartitionKey(ClusteringPrefix keyAsClustering)
     {
-        return compactValueColumn;
-    }
+        // TODO: we should stop using Clustering for partition keys. Maybe we can add
+        // a few methods to DecoratedKey so we don't have to (note that while using a Clustering
+        // allows to use buildBound(), it's actually used for partition keys only when every restriction
+        // is an equal, so we could easily create a specific method for keys for that.
+        if (keyAsClustering.size() == 1)
+            return keyAsClustering.get(0);
 
-    // TODO: we could use CType for key validation too to make this unnecessary but
-    // it's unclear it would be a win overall
-    public CType getKeyValidatorAsCType()
-    {
-        return keyValidator instanceof CompositeType
-             ? new CompoundCType(((CompositeType) keyValidator).types)
-             : new SimpleCType(keyValidator);
+        ByteBuffer[] values = new ByteBuffer[keyAsClustering.size()];
+        for (int i = 0; i < keyAsClustering.size(); i++)
+            values[i] = keyAsClustering.get(i);
+        return CompositeType.build(values);
     }
 
     public double getBloomFilterFpChance()
@@ -627,14 +687,26 @@ public final class CFMetaData
         return defaultTimeToLive;
     }
 
-    public Map<ColumnIdentifier, Long> getDroppedColumns()
+    public Map<ColumnIdentifier, DroppedColumn> getDroppedColumns()
     {
         return droppedColumns;
     }
 
-    public Boolean getIsDense()
+    /**
+     * Returns a "fake" ColumnDefinition corresponding to the dropped column {@code name}
+     * of {@code null} if there is no such dropped column.
+     */
+    public ColumnDefinition getDroppedColumnDefinition(ByteBuffer name)
     {
-        return isDense;
+        DroppedColumn dropped = droppedColumns.get(name);
+        if (dropped == null)
+            return null;
+
+        // We need the type for deserialization purpose. If we don't have the type however,
+        // it means that it's a dropped column from before 3.0, and in that case using
+        // BytesType is fine for what we'll be using it for, even if that's a hack.
+        AbstractType<?> type = dropped.type == null ? BytesType.instance : dropped.type;
+        return ColumnDefinition.regularDef(this, name, type, null);
     }
 
     @Override
@@ -651,13 +723,15 @@ public final class CFMetaData
         return Objects.equal(cfId, other.cfId)
             && Objects.equal(ksName, other.ksName)
             && Objects.equal(cfName, other.cfName)
-            && Objects.equal(cfType, other.cfType)
+            && Objects.equal(isDense, other.isDense)
+            && Objects.equal(isCompound, other.isCompound)
+            && Objects.equal(isSuper, other.isSuper)
+            && Objects.equal(isCounter, other.isCounter)
             && Objects.equal(comparator, other.comparator)
             && Objects.equal(comment, other.comment)
             && Objects.equal(readRepairChance, other.readRepairChance)
             && Objects.equal(dcLocalReadRepairChance, other.dcLocalReadRepairChance)
             && Objects.equal(gcGraceSeconds, other.gcGraceSeconds)
-            && Objects.equal(defaultValidator, other.defaultValidator)
             && Objects.equal(keyValidator, other.keyValidator)
             && Objects.equal(minCompactionThreshold, other.minCompactionThreshold)
             && Objects.equal(maxCompactionThreshold, other.maxCompactionThreshold)
@@ -673,8 +747,7 @@ public final class CFMetaData
             && Objects.equal(maxIndexInterval, other.maxIndexInterval)
             && Objects.equal(speculativeRetry, other.speculativeRetry)
             && Objects.equal(droppedColumns, other.droppedColumns)
-            && Objects.equal(triggers, other.triggers)
-            && Objects.equal(isDense, other.isDense);
+            && Objects.equal(triggers, other.triggers);
     }
 
     @Override
@@ -684,13 +757,15 @@ public final class CFMetaData
             .append(cfId)
             .append(ksName)
             .append(cfName)
-            .append(cfType)
+            .append(isDense)
+            .append(isCompound)
+            .append(isSuper)
+            .append(isCounter)
             .append(comparator)
             .append(comment)
             .append(readRepairChance)
             .append(dcLocalReadRepairChance)
             .append(gcGraceSeconds)
-            .append(defaultValidator)
             .append(keyValidator)
             .append(minCompactionThreshold)
             .append(maxCompactionThreshold)
@@ -707,16 +782,9 @@ public final class CFMetaData
             .append(speculativeRetry)
             .append(droppedColumns)
             .append(triggers)
-            .append(isDense)
             .toHashCode();
     }
 
-    public AbstractType<?> getValueValidator(CellName cellName)
-    {
-        ColumnDefinition def = getColumnDefinition(cellName);
-        return def == null ? defaultValidator : def.type;
-    }
-
     /**
      * Updates this object in place to match the definition in the system schema tables.
      * @return true if any columns were added, removed, or altered; otherwise, false is returned
@@ -739,10 +807,13 @@ public final class CFMetaData
 
         validateCompatility(cfm);
 
-        // TODO: this method should probably return a new CFMetaData so that
-        // 1) we can keep comparator final
-        // 2) updates are applied atomically
-        comparator = cfm.comparator;
+        partitionKeyColumns = cfm.partitionKeyColumns;
+        clusteringColumns = cfm.clusteringColumns;
+
+        boolean hasColumnChange = !partitionColumns.equals(cfm.partitionColumns);
+        partitionColumns = cfm.partitionColumns;
+
+        rebuild();
 
         // compaction thresholds are checked by ThriftValidation. We shouldn't be doing
         // validation on the apply path; it's too late for that.
@@ -751,7 +822,6 @@ public final class CFMetaData
         readRepairChance = cfm.readRepairChance;
         dcLocalReadRepairChance = cfm.dcLocalReadRepairChance;
         gcGraceSeconds = cfm.gcGraceSeconds;
-        defaultValidator = cfm.defaultValidator;
         keyValidator = cfm.keyValidator;
         minCompactionThreshold = cfm.minCompactionThreshold;
         maxCompactionThreshold = cfm.maxCompactionThreshold;
@@ -767,21 +837,6 @@ public final class CFMetaData
         if (!cfm.droppedColumns.isEmpty())
             droppedColumns = cfm.droppedColumns;
 
-        MapDifference<ByteBuffer, ColumnDefinition> columnDiff = Maps.difference(columnMetadata, cfm.columnMetadata);
-        // columns that are no longer needed
-        for (ColumnDefinition cd : columnDiff.entriesOnlyOnLeft().values())
-            removeColumnDefinition(cd);
-        // newly added columns
-        for (ColumnDefinition cd : columnDiff.entriesOnlyOnRight().values())
-            addColumnDefinition(cd);
-        // old columns with updated attributes
-        for (ByteBuffer name : columnDiff.entriesDiffering().keySet())
-        {
-            ColumnDefinition oldDef = columnMetadata.get(name);
-            ColumnDefinition def = cfm.columnMetadata.get(name);
-            addOrReplaceColumnDefinition(oldDef.apply(def));
-        }
-
         compactionStrategyClass = cfm.compactionStrategyClass;
         compactionStrategyOptions = cfm.compactionStrategyOptions;
 
@@ -789,14 +844,9 @@ public final class CFMetaData
 
         triggers = cfm.triggers;
 
-        isDense(cfm.isDense);
-
-        rebuild();
         logger.debug("application result is {}", this);
 
-        return !columnDiff.entriesOnlyOnLeft().isEmpty() ||
-               !columnDiff.entriesOnlyOnRight().isEmpty() ||
-               !columnDiff.entriesDiffering().isEmpty();
+        return hasColumnChange;
     }
 
     public void validateCompatility(CFMetaData cfm) throws ConfigurationException
@@ -812,8 +862,8 @@ public final class CFMetaData
             throw new ConfigurationException(String.format("Column family ID mismatch (found %s; expected %s)",
                                                            cfm.cfId, cfId));
 
-        if (cfm.cfType != cfType)
-            throw new ConfigurationException(String.format("Column family types do not match (found %s; expected %s).", cfm.cfType, cfType));
+        if (cfm.isDense != isDense || cfm.isCompound != isCompound || cfm.isCounter != isCounter || cfm.isSuper != isSuper)
+            throw new ConfigurationException("types do not match.");
 
         if (!cfm.comparator.isCompatibleWith(comparator))
             throw new ConfigurationException(String.format("Column family comparators do not match or are not compatible (found %s; expected %s).", cfm.comparator.getClass().getSimpleName(), comparator.getClass().getSimpleName()));
@@ -891,27 +941,6 @@ public final class CFMetaData
         return columnMetadata.get(name);
     }
 
-    /**
-     * Returns a ColumnDefinition given a cell name.
-     */
-    public ColumnDefinition getColumnDefinition(CellName cellName)
-    {
-        ColumnIdentifier id = cellName.cql3ColumnName(this);
-        ColumnDefinition def = id == null
-                             ? getColumnDefinition(cellName.toByteBuffer())  // Means a dense layout, try the full column name
-                             : getColumnDefinition(id);
-
-        // It's possible that the def is a PRIMARY KEY or COMPACT_VALUE one in case a concrete cell
-        // name conflicts with a CQL column name, which can happen in 2 cases:
-        // 1) because the user inserted a cell through Thrift that conflicts with a default "alias" used
-        //    by CQL for thrift tables (see #6892).
-        // 2) for COMPACT STORAGE tables with a single utf8 clustering column, the cell name can be anything,
-        //    including a CQL column name (without this being a problem).
-        // In any case, this is fine, this just mean that columnDefinition is not the ColumnDefinition we are
-        // looking for.
-        return def != null && def.isPartOfCellName() ? def : null;
-    }
-
     public ColumnDefinition getColumnDefinitionForIndex(String indexName)
     {
         for (ColumnDefinition def : allColumns())
@@ -970,21 +999,6 @@ public final class CFMetaData
         return (cfName + "_" + columnName + "_idx").replaceAll("\\W", "");
     }
 
-    public Iterator<OnDiskAtom> getOnDiskIterator(FileDataInput in, Version version)
-    {
-        return getOnDiskIterator(in, ColumnSerializer.Flag.LOCAL, Integer.MIN_VALUE, version);
-    }
-
-    public Iterator<OnDiskAtom> getOnDiskIterator(FileDataInput in, ColumnSerializer.Flag flag, int expireBefore, Version version)
-    {
-        return version.getSSTableFormat().getOnDiskIterator(in, flag, expireBefore, this, version);
-    }
-
-    public AtomDeserializer getOnDiskDeserializer(DataInput in, Version version)
-    {
-        return new AtomDeserializer(comparator, in, ColumnSerializer.Flag.LOCAL, Integer.MIN_VALUE, version);
-    }
-
     public static boolean isNameValid(String name)
     {
         return name != null && !name.isEmpty() && name.length() <= Schema.NAME_LENGTH && name.matches("\\w+");
@@ -1004,9 +1018,6 @@ public final class CFMetaData
         if (!isNameValid(cfName))
             throw new ConfigurationException(String.format("ColumnFamily name must not be empty, more than %s characters long, or contain non-alphanumeric-underscore characters (got \"%s\")", Schema.NAME_LENGTH, cfName));
 
-        if (cfType == null)
-            throw new ConfigurationException(String.format("Invalid column family type for %s", cfName));
-
         for (int i = 0; i < comparator.size(); i++)
         {
             if (comparator.subtype(i) instanceof CounterColumnType)
@@ -1016,10 +1027,10 @@ public final class CFMetaData
             throw new ConfigurationException("CounterColumnType is not a valid key validator");
 
         // Mixing counter with non counter columns is not supported (#2614)
-        if (defaultValidator instanceof CounterColumnType)
+        if (isCounter)
         {
-            for (ColumnDefinition def : regularAndStaticColumns())
-                if (!(def.type instanceof CounterColumnType))
+            for (ColumnDefinition def : partitionColumns())
+                if (!(def.type instanceof CounterColumnType) && !CompactTables.isSuperColumnMapColumn(def))
                     throw new ConfigurationException("Cannot add a non counter column (" + def.name + ") in a counter column family");
         }
         else
@@ -1040,7 +1051,7 @@ public final class CFMetaData
             }
             else
             {
-                if (cfType == ColumnFamilyType.Super)
+                if (isSuper)
                     throw new ConfigurationException("Secondary indexes are not supported on super column families");
                 if (!isIndexNameValid(c.getIndexName()))
                     throw new ConfigurationException("Illegal index name " + c.getIndexName());
@@ -1117,27 +1128,16 @@ public final class CFMetaData
     }
 
     // The comparator to validate the definition name.
-
-    public AbstractType<?> getColumnDefinitionComparator(ColumnDefinition def)
-    {
-        return getComponentComparator(def.isOnAllComponents() ? null : def.position(), def.kind);
-    }
-
-    public AbstractType<?> getComponentComparator(Integer componentIndex, ColumnDefinition.Kind kind)
+    public AbstractType<?> thriftColumnNameType()
     {
-        switch (kind)
+        if (isSuper())
         {
-            case REGULAR:
-                if (componentIndex == null)
-                    return comparator.asAbstractType();
-
-                AbstractType<?> t = comparator.subtype(componentIndex);
-                assert t != null : "Non-sensical component index";
-                return t;
-            default:
-                // CQL3 column names are UTF8
-                return UTF8Type.instance;
+            ColumnDefinition def = compactValueColumn();
+            assert def != null && def.type instanceof MapType;
+            return ((MapType)def.type).nameComparator();
         }
+
+        return UTF8Type.instance;
     }
 
     public CFMetaData addAllColumnDefinitions(Collection<ColumnDefinition> defs)
@@ -1159,17 +1159,43 @@ public final class CFMetaData
     // know this cannot happen.
     public CFMetaData addOrReplaceColumnDefinition(ColumnDefinition def)
     {
-        if (def.kind == ColumnDefinition.Kind.REGULAR)
-            comparator.addCQL3Column(def.name);
-        columnMetadata.put(def.name.bytes, def);
+        // Adds the definition and rebuild what is necessary. We could call rebuild() but it's not too hard to
+        // only rebuild the necessary bits.
+        switch (def.kind)
+        {
+            case PARTITION_KEY:
+                partitionKeyColumns.set(def.position(), def);
+                List<AbstractType<?>> keyTypes = extractTypes(partitionKeyColumns);
+                keyValidator = keyTypes.size() == 1 ? keyTypes.get(0) : CompositeType.getInstance(keyTypes);
+                break;
+            case CLUSTERING_COLUMN:
+                clusteringColumns.set(def.position(), def);
+                comparator = new ClusteringComparator(extractTypes(clusteringColumns));
+                break;
+            case REGULAR:
+            case STATIC:
+                PartitionColumns.Builder builder = PartitionColumns.builder();
+                for (ColumnDefinition column : partitionColumns)
+                    if (!column.name.equals(def.name))
+                        builder.add(column);
+                builder.add(def);
+                partitionColumns = builder.build();
+                // If dense, we must have modified the compact value since that's the only one we can have.
+                if (isDense)
+                    this.compactValueColumn = def;
+                break;
+        }
+        this.columnMetadata.put(def.name.bytes, def);
         return this;
     }
 
     public boolean removeColumnDefinition(ColumnDefinition def)
     {
-        if (def.kind == ColumnDefinition.Kind.REGULAR)
-            comparator.removeCQL3Column(def.name);
-        return columnMetadata.remove(def.name.bytes) != null;
+        assert !def.isPartitionKey();
+        boolean removed = columnMetadata.remove(def.name.bytes) != null;
+        if (removed)
+            partitionColumns = partitionColumns.without(def);
+        return removed;
     }
 
     public void addTriggerDefinition(TriggerDefinition def) throws InvalidRequestException
@@ -1192,8 +1218,7 @@ public final class CFMetaData
 
     public void recordColumnDrop(ColumnDefinition def)
     {
-        assert !def.isOnAllComponents();
-        droppedColumns.put(def.name, FBUtilities.timestampMicros());
+        droppedColumns.put(def.name, new DroppedColumn(def.type, FBUtilities.timestampMicros()));
     }
 
     public void renameColumn(ColumnIdentifier from, ColumnIdentifier to) throws InvalidRequestException
@@ -1205,7 +1230,7 @@ public final class CFMetaData
         if (getColumnDefinition(to) != null)
             throw new InvalidRequestException(String.format("Cannot rename column %s to %s in keyspace %s; another column of that name already exist", from, to, cfName));
 
-        if (def.isPartOfCellName())
+        if (def.isPartOfCellName(isCQLTable(), isSuper()))
         {
             throw new InvalidRequestException(String.format("Cannot rename non PRIMARY KEY part %s", from));
         }
@@ -1215,188 +1240,37 @@ public final class CFMetaData
         }
 
         ColumnDefinition newDef = def.withNewName(to);
-        // don't call addColumnDefinition/removeColumnDefition because we want to avoid recomputing
-        // the CQL3 cfDef between those two operation
         addOrReplaceColumnDefinition(newDef);
-        removeColumnDefinition(def);
-    }
-
-    public CFMetaData rebuild()
-    {
-        if (isDense == null)
-            isDense(calculateIsDense(comparator.asAbstractType(), allColumns()));
-
-        List<ColumnDefinition> pkCols = nullInitializedList(keyValidator.componentsCount());
-        List<ColumnDefinition> ckCols = nullInitializedList(comparator.clusteringPrefixSize());
-        // We keep things sorted to get consistent/predictable order in select queries
-        SortedSet<ColumnDefinition> regCols = new TreeSet<>(regularColumnComparator);
-        SortedSet<ColumnDefinition> statCols = new TreeSet<>(regularColumnComparator);
-        ColumnDefinition compactCol = null;
 
-        for (ColumnDefinition def : allColumns())
-        {
-            switch (def.kind)
-            {
-                case PARTITION_KEY:
-                    assert !(def.isOnAllComponents() && keyValidator instanceof CompositeType);
-                    pkCols.set(def.position(), def);
-                    break;
-                case CLUSTERING_COLUMN:
-                    assert !(def.isOnAllComponents() && comparator.isCompound());
-                    ckCols.set(def.position(), def);
-                    break;
-                case REGULAR:
-                    regCols.add(def);
-                    break;
-                case STATIC:
-                    statCols.add(def);
-                    break;
-                case COMPACT_VALUE:
-                    assert compactCol == null : "There shouldn't be more than one compact value defined: got " + compactCol + " and " + def;
-                    compactCol = def;
-                    break;
-            }
-        }
-
-        // Now actually assign the correct value. This is not atomic, but then again, updating CFMetaData is never atomic anyway.
-        partitionKeyColumns = addDefaultKeyAliases(pkCols);
-        clusteringColumns = addDefaultColumnAliases(ckCols);
-        regularColumns = regCols;
-        staticColumns = statCols;
-        compactValueColumn = addDefaultValueAlias(compactCol);
-        return this;
-    }
-
-    private List<ColumnDefinition> addDefaultKeyAliases(List<ColumnDefinition> pkCols)
-    {
-        for (int i = 0; i < pkCols.size(); i++)
-        {
-            if (pkCols.get(i) == null)
-            {
-                Integer idx = null;
-                AbstractType<?> type = keyValidator;
-                if (keyValidator instanceof CompositeType)
-                {
-                    idx = i;
-                    type = ((CompositeType)keyValidator).types.get(i);
-                }
-                // For compatibility sake, we call the first alias 'key' rather than 'key1'. This
-                // is inconsistent with column alias, but it's probably not worth risking breaking compatibility now.
-                ByteBuffer name = ByteBufferUtil.bytes(i == 0 ? DEFAULT_KEY_ALIAS : DEFAULT_KEY_ALIAS + (i + 1));
-                ColumnDefinition newDef = ColumnDefinition.partitionKeyDef(this, name, type, idx);
-                addOrReplaceColumnDefinition(newDef);
-                pkCols.set(i, newDef);
-            }
-        }
-        return pkCols;
+        // removeColumnDefinition doesn't work for partition key (expectedly) but renaming one is fine so we still
+        // want to update columnMetadata.
+        if (def.isPartitionKey())
+            columnMetadata.remove(def.name.bytes);
+        else
+            removeColumnDefinition(def);
     }
 
-    private List<ColumnDefinition> addDefaultColumnAliases(List<ColumnDefinition> ckCols)
+    public boolean isCQLTable()
     {
-        for (int i = 0; i < ckCols.size(); i++)
-        {
-            if (ckCols.get(i) == null)
-            {
-                Integer idx;
-                AbstractType<?> type;
-                if (comparator.isCompound())
-                {
-                    idx = i;
-                    type = comparator.subtype(i);
-                }
-                else
-                {
-                    idx = null;
-                    type = comparator.asAbstractType();
-                }
-                ByteBuffer name = ByteBufferUtil.bytes(DEFAULT_COLUMN_ALIAS + (i + 1));
-                ColumnDefinition newDef = ColumnDefinition.clusteringKeyDef(this, name, type, idx);
-                addOrReplaceColumnDefinition(newDef);
-                ckCols.set(i, newDef);
-            }
-        }
-        return ckCols;
+        return !isSuper() && !isDense() && isCompound();
     }
 
-    private ColumnDefinition addDefaultValueAlias(ColumnDefinition compactValueDef)
+    public boolean isCompactTable()
     {
-        if (comparator.isDense())
-        {
-            if (compactValueDef != null)
-                return compactValueDef;
-
-            ColumnDefinition newDef = ColumnDefinition.compactValueDef(this, ByteBufferUtil.bytes(DEFAULT_VALUE_ALIAS), defaultValidator);
-            addOrReplaceColumnDefinition(newDef);
-            return newDef;
-        }
-        else
-        {
-            assert compactValueDef == null;
-            return null;
-        }
+        return !isCQLTable();
     }
 
-    /*
-     * We call dense a CF for which each component of the comparator is a clustering column, i.e. no
-     * component is used to store a regular column names. In other words, non-composite static "thrift"
-     * and CQL3 CF are *not* dense.
-     * We save whether the table is dense or not during table creation through CQL, but we don't have this
-     * information for table just created through thrift, nor for table prior to CASSANDRA-7744, so this
-     * method does its best to infer whether the table is dense or not based on other elements.
-     */
-    public static boolean calculateIsDense(AbstractType<?> comparator, Collection<ColumnDefinition> defs)
-    {
-        /*
-         * As said above, this method is only here because we need to deal with thrift upgrades.
-         * Once a CF has been "upgraded", i.e. we've rebuilt and save its CQL3 metadata at least once,
-         * then we'll have saved the "is_dense" value and will be good to go.
-         *
-         * But non-upgraded thrift CF (and pre-7744 CF) will have no value for "is_dense", so we need
-         * to infer that information without relying on it in that case. And for the most part this is
-         * easy, a CF that has at least one REGULAR definition is not dense. But the subtlety is that not
-         * having a REGULAR definition may not mean dense because of CQL3 definitions that have only the
-         * PRIMARY KEY defined.
-         *
-         * So we need to recognize those special case CQL3 table with only a primary key. If we have some
-         * clustering columns, we're fine as said above. So the only problem is that we cannot decide for
-         * sure if a CF without REGULAR columns nor CLUSTERING_COLUMN definition is meant to be dense, or if it
-         * has been created in CQL3 by say:
-         *    CREATE TABLE test (k int PRIMARY KEY)
-         * in which case it should not be dense. However, we can limit our margin of error by assuming we are
-         * in the latter case only if the comparator is exactly CompositeType(UTF8Type).
-         */
-        boolean hasRegular = false;
-        int maxClusteringIdx = -1;
-        for (ColumnDefinition def : defs)
-        {
-            switch (def.kind)
-            {
-                case CLUSTERING_COLUMN:
-                    maxClusteringIdx = Math.max(maxClusteringIdx, def.position());
-                    break;
-                case REGULAR:
-                    hasRegular = true;
-                    break;
-            }
-        }
-
-        return maxClusteringIdx >= 0
-             ? maxClusteringIdx == comparator.componentsCount() - 1
-             : !hasRegular && !isCQL3OnlyPKComparator(comparator);
-    }
-
-    private static boolean isCQL3OnlyPKComparator(AbstractType<?> comparator)
+    public boolean isStaticCompactTable()
     {
-        if (!(comparator instanceof CompositeType))
-            return false;
-
-        CompositeType ct = (CompositeType)comparator;
-        return ct.types.size() == 1 && ct.types.get(0) instanceof UTF8Type;
+        return !isSuper && !isDense() && !isCompound();
     }
 
-    public boolean isCQL3Table()
+    private static <T> boolean hasNoNulls(List<T> l)
     {
-        return !isSuper() && !comparator.isDense() && comparator.isCompound();
+        for (T t : l)
+            if (t == null)
+                return false;
+        return true;
     }
 
     private static <T> List<T> nullInitializedList(int size)
@@ -1412,34 +1286,50 @@ public final class CFMetaData
      */
     public boolean isThriftCompatible()
     {
-        // Super CF are always "thrift compatible". But since they may have defs with a componentIndex != null,
-        // we have to special case here.
-        if (isSuper())
-            return true;
+        return isCompactTable();
+    }
 
-        for (ColumnDefinition def : allColumns())
-        {
-            // Non-REGULAR ColumnDefinition are not "thrift compatible" per-se, but it's ok because they hold metadata
-            // this is only of use to CQL3, so we will just skip them in toThrift.
-            if (def.kind == ColumnDefinition.Kind.REGULAR && !def.isThriftCompatible())
-                return false;
-        }
+    public boolean isCounter()
+    {
+        return isCounter;
+    }
 
-        // The table might also have no REGULAR columns (be PK-only), but still be "thrift incompatible". See #7832.
-        if (isCQL3OnlyPKComparator(comparator.asAbstractType()) && !isDense)
-            return false;
+    public boolean hasStaticColumns()
+    {
+        return !partitionColumns.statics.isEmpty();
+    }
 
-        return true;
+    public boolean hasCollectionColumns()
+    {
+        for (ColumnDefinition def : partitionColumns())
+            if (def.type instanceof CollectionType && def.type.isMultiCell())
+                return true;
+        return false;
     }
 
-    public boolean isCounter()
+    // We call dense a CF for which each component of the comparator is a clustering column, i.e. no
+    // component is used to store a regular column names. In other words, non-composite static "thrift"
+    // and CQL3 CF are *not* dense.
+    public boolean isDense()
     {
-        return defaultValidator.isCounter();
+        return isDense;
     }
 
-    public boolean hasStaticColumns()
+    public boolean isCompound()
+    {
+        return isCompound;
+    }
+
+    public Serializers serializers()
+    {
+        return serializers;
+    }
+
+    public AbstractType<?> makeLegacyDefaultValidator()
     {
-        return !staticColumns.isEmpty();
+        return isCounter()
+             ? CounterColumnType.instance
+             : (isCompactTable() ? compactValueColumn().type : BytesType.instance);
     }
 
     @Override
@@ -1449,13 +1339,18 @@ public final class CFMetaData
             .append("cfId", cfId)
             .append("ksName", ksName)
             .append("cfName", cfName)
-            .append("cfType", cfType)
+            .append("isDense", isDense)
+            .append("isCompound", isCompound)
+            .append("isSuper", isSuper)
+            .append("isCounter", isCounter)
             .append("comparator", comparator)
+            .append("partitionColumns", partitionColumns)
+            .append("partitionKeyColumns", partitionKeyColumns)
+            .append("clusteringColumns", clusteringColumns)
             .append("comment", comment)
             .append("readRepairChance", readRepairChance)
             .append("dcLocalReadRepairChance", dcLocalReadRepairChance)
             .append("gcGraceSeconds", gcGraceSeconds)
-            .append("defaultValidator", defaultValidator)
             .append("keyValidator", keyValidator)
             .append("minCompactionThreshold", minCompactionThreshold)
             .append("maxCompactionThreshold", maxCompactionThreshold)
@@ -1472,7 +1367,234 @@ public final class CFMetaData
             .append("speculativeRetry", speculativeRetry)
             .append("droppedColumns", droppedColumns)
             .append("triggers", triggers.values())
-            .append("isDense", isDense)
             .toString();
     }
+
+    public static class Builder
+    {
+        private final String keyspace;
+        private final String table;
+        private final boolean isDense;
+        private final boolean isCompound;
+        private final boolean isSuper;
+        private final boolean isCounter;
+
+        private UUID tableId;
+
+        private final List<Pair<ColumnIdentifier, AbstractType>> partitionKeys = new ArrayList<>();
+        private final List<Pair<ColumnIdentifier, AbstractType>> clusteringColumns = new ArrayList<>();
+        private final List<Pair<ColumnIdentifier, AbstractType>> staticColumns = new ArrayList<>();
+        private final List<Pair<ColumnIdentifier, AbstractType>> regularColumns = new ArrayList<>();
+
+        private Builder(String keyspace, String table, boolean isDense, boolean isCompound, boolean isSuper, boolean isCounter)
+        {
+            this.keyspace = keyspace;
+            this.table = table;
+            this.isDense = isDense;
+            this.isCompound = isCompound;
+            this.isSuper = isSuper;
+            this.isCounter = isCounter;
+        }
+
+        public static Builder create(String keyspace, String table)
+        {
+            return create(keyspace, table, false, true, false);
+        }
+
+        public static Builder create(String keyspace, String table, boolean isDense, boolean isCompound, boolean isCounter)
+        {
+            return create(keyspace, table, isDense, isCompound, false, isCounter);
+        }
+
+        public static Builder create(String keyspace, String table, boolean isDense, boolean isCompound, boolean isSuper, boolean isCounter)
+        {
+            return new Builder(keyspace, table, isDense, isCompound, isSuper, isCounter);
+        }
+
+        public static Builder createDense(String keyspace, String table, boolean isCompound, boolean isCounter)
+        {
+            return create(keyspace, table, true, isCompound, isCounter);
+        }
+
+        public static Builder createSuper(String keyspace, String table, boolean isCounter)
+        {
+            return create(keyspace, table, false, false, true, isCounter);
+        }
+
+        public Builder withId(UUID tableId)
+        {
+            this.tableId = tableId;
+            return this;
+        }
+
+        public Builder addPartitionKey(String name, AbstractType type)
+        {
+            return addPartitionKey(ColumnIdentifier.getInterned(name, false), type);
+        }
+
+        public Builder addPartitionKey(ColumnIdentifier name, AbstractType type)
+        {
+            this.partitionKeys.add(Pair.create(name, type));
+            return this;
+        }
+
+        public Builder addClusteringColumn(String name, AbstractType type)
+        {
+            return addClusteringColumn(ColumnIdentifier.getInterned(name, false), type);
+        }
+
+        public Builder addClusteringColumn(ColumnIdentifier name, AbstractType type)
+        {
+            this.clusteringColumns.add(Pair.create(name, type));
+            return this;
+        }
+
+        public Builder addRegularColumn(String name, AbstractType type)
+        {
+            return addRegularColumn(ColumnIdentifier.getInterned(name, false), type);
+        }
+
+        public Builder addRegularColumn(ColumnIdentifier name, AbstractType type)
+        {
+            this.regularColumns.add(Pair.create(name, type));
+            return this;
+        }
+
+        public boolean hasRegulars()
+        {
+            return !this.regularColumns.isEmpty();
+        }
+
+        public Builder addStaticColumn(String name, AbstractType type)
+        {
+            return addStaticColumn(ColumnIdentifier.getInterned(name, false), type);
+        }
+
+        public Builder addStaticColumn(ColumnIdentifier name, AbstractType type)
+        {
+            this.staticColumns.add(Pair.create(name, type));
+            return this;
+        }
+
+        public Set<String> usedColumnNames()
+        {
+            Set<String> usedNames = new HashSet<>();
+            for (Pair<ColumnIdentifier, AbstractType> p : partitionKeys)
+                usedNames.add(p.left.toString());
+            for (Pair<ColumnIdentifier, AbstractType> p : clusteringColumns)
+                usedNames.add(p.left.toString());
+            for (Pair<ColumnIdentifier, AbstractType> p : staticColumns)
+                usedNames.add(p.left.toString());
+            for (Pair<ColumnIdentifier, AbstractType> p : regularColumns)
+                usedNames.add(p.left.toString());
+            return usedNames;
+        }
+
+        public CFMetaData build()
+        {
+            if (tableId == null)
+                tableId = UUIDGen.getTimeUUID();
+
+            List<ColumnDefinition> partitions = new ArrayList<>(partitionKeys.size());
+            List<ColumnDefinition> clusterings = new ArrayList<>(clusteringColumns.size());
+            PartitionColumns.Builder builder = PartitionColumns.builder();
+
+            for (int i = 0; i < partitionKeys.size(); i++)
+            {
+                Pair<ColumnIdentifier, AbstractType> p = partitionKeys.get(i);
+                Integer componentIndex = partitionKeys.size() == 1 ? null : i;
+                partitions.add(new ColumnDefinition(keyspace, table, p.left, p.right, componentIndex, ColumnDefinition.Kind.PARTITION_KEY));
+            }
+
+            for (int i = 0; i < clusteringColumns.size(); i++)
+            {
+                Pair<ColumnIdentifier, AbstractType> p = clusteringColumns.get(i);
+                clusterings.add(new ColumnDefinition(keyspace, table, p.left, p.right, i, ColumnDefinition.Kind.CLUSTERING_COLUMN));
+            }
+
+            for (int i = 0; i < regularColumns.size(); i++)
+            {
+                Pair<ColumnIdentifier, AbstractType> p = regularColumns.get(i);
+                builder.add(new ColumnDefinition(keyspace, table, p.left, p.right, null, ColumnDefinition.Kind.REGULAR));
+            }
+
+            for (int i = 0; i < staticColumns.size(); i++)
+            {
+                Pair<ColumnIdentifier, AbstractType> p = staticColumns.get(i);
+                builder.add(new ColumnDefinition(keyspace, table, p.left, p.right, null, ColumnDefinition.Kind.STATIC));
+            }
+
+            return new CFMetaData(keyspace,
+                                  table,
+                                  tableId,
+                                  isSuper,
+                                  isCounter,
+                                  isDense,
+                                  isCompound,
+                                  partitions,
+                                  clusterings,
+                                  builder.build());
+        }
+    }
+
+    // We don't use UUIDSerializer below because we want to use this with vint-encoded streams and UUIDSerializer
+    // currently assumes a NATIVE encoding. This is also why we don't use writeLong()/readLong in the methods below:
+    // this would encode the values, but by design of UUID it is likely that both long will be very big numbers
+    // and so we have a fair change that the encoding will take more than 16 bytes which is not desirable. Note that
+    // we could make UUIDSerializer work as the serializer below, but I'll keep that to later.
+    public static class Serializer
+    {
+        private static void writeLongAsSeparateBytes(long value, DataOutputPlus out) throws IOException
+        {
+            for (int i = 7; i >= 0; i--)
+                out.writeByte((int)((value >> (8 * i)) & 0xFF));
+        }
+
+        private static long readLongAsSeparateBytes(DataInput in) throws IOException
+        {
+            long val = 0;
+            for (int i = 7; i >= 0; i--)
+                val |= ((long)in.readUnsignedByte()) << (8 * i);
+            return val;
+        }
+
+        public void serialize(CFMetaData metadata, DataOutputPlus out, int version) throws IOException
+        {
+            writeLongAsSeparateBytes(metadata.cfId.getMostSignificantBits(), out);
+            writeLongAsSeparateBytes(metadata.cfId.getLeastSignificantBits(), out);
+        }
+
+        public CFMetaData deserialize(DataInput in, int version) throws IOException
+        {
+            UUID cfId = new UUID(readLongAsSeparateBytes(in), readLongAsSeparateBytes(in));
+            CFMetaData metadata = Schema.instance.getCFMetaData(cfId);
+            if (metadata == null)
+            {
+                String message = String.format("Couldn't find table for cfId %s. If a table was just " +
+                        "created, this is likely due to the schema not being fully propagated.  Please wait for schema " +
+                        "agreement on table creation.", cfId);
+                throw new UnknownColumnFamilyException(message, cfId);
+            }
+
+            return metadata;
+        }
+
+        public long serializedSize(CFMetaData metadata, int version, TypeSizes sizes)
+        {
+            // We've made sure it was encoded as 16 bytes whatever the TypeSizes is.
+            return 16;
+        }
+    }
+
+    public static class DroppedColumn
+    {
+        public final AbstractType<?> type;
+        public final long droppedTime;
+
+        public DroppedColumn(AbstractType<?> type, long droppedTime)
+        {
+            this.type = type;
+            this.droppedTime = droppedTime;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/config/ColumnDefinition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/ColumnDefinition.java b/src/java/org/apache/cassandra/config/ColumnDefinition.java
index b33718f..ea00816 100644
--- a/src/java/org/apache/cassandra/config/ColumnDefinition.java
+++ b/src/java/org/apache/cassandra/config/ColumnDefinition.java
@@ -26,18 +26,19 @@ import com.google.common.base.Objects;
 import com.google.common.collect.Lists;
 
 import org.apache.cassandra.cql3.*;
+import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.serializers.MarshalException;
 import org.apache.cassandra.exceptions.*;
+import org.apache.cassandra.utils.ByteBufferUtil;
 
-public class ColumnDefinition extends ColumnSpecification
+public class ColumnDefinition extends ColumnSpecification implements Comparable<ColumnDefinition>
 {
     /*
      * The type of CQL3 column this definition represents.
-     * There is 3 main type of CQL3 columns: those parts of the partition key,
-     * those parts of the clustering key and the other, regular ones.
-     * But when COMPACT STORAGE is used, there is by design only one regular
-     * column, whose name is not stored in the data contrarily to the column of
-     * type REGULAR. Hence the COMPACT_VALUE type to distinguish it below.
+     * There is 4 main type of CQL3 columns: those parts of the partition key,
+     * those parts of the clustering key and amongst the others, regular and
+     * static ones.
      *
      * Note that thrift only knows about definitions of type REGULAR (and
      * the ones whose componentIndex == null).
@@ -47,8 +48,12 @@ public class ColumnDefinition extends ColumnSpecification
         PARTITION_KEY,
         CLUSTERING_COLUMN,
         REGULAR,
-        STATIC,
-        COMPACT_VALUE
+        STATIC;
+
+        public boolean isPrimaryKeyKind()
+        {
+            return this == PARTITION_KEY || this == CLUSTERING_COLUMN;
+        }
     }
 
     public final Kind kind;
@@ -64,14 +69,17 @@ public class ColumnDefinition extends ColumnSpecification
      */
     private final Integer componentIndex;
 
+    private final Comparator<CellPath> cellPathComparator;
+    private final Comparator<Cell> cellComparator;
+
     public static ColumnDefinition partitionKeyDef(CFMetaData cfm, ByteBuffer name, AbstractType<?> validator, Integer componentIndex)
     {
         return new ColumnDefinition(cfm, name, validator, componentIndex, Kind.PARTITION_KEY);
     }
 
-    public static ColumnDefinition partitionKeyDef(String ksName, String cfName, ByteBuffer name, AbstractType<?> validator, Integer componentIndex)
+    public static ColumnDefinition partitionKeyDef(String ksName, String cfName, String name, AbstractType<?> validator, Integer componentIndex)
     {
-        return new ColumnDefinition(ksName, cfName, new ColumnIdentifier(name, UTF8Type.instance), validator, null, null, null, componentIndex, Kind.PARTITION_KEY);
+        return new ColumnDefinition(ksName, cfName, ColumnIdentifier.getInterned(name, true), validator, null, null, null, componentIndex, Kind.PARTITION_KEY);
     }
 
     public static ColumnDefinition clusteringKeyDef(CFMetaData cfm, ByteBuffer name, AbstractType<?> validator, Integer componentIndex)
@@ -79,26 +87,31 @@ public class ColumnDefinition extends ColumnSpecification
         return new ColumnDefinition(cfm, name, validator, componentIndex, Kind.CLUSTERING_COLUMN);
     }
 
+    public static ColumnDefinition clusteringKeyDef(String ksName, String cfName, String name, AbstractType<?> validator, Integer componentIndex)
+    {
+        return new ColumnDefinition(ksName, cfName, ColumnIdentifier.getInterned(name, true),  validator, null, null, null, componentIndex, Kind.CLUSTERING_COLUMN);
+    }
+
     public static ColumnDefinition regularDef(CFMetaData cfm, ByteBuffer name, AbstractType<?> validator, Integer componentIndex)
     {
         return new ColumnDefinition(cfm, name, validator, componentIndex, Kind.REGULAR);
     }
 
-    public static ColumnDefinition staticDef(CFMetaData cfm, ByteBuffer name, AbstractType<?> validator, Integer componentIndex)
+    public static ColumnDefinition regularDef(String ksName, String cfName, String name, AbstractType<?> validator, Integer componentIndex)
     {
-        return new ColumnDefinition(cfm, name, validator, componentIndex, Kind.STATIC);
+        return new ColumnDefinition(ksName, cfName, ColumnIdentifier.getInterned(name, true), validator, null, null, null, componentIndex, Kind.REGULAR);
     }
 
-    public static ColumnDefinition compactValueDef(CFMetaData cfm, ByteBuffer name, AbstractType<?> validator)
+    public static ColumnDefinition staticDef(CFMetaData cfm, ByteBuffer name, AbstractType<?> validator, Integer componentIndex)
     {
-        return new ColumnDefinition(cfm, name, validator, null, Kind.COMPACT_VALUE);
+        return new ColumnDefinition(cfm, name, validator, componentIndex, Kind.STATIC);
     }
 
     public ColumnDefinition(CFMetaData cfm, ByteBuffer name, AbstractType<?> validator, Integer componentIndex, Kind kind)
     {
         this(cfm.ksName,
              cfm.cfName,
-             new ColumnIdentifier(name, cfm.getComponentComparator(componentIndex, kind)),
+             ColumnIdentifier.getInterned(name, cfm.getColumnDefinitionNameComparator(kind)),
              validator,
              null,
              null,
@@ -107,6 +120,11 @@ public class ColumnDefinition extends ColumnSpecification
              kind);
     }
 
+    public ColumnDefinition(String ksName, String cfName, ColumnIdentifier name, AbstractType<?> type, Integer componentIndex, Kind kind)
+    {
+        this(ksName, cfName, name, type, null, null, null, componentIndex, kind);
+    }
+
     @VisibleForTesting
     public ColumnDefinition(String ksName,
                             String cfName,
@@ -119,11 +137,55 @@ public class ColumnDefinition extends ColumnSpecification
                             Kind kind)
     {
         super(ksName, cfName, name, validator);
-        assert name != null && validator != null;
+        assert name != null && validator != null && kind != null;
+        assert name.isInterned();
         this.kind = kind;
         this.indexName = indexName;
         this.componentIndex = componentIndex;
         this.setIndexType(indexType, indexOptions);
+        this.cellPathComparator = makeCellPathComparator(kind, validator);
+        this.cellComparator = makeCellComparator(cellPathComparator);
+    }
+
+    private static Comparator<CellPath> makeCellPathComparator(Kind kind, AbstractType<?> validator)
+    {
+        if (kind.isPrimaryKeyKind() || !validator.isCollection() || !validator.isMultiCell())
+            return null;
+
+        final CollectionType type = (CollectionType)validator;
+        return new Comparator<CellPath>()
+        {
+            public int compare(CellPath path1, CellPath path2)
+            {
+                if (path1.size() == 0 || path2.size() == 0)
+                {
+                    if (path1 == CellPath.BOTTOM)
+                        return path2 == CellPath.BOTTOM ? 0 : -1;
+                    if (path1 == CellPath.TOP)
+                        return path2 == CellPath.TOP ? 0 : 1;
+                    return path2 == CellPath.BOTTOM ? 1 : -1;
+                }
+
+                // This will get more complicated once we have non-frozen UDT and nested collections
+                assert path1.size() == 1 && path2.size() == 1;
+                return type.nameComparator().compare(path1.get(0), path2.get(0));
+            }
+        };
+    }
+
+    private static Comparator<Cell> makeCellComparator(final Comparator<CellPath> cellPathComparator)
+    {
+        return new Comparator<Cell>()
+        {
+            public int compare(Cell c1, Cell c2)
+            {
+                int cmp = c1.column().compareTo(c2.column());
+                if (cmp != 0 || cellPathComparator == null)
+                    return cmp;
+
+                return cellPathComparator.compare(c1.path(), c2.path());
+            }
+        };
     }
 
     public ColumnDefinition copy()
@@ -166,11 +228,6 @@ public class ColumnDefinition extends ColumnSpecification
         return kind == Kind.REGULAR;
     }
 
-    public boolean isCompactValue()
-    {
-        return kind == Kind.COMPACT_VALUE;
-    }
-
     // The componentIndex. This never return null however for convenience sake:
     // if componentIndex == null, this return 0. So caller should first check
     // isOnAllComponents() to distinguish if that's a possibility.
@@ -220,23 +277,26 @@ public class ColumnDefinition extends ColumnSpecification
                       .toString();
     }
 
-    public boolean isThriftCompatible()
-    {
-        return kind == ColumnDefinition.Kind.REGULAR && componentIndex == null;
-    }
-
     public boolean isPrimaryKeyColumn()
     {
-        return kind == Kind.PARTITION_KEY || kind == Kind.CLUSTERING_COLUMN;
+        return kind.isPrimaryKeyKind();
     }
 
     /**
      * Whether the name of this definition is serialized in the cell nane, i.e. whether
      * it's not just a non-stored CQL metadata.
      */
-    public boolean isPartOfCellName()
+    public boolean isPartOfCellName(boolean isCQL3Table, boolean isSuper)
     {
-        return kind == Kind.REGULAR || kind == Kind.STATIC;
+        // When converting CQL3 tables to thrift, any regular or static column ends up in the cell name.
+        // When it's a compact table however, the REGULAR definition is the name for the cell value of "dynamic"
+        // column (so it's not part of the cell name) and it's static columns that ends up in the cell name.
+        if (isCQL3Table)
+            return kind == Kind.REGULAR || kind == Kind.STATIC;
+        else if (isSuper)
+            return kind == Kind.REGULAR;
+        else
+            return kind == Kind.STATIC;
     }
 
     public ColumnDefinition apply(ColumnDefinition def)  throws ConfigurationException
@@ -333,4 +393,87 @@ public class ColumnDefinition extends ColumnSpecification
             }
         });
     }
+
+    public int compareTo(ColumnDefinition other)
+    {
+        if (this == other)
+            return 0;
+
+        if (kind != other.kind)
+            return kind.ordinal() < other.kind.ordinal() ? -1 : 1;
+        if (position() != other.position())
+            return position() < other.position() ? -1 : 1;
+
+        if (isStatic() != other.isStatic())
+            return isStatic() ? -1 : 1;
+        if (isComplex() != other.isComplex())
+            return isComplex() ? 1 : -1;
+
+        return ByteBufferUtil.compareUnsigned(name.bytes, other.name.bytes);
+    }
+
+    public Comparator<CellPath> cellPathComparator()
+    {
+        return cellPathComparator;
+    }
+
+    public Comparator<Cell> cellComparator()
+    {
+        return cellComparator;
+    }
+
+    public boolean isComplex()
+    {
+        return cellPathComparator != null;
+    }
+
+    public CellPath.Serializer cellPathSerializer()
+    {
+        // Collections are our only complex so far, so keep it simple
+        return CollectionType.cellPathSerializer;
+    }
+
+    public void validateCellValue(ByteBuffer value)
+    {
+        type.validateCellValue(value);
+    }
+
+    public void validateCellPath(CellPath path)
+    {
+        if (!isComplex())
+            throw new MarshalException("Only complex cells should have a cell path");
+
+        assert type instanceof CollectionType;
+        ((CollectionType)type).nameComparator().validate(path.get(0));
+    }
+
+    public static String toCQLString(Iterable<ColumnDefinition> defs)
+    {
+        return toCQLString(defs.iterator());
+    }
+
+    public static String toCQLString(Iterator<ColumnDefinition> defs)
+    {
+        if (!defs.hasNext())
+            return "";
+
+        StringBuilder sb = new StringBuilder();
+        sb.append(defs.next().name);
+        while (defs.hasNext())
+            sb.append(", ").append(defs.next().name);
+        return sb.toString();
+    }
+
+    /**
+     * The type of the cell values for cell belonging to this column.
+     *
+     * This is the same than the column type, except for collections where it's the 'valueComparator'
+     * of the collection.
+     */
+    public AbstractType<?> cellValueType()
+    {
+        return type instanceof CollectionType
+             ? ((CollectionType)type).valueComparator()
+             : type;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 913d23c..f0dda09 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -1341,9 +1341,15 @@ public class DatabaseDescriptor
     }
 
     @VisibleForTesting
-    public static void setAutoSnapshot(boolean autoSnapshot) {
+    public static void setAutoSnapshot(boolean autoSnapshot)
+    {
         conf.auto_snapshot = autoSnapshot;
     }
+    @VisibleForTesting
+    public static boolean getAutoSnapshot()
+    {
+        return conf.auto_snapshot;
+    }
 
     public static boolean isAutoBootstrap()
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/config/YamlConfigurationLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/YamlConfigurationLoader.java b/src/java/org/apache/cassandra/config/YamlConfigurationLoader.java
index 8331f85..062f64d 100644
--- a/src/java/org/apache/cassandra/config/YamlConfigurationLoader.java
+++ b/src/java/org/apache/cassandra/config/YamlConfigurationLoader.java
@@ -70,8 +70,11 @@ public class YamlConfigurationLoader implements ConfigurationLoader
             {
                 String required = "file:" + File.separator + File.separator;
                 if (!configUrl.startsWith(required))
-                    throw new ConfigurationException("Expecting URI in variable: [cassandra.config].  Please prefix the file with " + required + File.separator +
-                            " for local files or " + required + "<server>" + File.separator + " for remote files. Aborting. If you are executing this from an external tool, it needs to set Config.setClientMode(true) to avoid loading configuration.");
+                    throw new ConfigurationException(String.format(
+                        "Expecting URI in variable: [cassandra.config]. Found[%s]. Please prefix the file with [%s%s] for local " +
+                        "files and [%s<server>%s] for remote files. If you are executing this from an external tool, it needs " +
+                        "to set Config.setClientMode(true) to avoid loading configuration.",
+                        configUrl, required, File.separator, required, File.separator));
                 throw new ConfigurationException("Cannot locate " + configUrl + ".  If this is a local file, please confirm you've provided " + required + File.separator + " as a URI prefix.");
             }
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/cql3/Attributes.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Attributes.java b/src/java/org/apache/cassandra/cql3/Attributes.java
index 7b38e9f..97bdcd1 100644
--- a/src/java/org/apache/cassandra/cql3/Attributes.java
+++ b/src/java/org/apache/cassandra/cql3/Attributes.java
@@ -23,7 +23,6 @@ import java.util.Collections;
 import com.google.common.collect.Iterables;
 
 import org.apache.cassandra.cql3.functions.Function;
-import org.apache.cassandra.db.ExpiringCell;
 import org.apache.cassandra.db.marshal.Int32Type;
 import org.apache.cassandra.db.marshal.LongType;
 import org.apache.cassandra.exceptions.InvalidRequestException;
@@ -36,6 +35,8 @@ import org.apache.cassandra.utils.ByteBufferUtil;
  */
 public class Attributes
 {
+    public static final int MAX_TTL = 20 * 365 * 24 * 60 * 60; // 20 years in seconds
+
     private final Term timestamp;
     private final Term timeToLive;
 
@@ -121,8 +122,8 @@ public class Attributes
         if (ttl < 0)
             throw new InvalidRequestException("A TTL must be greater or equal to 0, but was " + ttl);
 
-        if (ttl > ExpiringCell.MAX_TTL)
-            throw new InvalidRequestException(String.format("ttl is too large. requested (%d) maximum (%d)", ttl, ExpiringCell.MAX_TTL));
+        if (ttl > MAX_TTL)
+            throw new InvalidRequestException(String.format("ttl is too large. requested (%d) maximum (%d)", ttl, MAX_TTL));
 
         return ttl;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/cql3/CQL3Row.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/CQL3Row.java b/src/java/org/apache/cassandra/cql3/CQL3Row.java
deleted file mode 100644
index e3e76d1..0000000
--- a/src/java/org/apache/cassandra/cql3/CQL3Row.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.cql3;
-
-import java.nio.ByteBuffer;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.cassandra.db.Cell;
-
-public interface CQL3Row
-{
-    public ByteBuffer getClusteringColumn(int i);
-    public Cell getColumn(ColumnIdentifier name);
-    public List<Cell> getMultiCellColumn(ColumnIdentifier name);
-
-    public interface Builder
-    {
-        public RowIterator group(Iterator<Cell> cells);
-    }
-
-    public interface RowIterator extends Iterator<CQL3Row>
-    {
-        public CQL3Row getStaticRow();
-    }
-}