You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2013/12/17 17:03:46 UTC

[10/13] Push composites support in the storage engine

http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/CFRowAdder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CFRowAdder.java b/src/java/org/apache/cassandra/db/CFRowAdder.java
new file mode 100644
index 0000000..7c70cf6
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/CFRowAdder.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.db.composites.Composite;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.MapType;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+/**
+ * Convenience object to populate a given CQL3 row in a ColumnFamily object.
+ *
+ * This is meant for when performance is not of the utmost importance. When
+ * performance matters, it might be worth allocating such builder.
+ */
+public class CFRowAdder
+{
+    public final ColumnFamily cf;
+    public final Composite prefix;
+    public final long timestamp;
+    private final int ldt;
+
+    public CFRowAdder(ColumnFamily cf, Composite prefix, long timestamp)
+    {
+        this.cf = cf;
+        this.prefix = prefix;
+        this.timestamp = timestamp;
+        this.ldt = (int) (System.currentTimeMillis() / 1000);
+
+        // If a CQL3 table, add the row marker
+        if (cf.metadata().isCQL3Table())
+            cf.addColumn(new Column(cf.getComparator().rowMarker(prefix), ByteBufferUtil.EMPTY_BYTE_BUFFER, timestamp));
+    }
+
+    public CFRowAdder add(String cql3ColumnName, Object value)
+    {
+        ColumnDefinition def = getDefinition(cql3ColumnName);
+        return add(cf.getComparator().create(prefix, def.name), def, value);
+    }
+
+    public CFRowAdder addMapEntry(String cql3ColumnName, Object key, Object value)
+    {
+        ColumnDefinition def = getDefinition(cql3ColumnName);
+        assert def.type instanceof MapType;
+        MapType mt = (MapType)def.type;
+        CellName name = cf.getComparator().create(prefix, def.name, mt.keys.decompose(key));
+        return add(name, def, value);
+    }
+
+    private ColumnDefinition getDefinition(String name)
+    {
+        return cf.metadata().getColumnDefinition(new ColumnIdentifier(name, false));
+    }
+
+    private CFRowAdder add(CellName name, ColumnDefinition def, Object value)
+    {
+        if (value == null)
+            cf.addColumn(new DeletedColumn(name, ldt, timestamp));
+        else
+            cf.addColumn(new Column(name, ((AbstractType)def.type).decompose(value), timestamp));
+        return this;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/CollationController.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CollationController.java b/src/java/org/apache/cassandra/db/CollationController.java
index d07e772..12f53db 100644
--- a/src/java/org/apache/cassandra/db/CollationController.java
+++ b/src/java/org/apache/cassandra/db/CollationController.java
@@ -17,13 +17,13 @@
  */
 package org.apache.cassandra.db;
 
-import java.nio.ByteBuffer;
 import java.util.*;
 
 import com.google.common.collect.Iterables;
 
 import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
 import org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy;
+import org.apache.cassandra.db.composites.CellName;
 import org.apache.cassandra.db.filter.NamesQueryFilter;
 import org.apache.cassandra.db.filter.QueryFilter;
 import org.apache.cassandra.db.marshal.CounterColumnType;
@@ -94,7 +94,7 @@ public class CollationController
             // avoid changing the filter columns of the original filter
             // (reduceNameFilter removes columns that are known to be irrelevant)
             NamesQueryFilter namesFilter = (NamesQueryFilter) filter.filter;
-            TreeSet<ByteBuffer> filterColumns = new TreeSet<ByteBuffer>(namesFilter.columns);
+            TreeSet<CellName> filterColumns = new TreeSet<>(namesFilter.columns);
             QueryFilter reducedFilter = new QueryFilter(filter.key, filter.cfName, namesFilter.withUpdatedColumns(filterColumns), filter.timestamp);
 
             /* add the SSTables on disk */
@@ -173,9 +173,9 @@ public class CollationController
         if (container == null)
             return;
 
-        for (Iterator<ByteBuffer> iterator = ((NamesQueryFilter) filter.filter).columns.iterator(); iterator.hasNext(); )
+        for (Iterator<CellName> iterator = ((NamesQueryFilter) filter.filter).columns.iterator(); iterator.hasNext(); )
         {
-            ByteBuffer filterColumn = iterator.next();
+            CellName filterColumn = iterator.next();
             Column column = container.getColumn(filterColumn);
             if (column != null && column.timestamp() > sstableTimestamp)
                 iterator.remove();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/Column.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Column.java b/src/java/org/apache/cassandra/db/Column.java
index e103cd3..4be1eeb 100644
--- a/src/java/org/apache/cassandra/db/Column.java
+++ b/src/java/org/apache/cassandra/db/Column.java
@@ -20,16 +20,15 @@ package org.apache.cassandra.db;
 import java.io.DataInput;
 import java.io.IOError;
 import java.io.IOException;
-import java.net.InetAddress;
 import java.nio.ByteBuffer;
 import java.security.MessageDigest;
-import java.util.ArrayList;
 import java.util.Iterator;
-import java.util.List;
 
 import com.google.common.collect.AbstractIterator;
 
 import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.db.composites.CellNameType;
 import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.util.DataOutputBuffer;
@@ -46,18 +45,16 @@ public class Column implements OnDiskAtom
 {
     public static final int MAX_NAME_LENGTH = FBUtilities.MAX_UNSIGNED_SHORT;
 
-    public static final ColumnSerializer serializer = new ColumnSerializer();
-
-    public static OnDiskAtom.Serializer onDiskSerializer()
-    {
-        return OnDiskAtom.Serializer.instance;
-    }
-
     /**
      * For 2.0-formatted sstables (where column count is not stored), @param count should be Integer.MAX_VALUE,
      * and we will look for the end-of-row column name marker instead of relying on that.
      */
-    public static Iterator<OnDiskAtom> onDiskIterator(final DataInput in, final int count, final ColumnSerializer.Flag flag, final int expireBefore, final Descriptor.Version version)
+    public static Iterator<OnDiskAtom> onDiskIterator(final DataInput in,
+                                                      final int count,
+                                                      final ColumnSerializer.Flag flag,
+                                                      final int expireBefore,
+                                                      final Descriptor.Version version,
+                                                      final CellNameType type)
     {
         return new AbstractIterator<OnDiskAtom>()
         {
@@ -71,7 +68,7 @@ public class Column implements OnDiskAtom
                 OnDiskAtom atom;
                 try
                 {
-                    atom = onDiskSerializer().deserializeFromSSTable(in, flag, expireBefore, version);
+                    atom = type.onDiskAtomSerializer().deserializeFromSSTable(in, flag, expireBefore, version);
                 }
                 catch (IOException e)
                 {
@@ -85,31 +82,30 @@ public class Column implements OnDiskAtom
         };
     }
 
-    protected final ByteBuffer name;
+    protected final CellName name;
     protected final ByteBuffer value;
     protected final long timestamp;
 
-    Column(ByteBuffer name)
+    Column(CellName name)
     {
         this(name, ByteBufferUtil.EMPTY_BYTE_BUFFER);
     }
 
-    public Column(ByteBuffer name, ByteBuffer value)
+    public Column(CellName name, ByteBuffer value)
     {
         this(name, value, 0);
     }
 
-    public Column(ByteBuffer name, ByteBuffer value, long timestamp)
+    public Column(CellName name, ByteBuffer value, long timestamp)
     {
         assert name != null;
         assert value != null;
-        assert name.remaining() <= Column.MAX_NAME_LENGTH;
         this.name = name;
         this.value = value;
         this.timestamp = timestamp;
     }
 
-    public Column withUpdatedName(ByteBuffer newName)
+    public Column withUpdatedName(CellName newName)
     {
         return new Column(newName, value, timestamp);
     }
@@ -119,7 +115,7 @@ public class Column implements OnDiskAtom
         return new Column(name, value, newTimestamp);
     }
 
-    public ByteBuffer name()
+    public CellName name()
     {
         return name;
     }
@@ -162,10 +158,10 @@ public class Column implements OnDiskAtom
 
     public int dataSize()
     {
-        return name().remaining() + value.remaining() + TypeSizes.NATIVE.sizeof(timestamp);
+        return name().dataSize() + value.remaining() + TypeSizes.NATIVE.sizeof(timestamp);
     }
 
-    public int serializedSize(TypeSizes typeSizes)
+    public int serializedSize(CellNameType type, TypeSizes typeSizes)
     {
         /*
          * Size of a column is =
@@ -175,14 +171,8 @@ public class Column implements OnDiskAtom
          * + 4 bytes which basically indicates the size of the byte array
          * + entire byte array.
         */
-        int nameSize = name.remaining();
         int valueSize = value.remaining();
-        return typeSizes.sizeof((short) nameSize) + nameSize + 1 + typeSizes.sizeof(timestamp) + typeSizes.sizeof(valueSize) + valueSize;
-    }
-
-    public long serializedSizeForSSTable()
-    {
-        return serializedSize(TypeSizes.NATIVE);
+        return ((int)type.cellSerializer().serializedSize(name, typeSizes)) + 1 + typeSizes.sizeof(timestamp) + typeSizes.sizeof(valueSize) + valueSize;
     }
 
     public int serializationFlags()
@@ -199,7 +189,7 @@ public class Column implements OnDiskAtom
 
     public void updateDigest(MessageDigest digest)
     {
-        digest.update(name.duplicate());
+        digest.update(name.toByteBuffer().duplicate());
         digest.update(value.duplicate());
 
         DataOutputBuffer buffer = new DataOutputBuffer();
@@ -273,10 +263,10 @@ public class Column implements OnDiskAtom
 
     public Column localCopy(ColumnFamilyStore cfs, Allocator allocator)
     {
-        return new Column(cfs.internOrCopy(name, allocator), allocator.clone(value), timestamp);
+        return new Column(name.copy(allocator), allocator.clone(value), timestamp);
     }
 
-    public String getString(AbstractType<?> comparator)
+    public String getString(CellNameType comparator)
     {
         StringBuilder sb = new StringBuilder();
         sb.append(comparator.getString(name));
@@ -298,7 +288,7 @@ public class Column implements OnDiskAtom
     {
         validateName(metadata);
 
-        AbstractType<?> valueValidator = metadata.getValueValidatorFromCellName(name());
+        AbstractType<?> valueValidator = metadata.getValueValidator(name());
         if (valueValidator != null)
             valueValidator.validate(value());
     }
@@ -308,7 +298,7 @@ public class Column implements OnDiskAtom
         return getLocalDeletionTime() < gcBefore;
     }
 
-    public static Column create(ByteBuffer name, ByteBuffer value, long timestamp, int ttl, CFMetaData metadata)
+    public static Column create(CellName name, ByteBuffer value, long timestamp, int ttl, CFMetaData metadata)
     {
         if (ttl <= 0)
             ttl = metadata.getDefaultTimeToLive();
@@ -317,53 +307,4 @@ public class Column implements OnDiskAtom
                ? new ExpiringColumn(name, value, timestamp, ttl)
                : new Column(name, value, timestamp);
     }
-
-    public static Column create(String value, long timestamp, String... names)
-    {
-        return new Column(decomposeName(names), UTF8Type.instance.decompose(value), timestamp);
-    }
-
-    public static Column create(int value, long timestamp, String... names)
-    {
-        return new Column(decomposeName(names), Int32Type.instance.decompose(value), timestamp);
-    }
-
-    public static Column create(boolean value, long timestamp, String... names)
-    {
-        return new Column(decomposeName(names), BooleanType.instance.decompose(value), timestamp);
-    }
-
-    public static Column create(double value, long timestamp, String... names)
-    {
-        return new Column(decomposeName(names), DoubleType.instance.decompose(value), timestamp);
-    }
-
-    public static Column create(ByteBuffer value, long timestamp, String... names)
-    {
-        return new Column(decomposeName(names), value, timestamp);
-    }
-
-    public static Column create(InetAddress value, long timestamp, String... names)
-    {
-        return new Column(decomposeName(names), InetAddressType.instance.decompose(value), timestamp);
-    }
-
-    static ByteBuffer decomposeName(String... names)
-    {
-        assert names.length > 0;
-
-        if (names.length == 1)
-            return UTF8Type.instance.decompose(names[0]);
-
-        // not super performant.  at this time, only infrequently called schema code uses this.
-        List<AbstractType<?>> types = new ArrayList<AbstractType<?>>(names.length);
-        for (int i = 0; i < names.length; i++)
-            types.add(UTF8Type.instance);
-
-        CompositeType.Builder builder = new CompositeType.Builder(CompositeType.getInstance(types));
-        for (String name : names)
-            builder.add(UTF8Type.instance.decompose(name));
-        return builder.build();
-    }
 }
-

http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/ColumnFamily.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamily.java b/src/java/org/apache/cassandra/db/ColumnFamily.java
index 2c00071..c2134c2 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamily.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamily.java
@@ -36,8 +36,10 @@ import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.apache.cassandra.cache.IRowCacheEntry;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.db.composites.CellNameType;
+import org.apache.cassandra.db.composites.CellNames;
 import org.apache.cassandra.db.filter.ColumnSlice;
-import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.io.sstable.ColumnNameHelper;
 import org.apache.cassandra.io.sstable.ColumnStats;
 import org.apache.cassandra.io.sstable.SSTable;
@@ -116,29 +118,29 @@ public abstract class ColumnFamily implements Iterable<Column>, IRowCacheEntry
         addColumn(column, HeapAllocator.instance);
     }
 
-    public void addColumn(ByteBuffer name, ByteBuffer value, long timestamp)
+    public void addColumn(CellName name, ByteBuffer value, long timestamp)
     {
         addColumn(name, value, timestamp, 0);
     }
 
-    public void addColumn(ByteBuffer name, ByteBuffer value, long timestamp, int timeToLive)
+    public void addColumn(CellName name, ByteBuffer value, long timestamp, int timeToLive)
     {
         assert !metadata().getDefaultValidator().isCommutative();
         Column column = Column.create(name, value, timestamp, timeToLive, metadata());
         addColumn(column);
     }
 
-    public void addCounter(ByteBuffer name, long value)
+    public void addCounter(CellName name, long value)
     {
         addColumn(new CounterUpdateColumn(name, value, System.currentTimeMillis()));
     }
 
-    public void addTombstone(ByteBuffer name, ByteBuffer localDeletionTime, long timestamp)
+    public void addTombstone(CellName name, ByteBuffer localDeletionTime, long timestamp)
     {
         addColumn(new DeletedColumn(name, localDeletionTime, timestamp));
     }
 
-    public void addTombstone(ByteBuffer name, int localDeletionTime, long timestamp)
+    public void addTombstone(CellName name, int localDeletionTime, long timestamp)
     {
         addColumn(new DeletedColumn(name, localDeletionTime, timestamp));
     }
@@ -220,13 +222,13 @@ public abstract class ColumnFamily implements Iterable<Column>, IRowCacheEntry
      * Get a column given its name, returning null if the column is not
      * present.
      */
-    public abstract Column getColumn(ByteBuffer name);
+    public abstract Column getColumn(CellName name);
 
     /**
      * Returns an iterable with the names of columns in this column map in the same order
      * as the underlying columns themselves.
      */
-    public abstract Iterable<ByteBuffer> getColumnNames();
+    public abstract Iterable<CellName> getColumnNames();
 
     /**
      * Returns the columns of this column map as a collection.
@@ -300,7 +302,7 @@ public abstract class ColumnFamily implements Iterable<Column>, IRowCacheEntry
         // takes care of those for us.)
         for (Column columnExternal : cfComposite)
         {
-            ByteBuffer cName = columnExternal.name();
+            CellName cName = columnExternal.name();
             Column columnInternal = getColumn(cName);
             if (columnInternal == null)
             {
@@ -372,7 +374,7 @@ public abstract class ColumnFamily implements Iterable<Column>, IRowCacheEntry
         if (isMarkedForDelete())
             sb.append(" -").append(deletionInfo()).append("-");
 
-        sb.append(" [").append(getComparator().getColumnsString(this)).append("])");
+        sb.append(" [").append(CellNames.getColumnsString(getComparator(), this)).append("])");
         return sb.toString();
     }
 
@@ -440,7 +442,7 @@ public abstract class ColumnFamily implements Iterable<Column>, IRowCacheEntry
     /**
      * @return the comparator whose sorting order the contained columns conform to
      */
-    public AbstractType<?> getComparator()
+    public CellNameType getComparator()
     {
         return metadata.comparator;
     }
@@ -478,9 +480,9 @@ public abstract class ColumnFamily implements Iterable<Column>, IRowCacheEntry
         return false;
     }
 
-    public Map<ByteBuffer, ByteBuffer> asMap()
+    public Map<CellName, ByteBuffer> asMap()
     {
-        ImmutableMap.Builder<ByteBuffer, ByteBuffer> builder = ImmutableMap.builder();
+        ImmutableMap.Builder<CellName, ByteBuffer> builder = ImmutableMap.builder();
         for (Column column : this)
             builder.put(column.name, column.value);
         return builder.build();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java b/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
index 3d832b2..13ec6fc 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
@@ -67,8 +67,8 @@ public class ColumnFamilySerializer implements IVersionedSerializer<ColumnFamily
                 return;
             }
 
-            DeletionInfo.serializer().serialize(cf.deletionInfo(), out, version);
-            ColumnSerializer columnSerializer = Column.serializer;
+            cf.getComparator().deletionInfoSerializer().serialize(cf.deletionInfo(), out, version);
+            ColumnSerializer columnSerializer = cf.getComparator().columnSerializer();
             int count = cf.getColumnCount();
             out.writeInt(count);
             int written = 0;
@@ -108,9 +108,9 @@ public class ColumnFamilySerializer implements IVersionedSerializer<ColumnFamily
         }
         else
         {
-            cf.delete(DeletionInfo.serializer().deserialize(in, version, cf.getComparator()));
+            cf.delete(cf.getComparator().deletionInfoSerializer().deserialize(in, version));
 
-            ColumnSerializer columnSerializer = Column.serializer;
+            ColumnSerializer columnSerializer = cf.getComparator().columnSerializer();
             int size = in.readInt();
             for (int i = 0; i < size; ++i)
                 cf.addColumn(columnSerializer.deserialize(in, flag));
@@ -128,10 +128,11 @@ public class ColumnFamilySerializer implements IVersionedSerializer<ColumnFamily
         }
         else
         {
-            size += DeletionInfo.serializer().serializedSize(cf.deletionInfo(), typeSizes, version);
+            size += cf.getComparator().deletionInfoSerializer().serializedSize(cf.deletionInfo(), typeSizes, version);
             size += typeSizes.sizeof(cf.getColumnCount());
+            ColumnSerializer columnSerializer = cf.getComparator().columnSerializer();
             for (Column column : cf)
-                size += column.serializedSize(typeSizes);
+                size += columnSerializer.serializedSize(column, typeSizes);
         }
         return size;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index a98e30b..f00e281 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -33,7 +33,6 @@ import com.google.common.base.Function;
 import com.google.common.collect.*;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.Uninterruptibles;
-import org.cliffc.high_scale_lib.NonBlockingHashMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -51,10 +50,10 @@ import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.db.commitlog.ReplayPosition;
 import org.apache.cassandra.db.compaction.*;
 import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.db.composites.CellNameType;
+import org.apache.cassandra.db.composites.Composite;
 import org.apache.cassandra.db.index.SecondaryIndex;
 import org.apache.cassandra.db.index.SecondaryIndexManager;
-import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.db.marshal.CompositeType;
 import org.apache.cassandra.dht.*;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.exceptions.ConfigurationException;
@@ -94,9 +93,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
     public final SecondaryIndexManager indexManager;
 
-    private static final int INTERN_CUTOFF = 256;
-    public final ConcurrentMap<ByteBuffer, ByteBuffer> internedNames = new NonBlockingHashMap<ByteBuffer, ByteBuffer>();
-
     /* These are locally held copies to be changed from the config during runtime */
     private volatile DefaultInteger minCompactionThreshold;
     private volatile DefaultInteger maxCompactionThreshold;
@@ -932,8 +928,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     // 2. if it has been re-added since then, this particular column was inserted before the last drop
     private static boolean isDroppedColumn(Column c, CFMetaData meta)
     {
-        ByteBuffer cql3ColumnName = ((CompositeType) meta.comparator).extractLastComponent(c.name());
-        Long droppedAt = meta.getDroppedColumns().get(meta.getColumnDefinition(cql3ColumnName).name);
+        Long droppedAt = meta.getDroppedColumns().get(c.name().cql3ColumnName());
         return droppedAt != null && c.timestamp() <= droppedAt;
     }
 
@@ -1230,8 +1225,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     }
 
     public ColumnFamily getColumnFamily(DecoratedKey key,
-                                        ByteBuffer start,
-                                        ByteBuffer finish,
+                                        Composite start,
+                                        Composite finish,
                                         boolean reversed,
                                         int limit,
                                         long timestamp)
@@ -1617,8 +1612,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
      */
     public ExtendedFilter makeExtendedFilter(AbstractBounds<RowPosition> keyRange,
                                              SliceQueryFilter columnRange,
-                                             ByteBuffer columnStart,
-                                             ByteBuffer columnStop,
+                                             Composite columnStart,
+                                             Composite columnStop,
                                              List<IndexExpression> rowFilter,
                                              int maxResults,
                                              long now)
@@ -1756,7 +1751,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         }
     }
 
-    public AbstractType<?> getComparator()
+    public CellNameType getComparator()
     {
         return metadata.comparator;
     }
@@ -2244,35 +2239,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         return partitioner instanceof LocalPartitioner;
     }
 
-    private ByteBuffer intern(ByteBuffer name)
-    {
-        ByteBuffer internedName = internedNames.get(name);
-        if (internedName == null)
-        {
-            internedName = ByteBufferUtil.clone(name);
-            ByteBuffer concurrentName = internedNames.putIfAbsent(internedName, internedName);
-            if (concurrentName != null)
-                internedName = concurrentName;
-        }
-        return internedName;
-    }
-
-    public ByteBuffer internOrCopy(ByteBuffer name, Allocator allocator)
-    {
-        if (internedNames.size() >= INTERN_CUTOFF)
-            return allocator.clone(name);
-
-        return intern(name);
-    }
-
-    public ByteBuffer maybeIntern(ByteBuffer name)
-    {
-        if (internedNames.size() >= INTERN_CUTOFF)
-            return null;
-
-        return intern(name);
-    }
-
     public Iterable<ColumnFamilyStore> concatWithIndexes()
     {
         return Iterables.concat(indexManager.getIndexesBackedByCfs(), Collections.singleton(this));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/ColumnIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnIndex.java b/src/java/org/apache/cassandra/db/ColumnIndex.java
index cd1baca..3e6d55d 100644
--- a/src/java/org/apache/cassandra/db/ColumnIndex.java
+++ b/src/java/org/apache/cassandra/db/ColumnIndex.java
@@ -25,6 +25,7 @@ import java.util.*;
 import com.google.common.annotations.VisibleForTesting;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.composites.Composite;
 import org.apache.cassandra.io.sstable.IndexHelper;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
@@ -53,8 +54,6 @@ public class ColumnIndex
      */
     public static class Builder
     {
-        private static final OnDiskAtom.Serializer atomSerializer = Column.onDiskSerializer();
-
         private final ColumnIndex result;
         private final long indexOffset;
         private long startPosition = -1;
@@ -69,6 +68,8 @@ public class ColumnIndex
         private final ByteBuffer key;
         private final DeletionInfo deletionInfo; // only used for serializing and calculating row header size
 
+        private final OnDiskAtom.Serializer atomSerializer;
+
         public Builder(ColumnFamily cf,
                        ByteBuffer key,
                        DataOutput output)
@@ -83,6 +84,7 @@ public class ColumnIndex
             this.result = new ColumnIndex(new ArrayList<IndexHelper.IndexInfo>());
             this.output = output;
             this.tombstoneTracker = new RangeTombstone.Tracker(cf.getComparator());
+            this.atomSerializer = cf.getComparator().onDiskAtomSerializer();
         }
 
         /**
@@ -119,7 +121,7 @@ public class ColumnIndex
         public ColumnIndex build(ColumnFamily cf) throws IOException
         {
             // cf has disentangled the columns and range tombstones, we need to re-interleave them in comparator order
-            Comparator<ByteBuffer> comparator = cf.getComparator();
+            Comparator<Composite> comparator = cf.getComparator();
             DeletionInfo.InOrderTester tester = cf.deletionInfo().inOrderTester();
             Iterator<RangeTombstone> rangeIter = cf.deletionInfo().rangeIterator();
             RangeTombstone tombstone = rangeIter.hasNext() ? rangeIter.next() : null;
@@ -183,7 +185,7 @@ public class ColumnIndex
                                // where we wouldn't make any progress because a block is filled by said marker
             }
 
-            long size = column.serializedSizeForSSTable();
+            long size = atomSerializer.serializedSizeForSSTable(column);
             endPosition += size;
             blockSize += size;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/ColumnSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnSerializer.java b/src/java/org/apache/cassandra/db/ColumnSerializer.java
index fb38b5f..a1c6ebd 100644
--- a/src/java/org/apache/cassandra/db/ColumnSerializer.java
+++ b/src/java/org/apache/cassandra/db/ColumnSerializer.java
@@ -22,9 +22,12 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
+import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.db.composites.CellNameType;
 import org.apache.cassandra.io.ISerializer;
 import org.apache.cassandra.io.FSReadError;
 import org.apache.cassandra.io.util.FileDataInput;
+import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 public class ColumnSerializer implements ISerializer<Column>
@@ -51,10 +54,17 @@ public class ColumnSerializer implements ISerializer<Column>
         LOCAL, FROM_REMOTE, PRESERVE_SIZE;
     }
 
+    private final CellNameType type;
+
+    public ColumnSerializer(CellNameType type)
+    {
+        this.type = type;
+    }
+
     public void serialize(Column column, DataOutput out) throws IOException
     {
-        assert column.name().remaining() > 0;
-        ByteBufferUtil.writeWithShortLength(column.name(), out);
+        assert !column.name().isEmpty();
+        type.cellSerializer().serialize(column.name(), out);
         try
         {
             out.writeByte(column.serializationFlags());
@@ -93,15 +103,13 @@ public class ColumnSerializer implements ISerializer<Column>
 
     public Column deserialize(DataInput in, ColumnSerializer.Flag flag, int expireBefore) throws IOException
     {
-        ByteBuffer name = ByteBufferUtil.readWithShortLength(in);
-        if (name.remaining() <= 0)
-            throw CorruptColumnException.create(in, name);
+        CellName name = type.cellSerializer().deserialize(in);
 
         int b = in.readUnsignedByte();
         return deserializeColumnBody(in, name, b, flag, expireBefore);
     }
 
-    Column deserializeColumnBody(DataInput in, ByteBuffer name, int mask, ColumnSerializer.Flag flag, int expireBefore) throws IOException
+    Column deserializeColumnBody(DataInput in, CellName name, int mask, ColumnSerializer.Flag flag, int expireBefore) throws IOException
     {
         if ((mask & COUNTER_MASK) != 0)
         {
@@ -130,9 +138,22 @@ public class ColumnSerializer implements ISerializer<Column>
         }
     }
 
-    public long serializedSize(Column column, TypeSizes type)
+    void skipColumnBody(DataInput in, int mask) throws IOException
+    {
+        if ((mask & COUNTER_MASK) != 0)
+            FileUtils.skipBytesFully(in, 16);
+        else if ((mask & EXPIRATION_MASK) != 0)
+            FileUtils.skipBytesFully(in, 16);
+        else
+            FileUtils.skipBytesFully(in, 8);
+
+        int length = in.readInt();
+        FileUtils.skipBytesFully(in, length);
+    }
+
+    public long serializedSize(Column column, TypeSizes typeSizes)
     {
-        return column.serializedSize(type);
+        return column.serializedSize(type, typeSizes);
     }
 
     public static class CorruptColumnException extends IOException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/CounterColumn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterColumn.java b/src/java/org/apache/cassandra/db/CounterColumn.java
index b470c5a..ac2c88e 100644
--- a/src/java/org/apache/cassandra/db/CounterColumn.java
+++ b/src/java/org/apache/cassandra/db/CounterColumn.java
@@ -31,9 +31,10 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.db.composites.CellNameType;
 import org.apache.cassandra.db.context.CounterContext;
 import org.apache.cassandra.db.context.IContext.ContextRelationship;
-import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.exceptions.OverloadedException;
 import org.apache.cassandra.exceptions.RequestExecutionException;
 import org.apache.cassandra.io.util.DataOutputBuffer;
@@ -53,28 +54,28 @@ public class CounterColumn extends Column
 
     private final long timestampOfLastDelete;
 
-    public CounterColumn(ByteBuffer name, long value, long timestamp)
+    public CounterColumn(CellName name, long value, long timestamp)
     {
         this(name, contextManager.create(value, HeapAllocator.instance), timestamp);
     }
 
-    public CounterColumn(ByteBuffer name, long value, long timestamp, long timestampOfLastDelete)
+    public CounterColumn(CellName name, long value, long timestamp, long timestampOfLastDelete)
     {
         this(name, contextManager.create(value, HeapAllocator.instance), timestamp, timestampOfLastDelete);
     }
 
-    public CounterColumn(ByteBuffer name, ByteBuffer value, long timestamp)
+    public CounterColumn(CellName name, ByteBuffer value, long timestamp)
     {
         this(name, value, timestamp, Long.MIN_VALUE);
     }
 
-    public CounterColumn(ByteBuffer name, ByteBuffer value, long timestamp, long timestampOfLastDelete)
+    public CounterColumn(CellName name, ByteBuffer value, long timestamp, long timestampOfLastDelete)
     {
         super(name, value, timestamp);
         this.timestampOfLastDelete = timestampOfLastDelete;
     }
 
-    public static CounterColumn create(ByteBuffer name, ByteBuffer value, long timestamp, long timestampOfLastDelete, ColumnSerializer.Flag flag)
+    public static CounterColumn create(CellName name, ByteBuffer value, long timestamp, long timestampOfLastDelete, ColumnSerializer.Flag flag)
     {
         // #elt being negative means we have to clean delta
         short count = value.getShort(value.position());
@@ -84,7 +85,7 @@ public class CounterColumn extends Column
     }
 
     @Override
-    public Column withUpdatedName(ByteBuffer newName)
+    public Column withUpdatedName(CellName newName)
     {
         return new CounterColumn(newName, value, timestamp, timestampOfLastDelete);
     }
@@ -110,9 +111,9 @@ public class CounterColumn extends Column
     }
 
     @Override
-    public int serializedSize(TypeSizes typeSizes)
+    public int serializedSize(CellNameType type, TypeSizes typeSizes)
     {
-        return super.serializedSize(typeSizes) + typeSizes.sizeof(timestampOfLastDelete);
+        return super.serializedSize(type, typeSizes) + typeSizes.sizeof(timestampOfLastDelete);
     }
 
     @Override
@@ -147,7 +148,7 @@ public class CounterColumn extends Column
     @Override
     public void updateDigest(MessageDigest digest)
     {
-        digest.update(name.duplicate());
+        digest.update(name.toByteBuffer().duplicate());
         // We don't take the deltas into account in a digest
         contextManager.updateDigest(digest, value);
         DataOutputBuffer buffer = new DataOutputBuffer();
@@ -217,17 +218,17 @@ public class CounterColumn extends Column
     @Override
     public Column localCopy(ColumnFamilyStore cfs)
     {
-        return new CounterColumn(cfs.internOrCopy(name, HeapAllocator.instance), ByteBufferUtil.clone(value), timestamp, timestampOfLastDelete);
+        return localCopy(cfs, HeapAllocator.instance);
     }
 
     @Override
     public Column localCopy(ColumnFamilyStore cfs, Allocator allocator)
     {
-        return new CounterColumn(cfs.internOrCopy(name, allocator), allocator.clone(value), timestamp, timestampOfLastDelete);
+        return new CounterColumn(name.copy(allocator), allocator.clone(value), timestamp, timestampOfLastDelete);
     }
 
     @Override
-    public String getString(AbstractType<?> comparator)
+    public String getString(CellNameType comparator)
     {
         StringBuilder sb = new StringBuilder();
         sb.append(comparator.getString(name));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/CounterMutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterMutation.java b/src/java/org/apache/cassandra/db/CounterMutation.java
index fb363c2..eae8e12 100644
--- a/src/java/org/apache/cassandra/db/CounterMutation.java
+++ b/src/java/org/apache/cassandra/db/CounterMutation.java
@@ -30,11 +30,11 @@ import java.util.UUID;
 
 import com.google.common.collect.Iterables;
 
+import org.apache.cassandra.db.composites.CellName;
 import org.apache.cassandra.db.filter.NamesQueryFilter;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.HeapAllocator;
 
@@ -109,7 +109,7 @@ public class CounterMutation implements IMutation
 
     private void addReadCommandFromColumnFamily(String keyspaceName, ByteBuffer key, ColumnFamily columnFamily, long timestamp, List<ReadCommand> commands)
     {
-        SortedSet<ByteBuffer> s = new TreeSet<ByteBuffer>(columnFamily.metadata().comparator);
+        SortedSet<CellName> s = new TreeSet<>(columnFamily.metadata().comparator);
         Iterables.addAll(s, columnFamily.getColumnNames());
         commands.add(new SliceByNamesReadCommand(keyspaceName, key, columnFamily.metadata().cfName, timestamp, new NamesQueryFilter(s)));
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/CounterUpdateColumn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterUpdateColumn.java b/src/java/org/apache/cassandra/db/CounterUpdateColumn.java
index 1ae7dd7..aaf3307 100644
--- a/src/java/org/apache/cassandra/db/CounterUpdateColumn.java
+++ b/src/java/org/apache/cassandra/db/CounterUpdateColumn.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.db;
 
 import java.nio.ByteBuffer;
 
+import org.apache.cassandra.db.composites.CellName;
 import org.apache.cassandra.db.context.CounterContext;
 import org.apache.cassandra.utils.Allocator;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -33,12 +34,12 @@ import org.apache.cassandra.utils.HeapAllocator;
  */
 public class CounterUpdateColumn extends Column
 {
-    public CounterUpdateColumn(ByteBuffer name, long value, long timestamp)
+    public CounterUpdateColumn(CellName name, long value, long timestamp)
     {
         this(name, ByteBufferUtil.bytes(value), timestamp);
     }
 
-    public CounterUpdateColumn(ByteBuffer name, ByteBuffer value, long timestamp)
+    public CounterUpdateColumn(CellName name, ByteBuffer value, long timestamp)
     {
         super(name, value, timestamp);
     }
@@ -81,7 +82,7 @@ public class CounterUpdateColumn extends Column
     @Override
     public CounterColumn localCopy(ColumnFamilyStore cfs)
     {
-        return new CounterColumn(cfs.internOrCopy(name, HeapAllocator.instance),
+        return new CounterColumn(name.copy(HeapAllocator.instance),
                                  CounterContext.instance().create(delta(), HeapAllocator.instance),
                                  timestamp(),
                                  Long.MIN_VALUE);
@@ -90,7 +91,7 @@ public class CounterUpdateColumn extends Column
     @Override
     public Column localCopy(ColumnFamilyStore cfs, Allocator allocator)
     {
-        return new CounterColumn(cfs.internOrCopy(name, allocator),
+        return new CounterColumn(name.copy(allocator),
                                  CounterContext.instance().create(delta(), allocator),
                                  timestamp(),
                                  Long.MIN_VALUE);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/DataRange.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DataRange.java b/src/java/org/apache/cassandra/db/DataRange.java
index 713027c..453b16a 100644
--- a/src/java/org/apache/cassandra/db/DataRange.java
+++ b/src/java/org/apache/cassandra/db/DataRange.java
@@ -23,8 +23,9 @@ import java.util.Comparator;
 import java.util.List;
 
 import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
+import org.apache.cassandra.db.composites.CellNameType;
+import org.apache.cassandra.db.composites.Composite;
 import org.apache.cassandra.db.filter.*;
-import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.dht.*;
 
 /**
@@ -57,8 +58,8 @@ public class DataRange
     public static boolean isFullRowSlice(SliceQueryFilter filter)
     {
         return filter.slices.length == 1
-            && filter.start().remaining() == 0
-            && filter.finish().remaining() == 0
+            && filter.start().isEmpty()
+            && filter.finish().isEmpty()
             && filter.count == Integer.MAX_VALUE;
     }
 
@@ -124,11 +125,11 @@ public class DataRange
     public static class Paging extends DataRange
     {
         private final SliceQueryFilter sliceFilter;
-        private final Comparator<ByteBuffer> comparator;
-        private final ByteBuffer columnStart;
-        private final ByteBuffer columnFinish;
+        private final Comparator<Composite> comparator;
+        private final Composite columnStart;
+        private final Composite columnFinish;
 
-        private Paging(AbstractBounds<RowPosition> range, SliceQueryFilter filter, ByteBuffer columnStart, ByteBuffer columnFinish, Comparator<ByteBuffer> comparator)
+        private Paging(AbstractBounds<RowPosition> range, SliceQueryFilter filter, Composite columnStart, Composite columnFinish, Comparator<Composite> comparator)
         {
             super(range, filter);
 
@@ -142,9 +143,9 @@ public class DataRange
             this.columnFinish = columnFinish;
         }
 
-        public Paging(AbstractBounds<RowPosition> range, SliceQueryFilter filter, ByteBuffer columnStart, ByteBuffer columnFinish, AbstractType<?> comparator)
+        public Paging(AbstractBounds<RowPosition> range, SliceQueryFilter filter, Composite columnStart, Composite columnFinish, CellNameType comparator)
         {
-            this(range, filter, columnStart, columnFinish, filter.isReversed() ? comparator.reverseComparator : comparator);
+            this(range, filter, columnStart, columnFinish, filter.isReversed() ? comparator.reverseComparator() : comparator);
         }
 
         @Override
@@ -184,8 +185,8 @@ public class DataRange
         private ColumnSlice[] slicesForKey(ByteBuffer key)
         {
             // We don't call that until it's necessary, so assume we have to do some hard work
-            ByteBuffer newStart = equals(startKey(), key) ? columnStart : null;
-            ByteBuffer newFinish = equals(stopKey(), key) ? columnFinish : null;
+            Composite newStart = equals(startKey(), key) ? columnStart : null;
+            Composite newFinish = equals(stopKey(), key) ? columnFinish : null;
 
             List<ColumnSlice> newSlices = new ArrayList<ColumnSlice>(sliceFilter.slices.length); // in the common case, we'll have the same number of slices
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/DefsTables.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DefsTables.java b/src/java/org/apache/cassandra/db/DefsTables.java
index 828981e..f30e256 100644
--- a/src/java/org/apache/cassandra/db/DefsTables.java
+++ b/src/java/org/apache/cassandra/db/DefsTables.java
@@ -136,19 +136,6 @@ public class DefsTables
         return keyspaces;
     }
 
-    public static ByteBuffer searchComposite(String name, boolean start)
-    {
-        assert name != null;
-        ByteBuffer nameBytes = UTF8Type.instance.decompose(name);
-        int length = nameBytes.remaining();
-        byte[] bytes = new byte[2 + length + 1];
-        bytes[0] = (byte)((length >> 8) & 0xFF);
-        bytes[1] = (byte)(length & 0xFF);
-        ByteBufferUtil.arrayCopy(nameBytes, 0, bytes, 2, length);
-        bytes[bytes.length - 1] = (byte)(start ? 0 : 1);
-        return ByteBuffer.wrap(bytes);
-    }
-
     private static Row serializedColumnFamilies(DecoratedKey ksNameKey)
     {
         ColumnFamilyStore cfsStore = SystemKeyspace.schemaCFS(SystemKeyspace.SCHEMA_COLUMNFAMILIES_CF);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/DeletedColumn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DeletedColumn.java b/src/java/org/apache/cassandra/db/DeletedColumn.java
index 377df27..ec88015 100644
--- a/src/java/org/apache/cassandra/db/DeletedColumn.java
+++ b/src/java/org/apache/cassandra/db/DeletedColumn.java
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
 import java.security.MessageDigest;
 
 import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.composites.CellName;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.serializers.MarshalException;
 import org.apache.cassandra.utils.Allocator;
@@ -30,18 +31,18 @@ import org.apache.cassandra.utils.HeapAllocator;
 
 public class DeletedColumn extends Column
 {
-    public DeletedColumn(ByteBuffer name, int localDeletionTime, long timestamp)
+    public DeletedColumn(CellName name, int localDeletionTime, long timestamp)
     {
         this(name, ByteBufferUtil.bytes(localDeletionTime), timestamp);
     }
 
-    public DeletedColumn(ByteBuffer name, ByteBuffer value, long timestamp)
+    public DeletedColumn(CellName name, ByteBuffer value, long timestamp)
     {
         super(name, value, timestamp);
     }
 
     @Override
-    public Column withUpdatedName(ByteBuffer newName)
+    public Column withUpdatedName(CellName newName)
     {
         return new DeletedColumn(newName, value, timestamp);
     }
@@ -67,7 +68,7 @@ public class DeletedColumn extends Column
     @Override
     public void updateDigest(MessageDigest digest)
     {
-        digest.update(name.duplicate());
+        digest.update(name.toByteBuffer().duplicate());
 
         DataOutputBuffer buffer = new DataOutputBuffer();
         try
@@ -99,13 +100,13 @@ public class DeletedColumn extends Column
     @Override
     public Column localCopy(ColumnFamilyStore cfs)
     {
-        return new DeletedColumn(cfs.internOrCopy(name, HeapAllocator.instance), ByteBufferUtil.clone(value), timestamp);
+        return new DeletedColumn(name.copy(HeapAllocator.instance), ByteBufferUtil.clone(value), timestamp);
     }
 
     @Override
     public Column localCopy(ColumnFamilyStore cfs, Allocator allocator)
     {
-        return new DeletedColumn(cfs.internOrCopy(name, allocator), allocator.clone(value), timestamp);
+        return new DeletedColumn(name.copy(allocator), allocator.clone(value), timestamp);
     }
 
     @Override
@@ -123,9 +124,4 @@ public class DeletedColumn extends Column
         if (getLocalDeletionTime() < 0)
             throw new MarshalException("The local deletion time should not be negative");
     }
-
-    public static DeletedColumn create(int localDeletionTime, long timestamp, String... names)
-    {
-        return new DeletedColumn(decomposeName(names), localDeletionTime, timestamp);
-    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/DeletionInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DeletionInfo.java b/src/java/org/apache/cassandra/db/DeletionInfo.java
index 13fc824..0bd0635 100644
--- a/src/java/org/apache/cassandra/db/DeletionInfo.java
+++ b/src/java/org/apache/cassandra/db/DeletionInfo.java
@@ -20,13 +20,13 @@ package org.apache.cassandra.db;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.*;
 
 import com.google.common.base.Objects;
 import com.google.common.collect.Iterators;
 
-import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.composites.CType;
+import org.apache.cassandra.db.composites.Composite;
 import org.apache.cassandra.io.IVersionedSerializer;
 
 /**
@@ -35,8 +35,6 @@ import org.apache.cassandra.io.IVersionedSerializer;
  */
 public class DeletionInfo
 {
-    private static final Serializer serializer = new Serializer();
-
     /**
      * This represents a deletion of the entire row.  We can't represent this within the RangeTombstoneList, so it's
      * kept separately.  This also slightly optimizes the common case of a full row deletion.
@@ -67,13 +65,13 @@ public class DeletionInfo
         this(topLevel, null);
     }
 
-    public DeletionInfo(ByteBuffer start, ByteBuffer end, Comparator<ByteBuffer> comparator, long markedForDeleteAt, int localDeletionTime)
+    public DeletionInfo(Composite start, Composite end, Comparator<Composite> comparator, long markedForDeleteAt, int localDeletionTime)
     {
         this(DeletionTime.LIVE, new RangeTombstoneList(comparator, 1));
         ranges.add(start, end, markedForDeleteAt, localDeletionTime);
     }
 
-    public DeletionInfo(RangeTombstone rangeTombstone, Comparator<ByteBuffer> comparator)
+    public DeletionInfo(RangeTombstone rangeTombstone, Comparator<Composite> comparator)
     {
         this(rangeTombstone.min, rangeTombstone.max, comparator, rangeTombstone.data.markedForDeleteAt, rangeTombstone.data.localDeletionTime);
     }
@@ -92,11 +90,6 @@ public class DeletionInfo
         return new DeletionInfo(DeletionTime.LIVE);
     }
 
-    public static Serializer serializer()
-    {
-        return serializer;
-    }
-
     public DeletionInfo copy()
     {
         return new DeletionInfo(topLevel, ranges == null ? null : ranges.copy());
@@ -123,7 +116,7 @@ public class DeletionInfo
         return isDeleted(column.name(), column.timestamp());
     }
 
-    public boolean isDeleted(ByteBuffer name, long timestamp)
+    public boolean isDeleted(Composite name, long timestamp)
     {
         // We do rely on this test: if topLevel.markedForDeleteAt is MIN_VALUE, we should not
         // consider the column deleted even if timestamp=MIN_VALUE, otherwise this break QueryFilter.isRelevant
@@ -194,7 +187,7 @@ public class DeletionInfo
             topLevel = newInfo;
     }
 
-    public void add(RangeTombstone tombstone, Comparator<ByteBuffer> comparator)
+    public void add(RangeTombstone tombstone, Comparator<Composite> comparator)
     {
         if (ranges == null)
             ranges = new RangeTombstoneList(comparator, 1);
@@ -255,7 +248,7 @@ public class DeletionInfo
         return ranges == null ? Iterators.<RangeTombstone>emptyIterator() : ranges.iterator();
     }
 
-    public DeletionTime rangeCovering(ByteBuffer name)
+    public DeletionTime rangeCovering(Composite name)
     {
         return ranges == null ? null : ranges.search(name);
     }
@@ -284,15 +277,15 @@ public class DeletionInfo
     {
         assert !ranges.isEmpty();
         StringBuilder sb = new StringBuilder();
-        AbstractType at = (AbstractType)ranges.comparator();
-        assert at != null;
+        CType type = (CType)ranges.comparator();
+        assert type != null;
         Iterator<RangeTombstone> iter = rangeIterator();
         while (iter.hasNext())
         {
             RangeTombstone i = iter.next();
             sb.append("[");
-            sb.append(at.getString(i.min)).append("-");
-            sb.append(at.getString(i.max)).append(", ");
+            sb.append(type.getString(i.min)).append("-");
+            sb.append(type.getString(i.max)).append(", ");
             sb.append(i.data);
             sb.append("]");
         }
@@ -326,32 +319,30 @@ public class DeletionInfo
 
     public static class Serializer implements IVersionedSerializer<DeletionInfo>
     {
-        public void serialize(DeletionInfo info, DataOutput out, int version) throws IOException
+        private final RangeTombstoneList.Serializer rtlSerializer;
+
+        public Serializer(CType type)
         {
-            DeletionTime.serializer.serialize(info.topLevel, out);
-            RangeTombstoneList.serializer.serialize(info.ranges, out, version);
+            this.rtlSerializer = new RangeTombstoneList.Serializer(type);
         }
 
-        /*
-         * Range tombstones internally depend on the column family serializer, but it is not serialized.
-         * Thus deserialize(DataInput, int, Comparator<ByteBuffer>) should be used instead of this method.
-         */
-        public DeletionInfo deserialize(DataInput in, int version) throws IOException
+        public void serialize(DeletionInfo info, DataOutput out, int version) throws IOException
         {
-            throw new UnsupportedOperationException();
+            DeletionTime.serializer.serialize(info.topLevel, out);
+            rtlSerializer.serialize(info.ranges, out, version);
         }
 
-        public DeletionInfo deserialize(DataInput in, int version, Comparator<ByteBuffer> comparator) throws IOException
+        public DeletionInfo deserialize(DataInput in, int version) throws IOException
         {
             DeletionTime topLevel = DeletionTime.serializer.deserialize(in);
-            RangeTombstoneList ranges = RangeTombstoneList.serializer.deserialize(in, version, comparator);
+            RangeTombstoneList ranges = rtlSerializer.deserialize(in, version);
             return new DeletionInfo(topLevel, ranges);
         }
 
         public long serializedSize(DeletionInfo info, TypeSizes typeSizes, int version)
         {
             long size = DeletionTime.serializer.serializedSize(info.topLevel, typeSizes);
-            return size + RangeTombstoneList.serializer.serializedSize(info.ranges, typeSizes, version);
+            return size + rtlSerializer.serializedSize(info.ranges, typeSizes, version);
         }
 
         public long serializedSize(DeletionInfo info, int version)
@@ -389,7 +380,7 @@ public class DeletionInfo
             return isDeleted(column.name(), column.timestamp());
         }
 
-        public boolean isDeleted(ByteBuffer name, long timestamp)
+        public boolean isDeleted(Composite name, long timestamp)
         {
             if (timestamp <= topLevel.markedForDeleteAt)
                 return true;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/DeletionTime.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DeletionTime.java b/src/java/org/apache/cassandra/db/DeletionTime.java
index 7952fcb..c60b423 100644
--- a/src/java/org/apache/cassandra/db/DeletionTime.java
+++ b/src/java/org/apache/cassandra/db/DeletionTime.java
@@ -25,6 +25,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Objects;
 
 import org.apache.cassandra.io.ISerializer;
+import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.utils.ObjectSizes;
 
 /**
@@ -50,7 +51,7 @@ public class DeletionTime implements Comparable<DeletionTime>
      */
     public final int localDeletionTime;
 
-    public static final ISerializer<DeletionTime> serializer = new Serializer();
+    public static final Serializer serializer = new Serializer();
 
     @VisibleForTesting
     public DeletionTime(long markedForDeleteAt, int localDeletionTime)
@@ -110,7 +111,7 @@ public class DeletionTime implements Comparable<DeletionTime>
         return ObjectSizes.getFieldSize(fields);
     }
 
-    private static class Serializer implements ISerializer<DeletionTime>
+    public static class Serializer implements ISerializer<DeletionTime>
     {
         public void serialize(DeletionTime delTime, DataOutput out) throws IOException
         {
@@ -128,6 +129,11 @@ public class DeletionTime implements Comparable<DeletionTime>
                 return new DeletionTime(mfda, ldt);
         }
 
+        public void skip(DataInput in) throws IOException
+        {
+            FileUtils.skipBytesFully(in, 4 + 8);
+        }
+
         public long serializedSize(DeletionTime delTime, TypeSizes typeSizes)
         {
             return typeSizes.sizeof(delTime.localDeletionTime)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/EmptyColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/EmptyColumns.java b/src/java/org/apache/cassandra/db/EmptyColumns.java
index 129ddc5..782ffc9 100644
--- a/src/java/org/apache/cassandra/db/EmptyColumns.java
+++ b/src/java/org/apache/cassandra/db/EmptyColumns.java
@@ -21,12 +21,12 @@ package org.apache.cassandra.db;
  */
 
 
-import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
 
 import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.composites.CellName;
 import org.apache.cassandra.db.filter.ColumnSlice;
 import org.apache.cassandra.utils.Allocator;
 
@@ -78,12 +78,12 @@ public class EmptyColumns extends AbstractThreadUnsafeSortedColumns
         throw new UnsupportedOperationException();
     }
 
-    public Column getColumn(ByteBuffer name)
+    public Column getColumn(CellName name)
     {
         throw new UnsupportedOperationException();
     }
 
-    public Iterable<ByteBuffer> getColumnNames()
+    public Iterable<CellName> getColumnNames()
     {
         return Collections.emptyList();
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/ExpiringColumn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ExpiringColumn.java b/src/java/org/apache/cassandra/db/ExpiringColumn.java
index e11567f..92c8306 100644
--- a/src/java/org/apache/cassandra/db/ExpiringColumn.java
+++ b/src/java/org/apache/cassandra/db/ExpiringColumn.java
@@ -22,11 +22,11 @@ import java.nio.ByteBuffer;
 import java.security.MessageDigest;
 
 import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.db.composites.CellNameType;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.serializers.MarshalException;
 import org.apache.cassandra.utils.Allocator;
-import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.HeapAllocator;
 
 /**
@@ -46,12 +46,12 @@ public class ExpiringColumn extends Column
     private final int localExpirationTime;
     private final int timeToLive;
 
-    public ExpiringColumn(ByteBuffer name, ByteBuffer value, long timestamp, int timeToLive)
+    public ExpiringColumn(CellName name, ByteBuffer value, long timestamp, int timeToLive)
     {
       this(name, value, timestamp, timeToLive, (int) (System.currentTimeMillis() / 1000) + timeToLive);
     }
 
-    public ExpiringColumn(ByteBuffer name, ByteBuffer value, long timestamp, int timeToLive, int localExpirationTime)
+    public ExpiringColumn(CellName name, ByteBuffer value, long timestamp, int timeToLive, int localExpirationTime)
     {
         super(name, value, timestamp);
         assert timeToLive > 0 : timeToLive;
@@ -61,7 +61,7 @@ public class ExpiringColumn extends Column
     }
 
     /** @return Either a DeletedColumn, or an ExpiringColumn. */
-    public static Column create(ByteBuffer name, ByteBuffer value, long timestamp, int timeToLive, int localExpirationTime, int expireBefore, ColumnSerializer.Flag flag)
+    public static Column create(CellName name, ByteBuffer value, long timestamp, int timeToLive, int localExpirationTime, int expireBefore, ColumnSerializer.Flag flag)
     {
         if (localExpirationTime >= expireBefore || flag == ColumnSerializer.Flag.PRESERVE_SIZE)
             return new ExpiringColumn(name, value, timestamp, timeToLive, localExpirationTime);
@@ -78,7 +78,7 @@ public class ExpiringColumn extends Column
     }
 
     @Override
-    public Column withUpdatedName(ByteBuffer newName)
+    public Column withUpdatedName(CellName newName)
     {
         return new ExpiringColumn(newName, value, timestamp, timeToLive, localExpirationTime);
     }
@@ -96,20 +96,20 @@ public class ExpiringColumn extends Column
     }
 
     @Override
-    public int serializedSize(TypeSizes typeSizes)
+    public int serializedSize(CellNameType type, TypeSizes typeSizes)
     {
         /*
          * An expired column adds to a Column :
          *    4 bytes for the localExpirationTime
          *  + 4 bytes for the timeToLive
         */
-        return super.serializedSize(typeSizes) + typeSizes.sizeof(localExpirationTime) + typeSizes.sizeof(timeToLive);
+        return super.serializedSize(type, typeSizes) + typeSizes.sizeof(localExpirationTime) + typeSizes.sizeof(timeToLive);
     }
 
     @Override
     public void updateDigest(MessageDigest digest)
     {
-        digest.update(name.duplicate());
+        digest.update(name.toByteBuffer().duplicate());
         digest.update(value.duplicate());
 
         DataOutputBuffer buffer = new DataOutputBuffer();
@@ -135,20 +135,17 @@ public class ExpiringColumn extends Column
     @Override
     public Column localCopy(ColumnFamilyStore cfs)
     {
-        return new ExpiringColumn(cfs.internOrCopy(name, HeapAllocator.instance), ByteBufferUtil.clone(value), timestamp, timeToLive, localExpirationTime);
+        return localCopy(cfs, HeapAllocator.instance);
     }
 
     @Override
     public Column localCopy(ColumnFamilyStore cfs, Allocator allocator)
     {
-        ByteBuffer clonedName = cfs.maybeIntern(name);
-        if (clonedName == null)
-            clonedName = allocator.clone(name);
-        return new ExpiringColumn(clonedName, allocator.clone(value), timestamp, timeToLive, localExpirationTime);
+        return new ExpiringColumn(name.copy(allocator), allocator.clone(value), timestamp, timeToLive, localExpirationTime);
     }
 
     @Override
-    public String getString(AbstractType<?> comparator)
+    public String getString(CellNameType comparator)
     {
         StringBuilder sb = new StringBuilder();
         sb.append(super.getString(comparator));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/HintedHandOffManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/HintedHandOffManager.java b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
index 39afd12..8868286 100644
--- a/src/java/org/apache/cassandra/db/HintedHandOffManager.java
+++ b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
@@ -41,12 +41,14 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
 import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.db.composites.Composite;
+import org.apache.cassandra.db.composites.Composites;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.filter.*;
-import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.db.marshal.CompositeType;
 import org.apache.cassandra.db.marshal.Int32Type;
 import org.apache.cassandra.db.marshal.UUIDType;
 import org.apache.cassandra.dht.IPartitioner;
@@ -102,7 +104,6 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
 
     private volatile boolean hintedHandOffPaused = false;
 
-    static final CompositeType comparator = CompositeType.getInstance(Arrays.<AbstractType<?>>asList(UUIDType.instance, Int32Type.instance));
     static final int maxHintTTL = Integer.parseInt(System.getProperty("cassandra.maxHintTTL", String.valueOf(Integer.MAX_VALUE)));
 
     private final NonBlockingHashSet<InetAddress> queuedDeliveries = new NonBlockingHashSet<InetAddress>();
@@ -133,7 +134,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
 
         UUID hintId = UUIDGen.getTimeUUID();
         // serialize the hint with id and version as a composite column name
-        ByteBuffer name = comparator.decompose(hintId, MessagingService.current_version);
+        CellName name = CFMetaData.HintsCf.comparator.makeCellName(hintId, MessagingService.current_version);
         ByteBuffer value = ByteBuffer.wrap(FBUtilities.serialize(mutation, RowMutation.serializer, MessagingService.current_version));
         ColumnFamily cf = ArrayBackedSortedColumns.factory.create(Schema.instance.getCFMetaData(Keyspace.SYSTEM_KS, SystemKeyspace.HINTS_CF));
         cf.addColumn(name, value, System.currentTimeMillis(), ttl);
@@ -178,7 +179,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
         StorageService.optionalTasks.scheduleWithFixedDelay(runnable, 10, 10, TimeUnit.MINUTES);
     }
 
-    private static void deleteHint(ByteBuffer tokenBytes, ByteBuffer columnName, long timestamp)
+    private static void deleteHint(ByteBuffer tokenBytes, CellName columnName, long timestamp)
     {
         RowMutation rm = new RowMutation(Keyspace.SYSTEM_KS, tokenBytes);
         rm.delete(SystemKeyspace.HINTS_CF, columnName, timestamp);
@@ -238,11 +239,11 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
         return CompactionManager.instance.submitUserDefined(hintStore, descriptors, (int) (System.currentTimeMillis() / 1000));
     }
 
-    private static boolean pagingFinished(ColumnFamily hintColumnFamily, ByteBuffer startColumn)
+    private static boolean pagingFinished(ColumnFamily hintColumnFamily, Composite startColumn)
     {
         // done if no hints found or the start column (same as last column processed in previous iteration) is the only one
         return hintColumnFamily == null
-               || (hintColumnFamily.getSortedColumns().size() == 1 && hintColumnFamily.getColumn(startColumn) != null);
+               || (hintColumnFamily.getSortedColumns().size() == 1 && hintColumnFamily.getColumn((CellName)startColumn) != null);
     }
 
     private int waitForSchemaAgreement(InetAddress endpoint) throws TimeoutException
@@ -325,7 +326,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
         DecoratedKey epkey =  StorageService.getPartitioner().decorateKey(hostIdBytes);
 
         final AtomicInteger rowsReplayed = new AtomicInteger(0);
-        ByteBuffer startColumn = ByteBufferUtil.EMPTY_BYTE_BUFFER;
+        Composite startColumn = Composites.EMPTY;
 
         int pageSize = calculatePageSize();
         logger.debug("Using pageSize of {}", pageSize);
@@ -343,7 +344,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
             QueryFilter filter = QueryFilter.getSliceFilter(epkey,
                                                             SystemKeyspace.HINTS_CF,
                                                             startColumn,
-                                                            ByteBufferUtil.EMPTY_BYTE_BUFFER,
+                                                            Composites.EMPTY,
                                                             false,
                                                             pageSize,
                                                             now);
@@ -381,8 +382,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
 
                 startColumn = hint.name();
 
-                ByteBuffer[] components = comparator.split(hint.name());
-                int version = Int32Type.instance.compose(components[1]);
+                int version = Int32Type.instance.compose(hint.name().get(1));
                 DataInputStream in = new DataInputStream(ByteBufferUtil.inputStream(hint.value()));
                 RowMutation rm;
                 try
@@ -493,7 +493,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
         IPartitioner p = StorageService.getPartitioner();
         RowPosition minPos = p.getMinimumToken().minKeyBound();
         Range<RowPosition> range = new Range<RowPosition>(minPos, minPos, p);
-        IDiskAtomFilter filter = new NamesQueryFilter(ImmutableSortedSet.<ByteBuffer>of());
+        IDiskAtomFilter filter = new NamesQueryFilter(ImmutableSortedSet.<CellName>of());
         List<Row> rows = hintStore.getRangeSlice(range, null, filter, Integer.MAX_VALUE, System.currentTimeMillis());
         for (Row row : rows)
         {
@@ -564,8 +564,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
     private List<Row> getHintsSlice(int columnCount)
     {
         // Get count # of columns...
-        SliceQueryFilter predicate = new SliceQueryFilter(ByteBufferUtil.EMPTY_BYTE_BUFFER,
-                                                          ByteBufferUtil.EMPTY_BYTE_BUFFER,
+        SliceQueryFilter predicate = new SliceQueryFilter(ColumnSlice.ALL_COLUMNS_ARRAY,
                                                           false,
                                                           columnCount);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index 785f0c2..ffbdd37 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -32,6 +32,7 @@ import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
 import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.composites.CellNameType;
 import org.apache.cassandra.db.commitlog.ReplayPosition;
 import org.apache.cassandra.db.index.SecondaryIndexManager;
 import org.apache.cassandra.db.marshal.AbstractType;
@@ -111,7 +112,7 @@ public class Memtable
     // Record the comparator of the CFS at the creation of the memtable. This
     // is only used when a user update the CF comparator, to know if the
     // memtable was created with the new or old comparator.
-    public final AbstractType initialComparator;
+    public final CellNameType initialComparator;
 
     public Memtable(ColumnFamilyStore cfs)
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/OnDiskAtom.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/OnDiskAtom.java b/src/java/org/apache/cassandra/db/OnDiskAtom.java
index 2956d6b..bda46d5 100644
--- a/src/java/org/apache/cassandra/db/OnDiskAtom.java
+++ b/src/java/org/apache/cassandra/db/OnDiskAtom.java
@@ -18,18 +18,19 @@
 package org.apache.cassandra.db;
 
 import java.io.*;
-import java.nio.ByteBuffer;
 import java.security.MessageDigest;
 
 import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.db.composites.CellNameType;
+import org.apache.cassandra.db.composites.Composite;
 import org.apache.cassandra.io.ISSTableSerializer;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.serializers.MarshalException;
-import org.apache.cassandra.utils.ByteBufferUtil;
 
 public interface OnDiskAtom
 {
-    public ByteBuffer name();
+    public Composite name();
 
     /**
      * For a standard column, this is the same as timestamp().
@@ -39,28 +40,28 @@ public interface OnDiskAtom
     public long maxTimestamp();
     public int getLocalDeletionTime(); // for tombstone GC, so int is sufficient granularity
 
-    public int serializedSize(TypeSizes typeSizes);
-    public long serializedSizeForSSTable();
-
     public void validateFields(CFMetaData metadata) throws MarshalException;
     public void updateDigest(MessageDigest digest);
 
     public static class Serializer implements ISSTableSerializer<OnDiskAtom>
     {
-        public static Serializer instance = new Serializer();
+        private final CellNameType type;
 
-        private Serializer() {}
+        public Serializer(CellNameType type)
+        {
+            this.type = type;
+        }
 
         public void serializeForSSTable(OnDiskAtom atom, DataOutput out) throws IOException
         {
             if (atom instanceof Column)
             {
-                Column.serializer.serialize((Column) atom, out);
+                type.columnSerializer().serialize((Column)atom, out);
             }
             else
             {
                 assert atom instanceof RangeTombstone;
-                RangeTombstone.serializer.serializeForSSTable((RangeTombstone)atom, out);
+                type.rangeTombstoneSerializer().serializeForSSTable((RangeTombstone)atom, out);
             }
         }
 
@@ -71,8 +72,8 @@ public interface OnDiskAtom
 
         public OnDiskAtom deserializeFromSSTable(DataInput in, ColumnSerializer.Flag flag, int expireBefore, Descriptor.Version version) throws IOException
         {
-            ByteBuffer name = ByteBufferUtil.readWithShortLength(in);
-            if (name.remaining() <= 0)
+            Composite name = type.serializer().deserialize(in);
+            if (name.isEmpty())
             {
                 // SSTableWriter.END_OF_ROW
                 return null;
@@ -80,9 +81,22 @@ public interface OnDiskAtom
 
             int b = in.readUnsignedByte();
             if ((b & ColumnSerializer.RANGE_TOMBSTONE_MASK) != 0)
-                return RangeTombstone.serializer.deserializeBody(in, name, version);
+                return type.rangeTombstoneSerializer().deserializeBody(in, name, version);
             else
-                return Column.serializer.deserializeColumnBody(in, name, b, flag, expireBefore);
+                return type.columnSerializer().deserializeColumnBody(in, (CellName)name, b, flag, expireBefore);
+        }
+
+        public long serializedSizeForSSTable(OnDiskAtom atom)
+        {
+            if (atom instanceof Column)
+            {
+                return type.columnSerializer().serializedSize((Column)atom, TypeSizes.NATIVE);
+            }
+            else
+            {
+                assert atom instanceof RangeTombstone;
+                return type.rangeTombstoneSerializer().serializedSizeForSSTable((RangeTombstone)atom);
+            }
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/PagedRangeCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/PagedRangeCommand.java b/src/java/org/apache/cassandra/db/PagedRangeCommand.java
index 0e24859..1a009d9 100644
--- a/src/java/org/apache/cassandra/db/PagedRangeCommand.java
+++ b/src/java/org/apache/cassandra/db/PagedRangeCommand.java
@@ -20,10 +20,12 @@ package org.apache.cassandra.db;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.composites.Composite;
 import org.apache.cassandra.db.filter.*;
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.io.IVersionedSerializer;
@@ -35,8 +37,8 @@ public class PagedRangeCommand extends AbstractRangeCommand
 {
     public static final IVersionedSerializer<PagedRangeCommand> serializer = new Serializer();
 
-    public final ByteBuffer start;
-    public final ByteBuffer stop;
+    public final Composite start;
+    public final Composite stop;
     public final int limit;
 
     public PagedRangeCommand(String keyspace,
@@ -44,8 +46,8 @@ public class PagedRangeCommand extends AbstractRangeCommand
                              long timestamp,
                              AbstractBounds<RowPosition> keyRange,
                              SliceQueryFilter predicate,
-                             ByteBuffer start,
-                             ByteBuffer stop,
+                             Composite start,
+                             Composite stop,
                              List<IndexExpression> rowFilter,
                              int limit)
     {
@@ -57,13 +59,13 @@ public class PagedRangeCommand extends AbstractRangeCommand
 
     public MessageOut<PagedRangeCommand> createMessage()
     {
-        return new MessageOut<PagedRangeCommand>(MessagingService.Verb.PAGED_RANGE, this, serializer);
+        return new MessageOut<>(MessagingService.Verb.PAGED_RANGE, this, serializer);
     }
 
     public AbstractRangeCommand forSubRange(AbstractBounds<RowPosition> subRange)
     {
-        ByteBuffer newStart = subRange.left.equals(keyRange.left) ? start : ((SliceQueryFilter)predicate).start();
-        ByteBuffer newStop = subRange.right.equals(keyRange.right) ? stop : ((SliceQueryFilter)predicate).finish();
+        Composite newStart = subRange.left.equals(keyRange.left) ? start : ((SliceQueryFilter)predicate).start();
+        Composite newStop = subRange.right.equals(keyRange.right) ? stop : ((SliceQueryFilter)predicate).finish();
         return new PagedRangeCommand(keyspace,
                                      columnFamily,
                                      timestamp,
@@ -125,13 +127,15 @@ public class PagedRangeCommand extends AbstractRangeCommand
 
             AbstractBounds.serializer.serialize(cmd.keyRange, out, version);
 
+            CFMetaData metadata = Schema.instance.getCFMetaData(cmd.keyspace, cmd.columnFamily);
+
             // SliceQueryFilter (the count is not used)
             SliceQueryFilter filter = (SliceQueryFilter)cmd.predicate;
-            SliceQueryFilter.serializer.serialize(filter, out, version);
+            metadata.comparator.sliceQueryFilterSerializer().serialize(filter, out, version);
 
             // The start and stop of the page
-            ByteBufferUtil.writeWithShortLength(cmd.start, out);
-            ByteBufferUtil.writeWithShortLength(cmd.stop, out);
+            metadata.comparator.serializer().serialize(cmd.start, out);
+            metadata.comparator.serializer().serialize(cmd.stop, out);
 
             out.writeInt(cmd.rowFilter.size());
             for (IndexExpression expr : cmd.rowFilter)
@@ -152,10 +156,12 @@ public class PagedRangeCommand extends AbstractRangeCommand
 
             AbstractBounds<RowPosition> keyRange = AbstractBounds.serializer.deserialize(in, version).toRowBounds();
 
-            SliceQueryFilter predicate = SliceQueryFilter.serializer.deserialize(in, version);
+            CFMetaData metadata = Schema.instance.getCFMetaData(keyspace, columnFamily);
+
+            SliceQueryFilter predicate = metadata.comparator.sliceQueryFilterSerializer().deserialize(in, version);
 
-            ByteBuffer start = ByteBufferUtil.readWithShortLength(in);
-            ByteBuffer stop = ByteBufferUtil.readWithShortLength(in);
+            Composite start = metadata.comparator.serializer().deserialize(in);
+            Composite stop =  metadata.comparator.serializer().deserialize(in);
 
             int filterCount = in.readInt();
             List<IndexExpression> rowFilter = new ArrayList<IndexExpression>(filterCount);
@@ -181,10 +187,12 @@ public class PagedRangeCommand extends AbstractRangeCommand
 
             size += AbstractBounds.serializer.serializedSize(cmd.keyRange, version);
 
-            size += SliceQueryFilter.serializer.serializedSize((SliceQueryFilter)cmd.predicate, version);
+            CFMetaData metadata = Schema.instance.getCFMetaData(cmd.keyspace, cmd.columnFamily);
+
+            size += metadata.comparator.sliceQueryFilterSerializer().serializedSize((SliceQueryFilter)cmd.predicate, version);
 
-            size += TypeSizes.NATIVE.sizeofWithShortLength(cmd.start);
-            size += TypeSizes.NATIVE.sizeofWithShortLength(cmd.stop);
+            size += metadata.comparator.serializer().serializedSize(cmd.start, TypeSizes.NATIVE);
+            size += metadata.comparator.serializer().serializedSize(cmd.stop, TypeSizes.NATIVE);
 
             size += TypeSizes.NATIVE.sizeof(cmd.rowFilter.size());
             for (IndexExpression expr : cmd.rowFilter)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/RangeSliceCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RangeSliceCommand.java b/src/java/org/apache/cassandra/db/RangeSliceCommand.java
index a9a48a0..b8f67ba 100644
--- a/src/java/org/apache/cassandra/db/RangeSliceCommand.java
+++ b/src/java/org/apache/cassandra/db/RangeSliceCommand.java
@@ -157,7 +157,9 @@ class RangeSliceCommandSerializer implements IVersionedSerializer<RangeSliceComm
         out.writeUTF(sliceCommand.columnFamily);
         out.writeLong(sliceCommand.timestamp);
 
-        IDiskAtomFilter.Serializer.instance.serialize(sliceCommand.predicate, out, version);
+        CFMetaData metadata = Schema.instance.getCFMetaData(sliceCommand.keyspace, sliceCommand.columnFamily);
+
+        metadata.comparator.diskAtomFilterSerializer().serialize(sliceCommand.predicate, out, version);
 
         if (sliceCommand.rowFilter == null)
         {
@@ -187,7 +189,7 @@ class RangeSliceCommandSerializer implements IVersionedSerializer<RangeSliceComm
 
         CFMetaData metadata = Schema.instance.getCFMetaData(keyspace, columnFamily);
 
-        IDiskAtomFilter predicate = IDiskAtomFilter.Serializer.instance.deserialize(in, version, metadata.comparator);
+        IDiskAtomFilter predicate = metadata.comparator.diskAtomFilterSerializer().deserialize(in, version);
 
         List<IndexExpression> rowFilter;
         int filterCount = in.readInt();
@@ -214,9 +216,11 @@ class RangeSliceCommandSerializer implements IVersionedSerializer<RangeSliceComm
         size += TypeSizes.NATIVE.sizeof(rsc.columnFamily);
         size += TypeSizes.NATIVE.sizeof(rsc.timestamp);
 
+        CFMetaData metadata = Schema.instance.getCFMetaData(rsc.keyspace, rsc.columnFamily);
+
         IDiskAtomFilter filter = rsc.predicate;
 
-        size += IDiskAtomFilter.Serializer.instance.serializedSize(filter, version);
+        size += metadata.comparator.diskAtomFilterSerializer().serializedSize(filter, version);
 
         if (rsc.rowFilter == null)
         {