You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2013/12/17 17:03:43 UTC

[07/13] Push composites support in the storage engine

http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/composites/CompoundSparseCellName.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/composites/CompoundSparseCellName.java b/src/java/org/apache/cassandra/db/composites/CompoundSparseCellName.java
new file mode 100644
index 0000000..77b311e
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/composites/CompoundSparseCellName.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.composites;
+
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.utils.Allocator;
+import org.apache.cassandra.utils.ObjectSizes;
+
+public class CompoundSparseCellName extends CompoundComposite implements CellName
+{
+    private static final ByteBuffer[] EMPTY_PREFIX = new ByteBuffer[0];
+
+    protected final ColumnIdentifier columnName;
+
+    // Not meant to be used directly, you should use the CellNameType method instead
+    CompoundSparseCellName(ColumnIdentifier columnName)
+    {
+        this(EMPTY_PREFIX, columnName);
+    }
+
+    CompoundSparseCellName(ByteBuffer[] elements, ColumnIdentifier columnName)
+    {
+        this(elements, elements.length, columnName);
+    }
+
+    CompoundSparseCellName(ByteBuffer[] elements, int size, ColumnIdentifier columnName)
+    {
+        super(elements, size);
+        this.columnName = columnName;
+    }
+
+    public int size()
+    {
+        return size + 1;
+    }
+
+    public ByteBuffer get(int i)
+    {
+        return i == size ? columnName.bytes : elements[i];
+    }
+
+    public int clusteringSize()
+    {
+        return size;
+    }
+
+    public ColumnIdentifier cql3ColumnName()
+    {
+        return columnName;
+    }
+
+    public ByteBuffer collectionElement()
+    {
+        return null;
+    }
+
+    public boolean isCollectionCell()
+    {
+        return false;
+    }
+
+    public boolean isSameCQL3RowAs(CellName other)
+    {
+        if (clusteringSize() != other.clusteringSize())
+            return false;
+
+        for (int i = 0; i < clusteringSize(); i++)
+        {
+            if (!elements[i].equals(other.get(i)))
+                return false;
+        }
+        return true;
+    }
+
+    public CellName copy(Allocator allocator)
+    {
+        if (elements.length == 0)
+            return this;
+
+        // We don't copy columnName because it's interned in SparseCellNameType
+        return new CompoundSparseCellName(elementsCopy(allocator), columnName);
+    }
+
+    @Override
+    public long memorySize()
+    {
+        return ObjectSizes.getSuperClassFieldSize(super.memorySize())
+             + ObjectSizes.getFieldSize(ObjectSizes.getReferenceSize()) + columnName.memorySize();
+    }
+
+    public static class WithCollection extends CompoundSparseCellName
+    {
+        private final ByteBuffer collectionElement;
+
+        WithCollection(ColumnIdentifier columnName, ByteBuffer collectionElement)
+        {
+            this(EMPTY_PREFIX, columnName, collectionElement);
+        }
+
+        WithCollection(ByteBuffer[] elements, ColumnIdentifier columnName, ByteBuffer collectionElement)
+        {
+            this(elements, elements.length, columnName, collectionElement);
+        }
+
+        WithCollection(ByteBuffer[] elements, int size, ColumnIdentifier columnName, ByteBuffer collectionElement)
+        {
+            super(elements, size, columnName);
+            this.collectionElement = collectionElement;
+        }
+
+        public int size()
+        {
+            return size + 2;
+        }
+
+        public ByteBuffer get(int i)
+        {
+            return i == size + 1 ? collectionElement : super.get(i);
+        }
+
+        @Override
+        public ByteBuffer collectionElement()
+        {
+            return collectionElement;
+        }
+
+        @Override
+        public boolean isCollectionCell()
+        {
+            return true;
+        }
+
+        @Override
+        public CellName copy(Allocator allocator)
+        {
+            // We don't copy columnName because it's interned in SparseCellNameType
+            return new CompoundSparseCellName.WithCollection(elements.length == 0 ? elements : elementsCopy(allocator), size, columnName, allocator.clone(collectionElement));
+        }
+
+        @Override
+        public long memorySize()
+        {
+            return ObjectSizes.getSuperClassFieldSize(super.memorySize())
+                 + ObjectSizes.getFieldSize(ObjectSizes.getReferenceSize()) + ObjectSizes.getSize(collectionElement);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/composites/CompoundSparseCellNameType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/composites/CompoundSparseCellNameType.java b/src/java/org/apache/cassandra/db/composites/CompoundSparseCellNameType.java
new file mode 100644
index 0000000..2cc8516
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/composites/CompoundSparseCellNameType.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.composites;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import org.apache.cassandra.cql3.CQL3Row;
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.CollectionType;
+import org.apache.cassandra.db.marshal.ColumnToCollectionType;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+/**
+ * Cell name type for sparse tables with compound cell names: the clustering
+ * components come first, followed by one component holding the CQL3 column
+ * name (typed with {@code UTF8Type}). The nested {@link WithCollection}
+ * variant appends one more component for the collection element.
+ */
+public class CompoundSparseCellNameType extends AbstractCompoundCellNameType
+{
+    private static final AbstractType<?> columnNameType = UTF8Type.instance;
+    private static final ColumnIdentifier rowMarkerId = new ColumnIdentifier(ByteBufferUtil.EMPTY_BYTE_BUFFER, UTF8Type.instance);
+    private static final CellName rowMarkerNoPrefix = new CompoundSparseCellName(rowMarkerId);
+
+    // Column identifiers registered through addCQL3Column, keyed by their bytes.
+    // The same map instance is handed to the types derived by setSubtype/addCollection.
+    protected final Map<ByteBuffer, ColumnIdentifier> internedIds;
+
+    public CompoundSparseCellNameType(List<AbstractType<?>> types)
+    {
+        this(new CompoundCType(types));
+    }
+
+    public CompoundSparseCellNameType(CompoundCType clusteringType)
+    {
+        this(clusteringType, makeCType(clusteringType, null), new HashMap<ByteBuffer, ColumnIdentifier>());
+    }
+
+    private CompoundSparseCellNameType(CompoundCType clusteringType, CompoundCType fullType, Map<ByteBuffer, ColumnIdentifier> internedIds)
+    {
+        super(clusteringType, fullType);
+        this.internedIds = internedIds;
+    }
+
+    /**
+     * Builds the full type: every clustering subtype, then the column name type,
+     * then the collection type when one is given.
+     *
+     * @param clusteringType the type of the clustering prefix
+     * @param collectionType the collection type to append, or {@code null} for none
+     */
+    protected static CompoundCType makeCType(CompoundCType clusteringType, ColumnToCollectionType collectionType)
+    {
+        List<AbstractType<?>> allSubtypes = new ArrayList<AbstractType<?>>(clusteringType.size() + (collectionType == null ? 1 : 2));
+        for (int i = 0; i < clusteringType.size(); i++)
+            allSubtypes.add(clusteringType.subtype(i));
+        allSubtypes.add(columnNameType);
+        if (collectionType != null)
+            allSubtypes.add(collectionType);
+        return new CompoundCType(allSubtypes);
+    }
+
+    /**
+     * Returns a new type with the clustering subtype at {@code position} replaced.
+     *
+     * @throws IllegalArgumentException if {@code position} designates the column name component
+     * @throws IndexOutOfBoundsException if {@code position} is past the column name component
+     */
+    public CellNameType setSubtype(int position, AbstractType<?> newType)
+    {
+        if (position < clusteringSize)
+            return new CompoundSparseCellNameType(clusteringType.setSubtype(position, newType), fullType.setSubtype(position, newType), internedIds);
+
+        if (position == clusteringSize)
+            throw new IllegalArgumentException();
+
+        throw new IndexOutOfBoundsException();
+    }
+
+    /** Returns a collection-aware type whose collection map contains only the given column. */
+    @Override
+    public CellNameType addCollection(ColumnIdentifier columnName, CollectionType newCollection)
+    {
+        return new WithCollection(clusteringType, ColumnToCollectionType.getInstance(Collections.singletonMap(columnName.bytes, newCollection)), internedIds);
+    }
+
+    public boolean isDense()
+    {
+        return false;
+    }
+
+    public boolean supportCollections()
+    {
+        return true;
+    }
+
+    /**
+     * Creates the cell name for {@code columnName} under the given clustering prefix.
+     * The prefix must have exactly {@code clusteringSize} components (asserted).
+     */
+    public CellName create(Composite prefix, ColumnIdentifier columnName)
+    {
+        assert prefix.size() == clusteringSize;
+
+        if (prefix.isEmpty())
+            return new CompoundSparseCellName(columnName);
+
+        assert prefix instanceof CompoundComposite;
+        CompoundComposite lc = (CompoundComposite)prefix;
+        // The prefix array is shared with the new cell name, not copied.
+        return new CompoundSparseCellName(lc.elements, clusteringSize, columnName);
+    }
+
+    /** Returns the row marker cell name (empty column name) for the given prefix. */
+    public CellName rowMarker(Composite prefix)
+    {
+        if (prefix.isEmpty())
+            return rowMarkerNoPrefix;
+
+        return create(prefix, rowMarkerId);
+    }
+
+    /** Returns the interned identifier for {@code bb}, or a fresh one if none is registered. */
+    protected ColumnIdentifier idFor(ByteBuffer bb)
+    {
+        ColumnIdentifier id = internedIds.get(bb);
+        return id == null ? new ColumnIdentifier(bb, columnNameType) : id;
+    }
+
+    /**
+     * Wraps {@code components} without copying. A plain composite is produced when
+     * the column name component is missing or an EOC is requested; otherwise a
+     * cell name is produced.
+     */
+    protected Composite makeWith(ByteBuffer[] components, int size, Composite.EOC eoc)
+    {
+        if (size < clusteringSize + 1 || eoc != Composite.EOC.NONE)
+            return new CompoundComposite(components, size).withEOC(eoc);
+
+        return new CompoundSparseCellName(components, clusteringSize, idFor(components[clusteringSize]));
+    }
+
+    /** Same as {@link #makeWith} but the retained components are copied into a new array. */
+    protected Composite copyAndMakeWith(ByteBuffer[] components, int size, Composite.EOC eoc)
+    {
+        if (size < clusteringSize + 1 || eoc != Composite.EOC.NONE)
+            return new CompoundComposite(Arrays.copyOfRange(components, 0, size), size).withEOC(eoc);
+
+        ByteBuffer[] clusteringColumns = Arrays.copyOfRange(components, 0, clusteringSize);
+        return new CompoundSparseCellName(clusteringColumns, idFor(components[clusteringSize]));
+    }
+
+    public void addCQL3Column(ColumnIdentifier id)
+    {
+        internedIds.put(id.bytes, id);
+    }
+
+    public void removeCQL3Column(ColumnIdentifier id)
+    {
+        internedIds.remove(id.bytes);
+    }
+
+    public CQL3Row.Builder CQL3RowBuilder(long now)
+    {
+        return makeSparseCQL3RowBuilder(now);
+    }
+
+    /**
+     * Variant whose full type ends with a {@link ColumnToCollectionType} component,
+     * so that cell names can carry a collection element after the column name.
+     */
+    public static class WithCollection extends CompoundSparseCellNameType
+    {
+        private final ColumnToCollectionType collectionType;
+
+        public WithCollection(List<AbstractType<?>> types, ColumnToCollectionType collectionType)
+        {
+            this(new CompoundCType(types), collectionType);
+        }
+
+        WithCollection(CompoundCType clusteringType, ColumnToCollectionType collectionType)
+        {
+            this(clusteringType, makeCType(clusteringType, collectionType), collectionType, new HashMap<ByteBuffer, ColumnIdentifier>());
+        }
+
+        private WithCollection(CompoundCType clusteringType, ColumnToCollectionType collectionType, Map<ByteBuffer, ColumnIdentifier> internedIds)
+        {
+            this(clusteringType, makeCType(clusteringType, collectionType), collectionType, internedIds);
+        }
+
+        private WithCollection(CompoundCType clusteringType, CompoundCType fullCType, ColumnToCollectionType collectionType, Map<ByteBuffer, ColumnIdentifier> internedIds)
+        {
+            super(clusteringType, fullCType, internedIds);
+            this.collectionType = collectionType;
+        }
+
+        /**
+         * Returns a new type with the clustering subtype at {@code position} replaced.
+         *
+         * @throws IllegalArgumentException if {@code position} designates the column name or collection component
+         * @throws IndexOutOfBoundsException if {@code position} is not smaller than the full type size
+         */
+        @Override
+        public CellNameType setSubtype(int position, AbstractType<?> newType)
+        {
+            if (position < clusteringSize)
+                return new WithCollection(clusteringType.setSubtype(position, newType), collectionType, internedIds);
+
+            throw position >= fullType.size() ? new IndexOutOfBoundsException() : new IllegalArgumentException();
+        }
+
+        /** Returns a new type whose collection map is the current one plus the given column. */
+        @Override
+        public CellNameType addCollection(ColumnIdentifier columnName, CollectionType newCollection)
+        {
+            Map<ByteBuffer, CollectionType> newMap = new HashMap<>(collectionType.defined);
+            newMap.put(columnName.bytes, newCollection);
+            return new WithCollection(clusteringType, ColumnToCollectionType.getInstance(newMap), internedIds);
+        }
+
+        /**
+         * Creates the cell name for one element of the collection column {@code columnName}
+         * under the given clustering prefix.
+         */
+        @Override
+        public CellName create(Composite prefix, ColumnIdentifier columnName, ByteBuffer collectionElement)
+        {
+            // Both the column name and the collection element are stored in the cell name.
+            assert prefix.size() == clusteringSize;
+
+            if (prefix.isEmpty())
+                return new CompoundSparseCellName.WithCollection(columnName, collectionElement);
+
+            assert prefix instanceof CompoundComposite;
+            CompoundComposite lc = (CompoundComposite)prefix;
+            return new CompoundSparseCellName.WithCollection(lc.elements, clusteringSize, columnName, collectionElement);
+        }
+
+        @Override
+        public boolean hasCollections()
+        {
+            return true;
+        }
+
+        @Override
+        public ColumnToCollectionType collectionType()
+        {
+            return collectionType;
+        }
+
+        /**
+         * Wraps {@code components} without copying. Anything shorter than the full
+         * size is delegated to the parent; otherwise the last component is taken
+         * as the collection element.
+         */
+        @Override
+        protected Composite makeWith(ByteBuffer[] components, int size, Composite.EOC eoc)
+        {
+            if (size < fullSize)
+                return super.makeWith(components, size, eoc);
+
+            return new CompoundSparseCellName.WithCollection(components, clusteringSize, idFor(components[clusteringSize]), components[fullSize - 1]);
+        }
+
+        /** Same as {@link #makeWith} but the clustering components are copied into a new array. */
+        @Override
+        protected Composite copyAndMakeWith(ByteBuffer[] components, int size, Composite.EOC eoc)
+        {
+            if (size < fullSize)
+                return super.copyAndMakeWith(components, size, eoc);
+
+            ByteBuffer[] clusteringColumns = Arrays.copyOfRange(components, 0, clusteringSize);
+            // The collection element is the last component, at the same index makeWith reads;
+            // indexing past fullSize - 1 would run off the end of a full-size array.
+            return new CompoundSparseCellName.WithCollection(clusteringColumns, idFor(components[clusteringSize]), components[fullSize - 1]);
+        }
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/composites/SimpleCType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/composites/SimpleCType.java b/src/java/org/apache/cassandra/db/composites/SimpleCType.java
new file mode 100644
index 0000000..08ace8b
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/composites/SimpleCType.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.composites;
+
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.db.marshal.AbstractType;
+
+/**
+ * A CType with exactly one component, i.e. one that is not truly composite.
+ */
+public class SimpleCType extends AbstractCType
+{
+    protected final AbstractType<?> type;
+
+    public SimpleCType(AbstractType<?> type)
+    {
+        this.type = type;
+    }
+
+    public boolean isCompound()
+    {
+        return false;
+    }
+
+    /** Always 1: there is a single component. */
+    public int size()
+    {
+        return 1;
+    }
+
+    /**
+     * Returns the type of the only component.
+     *
+     * @throws IndexOutOfBoundsException if {@code i} is not 0
+     */
+    public AbstractType<?> subtype(int i)
+    {
+        if (i == 0)
+            return type;
+
+        throw new IndexOutOfBoundsException();
+    }
+
+    /** Wraps the buffer in a composite; a buffer with nothing remaining maps to the empty composite. */
+    public Composite fromByteBuffer(ByteBuffer bytes)
+    {
+        if (bytes.hasRemaining())
+            return new SimpleComposite(bytes);
+
+        return Composites.EMPTY;
+    }
+
+    public CBuilder builder()
+    {
+        return new SimpleCBuilder(this);
+    }
+
+    /**
+     * Returns a new simple type wrapping {@code newType}.
+     *
+     * @throws IndexOutOfBoundsException if {@code position} is not 0
+     */
+    public CType setSubtype(int position, AbstractType<?> newType)
+    {
+        if (position == 0)
+            return new SimpleCType(newType);
+
+        throw new IndexOutOfBoundsException();
+    }
+
+    // Use sparingly, it defeats the purpose
+    public AbstractType<?> asAbstractType()
+    {
+        return type;
+    }
+
+    /**
+     * Builder for single-component composites: it accepts at most one value.
+     */
+    public static class SimpleCBuilder implements CBuilder
+    {
+        private final CType type;
+        private ByteBuffer value;
+
+        public SimpleCBuilder(CType type)
+        {
+            this.type = type;
+        }
+
+        /** 1 while no value has been added, 0 afterwards. */
+        public int remainingCount()
+        {
+            if (value == null)
+                return 1;
+
+            return 0;
+        }
+
+        /**
+         * Records the single value of this builder.
+         *
+         * @throws IllegalStateException if a value has already been added
+         */
+        public CBuilder add(ByteBuffer value)
+        {
+            if (this.value != null)
+                throw new IllegalStateException();
+            this.value = value;
+            return this;
+        }
+
+        /** Serializes {@code value} with the component type and adds the result. */
+        public CBuilder add(Object value)
+        {
+            final AbstractType rawSubtype = (AbstractType)type.subtype(0);
+            return add(rawSubtype.decompose(value));
+        }
+
+        /** Builds the composite from the value added so far. */
+        public Composite build()
+        {
+            return compositeOf(value);
+        }
+
+        /**
+         * Builds a composite directly from {@code value} without recording it.
+         *
+         * @throws IllegalStateException if a value has already been added
+         */
+        public Composite buildWith(ByteBuffer value)
+        {
+            if (this.value != null)
+                throw new IllegalStateException();
+
+            return compositeOf(value);
+        }
+
+        // Shared by build() and buildWith(): turns a single buffer into a composite.
+        private Composite compositeOf(ByteBuffer component)
+        {
+            if (component == null || !component.hasRemaining())
+                return Composites.EMPTY;
+
+            // If we're building a dense cell name, then we can directly allocate the
+            // CellName object as it's complete.
+            boolean denseCellName = type instanceof CellNameType && ((CellNameType)type).isDense();
+            if (denseCellName)
+                return new SimpleDenseCellName(component);
+
+            return new SimpleComposite(component);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/composites/SimpleComposite.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/composites/SimpleComposite.java b/src/java/org/apache/cassandra/db/composites/SimpleComposite.java
new file mode 100644
index 0000000..e88afe7
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/composites/SimpleComposite.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.composites;
+
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.utils.Allocator;
+import org.apache.cassandra.utils.ObjectSizes;
+
+/**
+ * A "simple" (not-truly-composite) Composite.
+ */
+public class SimpleComposite extends AbstractComposite
+{
+    protected final ByteBuffer element;
+
+    SimpleComposite(ByteBuffer element)
+    {
+        // We have to be careful with empty ByteBuffers as we shouldn't store them.
+        // To avoid errors (and so isEmpty() works as we intend), we don't allow simpleComposite with
+        // an empty element (but it's ok for CompoundComposite, it's a row marker in that case).
+        assert element.hasRemaining();
+        this.element = element;
+    }
+
+    public int size()
+    {
+        return 1;
+    }
+
+    public ByteBuffer get(int i)
+    {
+        if (i != 0)
+            throw new IndexOutOfBoundsException();
+
+        return element;
+    }
+
+    @Override
+    public Composite withEOC(EOC newEoc)
+    {
+        // EOC makes no sense for not truly composites.
+        return this;
+    }
+
+    @Override
+    public ByteBuffer toByteBuffer()
+    {
+        return element;
+    }
+
+    public long memorySize()
+    {
+        return ObjectSizes.getFieldSize(ObjectSizes.getSize(element));
+    }
+
+    public Composite copy(Allocator allocator)
+    {
+        return new SimpleComposite(allocator.clone(element));
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/composites/SimpleDenseCellName.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/composites/SimpleDenseCellName.java b/src/java/org/apache/cassandra/db/composites/SimpleDenseCellName.java
new file mode 100644
index 0000000..338e8a0
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/composites/SimpleDenseCellName.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.composites;
+
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.utils.Allocator;
+import org.apache.cassandra.utils.ObjectSizes;
+
+public class SimpleDenseCellName extends SimpleComposite implements CellName
+{
+    // Not meant to be used directly, you should use the CellNameType method instead
+    SimpleDenseCellName(ByteBuffer element)
+    {
+        super(element);
+    }
+
+    public int clusteringSize()
+    {
+        return 1;
+    }
+
+    public ColumnIdentifier cql3ColumnName()
+    {
+        return null;
+    }
+
+    public ByteBuffer collectionElement()
+    {
+        return null;
+    }
+
+    public boolean isCollectionCell()
+    {
+        return false;
+    }
+
+    public boolean isSameCQL3RowAs(CellName other)
+    {
+        // Dense cell imply one cell by CQL row so no other cell will be the same row.
+        return equals(other);
+    }
+
+    @Override
+    public long memorySize()
+    {
+        return ObjectSizes.getSuperClassFieldSize(super.memorySize());
+    }
+
+    // If cellnames were sharing some prefix components, this will break it, so
+    // we might want to try to do better.
+    @Override
+    public CellName copy(Allocator allocator)
+    {
+        return new SimpleDenseCellName(allocator.clone(element));
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/composites/SimpleDenseCellNameType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/composites/SimpleDenseCellNameType.java b/src/java/org/apache/cassandra/db/composites/SimpleDenseCellNameType.java
new file mode 100644
index 0000000..cafb521
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/composites/SimpleDenseCellNameType.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.composites;
+
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.cql3.CQL3Row;
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.db.marshal.AbstractType;
+
+/**
+ * Cell name type for dense tables whose cell names have a single component.
+ */
+public class SimpleDenseCellNameType extends AbstractSimpleCellNameType
+{
+    public SimpleDenseCellNameType(AbstractType<?> type)
+    {
+        super(type);
+    }
+
+    /** Always 1: the single component is the clustering prefix. */
+    public int clusteringPrefixSize()
+    {
+        return 1;
+    }
+
+    public CBuilder prefixBuilder()
+    {
+        // Simple dense is "all" prefix
+        return builder();
+    }
+
+    /**
+     * Returns a new dense type wrapping {@code newType}.
+     *
+     * @throws IllegalArgumentException if {@code position} is not 0
+     */
+    public CellNameType setSubtype(int position, AbstractType<?> newType)
+    {
+        if (position == 0)
+            return new SimpleDenseCellNameType(newType);
+
+        throw new IllegalArgumentException();
+    }
+
+    public boolean isDense()
+    {
+        return true;
+    }
+
+    /**
+     * Creates the cell name from the single component of {@code prefix}
+     * (asserted to have size 1).
+     */
+    public CellName create(Composite prefix, ColumnIdentifier columnName)
+    {
+        assert prefix.size() == 1;
+        // We ignore the columnName because it's just the COMPACT_VALUE name which is not stored in the cell name
+        ByteBuffer onlyComponent = prefix.get(0);
+        return new SimpleDenseCellName(onlyComponent);
+    }
+
+    /** Wraps the buffer in a dense cell name; a buffer with nothing remaining maps to the empty composite. */
+    @Override
+    public Composite fromByteBuffer(ByteBuffer bb)
+    {
+        if (bb.hasRemaining())
+            return new SimpleDenseCellName(bb);
+
+        return Composites.EMPTY;
+    }
+
+    // Nothing is tracked per CQL3 column for this type, so both are no-ops.
+    public void addCQL3Column(ColumnIdentifier id) {}
+    public void removeCQL3Column(ColumnIdentifier id) {}
+
+    public CQL3Row.Builder CQL3RowBuilder(long now)
+    {
+        return makeDenseCQL3RowBuilder(now);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/composites/SimpleSparseCellName.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/composites/SimpleSparseCellName.java b/src/java/org/apache/cassandra/db/composites/SimpleSparseCellName.java
new file mode 100644
index 0000000..8105683
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/composites/SimpleSparseCellName.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.composites;
+
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.utils.Allocator;
+import org.apache.cassandra.utils.ObjectSizes;
+
+public class SimpleSparseCellName extends AbstractComposite implements CellName
+{
+    private final ColumnIdentifier columnName;
+
+    // Not meant to be used directly, you should use the CellNameType method instead
+    SimpleSparseCellName(ColumnIdentifier columnName)
+    {
+        this.columnName = columnName;
+    }
+
+    public int size()
+    {
+        return 1;
+    }
+
+    public ByteBuffer get(int i)
+    {
+        if (i != 0)
+            throw new IndexOutOfBoundsException();
+
+        return columnName.bytes;
+    }
+
+    @Override
+    public Composite withEOC(EOC newEoc)
+    {
+        // EOC makes no sense for not truly composites.
+        return this;
+    }
+
+    @Override
+    public ByteBuffer toByteBuffer()
+    {
+        return columnName.bytes;
+    }
+
+    public int clusteringSize()
+    {
+        return 0;
+    }
+
+    public ColumnIdentifier cql3ColumnName()
+    {
+        return columnName;
+    }
+
+    public ByteBuffer collectionElement()
+    {
+        return null;
+    }
+
+    public boolean isCollectionCell()
+    {
+        return false;
+    }
+
+    public boolean isSameCQL3RowAs(CellName other)
+    {
+        return true;
+    }
+
+    @Override
+    public long memorySize()
+    {
+        return ObjectSizes.getFieldSize(ObjectSizes.getReferenceSize()) + columnName.memorySize();
+    }
+
+    @Override
+    public CellName copy(Allocator allocator)
+    {
+        // We're interning those instance in SparceCellNameType so don't need to copy.
+        return this;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/composites/SimpleSparseCellNameType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/composites/SimpleSparseCellNameType.java b/src/java/org/apache/cassandra/db/composites/SimpleSparseCellNameType.java
new file mode 100644
index 0000000..6d86ce2
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/composites/SimpleSparseCellNameType.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.composites;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.cassandra.cql3.CQL3Row;
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.db.marshal.AbstractType;
+
+public class SimpleSparseCellNameType extends AbstractSimpleCellNameType
+{
+    // Simple sparse means static thrift CF or non-clustered CQL3. This means that cell names will mainly
+    // be those that have been declared and we can intern the whole CellName instances.
+    private final Map<ByteBuffer, CellName> internedNames;
+
+    public SimpleSparseCellNameType(AbstractType<?> type)
+    {
+        this(type, new HashMap<ByteBuffer, CellName>());
+    }
+
+    private SimpleSparseCellNameType(AbstractType<?> type, Map<ByteBuffer, CellName> internedNames)
+    {
+        super(type);
+        this.internedNames = internedNames;
+    }
+
+    public int clusteringPrefixSize()
+    {
+        return 0;
+    }
+
+    public CellNameType setSubtype(int position, AbstractType<?> newType)
+    {
+        if (position != 0)
+            throw new IllegalArgumentException();
+        return new SimpleSparseCellNameType(newType, internedNames);
+    }
+
+    public CBuilder prefixBuilder()
+    {
+        return Composites.EMPTY_BUILDER;
+    }
+
+    public boolean isDense()
+    {
+        return false;
+    }
+
+    public CellName create(Composite prefix, ColumnIdentifier columnName)
+    {
+        assert prefix.isEmpty();
+        CellName cn = internedNames.get(columnName.bytes);
+        return cn == null ? new SimpleSparseCellName(columnName) : cn;
+    }
+
+    @Override
+    public Composite fromByteBuffer(ByteBuffer bb)
+    {
+        if (!bb.hasRemaining())
+            return Composites.EMPTY;
+
+        CellName cn = internedNames.get(bb);
+        return cn == null ? new SimpleSparseCellName(new ColumnIdentifier(bb, type)) : cn;
+    }
+
+    public void addCQL3Column(ColumnIdentifier id)
+    {
+        internedNames.put(id.bytes, new SimpleSparseCellName(id));
+    }
+
+    public void removeCQL3Column(ColumnIdentifier id)
+    {
+        internedNames.remove(id.bytes);
+    }
+
+    public CQL3Row.Builder CQL3RowBuilder(long now)
+    {
+        return makeSparseCQL3RowBuilder(now);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/filter/ColumnCounter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ColumnCounter.java b/src/java/org/apache/cassandra/db/filter/ColumnCounter.java
index c2c0ade..61f919e 100644
--- a/src/java/org/apache/cassandra/db/filter/ColumnCounter.java
+++ b/src/java/org/apache/cassandra/db/filter/ColumnCounter.java
@@ -20,13 +20,11 @@
  */
 package org.apache.cassandra.db.filter;
 
-import java.nio.ByteBuffer;
-
+import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.db.composites.CellNameType;
 import org.apache.cassandra.db.Column;
 import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.DeletionInfo;
-import org.apache.cassandra.db.marshal.CompositeType;
-import org.apache.cassandra.utils.ByteBufferUtil;
 
 public class ColumnCounter
 {
@@ -75,9 +73,9 @@ public class ColumnCounter
 
     public static class GroupByPrefix extends ColumnCounter
     {
-        private final CompositeType type;
+        private final CellNameType type;
         private final int toGroup;
-        private ByteBuffer[] last;
+        private CellName last;
 
         /**
          * A column counter that count only 1 for all the columns sharing a
@@ -89,7 +87,7 @@ public class ColumnCounter
          *                column. If 0, all columns are grouped, otherwise we group
          *                those for which the {@code toGroup} first component are equals.
          */
-        public GroupByPrefix(long timestamp, CompositeType type, int toGroup)
+        public GroupByPrefix(long timestamp, CellNameType type, int toGroup)
         {
             super(timestamp);
             this.type = type;
@@ -112,15 +110,15 @@ public class ColumnCounter
                 return;
             }
 
-            ByteBuffer[] current = type.split(column.name());
-            assert current.length >= toGroup;
+            CellName current = column.name();
+            assert current.size() >= toGroup;
 
             if (last != null)
             {
                 boolean isSameGroup = true;
                 for (int i = 0; i < toGroup; i++)
                 {
-                    if (ByteBufferUtil.compareUnsigned(last[i], current[i]) != 0)
+                    if (type.subtype(i).compare(last.get(i), current.get(i)) != 0)
                     {
                         isSameGroup = false;
                         break;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/filter/ColumnSlice.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ColumnSlice.java b/src/java/org/apache/cassandra/db/filter/ColumnSlice.java
index e0c576e..5b504a5 100644
--- a/src/java/org/apache/cassandra/db/filter/ColumnSlice.java
+++ b/src/java/org/apache/cassandra/db/filter/ColumnSlice.java
@@ -21,48 +21,67 @@ import java.io.*;
 import java.nio.ByteBuffer;
 import java.util.Comparator;
 import java.util.Iterator;
+import java.util.List;
 import java.util.NavigableMap;
 
 import com.google.common.collect.AbstractIterator;
 
+import org.apache.cassandra.cql3.ColumnIdentifier;
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.composites.*;
 import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.io.ISerializer;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.Allocator;
 
 public class ColumnSlice
 {
-    public static final Serializer serializer = new Serializer();
-
-    public static final ColumnSlice ALL_COLUMNS = new ColumnSlice(ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER);
+    public static final ColumnSlice ALL_COLUMNS = new ColumnSlice(Composites.EMPTY, Composites.EMPTY);
     public static final ColumnSlice[] ALL_COLUMNS_ARRAY = new ColumnSlice[]{ ALL_COLUMNS };
 
-    public final ByteBuffer start;
-    public final ByteBuffer finish;
+    public final Composite start;
+    public final Composite finish;
 
-    public ColumnSlice(ByteBuffer start, ByteBuffer finish)
+    public ColumnSlice(Composite start, Composite finish)
     {
         assert start != null && finish != null;
         this.start = start;
         this.finish = finish;
     }
 
-    public boolean isAlwaysEmpty(AbstractType<?> comparator, boolean reversed)
+    public boolean isAlwaysEmpty(CellNameType comparator, boolean reversed)
     {
-        Comparator<ByteBuffer> orderedComparator = reversed ? comparator.reverseComparator : comparator;
-        return (start.remaining() > 0 && finish.remaining() > 0 && orderedComparator.compare(start, finish) > 0);
+        Comparator<Composite> orderedComparator = reversed ? comparator.reverseComparator() : comparator;
+        return !start.isEmpty() && !finish.isEmpty() && orderedComparator.compare(start, finish) > 0;
     }
 
-    public boolean includes(Comparator<ByteBuffer> cmp, ByteBuffer name)
+    public boolean includes(Comparator<Composite> cmp, Composite name)
     {
-        return cmp.compare(start, name) <= 0 && (finish.equals(ByteBufferUtil.EMPTY_BYTE_BUFFER) || cmp.compare(finish, name) >= 0);
+        return cmp.compare(start, name) <= 0 && (finish.isEmpty() || cmp.compare(finish, name) >= 0);
     }
 
-    public boolean isBefore(Comparator<ByteBuffer> cmp, ByteBuffer name)
+    public boolean isBefore(Comparator<Composite> cmp, Composite name)
     {
-        return !finish.equals(ByteBufferUtil.EMPTY_BYTE_BUFFER) && cmp.compare(finish, name) < 0;
+        return !finish.isEmpty() && cmp.compare(finish, name) < 0;
     }
 
+    public boolean intersects(List<ByteBuffer> minCellNames, List<ByteBuffer> maxCellNames, CellNameType comparator, boolean reversed)
+    {
+        assert minCellNames.size() == maxCellNames.size();
+
+        Composite sStart = reversed ? finish : start;
+        Composite sEnd = reversed ? start : finish;
+
+        for (int i = 0; i < minCellNames.size(); i++)
+        {
+            AbstractType<?> t = comparator.subtype(i);
+            if (  (i < sEnd.size() && t.compare(sEnd.get(i), minCellNames.get(i)) < 0)
+               || (i < sStart.size() && t.compare(sStart.get(i), maxCellNames.get(i)) > 0))
+                return false;
+        }
+        return true;
+    }
 
     @Override
     public final int hashCode()
@@ -83,47 +102,49 @@ public class ColumnSlice
     @Override
     public String toString()
     {
-        return "[" + ByteBufferUtil.bytesToHex(start) + ", " + ByteBufferUtil.bytesToHex(finish) + "]";
+        return "[" + ByteBufferUtil.bytesToHex(start.toByteBuffer()) + ", " + ByteBufferUtil.bytesToHex(finish.toByteBuffer()) + "]";
     }
 
     public static class Serializer implements IVersionedSerializer<ColumnSlice>
     {
+        private final CType type;
+
+        public Serializer(CType type)
+        {
+            this.type = type;
+        }
+
         public void serialize(ColumnSlice cs, DataOutput out, int version) throws IOException
         {
-            ByteBufferUtil.writeWithShortLength(cs.start, out);
-            ByteBufferUtil.writeWithShortLength(cs.finish, out);
+            ISerializer<Composite> serializer = type.serializer();
+            serializer.serialize(cs.start, out);
+            serializer.serialize(cs.finish, out);
         }
 
         public ColumnSlice deserialize(DataInput in, int version) throws IOException
         {
-            ByteBuffer start = ByteBufferUtil.readWithShortLength(in);
-            ByteBuffer finish = ByteBufferUtil.readWithShortLength(in);
+            ISerializer<Composite> serializer = type.serializer();
+            Composite start = serializer.deserialize(in);
+            Composite finish = serializer.deserialize(in);
             return new ColumnSlice(start, finish);
         }
 
         public long serializedSize(ColumnSlice cs, int version)
         {
-            TypeSizes sizes = TypeSizes.NATIVE;
-
-            int startSize = cs.start.remaining();
-            int finishSize = cs.finish.remaining();
-
-            int size = 0;
-            size += sizes.sizeof((short) startSize) + startSize;
-            size += sizes.sizeof((short) finishSize) + finishSize;
-            return size;
+            ISerializer<Composite> serializer = type.serializer();
+            return serializer.serializedSize(cs.start, TypeSizes.NATIVE) + serializer.serializedSize(cs.finish, TypeSizes.NATIVE);
         }
     }
 
     public static class NavigableMapIterator extends AbstractIterator<Column>
     {
-        private final NavigableMap<ByteBuffer, Column> map;
+        private final NavigableMap<CellName, Column> map;
         private final ColumnSlice[] slices;
 
         private int idx = 0;
         private Iterator<Column> currentSlice;
 
-        public NavigableMapIterator(NavigableMap<ByteBuffer, Column> map, ColumnSlice[] slices)
+        public NavigableMapIterator(NavigableMap<CellName, Column> map, ColumnSlice[] slices)
         {
             this.map = map;
             this.slices = slices;
@@ -139,20 +160,20 @@ public class ColumnSlice
                 ColumnSlice slice = slices[idx++];
                 // Note: we specialize the case of start == "" and finish = "" because it is slightly more efficient, but also they have a specific
                 // meaning (namely, they always extend to the beginning/end of the range).
-                if (slice.start.remaining() == 0)
+                if (slice.start.isEmpty())
                 {
-                    if (slice.finish.remaining() == 0)
+                    if (slice.finish.isEmpty())
                         currentSlice = map.values().iterator();
                     else
-                        currentSlice = map.headMap(slice.finish, true).values().iterator();
+                        currentSlice = map.headMap(new FakeCell(slice.finish), true).values().iterator();
                 }
-                else if (slice.finish.remaining() == 0)
+                else if (slice.finish.isEmpty())
                 {
-                    currentSlice = map.tailMap(slice.start, true).values().iterator();
+                    currentSlice = map.tailMap(new FakeCell(slice.start), true).values().iterator();
                 }
                 else
                 {
-                    currentSlice = map.subMap(slice.start, true, slice.finish, true).values().iterator();
+                    currentSlice = map.subMap(new FakeCell(slice.start), true, new FakeCell(slice.finish), true).values().iterator();
                 }
             }
 
@@ -163,4 +184,73 @@ public class ColumnSlice
             return computeNext();
         }
     }
+
+    /*
+    * We need to take a slice (headMap/tailMap/subMap) of a CellName map
+    * based on a Composite. While CellName and Composite are comparable
+    * and so this should work, I haven't found how to generify it properly.
+    * So instead we create a "fake" CellName object that just encapsulates
+    * the prefix. It might not be a valid CellName with respect to the CF
+    * CellNameType, but this doesn't matter here (since we only care about
+    * comparison). This is arguably a bit of a hack.
+    */
+    private static class FakeCell extends AbstractComposite implements CellName
+    {
+        private final Composite prefix;
+
+        private FakeCell(Composite prefix)
+        {
+            this.prefix = prefix;
+        }
+
+        public int size()
+        {
+            return prefix.size();
+        }
+
+        public ByteBuffer get(int i)
+        {
+            return prefix.get(i);
+        }
+
+        public Composite.EOC eoc()
+        {
+            return prefix.eoc();
+        }
+
+        public int clusteringSize()
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        public ColumnIdentifier cql3ColumnName()
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        public ByteBuffer collectionElement()
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        public boolean isCollectionCell()
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        public boolean isSameCQL3RowAs(CellName other)
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        public CellName copy(Allocator allocator)
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        public long memorySize()
+        {
+            throw new UnsupportedOperationException();
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java b/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
index e749871..f5a6bc3 100644
--- a/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
@@ -19,19 +19,21 @@ package org.apache.cassandra.db.filter;
 
 import java.nio.ByteBuffer;
 import java.util.Collections;
-import java.util.Iterator;
 import java.util.List;
 import java.util.SortedSet;
 import java.util.TreeSet;
 
+import org.apache.cassandra.db.marshal.CollectionType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.cql3.ColumnNameBuilder;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.db.composites.Composite;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.CompositeType;
 import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
 
 /**
@@ -129,7 +131,7 @@ public abstract class ExtendedFilter
      * @return true if the provided data satisfies all the expressions from
      * the clause of this filter.
      */
-    public abstract boolean isSatisfiedBy(DecoratedKey rowKey, ColumnFamily data, ColumnNameBuilder builder, ByteBuffer collectionElement);
+    public abstract boolean isSatisfiedBy(DecoratedKey rowKey, ColumnFamily data, Composite prefix, ByteBuffer collectionElement);
 
     public static boolean satisfies(int comparison, IndexExpression.Operator op)
     {
@@ -180,7 +182,7 @@ public abstract class ExtendedFilter
              * We also don't want to do for paging ranges as the actual filter depends on the row key (it would
              * probably be possible to make it work but we won't really use it so we don't bother).
              */
-            if (cfs.metadata.hasCompositeComparator() || dataRange instanceof DataRange.Paging)
+            if (cfs.getComparator().isCompound() || dataRange instanceof DataRange.Paging)
                 return null;
 
             IDiskAtomFilter filter = dataRange.columnFilter(null); // ok since not a paging range
@@ -200,9 +202,9 @@ public abstract class ExtendedFilter
                 assert filter instanceof NamesQueryFilter;
                 if (!clause.isEmpty())
                 {
-                    SortedSet<ByteBuffer> columns = new TreeSet<ByteBuffer>(cfs.getComparator());
+                    SortedSet<CellName> columns = new TreeSet<CellName>(cfs.getComparator());
                     for (IndexExpression expr : clause)
-                        columns.add(expr.column);
+                        columns.add(cfs.getComparator().cellFromByteBuffer(expr.column));
                     columns.addAll(((NamesQueryFilter) filter).columns);
                     return ((NamesQueryFilter) filter).withUpdatedColumns(columns);
                 }
@@ -233,7 +235,7 @@ public abstract class ExtendedFilter
 
             for (IndexExpression expr : clause)
             {
-                if (data.getColumn(expr.column) == null)
+                if (data.getColumn(data.getComparator().cellFromByteBuffer(expr.column)) == null)
                 {
                     logger.debug("adding extraFilter to cover additional expressions");
                     return true;
@@ -251,18 +253,18 @@ public abstract class ExtendedFilter
              * 2) We don't yet allow non-indexed range slice with filters in CQL3 (i.e. this will never be
              * called by CFS.filter() for composites).
              */
-            assert !cfs.metadata.hasCompositeComparator();
-
             if (!needsExtraQuery(rowKey.key, data))
                 return null;
 
             // Note: for counters we must be careful to not add a column that was already there (to avoid overcount). That is
             // why we do the dance of avoiding to query any column we already have (it's also more efficient anyway)
-            SortedSet<ByteBuffer> columns = new TreeSet<ByteBuffer>(cfs.getComparator());
+            SortedSet<CellName> columns = new TreeSet<CellName>(cfs.getComparator());
             for (IndexExpression expr : clause)
-                if (data.getColumn(expr.column) == null)
-                    columns.add(expr.column);
-
+            {
+                CellName name = data.getComparator().cellFromByteBuffer(expr.column);
+                if (data.getColumn(name) == null)
+                    columns.add(name);
+            }
             assert !columns.isEmpty();
             return new NamesQueryFilter(columns);
         }
@@ -279,7 +281,7 @@ public abstract class ExtendedFilter
             return pruned;
         }
 
-        public boolean isSatisfiedBy(DecoratedKey rowKey, ColumnFamily data, ColumnNameBuilder builder, ByteBuffer collectionElement)
+        public boolean isSatisfiedBy(DecoratedKey rowKey, ColumnFamily data, Composite prefix, ByteBuffer collectionElement)
         {
             for (IndexExpression expression : clause)
             {
@@ -290,7 +292,7 @@ public abstract class ExtendedFilter
                 {
                     // This can't happen with CQL3 as this should be rejected upfront. For thrift however,
                     // column name are not predefined. But that means the column name correspond to an internal one.
-                    Column column = data.getColumn(expression.column);
+                    Column column = data.getColumn(data.getComparator().cellFromByteBuffer(expression.column));
                     if (column != null)
                     {
                         dataValue = column.value();
@@ -301,12 +303,12 @@ public abstract class ExtendedFilter
                 {
                     if (def.type.isCollection())
                     {
-                        if (!collectionSatisfies(def, data, builder, expression, collectionElement))
+                        if (!collectionSatisfies(def, data, prefix, expression, collectionElement))
                             return false;
                         continue;
                     }
 
-                    dataValue = extractDataValue(def, rowKey.key, data, builder);
+                    dataValue = extractDataValue(def, rowKey.key, data, prefix);
                     validator = def.type;
                 }
 
@@ -320,35 +322,33 @@ public abstract class ExtendedFilter
             return true;
         }
 
-        private static boolean collectionSatisfies(ColumnDefinition def, ColumnFamily data, ColumnNameBuilder builder, IndexExpression expr, ByteBuffer collectionElement)
+        private static boolean collectionSatisfies(ColumnDefinition def, ColumnFamily data, Composite prefix, IndexExpression expr, ByteBuffer collectionElement)
         {
             assert def.type.isCollection();
 
             CollectionType type = (CollectionType)def.type;
-            builder = builder.copy().add(def.name.bytes);
-
             switch (type.kind)
             {
                 case LIST:
                     assert collectionElement != null;
-                    return type.valueComparator().compare(data.getColumn(builder.add(collectionElement).build()).value(), expr.value) == 0;
+                    return type.valueComparator().compare(data.getColumn(data.getComparator().create(prefix, def.name, collectionElement)).value(), expr.value) == 0;
                 case SET:
-                    return data.getColumn(builder.add(expr.value).build()) != null;
+                    return data.getColumn(data.getComparator().create(prefix, def.name, expr.value)) != null;
                 case MAP:
                     if (expr.operator == IndexExpression.Operator.CONTAINS_KEY)
                     {
-                        return data.getColumn(builder.add(expr.value).build()) != null;
+                        return data.getColumn(data.getComparator().create(prefix, def.name, expr.value)) != null;
                     }
                     else
                     {
                         assert collectionElement != null;
-                        return type.valueComparator().compare(data.getColumn(builder.add(collectionElement).build()).value(), expr.value) == 0;
+                        return type.valueComparator().compare(data.getColumn(data.getComparator().create(prefix, def.name, collectionElement)).value(), expr.value) == 0;
                     }
             }
             throw new AssertionError();
         }
 
-        private ByteBuffer extractDataValue(ColumnDefinition def, ByteBuffer rowKey, ColumnFamily data, ColumnNameBuilder builder)
+        private ByteBuffer extractDataValue(ColumnDefinition def, ByteBuffer rowKey, ColumnFamily data, Composite prefix)
         {
             switch (def.kind)
             {
@@ -357,10 +357,13 @@ public abstract class ExtendedFilter
                          ? rowKey
                          : ((CompositeType)data.metadata().getKeyValidator()).split(rowKey)[def.position()];
                 case CLUSTERING_COLUMN:
-                    return builder.get(def.position());
+                    return prefix.get(def.position());
                 case REGULAR:
-                    ByteBuffer colName = builder == null ? def.name.bytes : builder.copy().add(def.name).build();
-                    Column column = data.getColumn(colName);
+                    CellName cname = prefix == null
+                                   ? data.getComparator().cellFromByteBuffer(def.name.bytes)
+                                   : data.getComparator().create(prefix, def.name);
+
+                    Column column = data.getColumn(cname);
                     return column == null ? null : column.value();
                 case COMPACT_VALUE:
                     assert data.getColumnCount() == 1;
@@ -392,7 +395,7 @@ public abstract class ExtendedFilter
             return data;
         }
 
-        public boolean isSatisfiedBy(DecoratedKey rowKey, ColumnFamily data, ColumnNameBuilder builder, ByteBuffer collectionElement)
+        public boolean isSatisfiedBy(DecoratedKey rowKey, ColumnFamily data, Composite prefix, ByteBuffer collectionElement)
         {
             return true;
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/filter/IDiskAtomFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/IDiskAtomFilter.java b/src/java/org/apache/cassandra/db/filter/IDiskAtomFilter.java
index 69a8950..b55cfd7 100644
--- a/src/java/org/apache/cassandra/db/filter/IDiskAtomFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/IDiskAtomFilter.java
@@ -22,11 +22,11 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.util.Comparator;
 import java.util.Iterator;
-import java.nio.ByteBuffer;
 
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
-import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.composites.CellNameType;
+import org.apache.cassandra.db.composites.Composite;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.io.util.FileDataInput;
@@ -68,53 +68,53 @@ public interface IDiskAtomFilter
      */
     public void collectReducedColumns(ColumnFamily container, Iterator<Column> reducedColumns, int gcBefore, long now);
 
-    public Comparator<Column> getColumnComparator(AbstractType<?> comparator);
+    public Comparator<Column> getColumnComparator(CellNameType comparator);
 
     public boolean isReversed();
     public void updateColumnsLimit(int newLimit);
 
     public int getLiveCount(ColumnFamily cf, long now);
-    public ColumnCounter columnCounter(AbstractType<?> comparator, long now);
+    public ColumnCounter columnCounter(CellNameType comparator, long now);
 
     public IDiskAtomFilter cloneShallow();
-    public boolean maySelectPrefix(Comparator<ByteBuffer> cmp, ByteBuffer prefix);
+    public boolean maySelectPrefix(Comparator<Composite> cmp, Composite prefix);
 
     boolean shouldInclude(SSTableReader sstable);
 
     public static class Serializer implements IVersionedSerializer<IDiskAtomFilter>
     {
-        public static Serializer instance = new Serializer();
+        private final CellNameType type;
+
+        public Serializer(CellNameType type)
+        {
+            this.type = type;
+        }
 
         public void serialize(IDiskAtomFilter filter, DataOutput out, int version) throws IOException
         {
             if (filter instanceof SliceQueryFilter)
             {
                 out.writeByte(0);
-                SliceQueryFilter.serializer.serialize((SliceQueryFilter)filter, out, version);
+                type.sliceQueryFilterSerializer().serialize((SliceQueryFilter)filter, out, version);
             }
             else
             {
                 out.writeByte(1);
-                NamesQueryFilter.serializer.serialize((NamesQueryFilter)filter, out, version);
+                type.namesQueryFilterSerializer().serialize((NamesQueryFilter)filter, out, version);
             }
         }
 
         public IDiskAtomFilter deserialize(DataInput in, int version) throws IOException
         {
-            throw new UnsupportedOperationException();
-        }
-
-        public IDiskAtomFilter deserialize(DataInput in, int version, AbstractType<?> comparator) throws IOException
-        {
-            int type = in.readByte();
-            if (type == 0)
+            int b = in.readByte();
+            if (b == 0)
             {
-                return SliceQueryFilter.serializer.deserialize(in, version);
+                return type.sliceQueryFilterSerializer().deserialize(in, version);
             }
             else
             {
-                assert type == 1;
-                return NamesQueryFilter.serializer.deserialize(in, version, comparator);
+                assert b == 1;
+                return type.namesQueryFilterSerializer().deserialize(in, version);
             }
         }
 
@@ -122,9 +122,9 @@ public interface IDiskAtomFilter
         {
             int size = 1;
             if (filter instanceof SliceQueryFilter)
-                size += SliceQueryFilter.serializer.serializedSize((SliceQueryFilter)filter, version);
+                size += type.sliceQueryFilterSerializer().serializedSize((SliceQueryFilter)filter, version);
             else
-                size += NamesQueryFilter.serializer.serializedSize((NamesQueryFilter)filter, version);
+                size += type.namesQueryFilterSerializer().serializedSize((NamesQueryFilter)filter, version);
             return size;
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java b/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
index f65bf3a..9058101 100644
--- a/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
@@ -20,7 +20,6 @@ package org.apache.cassandra.db.filter;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.SortedSet;
@@ -32,46 +31,40 @@ import com.google.common.collect.AbstractIterator;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
 import org.apache.cassandra.db.columniterator.SSTableNamesIterator;
-import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.db.composites.CellNameType;
+import org.apache.cassandra.db.composites.Composite;
+import org.apache.cassandra.io.ISerializer;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.io.util.FileDataInput;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.FBUtilities;
 
 public class NamesQueryFilter implements IDiskAtomFilter
 {
-    public static final Serializer serializer = new Serializer();
-
-    public final SortedSet<ByteBuffer> columns;
+    public final SortedSet<CellName> columns;
 
     // If true, getLiveCount will always return either 0 or 1. This uses the fact that we know 
     // CQL3 will never use a name filter with cell names spanning multiple CQL3 rows.
     private final boolean countCQL3Rows;
 
-    public NamesQueryFilter(SortedSet<ByteBuffer> columns)
+    public NamesQueryFilter(SortedSet<CellName> columns)
     {
         this(columns, false);
     }
 
-    public NamesQueryFilter(SortedSet<ByteBuffer> columns, boolean countCQL3Rows)
+    public NamesQueryFilter(SortedSet<CellName> columns, boolean countCQL3Rows)
     {
         this.columns = columns;
         this.countCQL3Rows = countCQL3Rows;
     }
 
-    public NamesQueryFilter(ByteBuffer column)
-    {
-        this(FBUtilities.singleton(column));
-    }
-
     public NamesQueryFilter cloneShallow()
     {
         // NQF is immutable as far as shallow cloning is concerned, so save the allocation.
         return this;
     }
 
-    public NamesQueryFilter withUpdatedColumns(SortedSet<ByteBuffer> newColumns)
+    public NamesQueryFilter withUpdatedColumns(SortedSet<CellName> newColumns)
     {
        return new NamesQueryFilter(newColumns, countCQL3Rows);
     }
@@ -99,9 +92,9 @@ public class NamesQueryFilter implements IDiskAtomFilter
             container.addIfRelevant(reducedColumns.next(), tester, gcBefore);
     }
 
-    public Comparator<Column> getColumnComparator(AbstractType<?> comparator)
+    public Comparator<Column> getColumnComparator(CellNameType comparator)
     {
-        return comparator.columnComparator;
+        return comparator.columnComparator();
     }
 
     @Override
@@ -137,11 +130,11 @@ public class NamesQueryFilter implements IDiskAtomFilter
         return count;
     }
 
-    public boolean maySelectPrefix(Comparator<ByteBuffer> cmp, ByteBuffer prefix)
+    public boolean maySelectPrefix(Comparator<Composite> cmp, Composite prefix)
     {
-        for (ByteBuffer column : columns)
+        for (CellName column : columns)
         {
-            if (ByteBufferUtil.isPrefix(prefix, column))
+            if (prefix.isPrefixOf(column))
                 return true;
         }
         return false;
@@ -157,7 +150,7 @@ public class NamesQueryFilter implements IDiskAtomFilter
         return countCQL3Rows;
     }
 
-    public ColumnCounter columnCounter(AbstractType<?> comparator, long now)
+    public ColumnCounter columnCounter(CellNameType comparator, long now)
     {
         return countCQL3Rows
              ? new ColumnCounter.GroupByPrefix(now, null, 0)
@@ -168,9 +161,9 @@ public class NamesQueryFilter implements IDiskAtomFilter
     {
         private final ColumnFamily cf;
         private final DecoratedKey key;
-        private final Iterator<ByteBuffer> iter;
+        private final Iterator<CellName> iter;
 
-        public ByNameColumnIterator(Iterator<ByteBuffer> iter, ColumnFamily cf, DecoratedKey key)
+        public ByNameColumnIterator(Iterator<CellName> iter, ColumnFamily cf, DecoratedKey key)
         {
             this.iter = iter;
             this.cf = cf;
@@ -191,7 +184,7 @@ public class NamesQueryFilter implements IDiskAtomFilter
         {
             while (iter.hasNext())
             {
-                ByteBuffer current = iter.next();
+                CellName current = iter.next();
                 Column column = cf.getColumn(current);
                 if (column != null)
                     return column;
@@ -204,27 +197,31 @@ public class NamesQueryFilter implements IDiskAtomFilter
 
     public static class Serializer implements IVersionedSerializer<NamesQueryFilter>
     {
+        private CellNameType type;
+
+        public Serializer(CellNameType type)
+        {
+            this.type = type;
+        }
+
         public void serialize(NamesQueryFilter f, DataOutput out, int version) throws IOException
         {
             out.writeInt(f.columns.size());
-            for (ByteBuffer cName : f.columns)
+            ISerializer<CellName> serializer = type.cellSerializer();
+            for (CellName cName : f.columns)
             {
-                ByteBufferUtil.writeWithShortLength(cName, out);
+                serializer.serialize(cName, out);
             }
             out.writeBoolean(f.countCQL3Rows);
         }
 
         public NamesQueryFilter deserialize(DataInput in, int version) throws IOException
         {
-            throw new UnsupportedOperationException();
-        }
-
-        public NamesQueryFilter deserialize(DataInput in, int version, AbstractType comparator) throws IOException
-        {
             int size = in.readInt();
-            SortedSet<ByteBuffer> columns = new TreeSet<ByteBuffer>(comparator);
+            SortedSet<CellName> columns = new TreeSet<CellName>(type);
+            ISerializer<CellName> serializer = type.cellSerializer();
             for (int i = 0; i < size; ++i)
-                columns.add(ByteBufferUtil.readWithShortLength(in));
+                columns.add(serializer.deserialize(in));
             boolean countCQL3Rows = in.readBoolean();
             return new NamesQueryFilter(columns, countCQL3Rows);
         }
@@ -233,11 +230,9 @@ public class NamesQueryFilter implements IDiskAtomFilter
         {
             TypeSizes sizes = TypeSizes.NATIVE;
             int size = sizes.sizeof(f.columns.size());
-            for (ByteBuffer cName : f.columns)
-            {
-                int cNameSize = cName.remaining();
-                size += sizes.sizeof((short) cNameSize) + cNameSize;
-            }
+            ISerializer<CellName> serializer = type.cellSerializer();
+            for (CellName cName : f.columns)
+                size += serializer.serializedSize(cName, sizes);
             size += sizes.sizeof(f.countCQL3Rows);
             return size;
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/filter/QueryFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/QueryFilter.java b/src/java/org/apache/cassandra/db/filter/QueryFilter.java
index 4f71f3a..33e8904 100644
--- a/src/java/org/apache/cassandra/db/filter/QueryFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/QueryFilter.java
@@ -17,12 +17,13 @@
  */
 package org.apache.cassandra.db.filter;
 
-import java.nio.ByteBuffer;
 import java.util.*;
 
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
 import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
+import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.db.composites.Composite;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.io.util.FileDataInput;
 import org.apache.cassandra.utils.HeapAllocator;
@@ -194,8 +195,8 @@ public class QueryFilter
      */
     public static QueryFilter getSliceFilter(DecoratedKey key,
                                              String cfName,
-                                             ByteBuffer start,
-                                             ByteBuffer finish,
+                                             Composite start,
+                                             Composite finish,
                                              boolean reversed,
                                              int limit,
                                              long timestamp)
@@ -218,19 +219,11 @@ public class QueryFilter
      * @param cfName column family to query
      * @param columns the column names to restrict the results to, sorted in comparator order
      */
-    public static QueryFilter getNamesFilter(DecoratedKey key, String cfName, SortedSet<ByteBuffer> columns, long timestamp)
+    public static QueryFilter getNamesFilter(DecoratedKey key, String cfName, SortedSet<CellName> columns, long timestamp)
     {
         return new QueryFilter(key, cfName, new NamesQueryFilter(columns), timestamp);
     }
 
-    /**
-     * convenience method for creating a name filter matching a single column
-     */
-    public static QueryFilter getNamesFilter(DecoratedKey key, String cfName, ByteBuffer column, long timestamp)
-    {
-        return new QueryFilter(key, cfName, new NamesQueryFilter(column), timestamp);
-    }
-
     @Override
     public String toString()
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
index 794d31c..c3b019f 100644
--- a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
@@ -17,10 +17,10 @@
  */
 package org.apache.cassandra.db.filter;
 
+import java.nio.ByteBuffer;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.*;
 
 import org.slf4j.Logger;
@@ -30,8 +30,9 @@ import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
 import org.apache.cassandra.db.columniterator.SSTableSliceIterator;
-import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.db.marshal.CompositeType;
+import org.apache.cassandra.db.composites.CType;
+import org.apache.cassandra.db.composites.CellNameType;
+import org.apache.cassandra.db.composites.Composite;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.io.util.FileDataInput;
@@ -40,7 +41,6 @@ import org.apache.cassandra.tracing.Tracing;
 public class SliceQueryFilter implements IDiskAtomFilter
 {
     private static final Logger logger = LoggerFactory.getLogger(SliceQueryFilter.class);
-    public static final Serializer serializer = new Serializer();
 
     public final ColumnSlice[] slices;
     public final boolean reversed;
@@ -50,14 +50,24 @@ public class SliceQueryFilter implements IDiskAtomFilter
     // Not serialized, just a ack for range slices to find the number of live column counted, even when we group
     private ColumnCounter columnCounter;
 
-    public SliceQueryFilter(ByteBuffer start, ByteBuffer finish, boolean reversed, int count)
+    public SliceQueryFilter(Composite start, Composite finish, boolean reversed, int count)
+    {
+        this(new ColumnSlice(start, finish), reversed, count);
+    }
+
+    public SliceQueryFilter(Composite start, Composite finish, boolean reversed, int count, int compositesToGroup)
+    {
+        this(new ColumnSlice(start, finish), reversed, count, compositesToGroup);
+    }
+
+    public SliceQueryFilter(ColumnSlice slice, boolean reversed, int count)
     {
-        this(new ColumnSlice[] { new ColumnSlice(start, finish) }, reversed, count);
+        this(new ColumnSlice[]{ slice }, reversed, count);
     }
 
-    public SliceQueryFilter(ByteBuffer start, ByteBuffer finish, boolean reversed, int count, int compositesToGroup)
+    public SliceQueryFilter(ColumnSlice slice, boolean reversed, int count, int compositesToGroup)
     {
-        this(new ColumnSlice[] { new ColumnSlice(start, finish) }, reversed, count, compositesToGroup);
+        this(new ColumnSlice[]{ slice }, reversed, count, compositesToGroup);
     }
 
     /**
@@ -92,9 +102,9 @@ public class SliceQueryFilter implements IDiskAtomFilter
         return new SliceQueryFilter(newSlices, reversed, count, compositesToGroup);
     }
 
-    public SliceQueryFilter withUpdatedStart(ByteBuffer newStart, AbstractType<?> comparator)
+    public SliceQueryFilter withUpdatedStart(Composite newStart, CellNameType comparator)
     {
-        Comparator<ByteBuffer> cmp = reversed ? comparator.reverseComparator : comparator;
+        Comparator<Composite> cmp = reversed ? comparator.reverseComparator() : comparator;
 
         List<ColumnSlice> newSlices = new ArrayList<ColumnSlice>();
         boolean pastNewStart = false;
@@ -121,7 +131,7 @@ public class SliceQueryFilter implements IDiskAtomFilter
         return withUpdatedSlices(newSlices.toArray(new ColumnSlice[newSlices.size()]));
     }
 
-    public SliceQueryFilter withUpdatedSlice(ByteBuffer start, ByteBuffer finish)
+    public SliceQueryFilter withUpdatedSlice(Composite start, Composite finish)
     {
         return new SliceQueryFilter(new ColumnSlice[]{ new ColumnSlice(start, finish) }, reversed, count, compositesToGroup);
     }
@@ -172,9 +182,9 @@ public class SliceQueryFilter implements IDiskAtomFilter
         return new SSTableSliceIterator(sstable, file, key, slices, reversed, indexEntry);
     }
 
-    public Comparator<Column> getColumnComparator(AbstractType<?> comparator)
+    public Comparator<Column> getColumnComparator(CellNameType comparator)
     {
-        return reversed ? comparator.columnReverseComparator : comparator.columnComparator;
+        return reversed ? comparator.columnReverseComparator() : comparator.columnComparator();
     }
 
     public void collectReducedColumns(ColumnFamily container, Iterator<Column> reducedColumns, int gcBefore, long now)
@@ -219,14 +229,14 @@ public class SliceQueryFilter implements IDiskAtomFilter
         return columnCounter(cf.getComparator(), now).countAll(cf).live();
     }
 
-    public ColumnCounter columnCounter(AbstractType<?> comparator, long now)
+    public ColumnCounter columnCounter(CellNameType comparator, long now)
     {
         if (compositesToGroup < 0)
             return new ColumnCounter(now);
         else if (compositesToGroup == 0)
             return new ColumnCounter.GroupByPrefix(now, null, 0);
         else
-            return new ColumnCounter.GroupByPrefix(now, (CompositeType)comparator, compositesToGroup);
+            return new ColumnCounter.GroupByPrefix(now, comparator, compositesToGroup);
     }
 
     public void trim(ColumnFamily cf, int trimTo, long now)
@@ -256,17 +266,17 @@ public class SliceQueryFilter implements IDiskAtomFilter
         }
     }
 
-    public ByteBuffer start()
+    public Composite start()
     {
         return this.slices[0].start;
     }
 
-    public ByteBuffer finish()
+    public Composite finish()
     {
         return this.slices[slices.length - 1].finish;
     }
 
-    public void setStart(ByteBuffer start)
+    public void setStart(Composite start)
     {
         assert slices.length == 1;
         this.slices[0] = new ColumnSlice(start, this.slices[0].finish);
@@ -303,7 +313,7 @@ public class SliceQueryFilter implements IDiskAtomFilter
         count = newLimit;
     }
 
-    public boolean maySelectPrefix(Comparator<ByteBuffer> cmp, ByteBuffer prefix)
+    public boolean maySelectPrefix(Comparator<Composite> cmp, Composite prefix)
     {
         for (ColumnSlice slice : slices)
             if (slice.includes(cmp, prefix))
@@ -316,21 +326,32 @@ public class SliceQueryFilter implements IDiskAtomFilter
         List<ByteBuffer> minColumnNames = sstable.getSSTableMetadata().minColumnNames;
         List<ByteBuffer> maxColumnNames = sstable.getSSTableMetadata().maxColumnNames;
         assert minColumnNames.size() == maxColumnNames.size();
-        AbstractType<?> comparator = sstable.metadata.comparator;
+        CellNameType comparator = sstable.metadata.comparator;
 
         if (minColumnNames.isEmpty() || maxColumnNames.isEmpty())
             return true;
 
-        return comparator.intersects(minColumnNames, maxColumnNames, this);
+        for (ColumnSlice slice : slices)
+            if (slice.intersects(minColumnNames, maxColumnNames, comparator, reversed))
+                return true;
+
+        return false;
     }
 
     public static class Serializer implements IVersionedSerializer<SliceQueryFilter>
     {
+        private CType type;
+
+        public Serializer(CType type)
+        {
+            this.type = type;
+        }
+
         public void serialize(SliceQueryFilter f, DataOutput out, int version) throws IOException
         {
             out.writeInt(f.slices.length);
             for (ColumnSlice slice : f.slices)
-                ColumnSlice.serializer.serialize(slice, out, version);
+                type.sliceSerializer().serialize(slice, out, version);
             out.writeBoolean(f.reversed);
             int count = f.count;
             out.writeInt(count);
@@ -343,7 +364,7 @@ public class SliceQueryFilter implements IDiskAtomFilter
             ColumnSlice[] slices;
             slices = new ColumnSlice[in.readInt()];
             for (int i = 0; i < slices.length; i++)
-                slices[i] = ColumnSlice.serializer.deserialize(in, version);
+                slices[i] = type.sliceSerializer().deserialize(in, version);
             boolean reversed = in.readBoolean();
             int count = in.readInt();
             int compositesToGroup = -1;
@@ -359,7 +380,7 @@ public class SliceQueryFilter implements IDiskAtomFilter
             int size = 0;
             size += sizes.sizeof(f.slices.length);
             for (ColumnSlice slice : f.slices)
-                size += ColumnSlice.serializer.serializedSize(slice, version);
+                size += type.sliceSerializer().serializedSize(slice, version);
             size += sizes.sizeof(f.reversed);
             size += sizes.sizeof(f.count);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java b/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
index b7593ad..ce7a021 100644
--- a/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
+++ b/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
@@ -22,6 +22,8 @@ import java.nio.ByteBuffer;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.db.composites.CellNameType;
 import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.dht.*;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -46,7 +48,7 @@ public abstract class AbstractSimplePerColumnSecondaryIndex extends PerColumnSec
 
         columnDef = columnDefs.iterator().next();
 
-        AbstractType indexComparator = SecondaryIndex.getIndexComparator(baseCfs.metadata, columnDef);
+        CellNameType indexComparator = SecondaryIndex.getIndexComparator(baseCfs.metadata, columnDef);
         CFMetaData indexedCfMetadata = CFMetaData.newIndexMetadata(baseCfs.metadata, columnDef, indexComparator);
         indexCfs = ColumnFamilyStore.createColumnFamilyStore(baseCfs.keyspace,
                                                              indexedCfMetadata.cfName,
@@ -65,7 +67,7 @@ public abstract class AbstractSimplePerColumnSecondaryIndex extends PerColumnSec
         return new DecoratedKey(new LocalToken(getIndexKeyComparator(), value), value);
     }
 
-    protected abstract ByteBuffer makeIndexColumnName(ByteBuffer rowKey, Column column);
+    protected abstract CellName makeIndexColumnName(ByteBuffer rowKey, Column column);
 
     protected abstract ByteBuffer getIndexedValue(ByteBuffer rowKey, Column column);
 
@@ -88,9 +90,7 @@ public abstract class AbstractSimplePerColumnSecondaryIndex extends PerColumnSec
         DecoratedKey valueKey = getIndexKeyFor(getIndexedValue(rowKey, column));
         int localDeletionTime = (int) (System.currentTimeMillis() / 1000);
         ColumnFamily cfi = ArrayBackedSortedColumns.factory.create(indexCfs.metadata);
-        ByteBuffer name = makeIndexColumnName(rowKey, column);
-        assert name.remaining() > 0 && name.remaining() <= Column.MAX_NAME_LENGTH : name.remaining();
-        cfi.addTombstone(name, localDeletionTime, column.timestamp());
+        cfi.addTombstone(makeIndexColumnName(rowKey, column), localDeletionTime, column.timestamp());
         indexCfs.apply(valueKey, cfi, SecondaryIndexManager.nullUpdater);
         if (logger.isDebugEnabled())
             logger.debug("removed index entry for cleaned-up value {}:{}", valueKey, cfi);
@@ -100,8 +100,7 @@ public abstract class AbstractSimplePerColumnSecondaryIndex extends PerColumnSec
     {
         DecoratedKey valueKey = getIndexKeyFor(getIndexedValue(rowKey, column));
         ColumnFamily cfi = ArrayBackedSortedColumns.factory.create(indexCfs.metadata);
-        ByteBuffer name = makeIndexColumnName(rowKey, column);
-        assert name.remaining() > 0 && name.remaining() <= Column.MAX_NAME_LENGTH : name.remaining();
+        CellName name = makeIndexColumnName(rowKey, column);
         if (column instanceof ExpiringColumn)
         {
             ExpiringColumn ec = (ExpiringColumn)column;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/362cc053/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndex.java b/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
index 43ecb61..38d09be 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
@@ -28,6 +28,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.db.composites.*;
 import org.apache.cassandra.db.Column;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.DecoratedKey;
@@ -273,20 +274,9 @@ public abstract class SecondaryIndex
     }
 
     /**
-     * Returns true if the provided column name is indexed by this secondary index.
-     *
-     * The default implement checks whether the name is one the columnDef name,
-     * but this should be overriden but subclass if needed.
+     * Returns true if the provided cell name is indexed by this secondary index.
      */
-    public boolean indexes(ByteBuffer name)
-    {
-        for (ColumnDefinition columnDef : columnDefs)
-        {
-            if (baseCfs.getComparator().compare(columnDef.name.bytes, name) == 0)
-                return true;
-        }
-        return false;
-    }
+    public abstract boolean indexes(CellName name);
 
     /**
      * This is the primary way to create a secondary index instance for a CF column.
@@ -342,12 +332,12 @@ public abstract class SecondaryIndex
      * Note: it would be cleaner to have this be a member method. However we need this when opening indexes
      * sstables, but by then the CFS won't be fully initiated, so the SecondaryIndex object won't be accessible.
      */
-    public static AbstractType<?> getIndexComparator(CFMetaData baseMetadata, ColumnDefinition cdef)
+    public static CellNameType getIndexComparator(CFMetaData baseMetadata, ColumnDefinition cdef)
     {
         switch (cdef.getIndexType())
         {
             case KEYS:
-                return keyComparator;
+                return new SimpleDenseCellNameType(keyComparator);
             case COMPOSITES:
                 return CompositesIndex.getIndexComparator(baseMetadata, cdef);
             case CUSTOM: