You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2013/10/30 17:10:17 UTC

[1/4] Remove CFDefinition

Updated Branches:
  refs/heads/trunk 91d60fbce -> 5f5905d51


http://git-wip-us.apache.org/repos/asf/cassandra/blob/5f5905d5/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
index 92f2dcf..357ad65 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
@@ -27,6 +27,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.config.IndexType;
+import org.apache.cassandra.cql3.ColumnIdentifier;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.filter.ExtendedFilter;
@@ -100,7 +101,7 @@ public class SecondaryIndexManager
 
         // TODO: allow all ColumnDefinition type
         for (ColumnDefinition cdef : baseCfs.metadata.allColumns())
-            if (cdef.getIndexType() != null && !indexedColumnNames.contains(cdef.name))
+            if (cdef.getIndexType() != null && !indexedColumnNames.contains(cdef.name.bytes))
                 addIndexedColumn(cdef);
 
         Set<SecondaryIndex> reloadedIndexes = Collections.newSetFromMap(new IdentityHashMap<SecondaryIndex, Boolean>());
@@ -231,8 +232,7 @@ public class SecondaryIndexManager
      */
     public synchronized Future<?> addIndexedColumn(ColumnDefinition cdef)
     {
-
-        if (indexesByColumn.containsKey(cdef.name))
+        if (indexesByColumn.containsKey(cdef.name.bytes))
             return null;
 
         assert cdef.getIndexType() != null;
@@ -278,11 +278,11 @@ public class SecondaryIndexManager
         // so we don't have to lock everything while we do the build. it's up to
         // the operator to wait
         // until the index is actually built before using in queries.
-        indexesByColumn.put(cdef.name, index);
+        indexesByColumn.put(cdef.name.bytes, index);
 
         // if we're just linking in the index to indexedColumns on an
         // already-built index post-restart, we're done
-        if (index.isIndexBuilt(cdef.name))
+        if (index.isIndexBuilt(cdef.name.bytes))
             return null;
 
         return index.buildIndexAsync();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5f5905d5/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
index 0720e83..3dea495 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
@@ -53,9 +53,9 @@ public abstract class CompositesIndex extends AbstractSimplePerColumnSecondaryIn
 
     public static CompositesIndex create(ColumnDefinition cfDef)
     {
-        switch (cfDef.type)
+        switch (cfDef.kind)
         {
-            case CLUSTERING_KEY:
+            case CLUSTERING_COLUMN:
                 return new CompositesIndexOnClusteringKey();
             case REGULAR:
                 return new CompositesIndexOnRegular();
@@ -70,9 +70,9 @@ public abstract class CompositesIndex extends AbstractSimplePerColumnSecondaryIn
     // Check SecondaryIndex.getIndexComparator if you want to know why this is static
     public static CompositeType getIndexComparator(CFMetaData baseMetadata, ColumnDefinition cfDef)
     {
-        switch (cfDef.type)
+        switch (cfDef.kind)
         {
-            case CLUSTERING_KEY:
+            case CLUSTERING_COLUMN:
                 return CompositesIndexOnClusteringKey.buildIndexComparator(baseMetadata, cfDef);
             case REGULAR:
                 return CompositesIndexOnRegular.buildIndexComparator(baseMetadata, cfDef);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5f5905d5/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java
index 954f380..63889ee 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java
@@ -29,7 +29,7 @@ import org.apache.cassandra.db.index.SecondaryIndex;
 import org.apache.cassandra.db.marshal.*;
 
 /**
- * Index on a CLUSTERING_KEY column definition.
+ * Index on a CLUSTERING_COLUMN column definition.
  *
  * A cell indexed by this index will have the general form:
  *   ck_0 ... ck_n c_name : v
@@ -51,13 +51,13 @@ public class CompositesIndexOnClusteringKey extends CompositesIndex
     {
         // Index cell names are rk ck_0 ... ck_{i-1} ck_{i+1} ck_n, so n
         // components total (where n is the number of clustering keys)
-        int ckCount = baseMetadata.clusteringKeyColumns().size();
+        int ckCount = baseMetadata.clusteringColumns().size();
         List<AbstractType<?>> types = new ArrayList<AbstractType<?>>(ckCount);
         List<AbstractType<?>> ckTypes = baseMetadata.comparator.getComponents();
         types.add(SecondaryIndex.keyComparator);
-        for (int i = 0; i < columnDef.componentIndex; i++)
+        for (int i = 0; i < columnDef.position(); i++)
             types.add(ckTypes.get(i));
-        for (int i = columnDef.componentIndex + 1; i < ckCount; i++)
+        for (int i = columnDef.position() + 1; i < ckCount; i++)
             types.add(ckTypes.get(i));
         return CompositeType.getInstance(types);
     }
@@ -66,36 +66,36 @@ public class CompositesIndexOnClusteringKey extends CompositesIndex
     {
         CompositeType baseComparator = (CompositeType)baseCfs.getComparator();
         ByteBuffer[] components = baseComparator.split(column.name());
-        return components[columnDef.componentIndex];
+        return components[columnDef.position()];
     }
 
     protected ColumnNameBuilder makeIndexColumnNameBuilder(ByteBuffer rowKey, ByteBuffer columnName)
     {
-        int ckCount = baseCfs.metadata.clusteringKeyColumns().size();
+        int ckCount = baseCfs.metadata.clusteringColumns().size();
         CompositeType baseComparator = (CompositeType)baseCfs.getComparator();
         ByteBuffer[] components = baseComparator.split(columnName);
         CompositeType.Builder builder = getIndexComparator().builder();
         builder.add(rowKey);
 
-        for (int i = 0; i < Math.min(components.length, columnDef.componentIndex); i++)
+        for (int i = 0; i < Math.min(components.length, columnDef.position()); i++)
             builder.add(components[i]);
-        for (int i = columnDef.componentIndex + 1; i < Math.min(components.length, ckCount); i++)
+        for (int i = columnDef.position() + 1; i < Math.min(components.length, ckCount); i++)
             builder.add(components[i]);
         return builder;
     }
 
     public IndexedEntry decodeEntry(DecoratedKey indexedValue, Column indexEntry)
     {
-        int ckCount = baseCfs.metadata.clusteringKeyColumns().size();
+        int ckCount = baseCfs.metadata.clusteringColumns().size();
         ByteBuffer[] components = getIndexComparator().split(indexEntry.name());
 
         ColumnNameBuilder builder = getBaseComparator().builder();
-        for (int i = 0; i < columnDef.componentIndex; i++)
+        for (int i = 0; i < columnDef.position(); i++)
             builder.add(components[i + 1]);
 
         builder.add(indexedValue.key);
 
-        for (int i = columnDef.componentIndex + 1; i < ckCount; i++)
+        for (int i = columnDef.position() + 1; i < ckCount; i++)
             builder.add(components[i]);
 
         return new IndexedEntry(indexedValue, indexEntry.name(), indexEntry.timestamp(), components[0], builder);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5f5905d5/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnPartitionKey.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnPartitionKey.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnPartitionKey.java
index 4e2c580..d097899 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnPartitionKey.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnPartitionKey.java
@@ -50,7 +50,7 @@ public class CompositesIndexOnPartitionKey extends CompositesIndex
 {
     public static CompositeType buildIndexComparator(CFMetaData baseMetadata, ColumnDefinition columnDef)
     {
-        int ckCount = baseMetadata.clusteringKeyColumns().size();
+        int ckCount = baseMetadata.clusteringColumns().size();
         List<AbstractType<?>> types = new ArrayList<AbstractType<?>>(ckCount + 1);
         types.add(SecondaryIndex.keyComparator);
         types.addAll(baseMetadata.comparator.getComponents());
@@ -61,12 +61,12 @@ public class CompositesIndexOnPartitionKey extends CompositesIndex
     {
         CompositeType keyComparator = (CompositeType)baseCfs.metadata.getKeyValidator();
         ByteBuffer[] components = keyComparator.split(rowKey);
-        return components[columnDef.componentIndex];
+        return components[columnDef.position()];
     }
 
     protected ColumnNameBuilder makeIndexColumnNameBuilder(ByteBuffer rowKey, ByteBuffer columnName)
     {
-        int ckCount = baseCfs.metadata.clusteringKeyColumns().size();
+        int ckCount = baseCfs.metadata.clusteringColumns().size();
         CompositeType baseComparator = (CompositeType)baseCfs.getComparator();
         ByteBuffer[] components = baseComparator.split(columnName);
         CompositeType.Builder builder = getIndexComparator().builder();
@@ -78,7 +78,7 @@ public class CompositesIndexOnPartitionKey extends CompositesIndex
 
     public IndexedEntry decodeEntry(DecoratedKey indexedValue, Column indexEntry)
     {
-        int ckCount = baseCfs.metadata.clusteringKeyColumns().size();
+        int ckCount = baseCfs.metadata.clusteringColumns().size();
         ByteBuffer[] components = getIndexComparator().split(indexEntry.name());
 
         ColumnNameBuilder builder = getBaseComparator().builder();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5f5905d5/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnRegular.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnRegular.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnRegular.java
index 7159c23..55a0f88 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnRegular.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnRegular.java
@@ -49,7 +49,7 @@ public class CompositesIndexOnRegular extends CompositesIndex
 {
     public static CompositeType buildIndexComparator(CFMetaData baseMetadata, ColumnDefinition columnDef)
     {
-        int prefixSize = columnDef.componentIndex;
+        int prefixSize = columnDef.position();
         List<AbstractType<?>> types = new ArrayList<AbstractType<?>>(prefixSize + 1);
         types.add(SecondaryIndex.keyComparator);
         for (int i = 0; i < prefixSize; i++)
@@ -68,7 +68,7 @@ public class CompositesIndexOnRegular extends CompositesIndex
         ByteBuffer[] components = baseComparator.split(columnName);
         CompositeType.Builder builder = getIndexComparator().builder();
         builder.add(rowKey);
-        for (int i = 0; i < Math.min(columnDef.componentIndex, components.length); i++)
+        for (int i = 0; i < Math.min(columnDef.position(), components.length); i++)
             builder.add(components[i]);
         return builder;
     }
@@ -77,7 +77,7 @@ public class CompositesIndexOnRegular extends CompositesIndex
     {
         ByteBuffer[] components = getIndexComparator().split(indexEntry.name());
         CompositeType.Builder builder = getBaseComparator().builder();
-        for (int i = 0; i < columnDef.componentIndex; i++)
+        for (int i = 0; i < columnDef.position(); i++)
             builder.add(components[i + 1]);
         return new IndexedEntry(indexedValue, indexEntry.name(), indexEntry.timestamp(), components[0], builder);
     }
@@ -87,8 +87,8 @@ public class CompositesIndexOnRegular extends CompositesIndex
     {
         ByteBuffer[] components = getBaseComparator().split(name);
         AbstractType<?> comp = baseCfs.metadata.getColumnDefinitionComparator(columnDef);
-        return components.length > columnDef.componentIndex
-            && comp.compare(components[columnDef.componentIndex], columnDef.name) == 0;
+        return components.length > columnDef.position()
+            && comp.compare(components[columnDef.position()], columnDef.name.bytes) == 0;
     }
 
     public boolean isStale(IndexedEntry entry, ColumnFamily data, long now)
@@ -99,6 +99,6 @@ public class CompositesIndexOnRegular extends CompositesIndex
             return true;
 
         ByteBuffer liveValue = liveColumn.value();
-        return columnDef.getValidator().compare(entry.indexValue.key, liveValue) != 0;
+        return columnDef.type.compare(entry.indexValue.key, liveValue) != 0;
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5f5905d5/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java b/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
index 4331f05..0d5d1a5 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
@@ -240,7 +240,7 @@ public class CompositesSearcher extends SecondaryIndexSearcher
                                                                            entry.indexedEntryEnd(),
                                                                            false,
                                                                            Integer.MAX_VALUE,
-                                                                           baseCfs.metadata.clusteringKeyColumns().size());
+                                                                           baseCfs.metadata.clusteringColumns().size());
                         ColumnFamily newData = baseCfs.getColumnFamily(new QueryFilter(dk, baseCfs.name, dataFilter, filter.timestamp));
                         if (newData == null || index.isStale(entry, newData, filter.timestamp))
                         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5f5905d5/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java b/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java
index f47b1e1..7d98c24 100644
--- a/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java
+++ b/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java
@@ -50,12 +50,12 @@ public class KeysIndex extends AbstractSimplePerColumnSecondaryIndex
 
     public boolean isIndexEntryStale(ByteBuffer indexedValue, ColumnFamily data, long now)
     {
-        Column liveColumn = data.getColumn(columnDef.name);
+        Column liveColumn = data.getColumn(columnDef.name.bytes);
         if (liveColumn == null || liveColumn.isMarkedForDelete(now))
             return true;
 
         ByteBuffer liveValue = liveColumn.value();
-        return columnDef.getValidator().compare(indexedValue, liveValue) != 0;
+        return columnDef.type.compare(indexedValue, liveValue) != 0;
     }
 
     public void validateOptions() throws ConfigurationException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5f5905d5/src/java/org/apache/cassandra/db/marshal/CompositeType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/CompositeType.java b/src/java/org/apache/cassandra/db/marshal/CompositeType.java
index 3110049..244ae08 100644
--- a/src/java/org/apache/cassandra/db/marshal/CompositeType.java
+++ b/src/java/org/apache/cassandra/db/marshal/CompositeType.java
@@ -32,6 +32,7 @@ import org.apache.cassandra.db.filter.SliceQueryFilter;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.exceptions.SyntaxException;
 import org.apache.cassandra.cql3.ColumnNameBuilder;
+import org.apache.cassandra.cql3.ColumnIdentifier;
 import org.apache.cassandra.cql3.Relation;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.serializers.MarshalException;
@@ -354,11 +355,18 @@ public class CompositeType extends AbstractCompositeType
             return this;
         }
 
+        @Override
         public Builder add(ByteBuffer bb)
         {
             return add(bb, Relation.Type.EQ);
         }
 
+        @Override
+        public Builder add(ColumnIdentifier name)
+        {
+            return add(name.bytes);
+        }
+
         public int componentCount()
         {
             return components.size();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5f5905d5/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
index 2ff8908..45b2d2b 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
@@ -30,7 +30,7 @@ import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.exceptions.SyntaxException;
 import org.apache.cassandra.auth.IAuthenticator;
 import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.cql3.CFDefinition;
+import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.cql3.ColumnIdentifier;
 import org.apache.cassandra.db.Column;
 import org.apache.cassandra.db.marshal.*;
@@ -622,11 +622,11 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
             // classis thrift tables
             if (keys.size() == 0)
             {
-                CFDefinition cfDefinition = getCfDefinition(keyspace, column_family, client);
-                for (ColumnIdentifier column : cfDefinition.keys.keySet())
+                CFMetaData cfm = getCFMetaData(keyspace, column_family, client);
+                for (ColumnDefinition def : cfm.partitionKeyColumns())
                 {
-                    String key = column.toString();
-                    String type = cfDefinition.keys.get(column).type.toString();
+                    String key = def.name.toString();
+                    String type = def.type.toString();
                     logger.debug("name: {}, type: {} ", key, type);
                     keys.add(key);
                 }
@@ -689,12 +689,12 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
                 return columnDefs;
 
             // otherwise for CqlStorage, check metadata for classic thrift tables
-            CFDefinition cfDefinition = getCfDefinition(keyspace, column_family, client);
-            for (ColumnIdentifier column : cfDefinition.metadata.keySet())
+            CFMetaData cfm = getCFMetaData(keyspace, column_family, client);
+            for (ColumnDefinition def : cfm.regularColumns())
             {
                 ColumnDef cDef = new ColumnDef();
-                String columnName = column.toString();
-                String type = cfDefinition.metadata.get(column).type.toString();
+                String columnName = def.name.toString();
+                String type = def.type.toString();
                 logger.debug("name: {}, type: {} ", columnName, type);
                 cDef.name = ByteBufferUtil.bytes(columnName);
                 cDef.validation_class = type;
@@ -702,14 +702,14 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
             }
             // we may not need to include the value column for compact tables as we 
             // could have already processed it as schema_columnfamilies.value_alias
-            if (columnDefs.size() == 0 && includeCompactValueColumn)
+            if (columnDefs.size() == 0 && includeCompactValueColumn && cfm.compactValueColumn() != null)
             {
-                String value = cfDefinition.value != null ? cfDefinition.value.toString() : null;
-                if ("value".equals(value))
+                ColumnDefinition def = cfm.compactValueColumn();
+                if ("value".equals(def.name.toString()))
                 {
                     ColumnDef cDef = new ColumnDef();
-                    cDef.name = ByteBufferUtil.bytes(value);
-                    cDef.validation_class = cfDefinition.value.type.toString();
+                    cDef.name = def.name.bytes;
+                    cDef.validation_class = def.type.toString();
                     columnDefs.add(cDef);
                 }
             }
@@ -772,8 +772,8 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
         return indexes;
     }
 
-    /** get CFDefinition of a column family */
-    protected CFDefinition getCfDefinition(String ks, String cf, Cassandra.Client client)
+    /** get CFMetaData of a column family */
+    protected CFMetaData getCFMetaData(String ks, String cf, Cassandra.Client client)
             throws NotFoundException,
             InvalidRequestException,
             TException,
@@ -784,7 +784,7 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
         for (CfDef cfDef : ksDef.cf_defs)
         {
             if (cfDef.name.equalsIgnoreCase(cf))
-                return new CFDefinition(CFMetaData.fromThrift(cfDef));
+                return CFMetaData.fromThrift(cfDef);
         }
         return null;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5f5905d5/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
index 5fbce21..a7793e2 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
@@ -23,8 +23,9 @@ import java.nio.charset.CharacterCodingException;
 import java.util.*;
 
 
-import org.apache.cassandra.cql3.CFDefinition;
 import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.Column;
 import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.exceptions.ConfigurationException;
@@ -522,18 +523,18 @@ public class CqlStorage extends AbstractCassandraStorage
             // classic thrift tables
             if (keys.size() == 0)
             {
-                CFDefinition cfDefinition = getCfDefinition(keyspace, column_family, client);
-                for (ColumnIdentifier column : cfDefinition.keys.keySet())
+                CFMetaData cfm = getCFMetaData(keyspace, column_family, client);
+                for (ColumnDefinition def : cfm.partitionKeyColumns())
                 {
-                    String key = column.toString();
+                    String key = def.name.toString();
                     logger.debug("name: {} ", key);
                     ColumnDef cDef = new ColumnDef();
                     cDef.name = ByteBufferUtil.bytes(key);
                     keys.add(cDef);
                 }
-                for (ColumnIdentifier column : cfDefinition.columns.keySet())
+                for (ColumnDefinition def : cfm.clusteringColumns())
                 {
-                    String key = column.toString();
+                    String key = def.name.toString();
                     logger.debug("name: {} ", key);
                     ColumnDef cDef = new ColumnDef();
                     cDef.name = ByteBufferUtil.bytes(key);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5f5905d5/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 3ad107f..b1f6dcb 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -1478,7 +1478,7 @@ public class StorageProxy implements StorageProxyMBean
         }
         else
         {
-            if (cfs.metadata.getCfDef().isCompact)
+            if (cfs.metadata.isDense())
             {
                 // one storage row per result row, so use key estimate directly
                 resultRowsPerRange = cfs.estimateKeys();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5f5905d5/src/java/org/apache/cassandra/service/pager/QueryPagers.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/QueryPagers.java b/src/java/org/apache/cassandra/service/pager/QueryPagers.java
index 1601ff6..b00bb8a 100644
--- a/src/java/org/apache/cassandra/service/pager/QueryPagers.java
+++ b/src/java/org/apache/cassandra/service/pager/QueryPagers.java
@@ -175,7 +175,7 @@ public class QueryPagers
         SliceFromReadCommand command = new SliceFromReadCommand(keyspace, key, columnFamily, now, filter);
         final SliceQueryPager pager = new SliceQueryPager(command, consistencyLevel, false);
 
-        ColumnCounter counter = filter.columnCounter(Schema.instance.getComparator(keyspace, columnFamily), now);
+        ColumnCounter counter = filter.columnCounter(Schema.instance.getCFMetaData(keyspace, columnFamily).comparator, now);
         while (!pager.isExhausted())
         {
             List<Row> next = pager.fetchPage(pageSize);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5f5905d5/src/java/org/apache/cassandra/thrift/ThriftValidation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/ThriftValidation.java b/src/java/org/apache/cassandra/thrift/ThriftValidation.java
index 8fb2e50..beef7c6 100644
--- a/src/java/org/apache/cassandra/thrift/ThriftValidation.java
+++ b/src/java/org/apache/cassandra/thrift/ThriftValidation.java
@@ -25,7 +25,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.*;
-import org.apache.cassandra.cql3.CFDefinition;
 import org.apache.cassandra.cql3.ColumnIdentifier;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.IDiskAtomFilter;
@@ -211,8 +210,7 @@ public class ThriftValidation
                 throw new org.apache.cassandra.exceptions.InvalidRequestException("supercolumn specified to ColumnFamily " + metadata.cfName + " containing normal columns");
         }
         AbstractType<?> comparator = SuperColumns.getComparatorFor(metadata, superColumnName);
-        CFDefinition cfDef = metadata.getCfDef();
-        boolean isCQL3Table = cfDef.isComposite && !cfDef.isCompact && !metadata.isSuper();
+        boolean isCQL3Table = metadata.hasCompositeComparator() && !metadata.isDense() && !metadata.isSuper();
         for (ByteBuffer name : column_names)
         {
             if (name.remaining() > maxNameLength)
@@ -233,24 +231,24 @@ public class ThriftValidation
                 // CQL3 table don't support having only part of their composite column names set
                 CompositeType composite = (CompositeType)comparator;
                 ByteBuffer[] components = composite.split(name);
-                int minComponents = composite.types.size() - (cfDef.hasCollections ? 1 : 0);
+                int minComponents = composite.types.size() - (metadata.hasCollections() ? 1 : 0);
                 if (components.length < minComponents)
                     throw new org.apache.cassandra.exceptions.InvalidRequestException(String.format("Not enough components (found %d but %d expected) for column name since %s is a CQL3 table",
                                                                                                     components.length, minComponents, metadata.cfName));
 
                 // Furthermore, the column name must be a declared one.
-                int columnIndex = composite.types.size() - (cfDef.hasCollections ? 2 : 1);
+                int columnIndex = composite.types.size() - (metadata.hasCollections() ? 2 : 1);
                 ByteBuffer CQL3ColumnName = components[columnIndex];
                 if (!CQL3ColumnName.hasRemaining())
                     continue; // Row marker, ok
 
                 ColumnIdentifier columnId = new ColumnIdentifier(CQL3ColumnName, composite.types.get(columnIndex));
-                if (cfDef.metadata.get(columnId) == null)
+                if (metadata.getColumnDefinition(columnId) == null)
                     throw new org.apache.cassandra.exceptions.InvalidRequestException(String.format("Invalid cell for CQL3 table %s. The CQL3 column component (%s) does not correspond to a defined CQL3 column",
                                                                                                     metadata.cfName, columnId));
 
                 // On top of that, if we have a collection component, he (CQL3) column must be a collection
-                if (cfDef.hasCollections && components.length == composite.types.size())
+                if (metadata.hasCollections() && components.length == composite.types.size())
                 {
                     assert components.length >= 2;
                     ColumnToCollectionType collectionType = (ColumnToCollectionType)composite.types.get(composite.types.size() - 1);
@@ -436,7 +434,7 @@ public class ThriftValidation
         if (!column.isSetTimestamp())
             throw new org.apache.cassandra.exceptions.InvalidRequestException("Column timestamp is required");
 
-        ColumnDefinition columnDef = metadata.getColumnDefinitionFromColumnName(column.name);
+        ColumnDefinition columnDef = metadata.getColumnDefinitionFromCellName(column.name);
         try
         {
             AbstractType<?> validator = metadata.getValueValidator(columnDef);
@@ -590,7 +588,7 @@ public class ThriftValidation
             if (expression.value.remaining() > 0xFFFF)
                 throw new org.apache.cassandra.exceptions.InvalidRequestException("Index expression values may not be larger than 64K");
 
-            AbstractType<?> valueValidator = Schema.instance.getValueValidator(metadata.ksName, metadata.cfName, expression.column_name);
+            AbstractType<?> valueValidator = metadata.getValueValidatorFromCellName(expression.column_name);
             try
             {
                 valueValidator.validate(expression.value);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5f5905d5/src/java/org/apache/cassandra/tools/SSTableExport.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/SSTableExport.java b/src/java/org/apache/cassandra/tools/SSTableExport.java
index 1568b00..dab7760 100644
--- a/src/java/org/apache/cassandra/tools/SSTableExport.java
+++ b/src/java/org/apache/cassandra/tools/SSTableExport.java
@@ -187,7 +187,7 @@ public class SSTableExport
         }
         else
         {
-            AbstractType<?> validator = cfMetaData.getValueValidator(cfMetaData.getColumnDefinitionFromColumnName(name));
+            AbstractType<?> validator = cfMetaData.getValueValidator(cfMetaData.getColumnDefinitionFromCellName(name));
             serializedColumn.add(validator.getString(value));
         }
         serializedColumn.add(column.timestamp());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5f5905d5/src/java/org/apache/cassandra/tools/SSTableImport.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/SSTableImport.java b/src/java/org/apache/cassandra/tools/SSTableImport.java
index 79584c5..2e9c4ed 100644
--- a/src/java/org/apache/cassandra/tools/SSTableImport.java
+++ b/src/java/org/apache/cassandra/tools/SSTableImport.java
@@ -152,7 +152,7 @@ public class SSTableImport
                 }
 
                 value = isDeleted() ? ByteBufferUtil.hexToBytes((String) fields.get(1))
-                                    : stringAsType((String) fields.get(1), meta.getValueValidator(meta.getColumnDefinitionFromColumnName(name)));
+                                    : stringAsType((String) fields.get(1), meta.getValueValidator(meta.getColumnDefinitionFromCellName(name)));
             }
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5f5905d5/src/java/org/apache/cassandra/tracing/Tracing.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tracing/Tracing.java b/src/java/org/apache/cassandra/tracing/Tracing.java
index c000af9..3c3092e 100644
--- a/src/java/org/apache/cassandra/tracing/Tracing.java
+++ b/src/java/org/apache/cassandra/tracing/Tracing.java
@@ -107,7 +107,7 @@ public class Tracing
 
     public static ByteBuffer buildName(CFMetaData meta, ByteBuffer... args)
     {
-        ColumnNameBuilder builder = meta.getCfDef().getColumnNameBuilder();
+        ColumnNameBuilder builder = meta.getColumnNameBuilder();
         for (ByteBuffer arg : args)
             builder.add(arg);
         return builder.build();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5f5905d5/test/unit/org/apache/cassandra/SchemaLoader.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/SchemaLoader.java b/test/unit/org/apache/cassandra/SchemaLoader.java
index e5f86bb..f9256da 100644
--- a/test/unit/org/apache/cassandra/SchemaLoader.java
+++ b/test/unit/org/apache/cassandra/SchemaLoader.java
@@ -31,6 +31,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.*;
+import org.apache.cassandra.cql3.ColumnIdentifier;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.db.compaction.LeveledCompactionStrategy;
@@ -126,12 +127,6 @@ public class SchemaLoader
         aliases.put((byte)'t', TimeUUIDType.instance);
         AbstractType<?> dynamicComposite = DynamicCompositeType.getInstance(aliases);
 
-        // these column definitions will will be applied to the jdbc utf and integer column familes respectively.
-        Map<ByteBuffer, ColumnDefinition> integerColumn = new HashMap<ByteBuffer, ColumnDefinition>();
-        integerColumn.put(IntegerType.instance.fromString("42"), ColumnDefinition.regularDef(IntegerType.instance.fromString("42"), UTF8Type.instance, null));
-        Map<ByteBuffer, ColumnDefinition> utf8Column = new HashMap<ByteBuffer, ColumnDefinition>();
-        utf8Column.put(UTF8Type.instance.fromString("fortytwo"), ColumnDefinition.regularDef(UTF8Type.instance.fromString("fortytwo"), IntegerType.instance, null));
-
         // Make it easy to test compaction
         Map<String, String> compactionOptions = new HashMap<String, String>();
         compactionOptions.put("tombstone_compaction_interval", "1");
@@ -183,8 +178,8 @@ public class SchemaLoader
                                                           bytes)
                                                    .defaultValidator(CounterColumnType.instance),
                                            superCFMD(ks1, "SuperDirectGC", BytesType.instance).gcGraceSeconds(0),
-                                           jdbcCFMD(ks1, "JdbcInteger", IntegerType.instance).columnMetadata(integerColumn),
-                                           jdbcCFMD(ks1, "JdbcUtf8", UTF8Type.instance).columnMetadata(utf8Column),
+                                           jdbcCFMD(ks1, "JdbcInteger", IntegerType.instance).addColumnDefinition(integerColumn("ks1", "JdbcInteger")),
+                                           jdbcCFMD(ks1, "JdbcUtf8", UTF8Type.instance).addColumnDefinition(utf8Column("ks1", "JdbcUtf8")),
                                            jdbcCFMD(ks1, "JdbcLong", LongType.instance),
                                            jdbcCFMD(ks1, "JdbcBytes", bytes),
                                            jdbcCFMD(ks1, "JdbcAscii", AsciiType.instance),
@@ -308,23 +303,43 @@ public class SchemaLoader
         return schema;
     }
 
+    private static ColumnDefinition integerColumn(String ksName, String cfName)
+    {
+        return new ColumnDefinition(ksName,
+                                    cfName,
+                                    new ColumnIdentifier(IntegerType.instance.fromString("42"), IntegerType.instance),
+                                    UTF8Type.instance,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    ColumnDefinition.Kind.REGULAR);
+    }
+
+    private static ColumnDefinition utf8Column(String ksName, String cfName)
+    {
+        return new ColumnDefinition(ksName,
+                                    cfName,
+                                    new ColumnIdentifier("fortytwo", true),
+                                    UTF8Type.instance,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    ColumnDefinition.Kind.REGULAR);
+    }
+
     private static CFMetaData perRowIndexedCFMD(String ksName, String cfName, boolean withOldCfIds)
     {
         final Map<String, String> indexOptions = Collections.singletonMap(
                                                       SecondaryIndex.CUSTOM_INDEX_OPTION_NAME,
                                                       PerRowSecondaryIndexTest.TestIndex.class.getName());
-        return standardCFMD(ksName, cfName)
-                .keyValidator(AsciiType.instance)
-                .columnMetadata(new HashMap<ByteBuffer, ColumnDefinition>()
-                {{
-                        ByteBuffer cName = ByteBuffer.wrap("indexed".getBytes(Charsets.UTF_8));
-                        put(cName, new ColumnDefinition(cName,
-                                AsciiType.instance,
-                                IndexType.CUSTOM,
-                                indexOptions,
-                                ByteBufferUtil.bytesToHex(cName),
-                                null, ColumnDefinition.Type.REGULAR));
-                }});
+
+        CFMetaData cfm =  standardCFMD(ksName, cfName).keyValidator(AsciiType.instance);
+
+        ByteBuffer cName = ByteBufferUtil.bytes("indexed");
+        return cfm.addOrReplaceColumnDefinition(ColumnDefinition.regularDef(cfm, cName, AsciiType.instance, null)
+                                                                .setIndex("indexe1", IndexType.CUSTOM, indexOptions));
     }
 
     private static void useCompression(List<KSMetaData> schema)
@@ -352,30 +367,22 @@ public class SchemaLoader
     }
     private static CFMetaData indexCFMD(String ksName, String cfName, final Boolean withIdxType) throws ConfigurationException
     {
-        return standardCFMD(ksName, cfName)
-               .keyValidator(AsciiType.instance)
-               .columnMetadata(new HashMap<ByteBuffer, ColumnDefinition>()
-                   {{
-                        ByteBuffer cName = ByteBuffer.wrap("birthdate".getBytes(Charsets.UTF_8));
-                        IndexType keys = withIdxType ? IndexType.KEYS : null;
-                        put(cName, ColumnDefinition.regularDef(cName, LongType.instance, null).setIndex(withIdxType ? ByteBufferUtil.bytesToHex(cName) : null, keys, null));
-                    }});
+        CFMetaData cfm = standardCFMD(ksName, cfName).keyValidator(AsciiType.instance);
+
+        ByteBuffer cName = ByteBufferUtil.bytes("birthdate");
+        IndexType keys = withIdxType ? IndexType.KEYS : null;
+        return cfm.addColumnDefinition(ColumnDefinition.regularDef(cfm, cName, LongType.instance, null)
+                                                       .setIndex(withIdxType ? ByteBufferUtil.bytesToHex(cName) : null, keys, null));
     }
     private static CFMetaData compositeIndexCFMD(String ksName, String cfName, final Boolean withIdxType, boolean withOldCfIds) throws ConfigurationException
     {
         final CompositeType composite = CompositeType.getInstance(Arrays.asList(new AbstractType<?>[]{UTF8Type.instance, UTF8Type.instance})); 
-        return new CFMetaData(ksName,
-                cfName,
-                ColumnFamilyType.Standard,
-                composite,
-                null)
-               .columnMetadata(new HashMap<ByteBuffer, ColumnDefinition>()
-                {{
-                   ByteBuffer cName = ByteBuffer.wrap("col1".getBytes(Charsets.UTF_8));
-                   IndexType idxType = withIdxType ? IndexType.COMPOSITES : null;
-                   put(cName, ColumnDefinition.regularDef(cName, UTF8Type.instance, 1)
-                                              .setIndex(withIdxType ? "col1_idx" : null, idxType, Collections.<String, String>emptyMap()));
-                }});
+        CFMetaData cfm = new CFMetaData(ksName, cfName, ColumnFamilyType.Standard, composite, null);
+
+        ByteBuffer cName = ByteBufferUtil.bytes("col1");
+        IndexType idxType = withIdxType ? IndexType.COMPOSITES : null;
+        return cfm.addColumnDefinition(ColumnDefinition.regularDef(cfm, cName, UTF8Type.instance, 1)
+                                                       .setIndex(withIdxType ? "col1_idx" : null, idxType, Collections.<String, String>emptyMap()));
     }
     
     private static CFMetaData jdbcCFMD(String ksName, String cfName, AbstractType comp)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5f5905d5/test/unit/org/apache/cassandra/config/ColumnDefinitionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/config/ColumnDefinitionTest.java b/test/unit/org/apache/cassandra/config/ColumnDefinitionTest.java
index 151df61..d9f5a4f 100644
--- a/test/unit/org/apache/cassandra/config/ColumnDefinitionTest.java
+++ b/test/unit/org/apache/cassandra/config/ColumnDefinitionTest.java
@@ -23,8 +23,9 @@ package org.apache.cassandra.config;
 
 import org.junit.Test;
 
-import org.apache.cassandra.db.marshal.BytesType;
-import org.apache.cassandra.db.marshal.LongType;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.ColumnFamilyType;
+import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 public class ColumnDefinitionTest
@@ -32,18 +33,20 @@ public class ColumnDefinitionTest
     @Test
     public void testSerializeDeserialize() throws Exception
     {
-        ColumnDefinition cd0 = ColumnDefinition.regularDef(ByteBufferUtil.bytes("TestColumnDefinitionName0"), BytesType.instance, null)
+        CFMetaData cfm = new CFMetaData("ks", "cf", ColumnFamilyType.Standard, UTF8Type.instance);
+
+        ColumnDefinition cd0 = ColumnDefinition.regularDef(cfm, ByteBufferUtil.bytes("TestColumnDefinitionName0"), BytesType.instance, null)
                                                .setIndex("random index name 0", IndexType.KEYS, null);
 
-        ColumnDefinition cd1 = ColumnDefinition.regularDef(ByteBufferUtil.bytes("TestColumnDefinition1"), LongType.instance, null);
+        ColumnDefinition cd1 = ColumnDefinition.regularDef(cfm, ByteBufferUtil.bytes("TestColumnDefinition1"), LongType.instance, null);
 
-        testSerializeDeserialize(cd0);
-        testSerializeDeserialize(cd1);
+        testSerializeDeserialize(cfm, cd0);
+        testSerializeDeserialize(cfm, cd1);
     }
 
-    protected void testSerializeDeserialize(ColumnDefinition cd) throws Exception
+    protected void testSerializeDeserialize(CFMetaData cfm, ColumnDefinition cd) throws Exception
     {
-        ColumnDefinition newCd = ColumnDefinition.fromThrift(cd.toThrift(), false);
+        ColumnDefinition newCd = ColumnDefinition.fromThrift(cfm, cd.toThrift());
         assert cd != newCd;
         assert cd.hashCode() == newCd.hashCode();
         assert cd.equals(newCd);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5f5905d5/test/unit/org/apache/cassandra/config/DefsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/config/DefsTest.java b/test/unit/org/apache/cassandra/config/DefsTest.java
index a8c81dd..754c029 100644
--- a/test/unit/org/apache/cassandra/config/DefsTest.java
+++ b/test/unit/org/apache/cassandra/config/DefsTest.java
@@ -50,25 +50,24 @@ public class DefsTest extends SchemaLoader
     @Test
     public void testCFMetaDataApply() throws ConfigurationException
     {
-        Map<ByteBuffer, ColumnDefinition> indexes = new HashMap<ByteBuffer, ColumnDefinition>();
-        for (int i = 0; i < 5; i++)
-        {
-            ByteBuffer name = ByteBuffer.wrap(new byte[] { (byte)i });
-            indexes.put(name, ColumnDefinition.regularDef(name, BytesType.instance, null).setIndex(Integer.toString(i), IndexType.KEYS, null));
-        }
         CFMetaData cfm = new CFMetaData("Keyspace1",
                                         "TestApplyCFM_CF",
                                         ColumnFamilyType.Standard,
                                         BytesType.instance);
 
+        for (int i = 0; i < 5; i++)
+        {
+            ByteBuffer name = ByteBuffer.wrap(new byte[] { (byte)i });
+            cfm.addColumnDefinition(ColumnDefinition.regularDef(cfm, name, BytesType.instance, null).setIndex(Integer.toString(i), IndexType.KEYS, null));
+        }
+
         cfm.comment("No comment")
            .readRepairChance(0.5)
            .replicateOnWrite(false)
            .gcGraceSeconds(100000)
            .defaultValidator(null)
            .minCompactionThreshold(500)
-           .maxCompactionThreshold(500)
-           .columnMetadata(indexes);
+           .maxCompactionThreshold(500);
 
         // we'll be adding this one later. make sure it's not already there.
         assert cfm.getColumnDefinition(ByteBuffer.wrap(new byte[] { 5 })) == null;
@@ -76,18 +75,18 @@ public class DefsTest extends SchemaLoader
         CFMetaData cfNew = cfm.clone();
 
         // add one.
-        ColumnDefinition addIndexDef = ColumnDefinition.regularDef(ByteBuffer.wrap(new byte[] { 5 }), BytesType.instance, null)
+        ColumnDefinition addIndexDef = ColumnDefinition.regularDef(cfm, ByteBuffer.wrap(new byte[] { 5 }), BytesType.instance, null)
                                                        .setIndex("5", IndexType.KEYS, null);
         cfNew.addColumnDefinition(addIndexDef);
 
         // remove one.
-        ColumnDefinition removeIndexDef = ColumnDefinition.regularDef(ByteBuffer.wrap(new byte[] { 0 }), BytesType.instance, null)
+        ColumnDefinition removeIndexDef = ColumnDefinition.regularDef(cfm, ByteBuffer.wrap(new byte[] { 0 }), BytesType.instance, null)
                                                           .setIndex("0", IndexType.KEYS, null);
         assert cfNew.removeColumnDefinition(removeIndexDef);
 
         cfm.apply(cfNew);
 
-        for (int i = 1; i < indexes.size(); i++)
+        for (int i = 1; i < cfm.allColumns().size(); i++)
             assert cfm.getColumnDefinition(ByteBuffer.wrap(new byte[] { 1 })) != null;
         assert cfm.getColumnDefinition(ByteBuffer.wrap(new byte[] { 0 })) == null;
         assert cfm.getColumnDefinition(ByteBuffer.wrap(new byte[] { 5 })) != null;
@@ -513,7 +512,7 @@ public class DefsTest extends SchemaLoader
         // drop the index
         CFMetaData meta = cfs.metadata.clone();
         ColumnDefinition cdOld = meta.regularColumns().iterator().next();
-        ColumnDefinition cdNew = ColumnDefinition.regularDef(cdOld.name, cdOld.getValidator(), null);
+        ColumnDefinition cdNew = ColumnDefinition.regularDef(meta, cdOld.name.bytes, cdOld.type, null);
         meta.addOrReplaceColumnDefinition(cdNew);
         MigrationManager.announceColumnFamilyUpdate(meta, false);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5f5905d5/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
index 51f4a78..32bc7df 100644
--- a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
+++ b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
@@ -655,11 +655,11 @@ public class ColumnFamilyStoreTest extends SchemaLoader
 
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("Indexed2");
         ColumnDefinition old = cfs.metadata.getColumnDefinition(ByteBufferUtil.bytes("birthdate"));
-        ColumnDefinition cd = ColumnDefinition.regularDef(old.name, old.getValidator(), null).setIndex("birthdate_index", IndexType.KEYS, null);
+        ColumnDefinition cd = ColumnDefinition.regularDef(cfs.metadata, old.name.bytes, old.type, null).setIndex("birthdate_index", IndexType.KEYS, null);
         Future<?> future = cfs.indexManager.addIndexedColumn(cd);
         future.get();
         // we had a bug (CASSANDRA-2244) where index would get created but not flushed -- check for that
-        assert cfs.indexManager.getIndexForColumn(cd.name).getIndexCfs().getSSTables().size() > 0;
+        assert cfs.indexManager.getIndexForColumn(cd.name.bytes).getIndexCfs().getSSTables().size() > 0;
 
         queryBirthdate(keyspace);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5f5905d5/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java b/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java
index 4f6478c..e324dd2 100644
--- a/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java
+++ b/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java
@@ -271,13 +271,8 @@ public class RangeTombstoneTest extends SchemaLoader
         cfs.setCompactionStrategyClass(SizeTieredCompactionStrategy.class.getCanonicalName());
         if (cfs.indexManager.getIndexForColumn(indexedColumnName) == null)
         {
-            ColumnDefinition cd = new ColumnDefinition(indexedColumnName,
-                                                       cfs.getComparator(),
-                                                       IndexType.CUSTOM,
-                                                       ImmutableMap.of(SecondaryIndex.CUSTOM_INDEX_OPTION_NAME, TestIndex.class.getName()),
-                                                       "test_index",
-                                                       0,
-                                                       null);
+            ColumnDefinition cd = ColumnDefinition.regularDef(cfs.metadata, indexedColumnName, cfs.getComparator(), 0)
+                                                  .setIndex("test_index", IndexType.CUSTOM, ImmutableMap.of(SecondaryIndex.CUSTOM_INDEX_OPTION_NAME, TestIndex.class.getName()));
             cfs.indexManager.addIndexedColumn(cd);
         }
 
@@ -315,13 +310,8 @@ public class RangeTombstoneTest extends SchemaLoader
         cfs.setCompactionStrategyClass(SizeTieredCompactionStrategy.class.getCanonicalName());
         if (cfs.indexManager.getIndexForColumn(indexedColumnName) == null)
         {
-            ColumnDefinition cd = new ColumnDefinition(indexedColumnName,
-                                                       cfs.getComparator(),
-                                                       IndexType.CUSTOM,
-                                                       ImmutableMap.of(SecondaryIndex.CUSTOM_INDEX_OPTION_NAME, TestIndex.class.getName()),
-                                                       "test_index",
-                                                       0,
-                                                       null);
+            ColumnDefinition cd = ColumnDefinition.regularDef(cfs.metadata, indexedColumnName, cfs.getComparator(), 0)
+                                                  .setIndex("test_index", IndexType.CUSTOM, ImmutableMap.of(SecondaryIndex.CUSTOM_INDEX_OPTION_NAME, TestIndex.class.getName()));
             cfs.indexManager.addIndexedColumn(cd);
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5f5905d5/test/unit/org/apache/cassandra/thrift/ThriftValidationTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/thrift/ThriftValidationTest.java b/test/unit/org/apache/cassandra/thrift/ThriftValidationTest.java
index fdf0ebb..35d851e 100644
--- a/test/unit/org/apache/cassandra/thrift/ThriftValidationTest.java
+++ b/test/unit/org/apache/cassandra/thrift/ThriftValidationTest.java
@@ -57,7 +57,7 @@ public class ThriftValidationTest extends SchemaLoader
         // should not throw IRE here
         try
         {
-            newMetadata.addColumnDefinition(ColumnDefinition.partitionKeyDef(AsciiType.instance.decompose("id"), UTF8Type.instance, null));
+            newMetadata.addColumnDefinition(ColumnDefinition.partitionKeyDef(metaData, AsciiType.instance.decompose("id"), UTF8Type.instance, null));
             newMetadata.validate();
         }
         catch (ConfigurationException e)
@@ -73,7 +73,7 @@ public class ThriftValidationTest extends SchemaLoader
         // add a column with name = "id"
         try
         {
-            newMetadata.addColumnDefinition(ColumnDefinition.regularDef(ByteBufferUtil.bytes("id"), UTF8Type.instance, null));
+            newMetadata.addColumnDefinition(ColumnDefinition.regularDef(metaData, ByteBufferUtil.bytes("id"), UTF8Type.instance, null));
             newMetadata.validate();
         }
         catch (ConfigurationException e)


[2/4] Remove CFDefinition

Posted by sl...@apache.org.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5f5905d5/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index 918f043..8cec64e 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -24,6 +24,7 @@ import com.google.common.base.Objects;
 import com.google.common.base.Predicate;
 import com.google.common.collect.AbstractIterator;
 import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
 
 import org.apache.cassandra.auth.Permission;
 import org.apache.cassandra.cql3.*;
@@ -57,17 +58,17 @@ public class SelectStatement implements CQLStatement
     private static final int DEFAULT_COUNT_PAGE_SIZE = 10000;
 
     private final int boundTerms;
-    public final CFDefinition cfDef;
+    public final CFMetaData cfm;
     public final Parameters parameters;
     private final Selection selection;
     private final Term limit;
 
     private final Restriction[] keyRestrictions;
     private final Restriction[] columnRestrictions;
-    private final Map<CFDefinition.Name, Restriction> metadataRestrictions = new HashMap<CFDefinition.Name, Restriction>();
+    private final Map<ColumnIdentifier, Restriction> metadataRestrictions = new HashMap<ColumnIdentifier, Restriction>();
 
-    // The name of all restricted names not covered by the key or index filter
-    private final Set<CFDefinition.Name> restrictedNames = new HashSet<CFDefinition.Name>();
+    // All restricted columns not covered by the key or index filter
+    private final Set<ColumnDefinition> restrictedColumns = new HashSet<ColumnDefinition>();
     private Restriction.Slice sliceRestriction;
 
     private boolean isReversed;
@@ -76,18 +77,18 @@ public class SelectStatement implements CQLStatement
     private boolean keyIsInRelation;
     private boolean usesSecondaryIndexing;
 
-    private Map<CFDefinition.Name, Integer> orderingIndexes;
+    private Map<ColumnDefinition, Integer> orderingIndexes;
 
     // Used by forSelection below
     private static final Parameters defaultParameters = new Parameters(Collections.<ColumnIdentifier, Boolean>emptyMap(), false, false, null, false);
 
-    public SelectStatement(CFDefinition cfDef, int boundTerms, Parameters parameters, Selection selection, Term limit)
+    public SelectStatement(CFMetaData cfm, int boundTerms, Parameters parameters, Selection selection, Term limit)
     {
-        this.cfDef = cfDef;
+        this.cfm = cfm;
         this.boundTerms = boundTerms;
         this.selection = selection;
-        this.keyRestrictions = new Restriction[cfDef.keys.size()];
-        this.columnRestrictions = new Restriction[cfDef.columns.size()];
+        this.keyRestrictions = new Restriction[cfm.partitionKeyColumns().size()];
+        this.columnRestrictions = new Restriction[cfm.clusteringColumns().size()];
         this.parameters = parameters;
         this.limit = limit;
     }
@@ -95,9 +96,9 @@ public class SelectStatement implements CQLStatement
     // Creates a simple select based on the given selection.
     // Note that the results select statement should not be used for actual queries, but only for processing already
     // queried data through processColumnFamily.
-    static SelectStatement forSelection(CFDefinition cfDef, Selection selection)
+    static SelectStatement forSelection(CFMetaData cfm, Selection selection)
     {
-        return new SelectStatement(cfDef, 0, defaultParameters, selection, null);
+        return new SelectStatement(cfm, 0, defaultParameters, selection, null);
     }
 
     public ResultSet.Metadata getResultMetadata()
@@ -244,12 +245,12 @@ public class SelectStatement implements CQLStatement
 
     public String keyspace()
     {
-        return cfDef.cfm.ksName;
+        return cfm.ksName;
     }
 
     public String columnFamily()
     {
-        return cfDef.cfm.cfName;
+        return cfm.cfName;
     }
 
     private List<ReadCommand> getSliceCommands(List<ByteBuffer> variables, int limit, long now) throws RequestValidationException
@@ -363,7 +364,7 @@ public class SelectStatement implements CQLStatement
             // to account for the grouping of columns.
             // Since that doesn't work for maps/sets/lists, we now use the compositesToGroup option of SliceQueryFilter.
             // But we must preserve backward compatibility too (for mixed version cluster that is).
-            int toGroup = cfDef.isCompact ? -1 : cfDef.columns.size();
+            int toGroup = cfm.isDense() ? -1 : cfm.clusteringColumns().size();
             List<ByteBuffer> startBounds = getRequestedBound(Bound.START, variables);
             List<ByteBuffer> endBounds = getRequestedBound(Bound.END, variables);
             assert startBounds.size() == endBounds.size();
@@ -373,7 +374,7 @@ public class SelectStatement implements CQLStatement
             if (startBounds.size() == 1)
             {
                 ColumnSlice slice = new ColumnSlice(startBounds.get(0), endBounds.get(0));
-                if (slice.isAlwaysEmpty(cfDef.cfm.comparator, isReversed))
+                if (slice.isAlwaysEmpty(cfm.comparator, isReversed))
                     return null;
                 slices = new ColumnSlice[]{slice};
             }
@@ -383,7 +384,7 @@ public class SelectStatement implements CQLStatement
                 for (int i = 0; i < startBounds.size(); i++)
                 {
                     ColumnSlice slice = new ColumnSlice(startBounds.get(i), endBounds.get(i));
-                    if (!slice.isAlwaysEmpty(cfDef.cfm.comparator, isReversed))
+                    if (!slice.isAlwaysEmpty(cfm.comparator, isReversed))
                         l.add(slice);
                 }
                 if (l.isEmpty())
@@ -437,10 +438,10 @@ public class SelectStatement implements CQLStatement
     private Collection<ByteBuffer> getKeys(final List<ByteBuffer> variables) throws InvalidRequestException
     {
         List<ByteBuffer> keys = new ArrayList<ByteBuffer>();
-        ColumnNameBuilder builder = cfDef.getKeyNameBuilder();
-        for (CFDefinition.Name name : cfDef.keys.values())
+        ColumnNameBuilder builder = cfm.getKeyNameBuilder();
+        for (ColumnDefinition def : cfm.partitionKeyColumns())
         {
-            Restriction r = keyRestrictions[name.position];
+            Restriction r = keyRestrictions[def.position()];
             assert r != null && !r.isSlice();
 
             List<ByteBuffer> values = r.values(variables);
@@ -450,7 +451,7 @@ public class SelectStatement implements CQLStatement
                 for (ByteBuffer val : values)
                 {
                     if (val == null)
-                        throw new InvalidRequestException(String.format("Invalid null value for partition key part %s", name));
+                        throw new InvalidRequestException(String.format("Invalid null value for partition key part %s", def.name));
                     keys.add(builder.copy().add(val).build());
                 }
             }
@@ -461,7 +462,7 @@ public class SelectStatement implements CQLStatement
                     throw new InvalidRequestException("IN is only supported on the last column of the partition key");
                 ByteBuffer val = values.get(0);
                 if (val == null)
-                    throw new InvalidRequestException(String.format("Invalid null value for partition key part %s", name));
+                    throw new InvalidRequestException(String.format("Invalid null value for partition key part %s", def.name));
                 builder.add(val);
             }
         }
@@ -477,7 +478,7 @@ public class SelectStatement implements CQLStatement
                 return ByteBufferUtil.EMPTY_BYTE_BUFFER;
 
         // We deal with IN queries for keys in other places, so we know buildBound will return only one result
-        return buildBound(b, cfDef.keys.values(), keyRestrictions, false, cfDef.getKeyNameBuilder(), variables).get(0);
+        return buildBound(b, cfm.partitionKeyColumns(), keyRestrictions, false, cfm.getKeyNameBuilder(), variables).get(0);
     }
 
     private Token getTokenBound(Bound b, List<ByteBuffer> variables, IPartitioner<?> p) throws InvalidRequestException
@@ -519,10 +520,10 @@ public class SelectStatement implements CQLStatement
 
     private boolean isColumnRange()
     {
-        // Due to CASSANDRA-5762, we always do a slice for CQL3 tables (not compact, composite).
-        // Static CF (non compact but non composite) never entails a column slice however
-        if (!cfDef.isCompact)
-            return cfDef.isComposite;
+        // Due to CASSANDRA-5762, we always do a slice for CQL3 tables (not dense, composite).
+        // Static CF (non dense but non composite) never entails a column slice however
+        if (!cfm.isDense())
+            return cfm.hasCompositeComparator();
 
         // Otherwise (i.e. for compact table where we don't have a row marker anyway and thus don't care about CASSANDRA-5762),
         // it is a range query if it has at least one the column alias for which no relation is defined or is not EQ.
@@ -538,11 +539,11 @@ public class SelectStatement implements CQLStatement
     {
         assert !isColumnRange();
 
-        ColumnNameBuilder builder = cfDef.getColumnNameBuilder();
-        Iterator<ColumnIdentifier> idIter = cfDef.columns.keySet().iterator();
+        ColumnNameBuilder builder = cfm.getColumnNameBuilder();
+        Iterator<ColumnDefinition> idIter = cfm.clusteringColumns().iterator();
         for (Restriction r : columnRestrictions)
         {
-            ColumnIdentifier id = idIter.next();
+            ColumnIdentifier id = idIter.next().name;
             assert r != null && !r.isSlice();
 
             List<ByteBuffer> values = r.values(variables);
@@ -560,7 +561,7 @@ public class SelectStatement implements CQLStatement
                 // for each value of the IN, creates all the columns corresponding to the selection.
                 if (values.isEmpty())
                     return null;
-                SortedSet<ByteBuffer> columns = new TreeSet<ByteBuffer>(cfDef.cfm.comparator);
+                SortedSet<ByteBuffer> columns = new TreeSet<ByteBuffer>(cfm.comparator);
                 Iterator<ByteBuffer> iter = values.iterator();
                 while (iter.hasNext())
                 {
@@ -569,7 +570,7 @@ public class SelectStatement implements CQLStatement
                     if (val == null)
                         throw new InvalidRequestException(String.format("Invalid null value for clustering key part %s", id));
                     b.add(val);
-                    if (cfDef.isCompact)
+                    if (cfm.isDense())
                         columns.add(b.build());
                     else
                         columns.addAll(addSelectedColumns(b));
@@ -583,7 +584,7 @@ public class SelectStatement implements CQLStatement
 
     private SortedSet<ByteBuffer> addSelectedColumns(ColumnNameBuilder builder)
     {
-        if (cfDef.isCompact)
+        if (cfm.isDense())
         {
             return FBUtilities.singleton(builder.build());
         }
@@ -593,29 +594,29 @@ public class SelectStatement implements CQLStatement
             // non-know set of columns, so we shouldn't get there
             assert !selectACollection();
 
-            SortedSet<ByteBuffer> columns = new TreeSet<ByteBuffer>(cfDef.cfm.comparator);
+            SortedSet<ByteBuffer> columns = new TreeSet<ByteBuffer>(cfm.comparator);
 
             // We need to query the selected column as well as the marker
             // column (for the case where the row exists but has no columns outside the PK)
             // Two exceptions are "static CF" (non-composite non-compact CF) and "super CF"
             // that don't have marker and for which we must query all columns instead
-            if (cfDef.isComposite && !cfDef.cfm.isSuper())
+            if (cfm.hasCompositeComparator() && !cfm.isSuper())
             {
                 // marker
                 columns.add(builder.copy().add(ByteBufferUtil.EMPTY_BYTE_BUFFER).build());
 
                 // selected columns
                 for (ColumnIdentifier id : selection.regularColumnsToFetch())
-                    columns.add(builder.copy().add(id.key).build());
+                    columns.add(builder.copy().add(id).build());
             }
             else
             {
-                Iterator<ColumnIdentifier> iter = cfDef.metadata.keySet().iterator();
+                Iterator<ColumnDefinition> iter = cfm.regularColumns().iterator();
                 while (iter.hasNext())
                 {
-                    ColumnIdentifier name = iter.next();
+                    ColumnDefinition def = iter.next();
                     ColumnNameBuilder b = iter.hasNext() ? builder.copy() : builder;
-                    ByteBuffer cname = b.add(name.key).build();
+                    ByteBuffer cname = b.add(def.name).build();
                     columns.add(cname);
                 }
             }
@@ -625,12 +626,12 @@ public class SelectStatement implements CQLStatement
 
     private boolean selectACollection()
     {
-        if (!cfDef.hasCollections)
+        if (!cfm.hasCollections())
             return false;
 
-        for (CFDefinition.Name name : selection.getColumnsList())
+        for (ColumnDefinition def : selection.getColumnsList())
         {
-            if (name.type instanceof CollectionType)
+            if (def.type instanceof CollectionType)
                 return true;
         }
 
@@ -638,7 +639,7 @@ public class SelectStatement implements CQLStatement
     }
 
     private List<ByteBuffer> buildBound(Bound bound,
-                                        Collection<CFDefinition.Name> names,
+                                        Collection<ColumnDefinition> defs,
                                         Restriction[] restrictions,
                                         boolean isReversed,
                                         ColumnNameBuilder builder,
@@ -649,13 +650,13 @@ public class SelectStatement implements CQLStatement
         // to the component comparator but not to the end-of-component itself),
         // it only depends on whether the slice is reversed
         Bound eocBound = isReversed ? Bound.reverse(bound) : bound;
-        for (CFDefinition.Name name : names)
+        for (ColumnDefinition def : defs)
         {
             // In a restriction, we always have Bound.START < Bound.END for the "base" comparator.
             // So if we're doing a reverse slice, we must inverse the bounds when giving them as start and end of the slice filter.
             // But if the actual comparator itself is reversed, we must inversed the bounds too.
-            Bound b = isReversed == isReversedType(name) ? bound : Bound.reverse(bound);
-            Restriction r = restrictions[name.position];
+            Bound b = isReversed == isReversedType(def) ? bound : Bound.reverse(bound);
+            Restriction r = restrictions[def.position()];
             if (r == null || (r.isSlice() && !((Restriction.Slice)r).hasBound(b)))
             {
                 // There wasn't any non EQ relation on that key, we select all records having the preceding component as prefix.
@@ -672,7 +673,7 @@ public class SelectStatement implements CQLStatement
                 assert slice.hasBound(b);
                 ByteBuffer val = slice.bound(b, variables);
                 if (val == null)
-                    throw new InvalidRequestException(String.format("Invalid null clustering key part %s", name));
+                    throw new InvalidRequestException(String.format("Invalid null clustering key part %s", def.name));
                 return Collections.singletonList(builder.add(val, slice.getRelation(eocBound, b)).build());
             }
             else
@@ -681,14 +682,14 @@ public class SelectStatement implements CQLStatement
                 if (values.size() != 1)
                 {
                     // IN query, we only support it on the clustering column
-                    assert name.position == names.size() - 1;
+                    assert def.position() == defs.size() - 1;
                     // The IN query might not have listed the values in comparator order, so we need to re-sort
                     // the bounds lists to make sure the slices works correctly (also, to avoid duplicates).
-                    TreeSet<ByteBuffer> s = new TreeSet<ByteBuffer>(isReversed ? cfDef.cfm.comparator.reverseComparator : cfDef.cfm.comparator);
+                    TreeSet<ByteBuffer> s = new TreeSet<ByteBuffer>(isReversed ? cfm.comparator.reverseComparator : cfm.comparator);
                     for (ByteBuffer val : values)
                     {
                         if (val == null)
-                            throw new InvalidRequestException(String.format("Invalid null clustering key part %s", name));
+                            throw new InvalidRequestException(String.format("Invalid null clustering key part %s", def.name));
                         ColumnNameBuilder copy = builder.copy().add(val);
                         // See below for why this
                         s.add((bound == Bound.END && copy.remainingCount() > 0) ? copy.buildAsEndOfRange() : copy.build());
@@ -698,7 +699,7 @@ public class SelectStatement implements CQLStatement
 
                 ByteBuffer val = values.get(0);
                 if (val == null)
-                    throw new InvalidRequestException(String.format("Invalid null clustering key part %s", name));
+                    throw new InvalidRequestException(String.format("Invalid null clustering key part %s", def.name));
                 builder.add(val);
             }
         }
@@ -714,31 +715,31 @@ public class SelectStatement implements CQLStatement
     private List<ByteBuffer> getRequestedBound(Bound b, List<ByteBuffer> variables) throws InvalidRequestException
     {
         assert isColumnRange();
-        return buildBound(b, cfDef.columns.values(), columnRestrictions, isReversed, cfDef.getColumnNameBuilder(), variables);
+        return buildBound(b, cfm.clusteringColumns(), columnRestrictions, isReversed, cfm.getColumnNameBuilder(), variables);
     }
 
     public List<IndexExpression> getIndexExpressions(List<ByteBuffer> variables) throws InvalidRequestException
     {
-        if (!usesSecondaryIndexing || restrictedNames.isEmpty())
+        if (!usesSecondaryIndexing || restrictedColumns.isEmpty())
             return Collections.emptyList();
 
         List<IndexExpression> expressions = new ArrayList<IndexExpression>();
-        for (CFDefinition.Name name : restrictedNames)
+        for (ColumnDefinition def : restrictedColumns)
         {
             Restriction restriction;
-            switch (name.kind)
+            switch (def.kind)
             {
-                case KEY_ALIAS:
-                    restriction = keyRestrictions[name.position];
+                case PARTITION_KEY:
+                    restriction = keyRestrictions[def.position()];
                     break;
-                case COLUMN_ALIAS:
-                    restriction = columnRestrictions[name.position];
+                case CLUSTERING_COLUMN:
+                    restriction = columnRestrictions[def.position()];
                     break;
-                case COLUMN_METADATA:
-                    restriction = metadataRestrictions.get(name);
+                case REGULAR:
+                    restriction = metadataRestrictions.get(def.name);
                     break;
                 default:
-                    // We don't allow restricting a VALUE_ALIAS for now in prepare.
+                    // We don't allow restricting a COMPACT_VALUE for now in prepare.
                     throw new AssertionError();
             }
 
@@ -751,10 +752,10 @@ public class SelectStatement implements CQLStatement
                     {
                         ByteBuffer value = slice.bound(b, variables);
                         if (value == null)
-                            throw new InvalidRequestException(String.format("Unsupported null value for indexed column %s", name));
+                            throw new InvalidRequestException(String.format("Unsupported null value for indexed column %s", def.name));
                         if (value.remaining() > 0xFFFF)
                             throw new InvalidRequestException("Index expression values may not be larger than 64K");
-                        expressions.add(new IndexExpression(name.name.key, slice.getIndexOperator(b), value));
+                        expressions.add(new IndexExpression(def.name.bytes, slice.getIndexOperator(b), value));
                     }
                 }
             }
@@ -767,10 +768,10 @@ public class SelectStatement implements CQLStatement
 
                 ByteBuffer value = values.get(0);
                 if (value == null)
-                    throw new InvalidRequestException(String.format("Unsupported null value for indexed column %s", name));
+                    throw new InvalidRequestException(String.format("Unsupported null value for indexed column %s", def.name));
                 if (value.remaining() > 0xFFFF)
                     throw new InvalidRequestException("Index expression values may not be larger than 64K");
-                expressions.add(new IndexExpression(name.name.key, IndexExpression.Operator.EQ, value));
+                expressions.add(new IndexExpression(def.name.bytes, IndexExpression.Operator.EQ, value));
             }
         }
         return expressions;
@@ -788,11 +789,10 @@ public class SelectStatement implements CQLStatement
         if (last == null || last.isSlice())
             return cf.getSortedColumns();
 
-        ColumnNameBuilder builder = cfDef.getColumnNameBuilder();
+        ColumnNameBuilder builder = cfm.getColumnNameBuilder();
         for (int i = 0; i < columnRestrictions.length - 1; i++)
             builder.add(columnRestrictions[i].values(variables).get(0));
 
-
         List<ByteBuffer> values = last.values(variables);
         final List<ByteBuffer> requested = new ArrayList<ByteBuffer>(values.size());
         Iterator<ByteBuffer> iter = values.iterator();
@@ -851,8 +851,8 @@ public class SelectStatement implements CQLStatement
     void processColumnFamily(ByteBuffer key, ColumnFamily cf, List<ByteBuffer> variables, long now, Selection.ResultSetBuilder result)
     throws InvalidRequestException
     {
-        ByteBuffer[] keyComponents = cfDef.hasCompositeKey
-                                   ? ((CompositeType)cfDef.cfm.getKeyValidator()).split(key)
+        ByteBuffer[] keyComponents = cfm.getKeyValidator() instanceof CompositeType
+                                   ? ((CompositeType)cfm.getKeyValidator()).split(key)
                                    : new ByteBuffer[]{ key };
 
         if (parameters.isDistinct)
@@ -861,11 +861,11 @@ public class SelectStatement implements CQLStatement
             {
                 result.newRow();
                 // selection.getColumnsList() will contain only the partition key components - all of them.
-                for (CFDefinition.Name name : selection.getColumnsList())
-                    result.add(keyComponents[name.position]);
+                for (ColumnDefinition def : selection.getColumnsList())
+                    result.add(keyComponents[def.position()]);
             }
         }
-        else if (cfDef.isCompact)
+        else if (cfm.isDense())
         {
             // One cqlRow per column
             for (Column c : columnsInOrder(cf, variables))
@@ -874,9 +874,9 @@ public class SelectStatement implements CQLStatement
                     continue;
 
                 ByteBuffer[] components = null;
-                if (cfDef.isComposite)
+                if (cfm.hasCompositeComparator())
                 {
-                    components = ((CompositeType)cfDef.cfm.comparator).split(c.name());
+                    components = ((CompositeType)cfm.comparator).split(c.name());
                 }
                 else if (sliceRestriction != null)
                 {
@@ -889,23 +889,23 @@ public class SelectStatement implements CQLStatement
 
                 result.newRow();
                 // Respect selection order
-                for (CFDefinition.Name name : selection.getColumnsList())
+                for (ColumnDefinition def : selection.getColumnsList())
                 {
-                    switch (name.kind)
+                    switch (def.kind)
                     {
-                        case KEY_ALIAS:
-                            result.add(keyComponents[name.position]);
+                        case PARTITION_KEY:
+                            result.add(keyComponents[def.position()]);
                             break;
-                        case COLUMN_ALIAS:
-                            ByteBuffer val = cfDef.isComposite
-                                           ? (name.position < components.length ? components[name.position] : null)
+                        case CLUSTERING_COLUMN:
+                            ByteBuffer val = cfm.hasCompositeComparator()
+                                           ? (def.position() < components.length ? components[def.position()] : null)
                                            : c.name();
                             result.add(val);
                             break;
-                        case VALUE_ALIAS:
+                        case COMPACT_VALUE:
                             result.add(c);
                             break;
-                        case COLUMN_METADATA:
+                        case REGULAR:
                             // This should not happen for compact CF
                             throw new AssertionError();
                         default:
@@ -914,12 +914,12 @@ public class SelectStatement implements CQLStatement
                 }
             }
         }
-        else if (cfDef.isComposite)
+        else if (cfm.hasCompositeComparator())
         {
             // Sparse case: group column in cqlRow when composite prefix is equal
-            CompositeType composite = (CompositeType)cfDef.cfm.comparator;
+            CompositeType composite = (CompositeType)cfm.comparator;
 
-            ColumnGroupMap.Builder builder = new ColumnGroupMap.Builder(composite, cfDef.hasCollections, now);
+            ColumnGroupMap.Builder builder = new ColumnGroupMap.Builder(composite, cfm.hasCollections(), now);
 
             for (Column c : cf)
             {
@@ -939,12 +939,12 @@ public class SelectStatement implements CQLStatement
 
             // Static case: One cqlRow for all columns
             result.newRow();
-            for (CFDefinition.Name name : selection.getColumnsList())
+            for (ColumnDefinition def : selection.getColumnsList())
             {
-                if (name.kind == CFDefinition.Name.Kind.KEY_ALIAS)
-                    result.add(keyComponents[name.position]);
+                if (def.kind == ColumnDefinition.Kind.PARTITION_KEY)
+                    result.add(keyComponents[def.position()]);
                 else
-                    result.add(cf.getColumn(name.name.key));
+                    result.add(cf.getColumn(def.name.bytes));
             }
         }
     }
@@ -967,7 +967,7 @@ public class SelectStatement implements CQLStatement
         // because there is no point of using composite comparator if there is only one order condition
         if (parameters.orderings.size() == 1)
         {
-            CFDefinition.Name ordering = cfDef.get(parameters.orderings.keySet().iterator().next());
+            ColumnDefinition ordering = cfm.getColumnDefinition(parameters.orderings.keySet().iterator().next());
             Collections.sort(cqlRows.rows, new SingleColumnComparator(orderingIndexes.get(ordering), ordering.type));
             return;
         }
@@ -981,7 +981,7 @@ public class SelectStatement implements CQLStatement
         int idx = 0;
         for (ColumnIdentifier identifier : parameters.orderings.keySet())
         {
-            CFDefinition.Name orderingColumn = cfDef.get(identifier);
+            ColumnDefinition orderingColumn = cfm.getColumnDefinition(identifier);
             types.add(orderingColumn.type);
             positions[idx++] = orderingIndexes.get(orderingColumn);
         }
@@ -993,40 +993,40 @@ public class SelectStatement implements CQLStatement
     {
         // Respect requested order
         result.newRow();
-        for (CFDefinition.Name name : selection.getColumnsList())
+        for (ColumnDefinition def : selection.getColumnsList())
         {
-            switch (name.kind)
+            switch (def.kind)
             {
-                case KEY_ALIAS:
-                    result.add(keyComponents[name.position]);
+                case PARTITION_KEY:
+                    result.add(keyComponents[def.position()]);
                     break;
-                case COLUMN_ALIAS:
-                    result.add(columns.getKeyComponent(name.position));
+                case CLUSTERING_COLUMN:
+                    result.add(columns.getKeyComponent(def.position()));
                     break;
-                case VALUE_ALIAS:
+                case COMPACT_VALUE:
                     // This should not happen for SPARSE
                     throw new AssertionError();
-                case COLUMN_METADATA:
-                    if (name.type.isCollection())
+                case REGULAR:
+                    if (def.type.isCollection())
                     {
-                        List<Pair<ByteBuffer, Column>> collection = columns.getCollection(name.name.key);
+                        List<Pair<ByteBuffer, Column>> collection = columns.getCollection(def.name.bytes);
                         ByteBuffer value = collection == null
                                          ? null
-                                         : ((CollectionType)name.type).serialize(collection);
+                                         : ((CollectionType)def.type).serialize(collection);
                         result.add(value);
                     }
                     else
                     {
-                        result.add(columns.getSimple(name.name.key));
+                        result.add(columns.getSimple(def.name.bytes));
                     }
                     break;
             }
         }
     }
 
-    private static boolean isReversedType(CFDefinition.Name name)
+    private static boolean isReversedType(ColumnDefinition def)
     {
-        return name.type instanceof ReversedType;
+        return def.type instanceof ReversedType;
     }
 
     private boolean columnFilterIsIdentity()
@@ -1059,8 +1059,6 @@ public class SelectStatement implements CQLStatement
         {
             CFMetaData cfm = ThriftValidation.validateColumnFamily(keyspace(), columnFamily());
 
-            CFDefinition cfDef = cfm.getCfDef();
-
             VariableSpecifications names = getBoundsVariables();
 
             // Select clause
@@ -1068,11 +1066,11 @@ public class SelectStatement implements CQLStatement
                 throw new InvalidRequestException("Only COUNT(*) and COUNT(1) operations are currently supported.");
 
             Selection selection = selectClause.isEmpty()
-                                ? Selection.wildcard(cfDef)
-                                : Selection.fromSelectors(cfDef, selectClause);
+                                ? Selection.wildcard(cfm)
+                                : Selection.fromSelectors(cfm, selectClause);
 
             if (parameters.isDistinct)
-                validateDistinctSelection(selection.getColumnsList(), cfDef.keys.values());
+                validateDistinctSelection(selection.getColumnsList(), cfm.partitionKeyColumns());
 
             Term prepLimit = null;
             if (limit != null)
@@ -1081,7 +1079,7 @@ public class SelectStatement implements CQLStatement
                 prepLimit.collectMarkerSpecification(names);
             }
 
-            SelectStatement stmt = new SelectStatement(cfDef, names.size(), parameters, selection, prepLimit);
+            SelectStatement stmt = new SelectStatement(cfm, names.size(), parameters, selection, prepLimit);
 
             /*
              * WHERE clause. For a given entity, rules are:
@@ -1095,8 +1093,8 @@ public class SelectStatement implements CQLStatement
             boolean hasQueriableClusteringColumnIndex = false;
             for (Relation rel : whereClause)
             {
-                CFDefinition.Name name = cfDef.get(rel.getEntity());
-                if (name == null)
+                ColumnDefinition def = cfm.getColumnDefinition(rel.getEntity());
+                if (def == null)
                 {
                     if (containsAlias(rel.getEntity()))
                         throw new InvalidRequestException(String.format("Aliases aren't allowed in where clause ('%s')", rel));
@@ -1104,32 +1102,31 @@ public class SelectStatement implements CQLStatement
                         throw new InvalidRequestException(String.format("Undefined name %s in where clause ('%s')", rel.getEntity(), rel));
                 }
 
-                ColumnDefinition def = cfDef.cfm.getColumnDefinition(name.name.key);
-                stmt.restrictedNames.add(name);
+                stmt.restrictedColumns.add(def);
                 if (def.isIndexed() && rel.operator() == Relation.Type.EQ)
                 {
                     hasQueriableIndex = true;
-                    if (name.kind == CFDefinition.Name.Kind.COLUMN_ALIAS)
+                    if (def.kind == ColumnDefinition.Kind.CLUSTERING_COLUMN)
                         hasQueriableClusteringColumnIndex = true;
                 }
 
-                switch (name.kind)
+                switch (def.kind)
                 {
-                    case KEY_ALIAS:
-                        stmt.keyRestrictions[name.position] = updateRestriction(name, stmt.keyRestrictions[name.position], rel, names);
+                    case PARTITION_KEY:
+                        stmt.keyRestrictions[def.position()] = updateRestriction(def, stmt.keyRestrictions[def.position()], rel, names);
                         break;
-                    case COLUMN_ALIAS:
-                        stmt.columnRestrictions[name.position] = updateRestriction(name, stmt.columnRestrictions[name.position], rel, names);
+                    case CLUSTERING_COLUMN:
+                        stmt.columnRestrictions[def.position()] = updateRestriction(def, stmt.columnRestrictions[def.position()], rel, names);
                         break;
-                    case VALUE_ALIAS:
-                        throw new InvalidRequestException(String.format("Predicates on the non-primary-key column (%s) of a COMPACT table are not yet supported", name.name));
-                    case COLUMN_METADATA:
+                    case COMPACT_VALUE:
+                        throw new InvalidRequestException(String.format("Predicates on the non-primary-key column (%s) of a COMPACT table are not yet supported", def.name));
+                    case REGULAR:
                         // We only all IN on the row key and last clustering key so far, never on non-PK columns, and this even if there's an index
-                        Restriction r = updateRestriction(name, stmt.metadataRestrictions.get(name), rel, names);
+                        Restriction r = updateRestriction(def, stmt.metadataRestrictions.get(def.name), rel, names); // keyed by column name, like the put() below
                         if (r.isIN() && !((Restriction.IN)r).canHaveOnlyOneValue())
                             // Note: for backward compatibility reason, we conside a IN of 1 value the same as a EQ, so we let that slide.
-                            throw new InvalidRequestException(String.format("IN predicates on non-primary-key columns (%s) is not yet supported", name));
-                        stmt.metadataRestrictions.put(name, r);
+                            throw new InvalidRequestException(String.format("IN predicates on non-primary-key columns (%s) is not yet supported", def.name));
+                        stmt.metadataRestrictions.put(def.name, r);
                         break;
                 }
             }
@@ -1145,12 +1142,12 @@ public class SelectStatement implements CQLStatement
             // If a component of the partition key is restricted by a relation, all preceding
             // components must have a EQ. Only the last partition key component can be in IN relation.
             boolean canRestrictFurtherComponents = true;
-            CFDefinition.Name previous = null;
+            ColumnDefinition previous = null;
             stmt.keyIsInRelation = false;
-            Iterator<CFDefinition.Name> iter = cfDef.keys.values().iterator();
+            Iterator<ColumnDefinition> iter = cfm.partitionKeyColumns().iterator();
             for (int i = 0; i < stmt.keyRestrictions.length; i++)
             {
-                CFDefinition.Name cname = iter.next();
+                ColumnDefinition cdef = iter.next();
                 Restriction restriction = stmt.keyRestrictions[i];
 
                 if (restriction == null)
@@ -1167,7 +1164,7 @@ public class SelectStatement implements CQLStatement
                             stmt.isKeyRange = true;
                             break;
                         }
-                        throw new InvalidRequestException(String.format("Partition key part %s must be restricted since preceding part is", cname));
+                        throw new InvalidRequestException(String.format("Partition key part %s must be restricted since preceding part is", cdef.name));
                     }
 
                     stmt.isKeyRange = true;
@@ -1180,7 +1177,7 @@ public class SelectStatement implements CQLStatement
                         stmt.usesSecondaryIndexing = true;
                         break;
                     }
-                    throw new InvalidRequestException(String.format("partition key part %s cannot be restricted (preceding part %s is either not restricted or by a non-EQ relation)", cname, previous));
+                    throw new InvalidRequestException(String.format("partition key part %s cannot be restricted (preceding part %s is either not restricted or by a non-EQ relation)", cdef.name, previous == null ? null : previous.name));
                 }
                 else if (restriction.isOnToken())
                 {
@@ -1198,7 +1195,7 @@ public class SelectStatement implements CQLStatement
                     {
                         // We only support IN for the last name so far
                         if (i != stmt.keyRestrictions.length - 1)
-                            throw new InvalidRequestException(String.format("Partition KEY part %s cannot be restricted by IN relation (only the last part of the partition key can)", cname));
+                            throw new InvalidRequestException(String.format("Partition KEY part %s cannot be restricted by IN relation (only the last part of the partition key can)", cdef.name));
                         stmt.keyIsInRelation = true;
                     }
                 }
@@ -1208,23 +1205,23 @@ public class SelectStatement implements CQLStatement
                     // Note: In theory we could allow it for 2ndary index queries with ALLOW FILTERING, but that would probably require some special casing
                     throw new InvalidRequestException("Only EQ and IN relation are supported on the partition key (unless you use the token() function)");
                 }
-                previous = cname;
+                previous = cdef;
             }
 
             // All (or none) of the partition key columns have been specified;
             // hence there is no need to turn these restrictions into index expressions.
             if (!stmt.usesSecondaryIndexing)
-                stmt.restrictedNames.removeAll(cfDef.keys.values());
+                stmt.restrictedColumns.removeAll(cfm.partitionKeyColumns());
 
             // If a clustering key column is restricted by a non-EQ relation, all preceding
             // columns must have a EQ, and all following must have no restriction. Unless
             // the column is indexed that is.
             canRestrictFurtherComponents = true;
             previous = null;
-            iter = cfDef.columns.values().iterator();
+            iter = cfm.clusteringColumns().iterator();
             for (int i = 0; i < stmt.columnRestrictions.length; i++)
             {
-                CFDefinition.Name cname = iter.next();
+                ColumnDefinition cdef = iter.next();
                 Restriction restriction = stmt.columnRestrictions[i];
 
                 if (restriction == null)
@@ -1238,7 +1235,7 @@ public class SelectStatement implements CQLStatement
                         stmt.usesSecondaryIndexing = true; // handle gaps and non-keyrange cases.
                         break;
                     }
-                    throw new InvalidRequestException(String.format("PRIMARY KEY part %s cannot be restricted (preceding part %s is either not restricted or by a non-EQ relation)", cname, previous));
+                    throw new InvalidRequestException(String.format("PRIMARY KEY part %s cannot be restricted (preceding part %s is either not restricted or by a non-EQ relation)", cdef.name, previous));
                 }
                 else if (restriction.isSlice())
                 {
@@ -1246,7 +1243,7 @@ public class SelectStatement implements CQLStatement
                     Restriction.Slice slice = (Restriction.Slice)restriction;
                     // For non-composite slices, we don't support internally the difference between exclusive and
                     // inclusive bounds, so we deal with it manually.
-                    if (!cfDef.isComposite && (!slice.isInclusive(Bound.START) || !slice.isInclusive(Bound.END)))
+                    if (!cfm.hasCompositeComparator() && (!slice.isInclusive(Bound.START) || !slice.isInclusive(Bound.END)))
                         stmt.sliceRestriction = slice;
                 }
                 else if (restriction.isIN())
@@ -1254,12 +1251,12 @@ public class SelectStatement implements CQLStatement
                     // We only support IN for the last name and for compact storage so far
                     // TODO: #3885 allows us to extend to non compact as well, but that remains to be done
                     if (i != stmt.columnRestrictions.length - 1)
-                        throw new InvalidRequestException(String.format("PRIMARY KEY part %s cannot be restricted by IN relation", cname));
+                        throw new InvalidRequestException(String.format("PRIMARY KEY part %s cannot be restricted by IN relation", cdef.name));
                     else if (stmt.selectACollection())
-                        throw new InvalidRequestException(String.format("Cannot restrict PRIMARY KEY part %s by IN relation as a collection is selected by the query", cname));
+                        throw new InvalidRequestException(String.format("Cannot restrict PRIMARY KEY part %s by IN relation as a collection is selected by the query", cdef.name));
                 }
 
-                previous = cname;
+                previous = cdef;
             }
 
             // Covers indexes on the first clustering column (among others).
@@ -1267,7 +1264,7 @@ public class SelectStatement implements CQLStatement
                 stmt.usesSecondaryIndexing = true;
 
             if (!stmt.usesSecondaryIndexing)
-                stmt.restrictedNames.removeAll(cfDef.columns.values());
+                stmt.restrictedColumns.removeAll(cfm.clusteringColumns());
 
             // Even if usesSecondaryIndexing is false at this point, we'll still have to use one if
             // there is restrictions not covered by the PK.
@@ -1294,11 +1291,11 @@ public class SelectStatement implements CQLStatement
                 // queried automatically, and then removing it from the resultSet afterwards if needed)
                 if (stmt.keyIsInRelation)
                 {
-                    stmt.orderingIndexes = new HashMap<CFDefinition.Name, Integer>();
+                    stmt.orderingIndexes = new HashMap<ColumnDefinition, Integer>();
                     for (ColumnIdentifier column : stmt.parameters.orderings.keySet())
                     {
-                        final CFDefinition.Name name = cfDef.get(column);
-                        if (name == null)
+                        final ColumnDefinition def = cfm.getColumnDefinition(column);
+                        if (def == null)
                         {
                             if (containsAlias(column))
                                 throw new InvalidRequestException(String.format("Aliases are not allowed in order by clause ('%s')", column));
@@ -1308,13 +1305,14 @@ public class SelectStatement implements CQLStatement
 
                         if (selectClause.isEmpty()) // wildcard
                         {
-                            stmt.orderingIndexes.put(name, Iterables.indexOf(cfDef, new Predicate<CFDefinition.Name>()
-                                                                                    {
-                                                                                        public boolean apply(CFDefinition.Name n)
-                                                                                        {
-                                                                                            return name.equals(n);
-                                                                                        }
-                                                                                    }));
+                            stmt.orderingIndexes.put(def, Iterators.indexOf(cfm.allColumnsInSelectOrder(),
+                                                                            new Predicate<ColumnDefinition>()
+                                                                            {
+                                                                                public boolean apply(ColumnDefinition n)
+                                                                                {
+                                                                                    return def.equals(n);
+                                                                                }
+                                                                            }));
                         }
                         else
                         {
@@ -1322,9 +1320,9 @@ public class SelectStatement implements CQLStatement
                             for (int i = 0; i < selectClause.size(); i++)
                             {
                                 RawSelector selector = selectClause.get(i);
-                                if (name.name.equals(selector.selectable))
+                                if (def.name.equals(selector.selectable))
                                 {
-                                    stmt.orderingIndexes.put(name, i);
+                                    stmt.orderingIndexes.put(def, i);
                                     hasColumn = true;
                                     break;
                                 }
@@ -1336,15 +1334,15 @@ public class SelectStatement implements CQLStatement
                     }
                 }
 
-                Boolean[] reversedMap = new Boolean[cfDef.columns.size()];
+                Boolean[] reversedMap = new Boolean[cfm.clusteringColumns().size()];
                 int i = 0;
                 for (Map.Entry<ColumnIdentifier, Boolean> entry : stmt.parameters.orderings.entrySet())
                 {
                     ColumnIdentifier column = entry.getKey();
                     boolean reversed = entry.getValue();
 
-                    CFDefinition.Name name = cfDef.get(column);
-                    if (name == null)
+                    ColumnDefinition def = cfm.getColumnDefinition(column);
+                    if (def == null)
                     {
                         if (containsAlias(column))
                             throw new InvalidRequestException(String.format("Aliases are not allowed in order by clause ('%s')", column));
@@ -1352,13 +1350,13 @@ public class SelectStatement implements CQLStatement
                             throw new InvalidRequestException(String.format("Order by on unknown column %s", column));
                     }
 
-                    if (name.kind != CFDefinition.Name.Kind.COLUMN_ALIAS)
+                    if (def.kind != ColumnDefinition.Kind.CLUSTERING_COLUMN)
                         throw new InvalidRequestException(String.format("Order by is currently only supported on the clustered columns of the PRIMARY KEY, got %s", column));
 
-                    if (i++ != name.position)
+                    if (i++ != def.position())
                         throw new InvalidRequestException(String.format("Order by currently only support the ordering of columns following their declared order in the PRIMARY KEY"));
 
-                    reversedMap[name.position] = (reversed != isReversedType(name));
+                    reversedMap[def.position()] = (reversed != isReversedType(def));
                 }
 
                 // Check that all boolean in reversedMap, if set, agrees
@@ -1387,7 +1385,7 @@ public class SelectStatement implements CQLStatement
                 // We will potentially filter data if either:
                 //  - Have more than one IndexExpression
                 //  - Have no index expression and the column filter is not the identity
-                if (stmt.restrictedNames.size() > 1 || (stmt.restrictedNames.isEmpty() && !stmt.columnFilterIsIdentity()))
+                if (stmt.restrictedColumns.size() > 1 || (stmt.restrictedColumns.isEmpty() && !stmt.columnFilterIsIdentity()))
                     throw new InvalidRequestException("Cannot execute this query as it might involve data filtering and thus may have unpredictable performance. "
                                                     + "If you want to execute this query despite the performance unpredictability, use ALLOW FILTERING");
             }
@@ -1395,16 +1393,16 @@ public class SelectStatement implements CQLStatement
             return new ParsedStatement.Prepared(stmt, names);
         }
 
-        private void validateDistinctSelection(Collection<CFDefinition.Name> requestedColumns, Collection<CFDefinition.Name> partitionKey)
+        private void validateDistinctSelection(Collection<ColumnDefinition> requestedColumns, Collection<ColumnDefinition> partitionKey)
         throws InvalidRequestException
         {
-            for (CFDefinition.Name name : requestedColumns)
-                if (!partitionKey.contains(name))
-                    throw new InvalidRequestException(String.format("SELECT DISTINCT queries must only request partition key columns (not %s)", name));
+            for (ColumnDefinition def : requestedColumns)
+                if (!partitionKey.contains(def))
+                    throw new InvalidRequestException(String.format("SELECT DISTINCT queries must only request partition key columns (not %s)", def.name));
 
-            for (CFDefinition.Name name : partitionKey)
-                if (!requestedColumns.contains(name))
-                    throw new InvalidRequestException(String.format("SELECT DISTINCT queries must request all the partition key columns (missing %s)", name));
+            for (ColumnDefinition def : partitionKey)
+                if (!requestedColumns.contains(def))
+                    throw new InvalidRequestException(String.format("SELECT DISTINCT queries must request all the partition key columns (missing %s)", def.name));
         }
 
         private boolean containsAlias(final ColumnIdentifier name)
@@ -1423,16 +1421,16 @@ public class SelectStatement implements CQLStatement
             return new ColumnSpecification(keyspace(), columnFamily(), new ColumnIdentifier("[limit]", true), Int32Type.instance);
         }
 
-        Restriction updateRestriction(CFDefinition.Name name, Restriction restriction, Relation newRel, VariableSpecifications boundNames) throws InvalidRequestException
+        Restriction updateRestriction(ColumnDefinition def, Restriction restriction, Relation newRel, VariableSpecifications boundNames) throws InvalidRequestException
         {
-            ColumnSpecification receiver = name;
+            ColumnSpecification receiver = def;
             if (newRel.onToken)
             {
-                if (name.kind != CFDefinition.Name.Kind.KEY_ALIAS)
-                    throw new InvalidRequestException(String.format("The token() function is only supported on the partition key, found on %s", name));
+                if (def.kind != ColumnDefinition.Kind.PARTITION_KEY)
+                    throw new InvalidRequestException(String.format("The token() function is only supported on the partition key, found on %s", def.name));
 
-                receiver = new ColumnSpecification(name.ksName,
-                                                   name.cfName,
+                receiver = new ColumnSpecification(def.ksName,
+                                                   def.cfName,
                                                    new ColumnIdentifier("partition key token", true),
                                                    StorageService.getPartitioner().getTokenValidator());
             }
@@ -1442,7 +1440,7 @@ public class SelectStatement implements CQLStatement
                 case EQ:
                     {
                         if (restriction != null)
-                            throw new InvalidRequestException(String.format("%s cannot be restricted by more than one relation if it includes an Equal", name));
+                            throw new InvalidRequestException(String.format("%s cannot be restricted by more than one relation if it includes an Equal", def.name));
                         Term t = newRel.getValue().prepare(receiver);
                         t.collectMarkerSpecification(boundNames);
                         restriction = new Restriction.EQ(t, newRel.onToken);
@@ -1450,7 +1448,7 @@ public class SelectStatement implements CQLStatement
                     break;
                 case IN:
                     if (restriction != null)
-                        throw new InvalidRequestException(String.format("%s cannot be restricted by more than one relation if it includes a IN", name));
+                        throw new InvalidRequestException(String.format("%s cannot be restricted by more than one relation if it includes a IN", def.name));
 
                     if (newRel.getInValues() == null)
                     {
@@ -1480,10 +1478,10 @@ public class SelectStatement implements CQLStatement
                         if (restriction == null)
                             restriction = new Restriction.Slice(newRel.onToken);
                         else if (!restriction.isSlice())
-                            throw new InvalidRequestException(String.format("%s cannot be restricted by both an equal and an inequal relation", name));
+                            throw new InvalidRequestException(String.format("%s cannot be restricted by both an equal and an inequal relation", def.name));
                         Term t = newRel.getValue().prepare(receiver);
                         t.collectMarkerSpecification(boundNames);
-                        ((Restriction.Slice)restriction).setBound(name.name, newRel.operator(), t);
+                        ((Restriction.Slice)restriction).setBound(def.name, newRel.operator(), t);
                     }
                     break;
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5f5905d5/src/java/org/apache/cassandra/cql3/statements/Selection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/Selection.java b/src/java/org/apache/cassandra/cql3/statements/Selection.java
index 600ee1b..7f13d29 100644
--- a/src/java/org/apache/cassandra/cql3/statements/Selection.java
+++ b/src/java/org/apache/cassandra/cql3/statements/Selection.java
@@ -21,9 +21,13 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 
+import com.google.common.collect.Iterators;
+
 import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.cql3.functions.Function;
 import org.apache.cassandra.cql3.functions.Functions;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.CounterColumn;
 import org.apache.cassandra.db.ExpiringColumn;
 import org.apache.cassandra.db.Column;
@@ -36,12 +40,12 @@ import org.apache.cassandra.utils.ByteBufferUtil;
 
 public abstract class Selection
 {
-    private final List<CFDefinition.Name> columnsList;
+    private final List<ColumnDefinition> columnsList;
     private final List<ColumnSpecification> metadata;
     private final boolean collectTimestamps;
     private final boolean collectTTLs;
 
-    protected Selection(List<CFDefinition.Name> columnsList, List<ColumnSpecification> metadata, boolean collectTimestamps, boolean collectTTLs)
+    protected Selection(List<ColumnDefinition> columnsList, List<ColumnSpecification> metadata, boolean collectTimestamps, boolean collectTTLs)
     {
         this.columnsList = columnsList;
         this.metadata = metadata;
@@ -54,15 +58,14 @@ public abstract class Selection
         return new ResultSet.Metadata(metadata);
     }
 
-    public static Selection wildcard(CFDefinition cfDef)
+    public static Selection wildcard(CFMetaData cfm)
     {
-        List<CFDefinition.Name> all = new ArrayList<CFDefinition.Name>();
-        for (CFDefinition.Name name : cfDef)
-            all.add(name);
+        List<ColumnDefinition> all = new ArrayList<ColumnDefinition>(cfm.allColumns().size());
+        Iterators.addAll(all, cfm.allColumnsInSelectOrder());
         return new SimpleSelection(all);
     }
 
-    public static Selection forColumns(List<CFDefinition.Name> columnsList)
+    public static Selection forColumns(List<ColumnDefinition> columnsList)
     {
         return new SimpleSelection(columnsList);
     }
@@ -77,54 +80,54 @@ public abstract class Selection
         return false;
     }
 
-    private static int addAndGetIndex(CFDefinition.Name name, List<CFDefinition.Name> l)
+    private static int addAndGetIndex(ColumnDefinition def, List<ColumnDefinition> l)
     {
-        int idx = l.indexOf(name);
+        int idx = l.indexOf(def);
         if (idx < 0)
         {
             idx = l.size();
-            l.add(name);
+            l.add(def);
         }
         return idx;
     }
 
-    private static Selector makeSelector(CFDefinition cfDef, RawSelector raw, List<CFDefinition.Name> names, List<ColumnSpecification> metadata) throws InvalidRequestException
+    private static Selector makeSelector(CFMetaData cfm, RawSelector raw, List<ColumnDefinition> defs, List<ColumnSpecification> metadata) throws InvalidRequestException
     {
         if (raw.selectable instanceof ColumnIdentifier)
         {
-            CFDefinition.Name name = cfDef.get((ColumnIdentifier)raw.selectable);
-            if (name == null)
+            ColumnDefinition def = cfm.getColumnDefinition((ColumnIdentifier)raw.selectable);
+            if (def == null)
                 throw new InvalidRequestException(String.format("Undefined name %s in selection clause", raw.selectable));
             if (metadata != null)
-                metadata.add(raw.alias == null ? name : makeAliasSpec(cfDef, name.type, raw.alias));
-            return new SimpleSelector(name.toString(), addAndGetIndex(name, names), name.type);
+                metadata.add(raw.alias == null ? def : makeAliasSpec(cfm, def.type, raw.alias));
+            return new SimpleSelector(def.name.toString(), addAndGetIndex(def, defs), def.type);
         }
         else if (raw.selectable instanceof Selectable.WritetimeOrTTL)
         {
             Selectable.WritetimeOrTTL tot = (Selectable.WritetimeOrTTL)raw.selectable;
-            CFDefinition.Name name = cfDef.get(tot.id);
-            if (name == null)
+            ColumnDefinition def = cfm.getColumnDefinition(tot.id);
+            if (def == null)
                 throw new InvalidRequestException(String.format("Undefined name %s in selection clause", tot.id));
-            if (name.kind != CFDefinition.Name.Kind.COLUMN_METADATA && name.kind != CFDefinition.Name.Kind.VALUE_ALIAS)
-                throw new InvalidRequestException(String.format("Cannot use selection function %s on PRIMARY KEY part %s", tot.isWritetime ? "writeTime" : "ttl", name));
-            if (name.type.isCollection())
+            if (def.kind != ColumnDefinition.Kind.REGULAR && def.kind != ColumnDefinition.Kind.COMPACT_VALUE)
+                throw new InvalidRequestException(String.format("Cannot use selection function %s on PRIMARY KEY part %s", tot.isWritetime ? "writeTime" : "ttl", def.name));
+            if (def.type.isCollection())
                 throw new InvalidRequestException(String.format("Cannot use selection function %s on collections", tot.isWritetime ? "writeTime" : "ttl"));
 
             if (metadata != null)
-                metadata.add(makeWritetimeOrTTLSpec(cfDef, tot, raw.alias));
-            return new WritetimeOrTTLSelector(name.toString(), addAndGetIndex(name, names), tot.isWritetime);
+                metadata.add(makeWritetimeOrTTLSpec(cfm, tot, raw.alias));
+            return new WritetimeOrTTLSelector(def.name.toString(), addAndGetIndex(def, defs), tot.isWritetime);
         }
         else
         {
             Selectable.WithFunction withFun = (Selectable.WithFunction)raw.selectable;
             List<Selector> args = new ArrayList<Selector>(withFun.args.size());
             for (Selectable rawArg : withFun.args)
-                args.add(makeSelector(cfDef, new RawSelector(rawArg, null), names, null));
+                args.add(makeSelector(cfm, new RawSelector(rawArg, null), defs, null));
 
-            AbstractType<?> returnType = Functions.getReturnType(withFun.functionName, cfDef.cfm.ksName, cfDef.cfm.cfName);
+            AbstractType<?> returnType = Functions.getReturnType(withFun.functionName, cfm.ksName, cfm.cfName);
             if (returnType == null)
                 throw new InvalidRequestException(String.format("Unknown function '%s'", withFun.functionName));
-            ColumnSpecification spec = makeFunctionSpec(cfDef, withFun, returnType, raw.alias);
+            ColumnSpecification spec = makeFunctionSpec(cfm, withFun, returnType, raw.alias);
             Function fun = Functions.get(withFun.functionName, args, spec);
             if (metadata != null)
                 metadata.add(spec);
@@ -132,15 +135,15 @@ public abstract class Selection
         }
     }
 
-    private static ColumnSpecification makeWritetimeOrTTLSpec(CFDefinition cfDef, Selectable.WritetimeOrTTL tot, ColumnIdentifier alias)
+    private static ColumnSpecification makeWritetimeOrTTLSpec(CFMetaData cfm, Selectable.WritetimeOrTTL tot, ColumnIdentifier alias)
     {
-        return new ColumnSpecification(cfDef.cfm.ksName,
-                                       cfDef.cfm.cfName,
+        return new ColumnSpecification(cfm.ksName,
+                                       cfm.cfName,
                                        alias == null ? new ColumnIdentifier(tot.toString(), true) : alias,
                                        tot.isWritetime ? LongType.instance : Int32Type.instance);
     }
 
-    private static ColumnSpecification makeFunctionSpec(CFDefinition cfDef,
+    private static ColumnSpecification makeFunctionSpec(CFMetaData cfm,
                                                         Selectable.WithFunction fun,
                                                         AbstractType<?> returnType,
                                                         ColumnIdentifier alias) throws InvalidRequestException
@@ -148,31 +151,31 @@ public abstract class Selection
         if (returnType == null)
             throw new InvalidRequestException(String.format("Unknown function %s called in selection clause", fun.functionName));
 
-        return new ColumnSpecification(cfDef.cfm.ksName,
-                                       cfDef.cfm.cfName,
+        return new ColumnSpecification(cfm.ksName,
+                                       cfm.cfName,
                                        alias == null ? new ColumnIdentifier(fun.toString(), true) : alias,
                                        returnType);
     }
 
-    private static ColumnSpecification makeAliasSpec(CFDefinition cfDef, AbstractType<?> type, ColumnIdentifier alias)
+    private static ColumnSpecification makeAliasSpec(CFMetaData cfm, AbstractType<?> type, ColumnIdentifier alias)
     {
-        return new ColumnSpecification(cfDef.cfm.ksName, cfDef.cfm.cfName, alias, type);
+        return new ColumnSpecification(cfm.ksName, cfm.cfName, alias, type);
     }
 
-    public static Selection fromSelectors(CFDefinition cfDef, List<RawSelector> rawSelectors) throws InvalidRequestException
+    public static Selection fromSelectors(CFMetaData cfm, List<RawSelector> rawSelectors) throws InvalidRequestException
     {
         boolean usesFunction = isUsingFunction(rawSelectors);
 
         if (usesFunction)
         {
-            List<CFDefinition.Name> names = new ArrayList<CFDefinition.Name>();
+            List<ColumnDefinition> defs = new ArrayList<ColumnDefinition>();
             List<ColumnSpecification> metadata = new ArrayList<ColumnSpecification>(rawSelectors.size());
             List<Selector> selectors = new ArrayList<Selector>(rawSelectors.size());
             boolean collectTimestamps = false;
             boolean collectTTLs = false;
             for (RawSelector rawSelector : rawSelectors)
             {
-                Selector selector = makeSelector(cfDef, rawSelector, names, metadata);
+                Selector selector = makeSelector(cfm, rawSelector, defs, metadata);
                 selectors.add(selector);
                 if (selector instanceof WritetimeOrTTLSelector)
                 {
@@ -180,22 +183,22 @@ public abstract class Selection
                     collectTTLs |= !((WritetimeOrTTLSelector)selector).isWritetime;
                 }
             }
-            return new SelectionWithFunctions(names, metadata, selectors, collectTimestamps, collectTTLs);
+            return new SelectionWithFunctions(defs, metadata, selectors, collectTimestamps, collectTTLs);
         }
         else
         {
-            List<CFDefinition.Name> names = new ArrayList<CFDefinition.Name>(rawSelectors.size());
+            List<ColumnDefinition> defs = new ArrayList<ColumnDefinition>(rawSelectors.size());
             List<ColumnSpecification> metadata = new ArrayList<ColumnSpecification>(rawSelectors.size());
             for (RawSelector rawSelector : rawSelectors)
             {
                 assert rawSelector.selectable instanceof ColumnIdentifier;
-                CFDefinition.Name name = cfDef.get((ColumnIdentifier)rawSelector.selectable);
-                if (name == null)
+                ColumnDefinition def = cfm.getColumnDefinition((ColumnIdentifier)rawSelector.selectable);
+                if (def == null)
                     throw new InvalidRequestException(String.format("Undefined name %s in selection clause", rawSelector.selectable));
-                names.add(name);
-                metadata.add(rawSelector.alias == null ? name : makeAliasSpec(cfDef, name.type, rawSelector.alias));
+                defs.add(def);
+                metadata.add(rawSelector.alias == null ? def : makeAliasSpec(cfm, def.type, rawSelector.alias));
             }
-            return new SimpleSelection(names, metadata);
+            return new SimpleSelection(defs, metadata);
         }
     }
 
@@ -207,10 +210,10 @@ public abstract class Selection
     public List<ColumnIdentifier> regularColumnsToFetch()
     {
         List<ColumnIdentifier> toFetch = new ArrayList<ColumnIdentifier>();
-        for (CFDefinition.Name name : columnsList)
+        for (ColumnDefinition def : columnsList)
         {
-            if (name.kind == CFDefinition.Name.Kind.COLUMN_METADATA)
-                toFetch.add(name.name);
+            if (def.kind == ColumnDefinition.Kind.REGULAR)
+                toFetch.add(def.name);
         }
         return toFetch;
     }
@@ -218,7 +221,7 @@ public abstract class Selection
     /**
      * @return the list of CQL3 columns value this SelectionClause needs.
      */
-    public List<CFDefinition.Name> getColumnsList()
+    public List<ColumnDefinition> getColumnsList()
     {
         return columnsList;
     }
@@ -307,12 +310,12 @@ public abstract class Selection
     // Special cased selection for when no function is used (this save some allocations).
     private static class SimpleSelection extends Selection
     {
-        public SimpleSelection(List<CFDefinition.Name> columnsList)
+        public SimpleSelection(List<ColumnDefinition> columnsList)
         {
             this(columnsList, new ArrayList<ColumnSpecification>(columnsList));
         }
 
-        public SimpleSelection(List<CFDefinition.Name> columnsList, List<ColumnSpecification> metadata)
+        public SimpleSelection(List<ColumnDefinition> columnsList, List<ColumnSpecification> metadata)
         {
             /*
              * In theory, even a simple selection could have multiple time the same column, so we
@@ -444,7 +447,7 @@ public abstract class Selection
     {
         private final List<Selector> selectors;
 
-        public SelectionWithFunctions(List<CFDefinition.Name> columnsList, List<ColumnSpecification> metadata, List<Selector> selectors, boolean collectTimestamps, boolean collectTTLs)
+        public SelectionWithFunctions(List<ColumnDefinition> columnsList, List<ColumnSpecification> metadata, List<Selector> selectors, boolean collectTimestamps, boolean collectTTLs)
         {
             super(columnsList, metadata, collectTimestamps, collectTTLs);
             this.selectors = selectors;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5f5905d5/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
index 12348df..7a8340d 100644
--- a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
@@ -22,7 +22,9 @@ import java.util.*;
 
 import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.marshal.CompositeType;
 import org.apache.cassandra.exceptions.*;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.Pair;
@@ -48,8 +50,6 @@ public class UpdateStatement extends ModificationStatement
     public void addUpdateForKey(ColumnFamily cf, ByteBuffer key, ColumnNameBuilder builder, UpdateParameters params)
     throws InvalidRequestException
     {
-        CFDefinition cfDef = cfm.getCfDef();
-
         // Inserting the CQL row marker (see #4361)
         // We always need to insert a marker, because of the following situation:
         //   CREATE TABLE t ( k int PRIMARY KEY, c text );
@@ -61,7 +61,7 @@ public class UpdateStatement extends ModificationStatement
         // 'DELETE FROM t WHERE k = 1' does remove the row entirely)
         //
         // We never insert markers for Super CF as this would confuse the thrift side.
-        if (cfDef.isComposite && !cfDef.isCompact && !cfm.isSuper())
+        if (cfm.hasCompositeComparator() && !cfm.isDense() && !cfm.isSuper())
         {
             ByteBuffer name = builder.copy().add(ByteBufferUtil.EMPTY_BYTE_BUFFER).build();
             cf.addColumn(params.makeColumn(name, ByteBufferUtil.EMPTY_BYTE_BUFFER));
@@ -69,23 +69,24 @@ public class UpdateStatement extends ModificationStatement
 
         List<Operation> updates = getOperations();
 
-        if (cfDef.isCompact)
+        if (cfm.isDense())
         {
             if (builder.componentCount() == 0)
-                throw new InvalidRequestException(String.format("Missing PRIMARY KEY part %s", cfDef.columns.values().iterator().next()));
+                throw new InvalidRequestException(String.format("Missing PRIMARY KEY part %s", cfm.clusteringColumns().iterator().next()));
 
-            if (cfDef.value == null)
+            // An empty name for the compact value is what we use to recognize the case where there is no column
+            // outside the PK, see CreateStatement.
+            if (!cfm.compactValueColumn().name.bytes.hasRemaining())
             {
-                // compact + no compact value implies there is no column outside the PK. So no operation could
-                // have passed through validation
+                // There is no column outside the PK. So no operation could have passed through validation
                 assert updates.isEmpty();
                 setToEmptyOperation.execute(key, cf, builder.copy(), params);
             }
             else
             {
-                // compact means we don't have a row marker, so don't accept to set only the PK. See CASSANDRA-5648.
+                // dense means we don't have a row marker, so don't accept to set only the PK. See CASSANDRA-5648.
                 if (updates.isEmpty())
-                    throw new InvalidRequestException(String.format("Column %s is mandatory for this COMPACT STORAGE table", cfDef.value));
+                    throw new InvalidRequestException(String.format("Column %s is mandatory for this COMPACT STORAGE table", cfm.compactValueColumn().name));
 
                 for (Operation update : updates)
                     update.execute(key, cf, builder.copy(), params);
@@ -129,9 +130,9 @@ public class UpdateStatement extends ModificationStatement
             this.columnValues = columnValues;
         }
 
-        protected ModificationStatement prepareInternal(CFDefinition cfDef, VariableSpecifications boundNames, Attributes attrs) throws InvalidRequestException
+        protected ModificationStatement prepareInternal(CFMetaData cfm, VariableSpecifications boundNames, Attributes attrs) throws InvalidRequestException
         {
-            UpdateStatement stmt = new UpdateStatement(boundNames.size(), cfDef.cfm, attrs);
+            UpdateStatement stmt = new UpdateStatement(boundNames.size(), cfm, attrs);
 
             // Created from an INSERT
             if (stmt.isCounter())
@@ -143,27 +144,27 @@ public class UpdateStatement extends ModificationStatement
 
             for (int i = 0; i < columnNames.size(); i++)
             {
-                CFDefinition.Name name = cfDef.get(columnNames.get(i));
-                if (name == null)
+                ColumnDefinition def = cfm.getColumnDefinition(columnNames.get(i));
+                if (def == null)
                     throw new InvalidRequestException(String.format("Unknown identifier %s", columnNames.get(i)));
 
                 for (int j = 0; j < i; j++)
-                    if (name.name.equals(columnNames.get(j)))
-                        throw new InvalidRequestException(String.format("Multiple definitions found for column %s", name));
+                    if (def.name.equals(columnNames.get(j)))
+                        throw new InvalidRequestException(String.format("Multiple definitions found for column %s", def.name));
 
                 Term.Raw value = columnValues.get(i);
 
-                switch (name.kind)
+                switch (def.kind)
                 {
-                    case KEY_ALIAS:
-                    case COLUMN_ALIAS:
-                        Term t = value.prepare(name);
+                    case PARTITION_KEY:
+                    case CLUSTERING_COLUMN:
+                        Term t = value.prepare(def);
                         t.collectMarkerSpecification(boundNames);
-                        stmt.addKeyValue(name.name, t);
+                        stmt.addKeyValue(def.name, t);
                         break;
-                    case VALUE_ALIAS:
-                    case COLUMN_METADATA:
-                        Operation operation = new Operation.SetValue(value).prepare(name);
+                    case COMPACT_VALUE:
+                    case REGULAR:
+                        Operation operation = new Operation.SetValue(value).prepare(def);
                         operation.collectMarkerSpecification(boundNames);
                         stmt.addOperation(operation);
                         break;
@@ -199,26 +200,26 @@ public class UpdateStatement extends ModificationStatement
             this.whereClause = whereClause;
         }
 
-        protected ModificationStatement prepareInternal(CFDefinition cfDef, VariableSpecifications boundNames, Attributes attrs) throws InvalidRequestException
+        protected ModificationStatement prepareInternal(CFMetaData cfm, VariableSpecifications boundNames, Attributes attrs) throws InvalidRequestException
         {
-            UpdateStatement stmt = new UpdateStatement(boundNames.size(), cfDef.cfm, attrs);
+            UpdateStatement stmt = new UpdateStatement(boundNames.size(), cfm, attrs);
 
             for (Pair<ColumnIdentifier, Operation.RawUpdate> entry : updates)
             {
-                CFDefinition.Name name = cfDef.get(entry.left);
-                if (name == null)
+                ColumnDefinition def = cfm.getColumnDefinition(entry.left);
+                if (def == null)
                     throw new InvalidRequestException(String.format("Unknown identifier %s", entry.left));
 
-                Operation operation = entry.right.prepare(name);
+                Operation operation = entry.right.prepare(def);
                 operation.collectMarkerSpecification(boundNames);
 
-                switch (name.kind)
+                switch (def.kind)
                 {
-                    case KEY_ALIAS:
-                    case COLUMN_ALIAS:
+                    case PARTITION_KEY:
+                    case CLUSTERING_COLUMN:
                         throw new InvalidRequestException(String.format("PRIMARY KEY part %s found in SET part", entry.left));
-                    case VALUE_ALIAS:
-                    case COLUMN_METADATA:
+                    case COMPACT_VALUE:
+                    case REGULAR:
                         stmt.addOperation(operation);
                         break;
                 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5f5905d5/src/java/org/apache/cassandra/db/BatchlogManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/BatchlogManager.java b/src/java/org/apache/cassandra/db/BatchlogManager.java
index 0968df2..bba3ffe 100644
--- a/src/java/org/apache/cassandra/db/BatchlogManager.java
+++ b/src/java/org/apache/cassandra/db/BatchlogManager.java
@@ -284,7 +284,7 @@ public class BatchlogManager implements BatchlogManagerMBean
 
     private static ByteBuffer columnName(String name)
     {
-        return CFMetaData.BatchlogCf.getCfDef().getColumnNameBuilder().add(UTF8Type.instance.decompose(name)).build();
+        return CFMetaData.BatchlogCf.getColumnNameBuilder().add(UTF8Type.instance.decompose(name)).build();
     }
 
     // force flush + compaction to reclaim space from the replayed batches

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5f5905d5/src/java/org/apache/cassandra/db/Column.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Column.java b/src/java/org/apache/cassandra/db/Column.java
index b0d22fb..e103cd3 100644
--- a/src/java/org/apache/cassandra/db/Column.java
+++ b/src/java/org/apache/cassandra/db/Column.java
@@ -30,7 +30,6 @@ import java.util.List;
 import com.google.common.collect.AbstractIterator;
 
 import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.cql3.CFDefinition;
 import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.util.DataOutputBuffer;
@@ -298,15 +297,8 @@ public class Column implements OnDiskAtom
     public void validateFields(CFMetaData metadata) throws MarshalException
     {
         validateName(metadata);
-        CFDefinition cfdef = metadata.getCfDef();
 
-        // If this is a CQL table, we need to pull out the CQL column name to look up the correct column type.
-        // (Note that COMPACT composites are handled by validateName, above.)
-        ByteBuffer internalName = (cfdef.isComposite && !cfdef.isCompact)
-                                ? ((CompositeType) metadata.comparator).extractLastComponent(name)
-                                : name;
-
-        AbstractType<?> valueValidator = metadata.getValueValidator(internalName);
+        AbstractType<?> valueValidator = metadata.getValueValidatorFromCellName(name());
         if (valueValidator != null)
             valueValidator.validate(value());
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5f5905d5/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java b/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
index 7f8693e..89d6683 100644
--- a/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
@@ -180,7 +180,7 @@ public abstract class ExtendedFilter
              * We also don't want to do for paging ranges as the actual filter depends on the row key (it would
              * probably be possible to make it work but we won't really use it so we don't bother).
              */
-            if (cfs.getComparator() instanceof CompositeType || dataRange instanceof DataRange.Paging)
+            if (cfs.metadata.hasCompositeComparator() || dataRange instanceof DataRange.Paging)
                 return null;
 
             IDiskAtomFilter filter = dataRange.columnFilter(null); // ok since not a paging range
@@ -251,7 +251,7 @@ public abstract class ExtendedFilter
              * 2) We don't yet allow non-indexed range slice with filters in CQL3 (i.e. this will never be
              * called by CFS.filter() for composites).
              */
-            assert !(cfs.getComparator() instanceof CompositeType);
+            assert !cfs.metadata.hasCompositeComparator();
 
             if (!needsExtraQuery(rowKey.key, data))
                 return null;
@@ -302,7 +302,7 @@ public abstract class ExtendedFilter
                 else
                 {
                     dataValue = extractDataValue(def, rowKey.key, data, builder);
-                    validator = def.getValidator();
+                    validator = def.type;
                 }
 
                 if (dataValue == null)
@@ -317,16 +317,16 @@ public abstract class ExtendedFilter
 
         private ByteBuffer extractDataValue(ColumnDefinition def, ByteBuffer rowKey, ColumnFamily data, ColumnNameBuilder builder)
         {
-            switch (def.type)
+            switch (def.kind)
             {
                 case PARTITION_KEY:
-                    return def.componentIndex == null
+                    return def.isOnAllComponents()
                          ? rowKey
-                         : ((CompositeType)data.metadata().getKeyValidator()).split(rowKey)[def.componentIndex];
-                case CLUSTERING_KEY:
-                    return builder.get(def.componentIndex);
+                         : ((CompositeType)data.metadata().getKeyValidator()).split(rowKey)[def.position()];
+                case CLUSTERING_COLUMN:
+                    return builder.get(def.position());
                 case REGULAR:
-                    ByteBuffer colName = builder == null ? def.name : builder.copy().add(def.name).build();
+                    ByteBuffer colName = builder == null ? def.name.bytes : builder.copy().add(def.name).build();
                     Column column = data.getColumn(colName);
                     return column == null ? null : column.value();
                 case COMPACT_VALUE:

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5f5905d5/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java b/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
index ebd4730..699b391 100644
--- a/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
+++ b/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
@@ -50,7 +50,7 @@ public abstract class AbstractSimplePerColumnSecondaryIndex extends PerColumnSec
         CFMetaData indexedCfMetadata = CFMetaData.newIndexMetadata(baseCfs.metadata, columnDef, indexComparator);
         indexCfs = ColumnFamilyStore.createColumnFamilyStore(baseCfs.keyspace,
                                                              indexedCfMetadata.cfName,
-                                                             new LocalPartitioner(columnDef.getValidator()),
+                                                             new LocalPartitioner(columnDef.type),
                                                              indexedCfMetadata);
     }
 
@@ -66,7 +66,7 @@ public abstract class AbstractSimplePerColumnSecondaryIndex extends PerColumnSec
                              baseCfs.name,
                              getExpressionComparator().getString(expr.column),
                              expr.operator,
-                             baseCfs.metadata.getColumnDefinition(expr.column).getValidator().getString(expr.value));
+                             baseCfs.metadata.getColumnDefinition(expr.column).type.getString(expr.value));
     }
 
     public void delete(ByteBuffer rowKey, Column column)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5f5905d5/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndex.java b/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
index 10f0e95..7a822dc 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
@@ -118,13 +118,13 @@ public abstract class SecondaryIndex
     public void setIndexBuilt()
     {
         for (ColumnDefinition columnDef : columnDefs)
-            SystemKeyspace.setIndexBuilt(baseCfs.keyspace.getName(), getNameForSystemKeyspace(columnDef.name));
+            SystemKeyspace.setIndexBuilt(baseCfs.keyspace.getName(), getNameForSystemKeyspace(columnDef.name.bytes));
     }
 
     public void setIndexRemoved()
     {
         for (ColumnDefinition columnDef : columnDefs)
-            SystemKeyspace.setIndexRemoved(baseCfs.keyspace.getName(), getNameForSystemKeyspace(columnDef.name));
+            SystemKeyspace.setIndexRemoved(baseCfs.keyspace.getName(), getNameForSystemKeyspace(columnDef.name.bytes));
     }
 
     /**
@@ -204,7 +204,7 @@ public abstract class SecondaryIndex
         boolean allAreBuilt = true;
         for (ColumnDefinition cdef : columnDefs)
         {
-            if (!SystemKeyspace.isIndexBuilt(baseCfs.keyspace.getName(), getNameForSystemKeyspace(cdef.name)))
+            if (!SystemKeyspace.isIndexBuilt(baseCfs.keyspace.getName(), getNameForSystemKeyspace(cdef.name.bytes)))
             {
                 allAreBuilt = false;
                 break;
@@ -268,8 +268,8 @@ public abstract class SecondaryIndex
     public DecoratedKey getIndexKeyFor(ByteBuffer value)
     {
         // FIXME: this imply one column definition per index
-        ByteBuffer name = columnDefs.iterator().next().name;
-        return new DecoratedKey(new LocalToken(baseCfs.metadata.getColumnDefinition(name).getValidator(), value), value);
+        ByteBuffer name = columnDefs.iterator().next().name.bytes;
+        return new DecoratedKey(new LocalToken(baseCfs.metadata.getColumnDefinition(name).type, value), value);
     }
 
     /**
@@ -282,7 +282,7 @@ public abstract class SecondaryIndex
     {
         for (ColumnDefinition columnDef : columnDefs)
         {
-            if (baseCfs.getComparator().compare(columnDef.name, name) == 0)
+            if (baseCfs.getComparator().compare(columnDef.name.bytes, name) == 0)
                 return true;
         }
         return false;


[3/4] Remove CFDefinition

Posted by sl...@apache.org.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5f5905d5/src/java/org/apache/cassandra/cql/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql/QueryProcessor.java b/src/java/org/apache/cassandra/cql/QueryProcessor.java
index b6d5656..b15dc82 100644
--- a/src/java/org/apache/cassandra/cql/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql/QueryProcessor.java
@@ -117,7 +117,7 @@ public class QueryProcessor
         // ...a range (slice) of column names
         else
         {
-            AbstractType<?> comparator = select.getComparator(metadata.ksName);
+            AbstractType<?> comparator = metadata.comparator;
             ByteBuffer start = select.getColumnStart().getByteBuffer(comparator,variables);
             ByteBuffer finish = select.getColumnFinish().getByteBuffer(comparator,variables);
 
@@ -187,7 +187,7 @@ public class QueryProcessor
         {
             // Left and right side of relational expression encoded according to comparator/validator.
             ByteBuffer entity = columnRelation.getEntity().getByteBuffer(metadata.comparator, variables);
-            ByteBuffer value = columnRelation.getValue().getByteBuffer(select.getValueValidator(metadata.ksName, entity), variables);
+            ByteBuffer value = columnRelation.getValue().getByteBuffer(metadata.getValueValidatorFromCellName(entity), variables);
 
             expressions.add(new IndexExpression(entity,
                                                 IndexExpression.Operator.valueOf(columnRelation.operator().toString()),
@@ -264,8 +264,9 @@ public class QueryProcessor
 
         if (select.getColumnRelations().size() > 0)
         {
-            AbstractType<?> comparator = select.getComparator(keyspace);
-            SecondaryIndexManager idxManager = Keyspace.open(keyspace).getColumnFamilyStore(select.getColumnFamily()).indexManager;
+            ColumnFamilyStore cfstore = Keyspace.open(keyspace).getColumnFamilyStore(select.getColumnFamily());
+            AbstractType<?> comparator = cfstore.metadata.comparator;
+            SecondaryIndexManager idxManager = cfstore.indexManager;
             for (Relation relation : select.getColumnRelations())
             {
                 ByteBuffer name = relation.getEntity().getByteBuffer(comparator, variables);
@@ -323,7 +324,7 @@ public class QueryProcessor
     throws InvalidRequestException
     {
         validateColumnName(name);
-        AbstractType<?> validator = metadata.getValueValidator(name);
+        AbstractType<?> validator = metadata.getValueValidatorFromCellName(name);
 
         try
         {
@@ -466,9 +467,9 @@ public class QueryProcessor
                                 if (c.isMarkedForDelete(now))
                                     continue;
 
-                                ColumnDefinition cd = metadata.getColumnDefinitionFromColumnName(c.name());
+                                ColumnDefinition cd = metadata.getColumnDefinitionFromCellName(c.name());
                                 if (cd != null)
-                                    result.schema.value_types.put(c.name(), TypeParser.getShortName(cd.getValidator()));
+                                    result.schema.value_types.put(c.name(), TypeParser.getShortName(cd.type));
 
                                 thriftColumns.add(thriftify(c));
                             }
@@ -504,9 +505,9 @@ public class QueryProcessor
                                 throw new AssertionError(e);
                             }
 
-                            ColumnDefinition cd = metadata.getColumnDefinitionFromColumnName(name);
+                            ColumnDefinition cd = metadata.getColumnDefinitionFromCellName(name);
                             if (cd != null)
-                                result.schema.value_types.put(name, TypeParser.getShortName(cd.getValidator()));
+                                result.schema.value_types.put(name, TypeParser.getShortName(cd.type));
                             org.apache.cassandra.db.Column c = row.cf.getColumn(name);
                             if (c == null || c.isMarkedForDelete(now))
                                 thriftColumns.add(new Column().setName(name));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5f5905d5/src/java/org/apache/cassandra/cql/SelectStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql/SelectStatement.java b/src/java/org/apache/cassandra/cql/SelectStatement.java
index 2314b73..b77e9ec 100644
--- a/src/java/org/apache/cassandra/cql/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql/SelectStatement.java
@@ -172,16 +172,6 @@ public class SelectStatement
         clause.extractKeysFromColumns(cfm);
     }
 
-    public AbstractType<?> getComparator(String keyspace)
-    {
-        return Schema.instance.getComparator(keyspace, columnFamily);
-    }
-
-    public AbstractType<?> getValueValidator(String keyspace, ByteBuffer column)
-    {
-        return Schema.instance.getValueValidator(keyspace, columnFamily, column);
-    }
-
     public List<Relation> getClauseRelations()
     {
         return clause.getClauseRelations();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5f5905d5/src/java/org/apache/cassandra/cql/UpdateStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql/UpdateStatement.java b/src/java/org/apache/cassandra/cql/UpdateStatement.java
index 0e1250f..59d797f 100644
--- a/src/java/org/apache/cassandra/cql/UpdateStatement.java
+++ b/src/java/org/apache/cassandra/cql/UpdateStatement.java
@@ -177,7 +177,7 @@ public class UpdateStatement extends AbstractModification
     throws InvalidRequestException
     {
         validateKey(key);
-        AbstractType<?> comparator = getComparator(keyspace);
+        AbstractType<?> comparator = metadata.comparator;
 
         // if true we need to wrap RowMutation into CounterMutation
         boolean hasCounterColumn = false;
@@ -193,7 +193,7 @@ public class UpdateStatement extends AbstractModification
                 if (hasCounterColumn)
                     throw new InvalidRequestException("Mix of commutative and non-commutative operations is not allowed.");
 
-                ByteBuffer colValue = op.a.getByteBuffer(getValueValidator(keyspace, colName),variables);
+                ByteBuffer colValue = op.a.getByteBuffer(metadata.getValueValidatorFromCellName(colName),variables);
 
                 validateColumn(metadata, colName, colValue);
                 rm.add(columnFamily,
@@ -277,16 +277,6 @@ public class UpdateStatement extends AbstractModification
         return Schema.instance.getCFMetaData(keyspace, columnFamily).getKeyValidator();
     }
 
-    public AbstractType<?> getComparator(String keyspace)
-    {
-        return Schema.instance.getComparator(keyspace, columnFamily);
-    }
-
-    public AbstractType<?> getValueValidator(String keyspace, ByteBuffer column)
-    {
-        return Schema.instance.getValueValidator(keyspace, columnFamily, column);
-    }
-
     public List<Term> getColumnNames()
     {
         return columnNames;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5f5905d5/src/java/org/apache/cassandra/cql3/CFDefinition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/CFDefinition.java b/src/java/org/apache/cassandra/cql3/CFDefinition.java
deleted file mode 100644
index 54ca2b8..0000000
--- a/src/java/org/apache/cassandra/cql3/CFDefinition.java
+++ /dev/null
@@ -1,297 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.cql3;
-
-import java.nio.ByteBuffer;
-import java.util.*;
-
-import com.google.common.base.Joiner;
-import com.google.common.base.Objects;
-import com.google.common.collect.AbstractIterator;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.db.marshal.ColumnToCollectionType;
-import org.apache.cassandra.db.marshal.CompositeType;
-import org.apache.cassandra.db.marshal.UTF8Type;
-import org.apache.cassandra.utils.ByteBufferUtil;
-
-/**
- * Holds metadata on a CF preprocessed for use by CQL queries.
- */
-public class CFDefinition implements Iterable<CFDefinition.Name>
-{
-    public static final AbstractType<?> definitionType = UTF8Type.instance;
-
-
-    public final CFMetaData cfm;
-    // LinkedHashMap because the order does matter (it is the order in the composite type)
-    public final LinkedHashMap<ColumnIdentifier, Name> keys = new LinkedHashMap<ColumnIdentifier, Name>();
-    public final LinkedHashMap<ColumnIdentifier, Name> columns = new LinkedHashMap<ColumnIdentifier, Name>();
-    public final Name value;
-    // Keep metadata lexicographically ordered so that wildcard expansion have a deterministic order
-    public final Map<ColumnIdentifier, Name> metadata = new TreeMap<ColumnIdentifier, Name>();
-
-    public final boolean isComposite;
-    public final boolean hasCompositeKey;
-    // Note that isCompact means here that no componet of the comparator correspond to the column names
-    // defined in the CREATE TABLE QUERY. This is not exactly equivalent to the 'WITH COMPACT STORAGE'
-    // option when creating a table in that "static CF" without a composite type will have isCompact == false
-    // even though one must use 'WITH COMPACT STORAGE' to declare them.
-    public final boolean isCompact;
-    public final boolean hasCollections;
-
-    public CFDefinition(CFMetaData cfm)
-    {
-        this.cfm = cfm;
-
-        this.hasCompositeKey = cfm.getKeyValidator() instanceof CompositeType;
-        for (int i = 0; i < cfm.partitionKeyColumns().size(); ++i)
-        {
-            ColumnIdentifier id = new ColumnIdentifier(cfm.partitionKeyColumns().get(i).name, definitionType);
-            this.keys.put(id, new Name(cfm.ksName, cfm.cfName, id, Name.Kind.KEY_ALIAS, i, cfm.getKeyValidator().getComponents().get(i)));
-        }
-
-        this.isComposite = cfm.comparator instanceof CompositeType;
-        this.hasCollections = cfm.comparator.getComponents().get(cfm.comparator.componentsCount() - 1) instanceof ColumnToCollectionType;
-        this.isCompact = cfm.clusteringKeyColumns().size() == cfm.comparator.componentsCount();
-        for (int i = 0; i < cfm.clusteringKeyColumns().size(); ++i)
-        {
-            ColumnIdentifier id = new ColumnIdentifier(cfm.clusteringKeyColumns().get(i).name, definitionType);
-            this.columns.put(id, new Name(cfm.ksName, cfm.cfName, id, Name.Kind.COLUMN_ALIAS, i, cfm.comparator.getComponents().get(i)));
-        }
-
-        if (isCompact)
-        {
-            this.value = createValue(cfm);
-        }
-        else
-        {
-            this.value = null;
-            for (ColumnDefinition def : cfm.regularColumns())
-            {
-                ColumnIdentifier id = new ColumnIdentifier(def.name, cfm.getColumnDefinitionComparator(def));
-                this.metadata.put(id, new Name(cfm.ksName, cfm.cfName, id, Name.Kind.COLUMN_METADATA, def.getValidator()));
-            }
-        }
-    }
-
-    public ColumnToCollectionType getCollectionType()
-    {
-        if (!hasCollections)
-            return null;
-
-        CompositeType composite = (CompositeType)cfm.comparator;
-        return (ColumnToCollectionType)composite.types.get(composite.types.size() - 1);
-    }
-
-    private static Name createValue(CFMetaData cfm)
-    {
-        ColumnIdentifier alias = new ColumnIdentifier(cfm.compactValueColumn().name, definitionType);
-        // That's how we distinguish between 'no value alias because coming from thrift' and 'I explicitely did not
-        // define a value' (see CreateTableStatement)
-        return alias.key.equals(ByteBufferUtil.EMPTY_BYTE_BUFFER)
-               ? null
-               : new Name(cfm.ksName, cfm.cfName, alias, Name.Kind.VALUE_ALIAS, cfm.getDefaultValidator());
-    }
-
-    public Name get(ColumnIdentifier name)
-    {
-        CFDefinition.Name kdef = keys.get(name);
-        if (kdef != null)
-            return kdef;
-        if (value != null && name.equals(value.name))
-            return value;
-        CFDefinition.Name def = columns.get(name);
-        if (def != null)
-            return def;
-        return metadata.get(name);
-    }
-
-    public Iterator<Name> iterator()
-    {
-        return new AbstractIterator<Name>()
-        {
-            private final Iterator<Name> keyIter = keys.values().iterator();
-            private final Iterator<Name> columnIter = columns.values().iterator();
-            private boolean valueDone;
-            private final Iterator<Name> metadataIter = metadata.values().iterator();
-
-            protected Name computeNext()
-            {
-                if (keyIter.hasNext())
-                    return keyIter.next();
-
-                if (columnIter.hasNext())
-                    return columnIter.next();
-
-                if (value != null && !valueDone)
-                {
-                    valueDone = true;
-                    return value;
-                }
-
-                if (metadataIter.hasNext())
-                    return metadataIter.next();
-
-                return endOfData();
-            }
-        };
-    }
-
-    public ColumnNameBuilder getKeyNameBuilder()
-    {
-        return hasCompositeKey
-             ? new CompositeType.Builder((CompositeType)cfm.getKeyValidator())
-             : new NonCompositeBuilder(cfm.getKeyValidator());
-    }
-
-    public ColumnNameBuilder getColumnNameBuilder()
-    {
-        return isComposite
-             ? new CompositeType.Builder((CompositeType)cfm.comparator)
-             : new NonCompositeBuilder(cfm.comparator);
-    }
-
-    public static class Name extends ColumnSpecification
-    {
-        public static enum Kind
-        {
-            KEY_ALIAS, COLUMN_ALIAS, VALUE_ALIAS, COLUMN_METADATA
-        }
-
-        private Name(String ksName, String cfName, ColumnIdentifier name, Kind kind, AbstractType<?> type)
-        {
-            this(ksName, cfName, name, kind, -1, type);
-        }
-
-        private Name(String ksName, String cfName, ColumnIdentifier name, Kind kind, int position, AbstractType<?> type)
-        {
-            super(ksName, cfName, name, type);
-            this.kind = kind;
-            this.position = position;
-        }
-
-        public final Kind kind;
-        public final int position; // only make sense for KEY_ALIAS and COLUMN_ALIAS
-
-        @Override
-        public boolean equals(Object o)
-        {
-            if(!(o instanceof Name))
-                return false;
-            Name that = (Name)o;
-            return Objects.equal(ksName, that.ksName)
-                && Objects.equal(cfName, that.cfName)
-                && Objects.equal(name, that.name)
-                && Objects.equal(type, that.type)
-                && kind == that.kind
-                && position == that.position;
-        }
-
-        @Override
-        public final int hashCode()
-        {
-            return Objects.hashCode(ksName, cfName, name, type, kind, position);
-        }
-    }
-
-    @Override
-    public String toString()
-    {
-        StringBuilder sb = new StringBuilder();
-        sb.append(Joiner.on(", ").join(keys.values()));
-        if (!columns.isEmpty())
-            sb.append(", ").append(Joiner.on(", ").join(columns.values()));
-        sb.append(" => ");
-        if (value != null)
-            sb.append(value.name);
-        if (!metadata.isEmpty())
-            sb.append("{").append(Joiner.on(", ").join(metadata.values())).append(" }");
-        return sb.toString();
-    }
-
-    private static class NonCompositeBuilder implements ColumnNameBuilder
-    {
-        private final AbstractType<?> type;
-        private ByteBuffer columnName;
-
-        private NonCompositeBuilder(AbstractType<?> type)
-        {
-            this.type = type;
-        }
-
-        public NonCompositeBuilder add(ByteBuffer bb)
-        {
-            if (columnName != null)
-                throw new IllegalStateException("Column name is already constructed");
-
-            columnName = bb;
-            return this;
-        }
-
-        public NonCompositeBuilder add(ByteBuffer bb, Relation.Type op)
-        {
-            return add(bb);
-        }
-
-        public int componentCount()
-        {
-            return columnName == null ? 0 : 1;
-        }
-
-        public int remainingCount()
-        {
-            return columnName == null ? 1 : 0;
-        }
-
-        public ByteBuffer get(int i)
-        {
-            if (i < 0 || i >= (columnName == null ? 0 : 1))
-                throw new IllegalArgumentException();
-
-            return columnName;
-        }
-
-        public ByteBuffer build()
-        {
-            return columnName == null ? ByteBufferUtil.EMPTY_BYTE_BUFFER : columnName;
-        }
-
-        public ByteBuffer buildAsEndOfRange()
-        {
-            return build();
-        }
-
-        public NonCompositeBuilder copy()
-        {
-            NonCompositeBuilder newBuilder = new NonCompositeBuilder(type);
-            newBuilder.columnName = columnName;
-            return newBuilder;
-        }
-
-        public ByteBuffer getComponent(int i)
-        {
-            if (i != 0 || columnName == null)
-                throw new IllegalArgumentException();
-
-            return columnName;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5f5905d5/src/java/org/apache/cassandra/cql3/ColumnIdentifier.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/ColumnIdentifier.java b/src/java/org/apache/cassandra/cql3/ColumnIdentifier.java
index 330eec0..a11094e 100644
--- a/src/java/org/apache/cassandra/cql3/ColumnIdentifier.java
+++ b/src/java/org/apache/cassandra/cql3/ColumnIdentifier.java
@@ -29,25 +29,25 @@ import org.apache.cassandra.utils.ByteBufferUtil;
  */
 public class ColumnIdentifier implements Selectable, Comparable<ColumnIdentifier>
 {
-    public final ByteBuffer key;
+    public final ByteBuffer bytes;
     private final String text;
 
     public ColumnIdentifier(String rawText, boolean keepCase)
     {
         this.text = keepCase ? rawText : rawText.toLowerCase(Locale.US);
-        this.key = ByteBufferUtil.bytes(this.text);
+        this.bytes = ByteBufferUtil.bytes(this.text);
     }
 
-    public ColumnIdentifier(ByteBuffer key, AbstractType type)
+    public ColumnIdentifier(ByteBuffer bytes, AbstractType type)
     {
-        this.key = key;
-        this.text = type.getString(key);
+        this.bytes = bytes;
+        this.text = type.getString(bytes);
     }
 
     @Override
     public final int hashCode()
     {
-        return key.hashCode();
+        return bytes.hashCode();
     }
 
     @Override
@@ -56,7 +56,7 @@ public class ColumnIdentifier implements Selectable, Comparable<ColumnIdentifier
         if(!(o instanceof ColumnIdentifier))
             return false;
         ColumnIdentifier that = (ColumnIdentifier)o;
-        return key.equals(that.key);
+        return bytes.equals(that.bytes);
     }
 
     @Override
@@ -67,6 +67,6 @@ public class ColumnIdentifier implements Selectable, Comparable<ColumnIdentifier
 
     public int compareTo(ColumnIdentifier other)
     {
-        return key.compareTo(other.key);
+        return bytes.compareTo(other.bytes);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5f5905d5/src/java/org/apache/cassandra/cql3/ColumnNameBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/ColumnNameBuilder.java b/src/java/org/apache/cassandra/cql3/ColumnNameBuilder.java
index b6625ab..384916d 100644
--- a/src/java/org/apache/cassandra/cql3/ColumnNameBuilder.java
+++ b/src/java/org/apache/cassandra/cql3/ColumnNameBuilder.java
@@ -32,6 +32,8 @@ public interface ColumnNameBuilder
      */
     public ColumnNameBuilder add(ByteBuffer bb);
 
+    public ColumnNameBuilder add(ColumnIdentifier name);
+
     /**
      * Add a new ByteBuffer as the next component for this name.
      * @param t the ByteBuffer to add

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5f5905d5/src/java/org/apache/cassandra/cql3/ColumnSpecification.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/ColumnSpecification.java b/src/java/org/apache/cassandra/cql3/ColumnSpecification.java
index 4dae701..d2e08f9 100644
--- a/src/java/org/apache/cassandra/cql3/ColumnSpecification.java
+++ b/src/java/org/apache/cassandra/cql3/ColumnSpecification.java
@@ -33,11 +33,4 @@ public class ColumnSpecification
         this.name = name;
         this.type = type;
     }
-
-    @Override
-    public String toString()
-    {
-        // Not fully conventional, but convenient (for error message to users in particular)
-        return name.toString();
-    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5f5905d5/src/java/org/apache/cassandra/cql3/Constants.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Constants.java b/src/java/org/apache/cassandra/cql3/Constants.java
index bcfe00d..34b3f8d 100644
--- a/src/java/org/apache/cassandra/cql3/Constants.java
+++ b/src/java/org/apache/cassandra/cql3/Constants.java
@@ -296,7 +296,7 @@ public abstract class Constants
 
         public void execute(ByteBuffer rowKey, ColumnFamily cf, ColumnNameBuilder prefix, UpdateParameters params) throws InvalidRequestException
         {
-            ByteBuffer cname = columnName == null ? prefix.build() : prefix.add(columnName.key).build();
+            ByteBuffer cname = columnName == null ? prefix.build() : prefix.add(columnName).build();
             ByteBuffer value = t.bindAndGet(params.variables);
             cf.addColumn(value == null ? params.makeTombstone(cname) : params.makeColumn(cname, value));
         }
@@ -315,7 +315,7 @@ public abstract class Constants
             if (bytes == null)
                 throw new InvalidRequestException("Invalid null value for counter increment");
             long increment = ByteBufferUtil.toLong(bytes);
-            ByteBuffer cname = columnName == null ? prefix.build() : prefix.add(columnName.key).build();
+            ByteBuffer cname = columnName == null ? prefix.build() : prefix.add(columnName).build();
             cf.addCounter(cname, increment);
         }
     }
@@ -337,7 +337,7 @@ public abstract class Constants
             if (increment == Long.MIN_VALUE)
                 throw new InvalidRequestException("The negation of " + increment + " overflows supported counter precision (signed 8 bytes integer)");
 
-            ByteBuffer cname = columnName == null ? prefix.build() : prefix.add(columnName.key).build();
+            ByteBuffer cname = columnName == null ? prefix.build() : prefix.add(columnName).build();
             cf.addCounter(cname, -increment);
         }
     }
@@ -356,7 +356,7 @@ public abstract class Constants
 
         public void execute(ByteBuffer rowKey, ColumnFamily cf, ColumnNameBuilder prefix, UpdateParameters params) throws InvalidRequestException
         {
-            ColumnNameBuilder column = prefix.add(columnName.key);
+            ColumnNameBuilder column = prefix.add(columnName);
 
             if (isCollection)
                 cf.addAtom(params.makeRangeTombstone(column.build(), column.buildAsEndOfRange()));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5f5905d5/src/java/org/apache/cassandra/cql3/Lists.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Lists.java b/src/java/org/apache/cassandra/cql3/Lists.java
index 4ca5eb3..35da98e 100644
--- a/src/java/org/apache/cassandra/cql3/Lists.java
+++ b/src/java/org/apache/cassandra/cql3/Lists.java
@@ -266,7 +266,7 @@ public abstract class Lists
         public void execute(ByteBuffer rowKey, ColumnFamily cf, ColumnNameBuilder prefix, UpdateParameters params) throws InvalidRequestException
         {
             // delete + append
-            ColumnNameBuilder column = prefix.add(columnName.key);
+            ColumnNameBuilder column = prefix.add(columnName);
             cf.addAtom(params.makeTombstoneForOverwrite(column.build(), column.buildAsEndOfRange()));
             Appender.doAppend(t, cf, column, params);
         }
@@ -303,7 +303,7 @@ public abstract class Lists
             if (index == null)
                 throw new InvalidRequestException("Invalid null value for list index");
 
-            List<Pair<ByteBuffer, Column>> existingList = params.getPrefetchedList(rowKey, columnName.key);
+            List<Pair<ByteBuffer, Column>> existingList = params.getPrefetchedList(rowKey, columnName);
             int idx = ByteBufferUtil.toInt(index);
             if (idx < 0 || idx >= existingList.size())
                 throw new InvalidRequestException(String.format("List index %d out of bound, list has size %d", idx, existingList.size()));
@@ -336,7 +336,7 @@ public abstract class Lists
 
         public void execute(ByteBuffer rowKey, ColumnFamily cf, ColumnNameBuilder prefix, UpdateParameters params) throws InvalidRequestException
         {
-            doAppend(t, cf, prefix.add(columnName.key), params);
+            doAppend(t, cf, prefix.add(columnName), params);
         }
 
         static void doAppend(Term t, ColumnFamily cf, ColumnNameBuilder columnName, UpdateParameters params) throws InvalidRequestException
@@ -376,7 +376,7 @@ public abstract class Lists
             long time = PrecisionTime.REFERENCE_TIME - (System.currentTimeMillis() - PrecisionTime.REFERENCE_TIME);
 
             List<ByteBuffer> toAdd = ((Lists.Value)value).elements;
-            ColumnNameBuilder column = prefix.add(columnName.key);
+            ColumnNameBuilder column = prefix.add(columnName);
             for (int i = 0; i < toAdd.size(); i++)
             {
                 ColumnNameBuilder b = i == toAdd.size() - 1 ? column : column.copy();
@@ -403,7 +403,7 @@ public abstract class Lists
 
         public void execute(ByteBuffer rowKey, ColumnFamily cf, ColumnNameBuilder prefix, UpdateParameters params) throws InvalidRequestException
         {
-            List<Pair<ByteBuffer, Column>> existingList = params.getPrefetchedList(rowKey, columnName.key);
+            List<Pair<ByteBuffer, Column>> existingList = params.getPrefetchedList(rowKey, columnName);
             if (existingList.isEmpty())
                 return;
 
@@ -448,7 +448,7 @@ public abstract class Lists
 
             assert index instanceof Constants.Value;
 
-            List<Pair<ByteBuffer, Column>> existingList = params.getPrefetchedList(rowKey, columnName.key);
+            List<Pair<ByteBuffer, Column>> existingList = params.getPrefetchedList(rowKey, columnName);
             int idx = ByteBufferUtil.toInt(((Constants.Value)index).bytes);
             if (idx < 0 || idx >= existingList.size())
                 throw new InvalidRequestException(String.format("List index %d out of bound, list has size %d", idx, existingList.size()));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5f5905d5/src/java/org/apache/cassandra/cql3/Maps.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Maps.java b/src/java/org/apache/cassandra/cql3/Maps.java
index 30d796c..4107b95 100644
--- a/src/java/org/apache/cassandra/cql3/Maps.java
+++ b/src/java/org/apache/cassandra/cql3/Maps.java
@@ -244,7 +244,7 @@ public abstract class Maps
         public void execute(ByteBuffer rowKey, ColumnFamily cf, ColumnNameBuilder prefix, UpdateParameters params) throws InvalidRequestException
         {
             // delete + put
-            ColumnNameBuilder column = prefix.add(columnName.key);
+            ColumnNameBuilder column = prefix.add(columnName);
             cf.addAtom(params.makeTombstoneForOverwrite(column.build(), column.buildAsEndOfRange()));
             Putter.doPut(t, cf, column, params);
         }
@@ -274,7 +274,7 @@ public abstract class Maps
             if (key == null)
                 throw new InvalidRequestException("Invalid null map key");
 
-            ByteBuffer cellName = prefix.add(columnName.key).add(key).build();
+            ByteBuffer cellName = prefix.add(columnName).add(key).build();
 
             if (value == null)
             {
@@ -302,7 +302,7 @@ public abstract class Maps
 
         public void execute(ByteBuffer rowKey, ColumnFamily cf, ColumnNameBuilder prefix, UpdateParameters params) throws InvalidRequestException
         {
-            doPut(t, cf, prefix.add(columnName.key), params);
+            doPut(t, cf, prefix.add(columnName), params);
         }
 
         static void doPut(Term t, ColumnFamily cf, ColumnNameBuilder columnName, UpdateParameters params) throws InvalidRequestException
@@ -335,7 +335,7 @@ public abstract class Maps
                 throw new InvalidRequestException("Invalid null map key");
             assert key instanceof Constants.Value;
 
-            ByteBuffer cellName = prefix.add(columnName.key).add(((Constants.Value)key).bytes).build();
+            ByteBuffer cellName = prefix.add(columnName).add(((Constants.Value)key).bytes).build();
             cf.addColumn(params.makeTombstone(cellName));
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5f5905d5/src/java/org/apache/cassandra/cql3/Operation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Operation.java b/src/java/org/apache/cassandra/cql3/Operation.java
index d84f57b..e0272b7 100644
--- a/src/java/org/apache/cassandra/cql3/Operation.java
+++ b/src/java/org/apache/cassandra/cql3/Operation.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.cql3;
 
 import java.nio.ByteBuffer;
 
+import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.marshal.CollectionType;
 import org.apache.cassandra.db.marshal.CounterColumnType;
@@ -109,7 +110,7 @@ public abstract class Operation
          * be a true column.
          * @return the prepared update operation.
          */
-        public Operation prepare(CFDefinition.Name receiver) throws InvalidRequestException;
+        public Operation prepare(ColumnDefinition receiver) throws InvalidRequestException;
 
         /**
          * @return whether this operation can be applied alongside the {@code
@@ -155,7 +156,7 @@ public abstract class Operation
             this.value = value;
         }
 
-        public Operation prepare(CFDefinition.Name receiver) throws InvalidRequestException
+        public Operation prepare(ColumnDefinition receiver) throws InvalidRequestException
         {
             Term v = value.prepare(receiver);
 
@@ -163,7 +164,7 @@ public abstract class Operation
                 throw new InvalidRequestException(String.format("Cannot set the value of counter column %s (counters can only be incremented/decremented, not set)", receiver));
 
             if (!(receiver.type instanceof CollectionType))
-                return new Constants.Setter(receiver.kind == CFDefinition.Name.Kind.VALUE_ALIAS ? null : receiver.name, v);
+                return new Constants.Setter(receiver.kind == ColumnDefinition.Kind.COMPACT_VALUE ? null : receiver.name, v);
 
             switch (((CollectionType)receiver.type).kind)
             {
@@ -201,7 +202,7 @@ public abstract class Operation
             this.value = value;
         }
 
-        public Operation prepare(CFDefinition.Name receiver) throws InvalidRequestException
+        public Operation prepare(ColumnDefinition receiver) throws InvalidRequestException
         {
             if (!(receiver.type instanceof CollectionType))
                 throw new InvalidRequestException(String.format("Invalid operation (%s) for non collection column %s", toString(receiver), receiver));
@@ -244,7 +245,7 @@ public abstract class Operation
             this.value = value;
         }
 
-        public Operation prepare(CFDefinition.Name receiver) throws InvalidRequestException
+        public Operation prepare(ColumnDefinition receiver) throws InvalidRequestException
         {
             Term v = value.prepare(receiver);
 
@@ -252,7 +253,7 @@ public abstract class Operation
             {
                 if (!(receiver.type instanceof CounterColumnType))
                     throw new InvalidRequestException(String.format("Invalid operation (%s) for non counter column %s", toString(receiver), receiver));
-                return new Constants.Adder(receiver.kind == CFDefinition.Name.Kind.VALUE_ALIAS ? null : receiver.name, v);
+                return new Constants.Adder(receiver.kind == ColumnDefinition.Kind.COMPACT_VALUE ? null : receiver.name, v);
             }
 
             switch (((CollectionType)receiver.type).kind)
@@ -287,7 +288,7 @@ public abstract class Operation
             this.value = value;
         }
 
-        public Operation prepare(CFDefinition.Name receiver) throws InvalidRequestException
+        public Operation prepare(ColumnDefinition receiver) throws InvalidRequestException
         {
             Term v = value.prepare(receiver);
 
@@ -295,7 +296,7 @@ public abstract class Operation
             {
                 if (!(receiver.type instanceof CounterColumnType))
                     throw new InvalidRequestException(String.format("Invalid operation (%s) for non counter column %s", toString(receiver), receiver));
-                return new Constants.Substracter(receiver.kind == CFDefinition.Name.Kind.VALUE_ALIAS ? null : receiver.name, v);
+                return new Constants.Substracter(receiver.kind == ColumnDefinition.Kind.COMPACT_VALUE ? null : receiver.name, v);
             }
 
             switch (((CollectionType)receiver.type).kind)
@@ -330,7 +331,7 @@ public abstract class Operation
             this.value = value;
         }
 
-        public Operation prepare(CFDefinition.Name receiver) throws InvalidRequestException
+        public Operation prepare(ColumnDefinition receiver) throws InvalidRequestException
         {
             Term v = value.prepare(receiver);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5f5905d5/src/java/org/apache/cassandra/cql3/ResultSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/ResultSet.java b/src/java/org/apache/cassandra/cql3/ResultSet.java
index e4f27f9..79459df 100644
--- a/src/java/org/apache/cassandra/cql3/ResultSet.java
+++ b/src/java/org/apache/cassandra/cql3/ResultSet.java
@@ -124,11 +124,11 @@ public class ResultSet
                 // The 2 following ones shouldn't be needed in CQL3
                 UTF8, UTF8);
 
-        for (ColumnSpecification name : metadata.names)
+        for (ColumnSpecification spec : metadata.names)
         {
-            ByteBuffer colName = ByteBufferUtil.bytes(name.toString());
+            ByteBuffer colName = ByteBufferUtil.bytes(spec.name.toString());
             schema.name_types.put(colName, UTF8);
-            AbstractType<?> normalizedType = name.type instanceof ReversedType ? ((ReversedType)name.type).baseType : name.type;
+            AbstractType<?> normalizedType = spec.type instanceof ReversedType ? ((ReversedType)spec.type).baseType : spec.type;
             schema.value_types.put(colName, normalizedType.toString());
 
         }
@@ -139,7 +139,7 @@ public class ResultSet
             List<Column> thriftCols = new ArrayList<Column>(metadata.names.size());
             for (int i = 0; i < metadata.names.size(); i++)
             {
-                Column col = new Column(ByteBufferUtil.bytes(metadata.names.get(i).toString()));
+                Column col = new Column(ByteBufferUtil.bytes(metadata.names.get(i).name.toString()));
                 col.setValue(row.get(i));
                 thriftCols.add(col);
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5f5905d5/src/java/org/apache/cassandra/cql3/Sets.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Sets.java b/src/java/org/apache/cassandra/cql3/Sets.java
index 0fcb8bf..01ac1e7 100644
--- a/src/java/org/apache/cassandra/cql3/Sets.java
+++ b/src/java/org/apache/cassandra/cql3/Sets.java
@@ -230,7 +230,7 @@ public abstract class Sets
         public void execute(ByteBuffer rowKey, ColumnFamily cf, ColumnNameBuilder prefix, UpdateParameters params) throws InvalidRequestException
         {
             // delete + add
-            ColumnNameBuilder column = prefix.add(columnName.key);
+            ColumnNameBuilder column = prefix.add(columnName);
             cf.addAtom(params.makeTombstoneForOverwrite(column.build(), column.buildAsEndOfRange()));
             Adder.doAdd(t, cf, column, params);
         }
@@ -245,7 +245,7 @@ public abstract class Sets
 
         public void execute(ByteBuffer rowKey, ColumnFamily cf, ColumnNameBuilder prefix, UpdateParameters params) throws InvalidRequestException
         {
-            doAdd(t, cf, prefix.add(columnName.key), params);
+            doAdd(t, cf, prefix.add(columnName), params);
         }
 
         static void doAdd(Term t, ColumnFamily cf, ColumnNameBuilder columnName, UpdateParameters params) throws InvalidRequestException
@@ -283,7 +283,7 @@ public abstract class Sets
                                       ? Collections.singleton(((Constants.Value)value).bytes)
                                       : ((Sets.Value)value).elements;
 
-            ColumnNameBuilder column = prefix.add(columnName.key);
+            ColumnNameBuilder column = prefix.add(columnName);
             for (ByteBuffer bb : toDiscard)
             {
                 ByteBuffer cellName = column.copy().add(bb).build();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5f5905d5/src/java/org/apache/cassandra/cql3/UntypedResultSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/UntypedResultSet.java b/src/java/org/apache/cassandra/cql3/UntypedResultSet.java
index e5bd863..8f1aaf5 100644
--- a/src/java/org/apache/cassandra/cql3/UntypedResultSet.java
+++ b/src/java/org/apache/cassandra/cql3/UntypedResultSet.java
@@ -83,7 +83,7 @@ public class UntypedResultSet implements Iterable<UntypedResultSet.Row>
         {
             this.columns.addAll(names);
             for (int i = 0; i < names.size(); i++)
-                data.put(names.get(i).toString(), columns.get(i));
+                data.put(names.get(i).name.toString(), columns.get(i));
         }
 
         public boolean has(String column)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5f5905d5/src/java/org/apache/cassandra/cql3/UpdateParameters.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/UpdateParameters.java b/src/java/org/apache/cassandra/cql3/UpdateParameters.java
index 0034b16..1288576 100644
--- a/src/java/org/apache/cassandra/cql3/UpdateParameters.java
+++ b/src/java/org/apache/cassandra/cql3/UpdateParameters.java
@@ -78,12 +78,12 @@ public class UpdateParameters
         return new RangeTombstone(start, end, timestamp - 1, localDeletionTime);
     }
 
-    public List<Pair<ByteBuffer, Column>> getPrefetchedList(ByteBuffer rowKey, ByteBuffer cql3ColumnName)
+    public List<Pair<ByteBuffer, Column>> getPrefetchedList(ByteBuffer rowKey, ColumnIdentifier cql3ColumnName)
     {
         if (prefetchedLists == null)
             return Collections.emptyList();
 
         ColumnGroupMap m = prefetchedLists.get(rowKey);
-        return m == null ? Collections.<Pair<ByteBuffer, Column>>emptyList() : m.getCollection(cql3ColumnName);
+        return m == null ? Collections.<Pair<ByteBuffer, Column>>emptyList() : m.getCollection(cql3ColumnName.bytes);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5f5905d5/src/java/org/apache/cassandra/cql3/functions/TokenFct.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/functions/TokenFct.java b/src/java/org/apache/cassandra/cql3/functions/TokenFct.java
index ac6b999..cb6e6c4 100644
--- a/src/java/org/apache/cassandra/cql3/functions/TokenFct.java
+++ b/src/java/org/apache/cassandra/cql3/functions/TokenFct.java
@@ -21,8 +21,8 @@ import java.nio.ByteBuffer;
 import java.util.List;
 
 import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.cql3.CFDefinition;
 import org.apache.cassandra.cql3.ColumnNameBuilder;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.dht.IPartitioner;
@@ -42,26 +42,26 @@ public class TokenFct extends AbstractFunction
         }
     };
 
-    private final CFDefinition cfDef;
+    private final CFMetaData cfm;
 
     public TokenFct(CFMetaData cfm)
     {
         super("token", partitioner.getTokenValidator(), getKeyTypes(cfm));
-        this.cfDef = cfm.getCfDef();
+        this.cfm = cfm;
     }
 
     private static AbstractType[] getKeyTypes(CFMetaData cfm)
     {
-        AbstractType[] types = new AbstractType[cfm.getCfDef().keys.size()];
+        AbstractType[] types = new AbstractType[cfm.partitionKeyColumns().size()];
         int i = 0;
-        for (CFDefinition.Name name : cfm.getCfDef().keys.values())
-            types[i++] = name.type;
+        for (ColumnDefinition def : cfm.partitionKeyColumns())
+            types[i++] = def.type;
         return types;
     }
 
     public ByteBuffer execute(List<ByteBuffer> parameters) throws InvalidRequestException
     {
-        ColumnNameBuilder builder = cfDef.getKeyNameBuilder();
+        ColumnNameBuilder builder = cfm.getKeyNameBuilder();
         for (int i = 0; i < parameters.size(); i++)
         {
             ByteBuffer bb = parameters.get(i);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5f5905d5/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java b/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
index f740ea6..049f90d 100644
--- a/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
@@ -73,21 +73,20 @@ public class AlterTableStatement extends SchemaAlteringStatement
         CFMetaData meta = validateColumnFamily(keyspace(), columnFamily());
         CFMetaData cfm = meta.clone();
 
-        CFDefinition cfDef = meta.getCfDef();
-        CFDefinition.Name name = columnName == null ? null : cfDef.get(columnName);
+        ColumnDefinition def = columnName == null ? null : cfm.getColumnDefinition(columnName);
         switch (oType)
         {
             case ADD:
-                if (cfDef.isCompact)
+                if (cfm.isDense())
                     throw new InvalidRequestException("Cannot add new column to a compact CF");
-                if (name != null)
+                if (def != null)
                 {
-                    switch (name.kind)
+                    switch (def.kind)
                     {
-                        case KEY_ALIAS:
-                        case COLUMN_ALIAS:
+                        case PARTITION_KEY:
+                        case CLUSTERING_COLUMN:
                             throw new InvalidRequestException(String.format("Invalid column name %s because it conflicts with a PRIMARY KEY part", columnName));
-                        case COLUMN_METADATA:
+                        case REGULAR:
                             throw new InvalidRequestException(String.format("Invalid column name %s because it conflicts with an existing column", columnName));
                     }
                 }
@@ -95,52 +94,52 @@ public class AlterTableStatement extends SchemaAlteringStatement
                 AbstractType<?> type = validator.getType();
                 if (type instanceof CollectionType)
                 {
-                    if (!cfDef.isComposite)
+                    if (!cfm.hasCompositeComparator())
                         throw new InvalidRequestException("Cannot use collection types with non-composite PRIMARY KEY");
-                    if (cfDef.cfm.isSuper())
+                    if (cfm.isSuper())
                         throw new InvalidRequestException("Cannot use collection types with Super column family");
 
-                    Map<ByteBuffer, CollectionType> collections = cfDef.hasCollections
-                                                                ? new HashMap<ByteBuffer, CollectionType>(cfDef.getCollectionType().defined)
+                    Map<ByteBuffer, CollectionType> collections = cfm.hasCollections()
+                                                                ? new HashMap<ByteBuffer, CollectionType>(cfm.getCollectionType().defined)
                                                                 : new HashMap<ByteBuffer, CollectionType>();
 
-                    collections.put(columnName.key, (CollectionType)type);
+                    collections.put(columnName.bytes, (CollectionType)type);
                     ColumnToCollectionType newColType = ColumnToCollectionType.getInstance(collections);
                     List<AbstractType<?>> ctypes = new ArrayList<AbstractType<?>>(((CompositeType)cfm.comparator).types);
-                    if (cfDef.hasCollections)
+                    if (cfm.hasCollections())
                         ctypes.set(ctypes.size() - 1, newColType);
                     else
                         ctypes.add(newColType);
                     cfm.comparator = CompositeType.getInstance(ctypes);
                 }
 
-                Integer componentIndex = cfDef.isComposite
-                                       ? ((CompositeType)meta.comparator).types.size() - (cfDef.hasCollections ? 2 : 1)
+                Integer componentIndex = meta.hasCompositeComparator()
+                                       ? ((CompositeType)meta.comparator).types.size() - (meta.hasCollections() ? 2 : 1)
                                        : null;
-                cfm.addColumnDefinition(ColumnDefinition.regularDef(columnName.key, type, componentIndex));
+                cfm.addColumnDefinition(ColumnDefinition.regularDef(cfm, columnName.bytes, type, componentIndex));
                 break;
 
             case ALTER:
-                if (name == null)
+                if (def == null)
                     throw new InvalidRequestException(String.format("Column %s was not found in table %s", columnName, columnFamily()));
 
-                switch (name.kind)
+                switch (def.kind)
                 {
-                    case KEY_ALIAS:
+                    case PARTITION_KEY:
                         AbstractType<?> newType = validator.getType();
                         if (newType instanceof CounterColumnType)
                             throw new InvalidRequestException(String.format("counter type is not supported for PRIMARY KEY part %s", columnName));
-                        if (cfDef.hasCompositeKey)
+                        if (cfm.getKeyValidator() instanceof CompositeType)
                         {
                             List<AbstractType<?>> oldTypes = ((CompositeType) cfm.getKeyValidator()).types;
-                            if (!newType.isValueCompatibleWith(oldTypes.get(name.position)))
+                            if (!newType.isValueCompatibleWith(oldTypes.get(def.position())))
                                 throw new ConfigurationException(String.format("Cannot change %s from type %s to type %s: types are incompatible.",
                                                                                columnName,
-                                                                               oldTypes.get(name.position).asCQL3Type(),
+                                                                               oldTypes.get(def.position()).asCQL3Type(),
                                                                                validator));
 
                             List<AbstractType<?>> newTypes = new ArrayList<AbstractType<?>>(oldTypes);
-                            newTypes.set(name.position, newType);
+                            newTypes.set(def.position(), newType);
                             cfm.keyValidator(CompositeType.getInstance(newTypes));
                         }
                         else
@@ -153,22 +152,22 @@ public class AlterTableStatement extends SchemaAlteringStatement
                             cfm.keyValidator(newType);
                         }
                         break;
-                    case COLUMN_ALIAS:
-                        assert cfDef.isComposite;
+                    case CLUSTERING_COLUMN:
+                        assert cfm.hasCompositeComparator();
                         List<AbstractType<?>> oldTypes = ((CompositeType) cfm.comparator).types;
                         // Note that CFMetaData.validateCompatibility already validate the change we're about to do. However, the error message it
                         // sends is a bit cryptic for a CQL3 user, so validating here for a sake of returning a better error message
                         // Do note that we need isCompatibleWith here, not just isValueCompatibleWith.
-                        if (!validator.getType().isCompatibleWith(oldTypes.get(name.position)))
+                        if (!validator.getType().isCompatibleWith(oldTypes.get(def.position())))
                             throw new ConfigurationException(String.format("Cannot change %s from type %s to type %s: types are not order-compatible.",
                                                                            columnName,
-                                                                           oldTypes.get(name.position).asCQL3Type(),
+                                                                           oldTypes.get(def.position()).asCQL3Type(),
                                                                            validator));
                         List<AbstractType<?>> newTypes = new ArrayList<AbstractType<?>>(oldTypes);
-                        newTypes.set(name.position, validator.getType());
+                        newTypes.set(def.position(), validator.getType());
                         cfm.comparator = CompositeType.getInstance(newTypes);
                         break;
-                    case VALUE_ALIAS:
+                    case COMPACT_VALUE:
                         // See below
                         if (!validator.getType().isValueCompatibleWith(cfm.getDefaultValidator()))
                             throw new ConfigurationException(String.format("Cannot change %s from type %s to type %s: types are incompatible.",
@@ -177,42 +176,42 @@ public class AlterTableStatement extends SchemaAlteringStatement
                                                                            validator));
                         cfm.defaultValidator(validator.getType());
                         break;
-                    case COLUMN_METADATA:
-                        ColumnDefinition column = cfm.getColumnDefinition(columnName.key);
+                    case REGULAR:
                         // Thrift allows to change a column validator so CFMetaData.validateCompatibility will let it slide
                         // if we change to an incompatible type (contrarily to the comparator case). But we don't want to
                         // allow it for CQL3 (see #5882) so validating it explicitly here. We only care about value compatibility
                         // though since we won't compare values (except when there is an index, but that is validated by
                         // ColumnDefinition already).
-                        if (!validator.getType().isValueCompatibleWith(column.getValidator()))
+                        if (!validator.getType().isValueCompatibleWith(def.type))
                             throw new ConfigurationException(String.format("Cannot change %s from type %s to type %s: types are incompatible.",
                                                                            columnName,
-                                                                           column.getValidator().asCQL3Type(),
+                                                                           def.type.asCQL3Type(),
                                                                            validator));
 
-                        column.setValidator(validator.getType());
                         break;
                 }
+                // In any case, we update the column definition
+                cfm.addOrReplaceColumnDefinition(def.withNewType(validator.getType()));
                 break;
 
             case DROP:
-                if (cfDef.isCompact)
+                if (cfm.isDense())
                     throw new InvalidRequestException("Cannot drop columns from a compact CF");
-                if (!cfDef.isComposite)
+                if (!cfm.hasCompositeComparator())
                     throw new InvalidRequestException("Cannot drop columns from a non-CQL3 CF");
-                if (name == null)
+                if (def == null)
                     throw new InvalidRequestException(String.format("Column %s was not found in table %s", columnName, columnFamily()));
 
-                switch (name.kind)
+                switch (def.kind)
                 {
-                    case KEY_ALIAS:
-                    case COLUMN_ALIAS:
+                    case PARTITION_KEY:
+                    case CLUSTERING_COLUMN:
                         throw new InvalidRequestException(String.format("Cannot drop PRIMARY KEY part %s", columnName));
-                    case COLUMN_METADATA:
+                    case REGULAR:
                         ColumnDefinition toDelete = null;
                         for (ColumnDefinition columnDef : cfm.regularColumns())
                         {
-                            if (columnDef.name.equals(columnName.key))
+                            if (columnDef.name.equals(columnName))
                                 toDelete = columnDef;
                         }
                         assert toDelete != null;
@@ -230,11 +229,7 @@ public class AlterTableStatement extends SchemaAlteringStatement
                 break;
             case RENAME:
                 for (Map.Entry<ColumnIdentifier, ColumnIdentifier> entry : renames.entrySet())
-                {
-                    ColumnIdentifier from = entry.getKey();
-                    ColumnIdentifier to = entry.getValue();
-                    cfm.renameColumn(from.key, from.toString(), to.key, to.toString());
-                }
+                    cfm.renameColumn(entry.getKey(), entry.getValue());
                 break;
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5f5905d5/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
index 1c0627f..b040121 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
@@ -28,6 +28,7 @@ import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.config.IndexType;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.index.SecondaryIndex;
+import org.apache.cassandra.db.marshal.CompositeType;
 import org.apache.cassandra.exceptions.*;
 import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.service.ClientState;
@@ -67,7 +68,7 @@ public class CreateIndexStatement extends SchemaAlteringStatement
         if (cfm.getDefaultValidator().isCommutative())
             throw new InvalidRequestException("Secondary indexes are not supported on counter tables");
 
-        ColumnDefinition cd = cfm.getColumnDefinition(columnName.key);
+        ColumnDefinition cd = cfm.getColumnDefinition(columnName);
 
         if (cd == null)
             throw new InvalidRequestException("No column definition found for column " + columnName);
@@ -87,13 +88,13 @@ public class CreateIndexStatement extends SchemaAlteringStatement
             throw new InvalidRequestException("Cannot specify index class for a non-CUSTOM index");
 
         // TODO: we could lift that limitation
-        if (cfm.getCfDef().isCompact && cd.type != ColumnDefinition.Type.REGULAR)
-            throw new InvalidRequestException(String.format("Secondary index on %s column %s is not yet supported for compact table", cd.type, columnName));
+        if (cfm.isDense() && cd.kind != ColumnDefinition.Kind.REGULAR)
+            throw new InvalidRequestException(String.format("Secondary index on %s column %s is not yet supported for compact table", cd.kind, columnName));
 
-        if (cd.getValidator().isCollection() && !isCustom)
+        if (cd.type.isCollection() && !isCustom)
             throw new InvalidRequestException("Indexes on collections are no yet supported");
 
-        if (cd.type == ColumnDefinition.Type.PARTITION_KEY && cd.componentIndex == null)
+        if (cd.kind == ColumnDefinition.Kind.PARTITION_KEY && cd.isOnAllComponents())
             throw new InvalidRequestException(String.format("Cannot add secondary index to already primarily indexed column %s", columnName));
     }
 
@@ -101,14 +102,14 @@ public class CreateIndexStatement extends SchemaAlteringStatement
     {
         logger.debug("Updating column {} definition for index {}", columnName, indexName);
         CFMetaData cfm = Schema.instance.getCFMetaData(keyspace(), columnFamily()).clone();
-        ColumnDefinition cd = cfm.getColumnDefinition(columnName.key);
+        ColumnDefinition cd = cfm.getColumnDefinition(columnName);
 
         if (cd.getIndexType() != null && ifNotExists)
             return;
 
         if (isCustom)
             cd.setIndexType(IndexType.CUSTOM, Collections.singletonMap(SecondaryIndex.CUSTOM_INDEX_OPTION_NAME, indexClass));
-        else if (cfm.getCfDef().isComposite)
+        else if (cfm.hasCompositeComparator())
             cd.setIndexType(IndexType.COMPOSITES, Collections.<String, String>emptyMap());
         else
             cd.setIndexType(IndexType.KEYS, Collections.<String, String>emptyMap());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5f5905d5/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java
index 74f7570..809e0dc 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java
@@ -87,11 +87,11 @@ public class CreateTableStatement extends SchemaAlteringStatement
     }
 
     // Column definitions
-    private Map<ByteBuffer, ColumnDefinition> getColumns()
+    private Map<ByteBuffer, ColumnDefinition> getColumns(CFMetaData cfm)
     {
         Map<ByteBuffer, ColumnDefinition> columnDefs = new HashMap<ByteBuffer, ColumnDefinition>();
         Integer componentIndex = null;
-        if (comparator instanceof CompositeType)
+        if (cfm.hasCompositeComparator())
         {
             CompositeType ct = (CompositeType) comparator;
             componentIndex = ct.types.get(ct.types.size() - 1) instanceof ColumnToCollectionType
@@ -101,7 +101,7 @@ public class CreateTableStatement extends SchemaAlteringStatement
 
         for (Map.Entry<ColumnIdentifier, AbstractType> col : columns.entrySet())
         {
-            columnDefs.put(col.getKey().key, ColumnDefinition.regularDef(col.getKey().key, col.getValue(), componentIndex));
+            columnDefs.put(col.getKey().bytes, ColumnDefinition.regularDef(cfm, col.getKey().bytes, col.getValue(), componentIndex));
         }
 
         return columnDefs;
@@ -148,12 +148,12 @@ public class CreateTableStatement extends SchemaAlteringStatement
     {
         cfmd.defaultValidator(defaultValidator)
             .keyValidator(keyValidator)
-            .columnMetadata(getColumns());
+            .columnMetadata(getColumns(cfmd));
 
-        cfmd.addColumnMetadataFromAliases(keyAliases, keyValidator, ColumnDefinition.Type.PARTITION_KEY);
-        cfmd.addColumnMetadataFromAliases(columnAliases, comparator, ColumnDefinition.Type.CLUSTERING_KEY);
+        cfmd.addColumnMetadataFromAliases(keyAliases, keyValidator, ColumnDefinition.Kind.PARTITION_KEY);
+        cfmd.addColumnMetadataFromAliases(columnAliases, comparator, ColumnDefinition.Kind.CLUSTERING_COLUMN);
         if (valueAlias != null)
-            cfmd.addColumnMetadataFromAliases(Collections.<ByteBuffer>singletonList(valueAlias), defaultValidator, ColumnDefinition.Type.COMPACT_VALUE);
+            cfmd.addColumnMetadataFromAliases(Collections.<ByteBuffer>singletonList(valueAlias), defaultValidator, ColumnDefinition.Kind.COMPACT_VALUE);
 
         properties.applyToCFMetadata(cfmd);
     }
@@ -206,7 +206,7 @@ public class CreateTableStatement extends SchemaAlteringStatement
                 {
                     if (definedCollections == null)
                         definedCollections = new HashMap<ByteBuffer, CollectionType>();
-                    definedCollections.put(id.key, (CollectionType)pt.getType());
+                    definedCollections.put(id.bytes, (CollectionType)pt.getType());
                 }
                 stmt.columns.put(id, pt.getType()); // we'll remove what is not a column below
             }
@@ -221,7 +221,7 @@ public class CreateTableStatement extends SchemaAlteringStatement
             List<AbstractType<?>> keyTypes = new ArrayList<AbstractType<?>>(kAliases.size());
             for (ColumnIdentifier alias : kAliases)
             {
-                stmt.keyAliases.add(alias.key);
+                stmt.keyAliases.add(alias.bytes);
                 AbstractType<?> t = getTypeAndRemove(stmt.columns, alias);
                 if (t instanceof CounterColumnType)
                     throw new InvalidRequestException(String.format("counter type is not supported for PRIMARY KEY part %s", alias));
@@ -241,12 +241,12 @@ public class CreateTableStatement extends SchemaAlteringStatement
                     if (definedCollections != null)
                         throw new InvalidRequestException("Collection types are not supported with COMPACT STORAGE");
 
-                    stmt.comparator = CFDefinition.definitionType;
+                    stmt.comparator = UTF8Type.instance;
                 }
                 else
                 {
                     List<AbstractType<?>> types = new ArrayList<AbstractType<?>>(definedCollections == null ? 1 : 2);
-                    types.add(CFDefinition.definitionType);
+                    types.add(UTF8Type.instance);
                     if (definedCollections != null)
                         types.add(ColumnToCollectionType.getInstance(definedCollections));
                     stmt.comparator = CompositeType.getInstance(types);
@@ -260,7 +260,7 @@ public class CreateTableStatement extends SchemaAlteringStatement
                 {
                     if (definedCollections != null)
                         throw new InvalidRequestException("Collection types are not supported with COMPACT STORAGE");
-                    stmt.columnAliases.add(columnAliases.get(0).key);
+                    stmt.columnAliases.add(columnAliases.get(0).bytes);
                     stmt.comparator = getTypeAndRemove(stmt.columns, columnAliases.get(0));
                     if (stmt.comparator instanceof CounterColumnType)
                         throw new InvalidRequestException(String.format("counter type is not supported for PRIMARY KEY part %s", stmt.columnAliases.get(0)));
@@ -270,11 +270,11 @@ public class CreateTableStatement extends SchemaAlteringStatement
                     List<AbstractType<?>> types = new ArrayList<AbstractType<?>>(columnAliases.size() + 1);
                     for (ColumnIdentifier t : columnAliases)
                     {
-                        stmt.columnAliases.add(t.key);
+                        stmt.columnAliases.add(t.bytes);
 
                         AbstractType<?> type = getTypeAndRemove(stmt.columns, t);
                         if (type instanceof CounterColumnType)
-                            throw new InvalidRequestException(String.format("counter type is not supported for PRIMARY KEY part %s", t.key));
+                            throw new InvalidRequestException(String.format("counter type is not supported for PRIMARY KEY part %s", t));
                         types.add(type);
                     }
 
@@ -287,7 +287,7 @@ public class CreateTableStatement extends SchemaAlteringStatement
                     {
                         // For sparse, we must add the last UTF8 component
                         // and the collection type if there is one
-                        types.add(CFDefinition.definitionType);
+                        types.add(UTF8Type.instance);
                         if (definedCollections != null)
                             types.add(ColumnToCollectionType.getInstance(definedCollections));
                     }
@@ -317,7 +317,7 @@ public class CreateTableStatement extends SchemaAlteringStatement
 
                     Map.Entry<ColumnIdentifier, AbstractType> lastEntry = stmt.columns.entrySet().iterator().next();
                     stmt.defaultValidator = lastEntry.getValue();
-                    stmt.valueAlias = lastEntry.getKey().key;
+                    stmt.valueAlias = lastEntry.getKey().bytes;
                     stmt.columns.remove(lastEntry.getKey());
                 }
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5f5905d5/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java b/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
index 3704e14..db991c0 100644
--- a/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
@@ -22,6 +22,7 @@ import java.util.*;
 
 import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.exceptions.*;
 import org.apache.cassandra.utils.Pair;
@@ -44,12 +45,11 @@ public class DeleteStatement extends ModificationStatement
     public ColumnFamily updateForKey(ByteBuffer key, ColumnNameBuilder builder, UpdateParameters params)
     throws InvalidRequestException
     {
-        CFDefinition cfDef = cfm.getCfDef();
         ColumnFamily cf = TreeMapBackedSortedColumns.factory.create(cfm);
         List<Operation> deletions = getOperations();
 
-        boolean fullKey = builder.componentCount() == cfDef.columns.size();
-        boolean isRange = cfDef.isCompact ? !fullKey : (!fullKey || deletions.isEmpty());
+        boolean fullKey = builder.componentCount() == cfm.clusteringColumns().size();
+        boolean isRange = cfm.isDense() ? !fullKey : (!fullKey || deletions.isEmpty());
 
         if (!deletions.isEmpty() && isRange)
             throw new InvalidRequestException(String.format("Missing mandatory PRIMARY KEY part %s since %s specified", getFirstEmptyKey(), deletions.get(0).columnName));
@@ -71,7 +71,7 @@ public class DeleteStatement extends ModificationStatement
             else
             {
                 // Delete specific columns
-                if (cfDef.isCompact)
+                if (cfm.isDense())
                 {
                     ByteBuffer columnName = builder.build();
                     cf.addColumn(params.makeTombstone(columnName));
@@ -103,22 +103,22 @@ public class DeleteStatement extends ModificationStatement
             this.whereClause = whereClause;
         }
 
-        protected ModificationStatement prepareInternal(CFDefinition cfDef, VariableSpecifications boundNames, Attributes attrs) throws InvalidRequestException
+        protected ModificationStatement prepareInternal(CFMetaData cfm, VariableSpecifications boundNames, Attributes attrs) throws InvalidRequestException
         {
-            DeleteStatement stmt = new DeleteStatement(boundNames.size(), cfDef.cfm, attrs);
+            DeleteStatement stmt = new DeleteStatement(boundNames.size(), cfm, attrs);
 
             for (Operation.RawDeletion deletion : deletions)
             {
-                CFDefinition.Name name = cfDef.get(deletion.affectedColumn());
-                if (name == null)
+                ColumnDefinition def = cfm.getColumnDefinition(deletion.affectedColumn());
+                if (def == null)
                     throw new InvalidRequestException(String.format("Unknown identifier %s", deletion.affectedColumn()));
 
                 // For compact, we only have one value except the key, so the only form of DELETE that make sense is without a column
                 // list. However, we support having the value name for coherence with the static/sparse case
-                if (name.kind != CFDefinition.Name.Kind.COLUMN_METADATA && name.kind != CFDefinition.Name.Kind.VALUE_ALIAS)
-                    throw new InvalidRequestException(String.format("Invalid identifier %s for deletion (should not be a PRIMARY KEY part)", name));
+                if (def.kind != ColumnDefinition.Kind.REGULAR && def.kind != ColumnDefinition.Kind.COMPACT_VALUE)
+                    throw new InvalidRequestException(String.format("Invalid identifier %s for deletion (should not be a PRIMARY KEY part)", def.name));
 
-                Operation op = deletion.prepare(name);
+                Operation op = deletion.prepare(def);
                 op.collectMarkerSpecification(boundNames);
                 stmt.addOperation(op);
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5f5905d5/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 0f425b8..d8e0945 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -22,6 +22,7 @@ import java.util.*;
 
 import org.apache.cassandra.auth.Permission;
 import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.ColumnSlice;
@@ -152,30 +153,29 @@ public abstract class ModificationStatement implements CQLStatement
 
     public void processWhereClause(List<Relation> whereClause, VariableSpecifications names) throws InvalidRequestException
     {
-        CFDefinition cfDef = cfm.getCfDef();
         for (Relation rel : whereClause)
         {
-            CFDefinition.Name name = cfDef.get(rel.getEntity());
-            if (name == null)
+            ColumnDefinition def = cfm.getColumnDefinition(rel.getEntity());
+            if (def == null)
                 throw new InvalidRequestException(String.format("Unknown key identifier %s", rel.getEntity()));
 
-            switch (name.kind)
+            switch (def.kind)
             {
-                case KEY_ALIAS:
-                case COLUMN_ALIAS:
+                case PARTITION_KEY:
+                case CLUSTERING_COLUMN:
                     Restriction restriction;
 
                     if (rel.operator() == Relation.Type.EQ)
                     {
-                        Term t = rel.getValue().prepare(name);
+                        Term t = rel.getValue().prepare(def);
                         t.collectMarkerSpecification(names);
                         restriction = new Restriction.EQ(t, false);
                     }
-                    else if (name.kind == CFDefinition.Name.Kind.KEY_ALIAS && rel.operator() == Relation.Type.IN)
+                    else if (def.kind == ColumnDefinition.Kind.PARTITION_KEY && rel.operator() == Relation.Type.IN)
                     {
                         if (rel.getValue() != null)
                         {
-                            Term t = rel.getValue().prepare(name);
+                            Term t = rel.getValue().prepare(def);
                             t.collectMarkerSpecification(names);
                             restriction = Restriction.IN.create(t);
                         }
@@ -184,7 +184,7 @@ public abstract class ModificationStatement implements CQLStatement
                             List<Term> values = new ArrayList<Term>(rel.getInValues().size());
                             for (Term.Raw raw : rel.getInValues())
                             {
-                                Term t = raw.prepare(name);
+                                Term t = raw.prepare(def);
                                 t.collectMarkerSpecification(names);
                                 values.add(t);
                             }
@@ -193,14 +193,14 @@ public abstract class ModificationStatement implements CQLStatement
                     }
                     else
                     {
-                        throw new InvalidRequestException(String.format("Invalid operator %s for PRIMARY KEY part %s", rel.operator(), name));
+                        throw new InvalidRequestException(String.format("Invalid operator %s for PRIMARY KEY part %s", rel.operator(), def.name));
                     }
 
-                    addKeyValues(name.name, restriction);
+                    addKeyValues(def.name, restriction);
                     break;
-                case VALUE_ALIAS:
-                case COLUMN_METADATA:
-                    throw new InvalidRequestException(String.format("Non PRIMARY KEY %s found in where clause", name));
+                case COMPACT_VALUE:
+                case REGULAR:
+                    throw new InvalidRequestException(String.format("Non PRIMARY KEY %s found in where clause", def.name));
             }
         }
     }
@@ -208,14 +208,13 @@ public abstract class ModificationStatement implements CQLStatement
     public List<ByteBuffer> buildPartitionKeyNames(List<ByteBuffer> variables)
     throws InvalidRequestException
     {
-        CFDefinition cfDef = cfm.getCfDef();
-        ColumnNameBuilder keyBuilder = cfDef.getKeyNameBuilder();
+        ColumnNameBuilder keyBuilder = cfm.getKeyNameBuilder();
         List<ByteBuffer> keys = new ArrayList<ByteBuffer>();
-        for (CFDefinition.Name name : cfDef.keys.values())
+        for (ColumnDefinition def : cfm.partitionKeyColumns())
         {
-            Restriction r = processedKeys.get(name.name);
+            Restriction r = processedKeys.get(def.name);
             if (r == null)
-                throw new InvalidRequestException(String.format("Missing mandatory PRIMARY KEY part %s", name));
+                throw new InvalidRequestException(String.format("Missing mandatory PRIMARY KEY part %s", def.name));
 
             List<ByteBuffer> values = r.values(variables);
 
@@ -224,7 +223,7 @@ public abstract class ModificationStatement implements CQLStatement
                 for (ByteBuffer val : values)
                 {
                     if (val == null)
-                        throw new InvalidRequestException(String.format("Invalid null value for partition key part %s", name));
+                        throw new InvalidRequestException(String.format("Invalid null value for partition key part %s", def.name));
                     keys.add(keyBuilder.copy().add(val).build());
                 }
             }
@@ -234,7 +233,7 @@ public abstract class ModificationStatement implements CQLStatement
                     throw new InvalidRequestException("IN is only supported on the last column of the partition key");
                 ByteBuffer val = values.get(0);
                 if (val == null)
-                    throw new InvalidRequestException(String.format("Invalid null value for partition key part %s", name));
+                    throw new InvalidRequestException(String.format("Invalid null value for partition key part %s", def.name));
                 keyBuilder.add(val);
             }
         }
@@ -244,21 +243,20 @@ public abstract class ModificationStatement implements CQLStatement
     public ColumnNameBuilder createClusteringPrefixBuilder(List<ByteBuffer> variables)
     throws InvalidRequestException
     {
-        CFDefinition cfDef = cfm.getCfDef();
-        ColumnNameBuilder builder = cfDef.getColumnNameBuilder();
-        CFDefinition.Name firstEmptyKey = null;
-        for (CFDefinition.Name name : cfDef.columns.values())
+        ColumnNameBuilder builder = cfm.getColumnNameBuilder();
+        ColumnDefinition firstEmptyKey = null;
+        for (ColumnDefinition def : cfm.clusteringColumns())
         {
-            Restriction r = processedKeys.get(name.name);
+            Restriction r = processedKeys.get(def.name);
             if (r == null)
             {
-                firstEmptyKey = name;
-                if (requireFullClusteringKey() && cfDef.isComposite && !cfDef.isCompact)
-                    throw new InvalidRequestException(String.format("Missing mandatory PRIMARY KEY part %s", name));
+                firstEmptyKey = def;
+                if (requireFullClusteringKey() && cfm.hasCompositeComparator() && !cfm.isDense())
+                    throw new InvalidRequestException(String.format("Missing mandatory PRIMARY KEY part %s", def.name));
             }
             else if (firstEmptyKey != null)
             {
-                throw new InvalidRequestException(String.format("Missing PRIMARY KEY part %s since %s is set", firstEmptyKey.name, name.name));
+                throw new InvalidRequestException(String.format("Missing PRIMARY KEY part %s since %s is set", firstEmptyKey.name, def.name));
             }
             else
             {
@@ -266,19 +264,19 @@ public abstract class ModificationStatement implements CQLStatement
                 assert values.size() == 1; // We only allow IN for row keys so far
                 ByteBuffer val = values.get(0);
                 if (val == null)
-                    throw new InvalidRequestException(String.format("Invalid null value for clustering key part %s", name));
+                    throw new InvalidRequestException(String.format("Invalid null value for clustering key part %s", def.name));
                 builder.add(val);
             }
         }
         return builder;
     }
 
-    protected CFDefinition.Name getFirstEmptyKey()
+    protected ColumnDefinition getFirstEmptyKey()
     {
-        for (CFDefinition.Name name : cfm.getCfDef().columns.values())
+        for (ColumnDefinition def : cfm.clusteringColumns())
         {
-            if (processedKeys.get(name.name) == null)
-                return name;
+            if (processedKeys.get(def.name) == null)
+                return def;
         }
         return null;
     }
@@ -287,21 +285,21 @@ public abstract class ModificationStatement implements CQLStatement
     throws RequestExecutionException, RequestValidationException
     {
         // Lists SET operation incurs a read.
-        Set<ByteBuffer> toRead = null;
+        Set<ColumnIdentifier> toRead = null;
         for (Operation op : columnOperations)
         {
             if (op.requiresRead())
             {
                 if (toRead == null)
-                    toRead = new TreeSet<ByteBuffer>(UTF8Type.instance);
-                toRead.add(op.columnName.key);
+                    toRead = new TreeSet<ColumnIdentifier>();
+                toRead.add(op.columnName);
             }
         }
 
         return toRead == null ? null : readRows(partitionKeys, clusteringPrefix, toRead, (CompositeType)cfm.comparator, local, cl);
     }
 
-    private Map<ByteBuffer, ColumnGroupMap> readRows(List<ByteBuffer> partitionKeys, ColumnNameBuilder clusteringPrefix, Set<ByteBuffer> toRead, CompositeType composite, boolean local, ConsistencyLevel cl)
+    private Map<ByteBuffer, ColumnGroupMap> readRows(List<ByteBuffer> partitionKeys, ColumnNameBuilder clusteringPrefix, Set<ColumnIdentifier> toRead, CompositeType composite, boolean local, ConsistencyLevel cl)
     throws RequestExecutionException, RequestValidationException
     {
         try
@@ -315,7 +313,7 @@ public abstract class ModificationStatement implements CQLStatement
 
         ColumnSlice[] slices = new ColumnSlice[toRead.size()];
         int i = 0;
-        for (ByteBuffer name : toRead)
+        for (ColumnIdentifier name : toRead)
         {
             ByteBuffer start = clusteringPrefix.copy().add(name).build();
             ByteBuffer finish = clusteringPrefix.copy().add(name).buildAsEndOfRange();
@@ -450,24 +448,22 @@ public abstract class ModificationStatement implements CQLStatement
 
     private ResultSet buildCasFailureResultSet(ByteBuffer key, ColumnFamily cf) throws InvalidRequestException
     {
-        CFDefinition cfDef = cfm.getCfDef();
-
         Selection selection;
         if (ifNotExists)
         {
-            selection = Selection.wildcard(cfDef);
+            selection = Selection.wildcard(cfm);
         }
         else
         {
-            List<CFDefinition.Name> names = new ArrayList<CFDefinition.Name>(columnConditions.size());
+            List<ColumnDefinition> defs = new ArrayList<>(columnConditions.size());
             for (Operation condition : columnConditions)
-                names.add(cfDef.get(condition.columnName));
-            selection = Selection.forColumns(names);
+                defs.add(cfm.getColumnDefinition(condition.columnName));
+            selection = Selection.forColumns(defs);
         }
 
         long now = System.currentTimeMillis();
         Selection.ResultSetBuilder builder = selection.resultSetBuilder(now);
-        SelectStatement.forSelection(cfDef, selection).processColumnFamily(key, cf, Collections.<ByteBuffer>emptyList(), now, builder);
+        SelectStatement.forSelection(cfm, selection).processColumnFamily(key, cf, Collections.<ByteBuffer>emptyList(), now, builder);
 
         return builder.build();
     }
@@ -538,8 +534,7 @@ public abstract class ModificationStatement implements CQLStatement
         ColumnFamily cf = TreeMapBackedSortedColumns.factory.create(cfm);
 
         // CQL row marker
-        CFDefinition cfDef = cfm.getCfDef();
-        if (cfDef.isComposite && !cfDef.isCompact && !cfm.isSuper())
+        if (cfm.hasCompositeComparator() && !cfm.isDense() && !cfm.isSuper())
         {
             ByteBuffer name = clusteringPrefix.copy().add(ByteBufferUtil.EMPTY_BYTE_BUFFER).build();
             cf.addColumn(params.makeColumn(name, ByteBufferUtil.EMPTY_BYTE_BUFFER));
@@ -577,12 +572,11 @@ public abstract class ModificationStatement implements CQLStatement
         public ModificationStatement prepare(VariableSpecifications boundNames) throws InvalidRequestException
         {
             CFMetaData metadata = ThriftValidation.validateColumnFamily(keyspace(), columnFamily());
-            CFDefinition cfDef = metadata.getCfDef();
 
             Attributes preparedAttributes = attrs.prepare(keyspace(), columnFamily());
             preparedAttributes.collectMarkerSpecification(boundNames);
 
-            ModificationStatement stmt = prepareInternal(cfDef, boundNames, preparedAttributes);
+            ModificationStatement stmt = prepareInternal(metadata, boundNames, preparedAttributes);
 
             if (ifNotExists || (conditions != null && !conditions.isEmpty()))
             {
@@ -603,8 +597,8 @@ public abstract class ModificationStatement implements CQLStatement
                 {
                     for (Pair<ColumnIdentifier, Operation.RawUpdate> entry : conditions)
                     {
-                        CFDefinition.Name name = cfDef.get(entry.left);
-                        if (name == null)
+                        ColumnDefinition def = metadata.getColumnDefinition(entry.left);
+                        if (def == null)
                             throw new InvalidRequestException(String.format("Unknown identifier %s", entry.left));
 
                         /*
@@ -613,21 +607,21 @@ public abstract class ModificationStatement implements CQLStatement
                          * now, we just refuse lists, which also save use from having to bother about the read that some
                          * list operation involve.
                          */
-                        if (name.type instanceof ListType)
-                            throw new InvalidRequestException(String.format("List operation (%s) are not allowed in conditional updates", name));
+                        if (def.type instanceof ListType)
+                            throw new InvalidRequestException(String.format("List operation (%s) are not allowed in conditional updates", def.name));
 
-                        Operation condition = entry.right.prepare(name);
+                        Operation condition = entry.right.prepare(def);
                         assert !condition.requiresRead();
 
                         condition.collectMarkerSpecification(boundNames);
 
-                        switch (name.kind)
+                        switch (def.kind)
                         {
-                            case KEY_ALIAS:
-                            case COLUMN_ALIAS:
+                            case PARTITION_KEY:
+                            case CLUSTERING_COLUMN:
                                 throw new InvalidRequestException(String.format("PRIMARY KEY part %s found in SET part", entry.left));
-                            case VALUE_ALIAS:
-                            case COLUMN_METADATA:
+                            case COMPACT_VALUE:
+                            case REGULAR:
                                 stmt.addCondition(condition);
                                 break;
                         }
@@ -637,6 +631,6 @@ public abstract class ModificationStatement implements CQLStatement
             return stmt;
         }
 
-        protected abstract ModificationStatement prepareInternal(CFDefinition cfDef, VariableSpecifications boundNames, Attributes attrs) throws InvalidRequestException;
+        protected abstract ModificationStatement prepareInternal(CFMetaData cfm, VariableSpecifications boundNames, Attributes attrs) throws InvalidRequestException;
     }
 }


[4/4] git commit: Remove CFDefinition

Posted by sl...@apache.org.
Remove CFDefinition

patch by slebresne; reviewed by iamaleksey for CASSANDRA-6253


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5f5905d5
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5f5905d5
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5f5905d5

Branch: refs/heads/trunk
Commit: 5f5905d51561b2adb616a9b1b0b29f1358372ba4
Parents: 91d60fb
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Fri Oct 25 15:13:41 2013 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Wed Oct 30 17:10:09 2013 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../org/apache/cassandra/config/CFMetaData.java | 534 +++++++++++--------
 .../cassandra/config/ColumnDefinition.java      | 239 +++++----
 .../org/apache/cassandra/config/Schema.java     |  31 --
 .../cassandra/config/TriggerDefinition.java     |   4 +-
 .../cassandra/cql/AlterTableStatement.java      |   4 +-
 .../cql/CreateColumnFamilyStatement.java        |  11 +-
 .../apache/cassandra/cql/QueryProcessor.java    |  19 +-
 .../apache/cassandra/cql/SelectStatement.java   |  10 -
 .../apache/cassandra/cql/UpdateStatement.java   |  14 +-
 .../org/apache/cassandra/cql3/CFDefinition.java | 297 -----------
 .../apache/cassandra/cql3/ColumnIdentifier.java |  16 +-
 .../cassandra/cql3/ColumnNameBuilder.java       |   2 +
 .../cassandra/cql3/ColumnSpecification.java     |   7 -
 .../org/apache/cassandra/cql3/Constants.java    |   8 +-
 src/java/org/apache/cassandra/cql3/Lists.java   |  12 +-
 src/java/org/apache/cassandra/cql3/Maps.java    |   8 +-
 .../org/apache/cassandra/cql3/Operation.java    |  19 +-
 .../org/apache/cassandra/cql3/ResultSet.java    |   8 +-
 src/java/org/apache/cassandra/cql3/Sets.java    |   6 +-
 .../apache/cassandra/cql3/UntypedResultSet.java |   2 +-
 .../apache/cassandra/cql3/UpdateParameters.java |   4 +-
 .../cassandra/cql3/functions/TokenFct.java      |  14 +-
 .../cql3/statements/AlterTableStatement.java    |  91 ++--
 .../cql3/statements/CreateIndexStatement.java   |  15 +-
 .../cql3/statements/CreateTableStatement.java   |  32 +-
 .../cql3/statements/DeleteStatement.java        |  22 +-
 .../cql3/statements/ModificationStatement.java  | 120 ++---
 .../cql3/statements/SelectStatement.java        | 358 +++++++------
 .../cassandra/cql3/statements/Selection.java    | 103 ++--
 .../cql3/statements/UpdateStatement.java        |  69 +--
 .../apache/cassandra/db/BatchlogManager.java    |   2 +-
 src/java/org/apache/cassandra/db/Column.java    |  10 +-
 .../cassandra/db/filter/ExtendedFilter.java     |  18 +-
 .../AbstractSimplePerColumnSecondaryIndex.java  |   4 +-
 .../cassandra/db/index/SecondaryIndex.java      |  12 +-
 .../db/index/SecondaryIndexManager.java         |  10 +-
 .../db/index/composites/CompositesIndex.java    |   8 +-
 .../CompositesIndexOnClusteringKey.java         |  22 +-
 .../CompositesIndexOnPartitionKey.java          |   8 +-
 .../composites/CompositesIndexOnRegular.java    |  12 +-
 .../db/index/composites/CompositesSearcher.java |   2 +-
 .../cassandra/db/index/keys/KeysIndex.java      |   4 +-
 .../cassandra/db/marshal/CompositeType.java     |   8 +
 .../hadoop/pig/AbstractCassandraStorage.java    |  34 +-
 .../apache/cassandra/hadoop/pig/CqlStorage.java |  13 +-
 .../apache/cassandra/service/StorageProxy.java  |   2 +-
 .../cassandra/service/pager/QueryPagers.java    |   2 +-
 .../cassandra/thrift/ThriftValidation.java      |  16 +-
 .../apache/cassandra/tools/SSTableExport.java   |   2 +-
 .../apache/cassandra/tools/SSTableImport.java   |   2 +-
 .../org/apache/cassandra/tracing/Tracing.java   |   2 +-
 .../unit/org/apache/cassandra/SchemaLoader.java |  87 +--
 .../cassandra/config/ColumnDefinitionTest.java  |  19 +-
 .../org/apache/cassandra/config/DefsTest.java   |  23 +-
 .../cassandra/db/ColumnFamilyStoreTest.java     |   4 +-
 .../apache/cassandra/db/RangeTombstoneTest.java |  18 +-
 .../cassandra/thrift/ThriftValidationTest.java  |   4 +-
 58 files changed, 1114 insertions(+), 1314 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/5f5905d5/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a8f7fd9..e51f598 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -5,6 +5,7 @@
  * Stop using Thrift-generated Index* classes internally (CASSANDRA-5971)
  * Remove 1.2 network compatibility code (CASSANDRA-5960)
  * Remove leveled json manifest migration code (CASSANDRA-5996)
+ * Remove CFDefinition (CASSANDRA-6253)
 
 
 2.0.3

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5f5905d5/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java
index 2fa2221..e2bd3eb 100644
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@ -27,6 +27,7 @@ import java.util.*;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Objects;
+import com.google.common.collect.AbstractIterator;
 import com.google.common.collect.MapDifference;
 import com.google.common.collect.Maps;
 import org.apache.commons.lang3.ArrayUtils;
@@ -37,10 +38,7 @@ import org.apache.commons.lang3.builder.ToStringBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.cql3.CFDefinition;
-import org.apache.cassandra.cql3.ColumnNameBuilder;
-import org.apache.cassandra.cql3.QueryProcessor;
-import org.apache.cassandra.cql3.UntypedResultSet;
+import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.cql3.statements.CreateTableStatement;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.compaction.AbstractCompactionStrategy;
@@ -89,6 +87,15 @@ public final class CFMetaData
     // Note that this is the default only for user created tables
     public final static String DEFAULT_COMPRESSOR = LZ4Compressor.class.getCanonicalName();
 
+    // Note that this needs to come *before* any CFMetaData is defined, and hence before the compile below.
+    private static final Comparator<ColumnDefinition> regularColumnComparator = new Comparator<ColumnDefinition>()
+    {
+        public int compare(ColumnDefinition def1, ColumnDefinition def2)
+        {
+            return def1.name.compareTo(def2.name);
+        }
+    };
+
     public static final CFMetaData IndexCf = compile("CREATE TABLE \"" + SystemKeyspace.INDEX_CF + "\" ("
                                                      + "table_name text,"
                                                      + "index_name text,"
@@ -389,11 +396,11 @@ public final class CFMetaData
     private volatile int defaultTimeToLive = DEFAULT_DEFAULT_TIME_TO_LIVE;
     private volatile SpeculativeRetry speculativeRetry = DEFAULT_SPECULATIVE_RETRY;
     private volatile boolean populateIoCacheOnFlush = DEFAULT_POPULATE_IO_CACHE_ON_FLUSH;
-    private volatile Map<ByteBuffer, Long> droppedColumns = new HashMap<>();
+    private volatile Map<ColumnIdentifier, Long> droppedColumns = new HashMap<>();
     private volatile Map<String, TriggerDefinition> triggers = new HashMap<>();
 
     /*
-     * All CQL3 columns definition are stored in the column_metadata map.
+     * All CQL3 columns definition are stored in the columnMetadata map.
      * On top of that, we keep separated collection of each kind of definition, to
      * 1) allow easy access to each kind and 2) for the partition key and
      * clustering key ones, those list are ordered by the "component index" of the
@@ -403,10 +410,10 @@ public final class CFMetaData
     public static final String DEFAULT_COLUMN_ALIAS = "column";
     public static final String DEFAULT_VALUE_ALIAS = "value";
 
-    private volatile Map<ByteBuffer, ColumnDefinition> column_metadata = new HashMap<>();
+    private volatile Map<ByteBuffer, ColumnDefinition> columnMetadata = new HashMap<>();
     private volatile List<ColumnDefinition> partitionKeyColumns;  // Always of size keyValidator.componentsCount, null padded if necessary
-    private volatile List<ColumnDefinition> clusteringKeyColumns; // Of size comparator.componentsCount or comparator.componentsCount -1, null padded if necessary
-    private volatile Set<ColumnDefinition> regularColumns;
+    private volatile List<ColumnDefinition> clusteringColumns;    // Of size comparator.componentsCount or comparator.componentsCount -1, null padded if necessary
+    private volatile SortedSet<ColumnDefinition> regularColumns;  // We use a sorted set so iteration is of predictable order (for SELECT for instance)
     private volatile ColumnDefinition compactValueColumn;
 
     public volatile Class<? extends AbstractCompactionStrategy> compactionStrategyClass = DEFAULT_COMPACTION_STRATEGY_CLASS;
@@ -414,11 +421,6 @@ public final class CFMetaData
 
     public volatile CompressionParameters compressionParameters = new CompressionParameters(null);
 
-    // Processed infos used by CQL. This can be fully reconstructed from the CFMedata,
-    // so it's not saved on disk. It is however costlyish to recreate for each query
-    // so we cache it here (and update on each relevant CFMetadata change)
-    private volatile CFDefinition cqlCfDef;
-
     public CFMetaData comment(String prop) { comment = enforceCommentNotNull(prop); return this;}
     public CFMetaData readRepairChance(double prop) {readRepairChance = prop; return this;}
     public CFMetaData dcLocalReadRepairChance(double prop) {dcLocalReadRepairChance = prop; return this;}
@@ -428,7 +430,7 @@ public final class CFMetaData
     public CFMetaData keyValidator(AbstractType<?> prop) {keyValidator = prop; return this;}
     public CFMetaData minCompactionThreshold(int prop) {minCompactionThreshold = prop; return this;}
     public CFMetaData maxCompactionThreshold(int prop) {maxCompactionThreshold = prop; return this;}
-    public CFMetaData columnMetadata(Map<ByteBuffer,ColumnDefinition> prop) {column_metadata = prop; return this;}
+    public CFMetaData columnMetadata(Map<ByteBuffer,ColumnDefinition> prop) {columnMetadata = prop; return this;}
     public CFMetaData compactionStrategyClass(Class<? extends AbstractCompactionStrategy> prop) {compactionStrategyClass = prop; return this;}
     public CFMetaData compactionStrategyOptions(Map<String, String> prop) {compactionStrategyOptions = prop; return this;}
     public CFMetaData compressionParameters(CompressionParameters prop) {compressionParameters = prop; return this;}
@@ -439,7 +441,7 @@ public final class CFMetaData
     public CFMetaData defaultTimeToLive(int prop) {defaultTimeToLive = prop; return this;}
     public CFMetaData speculativeRetry(SpeculativeRetry prop) {speculativeRetry = prop; return this;}
     public CFMetaData populateIoCacheOnFlush(boolean prop) {populateIoCacheOnFlush = prop; return this;}
-    public CFMetaData droppedColumns(Map<ByteBuffer, Long> cols) {droppedColumns = cols; return this;}
+    public CFMetaData droppedColumns(Map<ColumnIdentifier, Long> cols) {droppedColumns = cols; return this;}
     public CFMetaData triggers(Map<String, TriggerDefinition> prop) {triggers = prop; return this;}
 
     public CFMetaData(String keyspace, String name, ColumnFamilyType type, AbstractType<?> comp, AbstractType<?> subcc)
@@ -528,7 +530,7 @@ public final class CFMetaData
                              : Caching.NONE;
 
         return new CFMetaData(parent.ksName, parent.indexColumnFamilyName(info), ColumnFamilyType.Standard, columnComparator, (AbstractType)null)
-                             .keyValidator(info.getValidator())
+                             .keyValidator(info.type)
                              .readRepairChance(0.0)
                              .dcLocalReadRepairChance(0.0)
                              .gcGraceSeconds(0)
@@ -564,10 +566,10 @@ public final class CFMetaData
     static CFMetaData copyOpts(CFMetaData newCFMD, CFMetaData oldCFMD)
     {
         Map<ByteBuffer, ColumnDefinition> clonedColumns = new HashMap<>();
-        for (ColumnDefinition cd : oldCFMD.column_metadata.values())
+        for (ColumnDefinition cd : oldCFMD.allColumns())
         {
             ColumnDefinition cloned = cd.copy();
-            clonedColumns.put(cloned.name, cloned);
+            clonedColumns.put(cloned.name.bytes, cloned);
         }
 
         return newCFMD.comment(oldCFMD.comment)
@@ -606,7 +608,7 @@ public final class CFMetaData
     public String indexColumnFamilyName(ColumnDefinition info)
     {
         // TODO simplify this when info.index_name is guaranteed to be set
-        return cfName + Directories.SECONDARY_INDEX_NAME_SEPARATOR + (info.getIndexName() == null ? ByteBufferUtil.bytesToHex(info.name) : info.getIndexName());
+        return cfName + Directories.SECONDARY_INDEX_NAME_SEPARATOR + (info.getIndexName() == null ? ByteBufferUtil.bytesToHex(info.name.bytes) : info.getIndexName());
     }
 
     public String getComment()
@@ -682,16 +684,9 @@ public final class CFMetaData
         if (partitionKeyColumns.size() > 1)
             throw new IllegalStateException("Cannot acces column family with composite key from CQL < 3.0.0");
 
-        try
-        {
-            // For compatibility sake, we uppercase if it's the default alias as we used to return it that way in resultsets.
-            String str = ByteBufferUtil.string(partitionKeyColumns.get(0).name);
-            return str.equalsIgnoreCase(DEFAULT_KEY_ALIAS) ? str.toUpperCase() : str;
-        }
-        catch (CharacterCodingException e)
-        {
-            throw new RuntimeException(e.getMessage(), e);
-        }
+        // For compatibility sake, we uppercase if it's the default alias as we used to return it that way in resultsets.
+        String str = partitionKeyColumns.get(0).name.toString();
+        return str.equalsIgnoreCase(DEFAULT_KEY_ALIAS) ? str.toUpperCase() : str;
     }
 
     public CompressionParameters compressionParameters()
@@ -701,7 +696,42 @@ public final class CFMetaData
 
     public Collection<ColumnDefinition> allColumns()
     {
-        return column_metadata.values();
+        return columnMetadata.values();
+    }
+
+    // An iterator over all column definitions that respects the order of a SELECT *.
+    public Iterator<ColumnDefinition> allColumnsInSelectOrder()
+    {
+        return new AbstractIterator<ColumnDefinition>()
+        {
+            private final Iterator<ColumnDefinition> partitionKeyIter = partitionKeyColumns.iterator();
+            private final Iterator<ColumnDefinition> clusteringIter = clusteringColumns.iterator();
+            private boolean valueDone;
+            private final Iterator<ColumnDefinition> regularIter = regularColumns.iterator();
+
+            protected ColumnDefinition computeNext()
+            {
+                if (partitionKeyIter.hasNext())
+                    return partitionKeyIter.next();
+
+                if (clusteringIter.hasNext())
+                    return clusteringIter.next();
+
+                if (compactValueColumn != null && !valueDone)
+                {
+                    valueDone = true;
+                    // If the compactValueColumn is empty, this means we have a dense table but
+                    // with only a PK. As far as selects are concerned, we should ignore the value.
+                    if (compactValueColumn.name.bytes.hasRemaining())
+                        return compactValueColumn;
+                }
+
+                if (regularIter.hasNext())
+                    return regularIter.next();
+
+                return endOfData();
+            }
+        };
     }
 
     public List<ColumnDefinition> partitionKeyColumns()
@@ -709,9 +739,9 @@ public final class CFMetaData
         return partitionKeyColumns;
     }
 
-    public List<ColumnDefinition> clusteringKeyColumns()
+    public List<ColumnDefinition> clusteringColumns()
     {
-        return clusteringKeyColumns;
+        return clusteringColumns;
     }
 
     public Set<ColumnDefinition> regularColumns()
@@ -724,6 +754,20 @@ public final class CFMetaData
         return compactValueColumn;
     }
 
+    public ColumnNameBuilder getKeyNameBuilder()
+    {
+        return keyValidator instanceof CompositeType
+             ? new CompositeType.Builder((CompositeType)keyValidator)
+             : new NonCompositeBuilder(keyValidator);
+    }
+
+    public ColumnNameBuilder getColumnNameBuilder()
+    {
+        return comparator instanceof CompositeType
+             ? new CompositeType.Builder((CompositeType)comparator)
+             : new NonCompositeBuilder(comparator);
+    }
+
     public double getBloomFilterFpChance()
     {
         // we disallow bFFPC==null starting in 1.2.1 but tolerated it before that
@@ -757,7 +801,7 @@ public final class CFMetaData
         return defaultTimeToLive;
     }
 
-    public Map<ByteBuffer, Long> getDroppedColumns()
+    public Map<ColumnIdentifier, Long> getDroppedColumns()
     {
         return droppedColumns;
     }
@@ -789,7 +833,7 @@ public final class CFMetaData
             .append(minCompactionThreshold, rhs.minCompactionThreshold)
             .append(maxCompactionThreshold, rhs.maxCompactionThreshold)
             .append(cfId, rhs.cfId)
-            .append(column_metadata, rhs.column_metadata)
+            .append(columnMetadata, rhs.columnMetadata)
             .append(compactionStrategyClass, rhs.compactionStrategyClass)
             .append(compactionStrategyOptions, rhs.compactionStrategyOptions)
             .append(compressionParameters, rhs.compressionParameters)
@@ -822,7 +866,7 @@ public final class CFMetaData
             .append(minCompactionThreshold)
             .append(maxCompactionThreshold)
             .append(cfId)
-            .append(column_metadata)
+            .append(columnMetadata)
             .append(compactionStrategyClass)
             .append(compactionStrategyOptions)
             .append(compressionParameters)
@@ -838,7 +882,17 @@ public final class CFMetaData
             .toHashCode();
     }
 
-    public AbstractType<?> getValueValidator(ByteBuffer column)
+    public AbstractType<?> getValueValidatorFromCellName(ByteBuffer cellName)
+    {
+        // If this is a CQL table, we need to pull out the CQL column name to look up the correct column type.
+        if (!hasCompositeComparator() || isDense())
+            return getValueValidator(new ColumnIdentifier(cellName, comparator));
+
+        ByteBuffer name = ((CompositeType)comparator).extractLastComponent(cellName);
+        return getValueValidator(new ColumnIdentifier(name, UTF8Type.instance));
+    }
+
+    public AbstractType<?> getValueValidator(ColumnIdentifier column)
     {
         return getValueValidator(getColumnDefinition(column));
     }
@@ -847,7 +901,7 @@ public final class CFMetaData
     {
         return columnDefinition == null
                ? defaultValidator
-               : columnDefinition.getValidator();
+               : columnDefinition.type;
     }
 
     /** applies implicit defaults to cf definition. useful in updates */
@@ -930,13 +984,14 @@ public final class CFMetaData
             if (cf_def.isSetKey_validation_class()) { newCFMD.keyValidator(TypeParser.parse(cf_def.key_validation_class)); }
             if (cf_def.isSetKey_alias() && !(newCFMD.keyValidator instanceof CompositeType))
             {
-                newCFMD.column_metadata.put(cf_def.key_alias, ColumnDefinition.partitionKeyDef(cf_def.key_alias, newCFMD.keyValidator, null));
+                newCFMD.columnMetadata.put(cf_def.key_alias,
+                                           ColumnDefinition.partitionKeyDef(newCFMD, cf_def.key_alias, newCFMD.keyValidator, null));
             }
 
             return newCFMD.comment(cf_def.comment)
                           .replicateOnWrite(cf_def.replicate_on_write)
                           .defaultValidator(TypeParser.parse(cf_def.default_validation_class))
-                          .columnMetadata(ColumnDefinition.fromThrift(cf_def.column_metadata, newCFMD.isSuper()))
+                          .columnMetadata(ColumnDefinition.fromThrift(newCFMD, cf_def.column_metadata))
                           .compressionParameters(cp)
                           .rebuild();
         }
@@ -1025,19 +1080,19 @@ public final class CFMetaData
         if (!cfm.droppedColumns.isEmpty())
             droppedColumns = cfm.droppedColumns;
 
-        MapDifference<ByteBuffer, ColumnDefinition> columnDiff = Maps.difference(column_metadata, cfm.column_metadata);
+        MapDifference<ByteBuffer, ColumnDefinition> columnDiff = Maps.difference(columnMetadata, cfm.columnMetadata);
         // columns that are no longer needed
         for (ColumnDefinition cd : columnDiff.entriesOnlyOnLeft().values())
-            column_metadata.remove(cd.name);
+            removeColumnDefinition(cd);
         // newly added columns
         for (ColumnDefinition cd : columnDiff.entriesOnlyOnRight().values())
-            column_metadata.put(cd.name, cd);
+            addColumnDefinition(cd);
         // old columns with updated attributes
         for (ByteBuffer name : columnDiff.entriesDiffering().keySet())
         {
-            ColumnDefinition oldDef = column_metadata.get(name);
-            ColumnDefinition def = cfm.column_metadata.get(name);
-            oldDef.apply(def, getColumnDefinitionComparator(oldDef));
+            ColumnDefinition oldDef = columnMetadata.get(name);
+            ColumnDefinition def = cfm.columnMetadata.get(name);
+            addOrReplaceColumnDefinition(oldDef.apply(def));
         }
 
         compactionStrategyClass = cfm.compactionStrategyClass;
@@ -1158,8 +1213,8 @@ public final class CFMetaData
         def.setMax_compaction_threshold(maxCompactionThreshold);
         // We only return the alias if only one is set since thrift don't know about multiple key aliases
         if (partitionKeyColumns.size() == 1)
-            def.setKey_alias(partitionKeyColumns.get(0).name);
-        def.setColumn_metadata(ColumnDefinition.toThrift(column_metadata));
+            def.setKey_alias(partitionKeyColumns.get(0).name.bytes);
+        def.setColumn_metadata(ColumnDefinition.toThrift(columnMetadata));
         def.setCompaction_strategy(compactionStrategyClass.getName());
         def.setCompaction_strategy_options(new HashMap<>(compactionStrategyOptions));
         def.setCompression_options(compressionParameters.asThriftOptions());
@@ -1177,39 +1232,43 @@ public final class CFMetaData
 
     /**
      * Returns the ColumnDefinition for {@code name}.
-     *
-     * Note that {@code name} correspond to the returned ColumnDefinition name,
-     * and in particular for composite cfs, it should usually be only a
-     * component of the full column name. If you have a full column name, use
-     * getColumnDefinitionFromColumnName instead.
      */
+    public ColumnDefinition getColumnDefinition(ColumnIdentifier name)
+    {
+        return columnMetadata.get(name.bytes);
+    }
+
+    // In general it is preferable to work with ColumnIdentifier to make it
+    // clear that we are talking about a CQL column, not a cell name, but there
+    // are a few cases where all we have is a ByteBuffer (when dealing with IndexExpression
+    // for instance) so...
     public ColumnDefinition getColumnDefinition(ByteBuffer name)
     {
-            return column_metadata.get(name);
+        return columnMetadata.get(name);
     }
 
     /**
      * Returns a ColumnDefinition given a full (internal) column name.
      */
-    public ColumnDefinition getColumnDefinitionFromColumnName(ByteBuffer columnName)
+    public ColumnDefinition getColumnDefinitionFromCellName(ByteBuffer cellName)
     {
-        if (!isSuper() && (comparator instanceof CompositeType))
+        if (!isSuper() && hasCompositeComparator())
         {
             CompositeType composite = (CompositeType)comparator;
-            ByteBuffer[] components = composite.split(columnName);
-            for (ColumnDefinition def : column_metadata.values())
+            ByteBuffer[] components = composite.split(cellName);
+            for (ColumnDefinition def : allColumns())
             {
                 ByteBuffer toCompare;
-                if (def.componentIndex == null)
+                if (def.isOnAllComponents())
                 {
-                    toCompare = columnName;
+                    toCompare = cellName;
                 }
                 else
                 {
-                    if (def.componentIndex >= components.length)
+                    if (def.position() >= components.length)
                         break;
 
-                    toCompare = components[def.componentIndex];
+                    toCompare = components[def.position()];
                 }
                 if (def.name.equals(toCompare))
                     return def;
@@ -1218,7 +1277,7 @@ public final class CFMetaData
         }
         else
         {
-            return column_metadata.get(columnName);
+            return columnMetadata.get(cellName);
         }
     }
 
@@ -1226,7 +1285,7 @@ public final class CFMetaData
 
     public ColumnDefinition getColumnDefinitionForIndex(String indexName)
     {
-        for (ColumnDefinition def : column_metadata.values())
+        for (ColumnDefinition def : allColumns())
         {
             if (indexName.equals(def.getIndexName()))
                 return def;
@@ -1245,14 +1304,12 @@ public final class CFMetaData
         {
             CFMetaData cfm = Schema.instance.getCFMetaData(cfId);
 
-            for (Map.Entry<ByteBuffer, ColumnDefinition> entry : column_metadata.entrySet())
+            for (ColumnDefinition newDef : allColumns())
             {
-                ColumnDefinition newDef = entry.getValue();
-
-                if (!cfm.column_metadata.containsKey(entry.getKey()) || newDef.getIndexType() == null)
+                if (!cfm.columnMetadata.containsKey(newDef.name.bytes) || newDef.getIndexType() == null)
                     continue;
 
-                String oldIndexName = cfm.column_metadata.get(entry.getKey()).getIndexName();
+                String oldIndexName = cfm.getColumnDefinition(newDef.name).getIndexName();
 
                 if (oldIndexName == null)
                     continue;
@@ -1265,11 +1322,11 @@ public final class CFMetaData
         }
 
         Set<String> existingNames = existingIndexNames(null);
-        for (ColumnDefinition column : column_metadata.values())
+        for (ColumnDefinition column : allColumns())
         {
             if (column.getIndexType() != null && column.getIndexName() == null)
             {
-                String baseName = getDefaultIndexName(cfName, getColumnDefinitionComparator(column), column.name);
+                String baseName = getDefaultIndexName(cfName, column.name);
                 String indexName = baseName;
                 int i = 0;
                 while (existingNames.contains(indexName))
@@ -1279,9 +1336,9 @@ public final class CFMetaData
         }
     }
 
-    public static String getDefaultIndexName(String cfName, AbstractType<?> comparator, ByteBuffer columnName)
+    public static String getDefaultIndexName(String cfName, ColumnIdentifier columnName)
     {
-        return (cfName + "_" + comparator.getString(columnName) + "_idx").replaceAll("\\W", "");
+        return (cfName + "_" + columnName + "_idx").replaceAll("\\W", "");
     }
 
     public Iterator<OnDiskAtom> getOnDiskIterator(DataInput in, int count, Descriptor.Version version)
@@ -1327,39 +1384,20 @@ public final class CFMetaData
         if (defaultValidator instanceof CounterColumnType)
         {
             for (ColumnDefinition def : regularColumns)
-                if (!(def.getValidator() instanceof CounterColumnType))
-                    throw new ConfigurationException("Cannot add a non counter column (" + getColumnDefinitionComparator(def).getString(def.name) + ") in a counter column family");
+                if (!(def.type instanceof CounterColumnType))
+                    throw new ConfigurationException("Cannot add a non counter column (" + def.name + ") in a counter column family");
         }
         else
         {
-            for (ColumnDefinition def : column_metadata.values())
-                if (def.getValidator() instanceof CounterColumnType)
-                    throw new ConfigurationException("Cannot add a counter column (" + getColumnDefinitionComparator(def).getString(def.name) + ") in a non counter column family");
+            for (ColumnDefinition def : allColumns())
+                if (def.type instanceof CounterColumnType)
+                    throw new ConfigurationException("Cannot add a counter column (" + def.name + ") in a non counter column family");
         }
 
-        for (ColumnDefinition def : partitionKeyColumns)
-            validateAlias(def, "Key");
-        for (ColumnDefinition def : clusteringKeyColumns)
-            validateAlias(def, "Column");
-        if (compactValueColumn != null)
-            validateAlias(compactValueColumn, "Value");
-
         // initialize a set of names NOT in the CF under consideration
         Set<String> indexNames = existingIndexNames(cfName);
-        for (ColumnDefinition c : column_metadata.values())
+        for (ColumnDefinition c : allColumns())
         {
-            AbstractType<?> comparator = getColumnDefinitionComparator(c);
-
-            try
-            {
-                comparator.validate(c.name);
-            }
-            catch (MarshalException e)
-            {
-                throw new ConfigurationException(String.format("Column name %s is not valid for comparator %s",
-                                                               ByteBufferUtil.bytesToHex(c.name), comparator));
-            }
-
             if (c.getIndexType() == null)
             {
                 if (c.getIndexName() != null)
@@ -1407,18 +1445,6 @@ public final class CFMetaData
         return indexNames;
     }
 
-    private static void validateAlias(ColumnDefinition alias, String msg) throws ConfigurationException
-    {
-        try
-        {
-            UTF8Type.instance.validate(alias.name);
-        }
-        catch (MarshalException e)
-        {
-            throw new ConfigurationException(msg + " alias must be UTF8");
-        }
-    }
-
     private void validateCompactionThresholds() throws ConfigurationException
     {
         if (maxCompactionThreshold == 0)
@@ -1451,28 +1477,28 @@ public final class CFMetaData
 
         newState.toSchemaNoColumnsNoTriggers(rm, modificationTimestamp);
 
-        MapDifference<ByteBuffer, ColumnDefinition> columnDiff = Maps.difference(column_metadata, newState.column_metadata);
+        MapDifference<ByteBuffer, ColumnDefinition> columnDiff = Maps.difference(columnMetadata, newState.columnMetadata);
 
         // columns that are no longer needed
         for (ColumnDefinition cd : columnDiff.entriesOnlyOnLeft().values())
         {
             // Thrift only knows about the REGULAR ColumnDefinition type, so don't consider other type
             // are being deleted just because they are not here.
-            if (fromThrift && cd.type != ColumnDefinition.Type.REGULAR)
+            if (fromThrift && cd.kind != ColumnDefinition.Kind.REGULAR)
                 continue;
 
-            cd.deleteFromSchema(rm, cfName, getColumnDefinitionComparator(cd), modificationTimestamp);
+            cd.deleteFromSchema(rm, modificationTimestamp);
         }
 
         // newly added columns
         for (ColumnDefinition cd : columnDiff.entriesOnlyOnRight().values())
-            cd.toSchema(rm, cfName, getColumnDefinitionComparator(cd), modificationTimestamp);
+            cd.toSchema(rm, modificationTimestamp);
 
         // old columns with updated attributes
         for (ByteBuffer name : columnDiff.entriesDiffering().keySet())
         {
-            ColumnDefinition cd = newState.getColumnDefinition(name);
-            cd.toSchema(rm, cfName, getColumnDefinitionComparator(cd), modificationTimestamp);
+            ColumnDefinition cd = newState.columnMetadata.get(name);
+            cd.toSchema(rm, modificationTimestamp);
         }
 
         MapDifference<String, TriggerDefinition> triggerDiff = Maps.difference(triggers, newState.triggers);
@@ -1501,12 +1527,12 @@ public final class CFMetaData
         ColumnFamily cf = rm.addOrGet(SchemaColumnFamiliesCf);
         int ldt = (int) (System.currentTimeMillis() / 1000);
 
-        ColumnNameBuilder builder = SchemaColumnFamiliesCf.getCfDef().getColumnNameBuilder();
+        ColumnNameBuilder builder = SchemaColumnFamiliesCf.getColumnNameBuilder();
         builder.add(ByteBufferUtil.bytes(cfName));
         cf.addAtom(new RangeTombstone(builder.build(), builder.buildAsEndOfRange(), timestamp, ldt));
 
-        for (ColumnDefinition cd : column_metadata.values())
-            cd.deleteFromSchema(rm, cfName, getColumnDefinitionComparator(cd), timestamp);
+        for (ColumnDefinition cd : allColumns())
+            cd.deleteFromSchema(rm, timestamp);
 
         for (TriggerDefinition td : triggers.values())
             td.deleteFromSchema(rm, cfName, timestamp);
@@ -1518,8 +1544,8 @@ public final class CFMetaData
     {
         toSchemaNoColumnsNoTriggers(rm, timestamp);
 
-        for (ColumnDefinition cd : column_metadata.values())
-            cd.toSchema(rm, cfName, getColumnDefinitionComparator(cd), timestamp);
+        for (ColumnDefinition cd : allColumns())
+            cd.toSchema(rm, timestamp);
     }
 
     private void toSchemaNoColumnsNoTriggers(RowMutation rm, long timestamp)
@@ -1568,14 +1594,14 @@ public final class CFMetaData
         cf.addColumn(Column.create(indexInterval, timestamp, cfName, "index_interval"));
         cf.addColumn(Column.create(speculativeRetry.toString(), timestamp, cfName, "speculative_retry"));
 
-        for (Map.Entry<ByteBuffer, Long> entry : droppedColumns.entrySet())
+        for (Map.Entry<ColumnIdentifier, Long> entry : droppedColumns.entrySet())
             cf.addColumn(new Column(makeDroppedColumnName(entry.getKey()), LongType.instance.decompose(entry.getValue()), timestamp));
 
         // Save the CQL3 metadata "the old way" for compatibility sake
         cf.addColumn(Column.create(aliasesToJson(partitionKeyColumns), timestamp, cfName, "key_aliases"));
-        cf.addColumn(Column.create(aliasesToJson(clusteringKeyColumns), timestamp, cfName, "column_aliases"));
+        cf.addColumn(Column.create(aliasesToJson(clusteringColumns), timestamp, cfName, "column_aliases"));
         cf.addColumn(compactValueColumn == null ? DeletedColumn.create(ldt, timestamp, cfName, "value_alias")
-                                                : Column.create(compactValueColumn.name, timestamp, cfName, "value_alias"));
+                                                : Column.create(compactValueColumn.name.bytes, timestamp, cfName, "value_alias"));
     }
 
     // Package protected for use by tests
@@ -1627,19 +1653,19 @@ public final class CFMetaData
                 cfm.populateIoCacheOnFlush(result.getBoolean("populate_io_cache_on_flush"));
 
             /*
-             * The info previously hold by key_aliases, column_aliases and value_alias is now stored in column_metadata (because 1) this
+             * The info previously held by key_aliases, column_aliases and value_alias is now stored in columnMetadata (because 1) this
              * make more sense and 2) this allow to store indexing information).
              * However, for upgrade sake we need to still be able to read those old values. Moreover, we cannot easily
-             * remove those old columns once "converted" to column_metadata because that would screw up nodes that may
+             * remove those old columns once "converted" to columnMetadata because that would screw up nodes that may
              * not have upgraded. So for now we keep the both info and in sync, even though its redundant.
              * In other words, the ColumnDefinition the following lines add may be replaced later when ColumnDefinition.fromSchema
              * is called but that's ok.
              */
-            cfm.addColumnMetadataFromAliases(aliasesFromStrings(fromJsonList(result.getString("key_aliases"))), cfm.keyValidator, ColumnDefinition.Type.PARTITION_KEY);
-            cfm.addColumnMetadataFromAliases(aliasesFromStrings(fromJsonList(result.getString("column_aliases"))), cfm.comparator, ColumnDefinition.Type.CLUSTERING_KEY);
+            cfm.addColumnMetadataFromAliases(aliasesFromStrings(fromJsonList(result.getString("key_aliases"))), cfm.keyValidator, ColumnDefinition.Kind.PARTITION_KEY);
+            cfm.addColumnMetadataFromAliases(aliasesFromStrings(fromJsonList(result.getString("column_aliases"))), cfm.comparator, ColumnDefinition.Kind.CLUSTERING_COLUMN);
 
             if (result.has("value_alias"))
-                cfm.addColumnMetadataFromAliases(Collections.<ByteBuffer>singletonList(result.getBytes("value_alias")), cfm.defaultValidator, ColumnDefinition.Type.COMPACT_VALUE);
+                cfm.addColumnMetadataFromAliases(Collections.<ByteBuffer>singletonList(result.getBytes("value_alias")), cfm.defaultValidator, ColumnDefinition.Kind.COMPACT_VALUE);
 
             if (result.has("dropped_columns"))
                 cfm.droppedColumns(convertDroppedColumns(result.getMap("dropped_columns", UTF8Type.instance, LongType.instance)));
@@ -1652,7 +1678,7 @@ public final class CFMetaData
         }
     }
 
-    public void addColumnMetadataFromAliases(List<ByteBuffer> aliases, AbstractType<?> comparator, ColumnDefinition.Type type)
+    public void addColumnMetadataFromAliases(List<ByteBuffer> aliases, AbstractType<?> comparator, ColumnDefinition.Kind kind)
     {
         if (comparator instanceof CompositeType)
         {
@@ -1660,14 +1686,16 @@ public final class CFMetaData
             for (int i = 0; i < aliases.size(); ++i)
             {
                 if (aliases.get(i) != null)
-                    column_metadata.put(aliases.get(i), new ColumnDefinition(aliases.get(i), ct.types.get(i), i, type));
+                {
+                    addOrReplaceColumnDefinition(new ColumnDefinition(this, aliases.get(i), ct.types.get(i), i, kind));
+                }
             }
         }
         else
         {
             assert aliases.size() <= 1;
             if (!aliases.isEmpty() && aliases.get(0) != null)
-                column_metadata.put(aliases.get(0), new ColumnDefinition(aliases.get(0), comparator, null, type));
+                addOrReplaceColumnDefinition(new ColumnDefinition(this, aliases.get(0), comparator, null, kind));
         }
     }
 
@@ -1697,7 +1725,7 @@ public final class CFMetaData
     {
         List<String> aliases = new ArrayList<String>(rawAliases.size());
         for (ColumnDefinition rawAlias : rawAliases)
-            aliases.add(UTF8Type.instance.compose(rawAlias.name));
+            aliases.add(rawAlias.name.toString());
         return json(aliases);
     }
 
@@ -1709,17 +1737,17 @@ public final class CFMetaData
         return rawAliases;
     }
 
-    private static Map<ByteBuffer, Long> convertDroppedColumns(Map<String, Long> raw)
+    private static Map<ColumnIdentifier, Long> convertDroppedColumns(Map<String, Long> raw)
     {
-        Map<ByteBuffer, Long> converted = Maps.newHashMap();
+        Map<ColumnIdentifier, Long> converted = Maps.newHashMap();
         for (Map.Entry<String, Long> entry : raw.entrySet())
-            converted.put(UTF8Type.instance.decompose(entry.getKey()), entry.getValue());
+            converted.put(new ColumnIdentifier(entry.getKey(), true), entry.getValue());
         return converted;
     }
 
-    private ByteBuffer makeDroppedColumnName(ByteBuffer column)
+    private ByteBuffer makeDroppedColumnName(ColumnIdentifier column)
     {
-        ColumnNameBuilder builder = SchemaColumnFamiliesCf.cqlCfDef.getColumnNameBuilder();
+        ColumnNameBuilder builder = SchemaColumnFamiliesCf.getColumnNameBuilder();
         builder.add(UTF8Type.instance.decompose(cfName));
         builder.add(UTF8Type.instance.decompose("dropped_columns"));
         return builder.add(column).build();
@@ -1745,12 +1773,12 @@ public final class CFMetaData
 
     public AbstractType<?> getColumnDefinitionComparator(ColumnDefinition def)
     {
-        return getComponentComparator(def.componentIndex, def.type);
+        return getComponentComparator(def.isOnAllComponents() ? null : def.position(), def.kind);
     }
 
-    public AbstractType<?> getComponentComparator(Integer componentIndex, ColumnDefinition.Type type)
+    public AbstractType<?> getComponentComparator(Integer componentIndex, ColumnDefinition.Kind kind)
     {
-        switch (type)
+        switch (kind)
         {
             case REGULAR:
                 AbstractType<?> cfComparator = cfType == ColumnFamilyType.Super ? ((CompositeType)comparator).types.get(1) : comparator;
@@ -1775,31 +1803,32 @@ public final class CFMetaData
     }
 
     // Package protected for use by tests
-    static CFMetaData addColumnDefinitionsFromSchema(CFMetaData cfDef, Row serializedColumnDefinitions)
+    static CFMetaData addColumnDefinitionsFromSchema(CFMetaData cfm, Row serializedColumnDefinitions)
     {
-        for (ColumnDefinition cd : ColumnDefinition.fromSchema(serializedColumnDefinitions, cfDef))
-            cfDef.column_metadata.put(cd.name, cd);
-        return cfDef.rebuild();
+        for (ColumnDefinition cd : ColumnDefinition.fromSchema(serializedColumnDefinitions, cfm))
+            cfm.addOrReplaceColumnDefinition(cd);
+        return cfm.rebuild();
     }
 
-    public void addColumnDefinition(ColumnDefinition def) throws ConfigurationException
+    public CFMetaData addColumnDefinition(ColumnDefinition def) throws ConfigurationException
     {
-        if (column_metadata.containsKey(def.name))
-            throw new ConfigurationException(String.format("Cannot add column %s, a column with the same name already exists", getColumnDefinitionComparator(def).getString(def.name)));
+        if (columnMetadata.containsKey(def.name.bytes))
+            throw new ConfigurationException(String.format("Cannot add column %s, a column with the same name already exists", def.name));
 
-        addOrReplaceColumnDefinition(def);
+        return addOrReplaceColumnDefinition(def);
     }
 
     // This method doesn't check if a def of the same name already exist and should only be used when we
     // know this cannot happen.
-    public void addOrReplaceColumnDefinition(ColumnDefinition def)
+    public CFMetaData addOrReplaceColumnDefinition(ColumnDefinition def)
     {
-        column_metadata.put(def.name, def);
+        columnMetadata.put(def.name.bytes, def);
+        return this;
     }
 
     public boolean removeColumnDefinition(ColumnDefinition def)
     {
-        return column_metadata.remove(def.name) != null;
+        return columnMetadata.remove(def.name.bytes) != null;
     }
 
     private static void addTriggerDefinitionsFromSchema(CFMetaData cfDef, Row serializedTriggerDefinitions)
@@ -1822,76 +1851,58 @@ public final class CFMetaData
 
     public void recordColumnDrop(ColumnDefinition def)
     {
-        assert def.componentIndex != null;
+        assert !def.isOnAllComponents();
         droppedColumns.put(def.name, FBUtilities.timestampMicros());
     }
 
-    public void renameColumn(ByteBuffer from, String strFrom, ByteBuffer to, String strTo) throws InvalidRequestException
+    public void renameColumn(ColumnIdentifier from, ColumnIdentifier to) throws InvalidRequestException
     {
-        ColumnDefinition def = column_metadata.get(from);
+        ColumnDefinition def = getColumnDefinition(from);
         if (def == null)
-            throw new InvalidRequestException(String.format("Cannot rename unknown column %s in keyspace %s", strFrom, cfName));
+            throw new InvalidRequestException(String.format("Cannot rename unknown column %s in keyspace %s", from, cfName));
 
-        if (column_metadata.get(to) != null)
-            throw new InvalidRequestException(String.format("Cannot rename column %s to %s in keyspace %s; another column of that name already exist", strFrom, strTo, cfName));
+        if (getColumnDefinition(to) != null)
+            throw new InvalidRequestException(String.format("Cannot rename column %s to %s in keyspace %s; another column of that name already exist", from, to, cfName));
 
-        if (def.type == ColumnDefinition.Type.REGULAR)
+        if (def.kind == ColumnDefinition.Kind.REGULAR)
         {
-            throw new InvalidRequestException(String.format("Cannot rename non PRIMARY KEY part %s", strFrom));
+            throw new InvalidRequestException(String.format("Cannot rename non PRIMARY KEY part %s", from));
         }
         else if (def.isIndexed())
         {
-            throw new InvalidRequestException(String.format("Cannot rename column %s because it is secondary indexed", strFrom));
+            throw new InvalidRequestException(String.format("Cannot rename column %s because it is secondary indexed", from));
         }
 
-        ColumnDefinition newDef = def.cloneWithNewName(to);
+        ColumnDefinition newDef = def.withNewName(to);
         // don't call addColumnDefinition/removeColumnDefition because we want to avoid recomputing
         // the CQL3 cfDef between those two operation
-        column_metadata.put(newDef.name, newDef);
-        column_metadata.remove(def.name);
+        addOrReplaceColumnDefinition(newDef);
+        removeColumnDefinition(def);
     }
 
     public CFMetaData rebuild()
     {
-        /*
-         * TODO: There is definitively some repetition between the CQL3  metadata stored in this
-         * object (partitionKeyColumns, ...) and the one stored in CFDefinition.
-         * Ultimately, we should probably merge both. However, there is enough details to fix that
-         * it's worth doing that in a separate issue.
-         */
-        rebuildCQL3Metadata();
-        cqlCfDef = new CFDefinition(this);
-        return this;
-    }
-
-    public CFDefinition getCfDef()
-    {
-        assert cqlCfDef != null;
-        return cqlCfDef;
-    }
-
-    private void rebuildCQL3Metadata()
-    {
         List<ColumnDefinition> pkCols = nullInitializedList(keyValidator.componentsCount());
-        boolean isDense = isDense(comparator, column_metadata.values());
+        boolean isDense = isDense(comparator, allColumns());
         int nbCkCols = isDense
                      ? comparator.componentsCount()
-                     : comparator.componentsCount() - (hasCollection() ? 2 : 1);
+                     : comparator.componentsCount() - (hasCollections() ? 2 : 1);
         List<ColumnDefinition> ckCols = nullInitializedList(nbCkCols);
-        Set<ColumnDefinition> regCols = new HashSet<ColumnDefinition>();
+        // We keep things sorted to get consistent/predictable order in select queries
+        SortedSet<ColumnDefinition> regCols = new TreeSet<ColumnDefinition>(regularColumnComparator);
         ColumnDefinition compactCol = null;
 
-        for (ColumnDefinition def : column_metadata.values())
+        for (ColumnDefinition def : allColumns())
         {
-            switch (def.type)
+            switch (def.kind)
             {
                 case PARTITION_KEY:
-                    assert !(def.componentIndex == null && keyValidator instanceof CompositeType);
-                    pkCols.set(def.componentIndex == null ? 0 : def.componentIndex, def);
+                    assert !(def.isOnAllComponents() && keyValidator instanceof CompositeType);
+                    pkCols.set(def.position(), def);
                     break;
-                case CLUSTERING_KEY:
-                    assert !(def.componentIndex == null && comparator instanceof CompositeType);
-                    ckCols.set(def.componentIndex == null ? 0 : def.componentIndex, def);
+                case CLUSTERING_COLUMN:
+                    assert !(def.isOnAllComponents() && comparator instanceof CompositeType);
+                    ckCols.set(def.position(), def);
                     break;
                 case REGULAR:
                     regCols.add(def);
@@ -1905,9 +1916,10 @@ public final class CFMetaData
 
         // Now actually assign the correct value. This is not atomic, but then again, updating CFMetaData is never atomic anyway.
         partitionKeyColumns = addDefaultKeyAliases(pkCols);
-        clusteringKeyColumns = addDefaultColumnAliases(ckCols);
+        clusteringColumns = addDefaultColumnAliases(ckCols);
         regularColumns = regCols;
         compactValueColumn = addDefaultValueAlias(compactCol, isDense);
+        return this;
     }
 
     private List<ColumnDefinition> addDefaultKeyAliases(List<ColumnDefinition> pkCols)
@@ -1926,8 +1938,8 @@ public final class CFMetaData
                 // For compatibility sake, we call the first alias 'key' rather than 'key1'. This
                 // is inconsistent with column alias, but it's probably not worth risking breaking compatibility now.
                 ByteBuffer name = ByteBufferUtil.bytes(i == 0 ? DEFAULT_KEY_ALIAS : DEFAULT_KEY_ALIAS + (i + 1));
-                ColumnDefinition newDef = ColumnDefinition.partitionKeyDef(name, type, idx);
-                column_metadata.put(newDef.name, newDef);
+                ColumnDefinition newDef = ColumnDefinition.partitionKeyDef(this, name, type, idx);
+                addOrReplaceColumnDefinition(newDef);
                 pkCols.set(i, newDef);
             }
         }
@@ -1942,14 +1954,14 @@ public final class CFMetaData
             {
                 Integer idx = null;
                 AbstractType<?> type = comparator;
-                if (comparator instanceof CompositeType)
+                if (hasCompositeComparator())
                 {
                     idx = i;
                     type = ((CompositeType)comparator).types.get(i);
                 }
                 ByteBuffer name = ByteBufferUtil.bytes(DEFAULT_COLUMN_ALIAS + (i + 1));
-                ColumnDefinition newDef = ColumnDefinition.clusteringKeyDef(name, type, idx);
-                column_metadata.put(newDef.name, newDef);
+                ColumnDefinition newDef = ColumnDefinition.clusteringKeyDef(this, name, type, idx);
+                addOrReplaceColumnDefinition(newDef);
                 ckCols.set(i, newDef);
             }
         }
@@ -1963,8 +1975,8 @@ public final class CFMetaData
             if (compactValueDef != null)
                 return compactValueDef;
 
-            ColumnDefinition newDef = ColumnDefinition.compactValueDef(ByteBufferUtil.bytes(DEFAULT_VALUE_ALIAS), defaultValidator);
-            column_metadata.put(newDef.name, newDef);
+            ColumnDefinition newDef = ColumnDefinition.compactValueDef(this, ByteBufferUtil.bytes(DEFAULT_VALUE_ALIAS), defaultValidator);
+            addOrReplaceColumnDefinition(newDef);
             return newDef;
         }
         else
@@ -1974,13 +1986,24 @@ public final class CFMetaData
         }
     }
 
-    private boolean hasCollection()
+    public boolean hasCollections()
     {
-        if (isSuper() || !(comparator instanceof CompositeType))
-            return false;
+        return getCollectionType() != null;
+    }
+
+    public boolean hasCompositeComparator()
+    {
+        return comparator instanceof CompositeType;
+    }
+
+    public ColumnToCollectionType getCollectionType()
+    {
+        if (isSuper() || !hasCompositeComparator())
+            return null;
 
-        List<AbstractType<?>> types = ((CompositeType)comparator).types;
-        return types.get(types.size() - 1) instanceof ColumnToCollectionType;
+        CompositeType composite = (CompositeType)comparator;
+        AbstractType<?> last = composite.types.get(composite.types.size() - 1);
+        return last instanceof ColumnToCollectionType ? (ColumnToCollectionType)last : null;
     }
 
     /*
@@ -1988,7 +2011,7 @@ public final class CFMetaData
      * component is used to store a regular column names. In other words, non-composite static "thrift"
      * and CQL3 CF are *not* dense.
      * Note that his method is only used by rebuildCQL3Metadata. Once said metadata are built, finding
-     * if a CF is dense amounts more simply to check if clusteringKeyColumns.size() == comparator.componentsCount().
+     * if a CF is dense amounts more simply to check if clusteringColumns.size() == comparator.componentsCount().
      */
     private static boolean isDense(AbstractType<?> comparator, Collection<ColumnDefinition> defs)
     {
@@ -1996,9 +2019,9 @@ public final class CFMetaData
          * As said above, this method is only here because we need to deal with thrift upgrades.
          * Once a CF has been "upgraded", i.e. we've rebuilt and save its CQL3 metadata at least once,
          * then checking for isDense amounts to looking whether the maximum componentIndex for the
-         * CLUSTERING_KEY ColumnDefinitions is equal to comparator.componentsCount() - 1 or not.
+         * CLUSTERING_COLUMN ColumnDefinitions is equal to comparator.componentsCount() - 1 or not.
          *
-         * But non-upgraded thrift CF will have no such CLUSTERING_KEY column definitions, so we need
+         * But non-upgraded thrift CF will have no such CLUSTERING_COLUMN column definitions, so we need
          * to infer that information without relying on them in that case. And for the most part this is
          * easy, a CF that has at least one REGULAR definition is not dense. But the subtlety is that not
          * having a REGULAR definition may not mean dense because of CQL3 definitions that have only the
@@ -2006,7 +2029,7 @@ public final class CFMetaData
          *
          * So we need to recognize those special case CQL3 table with only a primary key. If we have some
          * clustering columns, we're fine as said above. So the only problem is that we cannot decide for
-         * sure if a CF without REGULAR columns nor CLUSTERING_KEY definition is meant to be dense, or if it
+         * sure if a CF without REGULAR columns nor CLUSTERING_COLUMN definition is meant to be dense, or if it
          * has been created in CQL3 by say:
          *    CREATE TABLE test (k int PRIMARY KEY)
          * in which case it should not be dense. However, we can limit our margin of error by assuming we are
@@ -2016,10 +2039,10 @@ public final class CFMetaData
         int maxClusteringIdx = -1;
         for (ColumnDefinition def : defs)
         {
-            switch (def.type)
+            switch (def.kind)
             {
-                case CLUSTERING_KEY:
-                    maxClusteringIdx = Math.max(maxClusteringIdx, def.componentIndex == null ? 0 : def.componentIndex);
+                case CLUSTERING_COLUMN:
+                    maxClusteringIdx = Math.max(maxClusteringIdx, def.position());
                     break;
                 case REGULAR:
                     hasRegular = true;
@@ -2033,6 +2056,12 @@ public final class CFMetaData
 
     }
 
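+    // See the comment on the static isDense(comparator, defs) above: once the CQL3 metadata has been rebuilt, this size comparison is enough.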
+    public boolean isDense()
+    {
+        return clusteringColumns.size() == comparator.componentsCount();
+    }
+
     private static boolean isCQL3OnlyPKComparator(AbstractType<?> comparator)
     {
         if (!(comparator instanceof CompositeType))
@@ -2060,11 +2089,11 @@ public final class CFMetaData
         if (isSuper())
             return true;
 
-        for (ColumnDefinition def : column_metadata.values())
+        for (ColumnDefinition def : allColumns())
         {
             // Non-REGULAR ColumnDefinition are not "thrift compatible" per-se, but it's ok because they hold metadata
             // this is only of use to CQL3, so we will just skip them in toThrift.
-            if (def.type == ColumnDefinition.Type.REGULAR && !def.isThriftCompatible())
+            if (def.kind == ColumnDefinition.Kind.REGULAR && !def.isThriftCompatible())
                 return false;
         }
         return true;
@@ -2094,7 +2123,7 @@ public final class CFMetaData
             .append("keyValidator", keyValidator)
             .append("minCompactionThreshold", minCompactionThreshold)
             .append("maxCompactionThreshold", maxCompactionThreshold)
-            .append("column_metadata", column_metadata)
+            .append("columnMetadata", columnMetadata)
             .append("compactionStrategyClass", compactionStrategyClass)
             .append("compactionStrategyOptions", compactionStrategyOptions)
             .append("compressionOptions", compressionParameters.asThriftOptions())
@@ -2109,4 +2138,77 @@ public final class CFMetaData
             .append("triggers", triggers)
             .toString();
     }
+
+    private static class NonCompositeBuilder implements ColumnNameBuilder
+    {
+        private final AbstractType<?> type;
+        private ByteBuffer columnName;
+
+        private NonCompositeBuilder(AbstractType<?> type)
+        {
+            this.type = type;
+        }
+
+        public NonCompositeBuilder add(ByteBuffer bb)
+        {
+            if (columnName != null)
+                throw new IllegalStateException("Column name is already constructed");
+
+            columnName = bb;
+            return this;
+        }
+
+        public NonCompositeBuilder add(ColumnIdentifier name)
+        {
+            return add(name.bytes);
+        }
+
+        public NonCompositeBuilder add(ByteBuffer bb, Relation.Type op)
+        {
+            return add(bb);
+        }
+
+        public int componentCount()
+        {
+            return columnName == null ? 0 : 1;
+        }
+
+        public int remainingCount()
+        {
+            return columnName == null ? 1 : 0;
+        }
+
+        public ByteBuffer get(int i)
+        {
+            if (i < 0 || i >= (columnName == null ? 0 : 1))
+                throw new IllegalArgumentException();
+
+            return columnName;
+        }
+
+        public ByteBuffer build()
+        {
+            return columnName == null ? ByteBufferUtil.EMPTY_BYTE_BUFFER : columnName;
+        }
+
+        public ByteBuffer buildAsEndOfRange()
+        {
+            return build();
+        }
+
+        public NonCompositeBuilder copy()
+        {
+            NonCompositeBuilder newBuilder = new NonCompositeBuilder(type);
+            newBuilder.columnName = columnName;
+            return newBuilder;
+        }
+
+        public ByteBuffer getComponent(int i)
+        {
+            if (i != 0 || columnName == null)
+                throw new IllegalArgumentException();
+
+            return columnName;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5f5905d5/src/java/org/apache/cassandra/config/ColumnDefinition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/ColumnDefinition.java b/src/java/org/apache/cassandra/config/ColumnDefinition.java
index fee8b1b..05a10bc 100644
--- a/src/java/org/apache/cassandra/config/ColumnDefinition.java
+++ b/src/java/org/apache/cassandra/config/ColumnDefinition.java
@@ -24,11 +24,10 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Objects;
 import com.google.common.collect.Maps;
 
-import org.apache.cassandra.cql3.ColumnNameBuilder;
-import org.apache.cassandra.cql3.QueryProcessor;
-import org.apache.cassandra.cql3.UntypedResultSet;
+import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.serializers.MarshalException;
 import org.apache.cassandra.exceptions.*;
 import org.apache.cassandra.thrift.ColumnDef;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -36,16 +35,16 @@ import org.apache.cassandra.utils.FBUtilities;
 
 import static org.apache.cassandra.utils.FBUtilities.json;
 
-public class ColumnDefinition
+public class ColumnDefinition extends ColumnSpecification
 {
     // system.schema_columns column names
     private static final String COLUMN_NAME = "column_name";
-    private static final String VALIDATOR = "validator";
+    private static final String TYPE = "validator";
     private static final String INDEX_TYPE = "index_type";
     private static final String INDEX_OPTIONS = "index_options";
     private static final String INDEX_NAME = "index_name";
     private static final String COMPONENT_INDEX = "component_index";
-    private static final String TYPE = "type";
+    private static final String KIND = "type";
 
     /*
      * The type of CQL3 column this definition represents.
@@ -58,79 +57,118 @@ public class ColumnDefinition
      * Note that thrift/CQL2 only know about definitions of type REGULAR (and
      * the ones whose componentIndex == null).
      */
-    public enum Type
+    public enum Kind
     {
         PARTITION_KEY,
-        CLUSTERING_KEY,
+        CLUSTERING_COLUMN,
         REGULAR,
-        COMPACT_VALUE
+        COMPACT_VALUE;
+
+        public String serialize()
+        {
+            // For backward compatibility we need to special case CLUSTERING_COLUMN
+            return this == CLUSTERING_COLUMN ? "clustering_key" : this.toString().toLowerCase();
+        }
+
+        public static Kind deserialize(String value)
+        {
+            if (value.equalsIgnoreCase("clustering_key"))
+                return CLUSTERING_COLUMN;
+            return Enum.valueOf(Kind.class, value.toUpperCase());
+        }
     }
 
-    public final ByteBuffer name;
-    private AbstractType<?> validator;
+    public final Kind kind;
+
+    private String indexName;
     private IndexType indexType;
     private Map<String,String> indexOptions;
-    private String indexName;
-    public final Type type;
 
     /*
      * If the column comparator is a composite type, indicates to which
      * component this definition refers to. If null, the definition refers to
      * the full column name.
      */
-    public final Integer componentIndex;
+    private final Integer componentIndex;
 
-    public static ColumnDefinition partitionKeyDef(ByteBuffer name, AbstractType<?> validator, Integer componentIndex)
+    public static ColumnDefinition partitionKeyDef(CFMetaData cfm, ByteBuffer name, AbstractType<?> validator, Integer componentIndex)
     {
-        return new ColumnDefinition(name, validator, componentIndex, Type.PARTITION_KEY);
+        return new ColumnDefinition(cfm, name, validator, componentIndex, Kind.PARTITION_KEY);
     }
 
-    public static ColumnDefinition clusteringKeyDef(ByteBuffer name, AbstractType<?> validator, Integer componentIndex)
+    public static ColumnDefinition clusteringKeyDef(CFMetaData cfm, ByteBuffer name, AbstractType<?> validator, Integer componentIndex)
     {
-        return new ColumnDefinition(name, validator, componentIndex, Type.CLUSTERING_KEY);
+        return new ColumnDefinition(cfm, name, validator, componentIndex, Kind.CLUSTERING_COLUMN);
     }
 
-    public static ColumnDefinition regularDef(ByteBuffer name, AbstractType<?> validator, Integer componentIndex)
+    public static ColumnDefinition regularDef(CFMetaData cfm, ByteBuffer name, AbstractType<?> validator, Integer componentIndex)
     {
-        return new ColumnDefinition(name, validator, componentIndex, Type.REGULAR);
+        return new ColumnDefinition(cfm, name, validator, componentIndex, Kind.REGULAR);
     }
 
-    public static ColumnDefinition compactValueDef(ByteBuffer name, AbstractType<?> validator)
+    public static ColumnDefinition compactValueDef(CFMetaData cfm, ByteBuffer name, AbstractType<?> validator)
     {
-        return new ColumnDefinition(name, validator, null, Type.COMPACT_VALUE);
+        return new ColumnDefinition(cfm, name, validator, null, Kind.COMPACT_VALUE);
     }
 
-    public ColumnDefinition(ByteBuffer name, AbstractType<?> validator, Integer componentIndex, Type type)
+    public ColumnDefinition(CFMetaData cfm, ByteBuffer name, AbstractType<?> validator, Integer componentIndex, Kind kind)
     {
-        this(name, validator, null, null, null, componentIndex, type);
+        this(cfm.ksName,
+             cfm.cfName,
+             new ColumnIdentifier(name, cfm.getComponentComparator(componentIndex, kind)),
+             validator,
+             null,
+             null,
+             null,
+             componentIndex,
+             kind);
     }
 
     @VisibleForTesting
-    public ColumnDefinition(ByteBuffer name,
+    public ColumnDefinition(String ksName,
+                            String cfName,
+                            ColumnIdentifier name,
                             AbstractType<?> validator,
                             IndexType indexType,
                             Map<String, String> indexOptions,
                             String indexName,
                             Integer componentIndex,
-                            Type type)
+                            Kind kind)
     {
+        super(ksName, cfName, name, validator);
         assert name != null && validator != null;
-        this.name = name;
+        this.kind = kind;
         this.indexName = indexName;
-        this.validator = validator;
         this.componentIndex = componentIndex;
         this.setIndexType(indexType, indexOptions);
-        this.type = type;
     }
 
     public ColumnDefinition copy()
     {
-        return new ColumnDefinition(name, validator, indexType, indexOptions, indexName, componentIndex, type);
+        return new ColumnDefinition(ksName, cfName, name, type, indexType, indexOptions, indexName, componentIndex, kind);
     }
 
-    public ColumnDefinition cloneWithNewName(ByteBuffer newName)
+    public ColumnDefinition withNewName(ColumnIdentifier newName)
     {
-        return new ColumnDefinition(newName, validator, indexType, indexOptions, indexName, componentIndex, type);
+        return new ColumnDefinition(ksName, cfName, newName, type, indexType, indexOptions, indexName, componentIndex, kind);
+    }
+
+    public ColumnDefinition withNewType(AbstractType<?> newType)
+    {
+        return new ColumnDefinition(ksName, cfName, name, newType, indexType, indexOptions, indexName, componentIndex, kind);
+    }
+
+    public boolean isOnAllComponents()
+    {
+        return componentIndex == null;
+    }
+
+    // Returns the componentIndex as a primitive int. This never returns null: for convenience,
+    // if componentIndex == null, this returns 0. Callers that need to tell the two cases apart
+    // should first check isOnAllComponents().
+    public int position()
+    {
+        return componentIndex == null ? 0 : componentIndex;
     }
 
     @Override
@@ -144,8 +182,11 @@ public class ColumnDefinition
 
         ColumnDefinition cd = (ColumnDefinition) o;
 
-        return Objects.equal(name, cd.name)
-            && Objects.equal(validator, cd.validator)
+        return Objects.equal(ksName, cd.ksName)
+            && Objects.equal(cfName, cd.cfName)
+            && Objects.equal(name, cd.name)
+            && Objects.equal(type, cd.type)
+            && Objects.equal(kind, cd.kind)
             && Objects.equal(componentIndex, cd.componentIndex)
             && Objects.equal(indexName, cd.indexName)
             && Objects.equal(indexType, cd.indexType)
@@ -155,16 +196,16 @@ public class ColumnDefinition
     @Override
     public int hashCode()
     {
-        return Objects.hashCode(name, validator, componentIndex, indexName, indexType, indexOptions);
+        return Objects.hashCode(ksName, cfName, name, type, kind, componentIndex, indexName, indexType, indexOptions);
     }
 
     @Override
     public String toString()
     {
         return Objects.toStringHelper(this)
-                      .add("name", ByteBufferUtil.bytesToHex(name))
-                      .add("validator", validator)
+                      .add("name", name)
                       .add("type", type)
+                      .add("kind", kind)
                       .add("componentIndex", componentIndex)
                       .add("indexName", indexName)
                       .add("indexType", indexType)
@@ -173,14 +214,14 @@ public class ColumnDefinition
 
     public boolean isThriftCompatible()
     {
-        return type == ColumnDefinition.Type.REGULAR && componentIndex == null;
+        return kind == ColumnDefinition.Kind.REGULAR && componentIndex == null;
     }
 
     public static List<ColumnDef> toThrift(Map<ByteBuffer, ColumnDefinition> columns)
     {
         List<ColumnDef> thriftDefs = new ArrayList<>(columns.size());
         for (ColumnDefinition def : columns.values())
-            if (def.type == ColumnDefinition.Type.REGULAR)
+            if (def.kind == ColumnDefinition.Kind.REGULAR)
                 thriftDefs.add(def.toThrift());
         return thriftDefs;
     }
@@ -189,8 +230,8 @@ public class ColumnDefinition
     {
         ColumnDef cd = new ColumnDef();
 
-        cd.setName(ByteBufferUtil.clone(name));
-        cd.setValidation_class(validator.toString());
+        cd.setName(ByteBufferUtil.clone(name.bytes));
+        cd.setValidation_class(type.toString());
         cd.setIndex_type(indexType == null ? null : org.apache.cassandra.thrift.IndexType.valueOf(indexType.name()));
         cd.setIndex_name(indexName == null ? null : indexName);
         cd.setIndex_options(indexOptions == null ? null : Maps.newHashMap(indexOptions));
@@ -198,26 +239,43 @@ public class ColumnDefinition
         return cd;
     }
 
-    public static ColumnDefinition fromThrift(ColumnDef thriftColumnDef, boolean isSuper) throws SyntaxException, ConfigurationException
+    public static ColumnDefinition fromThrift(CFMetaData cfm, ColumnDef thriftColumnDef) throws SyntaxException, ConfigurationException
     {
         // For super columns, the componentIndex is 1 because the ColumnDefinition applies to the column component.
-        return new ColumnDefinition(ByteBufferUtil.clone(thriftColumnDef.name),
-                                    TypeParser.parse(thriftColumnDef.validation_class),
-                                    thriftColumnDef.index_type == null ? null : IndexType.valueOf(thriftColumnDef.index_type.name()),
-                                    thriftColumnDef.index_options,
-                                    thriftColumnDef.index_name,
-                                    isSuper ? 1 : null,
-                                    Type.REGULAR);
+        Integer componentIndex = cfm.isSuper() ? 1 : null;
+        AbstractType<?> comparator = cfm.getComponentComparator(componentIndex, Kind.REGULAR);
+        try
+        {
+            comparator.validate(thriftColumnDef.name);
+        }
+        catch (MarshalException e)
+        {
+            throw new ConfigurationException(String.format("Column name %s is not valid for comparator %s", ByteBufferUtil.bytesToHex(thriftColumnDef.name), comparator));
+        }
+
+        ColumnDefinition cd = new ColumnDefinition(cfm,
+                                                   ByteBufferUtil.clone(thriftColumnDef.name),
+                                                   TypeParser.parse(thriftColumnDef.validation_class),
+                                                   componentIndex,
+                                                   Kind.REGULAR);
+
+        cd.setIndex(thriftColumnDef.index_name,
+                    thriftColumnDef.index_type == null ? null : IndexType.valueOf(thriftColumnDef.index_type.name()),
+                    thriftColumnDef.index_options);
+        return cd;
     }
 
-    public static Map<ByteBuffer, ColumnDefinition> fromThrift(List<ColumnDef> thriftDefs, boolean isSuper) throws SyntaxException, ConfigurationException
+    public static Map<ByteBuffer, ColumnDefinition> fromThrift(CFMetaData cfm, List<ColumnDef> thriftDefs) throws SyntaxException, ConfigurationException
     {
         if (thriftDefs == null)
             return new HashMap<>();
 
         Map<ByteBuffer, ColumnDefinition> cds = new TreeMap<>();
         for (ColumnDef thriftColumnDef : thriftDefs)
-            cds.put(ByteBufferUtil.clone(thriftColumnDef.name), fromThrift(thriftColumnDef, isSuper));
+        {
+            ColumnDefinition def = fromThrift(cfm, thriftColumnDef);
+            cds.put(def.name.bytes, def);
+        }
 
         return cds;
     }
@@ -229,55 +287,61 @@ public class ColumnDefinition
      * @param cfName     The name of the parent ColumnFamily
      * @param timestamp  The timestamp to use for column modification
      */
-    public void deleteFromSchema(RowMutation rm, String cfName, AbstractType<?> comparator, long timestamp)
+    public void deleteFromSchema(RowMutation rm, long timestamp)
     {
         ColumnFamily cf = rm.addOrGet(CFMetaData.SchemaColumnsCf);
         int ldt = (int) (System.currentTimeMillis() / 1000);
 
-        ColumnNameBuilder builder = CFMetaData.SchemaColumnsCf.getCfDef().getColumnNameBuilder();
-        // Note: the following is necessary for backward compatibility. For CQL3, comparator will be UTF8 and nameBytes == name
-        ByteBuffer nameBytes = ByteBufferUtil.bytes(comparator.getString(name));
+        ColumnNameBuilder builder = CFMetaData.SchemaColumnsCf.getColumnNameBuilder();
+        // Note: the following is necessary for backward compatibility. For CQL3, ByteBufferUtil.bytes(name.toString()) equals name.bytes
+        ByteBuffer nameBytes = ByteBufferUtil.bytes(name.toString());
         builder.add(ByteBufferUtil.bytes(cfName)).add(nameBytes);
         cf.addAtom(new RangeTombstone(builder.build(), builder.buildAsEndOfRange(), timestamp, ldt));
     }
 
-    public void toSchema(RowMutation rm, String cfName, AbstractType<?> comparator, long timestamp)
+    public void toSchema(RowMutation rm, long timestamp)
     {
         ColumnFamily cf = rm.addOrGet(CFMetaData.SchemaColumnsCf);
         int ldt = (int) (System.currentTimeMillis() / 1000);
 
-        cf.addColumn(Column.create("", timestamp, cfName, comparator.getString(name), ""));
-        cf.addColumn(Column.create(validator.toString(), timestamp, cfName, comparator.getString(name), VALIDATOR));
-        cf.addColumn(indexType == null ? DeletedColumn.create(ldt, timestamp, cfName, comparator.getString(name), INDEX_TYPE)
-                                       : Column.create(indexType.toString(), timestamp, cfName, comparator.getString(name), INDEX_TYPE));
-        cf.addColumn(indexOptions == null ? DeletedColumn.create(ldt, timestamp, cfName, comparator.getString(name), INDEX_OPTIONS)
-                                          : Column.create(json(indexOptions), timestamp, cfName, comparator.getString(name), INDEX_OPTIONS));
-        cf.addColumn(indexName == null ? DeletedColumn.create(ldt, timestamp, cfName, comparator.getString(name), INDEX_NAME)
-                                       : Column.create(indexName, timestamp, cfName, comparator.getString(name), INDEX_NAME));
-        cf.addColumn(componentIndex == null ? DeletedColumn.create(ldt, timestamp, cfName, comparator.getString(name), COMPONENT_INDEX)
-                                            : Column.create(componentIndex, timestamp, cfName, comparator.getString(name), COMPONENT_INDEX));
-        cf.addColumn(Column.create(type.toString().toLowerCase(), timestamp, cfName, comparator.getString(name), TYPE));
+        cf.addColumn(Column.create("", timestamp, cfName, name.toString(), ""));
+        cf.addColumn(Column.create(type.toString(), timestamp, cfName, name.toString(), TYPE));
+        cf.addColumn(indexType == null ? DeletedColumn.create(ldt, timestamp, cfName, name.toString(), INDEX_TYPE)
+                                       : Column.create(indexType.toString(), timestamp, cfName, name.toString(), INDEX_TYPE));
+        cf.addColumn(indexOptions == null ? DeletedColumn.create(ldt, timestamp, cfName, name.toString(), INDEX_OPTIONS)
+                                          : Column.create(json(indexOptions), timestamp, cfName, name.toString(), INDEX_OPTIONS));
+        cf.addColumn(indexName == null ? DeletedColumn.create(ldt, timestamp, cfName, name.toString(), INDEX_NAME)
+                                       : Column.create(indexName, timestamp, cfName, name.toString(), INDEX_NAME));
+        cf.addColumn(componentIndex == null ? DeletedColumn.create(ldt, timestamp, cfName, name.toString(), COMPONENT_INDEX)
+                                            : Column.create(componentIndex, timestamp, cfName, name.toString(), COMPONENT_INDEX));
+        cf.addColumn(Column.create(kind.serialize(), timestamp, cfName, name.toString(), KIND));
     }
 
-    public void apply(ColumnDefinition def, AbstractType<?> comparator)  throws ConfigurationException
+    public ColumnDefinition apply(ColumnDefinition def)  throws ConfigurationException
     {
-        assert type == def.type && Objects.equal(componentIndex, def.componentIndex);
+        assert kind == def.kind && Objects.equal(componentIndex, def.componentIndex);
 
         if (getIndexType() != null && def.getIndexType() != null)
         {
             // If an index is set (and not drop by this update), the validator shouldn't be change to a non-compatible one
             // (and we want true comparator compatibility, not just value one, since the validator is used by LocalPartitioner to order index rows)
-            if (!def.getValidator().isCompatibleWith(getValidator()))
-                throw new ConfigurationException(String.format("Cannot modify validator to a non-order-compatible one for column %s since an index is set", comparator.getString(name)));
+            if (!def.type.isCompatibleWith(type))
+                throw new ConfigurationException(String.format("Cannot modify validator to a non-order-compatible one for column %s since an index is set", name));
 
             assert getIndexName() != null;
             if (!getIndexName().equals(def.getIndexName()))
                 throw new ConfigurationException("Cannot modify index name");
         }
 
-        setValidator(def.getValidator());
-        setIndexType(def.getIndexType(), def.getIndexOptions());
-        setIndexName(def.getIndexName());
+        return new ColumnDefinition(ksName,
+                                    cfName,
+                                    name,
+                                    def.type,
+                                    def.getIndexType(),
+                                    def.getIndexOptions(),
+                                    def.getIndexName(),
+                                    componentIndex,
+                                    kind);
     }
 
     /**
@@ -293,22 +357,25 @@ public class ColumnDefinition
         String query = String.format("SELECT * FROM %s.%s", Keyspace.SYSTEM_KS, SystemKeyspace.SCHEMA_COLUMNS_CF);
         for (UntypedResultSet.Row row : QueryProcessor.resultify(query, serializedColumns))
         {
-            Type type = row.has(TYPE)
-                      ? Enum.valueOf(Type.class, row.getString(TYPE).toUpperCase())
-                      : Type.REGULAR;
+            Kind kind = row.has(KIND)
+                      ? Kind.deserialize(row.getString(KIND))
+                      : Kind.REGULAR;
 
             Integer componentIndex = null;
             if (row.has(COMPONENT_INDEX))
                 componentIndex = row.getInt(COMPONENT_INDEX);
-            else if (type == Type.CLUSTERING_KEY && cfm.isSuper())
+            else if (kind == Kind.CLUSTERING_COLUMN && cfm.isSuper())
                 componentIndex = 1; // A ColumnDefinition for super columns applies to the column component
 
-            ByteBuffer name = cfm.getComponentComparator(componentIndex, type).fromString(row.getString(COLUMN_NAME));
+            // Note: we save the column name as a string, but we should not assume that it is a UTF8 name;
+            // we need to use the comparator's fromString method to convert it back.
+            AbstractType<?> comparator = cfm.getComponentComparator(componentIndex, kind);
+            ColumnIdentifier name = new ColumnIdentifier(comparator.fromString(row.getString(COLUMN_NAME)), comparator);
 
             AbstractType<?> validator;
             try
             {
-                validator = TypeParser.parse(row.getString(VALIDATOR));
+                validator = TypeParser.parse(row.getString(TYPE));
             }
             catch (RequestValidationException e)
             {
@@ -327,7 +394,7 @@ public class ColumnDefinition
             if (row.has(INDEX_NAME))
                 indexName = row.getString(INDEX_NAME);
 
-            cds.add(new ColumnDefinition(name, validator, indexType, indexOptions, indexName, componentIndex, type));
+            cds.add(new ColumnDefinition(cfm.ksName, cfm.cfName, name, validator, indexType, indexOptions, indexName, componentIndex, kind));
         }
 
         return cds;
@@ -370,14 +437,4 @@ public class ColumnDefinition
     {
         return indexOptions;
     }
-
-    public AbstractType<?> getValidator()
-    {
-        return validator;
-    }
-
-    public void setValidator(AbstractType<?> validator)
-    {
-        this.validator = validator;
-    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5f5905d5/src/java/org/apache/cassandra/config/Schema.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Schema.java b/src/java/org/apache/cassandra/config/Schema.java
index d822704..112dc87 100644
--- a/src/java/org/apache/cassandra/config/Schema.java
+++ b/src/java/org/apache/cassandra/config/Schema.java
@@ -216,37 +216,6 @@ public class Schema
     }
 
     /**
-     * Get column comparator for ColumnFamily but it's keyspace/name
-     *
-     * @param ksName The keyspace name
-     * @param cfName The ColumnFamily name
-     *
-     * @return The comparator of the ColumnFamily
-     */
-    public AbstractType<?> getComparator(String ksName, String cfName)
-    {
-        assert ksName != null;
-        CFMetaData cfmd = getCFMetaData(ksName, cfName);
-        if (cfmd == null)
-            throw new IllegalArgumentException("Unknown ColumnFamily " + cfName + " in keyspace " + ksName);
-        return cfmd.comparator;
-    }
-
-    /**
-     * Get value validator for specific column
-     *
-     * @param ksName The keyspace name
-     * @param cfName The ColumnFamily name
-     * @param column The name of the column
-     *
-     * @return value validator specific to the column or default (per-cf) one
-     */
-    public AbstractType<?> getValueValidator(String ksName, String cfName, ByteBuffer column)
-    {
-        return getCFMetaData(ksName, cfName).getValueValidator(column);
-    }
-
-    /**
      * Get metadata about keyspace by its name
      *
      * @param keyspaceName The name of the keyspace

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5f5905d5/src/java/org/apache/cassandra/config/TriggerDefinition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/TriggerDefinition.java b/src/java/org/apache/cassandra/config/TriggerDefinition.java
index 69e06b1..e08f97c 100644
--- a/src/java/org/apache/cassandra/config/TriggerDefinition.java
+++ b/src/java/org/apache/cassandra/config/TriggerDefinition.java
@@ -84,7 +84,7 @@ public class TriggerDefinition
     {
         ColumnFamily cf = rm.addOrGet(SystemKeyspace.SCHEMA_TRIGGERS_CF);
 
-        ColumnNameBuilder builder = CFMetaData.SchemaTriggersCf.getCfDef().getColumnNameBuilder();
+        ColumnNameBuilder builder = CFMetaData.SchemaTriggersCf.getColumnNameBuilder();
         builder.add(bytes(cfName)).add(bytes(name));
 
         cf.addColumn(builder.copy().add(bytes("")).build(), bytes(""), timestamp); // the row marker
@@ -103,7 +103,7 @@ public class TriggerDefinition
         ColumnFamily cf = rm.addOrGet(SystemKeyspace.SCHEMA_TRIGGERS_CF);
         int ldt = (int) (System.currentTimeMillis() / 1000);
 
-        ColumnNameBuilder builder = CFMetaData.SchemaTriggersCf.getCfDef().getColumnNameBuilder();
+        ColumnNameBuilder builder = CFMetaData.SchemaTriggersCf.getColumnNameBuilder();
         builder.add(bytes(cfName)).add(bytes(name));
         cf.addAtom(new RangeTombstone(builder.build(), builder.buildAsEndOfRange(), timestamp, ldt));
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5f5905d5/src/java/org/apache/cassandra/cql/AlterTableStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql/AlterTableStatement.java b/src/java/org/apache/cassandra/cql/AlterTableStatement.java
index 48e64c8..74c6593 100644
--- a/src/java/org/apache/cassandra/cql/AlterTableStatement.java
+++ b/src/java/org/apache/cassandra/cql/AlterTableStatement.java
@@ -74,7 +74,7 @@ public class AlterTableStatement
         switch (oType)
         {
             case ADD:
-                cfm.addColumnDefinition(ColumnDefinition.regularDef(columnName, TypeParser.parse(validator), null));
+                cfm.addColumnDefinition(ColumnDefinition.regularDef(cfm, columnName, TypeParser.parse(validator), null));
                 break;
 
             case ALTER:
@@ -102,7 +102,7 @@ public class AlterTableStatement
                                     this.columnName,
                                     columnFamily));
 
-                    toUpdate.setValidator(TypeParser.parse(validator));
+                    cfm.addOrReplaceColumnDefinition(toUpdate.withNewType(TypeParser.parse(validator)));
                 }
                 break;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5f5905d5/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java b/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java
index dd56387..a140d02 100644
--- a/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java
+++ b/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java
@@ -26,6 +26,7 @@ import java.util.Set;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.cql3.ColumnIdentifier;
 import org.apache.cassandra.db.ColumnFamilyType;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.TypeParser;
@@ -122,7 +123,7 @@ public class CreateColumnFamilyStatement
     }
 
     // Column definitions
-    private Map<ByteBuffer, ColumnDefinition> getColumns(AbstractType<?> comparator) throws InvalidRequestException
+    private Map<ByteBuffer, ColumnDefinition> getColumns(CFMetaData cfm) throws InvalidRequestException
     {
         Map<ByteBuffer, ColumnDefinition> columnDefs = new HashMap<ByteBuffer, ColumnDefinition>();
 
@@ -130,12 +131,12 @@ public class CreateColumnFamilyStatement
         {
             try
             {
-                ByteBuffer columnName = comparator.fromStringCQL2(col.getKey().getText());
+                ByteBuffer columnName = cfm.comparator.fromStringCQL2(col.getKey().getText());
                 String validatorClassName = CFPropDefs.comparators.containsKey(col.getValue())
                                           ? CFPropDefs.comparators.get(col.getValue())
                                           : col.getValue();
                 AbstractType<?> validator = TypeParser.parse(validatorClassName);
-                columnDefs.put(columnName, ColumnDefinition.regularDef(columnName, validator, null));
+                columnDefs.put(columnName, ColumnDefinition.regularDef(cfm, columnName, validator, null));
             }
             catch (ConfigurationException e)
             {
@@ -192,7 +193,7 @@ public class CreateColumnFamilyStatement
                    .defaultValidator(cfProps.getValidator())
                    .minCompactionThreshold(minCompactionThreshold)
                    .maxCompactionThreshold(maxCompactionThreshold)
-                   .columnMetadata(getColumns(comparator))
+                   .columnMetadata(getColumns(newCFMD))
                    .keyValidator(TypeParser.parse(CFPropDefs.comparators.get(getKeyType())))
                    .compactionStrategyClass(cfProps.compactionStrategyClass)
                    .compactionStrategyOptions(cfProps.compactionStrategyOptions)
@@ -206,7 +207,7 @@ public class CreateColumnFamilyStatement
 
             // CQL2 can have null keyAliases
             if (keyAlias != null)
-                newCFMD.addColumnDefinition(ColumnDefinition.partitionKeyDef(keyAlias, newCFMD.getKeyValidator(), null));
+                newCFMD.addColumnDefinition(ColumnDefinition.partitionKeyDef(newCFMD, keyAlias, newCFMD.getKeyValidator(), null));
         }
         catch (ConfigurationException e)
         {