Posted to commits@cassandra.apache.org by sl...@apache.org on 2013/12/17 17:03:45 UTC

[09/13] Push composites support in the storage engine
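
The central change is the replacement of raw ByteBuffer cell names with the
Composite/CellName abstractions from org.apache.cassandra.db.composites. As a
reading aid, here is a sketch of the Composite surface as it can be inferred
from the call sites in the hunks below; this is a hypothetical reconstruction,
not the real interface, and it lists only the methods this patch exercises:

    // Hypothetical reconstruction from usage in this patch, not actual source.
    public interface Composite
    {
        // End-of-component marker; replaces the old trailing 0/1 byte that
        // firstEndOfComponent() used to inspect (see the SuperColumns hunks).
        enum EOC { START, NONE, END }

        ByteBuffer get(int i);      // i-th component (SuperColumns.scName/subName)
        EOC eoc();                  // see SuperColumns.isEndOfRange
        boolean isEmpty();          // replaces start.remaining() == 0 checks
        int dataSize();             // replaces remaining() in size accounting
        Composite withEOC(EOC eoc); // used when translating slice filters
        Composite start();          // prefix lower bound (SuperColumns.startOf)
        Composite end();            // prefix upper bound (SuperColumns.endOf)
        ByteBuffer toByteBuffer();  // flat encoding, e.g. for digests
    }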

http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/RangeTombstone.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RangeTombstone.java b/src/java/org/apache/cassandra/db/RangeTombstone.java
index c943ad7..9a60209 100644
--- a/src/java/org/apache/cassandra/db/RangeTombstone.java
+++ b/src/java/org/apache/cassandra/db/RangeTombstone.java
@@ -20,33 +20,31 @@ package org.apache.cassandra.db;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.security.MessageDigest;
 import java.util.*;
 
 import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.composites.CType;
+import org.apache.cassandra.db.composites.Composite;
 import org.apache.cassandra.io.ISSTableSerializer;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.serializers.MarshalException;
-import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.Interval;
 
-public class RangeTombstone extends Interval<ByteBuffer, DeletionTime> implements OnDiskAtom
+public class RangeTombstone extends Interval<Composite, DeletionTime> implements OnDiskAtom
 {
-    public static final Serializer serializer = new Serializer();
-
-    public RangeTombstone(ByteBuffer start, ByteBuffer stop, long markedForDeleteAt, int localDeletionTime)
+    public RangeTombstone(Composite start, Composite stop, long markedForDeleteAt, int localDeletionTime)
     {
         this(start, stop, new DeletionTime(markedForDeleteAt, localDeletionTime));
     }
 
-    public RangeTombstone(ByteBuffer start, ByteBuffer stop, DeletionTime delTime)
+    public RangeTombstone(Composite start, Composite stop, DeletionTime delTime)
     {
         super(start, stop, delTime);
     }
 
-    public ByteBuffer name()
+    public Composite name()
     {
         return min;
     }
@@ -66,20 +64,6 @@ public class RangeTombstone extends Interval<ByteBuffer, DeletionTime> implement
         return data.markedForDeleteAt;
     }
 
-    public int serializedSize(TypeSizes typeSizes)
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    public long serializedSizeForSSTable()
-    {
-        TypeSizes typeSizes = TypeSizes.NATIVE;
-        return typeSizes.sizeof((short)min.remaining()) + min.remaining()
-             + 1 // serialization flag
-             + typeSizes.sizeof((short)max.remaining()) + max.remaining()
-             + DeletionTime.serializer.serializedSize(data, typeSizes);
-    }
-
     public void validateFields(CFMetaData metadata) throws MarshalException
     {
         metadata.comparator.validate(min);
@@ -88,8 +72,8 @@ public class RangeTombstone extends Interval<ByteBuffer, DeletionTime> implement
 
     public void updateDigest(MessageDigest digest)
     {
-        digest.update(min.duplicate());
-        digest.update(max.duplicate());
+        digest.update(min.toByteBuffer().duplicate());
+        digest.update(max.toByteBuffer().duplicate());
         DataOutputBuffer buffer = new DataOutputBuffer();
         try
         {
@@ -106,7 +90,7 @@ public class RangeTombstone extends Interval<ByteBuffer, DeletionTime> implement
      * This tombstone supersedes another one if it is more recent and covers a
      * bigger range than rt.
      */
-    public boolean supersedes(RangeTombstone rt, Comparator<ByteBuffer> comparator)
+    public boolean supersedes(RangeTombstone rt, Comparator<Composite> comparator)
     {
         if (rt.data.markedForDeleteAt > data.markedForDeleteAt)
             return false;
@@ -116,7 +100,7 @@ public class RangeTombstone extends Interval<ByteBuffer, DeletionTime> implement
 
     public static class Tracker
     {
-        private final Comparator<ByteBuffer> comparator;
+        private final Comparator<Composite> comparator;
         private final Deque<RangeTombstone> ranges = new ArrayDeque<RangeTombstone>();
         private final SortedSet<RangeTombstone> maxOrderingSet = new TreeSet<RangeTombstone>(new Comparator<RangeTombstone>()
         {
@@ -127,7 +111,7 @@ public class RangeTombstone extends Interval<ByteBuffer, DeletionTime> implement
         });
         private int atomCount;
 
-        public Tracker(Comparator<ByteBuffer> comparator)
+        public Tracker(Comparator<Composite> comparator)
         {
             this.comparator = comparator;
         }
@@ -174,7 +158,7 @@ public class RangeTombstone extends Interval<ByteBuffer, DeletionTime> implement
 
             for (RangeTombstone tombstone : toWrite)
             {
-                size += tombstone.serializedSizeForSSTable();
+                size += atomSerializer.serializedSizeForSSTable(tombstone);
                 atomCount++;
                 if (out != null)
                     atomSerializer.serializeForSSTable(tombstone, out);
@@ -254,33 +238,50 @@ public class RangeTombstone extends Interval<ByteBuffer, DeletionTime> implement
 
     public static class Serializer implements ISSTableSerializer<RangeTombstone>
     {
+        private final CType type;
+
+        public Serializer(CType type)
+        {
+            this.type = type;
+        }
+
         public void serializeForSSTable(RangeTombstone t, DataOutput out) throws IOException
         {
-            ByteBufferUtil.writeWithShortLength(t.min, out);
+            type.serializer().serialize(t.min, out);
             out.writeByte(ColumnSerializer.RANGE_TOMBSTONE_MASK);
-            ByteBufferUtil.writeWithShortLength(t.max, out);
+            type.serializer().serialize(t.max, out);
             DeletionTime.serializer.serialize(t.data, out);
         }
 
         public RangeTombstone deserializeFromSSTable(DataInput in, Descriptor.Version version) throws IOException
         {
-            ByteBuffer min = ByteBufferUtil.readWithShortLength(in);
-            if (min.remaining() <= 0)
-                throw ColumnSerializer.CorruptColumnException.create(in, min);
+            Composite min = type.serializer().deserialize(in);
 
             int b = in.readUnsignedByte();
             assert (b & ColumnSerializer.RANGE_TOMBSTONE_MASK) != 0;
             return deserializeBody(in, min, version);
         }
 
-        public RangeTombstone deserializeBody(DataInput in, ByteBuffer min, Descriptor.Version version) throws IOException
+        public RangeTombstone deserializeBody(DataInput in, Composite min, Descriptor.Version version) throws IOException
         {
-            ByteBuffer max = ByteBufferUtil.readWithShortLength(in);
-            if (max.remaining() <= 0)
-                throw ColumnSerializer.CorruptColumnException.create(in, max);
-
+            Composite max = type.serializer().deserialize(in);
             DeletionTime dt = DeletionTime.serializer.deserialize(in);
             return new RangeTombstone(min, max, dt);
         }
+
+        public void skipBody(DataInput in, Descriptor.Version version) throws IOException
+        {
+            type.serializer().skip(in);
+            DeletionTime.serializer.skip(in);
+        }
+
+        public long serializedSizeForSSTable(RangeTombstone t)
+        {
+            TypeSizes typeSizes = TypeSizes.NATIVE;
+            return type.serializer().serializedSize(t.min, typeSizes)
+                 + 1 // serialization flag
+                 + type.serializer().serializedSize(t.max, typeSizes)
+                 + DeletionTime.serializer.serializedSize(t.data, typeSizes);
+        }
     }
 }

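With the static RangeTombstone.serializer gone, serialization is driven by the
table's CType. A minimal round-trip sketch, where 'type' (a CType) and the
Composite bounds 'start'/'end' are assumed to come from the table's comparator:

    // Sketch only: 'type', 'start' and 'end' are assumed inputs.
    RangeTombstone.Serializer serializer = new RangeTombstone.Serializer(type);
    RangeTombstone rt = new RangeTombstone(start, end,
                                           FBUtilities.timestampMicros(),             // markedForDeleteAt
                                           (int) (System.currentTimeMillis() / 1000)); // localDeletionTime

    DataOutputBuffer out = new DataOutputBuffer();
    serializer.serializeForSSTable(rt, out);

    // Size accounting moved onto the serializer too, which is why the Tracker
    // hunk above now asks atomSerializer for serializedSizeForSSTable(tombstone).
    long onDiskSize = serializer.serializedSizeForSSTable(rt);
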
http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/RangeTombstoneList.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RangeTombstoneList.java b/src/java/org/apache/cassandra/db/RangeTombstoneList.java
index dad9004..4199ea4 100644
--- a/src/java/org/apache/cassandra/db/RangeTombstoneList.java
+++ b/src/java/org/apache/cassandra/db/RangeTombstoneList.java
@@ -20,16 +20,16 @@ package org.apache.cassandra.db;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Comparator;
 import java.util.Iterator;
 
 import com.google.common.collect.AbstractIterator;
 
+import org.apache.cassandra.db.composites.CType;
+import org.apache.cassandra.db.composites.Composite;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.utils.ByteBufferUtil;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -54,20 +54,18 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>
 {
     private static final Logger logger = LoggerFactory.getLogger(RangeTombstoneList.class);
 
-    public static final Serializer serializer = new Serializer();
-
-    private final Comparator<ByteBuffer> comparator;
+    private final Comparator<Composite> comparator;
 
     // Note: we don't want to use a List for the markedAts and delTimes to avoid boxing. We could
     // use a List for starts and ends, but having arrays everywhere is almost simpler.
-    private ByteBuffer[] starts;
-    private ByteBuffer[] ends;
+    private Composite[] starts;
+    private Composite[] ends;
     private long[] markedAts;
     private int[] delTimes;
 
     private int size;
 
-    private RangeTombstoneList(Comparator<ByteBuffer> comparator, ByteBuffer[] starts, ByteBuffer[] ends, long[] markedAts, int[] delTimes, int size)
+    private RangeTombstoneList(Comparator<Composite> comparator, Composite[] starts, Composite[] ends, long[] markedAts, int[] delTimes, int size)
     {
         assert starts.length == ends.length && starts.length == markedAts.length && starts.length == delTimes.length;
         this.comparator = comparator;
@@ -78,9 +76,9 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>
         this.size = size;
     }
 
-    public RangeTombstoneList(Comparator<ByteBuffer> comparator, int capacity)
+    public RangeTombstoneList(Comparator<Composite> comparator, int capacity)
     {
-        this(comparator, new ByteBuffer[capacity], new ByteBuffer[capacity], new long[capacity], new int[capacity], 0);
+        this(comparator, new Composite[capacity], new Composite[capacity], new long[capacity], new int[capacity], 0);
     }
 
     public boolean isEmpty()
@@ -93,7 +91,7 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>
         return size;
     }
 
-    public Comparator<ByteBuffer> comparator()
+    public Comparator<Composite> comparator()
     {
         return comparator;
     }
@@ -119,7 +117,7 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>
      * This method will be faster if the new tombstone sorts after all the currently existing ones (this is a common use case), 
      * but it doesn't assume it.
      */
-    public void add(ByteBuffer start, ByteBuffer end, long markedAt, int delTime)
+    public void add(Composite start, Composite end, long markedAt, int delTime)
     {
         if (isEmpty())
         {
@@ -205,7 +203,7 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>
      * Returns whether the given name/timestamp pair is deleted by one of the tombstones
      * of this RangeTombstoneList.
      */
-    public boolean isDeleted(ByteBuffer name, long timestamp)
+    public boolean isDeleted(Composite name, long timestamp)
     {
         int idx = searchInternal(name);
         return idx >= 0 && markedAts[idx] >= timestamp;
@@ -223,12 +221,12 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>
      * Returns the DeletionTime for the tombstone overlapping {@code name} (there can't be more than one),
      * or null if {@code name} is not covered by any tombstone.
      */
-    public DeletionTime search(ByteBuffer name) {
+    public DeletionTime search(Composite name) {
         int idx = searchInternal(name);
         return idx < 0 ? null : new DeletionTime(markedAts[idx], delTimes[idx]);
     }
 
-    private int searchInternal(ByteBuffer name)
+    private int searchInternal(Composite name)
     {
         if (isEmpty())
             return -1;
@@ -259,7 +257,7 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>
         int dataSize = TypeSizes.NATIVE.sizeof(size);
         for (int i = 0; i < size; i++)
         {
-            dataSize += starts[i].remaining() + ends[i].remaining();
+            dataSize += starts[i].dataSize() + ends[i].dataSize();
             dataSize += TypeSizes.NATIVE.sizeof(markedAts[i]);
             dataSize += TypeSizes.NATIVE.sizeof(delTimes[i]);
         }
@@ -384,7 +382,7 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>
      * in terms of intervals for start:
      *    ends[i-1] <= start < ends[i]
      */
-    private void insertFrom(int i, ByteBuffer start, ByteBuffer end, long markedAt, int delTime)
+    private void insertFrom(int i, Composite start, Composite end, long markedAt, int delTime)
     {
         while (i < size)
         {
@@ -490,7 +488,7 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>
     /*
      * Adds the new tombstone at index i, growing and/or moving elements to make room for it.
      */
-    private void addInternal(int i, ByteBuffer start, ByteBuffer end, long markedAt, int delTime)
+    private void addInternal(int i, Composite start, Composite end, long markedAt, int delTime)
     {
         assert i >= 0;
 
@@ -529,12 +527,12 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>
         delTimes = grow(delTimes, size, newLength, i);
     }
 
-    private static ByteBuffer[] grow(ByteBuffer[] a, int size, int newLength, int i)
+    private static Composite[] grow(Composite[] a, int size, int newLength, int i)
     {
         if (i < 0 || i >= size)
             return Arrays.copyOf(a, newLength);
 
-        ByteBuffer[] newA = new ByteBuffer[newLength];
+        Composite[] newA = new Composite[newLength];
         System.arraycopy(a, 0, newA, 0, i);
         System.arraycopy(a, i, newA, i+1, size - i);
         return newA;
@@ -576,7 +574,7 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>
         System.arraycopy(delTimes, i, delTimes, i+1, size - i);
     }
 
-    private void setInternal(int i, ByteBuffer start, ByteBuffer end, long markedAt, int delTime)
+    private void setInternal(int i, Composite start, Composite end, long markedAt, int delTime)
     {
         starts[i] = start;
         ends[i] = end;
@@ -586,7 +584,12 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>
 
     public static class Serializer implements IVersionedSerializer<RangeTombstoneList>
     {
-        private Serializer() {}
+        private final CType type;
+
+        public Serializer(CType type)
+        {
+            this.type = type;
+        }
 
         public void serialize(RangeTombstoneList tombstones, DataOutput out, int version) throws IOException
         {
@@ -599,34 +602,25 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>
             out.writeInt(tombstones.size);
             for (int i = 0; i < tombstones.size; i++)
             {
-                ByteBufferUtil.writeWithShortLength(tombstones.starts[i], out);
-                ByteBufferUtil.writeWithShortLength(tombstones.ends[i], out);
+                type.serializer().serialize(tombstones.starts[i], out);
+                type.serializer().serialize(tombstones.ends[i], out);
                 out.writeInt(tombstones.delTimes[i]);
                 out.writeLong(tombstones.markedAts[i]);
             }
         }
 
-        /*
-         * RangeTombstoneList depends on the column family comparator, but it is not serialized.
-         * Thus deserialize(DataInput, int, Comparator<ByteBuffer>) should be used instead of this method.
-         */
         public RangeTombstoneList deserialize(DataInput in, int version) throws IOException
         {
-            throw new UnsupportedOperationException();
-        }
-
-        public RangeTombstoneList deserialize(DataInput in, int version, Comparator<ByteBuffer> comparator) throws IOException
-        {
             int size = in.readInt();
             if (size == 0)
                 return null;
 
-            RangeTombstoneList tombstones = new RangeTombstoneList(comparator, size);
+            RangeTombstoneList tombstones = new RangeTombstoneList(type, size);
 
             for (int i = 0; i < size; i++)
             {
-                ByteBuffer start = ByteBufferUtil.readWithShortLength(in);
-                ByteBuffer end = ByteBufferUtil.readWithShortLength(in);
+                Composite start = type.serializer().deserialize(in);
+                Composite end = type.serializer().deserialize(in);
                 int delTime =  in.readInt();
                 long markedAt = in.readLong();
 
@@ -658,10 +652,8 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>
             long size = typeSizes.sizeof(tombstones.size);
             for (int i = 0; i < tombstones.size; i++)
             {
-                int startSize = tombstones.starts[i].remaining();
-                size += typeSizes.sizeof((short)startSize) + startSize;
-                int endSize = tombstones.ends[i].remaining();
-                size += typeSizes.sizeof((short)endSize) + endSize;
+                size += type.serializer().serializedSize(tombstones.starts[i], typeSizes);
+                size += type.serializer().serializedSize(tombstones.ends[i], typeSizes);
                 size += typeSizes.sizeof(tombstones.delTimes[i]);
                 size += typeSizes.sizeof(tombstones.markedAts[i]);
             }
@@ -687,7 +679,7 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>
     {
         private int idx;
 
-        public boolean isDeleted(ByteBuffer name, long timestamp)
+        public boolean isDeleted(Composite name, long timestamp)
         {
             while (idx < size)
             {

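RangeTombstoneList keeps its parallel-array layout, just typed over Composite.
A sketch of the add/search semantics, where 'comparator' (a
Comparator<Composite>), the bound builder b(...) and 'nowInSec' are all
placeholders:

    // Sketch: 'comparator', b(...) and nowInSec are assumed helpers.
    RangeTombstoneList list = new RangeTombstoneList(comparator, 1);
    list.add(b("a"), b("c"), 1000L, nowInSec); // names in [a, c] deleted at timestamp 1000
    list.add(b("b"), b("d"), 2000L, nowInSec); // overlapping, newer deletion

    DeletionTime dt = list.search(b("b"));     // covered by the newer range
    assert dt != null && dt.markedForDeleteAt == 2000L;
    assert list.isDeleted(b("b"), 1500L);      // a write at timestamp 1500 is shadowed
    assert !list.isDeleted(b("b"), 2500L);     // a newer write survives
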
http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/RowIndexEntry.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RowIndexEntry.java b/src/java/org/apache/cassandra/db/RowIndexEntry.java
index cfc7cf1..3ddaae5 100644
--- a/src/java/org/apache/cassandra/db/RowIndexEntry.java
+++ b/src/java/org/apache/cassandra/db/RowIndexEntry.java
@@ -27,6 +27,8 @@ import java.util.List;
 import com.google.common.primitives.Ints;
 
 import org.apache.cassandra.cache.IMeasurableMemory;
+import org.apache.cassandra.db.composites.CType;
+import org.apache.cassandra.io.ISerializer;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.IndexHelper;
 import org.apache.cassandra.io.util.FileUtils;
@@ -34,8 +36,6 @@ import org.apache.cassandra.utils.ObjectSizes;
 
 public class RowIndexEntry implements IMeasurableMemory
 {
-    public static final Serializer serializer = new Serializer();
-
     public final long position;
 
     public RowIndexEntry(long position)
@@ -43,12 +43,7 @@ public class RowIndexEntry implements IMeasurableMemory
         this.position = position;
     }
 
-    public int serializedSize()
-    {
-        return TypeSizes.NATIVE.sizeof(position) + promotedSize();
-    }
-
-    protected int promotedSize()
+    protected int promotedSize(CType type)
     {
         return 0;
     }
@@ -93,17 +88,25 @@ public class RowIndexEntry implements IMeasurableMemory
 
     public static class Serializer
     {
+        private final CType type;
+
+        public Serializer(CType type)
+        {
+            this.type = type;
+        }
+
         public void serialize(RowIndexEntry rie, DataOutput out) throws IOException
         {
             out.writeLong(rie.position);
-            out.writeInt(rie.promotedSize());
+            out.writeInt(rie.promotedSize(type));
 
             if (rie.isIndexed())
             {
                 DeletionTime.serializer.serialize(rie.deletionTime(), out);
                 out.writeInt(rie.columnsIndex().size());
+                ISerializer<IndexHelper.IndexInfo> idxSerializer = type.indexSerializer();
                 for (IndexHelper.IndexInfo info : rie.columnsIndex())
-                    info.serialize(out);
+                    idxSerializer.serialize(info, out);
             }
         }
 
@@ -117,9 +120,10 @@ public class RowIndexEntry implements IMeasurableMemory
                 DeletionTime deletionTime = DeletionTime.serializer.deserialize(in);
 
                 int entries = in.readInt();
+                ISerializer<IndexHelper.IndexInfo> idxSerializer = type.indexSerializer();
                 List<IndexHelper.IndexInfo> columnsIndex = new ArrayList<IndexHelper.IndexInfo>(entries);
                 for (int i = 0; i < entries; i++)
-                    columnsIndex.add(IndexHelper.IndexInfo.deserialize(in));
+                    columnsIndex.add(idxSerializer.deserialize(in));
 
                 return new IndexedEntry(position, deletionTime, columnsIndex);
             }
@@ -129,13 +133,13 @@ public class RowIndexEntry implements IMeasurableMemory
             }
         }
 
-        public void skip(DataInput in) throws IOException
+        public static void skip(DataInput in) throws IOException
         {
             in.readLong();
             skipPromotedIndex(in);
         }
 
-        public void skipPromotedIndex(DataInput in) throws IOException
+        public static void skipPromotedIndex(DataInput in) throws IOException
         {
             int size = in.readInt();
             if (size <= 0)
@@ -143,6 +147,11 @@ public class RowIndexEntry implements IMeasurableMemory
 
             FileUtils.skipBytesFully(in, size);
         }
+
+        public int serializedSize(RowIndexEntry rie)
+        {
+            return TypeSizes.NATIVE.sizeof(rie.position) + rie.promotedSize(type);
+        }
     }
 
     /**
@@ -175,13 +184,14 @@ public class RowIndexEntry implements IMeasurableMemory
         }
 
         @Override
-        public int promotedSize()
+        public int promotedSize(CType type)
         {
             TypeSizes typeSizes = TypeSizes.NATIVE;
             long size = DeletionTime.serializer.serializedSize(deletionTime, typeSizes);
             size += typeSizes.sizeof(columnsIndex.size()); // number of entries
+            ISerializer<IndexHelper.IndexInfo> idxSerializer = type.indexSerializer();
             for (IndexHelper.IndexInfo info : columnsIndex)
-                size += info.serializedSize(typeSizes);
+                size += idxSerializer.serializedSize(info, typeSizes);
 
             return Ints.checkedCast(size);
         }

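RowIndexEntry serialization becomes per-table as well, since the promoted
index entries (IndexHelper.IndexInfo) now carry Composite bounds. Note that
skip() and skipPromotedIndex() can turn static: serialize() writes the
promoted size up front (out.writeInt(rie.promotedSize(type))), so skipping
only needs that length, not the comparator. A short sketch, with 'type',
'entry', 'out' and 'in' assumed:

    // Sketch: 'type' (CType), 'entry', 'out' and 'in' are assumed inputs.
    RowIndexEntry.Serializer rieSerializer = new RowIndexEntry.Serializer(type);
    rieSerializer.serialize(entry, out);
    int size = rieSerializer.serializedSize(entry); // replaces entry.serializedSize()

    // No comparator needed to skip: read the length prefix, skip that many bytes.
    RowIndexEntry.Serializer.skip(in);
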
http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/RowMutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RowMutation.java b/src/java/org/apache/cassandra/db/RowMutation.java
index e9d177b..c2c1780 100644
--- a/src/java/org/apache/cassandra/db/RowMutation.java
+++ b/src/java/org/apache/cassandra/db/RowMutation.java
@@ -27,6 +27,8 @@ import org.apache.commons.lang3.StringUtils;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.db.composites.Composite;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
@@ -139,17 +141,17 @@ public class RowMutation implements IMutation
         return modifications.isEmpty();
     }
 
-    public void add(String cfName, ByteBuffer name, ByteBuffer value, long timestamp, int timeToLive)
+    public void add(String cfName, CellName name, ByteBuffer value, long timestamp, int timeToLive)
     {
         addOrGet(cfName).addColumn(name, value, timestamp, timeToLive);
     }
 
-    public void addCounter(String cfName, ByteBuffer name, long value)
+    public void addCounter(String cfName, CellName name, long value)
     {
         addOrGet(cfName).addCounter(name, value);
     }
 
-    public void add(String cfName, ByteBuffer name, ByteBuffer value, long timestamp)
+    public void add(String cfName, CellName name, ByteBuffer value, long timestamp)
     {
         add(cfName, name, value, timestamp, 0);
     }
@@ -160,13 +162,13 @@ public class RowMutation implements IMutation
         addOrGet(cfName).delete(new DeletionInfo(timestamp, localDeleteTime));
     }
 
-    public void delete(String cfName, ByteBuffer name, long timestamp)
+    public void delete(String cfName, CellName name, long timestamp)
     {
         int localDeleteTime = (int) (System.currentTimeMillis() / 1000);
         addOrGet(cfName).addTombstone(name, localDeleteTime, timestamp);
     }
 
-    public void deleteRange(String cfName, ByteBuffer start, ByteBuffer end, long timestamp)
+    public void deleteRange(String cfName, Composite start, Composite end, long timestamp)
     {
         int localDeleteTime = (int) (System.currentTimeMillis() / 1000);
         addOrGet(cfName).addAtom(new RangeTombstone(start, end, timestamp, localDeleteTime));
@@ -213,7 +215,7 @@ public class RowMutation implements IMutation
 
     public MessageOut<RowMutation> createMessage(MessagingService.Verb verb)
     {
-        return new MessageOut<RowMutation>(verb, this, serializer);
+        return new MessageOut<>(verb, this, serializer);
     }
 
     public String toString()

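On the write path, callers now build CellName/Composite values through the
table's comparator instead of passing raw ByteBuffers. A sketch of the updated
RowMutation API, where the keyspace/table names and 'key' are placeholders and
the makeCellName call mirrors the usage visible in the SystemKeyspace hunk
further down:

    // Sketch: names and 'key' are placeholders; makeCellName usage follows
    // the SystemKeyspace hunk below.
    CFMetaData metadata = Schema.instance.getCFMetaData("Keyspace1", "Standard1");
    CellName name = metadata.comparator.makeCellName(ByteBufferUtil.bytes("col"));

    RowMutation rm = new RowMutation("Keyspace1", key);
    rm.add("Standard1", name, ByteBufferUtil.bytes("val"), FBUtilities.timestampMicros());
    rm.delete("Standard1", name, FBUtilities.timestampMicros());
    rm.apply();
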
http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java b/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java
index 60487c8..78531f7 100644
--- a/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java
@@ -82,7 +82,9 @@ class SliceByNamesReadCommandSerializer implements IVersionedSerializer<ReadComm
         ByteBufferUtil.writeWithShortLength(command.key, out);
         out.writeUTF(command.cfName);
         out.writeLong(cmd.timestamp);
-        NamesQueryFilter.serializer.serialize(command.filter, out, version);
+
+        CFMetaData metadata = Schema.instance.getCFMetaData(cmd.ksName, cmd.cfName);
+        metadata.comparator.namesQueryFilterSerializer().serialize(command.filter, out, version);
     }
 
     public ReadCommand deserialize(DataInput in, int version) throws IOException
@@ -93,7 +95,7 @@ class SliceByNamesReadCommandSerializer implements IVersionedSerializer<ReadComm
         String cfName = in.readUTF();
         long timestamp = in.readLong();
         CFMetaData metadata = Schema.instance.getCFMetaData(keyspaceName, cfName);
-        NamesQueryFilter filter = NamesQueryFilter.serializer.deserialize(in, version, metadata.comparator);
+        NamesQueryFilter filter = metadata.comparator.namesQueryFilterSerializer().deserialize(in, version);
         ReadCommand command = new SliceByNamesReadCommand(keyspaceName, key, cfName, timestamp, filter);
         command.setDigestQuery(isDigest);
         return command;
@@ -106,11 +108,13 @@ class SliceByNamesReadCommandSerializer implements IVersionedSerializer<ReadComm
         int size = sizes.sizeof(command.isDigestQuery());
         int keySize = command.key.remaining();
 
+        CFMetaData metadata = Schema.instance.getCFMetaData(cmd.ksName, cmd.cfName);
+
         size += sizes.sizeof(command.ksName);
         size += sizes.sizeof((short)keySize) + keySize;
         size += sizes.sizeof(command.cfName);
         size += sizes.sizeof(cmd.timestamp);
-        size += NamesQueryFilter.serializer.serializedSize(command.filter, version);
+        size += metadata.comparator.namesQueryFilterSerializer().serializedSize(command.filter, version);
 
         return size;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SliceFromReadCommand.java b/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
index 72de2ca..f6ff89a 100644
--- a/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
@@ -24,6 +24,8 @@ import java.nio.ByteBuffer;
 
 import com.google.common.base.Objects;
 
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.filter.IDiskAtomFilter;
 import org.apache.cassandra.db.filter.QueryFilter;
 import org.apache.cassandra.db.filter.SliceQueryFilter;
@@ -136,7 +138,8 @@ class SliceFromReadCommandSerializer implements IVersionedSerializer<ReadCommand
         ByteBufferUtil.writeWithShortLength(realRM.key, out);
         out.writeUTF(realRM.cfName);
         out.writeLong(realRM.timestamp);
-        SliceQueryFilter.serializer.serialize(realRM.filter, out, version);
+        CFMetaData metadata = Schema.instance.getCFMetaData(realRM.ksName, realRM.cfName);
+        metadata.comparator.sliceQueryFilterSerializer().serialize(realRM.filter, out, version);
     }
 
     public ReadCommand deserialize(DataInput in, int version) throws IOException
@@ -146,7 +149,8 @@ class SliceFromReadCommandSerializer implements IVersionedSerializer<ReadCommand
         ByteBuffer key = ByteBufferUtil.readWithShortLength(in);
         String cfName = in.readUTF();
         long timestamp = in.readLong();
-        SliceQueryFilter filter = SliceQueryFilter.serializer.deserialize(in, version);
+        CFMetaData metadata = Schema.instance.getCFMetaData(keyspaceName, cfName);
+        SliceQueryFilter filter = metadata.comparator.sliceQueryFilterSerializer().deserialize(in, version);
         ReadCommand command = new SliceFromReadCommand(keyspaceName, key, cfName, timestamp, filter);
         command.setDigestQuery(isDigest);
         return command;
@@ -158,12 +162,14 @@ class SliceFromReadCommandSerializer implements IVersionedSerializer<ReadCommand
         SliceFromReadCommand command = (SliceFromReadCommand) cmd;
         int keySize = command.key.remaining();
 
+        CFMetaData metadata = Schema.instance.getCFMetaData(cmd.ksName, cmd.cfName);
+
         int size = sizes.sizeof(cmd.isDigestQuery()); // boolean
         size += sizes.sizeof(command.ksName);
         size += sizes.sizeof((short) keySize) + keySize;
         size += sizes.sizeof(command.cfName);
         size += sizes.sizeof(cmd.timestamp);
-        size += SliceQueryFilter.serializer.serializedSize(command.filter, version);
+        size += metadata.comparator.sliceQueryFilterSerializer().serializedSize(command.filter, version);
 
         return size;
     }

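Both read-command serializers now follow the same pattern: the filter
serializer hangs off the table's comparator, so each serialize, deserialize
and serializedSize call is preceded by a schema lookup. The shared shape (the
serializer's exact return type is an assumption):

    // Sketch of the lookup pattern shared by SliceByNamesReadCommandSerializer
    // and SliceFromReadCommandSerializer.
    CFMetaData metadata = Schema.instance.getCFMetaData(cmd.ksName, cmd.cfName);
    metadata.comparator.sliceQueryFilterSerializer().serialize(filter, out, version);
    // ...and symmetrically namesQueryFilterSerializer() for by-names commands.

The trade is one Schema.instance lookup per message against filters that no
longer need the comparator threaded through deserialize(), as the removed
NamesQueryFilter.serializer.deserialize(in, version, metadata.comparator)
overload shows.
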
http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/SuperColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SuperColumns.java b/src/java/org/apache/cassandra/db/SuperColumns.java
index 513db38..402dc7e 100644
--- a/src/java/org/apache/cassandra/db/SuperColumns.java
+++ b/src/java/org/apache/cassandra/db/SuperColumns.java
@@ -31,16 +31,16 @@ import java.util.SortedSet;
 import java.util.TreeSet;
 
 import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.composites.*;
 import org.apache.cassandra.db.filter.*;
 import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.db.marshal.CompositeType;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 public class SuperColumns
 {
-    public static Iterator<OnDiskAtom> onDiskIterator(DataInput in, int superColumnCount, ColumnSerializer.Flag flag, int expireBefore)
+    public static Iterator<OnDiskAtom> onDiskIterator(DataInput in, int superColumnCount, ColumnSerializer.Flag flag, int expireBefore, CellNameType type)
     {
-        return new SCIterator(in, superColumnCount, flag, expireBefore);
+        return new SCIterator(in, superColumnCount, flag, expireBefore, type);
     }
 
     public static void serializeSuperColumnFamily(ColumnFamily scf, DataOutput out, int version) throws IOException
@@ -55,44 +55,48 @@ public class SuperColumns
          *   subcolumns range deletions).
          */
         DeletionInfo delInfo = scf.deletionInfo();
-        Map<ByteBuffer, List<Column>> scMap = groupSuperColumns(scf);
+        Map<CellName, List<Column>> scMap = groupSuperColumns(scf);
 
         // Actually Serialize
-        DeletionInfo.serializer().serialize(new DeletionInfo(delInfo.getTopLevelDeletion()), out, version);
+        scf.getComparator().deletionInfoSerializer().serialize(new DeletionInfo(delInfo.getTopLevelDeletion()), out, version);
         out.writeInt(scMap.size());
 
-        for (Map.Entry<ByteBuffer, List<Column>> entry : scMap.entrySet())
+        CellNameType subComparator = subType(scf.getComparator());
+        for (Map.Entry<CellName, List<Column>> entry : scMap.entrySet())
         {
-            ByteBufferUtil.writeWithShortLength(entry.getKey(), out);
+            scf.getComparator().cellSerializer().serialize(entry.getKey(), out);
 
             DeletionTime delTime = delInfo.rangeCovering(entry.getKey());
             DeletionInfo scDelInfo = delTime == null ? DeletionInfo.live() : new DeletionInfo(delTime);
             DeletionTime.serializer.serialize(scDelInfo.getTopLevelDeletion(), out);
 
             out.writeInt(entry.getValue().size());
+            ColumnSerializer serializer = subComparator.columnSerializer();
             for (Column subColumn : entry.getValue())
-                Column.serializer.serialize(subColumn, out);
+                serializer.serialize(subColumn, out);
         }
     }
 
-    private static Map<ByteBuffer, List<Column>> groupSuperColumns(ColumnFamily scf)
+    private static Map<CellName, List<Column>> groupSuperColumns(ColumnFamily scf)
     {
-        CompositeType type = (CompositeType)scf.getComparator();
+        CellNameType type = scf.getComparator();
         // The order of insertion matters!
-        Map<ByteBuffer, List<Column>> scMap = new LinkedHashMap<ByteBuffer, List<Column>>();
+        Map<CellName, List<Column>> scMap = new LinkedHashMap<>();
 
-        ByteBuffer scName = null;
+        CellName scName = null;
         List<Column> subColumns = null;
+        CellNameType scType = scType(type);
+        CellNameType subType = subType(type);
         for (Column column : scf)
         {
-            ByteBuffer newScName = scName(column.name());
-            ByteBuffer newSubName = subName(column.name());
+            CellName newScName = scType.makeCellName(scName(column.name()));
+            CellName newSubName = subType.makeCellName(subName(column.name()));
 
-            if (scName == null || type.types.get(0).compare(scName, newScName) != 0)
+            if (scName == null || scType.compare(scName, newScName) != 0)
             {
                 // new super column
                 scName = newScName;
-                subColumns = new ArrayList<Column>();
+                subColumns = new ArrayList<>();
                 scMap.put(scName, subColumns);
             }
 
@@ -104,25 +108,28 @@ public class SuperColumns
     public static void deserializerSuperColumnFamily(DataInput in, ColumnFamily cf, ColumnSerializer.Flag flag, int version) throws IOException
     {
         // Note that there was no way to insert a range tombstone in a SCF in 1.2
-        cf.delete(DeletionInfo.serializer().deserialize(in, version, cf.getComparator()));
+        cf.delete(cf.getComparator().deletionInfoSerializer().deserialize(in, version));
         assert !cf.deletionInfo().rangeIterator().hasNext();
 
-        Iterator<OnDiskAtom> iter = onDiskIterator(in, in.readInt(), flag, Integer.MIN_VALUE);
+        Iterator<OnDiskAtom> iter = onDiskIterator(in, in.readInt(), flag, Integer.MIN_VALUE, cf.getComparator());
         while (iter.hasNext())
             cf.addAtom(iter.next());
     }
 
     public static long serializedSize(ColumnFamily scf, TypeSizes typeSizes, int version)
     {
-        Map<ByteBuffer, List<Column>> scMap = groupSuperColumns(scf);
+        Map<CellName, List<Column>> scMap = groupSuperColumns(scf);
         DeletionInfo delInfo = scf.deletionInfo();
 
         // Actually Serialize
-        long size = DeletionInfo.serializer().serializedSize(new DeletionInfo(delInfo.getTopLevelDeletion()), version);
-        for (Map.Entry<ByteBuffer, List<Column>> entry : scMap.entrySet())
+        long size = scType(scf.getComparator()).deletionInfoSerializer().serializedSize(new DeletionInfo(delInfo.getTopLevelDeletion()), version);
+
+        CellNameType scType = scType(scf.getComparator());
+        CellNameType subType = subType(scf.getComparator());
+        ColumnSerializer colSer = subType.columnSerializer();
+        for (Map.Entry<CellName, List<Column>> entry : scMap.entrySet())
         {
-            int nameSize = entry.getKey().remaining();
-            size += typeSizes.sizeof((short) nameSize) + nameSize;
+            size += scType.cellSerializer().serializedSize(entry.getKey(), typeSizes);
 
             DeletionTime delTime = delInfo.rangeCovering(entry.getKey());
             DeletionInfo scDelInfo = delTime == null ? DeletionInfo.live() : new DeletionInfo(delTime);
@@ -130,7 +137,7 @@ public class SuperColumns
 
             size += typeSizes.sizeof(entry.getValue().size());
             for (Column subColumn : entry.getValue())
-                size += Column.serializer.serializedSize(subColumn, typeSizes);
+                size += colSer.serializedSize(subColumn, typeSizes);
         }
         return size;
     }
@@ -143,16 +150,19 @@ public class SuperColumns
         private final ColumnSerializer.Flag flag;
         private final int expireBefore;
 
+        private final CellNameType type;
+
         private int read;
         private ByteBuffer scName;
         private Iterator<Column> subColumnsIterator;
 
-        private SCIterator(DataInput in, int superColumnCount, ColumnSerializer.Flag flag, int expireBefore)
+        private SCIterator(DataInput in, int superColumnCount, ColumnSerializer.Flag flag, int expireBefore, CellNameType type)
         {
             this.in = in;
             this.scCount = superColumnCount;
             this.flag = flag;
             this.expireBefore = expireBefore;
+            this.type = type;
         }
 
         public boolean hasNext()
@@ -167,7 +177,7 @@ public class SuperColumns
                 if (subColumnsIterator != null && subColumnsIterator.hasNext())
                 {
                     Column c = subColumnsIterator.next();
-                    return c.withUpdatedName(CompositeType.build(scName, c.name()));
+                    return c.withUpdatedName(type.makeCellName(scName, c.name().toByteBuffer()));
                 }
 
                 // Read one more super column
@@ -175,14 +185,14 @@ public class SuperColumns
 
                 scName = ByteBufferUtil.readWithShortLength(in);
                 DeletionInfo delInfo = new DeletionInfo(DeletionTime.serializer.deserialize(in));
-                assert !delInfo.rangeIterator().hasNext(); // We assume no range tombstone (there was no way to insert some in a SCF in 1.2)
 
                 /* read the number of columns */
                 int size = in.readInt();
-                List<Column> subColumns = new ArrayList<Column>(size);
+                List<Column> subColumns = new ArrayList<>(size);
 
+                ColumnSerializer colSer = subType(type).columnSerializer();
                 for (int i = 0; i < size; ++i)
-                    subColumns.add(Column.serializer.deserialize(in, flag, expireBefore));
+                    subColumns.add(colSer.deserialize(in, flag, expireBefore));
 
                 subColumnsIterator = subColumns.iterator();
 
@@ -205,6 +215,16 @@ public class SuperColumns
         }
     }
 
+    private static CellNameType scType(CellNameType type)
+    {
+        return new SimpleDenseCellNameType(type.subtype(0));
+    }
+
+    private static CellNameType subType(CellNameType type)
+    {
+        return new SimpleDenseCellNameType(type.subtype(1));
+    }
+
     public static AbstractType<?> getComparatorFor(CFMetaData metadata, ByteBuffer superColumn)
     {
         return getComparatorFor(metadata, superColumn != null);
@@ -213,44 +233,33 @@ public class SuperColumns
     public static AbstractType<?> getComparatorFor(CFMetaData metadata, boolean subColumn)
     {
         return metadata.isSuper()
-             ? ((CompositeType)metadata.comparator).types.get(subColumn ? 1 : 0)
-             : metadata.comparator;
+             ? metadata.comparator.subtype(subColumn ? 1 : 0)
+             : metadata.comparator.asAbstractType();
     }
 
     // Extract the first component of a columnName, i.e. the super column name
-    public static ByteBuffer scName(ByteBuffer columnName)
+    public static ByteBuffer scName(Composite columnName)
     {
-        return CompositeType.extractComponent(columnName, 0);
+        return columnName.get(0);
     }
 
     // Extract the 2nd component of a columnName, i.e. the sub-column name
-    public static ByteBuffer subName(ByteBuffer columnName)
+    public static ByteBuffer subName(Composite columnName)
     {
-        return CompositeType.extractComponent(columnName, 1);
+        return columnName.get(1);
     }
 
-    // We don't use CompositeType.Builder mostly because we want to avoid having to provide the comparator.
-    public static ByteBuffer startOf(ByteBuffer scName)
+    public static Composite startOf(ByteBuffer scName)
     {
-        int length = scName.remaining();
-        ByteBuffer bb = ByteBuffer.allocate(2 + length + 1);
-
-        bb.put((byte) ((length >> 8) & 0xFF));
-        bb.put((byte) (length & 0xFF));
-        bb.put(scName.duplicate());
-        bb.put((byte) 0);
-        bb.flip();
-        return bb;
+        return CellNames.compositeDense(scName).start();
     }
 
-    public static ByteBuffer endOf(ByteBuffer scName)
+    public static Composite endOf(ByteBuffer scName)
     {
-        ByteBuffer bb = startOf(scName);
-        bb.put(bb.remaining() - 1, (byte)1);
-        return bb;
+        return CellNames.compositeDense(scName).end();
     }
 
-    public static SCFilter filterToSC(CompositeType type, IDiskAtomFilter filter)
+    public static SCFilter filterToSC(CellNameType type, IDiskAtomFilter filter)
     {
         if (filter instanceof NamesQueryFilter)
             return namesFilterToSC(type, (NamesQueryFilter)filter);
@@ -258,11 +267,12 @@ public class SuperColumns
             return sliceFilterToSC(type, (SliceQueryFilter)filter);
     }
 
-    public static SCFilter namesFilterToSC(CompositeType type, NamesQueryFilter filter)
+    public static SCFilter namesFilterToSC(CellNameType type, NamesQueryFilter filter)
     {
         ByteBuffer scName = null;
-        SortedSet<ByteBuffer> newColumns = new TreeSet<ByteBuffer>(filter.columns.comparator());
-        for (ByteBuffer name : filter.columns)
+        CellNameType subComparator = subType(type);
+        SortedSet<CellName> newColumns = new TreeSet<CellName>(subComparator);
+        for (CellName name : filter.columns)
         {
             ByteBuffer newScName = scName(name);
 
@@ -270,18 +280,23 @@ public class SuperColumns
             {
                 scName = newScName;
             }
-            else if (type.types.get(0).compare(scName, newScName) != 0)
+            else if (type.subtype(0).compare(scName, newScName) != 0)
             {
                 // If we're selecting column across multiple SC, it's not something we can translate for an old node
                 throw new RuntimeException("Cannot convert filter to old super column format. Update all nodes to Cassandra 2.0 first.");
             }
 
-            newColumns.add(subName(name));
+            newColumns.add(subComparator.makeCellName(name));
         }
         return new SCFilter(scName, new NamesQueryFilter(newColumns));
     }
 
-    public static SCFilter sliceFilterToSC(CompositeType type, SliceQueryFilter filter)
+    private static boolean isEndOfRange(Composite c)
+    {
+        return c.eoc() == Composite.EOC.END;
+    }
+
+    public static SCFilter sliceFilterToSC(CellNameType type, SliceQueryFilter filter)
     {
         /*
          * There are 3 main cases that we can translate back into super column
@@ -300,64 +315,67 @@ public class SuperColumns
         boolean reversed = filter.reversed;
         if (filter.slices.length == 1)
         {
-            ByteBuffer start = filter.slices[0].start;
-            ByteBuffer finish = filter.slices[0].start;
+            Composite start = filter.slices[0].start;
+            Composite finish = filter.slices[0].start;
 
             if (filter.compositesToGroup == 1)
             {
                 // Note: all the resulting filters must have compositesToGroup == 0 because this
                 // makes no sense for super columns on the destination node otherwise
-                if (start.remaining() == 0)
+                if (start.isEmpty())
                 {
-                    if (finish.remaining() == 0)
+                    if (finish.isEmpty())
                         // An 'IdentityFilter', keep as is (except for the compositesToGroup)
                         return new SCFilter(null, new SliceQueryFilter(filter.start(), filter.finish(), reversed, filter.count));
 
                     if (subName(finish) == null
-                            && ((!reversed && !firstEndOfComponent(finish)) || (reversed && firstEndOfComponent(finish))))
-                        return new SCFilter(null, new SliceQueryFilter(ByteBufferUtil.EMPTY_BYTE_BUFFER, scName(finish), reversed, filter.count));
+                            && ((!reversed && !isEndOfRange(finish)) || (reversed && isEndOfRange(finish))))
+                        return new SCFilter(null, new SliceQueryFilter(Composites.EMPTY, CellNames.simpleDense(scName(finish)), reversed, filter.count));
                 }
-                else if (finish.remaining() == 0)
+                else if (finish.isEmpty())
                 {
                     if (subName(start) == null
-                            && ((!reversed && firstEndOfComponent(start)) || (reversed && !firstEndOfComponent(start))))
-                        return new SCFilter(null, new SliceQueryFilter(scName(start), ByteBufferUtil.EMPTY_BYTE_BUFFER, reversed, filter.count));
+                            && ((!reversed && isEndOfRange(start)) || (reversed && !isEndOfRange(start))))
+                        return new SCFilter(null, new SliceQueryFilter(CellNames.simpleDense(scName(start)), Composites.EMPTY, reversed, filter.count));
                 }
                 else if (subName(start) == null && subName(finish) == null
-                        && ((   reversed && !firstEndOfComponent(start) &&  firstEndOfComponent(finish))
-                            || (!reversed &&  firstEndOfComponent(start) && !firstEndOfComponent(finish))))
+                        && ((   reversed && !isEndOfRange(start) && isEndOfRange(finish))
+                            || (!reversed &&  isEndOfRange(start) && !isEndOfRange(finish))))
                 {
                     // A slice of supercolumns
-                    return new SCFilter(null, new SliceQueryFilter(scName(start), scName(finish), reversed, filter.count));
+                    return new SCFilter(null, new SliceQueryFilter(CellNames.simpleDense(scName(start)),
+                                                                   CellNames.simpleDense(scName(finish)),
+                                                                   reversed,
+                                                                   filter.count));
                 }
             }
-            else if (filter.compositesToGroup == 0 && type.types.get(0).compare(scName(start), scName(finish)) == 0)
+            else if (filter.compositesToGroup == 0 && type.subtype(0).compare(scName(start), scName(finish)) == 0)
             {
                 // A slice of subcolumns
-                return new SCFilter(scName(start), filter.withUpdatedSlice(subName(start), subName(finish)));
+                return new SCFilter(scName(start), filter.withUpdatedSlice(CellNames.simpleDense(subName(start)), CellNames.simpleDense(subName(finish))));
             }
         }
         else if (!reversed)
         {
-            SortedSet<ByteBuffer> columns = new TreeSet<ByteBuffer>(type.types.get(0));
+            SortedSet<CellName> columns = new TreeSet<CellName>(scType(type));
             for (int i = 0; i < filter.slices.length; ++i)
             {
-                ByteBuffer start = filter.slices[i].start;
-                ByteBuffer finish = filter.slices[i].finish;
+                Composite start = filter.slices[i].start;
+                Composite finish = filter.slices[i].finish;
 
                 if (subName(start) != null || subName(finish) != null
-                  || type.types.get(0).compare(scName(start), scName(finish)) != 0
-                  || firstEndOfComponent(start) || !firstEndOfComponent(finish))
+                  || type.subtype(0).compare(scName(start), scName(finish)) != 0
+                  || isEndOfRange(start) || !isEndOfRange(finish))
                     throw new RuntimeException("Cannot convert filter to old super column format. Update all nodes to Cassandra 2.0 first.");
 
-                columns.add(scName(start));
+                columns.add(CellNames.simpleDense(scName(start)));
             }
             return new SCFilter(null, new NamesQueryFilter(columns));
         }
         throw new RuntimeException("Cannot convert filter to old super column format. Update all nodes to Cassandra 2.0 first.");
     }
 
-    public static IDiskAtomFilter fromSCFilter(CompositeType type, ByteBuffer scName, IDiskAtomFilter filter)
+    public static IDiskAtomFilter fromSCFilter(CellNameType type, ByteBuffer scName, IDiskAtomFilter filter)
     {
         if (filter instanceof NamesQueryFilter)
             return fromSCNamesFilter(type, scName, (NamesQueryFilter)filter);
@@ -365,65 +383,55 @@ public class SuperColumns
             return fromSCSliceFilter(type, scName, (SliceQueryFilter)filter);
     }
 
-    public static IDiskAtomFilter fromSCNamesFilter(CompositeType type, ByteBuffer scName, NamesQueryFilter filter)
+    public static IDiskAtomFilter fromSCNamesFilter(CellNameType type, ByteBuffer scName, NamesQueryFilter filter)
     {
         if (scName == null)
         {
             ColumnSlice[] slices = new ColumnSlice[filter.columns.size()];
             int i = 0;
-            for (ByteBuffer bb : filter.columns)
+            for (CellName name : filter.columns)
             {
-                CompositeType.Builder builder = type.builder().add(bb);
-                slices[i++] = new ColumnSlice(builder.build(), builder.buildAsEndOfRange());
+                slices[i++] = name.slice();
             }
             return new SliceQueryFilter(slices, false, slices.length, 1);
         }
         else
         {
-            SortedSet<ByteBuffer> newColumns = new TreeSet<ByteBuffer>(type);
-            for (ByteBuffer c : filter.columns)
-                newColumns.add(CompositeType.build(scName, c));
+            SortedSet<CellName> newColumns = new TreeSet<CellName>(type);
+            for (CellName c : filter.columns)
+                newColumns.add(type.makeCellName(scName, c.toByteBuffer()));
             return filter.withUpdatedColumns(newColumns);
         }
     }
 
-    public static SliceQueryFilter fromSCSliceFilter(CompositeType type, ByteBuffer scName, SliceQueryFilter filter)
+    public static SliceQueryFilter fromSCSliceFilter(CellNameType type, ByteBuffer scName, SliceQueryFilter filter)
     {
         assert filter.slices.length == 1;
         if (scName == null)
         {
-            ByteBuffer start = filter.start().remaining() == 0
-                             ? filter.start()
-                             : (filter.reversed ? type.builder().add(filter.start()).buildAsEndOfRange()
-                                                : type.builder().add(filter.start()).build());
-            ByteBuffer finish = filter.finish().remaining() == 0
-                              ? filter.finish()
-                              : (filter.reversed ? type.builder().add(filter.finish()).build()
-                                                 : type.builder().add(filter.finish()).buildAsEndOfRange());
+            // The filter is on the super column name
+            CBuilder builder = type.builder();
+            Composite start = filter.start().isEmpty()
+                            ? Composites.EMPTY
+                            : builder.buildWith(filter.start().toByteBuffer()).withEOC(filter.reversed ? Composite.EOC.END : Composite.EOC.START);
+            Composite finish = filter.finish().isEmpty()
+                             ? Composites.EMPTY
+                             : builder.buildWith(filter.finish().toByteBuffer()).withEOC(filter.reversed ? Composite.EOC.START : Composite.EOC.END);
             return new SliceQueryFilter(start, finish, filter.reversed, filter.count, 1);
         }
         else
         {
-            CompositeType.Builder builder = type.builder().add(scName);
-            ByteBuffer start = filter.start().remaining() == 0
-                             ? filter.reversed ? builder.buildAsEndOfRange() : builder.build()
-                             : builder.copy().add(filter.start()).build();
-            ByteBuffer end = filter.finish().remaining() == 0
-                             ? filter.reversed ? builder.build() : builder.buildAsEndOfRange()
-                             : builder.add(filter.finish()).build();
+            CBuilder builder = type.builder().add(scName);
+            Composite start = filter.start().isEmpty()
+                            ? builder.build().withEOC(filter.reversed ? Composite.EOC.END : Composite.EOC.START)
+                            : builder.buildWith(filter.start().toByteBuffer());
+            Composite end = filter.finish().isEmpty()
+                          ? builder.build().withEOC(filter.reversed ? Composite.EOC.START : Composite.EOC.END)
+                          : builder.buildWith(filter.finish().toByteBuffer());
             return new SliceQueryFilter(start, end, filter.reversed, filter.count);
         }
     }
 
-    private static boolean firstEndOfComponent(ByteBuffer bb)
-    {
-        bb = bb.duplicate();
-        int length = (bb.get() & 0xFF) << 8;
-        length |= (bb.get() & 0xFF);
-
-        return bb.get(length + 2) == 1;
-    }
-
     public static class SCFilter
     {
         public final ByteBuffer scName;

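The deleted startOf/endOf/firstEndOfComponent trio made the old encoding
explicit: a one-component composite bound was two length bytes, the component
bytes, then an end-of-component octet (0 to sort before every cell sharing the
prefix, 1 to sort after). The new code expresses the same bounds through EOC
markers; a side-by-side sketch, with 'sc' an assumed ByteBuffer super column
name and 'bound' an assumed Composite:

    // Old encoding, built by hand:  [len_hi][len_lo][sc bytes][0x00 or 0x01]
    // New equivalents, via EOC markers on a dense composite:
    Composite start = CellNames.compositeDense(sc).start(); // was the trailing 0x00
    Composite end   = CellNames.compositeDense(sc).end();   // was the trailing 0x01
    // ...and the old firstEndOfComponent(bb) test becomes:
    boolean atEnd = bound.eoc() == Composite.EOC.END;
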
http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index 1ecb0e4..e650d57 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -31,6 +31,8 @@ import com.google.common.collect.SetMultimap;
 import com.google.common.collect.Sets;
 
 import org.apache.cassandra.db.compaction.CompactionHistoryTabularData;
+import org.apache.cassandra.db.composites.Composite;
+import org.apache.cassandra.db.composites.Composites;
 import org.apache.cassandra.metrics.RestorableMeter;
 import org.apache.cassandra.transport.Server;
 import org.apache.commons.lang3.StringUtils;
@@ -586,7 +588,7 @@ public class SystemKeyspace
         ColumnFamilyStore cfs = Keyspace.open(Keyspace.SYSTEM_KS).getColumnFamilyStore(INDEX_CF);
         QueryFilter filter = QueryFilter.getNamesFilter(decorate(ByteBufferUtil.bytes(keyspaceName)),
                                                         INDEX_CF,
-                                                        ByteBufferUtil.bytes(indexName),
+                                                        FBUtilities.singleton(cfs.getComparator().makeCellName(indexName), cfs.getComparator()),
                                                         System.currentTimeMillis());
         return ColumnFamilyStore.removeDeleted(cfs.getColumnFamily(filter), Integer.MAX_VALUE) != null;
     }
@@ -594,7 +596,7 @@ public class SystemKeyspace
     public static void setIndexBuilt(String keyspaceName, String indexName)
     {
         ColumnFamily cf = ArrayBackedSortedColumns.factory.create(Keyspace.SYSTEM_KS, INDEX_CF);
-        cf.addColumn(new Column(ByteBufferUtil.bytes(indexName), ByteBufferUtil.EMPTY_BYTE_BUFFER, FBUtilities.timestampMicros()));
+        cf.addColumn(new Column(cf.getComparator().makeCellName(indexName), ByteBufferUtil.EMPTY_BYTE_BUFFER, FBUtilities.timestampMicros()));
         RowMutation rm = new RowMutation(Keyspace.SYSTEM_KS, ByteBufferUtil.bytes(keyspaceName), cf);
         rm.apply();
     }
@@ -602,7 +604,7 @@ public class SystemKeyspace
     public static void setIndexRemoved(String keyspaceName, String indexName)
     {
         RowMutation rm = new RowMutation(Keyspace.SYSTEM_KS, ByteBufferUtil.bytes(keyspaceName));
-        rm.delete(INDEX_CF, ByteBufferUtil.bytes(indexName), FBUtilities.timestampMicros());
+        rm.delete(INDEX_CF, CFMetaData.IndexCf.comparator.makeCellName(indexName), FBUtilities.timestampMicros());
         rm.apply();
     }
 
@@ -650,14 +652,14 @@ public class SystemKeyspace
         // Get the last CounterId (CounterIds are timeuuids and are thus ordered from oldest to newest)
         QueryFilter filter = QueryFilter.getSliceFilter(decorate(ALL_LOCAL_NODE_ID_KEY),
                                                         COUNTER_ID_CF,
-                                                        ByteBufferUtil.EMPTY_BYTE_BUFFER,
-                                                        ByteBufferUtil.EMPTY_BYTE_BUFFER,
+                                                        Composites.EMPTY,
+                                                        Composites.EMPTY,
                                                         true,
                                                         1,
                                                         System.currentTimeMillis());
         ColumnFamily cf = keyspace.getColumnFamilyStore(COUNTER_ID_CF).getColumnFamily(filter);
         if (cf != null && cf.getColumnCount() != 0)
-            return CounterId.wrap(cf.iterator().next().name());
+            return CounterId.wrap(cf.iterator().next().name().toByteBuffer());
         else
             return null;
     }
@@ -673,7 +675,7 @@ public class SystemKeyspace
         ByteBuffer ip = ByteBuffer.wrap(FBUtilities.getBroadcastAddress().getAddress());
 
         ColumnFamily cf = ArrayBackedSortedColumns.factory.create(Keyspace.SYSTEM_KS, COUNTER_ID_CF);
-        cf.addColumn(new Column(newCounterId.bytes(), ip, now));
+        cf.addColumn(new Column(cf.getComparator().makeCellName(newCounterId.bytes()), ip, now));
         RowMutation rm = new RowMutation(Keyspace.SYSTEM_KS, ALL_LOCAL_NODE_ID_KEY, cf);
         rm.apply();
         forceBlockingFlush(COUNTER_ID_CF);
@@ -695,7 +697,7 @@ public class SystemKeyspace
 
             // this will ignore the last column on purpose since it is the
             // current local node id
-            previous = CounterId.wrap(c.name());
+            previous = CounterId.wrap(c.name().toByteBuffer());
         }
         return l;
     }
@@ -799,9 +801,10 @@ public class SystemKeyspace
     {
         DecoratedKey key = StorageService.getPartitioner().decorateKey(getSchemaKSKey(ksName));
         ColumnFamilyStore schemaCFS = SystemKeyspace.schemaCFS(schemaCfName);
+        Composite prefix = schemaCFS.getComparator().make(cfName);
         ColumnFamily cf = schemaCFS.getColumnFamily(key,
-                                                    DefsTables.searchComposite(cfName, true),
-                                                    DefsTables.searchComposite(cfName, false),
+                                                    prefix,
+                                                    prefix.end(),
                                                     false,
                                                     Integer.MAX_VALUE,
                                                     System.currentTimeMillis());

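The prefix/prefix.end() pair in the last hunk selects every schema cell whose first component is cfName. As a rough analogy (a plain NavigableMap, not the storage engine), the '\uffff' sentinel below plays the role prefix.end() plays above:

    import java.util.*;

    public class PrefixSlice
    {
        public static void main(String[] args)
        {
            NavigableMap<String, String> cells = new TreeMap<>();
            cells.put("cf1:comment", "...");
            cells.put("cf1:gc_grace", "...");
            cells.put("cf2:comment", "...");

            String prefix = "cf1:";
            // '\uffff' stands in for the EOC.END marker: it sorts after
            // anything that can follow the prefix
            SortedMap<String, String> slice = cells.subMap(prefix, prefix + '\uffff');
            System.out.println(slice.keySet()); // [cf1:comment, cf1:gc_grace]
        }
    }
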
http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/TreeMapBackedSortedColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/TreeMapBackedSortedColumns.java b/src/java/org/apache/cassandra/db/TreeMapBackedSortedColumns.java
index 466833b..597cae6 100644
--- a/src/java/org/apache/cassandra/db/TreeMapBackedSortedColumns.java
+++ b/src/java/org/apache/cassandra/db/TreeMapBackedSortedColumns.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.db;
 
-import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.SortedMap;
@@ -27,13 +26,14 @@ import java.util.TreeMap;
 import com.google.common.base.Function;
 
 import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.db.composites.CellNameType;
 import org.apache.cassandra.db.filter.ColumnSlice;
-import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.utils.Allocator;
 
 public class TreeMapBackedSortedColumns extends AbstractThreadUnsafeSortedColumns
 {
-    private final TreeMap<ByteBuffer, Column> map;
+    private final TreeMap<CellName, Column> map;
 
     public static final ColumnFamily.Factory<TreeMapBackedSortedColumns> factory = new Factory<TreeMapBackedSortedColumns>()
     {
@@ -44,21 +44,21 @@ public class TreeMapBackedSortedColumns extends AbstractThreadUnsafeSortedColumn
         }
     };
 
-    public AbstractType<?> getComparator()
+    public CellNameType getComparator()
     {
-        return (AbstractType<?>)map.comparator();
+        return (CellNameType)map.comparator();
     }
 
     private TreeMapBackedSortedColumns(CFMetaData metadata)
     {
         super(metadata);
-        this.map = new TreeMap<ByteBuffer, Column>(metadata.comparator);
+        this.map = new TreeMap<>(metadata.comparator);
     }
 
-    private TreeMapBackedSortedColumns(CFMetaData metadata, SortedMap<ByteBuffer, Column> columns)
+    private TreeMapBackedSortedColumns(CFMetaData metadata, SortedMap<CellName, Column> columns)
     {
         super(metadata);
-        this.map = new TreeMap<ByteBuffer, Column>(columns);
+        this.map = new TreeMap<>(columns);
     }
 
     public ColumnFamily.Factory getFactory()
@@ -82,7 +82,7 @@ public class TreeMapBackedSortedColumns extends AbstractThreadUnsafeSortedColumn
     */
     public void addColumn(Column column, Allocator allocator)
     {
-        ByteBuffer name = column.name();
+        CellName name = column.name();
         // this is a slightly unusual way to structure this; a more natural way is shown in ThreadSafeSortedColumns,
         // but TreeMap lacks putIfAbsent.  Rather than split it into a "get, then put" check, we do it as follows,
         // which saves the extra "get" in the no-conflict case [for both normal and super columns],
@@ -128,7 +128,7 @@ public class TreeMapBackedSortedColumns extends AbstractThreadUnsafeSortedColumn
         return true;
     }
 
-    public Column getColumn(ByteBuffer name)
+    public Column getColumn(CellName name)
     {
         return map.get(name);
     }
@@ -154,7 +154,7 @@ public class TreeMapBackedSortedColumns extends AbstractThreadUnsafeSortedColumn
         return map.descendingMap().values();
     }
 
-    public SortedSet<ByteBuffer> getColumnNames()
+    public SortedSet<CellName> getColumnNames()
     {
         return map.navigableKeySet();
     }

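The addColumn comment above describes a put-then-reconcile trick: TreeMap (pre-Java-8) has no putIfAbsent, so instead of a get-then-put pair the code puts unconditionally and, only on conflict, re-inserts a reconciled value, saving a lookup in the common no-conflict case. A minimal sketch of that pattern, with max() standing in for column reconciliation:

    import java.util.*;

    public class PutThenReconcile
    {
        static void addColumn(TreeMap<String, Long> map, String name, long value)
        {
            Long previous = map.put(name, value);
            if (previous != null)
                map.put(name, Math.max(previous, value)); // reconcile and re-insert
        }

        public static void main(String[] args)
        {
            TreeMap<String, Long> columns = new TreeMap<>();
            addColumn(columns, "c1", 1L);
            addColumn(columns, "c1", 5L); // conflict path
            System.out.println(columns.get("c1")); // 5
        }
    }
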
http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/UnsortedColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/UnsortedColumns.java b/src/java/org/apache/cassandra/db/UnsortedColumns.java
index 2b33cd0..d6520b0 100644
--- a/src/java/org/apache/cassandra/db/UnsortedColumns.java
+++ b/src/java/org/apache/cassandra/db/UnsortedColumns.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.db;
 
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
@@ -26,6 +25,7 @@ import com.google.common.base.Function;
 import com.google.common.collect.Iterables;
 
 import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.composites.CellName;
 import org.apache.cassandra.db.filter.ColumnSlice;
 import org.apache.cassandra.utils.Allocator;
 
@@ -101,16 +101,16 @@ public class UnsortedColumns extends AbstractThreadUnsafeSortedColumns
         throw new UnsupportedOperationException();
     }
 
-    public Column getColumn(ByteBuffer name)
+    public Column getColumn(CellName name)
     {
         throw new UnsupportedOperationException();
     }
 
-    public Iterable<ByteBuffer> getColumnNames()
+    public Iterable<CellName> getColumnNames()
     {
-        return Iterables.transform(columns, new Function<Column, ByteBuffer>()
+        return Iterables.transform(columns, new Function<Column, CellName>()
         {
-            public ByteBuffer apply(Column column)
+            public CellName apply(Column column)
             {
                 return column.name;
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/columniterator/IdentityQueryFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/IdentityQueryFilter.java b/src/java/org/apache/cassandra/db/columniterator/IdentityQueryFilter.java
index 828f7e5..7185eef 100644
--- a/src/java/org/apache/cassandra/db/columniterator/IdentityQueryFilter.java
+++ b/src/java/org/apache/cassandra/db/columniterator/IdentityQueryFilter.java
@@ -17,8 +17,8 @@
  */
 package org.apache.cassandra.db.columniterator;
 
+import org.apache.cassandra.db.composites.Composites;
 import org.apache.cassandra.db.filter.SliceQueryFilter;
-import org.apache.cassandra.utils.ByteBufferUtil;
 
 public class IdentityQueryFilter extends SliceQueryFilter
 {
@@ -27,7 +27,7 @@ public class IdentityQueryFilter extends SliceQueryFilter
      */
     public IdentityQueryFilter()
     {
-        super(ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER, false, Integer.MAX_VALUE);
+        super(Composites.EMPTY, Composites.EMPTY, false, Integer.MAX_VALUE);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java b/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java
index 036d0cf..8715a90 100644
--- a/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java
+++ b/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java
@@ -18,17 +18,16 @@
 package org.apache.cassandra.db.columniterator;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.ArrayDeque;
 import java.util.Deque;
-import java.util.Iterator;
 import java.util.List;
 
 import com.google.common.collect.AbstractIterator;
 
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.composites.CellNameType;
+import org.apache.cassandra.db.composites.Composite;
 import org.apache.cassandra.db.filter.ColumnSlice;
-import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.io.sstable.CorruptSSTableException;
 import org.apache.cassandra.io.sstable.IndexHelper;
 import org.apache.cassandra.io.sstable.IndexHelper.IndexInfo;
@@ -54,7 +53,7 @@ class IndexedSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskA
     private final ColumnSlice[] slices;
     private final BlockFetcher fetcher;
     private final Deque<OnDiskAtom> blockColumns = new ArrayDeque<OnDiskAtom>();
-    private final AbstractType<?> comparator;
+    private final CellNameType comparator;
 
     // Holds range tombstone in reverse queries. See addColumn()
     private final Deque<OnDiskAtom> rangeTombstonesReversed;
@@ -191,7 +190,7 @@ class IndexedSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskA
         /*
          * Return the smallest key selected by the current ColumnSlice.
          */
-        protected ByteBuffer currentStart()
+        protected Composite currentStart()
         {
             return reversed ? slices[currentSliceIdx].finish : slices[currentSliceIdx].start;
         }
@@ -199,7 +198,7 @@ class IndexedSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskA
         /*
          * Return the biggest key selected by the current ColumnSlice.
          */
-        protected ByteBuffer currentFinish()
+        protected Composite currentFinish()
         {
             return reversed ? slices[currentSliceIdx].start : slices[currentSliceIdx].finish;
         }
@@ -213,22 +212,22 @@ class IndexedSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskA
             return isBeforeSliceStart(column.name());
         }
 
-        protected boolean isBeforeSliceStart(ByteBuffer name)
+        protected boolean isBeforeSliceStart(Composite name)
         {
-            ByteBuffer start = currentStart();
-            return start.remaining() != 0 && comparator.compare(name, start) < 0;
+            Composite start = currentStart();
+            return !start.isEmpty() && comparator.compare(name, start) < 0;
         }
 
         protected boolean isColumnBeforeSliceFinish(OnDiskAtom column)
         {
-            ByteBuffer finish = currentFinish();
-            return finish.remaining() == 0 || comparator.compare(column.name(), finish) <= 0;
+            Composite finish = currentFinish();
+            return finish.isEmpty() || comparator.compare(column.name(), finish) <= 0;
         }
 
-        protected boolean isAfterSliceFinish(ByteBuffer name)
+        protected boolean isAfterSliceFinish(Composite name)
         {
-            ByteBuffer finish = currentFinish();
-            return finish.remaining() != 0 && comparator.compare(name, finish) > 0;
+            Composite finish = currentFinish();
+            return !finish.isEmpty() && comparator.compare(name, finish) > 0;
         }
     }
 
@@ -361,9 +360,8 @@ class IndexedSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskA
             if (file == null)
                 file = originalInput == null ? sstable.getFileDataInput(positionToSeek) : originalInput;
 
-            // Give a bogus atom count since we'll deserialize as long as we're
-            // within the index block but we don't know how much atom is there
-            Iterator<OnDiskAtom> atomIterator = emptyColumnFamily.metadata().getOnDiskIterator(file, Integer.MAX_VALUE, sstable.descriptor.version);
+            AtomDeserializer deserializer = emptyColumnFamily.metadata().getOnDiskDeserializer(file, sstable.descriptor.version);
+
             file.seek(positionToSeek);
             FileMark mark = file.mark();
 
@@ -371,63 +369,65 @@ class IndexedSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskA
             boolean inSlice = false;
 
             // scan from index start
-            OnDiskAtom column = null;
-            while (file.bytesPastMark(mark) < currentIndex.width || column != null)
+            while (file.bytesPastMark(mark) < currentIndex.width || deserializer.hasUnprocessed())
             {
-                // Only fetch a new column if we haven't dealt with the previous one.
-                if (column == null)
-                    column = atomIterator.next();
-
                 // col is before slice
                 // (If in slice, don't bother checking that until we change slice)
-                if (!inSlice && isColumnBeforeSliceStart(column))
+                Composite start = currentStart();
+                if (!inSlice && !start.isEmpty() && deserializer.compareNextTo(start) < 0)
                 {
                     if (reversed)
                     {
                         // the next slice selects columns that are before the current one, so it may
                         // match this column, so keep it around.
-                        prefetched.addFirst(column);
+                        prefetched.addFirst(deserializer.readNext());
+                    }
+                    else
+                    {
+                        deserializer.skipNext();
                     }
-                    column = null;
                 }
                 // col is within slice
-                else if (isColumnBeforeSliceFinish(column))
-                {
-                    inSlice = true;
-                    addColumn(column);
-                    column = null;
-                }
-                // col is after slice.
                 else
                 {
-                    // When reading forward, if we hit a column that sorts after the current slice, it means we're done with this slice.
-                    // For reversed, this may either mean that we're done with the current slice, or that we need to read the previous
-                    // index block. However, we can be sure that we are in the first case though (the current slice is done) if the first
-                    // columns of the block were not part of the current slice, i.e. if we have columns in prefetched.
-                    if (reversed && prefetched.isEmpty())
-                        break;
+                    Composite finish = currentFinish();
+                    if (finish.isEmpty() || deserializer.compareNextTo(finish) <= 0)
+                    {
+                        inSlice = true;
+                        addColumn(deserializer.readNext());
+                    }
+                    // col is after slice.
+                    else
+                    {
+                        // When reading forward, if we hit a column that sorts after the current slice, it means we're done with this slice.
+                        // For reversed, this may either mean that we're done with the current slice, or that we need to read the previous
+                        // index block. However, we can be sure that we are in the first case (the current slice is done) if the first
+                        // columns of the block were not part of the current slice, i.e. if we have columns in prefetched.
+                        if (reversed && prefetched.isEmpty())
+                            break;
 
-                    if (!setNextSlice())
-                        break;
+                        if (!setNextSlice())
+                            break;
 
-                    inSlice = false;
+                        inSlice = false;
 
-                    // The next index block now corresponds to the first block that may have columns for the newly set slice.
-                    // So if it's different from the current block, we're done with this block. And in that case, we know
-                    // that our prefetched columns won't match.
-                    if (nextIndexIdx != lastDeserializedBlock)
-                    {
+                        // The next index block now corresponds to the first block that may have columns for the newly set slice.
+                        // So if it's different from the current block, we're done with this block. And in that case, we know
+                        // that our prefetched columns won't match.
+                        if (nextIndexIdx != lastDeserializedBlock)
+                        {
+                            if (reversed)
+                                prefetched.clear();
+                            break;
+                        }
+
+                        // Even if the next slice may have columns in this block, if we're reversed, those columns have been
+                        // prefetched and we're done with that block
                         if (reversed)
-                            prefetched.clear();
-                        break;
-                    }
-
-                    // Even if the next slice may have column in this blocks, if we're reversed, those columns have been
-                    // prefetched and we're done with that block
-                    if (reversed)
-                        break;
+                            break;
 
-                    // otherwise, we will deal with that column at the next iteration
+                        // otherwise, we will deal with that column at the next iteration
+                    }
                 }
             }
             return true;
@@ -446,28 +446,27 @@ class IndexedSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskA
             boolean inSlice = false;
 
             int columnCount = sstable.descriptor.version.hasRowSizeAndColumnCount ? file.readInt() : Integer.MAX_VALUE;
-            Iterator<OnDiskAtom> atomIterator = emptyColumnFamily.metadata().getOnDiskIterator(file, columnCount, sstable.descriptor.version);
-            OnDiskAtom column = null;
-            while (atomIterator.hasNext() || column != null)
+            AtomDeserializer deserializer = emptyColumnFamily.metadata().getOnDiskDeserializer(file, sstable.descriptor.version);
+            int deserialized = 0;
+            while (deserializer.hasNext() && deserialized < columnCount)
             {
-                // Only fetch a new column if we haven't dealt with the previous one.
-                if (column == null)
-                    column = atomIterator.next();
-
                 // col is before slice
                 // (If in slice, don't bother checking that until we change slice)
-                if (!inSlice && isColumnBeforeSliceStart(column))
+                Composite start = currentStart();
+                if (!inSlice && !start.isEmpty() && deserializer.compareNextTo(start) < 0)
                 {
-                    column = null;
+                    deserializer.skipNext();
+                    ++deserialized;
                     continue;
                 }
 
                 // col is within slice
-                if (isColumnBeforeSliceFinish(column))
+                Composite finish = currentFinish();
+                if (finish.isEmpty() || deserializer.compareNextTo(finish) <= 0)
                 {
                     inSlice = true;
-                    addColumn(column);
-                    column = null;
+                    addColumn(deserializer.readNext());
+                    ++deserialized;
                 }
                 // col is after slice. more slices?
                 else

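The IndexedSliceReader rewrite above stops materializing every atom up front: the deserializer can compare the next atom's name against a bound and skip it without building the object. The contract below is inferred from the calls visible in this patch (hasUnprocessed/compareNextTo/readNext/skipNext); the class is a plain-String stand-in, not the real AtomDeserializer:

    import java.util.*;

    public class PeekingDeserializer
    {
        private final Iterator<String> source;
        private String next; // only the name is decoded ahead of time

        PeekingDeserializer(Iterator<String> source) { this.source = source; }

        boolean hasUnprocessed() { return next != null || source.hasNext(); }

        int compareNextTo(String bound)
        {
            if (next == null)
                next = source.next(); // peek without handing the atom out
            return next.compareTo(bound);
        }

        String readNext() { String n = next; next = null; return n; }

        void skipNext() { next = null; }

        public static void main(String[] args)
        {
            PeekingDeserializer d = new PeekingDeserializer(Arrays.asList("a", "c", "e").iterator());
            List<String> inSlice = new ArrayList<>();
            while (d.hasUnprocessed())
            {
                // drop everything before the slice start "b", keep the rest
                if (d.compareNextTo("b") < 0)
                    d.skipNext();
                else
                    inSlice.add(d.readNext());
            }
            System.out.println(inSlice); // [c, e]
        }
    }
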
http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java b/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java
index 3467244..41b95f9 100644
--- a/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java
+++ b/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java
@@ -18,17 +18,14 @@
 package org.apache.cassandra.db.columniterator;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.SortedSet;
+import java.util.*;
 
 import com.google.common.collect.AbstractIterator;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.db.composites.CellNameType;
 import org.apache.cassandra.io.sstable.CorruptSSTableException;
 import org.apache.cassandra.io.sstable.IndexHelper;
 import org.apache.cassandra.io.sstable.SSTableReader;
@@ -43,10 +40,10 @@ public class SSTableNamesIterator extends AbstractIterator<OnDiskAtom> implement
     private final SSTableReader sstable;
     private FileDataInput fileToClose;
     private Iterator<OnDiskAtom> iter;
-    public final SortedSet<ByteBuffer> columns;
+    public final SortedSet<CellName> columns;
     public final DecoratedKey key;
 
-    public SSTableNamesIterator(SSTableReader sstable, DecoratedKey key, SortedSet<ByteBuffer> columns)
+    public SSTableNamesIterator(SSTableReader sstable, DecoratedKey key, SortedSet<CellName> columns)
     {
         assert columns != null;
         this.sstable = sstable;
@@ -73,7 +70,7 @@ public class SSTableNamesIterator extends AbstractIterator<OnDiskAtom> implement
         }
     }
 
-    public SSTableNamesIterator(SSTableReader sstable, FileDataInput file, DecoratedKey key, SortedSet<ByteBuffer> columns, RowIndexEntry indexEntry)
+    public SSTableNamesIterator(SSTableReader sstable, FileDataInput file, DecoratedKey key, SortedSet<CellName> columns, RowIndexEntry indexEntry)
     {
         assert columns != null;
         this.sstable = sstable;
@@ -152,7 +149,7 @@ public class SSTableNamesIterator extends AbstractIterator<OnDiskAtom> implement
         iter = result.iterator();
     }
 
-    private void readSimpleColumns(FileDataInput file, SortedSet<ByteBuffer> columnNames, List<OnDiskAtom> result, int columnCount)
+    private void readSimpleColumns(FileDataInput file, SortedSet<CellName> columnNames, List<OnDiskAtom> result, int columnCount)
     {
         Iterator<OnDiskAtom> atomIterator = cf.metadata().getOnDiskIterator(file, columnCount, sstable.descriptor.version);
         int n = 0;
@@ -177,17 +174,17 @@ public class SSTableNamesIterator extends AbstractIterator<OnDiskAtom> implement
 
     private void readIndexedColumns(CFMetaData metadata,
                                     FileDataInput file,
-                                    SortedSet<ByteBuffer> columnNames,
+                                    SortedSet<CellName> columnNames,
                                     List<IndexHelper.IndexInfo> indexList,
                                     long basePosition,
                                     List<OnDiskAtom> result)
     throws IOException
     {
         /* get the various column ranges we have to read */
-        AbstractType<?> comparator = metadata.comparator;
+        CellNameType comparator = metadata.comparator;
         List<IndexHelper.IndexInfo> ranges = new ArrayList<IndexHelper.IndexInfo>();
         int lastIndexIdx = -1;
-        for (ByteBuffer name : columns)
+        for (CellName name : columnNames)
         {
             int index = IndexHelper.indexFor(name, indexList, comparator, false, lastIndexIdx);
             if (index < 0 || index == indexList.size())
@@ -203,6 +200,8 @@ public class SSTableNamesIterator extends AbstractIterator<OnDiskAtom> implement
         if (ranges.isEmpty())
             return;
 
+        Iterator<CellName> toFetch = columnNames.iterator();
+        CellName nextToFetch = toFetch.next();
         for (IndexHelper.IndexInfo indexInfo : ranges)
         {
             long positionToSeek = basePosition + indexInfo.offset;
@@ -211,17 +210,22 @@ public class SSTableNamesIterator extends AbstractIterator<OnDiskAtom> implement
             if (file == null)
                 file = createFileDataInput(positionToSeek);
 
-            // We'll read as much atom as there is in the index block, so provide a bogus atom count
-            Iterator<OnDiskAtom> atomIterator = cf.metadata().getOnDiskIterator(file, Integer.MAX_VALUE, sstable.descriptor.version);
+            AtomDeserializer deserializer = cf.metadata().getOnDiskDeserializer(file, sstable.descriptor.version);
             file.seek(positionToSeek);
             FileMark mark = file.mark();
-            // TODO only completely deserialize columns we are interested in
-            while (file.bytesPastMark(mark) < indexInfo.width)
+            while (file.bytesPastMark(mark) < indexInfo.width && nextToFetch != null)
             {
-                OnDiskAtom column = atomIterator.next();
-                // we check vs the original Set, not the filtered List, for efficiency
-                if (!(column instanceof Column) || columnNames.contains(column.name()))
-                    result.add(column);
+                int cmp = deserializer.compareNextTo(nextToFetch);
+                if (cmp == 0)
+                {
+                    nextToFetch = toFetch.hasNext() ? toFetch.next() : null;
+                    result.add(deserializer.readNext());
+                    continue;
+                }
+
+                deserializer.skipNext();
+                if (cmp > 0)
+                    nextToFetch = toFetch.hasNext() ? toFetch.next() : null;
             }
         }
     }

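The readIndexedColumns loop above is a sorted merge: the atoms in an index block and the requested names both arrive in comparator order, so a single forward pass decides, per comparison, whether to fetch (equal), skip the atom (atom sorts first), or give up on a name that is absent from the block (name sorts first). A plain-iterator sketch of the same idea; note that on the "name absent" branch this sketch re-checks the same atom against the advanced name rather than discarding it:

    import java.util.*;

    public class SortedMerge
    {
        public static void main(String[] args)
        {
            Iterator<String> block = Arrays.asList("a", "b", "d", "f").iterator();
            Iterator<String> wanted = Arrays.asList("b", "c", "f").iterator();

            List<String> result = new ArrayList<>();
            String atom = null; // the peeked-at block entry
            String nextToFetch = wanted.next();
            while ((atom != null || block.hasNext()) && nextToFetch != null)
            {
                if (atom == null)
                    atom = block.next();
                int cmp = atom.compareTo(nextToFetch);
                if (cmp < 0)
                {
                    atom = null; // not requested: skip it
                }
                else if (cmp == 0)
                {
                    result.add(atom); // requested and present
                    atom = null;
                    nextToFetch = wanted.hasNext() ? wanted.next() : null;
                }
                else
                {
                    // requested name absent from the block: advance the
                    // request, keep the atom for the next comparison
                    nextToFetch = wanted.hasNext() ? wanted.next() : null;
                }
            }
            System.out.println(result); // [b, f]
        }
    }
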
http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/columniterator/SSTableSliceIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/SSTableSliceIterator.java b/src/java/org/apache/cassandra/db/columniterator/SSTableSliceIterator.java
index 4faa651..d338580 100644
--- a/src/java/org/apache/cassandra/db/columniterator/SSTableSliceIterator.java
+++ b/src/java/org/apache/cassandra/db/columniterator/SSTableSliceIterator.java
@@ -62,7 +62,7 @@ public class SSTableSliceIterator implements OnDiskAtomIterator
 
     private static OnDiskAtomIterator createReader(SSTableReader sstable, RowIndexEntry indexEntry, FileDataInput file, ColumnSlice[] slices, boolean reversed)
     {
-        return slices.length == 1 && slices[0].start.remaining() == 0 && !reversed
+        return slices.length == 1 && slices[0].start.isEmpty() && !reversed
              ? new SimpleSliceReader(sstable, indexEntry, file, slices[0].finish)
              : new IndexedSliceReader(sstable, indexEntry, file, slices, reversed);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/columniterator/SimpleSliceReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/SimpleSliceReader.java b/src/java/org/apache/cassandra/db/columniterator/SimpleSliceReader.java
index 48d20db..862fad2 100644
--- a/src/java/org/apache/cassandra/db/columniterator/SimpleSliceReader.java
+++ b/src/java/org/apache/cassandra/db/columniterator/SimpleSliceReader.java
@@ -18,7 +18,6 @@
 package org.apache.cassandra.db.columniterator;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.Iterator;
 
 import com.google.common.collect.AbstractIterator;
@@ -26,7 +25,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.composites.CellNameType;
+import org.apache.cassandra.db.composites.Composite;
 import org.apache.cassandra.io.sstable.CorruptSSTableException;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.SSTableReader;
@@ -40,12 +40,12 @@ class SimpleSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskAt
 
     private final FileDataInput file;
     private final boolean needsClosing;
-    private final ByteBuffer finishColumn;
-    private final AbstractType<?> comparator;
+    private final Composite finishColumn;
+    private final CellNameType comparator;
     private final ColumnFamily emptyColumnFamily;
     private final Iterator<OnDiskAtom> atomIterator;
 
-    public SimpleSliceReader(SSTableReader sstable, RowIndexEntry indexEntry, FileDataInput input, ByteBuffer finishColumn)
+    public SimpleSliceReader(SSTableReader sstable, RowIndexEntry indexEntry, FileDataInput input, Composite finishColumn)
     {
         Tracing.trace("Seeking to partition beginning in data file");
         this.finishColumn = finishColumn;
@@ -89,7 +89,7 @@ class SimpleSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskAt
             return endOfData();
 
         OnDiskAtom column = atomIterator.next();
-        if (finishColumn.remaining() > 0 && comparator.compare(column.name(), finishColumn) > 0)
+        if (!finishColumn.isEmpty() && comparator.compare(column.name(), finishColumn) > 0)
             return endOfData();
 
         return column;
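
Throughout the patch, an empty Composite keeps the meaning an empty ByteBuffer had before: an unbounded slice end. A tiny stand-in for the guard in computeNext() above:

    public class UnboundedFinish
    {
        // mirrors !finishColumn.isEmpty() && compare(name, finish) > 0:
        // an empty finish bound means "read to the end of the row"
        static boolean pastFinish(String name, String finish)
        {
            return !finish.isEmpty() && name.compareTo(finish) > 0;
        }

        public static void main(String[] args)
        {
            System.out.println(pastFinish("col9", ""));     // false: unbounded
            System.out.println(pastFinish("col9", "col5")); // true: stop here
        }
    }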