You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2013/12/17 17:03:47 UTC

[11/13] Push composites support in the storage engine

http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
index ae6c15c..e0e2693 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
@@ -31,7 +31,6 @@ import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.config.IndexType;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.index.SecondaryIndex;
-import org.apache.cassandra.db.marshal.CompositeType;
 import org.apache.cassandra.exceptions.*;
 import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.service.ClientState;
@@ -91,7 +90,7 @@ public class CreateIndexStatement extends SchemaAlteringStatement
             throw new InvalidRequestException("Cannot specify index class for a non-CUSTOM index");
 
         // TODO: we could lift that limitation
-        if (cfm.isDense() && cd.kind != ColumnDefinition.Kind.REGULAR)
+        if (cfm.comparator.isDense() && cd.kind != ColumnDefinition.Kind.REGULAR)
             throw new InvalidRequestException(String.format("Secondary index on %s column %s is not yet supported for compact table", cd.kind, columnName));
 
         if (cd.kind == ColumnDefinition.Kind.PARTITION_KEY && cd.isOnAllComponents())
@@ -111,7 +110,7 @@ public class CreateIndexStatement extends SchemaAlteringStatement
         {
             cd.setIndexType(IndexType.CUSTOM, Collections.singletonMap(SecondaryIndex.CUSTOM_INDEX_OPTION_NAME, indexClass));
         }
-        else if (cfm.hasCompositeComparator())
+        else if (cfm.comparator.isCompound())
         {
             Map<String, String> options = Collections.emptyMap();
             // For now, we only allow indexing values for collections, but we could later allow
@@ -119,8 +118,7 @@ public class CreateIndexStatement extends SchemaAlteringStatement
             // lives easier then.
             if (cd.type.isCollection())
                 options = ImmutableMap.of("index_values", "");
-
-            cd.setIndexType(IndexType.COMPOSITES, options);
+            cd.setIndexType(IndexType.COMPOSITES, Collections.<String, String>emptyMap());
         }
         else
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java
index 809e0dc..8f934e3 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java
@@ -30,6 +30,7 @@ import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.cql3.*;
+import org.apache.cassandra.db.composites.*;
 import org.apache.cassandra.db.ColumnFamilyType;
 import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.exceptions.AlreadyExistsException;
@@ -43,7 +44,7 @@ import org.apache.cassandra.utils.ByteBufferUtil;
 /** A <code>CREATE TABLE</code> parsed from a CQL query statement. */
 public class CreateTableStatement extends SchemaAlteringStatement
 {
-    public AbstractType<?> comparator;
+    public CellNameType comparator;
     private AbstractType<?> defaultValidator;
     private AbstractType<?> keyValidator;
 
@@ -87,22 +88,12 @@ public class CreateTableStatement extends SchemaAlteringStatement
     }
 
     // Column definitions
-    private Map<ByteBuffer, ColumnDefinition> getColumns(CFMetaData cfm)
+    private List<ColumnDefinition> getColumns(CFMetaData cfm)
     {
-        Map<ByteBuffer, ColumnDefinition> columnDefs = new HashMap<ByteBuffer, ColumnDefinition>();
-        Integer componentIndex = null;
-        if (cfm.hasCompositeComparator())
-        {
-            CompositeType ct = (CompositeType) comparator;
-            componentIndex = ct.types.get(ct.types.size() - 1) instanceof ColumnToCollectionType
-                           ? ct.types.size() - 2
-                           : ct.types.size() - 1;
-        }
-
+        List<ColumnDefinition> columnDefs = new ArrayList<>(columns.size());
+        Integer componentIndex = comparator.isCompound() ? comparator.clusteringPrefixSize() : null;
         for (Map.Entry<ColumnIdentifier, AbstractType> col : columns.entrySet())
-        {
-            columnDefs.put(col.getKey().bytes, ColumnDefinition.regularDef(cfm, col.getKey().bytes, col.getValue(), componentIndex));
-        }
+            columnDefs.add(ColumnDefinition.regularDef(cfm, col.getKey().bytes, col.getValue(), componentIndex));
 
         return columnDefs;
     }
@@ -138,8 +129,7 @@ public class CreateTableStatement extends SchemaAlteringStatement
         newCFMD = new CFMetaData(keyspace(),
                                  columnFamily(),
                                  ColumnFamilyType.Standard,
-                                 comparator,
-                                 null);
+                                 comparator);
         applyPropertiesTo(newCFMD);
         return newCFMD;
     }
@@ -148,10 +138,10 @@ public class CreateTableStatement extends SchemaAlteringStatement
     {
         cfmd.defaultValidator(defaultValidator)
             .keyValidator(keyValidator)
-            .columnMetadata(getColumns(cfmd));
+            .addAllColumnDefinitions(getColumns(cfmd));
 
         cfmd.addColumnMetadataFromAliases(keyAliases, keyValidator, ColumnDefinition.Kind.PARTITION_KEY);
-        cfmd.addColumnMetadataFromAliases(columnAliases, comparator, ColumnDefinition.Kind.CLUSTERING_COLUMN);
+        cfmd.addColumnMetadataFromAliases(columnAliases, comparator.asAbstractType(), ColumnDefinition.Kind.CLUSTERING_COLUMN);
         if (valueAlias != null)
             cfmd.addColumnMetadataFromAliases(Collections.<ByteBuffer>singletonList(valueAlias), defaultValidator, ColumnDefinition.Kind.COMPACT_VALUE);
 
@@ -241,15 +231,13 @@ public class CreateTableStatement extends SchemaAlteringStatement
                     if (definedCollections != null)
                         throw new InvalidRequestException("Collection types are not supported with COMPACT STORAGE");
 
-                    stmt.comparator = UTF8Type.instance;
+                    stmt.comparator = new SimpleSparseCellNameType(UTF8Type.instance);
                 }
                 else
                 {
-                    List<AbstractType<?>> types = new ArrayList<AbstractType<?>>(definedCollections == null ? 1 : 2);
-                    types.add(UTF8Type.instance);
-                    if (definedCollections != null)
-                        types.add(ColumnToCollectionType.getInstance(definedCollections));
-                    stmt.comparator = CompositeType.getInstance(types);
+                    stmt.comparator = definedCollections == null
+                                    ? new CompoundSparseCellNameType(Collections.<AbstractType<?>>emptyList())
+                                    : new CompoundSparseCellNameType.WithCollection(Collections.<AbstractType<?>>emptyList(), ColumnToCollectionType.getInstance(definedCollections));
                 }
             }
             else
@@ -261,9 +249,10 @@ public class CreateTableStatement extends SchemaAlteringStatement
                     if (definedCollections != null)
                         throw new InvalidRequestException("Collection types are not supported with COMPACT STORAGE");
                     stmt.columnAliases.add(columnAliases.get(0).bytes);
-                    stmt.comparator = getTypeAndRemove(stmt.columns, columnAliases.get(0));
-                    if (stmt.comparator instanceof CounterColumnType)
+                    AbstractType<?> at = getTypeAndRemove(stmt.columns, columnAliases.get(0));
+                    if (at instanceof CounterColumnType)
                         throw new InvalidRequestException(String.format("counter type is not supported for PRIMARY KEY part %s", stmt.columnAliases.get(0)));
+                    stmt.comparator = new SimpleDenseCellNameType(at);
                 }
                 else
                 {
@@ -282,19 +271,15 @@ public class CreateTableStatement extends SchemaAlteringStatement
                     {
                         if (definedCollections != null)
                             throw new InvalidRequestException("Collection types are not supported with COMPACT STORAGE");
+
+                        stmt.comparator = new CompoundDenseCellNameType(types);
                     }
                     else
                     {
-                        // For sparse, we must add the last UTF8 component
-                        // and the collection type if there is one
-                        types.add(UTF8Type.instance);
-                        if (definedCollections != null)
-                            types.add(ColumnToCollectionType.getInstance(definedCollections));
+                        stmt.comparator = definedCollections == null
+                                        ? new CompoundSparseCellNameType(types)
+                                        : new CompoundSparseCellNameType.WithCollection(types, ColumnToCollectionType.getInstance(definedCollections));
                     }
-
-                    if (types.isEmpty())
-                        throw new IllegalStateException("Nonsensical empty parameter list for CompositeType");
-                    stmt.comparator = CompositeType.getInstance(types);
                 }
             }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java b/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
index db991c0..b465347 100644
--- a/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
@@ -24,6 +24,7 @@ import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.composites.Composite;
 import org.apache.cassandra.exceptions.*;
 import org.apache.cassandra.utils.Pair;
 
@@ -42,47 +43,40 @@ public class DeleteStatement extends ModificationStatement
         return false;
     }
 
-    public ColumnFamily updateForKey(ByteBuffer key, ColumnNameBuilder builder, UpdateParameters params)
+    public ColumnFamily updateForKey(ByteBuffer key, Composite prefix, UpdateParameters params)
     throws InvalidRequestException
     {
         ColumnFamily cf = TreeMapBackedSortedColumns.factory.create(cfm);
         List<Operation> deletions = getOperations();
 
-        boolean fullKey = builder.componentCount() == cfm.clusteringColumns().size();
-        boolean isRange = cfm.isDense() ? !fullKey : (!fullKey || deletions.isEmpty());
+        if (prefix.size() < cfm.clusteringColumns().size() && !deletions.isEmpty())
+            throw new InvalidRequestException(String.format("Missing mandatory PRIMARY KEY part %s since %s specified", getFirstEmptyKey(), deletions.iterator().next().column.name));
 
-        if (!deletions.isEmpty() && isRange)
-            throw new InvalidRequestException(String.format("Missing mandatory PRIMARY KEY part %s since %s specified", getFirstEmptyKey(), deletions.get(0).columnName));
-
-        if (deletions.isEmpty() && builder.componentCount() == 0)
+        if (deletions.isEmpty())
         {
-            // No columns specified, delete the row
-            cf.delete(new DeletionInfo(params.timestamp, params.localDeletionTime));
-        }
-        else
-        {
-            if (isRange)
+            // We delete the slice selected by the prefix.
+            // However, for performance reasons, we distinguish 2 cases:
+            //   - It's a full internal row delete
+            //   - It's a full cell name (i.e it's a dense layout and the prefix is full)
+            if (prefix.isEmpty())
             {
-                assert deletions.isEmpty();
-                ByteBuffer start = builder.build();
-                ByteBuffer end = builder.buildAsEndOfRange();
-                cf.addAtom(params.makeRangeTombstone(start, end));
+                // No columns specified, delete the row
+                cf.delete(new DeletionInfo(params.timestamp, params.localDeletionTime));
+            }
+            else if (cfm.comparator.isDense() && prefix.size() == cfm.clusteringColumns().size())
+            {
+                cf.addAtom(params.makeTombstone(cfm.comparator.create(prefix, null)));
             }
             else
             {
-                // Delete specific columns
-                if (cfm.isDense())
-                {
-                    ByteBuffer columnName = builder.build();
-                    cf.addColumn(params.makeTombstone(columnName));
-                }
-                else
-                {
-                    for (Operation deletion : deletions)
-                        deletion.execute(key, cf, builder.copy(), params);
-                }
+                cf.addAtom(params.makeRangeTombstone(prefix.slice()));
             }
         }
+        else
+        {
+            for (Operation op : deletions)
+                op.execute(key, cf, prefix, params);
+        }
 
         return cf;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 25f59c7..2574f73 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -27,10 +27,10 @@ import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.composites.CBuilder;
+import org.apache.cassandra.db.composites.Composite;
 import org.apache.cassandra.db.filter.ColumnSlice;
 import org.apache.cassandra.db.filter.SliceQueryFilter;
-import org.apache.cassandra.db.marshal.CompositeType;
-import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.db.marshal.ListType;
 import org.apache.cassandra.db.marshal.BooleanType;
 import org.apache.cassandra.exceptions.*;
@@ -72,7 +72,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
     }
 
     public abstract boolean requireFullClusteringKey();
-    public abstract ColumnFamily updateForKey(ByteBuffer key, ColumnNameBuilder builder, UpdateParameters params) throws InvalidRequestException;
+    public abstract ColumnFamily updateForKey(ByteBuffer key, Composite prefix, UpdateParameters params) throws InvalidRequestException;
 
     public int getBoundsTerms()
     {
@@ -215,7 +215,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
     public List<ByteBuffer> buildPartitionKeyNames(List<ByteBuffer> variables)
     throws InvalidRequestException
     {
-        ColumnNameBuilder keyBuilder = cfm.getKeyNameBuilder();
+        CBuilder keyBuilder = cfm.getKeyValidatorAsCType().builder();
         List<ByteBuffer> keys = new ArrayList<ByteBuffer>();
         for (ColumnDefinition def : cfm.partitionKeyColumns())
         {
@@ -231,7 +231,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
                 {
                     if (val == null)
                         throw new InvalidRequestException(String.format("Invalid null value for partition key part %s", def.name));
-                    keys.add(keyBuilder.copy().add(val).build());
+                    keys.add(keyBuilder.buildWith(val).toByteBuffer());
                 }
             }
             else
@@ -247,10 +247,10 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
         return keys;
     }
 
-    public ColumnNameBuilder createClusteringPrefixBuilder(List<ByteBuffer> variables)
+    public Composite createClusteringPrefix(List<ByteBuffer> variables)
     throws InvalidRequestException
     {
-        ColumnNameBuilder builder = cfm.getColumnNameBuilder();
+        CBuilder builder = cfm.comparator.prefixBuilder();
         ColumnDefinition firstEmptyKey = null;
         for (ColumnDefinition def : cfm.clusteringColumns())
         {
@@ -258,7 +258,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
             if (r == null)
             {
                 firstEmptyKey = def;
-                if (requireFullClusteringKey() && cfm.hasCompositeComparator() && !cfm.isDense())
+                if (requireFullClusteringKey() && !cfm.comparator.isDense() && cfm.comparator.isCompound())
                     throw new InvalidRequestException(String.format("Missing mandatory PRIMARY KEY part %s", def.name));
             }
             else if (firstEmptyKey != null)
@@ -275,7 +275,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
                 builder.add(val);
             }
         }
-        return builder;
+        return builder.build();
     }
 
     protected ColumnDefinition getFirstEmptyKey()
@@ -288,7 +288,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
         return null;
     }
 
-    protected Map<ByteBuffer, ColumnGroupMap> readRequiredRows(List<ByteBuffer> partitionKeys, ColumnNameBuilder clusteringPrefix, boolean local, ConsistencyLevel cl)
+    protected Map<ByteBuffer, CQL3Row> readRequiredRows(List<ByteBuffer> partitionKeys, Composite clusteringPrefix, boolean local, ConsistencyLevel cl)
     throws RequestExecutionException, RequestValidationException
     {
         // Lists SET operation incurs a read.
@@ -299,14 +299,14 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
             {
                 if (toRead == null)
                     toRead = new TreeSet<ColumnIdentifier>();
-                toRead.add(op.columnName);
+                toRead.add(op.column.name);
             }
         }
 
-        return toRead == null ? null : readRows(partitionKeys, clusteringPrefix, toRead, (CompositeType)cfm.comparator, local, cl);
+        return toRead == null ? null : readRows(partitionKeys, clusteringPrefix, toRead, cfm, local, cl);
     }
 
-    private Map<ByteBuffer, ColumnGroupMap> readRows(List<ByteBuffer> partitionKeys, ColumnNameBuilder clusteringPrefix, Set<ColumnIdentifier> toRead, CompositeType composite, boolean local, ConsistencyLevel cl)
+    protected Map<ByteBuffer, CQL3Row> readRows(List<ByteBuffer> partitionKeys, Composite rowPrefix, Set<ColumnIdentifier> toRead, CFMetaData cfm, boolean local, ConsistencyLevel cl)
     throws RequestExecutionException, RequestValidationException
     {
         try
@@ -321,11 +321,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
         ColumnSlice[] slices = new ColumnSlice[toRead.size()];
         int i = 0;
         for (ColumnIdentifier name : toRead)
-        {
-            ByteBuffer start = clusteringPrefix.copy().add(name).build();
-            ByteBuffer finish = clusteringPrefix.copy().add(name).buildAsEndOfRange();
-            slices[i++] = new ColumnSlice(start, finish);
-        }
+            slices[i++] = cfm.comparator.create(rowPrefix, name).slice();
 
         List<ReadCommand> commands = new ArrayList<ReadCommand>(partitionKeys.size());
         long now = System.currentTimeMillis();
@@ -340,20 +336,19 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
                        ? SelectStatement.readLocally(keyspace(), commands)
                        : StorageProxy.read(commands, cl);
 
-        Map<ByteBuffer, ColumnGroupMap> map = new HashMap<ByteBuffer, ColumnGroupMap>();
+        Map<ByteBuffer, CQL3Row> map = new HashMap<ByteBuffer, CQL3Row>();
         for (Row row : rows)
         {
-            if (row.cf == null || row.cf.getColumnCount() == 0)
+            if (row.cf == null || row.cf.isEmpty())
                 continue;
 
-            ColumnGroupMap.Builder groupBuilder = new ColumnGroupMap.Builder(composite, true, now);
-            for (Column column : row.cf)
-                groupBuilder.add(column);
-
-            List<ColumnGroupMap> groups = groupBuilder.groups();
-            assert groups.isEmpty() || groups.size() == 1;
-            if (!groups.isEmpty())
-                map.put(row.key.key, groups.get(0));
+            Iterator<CQL3Row> iter = cfm.comparator.CQL3RowBuilder(now).group(row.cf.getSortedColumns().iterator());
+            if (iter.hasNext())
+            {
+                map.put(row.key.key, iter.next());
+                // We can only update one CQ3Row per partition key at a time (we don't allow IN for clustering key)
+                assert !iter.hasNext();
+            }
         }
         return map;
     }
@@ -402,7 +397,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
         if (keys.size() > 1)
             throw new InvalidRequestException("IN on the partition key is not supported with conditional updates");
 
-        ColumnNameBuilder clusteringPrefix = createClusteringPrefixBuilder(variables);
+        Composite clusteringPrefix = createClusteringPrefix(variables);
 
         ByteBuffer key = keys.get(0);
         ThriftValidation.validateKey(cfm, key);
@@ -467,7 +462,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
         {
             List<ColumnDefinition> defs = new ArrayList<>(columnConditions.size());
             for (Operation condition : columnConditions)
-                defs.add(cfm.getColumnDefinition(condition.columnName));
+                defs.add(condition.column);
             selection = Selection.forColumns(defs);
         }
 
@@ -503,10 +498,10 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
     throws RequestExecutionException, RequestValidationException
     {
         List<ByteBuffer> keys = buildPartitionKeyNames(variables);
-        ColumnNameBuilder clusteringPrefix = createClusteringPrefixBuilder(variables);
+        Composite clusteringPrefix = createClusteringPrefix(variables);
 
         // Some lists operation requires reading
-        Map<ByteBuffer, ColumnGroupMap> rows = readRequiredRows(keys, clusteringPrefix, local, cl);
+        Map<ByteBuffer, CQL3Row> rows = readRequiredRows(keys, clusteringPrefix, local, cl);
         UpdateParameters params = new UpdateParameters(cfm, variables, getTimestamp(now, variables), getTimeToLive(variables), rows);
 
         Collection<IMutation> mutations = new ArrayList<IMutation>();
@@ -535,7 +530,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
         return isCounter() ? new CounterMutation(rm, cl) : rm;
     }
 
-    private ColumnFamily buildConditions(ByteBuffer key, ColumnNameBuilder clusteringPrefix, UpdateParameters params)
+    private ColumnFamily buildConditions(ByteBuffer key, Composite clusteringPrefix, UpdateParameters params)
     throws InvalidRequestException
     {
         if (ifNotExists)
@@ -544,15 +539,12 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
         ColumnFamily cf = TreeMapBackedSortedColumns.factory.create(cfm);
 
         // CQL row marker
-        if (cfm.hasCompositeComparator() && !cfm.isDense() && !cfm.isSuper())
-        {
-            ByteBuffer name = clusteringPrefix.copy().add(ByteBufferUtil.EMPTY_BYTE_BUFFER).build();
-            cf.addColumn(params.makeColumn(name, ByteBufferUtil.EMPTY_BYTE_BUFFER));
-        }
+        if (cfm.isCQL3Table())
+            cf.addColumn(params.makeColumn(cfm.comparator.rowMarker(clusteringPrefix), ByteBufferUtil.EMPTY_BYTE_BUFFER));
 
         // Conditions
         for (Operation condition : columnConditions)
-            condition.execute(key, cf, clusteringPrefix.copy(), params);
+            condition.execute(key, cf, clusteringPrefix, params);
 
         assert !cf.isEmpty();
         return cf;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index 62ebd21..d9b4e04 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -29,6 +29,7 @@ import org.github.jamm.MemoryMeter;
 
 import org.apache.cassandra.auth.Permission;
 import org.apache.cassandra.cql3.*;
+import org.apache.cassandra.db.composites.*;
 import org.apache.cassandra.transport.messages.ResultMessage;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
@@ -47,7 +48,6 @@ import org.apache.cassandra.thrift.ThriftValidation;
 import org.apache.cassandra.serializers.MarshalException;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.Pair;
 
 /**
  * Encapsulates a completely parsed SELECT query, including the target
@@ -77,8 +77,9 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
     private boolean isKeyRange;
     private boolean keyIsInRelation;
     private boolean usesSecondaryIndexing;
+    private boolean lastClusteringIsIn;
 
-    private Map<ColumnDefinition, Integer> orderingIndexes;
+    private Map<ColumnIdentifier, Integer> orderingIndexes;
 
     // Used by forSelection below
     private static final Parameters defaultParameters = new Parameters(Collections.<ColumnIdentifier, Boolean>emptyMap(), false, false, null, false);
@@ -366,13 +367,9 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
         }
         else if (isColumnRange())
         {
-            // For sparse, we used to ask for 'defined columns' * 'asked limit' (where defined columns includes the row marker)
-            // to account for the grouping of columns.
-            // Since that doesn't work for maps/sets/lists, we now use the compositesToGroup option of SliceQueryFilter.
-            // But we must preserve backward compatibility too (for mixed version cluster that is).
-            int toGroup = cfm.isDense() ? -1 : cfm.clusteringColumns().size();
-            List<ByteBuffer> startBounds = getRequestedBound(Bound.START, variables);
-            List<ByteBuffer> endBounds = getRequestedBound(Bound.END, variables);
+            int toGroup = cfm.comparator.isDense() ? -1 : cfm.clusteringColumns().size();
+            List<Composite> startBounds = getRequestedBound(Bound.START, variables);
+            List<Composite> endBounds = getRequestedBound(Bound.END, variables);
             assert startBounds.size() == endBounds.size();
 
             // The case where startBounds == 1 is common enough that it's worth optimizing
@@ -402,7 +399,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
         }
         else
         {
-            SortedSet<ByteBuffer> cellNames = getRequestedColumns(variables);
+            SortedSet<CellName> cellNames = getRequestedColumns(variables);
             if (cellNames == null) // in case of IN () for the last column of the key
                 return null;
             QueryProcessor.validateCellNames(cellNames);
@@ -444,7 +441,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
     private Collection<ByteBuffer> getKeys(final List<ByteBuffer> variables) throws InvalidRequestException
     {
         List<ByteBuffer> keys = new ArrayList<ByteBuffer>();
-        ColumnNameBuilder builder = cfm.getKeyNameBuilder();
+        CBuilder builder = cfm.getKeyValidatorAsCType().builder();
         for (ColumnDefinition def : cfm.partitionKeyColumns())
         {
             Restriction r = keyRestrictions[def.position()];
@@ -458,7 +455,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
                 {
                     if (val == null)
                         throw new InvalidRequestException(String.format("Invalid null value for partition key part %s", def.name));
-                    keys.add(builder.copy().add(val).build());
+                    keys.add(builder.buildWith(val).toByteBuffer());
                 }
             }
             else
@@ -484,7 +481,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
                 return ByteBufferUtil.EMPTY_BYTE_BUFFER;
 
         // We deal with IN queries for keys in other places, so we know buildBound will return only one result
-        return buildBound(b, cfm.partitionKeyColumns(), keyRestrictions, false, cfm.getKeyNameBuilder(), variables).get(0);
+        return buildBound(b, cfm.partitionKeyColumns(), keyRestrictions, false, cfm.getKeyValidatorAsCType(), variables).get(0).toByteBuffer();
     }
 
     private Token getTokenBound(Bound b, List<ByteBuffer> variables, IPartitioner<?> p) throws InvalidRequestException
@@ -528,8 +525,8 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
     {
         // Due to CASSANDRA-5762, we always do a slice for CQL3 tables (not dense, composite).
         // Static CF (non dense but non composite) never entails a column slice however
-        if (!cfm.isDense())
-            return cfm.hasCompositeComparator();
+        if (!cfm.comparator.isDense())
+            return cfm.comparator.isCompound();
 
         // Otherwise (i.e. for compact table where we don't have a row marker anyway and thus don't care about CASSANDRA-5762),
         // it is a range query if it has at least one the column alias for which no relation is defined or is not EQ.
@@ -541,15 +538,15 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
         return false;
     }
 
-    private SortedSet<ByteBuffer> getRequestedColumns(List<ByteBuffer> variables) throws InvalidRequestException
+    private SortedSet<CellName> getRequestedColumns(List<ByteBuffer> variables) throws InvalidRequestException
     {
         assert !isColumnRange();
 
-        ColumnNameBuilder builder = cfm.getColumnNameBuilder();
+        CBuilder builder = cfm.comparator.prefixBuilder();
         Iterator<ColumnDefinition> idIter = cfm.clusteringColumns().iterator();
         for (Restriction r : columnRestrictions)
         {
-            ColumnIdentifier id = idIter.next().name;
+            ColumnDefinition def = idIter.next();
             assert r != null && !r.isSlice();
 
             List<ByteBuffer> values = r.values(variables);
@@ -557,7 +554,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
             {
                 ByteBuffer val = values.get(0);
                 if (val == null)
-                    throw new InvalidRequestException(String.format("Invalid null value for clustering key part %s", id));
+                    throw new InvalidRequestException(String.format("Invalid null value for clustering key part %s", def.name));
                 builder.add(val);
             }
             else
@@ -567,32 +564,29 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
                 // for each value of the IN, creates all the columns corresponding to the selection.
                 if (values.isEmpty())
                     return null;
-                SortedSet<ByteBuffer> columns = new TreeSet<ByteBuffer>(cfm.comparator);
+                SortedSet<CellName> columns = new TreeSet<CellName>(cfm.comparator);
                 Iterator<ByteBuffer> iter = values.iterator();
                 while (iter.hasNext())
                 {
                     ByteBuffer val = iter.next();
-                    ColumnNameBuilder b = iter.hasNext() ? builder.copy() : builder;
                     if (val == null)
-                        throw new InvalidRequestException(String.format("Invalid null value for clustering key part %s", id));
-                    b.add(val);
-                    if (cfm.isDense())
-                        columns.add(b.build());
-                    else
-                        columns.addAll(addSelectedColumns(b));
+                        throw new InvalidRequestException(String.format("Invalid null value for clustering key part %s", def.name));
+
+                    Composite prefix = builder.buildWith(val);
+                    columns.addAll(addSelectedColumns(prefix));
                 }
                 return columns;
             }
         }
 
-        return addSelectedColumns(builder);
+        return addSelectedColumns(builder.build());
     }
 
-    private SortedSet<ByteBuffer> addSelectedColumns(ColumnNameBuilder builder)
+    private SortedSet<CellName> addSelectedColumns(Composite prefix)
     {
-        if (cfm.isDense())
+        if (cfm.comparator.isDense())
         {
-            return FBUtilities.singleton(builder.build());
+            return FBUtilities.singleton(cfm.comparator.create(prefix, null), cfm.comparator);
         }
         else
         {
@@ -600,31 +594,26 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
             // non-know set of columns, so we shouldn't get there
             assert !selectACollection();
 
-            SortedSet<ByteBuffer> columns = new TreeSet<ByteBuffer>(cfm.comparator);
+            SortedSet<CellName> columns = new TreeSet<CellName>(cfm.comparator);
 
             // We need to query the selected column as well as the marker
             // column (for the case where the row exists but has no columns outside the PK)
             // Two exceptions are "static CF" (non-composite non-compact CF) and "super CF"
             // that don't have marker and for which we must query all columns instead
-            if (cfm.hasCompositeComparator() && !cfm.isSuper())
+            if (cfm.comparator.isCompound() && !cfm.isSuper())
             {
                 // marker
-                columns.add(builder.copy().add(ByteBufferUtil.EMPTY_BYTE_BUFFER).build());
+                columns.add(cfm.comparator.rowMarker(prefix));
 
                 // selected columns
-                for (ColumnIdentifier id : selection.regularColumnsToFetch())
-                    columns.add(builder.copy().add(id).build());
+                for (ColumnDefinition def : selection.getColumnsList())
+                    if (def.kind == ColumnDefinition.Kind.REGULAR)
+                        columns.add(cfm.comparator.create(prefix, def.name));
             }
             else
             {
-                Iterator<ColumnDefinition> iter = cfm.regularColumns().iterator();
-                while (iter.hasNext())
-                {
-                    ColumnDefinition def = iter.next();
-                    ColumnNameBuilder b = iter.hasNext() ? builder.copy() : builder;
-                    ByteBuffer cname = b.add(def.name).build();
-                    columns.add(cname);
-                }
+                for (ColumnDefinition def : cfm.regularColumns())
+                    columns.add(cfm.comparator.create(prefix, def.name));
             }
             return columns;
         }
@@ -632,7 +621,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
 
     private boolean selectACollection()
     {
-        if (!cfm.hasCollections())
+        if (!cfm.comparator.hasCollections())
             return false;
 
         for (ColumnDefinition def : selection.getColumnsList())
@@ -644,13 +633,15 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
         return false;
     }
 
-    private List<ByteBuffer> buildBound(Bound bound,
-                                        Collection<ColumnDefinition> defs,
-                                        Restriction[] restrictions,
-                                        boolean isReversed,
-                                        ColumnNameBuilder builder,
-                                        List<ByteBuffer> variables) throws InvalidRequestException
+    private static List<Composite> buildBound(Bound bound,
+                                              Collection<ColumnDefinition> defs,
+                                              Restriction[] restrictions,
+                                              boolean isReversed,
+                                              CType type,
+                                              List<ByteBuffer> variables) throws InvalidRequestException
     {
+        CBuilder builder = type.builder();
+
         // The end-of-component of composite doesn't depend on whether the
         // component type is reversed or not (i.e. the ReversedType is applied
         // to the component comparator but not to the end-of-component itself),
@@ -668,9 +659,8 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
                 // There wasn't any non EQ relation on that key, we select all records having the preceding component as prefix.
                 // For composites, if there was preceding component and we're computing the end, we must change the last component
                 // End-Of-Component, otherwise we would be selecting only one record.
-                return Collections.singletonList(builder.componentCount() > 0 && eocBound == Bound.END
-                                                 ? builder.buildAsEndOfRange()
-                                                 : builder.build());
+                Composite prefix = builder.build();
+                return Collections.singletonList(!prefix.isEmpty() && eocBound == Bound.END ? prefix.end() : prefix);
             }
 
             if (r.isSlice())
@@ -680,7 +670,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
                 ByteBuffer val = slice.bound(b, variables);
                 if (val == null)
                     throw new InvalidRequestException(String.format("Invalid null clustering key part %s", def.name));
-                return Collections.singletonList(builder.add(val, slice.getRelation(eocBound, b)).build());
+                return Collections.singletonList(builder.add(val).build().withEOC(eocForRelation(slice.getRelation(eocBound, b))));
             }
             else
             {
@@ -691,16 +681,16 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
                     assert def.position() == defs.size() - 1;
                     // The IN query might not have listed the values in comparator order, so we need to re-sort
                     // the bounds lists to make sure the slices works correctly (also, to avoid duplicates).
-                    TreeSet<ByteBuffer> s = new TreeSet<ByteBuffer>(isReversed ? cfm.comparator.reverseComparator : cfm.comparator);
+                    TreeSet<Composite> s = new TreeSet<Composite>(isReversed ? type.reverseComparator() : type);
                     for (ByteBuffer val : values)
                     {
                         if (val == null)
                             throw new InvalidRequestException(String.format("Invalid null clustering key part %s", def.name));
-                        ColumnNameBuilder copy = builder.copy().add(val);
+                        Composite prefix = builder.buildWith(val);
                         // See below for why this
-                        s.add((bound == Bound.END && copy.remainingCount() > 0) ? copy.buildAsEndOfRange() : copy.build());
+                        s.add((bound == Bound.END && builder.remainingCount() > 0) ? prefix.end() : prefix);
                     }
-                    return new ArrayList<ByteBuffer>(s);
+                    return new ArrayList<Composite>(s);
                 }
 
                 ByteBuffer val = values.get(0);
@@ -714,14 +704,34 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
         // it would be harmless to do it. However, we use this method got the partition key too. And when a query
         // with 2ndary index is done, and with the the partition provided with an EQ, we'll end up here, and in that
         // case using the eoc would be bad, since for the random partitioner we have no guarantee that
-        // builder.buildAsEndOfRange() will sort after builder.build() (see #5240).
-        return Collections.singletonList((bound == Bound.END && builder.remainingCount() > 0) ? builder.buildAsEndOfRange() : builder.build());
+        // prefix.end() will sort after prefix (see #5240).
+        Composite prefix = builder.build();
+        return Collections.singletonList(bound == Bound.END && builder.remainingCount() > 0 ? prefix.end() : prefix);
     }
 
-    private List<ByteBuffer> getRequestedBound(Bound b, List<ByteBuffer> variables) throws InvalidRequestException
+    private static Composite.EOC eocForRelation(Relation.Type op)
+    {
+        switch (op)
+        {
+            case LT:
+                // < X => using startOf(X) as finish bound
+                return Composite.EOC.START;
+            case GT:
+            case LTE:
+                // > X => using endOf(X) as start bound
+                // <= X => using endOf(X) as finish bound
+                return Composite.EOC.END;
+            default:
+                // >= X => using X as start bound (could use START_OF too)
+                // = X => using X
+                return Composite.EOC.NONE;
+        }
+    }
+
+    private List<Composite> getRequestedBound(Bound b, List<ByteBuffer> variables) throws InvalidRequestException
     {
         assert isColumnRange();
-        return buildBound(b, cfm.clusteringColumns(), columnRestrictions, isReversed, cfm.getColumnNameBuilder(), variables);
+        return buildBound(b, cfm.clusteringColumns(), columnRestrictions, isReversed, cfm.comparator, variables);
     }
 
     public List<IndexExpression> getIndexExpressions(List<ByteBuffer> variables) throws InvalidRequestException
@@ -798,46 +808,30 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
         return value;
     }
 
-    private Iterable<Column> columnsInOrder(final ColumnFamily cf, final List<ByteBuffer> variables) throws InvalidRequestException
+    private Iterator<Column> applySliceRestriction(final Iterator<Column> cells, final List<ByteBuffer> variables) throws InvalidRequestException
     {
-        if (columnRestrictions.length == 0)
-            return cf.getSortedColumns();
-
-        // If the restriction for the last column alias is an IN, respect
-        // requested order
-        Restriction last = columnRestrictions[columnRestrictions.length - 1];
-        if (last == null || last.isSlice())
-            return cf.getSortedColumns();
-
-        ColumnNameBuilder builder = cfm.getColumnNameBuilder();
-        for (int i = 0; i < columnRestrictions.length - 1; i++)
-            builder.add(columnRestrictions[i].values(variables).get(0));
-
-        List<ByteBuffer> values = last.values(variables);
-        final List<ByteBuffer> requested = new ArrayList<ByteBuffer>(values.size());
-        Iterator<ByteBuffer> iter = values.iterator();
-        while (iter.hasNext())
-        {
-            ByteBuffer t = iter.next();
-            ColumnNameBuilder b = iter.hasNext() ? builder.copy() : builder;
-            requested.add(b.add(t).build());
-        }
+        assert sliceRestriction != null;
+
+        final CellNameType type = cfm.comparator;
+        final CellName excludedStart = sliceRestriction.isInclusive(Bound.START) ? null : type.makeCellName(sliceRestriction.bound(Bound.START, variables));
+        final CellName excludedEnd = sliceRestriction.isInclusive(Bound.END) ? null : type.makeCellName(sliceRestriction.bound(Bound.END, variables));
 
-        return new Iterable<Column>()
+        return new AbstractIterator<Column>()
         {
-            public Iterator<Column> iterator()
+            protected Column computeNext()
             {
-                return new AbstractIterator<Column>()
-                {
-                    Iterator<ByteBuffer> iter = requested.iterator();
-                    public Column computeNext()
-                    {
-                        if (!iter.hasNext())
-                            return endOfData();
-                        Column column = cf.getColumn(iter.next());
-                        return column == null ? computeNext() : column;
-                    }
-                };
+                if (!cells.hasNext())
+                    return endOfData();
+
+                Column c = cells.next();
+
+                // For dynamic CF, the column could be out of the requested bounds (because we don't support strict bounds internally (unless
+                // the comparator is composite that is)), filter here
+                if ( (excludedStart != null && type.compare(c.name(), excludedStart) == 0)
+                  || (excludedEnd != null && type.compare(c.name(), excludedEnd) == 0) )
+                    return computeNext();
+
+                return c;
             }
         };
     }
@@ -856,7 +850,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
 
         ResultSet cqlRows = result.build();
 
-        orderResults(cqlRows);
+        orderResults(cqlRows, variables);
 
         // Internal calls always return columns in the comparator order, even when reverse was set
         if (isReversed)
@@ -871,177 +865,131 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
     void processColumnFamily(ByteBuffer key, ColumnFamily cf, List<ByteBuffer> variables, long now, Selection.ResultSetBuilder result)
     throws InvalidRequestException
     {
-        ByteBuffer[] keyComponents = cfm.getKeyValidator() instanceof CompositeType
-                                   ? ((CompositeType)cfm.getKeyValidator()).split(key)
-                                   : new ByteBuffer[]{ key };
-
-        if (parameters.isDistinct)
+        CFMetaData cfm = cf.metadata();
+        ByteBuffer[] keyComponents = null;
+        if (cfm.getKeyValidator() instanceof CompositeType)
         {
-            if (!cf.hasOnlyTombstones(now))
-            {
-                result.newRow();
-                // selection.getColumnsList() will contain only the partition key components - all of them.
-                for (ColumnDefinition def : selection.getColumnsList())
-                    result.add(keyComponents[def.position()]);
-            }
+            keyComponents = ((CompositeType)cfm.getKeyValidator()).split(key);
         }
-        else if (cfm.isDense())
+        else
         {
-            // One cqlRow per column
-            for (Column c : columnsInOrder(cf, variables))
-            {
-                if (c.isMarkedForDelete(now))
-                    continue;
-
-                ByteBuffer[] components = null;
-                if (cfm.hasCompositeComparator())
-                {
-                    components = ((CompositeType)cfm.comparator).split(c.name());
-                }
-                else if (sliceRestriction != null)
-                {
-                    // For dynamic CF, the column could be out of the requested bounds, filter here
-                    if (!sliceRestriction.isInclusive(Bound.START) && c.name().equals(sliceRestriction.bound(Bound.START, variables)))
-                        continue;
-                    if (!sliceRestriction.isInclusive(Bound.END) && c.name().equals(sliceRestriction.bound(Bound.END, variables)))
-                        continue;
-                }
-
-                result.newRow();
-                // Respect selection order
-                for (ColumnDefinition def : selection.getColumnsList())
-                {
-                    switch (def.kind)
-                    {
-                        case PARTITION_KEY:
-                            result.add(keyComponents[def.position()]);
-                            break;
-                        case CLUSTERING_COLUMN:
-                            ByteBuffer val = cfm.hasCompositeComparator()
-                                           ? (def.position() < components.length ? components[def.position()] : null)
-                                           : c.name();
-                            result.add(val);
-                            break;
-                        case COMPACT_VALUE:
-                            result.add(c);
-                            break;
-                        case REGULAR:
-                            // This should not happen for compact CF
-                            throw new AssertionError();
-                        default:
-                            throw new AssertionError();
-                    }
-                }
-            }
+            keyComponents = new ByteBuffer[]{ key };
         }
-        else if (cfm.hasCompositeComparator())
-        {
-            // Sparse case: group column in cqlRow when composite prefix is equal
-            CompositeType composite = (CompositeType)cfm.comparator;
 
-            ColumnGroupMap.Builder builder = new ColumnGroupMap.Builder(composite, cfm.hasCollections(), now);
+        Iterator<Column> cells = cf.getSortedColumns().iterator();
+        if (sliceRestriction != null)
+            cells = applySliceRestriction(cells, variables);
 
-            for (Column c : cf)
-            {
-                if (c.isMarkedForDelete(now))
-                    continue;
-
-                builder.add(c);
-            }
-
-            for (ColumnGroupMap group : builder.groups())
-                handleGroup(selection, result, keyComponents, group);
-        }
-        else
+        for (Iterator<CQL3Row> iter = cfm.comparator.CQL3RowBuilder(now).group(cells); iter.hasNext();)
         {
-            if (cf.hasOnlyTombstones(now))
-                return;
+            CQL3Row cql3Row = iter.next();
 
-            // Static case: One cqlRow for all columns
+            // Respect requested order
             result.newRow();
+            // Respect selection order
             for (ColumnDefinition def : selection.getColumnsList())
             {
-                if (def.kind == ColumnDefinition.Kind.PARTITION_KEY)
-                    result.add(keyComponents[def.position()]);
-                else
-                    result.add(cf.getColumn(def.name.bytes));
-            }
+                switch (def.kind)
+                {
+                    case PARTITION_KEY:
+                        result.add(keyComponents[def.position()]);
+                        break;
+                    case CLUSTERING_COLUMN:
+                        result.add(cql3Row.getClusteringColumn(def.position()));
+                        break;
+                    case COMPACT_VALUE:
+                        result.add(cql3Row.getColumn(null));
+                        break;
+                    case REGULAR:
+                        if (def.type.isCollection())
+                        {
+                            List<Column> collection = cql3Row.getCollection(def.name);
+                            ByteBuffer value = collection == null
+                                             ? null
+                                             : ((CollectionType)def.type).serialize(collection);
+                            result.add(value);
+                        }
+                        else
+                        {
+                            result.add(cql3Row.getColumn(def.name));
+                        }
+                        break;
+                    }
+                }
         }
     }
 
     /**
      * Orders results when multiple keys are selected (using IN)
      */
-    private void orderResults(ResultSet cqlRows)
+    private void orderResults(ResultSet cqlRows, List<ByteBuffer> variables) throws InvalidRequestException
     {
-        // There is nothing to do if
-        //   a. there are no results,
-        //   b. no ordering information where given,
-        //   c. key restriction is a Range or not an IN expression
-        if (cqlRows.size() == 0 || parameters.orderings.isEmpty() || isKeyRange || !keyIsInRelation)
+        if (cqlRows.size() == 0)
+            return;
+
+        /*
+         * We need to do post-query ordering in 2 cases:
+         *   1) if the last clustering key is restricted by a IN.
+         *   2) if the row key is restricted by a IN and there is some ORDER BY values
+         */
+        if (!(lastClusteringIsIn || (keyIsInRelation && parameters.orderings.size() > 0)))
             return;
 
         assert orderingIndexes != null;
 
-        // optimization when only *one* order condition was given
-        // because there is no point of using composite comparator if there is only one order condition
-        if (parameters.orderings.size() == 1)
+        List<Integer> idToSort = new ArrayList<Integer>();
+        List<Comparator<ByteBuffer>> sorters = new ArrayList<Comparator<ByteBuffer>>();
+
+        // If the restriction for the last clustering key is an IN, respect requested order
+        if (lastClusteringIsIn)
         {
-            ColumnDefinition ordering = cfm.getColumnDefinition(parameters.orderings.keySet().iterator().next());
-            Collections.sort(cqlRows.rows, new SingleColumnComparator(orderingIndexes.get(ordering), ordering.type));
-            return;
+            List<ColumnDefinition> cc = cfm.clusteringColumns();
+            idToSort.add(orderingIndexes.get(cc.get(cc.size() - 1).name));
+            Restriction last = columnRestrictions[columnRestrictions.length - 1];
+            sorters.add(makeComparatorFor(last.values(variables)));
         }
 
-        // builds a 'composite' type for multi-column comparison from the comparators of the ordering components
-        // and passes collected position information and built composite comparator to CompositeComparator to do
-        // an actual comparison of the CQL rows.
-        List<AbstractType<?>> types = new ArrayList<AbstractType<?>>(parameters.orderings.size());
-        int[] positions = new int[parameters.orderings.size()];
-
-        int idx = 0;
+        // Then add the order by
         for (ColumnIdentifier identifier : parameters.orderings.keySet())
         {
             ColumnDefinition orderingColumn = cfm.getColumnDefinition(identifier);
-            types.add(orderingColumn.type);
-            positions[idx++] = orderingIndexes.get(orderingColumn);
+            idToSort.add(orderingIndexes.get(orderingColumn.name));
+            sorters.add(orderingColumn.type);
         }
 
-        Collections.sort(cqlRows.rows, new CompositeComparator(types, positions));
+        Comparator<List<ByteBuffer>> comparator = idToSort.size() == 1
+                                                ? new SingleColumnComparator(idToSort.get(0), sorters.get(0))
+                                                : new CompositeComparator(sorters, idToSort);
+        Collections.sort(cqlRows.rows, comparator);
     }
 
-    private void handleGroup(Selection selection, Selection.ResultSetBuilder result, ByteBuffer[] keyComponents, ColumnGroupMap columns) throws InvalidRequestException
+    // Comparator used when the last clustering key is an IN, to sort result
+    // rows in the order of the values provided for the IN.
+    private Comparator<ByteBuffer> makeComparatorFor(final List<ByteBuffer> values)
     {
-        // Respect requested order
-        result.newRow();
-        for (ColumnDefinition def : selection.getColumnsList())
+        // This may not always be the most efficient, but it probably is if
+        // values is small, which is likely to be the most common case.
+        return new Comparator<ByteBuffer>()
         {
-            switch (def.kind)
+            public int compare(ByteBuffer b1, ByteBuffer b2)
             {
-                case PARTITION_KEY:
-                    result.add(keyComponents[def.position()]);
-                    break;
-                case CLUSTERING_COLUMN:
-                    result.add(columns.getKeyComponent(def.position()));
-                    break;
-                case COMPACT_VALUE:
-                    // This should not happen for SPARSE
-                    throw new AssertionError();
-                case REGULAR:
-                    if (def.type.isCollection())
-                    {
-                        List<Pair<ByteBuffer, Column>> collection = columns.getCollection(def.name.bytes);
-                        ByteBuffer value = collection == null
-                                         ? null
-                                         : ((CollectionType)def.type).serialize(collection);
-                        result.add(value);
-                    }
-                    else
-                    {
-                        result.add(columns.getSimple(def.name.bytes));
-                    }
-                    break;
+                int idx1 = -1;
+                int idx2 = -1;
+                for (int i = 0; i < values.size(); i++)
+                {
+                    ByteBuffer bb = values.get(i);
+                    if (bb.equals(b1))
+                        idx1 = i;
+                    if (bb.equals(b2))
+                        idx2 = i;
+
+                    if (idx1 >= 0 && idx2 >= 0)
+                        break;
+                }
+                assert idx1 >= 0 && idx2 >= 0 : "Got CQL3 row that was not queried in resultset";
+                return idx1 - idx2;
             }
-        }
+        };
     }
 
     private static boolean isReversedType(ColumnDefinition def)
@@ -1263,7 +1211,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
                     Restriction.Slice slice = (Restriction.Slice)restriction;
                     // For non-composite slices, we don't support internally the difference between exclusive and
                     // inclusive bounds, so we deal with it manually.
-                    if (!cfm.hasCompositeComparator() && (!slice.isInclusive(Bound.START) || !slice.isInclusive(Bound.END)))
+                    if (!cfm.comparator.isCompound() && (!slice.isInclusive(Bound.START) || !slice.isInclusive(Bound.END)))
                         stmt.sliceRestriction = slice;
                 }
                 else if (restriction.isIN())
@@ -1274,6 +1222,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
                         throw new InvalidRequestException(String.format("PRIMARY KEY part %s cannot be restricted by IN relation", cdef.name));
                     else if (stmt.selectACollection())
                         throw new InvalidRequestException(String.format("Cannot restrict PRIMARY KEY part %s by IN relation as a collection is selected by the query", cdef.name));
+                    stmt.lastClusteringIsIn = true;
                 }
 
                 previous = cdef;
@@ -1311,7 +1260,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
                 // queried automatically, and then removing it from the resultSet afterwards if needed)
                 if (stmt.keyIsInRelation)
                 {
-                    stmt.orderingIndexes = new HashMap<ColumnDefinition, Integer>();
+                    stmt.orderingIndexes = new HashMap<ColumnIdentifier, Integer>();
                     for (ColumnIdentifier column : stmt.parameters.orderings.keySet())
                     {
                         final ColumnDefinition def = cfm.getColumnDefinition(column);
@@ -1325,14 +1274,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
 
                         if (selectClause.isEmpty()) // wildcard
                         {
-                            stmt.orderingIndexes.put(def, Iterators.indexOf(cfm.allColumnsInSelectOrder(),
-                                                                            new Predicate<ColumnDefinition>()
-                                                                            {
-                                                                                public boolean apply(ColumnDefinition n)
-                                                                                {
-                                                                                    return def.equals(n);
-                                                                                }
-                                                                            }));
+                            stmt.orderingIndexes.put(def.name, indexOf(def, cfm.allColumnsInSelectOrder()));
                         }
                         else
                         {
@@ -1342,7 +1284,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
                                 RawSelector selector = selectClause.get(i);
                                 if (def.name.equals(selector.selectable))
                                 {
-                                    stmt.orderingIndexes.put(def, i);
+                                    stmt.orderingIndexes.put(def.name, i);
                                     hasColumn = true;
                                     break;
                                 }
@@ -1399,6 +1341,16 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
                 stmt.isReversed = isReversed;
             }
 
+            if (stmt.lastClusteringIsIn)
+            {
+                // This means we'll have to do post-query reordering, so update the orderingIndexes
+                if (stmt.orderingIndexes == null)
+                    stmt.orderingIndexes = new HashMap<ColumnIdentifier, Integer>();
+
+                ColumnDefinition last = cfm.clusteringColumns().get(cfm.clusteringColumns().size() - 1);
+                stmt.orderingIndexes.put(last.name, indexOf(last, stmt.selection.getColumnsList().iterator()));
+            }
+
             // Make sure this queries is allowed (note: non key range non indexed cannot involve filtering underneath)
             if (!parameters.allowFiltering && (stmt.isKeyRange || stmt.usesSecondaryIndexing))
             {
@@ -1413,6 +1365,17 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
             return new ParsedStatement.Prepared(stmt, names);
         }
 
+        private int indexOf(final ColumnDefinition def, Iterator<ColumnDefinition> defs)
+        {
+            return Iterators.indexOf(defs, new Predicate<ColumnDefinition>()
+                                           {
+                                               public boolean apply(ColumnDefinition n)
+                                               {
+                                                   return def.name.equals(n.name);
+                                               }
+                                           });
+        }
+
         private void validateDistinctSelection(Collection<ColumnDefinition> requestedColumns, Collection<ColumnDefinition> partitionKey)
         throws InvalidRequestException
         {
@@ -1584,9 +1547,9 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
     private static class SingleColumnComparator implements Comparator<List<ByteBuffer>>
     {
         private final int index;
-        private final AbstractType<?> comparator;
+        private final Comparator<ByteBuffer> comparator;
 
-        public SingleColumnComparator(int columnIndex, AbstractType<?> orderer)
+        public SingleColumnComparator(int columnIndex, Comparator<ByteBuffer> orderer)
         {
             index = columnIndex;
             comparator = orderer;
@@ -1603,10 +1566,10 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
      */
     private static class CompositeComparator implements Comparator<List<ByteBuffer>>
     {
-        private final List<AbstractType<?>> orderTypes;
-        private final int[] positions;
+        private final List<Comparator<ByteBuffer>> orderTypes;
+        private final List<Integer> positions;
 
-        private CompositeComparator(List<AbstractType<?>> orderTypes, int[] positions)
+        private CompositeComparator(List<Comparator<ByteBuffer>> orderTypes, List<Integer> positions)
         {
             this.orderTypes = orderTypes;
             this.positions = positions;
@@ -1614,10 +1577,10 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
 
         public int compare(List<ByteBuffer> a, List<ByteBuffer> b)
         {
-            for (int i = 0; i < positions.length; i++)
+            for (int i = 0; i < positions.size(); i++)
             {
-                AbstractType<?> type = orderTypes.get(i);
-                int columnPos = positions[i];
+                Comparator<ByteBuffer> type = orderTypes.get(i);
+                int columnPos = positions.get(i);
 
                 ByteBuffer aValue = a.get(columnPos);
                 ByteBuffer bValue = b.get(columnPos);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/cql3/statements/Selection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/Selection.java b/src/java/org/apache/cassandra/cql3/statements/Selection.java
index a578f3f..6a8cfe6 100644
--- a/src/java/org/apache/cassandra/cql3/statements/Selection.java
+++ b/src/java/org/apache/cassandra/cql3/statements/Selection.java
@@ -235,20 +235,6 @@ public abstract class Selection
     protected abstract List<ByteBuffer> handleRow(ResultSetBuilder rs) throws InvalidRequestException;
 
     /**
-     * @return the list of CQL3 "regular" (the "COLUMN_METADATA" ones) column names to fetch.
-     */
-    public List<ColumnIdentifier> regularColumnsToFetch()
-    {
-        List<ColumnIdentifier> toFetch = new ArrayList<ColumnIdentifier>();
-        for (ColumnDefinition def : columnsList)
-        {
-            if (def.kind == ColumnDefinition.Kind.REGULAR)
-                toFetch.add(def.name);
-        }
-        return toFetch;
-    }
-
-    /**
      * @return the list of CQL3 columns value this SelectionClause needs.
      */
     public List<ColumnDefinition> getColumnsList()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
index 7a8340d..6cf0856 100644
--- a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
@@ -24,7 +24,7 @@ import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.marshal.CompositeType;
+import org.apache.cassandra.db.composites.Composite;
 import org.apache.cassandra.exceptions.*;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.Pair;
@@ -35,7 +35,7 @@ import org.apache.cassandra.utils.Pair;
  */
 public class UpdateStatement extends ModificationStatement
 {
-    private static final Operation setToEmptyOperation = new Constants.Setter(null, new Constants.Value(ByteBufferUtil.EMPTY_BYTE_BUFFER));
+    private static final Constants.Value EMPTY = new Constants.Value(ByteBufferUtil.EMPTY_BYTE_BUFFER);
 
     private UpdateStatement(int boundTerms, CFMetaData cfm, Attributes attrs)
     {
@@ -47,7 +47,7 @@ public class UpdateStatement extends ModificationStatement
         return true;
     }
 
-    public void addUpdateForKey(ColumnFamily cf, ByteBuffer key, ColumnNameBuilder builder, UpdateParameters params)
+    public void addUpdateForKey(ColumnFamily cf, ByteBuffer key, Composite prefix, UpdateParameters params)
     throws InvalidRequestException
     {
         // Inserting the CQL row marker (see #4361)
@@ -61,17 +61,14 @@ public class UpdateStatement extends ModificationStatement
         // 'DELETE FROM t WHERE k = 1' does remove the row entirely)
         //
         // We never insert markers for Super CF as this would confuse the thrift side.
-        if (cfm.hasCompositeComparator() && !cfm.isDense() && !cfm.isSuper())
-        {
-            ByteBuffer name = builder.copy().add(ByteBufferUtil.EMPTY_BYTE_BUFFER).build();
-            cf.addColumn(params.makeColumn(name, ByteBufferUtil.EMPTY_BYTE_BUFFER));
-        }
+        if (cfm.isCQL3Table())
+            cf.addColumn(params.makeColumn(cfm.comparator.rowMarker(prefix), ByteBufferUtil.EMPTY_BYTE_BUFFER));
 
         List<Operation> updates = getOperations();
 
-        if (cfm.isDense())
+        if (cfm.comparator.isDense())
         {
-            if (builder.componentCount() == 0)
+            if (prefix.isEmpty())
                 throw new InvalidRequestException(String.format("Missing PRIMARY KEY part %s", cfm.clusteringColumns().iterator().next()));
 
             // An empty name for the compact value is what we use to recognize the case where there is not column
@@ -80,7 +77,7 @@ public class UpdateStatement extends ModificationStatement
             {
                 // There is no column outside the PK. So no operation could have passed through validation
                 assert updates.isEmpty();
-                setToEmptyOperation.execute(key, cf, builder.copy(), params);
+                new Constants.Setter(cfm.compactValueColumn(), EMPTY).execute(key, cf, prefix, params);
             }
             else
             {
@@ -89,21 +86,21 @@ public class UpdateStatement extends ModificationStatement
                     throw new InvalidRequestException(String.format("Column %s is mandatory for this COMPACT STORAGE table", cfm.compactValueColumn().name));
 
                 for (Operation update : updates)
-                    update.execute(key, cf, builder.copy(), params);
+                    update.execute(key, cf, prefix, params);
             }
         }
         else
         {
             for (Operation update : updates)
-                update.execute(key, cf, builder.copy(), params);
+                update.execute(key, cf, prefix, params);
         }
     }
 
-    public ColumnFamily updateForKey(ByteBuffer key, ColumnNameBuilder builder, UpdateParameters params)
+    public ColumnFamily updateForKey(ByteBuffer key, Composite prefix, UpdateParameters params)
     throws InvalidRequestException
     {
         ColumnFamily cf = UnsortedColumns.factory.create(cfm);
-        addUpdateForKey(cf, key, builder, params);
+        addUpdateForKey(cf, key, prefix, params);
         return cf;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java b/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java
index d2825c1..432b47e 100644
--- a/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java
+++ b/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.db;
 
-import java.nio.ByteBuffer;
 import java.util.*;
 
 import com.google.common.base.Function;
@@ -26,8 +25,10 @@ import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 
 import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.db.composites.CellNameType;
+import org.apache.cassandra.db.composites.Composite;
 import org.apache.cassandra.db.filter.ColumnSlice;
-import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.utils.Allocator;
 
 /**
@@ -79,12 +80,12 @@ public class ArrayBackedSortedColumns extends AbstractThreadUnsafeSortedColumns
         return reversed;
     }
 
-    private Comparator<ByteBuffer> internalComparator()
+    private Comparator<Composite> internalComparator()
     {
-        return reversed ? getComparator().reverseComparator : getComparator();
+        return reversed ? getComparator().reverseComparator() : getComparator();
     }
 
-    public Column getColumn(ByteBuffer name)
+    public Column getColumn(CellName name)
     {
         int pos = binarySearch(name);
         return pos >= 0 ? columns.get(pos) : null;
@@ -147,7 +148,7 @@ public class ArrayBackedSortedColumns extends AbstractThreadUnsafeSortedColumns
         columns.set(i, reconciledColumn);
     }
 
-    private int binarySearch(ByteBuffer name)
+    private int binarySearch(CellName name)
     {
         return binarySearch(columns, internalComparator(), name, 0);
     }
@@ -158,7 +159,7 @@ public class ArrayBackedSortedColumns extends AbstractThreadUnsafeSortedColumns
      * (We don't use Collections.binarySearch() directly because it would require us to create
      * a fake Column (as well as an Column comparator) to do the search, which is ugly.
      */
-    private static int binarySearch(List<Column> columns, Comparator<ByteBuffer> comparator, ByteBuffer name, int start)
+    private static int binarySearch(List<Column> columns, Comparator<Composite> comparator, Composite name, int start)
     {
         int low = start;
         int mid = columns.size();
@@ -266,11 +267,11 @@ public class ArrayBackedSortedColumns extends AbstractThreadUnsafeSortedColumns
         columns.clear();
     }
 
-    public Iterable<ByteBuffer> getColumnNames()
+    public Iterable<CellName> getColumnNames()
     {
-        return Iterables.transform(columns, new Function<Column, ByteBuffer>()
+        return Iterables.transform(columns, new Function<Column, CellName>()
         {
-            public ByteBuffer apply(Column column)
+            public CellName apply(Column column)
             {
                 return column.name;
             }
@@ -296,17 +297,17 @@ public class ArrayBackedSortedColumns extends AbstractThreadUnsafeSortedColumns
     {
         private final List<Column> list;
         private final ColumnSlice[] slices;
-        private final Comparator<ByteBuffer> comparator;
+        private final Comparator<Composite> comparator;
 
         private int idx = 0;
         private int previousSliceEnd = 0;
         private Iterator<Column> currentSlice;
 
-        public SlicesIterator(List<Column> list, AbstractType<?> comparator, ColumnSlice[] slices, boolean reversed)
+        public SlicesIterator(List<Column> list, CellNameType comparator, ColumnSlice[] slices, boolean reversed)
         {
             this.list = reversed ? Lists.reverse(list) : list;
             this.slices = slices;
-            this.comparator = reversed ? comparator.reverseComparator : comparator;
+            this.comparator = reversed ? comparator.reverseComparator() : comparator;
         }
 
         protected Column computeNext()
@@ -318,12 +319,12 @@ public class ArrayBackedSortedColumns extends AbstractThreadUnsafeSortedColumns
 
                 ColumnSlice slice = slices[idx++];
                 // The first idx to include
-                int startIdx = slice.start.remaining() == 0 ? 0 : binarySearch(list, comparator, slice.start, previousSliceEnd);
+                int startIdx = slice.start.isEmpty() ? 0 : binarySearch(list, comparator, slice.start, previousSliceEnd);
                 if (startIdx < 0)
                     startIdx = -startIdx - 1;
 
                 // The first idx to exclude
-                int finishIdx = slice.finish.remaining() == 0 ? list.size() - 1 : binarySearch(list, comparator, slice.finish, previousSliceEnd);
+                int finishIdx = slice.finish.isEmpty() ? list.size() - 1 : binarySearch(list, comparator, slice.finish, previousSliceEnd);
                 if (finishIdx >= 0)
                     finishIdx++;
                 else

http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/AtomDeserializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/AtomDeserializer.java b/src/java/org/apache/cassandra/db/AtomDeserializer.java
new file mode 100644
index 0000000..799ed0e
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/AtomDeserializer.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.io.DataInput;
+import java.io.IOException;
+
+import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.db.composites.CellNameType;
+import org.apache.cassandra.db.composites.Composite;
+import org.apache.cassandra.io.sstable.Descriptor;
+
+/**
+ * Helper class to deserialize OnDiskAtom efficiently.
+ *
+ * More precisely, this class is used by the low-level readers
+ * (IndexedSliceReader and SSTableNamesIterator) to ensure we don't
+ * do more work than necessary (i.e. we don't allocate/deserialize
+ * objects for things we don't care about).
+ */
+public class AtomDeserializer
+{
+    private final CellNameType type;
+    private final CellNameType.Deserializer nameDeserializer;
+    private final DataInput in;
+    private final ColumnSerializer.Flag flag;
+    private final int expireBefore;
+    private final Descriptor.Version version;
+
+    public AtomDeserializer(CellNameType type, DataInput in, ColumnSerializer.Flag flag, int expireBefore, Descriptor.Version version)
+    {
+        this.type = type;
+        this.nameDeserializer = type.newDeserializer(in);
+        this.in = in;
+        this.flag = flag;
+        this.expireBefore = expireBefore;
+        this.version = version;
+    }
+
+    /**
+     * Whether or not there is more atom to read.
+     */
+    public boolean hasNext() throws IOException
+    {
+        return nameDeserializer.hasNext();
+    }
+
+    /**
+     * Whether or not some atom has been read but not processed (neither readNext() nor
+     * skipNext() has been called for that atom) yet.
+     */
+    public boolean hasUnprocessed() throws IOException
+    {
+        return nameDeserializer.hasUnprocessed();
+    }
+
+    /**
+     * Compare the provided composite to the next atom to read on disk.
+     *
+     * This will not read/deserialize the whole atom but only what is necessary for the
+     * comparison. Whenever we know what to do with this atom (read it or skip it),
+     * readNext or skipNext should be called.
+     */
+    public int compareNextTo(Composite composite) throws IOException
+    {
+        return nameDeserializer.compareNextTo(composite);
+    }
+
+    /**
+     * Returns the next atom.
+     */
+    public OnDiskAtom readNext() throws IOException
+    {
+        Composite name = nameDeserializer.readNext();
+        assert !name.isEmpty(); // This would imply hasNext() hasn't been called
+        int b = in.readUnsignedByte();
+        if ((b & ColumnSerializer.RANGE_TOMBSTONE_MASK) != 0)
+            return type.rangeTombstoneSerializer().deserializeBody(in, name, version);
+        else
+            return type.columnSerializer().deserializeColumnBody(in, (CellName)name, b, flag, expireBefore);
+    }
+
+    /**
+     * Skips the next atom.
+     */
+    public void skipNext() throws IOException
+    {
+        nameDeserializer.skipNext();
+        int b = in.readUnsignedByte();
+        if ((b & ColumnSerializer.RANGE_TOMBSTONE_MASK) != 0)
+            type.rangeTombstoneSerializer().skipBody(in, version);
+        else
+            type.columnSerializer().skipColumnBody(in, b);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/AtomicSortedColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/AtomicSortedColumns.java b/src/java/org/apache/cassandra/db/AtomicSortedColumns.java
index b44d8bf..5056c26 100644
--- a/src/java/org/apache/cassandra/db/AtomicSortedColumns.java
+++ b/src/java/org/apache/cassandra/db/AtomicSortedColumns.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.db;
 
-import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -27,9 +26,10 @@ import com.google.common.collect.Iterables;
 import edu.stanford.ppl.concurrent.SnapTreeMap;
 
 import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.db.composites.CellNameType;
 import org.apache.cassandra.db.filter.ColumnSlice;
 import org.apache.cassandra.db.index.SecondaryIndexManager;
-import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.utils.Allocator;
 
 /**
@@ -70,9 +70,9 @@ public class AtomicSortedColumns extends ColumnFamily
         this.ref = new AtomicReference<>(holder);
     }
 
-    public AbstractType<?> getComparator()
+    public CellNameType getComparator()
     {
-        return (AbstractType<?>)ref.get().map.comparator();
+        return (CellNameType)ref.get().map.comparator();
     }
 
     public ColumnFamily.Factory getFactory()
@@ -233,12 +233,12 @@ public class AtomicSortedColumns extends ColumnFamily
         while (!ref.compareAndSet(current, modified));
     }
 
-    public Column getColumn(ByteBuffer name)
+    public Column getColumn(CellName name)
     {
         return ref.get().map.get(name);
     }
 
-    public SortedSet<ByteBuffer> getColumnNames()
+    public SortedSet<CellName> getColumnNames()
     {
         return ref.get().map.keySet();
     }
@@ -279,15 +279,15 @@ public class AtomicSortedColumns extends ColumnFamily
         // so we can safely alias one DeletionInfo.live() reference and avoid some allocations.
         private static final DeletionInfo LIVE = DeletionInfo.live();
 
-        final SnapTreeMap<ByteBuffer, Column> map;
+        final SnapTreeMap<CellName, Column> map;
         final DeletionInfo deletionInfo;
 
-        Holder(AbstractType<?> comparator)
+        Holder(CellNameType comparator)
         {
-            this(new SnapTreeMap<ByteBuffer, Column>(comparator), LIVE);
+            this(new SnapTreeMap<CellName, Column>(comparator), LIVE);
         }
 
-        Holder(SnapTreeMap<ByteBuffer, Column> map, DeletionInfo deletionInfo)
+        Holder(SnapTreeMap<CellName, Column> map, DeletionInfo deletionInfo)
         {
             this.map = map;
             this.deletionInfo = deletionInfo;
@@ -303,7 +303,7 @@ public class AtomicSortedColumns extends ColumnFamily
             return new Holder(map, info);
         }
 
-        Holder with(SnapTreeMap<ByteBuffer, Column> newMap)
+        Holder with(SnapTreeMap<CellName, Column> newMap)
         {
             return new Holder(newMap, deletionInfo);
         }
@@ -312,12 +312,12 @@ public class AtomicSortedColumns extends ColumnFamily
         // afterwards.
         Holder clear()
         {
-            return new Holder(new SnapTreeMap<ByteBuffer, Column>(map.comparator()), LIVE);
+            return new Holder(new SnapTreeMap<CellName, Column>(map.comparator()), LIVE);
         }
 
         long addColumn(Column column, Allocator allocator, SecondaryIndexManager.Updater indexer)
         {
-            ByteBuffer name = column.name();
+            CellName name = column.name();
             while (true)
             {
                 Column oldColumn = map.putIfAbsent(name, column);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/BatchlogManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/BatchlogManager.java b/src/java/org/apache/cassandra/db/BatchlogManager.java
index bba3ffe..9bc857b 100644
--- a/src/java/org/apache/cassandra/db/BatchlogManager.java
+++ b/src/java/org/apache/cassandra/db/BatchlogManager.java
@@ -35,6 +35,7 @@ import javax.management.ObjectName;
 
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
+import org.apache.cassandra.db.composites.CellName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,7 +46,6 @@ import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.cql3.UntypedResultSet;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.marshal.LongType;
-import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.db.marshal.UUIDType;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.WriteTimeoutException;
@@ -128,9 +128,9 @@ public class BatchlogManager implements BatchlogManagerMBean
         ByteBuffer data = serializeRowMutations(mutations);
 
         ColumnFamily cf = ArrayBackedSortedColumns.factory.create(CFMetaData.BatchlogCf);
-        cf.addColumn(new Column(columnName(""), ByteBufferUtil.EMPTY_BYTE_BUFFER, timestamp));
-        cf.addColumn(new Column(columnName("data"), data, timestamp));
-        cf.addColumn(new Column(columnName("written_at"), writtenAt, timestamp));
+        cf.addColumn(new Column(cellName(""), ByteBufferUtil.EMPTY_BYTE_BUFFER, timestamp));
+        cf.addColumn(new Column(cellName("data"), data, timestamp));
+        cf.addColumn(new Column(cellName("written_at"), writtenAt, timestamp));
 
         return new RowMutation(Keyspace.SYSTEM_KS, UUIDType.instance.decompose(uuid), cf);
     }
@@ -282,9 +282,9 @@ public class BatchlogManager implements BatchlogManagerMBean
         return (int) ((HintedHandOffManager.calculateHintTTL(mutation) * 1000 - (System.currentTimeMillis() - writtenAt)) / 1000);
     }
 
-    private static ByteBuffer columnName(String name)
+    private static CellName cellName(String name)
     {
-        return CFMetaData.BatchlogCf.getColumnNameBuilder().add(UTF8Type.instance.decompose(name)).build();
+        return CFMetaData.BatchlogCf.comparator.makeCellName(name);
     }
 
     // force flush + compaction to reclaim space from the replayed batches