You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2013/12/17 17:03:42 UTC

[06/13] Push composites support in the storage engine

http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
index 357ad65..27f0dd3 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
@@ -27,9 +27,9 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.config.IndexType;
-import org.apache.cassandra.cql3.ColumnIdentifier;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.composites.CellName;
 import org.apache.cassandra.db.filter.ExtendedFilter;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.sstable.ReducingKeyIterator;
@@ -144,12 +144,12 @@ public class SecondaryIndexManager
         logger.info("Index build of {} complete", idxNames);
     }
 
-    public boolean indexes(ByteBuffer name, Collection<SecondaryIndex> indexes)
+    public boolean indexes(CellName name, Collection<SecondaryIndex> indexes)
     {
         return !indexFor(name, indexes).isEmpty();
     }
 
-    public List<SecondaryIndex> indexFor(ByteBuffer name, Collection<SecondaryIndex> indexes)
+    public List<SecondaryIndex> indexFor(CellName name, Collection<SecondaryIndex> indexes)
     {
         List<SecondaryIndex> matching = null;
         for (SecondaryIndex index : indexes)
@@ -169,12 +169,12 @@ public class SecondaryIndexManager
         return indexes(column.name());
     }
 
-    public boolean indexes(ByteBuffer name)
+    public boolean indexes(CellName name)
     {
         return indexes(name, indexesByColumn.values());
     }
 
-    public List<SecondaryIndex> indexFor(ByteBuffer name)
+    public List<SecondaryIndex> indexFor(CellName name)
     {
         return indexFor(name, indexesByColumn.values());
     }
@@ -437,7 +437,8 @@ public class SecondaryIndexManager
 
         for (Column column : indexedColumnsInRow)
         {
-            SecondaryIndex index = indexesByColumn.get(column.name());
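+            // TODO: this is probably incorrect: several indexes may cover the same cell name, so we should pull all matching indexes (as indexFor(CellName) does) instead of doing a single lookup by raw name bytes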
+            SecondaryIndex index = indexesByColumn.get(column.name().toByteBuffer());
             if (index == null)
                 continue;
 
@@ -559,8 +560,12 @@ public class SecondaryIndexManager
 
     public boolean validate(Column column)
     {
-        SecondaryIndex index = getIndexForColumn(column.name());
-        return index == null || index.validate(column);
+        for (SecondaryIndex index : indexFor(column.name()))
+        {
+            if (!index.validate(column))
+                return false;
+        }
+        return true;
     }
 
     public static interface Updater

http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
index 6d137ca..95314cf 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
@@ -24,12 +24,15 @@ import java.util.Set;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.cql3.ColumnNameBuilder;
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.db.composites.CellNameType;
+import org.apache.cassandra.db.composites.Composite;
 import org.apache.cassandra.db.index.AbstractSimplePerColumnSecondaryIndex;
 import org.apache.cassandra.db.index.SecondaryIndexManager;
 import org.apache.cassandra.db.index.SecondaryIndexSearcher;
-import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.CollectionType;
 import org.apache.cassandra.exceptions.ConfigurationException;
 
 /**
@@ -37,9 +40,9 @@ import org.apache.cassandra.exceptions.ConfigurationException;
  */
 public abstract class CompositesIndex extends AbstractSimplePerColumnSecondaryIndex
 {
-    private volatile CompositeType indexComparator;
+    private volatile CellNameType indexComparator;
 
-    protected CompositeType getIndexComparator()
+    protected CellNameType getIndexComparator()
     {
         // Yes, this is racy, but doing this more than once is not a big deal, we just want to avoid doing it every time
         // More seriously, we should fix that whole SecondaryIndex API so this can be a final and avoid all that non-sense.
@@ -81,7 +84,7 @@ public abstract class CompositesIndex extends AbstractSimplePerColumnSecondaryIn
     }
 
     // Check SecondaryIndex.getIndexComparator if you want to know why this is static
-    public static CompositeType getIndexComparator(CFMetaData baseMetadata, ColumnDefinition cfDef)
+    public static CellNameType getIndexComparator(CFMetaData baseMetadata, ColumnDefinition cfDef)
     {
         if (cfDef.type.isCollection())
         {
@@ -110,12 +113,12 @@ public abstract class CompositesIndex extends AbstractSimplePerColumnSecondaryIn
         throw new AssertionError();
     }
 
-    protected ByteBuffer makeIndexColumnName(ByteBuffer rowKey, Column column)
+    protected CellName makeIndexColumnName(ByteBuffer rowKey, Column column)
     {
-        return makeIndexColumnNameBuilder(rowKey, column.name()).build();
+        return getIndexComparator().create(makeIndexColumnPrefix(rowKey, column.name()), null);
     }
 
-    protected abstract ColumnNameBuilder makeIndexColumnNameBuilder(ByteBuffer rowKey, ByteBuffer columnName);
+    protected abstract Composite makeIndexColumnPrefix(ByteBuffer rowKey, Composite columnName);
 
     public abstract IndexedEntry decodeEntry(DecoratedKey indexedValue, Column indexEntry);
 
@@ -132,17 +135,11 @@ public abstract class CompositesIndex extends AbstractSimplePerColumnSecondaryIn
 
     }
 
-    protected AbstractType getExpressionComparator()
+    protected AbstractType<?> getExpressionComparator()
     {
         return baseCfs.metadata.getColumnDefinitionComparator(columnDef);
     }
 
-    protected CompositeType getBaseComparator()
-    {
-        assert baseCfs.getComparator() instanceof CompositeType;
-        return (CompositeType)baseCfs.getComparator();
-    }
-
     public SecondaryIndexSearcher createSecondaryIndexSearcher(Set<ByteBuffer> columns)
     {
         return new CompositesSearcher(baseCfs.indexManager, columns);
@@ -166,45 +163,31 @@ public abstract class CompositesIndex extends AbstractSimplePerColumnSecondaryIn
     public static class IndexedEntry
     {
         public final DecoratedKey indexValue;
-        public final ByteBuffer indexEntry;
+        public final CellName indexEntry;
         public final long timestamp;
 
         public final ByteBuffer indexedKey;
-        public final ColumnNameBuilder indexedEntryNameBuilder;
+        public final Composite indexedEntryPrefix;
         public final ByteBuffer indexedEntryCollectionKey; // may be null
 
-        public IndexedEntry(DecoratedKey indexValue,
-                            ByteBuffer indexEntry,
-                            long timestamp,
-                            ByteBuffer indexedKey,
-                            ColumnNameBuilder indexedEntryNameBuilder)
+        public IndexedEntry(DecoratedKey indexValue, CellName indexEntry, long timestamp, ByteBuffer indexedKey, Composite indexedEntryPrefix)
         {
-            this(indexValue, indexEntry, timestamp, indexedKey, indexedEntryNameBuilder, null);
+            this(indexValue, indexEntry, timestamp, indexedKey, indexedEntryPrefix, null);
         }
 
         public IndexedEntry(DecoratedKey indexValue,
-                            ByteBuffer indexEntry,
+                            CellName indexEntry,
                             long timestamp,
                             ByteBuffer indexedKey,
-                            ColumnNameBuilder indexedEntryNameBuilder,
+                            Composite indexedEntryPrefix,
                             ByteBuffer indexedEntryCollectionKey)
         {
             this.indexValue = indexValue;
             this.indexEntry = indexEntry;
             this.timestamp = timestamp;
             this.indexedKey = indexedKey;
-            this.indexedEntryNameBuilder = indexedEntryNameBuilder;
+            this.indexedEntryPrefix = indexedEntryPrefix;
             this.indexedEntryCollectionKey = indexedEntryCollectionKey;
         }
-
-        public ByteBuffer indexedEntryStart()
-        {
-            return indexedEntryNameBuilder.build();
-        }
-
-        public ByteBuffer indexedEntryEnd()
-        {
-            return indexedEntryNameBuilder.buildAsEndOfRange();
-        }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java
index 63889ee..38c55fd 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java
@@ -23,8 +23,8 @@ import java.util.List;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.cql3.ColumnNameBuilder;
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.composites.*;
 import org.apache.cassandra.db.index.SecondaryIndex;
 import org.apache.cassandra.db.marshal.*;
 
@@ -47,62 +47,55 @@ import org.apache.cassandra.db.marshal.*;
  */
 public class CompositesIndexOnClusteringKey extends CompositesIndex
 {
-    public static CompositeType buildIndexComparator(CFMetaData baseMetadata, ColumnDefinition columnDef)
+    public static CellNameType buildIndexComparator(CFMetaData baseMetadata, ColumnDefinition columnDef)
     {
         // Index cell names are rk ck_0 ... ck_{i-1} ck_{i+1} ck_n, so n
         // components total (where n is the number of clustering keys)
         int ckCount = baseMetadata.clusteringColumns().size();
         List<AbstractType<?>> types = new ArrayList<AbstractType<?>>(ckCount);
-        List<AbstractType<?>> ckTypes = baseMetadata.comparator.getComponents();
         types.add(SecondaryIndex.keyComparator);
         for (int i = 0; i < columnDef.position(); i++)
-            types.add(ckTypes.get(i));
+            types.add(baseMetadata.clusteringColumns().get(i).type);
         for (int i = columnDef.position() + 1; i < ckCount; i++)
-            types.add(ckTypes.get(i));
-        return CompositeType.getInstance(types);
+            types.add(baseMetadata.clusteringColumns().get(i).type);
+        return new CompoundDenseCellNameType(types);
     }
 
     protected ByteBuffer getIndexedValue(ByteBuffer rowKey, Column column)
     {
-        CompositeType baseComparator = (CompositeType)baseCfs.getComparator();
-        ByteBuffer[] components = baseComparator.split(column.name());
-        return components[columnDef.position()];
+        return column.name().get(columnDef.position());
     }
 
-    protected ColumnNameBuilder makeIndexColumnNameBuilder(ByteBuffer rowKey, ByteBuffer columnName)
+    protected Composite makeIndexColumnPrefix(ByteBuffer rowKey, Composite columnName)
     {
-        int ckCount = baseCfs.metadata.clusteringColumns().size();
-        CompositeType baseComparator = (CompositeType)baseCfs.getComparator();
-        ByteBuffer[] components = baseComparator.split(columnName);
-        CompositeType.Builder builder = getIndexComparator().builder();
+        int count = Math.min(baseCfs.metadata.clusteringColumns().size(), columnName.size());
+        CBuilder builder = getIndexComparator().prefixBuilder();
         builder.add(rowKey);
-
-        for (int i = 0; i < Math.min(components.length, columnDef.position()); i++)
-            builder.add(components[i]);
-        for (int i = columnDef.position() + 1; i < Math.min(components.length, ckCount); i++)
-            builder.add(components[i]);
-        return builder;
+        for (int i = 0; i < Math.min(columnDef.position(), count); i++)
+            builder.add(columnName.get(i));
+        for (int i = columnDef.position() + 1; i < count; i++)
+            builder.add(columnName.get(i));
+        return builder.build();
     }
 
     public IndexedEntry decodeEntry(DecoratedKey indexedValue, Column indexEntry)
     {
         int ckCount = baseCfs.metadata.clusteringColumns().size();
-        ByteBuffer[] components = getIndexComparator().split(indexEntry.name());
 
-        ColumnNameBuilder builder = getBaseComparator().builder();
+        CBuilder builder = baseCfs.getComparator().builder();
         for (int i = 0; i < columnDef.position(); i++)
-            builder.add(components[i + 1]);
+            builder.add(indexEntry.name().get(i + 1));
 
         builder.add(indexedValue.key);
 
         for (int i = columnDef.position() + 1; i < ckCount; i++)
-            builder.add(components[i]);
+            builder.add(indexEntry.name().get(i));
 
-        return new IndexedEntry(indexedValue, indexEntry.name(), indexEntry.timestamp(), components[0], builder);
+        return new IndexedEntry(indexedValue, indexEntry.name(), indexEntry.timestamp(), indexEntry.name().get(0), builder.build());
     }
 
     @Override
-    public boolean indexes(ByteBuffer name)
+    public boolean indexes(CellName name)
     {
         // For now, assume this is only used in CQL3 when we know name has enough component.
         return true;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionKey.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionKey.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionKey.java
index c2acfc9..f3daaf2 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionKey.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionKey.java
@@ -23,11 +23,14 @@ import java.util.List;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.cql3.ColumnNameBuilder;
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.composites.CBuilder;
+import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.db.composites.CellNameType;
+import org.apache.cassandra.db.composites.Composite;
+import org.apache.cassandra.db.composites.CompoundDenseCellNameType;
 import org.apache.cassandra.db.index.SecondaryIndex;
 import org.apache.cassandra.db.marshal.*;
-import org.apache.cassandra.dht.LocalToken;
 
 /**
  * Index on the collection element of the cell name of a collection.
@@ -45,15 +48,14 @@ import org.apache.cassandra.dht.LocalToken;
  */
 public class CompositesIndexOnCollectionKey extends CompositesIndex
 {
-    public static CompositeType buildIndexComparator(CFMetaData baseMetadata, ColumnDefinition columnDef)
+    public static CellNameType buildIndexComparator(CFMetaData baseMetadata, ColumnDefinition columnDef)
     {
         int count = 1 + baseMetadata.clusteringColumns().size(); // row key + clustering prefix
         List<AbstractType<?>> types = new ArrayList<AbstractType<?>>(count);
-        List<AbstractType<?>> ckTypes = baseMetadata.comparator.getComponents();
         types.add(SecondaryIndex.keyComparator);
         for (int i = 0; i < count - 1; i++)
-            types.add(ckTypes.get(i));
-        return CompositeType.getInstance(types);
+            types.add(baseMetadata.comparator.subtype(i));
+        return new CompoundDenseCellNameType(types);
     }
 
     @Override
@@ -64,49 +66,41 @@ public class CompositesIndexOnCollectionKey extends CompositesIndex
 
     protected ByteBuffer getIndexedValue(ByteBuffer rowKey, Column column)
     {
-        CompositeType baseComparator = (CompositeType)baseCfs.getComparator();
-        ByteBuffer[] components = baseComparator.split(column.name());
-        return components[columnDef.position() + 1];
+        return column.name().get(columnDef.position() + 1);
     }
 
-    protected ColumnNameBuilder makeIndexColumnNameBuilder(ByteBuffer rowKey, ByteBuffer columnName)
+    protected Composite makeIndexColumnPrefix(ByteBuffer rowKey, Composite cellName)
     {
         int count = 1 + baseCfs.metadata.clusteringColumns().size();
-        CompositeType baseComparator = (CompositeType)baseCfs.getComparator();
-        ByteBuffer[] components = baseComparator.split(columnName);
-        CompositeType.Builder builder = getIndexComparator().builder();
+        CBuilder builder = getIndexComparator().builder();
         builder.add(rowKey);
         for (int i = 0; i < count - 1; i++)
-            builder.add(components[i]);
-        return builder;
+            builder.add(cellName.get(i));
+        return builder.build();
     }
 
     public IndexedEntry decodeEntry(DecoratedKey indexedValue, Column indexEntry)
     {
         int count = 1 + baseCfs.metadata.clusteringColumns().size();
-        ByteBuffer[] components = getIndexComparator().split(indexEntry.name());
-
-        ColumnNameBuilder builder = getBaseComparator().builder();
+        CBuilder builder = baseCfs.getComparator().builder();
         for (int i = 0; i < count - 1; i++)
-            builder.add(components[i + 1]);
-
-        return new IndexedEntry(indexedValue, indexEntry.name(), indexEntry.timestamp(), components[0], builder);
+            builder.add(indexEntry.name().get(i + 1));
+        return new IndexedEntry(indexedValue, indexEntry.name(), indexEntry.timestamp(), indexEntry.name().get(0), builder.build());
     }
 
     @Override
-    public boolean indexes(ByteBuffer name)
+    public boolean indexes(CellName name)
     {
         // We index if the CQL3 column name is the one of the collection we index
-        ByteBuffer[] components = getBaseComparator().split(name);
         AbstractType<?> comp = baseCfs.metadata.getColumnDefinitionComparator(columnDef);
-        return components.length > columnDef.position()
-            && comp.compare(components[columnDef.position()], columnDef.name.bytes) == 0;
+        return name.size() > columnDef.position()
+            && comp.compare(name.get(columnDef.position()), columnDef.name.bytes) == 0;
     }
 
     public boolean isStale(IndexedEntry entry, ColumnFamily data, long now)
     {
-        ByteBuffer bb = entry.indexedEntryNameBuilder.copy().add(columnDef.name).add(entry.indexValue.key).build();
-        Column liveColumn = data.getColumn(bb);
+        CellName name = data.getComparator().create(entry.indexedEntryPrefix, columnDef.name, entry.indexValue.key);
+        Column liveColumn = data.getColumn(name);
         return (liveColumn == null || liveColumn.isMarkedForDelete(now));
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionValue.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionValue.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionValue.java
index f416d0e..9bf297b 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionValue.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionValue.java
@@ -23,11 +23,14 @@ import java.util.List;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.cql3.ColumnNameBuilder;
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.composites.CBuilder;
+import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.db.composites.CellNameType;
+import org.apache.cassandra.db.composites.Composite;
+import org.apache.cassandra.db.composites.CompoundDenseCellNameType;
 import org.apache.cassandra.db.index.SecondaryIndex;
 import org.apache.cassandra.db.marshal.*;
-import org.apache.cassandra.dht.LocalToken;
 
 /**
  * Index the value of a collection cell.
@@ -42,15 +45,15 @@ import org.apache.cassandra.dht.LocalToken;
  */
 public class CompositesIndexOnCollectionValue extends CompositesIndex
 {
-    public static CompositeType buildIndexComparator(CFMetaData baseMetadata, ColumnDefinition columnDef)
+    public static CellNameType buildIndexComparator(CFMetaData baseMetadata, ColumnDefinition columnDef)
     {
         int prefixSize = columnDef.position();
         List<AbstractType<?>> types = new ArrayList<AbstractType<?>>(prefixSize + 2);
         types.add(SecondaryIndex.keyComparator);
         for (int i = 0; i < prefixSize; i++)
-            types.add(((CompositeType)baseMetadata.comparator).types.get(i));
+            types.add(baseMetadata.comparator.subtype(i));
         types.add(((CollectionType)columnDef.type).nameComparator()); // collection key
-        return CompositeType.getInstance(types);
+        return new CompoundDenseCellNameType(types);
     }
 
     @Override
@@ -64,43 +67,38 @@ public class CompositesIndexOnCollectionValue extends CompositesIndex
         return column.value();
     }
 
-    protected ColumnNameBuilder makeIndexColumnNameBuilder(ByteBuffer rowKey, ByteBuffer columnName)
+    protected Composite makeIndexColumnPrefix(ByteBuffer rowKey, Composite cellName)
     {
-        int prefixSize = columnDef.position();
-        CompositeType baseComparator = (CompositeType)baseCfs.getComparator();
-        ByteBuffer[] components = baseComparator.split(columnName);
-        assert components.length == baseComparator.types.size();
-        CompositeType.Builder builder = getIndexComparator().builder();
+        CBuilder builder = getIndexComparator().prefixBuilder();
         builder.add(rowKey);
-        for (int i = 0; i < prefixSize; i++)
-            builder.add(components[i]);
-        builder.add(components[prefixSize + 1]);
-        return builder;
+        for (int i = 0; i < Math.min(columnDef.position(), cellName.size()); i++)
+            builder.add(cellName.get(i));
+        builder.add(cellName.get(columnDef.position() + 1));
+        return builder.build();
     }
 
     public IndexedEntry decodeEntry(DecoratedKey indexedValue, Column indexEntry)
     {
         int prefixSize = columnDef.position();
-        ByteBuffer[] components = getIndexComparator().split(indexEntry.name());
-        CompositeType.Builder builder = getBaseComparator().builder();
+        CellName name = indexEntry.name();
+        CBuilder builder = baseCfs.getComparator().builder();
         for (int i = 0; i < prefixSize; i++)
-            builder.add(components[i + 1]);
-        return new IndexedEntry(indexedValue, indexEntry.name(), indexEntry.timestamp(), components[0], builder, components[prefixSize + 1]);
+            builder.add(name.get(i + 1));
+        return new IndexedEntry(indexedValue, name, indexEntry.timestamp(), name.get(0), builder.build(), name.get(prefixSize + 1));
     }
 
     @Override
-    public boolean indexes(ByteBuffer name)
+    public boolean indexes(CellName name)
     {
-        ByteBuffer[] components = getBaseComparator().split(name);
         AbstractType<?> comp = baseCfs.metadata.getColumnDefinitionComparator(columnDef);
-        return components.length > columnDef.position()
-            && comp.compare(components[columnDef.position()], columnDef.name.bytes) == 0;
+        return name.size() > columnDef.position()
+            && comp.compare(name.get(columnDef.position()), columnDef.name.bytes) == 0;
     }
 
     public boolean isStale(IndexedEntry entry, ColumnFamily data, long now)
     {
-        ByteBuffer bb = entry.indexedEntryNameBuilder.copy().add(columnDef.name).add(entry.indexedEntryCollectionKey).build();
-        Column liveColumn = data.getColumn(bb);
+        CellName name = data.getComparator().create(entry.indexedEntryPrefix, columnDef.name, entry.indexedEntryCollectionKey);
+        Column liveColumn = data.getColumn(name);
         if (liveColumn == null || liveColumn.isMarkedForDelete(now))
             return true;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnPartitionKey.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnPartitionKey.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnPartitionKey.java
index d097899..6df1e8d 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnPartitionKey.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnPartitionKey.java
@@ -23,8 +23,8 @@ import java.util.List;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.cql3.ColumnNameBuilder;
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.composites.*;
 import org.apache.cassandra.db.index.SecondaryIndex;
 import org.apache.cassandra.db.marshal.*;
 
@@ -48,13 +48,14 @@ import org.apache.cassandra.db.marshal.*;
  */
 public class CompositesIndexOnPartitionKey extends CompositesIndex
 {
-    public static CompositeType buildIndexComparator(CFMetaData baseMetadata, ColumnDefinition columnDef)
+    public static CellNameType buildIndexComparator(CFMetaData baseMetadata, ColumnDefinition columnDef)
     {
         int ckCount = baseMetadata.clusteringColumns().size();
         List<AbstractType<?>> types = new ArrayList<AbstractType<?>>(ckCount + 1);
         types.add(SecondaryIndex.keyComparator);
-        types.addAll(baseMetadata.comparator.getComponents());
-        return CompositeType.getInstance(types);
+        for (int i = 0; i < ckCount; i++)
+            types.add(baseMetadata.comparator.subtype(i));
+        return new CompoundDenseCellNameType(types);
     }
 
     protected ByteBuffer getIndexedValue(ByteBuffer rowKey, Column column)
@@ -64,32 +65,28 @@ public class CompositesIndexOnPartitionKey extends CompositesIndex
         return components[columnDef.position()];
     }
 
-    protected ColumnNameBuilder makeIndexColumnNameBuilder(ByteBuffer rowKey, ByteBuffer columnName)
+    protected Composite makeIndexColumnPrefix(ByteBuffer rowKey, Composite columnName)
     {
-        int ckCount = baseCfs.metadata.clusteringColumns().size();
-        CompositeType baseComparator = (CompositeType)baseCfs.getComparator();
-        ByteBuffer[] components = baseComparator.split(columnName);
-        CompositeType.Builder builder = getIndexComparator().builder();
+        int count = Math.min(baseCfs.metadata.clusteringColumns().size(), columnName.size());
+        CBuilder builder = getIndexComparator().prefixBuilder();
         builder.add(rowKey);
-        for (int i = 0; i < ckCount; i++)
-            builder.add(components[i]);
-        return builder;
+        for (int i = 0; i < count; i++)
+            builder.add(columnName.get(i));
+        return builder.build();
     }
 
     public IndexedEntry decodeEntry(DecoratedKey indexedValue, Column indexEntry)
     {
         int ckCount = baseCfs.metadata.clusteringColumns().size();
-        ByteBuffer[] components = getIndexComparator().split(indexEntry.name());
-
-        ColumnNameBuilder builder = getBaseComparator().builder();
+        CBuilder builder = baseCfs.getComparator().builder();
         for (int i = 0; i < ckCount; i++)
-            builder.add(components[i + 1]);
+            builder.add(indexEntry.name().get(i + 1));
 
-        return new IndexedEntry(indexedValue, indexEntry.name(), indexEntry.timestamp(), components[0], builder);
+        return new IndexedEntry(indexedValue, indexEntry.name(), indexEntry.timestamp(), indexEntry.name().get(0), builder.build());
     }
 
     @Override
-    public boolean indexes(ByteBuffer name)
+    public boolean indexes(CellName name)
     {
         // Since a partition key is always full, we always index it
         return true;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnRegular.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnRegular.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnRegular.java
index 55a0f88..6903b77 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnRegular.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnRegular.java
@@ -23,8 +23,8 @@ import java.util.List;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.cql3.ColumnNameBuilder;
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.composites.*;
 import org.apache.cassandra.db.index.SecondaryIndex;
 import org.apache.cassandra.db.marshal.*;
 
@@ -47,14 +47,14 @@ import org.apache.cassandra.db.marshal.*;
  */
 public class CompositesIndexOnRegular extends CompositesIndex
 {
-    public static CompositeType buildIndexComparator(CFMetaData baseMetadata, ColumnDefinition columnDef)
+    public static CellNameType buildIndexComparator(CFMetaData baseMetadata, ColumnDefinition columnDef)
     {
         int prefixSize = columnDef.position();
         List<AbstractType<?>> types = new ArrayList<AbstractType<?>>(prefixSize + 1);
         types.add(SecondaryIndex.keyComparator);
         for (int i = 0; i < prefixSize; i++)
-            types.add(((CompositeType)baseMetadata.comparator).types.get(i));
-        return CompositeType.getInstance(types);
+            types.add(baseMetadata.comparator.subtype(i));
+        return new CompoundDenseCellNameType(types);
     }
 
     protected ByteBuffer getIndexedValue(ByteBuffer rowKey, Column column)
@@ -62,39 +62,35 @@ public class CompositesIndexOnRegular extends CompositesIndex
         return column.value();
     }
 
-    protected ColumnNameBuilder makeIndexColumnNameBuilder(ByteBuffer rowKey, ByteBuffer columnName)
+    protected Composite makeIndexColumnPrefix(ByteBuffer rowKey, Composite cellName)
     {
-        CompositeType baseComparator = (CompositeType)baseCfs.getComparator();
-        ByteBuffer[] components = baseComparator.split(columnName);
-        CompositeType.Builder builder = getIndexComparator().builder();
+        CBuilder builder = getIndexComparator().prefixBuilder();
         builder.add(rowKey);
-        for (int i = 0; i < Math.min(columnDef.position(), components.length); i++)
-            builder.add(components[i]);
-        return builder;
+        for (int i = 0; i < Math.min(columnDef.position(), cellName.size()); i++)
+            builder.add(cellName.get(i));
+        return builder.build();
     }
 
     public IndexedEntry decodeEntry(DecoratedKey indexedValue, Column indexEntry)
     {
-        ByteBuffer[] components = getIndexComparator().split(indexEntry.name());
-        CompositeType.Builder builder = getBaseComparator().builder();
+        CBuilder builder = baseCfs.getComparator().builder();
         for (int i = 0; i < columnDef.position(); i++)
-            builder.add(components[i + 1]);
-        return new IndexedEntry(indexedValue, indexEntry.name(), indexEntry.timestamp(), components[0], builder);
+            builder.add(indexEntry.name().get(i + 1));
+        return new IndexedEntry(indexedValue, indexEntry.name(), indexEntry.timestamp(), indexEntry.name().get(0), builder.build());
     }
 
     @Override
-    public boolean indexes(ByteBuffer name)
+    public boolean indexes(CellName name)
     {
-        ByteBuffer[] components = getBaseComparator().split(name);
         AbstractType<?> comp = baseCfs.metadata.getColumnDefinitionComparator(columnDef);
-        return components.length > columnDef.position()
-            && comp.compare(components[columnDef.position()], columnDef.name.bytes) == 0;
+        return name.size() > columnDef.position()
+            && comp.compare(name.get(columnDef.position()), columnDef.name.bytes) == 0;
     }
 
     public boolean isStale(IndexedEntry entry, ColumnFamily data, long now)
     {
-        ByteBuffer bb = entry.indexedEntryNameBuilder.copy().add(columnDef.name).build();
-        Column liveColumn = data.getColumn(bb);
+        CellName name = data.getComparator().create(entry.indexedEntryPrefix, columnDef.name);
+        Column liveColumn = data.getColumn(name);
         if (liveColumn == null || liveColumn.isMarkedForDelete(now))
             return true;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java b/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
index bcb0dd2..97602af 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
@@ -24,15 +24,16 @@ import java.util.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.cql3.ColumnNameBuilder;
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.composites.CellNameType;
+import org.apache.cassandra.db.composites.Composite;
+import org.apache.cassandra.db.composites.Composites;
 import org.apache.cassandra.db.filter.ExtendedFilter;
 import org.apache.cassandra.db.filter.IDiskAtomFilter;
 import org.apache.cassandra.db.filter.QueryFilter;
 import org.apache.cassandra.db.filter.SliceQueryFilter;
 import org.apache.cassandra.db.index.SecondaryIndexManager;
 import org.apache.cassandra.db.index.SecondaryIndexSearcher;
-import org.apache.cassandra.db.marshal.CompositeType;
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
@@ -52,23 +53,23 @@ public class CompositesSearcher extends SecondaryIndexSearcher
         return baseCfs.filter(getIndexedIterator(filter), filter);
     }
 
-    private ByteBuffer makePrefix(CompositesIndex index, ByteBuffer key, ExtendedFilter filter, boolean isStart)
+    private Composite makePrefix(CompositesIndex index, ByteBuffer key, ExtendedFilter filter, boolean isStart)
     {
         if (key.remaining() == 0)
-            return ByteBufferUtil.EMPTY_BYTE_BUFFER;
+            return Composites.EMPTY;
 
-        ColumnNameBuilder builder;
+        Composite prefix;
         IDiskAtomFilter columnFilter = filter.columnFilter(key);
         if (columnFilter instanceof SliceQueryFilter)
         {
             SliceQueryFilter sqf = (SliceQueryFilter)columnFilter;
-            builder = index.makeIndexColumnNameBuilder(key, isStart ? sqf.start() : sqf.finish());
+            prefix = index.makeIndexColumnPrefix(key, isStart ? sqf.start() : sqf.finish());
         }
         else
         {
-            builder = index.getIndexComparator().builder().add(key);
+            prefix = index.getIndexComparator().make(key);
         }
-        return isStart ? builder.build() : builder.buildAsEndOfRange();
+        return isStart ? prefix.start() : prefix.end();
     }
 
     private ColumnFamilyStore.AbstractScanIterator getIndexedIterator(final ExtendedFilter filter)
@@ -94,15 +95,15 @@ public class CompositesSearcher extends SecondaryIndexSearcher
         ByteBuffer startKey = range.left instanceof DecoratedKey ? ((DecoratedKey)range.left).key : ByteBufferUtil.EMPTY_BYTE_BUFFER;
         ByteBuffer endKey = range.right instanceof DecoratedKey ? ((DecoratedKey)range.right).key : ByteBufferUtil.EMPTY_BYTE_BUFFER;
 
-        final CompositeType baseComparator = (CompositeType)baseCfs.getComparator();
-        final CompositeType indexComparator = (CompositeType)index.getIndexCfs().getComparator();
+        final CellNameType baseComparator = baseCfs.getComparator();
+        final CellNameType indexComparator = index.getIndexCfs().getComparator();
 
-        final ByteBuffer startPrefix = makePrefix(index, startKey, filter, true);
-        final ByteBuffer endPrefix = makePrefix(index, endKey, filter, false);
+        final Composite startPrefix = makePrefix(index, startKey, filter, true);
+        final Composite endPrefix = makePrefix(index, endKey, filter, false);
 
         return new ColumnFamilyStore.AbstractScanIterator()
         {
-            private ByteBuffer lastSeenPrefix = startPrefix;
+            private Composite lastSeenPrefix = startPrefix;
             private Deque<Column> indexColumns;
             private int columnsRead = Integer.MAX_VALUE;
             private int limit = filter.currentLimit();
@@ -135,7 +136,7 @@ public class CompositesSearcher extends SecondaryIndexSearcher
                  */
                 DecoratedKey currentKey = null;
                 ColumnFamily data = null;
-                ByteBuffer previousPrefix = null;
+                Composite previousPrefix = null;
 
                 while (true)
                 {
@@ -229,7 +230,7 @@ public class CompositesSearcher extends SecondaryIndexSearcher
                         }
 
                         // Check if this entry cannot be a hit due to the original column filter
-                        ByteBuffer start = entry.indexedEntryStart();
+                        Composite start = entry.indexedEntryPrefix;
                         if (!filter.columnFilter(dk.key).maySelectPrefix(baseComparator, start))
                             continue;
 
@@ -248,7 +249,7 @@ public class CompositesSearcher extends SecondaryIndexSearcher
                         // We always query the whole CQL3 row. In the case where the original filter was a name filter this might be
                         // slightly wasteful, but this probably doesn't matter in practice and it simplify things.
                         SliceQueryFilter dataFilter = new SliceQueryFilter(start,
-                                                                           entry.indexedEntryEnd(),
+                                                                           entry.indexedEntryPrefix.end(),
                                                                            false,
                                                                            Integer.MAX_VALUE,
                                                                            baseCfs.metadata.clusteringColumns().size());
@@ -267,7 +268,7 @@ public class CompositesSearcher extends SecondaryIndexSearcher
                         if (entry.indexedEntryCollectionKey != null)
                             previousPrefix = start;
 
-                        if (!filter.isSatisfiedBy(dk, newData, entry.indexedEntryNameBuilder, entry.indexedEntryCollectionKey))
+                        if (!filter.isSatisfiedBy(dk, newData, entry.indexedEntryPrefix, entry.indexedEntryCollectionKey))
                             continue;
 
                         if (data == null)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java b/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java
index 7d98c24..ee56c36 100644
--- a/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java
+++ b/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java
@@ -20,6 +20,8 @@ package org.apache.cassandra.db.index.keys;
 import java.nio.ByteBuffer;
 import java.util.Set;
 
+import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.db.composites.CellNames;
 import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.Column;
 import org.apache.cassandra.db.index.AbstractSimplePerColumnSecondaryIndex;
@@ -38,9 +40,9 @@ public class KeysIndex extends AbstractSimplePerColumnSecondaryIndex
         return column.value();
     }
 
-    protected ByteBuffer makeIndexColumnName(ByteBuffer rowKey, Column column)
+    protected CellName makeIndexColumnName(ByteBuffer rowKey, Column column)
     {
-        return rowKey;
+        return CellNames.simpleDense(rowKey);
     }
 
     public SecondaryIndexSearcher createSecondaryIndexSearcher(Set<ByteBuffer> columns)
@@ -50,7 +52,7 @@ public class KeysIndex extends AbstractSimplePerColumnSecondaryIndex
 
     public boolean isIndexEntryStale(ByteBuffer indexedValue, ColumnFamily data, long now)
     {
-        Column liveColumn = data.getColumn(columnDef.name.bytes);
+        Column liveColumn = data.getColumn(data.getComparator().makeCellName(columnDef.name.bytes));
         if (liveColumn == null || liveColumn.isMarkedForDelete(now))
             return true;
 
@@ -63,8 +65,15 @@ public class KeysIndex extends AbstractSimplePerColumnSecondaryIndex
         // no options used
     }
 
+    public boolean indexes(CellName name)
+    {
+        // This considers the full cell name directly
+        AbstractType<?> comparator = baseCfs.metadata.getColumnDefinitionComparator(columnDef);
+        return comparator.compare(columnDef.name.bytes, name.toByteBuffer()) == 0;
+    }
+
     protected AbstractType getExpressionComparator()
     {
-        return baseCfs.getComparator();
+        return baseCfs.getComparator().asAbstractType();
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java b/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
index 3740e24..0101a0b 100644
--- a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
+++ b/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
@@ -28,13 +28,16 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.db.composites.CellNameType;
+import org.apache.cassandra.db.composites.Composite;
+import org.apache.cassandra.db.composites.Composites;
 import org.apache.cassandra.db.filter.ExtendedFilter;
 import org.apache.cassandra.db.filter.IDiskAtomFilter;
 import org.apache.cassandra.db.filter.QueryFilter;
 import org.apache.cassandra.db.index.*;
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.HeapAllocator;
 
 public class KeysSearcher extends SecondaryIndexSearcher
@@ -75,12 +78,15 @@ public class KeysSearcher extends SecondaryIndexSearcher
          * indexed row.
          */
         final AbstractBounds<RowPosition> range = filter.dataRange.keyRange();
-        final ByteBuffer startKey = range.left instanceof DecoratedKey ? ((DecoratedKey)range.left).key : ByteBufferUtil.EMPTY_BYTE_BUFFER;
-        final ByteBuffer endKey = range.right instanceof DecoratedKey ? ((DecoratedKey)range.right).key : ByteBufferUtil.EMPTY_BYTE_BUFFER;
+        CellNameType type = index.getIndexCfs().getComparator();
+        final Composite startKey = range.left instanceof DecoratedKey ? type.make(((DecoratedKey)range.left).key) : Composites.EMPTY;
+        final Composite endKey = range.right instanceof DecoratedKey ? type.make(((DecoratedKey)range.right).key) : Composites.EMPTY;
+
+        final CellName primaryColumn = baseCfs.getComparator().cellFromByteBuffer(primary.column);
 
         return new ColumnFamilyStore.AbstractScanIterator()
         {
-            private ByteBuffer lastSeenKey = startKey;
+            private Composite lastSeenKey = startKey;
             private Iterator<Column> indexColumns;
             private int columnsRead = Integer.MAX_VALUE;
 
@@ -101,7 +107,7 @@ public class KeysSearcher extends SecondaryIndexSearcher
 
                         if (logger.isTraceEnabled() && (index instanceof AbstractSimplePerColumnSecondaryIndex))
                             logger.trace("Scanning index {} starting with {}",
-                                         ((AbstractSimplePerColumnSecondaryIndex)index).expressionString(primary), index.getBaseCfs().metadata.getKeyValidator().getString(startKey));
+                                         ((AbstractSimplePerColumnSecondaryIndex)index).expressionString(primary), index.getBaseCfs().metadata.getKeyValidator().getString(startKey.toByteBuffer()));
 
                         QueryFilter indexFilter = QueryFilter.getSliceFilter(indexKey,
                                                                              index.getIndexCfs().name,
@@ -128,7 +134,7 @@ public class KeysSearcher extends SecondaryIndexSearcher
                         {
                             // skip the row we already saw w/ the last page of results
                             indexColumns.next();
-                            logger.trace("Skipping {}", baseCfs.metadata.getKeyValidator().getString(firstColumn.name()));
+                            logger.trace("Skipping {}", baseCfs.metadata.getKeyValidator().getString(firstColumn.name().toByteBuffer()));
                         }
                         else if (range instanceof Range && indexColumns.hasNext() && firstColumn.name().equals(startKey))
                         {
@@ -148,7 +154,7 @@ public class KeysSearcher extends SecondaryIndexSearcher
                             continue;
                         }
 
-                        DecoratedKey dk = baseCfs.partitioner.decorateKey(lastSeenKey);
+                        DecoratedKey dk = baseCfs.partitioner.decorateKey(lastSeenKey.toByteBuffer());
                         if (!range.right.isMinimum(baseCfs.partitioner) && range.right.compareTo(dk) < 0)
                         {
                             logger.trace("Reached end of assigned scan range");
@@ -161,7 +167,7 @@ public class KeysSearcher extends SecondaryIndexSearcher
                         }
 
                         logger.trace("Returning index hit for {}", dk);
-                        ColumnFamily data = baseCfs.getColumnFamily(new QueryFilter(dk, baseCfs.name, filter.columnFilter(lastSeenKey), filter.timestamp));
+                        ColumnFamily data = baseCfs.getColumnFamily(new QueryFilter(dk, baseCfs.name, filter.columnFilter(lastSeenKey.toByteBuffer()), filter.timestamp));
                         // While the column family we'll get in the end should contains the primary clause column, the initialFilter may not have found it and can thus be null
                         if (data == null)
                             data = TreeMapBackedSortedColumns.factory.create(baseCfs.metadata);
@@ -179,7 +185,7 @@ public class KeysSearcher extends SecondaryIndexSearcher
                         if (((KeysIndex)index).isIndexEntryStale(indexKey.key, data, filter.timestamp))
                         {
                             // delete the index entry w/ its own timestamp
-                            Column dummyColumn = new Column(primary.column, indexKey.key, column.timestamp());
+                            Column dummyColumn = new Column(primaryColumn, indexKey.key, column.timestamp());
                             ((PerColumnSecondaryIndex)index).delete(dk.key, dummyColumn);
                             continue;
                         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/marshal/AbstractCommutativeType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/AbstractCommutativeType.java b/src/java/org/apache/cassandra/db/marshal/AbstractCommutativeType.java
index 01db148..a7162ae 100644
--- a/src/java/org/apache/cassandra/db/marshal/AbstractCommutativeType.java
+++ b/src/java/org/apache/cassandra/db/marshal/AbstractCommutativeType.java
@@ -18,6 +18,7 @@
 package org.apache.cassandra.db.marshal;
 
 import java.nio.ByteBuffer;
+import org.apache.cassandra.db.composites.CellName;
 import org.apache.cassandra.db.Column;
 import org.apache.cassandra.db.context.CounterContext;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -44,5 +45,5 @@ public abstract class AbstractCommutativeType extends AbstractType<Long>
     /**
      * create commutative column
      */
-    public abstract Column createColumn(ByteBuffer name, ByteBuffer value, long timestamp);
+    public abstract Column createColumn(CellName name, ByteBuffer value, long timestamp);
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/marshal/AbstractCompositeType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/AbstractCompositeType.java b/src/java/org/apache/cassandra/db/marshal/AbstractCompositeType.java
index d002aa7..be66d21 100644
--- a/src/java/org/apache/cassandra/db/marshal/AbstractCompositeType.java
+++ b/src/java/org/apache/cassandra/db/marshal/AbstractCompositeType.java
@@ -35,21 +35,21 @@ import java.util.List;
 public abstract class AbstractCompositeType extends AbstractType<ByteBuffer>
 {
     // changes bb position
-    protected static int getShortLength(ByteBuffer bb)
+    public static int getShortLength(ByteBuffer bb)
     {
         int length = (bb.get() & 0xFF) << 8;
         return length | (bb.get() & 0xFF);
     }
 
     // changes bb position
-    protected static void putShortLength(ByteBuffer bb, int length)
+    public static void putShortLength(ByteBuffer bb, int length)
     {
         bb.put((byte) ((length >> 8) & 0xFF));
         bb.put((byte) (length & 0xFF));
     }
 
     // changes bb position
-    protected static ByteBuffer getBytes(ByteBuffer bb, int length)
+    public static ByteBuffer getBytes(ByteBuffer bb, int length)
     {
         ByteBuffer copy = bb.duplicate();
         copy.limit(copy.position() + length);
@@ -58,7 +58,7 @@ public abstract class AbstractCompositeType extends AbstractType<ByteBuffer>
     }
 
     // changes bb position
-    protected static ByteBuffer getWithShortLength(ByteBuffer bb)
+    public static ByteBuffer getWithShortLength(ByteBuffer bb)
     {
         int length = getShortLength(bb);
         return getBytes(bb, length);
@@ -169,7 +169,7 @@ public abstract class AbstractCompositeType extends AbstractType<ByteBuffer>
      * Escapes all occurences of the ':' character from the input, replacing them by "\:".
      * Furthermore, if the last character is '\' or '!', a '!' is appended.
      */
-    static String escape(String input)
+    public static String escape(String input)
     {
         if (input.isEmpty())
             return input;
@@ -234,7 +234,7 @@ public abstract class AbstractCompositeType extends AbstractType<ByteBuffer>
             byte b = bb.get();
             if (b != 0)
             {
-                sb.append(":!");
+                sb.append(b < 0 ? ":_" : ":!");
                 break;
             }
             ++i;
@@ -249,6 +249,7 @@ public abstract class AbstractCompositeType extends AbstractType<ByteBuffer>
         List<ParsedComparator> comparators = new ArrayList<ParsedComparator>(parts.size());
         int totalLength = 0, i = 0;
         boolean lastByteIsOne = false;
+        boolean lastByteIsMinusOne = false;
 
         for (String part : parts)
         {
@@ -257,6 +258,11 @@ public abstract class AbstractCompositeType extends AbstractType<ByteBuffer>
                 lastByteIsOne = true;
                 break;
             }
+            else if (part.equals("_"))
+            {
+                lastByteIsMinusOne = true;
+                break;
+            }
 
             ParsedComparator p = parseComparator(i, part);
             AbstractType<?> type = p.getAbstractType();
@@ -281,6 +287,8 @@ public abstract class AbstractCompositeType extends AbstractType<ByteBuffer>
         }
         if (lastByteIsOne)
             bb.put(bb.limit() - 1, (byte)1);
+        else if (lastByteIsMinusOne)
+            bb.put(bb.limit() - 1, (byte)-1);
 
         bb.rewind();
         return bb;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/marshal/AbstractType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/AbstractType.java b/src/java/org/apache/cassandra/db/marshal/AbstractType.java
index ffba918..cefa465 100644
--- a/src/java/org/apache/cassandra/db/marshal/AbstractType.java
+++ b/src/java/org/apache/cassandra/db/marshal/AbstractType.java
@@ -25,17 +25,9 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.cassandra.cql3.CQL3Type;
-import org.apache.cassandra.db.filter.ColumnSlice;
-import org.apache.cassandra.db.filter.SliceQueryFilter;
 import org.apache.cassandra.exceptions.SyntaxException;
-import org.apache.cassandra.db.Column;
-import org.apache.cassandra.db.OnDiskAtom;
-import org.apache.cassandra.db.RangeTombstone;
 import org.apache.cassandra.serializers.TypeSerializer;
 import org.apache.cassandra.serializers.MarshalException;
-import org.apache.cassandra.utils.ByteBufferUtil;
-
-import static org.apache.cassandra.io.sstable.IndexHelper.IndexInfo;
 
 /**
  * Specifies a Comparator for a specific type of ByteBuffer.
@@ -47,78 +39,10 @@ import static org.apache.cassandra.io.sstable.IndexHelper.IndexInfo;
  */
 public abstract class AbstractType<T> implements Comparator<ByteBuffer>
 {
-    public final Comparator<IndexInfo> indexComparator;
-    public final Comparator<IndexInfo> indexReverseComparator;
-    public final Comparator<Column> columnComparator;
-    public final Comparator<Column> columnReverseComparator;
-    public final Comparator<OnDiskAtom> onDiskAtomComparator;
     public final Comparator<ByteBuffer> reverseComparator;
 
     protected AbstractType()
     {
-        indexComparator = new Comparator<IndexInfo>()
-        {
-            public int compare(IndexInfo o1, IndexInfo o2)
-            {
-                return AbstractType.this.compare(o1.lastName, o2.lastName);
-            }
-        };
-        indexReverseComparator = new Comparator<IndexInfo>()
-        {
-            public int compare(IndexInfo o1, IndexInfo o2)
-            {
-                return AbstractType.this.compare(o1.firstName, o2.firstName);
-            }
-        };
-        columnComparator = new Comparator<Column>()
-        {
-            public int compare(Column c1, Column c2)
-            {
-                return AbstractType.this.compare(c1.name(), c2.name());
-            }
-        };
-        columnReverseComparator = new Comparator<Column>()
-        {
-            public int compare(Column c1, Column c2)
-            {
-                return AbstractType.this.compare(c2.name(), c1.name());
-            }
-        };
-        onDiskAtomComparator = new Comparator<OnDiskAtom>()
-        {
-            public int compare(OnDiskAtom c1, OnDiskAtom c2)
-            {
-                int comp = AbstractType.this.compare(c1.name(), c2.name());
-                if (comp != 0)
-                    return comp;
-
-                if (c1 instanceof RangeTombstone)
-                {
-                    if (c2 instanceof RangeTombstone)
-                    {
-                        RangeTombstone t1 = (RangeTombstone)c1;
-                        RangeTombstone t2 = (RangeTombstone)c2;
-                        int comp2 = AbstractType.this.compare(t1.max, t2.max);
-                        if (comp2 == 0)
-                            return t1.data.compareTo(t2.data);
-                        else
-                            return comp2;
-                    }
-                    else
-                    {
-                        return -1;
-                    }
-                }
-                else if (c2 instanceof RangeTombstone)
-                {
-                    return 1;
-                }
-                else
-                {
-                    return 0;
-                }
-            }
-        };
         reverseComparator = new Comparator<ByteBuffer>()
         {
             public int compare(ByteBuffer o1, ByteBuffer o2)
@@ -197,17 +121,6 @@ public abstract class AbstractType<T> implements Comparator<ByteBuffer>
         return builder.toString();
     }
 
-    /* convenience method */
-    public String getColumnsString(Iterable<Column> columns)
-    {
-        StringBuilder builder = new StringBuilder();
-        for (Column column : columns)
-        {
-            builder.append(column.getString(this)).append(",");
-        }
-        return builder.toString();
-    }
-
     public boolean isCommutative()
     {
         return false;
@@ -312,25 +225,4 @@ public abstract class AbstractType<T> implements Comparator<ByteBuffer>
     {
         return getClass().getName();
     }
-
-    protected boolean intersects(ByteBuffer minColName, ByteBuffer maxColName, ByteBuffer sliceStart, ByteBuffer sliceEnd)
-    {
-        return (sliceStart.equals(ByteBufferUtil.EMPTY_BYTE_BUFFER) || compare(maxColName, sliceStart) >= 0)
-               && (sliceEnd.equals(ByteBufferUtil.EMPTY_BYTE_BUFFER) || compare(sliceEnd, minColName) >= 0);
-    }
-
-    public boolean intersects(List<ByteBuffer> minColumnNames, List<ByteBuffer> maxColumnNames, SliceQueryFilter filter)
-    {
-        assert minColumnNames.size() == 1;
-
-        for (ColumnSlice slice : filter.slices)
-        {
-            ByteBuffer start = filter.isReversed() ? slice.finish : slice.start;
-            ByteBuffer finish = filter.isReversed() ? slice.start : slice.finish;
-
-            if (intersects(minColumnNames.get(0), maxColumnNames.get(0), start, finish))
-                return true;
-        }
-        return false;
-    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/marshal/CollectionType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/CollectionType.java b/src/java/org/apache/cassandra/db/marshal/CollectionType.java
index 07c86e0..0f3f564 100644
--- a/src/java/org/apache/cassandra/db/marshal/CollectionType.java
+++ b/src/java/org/apache/cassandra/db/marshal/CollectionType.java
@@ -58,7 +58,7 @@ public abstract class CollectionType<T> extends AbstractType<T>
 
     protected abstract void appendToStringBuilder(StringBuilder sb);
 
-    public abstract ByteBuffer serialize(List<Pair<ByteBuffer, Column>> columns);
+    public abstract ByteBuffer serialize(List<Column> columns);
 
     @Override
     public String toString()
@@ -113,7 +113,7 @@ public abstract class CollectionType<T> extends AbstractType<T>
         return (ByteBuffer)result.flip();
     }
 
-    protected List<Pair<ByteBuffer, Column>> enforceLimit(List<Pair<ByteBuffer, Column>> columns)
+    protected List<Column> enforceLimit(List<Column> columns)
     {
         if (columns.size() <= MAX_ELEMENTS)
             return columns;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/marshal/CompositeType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/CompositeType.java b/src/java/org/apache/cassandra/db/marshal/CompositeType.java
index e7a5fee..36249bf 100644
--- a/src/java/org/apache/cassandra/db/marshal/CompositeType.java
+++ b/src/java/org/apache/cassandra/db/marshal/CompositeType.java
@@ -27,11 +27,8 @@ import java.util.Map;
 
 import com.google.common.collect.ImmutableList;
 
-import org.apache.cassandra.db.filter.ColumnSlice;
-import org.apache.cassandra.db.filter.SliceQueryFilter;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.exceptions.SyntaxException;
-import org.apache.cassandra.cql3.ColumnNameBuilder;
 import org.apache.cassandra.cql3.ColumnIdentifier;
 import org.apache.cassandra.cql3.Relation;
 import org.apache.cassandra.io.util.DataOutputBuffer;
@@ -234,30 +231,6 @@ public class CompositeType extends AbstractCompositeType
         return true;
     }
 
-    @Override
-    public boolean intersects(List<ByteBuffer> minColumnNames, List<ByteBuffer> maxColumnNames, SliceQueryFilter filter)
-    {
-        assert minColumnNames.size() == maxColumnNames.size();
-        outer:
-        for (ColumnSlice slice : filter.slices)
-        {
-            // This slices intersects if all component intersect. And we don't intersect
-            // only if no slice intersects
-            ByteBuffer[] start = split(filter.isReversed() ? slice.finish : slice.start);
-            ByteBuffer[] finish = split(filter.isReversed() ? slice.start : slice.finish);
-            for (int i = 0; i < minColumnNames.size(); i++)
-            {
-                AbstractType<?> t = types.get(i);
-                ByteBuffer s = i < start.length ? start[i] : ByteBufferUtil.EMPTY_BYTE_BUFFER;
-                ByteBuffer f = i < finish.length ? finish[i] : ByteBufferUtil.EMPTY_BYTE_BUFFER;
-                if (!t.intersects(minColumnNames.get(i), maxColumnNames.get(i), s, f))
-                    continue outer;
-            }
-            return true;
-        }
-        return false;
-    }
-
     private static class StaticParsedComparator implements ParsedComparator
     {
         final AbstractType<?> type;
@@ -315,7 +288,7 @@ public class CompositeType extends AbstractCompositeType
         return out;
     }
 
-    public static class Builder implements ColumnNameBuilder
+    public static class Builder
     {
         private final CompositeType composite;
 
@@ -376,13 +349,11 @@ public class CompositeType extends AbstractCompositeType
             return this;
         }
 
-        @Override
         public Builder add(ByteBuffer bb)
         {
             return add(bb, Relation.Type.EQ);
         }
 
-        @Override
         public Builder add(ColumnIdentifier name)
         {
             return add(name.bytes);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/marshal/CounterColumnType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/CounterColumnType.java b/src/java/org/apache/cassandra/db/marshal/CounterColumnType.java
index 6a77458..37cd59b 100644
--- a/src/java/org/apache/cassandra/db/marshal/CounterColumnType.java
+++ b/src/java/org/apache/cassandra/db/marshal/CounterColumnType.java
@@ -21,6 +21,7 @@ import java.nio.ByteBuffer;
 
 import org.apache.cassandra.cql3.CQL3Type;
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.composites.CellName;
 import org.apache.cassandra.serializers.TypeSerializer;
 import org.apache.cassandra.serializers.CounterSerializer;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -47,7 +48,7 @@ public class CounterColumnType extends AbstractCommutativeType
     /**
      * create commutative column
      */
-    public Column createColumn(ByteBuffer name, ByteBuffer value, long timestamp)
+    public Column createColumn(CellName name, ByteBuffer value, long timestamp)
     {
         return new CounterUpdateColumn(name, value, timestamp);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/marshal/ListType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/ListType.java b/src/java/org/apache/cassandra/db/marshal/ListType.java
index 808ba45..58ba6f1 100644
--- a/src/java/org/apache/cassandra/db/marshal/ListType.java
+++ b/src/java/org/apache/cassandra/db/marshal/ListType.java
@@ -82,16 +82,16 @@ public class ListType<T> extends CollectionType<List<T>>
         sb.append(getClass().getName()).append(TypeParser.stringifyTypeParameters(Collections.<AbstractType<?>>singletonList(elements)));
     }
 
-    public ByteBuffer serialize(List<Pair<ByteBuffer, Column>> columns)
+    public ByteBuffer serialize(List<Column> columns)
     {
         columns = enforceLimit(columns);
 
         List<ByteBuffer> bbs = new ArrayList<ByteBuffer>(columns.size());
         int size = 0;
-        for (Pair<ByteBuffer, Column> p : columns)
+        for (Column c : columns)
         {
-            bbs.add(p.right.value());
-            size += 2 + p.right.value().remaining();
+            bbs.add(c.value());
+            size += 2 + c.value().remaining();
         }
         return pack(bbs, columns.size(), size);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/marshal/MapType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/MapType.java b/src/java/org/apache/cassandra/db/marshal/MapType.java
index fd96da7..17bd7a7 100644
--- a/src/java/org/apache/cassandra/db/marshal/MapType.java
+++ b/src/java/org/apache/cassandra/db/marshal/MapType.java
@@ -89,17 +89,19 @@ public class MapType<K, V> extends CollectionType<Map<K, V>>
     /**
      * Creates the same output than serialize, but from the internal representation.
      */
-    public ByteBuffer serialize(List<Pair<ByteBuffer, Column>> columns)
+    public ByteBuffer serialize(List<Column> columns)
     {
         columns = enforceLimit(columns);
 
         List<ByteBuffer> bbs = new ArrayList<ByteBuffer>(2 * columns.size());
         int size = 0;
-        for (Pair<ByteBuffer, Column> p : columns)
+        for (Column c : columns)
         {
-            bbs.add(p.left);
-            bbs.add(p.right.value());
-            size += 4 + p.left.remaining() + p.right.value().remaining();
+            ByteBuffer key = c.name().collectionElement();
+            ByteBuffer value = c.value();
+            bbs.add(key);
+            bbs.add(value);
+            size += 4 + key.remaining() + value.remaining();
         }
         return pack(bbs, columns.size(), size);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/marshal/SetType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/SetType.java b/src/java/org/apache/cassandra/db/marshal/SetType.java
index c947d26..9e45f8f 100644
--- a/src/java/org/apache/cassandra/db/marshal/SetType.java
+++ b/src/java/org/apache/cassandra/db/marshal/SetType.java
@@ -82,16 +82,17 @@ public class SetType<T> extends CollectionType<Set<T>>
         sb.append(getClass().getName()).append(TypeParser.stringifyTypeParameters(Collections.<AbstractType<?>>singletonList(elements)));
     }
 
-    public ByteBuffer serialize(List<Pair<ByteBuffer, Column>> columns)
+    public ByteBuffer serialize(List<Column> columns)
     {
         columns = enforceLimit(columns);
 
         List<ByteBuffer> bbs = new ArrayList<ByteBuffer>(columns.size());
         int size = 0;
-        for (Pair<ByteBuffer, Column> p : columns)
+        for (Column c : columns)
         {
-            bbs.add(p.left);
-            size += 2 + p.left.remaining();
+            ByteBuffer key = c.name().collectionElement();
+            bbs.add(key);
+            size += 2 + key.remaining();
         }
         return pack(bbs, columns.size(), size);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
index 1b5a4e2..4327aa9 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.*;
 
+import org.apache.cassandra.db.composites.CellName;
 import org.apache.cassandra.db.Column;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.JobConf;
@@ -44,15 +45,15 @@ import org.apache.hadoop.mapreduce.*;
  *
  * The default split size is 64k rows.
  */
-public class ColumnFamilyInputFormat extends AbstractColumnFamilyInputFormat<ByteBuffer, SortedMap<ByteBuffer, Column>>
+public class ColumnFamilyInputFormat extends AbstractColumnFamilyInputFormat<ByteBuffer, SortedMap<CellName, Column>>
 {
     
-    public RecordReader<ByteBuffer, SortedMap<ByteBuffer, Column>> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException
+    public RecordReader<ByteBuffer, SortedMap<CellName, Column>> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException
     {
         return new ColumnFamilyRecordReader();
     }
 
-    public org.apache.hadoop.mapred.RecordReader<ByteBuffer, SortedMap<ByteBuffer, Column>> getRecordReader(org.apache.hadoop.mapred.InputSplit split, JobConf jobConf, final Reporter reporter) throws IOException
+    public org.apache.hadoop.mapred.RecordReader<ByteBuffer, SortedMap<CellName, Column>> getRecordReader(org.apache.hadoop.mapred.InputSplit split, JobConf jobConf, final Reporter reporter) throws IOException
     {
         TaskAttemptContext tac = new TaskAttemptContext(jobConf, TaskAttemptID.forName(jobConf.get(MAPRED_TASK_ID)))
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
index 98a294d..7bda3fb 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
@@ -29,8 +29,8 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.db.Column;
+import org.apache.cassandra.db.composites.*;
 import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.db.marshal.CompositeType;
 import org.apache.cassandra.db.marshal.TypeParser;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.thrift.*;
@@ -44,8 +44,8 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.thrift.TException;
 import org.apache.thrift.transport.TTransport;
 
-public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap<ByteBuffer, Column>>
-    implements org.apache.hadoop.mapred.RecordReader<ByteBuffer, SortedMap<ByteBuffer, Column>>
+public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap<CellName, Column>>
+    implements org.apache.hadoop.mapred.RecordReader<ByteBuffer, SortedMap<CellName, Column>>
 {
     private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyRecordReader.class);
 
@@ -53,7 +53,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
 
     private ColumnFamilySplit split;
     private RowIterator iter;
-    private Pair<ByteBuffer, SortedMap<ByteBuffer, Column>> currentRow;
+    private Pair<ByteBuffer, SortedMap<CellName, Column>> currentRow;
     private SlicePredicate predicate;
     private boolean isEmptyPredicate;
     private int totalRowCount; // total number of rows to fetch
@@ -92,7 +92,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
         return currentRow.left;
     }
 
-    public SortedMap<ByteBuffer, Column> getCurrentValue()
+    public SortedMap<CellName, Column> getCurrentValue()
     {
         return currentRow.right;
     }
@@ -210,12 +210,12 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
         return split.getLocations()[0];
     }
 
-    private abstract class RowIterator extends AbstractIterator<Pair<ByteBuffer, SortedMap<ByteBuffer, Column>>>
+    private abstract class RowIterator extends AbstractIterator<Pair<ByteBuffer, SortedMap<CellName, Column>>>
     {
         protected List<KeySlice> rows;
         protected int totalRead = 0;
         protected final boolean isSuper;
-        protected final AbstractType<?> comparator;
+        protected final CellNameType comparator;
         protected final AbstractType<?> subComparator;
         protected final IPartitioner partitioner;
 
@@ -253,7 +253,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
                         cfDef.column_type = ByteBufferUtil.string(type);
                 }
 
-                comparator = TypeParser.parse(cfDef.comparator_type);
+                comparator = CellNames.fromAbstractType(TypeParser.parse(cfDef.comparator_type), true);
                 subComparator = cfDef.subcomparator_type == null ? null : TypeParser.parse(cfDef.subcomparator_type);
             }
             catch (ConfigurationException e)
@@ -297,21 +297,21 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
             for (org.apache.cassandra.thrift.Column column : super_column.columns)
             {
                 Column c = unthriftifySimple(column);
-                columns.add(c.withUpdatedName(CompositeType.build(super_column.name, c.name())));
+                columns.add(c.withUpdatedName(comparator.makeCellName(super_column.name, c.name().toByteBuffer())));
             }
             return columns;
         }
 
         protected Column unthriftifySimple(org.apache.cassandra.thrift.Column column)
         {
-            return new Column(column.name, column.value, column.timestamp);
+            return new Column(comparator.cellFromByteBuffer(column.name), column.value, column.timestamp);
         }
 
         private Column unthriftifyCounter(CounterColumn column)
         {
             //CounterColumns read the counterID from the System keyspace, so need the StorageService running and access
             //to cassandra.yaml. To avoid a Hadoop needing access to yaml return a regular Column.
-            return new Column(column.name, ByteBufferUtil.bytes(column.value), 0);
+            return new Column(comparator.cellFromByteBuffer(column.name), ByteBufferUtil.bytes(column.value), 0);
         }
 
         private List<Column> unthriftifySuperCounter(CounterSuperColumn super_column)
@@ -320,7 +320,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
             for (CounterColumn column : super_column.columns)
             {
                 Column c = unthriftifyCounter(column);
-                columns.add(c.withUpdatedName(CompositeType.build(super_column.name, c.name())));
+                columns.add(c.withUpdatedName(comparator.makeCellName(super_column.name, c.name().toByteBuffer())));
             }
             return columns;
         }
@@ -401,7 +401,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
             }
         }
 
-        protected Pair<ByteBuffer, SortedMap<ByteBuffer, Column>> computeNext()
+        protected Pair<ByteBuffer, SortedMap<CellName, Column>> computeNext()
         {
             maybeInit();
             if (rows == null)
@@ -409,7 +409,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
 
             totalRead++;
             KeySlice ks = rows.get(i++);
-            SortedMap<ByteBuffer, Column> map = new TreeMap<ByteBuffer, Column>(comparator);
+            SortedMap<CellName, Column> map = new TreeMap<CellName, Column>(comparator);
             for (ColumnOrSuperColumn cosc : ks.columns)
             {
                 List<Column> columns = unthriftify(cosc);
@@ -422,8 +422,8 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
 
     private class WideRowIterator extends RowIterator
     {
-        private PeekingIterator<Pair<ByteBuffer, SortedMap<ByteBuffer, Column>>> wideColumns;
-        private ByteBuffer lastColumn = ByteBufferUtil.EMPTY_BYTE_BUFFER;
+        private PeekingIterator<Pair<ByteBuffer, SortedMap<CellName, Column>>> wideColumns;
+        private Composite lastColumn = Composites.EMPTY;
         private ByteBuffer lastCountedKey = ByteBufferUtil.EMPTY_BYTE_BUFFER;
 
         private void maybeInit()
@@ -452,7 +452,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
 
             try
             {
-                rows = client.get_paged_slice(cfName, keyRange, lastColumn, consistencyLevel);
+                rows = client.get_paged_slice(cfName, keyRange, lastColumn.toByteBuffer(), consistencyLevel);
                 int n = 0;
                 for (KeySlice row : rows)
                     n += row.columns.size();
@@ -471,14 +471,14 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
             }
         }
 
-        protected Pair<ByteBuffer, SortedMap<ByteBuffer, Column>> computeNext()
+        protected Pair<ByteBuffer, SortedMap<CellName, Column>> computeNext()
         {
             maybeInit();
             if (rows == null)
                 return endOfData();
 
-            Pair<ByteBuffer, SortedMap<ByteBuffer, Column>> next = wideColumns.next();
-            lastColumn = next.right.values().iterator().next().name().duplicate();
+            Pair<ByteBuffer, SortedMap<CellName, Column>> next = wideColumns.next();
+            lastColumn = next.right.values().iterator().next().name();
 
             maybeIncreaseRowCounter(next);
             return next;
@@ -489,7 +489,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
          * Increases the row counter only if we really moved to the next row.
          * @param next just fetched row slice
          */
-        private void maybeIncreaseRowCounter(Pair<ByteBuffer, SortedMap<ByteBuffer, Column>> next)
+        private void maybeIncreaseRowCounter(Pair<ByteBuffer, SortedMap<CellName, Column>> next)
         {
             ByteBuffer currentKey = next.left;
             if (!currentKey.equals(lastCountedKey))
@@ -499,7 +499,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
             }
         }
 
-        private class WideColumnIterator extends AbstractIterator<Pair<ByteBuffer, SortedMap<ByteBuffer, Column>>>
+        private class WideColumnIterator extends AbstractIterator<Pair<ByteBuffer, SortedMap<CellName, Column>>>
         {
             private final Iterator<KeySlice> rows;
             private Iterator<ColumnOrSuperColumn> columns;
@@ -520,14 +520,17 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
                 columns = currentRow.columns.iterator();
             }
 
-            protected Pair<ByteBuffer, SortedMap<ByteBuffer, Column>> computeNext()
+            protected Pair<ByteBuffer, SortedMap<CellName, Column>> computeNext()
             {
+                CellNameType cellType = subComparator == null
+                                      ? comparator
+                                      : new CompoundDenseCellNameType(Arrays.asList(comparator.asAbstractType(), subComparator));
                 while (true)
                 {
                     if (columns.hasNext())
                     {
                         ColumnOrSuperColumn cosc = columns.next();
-                        SortedMap<ByteBuffer, Column> map;
+                        SortedMap<CellName, Column> map;
                         List<Column> columns = unthriftify(cosc);
                         if (columns.size() == 1)
                         {
@@ -536,11 +539,11 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
                         else
                         {
                             assert isSuper;
-                            map = new TreeMap<ByteBuffer, Column>(CompositeType.getInstance(comparator, subComparator));
+                            map = new TreeMap<CellName, Column>(cellType);
                             for (Column column : columns)
                                 map.put(column.name(), column);
                         }
-                        return Pair.<ByteBuffer, SortedMap<ByteBuffer, Column>>create(currentRow.key, map);
+                        return Pair.<ByteBuffer, SortedMap<CellName, Column>>create(currentRow.key, map);
                     }
 
                     if (!rows.hasNext())
@@ -557,7 +560,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
     // to the old. Thus, expect a small performance hit.
     // And obviously this wouldn't work for wide rows. But since ColumnFamilyInputFormat
     // and ColumnFamilyRecordReader don't support them, it should be fine for now.
-    public boolean next(ByteBuffer key, SortedMap<ByteBuffer, Column> value) throws IOException
+    public boolean next(ByteBuffer key, SortedMap<CellName, Column> value) throws IOException
     {
         if (this.nextKeyValue())
         {
@@ -578,9 +581,9 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
         return ByteBuffer.wrap(new byte[this.keyBufferSize]);
     }
 
-    public SortedMap<ByteBuffer, Column> createValue()
+    public SortedMap<CellName, Column> createValue()
     {
-        return new TreeMap<ByteBuffer, Column>();
+        return new TreeMap<CellName, Column>();
     }
 
     public long getPos() throws IOException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
index b0b7fe9..e5b8bb1 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
@@ -123,22 +123,21 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
         CfDef cfDef = cfInfo.cfDef;
         Tuple pair = TupleFactory.getInstance().newTuple(2);
 
+        ByteBuffer colName = col.name().toByteBuffer();
+
         // name
         if(comparator instanceof AbstractCompositeType)
-            setTupleValue(pair, 0, composeComposite((AbstractCompositeType)comparator,col.name()));
+            setTupleValue(pair, 0, composeComposite((AbstractCompositeType)comparator,colName));
         else
-            setTupleValue(pair, 0, cassandraToObj(comparator, col.name()));
+            setTupleValue(pair, 0, cassandraToObj(comparator, col.name().toByteBuffer()));
 
         // value
         Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
-        ByteBuffer colName;
         if (cfInfo.cql3Table && !cfInfo.compactCqlTable)
         {
-            ByteBuffer[] names = ((AbstractCompositeType) parseType(cfDef.comparator_type)).split(col.name());
+            ByteBuffer[] names = ((AbstractCompositeType) parseType(cfDef.comparator_type)).split(colName);
             colName = names[names.length-1];
         }
-        else
-            colName = col.name();
         if (validators.get(colName) == null)
         {
             Map<MarshallerType, AbstractType> marshallers = getDefaultMarshallers(cfDef);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
index 89c1944..7ce78de 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
@@ -24,6 +24,7 @@ import java.util.*;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.composites.CellNames;
 import org.apache.cassandra.db.Column;
 import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.exceptions.ConfigurationException;
@@ -111,8 +112,8 @@ public class CqlStorage extends AbstractCassandraStorage
                 ByteBuffer columnValue = columns.get(ByteBufferUtil.string(cdef.name.duplicate()));
                 if (columnValue != null)
                 {
-                    Column column = new Column(cdef.name, columnValue);
-                    AbstractType<?> validator = getValidatorMap(cfDef).get(column.name());
+                    Column column = new Column(CellNames.simpleDense(cdef.name), columnValue);
+                    AbstractType<?> validator = getValidatorMap(cfDef).get(cdef.name);
                     setTupleValue(tuple, i, cqlColumnToObj(column, cfDef), validator);
                 }
                 else

http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
index 6018369..27f1c12 100644
--- a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
@@ -28,7 +28,6 @@ import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.context.CounterContext;
-import org.apache.cassandra.db.marshal.CompositeType;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.utils.CounterId;
@@ -117,7 +116,7 @@ public abstract class AbstractSSTableSimpleWriter
             if (currentSuperColumn == null)
                 throw new IllegalStateException("Trying to add a column to a super column family, but no super column has been started.");
 
-            column = column.withUpdatedName(CompositeType.build(currentSuperColumn, column.name()));
+            column = column.withUpdatedName(columnFamily.getComparator().makeCellName(currentSuperColumn, column.name().toByteBuffer()));
         }
         columnFamily.addColumn(column);
     }
@@ -130,7 +129,7 @@ public abstract class AbstractSSTableSimpleWriter
      */
     public void addColumn(ByteBuffer name, ByteBuffer value, long timestamp)
     {
-        addColumn(new Column(name, value, timestamp));
+        addColumn(new Column(metadata.comparator.cellFromByteBuffer(name), value, timestamp));
     }
 
     /**
@@ -145,7 +144,7 @@ public abstract class AbstractSSTableSimpleWriter
      */
     public void addExpiringColumn(ByteBuffer name, ByteBuffer value, long timestamp, int ttl, long expirationTimestampMS)
     {
-        addColumn(new ExpiringColumn(name, value, timestamp, ttl, (int)(expirationTimestampMS / 1000)));
+        addColumn(new ExpiringColumn(metadata.comparator.cellFromByteBuffer(name), value, timestamp, ttl, (int)(expirationTimestampMS / 1000)));
     }
 
     /**
@@ -155,7 +154,7 @@ public abstract class AbstractSSTableSimpleWriter
      */
     public void addCounterColumn(ByteBuffer name, long value)
     {
-        addColumn(new CounterColumn(name, CounterContext.instance().create(counterid, 1L, value, false), System.currentTimeMillis()));
+        addColumn(new CounterColumn(metadata.comparator.cellFromByteBuffer(name), CounterContext.instance().create(counterid, 1L, value, false), System.currentTimeMillis()));
     }
 
     /**