You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2013/12/17 17:03:49 UTC

[13/13] git commit: Push composites support in the storage engine

Push composites support in the storage engine

patch by slebresne; reviewed by benedict for CASSANDRA-5417


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/362cc053
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/362cc053
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/362cc053

Branch: refs/heads/trunk
Commit: 362cc05352ec67e707e0ac790732e96a15e63f6b
Parents: f943433
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Tue Oct 29 11:03:52 2013 +0100
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Tue Dec 17 17:03:21 2013 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 src/java/org/apache/cassandra/auth/Auth.java    |   2 +-
 .../cassandra/auth/CassandraAuthorizer.java     |   2 +-
 .../cassandra/auth/PasswordAuthenticator.java   |   2 +-
 .../apache/cassandra/cache/AutoSavingCache.java |   5 +-
 .../org/apache/cassandra/config/CFMetaData.java | 510 ++++++++-----------
 .../cassandra/config/ColumnDefinition.java      |  77 +--
 .../org/apache/cassandra/config/KSMetaData.java |  18 +-
 .../cassandra/config/TriggerDefinition.java     |  17 +-
 .../org/apache/cassandra/config/UTMetaData.java |  13 +-
 .../cassandra/cql/AlterTableStatement.java      |   2 +-
 .../cql/CreateColumnFamilyStatement.java        |  17 +-
 .../apache/cassandra/cql/DeleteStatement.java   |   4 +-
 .../apache/cassandra/cql/QueryProcessor.java    |  84 +--
 .../apache/cassandra/cql/SelectStatement.java   |   3 -
 .../apache/cassandra/cql/UpdateStatement.java   |   9 +-
 src/java/org/apache/cassandra/cql3/CQL3Row.java |  36 ++
 .../apache/cassandra/cql3/ColumnIdentifier.java |  20 +-
 .../cassandra/cql3/ColumnNameBuilder.java       |  90 ----
 .../org/apache/cassandra/cql3/Constants.java    |  44 +-
 src/java/org/apache/cassandra/cql3/Lists.java   |  65 ++-
 src/java/org/apache/cassandra/cql3/Maps.java    |  35 +-
 .../org/apache/cassandra/cql3/Operation.java    |  57 ++-
 .../apache/cassandra/cql3/QueryProcessor.java   |  28 +-
 src/java/org/apache/cassandra/cql3/Sets.java    |  31 +-
 .../apache/cassandra/cql3/UntypedResultSet.java |  97 +++-
 .../apache/cassandra/cql3/UpdateParameters.java |  34 +-
 .../cassandra/cql3/functions/TimeuuidFcts.java  |   1 +
 .../cassandra/cql3/functions/TokenFct.java      |   6 +-
 .../cql3/statements/AlterTableStatement.java    |  39 +-
 .../cql3/statements/AlterTypeStatement.java     |   3 +-
 .../cql3/statements/ColumnGroupMap.java         | 171 -------
 .../cql3/statements/CreateIndexStatement.java   |   8 +-
 .../cql3/statements/CreateTableStatement.java   |  57 +--
 .../cql3/statements/DeleteStatement.java        |  50 +-
 .../cql3/statements/ModificationStatement.java  |  70 ++-
 .../cql3/statements/SelectStatement.java        | 471 ++++++++---------
 .../cassandra/cql3/statements/Selection.java    |  14 -
 .../cql3/statements/UpdateStatement.java        |  27 +-
 .../cassandra/db/ArrayBackedSortedColumns.java  |  31 +-
 .../apache/cassandra/db/AtomDeserializer.java   | 110 ++++
 .../cassandra/db/AtomicSortedColumns.java       |  26 +-
 .../apache/cassandra/db/BatchlogManager.java    |  12 +-
 .../org/apache/cassandra/db/CFRowAdder.java     |  81 +++
 .../cassandra/db/CollationController.java       |   8 +-
 src/java/org/apache/cassandra/db/Column.java    | 105 +---
 .../org/apache/cassandra/db/ColumnFamily.java   |  28 +-
 .../cassandra/db/ColumnFamilySerializer.java    |  13 +-
 .../apache/cassandra/db/ColumnFamilyStore.java  |  50 +-
 .../org/apache/cassandra/db/ColumnIndex.java    |  10 +-
 .../apache/cassandra/db/ColumnSerializer.java   |  37 +-
 .../org/apache/cassandra/db/CounterColumn.java  |  27 +-
 .../apache/cassandra/db/CounterMutation.java    |   4 +-
 .../cassandra/db/CounterUpdateColumn.java       |   9 +-
 src/java/org/apache/cassandra/db/DataRange.java |  23 +-
 .../org/apache/cassandra/db/DefsTables.java     |  13 -
 .../org/apache/cassandra/db/DeletedColumn.java  |  18 +-
 .../org/apache/cassandra/db/DeletionInfo.java   |  53 +-
 .../org/apache/cassandra/db/DeletionTime.java   |  10 +-
 .../org/apache/cassandra/db/EmptyColumns.java   |   6 +-
 .../org/apache/cassandra/db/ExpiringColumn.java |  27 +-
 .../cassandra/db/HintedHandOffManager.java      |  27 +-
 src/java/org/apache/cassandra/db/Memtable.java  |   3 +-
 .../org/apache/cassandra/db/OnDiskAtom.java     |  42 +-
 .../apache/cassandra/db/PagedRangeCommand.java  |  42 +-
 .../apache/cassandra/db/RangeSliceCommand.java  |  10 +-
 .../org/apache/cassandra/db/RangeTombstone.java |  77 +--
 .../apache/cassandra/db/RangeTombstoneList.java |  74 ++-
 .../org/apache/cassandra/db/RowIndexEntry.java  |  40 +-
 .../org/apache/cassandra/db/RowMutation.java    |  14 +-
 .../cassandra/db/SliceByNamesReadCommand.java   |  10 +-
 .../cassandra/db/SliceFromReadCommand.java      |  12 +-
 .../org/apache/cassandra/db/SuperColumns.java   | 228 +++++----
 .../org/apache/cassandra/db/SystemKeyspace.java |  23 +-
 .../db/TreeMapBackedSortedColumns.java          |  22 +-
 .../apache/cassandra/db/UnsortedColumns.java    |  10 +-
 .../db/columniterator/IdentityQueryFilter.java  |   4 +-
 .../db/columniterator/IndexedSliceReader.java   | 135 +++--
 .../db/columniterator/SSTableNamesIterator.java |  46 +-
 .../db/columniterator/SSTableSliceIterator.java |   2 +-
 .../db/columniterator/SimpleSliceReader.java    |  12 +-
 .../db/compaction/LazilyCompactedRow.java       |   2 +-
 .../cassandra/db/compaction/Scrubber.java       |   4 +-
 .../cassandra/db/composites/AbstractCType.java  | 336 ++++++++++++
 .../db/composites/AbstractCellNameType.java     | 356 +++++++++++++
 .../db/composites/AbstractComposite.java        | 132 +++++
 .../AbstractCompoundCellNameType.java           | 264 ++++++++++
 .../composites/AbstractSimpleCellNameType.java  | 141 +++++
 .../db/composites/BoundedComposite.java         |  95 ++++
 .../cassandra/db/composites/CBuilder.java       |  34 ++
 .../apache/cassandra/db/composites/CType.java   | 142 ++++++
 .../cassandra/db/composites/CellName.java       |  74 +++
 .../cassandra/db/composites/CellNameType.java   | 202 ++++++++
 .../cassandra/db/composites/CellNames.java      |  91 ++++
 .../cassandra/db/composites/Composite.java      |  76 +++
 .../cassandra/db/composites/Composites.java     | 110 ++++
 .../cassandra/db/composites/CompoundCType.java  | 158 ++++++
 .../db/composites/CompoundComposite.java        |  70 +++
 .../db/composites/CompoundDenseCellName.java    |  70 +++
 .../composites/CompoundDenseCellNameType.java   |  86 ++++
 .../db/composites/CompoundSparseCellName.java   | 164 ++++++
 .../composites/CompoundSparseCellNameType.java  | 238 +++++++++
 .../cassandra/db/composites/SimpleCType.java    | 133 +++++
 .../db/composites/SimpleComposite.java          |  76 +++
 .../db/composites/SimpleDenseCellName.java      |  73 +++
 .../db/composites/SimpleDenseCellNameType.java  |  78 +++
 .../db/composites/SimpleSparseCellName.java     |  99 ++++
 .../db/composites/SimpleSparseCellNameType.java |  98 ++++
 .../cassandra/db/filter/ColumnCounter.java      |  18 +-
 .../apache/cassandra/db/filter/ColumnSlice.java | 160 ++++--
 .../cassandra/db/filter/ExtendedFilter.java     |  63 +--
 .../cassandra/db/filter/IDiskAtomFilter.java    |  40 +-
 .../cassandra/db/filter/NamesQueryFilter.java   |  71 ++-
 .../apache/cassandra/db/filter/QueryFilter.java |  17 +-
 .../cassandra/db/filter/SliceQueryFilter.java   |  69 ++-
 .../AbstractSimplePerColumnSecondaryIndex.java  |  13 +-
 .../cassandra/db/index/SecondaryIndex.java      |  20 +-
 .../db/index/SecondaryIndexManager.java         |  21 +-
 .../db/index/composites/CompositesIndex.java    |  55 +-
 .../CompositesIndexOnClusteringKey.java         |  45 +-
 .../CompositesIndexOnCollectionKey.java         |  48 +-
 .../CompositesIndexOnCollectionValue.java       |  48 +-
 .../CompositesIndexOnPartitionKey.java          |  33 +-
 .../composites/CompositesIndexOnRegular.java    |  38 +-
 .../db/index/composites/CompositesSearcher.java |  35 +-
 .../cassandra/db/index/keys/KeysIndex.java      |  17 +-
 .../cassandra/db/index/keys/KeysSearcher.java   |  24 +-
 .../db/marshal/AbstractCommutativeType.java     |   3 +-
 .../db/marshal/AbstractCompositeType.java       |  20 +-
 .../cassandra/db/marshal/AbstractType.java      | 108 ----
 .../cassandra/db/marshal/CollectionType.java    |   4 +-
 .../cassandra/db/marshal/CompositeType.java     |  31 +-
 .../cassandra/db/marshal/CounterColumnType.java |   3 +-
 .../apache/cassandra/db/marshal/ListType.java   |   8 +-
 .../apache/cassandra/db/marshal/MapType.java    |  12 +-
 .../apache/cassandra/db/marshal/SetType.java    |   9 +-
 .../hadoop/ColumnFamilyInputFormat.java         |   7 +-
 .../hadoop/ColumnFamilyRecordReader.java        |  61 +--
 .../hadoop/pig/AbstractCassandraStorage.java    |  11 +-
 .../apache/cassandra/hadoop/pig/CqlStorage.java |   5 +-
 .../io/sstable/AbstractSSTableSimpleWriter.java |   9 +-
 .../cassandra/io/sstable/CQLSSTableWriter.java  |  25 +-
 .../cassandra/io/sstable/ColumnNameHelper.java  | 234 ++++-----
 .../cassandra/io/sstable/IndexHelper.java       |  75 +--
 .../cassandra/io/sstable/KeyIterator.java       |   2 +-
 .../apache/cassandra/io/sstable/SSTable.java    |   2 +-
 .../cassandra/io/sstable/SSTableReader.java     |  10 +-
 .../cassandra/io/sstable/SSTableScanner.java    |   6 +-
 .../io/sstable/SSTableSimpleUnsortedWriter.java |   2 +-
 .../io/sstable/SSTableSimpleWriter.java         |   3 +-
 .../cassandra/io/sstable/SSTableWriter.java     |   2 +-
 .../io/sstable/metadata/MetadataCollector.java  |   8 +-
 .../cassandra/io/util/MappedFileDataInput.java  |   5 +-
 .../apache/cassandra/service/CacheService.java  |   9 +-
 .../apache/cassandra/service/StorageProxy.java  |  11 +-
 .../service/pager/AbstractQueryPager.java       |   6 +-
 .../service/pager/RangeSliceQueryPager.java     |  11 +-
 .../service/pager/SliceQueryPager.java          |   8 +-
 .../cassandra/thrift/CassandraServer.java       | 103 ++--
 .../cassandra/thrift/ThriftValidation.java      |  70 +--
 .../org/apache/cassandra/tools/BulkLoader.java  |  19 +-
 .../apache/cassandra/tools/SSTableExport.java   |  16 +-
 .../apache/cassandra/tools/SSTableImport.java   |  38 +-
 .../org/apache/cassandra/tracing/Tracing.java   |  29 +-
 .../cassandra/triggers/TriggerExecutor.java     |   3 +-
 .../org/apache/cassandra/utils/FBUtilities.java |   6 +-
 .../org/apache/cassandra/utils/ObjectSizes.java |  13 +
 test/data/serialization/2.0/db.RowMutation.bin  | Bin 3599 -> 0 bytes
 .../apache/cassandra/db/LongKeyspaceTest.java   |   5 +-
 .../apache/cassandra/db/MeteredFlusherTest.java |   6 +-
 .../cassandra/db/commitlog/ComitLogStress.java  |   5 +-
 .../db/compaction/LongCompactionsTest.java      |   2 +-
 .../LongLeveledCompactionStrategyTest.java      |   2 +-
 .../cassandra/AbstractSerializationsTester.java |   1 +
 .../unit/org/apache/cassandra/SchemaLoader.java |  76 +--
 test/unit/org/apache/cassandra/Util.java        | 102 +++-
 .../apache/cassandra/config/CFMetaDataTest.java |  30 +-
 .../cassandra/config/ColumnDefinitionTest.java  |   2 +-
 .../org/apache/cassandra/config/DefsTest.java   |  53 +-
 .../db/ArrayBackedSortedColumnsTest.java        |  42 +-
 .../org/apache/cassandra/db/CleanupTest.java    |   2 +-
 .../cassandra/db/CollationControllerTest.java   |  14 +-
 .../cassandra/db/ColumnFamilyStoreTest.java     | 358 ++++++-------
 .../apache/cassandra/db/ColumnFamilyTest.java   |  31 +-
 .../org/apache/cassandra/db/CommitLogTest.java  |  11 +-
 .../apache/cassandra/db/CounterColumnTest.java  |  89 ++--
 .../cassandra/db/CounterMutationTest.java       |  12 +-
 .../apache/cassandra/db/HintedHandOffTest.java  |   3 +-
 .../org/apache/cassandra/db/KeyCacheTest.java   |  21 +-
 .../apache/cassandra/db/KeyCollisionTest.java   |   3 +-
 .../org/apache/cassandra/db/KeyspaceTest.java   | 162 +++---
 .../org/apache/cassandra/db/NameSortTest.java   |   4 +-
 .../cassandra/db/RangeTombstoneListTest.java    |  13 +-
 .../apache/cassandra/db/RangeTombstoneTest.java |  21 +-
 .../apache/cassandra/db/ReadMessageTest.java    |  34 +-
 .../cassandra/db/RecoveryManagerTest.java       |   5 +-
 .../db/RecoveryManagerTruncateTest.java         |   7 +-
 .../cassandra/db/RemoveColumnFamilyTest.java    |   4 +-
 .../db/RemoveColumnFamilyWithFlush1Test.java    |   6 +-
 .../db/RemoveColumnFamilyWithFlush2Test.java    |   4 +-
 .../apache/cassandra/db/RemoveColumnTest.java   |  22 +-
 .../cassandra/db/RemoveSubColumnTest.java       |   5 +-
 .../org/apache/cassandra/db/RowCacheTest.java   |  33 +-
 .../apache/cassandra/db/RowIterationTest.java   |   7 +-
 test/unit/org/apache/cassandra/db/RowTest.java  |   7 +-
 .../db/SecondaryIndexColumnSizeTest.java        |  13 +-
 .../apache/cassandra/db/SerializationsTest.java |  62 ++-
 .../org/apache/cassandra/db/TimeSortTest.java   |  38 +-
 .../compaction/BlacklistingCompactionsTest.java |   5 +-
 .../db/compaction/CompactionsPurgeTest.java     |  39 +-
 .../db/compaction/CompactionsTest.java          |  14 +-
 .../LeveledCompactionStrategyTest.java          |   6 +-
 .../db/compaction/OneCompactionTest.java        |   2 +-
 .../SizeTieredCompactionStrategyTest.java       |   6 +-
 .../cassandra/db/compaction/TTLExpiryTest.java  |  20 +-
 .../db/index/PerRowSecondaryIndexTest.java      |  18 +-
 .../cassandra/db/marshal/CompositeTypeTest.java |  13 +-
 .../db/marshal/DynamicCompositeTypeTest.java    |  13 +-
 .../CompressedRandomAccessReaderTest.java       |   5 +-
 .../cassandra/io/sstable/IndexHelperTest.java   |  51 +-
 .../io/sstable/IndexSummaryManagerTest.java     |  10 +-
 .../cassandra/io/sstable/LegacySSTableTest.java |   7 +-
 .../cassandra/io/sstable/SSTableLoaderTest.java |   2 +-
 .../io/sstable/SSTableMetadataTest.java         |  32 +-
 .../cassandra/io/sstable/SSTableReaderTest.java |  26 +-
 .../io/sstable/SSTableScannerTest.java          |   2 +-
 .../io/sstable/SSTableSimpleWriterTest.java     |   2 +-
 .../cassandra/io/sstable/SSTableUtils.java      |   2 +-
 .../metadata/MetadataSerializerTest.java        |   3 +-
 .../service/AntiEntropyServiceCounterTest.java  |   3 +-
 .../service/AntiEntropyServiceStandardTest.java |   3 +-
 .../cassandra/service/QueryPagerTest.java       |  18 +-
 .../streaming/StreamingTransferTest.java        |  23 +-
 .../compress/CompressedInputStreamTest.java     |   3 +-
 .../cassandra/tools/SSTableExportTest.java      |  42 +-
 .../cassandra/tools/SSTableImportTest.java      |  30 +-
 .../cassandra/utils/IntervalTreeTest.java       |   4 +-
 237 files changed, 7001 insertions(+), 3718 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c0d0f0d..dcc7e33 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -16,6 +16,7 @@
  * Batch read from OTC's queue and cleanup (CASSANDRA-1632)
  * Secondary index support for collections (CASSANDRA-4511)
  * SSTable metadata(Stats.db) format change (CASSANDRA-6356)
+ * Push composites support in the storage engine (CASSANDRA-5417)
 
 
 2.0.4

http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/auth/Auth.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/auth/Auth.java b/src/java/org/apache/cassandra/auth/Auth.java
index 5c5dfe7..36e55bf 100644
--- a/src/java/org/apache/cassandra/auth/Auth.java
+++ b/src/java/org/apache/cassandra/auth/Auth.java
@@ -246,7 +246,7 @@ public class Auth
             ResultMessage.Rows rows = selectUserStatement.execute(QueryState.forInternalCalls(),
                                                                   new QueryOptions(consistencyForUser(username),
                                                                                    Lists.newArrayList(ByteBufferUtil.bytes(username))));
-            return new UntypedResultSet(rows.result);
+            return UntypedResultSet.create(rows.result);
         }
         catch (RequestValidationException e)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/auth/CassandraAuthorizer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/auth/CassandraAuthorizer.java b/src/java/org/apache/cassandra/auth/CassandraAuthorizer.java
index deecfdb..8f257db 100644
--- a/src/java/org/apache/cassandra/auth/CassandraAuthorizer.java
+++ b/src/java/org/apache/cassandra/auth/CassandraAuthorizer.java
@@ -76,7 +76,7 @@ public class CassandraAuthorizer implements IAuthorizer
                                                                  new QueryOptions(ConsistencyLevel.ONE,
                                                                                   Lists.newArrayList(ByteBufferUtil.bytes(user.getName()),
                                                                                                      ByteBufferUtil.bytes(resource.getName()))));
-            result = new UntypedResultSet(rows.result);
+            result = UntypedResultSet.create(rows.result);
         }
         catch (RequestValidationException e)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/auth/PasswordAuthenticator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/auth/PasswordAuthenticator.java b/src/java/org/apache/cassandra/auth/PasswordAuthenticator.java
index 3d9ba98..cd5bdc3 100644
--- a/src/java/org/apache/cassandra/auth/PasswordAuthenticator.java
+++ b/src/java/org/apache/cassandra/auth/PasswordAuthenticator.java
@@ -110,7 +110,7 @@ public class PasswordAuthenticator implements ISaslAwareAuthenticator
             ResultMessage.Rows rows = authenticateStatement.execute(QueryState.forInternalCalls(),
                                                                     new QueryOptions(consistencyForUser(username),
                                                                                      Lists.newArrayList(ByteBufferUtil.bytes(username))));
-            result = new UntypedResultSet(rows.result);
+            result = UntypedResultSet.create(rows.result);
         }
         catch (RequestValidationException e)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/cache/AutoSavingCache.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/AutoSavingCache.java b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
index 6554eb3..cbadefc 100644
--- a/src/java/org/apache/cassandra/cache/AutoSavingCache.java
+++ b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
@@ -18,7 +18,6 @@
 package org.apache.cassandra.cache;
 
 import java.io.*;
-import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledFuture;
@@ -30,7 +29,6 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.ColumnFamilyType;
 import org.apache.cassandra.db.compaction.CompactionInfo;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.compaction.OperationType;
@@ -43,7 +41,6 @@ import org.apache.cassandra.io.util.LengthAvailableInputStream;
 import org.apache.cassandra.io.util.SequentialWriter;
 import org.apache.cassandra.service.CacheService;
 import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.Pair;
 
 public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K, V>
@@ -171,7 +168,7 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
             else
                 type = OperationType.UNKNOWN;
 
-            info = new CompactionInfo(new CFMetaData(Keyspace.SYSTEM_KS, cacheType.toString(), ColumnFamilyType.Standard, BytesType.instance, null),
+            info = new CompactionInfo(CFMetaData.denseCFMetaData(Keyspace.SYSTEM_KS, cacheType.toString(), BytesType.instance),
                                       type,
                                       0,
                                       keys.size(),

http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java
index 0a33c20..e56c1dd 100644
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@ -22,7 +22,6 @@ import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.nio.ByteBuffer;
-import java.nio.charset.CharacterCodingException;
 import java.util.*;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -30,6 +29,7 @@ import com.google.common.base.Objects;
 import com.google.common.collect.AbstractIterator;
 import com.google.common.collect.MapDifference;
 import com.google.common.collect.Maps;
+import org.apache.cassandra.db.composites.*;
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.builder.EqualsBuilder;
@@ -55,6 +55,7 @@ import org.apache.cassandra.io.compress.LZ4Compressor;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.serializers.MarshalException;
 import org.apache.cassandra.thrift.CqlRow;
+import org.apache.cassandra.thrift.CqlResult;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
@@ -384,7 +385,7 @@ public final class CFMetaData
     public final String ksName;                       // name of keyspace
     public final String cfName;                       // name of this column family
     public final ColumnFamilyType cfType;             // standard, super
-    public volatile AbstractType<?> comparator;       // bytes, long, timeuuid, utf8, etc.
+    public volatile CellNameType comparator;          // bytes, long, timeuuid, utf8, etc.
 
     //OPTIONAL
     private volatile String comment = "";
@@ -438,7 +439,6 @@ public final class CFMetaData
     public CFMetaData keyValidator(AbstractType<?> prop) {keyValidator = prop; return this;}
     public CFMetaData minCompactionThreshold(int prop) {minCompactionThreshold = prop; return this;}
     public CFMetaData maxCompactionThreshold(int prop) {maxCompactionThreshold = prop; return this;}
-    public CFMetaData columnMetadata(Map<ByteBuffer,ColumnDefinition> prop) {columnMetadata = prop; return this;}
     public CFMetaData compactionStrategyClass(Class<? extends AbstractCompactionStrategy> prop) {compactionStrategyClass = prop; return this;}
     public CFMetaData compactionStrategyOptions(Map<String, String> prop) {compactionStrategyOptions = prop; return this;}
     public CFMetaData compressionParameters(CompressionParameters prop) {compressionParameters = prop; return this;}
@@ -452,18 +452,13 @@ public final class CFMetaData
     public CFMetaData droppedColumns(Map<ColumnIdentifier, Long> cols) {droppedColumns = cols; return this;}
     public CFMetaData triggers(Map<String, TriggerDefinition> prop) {triggers = prop; return this;}
 
-    public CFMetaData(String keyspace, String name, ColumnFamilyType type, AbstractType<?> comp, AbstractType<?> subcc)
-    {
-        this(keyspace, name, type, makeComparator(type, comp, subcc));
-    }
-
-    public CFMetaData(String keyspace, String name, ColumnFamilyType type, AbstractType<?> comp)
+    public CFMetaData(String keyspace, String name, ColumnFamilyType type, CellNameType comp)
     {
         this(keyspace, name, type, comp, getId(keyspace, name));
     }
 
     @VisibleForTesting
-    CFMetaData(String keyspace, String name, ColumnFamilyType type, AbstractType<?> comp,  UUID id)
+    CFMetaData(String keyspace, String name, ColumnFamilyType type, CellNameType comp,  UUID id)
     {
         // (subcc may be null for non-supercolumns)
         // (comp may also be null for custom indexes, which is kind of broken if you ask me)
@@ -475,6 +470,28 @@ public final class CFMetaData
         cfId = id;
     }
 
+    public static CFMetaData denseCFMetaData(String keyspace, String name, AbstractType<?> comp, AbstractType<?> subcc)
+    {
+        CellNameType cellNameType = CellNames.fromAbstractType(makeRawAbstractType(comp, subcc), true);
+        return new CFMetaData(keyspace, name, subcc == null ? ColumnFamilyType.Standard : ColumnFamilyType.Super, cellNameType);
+    }
+
+    public static CFMetaData sparseCFMetaData(String keyspace, String name, AbstractType<?> comp)
+    {
+        CellNameType cellNameType = CellNames.fromAbstractType(comp, false);
+        return new CFMetaData(keyspace, name, ColumnFamilyType.Standard, cellNameType);
+    }
+
+    public static CFMetaData denseCFMetaData(String keyspace, String name, AbstractType<?> comp)
+    {
+        return denseCFMetaData(keyspace, name, comp, null);
+    }
+
+    private static AbstractType<?> makeRawAbstractType(AbstractType<?> comparator, AbstractType<?> subComparator)
+    {
+        return subComparator == null ? comparator : CompositeType.getInstance(Arrays.asList(comparator, subComparator));
+    }
+
     public Map<String, TriggerDefinition> getTriggers()
     {
         return triggers;
@@ -491,7 +508,7 @@ public final class CFMetaData
         try
         {
             CreateTableStatement statement = (CreateTableStatement) QueryProcessor.parseStatement(cql).prepare().statement;
-            CFMetaData cfm = newSystemMetadata(keyspace, statement.columnFamily(), "", statement.comparator, null);
+            CFMetaData cfm = newSystemMetadata(keyspace, statement.columnFamily(), "", statement.comparator);
             statement.applyPropertiesTo(cfm);
             return cfm.rebuild();
         }
@@ -501,13 +518,6 @@ public final class CFMetaData
         }
     }
 
-    private static AbstractType<?> makeComparator(ColumnFamilyType cftype, AbstractType<?> comp, AbstractType<?> subcc)
-    {
-        return cftype == ColumnFamilyType.Super
-             ? CompositeType.getInstance(comp, subcc == null ? BytesType.instance : subcc)
-             : comp;
-    }
-
     private static String enforceCommentNotNull (CharSequence comment)
     {
         return (comment == null) ? "" : comment.toString();
@@ -518,11 +528,9 @@ public final class CFMetaData
         return UUID.nameUUIDFromBytes(ArrayUtils.addAll(ksName.getBytes(), cfName.getBytes()));
     }
 
-    private static CFMetaData newSystemMetadata(String keyspace, String cfName, String comment, AbstractType<?> comparator, AbstractType<?> subcc)
+    private static CFMetaData newSystemMetadata(String keyspace, String cfName, String comment, CellNameType comparator)
     {
-        ColumnFamilyType type = subcc == null ? ColumnFamilyType.Standard : ColumnFamilyType.Super;
-        CFMetaData newCFMD = new CFMetaData(keyspace, cfName, type, comparator,  subcc);
-
+        CFMetaData newCFMD = new CFMetaData(keyspace, cfName, ColumnFamilyType.Standard, comparator);
         return newCFMD.comment(comment)
                 .readRepairChance(0)
                 .dcLocalReadRepairChance(0)
@@ -530,7 +538,7 @@ public final class CFMetaData
                 .memtableFlushPeriod(3600 * 1000);
     }
 
-    public static CFMetaData newIndexMetadata(CFMetaData parent, ColumnDefinition info, AbstractType<?> columnComparator)
+    public static CFMetaData newIndexMetadata(CFMetaData parent, ColumnDefinition info, CellNameType indexComparator)
     {
         // Depends on parent's cache setting, turn on its index CF's cache.
         // Row caching is never enabled; see CASSANDRA-5732
@@ -538,7 +546,7 @@ public final class CFMetaData
                              ? Caching.KEYS_ONLY
                              : Caching.NONE;
 
-        return new CFMetaData(parent.ksName, parent.indexColumnFamilyName(info), ColumnFamilyType.Standard, columnComparator, (AbstractType)null)
+        return new CFMetaData(parent.ksName, parent.indexColumnFamilyName(info), ColumnFamilyType.Standard, indexComparator)
                              .keyValidator(info.type)
                              .readRepairChance(0.0)
                              .dcLocalReadRepairChance(0.0)
@@ -574,14 +582,12 @@ public final class CFMetaData
 
     static CFMetaData copyOpts(CFMetaData newCFMD, CFMetaData oldCFMD)
     {
-        Map<ByteBuffer, ColumnDefinition> clonedColumns = new HashMap<>();
+        List<ColumnDefinition> clonedColumns = new ArrayList<>(oldCFMD.allColumns().size());
         for (ColumnDefinition cd : oldCFMD.allColumns())
-        {
-            ColumnDefinition cloned = cd.copy();
-            clonedColumns.put(cloned.name.bytes, cloned);
-        }
+            clonedColumns.add(cd.copy());
 
-        return newCFMD.comment(oldCFMD.comment)
+        return newCFMD.addAllColumnDefinitions(clonedColumns)
+                      .comment(oldCFMD.comment)
                       .readRepairChance(oldCFMD.readRepairChance)
                       .dcLocalReadRepairChance(oldCFMD.dcLocalReadRepairChance)
                       .replicateOnWrite(oldCFMD.replicateOnWrite)
@@ -590,7 +596,6 @@ public final class CFMetaData
                       .keyValidator(oldCFMD.keyValidator)
                       .minCompactionThreshold(oldCFMD.minCompactionThreshold)
                       .maxCompactionThreshold(oldCFMD.maxCompactionThreshold)
-                      .columnMetadata(clonedColumns)
                       .compactionStrategyClass(oldCFMD.compactionStrategyClass)
                       .compactionStrategyOptions(new HashMap<>(oldCFMD.compactionStrategyOptions))
                       .compressionParameters(oldCFMD.compressionParameters.copy())
@@ -763,18 +768,13 @@ public final class CFMetaData
         return compactValueColumn;
     }
 
-    public ColumnNameBuilder getKeyNameBuilder()
+    // TODO: we could use CType for key validation too to make this unnecessary but
+    // it's unclear it would be a win overall
+    public CType getKeyValidatorAsCType()
     {
         return keyValidator instanceof CompositeType
-             ? new CompositeType.Builder((CompositeType)keyValidator)
-             : new NonCompositeBuilder(keyValidator);
-    }
-
-    public ColumnNameBuilder getColumnNameBuilder()
-    {
-        return comparator instanceof CompositeType
-             ? new CompositeType.Builder((CompositeType)comparator)
-             : new NonCompositeBuilder(comparator);
+             ? new CompoundCType(((CompositeType) keyValidator).types)
+             : new SimpleCType(keyValidator);
     }
 
     public double getBloomFilterFpChance()
@@ -891,19 +891,23 @@ public final class CFMetaData
             .toHashCode();
     }
 
-    public AbstractType<?> getValueValidatorFromCellName(ByteBuffer cellName)
+    public AbstractType<?> getValueValidator(ColumnIdentifier column)
     {
-        // If this is a CQL table, we need to pull out the CQL column name to look up the correct column type.
-        if (!hasCompositeComparator() || isDense())
-            return getValueValidator(new ColumnIdentifier(cellName, comparator));
+        return getValueValidator(getColumnDefinition(column));
+    }
 
-        ByteBuffer name = ((CompositeType)comparator).extractLastComponent(cellName);
-        return getValueValidator(new ColumnIdentifier(name, UTF8Type.instance));
+    public AbstractType<?> getValueValidator(CellName name)
+    {
+        return getValueValidator(getColumnDefinition(name));
     }
 
-    public AbstractType<?> getValueValidator(ColumnIdentifier column)
+    public AbstractType<?> getValueValidatorForFullCellName(ByteBuffer name)
     {
-        return getValueValidator(getColumnDefinition(column));
+        // If this is a CQL table, we need to pull out the CQL column name to look up the correct column type.
+        if (!isCQL3Table())
+            return getValueValidator(getColumnDefinition(name));
+
+        return getValueValidator(getColumnDefinition(comparator.cellFromByteBuffer(name)));
     }
 
     public AbstractType<?> getValueValidator(ColumnDefinition columnDefinition)
@@ -954,11 +958,19 @@ public final class CFMetaData
 
         try
         {
+            AbstractType<?> rawComparator = TypeParser.parse(cf_def.comparator_type);
+            AbstractType<?> subComparator = cfType == ColumnFamilyType.Standard
+                                          ? null
+                                          : cf_def.subcomparator_type == null ? BytesType.instance : TypeParser.parse(cf_def.subcomparator_type);
+
+            // Dense for thrift is simplified as all column metadata are REGULAR
+            boolean isDense = (cf_def.column_metadata == null || cf_def.column_metadata.isEmpty()) && !isCQL3OnlyPKComparator(rawComparator);
+            CellNameType comparator = CellNames.fromAbstractType(makeRawAbstractType(rawComparator, subComparator), isDense);
+
             CFMetaData newCFMD = new CFMetaData(cf_def.keyspace,
                                                 cf_def.name,
                                                 cfType,
-                                                TypeParser.parse(cf_def.comparator_type),
-                                                cf_def.subcomparator_type == null ? null : TypeParser.parse(cf_def.subcomparator_type));
+                                                comparator);
 
             if (cf_def.isSetGc_grace_seconds()) { newCFMD.gcGraceSeconds(cf_def.gc_grace_seconds); }
             if (cf_def.isSetMin_compaction_threshold()) { newCFMD.minCompactionThreshold(cf_def.min_compaction_threshold); }
@@ -992,15 +1004,12 @@ public final class CFMetaData
 
             if (cf_def.isSetKey_validation_class()) { newCFMD.keyValidator(TypeParser.parse(cf_def.key_validation_class)); }
             if (cf_def.isSetKey_alias() && !(newCFMD.keyValidator instanceof CompositeType))
-            {
-                newCFMD.columnMetadata.put(cf_def.key_alias,
-                                           ColumnDefinition.partitionKeyDef(newCFMD, cf_def.key_alias, newCFMD.keyValidator, null));
-            }
+                newCFMD.addOrReplaceColumnDefinition(ColumnDefinition.partitionKeyDef(newCFMD, cf_def.key_alias, newCFMD.keyValidator, null));
 
-            return newCFMD.comment(cf_def.comment)
+            return newCFMD.addAllColumnDefinitions(ColumnDefinition.fromThrift(newCFMD, cf_def.column_metadata))
+                          .comment(cf_def.comment)
                           .replicateOnWrite(cf_def.replicate_on_write)
                           .defaultValidator(TypeParser.parse(cf_def.default_validation_class))
-                          .columnMetadata(ColumnDefinition.fromThrift(newCFMD, cf_def.column_metadata))
                           .compressionParameters(cp)
                           .rebuild();
         }
@@ -1016,19 +1025,24 @@ public final class CFMetaData
      * @param row CqlRow containing columns from schema_columnfamilies.
      * @return CFMetaData derived from CqlRow
      */
-    public static CFMetaData fromThriftCqlRow(CqlRow row)
+    public static CFMetaData fromThriftCqlRow(CqlRow cf, CqlResult columnsRes)
     {
-        Map<String, ByteBuffer> columns = new HashMap<>();
-        try
-        {
-            for (org.apache.cassandra.thrift.Column column : row.getColumns())
-                columns.put(ByteBufferUtil.string(column.bufferForName()), column.value);
-        }
-        catch (CharacterCodingException ignore)
-        {
-        }
-        UntypedResultSet.Row cql3row = new UntypedResultSet.Row(columns);
-        return fromSchemaNoColumnsNoTriggers(cql3row);
+        UntypedResultSet.Row cfRow = new UntypedResultSet.Row(convertThriftCqlRow(cf));
+
+        List<Map<String, ByteBuffer>> cols = new ArrayList<>(columnsRes.rows.size());
+        for (CqlRow row : columnsRes.rows)
+            cols.add(convertThriftCqlRow(row));
+        UntypedResultSet colsRow = UntypedResultSet.create(cols);
+
+        return fromSchemaNoTriggers(cfRow, colsRow);
+    }
+
+    private static Map<String, ByteBuffer> convertThriftCqlRow(CqlRow row)
+    {
+        Map<String, ByteBuffer> m = new HashMap<>();
+        for (org.apache.cassandra.thrift.Column column : row.getColumns())
+            m.put(UTF8Type.instance.getString(column.bufferForName()), column.value);
+        return m;
     }
 
     public void reload()
@@ -1051,11 +1065,11 @@ public final class CFMetaData
     /**
      * Updates CFMetaData in-place to match cf_def
      *
-     * *Note*: This method left public only for DefsTest, don't use directly!
+     * *Note*: This method left package-private only for DefsTest, don't use directly!
      *
      * @throws ConfigurationException if ks/cf names or cf ids didn't match
      */
-    public void apply(CFMetaData cfm) throws ConfigurationException
+    void apply(CFMetaData cfm) throws ConfigurationException
     {
         logger.debug("applying {} to {}", cfm, this);
 
@@ -1201,9 +1215,8 @@ public final class CFMetaData
 
         if (isSuper())
         {
-            CompositeType ct = (CompositeType)comparator;
-            def.setComparator_type(ct.types.get(0).toString());
-            def.setSubcomparator_type(ct.types.get(1).toString());
+            def.setComparator_type(comparator.subtype(0).toString());
+            def.setSubcomparator_type(comparator.subtype(1).toString());
         }
         else
         {
@@ -1257,37 +1270,14 @@ public final class CFMetaData
     }
 
     /**
-     * Returns a ColumnDefinition given a full (internal) column name.
+     * Returns a ColumnDefinition given a cell name.
      */
-    public ColumnDefinition getColumnDefinitionFromCellName(ByteBuffer cellName)
+    public ColumnDefinition getColumnDefinition(CellName cellName)
     {
-        if (!isSuper() && hasCompositeComparator())
-        {
-            CompositeType composite = (CompositeType)comparator;
-            ByteBuffer[] components = composite.split(cellName);
-            for (ColumnDefinition def : allColumns())
-            {
-                ByteBuffer toCompare;
-                if (def.isOnAllComponents())
-                {
-                    toCompare = cellName;
-                }
-                else
-                {
-                    if (def.position() >= components.length)
-                        break;
-
-                    toCompare = components[def.position()];
-                }
-                if (def.name.bytes.equals(toCompare))
-                    return def;
-            }
-            return null;
-        }
-        else
-        {
-            return columnMetadata.get(cellName);
-        }
+        ColumnIdentifier id = cellName.cql3ColumnName();
+        return id == null
+             ? getColumnDefinition(cellName.toByteBuffer())  // Means a dense layout, try the full column name
+             : getColumnDefinition(id);
     }
 
     public ColumnDefinition getColumnDefinitionForIndex(String indexName)
@@ -1356,8 +1346,13 @@ public final class CFMetaData
     public Iterator<OnDiskAtom> getOnDiskIterator(DataInput in, int count, ColumnSerializer.Flag flag, int expireBefore, Descriptor.Version version)
     {
         if (version.hasSuperColumns && cfType == ColumnFamilyType.Super)
-            return SuperColumns.onDiskIterator(in, count, flag, expireBefore);
-        return Column.onDiskIterator(in, count, flag, expireBefore, version);
+            return SuperColumns.onDiskIterator(in, count, flag, expireBefore, comparator);
+        return Column.onDiskIterator(in, count, flag, expireBefore, version, comparator);
+    }
+
+    public AtomDeserializer getOnDiskDeserializer(DataInput in, Descriptor.Version version)
+    {
+        return new AtomDeserializer(comparator, in, ColumnSerializer.Flag.LOCAL, Integer.MIN_VALUE, version);
     }
 
     public static boolean isNameValid(String name)
@@ -1382,8 +1377,11 @@ public final class CFMetaData
         if (cfType == null)
             throw new ConfigurationException(String.format("Invalid column family type for %s", cfName));
 
-        if (comparator instanceof CounterColumnType)
-            throw new ConfigurationException("CounterColumnType is not a valid comparator");
+        for (int i = 0; i < comparator.size(); i++)
+        {
+            if (comparator.subtype(i) instanceof CounterColumnType)
+                throw new ConfigurationException("CounterColumnType is not a valid comparator");
+        }
         if (keyValidator instanceof CounterColumnType)
             throw new ConfigurationException("CounterColumnType is not a valid key validator");
 
@@ -1534,9 +1532,8 @@ public final class CFMetaData
         ColumnFamily cf = rm.addOrGet(SchemaColumnFamiliesCf);
         int ldt = (int) (System.currentTimeMillis() / 1000);
 
-        ColumnNameBuilder builder = SchemaColumnFamiliesCf.getColumnNameBuilder();
-        builder.add(ByteBufferUtil.bytes(cfName));
-        cf.addAtom(new RangeTombstone(builder.build(), builder.buildAsEndOfRange(), timestamp, ldt));
+        Composite prefix = SchemaColumnFamiliesCf.comparator.make(cfName);
+        cf.addAtom(new RangeTombstone(prefix, prefix.end(), timestamp, ldt));
 
         for (ColumnDefinition cd : allColumns())
             cd.deleteFromSchema(rm, timestamp);
@@ -1570,67 +1567,80 @@ public final class CFMetaData
         // For property that can be null (and can be changed), we insert tombstones, to make sure
         // we don't keep a property the user has removed
         ColumnFamily cf = rm.addOrGet(SchemaColumnFamiliesCf);
-        int ldt = (int) (System.currentTimeMillis() / 1000);
+        Composite prefix = SchemaColumnFamiliesCf.comparator.make(cfName);
+        CFRowAdder adder = new CFRowAdder(cf, prefix, timestamp);
 
-        cf.addColumn(Column.create("", timestamp, cfName, ""));
-        cf.addColumn(Column.create(cfType.toString(), timestamp, cfName, "type"));
+        adder.add("type", cfType.toString());
 
         if (isSuper())
         {
             // We need to continue saving the comparator and subcomparator separatly, otherwise
             // we won't know at deserialization if the subcomparator should be taken into account
             // TODO: we should implement an on-start migration if we want to get rid of that.
-            CompositeType ct = (CompositeType)comparator;
-            cf.addColumn(Column.create(ct.types.get(0).toString(), timestamp, cfName, "comparator"));
-            cf.addColumn(Column.create(ct.types.get(1).toString(), timestamp, cfName, "subcomparator"));
+            adder.add("comparator", comparator.subtype(0).toString());
+            adder.add("subcomparator", comparator.subtype(1).toString());
         }
         else
         {
-            cf.addColumn(Column.create(comparator.toString(), timestamp, cfName, "comparator"));
-        }
-
-        cf.addColumn(comment == null ? DeletedColumn.create(ldt, timestamp, cfName, "comment")
-                                     : Column.create(comment, timestamp, cfName, "comment"));
-        cf.addColumn(Column.create(readRepairChance, timestamp, cfName, "read_repair_chance"));
-        cf.addColumn(Column.create(dcLocalReadRepairChance, timestamp, cfName, "local_read_repair_chance"));
-        cf.addColumn(Column.create(replicateOnWrite, timestamp, cfName, "replicate_on_write"));
-        cf.addColumn(Column.create(populateIoCacheOnFlush, timestamp, cfName, "populate_io_cache_on_flush"));
-        cf.addColumn(Column.create(gcGraceSeconds, timestamp, cfName, "gc_grace_seconds"));
-        cf.addColumn(Column.create(defaultValidator.toString(), timestamp, cfName, "default_validator"));
-        cf.addColumn(Column.create(keyValidator.toString(), timestamp, cfName, "key_validator"));
-        cf.addColumn(Column.create(minCompactionThreshold, timestamp, cfName, "min_compaction_threshold"));
-        cf.addColumn(Column.create(maxCompactionThreshold, timestamp, cfName, "max_compaction_threshold"));
-        cf.addColumn(bloomFilterFpChance == null ? DeletedColumn.create(ldt, timestamp, cfName, "bloomFilterFpChance")
-                                                 : Column.create(bloomFilterFpChance, timestamp, cfName, "bloom_filter_fp_chance"));
-        cf.addColumn(Column.create(memtableFlushPeriod, timestamp, cfName, "memtable_flush_period_in_ms"));
-        cf.addColumn(Column.create(caching.toString(), timestamp, cfName, "caching"));
-        cf.addColumn(Column.create(defaultTimeToLive, timestamp, cfName, "default_time_to_live"));
-        cf.addColumn(Column.create(compactionStrategyClass.getName(), timestamp, cfName, "compaction_strategy_class"));
-        cf.addColumn(Column.create(json(compressionParameters.asThriftOptions()), timestamp, cfName, "compression_parameters"));
-        cf.addColumn(Column.create(json(compactionStrategyOptions), timestamp, cfName, "compaction_strategy_options"));
-        cf.addColumn(Column.create(indexInterval, timestamp, cfName, "index_interval"));
-        cf.addColumn(Column.create(speculativeRetry.toString(), timestamp, cfName, "speculative_retry"));
+            adder.add("comparator", comparator.toString());
+        }
+
+        adder.add("comment", comment);
+        adder.add("read_repair_chance", readRepairChance);
+        adder.add("local_read_repair_chance", dcLocalReadRepairChance);
+        adder.add("replicate_on_write", replicateOnWrite);
+        adder.add("populate_io_cache_on_flush", populateIoCacheOnFlush);
+        adder.add("gc_grace_seconds", gcGraceSeconds);
+        adder.add("default_validator", defaultValidator.toString());
+        adder.add("key_validator", keyValidator.toString());
+        adder.add("min_compaction_threshold", minCompactionThreshold);
+        adder.add("max_compaction_threshold", maxCompactionThreshold);
+        adder.add("bloom_filter_fp_chance", bloomFilterFpChance);
+
+        adder.add("memtable_flush_period_in_ms", memtableFlushPeriod);
+        adder.add("caching", caching.toString());
+        adder.add("default_time_to_live", defaultTimeToLive);
+        adder.add("compaction_strategy_class", compactionStrategyClass.getName());
+        adder.add("compression_parameters", json(compressionParameters.asThriftOptions()));
+        adder.add("compaction_strategy_options", json(compactionStrategyOptions));
+        adder.add("index_interval", indexInterval);
+        adder.add("speculative_retry", speculativeRetry.toString());
 
         for (Map.Entry<ColumnIdentifier, Long> entry : droppedColumns.entrySet())
-            cf.addColumn(new Column(makeDroppedColumnName(entry.getKey()), LongType.instance.decompose(entry.getValue()), timestamp));
+            adder.addMapEntry("dropped_columns", entry.getKey().toString(), entry.getValue());
 
         // Save the CQL3 metadata "the old way" for compatibility sake
-        cf.addColumn(Column.create(aliasesToJson(partitionKeyColumns), timestamp, cfName, "key_aliases"));
-        cf.addColumn(Column.create(aliasesToJson(clusteringColumns), timestamp, cfName, "column_aliases"));
-        cf.addColumn(compactValueColumn == null ? DeletedColumn.create(ldt, timestamp, cfName, "value_alias")
-                                                : Column.create(compactValueColumn.name.bytes, timestamp, cfName, "value_alias"));
+        adder.add("key_aliases", aliasesToJson(partitionKeyColumns));
+        adder.add("column_aliases", aliasesToJson(clusteringColumns));
+        adder.add("value_alias", compactValueColumn == null ? null : compactValueColumn.name.toString());
     }
 
     // Package protected for use by tests
-    static CFMetaData fromSchemaNoColumnsNoTriggers(UntypedResultSet.Row result)
+    static CFMetaData fromSchemaNoTriggers(UntypedResultSet.Row result, UntypedResultSet serializedColumnDefinitions)
     {
         try
         {
-            CFMetaData cfm = new CFMetaData(result.getString("keyspace_name"),
-                                            result.getString("columnfamily_name"),
-                                            ColumnFamilyType.valueOf(result.getString("type")),
-                                            TypeParser.parse(result.getString("comparator")),
-                                            result.has("subcomparator") ? TypeParser.parse(result.getString("subcomparator")) : null);
+            String ksName = result.getString("keyspace_name");
+            String cfName = result.getString("columnfamily_name");
+
+            AbstractType<?> rawComparator = TypeParser.parse(result.getString("comparator"));
+            AbstractType<?> subComparator = result.has("subcomparator") ? TypeParser.parse(result.getString("subcomparator")) : null;
+            ColumnFamilyType cfType = ColumnFamilyType.valueOf(result.getString("type"));
+
+            AbstractType<?> fullRawComparator = makeRawAbstractType(rawComparator, subComparator);
+
+            List<ColumnDefinition> columnDefs = ColumnDefinition.fromSchema(serializedColumnDefinitions,
+                                                                            ksName,
+                                                                            cfName,
+                                                                            fullRawComparator,
+                                                                            cfType == ColumnFamilyType.Super);
+
+            CellNameType comparator = CellNames.fromAbstractType(fullRawComparator, isDense(fullRawComparator, columnDefs));
+
+            CFMetaData cfm = new CFMetaData(ksName,
+                                            cfName,
+                                            cfType,
+                                            comparator);
 
             cfm.readRepairChance(result.getDouble("read_repair_chance"));
             cfm.dcLocalReadRepairChance(result.getDouble("local_read_repair_chance"));
@@ -1675,11 +1685,11 @@ public final class CFMetaData
              * However, for upgrade sake we need to still be able to read those old values. Moreover, we cannot easily
              * remove those old columns once "converted" to columnMetadata because that would screw up nodes that may
              * not have upgraded. So for now we keep the both info and in sync, even though its redundant.
-             * In other words, the ColumnDefinition the following lines add may be replaced later when ColumnDefinition.fromSchema
-             * is called but that's ok.
              */
-            cfm.addColumnMetadataFromAliases(aliasesFromStrings(fromJsonList(result.getString("key_aliases"))), cfm.keyValidator, ColumnDefinition.Kind.PARTITION_KEY);
-            cfm.addColumnMetadataFromAliases(aliasesFromStrings(fromJsonList(result.getString("column_aliases"))), cfm.comparator, ColumnDefinition.Kind.CLUSTERING_COLUMN);
+            if (result.has("key_aliases"))
+                cfm.addColumnMetadataFromAliases(aliasesFromStrings(fromJsonList(result.getString("key_aliases"))), cfm.keyValidator, ColumnDefinition.Kind.PARTITION_KEY);
+            if (result.has("column_aliases"))
+                cfm.addColumnMetadataFromAliases(aliasesFromStrings(fromJsonList(result.getString("column_aliases"))), cfm.comparator.asAbstractType(), ColumnDefinition.Kind.CLUSTERING_COLUMN);
 
             if (result.has("value_alias"))
                 cfm.addColumnMetadataFromAliases(Collections.<ByteBuffer>singletonList(result.getBytes("value_alias")), cfm.defaultValidator, ColumnDefinition.Kind.COMPACT_VALUE);
@@ -1687,7 +1697,10 @@ public final class CFMetaData
             if (result.has("dropped_columns"))
                 cfm.droppedColumns(convertDroppedColumns(result.getMap("dropped_columns", UTF8Type.instance, LongType.instance)));
 
-            return cfm;
+            for (ColumnDefinition cd : columnDefs)
+                cfm.addOrReplaceColumnDefinition(cd);
+
+            return cfm.rebuild();
         }
         catch (SyntaxException | ConfigurationException e)
         {
@@ -1723,13 +1736,16 @@ public final class CFMetaData
      */
     public static CFMetaData fromSchema(UntypedResultSet.Row result)
     {
-        CFMetaData cfDef = fromSchemaNoColumnsNoTriggers(result);
+        String ksName = result.getString("keyspace_name");
+        String cfName = result.getString("columnfamily_name");
+
+        Row serializedColumns = SystemKeyspace.readSchemaRow(SystemKeyspace.SCHEMA_COLUMNS_CF, ksName, cfName);
+        CFMetaData cfm = fromSchemaNoTriggers(result, ColumnDefinition.resultify(serializedColumns));
 
-        Row serializedTriggers = SystemKeyspace.readSchemaRow(SystemKeyspace.SCHEMA_TRIGGERS_CF, cfDef.ksName, cfDef.cfName);
-        addTriggerDefinitionsFromSchema(cfDef, serializedTriggers);
+        Row serializedTriggers = SystemKeyspace.readSchemaRow(SystemKeyspace.SCHEMA_TRIGGERS_CF, ksName, cfName);
+        addTriggerDefinitionsFromSchema(cfm, serializedTriggers);
 
-        Row serializedColumns = SystemKeyspace.readSchemaRow(SystemKeyspace.SCHEMA_COLUMNS_CF, cfDef.ksName, cfDef.cfName);
-        return addColumnDefinitionsFromSchema(cfDef, serializedColumns);
+        return cfm;
     }
 
     private static CFMetaData fromSchema(Row row)
@@ -1740,6 +1756,9 @@ public final class CFMetaData
 
     private String aliasesToJson(List<ColumnDefinition> rawAliases)
     {
+        if (rawAliases == null)
+            return null;
+
         List<String> aliases = new ArrayList<String>(rawAliases.size());
         for (ColumnDefinition rawAlias : rawAliases)
             aliases.add(rawAlias.name.toString());
@@ -1762,14 +1781,6 @@ public final class CFMetaData
         return converted;
     }
 
-    private ByteBuffer makeDroppedColumnName(ColumnIdentifier column)
-    {
-        ColumnNameBuilder builder = SchemaColumnFamiliesCf.getColumnNameBuilder();
-        builder.add(UTF8Type.instance.decompose(cfName));
-        builder.add(UTF8Type.instance.decompose("dropped_columns"));
-        return builder.add(column).build();
-    }
-
     /**
      * Convert current metadata into schema mutation
      *
@@ -1798,33 +1809,23 @@ public final class CFMetaData
         switch (kind)
         {
             case REGULAR:
-                AbstractType<?> cfComparator = cfType == ColumnFamilyType.Super ? ((CompositeType)comparator).types.get(1) : comparator;
-                if (cfComparator instanceof CompositeType)
-                {
-                    if (componentIndex == null)
-                        return cfComparator;
+                if (componentIndex == null)
+                    return comparator.asAbstractType();
 
-                    List<AbstractType<?>> types = ((CompositeType)cfComparator).types;
-                    AbstractType<?> t = types.get(componentIndex);
-                    assert t != null : "Non-sensical component index";
-                    return t;
-                }
-                else
-                {
-                    return cfComparator;
-                }
+                AbstractType<?> t = comparator.subtype(componentIndex);
+                assert t != null : "Non-sensical component index";
+                return t;
             default:
                 // CQL3 column names are UTF8
                 return UTF8Type.instance;
         }
     }
 
-    // Package protected for use by tests
-    static CFMetaData addColumnDefinitionsFromSchema(CFMetaData cfm, Row serializedColumnDefinitions)
+    public CFMetaData addAllColumnDefinitions(Collection<ColumnDefinition> defs)
     {
-        for (ColumnDefinition cd : ColumnDefinition.fromSchema(serializedColumnDefinitions, cfm))
-            cfm.addOrReplaceColumnDefinition(cd);
-        return cfm.rebuild();
+        for (ColumnDefinition def : defs)
+            addOrReplaceColumnDefinition(def);
+        return this;
     }
 
     public CFMetaData addColumnDefinition(ColumnDefinition def) throws ConfigurationException
@@ -1839,12 +1840,16 @@ public final class CFMetaData
     // know this cannot happen.
     public CFMetaData addOrReplaceColumnDefinition(ColumnDefinition def)
     {
+        if (def.kind == ColumnDefinition.Kind.REGULAR)
+            comparator.addCQL3Column(def.name);
         columnMetadata.put(def.name.bytes, def);
         return this;
     }
 
     public boolean removeColumnDefinition(ColumnDefinition def)
     {
+        if (def.kind == ColumnDefinition.Kind.REGULAR)
+            comparator.removeCQL3Column(def.name);
         return columnMetadata.remove(def.name.bytes) != null;
     }
 
@@ -1900,13 +1905,9 @@ public final class CFMetaData
     public CFMetaData rebuild()
     {
         List<ColumnDefinition> pkCols = nullInitializedList(keyValidator.componentsCount());
-        boolean isDense = isDense(comparator, allColumns());
-        int nbCkCols = isDense
-                     ? comparator.componentsCount()
-                     : comparator.componentsCount() - (hasCollections() ? 2 : 1);
-        List<ColumnDefinition> ckCols = nullInitializedList(nbCkCols);
+        List<ColumnDefinition> ckCols = nullInitializedList(comparator.clusteringPrefixSize());
         // We keep things sorted to get consistent/predicatable order in select queries
-        SortedSet<ColumnDefinition> regCols = new TreeSet<ColumnDefinition>(regularColumnComparator);
+        SortedSet<ColumnDefinition> regCols = new TreeSet<>(regularColumnComparator);
         ColumnDefinition compactCol = null;
 
         for (ColumnDefinition def : allColumns())
@@ -1918,7 +1919,7 @@ public final class CFMetaData
                     pkCols.set(def.position(), def);
                     break;
                 case CLUSTERING_COLUMN:
-                    assert !(def.isOnAllComponents() && comparator instanceof CompositeType);
+                    assert !(def.isOnAllComponents() && comparator.isCompound());
                     ckCols.set(def.position(), def);
                     break;
                 case REGULAR:
@@ -1935,7 +1936,7 @@ public final class CFMetaData
         partitionKeyColumns = addDefaultKeyAliases(pkCols);
         clusteringColumns = addDefaultColumnAliases(ckCols);
         regularColumns = regCols;
-        compactValueColumn = addDefaultValueAlias(compactCol, isDense);
+        compactValueColumn = addDefaultValueAlias(compactCol, comparator.isDense());
         return this;
     }
 
@@ -1969,12 +1970,17 @@ public final class CFMetaData
         {
             if (ckCols.get(i) == null)
             {
-                Integer idx = null;
-                AbstractType<?> type = comparator;
-                if (hasCompositeComparator())
+                Integer idx;
+                AbstractType<?> type;
+                if (comparator.isCompound())
                 {
                     idx = i;
-                    type = ((CompositeType)comparator).types.get(i);
+                    type = comparator.subtype(i);
+                }
+                else
+                {
+                    idx = null;
+                    type = comparator.asAbstractType();
                 }
                 ByteBuffer name = ByteBufferUtil.bytes(DEFAULT_COLUMN_ALIAS + (i + 1));
                 ColumnDefinition newDef = ColumnDefinition.clusteringKeyDef(this, name, type, idx);
@@ -2003,26 +2009,6 @@ public final class CFMetaData
         }
     }
 
-    public boolean hasCollections()
-    {
-        return getCollectionType() != null;
-    }
-
-    public boolean hasCompositeComparator()
-    {
-        return comparator instanceof CompositeType;
-    }
-
-    public ColumnToCollectionType getCollectionType()
-    {
-        if (isSuper() || !hasCompositeComparator())
-            return null;
-
-        CompositeType composite = (CompositeType)comparator;
-        AbstractType<?> last = composite.types.get(composite.types.size() - 1);
-        return last instanceof ColumnToCollectionType ? (ColumnToCollectionType)last : null;
-    }
-
     /*
      * We call dense a CF for which each component of the comparator is a clustering column, i.e. no
      * component is used to store a regular column names. In other words, non-composite static "thrift"
@@ -2073,12 +2059,6 @@ public final class CFMetaData
 
     }
 
-    // See above.
-    public boolean isDense()
-    {
-        return clusteringColumns.size() == comparator.componentsCount();
-    }
-
     private static boolean isCQL3OnlyPKComparator(AbstractType<?> comparator)
     {
         if (!(comparator instanceof CompositeType))
@@ -2088,6 +2068,11 @@ public final class CFMetaData
         return ct.types.size() == 1 && ct.types.get(0) instanceof UTF8Type;
     }
 
+    public boolean isCQL3Table()
+    {
+        return !isSuper() && !comparator.isDense() && comparator.isCompound();
+    }
+
     private static <T> List<T> nullInitializedList(int size)
     {
         List<T> l = new ArrayList<>(size);
@@ -2155,77 +2140,4 @@ public final class CFMetaData
             .append("triggers", triggers)
             .toString();
     }
-
-    private static class NonCompositeBuilder implements ColumnNameBuilder
-    {
-        private final AbstractType<?> type;
-        private ByteBuffer columnName;
-
-        private NonCompositeBuilder(AbstractType<?> type)
-        {
-            this.type = type;
-        }
-
-        public NonCompositeBuilder add(ByteBuffer bb)
-        {
-            if (columnName != null)
-                throw new IllegalStateException("Column name is already constructed");
-
-            columnName = bb;
-            return this;
-        }
-
-        public NonCompositeBuilder add(ColumnIdentifier name)
-        {
-            return add(name.bytes);
-        }
-
-        public NonCompositeBuilder add(ByteBuffer bb, Relation.Type op)
-        {
-            return add(bb);
-        }
-
-        public int componentCount()
-        {
-            return columnName == null ? 0 : 1;
-        }
-
-        public int remainingCount()
-        {
-            return columnName == null ? 1 : 0;
-        }
-
-        public ByteBuffer get(int i)
-        {
-            if (i < 0 || i >= (columnName == null ? 0 : 1))
-                throw new IllegalArgumentException();
-
-            return columnName;
-        }
-
-        public ByteBuffer build()
-        {
-            return columnName == null ? ByteBufferUtil.EMPTY_BYTE_BUFFER : columnName;
-        }
-
-        public ByteBuffer buildAsEndOfRange()
-        {
-            return build();
-        }
-
-        public NonCompositeBuilder copy()
-        {
-            NonCompositeBuilder newBuilder = new NonCompositeBuilder(type);
-            newBuilder.columnName = columnName;
-            return newBuilder;
-        }
-
-        public ByteBuffer getComponent(int i)
-        {
-            if (i != 0 || columnName == null)
-                throw new IllegalArgumentException();
-
-            return columnName;
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/config/ColumnDefinition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/ColumnDefinition.java b/src/java/org/apache/cassandra/config/ColumnDefinition.java
index 05a10bc..ef6f589 100644
--- a/src/java/org/apache/cassandra/config/ColumnDefinition.java
+++ b/src/java/org/apache/cassandra/config/ColumnDefinition.java
@@ -26,6 +26,7 @@ import com.google.common.collect.Maps;
 
 import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.composites.Composite;
 import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.serializers.MarshalException;
 import org.apache.cassandra.exceptions.*;
@@ -265,26 +266,22 @@ public class ColumnDefinition extends ColumnSpecification
         return cd;
     }
 
-    public static Map<ByteBuffer, ColumnDefinition> fromThrift(CFMetaData cfm, List<ColumnDef> thriftDefs) throws SyntaxException, ConfigurationException
+    public static List<ColumnDefinition> fromThrift(CFMetaData cfm, List<ColumnDef> thriftDefs) throws SyntaxException, ConfigurationException
     {
         if (thriftDefs == null)
-            return new HashMap<>();
+            return Collections.emptyList();
 
-        Map<ByteBuffer, ColumnDefinition> cds = new TreeMap<>();
+        List<ColumnDefinition> defs = new ArrayList<>(thriftDefs.size());
         for (ColumnDef thriftColumnDef : thriftDefs)
-        {
-            ColumnDefinition def = fromThrift(cfm, thriftColumnDef);
-            cds.put(def.name.bytes, def);
-        }
+            defs.add(fromThrift(cfm, thriftColumnDef));
 
-        return cds;
+        return defs;
     }
 
     /**
      * Drop specified column from the schema using given row.
      *
      * @param rm         The schema row mutation
-     * @param cfName     The name of the parent ColumnFamily
      * @param timestamp  The timestamp to use for column modification
      */
     public void deleteFromSchema(RowMutation rm, long timestamp)
@@ -292,29 +289,24 @@ public class ColumnDefinition extends ColumnSpecification
         ColumnFamily cf = rm.addOrGet(CFMetaData.SchemaColumnsCf);
         int ldt = (int) (System.currentTimeMillis() / 1000);
 
-        ColumnNameBuilder builder = CFMetaData.SchemaColumnsCf.getColumnNameBuilder();
-        // Note: the following is necessary for backward compatibility. For CQL3, BBU.bytes(name.toString()) == name
         ByteBuffer nameBytes = ByteBufferUtil.bytes(name.toString());
-        builder.add(ByteBufferUtil.bytes(cfName)).add(nameBytes);
-        cf.addAtom(new RangeTombstone(builder.build(), builder.buildAsEndOfRange(), timestamp, ldt));
+        // Note: we do want to use name.toString(), not name.bytes directly for backward compatibility (For CQL3, this won't make a difference).
+        Composite prefix = CFMetaData.SchemaColumnsCf.comparator.make(cfName, name.toString());
+        cf.addAtom(new RangeTombstone(prefix, prefix.end(), timestamp, ldt));
     }
 
     public void toSchema(RowMutation rm, long timestamp)
     {
         ColumnFamily cf = rm.addOrGet(CFMetaData.SchemaColumnsCf);
-        int ldt = (int) (System.currentTimeMillis() / 1000);
-
-        cf.addColumn(Column.create("", timestamp, cfName, name.toString(), ""));
-        cf.addColumn(Column.create(type.toString(), timestamp, cfName, name.toString(), TYPE));
-        cf.addColumn(indexType == null ? DeletedColumn.create(ldt, timestamp, cfName, name.toString(), INDEX_TYPE)
-                                       : Column.create(indexType.toString(), timestamp, cfName, name.toString(), INDEX_TYPE));
-        cf.addColumn(indexOptions == null ? DeletedColumn.create(ldt, timestamp, cfName, name.toString(), INDEX_OPTIONS)
-                                          : Column.create(json(indexOptions), timestamp, cfName, name.toString(), INDEX_OPTIONS));
-        cf.addColumn(indexName == null ? DeletedColumn.create(ldt, timestamp, cfName, name.toString(), INDEX_NAME)
-                                       : Column.create(indexName, timestamp, cfName, name.toString(), INDEX_NAME));
-        cf.addColumn(componentIndex == null ? DeletedColumn.create(ldt, timestamp, cfName, name.toString(), COMPONENT_INDEX)
-                                            : Column.create(componentIndex, timestamp, cfName, name.toString(), COMPONENT_INDEX));
-        cf.addColumn(Column.create(kind.serialize(), timestamp, cfName, name.toString(), KIND));
+        Composite prefix = CFMetaData.SchemaColumnsCf.comparator.make(cfName, name.toString());
+        CFRowAdder adder = new CFRowAdder(cf, prefix, timestamp);
+
+        adder.add(TYPE, type.toString());
+        adder.add(INDEX_TYPE, indexType == null ? null : indexType.toString());
+        adder.add(INDEX_OPTIONS, json(indexOptions));
+        adder.add(INDEX_NAME, indexName);
+        adder.add(COMPONENT_INDEX, componentIndex);
+        adder.add(KIND, kind.serialize());
     }
 
     public ColumnDefinition apply(ColumnDefinition def)  throws ConfigurationException
@@ -344,18 +336,22 @@ public class ColumnDefinition extends ColumnSpecification
                                     kind);
     }
 
+    public static UntypedResultSet resultify(Row serializedColumns)
+    {
+        String query = String.format("SELECT * FROM %s.%s", Keyspace.SYSTEM_KS, SystemKeyspace.SCHEMA_COLUMNS_CF);
+        return QueryProcessor.resultify(query, serializedColumns);
+    }
+
     /**
      * Deserialize columns from storage-level representation
      *
      * @param serializedColumns storage-level partition containing the column definitions
      * @return the list of processed ColumnDefinitions
      */
-    public static List<ColumnDefinition> fromSchema(Row serializedColumns, CFMetaData cfm)
+    public static List<ColumnDefinition> fromSchema(UntypedResultSet serializedColumns, String ksName, String cfName, AbstractType<?> rawComparator, boolean isSuper)
     {
         List<ColumnDefinition> cds = new ArrayList<>();
-
-        String query = String.format("SELECT * FROM %s.%s", Keyspace.SYSTEM_KS, SystemKeyspace.SCHEMA_COLUMNS_CF);
-        for (UntypedResultSet.Row row : QueryProcessor.resultify(query, serializedColumns))
+        for (UntypedResultSet.Row row : serializedColumns)
         {
             Kind kind = row.has(KIND)
                       ? Kind.deserialize(row.getString(KIND))
@@ -364,12 +360,12 @@ public class ColumnDefinition extends ColumnSpecification
             Integer componentIndex = null;
             if (row.has(COMPONENT_INDEX))
                 componentIndex = row.getInt(COMPONENT_INDEX);
-            else if (kind == Kind.CLUSTERING_COLUMN && cfm.isSuper())
+            else if (kind == Kind.CLUSTERING_COLUMN && isSuper)
                 componentIndex = 1; // A ColumnDefinition for super columns applies to the column component
 
             // Note: we save the column name as string, but we should not assume that it is an UTF8 name, we
             // we need to use the comparator fromString method
-            AbstractType<?> comparator = cfm.getComponentComparator(componentIndex, kind);
+            AbstractType<?> comparator = getComponentComparator(rawComparator, componentIndex, kind);
             ColumnIdentifier name = new ColumnIdentifier(comparator.fromString(row.getString(COLUMN_NAME)), comparator);
 
             AbstractType<?> validator;
@@ -394,12 +390,27 @@ public class ColumnDefinition extends ColumnSpecification
             if (row.has(INDEX_NAME))
                 indexName = row.getString(INDEX_NAME);
 
-            cds.add(new ColumnDefinition(cfm.ksName, cfm.cfName, name, validator, indexType, indexOptions, indexName, componentIndex, kind));
+            cds.add(new ColumnDefinition(ksName, cfName, name, validator, indexType, indexOptions, indexName, componentIndex, kind));
         }
 
         return cds;
     }
 
+    public static AbstractType<?> getComponentComparator(AbstractType<?> rawComparator, Integer componentIndex, ColumnDefinition.Kind kind)
+    {
+        switch (kind)
+        {
+            case REGULAR:
+                if (componentIndex == null || (componentIndex == 0 && !(rawComparator instanceof CompositeType)))
+                    return rawComparator;
+
+                return ((CompositeType)rawComparator).types.get(componentIndex);
+            default:
+                // CQL3 column names are UTF8
+                return UTF8Type.instance;
+        }
+    }
+
     public String getIndexName()
     {
         return indexName;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/config/KSMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/KSMetaData.java b/src/java/org/apache/cassandra/config/KSMetaData.java
index 71f81c7..3dfea03 100644
--- a/src/java/org/apache/cassandra/config/KSMetaData.java
+++ b/src/java/org/apache/cassandra/config/KSMetaData.java
@@ -241,10 +241,11 @@ public final class KSMetaData
     {
         RowMutation rm = new RowMutation(Keyspace.SYSTEM_KS, SystemKeyspace.getSchemaKSKey(name));
         ColumnFamily cf = rm.addOrGet(CFMetaData.SchemaKeyspacesCf);
+        CFRowAdder adder = new CFRowAdder(cf, CFMetaData.SchemaKeyspacesCf.comparator.builder().build(), timestamp);
 
-        cf.addColumn(Column.create(durableWrites, timestamp, "durable_writes"));
-        cf.addColumn(Column.create(strategyClass.getName(), timestamp, "strategy_class"));
-        cf.addColumn(Column.create(json(strategyOptions), timestamp, "strategy_options"));
+        adder.add("durable_writes", durableWrites);
+        adder.add("strategy_class", strategyClass.getName());
+        adder.add("strategy_options", json(strategyOptions));
 
         for (CFMetaData cfm : cfMetaData.values())
             cfm.toSchema(rm, timestamp);
@@ -308,17 +309,6 @@ public final class KSMetaData
             CFMetaData cfm = CFMetaData.fromSchema(result);
             cfms.put(cfm.cfName, cfm);
         }
-
-        for (CFMetaData cfm : cfms.values())
-        {
-            Row columnRow = SystemKeyspace.readSchemaRow(SystemKeyspace.SCHEMA_COLUMNS_CF, cfm.ksName, cfm.cfName);
-            // This may replace some existing definition coming from the old key, column and
-            // value aliases. But that's what we want (see CFMetaData.fromSchemaNoColumnsNoTriggers).
-            for (ColumnDefinition cd : ColumnDefinition.fromSchema(columnRow, cfm))
-                cfm.addOrReplaceColumnDefinition(cd);
-            cfm.rebuild();
-        }
-
         return cfms;
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/config/TriggerDefinition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/TriggerDefinition.java b/src/java/org/apache/cassandra/config/TriggerDefinition.java
index e08f97c..e1809eb 100644
--- a/src/java/org/apache/cassandra/config/TriggerDefinition.java
+++ b/src/java/org/apache/cassandra/config/TriggerDefinition.java
@@ -22,15 +22,13 @@ import java.util.*;
 
 import com.google.common.base.Objects;
 
-import org.apache.cassandra.cql3.ColumnNameBuilder;
 import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.cql3.UntypedResultSet;
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.composites.Composite;
 import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.thrift.TriggerDef;
 
-import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
-
 public class TriggerDefinition
 {
     private static final String TRIGGER_NAME = "trigger_name";
@@ -84,11 +82,11 @@ public class TriggerDefinition
     {
         ColumnFamily cf = rm.addOrGet(SystemKeyspace.SCHEMA_TRIGGERS_CF);
 
-        ColumnNameBuilder builder = CFMetaData.SchemaTriggersCf.getColumnNameBuilder();
-        builder.add(bytes(cfName)).add(bytes(name));
+        CFMetaData cfm = CFMetaData.SchemaTriggersCf;
+        Composite prefix = cfm.comparator.make(cfName, name);
+        CFRowAdder adder = new CFRowAdder(cf, prefix, timestamp);
 
-        cf.addColumn(builder.copy().add(bytes("")).build(), bytes(""), timestamp); // the row marker
-        cf.addColumn(builder.copy().add(bytes(TRIGGER_OPTIONS)).add(bytes(CLASS)).build(), bytes(classOption), timestamp);
+        adder.addMapEntry(TRIGGER_OPTIONS, CLASS, classOption);
     }
 
     /**
@@ -103,9 +101,8 @@ public class TriggerDefinition
         ColumnFamily cf = rm.addOrGet(SystemKeyspace.SCHEMA_TRIGGERS_CF);
         int ldt = (int) (System.currentTimeMillis() / 1000);
 
-        ColumnNameBuilder builder = CFMetaData.SchemaTriggersCf.getColumnNameBuilder();
-        builder.add(bytes(cfName)).add(bytes(name));
-        cf.addAtom(new RangeTombstone(builder.build(), builder.buildAsEndOfRange(), timestamp, ldt));
+        Composite prefix = CFMetaData.SchemaTriggersCf.comparator.make(cfName, name);
+        cf.addAtom(new RangeTombstone(prefix, prefix.end(), timestamp, ldt));
     }
 
     public static TriggerDefinition fromThrift(TriggerDef thriftDef)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/config/UTMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/UTMetaData.java b/src/java/org/apache/cassandra/config/UTMetaData.java
index 108e106..f78e645 100644
--- a/src/java/org/apache/cassandra/config/UTMetaData.java
+++ b/src/java/org/apache/cassandra/config/UTMetaData.java
@@ -21,6 +21,7 @@ import java.nio.ByteBuffer;
 import java.util.*;
 
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.composites.Composite;
 import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.exceptions.RequestValidationException;
 import org.apache.cassandra.cql3.*;
@@ -34,6 +35,9 @@ import org.apache.cassandra.utils.ByteBufferUtil;
  */
 public final class UTMetaData
 {
+    private static final ColumnIdentifier COLUMN_NAMES = new ColumnIdentifier("column_names", false);
+    private static final ColumnIdentifier COLUMN_TYPES = new ColumnIdentifier("column_types", false);
+
     private final Map<ByteBuffer, UserType> userTypes = new HashMap<>();
 
     // Only for Schema. You should generally not create instance of this, but rather use
@@ -84,8 +88,9 @@ public final class UTMetaData
         RowMutation rm = new RowMutation(Keyspace.SYSTEM_KS, newType.name);
         ColumnFamily cf = rm.addOrGet(SystemKeyspace.SCHEMA_USER_TYPES_CF);
 
-        ColumnNameBuilder builder = CFMetaData.SchemaUserTypesCf.getColumnNameBuilder();
-        UpdateParameters params = new UpdateParameters(CFMetaData.SchemaUserTypesCf, Collections.<ByteBuffer>emptyList(), timestamp, 0, null);
+        CFMetaData cfm = CFMetaData.SchemaUserTypesCf;
+        UpdateParameters params = new UpdateParameters(cfm, Collections.<ByteBuffer>emptyList(), timestamp, 0, null);
+        Composite prefix = cfm.comparator.builder().build();
 
         List<ByteBuffer> columnTypes = new ArrayList<>(newType.types.size());
         for (AbstractType<?> type : newType.types)
@@ -93,8 +98,8 @@ public final class UTMetaData
 
         try
         {
-            new Lists.Setter(new ColumnIdentifier("column_names", false), new Lists.Value(newType.columnNames)).execute(newType.name, cf, builder.copy(), params);
-            new Lists.Setter(new ColumnIdentifier("column_types", false), new Lists.Value(columnTypes)).execute(newType.name, cf, builder, params);
+            new Lists.Setter(cfm.getColumnDefinition(COLUMN_NAMES), new Lists.Value(newType.columnNames)).execute(newType.name, cf, prefix, params);
+            new Lists.Setter(cfm.getColumnDefinition(COLUMN_TYPES), new Lists.Value(columnTypes)).execute(newType.name, cf, prefix, params);
         }
         catch (RequestValidationException e)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/cql/AlterTableStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql/AlterTableStatement.java b/src/java/org/apache/cassandra/cql/AlterTableStatement.java
index 479f830..0d767b2 100644
--- a/src/java/org/apache/cassandra/cql/AlterTableStatement.java
+++ b/src/java/org/apache/cassandra/cql/AlterTableStatement.java
@@ -69,7 +69,7 @@ public class AlterTableStatement
         CFMetaData cfm = meta.clone();
 
         ByteBuffer columnName = this.oType == OperationType.OPTS ? null
-                                                                 : meta.comparator.fromStringCQL2(this.columnName);
+                                                                 : meta.comparator.subtype(0).fromStringCQL2(this.columnName);
 
         switch (oType)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java b/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java
index a140d02..a71707c 100644
--- a/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java
+++ b/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java
@@ -26,7 +26,7 @@ import java.util.Set;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.db.composites.SimpleDenseCellNameType;
 import org.apache.cassandra.db.ColumnFamilyType;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.TypeParser;
@@ -123,20 +123,20 @@ public class CreateColumnFamilyStatement
     }
 
     // Column definitions
-    private Map<ByteBuffer, ColumnDefinition> getColumns(CFMetaData cfm) throws InvalidRequestException
+    private List<ColumnDefinition> getColumns(CFMetaData cfm) throws InvalidRequestException
     {
-        Map<ByteBuffer, ColumnDefinition> columnDefs = new HashMap<ByteBuffer, ColumnDefinition>();
+        List<ColumnDefinition> columnDefs = new ArrayList<>(columns.size());
 
         for (Map.Entry<Term, String> col : columns.entrySet())
         {
             try
             {
-                ByteBuffer columnName = cfm.comparator.fromStringCQL2(col.getKey().getText());
+                ByteBuffer columnName = cfm.comparator.asAbstractType().fromStringCQL2(col.getKey().getText());
                 String validatorClassName = CFPropDefs.comparators.containsKey(col.getValue())
                                           ? CFPropDefs.comparators.get(col.getValue())
                                           : col.getValue();
                 AbstractType<?> validator = TypeParser.parse(validatorClassName);
-                columnDefs.put(columnName, ColumnDefinition.regularDef(cfm, columnName, validator, null));
+                columnDefs.add(ColumnDefinition.regularDef(cfm, columnName, validator, null));
             }
             catch (ConfigurationException e)
             {
@@ -175,8 +175,7 @@ public class CreateColumnFamilyStatement
             newCFMD = new CFMetaData(keyspace,
                                      name,
                                      ColumnFamilyType.Standard,
-                                     comparator,
-                                     null);
+                                     new SimpleDenseCellNameType(comparator));
 
             if (CFMetaData.DEFAULT_COMPRESSOR != null && cfProps.compressionParameters.isEmpty())
                 cfProps.compressionParameters.put(CompressionParameters.SSTABLE_COMPRESSION, CFMetaData.DEFAULT_COMPRESSOR);
@@ -185,7 +184,8 @@ public class CreateColumnFamilyStatement
             if (minCompactionThreshold <= 0 || maxCompactionThreshold <= 0)
                 throw new ConfigurationException("Disabling compaction by setting compaction thresholds to 0 has been deprecated, set the compaction option 'enabled' to false instead.");
 
-            newCFMD.comment(cfProps.getProperty(CFPropDefs.KW_COMMENT))
+            newCFMD.addAllColumnDefinitions(getColumns(newCFMD))
+                   .comment(cfProps.getProperty(CFPropDefs.KW_COMMENT))
                    .readRepairChance(getPropertyDouble(CFPropDefs.KW_READREPAIRCHANCE, CFMetaData.DEFAULT_READ_REPAIR_CHANCE))
                    .dcLocalReadRepairChance(getPropertyDouble(CFPropDefs.KW_DCLOCALREADREPAIRCHANCE, CFMetaData.DEFAULT_DCLOCAL_READ_REPAIR_CHANCE))
                    .replicateOnWrite(getPropertyBoolean(CFPropDefs.KW_REPLICATEONWRITE, CFMetaData.DEFAULT_REPLICATE_ON_WRITE))
@@ -193,7 +193,6 @@ public class CreateColumnFamilyStatement
                    .defaultValidator(cfProps.getValidator())
                    .minCompactionThreshold(minCompactionThreshold)
                    .maxCompactionThreshold(maxCompactionThreshold)
-                   .columnMetadata(getColumns(newCFMD))
                    .keyValidator(TypeParser.parse(CFPropDefs.comparators.get(getKeyType())))
                    .compactionStrategyClass(cfProps.compactionStrategyClass)
                    .compactionStrategyOptions(cfProps.compactionStrategyOptions)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/cql/DeleteStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql/DeleteStatement.java b/src/java/org/apache/cassandra/cql/DeleteStatement.java
index 0a1f90c..bcc63e1 100644
--- a/src/java/org/apache/cassandra/cql/DeleteStatement.java
+++ b/src/java/org/apache/cassandra/cql/DeleteStatement.java
@@ -24,6 +24,7 @@ import java.util.List;
 import org.apache.cassandra.auth.Permission;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.composites.CellName;
 import org.apache.cassandra.db.IMutation;
 import org.apache.cassandra.db.RowMutation;
 import org.apache.cassandra.db.marshal.AbstractType;
@@ -100,9 +101,10 @@ public class DeleteStatement extends AbstractModification
         else
         {
             // Delete specific columns
+            AbstractType<?> at = metadata.comparator.asAbstractType();
             for (Term column : columns)
             {
-                ByteBuffer columnName = column.getByteBuffer(metadata.comparator, variables);
+                CellName columnName = metadata.comparator.cellFromByteBuffer(column.getByteBuffer(at, variables));
                 validateColumnName(columnName);
                 rm.delete(columnFamily, columnName, (timestamp == null) ? getTimestamp(clientState) : timestamp);
             }