You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2013/10/30 17:10:20 UTC

[4/4] git commit: Remove CFDefinition

Remove CFDefinition

patch by slebresne; reviewed by iamaleksey for CASSANDRA-6253


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5f5905d5
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5f5905d5
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5f5905d5

Branch: refs/heads/trunk
Commit: 5f5905d51561b2adb616a9b1b0b29f1358372ba4
Parents: 91d60fb
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Fri Oct 25 15:13:41 2013 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Wed Oct 30 17:10:09 2013 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../org/apache/cassandra/config/CFMetaData.java | 534 +++++++++++--------
 .../cassandra/config/ColumnDefinition.java      | 239 +++++----
 .../org/apache/cassandra/config/Schema.java     |  31 --
 .../cassandra/config/TriggerDefinition.java     |   4 +-
 .../cassandra/cql/AlterTableStatement.java      |   4 +-
 .../cql/CreateColumnFamilyStatement.java        |  11 +-
 .../apache/cassandra/cql/QueryProcessor.java    |  19 +-
 .../apache/cassandra/cql/SelectStatement.java   |  10 -
 .../apache/cassandra/cql/UpdateStatement.java   |  14 +-
 .../org/apache/cassandra/cql3/CFDefinition.java | 297 -----------
 .../apache/cassandra/cql3/ColumnIdentifier.java |  16 +-
 .../cassandra/cql3/ColumnNameBuilder.java       |   2 +
 .../cassandra/cql3/ColumnSpecification.java     |   7 -
 .../org/apache/cassandra/cql3/Constants.java    |   8 +-
 src/java/org/apache/cassandra/cql3/Lists.java   |  12 +-
 src/java/org/apache/cassandra/cql3/Maps.java    |   8 +-
 .../org/apache/cassandra/cql3/Operation.java    |  19 +-
 .../org/apache/cassandra/cql3/ResultSet.java    |   8 +-
 src/java/org/apache/cassandra/cql3/Sets.java    |   6 +-
 .../apache/cassandra/cql3/UntypedResultSet.java |   2 +-
 .../apache/cassandra/cql3/UpdateParameters.java |   4 +-
 .../cassandra/cql3/functions/TokenFct.java      |  14 +-
 .../cql3/statements/AlterTableStatement.java    |  91 ++--
 .../cql3/statements/CreateIndexStatement.java   |  15 +-
 .../cql3/statements/CreateTableStatement.java   |  32 +-
 .../cql3/statements/DeleteStatement.java        |  22 +-
 .../cql3/statements/ModificationStatement.java  | 120 ++---
 .../cql3/statements/SelectStatement.java        | 358 +++++++------
 .../cassandra/cql3/statements/Selection.java    | 103 ++--
 .../cql3/statements/UpdateStatement.java        |  69 +--
 .../apache/cassandra/db/BatchlogManager.java    |   2 +-
 src/java/org/apache/cassandra/db/Column.java    |  10 +-
 .../cassandra/db/filter/ExtendedFilter.java     |  18 +-
 .../AbstractSimplePerColumnSecondaryIndex.java  |   4 +-
 .../cassandra/db/index/SecondaryIndex.java      |  12 +-
 .../db/index/SecondaryIndexManager.java         |  10 +-
 .../db/index/composites/CompositesIndex.java    |   8 +-
 .../CompositesIndexOnClusteringKey.java         |  22 +-
 .../CompositesIndexOnPartitionKey.java          |   8 +-
 .../composites/CompositesIndexOnRegular.java    |  12 +-
 .../db/index/composites/CompositesSearcher.java |   2 +-
 .../cassandra/db/index/keys/KeysIndex.java      |   4 +-
 .../cassandra/db/marshal/CompositeType.java     |   8 +
 .../hadoop/pig/AbstractCassandraStorage.java    |  34 +-
 .../apache/cassandra/hadoop/pig/CqlStorage.java |  13 +-
 .../apache/cassandra/service/StorageProxy.java  |   2 +-
 .../cassandra/service/pager/QueryPagers.java    |   2 +-
 .../cassandra/thrift/ThriftValidation.java      |  16 +-
 .../apache/cassandra/tools/SSTableExport.java   |   2 +-
 .../apache/cassandra/tools/SSTableImport.java   |   2 +-
 .../org/apache/cassandra/tracing/Tracing.java   |   2 +-
 .../unit/org/apache/cassandra/SchemaLoader.java |  87 +--
 .../cassandra/config/ColumnDefinitionTest.java  |  19 +-
 .../org/apache/cassandra/config/DefsTest.java   |  23 +-
 .../cassandra/db/ColumnFamilyStoreTest.java     |   4 +-
 .../apache/cassandra/db/RangeTombstoneTest.java |  18 +-
 .../cassandra/thrift/ThriftValidationTest.java  |   4 +-
 58 files changed, 1114 insertions(+), 1314 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/5f5905d5/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a8f7fd9..e51f598 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -5,6 +5,7 @@
  * Stop using Thrift-generated Index* classes internally (CASSANDRA-5971)
  * Remove 1.2 network compatibility code (CASSANDRA-5960)
  * Remove leveled json manifest migration code (CASSANDRA-5996)
+ * Remove CFDefinition (CASSANDRA-6253)
 
 
 2.0.3

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5f5905d5/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java
index 2fa2221..e2bd3eb 100644
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@ -27,6 +27,7 @@ import java.util.*;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Objects;
+import com.google.common.collect.AbstractIterator;
 import com.google.common.collect.MapDifference;
 import com.google.common.collect.Maps;
 import org.apache.commons.lang3.ArrayUtils;
@@ -37,10 +38,7 @@ import org.apache.commons.lang3.builder.ToStringBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.cql3.CFDefinition;
-import org.apache.cassandra.cql3.ColumnNameBuilder;
-import org.apache.cassandra.cql3.QueryProcessor;
-import org.apache.cassandra.cql3.UntypedResultSet;
+import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.cql3.statements.CreateTableStatement;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.compaction.AbstractCompactionStrategy;
@@ -89,6 +87,15 @@ public final class CFMetaData
     // Note that this is the default only for user created tables
     public final static String DEFAULT_COMPRESSOR = LZ4Compressor.class.getCanonicalName();
 
+    // Note that this needs to come *before* any CFMetaData is defined, so before the compile below.
+    private static final Comparator<ColumnDefinition> regularColumnComparator = new Comparator<ColumnDefinition>()
+    {
+        public int compare(ColumnDefinition def1, ColumnDefinition def2)
+        {
+            return def1.name.compareTo(def2.name);
+        }
+    };
+
     public static final CFMetaData IndexCf = compile("CREATE TABLE \"" + SystemKeyspace.INDEX_CF + "\" ("
                                                      + "table_name text,"
                                                      + "index_name text,"
@@ -389,11 +396,11 @@ public final class CFMetaData
     private volatile int defaultTimeToLive = DEFAULT_DEFAULT_TIME_TO_LIVE;
     private volatile SpeculativeRetry speculativeRetry = DEFAULT_SPECULATIVE_RETRY;
     private volatile boolean populateIoCacheOnFlush = DEFAULT_POPULATE_IO_CACHE_ON_FLUSH;
-    private volatile Map<ByteBuffer, Long> droppedColumns = new HashMap<>();
+    private volatile Map<ColumnIdentifier, Long> droppedColumns = new HashMap<>();
     private volatile Map<String, TriggerDefinition> triggers = new HashMap<>();
 
     /*
-     * All CQL3 columns definition are stored in the column_metadata map.
+     * All CQL3 column definitions are stored in the columnMetadata map.
      * On top of that, we keep separated collection of each kind of definition, to
      * 1) allow easy access to each kind and 2) for the partition key and
      * clustering key ones, those list are ordered by the "component index" of the
@@ -403,10 +410,10 @@ public final class CFMetaData
     public static final String DEFAULT_COLUMN_ALIAS = "column";
     public static final String DEFAULT_VALUE_ALIAS = "value";
 
-    private volatile Map<ByteBuffer, ColumnDefinition> column_metadata = new HashMap<>();
+    private volatile Map<ByteBuffer, ColumnDefinition> columnMetadata = new HashMap<>();
     private volatile List<ColumnDefinition> partitionKeyColumns;  // Always of size keyValidator.componentsCount, null padded if necessary
-    private volatile List<ColumnDefinition> clusteringKeyColumns; // Of size comparator.componentsCount or comparator.componentsCount -1, null padded if necessary
-    private volatile Set<ColumnDefinition> regularColumns;
+    private volatile List<ColumnDefinition> clusteringColumns;    // Of size comparator.componentsCount or comparator.componentsCount -1, null padded if necessary
+    private volatile SortedSet<ColumnDefinition> regularColumns;  // We use a sorted set so iteration is of predictable order (for SELECT for instance)
     private volatile ColumnDefinition compactValueColumn;
 
     public volatile Class<? extends AbstractCompactionStrategy> compactionStrategyClass = DEFAULT_COMPACTION_STRATEGY_CLASS;
@@ -414,11 +421,6 @@ public final class CFMetaData
 
     public volatile CompressionParameters compressionParameters = new CompressionParameters(null);
 
-    // Processed infos used by CQL. This can be fully reconstructed from the CFMedata,
-    // so it's not saved on disk. It is however costlyish to recreate for each query
-    // so we cache it here (and update on each relevant CFMetadata change)
-    private volatile CFDefinition cqlCfDef;
-
     public CFMetaData comment(String prop) { comment = enforceCommentNotNull(prop); return this;}
     public CFMetaData readRepairChance(double prop) {readRepairChance = prop; return this;}
     public CFMetaData dcLocalReadRepairChance(double prop) {dcLocalReadRepairChance = prop; return this;}
@@ -428,7 +430,7 @@ public final class CFMetaData
     public CFMetaData keyValidator(AbstractType<?> prop) {keyValidator = prop; return this;}
     public CFMetaData minCompactionThreshold(int prop) {minCompactionThreshold = prop; return this;}
     public CFMetaData maxCompactionThreshold(int prop) {maxCompactionThreshold = prop; return this;}
-    public CFMetaData columnMetadata(Map<ByteBuffer,ColumnDefinition> prop) {column_metadata = prop; return this;}
+    public CFMetaData columnMetadata(Map<ByteBuffer,ColumnDefinition> prop) {columnMetadata = prop; return this;}
     public CFMetaData compactionStrategyClass(Class<? extends AbstractCompactionStrategy> prop) {compactionStrategyClass = prop; return this;}
     public CFMetaData compactionStrategyOptions(Map<String, String> prop) {compactionStrategyOptions = prop; return this;}
     public CFMetaData compressionParameters(CompressionParameters prop) {compressionParameters = prop; return this;}
@@ -439,7 +441,7 @@ public final class CFMetaData
     public CFMetaData defaultTimeToLive(int prop) {defaultTimeToLive = prop; return this;}
     public CFMetaData speculativeRetry(SpeculativeRetry prop) {speculativeRetry = prop; return this;}
     public CFMetaData populateIoCacheOnFlush(boolean prop) {populateIoCacheOnFlush = prop; return this;}
-    public CFMetaData droppedColumns(Map<ByteBuffer, Long> cols) {droppedColumns = cols; return this;}
+    public CFMetaData droppedColumns(Map<ColumnIdentifier, Long> cols) {droppedColumns = cols; return this;}
     public CFMetaData triggers(Map<String, TriggerDefinition> prop) {triggers = prop; return this;}
 
     public CFMetaData(String keyspace, String name, ColumnFamilyType type, AbstractType<?> comp, AbstractType<?> subcc)
@@ -528,7 +530,7 @@ public final class CFMetaData
                              : Caching.NONE;
 
         return new CFMetaData(parent.ksName, parent.indexColumnFamilyName(info), ColumnFamilyType.Standard, columnComparator, (AbstractType)null)
-                             .keyValidator(info.getValidator())
+                             .keyValidator(info.type)
                              .readRepairChance(0.0)
                              .dcLocalReadRepairChance(0.0)
                              .gcGraceSeconds(0)
@@ -564,10 +566,10 @@ public final class CFMetaData
     static CFMetaData copyOpts(CFMetaData newCFMD, CFMetaData oldCFMD)
     {
         Map<ByteBuffer, ColumnDefinition> clonedColumns = new HashMap<>();
-        for (ColumnDefinition cd : oldCFMD.column_metadata.values())
+        for (ColumnDefinition cd : oldCFMD.allColumns())
         {
             ColumnDefinition cloned = cd.copy();
-            clonedColumns.put(cloned.name, cloned);
+            clonedColumns.put(cloned.name.bytes, cloned);
         }
 
         return newCFMD.comment(oldCFMD.comment)
@@ -606,7 +608,7 @@ public final class CFMetaData
     public String indexColumnFamilyName(ColumnDefinition info)
     {
         // TODO simplify this when info.index_name is guaranteed to be set
-        return cfName + Directories.SECONDARY_INDEX_NAME_SEPARATOR + (info.getIndexName() == null ? ByteBufferUtil.bytesToHex(info.name) : info.getIndexName());
+        return cfName + Directories.SECONDARY_INDEX_NAME_SEPARATOR + (info.getIndexName() == null ? ByteBufferUtil.bytesToHex(info.name.bytes) : info.getIndexName());
     }
 
     public String getComment()
@@ -682,16 +684,9 @@ public final class CFMetaData
         if (partitionKeyColumns.size() > 1)
             throw new IllegalStateException("Cannot acces column family with composite key from CQL < 3.0.0");
 
-        try
-        {
-            // For compatibility sake, we uppercase if it's the default alias as we used to return it that way in resultsets.
-            String str = ByteBufferUtil.string(partitionKeyColumns.get(0).name);
-            return str.equalsIgnoreCase(DEFAULT_KEY_ALIAS) ? str.toUpperCase() : str;
-        }
-        catch (CharacterCodingException e)
-        {
-            throw new RuntimeException(e.getMessage(), e);
-        }
+        // For compatibility sake, we uppercase if it's the default alias as we used to return it that way in resultsets.
+        String str = partitionKeyColumns.get(0).name.toString();
+        return str.equalsIgnoreCase(DEFAULT_KEY_ALIAS) ? str.toUpperCase() : str;
     }
 
     public CompressionParameters compressionParameters()
@@ -701,7 +696,42 @@ public final class CFMetaData
 
     public Collection<ColumnDefinition> allColumns()
     {
-        return column_metadata.values();
+        return columnMetadata.values();
+    }
+
+    // An iterator over all column definitions that respects the order of a SELECT *.
+    public Iterator<ColumnDefinition> allColumnsInSelectOrder()
+    {
+        return new AbstractIterator<ColumnDefinition>()
+        {
+            private final Iterator<ColumnDefinition> partitionKeyIter = partitionKeyColumns.iterator();
+            private final Iterator<ColumnDefinition> clusteringIter = clusteringColumns.iterator();
+            private boolean valueDone;
+            private final Iterator<ColumnDefinition> regularIter = regularColumns.iterator();
+
+            protected ColumnDefinition computeNext()
+            {
+                if (partitionKeyIter.hasNext())
+                    return partitionKeyIter.next();
+
+                if (clusteringIter.hasNext())
+                    return clusteringIter.next();
+
+                if (compactValueColumn != null && !valueDone)
+                {
+                    valueDone = true;
+                    // If the compactValueColumn is empty, this means we have a dense table but
+                    // with only a PK. As far as selects are concerned, we should ignore the value.
+                    if (compactValueColumn.name.bytes.hasRemaining())
+                        return compactValueColumn;
+                }
+
+                if (regularIter.hasNext())
+                    return regularIter.next();
+
+                return endOfData();
+            }
+        };
     }
 
     public List<ColumnDefinition> partitionKeyColumns()
@@ -709,9 +739,9 @@ public final class CFMetaData
         return partitionKeyColumns;
     }
 
-    public List<ColumnDefinition> clusteringKeyColumns()
+    public List<ColumnDefinition> clusteringColumns()
     {
-        return clusteringKeyColumns;
+        return clusteringColumns;
     }
 
     public Set<ColumnDefinition> regularColumns()
@@ -724,6 +754,20 @@ public final class CFMetaData
         return compactValueColumn;
     }
 
+    public ColumnNameBuilder getKeyNameBuilder()
+    {
+        return keyValidator instanceof CompositeType
+             ? new CompositeType.Builder((CompositeType)keyValidator)
+             : new NonCompositeBuilder(keyValidator);
+    }
+
+    public ColumnNameBuilder getColumnNameBuilder()
+    {
+        return comparator instanceof CompositeType
+             ? new CompositeType.Builder((CompositeType)comparator)
+             : new NonCompositeBuilder(comparator);
+    }
+
     public double getBloomFilterFpChance()
     {
         // we disallow bFFPC==null starting in 1.2.1 but tolerated it before that
@@ -757,7 +801,7 @@ public final class CFMetaData
         return defaultTimeToLive;
     }
 
-    public Map<ByteBuffer, Long> getDroppedColumns()
+    public Map<ColumnIdentifier, Long> getDroppedColumns()
     {
         return droppedColumns;
     }
@@ -789,7 +833,7 @@ public final class CFMetaData
             .append(minCompactionThreshold, rhs.minCompactionThreshold)
             .append(maxCompactionThreshold, rhs.maxCompactionThreshold)
             .append(cfId, rhs.cfId)
-            .append(column_metadata, rhs.column_metadata)
+            .append(columnMetadata, rhs.columnMetadata)
             .append(compactionStrategyClass, rhs.compactionStrategyClass)
             .append(compactionStrategyOptions, rhs.compactionStrategyOptions)
             .append(compressionParameters, rhs.compressionParameters)
@@ -822,7 +866,7 @@ public final class CFMetaData
             .append(minCompactionThreshold)
             .append(maxCompactionThreshold)
             .append(cfId)
-            .append(column_metadata)
+            .append(columnMetadata)
             .append(compactionStrategyClass)
             .append(compactionStrategyOptions)
             .append(compressionParameters)
@@ -838,7 +882,17 @@ public final class CFMetaData
             .toHashCode();
     }
 
-    public AbstractType<?> getValueValidator(ByteBuffer column)
+    public AbstractType<?> getValueValidatorFromCellName(ByteBuffer cellName)
+    {
+        // If this is a CQL table, we need to pull out the CQL column name to look up the correct column type.
+        if (!hasCompositeComparator() || isDense())
+            return getValueValidator(new ColumnIdentifier(cellName, comparator));
+
+        ByteBuffer name = ((CompositeType)comparator).extractLastComponent(cellName);
+        return getValueValidator(new ColumnIdentifier(name, UTF8Type.instance));
+    }
+
+    public AbstractType<?> getValueValidator(ColumnIdentifier column)
     {
         return getValueValidator(getColumnDefinition(column));
     }
@@ -847,7 +901,7 @@ public final class CFMetaData
     {
         return columnDefinition == null
                ? defaultValidator
-               : columnDefinition.getValidator();
+               : columnDefinition.type;
     }
 
     /** applies implicit defaults to cf definition. useful in updates */
@@ -930,13 +984,14 @@ public final class CFMetaData
             if (cf_def.isSetKey_validation_class()) { newCFMD.keyValidator(TypeParser.parse(cf_def.key_validation_class)); }
             if (cf_def.isSetKey_alias() && !(newCFMD.keyValidator instanceof CompositeType))
             {
-                newCFMD.column_metadata.put(cf_def.key_alias, ColumnDefinition.partitionKeyDef(cf_def.key_alias, newCFMD.keyValidator, null));
+                newCFMD.columnMetadata.put(cf_def.key_alias,
+                                           ColumnDefinition.partitionKeyDef(newCFMD, cf_def.key_alias, newCFMD.keyValidator, null));
             }
 
             return newCFMD.comment(cf_def.comment)
                           .replicateOnWrite(cf_def.replicate_on_write)
                           .defaultValidator(TypeParser.parse(cf_def.default_validation_class))
-                          .columnMetadata(ColumnDefinition.fromThrift(cf_def.column_metadata, newCFMD.isSuper()))
+                          .columnMetadata(ColumnDefinition.fromThrift(newCFMD, cf_def.column_metadata))
                           .compressionParameters(cp)
                           .rebuild();
         }
@@ -1025,19 +1080,19 @@ public final class CFMetaData
         if (!cfm.droppedColumns.isEmpty())
             droppedColumns = cfm.droppedColumns;
 
-        MapDifference<ByteBuffer, ColumnDefinition> columnDiff = Maps.difference(column_metadata, cfm.column_metadata);
+        MapDifference<ByteBuffer, ColumnDefinition> columnDiff = Maps.difference(columnMetadata, cfm.columnMetadata);
         // columns that are no longer needed
         for (ColumnDefinition cd : columnDiff.entriesOnlyOnLeft().values())
-            column_metadata.remove(cd.name);
+            removeColumnDefinition(cd);
         // newly added columns
         for (ColumnDefinition cd : columnDiff.entriesOnlyOnRight().values())
-            column_metadata.put(cd.name, cd);
+            addColumnDefinition(cd);
         // old columns with updated attributes
         for (ByteBuffer name : columnDiff.entriesDiffering().keySet())
         {
-            ColumnDefinition oldDef = column_metadata.get(name);
-            ColumnDefinition def = cfm.column_metadata.get(name);
-            oldDef.apply(def, getColumnDefinitionComparator(oldDef));
+            ColumnDefinition oldDef = columnMetadata.get(name);
+            ColumnDefinition def = cfm.columnMetadata.get(name);
+            addOrReplaceColumnDefinition(oldDef.apply(def));
         }
 
         compactionStrategyClass = cfm.compactionStrategyClass;
@@ -1158,8 +1213,8 @@ public final class CFMetaData
         def.setMax_compaction_threshold(maxCompactionThreshold);
         // We only return the alias if only one is set since thrift don't know about multiple key aliases
         if (partitionKeyColumns.size() == 1)
-            def.setKey_alias(partitionKeyColumns.get(0).name);
-        def.setColumn_metadata(ColumnDefinition.toThrift(column_metadata));
+            def.setKey_alias(partitionKeyColumns.get(0).name.bytes);
+        def.setColumn_metadata(ColumnDefinition.toThrift(columnMetadata));
         def.setCompaction_strategy(compactionStrategyClass.getName());
         def.setCompaction_strategy_options(new HashMap<>(compactionStrategyOptions));
         def.setCompression_options(compressionParameters.asThriftOptions());
@@ -1177,39 +1232,43 @@ public final class CFMetaData
 
     /**
      * Returns the ColumnDefinition for {@code name}.
-     *
-     * Note that {@code name} correspond to the returned ColumnDefinition name,
-     * and in particular for composite cfs, it should usually be only a
-     * component of the full column name. If you have a full column name, use
-     * getColumnDefinitionFromColumnName instead.
      */
+    public ColumnDefinition getColumnDefinition(ColumnIdentifier name)
+    {
+        return columnMetadata.get(name.bytes);
+    }
+
+    // In general it is preferable to work with ColumnIdentifier to make it
+    // clear that we are talking about a CQL column, not a cell name, but there
+    // are a few cases where all we have is a ByteBuffer (when dealing with IndexExpression
+    // for instance) so...
     public ColumnDefinition getColumnDefinition(ByteBuffer name)
     {
-            return column_metadata.get(name);
+        return columnMetadata.get(name);
     }
 
     /**
      * Returns a ColumnDefinition given a full (internal) column name.
      */
-    public ColumnDefinition getColumnDefinitionFromColumnName(ByteBuffer columnName)
+    public ColumnDefinition getColumnDefinitionFromCellName(ByteBuffer cellName)
     {
-        if (!isSuper() && (comparator instanceof CompositeType))
+        if (!isSuper() && hasCompositeComparator())
         {
             CompositeType composite = (CompositeType)comparator;
-            ByteBuffer[] components = composite.split(columnName);
-            for (ColumnDefinition def : column_metadata.values())
+            ByteBuffer[] components = composite.split(cellName);
+            for (ColumnDefinition def : allColumns())
             {
                 ByteBuffer toCompare;
-                if (def.componentIndex == null)
+                if (def.isOnAllComponents())
                 {
-                    toCompare = columnName;
+                    toCompare = cellName;
                 }
                 else
                 {
-                    if (def.componentIndex >= components.length)
+                    if (def.position() >= components.length)
                         break;
 
-                    toCompare = components[def.componentIndex];
+                    toCompare = components[def.position()];
                 }
                 if (def.name.equals(toCompare))
                     return def;
@@ -1218,7 +1277,7 @@ public final class CFMetaData
         }
         else
         {
-            return column_metadata.get(columnName);
+            return columnMetadata.get(cellName);
         }
     }
 
@@ -1226,7 +1285,7 @@ public final class CFMetaData
 
     public ColumnDefinition getColumnDefinitionForIndex(String indexName)
     {
-        for (ColumnDefinition def : column_metadata.values())
+        for (ColumnDefinition def : allColumns())
         {
             if (indexName.equals(def.getIndexName()))
                 return def;
@@ -1245,14 +1304,12 @@ public final class CFMetaData
         {
             CFMetaData cfm = Schema.instance.getCFMetaData(cfId);
 
-            for (Map.Entry<ByteBuffer, ColumnDefinition> entry : column_metadata.entrySet())
+            for (ColumnDefinition newDef : allColumns())
             {
-                ColumnDefinition newDef = entry.getValue();
-
-                if (!cfm.column_metadata.containsKey(entry.getKey()) || newDef.getIndexType() == null)
+                if (!cfm.columnMetadata.containsKey(newDef.name.bytes) || newDef.getIndexType() == null)
                     continue;
 
-                String oldIndexName = cfm.column_metadata.get(entry.getKey()).getIndexName();
+                String oldIndexName = cfm.getColumnDefinition(newDef.name).getIndexName();
 
                 if (oldIndexName == null)
                     continue;
@@ -1265,11 +1322,11 @@ public final class CFMetaData
         }
 
         Set<String> existingNames = existingIndexNames(null);
-        for (ColumnDefinition column : column_metadata.values())
+        for (ColumnDefinition column : allColumns())
         {
             if (column.getIndexType() != null && column.getIndexName() == null)
             {
-                String baseName = getDefaultIndexName(cfName, getColumnDefinitionComparator(column), column.name);
+                String baseName = getDefaultIndexName(cfName, column.name);
                 String indexName = baseName;
                 int i = 0;
                 while (existingNames.contains(indexName))
@@ -1279,9 +1336,9 @@ public final class CFMetaData
         }
     }
 
-    public static String getDefaultIndexName(String cfName, AbstractType<?> comparator, ByteBuffer columnName)
+    public static String getDefaultIndexName(String cfName, ColumnIdentifier columnName)
     {
-        return (cfName + "_" + comparator.getString(columnName) + "_idx").replaceAll("\\W", "");
+        return (cfName + "_" + columnName + "_idx").replaceAll("\\W", "");
     }
 
     public Iterator<OnDiskAtom> getOnDiskIterator(DataInput in, int count, Descriptor.Version version)
@@ -1327,39 +1384,20 @@ public final class CFMetaData
         if (defaultValidator instanceof CounterColumnType)
         {
             for (ColumnDefinition def : regularColumns)
-                if (!(def.getValidator() instanceof CounterColumnType))
-                    throw new ConfigurationException("Cannot add a non counter column (" + getColumnDefinitionComparator(def).getString(def.name) + ") in a counter column family");
+                if (!(def.type instanceof CounterColumnType))
+                    throw new ConfigurationException("Cannot add a non counter column (" + def.name + ") in a counter column family");
         }
         else
         {
-            for (ColumnDefinition def : column_metadata.values())
-                if (def.getValidator() instanceof CounterColumnType)
-                    throw new ConfigurationException("Cannot add a counter column (" + getColumnDefinitionComparator(def).getString(def.name) + ") in a non counter column family");
+            for (ColumnDefinition def : allColumns())
+                if (def.type instanceof CounterColumnType)
+                    throw new ConfigurationException("Cannot add a counter column (" + def.name + ") in a non counter column family");
         }
 
-        for (ColumnDefinition def : partitionKeyColumns)
-            validateAlias(def, "Key");
-        for (ColumnDefinition def : clusteringKeyColumns)
-            validateAlias(def, "Column");
-        if (compactValueColumn != null)
-            validateAlias(compactValueColumn, "Value");
-
         // initialize a set of names NOT in the CF under consideration
         Set<String> indexNames = existingIndexNames(cfName);
-        for (ColumnDefinition c : column_metadata.values())
+        for (ColumnDefinition c : allColumns())
         {
-            AbstractType<?> comparator = getColumnDefinitionComparator(c);
-
-            try
-            {
-                comparator.validate(c.name);
-            }
-            catch (MarshalException e)
-            {
-                throw new ConfigurationException(String.format("Column name %s is not valid for comparator %s",
-                                                               ByteBufferUtil.bytesToHex(c.name), comparator));
-            }
-
             if (c.getIndexType() == null)
             {
                 if (c.getIndexName() != null)
@@ -1407,18 +1445,6 @@ public final class CFMetaData
         return indexNames;
     }
 
-    private static void validateAlias(ColumnDefinition alias, String msg) throws ConfigurationException
-    {
-        try
-        {
-            UTF8Type.instance.validate(alias.name);
-        }
-        catch (MarshalException e)
-        {
-            throw new ConfigurationException(msg + " alias must be UTF8");
-        }
-    }
-
     private void validateCompactionThresholds() throws ConfigurationException
     {
         if (maxCompactionThreshold == 0)
@@ -1451,28 +1477,28 @@ public final class CFMetaData
 
         newState.toSchemaNoColumnsNoTriggers(rm, modificationTimestamp);
 
-        MapDifference<ByteBuffer, ColumnDefinition> columnDiff = Maps.difference(column_metadata, newState.column_metadata);
+        MapDifference<ByteBuffer, ColumnDefinition> columnDiff = Maps.difference(columnMetadata, newState.columnMetadata);
 
         // columns that are no longer needed
         for (ColumnDefinition cd : columnDiff.entriesOnlyOnLeft().values())
         {
             // Thrift only knows about the REGULAR ColumnDefinition type, so don't consider other type
             // are being deleted just because they are not here.
-            if (fromThrift && cd.type != ColumnDefinition.Type.REGULAR)
+            if (fromThrift && cd.kind != ColumnDefinition.Kind.REGULAR)
                 continue;
 
-            cd.deleteFromSchema(rm, cfName, getColumnDefinitionComparator(cd), modificationTimestamp);
+            cd.deleteFromSchema(rm, modificationTimestamp);
         }
 
         // newly added columns
         for (ColumnDefinition cd : columnDiff.entriesOnlyOnRight().values())
-            cd.toSchema(rm, cfName, getColumnDefinitionComparator(cd), modificationTimestamp);
+            cd.toSchema(rm, modificationTimestamp);
 
         // old columns with updated attributes
         for (ByteBuffer name : columnDiff.entriesDiffering().keySet())
         {
-            ColumnDefinition cd = newState.getColumnDefinition(name);
-            cd.toSchema(rm, cfName, getColumnDefinitionComparator(cd), modificationTimestamp);
+            ColumnDefinition cd = newState.columnMetadata.get(name);
+            cd.toSchema(rm, modificationTimestamp);
         }
 
         MapDifference<String, TriggerDefinition> triggerDiff = Maps.difference(triggers, newState.triggers);
@@ -1501,12 +1527,12 @@ public final class CFMetaData
         ColumnFamily cf = rm.addOrGet(SchemaColumnFamiliesCf);
         int ldt = (int) (System.currentTimeMillis() / 1000);
 
-        ColumnNameBuilder builder = SchemaColumnFamiliesCf.getCfDef().getColumnNameBuilder();
+        ColumnNameBuilder builder = SchemaColumnFamiliesCf.getColumnNameBuilder();
         builder.add(ByteBufferUtil.bytes(cfName));
         cf.addAtom(new RangeTombstone(builder.build(), builder.buildAsEndOfRange(), timestamp, ldt));
 
-        for (ColumnDefinition cd : column_metadata.values())
-            cd.deleteFromSchema(rm, cfName, getColumnDefinitionComparator(cd), timestamp);
+        for (ColumnDefinition cd : allColumns())
+            cd.deleteFromSchema(rm, timestamp);
 
         for (TriggerDefinition td : triggers.values())
             td.deleteFromSchema(rm, cfName, timestamp);
@@ -1518,8 +1544,8 @@ public final class CFMetaData
     {
         toSchemaNoColumnsNoTriggers(rm, timestamp);
 
-        for (ColumnDefinition cd : column_metadata.values())
-            cd.toSchema(rm, cfName, getColumnDefinitionComparator(cd), timestamp);
+        for (ColumnDefinition cd : allColumns())
+            cd.toSchema(rm, timestamp);
     }
 
     private void toSchemaNoColumnsNoTriggers(RowMutation rm, long timestamp)
@@ -1568,14 +1594,14 @@ public final class CFMetaData
         cf.addColumn(Column.create(indexInterval, timestamp, cfName, "index_interval"));
         cf.addColumn(Column.create(speculativeRetry.toString(), timestamp, cfName, "speculative_retry"));
 
-        for (Map.Entry<ByteBuffer, Long> entry : droppedColumns.entrySet())
+        for (Map.Entry<ColumnIdentifier, Long> entry : droppedColumns.entrySet())
             cf.addColumn(new Column(makeDroppedColumnName(entry.getKey()), LongType.instance.decompose(entry.getValue()), timestamp));
 
         // Save the CQL3 metadata "the old way" for compatibility sake
         cf.addColumn(Column.create(aliasesToJson(partitionKeyColumns), timestamp, cfName, "key_aliases"));
-        cf.addColumn(Column.create(aliasesToJson(clusteringKeyColumns), timestamp, cfName, "column_aliases"));
+        cf.addColumn(Column.create(aliasesToJson(clusteringColumns), timestamp, cfName, "column_aliases"));
         cf.addColumn(compactValueColumn == null ? DeletedColumn.create(ldt, timestamp, cfName, "value_alias")
-                                                : Column.create(compactValueColumn.name, timestamp, cfName, "value_alias"));
+                                                : Column.create(compactValueColumn.name.bytes, timestamp, cfName, "value_alias"));
     }
 
     // Package protected for use by tests
@@ -1627,19 +1653,19 @@ public final class CFMetaData
                 cfm.populateIoCacheOnFlush(result.getBoolean("populate_io_cache_on_flush"));
 
             /*
-             * The info previously hold by key_aliases, column_aliases and value_alias is now stored in column_metadata (because 1) this
+             * The info previously held by key_aliases, column_aliases and value_alias is now stored in columnMetadata (because 1) this
              * make more sense and 2) this allow to store indexing information).
              * However, for upgrade sake we need to still be able to read those old values. Moreover, we cannot easily
-             * remove those old columns once "converted" to column_metadata because that would screw up nodes that may
+             * remove those old columns once "converted" to columnMetadata because that would screw up nodes that may
              * not have upgraded. So for now we keep the both info and in sync, even though its redundant.
              * In other words, the ColumnDefinition the following lines add may be replaced later when ColumnDefinition.fromSchema
              * is called but that's ok.
              */
-            cfm.addColumnMetadataFromAliases(aliasesFromStrings(fromJsonList(result.getString("key_aliases"))), cfm.keyValidator, ColumnDefinition.Type.PARTITION_KEY);
-            cfm.addColumnMetadataFromAliases(aliasesFromStrings(fromJsonList(result.getString("column_aliases"))), cfm.comparator, ColumnDefinition.Type.CLUSTERING_KEY);
+            cfm.addColumnMetadataFromAliases(aliasesFromStrings(fromJsonList(result.getString("key_aliases"))), cfm.keyValidator, ColumnDefinition.Kind.PARTITION_KEY);
+            cfm.addColumnMetadataFromAliases(aliasesFromStrings(fromJsonList(result.getString("column_aliases"))), cfm.comparator, ColumnDefinition.Kind.CLUSTERING_COLUMN);
 
             if (result.has("value_alias"))
-                cfm.addColumnMetadataFromAliases(Collections.<ByteBuffer>singletonList(result.getBytes("value_alias")), cfm.defaultValidator, ColumnDefinition.Type.COMPACT_VALUE);
+                cfm.addColumnMetadataFromAliases(Collections.<ByteBuffer>singletonList(result.getBytes("value_alias")), cfm.defaultValidator, ColumnDefinition.Kind.COMPACT_VALUE);
 
             if (result.has("dropped_columns"))
                 cfm.droppedColumns(convertDroppedColumns(result.getMap("dropped_columns", UTF8Type.instance, LongType.instance)));
@@ -1652,7 +1678,7 @@ public final class CFMetaData
         }
     }
 
-    public void addColumnMetadataFromAliases(List<ByteBuffer> aliases, AbstractType<?> comparator, ColumnDefinition.Type type)
+    public void addColumnMetadataFromAliases(List<ByteBuffer> aliases, AbstractType<?> comparator, ColumnDefinition.Kind kind)
     {
         if (comparator instanceof CompositeType)
         {
@@ -1660,14 +1686,16 @@ public final class CFMetaData
             for (int i = 0; i < aliases.size(); ++i)
             {
                 if (aliases.get(i) != null)
-                    column_metadata.put(aliases.get(i), new ColumnDefinition(aliases.get(i), ct.types.get(i), i, type));
+                {
+                    addOrReplaceColumnDefinition(new ColumnDefinition(this, aliases.get(i), ct.types.get(i), i, kind));
+                }
             }
         }
         else
         {
             assert aliases.size() <= 1;
             if (!aliases.isEmpty() && aliases.get(0) != null)
-                column_metadata.put(aliases.get(0), new ColumnDefinition(aliases.get(0), comparator, null, type));
+                addOrReplaceColumnDefinition(new ColumnDefinition(this, aliases.get(0), comparator, null, kind));
         }
     }
 
@@ -1697,7 +1725,7 @@ public final class CFMetaData
     {
         List<String> aliases = new ArrayList<String>(rawAliases.size());
         for (ColumnDefinition rawAlias : rawAliases)
-            aliases.add(UTF8Type.instance.compose(rawAlias.name));
+            aliases.add(rawAlias.name.toString());
         return json(aliases);
     }
 
@@ -1709,17 +1737,17 @@ public final class CFMetaData
         return rawAliases;
     }
 
-    private static Map<ByteBuffer, Long> convertDroppedColumns(Map<String, Long> raw)
+    private static Map<ColumnIdentifier, Long> convertDroppedColumns(Map<String, Long> raw)
     {
-        Map<ByteBuffer, Long> converted = Maps.newHashMap();
+        Map<ColumnIdentifier, Long> converted = Maps.newHashMap();
         for (Map.Entry<String, Long> entry : raw.entrySet())
-            converted.put(UTF8Type.instance.decompose(entry.getKey()), entry.getValue());
+            converted.put(new ColumnIdentifier(entry.getKey(), true), entry.getValue());
         return converted;
     }
 
-    private ByteBuffer makeDroppedColumnName(ByteBuffer column)
+    private ByteBuffer makeDroppedColumnName(ColumnIdentifier column)
     {
-        ColumnNameBuilder builder = SchemaColumnFamiliesCf.cqlCfDef.getColumnNameBuilder();
+        ColumnNameBuilder builder = SchemaColumnFamiliesCf.getColumnNameBuilder();
         builder.add(UTF8Type.instance.decompose(cfName));
         builder.add(UTF8Type.instance.decompose("dropped_columns"));
         return builder.add(column).build();
@@ -1745,12 +1773,12 @@ public final class CFMetaData
 
     public AbstractType<?> getColumnDefinitionComparator(ColumnDefinition def)
     {
-        return getComponentComparator(def.componentIndex, def.type);
+        return getComponentComparator(def.isOnAllComponents() ? null : def.position(), def.kind);
     }
 
-    public AbstractType<?> getComponentComparator(Integer componentIndex, ColumnDefinition.Type type)
+    public AbstractType<?> getComponentComparator(Integer componentIndex, ColumnDefinition.Kind kind)
     {
-        switch (type)
+        switch (kind)
         {
             case REGULAR:
                 AbstractType<?> cfComparator = cfType == ColumnFamilyType.Super ? ((CompositeType)comparator).types.get(1) : comparator;
@@ -1775,31 +1803,32 @@ public final class CFMetaData
     }
 
     // Package protected for use by tests
-    static CFMetaData addColumnDefinitionsFromSchema(CFMetaData cfDef, Row serializedColumnDefinitions)
+    static CFMetaData addColumnDefinitionsFromSchema(CFMetaData cfm, Row serializedColumnDefinitions)
     {
-        for (ColumnDefinition cd : ColumnDefinition.fromSchema(serializedColumnDefinitions, cfDef))
-            cfDef.column_metadata.put(cd.name, cd);
-        return cfDef.rebuild();
+        for (ColumnDefinition cd : ColumnDefinition.fromSchema(serializedColumnDefinitions, cfm))
+            cfm.addOrReplaceColumnDefinition(cd);
+        return cfm.rebuild();
     }
 
-    public void addColumnDefinition(ColumnDefinition def) throws ConfigurationException
+    public CFMetaData addColumnDefinition(ColumnDefinition def) throws ConfigurationException
     {
-        if (column_metadata.containsKey(def.name))
-            throw new ConfigurationException(String.format("Cannot add column %s, a column with the same name already exists", getColumnDefinitionComparator(def).getString(def.name)));
+        if (columnMetadata.containsKey(def.name.bytes))
+            throw new ConfigurationException(String.format("Cannot add column %s, a column with the same name already exists", def.name));
 
-        addOrReplaceColumnDefinition(def);
+        return addOrReplaceColumnDefinition(def);
     }
 
     // This method doesn't check if a def of the same name already exist and should only be used when we
     // know this cannot happen.
-    public void addOrReplaceColumnDefinition(ColumnDefinition def)
+    public CFMetaData addOrReplaceColumnDefinition(ColumnDefinition def)
     {
-        column_metadata.put(def.name, def);
+        columnMetadata.put(def.name.bytes, def);
+        return this;
     }
 
     public boolean removeColumnDefinition(ColumnDefinition def)
     {
-        return column_metadata.remove(def.name) != null;
+        return columnMetadata.remove(def.name.bytes) != null;
     }
 
     private static void addTriggerDefinitionsFromSchema(CFMetaData cfDef, Row serializedTriggerDefinitions)
@@ -1822,76 +1851,58 @@ public final class CFMetaData
 
     public void recordColumnDrop(ColumnDefinition def)
     {
-        assert def.componentIndex != null;
+        assert !def.isOnAllComponents();
         droppedColumns.put(def.name, FBUtilities.timestampMicros());
     }
 
-    public void renameColumn(ByteBuffer from, String strFrom, ByteBuffer to, String strTo) throws InvalidRequestException
+    public void renameColumn(ColumnIdentifier from, ColumnIdentifier to) throws InvalidRequestException
     {
-        ColumnDefinition def = column_metadata.get(from);
+        ColumnDefinition def = getColumnDefinition(from);
         if (def == null)
-            throw new InvalidRequestException(String.format("Cannot rename unknown column %s in keyspace %s", strFrom, cfName));
+            throw new InvalidRequestException(String.format("Cannot rename unknown column %s in keyspace %s", from, cfName));
 
-        if (column_metadata.get(to) != null)
-            throw new InvalidRequestException(String.format("Cannot rename column %s to %s in keyspace %s; another column of that name already exist", strFrom, strTo, cfName));
+        if (getColumnDefinition(to) != null)
+            throw new InvalidRequestException(String.format("Cannot rename column %s to %s in keyspace %s; another column of that name already exist", from, to, cfName));
 
-        if (def.type == ColumnDefinition.Type.REGULAR)
+        if (def.kind == ColumnDefinition.Kind.REGULAR)
         {
-            throw new InvalidRequestException(String.format("Cannot rename non PRIMARY KEY part %s", strFrom));
+            throw new InvalidRequestException(String.format("Cannot rename non PRIMARY KEY part %s", from));
         }
         else if (def.isIndexed())
         {
-            throw new InvalidRequestException(String.format("Cannot rename column %s because it is secondary indexed", strFrom));
+            throw new InvalidRequestException(String.format("Cannot rename column %s because it is secondary indexed", from));
         }
 
-        ColumnDefinition newDef = def.cloneWithNewName(to);
+        ColumnDefinition newDef = def.withNewName(to);
         // don't call addColumnDefinition/removeColumnDefition because we want to avoid recomputing
         // the CQL3 cfDef between those two operation
-        column_metadata.put(newDef.name, newDef);
-        column_metadata.remove(def.name);
+        addOrReplaceColumnDefinition(newDef);
+        removeColumnDefinition(def);
     }
 
     public CFMetaData rebuild()
     {
-        /*
-         * TODO: There is definitively some repetition between the CQL3  metadata stored in this
-         * object (partitionKeyColumns, ...) and the one stored in CFDefinition.
-         * Ultimately, we should probably merge both. However, there is enough details to fix that
-         * it's worth doing that in a separate issue.
-         */
-        rebuildCQL3Metadata();
-        cqlCfDef = new CFDefinition(this);
-        return this;
-    }
-
-    public CFDefinition getCfDef()
-    {
-        assert cqlCfDef != null;
-        return cqlCfDef;
-    }
-
-    private void rebuildCQL3Metadata()
-    {
         List<ColumnDefinition> pkCols = nullInitializedList(keyValidator.componentsCount());
-        boolean isDense = isDense(comparator, column_metadata.values());
+        boolean isDense = isDense(comparator, allColumns());
         int nbCkCols = isDense
                      ? comparator.componentsCount()
-                     : comparator.componentsCount() - (hasCollection() ? 2 : 1);
+                     : comparator.componentsCount() - (hasCollections() ? 2 : 1);
         List<ColumnDefinition> ckCols = nullInitializedList(nbCkCols);
-        Set<ColumnDefinition> regCols = new HashSet<ColumnDefinition>();
+        // We keep things sorted to get consistent/predictable order in select queries
+        SortedSet<ColumnDefinition> regCols = new TreeSet<ColumnDefinition>(regularColumnComparator);
         ColumnDefinition compactCol = null;
 
-        for (ColumnDefinition def : column_metadata.values())
+        for (ColumnDefinition def : allColumns())
         {
-            switch (def.type)
+            switch (def.kind)
             {
                 case PARTITION_KEY:
-                    assert !(def.componentIndex == null && keyValidator instanceof CompositeType);
-                    pkCols.set(def.componentIndex == null ? 0 : def.componentIndex, def);
+                    assert !(def.isOnAllComponents() && keyValidator instanceof CompositeType);
+                    pkCols.set(def.position(), def);
                     break;
-                case CLUSTERING_KEY:
-                    assert !(def.componentIndex == null && comparator instanceof CompositeType);
-                    ckCols.set(def.componentIndex == null ? 0 : def.componentIndex, def);
+                case CLUSTERING_COLUMN:
+                    assert !(def.isOnAllComponents() && comparator instanceof CompositeType);
+                    ckCols.set(def.position(), def);
                     break;
                 case REGULAR:
                     regCols.add(def);
@@ -1905,9 +1916,10 @@ public final class CFMetaData
 
         // Now actually assign the correct value. This is not atomic, but then again, updating CFMetaData is never atomic anyway.
         partitionKeyColumns = addDefaultKeyAliases(pkCols);
-        clusteringKeyColumns = addDefaultColumnAliases(ckCols);
+        clusteringColumns = addDefaultColumnAliases(ckCols);
         regularColumns = regCols;
         compactValueColumn = addDefaultValueAlias(compactCol, isDense);
+        return this;
     }
 
     private List<ColumnDefinition> addDefaultKeyAliases(List<ColumnDefinition> pkCols)
@@ -1926,8 +1938,8 @@ public final class CFMetaData
                 // For compatibility sake, we call the first alias 'key' rather than 'key1'. This
                 // is inconsistent with column alias, but it's probably not worth risking breaking compatibility now.
                 ByteBuffer name = ByteBufferUtil.bytes(i == 0 ? DEFAULT_KEY_ALIAS : DEFAULT_KEY_ALIAS + (i + 1));
-                ColumnDefinition newDef = ColumnDefinition.partitionKeyDef(name, type, idx);
-                column_metadata.put(newDef.name, newDef);
+                ColumnDefinition newDef = ColumnDefinition.partitionKeyDef(this, name, type, idx);
+                addOrReplaceColumnDefinition(newDef);
                 pkCols.set(i, newDef);
             }
         }
@@ -1942,14 +1954,14 @@ public final class CFMetaData
             {
                 Integer idx = null;
                 AbstractType<?> type = comparator;
-                if (comparator instanceof CompositeType)
+                if (hasCompositeComparator())
                 {
                     idx = i;
                     type = ((CompositeType)comparator).types.get(i);
                 }
                 ByteBuffer name = ByteBufferUtil.bytes(DEFAULT_COLUMN_ALIAS + (i + 1));
-                ColumnDefinition newDef = ColumnDefinition.clusteringKeyDef(name, type, idx);
-                column_metadata.put(newDef.name, newDef);
+                ColumnDefinition newDef = ColumnDefinition.clusteringKeyDef(this, name, type, idx);
+                addOrReplaceColumnDefinition(newDef);
                 ckCols.set(i, newDef);
             }
         }
@@ -1963,8 +1975,8 @@ public final class CFMetaData
             if (compactValueDef != null)
                 return compactValueDef;
 
-            ColumnDefinition newDef = ColumnDefinition.compactValueDef(ByteBufferUtil.bytes(DEFAULT_VALUE_ALIAS), defaultValidator);
-            column_metadata.put(newDef.name, newDef);
+            ColumnDefinition newDef = ColumnDefinition.compactValueDef(this, ByteBufferUtil.bytes(DEFAULT_VALUE_ALIAS), defaultValidator);
+            addOrReplaceColumnDefinition(newDef);
             return newDef;
         }
         else
@@ -1974,13 +1986,24 @@ public final class CFMetaData
         }
     }
 
-    private boolean hasCollection()
+    public boolean hasCollections()
     {
-        if (isSuper() || !(comparator instanceof CompositeType))
-            return false;
+        return getCollectionType() != null;
+    }
+
+    public boolean hasCompositeComparator()
+    {
+        return comparator instanceof CompositeType;
+    }
+
+    public ColumnToCollectionType getCollectionType()
+    {
+        if (isSuper() || !hasCompositeComparator())
+            return null;
 
-        List<AbstractType<?>> types = ((CompositeType)comparator).types;
-        return types.get(types.size() - 1) instanceof ColumnToCollectionType;
+        CompositeType composite = (CompositeType)comparator;
+        AbstractType<?> last = composite.types.get(composite.types.size() - 1);
+        return last instanceof ColumnToCollectionType ? (ColumnToCollectionType)last : null;
     }
 
     /*
@@ -1988,7 +2011,7 @@ public final class CFMetaData
      * component is used to store a regular column names. In other words, non-composite static "thrift"
      * and CQL3 CF are *not* dense.
      * Note that his method is only used by rebuildCQL3Metadata. Once said metadata are built, finding
+     * if a CF is dense amounts more simply to checking whether clusteringColumns.size() == comparator.componentsCount().
+     * if a CF is dense amounts more simply to check if clusteringColumns.size() == comparator.componentsCount().
      */
     private static boolean isDense(AbstractType<?> comparator, Collection<ColumnDefinition> defs)
     {
@@ -1996,9 +2019,9 @@ public final class CFMetaData
          * As said above, this method is only here because we need to deal with thrift upgrades.
          * Once a CF has been "upgraded", i.e. we've rebuilt and save its CQL3 metadata at least once,
          * then checking for isDense amounts to looking whether the maximum componentIndex for the
-         * CLUSTERING_KEY ColumnDefinitions is equal to comparator.componentsCount() - 1 or not.
+         * CLUSTERING_COLUMN ColumnDefinitions is equal to comparator.componentsCount() - 1 or not.
          *
-         * But non-upgraded thrift CF will have no such CLUSTERING_KEY column definitions, so we need
+         * But non-upgraded thrift CF will have no such CLUSTERING_COLUMN column definitions, so we need
          * to infer that information without relying on them in that case. And for the most part this is
          * easy, a CF that has at least one REGULAR definition is not dense. But the subtlety is that not
          * having a REGULAR definition may not mean dense because of CQL3 definitions that have only the
@@ -2006,7 +2029,7 @@ public final class CFMetaData
          *
          * So we need to recognize those special case CQL3 table with only a primary key. If we have some
          * clustering columns, we're fine as said above. So the only problem is that we cannot decide for
-         * sure if a CF without REGULAR columns nor CLUSTERING_KEY definition is meant to be dense, or if it
+         * sure if a CF without REGULAR columns nor CLUSTERING_COLUMN definition is meant to be dense, or if it
          * has been created in CQL3 by say:
          *    CREATE TABLE test (k int PRIMARY KEY)
          * in which case it should not be dense. However, we can limit our margin of error by assuming we are
@@ -2016,10 +2039,10 @@ public final class CFMetaData
         int maxClusteringIdx = -1;
         for (ColumnDefinition def : defs)
         {
-            switch (def.type)
+            switch (def.kind)
             {
-                case CLUSTERING_KEY:
-                    maxClusteringIdx = Math.max(maxClusteringIdx, def.componentIndex == null ? 0 : def.componentIndex);
+                case CLUSTERING_COLUMN:
+                    maxClusteringIdx = Math.max(maxClusteringIdx, def.position());
                     break;
                 case REGULAR:
                     hasRegular = true;
@@ -2033,6 +2056,12 @@ public final class CFMetaData
 
     }
 
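+    // See the static isDense(comparator, defs) above; this shortcut is only meaningful once rebuild() has set clusteringColumns.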
+    public boolean isDense()
+    {
+        return clusteringColumns.size() == comparator.componentsCount();
+    }
+
     private static boolean isCQL3OnlyPKComparator(AbstractType<?> comparator)
     {
         if (!(comparator instanceof CompositeType))
@@ -2060,11 +2089,11 @@ public final class CFMetaData
         if (isSuper())
             return true;
 
-        for (ColumnDefinition def : column_metadata.values())
+        for (ColumnDefinition def : allColumns())
         {
             // Non-REGULAR ColumnDefinition are not "thrift compatible" per-se, but it's ok because they hold metadata
             // this is only of use to CQL3, so we will just skip them in toThrift.
-            if (def.type == ColumnDefinition.Type.REGULAR && !def.isThriftCompatible())
+            if (def.kind == ColumnDefinition.Kind.REGULAR && !def.isThriftCompatible())
                 return false;
         }
         return true;
@@ -2094,7 +2123,7 @@ public final class CFMetaData
             .append("keyValidator", keyValidator)
             .append("minCompactionThreshold", minCompactionThreshold)
             .append("maxCompactionThreshold", maxCompactionThreshold)
-            .append("column_metadata", column_metadata)
+            .append("columnMetadata", columnMetadata)
             .append("compactionStrategyClass", compactionStrategyClass)
             .append("compactionStrategyOptions", compactionStrategyOptions)
             .append("compressionOptions", compressionParameters.asThriftOptions())
@@ -2109,4 +2138,77 @@ public final class CFMetaData
             .append("triggers", triggers)
             .toString();
     }
+
+    private static class NonCompositeBuilder implements ColumnNameBuilder
+    {
+        private final AbstractType<?> type;
+        private ByteBuffer columnName;
+
+        private NonCompositeBuilder(AbstractType<?> type)
+        {
+            this.type = type;
+        }
+
+        public NonCompositeBuilder add(ByteBuffer bb)
+        {
+            if (columnName != null)
+                throw new IllegalStateException("Column name is already constructed");
+
+            columnName = bb;
+            return this;
+        }
+
+        public NonCompositeBuilder add(ColumnIdentifier name)
+        {
+            return add(name.bytes);
+        }
+
+        public NonCompositeBuilder add(ByteBuffer bb, Relation.Type op)
+        {
+            return add(bb);
+        }
+
+        public int componentCount()
+        {
+            return columnName == null ? 0 : 1;
+        }
+
+        public int remainingCount()
+        {
+            return columnName == null ? 1 : 0;
+        }
+
+        public ByteBuffer get(int i)
+        {
+            if (i < 0 || i >= (columnName == null ? 0 : 1))
+                throw new IllegalArgumentException();
+
+            return columnName;
+        }
+
+        public ByteBuffer build()
+        {
+            return columnName == null ? ByteBufferUtil.EMPTY_BYTE_BUFFER : columnName;
+        }
+
+        public ByteBuffer buildAsEndOfRange()
+        {
+            return build();
+        }
+
+        public NonCompositeBuilder copy()
+        {
+            NonCompositeBuilder newBuilder = new NonCompositeBuilder(type);
+            newBuilder.columnName = columnName;
+            return newBuilder;
+        }
+
+        public ByteBuffer getComponent(int i)
+        {
+            if (i != 0 || columnName == null)
+                throw new IllegalArgumentException();
+
+            return columnName;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5f5905d5/src/java/org/apache/cassandra/config/ColumnDefinition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/ColumnDefinition.java b/src/java/org/apache/cassandra/config/ColumnDefinition.java
index fee8b1b..05a10bc 100644
--- a/src/java/org/apache/cassandra/config/ColumnDefinition.java
+++ b/src/java/org/apache/cassandra/config/ColumnDefinition.java
@@ -24,11 +24,10 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Objects;
 import com.google.common.collect.Maps;
 
-import org.apache.cassandra.cql3.ColumnNameBuilder;
-import org.apache.cassandra.cql3.QueryProcessor;
-import org.apache.cassandra.cql3.UntypedResultSet;
+import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.serializers.MarshalException;
 import org.apache.cassandra.exceptions.*;
 import org.apache.cassandra.thrift.ColumnDef;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -36,16 +35,16 @@ import org.apache.cassandra.utils.FBUtilities;
 
 import static org.apache.cassandra.utils.FBUtilities.json;
 
-public class ColumnDefinition
+public class ColumnDefinition extends ColumnSpecification
 {
     // system.schema_columns column names
     private static final String COLUMN_NAME = "column_name";
-    private static final String VALIDATOR = "validator";
+    private static final String TYPE = "validator";
     private static final String INDEX_TYPE = "index_type";
     private static final String INDEX_OPTIONS = "index_options";
     private static final String INDEX_NAME = "index_name";
     private static final String COMPONENT_INDEX = "component_index";
-    private static final String TYPE = "type";
+    private static final String KIND = "type";
 
     /*
      * The type of CQL3 column this definition represents.
@@ -58,79 +57,118 @@ public class ColumnDefinition
      * Note that thrift/CQL2 only know about definitions of type REGULAR (and
      * the ones whose componentIndex == null).
      */
-    public enum Type
+    public enum Kind
     {
         PARTITION_KEY,
-        CLUSTERING_KEY,
+        CLUSTERING_COLUMN,
         REGULAR,
-        COMPACT_VALUE
+        COMPACT_VALUE;
+
+        public String serialize()
+        {
+            // For backward compatibility we need to special case CLUSTERING_COLUMN
+            return this == CLUSTERING_COLUMN ? "clustering_key" : this.toString().toLowerCase();
+        }
+
+        public static Kind deserialize(String value)
+        {
+            if (value.equalsIgnoreCase("clustering_key"))
+                return CLUSTERING_COLUMN;
+            return Enum.valueOf(Kind.class, value.toUpperCase());
+        }
     }
 
-    public final ByteBuffer name;
-    private AbstractType<?> validator;
+    public final Kind kind;
+
+    private String indexName;
     private IndexType indexType;
     private Map<String,String> indexOptions;
-    private String indexName;
-    public final Type type;
 
     /*
      * If the column comparator is a composite type, indicates to which
      * component this definition refers to. If null, the definition refers to
      * the full column name.
      */
-    public final Integer componentIndex;
+    private final Integer componentIndex;
 
-    public static ColumnDefinition partitionKeyDef(ByteBuffer name, AbstractType<?> validator, Integer componentIndex)
+    public static ColumnDefinition partitionKeyDef(CFMetaData cfm, ByteBuffer name, AbstractType<?> validator, Integer componentIndex)
     {
-        return new ColumnDefinition(name, validator, componentIndex, Type.PARTITION_KEY);
+        return new ColumnDefinition(cfm, name, validator, componentIndex, Kind.PARTITION_KEY);
     }
 
-    public static ColumnDefinition clusteringKeyDef(ByteBuffer name, AbstractType<?> validator, Integer componentIndex)
+    public static ColumnDefinition clusteringKeyDef(CFMetaData cfm, ByteBuffer name, AbstractType<?> validator, Integer componentIndex)
     {
-        return new ColumnDefinition(name, validator, componentIndex, Type.CLUSTERING_KEY);
+        return new ColumnDefinition(cfm, name, validator, componentIndex, Kind.CLUSTERING_COLUMN);
     }
 
-    public static ColumnDefinition regularDef(ByteBuffer name, AbstractType<?> validator, Integer componentIndex)
+    public static ColumnDefinition regularDef(CFMetaData cfm, ByteBuffer name, AbstractType<?> validator, Integer componentIndex)
     {
-        return new ColumnDefinition(name, validator, componentIndex, Type.REGULAR);
+        return new ColumnDefinition(cfm, name, validator, componentIndex, Kind.REGULAR);
     }
 
-    public static ColumnDefinition compactValueDef(ByteBuffer name, AbstractType<?> validator)
+    public static ColumnDefinition compactValueDef(CFMetaData cfm, ByteBuffer name, AbstractType<?> validator)
     {
-        return new ColumnDefinition(name, validator, null, Type.COMPACT_VALUE);
+        return new ColumnDefinition(cfm, name, validator, null, Kind.COMPACT_VALUE);
     }
 
-    public ColumnDefinition(ByteBuffer name, AbstractType<?> validator, Integer componentIndex, Type type)
+    public ColumnDefinition(CFMetaData cfm, ByteBuffer name, AbstractType<?> validator, Integer componentIndex, Kind kind)
     {
-        this(name, validator, null, null, null, componentIndex, type);
+        this(cfm.ksName,
+             cfm.cfName,
+             new ColumnIdentifier(name, cfm.getComponentComparator(componentIndex, kind)),
+             validator,
+             null,
+             null,
+             null,
+             componentIndex,
+             kind);
     }
 
     @VisibleForTesting
-    public ColumnDefinition(ByteBuffer name,
+    public ColumnDefinition(String ksName,
+                            String cfName,
+                            ColumnIdentifier name,
                             AbstractType<?> validator,
                             IndexType indexType,
                             Map<String, String> indexOptions,
                             String indexName,
                             Integer componentIndex,
-                            Type type)
+                            Kind kind)
     {
+        super(ksName, cfName, name, validator);
         assert name != null && validator != null;
-        this.name = name;
+        this.kind = kind;
         this.indexName = indexName;
-        this.validator = validator;
         this.componentIndex = componentIndex;
         this.setIndexType(indexType, indexOptions);
-        this.type = type;
     }
 
     public ColumnDefinition copy()
     {
-        return new ColumnDefinition(name, validator, indexType, indexOptions, indexName, componentIndex, type);
+        return new ColumnDefinition(ksName, cfName, name, type, indexType, indexOptions, indexName, componentIndex, kind);
     }
 
-    public ColumnDefinition cloneWithNewName(ByteBuffer newName)
+    public ColumnDefinition withNewName(ColumnIdentifier newName)
     {
-        return new ColumnDefinition(newName, validator, indexType, indexOptions, indexName, componentIndex, type);
+        return new ColumnDefinition(ksName, cfName, newName, type, indexType, indexOptions, indexName, componentIndex, kind);
+    }
+
+    public ColumnDefinition withNewType(AbstractType<?> newType)
+    {
+        return new ColumnDefinition(ksName, cfName, name, newType, indexType, indexOptions, indexName, componentIndex, kind);
+    }
+
+    public boolean isOnAllComponents()
+    {
+        return componentIndex == null;
+    }
+
+    // The componentIndex. For convenience, this never returns null:
+    // if componentIndex == null, this returns 0. So callers should first check
+    // isOnAllComponents() to tell the two cases apart if that's a possibility.
+    public int position()
+    {
+        return componentIndex == null ? 0 : componentIndex;
     }
 
     @Override
@@ -144,8 +182,11 @@ public class ColumnDefinition
 
         ColumnDefinition cd = (ColumnDefinition) o;
 
-        return Objects.equal(name, cd.name)
-            && Objects.equal(validator, cd.validator)
+        return Objects.equal(ksName, cd.ksName)
+            && Objects.equal(cfName, cd.cfName)
+            && Objects.equal(name, cd.name)
+            && Objects.equal(type, cd.type)
+            && Objects.equal(kind, cd.kind)
             && Objects.equal(componentIndex, cd.componentIndex)
             && Objects.equal(indexName, cd.indexName)
             && Objects.equal(indexType, cd.indexType)
@@ -155,16 +196,16 @@ public class ColumnDefinition
     @Override
     public int hashCode()
     {
-        return Objects.hashCode(name, validator, componentIndex, indexName, indexType, indexOptions);
+        return Objects.hashCode(ksName, cfName, name, type, kind, componentIndex, indexName, indexType, indexOptions);
     }
 
     @Override
     public String toString()
     {
         return Objects.toStringHelper(this)
-                      .add("name", ByteBufferUtil.bytesToHex(name))
-                      .add("validator", validator)
+                      .add("name", name)
                       .add("type", type)
+                      .add("kind", kind)
                       .add("componentIndex", componentIndex)
                       .add("indexName", indexName)
                       .add("indexType", indexType)
@@ -173,14 +214,14 @@ public class ColumnDefinition
 
     public boolean isThriftCompatible()
     {
-        return type == ColumnDefinition.Type.REGULAR && componentIndex == null;
+        return kind == ColumnDefinition.Kind.REGULAR && componentIndex == null;
     }
 
     public static List<ColumnDef> toThrift(Map<ByteBuffer, ColumnDefinition> columns)
     {
         List<ColumnDef> thriftDefs = new ArrayList<>(columns.size());
         for (ColumnDefinition def : columns.values())
-            if (def.type == ColumnDefinition.Type.REGULAR)
+            if (def.kind == ColumnDefinition.Kind.REGULAR)
                 thriftDefs.add(def.toThrift());
         return thriftDefs;
     }
@@ -189,8 +230,8 @@ public class ColumnDefinition
     {
         ColumnDef cd = new ColumnDef();
 
-        cd.setName(ByteBufferUtil.clone(name));
-        cd.setValidation_class(validator.toString());
+        cd.setName(ByteBufferUtil.clone(name.bytes));
+        cd.setValidation_class(type.toString());
         cd.setIndex_type(indexType == null ? null : org.apache.cassandra.thrift.IndexType.valueOf(indexType.name()));
         cd.setIndex_name(indexName == null ? null : indexName);
         cd.setIndex_options(indexOptions == null ? null : Maps.newHashMap(indexOptions));
@@ -198,26 +239,43 @@ public class ColumnDefinition
         return cd;
     }
 
-    public static ColumnDefinition fromThrift(ColumnDef thriftColumnDef, boolean isSuper) throws SyntaxException, ConfigurationException
+    public static ColumnDefinition fromThrift(CFMetaData cfm, ColumnDef thriftColumnDef) throws SyntaxException, ConfigurationException
     {
         // For super columns, the componentIndex is 1 because the ColumnDefinition applies to the column component.
-        return new ColumnDefinition(ByteBufferUtil.clone(thriftColumnDef.name),
-                                    TypeParser.parse(thriftColumnDef.validation_class),
-                                    thriftColumnDef.index_type == null ? null : IndexType.valueOf(thriftColumnDef.index_type.name()),
-                                    thriftColumnDef.index_options,
-                                    thriftColumnDef.index_name,
-                                    isSuper ? 1 : null,
-                                    Type.REGULAR);
+        Integer componentIndex = cfm.isSuper() ? 1 : null;
+        AbstractType<?> comparator = cfm.getComponentComparator(componentIndex, Kind.REGULAR);
+        try
+        {
+            comparator.validate(thriftColumnDef.name);
+        }
+        catch (MarshalException e)
+        {
+            throw new ConfigurationException(String.format("Column name %s is not valid for comparator %s", ByteBufferUtil.bytesToHex(thriftColumnDef.name), comparator));
+        }
+
+        ColumnDefinition cd = new ColumnDefinition(cfm,
+                                                   ByteBufferUtil.clone(thriftColumnDef.name),
+                                                   TypeParser.parse(thriftColumnDef.validation_class),
+                                                   componentIndex,
+                                                   Kind.REGULAR);
+
+        cd.setIndex(thriftColumnDef.index_name,
+                    thriftColumnDef.index_type == null ? null : IndexType.valueOf(thriftColumnDef.index_type.name()),
+                    thriftColumnDef.index_options);
+        return cd;
     }
 
-    public static Map<ByteBuffer, ColumnDefinition> fromThrift(List<ColumnDef> thriftDefs, boolean isSuper) throws SyntaxException, ConfigurationException
+    public static Map<ByteBuffer, ColumnDefinition> fromThrift(CFMetaData cfm, List<ColumnDef> thriftDefs) throws SyntaxException, ConfigurationException
     {
         if (thriftDefs == null)
             return new HashMap<>();
 
         Map<ByteBuffer, ColumnDefinition> cds = new TreeMap<>();
         for (ColumnDef thriftColumnDef : thriftDefs)
-            cds.put(ByteBufferUtil.clone(thriftColumnDef.name), fromThrift(thriftColumnDef, isSuper));
+        {
+            ColumnDefinition def = fromThrift(cfm, thriftColumnDef);
+            cds.put(def.name.bytes, def);
+        }
 
         return cds;
     }
@@ -229,55 +287,61 @@ public class ColumnDefinition
      * @param cfName     The name of the parent ColumnFamily
      * @param timestamp  The timestamp to use for column modification
      */
-    public void deleteFromSchema(RowMutation rm, String cfName, AbstractType<?> comparator, long timestamp)
+    public void deleteFromSchema(RowMutation rm, long timestamp)
     {
         ColumnFamily cf = rm.addOrGet(CFMetaData.SchemaColumnsCf);
         int ldt = (int) (System.currentTimeMillis() / 1000);
 
-        ColumnNameBuilder builder = CFMetaData.SchemaColumnsCf.getCfDef().getColumnNameBuilder();
-        // Note: the following is necessary for backward compatibility. For CQL3, comparator will be UTF8 and nameBytes == name
-        ByteBuffer nameBytes = ByteBufferUtil.bytes(comparator.getString(name));
+        ColumnNameBuilder builder = CFMetaData.SchemaColumnsCf.getColumnNameBuilder();
+        // Note: the following is necessary for backward compatibility. For CQL3, ByteBufferUtil.bytes(name.toString()) equals name.bytes
+        ByteBuffer nameBytes = ByteBufferUtil.bytes(name.toString());
         builder.add(ByteBufferUtil.bytes(cfName)).add(nameBytes);
         cf.addAtom(new RangeTombstone(builder.build(), builder.buildAsEndOfRange(), timestamp, ldt));
     }
 
-    public void toSchema(RowMutation rm, String cfName, AbstractType<?> comparator, long timestamp)
+    public void toSchema(RowMutation rm, long timestamp)
     {
         ColumnFamily cf = rm.addOrGet(CFMetaData.SchemaColumnsCf);
         int ldt = (int) (System.currentTimeMillis() / 1000);
 
-        cf.addColumn(Column.create("", timestamp, cfName, comparator.getString(name), ""));
-        cf.addColumn(Column.create(validator.toString(), timestamp, cfName, comparator.getString(name), VALIDATOR));
-        cf.addColumn(indexType == null ? DeletedColumn.create(ldt, timestamp, cfName, comparator.getString(name), INDEX_TYPE)
-                                       : Column.create(indexType.toString(), timestamp, cfName, comparator.getString(name), INDEX_TYPE));
-        cf.addColumn(indexOptions == null ? DeletedColumn.create(ldt, timestamp, cfName, comparator.getString(name), INDEX_OPTIONS)
-                                          : Column.create(json(indexOptions), timestamp, cfName, comparator.getString(name), INDEX_OPTIONS));
-        cf.addColumn(indexName == null ? DeletedColumn.create(ldt, timestamp, cfName, comparator.getString(name), INDEX_NAME)
-                                       : Column.create(indexName, timestamp, cfName, comparator.getString(name), INDEX_NAME));
-        cf.addColumn(componentIndex == null ? DeletedColumn.create(ldt, timestamp, cfName, comparator.getString(name), COMPONENT_INDEX)
-                                            : Column.create(componentIndex, timestamp, cfName, comparator.getString(name), COMPONENT_INDEX));
-        cf.addColumn(Column.create(type.toString().toLowerCase(), timestamp, cfName, comparator.getString(name), TYPE));
+        cf.addColumn(Column.create("", timestamp, cfName, name.toString(), ""));
+        cf.addColumn(Column.create(type.toString(), timestamp, cfName, name.toString(), TYPE));
+        cf.addColumn(indexType == null ? DeletedColumn.create(ldt, timestamp, cfName, name.toString(), INDEX_TYPE)
+                                       : Column.create(indexType.toString(), timestamp, cfName, name.toString(), INDEX_TYPE));
+        cf.addColumn(indexOptions == null ? DeletedColumn.create(ldt, timestamp, cfName, name.toString(), INDEX_OPTIONS)
+                                          : Column.create(json(indexOptions), timestamp, cfName, name.toString(), INDEX_OPTIONS));
+        cf.addColumn(indexName == null ? DeletedColumn.create(ldt, timestamp, cfName, name.toString(), INDEX_NAME)
+                                       : Column.create(indexName, timestamp, cfName, name.toString(), INDEX_NAME));
+        cf.addColumn(componentIndex == null ? DeletedColumn.create(ldt, timestamp, cfName, name.toString(), COMPONENT_INDEX)
+                                            : Column.create(componentIndex, timestamp, cfName, name.toString(), COMPONENT_INDEX));
+        cf.addColumn(Column.create(kind.serialize(), timestamp, cfName, name.toString(), KIND));
     }
 
-    public void apply(ColumnDefinition def, AbstractType<?> comparator)  throws ConfigurationException
+    public ColumnDefinition apply(ColumnDefinition def)  throws ConfigurationException
     {
-        assert type == def.type && Objects.equal(componentIndex, def.componentIndex);
+        assert kind == def.kind && Objects.equal(componentIndex, def.componentIndex);
 
         if (getIndexType() != null && def.getIndexType() != null)
         {
             // If an index is set (and not drop by this update), the validator shouldn't be change to a non-compatible one
             // (and we want true comparator compatibility, not just value one, since the validator is used by LocalPartitioner to order index rows)
-            if (!def.getValidator().isCompatibleWith(getValidator()))
-                throw new ConfigurationException(String.format("Cannot modify validator to a non-order-compatible one for column %s since an index is set", comparator.getString(name)));
+            if (!def.type.isCompatibleWith(type))
+                throw new ConfigurationException(String.format("Cannot modify validator to a non-order-compatible one for column %s since an index is set", name));
 
             assert getIndexName() != null;
             if (!getIndexName().equals(def.getIndexName()))
                 throw new ConfigurationException("Cannot modify index name");
         }
 
-        setValidator(def.getValidator());
-        setIndexType(def.getIndexType(), def.getIndexOptions());
-        setIndexName(def.getIndexName());
+        return new ColumnDefinition(ksName,
+                                    cfName,
+                                    name,
+                                    def.type,
+                                    def.getIndexType(),
+                                    def.getIndexOptions(),
+                                    def.getIndexName(),
+                                    componentIndex,
+                                    kind);
     }
 
     /**
@@ -293,22 +357,25 @@ public class ColumnDefinition
         String query = String.format("SELECT * FROM %s.%s", Keyspace.SYSTEM_KS, SystemKeyspace.SCHEMA_COLUMNS_CF);
         for (UntypedResultSet.Row row : QueryProcessor.resultify(query, serializedColumns))
         {
-            Type type = row.has(TYPE)
-                      ? Enum.valueOf(Type.class, row.getString(TYPE).toUpperCase())
-                      : Type.REGULAR;
+            Kind kind = row.has(KIND)
+                      ? Kind.deserialize(row.getString(KIND))
+                      : Kind.REGULAR;
 
             Integer componentIndex = null;
             if (row.has(COMPONENT_INDEX))
                 componentIndex = row.getInt(COMPONENT_INDEX);
-            else if (type == Type.CLUSTERING_KEY && cfm.isSuper())
+            else if (kind == Kind.CLUSTERING_COLUMN && cfm.isSuper())
                 componentIndex = 1; // A ColumnDefinition for super columns applies to the column component
 
-            ByteBuffer name = cfm.getComponentComparator(componentIndex, type).fromString(row.getString(COLUMN_NAME));
+            // Note: we save the column name as a string, but we should not assume that it is a UTF8 name;
+            // we need to use the comparator's fromString method
+            AbstractType<?> comparator = cfm.getComponentComparator(componentIndex, kind);
+            ColumnIdentifier name = new ColumnIdentifier(comparator.fromString(row.getString(COLUMN_NAME)), comparator);
 
             AbstractType<?> validator;
             try
             {
-                validator = TypeParser.parse(row.getString(VALIDATOR));
+                validator = TypeParser.parse(row.getString(TYPE));
             }
             catch (RequestValidationException e)
             {
@@ -327,7 +394,7 @@ public class ColumnDefinition
             if (row.has(INDEX_NAME))
                 indexName = row.getString(INDEX_NAME);
 
-            cds.add(new ColumnDefinition(name, validator, indexType, indexOptions, indexName, componentIndex, type));
+            cds.add(new ColumnDefinition(cfm.ksName, cfm.cfName, name, validator, indexType, indexOptions, indexName, componentIndex, kind));
         }
 
         return cds;
@@ -370,14 +437,4 @@ public class ColumnDefinition
     {
         return indexOptions;
     }
-
-    public AbstractType<?> getValidator()
-    {
-        return validator;
-    }
-
-    public void setValidator(AbstractType<?> validator)
-    {
-        this.validator = validator;
-    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5f5905d5/src/java/org/apache/cassandra/config/Schema.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Schema.java b/src/java/org/apache/cassandra/config/Schema.java
index d822704..112dc87 100644
--- a/src/java/org/apache/cassandra/config/Schema.java
+++ b/src/java/org/apache/cassandra/config/Schema.java
@@ -216,37 +216,6 @@ public class Schema
     }
 
     /**
-     * Get column comparator for ColumnFamily but it's keyspace/name
-     *
-     * @param ksName The keyspace name
-     * @param cfName The ColumnFamily name
-     *
-     * @return The comparator of the ColumnFamily
-     */
-    public AbstractType<?> getComparator(String ksName, String cfName)
-    {
-        assert ksName != null;
-        CFMetaData cfmd = getCFMetaData(ksName, cfName);
-        if (cfmd == null)
-            throw new IllegalArgumentException("Unknown ColumnFamily " + cfName + " in keyspace " + ksName);
-        return cfmd.comparator;
-    }
-
-    /**
-     * Get value validator for specific column
-     *
-     * @param ksName The keyspace name
-     * @param cfName The ColumnFamily name
-     * @param column The name of the column
-     *
-     * @return value validator specific to the column or default (per-cf) one
-     */
-    public AbstractType<?> getValueValidator(String ksName, String cfName, ByteBuffer column)
-    {
-        return getCFMetaData(ksName, cfName).getValueValidator(column);
-    }
-
-    /**
      * Get metadata about keyspace by its name
      *
      * @param keyspaceName The name of the keyspace

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5f5905d5/src/java/org/apache/cassandra/config/TriggerDefinition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/TriggerDefinition.java b/src/java/org/apache/cassandra/config/TriggerDefinition.java
index 69e06b1..e08f97c 100644
--- a/src/java/org/apache/cassandra/config/TriggerDefinition.java
+++ b/src/java/org/apache/cassandra/config/TriggerDefinition.java
@@ -84,7 +84,7 @@ public class TriggerDefinition
     {
         ColumnFamily cf = rm.addOrGet(SystemKeyspace.SCHEMA_TRIGGERS_CF);
 
-        ColumnNameBuilder builder = CFMetaData.SchemaTriggersCf.getCfDef().getColumnNameBuilder();
+        ColumnNameBuilder builder = CFMetaData.SchemaTriggersCf.getColumnNameBuilder();
         builder.add(bytes(cfName)).add(bytes(name));
 
         cf.addColumn(builder.copy().add(bytes("")).build(), bytes(""), timestamp); // the row marker
@@ -103,7 +103,7 @@ public class TriggerDefinition
         ColumnFamily cf = rm.addOrGet(SystemKeyspace.SCHEMA_TRIGGERS_CF);
         int ldt = (int) (System.currentTimeMillis() / 1000);
 
-        ColumnNameBuilder builder = CFMetaData.SchemaTriggersCf.getCfDef().getColumnNameBuilder();
+        ColumnNameBuilder builder = CFMetaData.SchemaTriggersCf.getColumnNameBuilder();
         builder.add(bytes(cfName)).add(bytes(name));
         cf.addAtom(new RangeTombstone(builder.build(), builder.buildAsEndOfRange(), timestamp, ldt));
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5f5905d5/src/java/org/apache/cassandra/cql/AlterTableStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql/AlterTableStatement.java b/src/java/org/apache/cassandra/cql/AlterTableStatement.java
index 48e64c8..74c6593 100644
--- a/src/java/org/apache/cassandra/cql/AlterTableStatement.java
+++ b/src/java/org/apache/cassandra/cql/AlterTableStatement.java
@@ -74,7 +74,7 @@ public class AlterTableStatement
         switch (oType)
         {
             case ADD:
-                cfm.addColumnDefinition(ColumnDefinition.regularDef(columnName, TypeParser.parse(validator), null));
+                cfm.addColumnDefinition(ColumnDefinition.regularDef(cfm, columnName, TypeParser.parse(validator), null));
                 break;
 
             case ALTER:
@@ -102,7 +102,7 @@ public class AlterTableStatement
                                     this.columnName,
                                     columnFamily));
 
-                    toUpdate.setValidator(TypeParser.parse(validator));
+                    cfm.addOrReplaceColumnDefinition(toUpdate.withNewType(TypeParser.parse(validator)));
                 }
                 break;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5f5905d5/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java b/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java
index dd56387..a140d02 100644
--- a/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java
+++ b/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java
@@ -26,6 +26,7 @@ import java.util.Set;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.cql3.ColumnIdentifier;
 import org.apache.cassandra.db.ColumnFamilyType;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.TypeParser;
@@ -122,7 +123,7 @@ public class CreateColumnFamilyStatement
     }
 
     // Column definitions
-    private Map<ByteBuffer, ColumnDefinition> getColumns(AbstractType<?> comparator) throws InvalidRequestException
+    private Map<ByteBuffer, ColumnDefinition> getColumns(CFMetaData cfm) throws InvalidRequestException
     {
         Map<ByteBuffer, ColumnDefinition> columnDefs = new HashMap<ByteBuffer, ColumnDefinition>();
 
@@ -130,12 +131,12 @@ public class CreateColumnFamilyStatement
         {
             try
             {
-                ByteBuffer columnName = comparator.fromStringCQL2(col.getKey().getText());
+                ByteBuffer columnName = cfm.comparator.fromStringCQL2(col.getKey().getText());
                 String validatorClassName = CFPropDefs.comparators.containsKey(col.getValue())
                                           ? CFPropDefs.comparators.get(col.getValue())
                                           : col.getValue();
                 AbstractType<?> validator = TypeParser.parse(validatorClassName);
-                columnDefs.put(columnName, ColumnDefinition.regularDef(columnName, validator, null));
+                columnDefs.put(columnName, ColumnDefinition.regularDef(cfm, columnName, validator, null));
             }
             catch (ConfigurationException e)
             {
@@ -192,7 +193,7 @@ public class CreateColumnFamilyStatement
                    .defaultValidator(cfProps.getValidator())
                    .minCompactionThreshold(minCompactionThreshold)
                    .maxCompactionThreshold(maxCompactionThreshold)
-                   .columnMetadata(getColumns(comparator))
+                   .columnMetadata(getColumns(newCFMD))
                    .keyValidator(TypeParser.parse(CFPropDefs.comparators.get(getKeyType())))
                    .compactionStrategyClass(cfProps.compactionStrategyClass)
                    .compactionStrategyOptions(cfProps.compactionStrategyOptions)
@@ -206,7 +207,7 @@ public class CreateColumnFamilyStatement
 
             // CQL2 can have null keyAliases
             if (keyAlias != null)
-                newCFMD.addColumnDefinition(ColumnDefinition.partitionKeyDef(keyAlias, newCFMD.getKeyValidator(), null));
+                newCFMD.addColumnDefinition(ColumnDefinition.partitionKeyDef(newCFMD, keyAlias, newCFMD.getKeyValidator(), null));
         }
         catch (ConfigurationException e)
         {