Posted to commits@cassandra.apache.org by sl...@apache.org on 2013/01/02 13:41:16 UTC

[7/9] Replace supercolumns internally by composites

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/HintedHandOffManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/HintedHandOffManager.java b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
index 476af88..3d726a0 100644
--- a/src/java/org/apache/cassandra/db/HintedHandOffManager.java
+++ b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
@@ -129,7 +129,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
     private static void deleteHint(ByteBuffer tokenBytes, ByteBuffer columnName, long timestamp) throws IOException
     {
         RowMutation rm = new RowMutation(Table.SYSTEM_KS, tokenBytes);
-        rm.delete(new QueryPath(SystemTable.HINTS_CF, null, columnName), timestamp);
+        rm.delete(SystemTable.HINTS_CF, columnName, timestamp);
         rm.applyUnsafe(); // don't bother with commitlog since we're going to flush as soon as we're done with delivery
     }
 
@@ -155,7 +155,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
         UUID hostId = StorageService.instance.getTokenMetadata().getHostId(endpoint);
         ByteBuffer hostIdBytes = ByteBuffer.wrap(UUIDGen.decompose(hostId));
         final RowMutation rm = new RowMutation(Table.SYSTEM_KS, hostIdBytes);
-        rm.delete(new QueryPath(SystemTable.HINTS_CF), System.currentTimeMillis());
+        rm.delete(SystemTable.HINTS_CF, System.currentTimeMillis());
 
         // execute asynchronously to avoid blocking caller (which may be processing gossip)
         Runnable runnable = new Runnable()
@@ -303,7 +303,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
 
         while (true)
         {
-            QueryFilter filter = QueryFilter.getSliceFilter(epkey, new QueryPath(SystemTable.HINTS_CF), startColumn, ByteBufferUtil.EMPTY_BYTE_BUFFER, false, pageSize);
+            QueryFilter filter = QueryFilter.getSliceFilter(epkey, SystemTable.HINTS_CF, startColumn, ByteBufferUtil.EMPTY_BYTE_BUFFER, false, pageSize);
             ColumnFamily hintsPage = ColumnFamilyStore.removeDeleted(hintStore.getColumnFamily(filter), (int)(System.currentTimeMillis() / 1000));
             if (pagingFinished(hintsPage, startColumn))
             {
@@ -322,7 +322,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
 
             }
 
-            for (final IColumn hint : hintsPage.getSortedColumns())
+            for (final Column hint : hintsPage.getSortedColumns())
             {
                 // Skip tombstones:
                 // if we iterate quickly enough, it's possible that we could request a new page in the same millisecond
@@ -400,7 +400,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
         RowPosition minPos = p.getMinimumToken().minKeyBound();
         Range<RowPosition> range = new Range<RowPosition>(minPos, minPos, p);
         IDiskAtomFilter filter = new NamesQueryFilter(ImmutableSortedSet.<ByteBuffer>of());
-        List<Row> rows = hintStore.getRangeSlice(null, range, Integer.MAX_VALUE, filter, null);
+        List<Row> rows = hintStore.getRangeSlice(range, Integer.MAX_VALUE, filter, null);
         for (Row row : rows)
         {
             UUID hostId = UUIDGen.getUUID(row.key.key);
@@ -473,9 +473,6 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
 
     private List<Row> getHintsSlice(int columnCount)
     {
-        // ColumnParent for HintsCF...
-        ColumnParent parent = new ColumnParent(SystemTable.HINTS_CF);
-
         // Get count # of columns...
         SliceQueryFilter predicate = new SliceQueryFilter(ByteBufferUtil.EMPTY_BYTE_BUFFER,
                                                           ByteBufferUtil.EMPTY_BYTE_BUFFER,
@@ -491,7 +488,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
         List<Row> rows;
         try
         {
-            rows = StorageProxy.getRangeSlice(new RangeSliceCommand(Table.SYSTEM_KS, parent, predicate, range, null, LARGE_NUMBER), ConsistencyLevel.ONE);
+            rows = StorageProxy.getRangeSlice(new RangeSliceCommand(Table.SYSTEM_KS, SystemTable.HINTS_CF, predicate, range, null, LARGE_NUMBER), ConsistencyLevel.ONE);
         }
         catch (Exception e)
         {

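Note on this file: with QueryPath gone, hint mutations and filters name the column family and column directly. A minimal sketch of the new delete path, using only signatures visible in the hunks above (RowMutation.delete(String, ByteBuffer, long), Table.SYSTEM_KS, SystemTable.HINTS_CF); this illustrates the in-tree API and is not standalone code:

    import java.nio.ByteBuffer;

    import org.apache.cassandra.db.RowMutation;
    import org.apache.cassandra.db.SystemTable;
    import org.apache.cassandra.db.Table;

    public class HintDeletionSketch
    {
        // Mirrors deleteHint() above: one tombstone on the hints row for this token.
        static void deleteHint(ByteBuffer tokenBytes, ByteBuffer columnName, long timestamp)
        {
            RowMutation rm = new RowMutation(Table.SYSTEM_KS, tokenBytes);
            // Pre-patch this took a QueryPath(SystemTable.HINTS_CF, null, columnName);
            // the CF name and column name are now passed directly.
            rm.delete(SystemTable.HINTS_CF, columnName, timestamp);
            rm.applyUnsafe(); // skip the commitlog, as in the original
        }
    }
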
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/IColumn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/IColumn.java b/src/java/org/apache/cassandra/db/IColumn.java
deleted file mode 100644
index 0a7fe7a..0000000
--- a/src/java/org/apache/cassandra/db/IColumn.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.nio.ByteBuffer;
-import java.util.Collection;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.db.marshal.MarshalException;
-import org.apache.cassandra.utils.Allocator;
-import org.apache.cassandra.utils.FBUtilities;
-
-/** TODO: rename */
-public interface IColumn extends OnDiskAtom
-{
-    public static final int MAX_NAME_LENGTH = FBUtilities.MAX_UNSIGNED_SHORT;
-
-    /**
-     * @return true if the column has been deleted (is a tombstone).  This depends on comparing the server clock
-     * with getLocalDeletionTime, so it can change during a single request if you're not careful.
-     */
-    public boolean isMarkedForDelete();
-
-    public long getMarkedForDeleteAt();
-    public long mostRecentLiveChangeAt();
-    public long mostRecentNonGCableChangeAt(int gcbefore);
-    /** the size of user-provided data, not including internal overhead */
-    public int dataSize();
-    public int serializationFlags();
-    public long timestamp();
-    public ByteBuffer value();
-    public Collection<IColumn> getSubColumns();
-    public IColumn getSubColumn(ByteBuffer columnName);
-    public void addColumn(IColumn column);
-    public void addColumn(IColumn column, Allocator allocator);
-    public IColumn diff(IColumn column);
-    public IColumn reconcile(IColumn column);
-    public IColumn reconcile(IColumn column, Allocator allocator);
-    public String getString(AbstractType<?> comparator);
-    public void validateFields(CFMetaData metadata) throws MarshalException;
-
-    /** clones the column for the row cache, interning column names and making copies of other underlying byte buffers */
-    IColumn localCopy(ColumnFamilyStore cfs);
-
-    /**
-     * clones the column for the memtable, interning column names and making copies of other underlying byte buffers.
-     * Unlike the other localCopy, this uses Allocator to allocate values in contiguous memory regions,
-     * which helps avoid heap fragmentation.
-     */
-    IColumn localCopy(ColumnFamilyStore cfs, Allocator allocator);
-
-    /**
-     * For a simple column, live == !isMarkedForDelete.
-     * For a supercolumn, live means it has at least one subcolumn whose timestamp is greater than the
-     * supercolumn deleted-at time.
-     */
-    boolean isLive();
-
-    /**
-     * For a standard column, this is the same as timestamp().
-     * For a super column, this is the max column timestamp of the sub columns.
-     */
-    public long maxTimestamp();
-
-    /**
-     * @return true if the column or any its subcolumns is no longer relevant after @param gcBefore
-     */
-    public boolean hasIrrelevantData(int gcBefore);
-}

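Note on this file: IColumn existed to abstract over Column and SuperColumn; with supercolumns mapped onto composite names only one implementation remains, so the interface is deleted and call sites use the concrete Column class. A minimal sketch of a typical call-site change (Collection<Column> getSortedColumns() and timestamp() are as shown in this patch; the aggregate itself is illustrative):

    import org.apache.cassandra.db.Column;
    import org.apache.cassandra.db.ColumnFamily;

    public class ColumnScanSketch
    {
        // Pre-patch: for (IColumn c : cf.getSortedColumns()) ...
        static long maxTimestamp(ColumnFamily cf)
        {
            long max = Long.MIN_VALUE;
            for (Column c : cf.getSortedColumns())
                max = Math.max(max, c.timestamp());
            return max;
        }
    }
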
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/IColumnContainer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/IColumnContainer.java b/src/java/org/apache/cassandra/db/IColumnContainer.java
deleted file mode 100644
index a3bd210..0000000
--- a/src/java/org/apache/cassandra/db/IColumnContainer.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.nio.ByteBuffer;
-import java.util.Collection;
-
-import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.utils.Allocator;
-
-public interface IColumnContainer
-{
-    public void addColumn(IColumn column);
-    public void addColumn(IColumn column, Allocator allocator);
-    public void remove(ByteBuffer columnName);
-
-    /**
-     * Replace oldColumn if represent by newColumn.
-     * Returns true if oldColumn was present (and thus replaced)
-     * oldColumn and newColumn should have the same name.
-     * !NOTE! This should only be used if you know this is what you need. To
-     * add a column such that it use the usual column resolution rules in a
-     * thread safe manner, use addColumn.
-     */
-    public boolean replace(IColumn oldColumn, IColumn newColumn);
-
-    public boolean isMarkedForDelete();
-    public DeletionInfo deletionInfo();
-    public boolean hasIrrelevantData(int gcBefore);
-
-    public AbstractType<?> getComparator();
-
-    public Collection<IColumn> getSortedColumns();
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/ISortedColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ISortedColumns.java b/src/java/org/apache/cassandra/db/ISortedColumns.java
index 5ccba19..df288aa 100644
--- a/src/java/org/apache/cassandra/db/ISortedColumns.java
+++ b/src/java/org/apache/cassandra/db/ISortedColumns.java
@@ -62,7 +62,7 @@ public interface ISortedColumns extends IIterableColumns
      * If a column with the same name is already present in the map, it will
      * be replaced by the newly added column.
      */
-    public void addColumn(IColumn column, Allocator allocator);
+    public void addColumn(Column column, Allocator allocator);
 
     /**
      * Adds all the columns of a given column map to this column map.
@@ -75,19 +75,19 @@ public interface ISortedColumns extends IIterableColumns
      *
      *  @return the difference in size seen after merging the given columns
      */
-    public long addAllWithSizeDelta(ISortedColumns cm, Allocator allocator, Function<IColumn, IColumn> transformation, SecondaryIndexManager.Updater indexer);
+    public long addAllWithSizeDelta(ISortedColumns cm, Allocator allocator, Function<Column, Column> transformation, SecondaryIndexManager.Updater indexer);
 
     /**
      * Adds the columns without necessarily computing the size delta
      */
-    public void addAll(ISortedColumns cm, Allocator allocator, Function<IColumn, IColumn> transformation);
+    public void addAll(ISortedColumns cm, Allocator allocator, Function<Column, Column> transformation);
 
     /**
      * Replace oldColumn if present by newColumn.
      * Returns true if oldColumn was present and thus replaced.
      * oldColumn and newColumn should have the same name.
      */
-    public boolean replace(IColumn oldColumn, IColumn newColumn);
+    public boolean replace(Column oldColumn, Column newColumn);
 
     /**
      * Remove if present a column by name.
@@ -103,7 +103,7 @@ public interface ISortedColumns extends IIterableColumns
      * Get a column given its name, returning null if the column is not
      * present.
      */
-    public IColumn getColumn(ByteBuffer name);
+    public Column getColumn(ByteBuffer name);
 
     /**
      * Returns a set with the names of columns in this column map.
@@ -117,14 +117,14 @@ public interface ISortedColumns extends IIterableColumns
      * The columns in the returned collection should be sorted as the columns
      * in this map.
      */
-    public Collection<IColumn> getSortedColumns();
+    public Collection<Column> getSortedColumns();
 
     /**
      * Returns the columns of this column map as a collection.
      * The columns in the returned collection should be sorted in reverse
      * order of the columns in this map.
      */
-    public Collection<IColumn> getReverseSortedColumns();
+    public Collection<Column> getReverseSortedColumns();
 
     /**
      * Returns the number of columns in this map.
@@ -140,13 +140,13 @@ public interface ISortedColumns extends IIterableColumns
      * Returns an iterator over the columns of this map that returns only the matching @param slices.
      * The provided slices must be in order and must be non-overlapping.
      */
-    public Iterator<IColumn> iterator(ColumnSlice[] slices);
+    public Iterator<Column> iterator(ColumnSlice[] slices);
 
     /**
      * Returns a reversed iterator over the columns of this map that returns only the matching @param slices.
      * The provided slices must be in reversed order and must be non-overlapping.
      */
-    public Iterator<IColumn> reverseIterator(ColumnSlice[] slices);
+    public Iterator<Column> reverseIterator(ColumnSlice[] slices);
 
     /**
      * Returns if this map only support inserts in reverse order.
@@ -171,6 +171,6 @@ public interface ISortedColumns extends IIterableColumns
          * columns in the provided sorted map.
          * See {@code create} for the description of {@code insertReversed}
          */
-        public ISortedColumns fromSorted(SortedMap<ByteBuffer, IColumn> sm, boolean insertReversed);
+        public ISortedColumns fromSorted(SortedMap<ByteBuffer, Column> sm, boolean insertReversed);
     }
 }

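Note on this file: the same mechanical IColumn -> Column substitution runs through ISortedColumns, including the Guava Function applied to columns as they are merged in. A sketch of a transformation with the post-patch signature (com.google.common.base.Function is the dependency already in use here; the identity behavior is purely illustrative):

    import com.google.common.base.Function;

    import org.apache.cassandra.db.Column;

    public class ColumnTransformSketch
    {
        // addAll()/addAllWithSizeDelta() apply a Function<Column, Column> to every
        // incoming column; Memtable's localCopyFunction (below) is the real example.
        static final Function<Column, Column> identity = new Function<Column, Column>()
        {
            public Column apply(Column c)
            {
                return c;
            }
        };
    }
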
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index 6b708d5..32e6852 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -93,9 +93,9 @@ public class Memtable
 
     private final SlabAllocator allocator = new SlabAllocator();
     // We really only need one column by allocator but one by memtable is not a big waste and avoids needing allocators to know about CFS
-    private final Function<IColumn, IColumn> localCopyFunction = new Function<IColumn, IColumn>()
+    private final Function<Column, Column> localCopyFunction = new Function<Column, Column>()
     {
-        public IColumn apply(IColumn c)
+        public Column apply(Column c)
         {
             return c.localCopy(cfs, allocator);
         };
@@ -226,7 +226,7 @@ public class Memtable
         if (previous == null)
         {
             // AtomicSortedColumns doesn't work for super columns (see #3821)
-            ColumnFamily empty = cf.cloneMeShallow(cf.isSuper() ? ThreadSafeSortedColumns.factory() : AtomicSortedColumns.factory(), false);
+            ColumnFamily empty = cf.cloneMeShallow(AtomicSortedColumns.factory(), false);
             // We'll add the columns later. This avoids wasting works if we get beaten in the putIfAbsent
             previous = columnFamilies.putIfAbsent(new DecoratedKey(key.token, allocator.clone(key.key)), empty);
             if (previous == null)
@@ -316,7 +316,7 @@ public class Memtable
     public static OnDiskAtomIterator getSliceIterator(final DecoratedKey key, final ColumnFamily cf, SliceQueryFilter filter)
     {
         assert cf != null;
-        final Iterator<IColumn> filteredIter = filter.reversed ? cf.reverseIterator(filter.slices) : cf.iterator(filter.slices);
+        final Iterator<Column> filteredIter = filter.reversed ? cf.reverseIterator(filter.slices) : cf.iterator(filter.slices);
 
         return new AbstractColumnIterator()
         {
@@ -345,7 +345,6 @@ public class Memtable
     public static OnDiskAtomIterator getNamesIterator(final DecoratedKey key, final ColumnFamily cf, final NamesQueryFilter filter)
     {
         assert cf != null;
-        final boolean isStandard = !cf.isSuper();
 
         return new SimpleAbstractColumnIterator()
         {
@@ -366,10 +365,9 @@ public class Memtable
                 while (iter.hasNext())
                 {
                     ByteBuffer current = iter.next();
-                    IColumn column = cf.getColumn(current);
+                    Column column = cf.getColumn(current);
                     if (column != null)
-                        // clone supercolumns so caller can freely removeDeleted or otherwise mutate it
-                        return isStandard ? column : ((SuperColumn)column).cloneMe();
+                        return column;
                 }
                 return endOfData();
             }

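Note on this file: two simplifications. AtomicSortedColumns can now back every memtable CF (the ThreadSafeSortedColumns special case existed only because supercolumns broke it, see #3821), and the names iterator no longer needs to clone supercolumns defensively. A sketch of the simplified by-name lookup (ColumnFamily.getColumn(ByteBuffer) as used in the hunk; null means the name is absent):

    import java.nio.ByteBuffer;

    import org.apache.cassandra.db.Column;
    import org.apache.cassandra.db.ColumnFamily;

    public class NameLookupSketch
    {
        static Column lookup(ColumnFamily cf, ByteBuffer name)
        {
            // No supercolumn case left, so the column can be returned as-is,
            // without the old ((SuperColumn)column).cloneMe() defensive copy.
            return cf.getColumn(name);
        }
    }
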
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/OnDiskAtom.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/OnDiskAtom.java b/src/java/org/apache/cassandra/db/OnDiskAtom.java
index 7501d83..cc997ca 100644
--- a/src/java/org/apache/cassandra/db/OnDiskAtom.java
+++ b/src/java/org/apache/cassandra/db/OnDiskAtom.java
@@ -23,7 +23,6 @@ import java.security.MessageDigest;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.marshal.MarshalException;
-import org.apache.cassandra.io.IColumnSerializer;
 import org.apache.cassandra.io.ISSTableSerializer;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -48,18 +47,15 @@ public interface OnDiskAtom
 
     public static class Serializer implements ISSTableSerializer<OnDiskAtom>
     {
-        private final IColumnSerializer columnSerializer;
+        public static Serializer instance = new Serializer();
 
-        public Serializer(IColumnSerializer columnSerializer)
-        {
-            this.columnSerializer = columnSerializer;
-        }
+        private Serializer() {}
 
         public void serializeForSSTable(OnDiskAtom atom, DataOutput dos) throws IOException
         {
-            if (atom instanceof IColumn)
+            if (atom instanceof Column)
             {
-                columnSerializer.serialize((IColumn)atom, dos);
+                Column.serializer().serialize((Column)atom, dos);
             }
             else
             {
@@ -70,27 +66,20 @@ public interface OnDiskAtom
 
         public OnDiskAtom deserializeFromSSTable(DataInput dis, Descriptor.Version version) throws IOException
         {
-            return deserializeFromSSTable(dis, IColumnSerializer.Flag.LOCAL, (int)(System.currentTimeMillis() / 1000), version);
+            return deserializeFromSSTable(dis, ColumnSerializer.Flag.LOCAL, (int)(System.currentTimeMillis() / 1000), version);
         }
 
-        public OnDiskAtom deserializeFromSSTable(DataInput dis, IColumnSerializer.Flag flag, int expireBefore, Descriptor.Version version) throws IOException
+        public OnDiskAtom deserializeFromSSTable(DataInput dis, ColumnSerializer.Flag flag, int expireBefore, Descriptor.Version version) throws IOException
         {
-            if (columnSerializer instanceof SuperColumnSerializer)
-            {
-                return columnSerializer.deserialize(dis, flag, expireBefore);
-            }
-            else
-            {
-                ByteBuffer name = ByteBufferUtil.readWithShortLength(dis);
-                if (name.remaining() <= 0)
-                    throw ColumnSerializer.CorruptColumnException.create(dis, name);
+            ByteBuffer name = ByteBufferUtil.readWithShortLength(dis);
+            if (name.remaining() <= 0)
+                throw ColumnSerializer.CorruptColumnException.create(dis, name);
 
-                int b = dis.readUnsignedByte();
-                if ((b & ColumnSerializer.RANGE_TOMBSTONE_MASK) != 0)
-                    return RangeTombstone.serializer.deserializeBody(dis, name, version);
-                else
-                    return ((ColumnSerializer)columnSerializer).deserializeColumnBody(dis, name, b, flag, expireBefore);
-            }
+            int b = dis.readUnsignedByte();
+            if ((b & ColumnSerializer.RANGE_TOMBSTONE_MASK) != 0)
+                return RangeTombstone.serializer.deserializeBody(dis, name, version);
+            else
+                return Column.serializer().deserializeColumnBody(dis, name, b, flag, expireBefore);
         }
     }
 }

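Note on this file: with a single column serializer, OnDiskAtom.Serializer no longer needs per-CF construction and becomes a stateless singleton. A sketch of the new entry point (signatures as in the hunk above):

    import java.io.DataOutput;
    import java.io.IOException;

    import org.apache.cassandra.db.OnDiskAtom;

    public class AtomSerializeSketch
    {
        static void write(OnDiskAtom atom, DataOutput out) throws IOException
        {
            // Pre-patch, callers built a Serializer around a per-CF IColumnSerializer;
            // now one shared instance dispatches on Column vs. RangeTombstone itself.
            OnDiskAtom.Serializer.instance.serializeForSSTable(atom, out);
        }
    }
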
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/RangeSliceCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RangeSliceCommand.java b/src/java/org/apache/cassandra/db/RangeSliceCommand.java
index 1748abd..e365b00 100644
--- a/src/java/org/apache/cassandra/db/RangeSliceCommand.java
+++ b/src/java/org/apache/cassandra/db/RangeSliceCommand.java
@@ -49,13 +49,13 @@ import org.apache.cassandra.db.filter.IDiskAtomFilter;
 import org.apache.cassandra.db.filter.NamesQueryFilter;
 import org.apache.cassandra.db.filter.SliceQueryFilter;
 import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.CompositeType;
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.exceptions.*;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.IReadCommand;
-import org.apache.cassandra.thrift.ColumnParent;
 import org.apache.cassandra.thrift.IndexClause;
 import org.apache.cassandra.thrift.IndexExpression;
 import org.apache.cassandra.thrift.IndexOperator;
@@ -76,7 +76,6 @@ public class RangeSliceCommand implements IReadCommand
     public final String keyspace;
 
     public final String column_family;
-    public final ByteBuffer super_column;
 
     public final IDiskAtomFilter predicate;
     public final List<IndexExpression> row_filter;
@@ -86,26 +85,20 @@ public class RangeSliceCommand implements IReadCommand
     public final boolean countCQL3Rows;
     public final boolean isPaging;
 
-    public RangeSliceCommand(String keyspace, String column_family, ByteBuffer super_column, IDiskAtomFilter predicate, AbstractBounds<RowPosition> range, int maxResults)
+    public RangeSliceCommand(String keyspace, String column_family, IDiskAtomFilter predicate, AbstractBounds<RowPosition> range, int maxResults)
     {
-        this(keyspace, column_family, super_column, predicate, range, null, maxResults, false, false);
+        this(keyspace, column_family, predicate, range, null, maxResults, false, false);
     }
 
-    public RangeSliceCommand(String keyspace, ColumnParent column_parent, IDiskAtomFilter predicate, AbstractBounds<RowPosition> range, List<IndexExpression> row_filter, int maxResults)
+    public RangeSliceCommand(String keyspace, String column_family, IDiskAtomFilter predicate, AbstractBounds<RowPosition> range, List<IndexExpression> row_filter, int maxResults)
     {
-        this(keyspace, column_parent.getColumn_family(), column_parent.super_column, predicate, range, row_filter, maxResults, false, false);
+        this(keyspace, column_family, predicate, range, row_filter, maxResults, false, false);
     }
 
-    public RangeSliceCommand(String keyspace, String column_family, ByteBuffer super_column, IDiskAtomFilter predicate, AbstractBounds<RowPosition> range, List<IndexExpression> row_filter, int maxResults)
-    {
-        this(keyspace, column_family, super_column, predicate, range, row_filter, maxResults, false, false);
-    }
-
-    public RangeSliceCommand(String keyspace, String column_family, ByteBuffer super_column, IDiskAtomFilter predicate, AbstractBounds<RowPosition> range, List<IndexExpression> row_filter, int maxResults, boolean countCQL3Rows, boolean isPaging)
+    public RangeSliceCommand(String keyspace, String column_family, IDiskAtomFilter predicate, AbstractBounds<RowPosition> range, List<IndexExpression> row_filter, int maxResults, boolean countCQL3Rows, boolean isPaging)
     {
         this.keyspace = keyspace;
         this.column_family = column_family;
-        this.super_column = super_column;
         this.predicate = predicate;
         this.range = range;
         this.row_filter = row_filter;
@@ -125,7 +118,6 @@ public class RangeSliceCommand implements IReadCommand
         return "RangeSliceCommand{" +
                "keyspace='" + keyspace + '\'' +
                ", column_family='" + column_family + '\'' +
-               ", super_column=" + super_column +
                ", predicate=" + predicate +
                ", range=" + range +
                ", row_filter =" + row_filter +
@@ -198,10 +190,26 @@ class RangeSliceCommandSerializer implements IVersionedSerializer<RangeSliceComm
     {
         dos.writeUTF(sliceCommand.keyspace);
         dos.writeUTF(sliceCommand.column_family);
-        ByteBuffer sc = sliceCommand.super_column;
-        dos.writeInt(sc == null ? 0 : sc.remaining());
-        if (sc != null)
-            ByteBufferUtil.write(sc, dos);
+
+        IDiskAtomFilter filter = sliceCommand.predicate;
+        if (version < MessagingService.VERSION_20)
+        {
+            // Pre-2.0, we need to know if it's a super column. If it is, we
+            // must extract the super column name from the predicate (and
+            // modify the predicate accordingly)
+            ByteBuffer sc = null;
+            CFMetaData metadata = Schema.instance.getCFMetaData(sliceCommand.getKeyspace(), sliceCommand.column_family);
+            if (metadata.cfType == ColumnFamilyType.Super)
+            {
+                SuperColumns.SCFilter scFilter = SuperColumns.filterToSC((CompositeType)metadata.comparator, filter);
+                sc = scFilter.scName;
+                filter = scFilter.updatedFilter;
+            }
+
+            dos.writeInt(sc == null ? 0 : sc.remaining());
+            if (sc != null)
+                ByteBufferUtil.write(sc, dos);
+        }
 
         if (version < MessagingService.VERSION_12)
         {
@@ -250,26 +258,48 @@ class RangeSliceCommandSerializer implements IVersionedSerializer<RangeSliceComm
         String keyspace = dis.readUTF();
         String columnFamily = dis.readUTF();
 
-        int scLength = dis.readInt();
-        ByteBuffer superColumn = null;
-        if (scLength > 0)
-        {
-            byte[] buf = new byte[scLength];
-            dis.readFully(buf);
-            superColumn = ByteBuffer.wrap(buf);
-        }
+        CFMetaData metadata = Schema.instance.getCFMetaData(keyspace, columnFamily);
 
         IDiskAtomFilter predicate;
-        AbstractType<?> comparator = ColumnFamily.getComparatorFor(keyspace, columnFamily, superColumn);
-        if (version < MessagingService.VERSION_12)
+        if (version < MessagingService.VERSION_20)
         {
-            SlicePredicate pred = new SlicePredicate();
-            FBUtilities.deserialize(new TDeserializer(new TBinaryProtocol.Factory()), pred, dis);
-            predicate = ThriftValidation.asIFilter(pred, comparator);
+            int scLength = dis.readInt();
+            ByteBuffer superColumn = null;
+            if (scLength > 0)
+            {
+                byte[] buf = new byte[scLength];
+                dis.readFully(buf);
+                superColumn = ByteBuffer.wrap(buf);
+            }
+
+            AbstractType<?> comparator;
+            if (metadata.cfType == ColumnFamilyType.Super)
+            {
+                CompositeType type = (CompositeType)metadata.comparator;
+                comparator = superColumn == null ? type.types.get(0) : type.types.get(1);
+            }
+            else
+            {
+                comparator = metadata.comparator;
+            }
+
+            if (version < MessagingService.VERSION_12)
+            {
+                SlicePredicate pred = new SlicePredicate();
+                FBUtilities.deserialize(new TDeserializer(new TBinaryProtocol.Factory()), pred, dis);
+                predicate = ThriftValidation.asIFilter(pred, metadata, superColumn);
+            }
+            else
+            {
+                predicate = IDiskAtomFilter.Serializer.instance.deserialize(dis, version, comparator);
+            }
+
+            if (metadata.cfType == ColumnFamilyType.Super)
+                predicate = SuperColumns.fromSCFilter((CompositeType)metadata.comparator, superColumn, predicate);
         }
         else
         {
-            predicate = IDiskAtomFilter.Serializer.instance.deserialize(dis, version, comparator);
+            predicate = IDiskAtomFilter.Serializer.instance.deserialize(dis, version, metadata.comparator);
         }
 
         List<IndexExpression> rowFilter = null;
@@ -304,7 +334,7 @@ class RangeSliceCommandSerializer implements IVersionedSerializer<RangeSliceComm
             countCQL3Rows = dis.readBoolean();
             isPaging = dis.readBoolean();
         }
-        return new RangeSliceCommand(keyspace, columnFamily, superColumn, predicate, range, rowFilter, maxResults, countCQL3Rows, isPaging);
+        return new RangeSliceCommand(keyspace, columnFamily, predicate, range, rowFilter, maxResults, countCQL3Rows, isPaging);
     }
 
     public long serializedSize(RangeSliceCommand rsc, int version)
@@ -312,15 +342,27 @@ class RangeSliceCommandSerializer implements IVersionedSerializer<RangeSliceComm
         long size = TypeSizes.NATIVE.sizeof(rsc.keyspace);
         size += TypeSizes.NATIVE.sizeof(rsc.column_family);
 
-        ByteBuffer sc = rsc.super_column;
-        if (sc != null)
-        {
-            size += TypeSizes.NATIVE.sizeof(sc.remaining());
-            size += sc.remaining();
-        }
-        else
+        IDiskAtomFilter filter = rsc.predicate;
+        if (version < MessagingService.VERSION_20)
         {
-            size += TypeSizes.NATIVE.sizeof(0);
+            ByteBuffer sc = null;
+            CFMetaData metadata = Schema.instance.getCFMetaData(rsc.keyspace, rsc.column_family);
+            if (metadata.cfType == ColumnFamilyType.Super)
+            {
+                SuperColumns.SCFilter scFilter = SuperColumns.filterToSC((CompositeType)metadata.comparator, filter);
+                sc = scFilter.scName;
+                filter = scFilter.updatedFilter;
+            }
+
+            if (sc != null)
+            {
+                size += TypeSizes.NATIVE.sizeof(sc.remaining());
+                size += sc.remaining();
+            }
+            else
+            {
+                size += TypeSizes.NATIVE.sizeof(0);
+            }
         }
 
         if (version < MessagingService.VERSION_12)
@@ -328,7 +370,7 @@ class RangeSliceCommandSerializer implements IVersionedSerializer<RangeSliceComm
             TSerializer ser = new TSerializer(new TBinaryProtocol.Factory());
             try
             {
-                int predicateLength = ser.serialize(asSlicePredicate(rsc.predicate)).length;
+                int predicateLength = ser.serialize(asSlicePredicate(filter)).length;
                 if (version < MessagingService.VERSION_12)
                     size += TypeSizes.NATIVE.sizeof(predicateLength);
                 size += predicateLength;
@@ -340,7 +382,7 @@ class RangeSliceCommandSerializer implements IVersionedSerializer<RangeSliceComm
         }
         else
         {
-            size += IDiskAtomFilter.Serializer.instance.serializedSize(rsc.predicate, version);
+            size += IDiskAtomFilter.Serializer.instance.serializedSize(filter, version);
         }
 
         if (version >= MessagingService.VERSION_11)

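Note on this file: the serializer keeps the pre-2.0 wire format, which still carries an explicit super column name. For old peers, SuperColumns.filterToSC() splits a composite-aware filter back into (sc name, inner filter), and fromSCFilter() reassembles it on receive. The underlying representation is that a supercolumn cell (sc, name) becomes a single two-component composite cell name. A standalone sketch of that encoding (this follows the standard CompositeType layout, a 16-bit big-endian length, the component bytes, then one end-of-component byte per component; it is written from that general format, not from this hunk):

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    public class CompositeNameSketch
    {
        // Packs (scName, subName) into one composite cell name:
        // <len><bytes><eoc> per component, with eoc = 0 for an exact component.
        static ByteBuffer compose(ByteBuffer scName, ByteBuffer subName)
        {
            ByteBuffer out = ByteBuffer.allocate(2 + scName.remaining() + 1
                                               + 2 + subName.remaining() + 1);
            for (ByteBuffer component : new ByteBuffer[]{ scName, subName })
            {
                out.putShort((short) component.remaining());
                out.put(component.duplicate());
                out.put((byte) 0); // end-of-component byte
            }
            out.flip();
            return out;
        }

        public static void main(String[] args)
        {
            ByteBuffer sc = ByteBuffer.wrap("sc1".getBytes(StandardCharsets.UTF_8));
            ByteBuffer sub = ByteBuffer.wrap("col1".getBytes(StandardCharsets.UTF_8));
            // 3 + 4 bytes of data plus 2 * 3 bytes of framing = 13
            System.out.println(compose(sc, sub).remaining());
        }
    }
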
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/RangeTombstone.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RangeTombstone.java b/src/java/org/apache/cassandra/db/RangeTombstone.java
index 1d472c3..bcddfea 100644
--- a/src/java/org/apache/cassandra/db/RangeTombstone.java
+++ b/src/java/org/apache/cassandra/db/RangeTombstone.java
@@ -83,9 +83,8 @@ public class RangeTombstone extends Interval<ByteBuffer, DeletionTime> implement
 
     public void validateFields(CFMetaData metadata) throws MarshalException
     {
-        AbstractType<?> nameValidator = metadata.cfType == ColumnFamilyType.Super ? metadata.subcolumnComparator : metadata.comparator;
-        nameValidator.validate(min);
-        nameValidator.validate(max);
+        metadata.comparator.validate(min);
+        metadata.comparator.validate(max);
     }
 
     public void updateDigest(MessageDigest digest)
@@ -192,7 +191,7 @@ public class RangeTombstone extends Interval<ByteBuffer, DeletionTime> implement
 
         /**
          * Update this tracker given an {@code atom}.
-         * If column is a IColumn, check if any tracked range is useless and
+         * If column is a Column, check if any tracked range is useless and
          * can be removed. If it is a RangeTombstone, add it to this tracker.
          */
         public void update(OnDiskAtom atom)
@@ -219,7 +218,7 @@ public class RangeTombstone extends Interval<ByteBuffer, DeletionTime> implement
             }
             else
             {
-                assert atom instanceof IColumn;
+                assert atom instanceof Column;
                 Iterator<RangeTombstone> iter = maxOrderingSet.iterator();
                 while (iter.hasNext())
                 {
@@ -240,7 +239,7 @@ public class RangeTombstone extends Interval<ByteBuffer, DeletionTime> implement
             }
         }
 
-        public boolean isDeleted(IColumn column)
+        public boolean isDeleted(Column column)
         {
             for (RangeTombstone tombstone : ranges)
             {

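Note on this file: validateFields() collapses to the single comparator because, for a former supercolumn family, that comparator is now a CompositeType covering both the old supercolumn and subcolumn levels. This is also what lets "delete a whole supercolumn" become a plain RangeTombstone: the range spans every composite name prefixed by the supercolumn name. A standalone sketch of those prefix bounds (the end-of-component convention, where 0 sorts at the exact prefix and 1 after everything under it, is the general CompositeType rule, not taken from this hunk):

    import java.nio.ByteBuffer;

    public class PrefixBoundsSketch
    {
        // One-component composite with a chosen end-of-component byte.
        static ByteBuffer withEoc(ByteBuffer prefix, byte eoc)
        {
            ByteBuffer out = ByteBuffer.allocate(2 + prefix.remaining() + 1);
            out.putShort((short) prefix.remaining());
            out.put(prefix.duplicate());
            out.put(eoc);
            out.flip();
            return out;
        }

        // [startOf(sc), endOf(sc)] covers every subcolumn of supercolumn sc.
        static ByteBuffer startOf(ByteBuffer sc) { return withEoc(sc, (byte) 0); }
        static ByteBuffer endOf(ByteBuffer sc)   { return withEoc(sc, (byte) 1); }
    }
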
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java
index f3494e5..8b68d5b 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -24,10 +24,14 @@ import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.filter.IDiskAtomFilter;
-import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.db.filter.NamesQueryFilter;
+import org.apache.cassandra.db.filter.SliceQueryFilter;
 import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.CompositeType;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
@@ -38,8 +42,22 @@ import org.apache.cassandra.utils.IFilter;
 
 public abstract class ReadCommand implements IReadCommand
 {
-    public static final byte CMD_TYPE_GET_SLICE_BY_NAMES = 1;
-    public static final byte CMD_TYPE_GET_SLICE = 2;
+    public enum Type {
+        GET_BY_NAMES((byte)1),
+        GET_SLICES((byte)2);
+
+        public final byte serializedValue;
+
+        private Type(byte b)
+        {
+            this.serializedValue = b;
+        }
+
+        public static Type fromSerializedValue(byte b)
+        {
+            return b == 1 ? GET_BY_NAMES : GET_SLICES;
+        }
+    }
 
     public static final ReadCommandSerializer serializer = new ReadCommandSerializer();
 
@@ -48,20 +66,28 @@ public abstract class ReadCommand implements IReadCommand
         return new MessageOut<ReadCommand>(MessagingService.Verb.READ, this, serializer);
     }
 
-    public final QueryPath queryPath;
     public final String table;
+    public final String cfName;
     public final ByteBuffer key;
     private boolean isDigestQuery = false;
-    protected final byte commandType;
+    protected final Type commandType;
 
-    protected ReadCommand(String table, ByteBuffer key, QueryPath queryPath, byte cmdType)
+    protected ReadCommand(String table, ByteBuffer key, String cfName, Type cmdType)
     {
         this.table = table;
         this.key = key;
-        this.queryPath = queryPath;
+        this.cfName = cfName;
         this.commandType = cmdType;
     }
 
+    public static ReadCommand create(String table, ByteBuffer key, String cfName, IDiskAtomFilter filter)
+    {
+        if (filter instanceof SliceQueryFilter)
+            return new SliceFromReadCommand(table, key, cfName, (SliceQueryFilter)filter);
+        else
+            return new SliceByNamesReadCommand(table, key, cfName, (NamesQueryFilter)filter);
+    }
+
     public boolean isDigestQuery()
     {
         return isDigestQuery;
@@ -74,7 +100,7 @@ public abstract class ReadCommand implements IReadCommand
 
     public String getColumnFamilyName()
     {
-        return queryPath.columnFamilyName;
+        return cfName;
     }
 
     public abstract ReadCommand copy();
@@ -83,11 +109,6 @@ public abstract class ReadCommand implements IReadCommand
 
     public abstract IDiskAtomFilter filter();
 
-    protected AbstractType<?> getComparator()
-    {
-        return ColumnFamily.getComparatorFor(table, getColumnFamilyName(), queryPath.superColumnName);
-    }
-
     public String getKeyspace()
     {
         return table;
@@ -113,28 +134,77 @@ public abstract class ReadCommand implements IReadCommand
 
 class ReadCommandSerializer implements IVersionedSerializer<ReadCommand>
 {
-    private static final Map<Byte, IVersionedSerializer<ReadCommand>> CMD_SERIALIZER_MAP = new HashMap<Byte, IVersionedSerializer<ReadCommand>>();
-    static
-    {
-        CMD_SERIALIZER_MAP.put(ReadCommand.CMD_TYPE_GET_SLICE_BY_NAMES, new SliceByNamesReadCommandSerializer());
-        CMD_SERIALIZER_MAP.put(ReadCommand.CMD_TYPE_GET_SLICE, new SliceFromReadCommandSerializer());
-    }
-
-
     public void serialize(ReadCommand command, DataOutput dos, int version) throws IOException
     {
-        dos.writeByte(command.commandType);
-        CMD_SERIALIZER_MAP.get(command.commandType).serialize(command, dos, version);
+        // For super columns, when talking to an older node, we need to translate the filter used.
+        // That translation can change the filter type (names -> slice), and so change the command type.
+        // Hence we need to detect that early on, before we've written the command type.
+        ReadCommand newCommand = command;
+        ByteBuffer superColumn = null;
+        if (version < MessagingService.VERSION_20)
+        {
+            CFMetaData metadata = Schema.instance.getCFMetaData(command.table, command.cfName);
+            if (metadata.cfType == ColumnFamilyType.Super)
+            {
+                SuperColumns.SCFilter scFilter = SuperColumns.filterToSC((CompositeType)metadata.comparator, command.filter());
+                newCommand = ReadCommand.create(command.table, command.key, command.cfName, scFilter.updatedFilter);
+                newCommand.setDigestQuery(command.isDigestQuery());
+                superColumn = scFilter.scName;
+            }
+        }
+
+        dos.writeByte(newCommand.commandType.serializedValue);
+        switch (command.commandType)
+        {
+            case GET_BY_NAMES:
+                SliceByNamesReadCommand.serializer.serialize(newCommand, superColumn, dos, version);
+                break;
+            case GET_SLICES:
+                SliceFromReadCommand.serializer.serialize(newCommand, superColumn, dos, version);
+                break;
+            default:
+                throw new AssertionError();
+        }
     }
 
     public ReadCommand deserialize(DataInput dis, int version) throws IOException
     {
-        byte msgType = dis.readByte();
-        return CMD_SERIALIZER_MAP.get(msgType).deserialize(dis, version);
+        ReadCommand.Type msgType = ReadCommand.Type.fromSerializedValue(dis.readByte());
+        switch (msgType)
+        {
+            case GET_BY_NAMES:
+                return SliceByNamesReadCommand.serializer.deserialize(dis, version);
+            case GET_SLICES:
+                return SliceFromReadCommand.serializer.deserialize(dis, version);
+            default:
+                throw new AssertionError();
+        }
     }
 
     public long serializedSize(ReadCommand command, int version)
     {
-        return 1 + CMD_SERIALIZER_MAP.get(command.commandType).serializedSize(command, version);
+        ReadCommand newCommand = command;
+        ByteBuffer superColumn = null;
+        if (version < MessagingService.VERSION_20)
+        {
+            CFMetaData metadata = Schema.instance.getCFMetaData(command.table, command.cfName);
+            if (metadata.cfType == ColumnFamilyType.Super)
+            {
+                SuperColumns.SCFilter scFilter = SuperColumns.filterToSC((CompositeType)metadata.comparator, command.filter());
+                newCommand = ReadCommand.create(command.table, command.key, command.cfName, scFilter.updatedFilter);
+                newCommand.setDigestQuery(command.isDigestQuery());
+                superColumn = scFilter.scName;
+            }
+        }
+
+        switch (command.commandType)
+        {
+            case GET_BY_NAMES:
+                return 1 + SliceByNamesReadCommand.serializer.serializedSize(newCommand, superColumn, version);
+            case GET_SLICES:
+                return 1 + SliceFromReadCommand.serializer.serializedSize(newCommand, superColumn, version);
+            default:
+                throw new AssertionError();
+        }
     }
 }

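Note on this file: the raw command-type bytes become a proper enum, and ReadCommand.create() centralizes the filter-to-command dispatch that the supercolumn translation above relies on (converting a filter for an old node can turn a names filter into a slice filter, which changes the command type). A minimal sketch of the dispatch, with the factory signature as in the hunk:

    import java.nio.ByteBuffer;

    import org.apache.cassandra.db.ReadCommand;
    import org.apache.cassandra.db.filter.IDiskAtomFilter;

    public class ReadCommandSketch
    {
        // SliceQueryFilter => SliceFromReadCommand (Type.GET_SLICES);
        // NamesQueryFilter => SliceByNamesReadCommand (Type.GET_BY_NAMES).
        static ReadCommand build(String table, ByteBuffer key, String cfName, IDiskAtomFilter filter)
        {
            return ReadCommand.create(table, key, cfName, filter);
        }
    }
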
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/ReadResponse.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadResponse.java b/src/java/org/apache/cassandra/db/ReadResponse.java
index 1f21006..95f36e3 100644
--- a/src/java/org/apache/cassandra/db/ReadResponse.java
+++ b/src/java/org/apache/cassandra/db/ReadResponse.java
@@ -20,7 +20,6 @@ package org.apache.cassandra.db;
 import java.io.*;
 import java.nio.ByteBuffer;
 
-import org.apache.cassandra.io.IColumnSerializer;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
@@ -94,7 +93,7 @@ class ReadResponseSerializer implements IVersionedSerializer<ReadResponse>
         if (!isDigest)
         {
             // This is coming from a remote host
-            row = Row.serializer.deserialize(dis, version, IColumnSerializer.Flag.FROM_REMOTE, ArrayBackedSortedColumns.factory());
+            row = Row.serializer.deserialize(dis, version, ColumnSerializer.Flag.FROM_REMOTE, ArrayBackedSortedColumns.factory());
         }
 
         return isDigest ? new ReadResponse(ByteBuffer.wrap(digest)) : new ReadResponse(row);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/RetriedSliceFromReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RetriedSliceFromReadCommand.java b/src/java/org/apache/cassandra/db/RetriedSliceFromReadCommand.java
index e7e99fe..b948460 100644
--- a/src/java/org/apache/cassandra/db/RetriedSliceFromReadCommand.java
+++ b/src/java/org/apache/cassandra/db/RetriedSliceFromReadCommand.java
@@ -29,21 +29,16 @@ public class RetriedSliceFromReadCommand extends SliceFromReadCommand
     static final Logger logger = LoggerFactory.getLogger(RetriedSliceFromReadCommand.class);
     public final int originalCount;
 
-    public RetriedSliceFromReadCommand(String table, ByteBuffer key, ColumnParent column_parent, SliceQueryFilter filter, int originalCount)
+    public RetriedSliceFromReadCommand(String table, ByteBuffer key, String cfName, SliceQueryFilter filter, int originalCount)
     {
-        this(table, key, new QueryPath(column_parent), filter, originalCount);
-    }
-
-    public RetriedSliceFromReadCommand(String table, ByteBuffer key, QueryPath path, SliceQueryFilter filter, int originalCount)
-    {
-        super(table, key, path, filter);
+        super(table, key, cfName, filter);
         this.originalCount = originalCount;
     }
 
     @Override
     public ReadCommand copy()
     {
-        ReadCommand readCommand = new RetriedSliceFromReadCommand(table, key, queryPath, filter, originalCount);
+        ReadCommand readCommand = new RetriedSliceFromReadCommand(table, key, cfName, filter, originalCount);
         readCommand.setDigestQuery(isDigestQuery());
         return readCommand;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/Row.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Row.java b/src/java/org/apache/cassandra/db/Row.java
index 74cd906..54c5efe 100644
--- a/src/java/org/apache/cassandra/db/Row.java
+++ b/src/java/org/apache/cassandra/db/Row.java
@@ -20,7 +20,6 @@ package org.apache.cassandra.db;
 import java.io.*;
 
 import org.apache.cassandra.db.filter.IDiskAtomFilter;
-import org.apache.cassandra.io.IColumnSerializer;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -62,7 +61,7 @@ public class Row
             ColumnFamily.serializer.serialize(row.cf, dos, version);
         }
 
-        public Row deserialize(DataInput dis, int version, IColumnSerializer.Flag flag, ISortedColumns.Factory factory) throws IOException
+        public Row deserialize(DataInput dis, int version, ColumnSerializer.Flag flag, ISortedColumns.Factory factory) throws IOException
         {
             return new Row(StorageService.getPartitioner().decorateKey(ByteBufferUtil.readWithShortLength(dis)),
                            ColumnFamily.serializer.deserialize(dis, flag, factory, version));
@@ -70,7 +69,7 @@ public class Row
 
         public Row deserialize(DataInput dis, int version) throws IOException
         {
-            return deserialize(dis, version, IColumnSerializer.Flag.LOCAL, TreeMapBackedSortedColumns.factory());
+            return deserialize(dis, version, ColumnSerializer.Flag.LOCAL, TreeMapBackedSortedColumns.factory());
         }
 
         public long serializedSize(Row row, int version)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/RowIteratorFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RowIteratorFactory.java b/src/java/org/apache/cassandra/db/RowIteratorFactory.java
index 937c496..38d0fc1 100644
--- a/src/java/org/apache/cassandra/db/RowIteratorFactory.java
+++ b/src/java/org/apache/cassandra/db/RowIteratorFactory.java
@@ -107,7 +107,7 @@ public class RowIteratorFactory
                 }
                 else
                 {
-                    QueryFilter keyFilter = new QueryFilter(key, filter.path, filter.filter);
+                    QueryFilter keyFilter = new QueryFilter(key, filter.cfName, filter.filter);
                     returnCF = cfs.filterColumnFamily(cached, keyFilter, gcBefore);
                 }
 

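Note on this file: the same QueryPath removal on the read path; QueryFilter is now keyed by (row key, CF name, filter). A sketch of re-scoping a filter to another key, as the hunk does for cached rows (the three-argument constructor is as shown above; the DecoratedKey and IDiskAtomFilter parameter types are assumed from context):

    import org.apache.cassandra.db.DecoratedKey;
    import org.apache.cassandra.db.filter.IDiskAtomFilter;
    import org.apache.cassandra.db.filter.QueryFilter;

    public class RescopeFilterSketch
    {
        static QueryFilter forKey(DecoratedKey key, String cfName, IDiskAtomFilter filter)
        {
            // Pre-patch: new QueryFilter(key, filter.path, filter.filter)
            return new QueryFilter(key, cfName, filter);
        }
    }
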
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/RowMutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RowMutation.java b/src/java/org/apache/cassandra/db/RowMutation.java
index b4666b5..7ad3100 100644
--- a/src/java/org/apache/cassandra/db/RowMutation.java
+++ b/src/java/org/apache/cassandra/db/RowMutation.java
@@ -29,9 +29,7 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.KSMetaData;
 import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.db.marshal.UUIDType;
-import org.apache.cassandra.io.IColumnSerializer;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.FastByteArrayInputStream;
 import org.apache.cassandra.net.MessageOut;
@@ -115,8 +113,9 @@ public class RowMutation implements IMutation
             ttl = Math.min(ttl, cf.metadata().getGcGraceSeconds());
 
         // serialize the hint with id and version as a composite column name
-        QueryPath path = new QueryPath(SystemTable.HINTS_CF, null, HintedHandOffManager.comparator.decompose(hintId, MessagingService.current_version));
-        rm.add(path, ByteBuffer.wrap(mutation.getSerializedBuffer(MessagingService.current_version)), System.currentTimeMillis(), ttl);
+        ByteBuffer name = HintedHandOffManager.comparator.decompose(hintId, MessagingService.current_version);
+        ByteBuffer value = ByteBuffer.wrap(mutation.getSerializedBuffer(MessagingService.current_version));
+        rm.add(SystemTable.HINTS_CF, name, value, System.currentTimeMillis(), ttl);
 
         return rm;
     }
@@ -156,81 +155,37 @@ public class RowMutation implements IMutation
         return modifications.isEmpty();
     }
 
-    /*
-     * Specify a column name and a corresponding value for
-     * the column. Column name is specified as <column family>:column.
-     * This will result in a ColumnFamily associated with
-     * <column family> as name and a Column with <column>
-     * as name. The column can be further broken up
-     * as super column name : columnname  in case of super columns
-     *
-     * param @ cf - column name as <column family>:<column>
-     * param @ value - value associated with the column
-     * param @ timestamp - timestamp associated with this data.
-     * param @ timeToLive - ttl for the column, 0 for standard (non expiring) columns
-     *
-     * @Deprecated this tends to be low-performance; we're doing two hash lookups,
-     * one of which instantiates a Pair, and callers tend to instantiate new QP objects
-     * for each call as well.  Use the add(ColumnFamily) overload instead.
-     */
-    public void add(QueryPath path, ByteBuffer value, long timestamp, int timeToLive)
+    public void add(String cfName, ByteBuffer name, ByteBuffer value, long timestamp, int timeToLive)
     {
-        UUID id = Schema.instance.getId(table, path.columnFamilyName);
-        ColumnFamily columnFamily = modifications.get(id);
-
-        if (columnFamily == null)
-        {
-            columnFamily = ColumnFamily.create(table, path.columnFamilyName);
-            modifications.put(id, columnFamily);
-        }
-        columnFamily.addColumn(path, value, timestamp, timeToLive);
+        addOrGet(cfName).addColumn(name, value, timestamp, timeToLive);
     }
 
-    public void addCounter(QueryPath path, long value)
+    public void addCounter(String cfName, ByteBuffer name, long value)
     {
-        UUID id = Schema.instance.getId(table, path.columnFamilyName);
-        ColumnFamily columnFamily = modifications.get(id);
-
-        if (columnFamily == null)
-        {
-            columnFamily = ColumnFamily.create(table, path.columnFamilyName);
-            modifications.put(id, columnFamily);
-        }
-        columnFamily.addCounter(path, value);
+        addOrGet(cfName).addCounter(name, value);
     }
 
-    public void add(QueryPath path, ByteBuffer value, long timestamp)
+    public void add(String cfName, ByteBuffer name, ByteBuffer value, long timestamp)
     {
-        add(path, value, timestamp, 0);
+        add(cfName, name, value, timestamp, 0);
     }
 
-    public void delete(QueryPath path, long timestamp)
+    public void delete(String cfName, long timestamp)
     {
-        UUID id = Schema.instance.getId(table, path.columnFamilyName);
-
         int localDeleteTime = (int) (System.currentTimeMillis() / 1000);
+        addOrGet(cfName).delete(new DeletionInfo(timestamp, localDeleteTime));
+    }
 
-        ColumnFamily columnFamily = modifications.get(id);
-        if (columnFamily == null)
-        {
-            columnFamily = ColumnFamily.create(table, path.columnFamilyName);
-            modifications.put(id, columnFamily);
-        }
+    public void delete(String cfName, ByteBuffer name, long timestamp)
+    {
+        int localDeleteTime = (int) (System.currentTimeMillis() / 1000);
+        addOrGet(cfName).addTombstone(name, localDeleteTime, timestamp);
+    }
 
-        if (path.superColumnName == null && path.columnName == null)
-        {
-            columnFamily.delete(new DeletionInfo(timestamp, localDeleteTime));
-        }
-        else if (path.columnName == null)
-        {
-            SuperColumn sc = new SuperColumn(path.superColumnName, columnFamily.getSubComparator());
-            sc.delete(new DeletionInfo(timestamp, localDeleteTime));
-            columnFamily.addColumn(sc);
-        }
-        else
-        {
-            columnFamily.addTombstone(path, localDeleteTime, timestamp);
-        }
+    public void deleteRange(String cfName, ByteBuffer start, ByteBuffer end, long timestamp)
+    {
+        int localDeleteTime = (int) (System.currentTimeMillis() / 1000);
+        addOrGet(cfName).addAtom(new RangeTombstone(start, end, timestamp, localDeleteTime));
     }
 
     public void addAll(IMutation m)
@@ -314,49 +269,6 @@ public class RowMutation implements IMutation
         return buff.append("])").toString();
     }
 
-    public void addColumnOrSuperColumn(String cfName, ColumnOrSuperColumn cosc)
-    {
-        if (cosc.super_column != null)
-        {
-            for (org.apache.cassandra.thrift.Column column : cosc.super_column.columns)
-            {
-                add(new QueryPath(cfName, cosc.super_column.name, column.name), column.value, column.timestamp, column.ttl);
-            }
-        }
-        else if (cosc.column != null)
-        {
-            add(new QueryPath(cfName, null, cosc.column.name), cosc.column.value, cosc.column.timestamp, cosc.column.ttl);
-        }
-        else if (cosc.counter_super_column != null)
-        {
-            for (org.apache.cassandra.thrift.CounterColumn column : cosc.counter_super_column.columns)
-            {
-                addCounter(new QueryPath(cfName, cosc.counter_super_column.name, column.name), column.value);
-            }
-        }
-        else // cosc.counter_column != null
-        {
-            addCounter(new QueryPath(cfName, null, cosc.counter_column.name), cosc.counter_column.value);
-        }
-    }
-
-    public void deleteColumnOrSuperColumn(String cfName, Deletion del)
-    {
-        if (del.predicate != null && del.predicate.column_names != null)
-        {
-            for(ByteBuffer c : del.predicate.column_names)
-            {
-                if (del.super_column == null && Schema.instance.getColumnFamilyType(table, cfName) == ColumnFamilyType.Super)
-                    delete(new QueryPath(cfName, c), del.timestamp);
-                else
-                    delete(new QueryPath(cfName, del.super_column, c), del.timestamp);
-            }
-        }
-        else
-        {
-            delete(new QueryPath(cfName, del.super_column), del.timestamp);
-        }
-    }
 
     public static RowMutation fromBytes(byte[] raw, int version) throws IOException
     {
@@ -396,7 +308,7 @@ public class RowMutation implements IMutation
             }
         }
 
-        public RowMutation deserialize(DataInput dis, int version, IColumnSerializer.Flag flag) throws IOException
+        public RowMutation deserialize(DataInput dis, int version, ColumnSerializer.Flag flag) throws IOException
         {
             String table = dis.readUTF();
             ByteBuffer key = ByteBufferUtil.readWithShortLength(dis);
@@ -417,7 +329,7 @@ public class RowMutation implements IMutation
 
         public RowMutation deserialize(DataInput dis, int version) throws IOException
         {
-            return deserialize(dis, version, IColumnSerializer.Flag.FROM_REMOTE);
+            return deserialize(dis, version, ColumnSerializer.Flag.FROM_REMOTE);
         }
 
         public long serializedSize(RowMutation rm, int version)
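
For reference, a minimal usage sketch of the per-cell RowMutation API that the
hunks above introduce. Keyspace/CF names, values, and the helper method are
illustrative only; the add/delete/deleteRange signatures are the ones added in
this patch:

    import java.nio.ByteBuffer;
    import org.apache.cassandra.db.RowMutation;
    import org.apache.cassandra.utils.ByteBufferUtil;

    static void rowMutationSketch()
    {
        long ts = System.currentTimeMillis();
        RowMutation rm = new RowMutation("Keyspace1", ByteBufferUtil.bytes("k1"));
        // insert one cell: (cfName, cell name, value, timestamp)
        rm.add("Standard1", ByteBufferUtil.bytes("c1"), ByteBufferUtil.bytes("v1"), ts);
        // tombstone a single cell, delete the whole row, or delete a name range
        rm.delete("Standard1", ByteBufferUtil.bytes("c1"), ts);
        rm.delete("Standard1", ts);
        rm.deleteRange("Standard1", ByteBufferUtil.bytes("a"), ByteBufferUtil.bytes("z"), ts);
        rm.apply();
    }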

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java b/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java
index 2b3b0b1..0273cbc 100644
--- a/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java
@@ -21,39 +21,31 @@ import java.io.*;
 import java.nio.ByteBuffer;
 import java.util.*;
 
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.filter.*;
 import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.CompositeType;
 import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.thrift.ColumnParent;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 public class SliceByNamesReadCommand extends ReadCommand
 {
-    public final NamesQueryFilter filter;
-
-    public SliceByNamesReadCommand(String table, ByteBuffer key, ColumnParent column_parent, Collection<ByteBuffer> columnNames)
-    {
-        this(table, key, new QueryPath(column_parent), columnNames);
-    }
+    static final SliceByNamesReadCommandSerializer serializer = new SliceByNamesReadCommandSerializer();
 
-    public SliceByNamesReadCommand(String table, ByteBuffer key, QueryPath path, Collection<ByteBuffer> columnNames)
-    {
-        super(table, key, path, CMD_TYPE_GET_SLICE_BY_NAMES);
-        SortedSet s = new TreeSet<ByteBuffer>(getComparator());
-        s.addAll(columnNames);
-        this.filter = new NamesQueryFilter(s);
-    }
+    public final NamesQueryFilter filter;
 
-    public SliceByNamesReadCommand(String table, ByteBuffer key, QueryPath path, NamesQueryFilter filter)
+    public SliceByNamesReadCommand(String table, ByteBuffer key, String cfName, NamesQueryFilter filter)
     {
-        super(table, key, path, CMD_TYPE_GET_SLICE_BY_NAMES);
+        super(table, key, cfName, Type.GET_BY_NAMES);
         this.filter = filter;
     }
 
     public ReadCommand copy()
     {
-        ReadCommand readCommand= new SliceByNamesReadCommand(table, key, queryPath, filter);
+        ReadCommand readCommand = new SliceByNamesReadCommand(table, key, cfName, filter);
         readCommand.setDigestQuery(isDigestQuery());
         return readCommand;
     }
@@ -61,7 +53,7 @@ public class SliceByNamesReadCommand extends ReadCommand
     public Row getRow(Table table) throws IOException
     {
         DecoratedKey dk = StorageService.getPartitioner().decorateKey(key);
-        return table.getRow(new QueryFilter(dk, queryPath, filter));
+        return table.getRow(new QueryFilter(dk, cfName, filter));
     }
 
     @Override
@@ -70,7 +62,7 @@ public class SliceByNamesReadCommand extends ReadCommand
         return "SliceByNamesReadCommand(" +
                "table='" + table + '\'' +
                ", key=" + ByteBufferUtil.bytesToHex(key) +
-               ", columnParent='" + queryPath + '\'' +
+               ", cfName='" + cfName + '\'' +
                ", filter=" + filter +
                ')';
     }
@@ -85,30 +77,86 @@ class SliceByNamesReadCommandSerializer implements IVersionedSerializer<ReadComm
 {
     public void serialize(ReadCommand cmd, DataOutput dos, int version) throws IOException
     {
+        serialize(cmd, null, dos, version);
+    }
+
+    public void serialize(ReadCommand cmd, ByteBuffer superColumn, DataOutput dos, int version) throws IOException
+    {
         SliceByNamesReadCommand command = (SliceByNamesReadCommand) cmd;
         dos.writeBoolean(command.isDigestQuery());
         dos.writeUTF(command.table);
         ByteBufferUtil.writeWithShortLength(command.key, dos);
-        command.queryPath.serialize(dos);
+
+        if (version < MessagingService.VERSION_20)
+            new QueryPath(command.cfName, superColumn).serialize(dos);
+        else
+            dos.writeUTF(command.cfName);
+
         NamesQueryFilter.serializer.serialize(command.filter, dos, version);
     }
 
-    public SliceByNamesReadCommand deserialize(DataInput dis, int version) throws IOException
+    public ReadCommand deserialize(DataInput dis, int version) throws IOException
     {
         boolean isDigest = dis.readBoolean();
         String table = dis.readUTF();
         ByteBuffer key = ByteBufferUtil.readWithShortLength(dis);
-        QueryPath columnParent = QueryPath.deserialize(dis);
 
-        AbstractType<?> comparator = ColumnFamily.getComparatorFor(table, columnParent.columnFamilyName, columnParent.superColumnName);
-        NamesQueryFilter filter = NamesQueryFilter.serializer.deserialize(dis, version, comparator);
-        SliceByNamesReadCommand command = new SliceByNamesReadCommand(table, key, columnParent, filter);
+        String cfName;
+        ByteBuffer sc = null;
+        if (version < MessagingService.VERSION_20)
+        {
+            QueryPath path = QueryPath.deserialize(dis);
+            cfName = path.columnFamilyName;
+            sc = path.superColumnName;
+        }
+        else
+        {
+            cfName = dis.readUTF();
+        }
+
+        CFMetaData metadata = Schema.instance.getCFMetaData(table, cfName);
+        ReadCommand command;
+        if (version < MessagingService.VERSION_20)
+        {
+            AbstractType<?> comparator;
+            if (metadata.cfType == ColumnFamilyType.Super)
+            {
+                CompositeType type = (CompositeType)metadata.comparator;
+                comparator = sc == null ? type.types.get(0) : type.types.get(1);
+            }
+            else
+            {
+                comparator = metadata.comparator;
+            }
+
+            IDiskAtomFilter filter = NamesQueryFilter.serializer.deserialize(dis, version, comparator);
+
+            if (metadata.cfType == ColumnFamilyType.Super)
+                filter = SuperColumns.fromSCFilter((CompositeType)metadata.comparator, sc, filter);
+
+            // Due to SC compat, it's possible we get back a slice filter at this point
+            if (filter instanceof NamesQueryFilter)
+                command = new SliceByNamesReadCommand(table, key, cfName, (NamesQueryFilter)filter);
+            else
+                command = new SliceFromReadCommand(table, key, cfName, (SliceQueryFilter)filter);
+        }
+        else
+        {
+            NamesQueryFilter filter = NamesQueryFilter.serializer.deserialize(dis, version, metadata.comparator);
+            command = new SliceByNamesReadCommand(table, key, cfName, filter);
+        }
+
         command.setDigestQuery(isDigest);
         return command;
     }
 
     public long serializedSize(ReadCommand cmd, int version)
     {
+        return serializedSize(cmd, null, version);
+    }
+
+    public long serializedSize(ReadCommand cmd, ByteBuffer superColumn, int version)
+    {
         TypeSizes sizes = TypeSizes.NATIVE;
         SliceByNamesReadCommand command = (SliceByNamesReadCommand) cmd;
         int size = sizes.sizeof(command.isDigestQuery());
@@ -116,7 +164,16 @@ class SliceByNamesReadCommandSerializer implements IVersionedSerializer<ReadComm
 
         size += sizes.sizeof(command.table);
         size += sizes.sizeof((short)keySize) + keySize;
-        size += command.queryPath.serializedSize(sizes);
+
+        if (version < MessagingService.VERSION_20)
+        {
+            size += new QueryPath(command.cfName, superColumn).serializedSize(sizes);
+        }
+        else
+        {
+            size += sizes.sizeof(command.cfName);
+        }
+
         size += NamesQueryFilter.serializer.serializedSize(command.filter, version);
         return size;
     }
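
The deserialization above is where the SC compatibility shim lives: a pre-2.0
peer still frames a super column read as (superColumnName, subColumnNames),
and the shim re-expresses that against the CF's CompositeType comparator,
whose first component orders the old SC names and whose second orders the
subcolumn names. A sketch of the encoding with illustrative UTF8 components;
CompositeType.getInstance/build are the stock helpers, everything else here
is made up for the example:

    import java.nio.ByteBuffer;
    import java.util.Arrays;
    import org.apache.cassandra.db.marshal.AbstractType;
    import org.apache.cassandra.db.marshal.CompositeType;
    import org.apache.cassandra.db.marshal.UTF8Type;
    import org.apache.cassandra.utils.ByteBufferUtil;

    CompositeType comparator = CompositeType.getInstance(
        Arrays.<AbstractType<?>>asList(UTF8Type.instance, UTF8Type.instance));
    // the old cell at (super column "sc1", subcolumn "col1") becomes one
    // two-component cell name under the composite comparator
    ByteBuffer cellName = CompositeType.build(ByteBufferUtil.bytes("sc1"),
                                              ByteBufferUtil.bytes("col1"));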

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SliceFromReadCommand.java b/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
index d52826b..3a05ce0 100644
--- a/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
@@ -25,41 +25,37 @@ import java.nio.ByteBuffer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.filter.IDiskAtomFilter;
 import org.apache.cassandra.db.filter.QueryFilter;
 import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.db.filter.SliceQueryFilter;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.CompositeType;
 import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.RepairCallback;
 import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.thrift.ColumnParent;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 public class SliceFromReadCommand extends ReadCommand
 {
     static final Logger logger = LoggerFactory.getLogger(SliceFromReadCommand.class);
 
-    public final SliceQueryFilter filter;
+    static final SliceFromReadCommandSerializer serializer = new SliceFromReadCommandSerializer();
 
-    public SliceFromReadCommand(String table, ByteBuffer key, ColumnParent column_parent, ByteBuffer start, ByteBuffer finish, boolean reversed, int count)
-    {
-        this(table, key, new QueryPath(column_parent), start, finish, reversed, count);
-    }
-
-    public SliceFromReadCommand(String table, ByteBuffer key, QueryPath path, ByteBuffer start, ByteBuffer finish, boolean reversed, int count)
-    {
-        this(table, key, path, new SliceQueryFilter(start, finish, reversed, count));
-    }
+    public final SliceQueryFilter filter;
 
-    public SliceFromReadCommand(String table, ByteBuffer key, QueryPath path, SliceQueryFilter filter)
+    public SliceFromReadCommand(String table, ByteBuffer key, String cfName, SliceQueryFilter filter)
     {
-        super(table, key, path, CMD_TYPE_GET_SLICE);
+        super(table, key, cfName, Type.GET_SLICES);
         this.filter = filter;
     }
 
     public ReadCommand copy()
     {
-        ReadCommand readCommand = new SliceFromReadCommand(table, key, queryPath, filter);
+        ReadCommand readCommand = new SliceFromReadCommand(table, key, cfName, filter);
         readCommand.setDigestQuery(isDigestQuery());
         return readCommand;
     }
@@ -67,7 +63,7 @@ public class SliceFromReadCommand extends ReadCommand
     public Row getRow(Table table)
     {
         DecoratedKey dk = StorageService.getPartitioner().decorateKey(key);
-        return table.getRow(new QueryFilter(dk, queryPath, filter));
+        return table.getRow(new QueryFilter(dk, cfName, filter));
     }
 
     @Override
@@ -91,7 +87,7 @@ public class SliceFromReadCommand extends ReadCommand
             // round we want to ask for x columns so that x * (l/t) == t, i.e. x = t^2/l.
             int retryCount = liveCountInRow == 0 ? count + 1 : ((count * count) / liveCountInRow) + 1;
             SliceQueryFilter newFilter = filter.withUpdatedCount(retryCount);
-            return new RetriedSliceFromReadCommand(table, key, queryPath, newFilter, getOriginalRequestedCount());
+            return new RetriedSliceFromReadCommand(table, key, cfName, newFilter, getOriginalRequestedCount());
         }
 
         return null;
@@ -127,7 +123,7 @@ public class SliceFromReadCommand extends ReadCommand
         return "SliceFromReadCommand(" +
                "table='" + table + '\'' +
                ", key='" + ByteBufferUtil.bytesToHex(key) + '\'' +
-               ", column_parent='" + queryPath + '\'' +
+               ", cfName='" + cfName + '\'' +
                ", filter='" + filter + '\'' +
                ')';
     }
@@ -137,11 +133,21 @@ class SliceFromReadCommandSerializer implements IVersionedSerializer<ReadCommand
 {
     public void serialize(ReadCommand rm, DataOutput dos, int version) throws IOException
     {
+        serialize(rm, null, dos, version);
+    }
+
+    public void serialize(ReadCommand rm, ByteBuffer superColumn, DataOutput dos, int version) throws IOException
+    {
         SliceFromReadCommand realRM = (SliceFromReadCommand)rm;
         dos.writeBoolean(realRM.isDigestQuery());
         dos.writeUTF(realRM.table);
         ByteBufferUtil.writeWithShortLength(realRM.key, dos);
-        realRM.queryPath.serialize(dos);
+
+        if (version < MessagingService.VERSION_20)
+            new QueryPath(realRM.cfName, superColumn).serialize(dos);
+        else
+            dos.writeUTF(realRM.cfName);
+
         SliceQueryFilter.serializer.serialize(realRM.filter, dos, version);
     }
 
@@ -150,15 +156,46 @@ class SliceFromReadCommandSerializer implements IVersionedSerializer<ReadCommand
         boolean isDigest = dis.readBoolean();
         String table = dis.readUTF();
         ByteBuffer key = ByteBufferUtil.readWithShortLength(dis);
-        QueryPath path = QueryPath.deserialize(dis);
-        SliceQueryFilter filter = SliceQueryFilter.serializer.deserialize(dis, version);
-        SliceFromReadCommand rm = new SliceFromReadCommand(table, key, path, filter);
-        rm.setDigestQuery(isDigest);
-        return rm;
+
+        String cfName;
+        ByteBuffer sc = null;
+        if (version < MessagingService.VERSION_20)
+        {
+            QueryPath path = QueryPath.deserialize(dis);
+            cfName = path.columnFamilyName;
+            sc = path.superColumnName;
+        }
+        else
+        {
+            cfName = dis.readUTF();
+        }
+
+        CFMetaData metadata = Schema.instance.getCFMetaData(table, cfName);
+        SliceQueryFilter filter;
+        if (version < MessagingService.VERSION_20)
+        {
+            filter = SliceQueryFilter.serializer.deserialize(dis, version);
+
+            if (metadata.cfType == ColumnFamilyType.Super)
+                filter = SuperColumns.fromSCSliceFilter((CompositeType)metadata.comparator, sc, filter);
+        }
+        else
+        {
+            filter = SliceQueryFilter.serializer.deserialize(dis, version);
+        }
+
+        ReadCommand command = new SliceFromReadCommand(table, key, cfName, filter);
+        command.setDigestQuery(isDigest);
+        return command;
     }
 
     public long serializedSize(ReadCommand cmd, int version)
     {
+        return serializedSize(cmd, null, version);
+    }
+
+    public long serializedSize(ReadCommand cmd, ByteBuffer superColumn, int version)
+    {
         TypeSizes sizes = TypeSizes.NATIVE;
         SliceFromReadCommand command = (SliceFromReadCommand) cmd;
         int keySize = command.key.remaining();
@@ -166,7 +203,16 @@ class SliceFromReadCommandSerializer implements IVersionedSerializer<ReadCommand
         int size = sizes.sizeof(cmd.isDigestQuery()); // boolean
         size += sizes.sizeof(command.table);
         size += sizes.sizeof((short) keySize) + keySize;
-        size += command.queryPath.serializedSize(sizes);
+
+        if (version < MessagingService.VERSION_20)
+        {
+            size += new QueryPath(command.cfName, superColumn).serializedSize(sizes);
+        }
+        else
+        {
+            size += sizes.sizeof(command.cfName);
+        }
+
         size += SliceQueryFilter.serializer.serializedSize(command.filter, version);
         return size;
     }
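
For the short-read retry in getRetryCommand above: if a slice asked for t
cells but only l survived tombstone filtering, the next round requests
roughly t^2/l, so that a hit rate of l/t still yields t live cells. Worked
through with illustrative numbers:

    int count = 100, liveCountInRow = 20;          // t = 100, l = 20
    int retryCount = liveCountInRow == 0
                   ? count + 1
                   : ((count * count) / liveCountInRow) + 1;
    // retryCount == 501: a fifth of the cells were live, so ask ~5x as many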

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/SliceQueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SliceQueryPager.java b/src/java/org/apache/cassandra/db/SliceQueryPager.java
index cd555a5..551b50d 100644
--- a/src/java/org/apache/cassandra/db/SliceQueryPager.java
+++ b/src/java/org/apache/cassandra/db/SliceQueryPager.java
@@ -51,9 +51,8 @@ public class SliceQueryPager implements Iterator<ColumnFamily>
         if (exhausted)
             return null;
 
-        QueryPath path = new QueryPath(cfs.name);
         SliceQueryFilter sliceFilter = new SliceQueryFilter(slices, false, DEFAULT_PAGE_SIZE);
-        QueryFilter filter = new QueryFilter(key, path, sliceFilter);
+        QueryFilter filter = new QueryFilter(key, cfs.name, sliceFilter);
         ColumnFamily cf = cfs.getColumnFamily(filter);
         if (cf == null || sliceFilter.getLiveCount(cf) < DEFAULT_PAGE_SIZE)
         {
@@ -61,8 +60,8 @@ public class SliceQueryPager implements Iterator<ColumnFamily>
         }
         else
         {
-            Iterator<IColumn> iter = cf.getReverseSortedColumns().iterator();
-            IColumn lastColumn = iter.next();
+            Iterator<Column> iter = cf.getReverseSortedColumns().iterator();
+            Column lastColumn = iter.next();
             while (lastColumn.isMarkedForDelete())
                 lastColumn = iter.next();
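
Driving the pager is unchanged by this hunk; only the cell type is renamed.
A hypothetical driver loop (the constructor shape is an assumption inferred
from the fields this class uses; the class itself is an Iterator<ColumnFamily>):

    SliceQueryPager pager = new SliceQueryPager(cfs, key, slices);
    while (pager.hasNext())
    {
        ColumnFamily page = pager.next();  // up to DEFAULT_PAGE_SIZE live cells
        // consume page; iteration ends once a page comes back short
    }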
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/SuperColumn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SuperColumn.java b/src/java/org/apache/cassandra/db/SuperColumn.java
deleted file mode 100644
index 57e87c4..0000000
--- a/src/java/org/apache/cassandra/db/SuperColumn.java
+++ /dev/null
@@ -1,432 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.security.MessageDigest;
-import java.util.Collection;
-import java.util.Comparator;
-
-import com.google.common.base.Objects;
-import com.google.common.collect.Iterables;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.db.marshal.MarshalException;
-import org.apache.cassandra.io.IColumnSerializer;
-import org.apache.cassandra.io.util.ColumnSortedMap;
-import org.apache.cassandra.io.util.DataOutputBuffer;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.utils.Allocator;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.HeapAllocator;
-import org.cliffc.high_scale_lib.NonBlockingHashMap;
-
-public class SuperColumn extends AbstractColumnContainer implements IColumn
-{
-    private static final NonBlockingHashMap<Comparator, SuperColumnSerializer> serializers = new NonBlockingHashMap<Comparator, SuperColumnSerializer>();
-    public static SuperColumnSerializer serializer(AbstractType<?> comparator)
-    {
-        SuperColumnSerializer serializer = serializers.get(comparator);
-        if (serializer == null)
-        {
-            serializer = new SuperColumnSerializer(comparator);
-            serializers.put(comparator, serializer);
-        }
-        return serializer;
-    }
-
-    public static OnDiskAtom.Serializer onDiskSerializer(AbstractType<?> comparator)
-    {
-        return new OnDiskAtom.Serializer(serializer(comparator));
-    }
-
-    private final ByteBuffer name;
-
-    public SuperColumn(ByteBuffer name, AbstractType<?> comparator)
-    {
-        this(name, AtomicSortedColumns.factory().create(comparator, false));
-    }
-
-    SuperColumn(ByteBuffer name, ISortedColumns columns)
-    {
-        super(columns);
-        assert name != null;
-        assert name.remaining() <= IColumn.MAX_NAME_LENGTH;
-        this.name = name;
-    }
-
-    public SuperColumn cloneMeShallow()
-    {
-        SuperColumn sc = new SuperColumn(name, getComparator());
-        sc.delete(this);
-        return sc;
-    }
-
-    public IColumn cloneMe()
-    {
-        SuperColumn sc = new SuperColumn(name, columns.cloneMe());
-        sc.delete(this);
-        return sc;
-    }
-
-    public ByteBuffer name()
-    {
-        return name;
-    }
-
-    public Collection<IColumn> getSubColumns()
-    {
-        return getSortedColumns();
-    }
-
-    public IColumn getSubColumn(ByteBuffer columnName)
-    {
-        IColumn column = columns.getColumn(columnName);
-        assert column == null || column instanceof Column;
-        return column;
-    }
-
-    /**
-     * This calculates the exact size of the sub columns on the fly
-     */
-    public int dataSize()
-    {
-        int size = TypeSizes.NATIVE.sizeof(getMarkedForDeleteAt());
-        for (IColumn subColumn : getSubColumns())
-            size += subColumn.dataSize();
-        return size;
-    }
-
-    /**
-     * This returns the size of the super-column when serialized.
-     * @see org.apache.cassandra.db.IColumn#serializedSize(TypeSizes)
-    */
-    public int serializedSize(TypeSizes typeSizes)
-    {
-        /*
-         * We need to keep the way we are calculating the column size in sync with the
-         * way we are calculating the size for the column family serializer.
-         *
-         * 2 bytes for name size
-         * n bytes for the name
-         * 4 bytes for getLocalDeletionTime
-         * 8 bytes for getMarkedForDeleteAt
-         * 4 bytes for the subcolumns size
-         * size(constantSize) of subcolumns.
-         */
-        int nameSize = name.remaining();
-        int subColumnsSize = 0;
-        for (IColumn subColumn : getSubColumns())
-            subColumnsSize += subColumn.serializedSize(typeSizes);
-        int size = typeSizes.sizeof((short) nameSize) + nameSize
-                 + typeSizes.sizeof(getLocalDeletionTime())
-                 + typeSizes.sizeof(getMarkedForDeleteAt())
-                 + typeSizes.sizeof(subColumnsSize) + subColumnsSize;
-        return size;
-    }
-
-    public long serializedSizeForSSTable()
-    {
-        return serializedSize(TypeSizes.NATIVE);
-    }
-
-    public long timestamp()
-    {
-        throw new UnsupportedOperationException("This operation is not supported for Super Columns.");
-    }
-
-    public long minTimestamp()
-    {
-        long minTimestamp = getMarkedForDeleteAt();
-        for (IColumn subColumn : getSubColumns())
-            minTimestamp = Math.min(minTimestamp, subColumn.minTimestamp());
-        return minTimestamp;
-    }
-
-    public long maxTimestamp()
-    {
-        long maxTimestamp = getMarkedForDeleteAt();
-        for (IColumn subColumn : getSubColumns())
-            maxTimestamp = Math.max(maxTimestamp, subColumn.maxTimestamp());
-        return maxTimestamp;
-    }
-
-    public long mostRecentLiveChangeAt()
-    {
-        long max = Long.MIN_VALUE;
-        for (IColumn column : getSubColumns())
-        {
-            if (!column.isMarkedForDelete() && column.timestamp() > max)
-            {
-                max = column.timestamp();
-            }
-        }
-        return max;
-    }
-
-    public long getMarkedForDeleteAt()
-    {
-        return deletionInfo().getTopLevelDeletion().markedForDeleteAt;
-    }
-
-    public int getLocalDeletionTime()
-    {
-        return deletionInfo().getTopLevelDeletion().localDeletionTime;
-    }
-
-    public long mostRecentNonGCableChangeAt(int gcbefore)
-    {
-        long max = Long.MIN_VALUE;
-        for (IColumn column : getSubColumns())
-        {
-            if (column.getLocalDeletionTime() >= gcbefore && column.timestamp() > max)
-            {
-                max = column.timestamp();
-            }
-        }
-        return max;
-    }
-
-    public ByteBuffer value()
-    {
-        throw new UnsupportedOperationException("This operation is not supported for Super Columns.");
-    }
-
-    @Override
-    public void addColumn(IColumn column, Allocator allocator)
-    {
-        assert column instanceof Column : "A super column can only contain simple columns";
-        super.addColumn(column, allocator);
-    }
-
-    /*
-     * Go through each sub column; if it exists, ask it to resolve itself;
-     * if the column does not exist, create it.
-     */
-    void putColumn(SuperColumn column, Allocator allocator)
-    {
-        for (IColumn subColumn : column.getSubColumns())
-        {
-            addColumn(subColumn, allocator);
-        }
-        delete(column);
-    }
-
-    public IColumn diff(IColumn columnNew)
-    {
-        IColumn columnDiff = new SuperColumn(columnNew.name(), ((SuperColumn)columnNew).getComparator());
-        ((SuperColumn)columnDiff).delete(((SuperColumn)columnNew).deletionInfo());
-
-        // (don't need to worry about columnNew containing subColumns that are shadowed by
-        // the delete tombstone, since columnNew was generated by CF.resolve, which
-        // takes care of those for us.)
-        for (IColumn subColumn : columnNew.getSubColumns())
-        {
-            IColumn columnInternal = columns.getColumn(subColumn.name());
-            if(columnInternal == null )
-            {
-                columnDiff.addColumn(subColumn);
-            }
-            else
-            {
-                IColumn subColumnDiff = columnInternal.diff(subColumn);
-                if(subColumnDiff != null)
-                {
-                    columnDiff.addColumn(subColumnDiff);
-                }
-            }
-        }
-
-        if (!columnDiff.getSubColumns().isEmpty() || columnNew.isMarkedForDelete())
-            return columnDiff;
-        else
-            return null;
-    }
-
-    public void updateDigest(MessageDigest digest)
-    {
-        assert name != null;
-        digest.update(name.duplicate());
-        DataOutputBuffer buffer = new DataOutputBuffer();
-        try
-        {
-            buffer.writeLong(getMarkedForDeleteAt());
-        }
-        catch (IOException e)
-        {
-            throw new RuntimeException(e);
-        }
-        digest.update(buffer.getData(), 0, buffer.getLength());
-        for (IColumn column : getSubColumns())
-        {
-            column.updateDigest(digest);
-        }
-    }
-
-    public String getString(AbstractType<?> comparator)
-    {
-        StringBuilder sb = new StringBuilder();
-        sb.append("SuperColumn(");
-        sb.append(comparator.getString(name));
-
-        if (isMarkedForDelete()) {
-            sb.append(" -delete at ").append(getMarkedForDeleteAt()).append("-");
-        }
-
-        sb.append(" [");
-        sb.append(getComparator().getColumnsString(getSubColumns()));
-        sb.append("])");
-
-        return sb.toString();
-    }
-
-    public boolean isLive()
-    {
-        return mostRecentLiveChangeAt() > getMarkedForDeleteAt();
-    }
-
-    public IColumn localCopy(ColumnFamilyStore cfs)
-    {
-        return localCopy(cfs, HeapAllocator.instance);
-    }
-
-    public IColumn localCopy(ColumnFamilyStore cfs, Allocator allocator)
-    {
-        // we don't try to intern supercolumn names, because if we're using Cassandra correctly it's almost
-        // certainly just going to pollute our interning map with unique, dynamic values
-        SuperColumn sc = new SuperColumn(allocator.clone(name), this.getComparator());
-        sc.delete(this);
-
-        for(IColumn c : columns)
-        {
-            sc.addColumn(c.localCopy(cfs, allocator));
-        }
-
-        return sc;
-    }
-
-    public IColumn reconcile(IColumn c)
-    {
-        return reconcile(null, null);
-    }
-
-    public IColumn reconcile(IColumn c, Allocator allocator)
-    {
-        throw new UnsupportedOperationException("This operation is unsupported on super columns.");
-    }
-
-    public int serializationFlags()
-    {
-        throw new UnsupportedOperationException("Super columns don't have a serialization mask");
-    }
-
-    public void validateFields(CFMetaData metadata) throws MarshalException
-    {
-        metadata.comparator.validate(name());
-        for (IColumn column : getSubColumns())
-        {
-            column.validateFields(metadata);
-        }
-    }
-
-    @Override
-    public boolean equals(Object o)
-    {
-        if (this == o)
-            return true;
-        if (o == null || getClass() != o.getClass())
-            return false;
-
-        SuperColumn sc = (SuperColumn)o;
-
-        if (!name.equals(sc.name))
-            return false;
-        if (getMarkedForDeleteAt() != sc.getMarkedForDeleteAt())
-            return false;
-        if (getLocalDeletionTime() != sc.getLocalDeletionTime())
-            return false;
-        return Iterables.elementsEqual(columns, sc.columns);
-    }
-
-    @Override
-    public int hashCode()
-    {
-        return Objects.hashCode(name, getMarkedForDeleteAt(), getLocalDeletionTime(), columns);
-    }
-}
-
-class SuperColumnSerializer implements IColumnSerializer
-{
-    private final AbstractType<?> comparator;
-
-    public SuperColumnSerializer(AbstractType<?> comparator)
-    {
-        this.comparator = comparator;
-    }
-
-    public AbstractType<?> getComparator()
-    {
-        return comparator;
-    }
-
-    public void serialize(IColumn column, DataOutput dos) throws IOException
-    {
-        SuperColumn superColumn = (SuperColumn)column;
-        ByteBufferUtil.writeWithShortLength(superColumn.name(), dos);
-        DeletionInfo.serializer().serialize(superColumn.deletionInfo(), dos, MessagingService.VERSION_10);
-        Collection<IColumn> columns = superColumn.getSubColumns();
-        dos.writeInt(columns.size());
-        for (IColumn subColumn : columns)
-        {
-            Column.serializer().serialize(subColumn, dos);
-        }
-    }
-
-    public IColumn deserialize(DataInput dis) throws IOException
-    {
-        return deserialize(dis, IColumnSerializer.Flag.LOCAL);
-    }
-
-    public IColumn deserialize(DataInput dis, IColumnSerializer.Flag flag) throws IOException
-    {
-        return deserialize(dis, flag, (int)(System.currentTimeMillis() / 1000));
-    }
-
-    public IColumn deserialize(DataInput dis, IColumnSerializer.Flag flag, int expireBefore) throws IOException
-    {
-        ByteBuffer name = ByteBufferUtil.readWithShortLength(dis);
-        DeletionInfo delInfo = DeletionInfo.serializer().deserialize(dis, MessagingService.VERSION_10, comparator);
-
-        /* read the number of columns */
-        int size = dis.readInt();
-        ColumnSerializer serializer = Column.serializer();
-        ColumnSortedMap preSortedMap = new ColumnSortedMap(comparator, serializer, dis, size, flag, expireBefore);
-        SuperColumn superColumn = new SuperColumn(name, AtomicSortedColumns.factory().fromSorted(preSortedMap, false));
-        superColumn.delete(delInfo);
-        return superColumn;
-    }
-
-    public long serializedSize(IColumn object, TypeSizes typeSizes)
-    {
-        return object.serializedSize(typeSizes);
-    }
-}
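
With SuperColumn gone, its per-SC on-disk framing goes with it. As a sanity
check of the layout documented in the removed serializedSize() comment, the
fixed overhead for an (illustrative) empty super column with a 3-byte name:

    int nameSize = 3;
    int overhead = 2 + nameSize  // short name length + name bytes
                 + 4             // localDeletionTime
                 + 8             // markedForDeleteAt
                 + 4;            // subcolumn count
    // overhead == 21 bytes before any subcolumn payload; under the composite
    // encoding the SC name is instead folded into each cell's composite name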