Posted to commits@cassandra.apache.org by sl...@apache.org on 2013/01/02 13:41:16 UTC

[6/9] Replace supercolumns internally by composites

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/SuperColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SuperColumns.java b/src/java/org/apache/cassandra/db/SuperColumns.java
new file mode 100644
index 0000000..c45f9f4
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/SuperColumns.java
@@ -0,0 +1,444 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOError;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.CompositeType;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+public class SuperColumns
+{
+    public static Iterator<OnDiskAtom> onDiskIterator(DataInput dis, int superColumnCount, ColumnSerializer.Flag flag, int expireBefore)
+    {
+        return new SCIterator(dis, superColumnCount, flag, expireBefore);
+    }
+
+    public static void serializeSuperColumnFamily(ColumnFamily scf, DataOutput dos, int version) throws IOException
+    {
+        /*
+         * There are two complications:
+         *   1) We need to know the number of super columns in the column
+         *   family to write in the header (so we do a first pass to group
+         *   columns before serializing).
+         *   2) For deletion infos, we need to figure out which are top-level
+         *   deletions and which are super column deletions (i.e. the
+         *   subcolumns range deletions).
+         */
+        DeletionInfo delInfo = scf.deletionInfo();
+        Map<ByteBuffer, List<Column>> scMap = groupSuperColumns(scf);
+
+        // Actually serialize
+        DeletionInfo.serializer().serialize(new DeletionInfo(delInfo.getTopLevelDeletion()), dos, version);
+        dos.writeInt(scMap.size());
+
+        for (Map.Entry<ByteBuffer, List<Column>> entry : scMap.entrySet())
+        {
+            ByteBufferUtil.writeWithShortLength(entry.getKey(), dos);
+
+            List<DeletionTime> delTimes = delInfo.rangeCovering(entry.getKey());
+            assert delTimes.size() <= 1; // We're supposed to have either no deletion, or a full SC deletion.
+            DeletionInfo scDelInfo = delTimes.isEmpty() ? DeletionInfo.LIVE : new DeletionInfo(delTimes.get(0));
+            DeletionInfo.serializer().serialize(scDelInfo, dos, MessagingService.VERSION_10);
+
+            dos.writeInt(entry.getValue().size());
+            for (Column subColumn : entry.getValue())
+                Column.serializer().serialize(subColumn, dos);
+        }
+    }
+
+    private static Map<ByteBuffer, List<Column>> groupSuperColumns(ColumnFamily scf)
+    {
+        CompositeType type = (CompositeType)scf.getComparator();
+        // The order of insertion matters!
+        Map<ByteBuffer, List<Column>> scMap = new LinkedHashMap<ByteBuffer, List<Column>>();
+
+        ByteBuffer scName = null;
+        List<Column> subColumns = null;
+        for (Column column : scf)
+        {
+            ByteBuffer newScName = scName(column.name());
+            ByteBuffer newSubName = subName(column.name());
+
+            if (scName == null || type.types.get(0).compare(scName, newScName) != 0)
+            {
+                // new super column
+                scName = newScName;
+                subColumns = new ArrayList<Column>();
+                scMap.put(scName, subColumns);
+            }
+
+            subColumns.add(column.withUpdatedName(newSubName));
+        }
+        return scMap;
+    }
+
+    public static void deserializerSuperColumnFamily(DataInput dis, ColumnFamily cf, ColumnSerializer.Flag flag, int expireBefore, int version) throws IOException
+    {
+        // Note that there was no way to insert a range tombstone in a SCF in 1.2
+        cf.delete(DeletionInfo.serializer().deserialize(dis, version, cf.getComparator()));
+        assert !cf.deletionInfo().rangeIterator().hasNext();
+
+        Iterator<OnDiskAtom> iter = onDiskIterator(dis, dis.readInt(), flag, expireBefore);
+        while (iter.hasNext())
+            cf.addAtom(iter.next());
+    }
+
+    public static long serializedSize(ColumnFamily scf, TypeSizes typeSizes, int version)
+    {
+        Map<ByteBuffer, List<Column>> scMap = groupSuperColumns(scf);
+        DeletionInfo delInfo = scf.deletionInfo();
+
+        // Mirrors the serialization in serializeSuperColumnFamily above
+        long size = DeletionInfo.serializer().serializedSize(new DeletionInfo(delInfo.getTopLevelDeletion()), version);
+        for (Map.Entry<ByteBuffer, List<Column>> entry : scMap.entrySet())
+        {
+            int nameSize = entry.getKey().remaining();
+            size += typeSizes.sizeof((short) nameSize) + nameSize;
+
+            List<DeletionTime> delTimes = delInfo.rangeCovering(entry.getKey());
+            assert delTimes.size() <= 1; // We're supposed to have either no deletion, or a full SC deletion.
+            DeletionInfo scDelInfo = delTimes.isEmpty() ? DeletionInfo.LIVE : new DeletionInfo(delTimes.get(0));
+            size += DeletionInfo.serializer().serializedSize(scDelInfo, MessagingService.VERSION_10);
+
+            size += typeSizes.sizeof(entry.getValue().size());
+            for (Column subColumn : entry.getValue())
+                size += Column.serializer().serializedSize(subColumn, typeSizes);
+        }
+        return size;
+    }
+
+    private static class SCIterator implements Iterator<OnDiskAtom>
+    {
+        private final DataInput dis;
+        private final int scCount;
+
+        private final ColumnSerializer.Flag flag;
+        private final int expireBefore;
+
+        private int read;
+        private ByteBuffer scName;
+        private Iterator<Column> subColumnsIterator;
+
+        private SCIterator(DataInput dis, int superColumnCount, ColumnSerializer.Flag flag, int expireBefore)
+        {
+            this.dis = dis;
+            this.scCount = superColumnCount;
+            this.flag = flag;
+            this.expireBefore = expireBefore;
+        }
+
+        public boolean hasNext()
+        {
+            return (subColumnsIterator != null && subColumnsIterator.hasNext()) || read < scCount;
+        }
+
+        public OnDiskAtom next()
+        {
+            try
+            {
+                if (subColumnsIterator != null && subColumnsIterator.hasNext())
+                {
+                    Column c = subColumnsIterator.next();
+                    return c.withUpdatedName(CompositeType.build(scName, c.name()));
+                }
+
+                // Read one more super column
+                ++read;
+
+                scName = ByteBufferUtil.readWithShortLength(dis);
+                DeletionInfo delInfo = DeletionInfo.serializer().deserialize(dis, MessagingService.VERSION_10, null);
+                assert !delInfo.rangeIterator().hasNext(); // We assume no range tombstones (there was no way to insert any in a SCF in 1.2)
+
+                /* read the number of columns */
+                int size = dis.readInt();
+                List<Column> subColumns = new ArrayList<Column>(size);
+
+                for (int i = 0; i < size; ++i)
+                    subColumns.add(Column.serializer().deserialize(dis, flag, expireBefore));
+
+                subColumnsIterator = subColumns.iterator();
+
+                // If the SC was deleted, return that first, otherwise return the first subcolumn
+                DeletionTime dtime = delInfo.getTopLevelDeletion();
+                if (!dtime.equals(DeletionTime.LIVE))
+                    return new RangeTombstone(startOf(scName), endOf(scName), dtime);
+
+                return next();
+            }
+            catch (IOException e)
+            {
+                throw new IOError(e);
+            }
+        }
+
+        public void remove()
+        {
+            throw new UnsupportedOperationException();
+        }
+    }
+
+    public static AbstractType<?> getComparatorFor(CFMetaData metadata, ByteBuffer superColumn)
+    {
+        return getComparatorFor(metadata, superColumn != null);
+    }
+
+    public static AbstractType<?> getComparatorFor(CFMetaData metadata, boolean subColumn)
+    {
+        return metadata.isSuper()
+             ? ((CompositeType)metadata.comparator).types.get(subColumn ? 1 : 0)
+             : metadata.comparator;
+    }
+
+    // Extract the first component of a columnName, i.e. the super column name
+    public static ByteBuffer scName(ByteBuffer columnName)
+    {
+        return CompositeType.extractComponent(columnName, 0);
+    }
+
+    // Extract the 2nd component of a columnName, i.e. the sub-column name
+    public static ByteBuffer subName(ByteBuffer columnName)
+    {
+        return CompositeType.extractComponent(columnName, 1);
+    }
+
+    // We don't use CompositeType.Builder mostly because we want to avoid having to provide the comparator.
+    public static ByteBuffer startOf(ByteBuffer scName)
+    {
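+        // Serialized layout: 2-byte big-endian length, the SC name bytes, then one 'end of component' byte.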
+        int length = scName.remaining();
+        ByteBuffer bb = ByteBuffer.allocate(2 + length + 1);
+
+        bb.put((byte) ((length >> 8) & 0xFF));
+        bb.put((byte) (length & 0xFF));
+        bb.put(scName.duplicate());
+        bb.put((byte) 0);
+        bb.flip();
+        return bb;
+    }
+
+    public static ByteBuffer endOf(ByteBuffer scName)
+    {
+        ByteBuffer bb = startOf(scName);
+        bb.put(bb.remaining() - 1, (byte)1);
+        return bb;
+    }
+
+    public static SCFilter filterToSC(CompositeType type, IDiskAtomFilter filter)
+    {
+        if (filter instanceof NamesQueryFilter)
+            return namesFilterToSC(type, (NamesQueryFilter)filter);
+        else
+            return sliceFilterToSC(type, (SliceQueryFilter)filter);
+    }
+
+    public static SCFilter namesFilterToSC(CompositeType type, NamesQueryFilter filter)
+    {
+        ByteBuffer scName = null;
+        SortedSet<ByteBuffer> newColumns = new TreeSet<ByteBuffer>(filter.columns.comparator());
+        for (ByteBuffer name : filter.columns)
+        {
+            ByteBuffer newScName = scName(name);
+
+            if (scName == null)
+            {
+                scName = newScName;
+            }
+            else if (type.types.get(0).compare(scName, newScName) != 0)
+            {
+                // If we're selecting columns across multiple SCs, it's not something we can translate for an old node
+                throw new RuntimeException("Cannot convert filter to old super column format. Update all nodes to Cassandra 2.0 first.");
+            }
+
+            newColumns.add(subName(name));
+        }
+        return new SCFilter(scName, new NamesQueryFilter(newColumns));
+    }
+
+    public static SCFilter sliceFilterToSC(CompositeType type, SliceQueryFilter filter)
+    {
+        /*
+         * There are 3 main cases that we can translate back into super column
+         * queries:
+         *   1) We have only one slice where the first component of start and
+         *   finish is the same; we translate as a slice query on one SC.
+         *   2) We have only one slice, neither start nor finish has a 2nd
+         *   component, and finish has the 'end of component' set; we translate
+         *   as a slice of SCs.
+         *   3) Each slice has the same first component for start and finish, no
+         *   2nd component, and each finish has the 'end of component' set; we
+         *   translate as a names query of SCs (the filter must then not be reversed).
+         * Otherwise, we can't do much.
+         */
+
+        boolean reversed = filter.reversed;
+        if (filter.slices.length == 1)
+        {
+            ByteBuffer start = filter.slices[0].start;
+            ByteBuffer finish = filter.slices[0].finish;
+
+            if (filter.compositesToGroup == 1)
+            {
+                // Note: all the resulting filters must have compositesToGroup == 0 because anything
+                // else makes no sense for a super column on the destination node
+                if (start.remaining() == 0)
+                {
+                    if (finish.remaining() == 0)
+                        // An 'IdentityFilter', keep as is (except for compositesToGroup)
+                        return new SCFilter(null, new SliceQueryFilter(filter.start(), filter.finish(), reversed, filter.count));
+
+                    if (subName(finish) == null
+                            && ((!reversed && !firstEndOfComponent(finish)) || (reversed && firstEndOfComponent(finish))))
+                        return new SCFilter(null, new SliceQueryFilter(ByteBufferUtil.EMPTY_BYTE_BUFFER, scName(finish), reversed, filter.count));
+                }
+                else if (finish.remaining() == 0)
+                {
+                    if (subName(start) == null
+                            && ((!reversed && firstEndOfComponent(start)) || (reversed && !firstEndOfComponent(start))))
+                        return new SCFilter(null, new SliceQueryFilter(scName(start), ByteBufferUtil.EMPTY_BYTE_BUFFER, reversed, filter.count));
+                }
+                else if (subName(start) == null && subName(finish) == null
+                        && ((   reversed && !firstEndOfComponent(start) &&  firstEndOfComponent(finish))
+                            || (!reversed &&  firstEndOfComponent(start) && !firstEndOfComponent(finish))))
+                {
+                    // A slice of supercolumns
+                    return new SCFilter(null, new SliceQueryFilter(scName(start), scName(finish), reversed, filter.count));
+                }
+            }
+            else if (filter.compositesToGroup == 0 && type.types.get(0).compare(scName(start), scName(finish)) == 0)
+            {
+                // A slice of subcolumns
+                return new SCFilter(scName(start), filter.withUpdatedSlice(subName(start), subName(finish)));
+            }
+        }
+        else if (!reversed)
+        {
+            SortedSet<ByteBuffer> columns = new TreeSet<ByteBuffer>(type.types.get(0));
+            for (int i = 0; i < filter.slices.length; ++i)
+            {
+                ByteBuffer start = filter.slices[i].start;
+                ByteBuffer finish = filter.slices[i].finish;
+
+                if (subName(start) != null || subName(finish) != null
+                  || type.types.get(0).compare(scName(start), scName(finish)) != 0
+                  || firstEndOfComponent(start) || !firstEndOfComponent(finish))
+                    throw new RuntimeException("Cannot convert filter to old super column format. Update all nodes to Cassandra 2.0 first.");
+
+                columns.add(scName(start));
+            }
+            return new SCFilter(null, new NamesQueryFilter(columns));
+        }
+        throw new RuntimeException("Cannot convert filter to old super column format. Update all nodes to Cassandra 2.0 first.");
+    }
+
+    public static IDiskAtomFilter fromSCFilter(CompositeType type, ByteBuffer scName, IDiskAtomFilter filter)
+    {
+        if (filter instanceof NamesQueryFilter)
+            return fromSCNamesFilter(type, scName, (NamesQueryFilter)filter);
+        else
+            return fromSCSliceFilter(type, scName, (SliceQueryFilter)filter);
+    }
+
+    public static IDiskAtomFilter fromSCNamesFilter(CompositeType type, ByteBuffer scName, NamesQueryFilter filter)
+    {
+        if (scName == null)
+        {
+            ColumnSlice[] slices = new ColumnSlice[filter.columns.size()];
+            int i = 0;
+            for (ByteBuffer bb : filter.columns)
+            {
+                CompositeType.Builder builder = type.builder().add(bb);
+                slices[i++] = new ColumnSlice(builder.build(), builder.buildAsEndOfRange());
+            }
+            return new SliceQueryFilter(slices, false, slices.length, 1, 1);
+        }
+        else
+        {
+            SortedSet<ByteBuffer> newColumns = new TreeSet<ByteBuffer>(type);
+            for (ByteBuffer c : filter.columns)
+                newColumns.add(CompositeType.build(scName, c));
+            return filter.withUpdatedColumns(newColumns);
+        }
+    }
+
+    public static SliceQueryFilter fromSCSliceFilter(CompositeType type, ByteBuffer scName, SliceQueryFilter filter)
+    {
+        assert filter.slices.length == 1;
+        if (scName == null)
+        {
+            ByteBuffer start = filter.start().remaining() == 0
+                             ? filter.start()
+                             : (filter.reversed ? type.builder().add(filter.start()).buildAsEndOfRange()
+                                                : type.builder().add(filter.start()).build());
+            ByteBuffer finish = filter.finish().remaining() == 0
+                              ? filter.finish()
+                              : (filter.reversed ? type.builder().add(filter.finish()).build()
+                                                 : type.builder().add(filter.finish()).buildAsEndOfRange());
+            return new SliceQueryFilter(start, finish, filter.reversed, filter.count, 1);
+        }
+        else
+        {
+            CompositeType.Builder builder = type.builder().add(scName);
+            ByteBuffer start = filter.start().remaining() == 0
+                             ? filter.reversed ? builder.buildAsEndOfRange() : builder.build()
+                             : builder.copy().add(filter.start()).build();
+            ByteBuffer end = filter.finish().remaining() == 0
+                             ? filter.reversed ? builder.build() : builder.buildAsEndOfRange()
+                             : builder.add(filter.finish()).build();
+            return new SliceQueryFilter(start, end, filter.reversed, filter.count);
+        }
+    }
+
+    private static boolean firstEndOfComponent(ByteBuffer bb)
+    {
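+        // First component layout: 2-byte length, name bytes, then the 'end of component' byte at index length + 2.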
+        bb = bb.duplicate();
+        int length = (bb.get() & 0xFF) << 8;
+        length |= (bb.get() & 0xFF);
+
+        return bb.get(length + 2) == 1;
+    }
+
+    public static class SCFilter
+    {
+        public final ByteBuffer scName;
+        public final IDiskAtomFilter updatedFilter;
+
+        public SCFilter(ByteBuffer scName, IDiskAtomFilter updatedFilter)
+        {
+            this.scName = scName;
+            this.updatedFilter = updatedFilter;
+        }
+    }
+}
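
For what it's worth, the startOf()/endOf() encoding above can be exercised in
isolation. Below is a minimal, self-contained sketch of the same layout
(CompositeLayoutSketch is a hypothetical name, not part of this patch): a
2-byte big-endian length, the SC name bytes, then one 'end of component' byte,
0 to sort before the super column's cells and 1 to sort after them.

    import java.nio.ByteBuffer;

    public class CompositeLayoutSketch
    {
        // Same bytes as startOf(name) for eoc == 0 and endOf(name) for eoc == 1.
        static ByteBuffer component(ByteBuffer name, byte eoc)
        {
            int length = name.remaining();
            ByteBuffer bb = ByteBuffer.allocate(2 + length + 1);
            bb.putShort((short) length); // big-endian, like the two manual put() calls
            bb.put(name.duplicate());
            bb.put(eoc);                 // the 'end of component' byte
            bb.flip();
            return bb;
        }

        public static void main(String[] args)
        {
            ByteBuffer sc = ByteBuffer.wrap("sc1".getBytes());
            ByteBuffer end = component(sc, (byte) 1);
            // firstEndOfComponent() reads the length back and tests the byte at
            // absolute index length + 2, i.e. exactly the eoc written here.
            System.out.println(end.get(end.remaining() - 1)); // prints 1
        }
    }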

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/SystemTable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemTable.java b/src/java/org/apache/cassandra/db/SystemTable.java
index 8f60904..940300c 100644
--- a/src/java/org/apache/cassandra/db/SystemTable.java
+++ b/src/java/org/apache/cassandra/db/SystemTable.java
@@ -43,7 +43,6 @@ import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.cql3.UntypedResultSet;
 import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
 import org.apache.cassandra.db.filter.QueryFilter;
-import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.db.marshal.AsciiType;
 import org.apache.cassandra.db.marshal.BytesType;
 import org.apache.cassandra.db.marshal.UTF8Type;
@@ -156,9 +155,9 @@ public class SystemTable
             SortedSet<ByteBuffer> cols = new TreeSet<ByteBuffer>(BytesType.instance);
             cols.add(ByteBufferUtil.bytes("ClusterName"));
             cols.add(ByteBufferUtil.bytes("Token"));
-            QueryFilter filter = QueryFilter.getNamesFilter(decorate(ByteBufferUtil.bytes("L")), new QueryPath(OLD_STATUS_CF), cols);
+            QueryFilter filter = QueryFilter.getNamesFilter(decorate(ByteBufferUtil.bytes("L")), OLD_STATUS_CF, cols);
             ColumnFamily oldCf = oldStatusCfs.getColumnFamily(filter);
-            Iterator<IColumn> oldColumns = oldCf.columns.iterator();
+            Iterator<Column> oldColumns = oldCf.columns.iterator();
 
             String clusterName = null;
             try
@@ -514,7 +513,7 @@ public class SystemTable
     {
         ColumnFamilyStore cfs = Table.open(Table.SYSTEM_KS).getColumnFamilyStore(INDEX_CF);
         QueryFilter filter = QueryFilter.getNamesFilter(decorate(ByteBufferUtil.bytes(table)),
-                                                        new QueryPath(INDEX_CF),
+                                                        INDEX_CF,
                                                         ByteBufferUtil.bytes(indexName));
         return ColumnFamilyStore.removeDeleted(cfs.getColumnFamily(filter), Integer.MAX_VALUE) != null;
     }
@@ -532,7 +531,7 @@ public class SystemTable
     public static void setIndexRemoved(String table, String indexName)
     {
         RowMutation rm = new RowMutation(Table.SYSTEM_KS, ByteBufferUtil.bytes(table));
-        rm.delete(new QueryPath(INDEX_CF, null, ByteBufferUtil.bytes(indexName)), FBUtilities.timestampMicros());
+        rm.delete(INDEX_CF, ByteBufferUtil.bytes(indexName), FBUtilities.timestampMicros());
         rm.apply();
         forceBlockingFlush(INDEX_CF);
     }
@@ -573,7 +572,7 @@ public class SystemTable
 
         // Get the last CounterId (CounterIds are timeuuids and thus ordered from oldest to newest)
         QueryFilter filter = QueryFilter.getSliceFilter(decorate(ALL_LOCAL_NODE_ID_KEY),
-                                                        new QueryPath(COUNTER_ID_CF),
+                                                        COUNTER_ID_CF,
                                                         ByteBufferUtil.EMPTY_BYTE_BUFFER,
                                                         ByteBufferUtil.EMPTY_BYTE_BUFFER,
                                                         true,
@@ -611,11 +610,11 @@ public class SystemTable
         List<CounterId.CounterIdRecord> l = new ArrayList<CounterId.CounterIdRecord>();
 
         Table table = Table.open(Table.SYSTEM_KS);
-        QueryFilter filter = QueryFilter.getIdentityFilter(decorate(ALL_LOCAL_NODE_ID_KEY), new QueryPath(COUNTER_ID_CF));
+        QueryFilter filter = QueryFilter.getIdentityFilter(decorate(ALL_LOCAL_NODE_ID_KEY), COUNTER_ID_CF);
         ColumnFamily cf = table.getColumnFamilyStore(COUNTER_ID_CF).getColumnFamily(filter);
 
         CounterId previous = null;
-        for (IColumn c : cf)
+        for (Column c : cf)
         {
             if (previous != null)
                 l.add(new CounterId.CounterIdRecord(previous, c.timestamp()));
@@ -655,8 +654,7 @@ public class SystemTable
     {
         Token minToken = StorageService.getPartitioner().getMinimumToken();
 
-        return schemaCFS(schemaCfName).getRangeSlice(null,
-                                                     new Range<RowPosition>(minToken.minKeyBound(),
+        return schemaCFS(schemaCfName).getRangeSlice(new Range<RowPosition>(minToken.minKeyBound(),
                                                                             minToken.maxKeyBound()),
                                                      Integer.MAX_VALUE,
                                                      new IdentityQueryFilter(),
@@ -713,7 +711,7 @@ public class SystemTable
         DecoratedKey key = StorageService.getPartitioner().decorateKey(getSchemaKSKey(ksName));
 
         ColumnFamilyStore schemaCFS = SystemTable.schemaCFS(SCHEMA_KEYSPACES_CF);
-        ColumnFamily result = schemaCFS.getColumnFamily(QueryFilter.getIdentityFilter(key, new QueryPath(SCHEMA_KEYSPACES_CF)));
+        ColumnFamily result = schemaCFS.getColumnFamily(QueryFilter.getIdentityFilter(key, SCHEMA_KEYSPACES_CF));
 
         return new Row(key, result);
     }
@@ -724,7 +722,6 @@ public class SystemTable
 
         ColumnFamilyStore schemaCFS = SystemTable.schemaCFS(SCHEMA_COLUMNFAMILIES_CF);
         ColumnFamily result = schemaCFS.getColumnFamily(key,
-                                                        new QueryPath(SCHEMA_COLUMNFAMILIES_CF),
                                                         DefsTable.searchComposite(cfName, true),
                                                         DefsTable.searchComposite(cfName, false),
                                                         false,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/Table.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Table.java b/src/java/org/apache/cassandra/db/Table.java
index 786ed43..361e3b2 100644
--- a/src/java/org/apache/cassandra/db/Table.java
+++ b/src/java/org/apache/cassandra/db/Table.java
@@ -425,7 +425,7 @@ public class Table
                 {
                     ColumnFamily cf = pager.next();
                     ColumnFamily cf2 = cf.cloneMeShallow();
-                    for (IColumn column : cf)
+                    for (Column column : cf)
                     {
                         if (cfs.indexManager.indexes(column.name(), indexes))
                             cf2.addColumn(column);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/ThreadSafeSortedColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ThreadSafeSortedColumns.java b/src/java/org/apache/cassandra/db/ThreadSafeSortedColumns.java
index 6aa19f0..776e543 100644
--- a/src/java/org/apache/cassandra/db/ThreadSafeSortedColumns.java
+++ b/src/java/org/apache/cassandra/db/ThreadSafeSortedColumns.java
@@ -33,7 +33,7 @@ import org.apache.cassandra.utils.Allocator;
 
 public class ThreadSafeSortedColumns extends AbstractThreadUnsafeSortedColumns implements ISortedColumns
 {
-    private final ConcurrentSkipListMap<ByteBuffer, IColumn> map;
+    private final ConcurrentSkipListMap<ByteBuffer, Column> map;
 
     public static final ISortedColumns.Factory factory = new Factory()
     {
@@ -42,7 +42,7 @@ public class ThreadSafeSortedColumns extends AbstractThreadUnsafeSortedColumns i
             return new ThreadSafeSortedColumns(comparator);
         }
 
-        public ISortedColumns fromSorted(SortedMap<ByteBuffer, IColumn> sortedMap, boolean insertReversed)
+        public ISortedColumns fromSorted(SortedMap<ByteBuffer, Column> sortedMap, boolean insertReversed)
         {
             return new ThreadSafeSortedColumns(sortedMap);
         }
@@ -60,12 +60,12 @@ public class ThreadSafeSortedColumns extends AbstractThreadUnsafeSortedColumns i
 
     private ThreadSafeSortedColumns(AbstractType<?> comparator)
     {
-        this.map = new ConcurrentSkipListMap<ByteBuffer, IColumn>(comparator);
+        this.map = new ConcurrentSkipListMap<ByteBuffer, Column>(comparator);
     }
 
-    private ThreadSafeSortedColumns(SortedMap<ByteBuffer, IColumn> columns)
+    private ThreadSafeSortedColumns(SortedMap<ByteBuffer, Column> columns)
     {
-        this.map = new ConcurrentSkipListMap<ByteBuffer, IColumn>(columns);
+        this.map = new ConcurrentSkipListMap<ByteBuffer, Column>(columns);
     }
 
     public ISortedColumns.Factory getFactory()
@@ -87,59 +87,49 @@ public class ThreadSafeSortedColumns extends AbstractThreadUnsafeSortedColumns i
      * If we find an old column that has the same name
      * then ask it to resolve itself, else add the new column
     */
-    public void addColumn(IColumn column, Allocator allocator)
+    public void addColumn(Column column, Allocator allocator)
     {
         addColumnInternal(column, allocator);
     }
 
-    private long addColumnInternal(IColumn column, Allocator allocator)
+    private long addColumnInternal(Column column, Allocator allocator)
     {
         ByteBuffer name = column.name();
         while (true)
         {
-            IColumn oldColumn = map.putIfAbsent(name, column);
+            Column oldColumn = map.putIfAbsent(name, column);
             if (oldColumn == null)
                 return column.dataSize();
 
-            if (oldColumn instanceof SuperColumn)
-            {
-                assert column instanceof SuperColumn;
-                long previousSize = oldColumn.dataSize();
-                ((SuperColumn) oldColumn).putColumn((SuperColumn)column, allocator);
-                return oldColumn.dataSize() - previousSize;
-            }
-            else
-            {
-                // calculate reconciled col from old (existing) col and new col
-                IColumn reconciledColumn = column.reconcile(oldColumn, allocator);
-                if (map.replace(name, oldColumn, reconciledColumn))
-                    return reconciledColumn.dataSize() - oldColumn.dataSize();
-
-                // We failed to replace column due to a concurrent update or a concurrent removal. Keep trying.
-                // (Currently, concurrent removal should not happen (only updates), but let us support that anyway.)
-            }
+            // calculate reconciled col from old (existing) col and new col
+            Column reconciledColumn = column.reconcile(oldColumn, allocator);
+            if (map.replace(name, oldColumn, reconciledColumn))
+                return reconciledColumn.dataSize() - oldColumn.dataSize();
+
+            // We failed to replace column due to a concurrent update or a concurrent removal. Keep trying.
+            // (Currently, concurrent removal should not happen (only updates), but let us support that anyway.)
         }
     }
 
     /**
      * We need to go through each column in the column container and resolve it before adding
      */
-    public void addAll(ISortedColumns cm, Allocator allocator, Function<IColumn, IColumn> transformation)
+    public void addAll(ISortedColumns cm, Allocator allocator, Function<Column, Column> transformation)
     {
         addAllWithSizeDelta(cm, allocator, transformation, null);
     }
 
     @Override
-    public long addAllWithSizeDelta(ISortedColumns cm, Allocator allocator, Function<IColumn, IColumn> transformation, SecondaryIndexManager.Updater indexer)
+    public long addAllWithSizeDelta(ISortedColumns cm, Allocator allocator, Function<Column, Column> transformation, SecondaryIndexManager.Updater indexer)
     {
         delete(cm.getDeletionInfo());
         long sizeDelta = 0;
-        for (IColumn column : cm.getSortedColumns())
+        for (Column column : cm.getSortedColumns())
             sizeDelta += addColumnInternal(transformation.apply(column), allocator);
         return sizeDelta;
     }
 
-    public boolean replace(IColumn oldColumn, IColumn newColumn)
+    public boolean replace(Column oldColumn, Column newColumn)
     {
         if (!oldColumn.name().equals(newColumn.name()))
             throw new IllegalArgumentException();
@@ -147,7 +137,7 @@ public class ThreadSafeSortedColumns extends AbstractThreadUnsafeSortedColumns i
         return map.replace(oldColumn.name(), oldColumn, newColumn);
     }
 
-    public IColumn getColumn(ByteBuffer name)
+    public Column getColumn(ByteBuffer name)
     {
         return map.get(name);
     }
@@ -167,12 +157,12 @@ public class ThreadSafeSortedColumns extends AbstractThreadUnsafeSortedColumns i
         return map.size();
     }
 
-    public Collection<IColumn> getSortedColumns()
+    public Collection<Column> getSortedColumns()
     {
         return map.values();
     }
 
-    public Collection<IColumn> getReverseSortedColumns()
+    public Collection<Column> getReverseSortedColumns()
     {
         return map.descendingMap().values();
     }
@@ -182,17 +172,17 @@ public class ThreadSafeSortedColumns extends AbstractThreadUnsafeSortedColumns i
         return map.navigableKeySet();
     }
 
-    public Iterator<IColumn> iterator()
+    public Iterator<Column> iterator()
     {
         return map.values().iterator();
     }
 
-    public Iterator<IColumn> iterator(ColumnSlice[] slices)
+    public Iterator<Column> iterator(ColumnSlice[] slices)
     {
         return new ColumnSlice.NavigableMapIterator(map, slices);
     }
 
-    public Iterator<IColumn> reverseIterator(ColumnSlice[] slices)
+    public Iterator<Column> reverseIterator(ColumnSlice[] slices)
     {
         return new ColumnSlice.NavigableMapIterator(map.descendingMap(), slices);
     }
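
Stripped of the SuperColumn branch, what remains in addColumnInternal() is the
classic optimistic retry loop over ConcurrentSkipListMap. A minimal standalone
sketch of that shape, with String/Long and a last-write-wins max() as stand-ins
for Cassandra's column name/column types and reconcile():

    import java.util.concurrent.ConcurrentSkipListMap;

    public class ReconcileLoopSketch
    {
        private final ConcurrentSkipListMap<String, Long> map =
            new ConcurrentSkipListMap<String, Long>();

        public void add(String name, long timestamp)
        {
            while (true)
            {
                Long old = map.putIfAbsent(name, timestamp);
                if (old == null)
                    return; // no previous cell: the insert won

                Long reconciled = Math.max(old, timestamp);
                if (map.replace(name, old, reconciled))
                    return; // swapped the reconciled value in atomically

                // a concurrent update (or removal) got there first; retry
            }
        }
    }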

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/TreeMapBackedSortedColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/TreeMapBackedSortedColumns.java b/src/java/org/apache/cassandra/db/TreeMapBackedSortedColumns.java
index c4ec52f..20cbd90 100644
--- a/src/java/org/apache/cassandra/db/TreeMapBackedSortedColumns.java
+++ b/src/java/org/apache/cassandra/db/TreeMapBackedSortedColumns.java
@@ -33,7 +33,7 @@ import org.apache.cassandra.utils.Allocator;
 
 public class TreeMapBackedSortedColumns extends AbstractThreadUnsafeSortedColumns implements ISortedColumns
 {
-    private final TreeMap<ByteBuffer, IColumn> map;
+    private final TreeMap<ByteBuffer, Column> map;
 
     public static final ISortedColumns.Factory factory = new Factory()
     {
@@ -42,7 +42,7 @@ public class TreeMapBackedSortedColumns extends AbstractThreadUnsafeSortedColumn
             return new TreeMapBackedSortedColumns(comparator);
         }
 
-        public ISortedColumns fromSorted(SortedMap<ByteBuffer, IColumn> sortedMap, boolean insertReversed)
+        public ISortedColumns fromSorted(SortedMap<ByteBuffer, Column> sortedMap, boolean insertReversed)
         {
             return new TreeMapBackedSortedColumns(sortedMap);
         }
@@ -60,12 +60,12 @@ public class TreeMapBackedSortedColumns extends AbstractThreadUnsafeSortedColumn
 
     private TreeMapBackedSortedColumns(AbstractType<?> comparator)
     {
-        this.map = new TreeMap<ByteBuffer, IColumn>(comparator);
+        this.map = new TreeMap<ByteBuffer, Column>(comparator);
     }
 
-    private TreeMapBackedSortedColumns(SortedMap<ByteBuffer, IColumn> columns)
+    private TreeMapBackedSortedColumns(SortedMap<ByteBuffer, Column> columns)
     {
-        this.map = new TreeMap<ByteBuffer, IColumn>(columns);
+        this.map = new TreeMap<ByteBuffer, Column>(columns);
     }
 
     public ISortedColumns.Factory getFactory()
@@ -83,7 +83,7 @@ public class TreeMapBackedSortedColumns extends AbstractThreadUnsafeSortedColumn
         return false;
     }
 
-    public void addColumn(IColumn column, Allocator allocator)
+    public void addColumn(Column column, Allocator allocator)
     {
         addColumn(column, allocator, SecondaryIndexManager.nullUpdater);
     }
@@ -92,47 +92,33 @@ public class TreeMapBackedSortedColumns extends AbstractThreadUnsafeSortedColumn
      * If we find an old column that has the same name
      * then ask it to resolve itself, else add the new column
     */
-    public long addColumn(IColumn column, Allocator allocator, SecondaryIndexManager.Updater indexer)
+    public long addColumn(Column column, Allocator allocator, SecondaryIndexManager.Updater indexer)
     {
         ByteBuffer name = column.name();
         // this is a slightly unusual way to structure this; a more natural way is shown in ThreadSafeSortedColumns,
         // but TreeMap lacks putIfAbsent.  Rather than split it into a "get, then put" check, we do it as follows,
         // which saves the extra "get" in the no-conflict case [for both normal and super columns],
         // in exchange for a re-put in the SuperColumn case.
-        IColumn oldColumn = map.put(name, column);
+        Column oldColumn = map.put(name, column);
         if (oldColumn == null)
             return column.dataSize();
 
-        if (oldColumn instanceof SuperColumn)
-        {
-            assert column instanceof SuperColumn;
-            long previousSize = oldColumn.dataSize();
-            // since oldColumn is where we've been accumulating results, it's usually going to be faster to
-            // add the new one to the old, then place old back in the Map, rather than copy the old contents
-            // into the new Map entry.
-            ((SuperColumn) oldColumn).putColumn((SuperColumn)column, allocator);
-            map.put(name, oldColumn);
-            return oldColumn.dataSize() - previousSize;
-        }
+        // calculate reconciled col from old (existing) col and new col
+        Column reconciledColumn = column.reconcile(oldColumn, allocator);
+        map.put(name, reconciledColumn);
+        // for memtable updates we only care about oldColumn and reconciledColumn, but when compacting
+        // we need to make sure we update indexes no matter the order in which we merge
+        if (reconciledColumn == column)
+            indexer.update(oldColumn, reconciledColumn);
         else
-        {
-            // calculate reconciled col from old (existing) col and new col
-            IColumn reconciledColumn = column.reconcile(oldColumn, allocator);
-            map.put(name, reconciledColumn);
-            // for memtable updates we only care about oldcolumn, reconciledcolumn, but when compacting
-            // we need to make sure we update indexes no matter the order we merge
-            if (reconciledColumn == column)
-                indexer.update(oldColumn, reconciledColumn);
-            else
-                indexer.update(column, reconciledColumn);
-            return reconciledColumn.dataSize() - oldColumn.dataSize();
-        }
+            indexer.update(column, reconciledColumn);
+        return reconciledColumn.dataSize() - oldColumn.dataSize();
     }
 
-    public long addAllWithSizeDelta(ISortedColumns cm, Allocator allocator, Function<IColumn, IColumn> transformation, SecondaryIndexManager.Updater indexer)
+    public long addAllWithSizeDelta(ISortedColumns cm, Allocator allocator, Function<Column, Column> transformation, SecondaryIndexManager.Updater indexer)
     {
         delete(cm.getDeletionInfo());
-        for (IColumn column : cm.getSortedColumns())
+        for (Column column : cm.getSortedColumns())
             addColumn(transformation.apply(column), allocator, indexer);
 
         // we don't use this for memtables, so we don't bother computing size
@@ -142,12 +128,12 @@ public class TreeMapBackedSortedColumns extends AbstractThreadUnsafeSortedColumn
     /**
      * We need to go through each column in the column container and resolve it before adding
      */
-    public void addAll(ISortedColumns cm, Allocator allocator, Function<IColumn, IColumn> transformation)
+    public void addAll(ISortedColumns cm, Allocator allocator, Function<Column, Column> transformation)
     {
         addAllWithSizeDelta(cm, allocator, transformation, SecondaryIndexManager.nullUpdater);
     }
 
-    public boolean replace(IColumn oldColumn, IColumn newColumn)
+    public boolean replace(Column oldColumn, Column newColumn)
     {
         if (!oldColumn.name().equals(newColumn.name()))
             throw new IllegalArgumentException();
@@ -156,7 +142,7 @@ public class TreeMapBackedSortedColumns extends AbstractThreadUnsafeSortedColumn
         // column or the column was not equal to oldColumn (to be coherent
         // with other implementations). We optimize for the common case where
         // oldColumn is present, though.
-        IColumn previous = map.put(oldColumn.name(), newColumn);
+        Column previous = map.put(oldColumn.name(), newColumn);
         if (previous == null)
         {
             map.remove(oldColumn.name());
@@ -170,7 +156,7 @@ public class TreeMapBackedSortedColumns extends AbstractThreadUnsafeSortedColumn
         return true;
     }
 
-    public IColumn getColumn(ByteBuffer name)
+    public Column getColumn(ByteBuffer name)
     {
         return map.get(name);
     }
@@ -190,12 +176,12 @@ public class TreeMapBackedSortedColumns extends AbstractThreadUnsafeSortedColumn
         return map.size();
     }
 
-    public Collection<IColumn> getSortedColumns()
+    public Collection<Column> getSortedColumns()
     {
         return map.values();
     }
 
-    public Collection<IColumn> getReverseSortedColumns()
+    public Collection<Column> getReverseSortedColumns()
     {
         return map.descendingMap().values();
     }
@@ -205,17 +191,17 @@ public class TreeMapBackedSortedColumns extends AbstractThreadUnsafeSortedColumn
         return map.navigableKeySet();
     }
 
-    public Iterator<IColumn> iterator()
+    public Iterator<Column> iterator()
     {
         return map.values().iterator();
     }
 
-    public Iterator<IColumn> iterator(ColumnSlice[] slices)
+    public Iterator<Column> iterator(ColumnSlice[] slices)
     {
         return new ColumnSlice.NavigableMapIterator(map, slices);
     }
 
-    public Iterator<IColumn> reverseIterator(ColumnSlice[] slices)
+    public Iterator<Column> reverseIterator(ColumnSlice[] slices)
     {
         return new ColumnSlice.NavigableMapIterator(map.descendingMap(), slices);
     }
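
The TreeMap-backed variant makes the same simplification, but single-threaded:
since TreeMap (pre-Java 8) has no putIfAbsent, it puts optimistically and
re-puts the reconciled value on conflict, exactly as the comment above the
method describes. The same shape with the same stand-in types as the previous
sketch:

    import java.util.TreeMap;

    public class PutThenReconcileSketch
    {
        private final TreeMap<String, Long> map = new TreeMap<String, Long>();

        public long add(String name, long timestamp)
        {
            Long old = map.put(name, timestamp); // optimistic put; returns previous value
            if (old == null)
                return timestamp;                // no conflict: done in one operation

            long reconciled = Math.max(old, timestamp);
            map.put(name, reconciled);           // re-put the winner
            return reconciled;
        }
    }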

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/columniterator/IdentityQueryFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/IdentityQueryFilter.java b/src/java/org/apache/cassandra/db/columniterator/IdentityQueryFilter.java
index 9c9682f..9697335 100644
--- a/src/java/org/apache/cassandra/db/columniterator/IdentityQueryFilter.java
+++ b/src/java/org/apache/cassandra/db/columniterator/IdentityQueryFilter.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.db.columniterator;
 
-import org.apache.cassandra.db.SuperColumn;
 import org.apache.cassandra.db.filter.SliceQueryFilter;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
@@ -30,10 +29,4 @@ public class IdentityQueryFilter extends SliceQueryFilter
     {
         super(ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER, false, Integer.MAX_VALUE);
     }
-
-    public SuperColumn filterSuperColumn(SuperColumn superColumn, int gcBefore)
-    {
-        // no filtering done, deliberately
-        return superColumn;
-    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java b/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java
index db0130e..ac03583 100644
--- a/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java
+++ b/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayDeque;
 import java.util.Deque;
+import java.util.Iterator;
 import java.util.List;
 
 import com.google.common.collect.AbstractIterator;
@@ -352,7 +353,9 @@ class IndexedSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskA
             if (file == null)
                 file = originalInput == null ? sstable.getFileDataInput(positionToSeek) : originalInput;
 
-            OnDiskAtom.Serializer atomSerializer = emptyColumnFamily.getOnDiskSerializer();
+            // Give a bogus atom count since we'll deserialize as long as we're
+            // within the index block but we don't know how many atoms there are
+            Iterator<OnDiskAtom> atomIterator = emptyColumnFamily.metadata().getOnDiskIterator(file, Integer.MAX_VALUE, sstable.descriptor.version);
             file.seek(positionToSeek);
             FileMark mark = file.mark();
 
@@ -365,7 +368,7 @@ class IndexedSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskA
             {
                 // Only fetch a new column if we haven't dealt with the previous one.
                 if (column == null)
-                    column = atomSerializer.deserializeFromSSTable(file, sstable.descriptor.version);
+                    column = atomIterator.next();
 
                 // col is before slice
                 // (If in slice, don't bother checking that until we change slice)
@@ -434,12 +437,10 @@ class IndexedSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskA
             // We remember when we are within a slice to avoid some comparisons
             boolean inSlice = false;
 
-            OnDiskAtom.Serializer atomSerializer = emptyColumnFamily.getOnDiskSerializer();
-            int columns = file.readInt();
-
-            for (int i = 0; i < columns; i++)
+            Iterator<OnDiskAtom> atomIterator = emptyColumnFamily.metadata().getOnDiskIterator(file, file.readInt(), sstable.descriptor.version);
+            while (atomIterator.hasNext())
             {
-                OnDiskAtom column = atomSerializer.deserializeFromSSTable(file, sstable.descriptor.version);
+                OnDiskAtom column = atomIterator.next();
 
                 // col is before slice
                 // (If in slice, don't bother checking that until we change slice)
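
The readers in this patch no longer fetch a serializer and a column count
separately; they ask the metadata for an atom iterator. A sketch of the likely
shape of such an iterator, with Reader as a hypothetical stand-in for the atom
serializer (the real implementation lives behind CFMetaData.getOnDiskIterator):

    import java.io.DataInput;
    import java.io.IOError;
    import java.io.IOException;
    import java.util.Iterator;
    import java.util.NoSuchElementException;

    public class CountedAtomIterator<A> implements Iterator<A>
    {
        public interface Reader<T> { T read(DataInput in) throws IOException; }

        private final DataInput in;
        private final Reader<A> reader;
        private int remaining;

        public CountedAtomIterator(DataInput in, Reader<A> reader, int count)
        {
            this.in = in;
            this.reader = reader;
            this.remaining = count;
        }

        public boolean hasNext() { return remaining > 0; }

        public A next()
        {
            if (remaining <= 0)
                throw new NoSuchElementException();
            remaining--;
            try
            {
                return reader.read(in); // deserialize the next atom from the stream
            }
            catch (IOException e)
            {
                throw new IOError(e); // same checked-to-unchecked wrapping as SCIterator
            }
        }

        public void remove() { throw new UnsupportedOperationException(); }
    }

Passing Integer.MAX_VALUE as the count, as the indexed paths above do, makes
hasNext() effectively always true; the caller is then responsible for bounding
reads by the index block width, which is what the surrounding loops do.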

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java b/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java
index da4631d..389fbb2 100644
--- a/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java
+++ b/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java
@@ -26,7 +26,7 @@ import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.ColumnFamilySerializer;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.DeletionInfo;
-import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.db.Column;
 import org.apache.cassandra.db.RowIndexEntry;
 import org.apache.cassandra.db.OnDiskAtom;
 import org.apache.cassandra.db.marshal.AbstractType;
@@ -196,13 +196,12 @@ public class SSTableNamesIterator extends SimpleAbstractColumnIterator implement
 
     private void readSimpleColumns(FileDataInput file, SortedSet<ByteBuffer> columnNames, List<ByteBuffer> filteredColumnNames, List<OnDiskAtom> result) throws IOException
     {
-        OnDiskAtom.Serializer atomSerializer = cf.getOnDiskSerializer();
-        int columns = file.readInt();
+        Iterator<OnDiskAtom> atomIterator = cf.metadata().getOnDiskIterator(file, file.readInt(), sstable.descriptor.version);
         int n = 0;
-        for (int i = 0; i < columns; i++)
+        while (atomIterator.hasNext())
         {
-            OnDiskAtom column = atomSerializer.deserializeFromSSTable(file, sstable.descriptor.version);
-            if (column instanceof IColumn)
+            OnDiskAtom column = atomIterator.next();
+            if (column instanceof Column)
             {
                 if (columnNames.contains(column.name()))
                 {
@@ -255,15 +254,16 @@ public class SSTableNamesIterator extends SimpleAbstractColumnIterator implement
             if (file == null)
                 file = createFileDataInput(positionToSeek);
 
-            OnDiskAtom.Serializer atomSerializer = cf.getOnDiskSerializer();
+            // We'll read as many atoms as there are in the index block, so provide a bogus atom count
+            Iterator<OnDiskAtom> atomIterator = cf.metadata().getOnDiskIterator(file, Integer.MAX_VALUE, sstable.descriptor.version);
             file.seek(positionToSeek);
             FileMark mark = file.mark();
             // TODO only completely deserialize columns we are interested in
             while (file.bytesPastMark(mark) < indexInfo.width)
             {
-                OnDiskAtom column = atomSerializer.deserializeFromSSTable(file, sstable.descriptor.version);
+                OnDiskAtom column = atomIterator.next();
                 // we check vs the original Set, not the filtered List, for efficiency
-                if (!(column instanceof IColumn) || columnNames.contains(column.name()))
+                if (!(column instanceof Column) || columnNames.contains(column.name()))
                     result.add(column);
             }
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/columniterator/SimpleSliceReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/SimpleSliceReader.java b/src/java/org/apache/cassandra/db/columniterator/SimpleSliceReader.java
index d19e6a5..652d60c 100644
--- a/src/java/org/apache/cassandra/db/columniterator/SimpleSliceReader.java
+++ b/src/java/org/apache/cassandra/db/columniterator/SimpleSliceReader.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.db.columniterator;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Iterator;
 
 import com.google.common.collect.AbstractIterator;
 
@@ -43,10 +44,9 @@ class SimpleSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskAt
     private final ByteBuffer finishColumn;
     private final AbstractType<?> comparator;
     private final ColumnFamily emptyColumnFamily;
-    private final int columns;
     private int i;
     private FileMark mark;
-    private final OnDiskAtom.Serializer atomSerializer;
+    private final Iterator<OnDiskAtom> atomIterator;
 
     public SimpleSliceReader(SSTableReader sstable, RowIndexEntry indexEntry, FileDataInput input, ByteBuffer finishColumn)
     {
@@ -79,8 +79,7 @@ class SimpleSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskAt
 
             emptyColumnFamily = ColumnFamily.create(sstable.metadata);
             emptyColumnFamily.delete(DeletionInfo.serializer().deserializeFromSSTable(file, sstable.descriptor.version));
-            atomSerializer = emptyColumnFamily.getOnDiskSerializer();
-            columns = file.readInt();
+            atomIterator = emptyColumnFamily.metadata().getOnDiskIterator(file, file.readInt(), sstable.descriptor.version);
             mark = file.mark();
         }
         catch (IOException e)
@@ -92,14 +91,14 @@ class SimpleSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskAt
 
     protected OnDiskAtom computeNext()
     {
-        if (i++ >= columns)
+        if (!atomIterator.hasNext())
             return endOfData();
 
         OnDiskAtom column;
         try
         {
             file.reset(mark);
-            column = atomSerializer.deserializeFromSSTable(file, sstable.descriptor.version);
+            column = atomIterator.next();
         }
         catch (IOException e)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
index 17f3519..533219c 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
@@ -35,11 +35,12 @@ public class CommitLogDescriptor
 
     public static final int LEGACY_VERSION = 1;
     public static final int VERSION_12 = 2;
+    public static final int VERSION_20 = 3;
     /**
      * Increment this number if there is a change in the commit log disc layout or MessagingVersion changes.
      * Note: make sure to handle {@link #getMessagingVersion()}
      */
-    public static final int current_version = VERSION_12;
+    public static final int current_version = VERSION_20;
 
     private final int version;
     public final long id;
@@ -75,13 +76,15 @@ public class CommitLogDescriptor
 
     public int getMessagingVersion()
     {
-        assert MessagingService.current_version == MessagingService.VERSION_12;
+        assert MessagingService.current_version == MessagingService.VERSION_20;
         switch (version)
         {
             case LEGACY_VERSION:
                 return MessagingService.VERSION_11;
             case VERSION_12:
                 return MessagingService.VERSION_12;
+            case VERSION_20:
+                return MessagingService.VERSION_20;
             default:
                 throw new IllegalStateException("Unknown commitlog version " + version);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index 9f949d0..3ef3693 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@ -36,7 +36,6 @@ import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.io.IColumnSerializer;
 import org.apache.cassandra.io.util.FastByteArrayInputStream;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.io.util.RandomAccessReader;
@@ -201,7 +200,7 @@ public class CommitLogReplayer
                 {
                     // assuming version here. We've gone to lengths to make sure what gets written to the CL is in
                    // the current version. So do make sure the CL is drained prior to upgrading a node.
-                    rm = RowMutation.serializer.deserialize(new DataInputStream(bufIn), version, IColumnSerializer.Flag.LOCAL);
+                    rm = RowMutation.serializer.deserialize(new DataInputStream(bufIn), version, ColumnSerializer.Flag.LOCAL);
                 }
                 catch (UnknownColumnFamilyException ex)
                 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index eec4b4c..e93fdc1 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -603,7 +603,7 @@ public class CompactionManager implements CompactionManagerMBean
 
             SSTableScanner scanner = sstable.getDirectScanner();
             long rowsRead = 0;
-            List<IColumn> indexedColumnsInRow = null;
+            List<Column> indexedColumnsInRow = null;
 
             CleanupInfo ci = new CleanupInfo(sstable, scanner);
             metrics.beginCompaction(ci);
@@ -637,12 +637,12 @@ public class CompactionManager implements CompactionManagerMBean
                                 OnDiskAtom column = row.next();
                                 if (column instanceof CounterColumn)
                                     renewer.maybeRenew((CounterColumn) column);
-                                if (column instanceof IColumn && cfs.indexManager.indexes((IColumn)column))
+                                if (column instanceof Column && cfs.indexManager.indexes((Column)column))
                                 {
                                     if (indexedColumnsInRow == null)
-                                        indexedColumnsInRow = new ArrayList<IColumn>();
+                                        indexedColumnsInRow = new ArrayList<Column>();
 
-                                    indexedColumnsInRow.add((IColumn)column);
+                                    indexedColumnsInRow.add((Column)column);
                                 }
                             }
 

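The cleanup loop now collects plain Columns for index maintenance. Condensed, the pattern is as follows (a sketch assuming `row` iterates the row's OnDiskAtoms and `cfs` is the store being cleaned, as in the hunk):

    List<Column> indexedColumnsInRow = null;
    while (row.hasNext())
    {
        OnDiskAtom atom = row.next();
        if (atom instanceof CounterColumn)
            renewer.maybeRenew((CounterColumn) atom);
        // Only real cells can be indexed; range tombstones are skipped.
        if (atom instanceof Column && cfs.indexManager.indexes((Column) atom))
        {
            // Allocated lazily: most rows carry no indexed columns.
            if (indexedColumnsInRow == null)
                indexedColumnsInRow = new ArrayList<Column>();
            indexedColumnsInRow.add((Column) atom);
        }
    }
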
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
index 7981d88..6ff9a50 100644
--- a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
@@ -252,7 +252,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
             }
             else
             {
-                IColumn column = (IColumn) current;
+                Column column = (Column) current;
                 container.addColumn(column);
                 if (container.getColumn(column.name()) != column)
                     indexer.remove(column);
@@ -284,7 +284,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
                     container.clear();
                     return null;
                 }
-                IColumn reduced = purged.iterator().next();
+                Column reduced = purged.iterator().next();
                 container.clear();
 
                 // PrecompactedRow.removeDeletedAndOldShards has only checked the top-level CF deletion times,

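The indexer.remove() call above guards against stale index entries: if the container already held a newer cell under the same name, the cell just offered lost the reconciliation. Spelled out with a comment (same code as the hunk, comment added):

    Column column = (Column) current;
    container.addColumn(column);
    // If the container kept a different instance for this name, `column` lost
    // the merge and any secondary-index entry pointing at it must be removed.
    if (container.getColumn(column.name()) != column)
        indexer.remove(column);
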
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java b/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
index 7e1983c..58227f6 100644
--- a/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
+++ b/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
@@ -207,7 +207,7 @@ public class ParallelCompactionIterable extends AbstractCompactionIterable
                     {
                         // addAll is ok even if cf is an ArrayBackedSortedColumns
                         SecondaryIndexManager.Updater indexer = controller.cfs.indexManager.updaterFor(row.key, false);
-                        cf.addAllWithSizeDelta(thisCF, HeapAllocator.instance, Functions.<IColumn>identity(), indexer);
+                        cf.addAllWithSizeDelta(thisCF, HeapAllocator.instance, Functions.<Column>identity(), indexer);
                     }
                 }
 
@@ -218,7 +218,7 @@ public class ParallelCompactionIterable extends AbstractCompactionIterable
         private class DeserializedColumnIterator implements ICountableColumnIterator
         {
             private final Row row;
-            private Iterator<IColumn> iter;
+            private Iterator<Column> iter;
 
             public DeserializedColumnIterator(Row row)
             {

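addAllWithSizeDelta takes a transformation Function applied to each incoming column; with supercolumns gone there is nothing to convert, so the Guava identity is passed (the explicit type witness is needed for the generic; same line as in the hunk, comment added):

    // Functions.identity() from com.google.common.base is a no-op transform.
    cf.addAllWithSizeDelta(thisCF, HeapAllocator.instance, Functions.<Column>identity(), indexer);
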
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java b/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
index be4b20e..213bb8e 100644
--- a/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
@@ -121,7 +121,7 @@ public class PrecompactedRow extends AbstractCompactedRow
             else
             {
                 // addAll is ok even if cf is an ArrayBackedSortedColumns
-                cf.addAllWithSizeDelta(thisCF, HeapAllocator.instance, Functions.<IColumn>identity(), indexer);
+                cf.addAllWithSizeDelta(thisCF, HeapAllocator.instance, Functions.<Column>identity(), indexer);
             }
         }
         return cf;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/filter/ColumnCounter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ColumnCounter.java b/src/java/org/apache/cassandra/db/filter/ColumnCounter.java
index ccf83d9..697ad02 100644
--- a/src/java/org/apache/cassandra/db/filter/ColumnCounter.java
+++ b/src/java/org/apache/cassandra/db/filter/ColumnCounter.java
@@ -22,8 +22,8 @@ package org.apache.cassandra.db.filter;
 
 import java.nio.ByteBuffer;
 
-import org.apache.cassandra.db.IColumn;
-import org.apache.cassandra.db.IColumnContainer;
+import org.apache.cassandra.db.Column;
+import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.marshal.CompositeType;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
@@ -32,7 +32,7 @@ public class ColumnCounter
     protected int live;
     protected int ignored;
 
-    public void count(IColumn column, IColumnContainer container)
+    public void count(Column column, ColumnFamily container)
     {
         if (!isLive(column, container))
             ignored++;
@@ -40,7 +40,7 @@ public class ColumnCounter
             live++;
     }
 
-    protected static boolean isLive(IColumn column, IColumnContainer container)
+    protected static boolean isLive(Column column, ColumnFamily container)
     {
         return column.isLive() && (!container.deletionInfo().isDeleted(column));
     }
@@ -79,7 +79,7 @@ public class ColumnCounter
             assert toGroup == 0 || type != null;
         }
 
-        public void count(IColumn column, IColumnContainer container)
+        public void count(Column column, ColumnFamily container)
         {
             if (!isLive(column, container))
             {

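With IColumnContainer gone, the counter works against the ColumnFamily directly. A minimal usage sketch, assuming the default constructor and an already-populated `cf`; live() is the accessor used by SliceQueryFilter later in this patch:

    ColumnCounter counter = new ColumnCounter();
    for (Column c : cf)
        counter.count(c, cf);   // live cells bump `live`, shadowed/expired ones bump `ignored`
    int liveCells = counter.live();
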
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/filter/ColumnSlice.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ColumnSlice.java b/src/java/org/apache/cassandra/db/filter/ColumnSlice.java
index cdcdc17..e62a436 100644
--- a/src/java/org/apache/cassandra/db/filter/ColumnSlice.java
+++ b/src/java/org/apache/cassandra/db/filter/ColumnSlice.java
@@ -140,21 +140,21 @@ public class ColumnSlice
         }
     }
 
-    public static class NavigableMapIterator extends AbstractIterator<IColumn>
+    public static class NavigableMapIterator extends AbstractIterator<Column>
     {
-        private final NavigableMap<ByteBuffer, IColumn> map;
+        private final NavigableMap<ByteBuffer, Column> map;
         private final ColumnSlice[] slices;
 
         private int idx = 0;
-        private Iterator<IColumn> currentSlice;
+        private Iterator<Column> currentSlice;
 
-        public NavigableMapIterator(NavigableMap<ByteBuffer, IColumn> map, ColumnSlice[] slices)
+        public NavigableMapIterator(NavigableMap<ByteBuffer, Column> map, ColumnSlice[] slices)
         {
             this.map = map;
             this.slices = slices;
         }
 
-        protected IColumn computeNext()
+        protected Column computeNext()
         {
             if (currentSlice == null)
             {

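NavigableMapIterator walks an in-memory column map one slice at a time. A usage sketch: slice bounds are inclusive, `map` is assumed to be sorted by the CF comparator, and `process` is a hypothetical consumer:

    ColumnSlice[] slices = new ColumnSlice[]
    {
        new ColumnSlice(nameA, nameB),
        new ColumnSlice(nameC, nameD)
    };
    Iterator<Column> iter = new ColumnSlice.NavigableMapIterator(map, slices);
    while (iter.hasNext())
        process(iter.next());
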
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java b/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
index 4772c53..41f9d28 100644
--- a/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
@@ -281,7 +281,7 @@ public abstract class ExtendedFilter
             {
                 // check column data vs expression
                 ByteBuffer colName = builder == null ? expression.column_name : builder.copy().add(expression.column_name).build();
-                IColumn column = data.getColumn(colName);
+                Column column = data.getColumn(colName);
                 if (column == null)
                     return false;
                 int v = data.metadata().getValueValidator(expression.column_name).compare(column.value(), expression.value);

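This hunk also shows how an index expression's target cell is located once supercolumns are encoded as composites: the clustering prefix carried by `builder` is extended with the expression's column name to form the full cell name. Same code as above, with comments added:

    // builder == null means a non-composite comparator; otherwise the cell
    // name is the clustering prefix plus the expression's column name.
    ByteBuffer colName = builder == null
                       ? expression.column_name
                       : builder.copy().add(expression.column_name).build();
    Column column = data.getColumn(colName);
    if (column == null)
        return false;   // row has no such cell, so the expression cannot match
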
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/filter/IDiskAtomFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/IDiskAtomFilter.java b/src/java/org/apache/cassandra/db/filter/IDiskAtomFilter.java
index f1d9611..5115e95 100644
--- a/src/java/org/apache/cassandra/db/filter/IDiskAtomFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/IDiskAtomFilter.java
@@ -66,15 +66,9 @@ public interface IDiskAtomFilter
      * by the filter code, which should have some limit on the number of columns
      * to avoid running out of memory on large rows.
      */
-    public void collectReducedColumns(IColumnContainer container, Iterator<IColumn> reducedColumns, int gcBefore);
+    public void collectReducedColumns(ColumnFamily container, Iterator<Column> reducedColumns, int gcBefore);
 
-    /**
-     * subcolumns of a supercolumn are unindexed, so to pick out parts of those we operate in-memory.
-     * @param superColumn may be modified by filtering op.
-     */
-    public SuperColumn filterSuperColumn(SuperColumn superColumn, int gcBefore);
-
-    public Comparator<IColumn> getColumnComparator(AbstractType<?> comparator);
+    public Comparator<Column> getColumnComparator(AbstractType<?> comparator);
 
     public boolean isReversed();
     public void updateColumnsLimit(int newLimit);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java b/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
index 0581e12..5d0bc49 100644
--- a/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
@@ -86,29 +86,17 @@ public class NamesQueryFilter implements IDiskAtomFilter
         return new SSTableNamesIterator(sstable, file, key, columns, indexEntry);
     }
 
-    public SuperColumn filterSuperColumn(SuperColumn superColumn, int gcBefore)
-    {
-        for (IColumn column : superColumn.getSubColumns())
-        {
-            if (!columns.contains(column.name()) || !QueryFilter.isRelevant(column, superColumn, gcBefore))
-            {
-                superColumn.remove(column.name());
-            }
-        }
-        return superColumn;
-    }
-
-    public void collectReducedColumns(IColumnContainer container, Iterator<IColumn> reducedColumns, int gcBefore)
+    public void collectReducedColumns(ColumnFamily container, Iterator<Column> reducedColumns, int gcBefore)
     {
         while (reducedColumns.hasNext())
         {
-            IColumn column = reducedColumns.next();
+            Column column = reducedColumns.next();
             if (QueryFilter.isRelevant(column, container, gcBefore))
                 container.addColumn(column);
         }
     }
 
-    public Comparator<IColumn> getColumnComparator(AbstractType<?> comparator)
+    public Comparator<Column> getColumnComparator(AbstractType<?> comparator)
     {
         return comparator.columnComparator;
     }
@@ -136,7 +124,7 @@ public class NamesQueryFilter implements IDiskAtomFilter
             return cf.hasOnlyTombstones() ? 0 : 1;
 
         int count = 0;
-        for (IColumn column : cf)
+        for (Column column : cf)
         {
             if (column.isLive())
                 count++;

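collectReducedColumns is the last stage of a read: it consumes the merged column stream and keeps only cells that survive tombstone checks. Driving it by hand looks like this (names as in the hunk; `reduced` as produced by QueryFilter.collateColumns below):

    while (reduced.hasNext())
    {
        Column column = reduced.next();
        // isRelevant rejects cells that are gc-able or shadowed by the
        // container's deletion info.
        if (QueryFilter.isRelevant(column, container, gcBefore))
            container.addColumn(column);
    }
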
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/filter/QueryFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/QueryFilter.java b/src/java/org/apache/cassandra/db/filter/QueryFilter.java
index 31b9db7..c6414fa 100644
--- a/src/java/org/apache/cassandra/db/filter/QueryFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/QueryFilter.java
@@ -33,16 +33,14 @@ import org.apache.cassandra.utils.MergeIterator;
 public class QueryFilter
 {
     public final DecoratedKey key;
-    public final QueryPath path;
+    public final String cfName;
     public final IDiskAtomFilter filter;
-    private final IDiskAtomFilter superFilter;
 
-    public QueryFilter(DecoratedKey key, QueryPath path, IDiskAtomFilter filter)
+    public QueryFilter(DecoratedKey key, String cfName, IDiskAtomFilter filter)
     {
         this.key = key;
-        this.path = path;
+        this.cfName = cfName;
         this.filter = filter;
-        superFilter = path.superColumnName == null ? null : new NamesQueryFilter(path.superColumnName);
     }
 
     public OnDiskAtomIterator getMemtableColumnIterator(Memtable memtable)
@@ -56,95 +54,62 @@ public class QueryFilter
     public OnDiskAtomIterator getMemtableColumnIterator(ColumnFamily cf, DecoratedKey key)
     {
         assert cf != null;
-        if (path.superColumnName == null)
-            return filter.getMemtableColumnIterator(cf, key);
-        return superFilter.getMemtableColumnIterator(cf, key);
+        return filter.getMemtableColumnIterator(cf, key);
     }
 
-    // TODO move gcBefore into a field
     public ISSTableColumnIterator getSSTableColumnIterator(SSTableReader sstable)
     {
-        if (path.superColumnName == null)
-            return filter.getSSTableColumnIterator(sstable, key);
-        return superFilter.getSSTableColumnIterator(sstable, key);
+        return filter.getSSTableColumnIterator(sstable, key);
     }
 
     public ISSTableColumnIterator getSSTableColumnIterator(SSTableReader sstable, FileDataInput file, DecoratedKey key, RowIndexEntry indexEntry)
     {
-        if (path.superColumnName == null)
-            return filter.getSSTableColumnIterator(sstable, file, key, indexEntry);
-        return superFilter.getSSTableColumnIterator(sstable, file, key, indexEntry);
+        return filter.getSSTableColumnIterator(sstable, file, key, indexEntry);
     }
 
     public void collateOnDiskAtom(final ColumnFamily returnCF, List<? extends CloseableIterator<OnDiskAtom>> toCollate, final int gcBefore)
     {
-        List<CloseableIterator<IColumn>> filteredIterators = new ArrayList<CloseableIterator<IColumn>>(toCollate.size());
+        List<CloseableIterator<Column>> filteredIterators = new ArrayList<CloseableIterator<Column>>(toCollate.size());
         for (CloseableIterator<OnDiskAtom> iter : toCollate)
             filteredIterators.add(gatherTombstones(returnCF, iter));
         collateColumns(returnCF, filteredIterators, gcBefore);
     }
 
-    // TODO move gcBefore into a field
-    public void collateColumns(final ColumnFamily returnCF, List<? extends CloseableIterator<IColumn>> toCollate, final int gcBefore)
+    public void collateColumns(final ColumnFamily returnCF, List<? extends CloseableIterator<Column>> toCollate, final int gcBefore)
     {
-        IDiskAtomFilter topLevelFilter = (superFilter == null ? filter : superFilter);
-
-        Comparator<IColumn> fcomp = topLevelFilter.getColumnComparator(returnCF.getComparator());
+        Comparator<Column> fcomp = filter.getColumnComparator(returnCF.getComparator());
         // define a 'reduced' iterator that merges columns w/ the same name, which
         // greatly simplifies computing liveColumns in the presence of tombstones.
-        MergeIterator.Reducer<IColumn, IColumn> reducer = new MergeIterator.Reducer<IColumn, IColumn>()
+        MergeIterator.Reducer<Column, Column> reducer = new MergeIterator.Reducer<Column, Column>()
         {
             ColumnFamily curCF = returnCF.cloneMeShallow();
 
-            public void reduce(IColumn current)
+            public void reduce(Column current)
             {
-                if (curCF.isSuper() && curCF.isEmpty())
-                {
-                    // If it is the first super column we add, we must clone it, since another super column may
-                    // otherwise modify it and it could be aliased in a memtable somewhere. We also don't have to care
-                    // about what consumers make of the result (for instance, CFS.getColumnFamily() calls removeDeleted()
-                    // on the result, which removes columns; that shouldn't be done on the original super column).
-                    assert current instanceof SuperColumn;
-                    curCF.addColumn(((SuperColumn) current).cloneMe());
-                }
-                else
-                {
-                    curCF.addColumn(current);
-                }
+                curCF.addColumn(current);
             }
 
-            protected IColumn getReduced()
+            protected Column getReduced()
             {
-                IColumn c = curCF.iterator().next();
-                if (superFilter != null)
-                {
-                    // filterSuperColumn only looks at immediate parent (the supercolumn) when determining if a subcolumn
-                    // is still live, i.e., not shadowed by the parent's tombstone.  so, bump it up temporarily to the tombstone
-                    // time of the cf, if that is greater.
-                    DeletionInfo delInfo = ((SuperColumn) c).deletionInfo();
-                    ((SuperColumn) c).delete(returnCF.deletionInfo());
-                    c = filter.filterSuperColumn((SuperColumn) c, gcBefore);
-                    ((SuperColumn) c).setDeletionInfo(delInfo); // reset sc tombstone time to what it should be
-                }
+                Column c = curCF.iterator().next();
                 curCF.clear();
-
                 return c;
             }
         };
-        Iterator<IColumn> reduced = MergeIterator.get(toCollate, fcomp, reducer);
+        Iterator<Column> reduced = MergeIterator.get(toCollate, fcomp, reducer);
 
-        topLevelFilter.collectReducedColumns(returnCF, reduced, gcBefore);
+        filter.collectReducedColumns(returnCF, reduced, gcBefore);
     }
 
     /**
      * Given an iterator of on-disk atoms, returns an iterator that filters out the tombstone range
      * markers, adding them to {@code returnCF}, and returns the normal columns.
      */
-    public static CloseableIterator<IColumn> gatherTombstones(final ColumnFamily returnCF, final CloseableIterator<OnDiskAtom> iter)
+    public static CloseableIterator<Column> gatherTombstones(final ColumnFamily returnCF, final CloseableIterator<OnDiskAtom> iter)
     {
-        return new CloseableIterator<IColumn>()
+        return new CloseableIterator<Column>()
         {
-            private IColumn next;
+            private Column next;
 
             public boolean hasNext()
             {
@@ -155,13 +120,13 @@ public class QueryFilter
                 return next != null;
             }
 
-            public IColumn next()
+            public Column next()
             {
                 if (next == null)
                     getNext();
 
                 assert next != null;
-                IColumn toReturn = next;
+                Column toReturn = next;
                 next = null;
                 return toReturn;
             }
@@ -172,9 +137,9 @@ public class QueryFilter
                 {
                     OnDiskAtom atom = iter.next();
 
-                    if (atom instanceof IColumn)
+                    if (atom instanceof Column)
                     {
-                        next = (IColumn)atom;
+                        next = (Column)atom;
                         break;
                     }
                     else
@@ -198,10 +163,10 @@ public class QueryFilter
 
     public String getColumnFamilyName()
     {
-        return path.columnFamilyName;
+        return cfName;
     }
 
-    public static boolean isRelevant(IColumn column, IColumnContainer container, int gcBefore)
+    public static boolean isRelevant(Column column, ColumnFamily container, int gcBefore)
     {
         // the column itself must not be gc-able (it is live, or a still relevant tombstone, or has live subcolumns), (1)
         // and if its container is deleted, the column must be changed more recently than the container tombstone (2)
@@ -214,51 +179,48 @@ public class QueryFilter
     /**
      * @return a QueryFilter object to satisfy the given slice criteria:
      * @param key the row to slice
-     * @param path path to the level to slice at (CF or SuperColumn)
+     * @param cfName column family to query
      * @param start column to start slice at, inclusive; empty for "the first column"
      * @param finish column to stop slice at, inclusive; empty for "the last column"
      * @param reversed true to start with the largest column (as determined by configured sort order) instead of smallest
      * @param limit maximum number of non-deleted columns to return
      */
-    public static QueryFilter getSliceFilter(DecoratedKey key, QueryPath path, ByteBuffer start, ByteBuffer finish, boolean reversed, int limit)
+    public static QueryFilter getSliceFilter(DecoratedKey key, String cfName, ByteBuffer start, ByteBuffer finish, boolean reversed, int limit)
     {
-        return new QueryFilter(key, path, new SliceQueryFilter(start, finish, reversed, limit));
+        return new QueryFilter(key, cfName, new SliceQueryFilter(start, finish, reversed, limit));
     }
 
     /**
      * return a QueryFilter object that includes every column in the row.
      * This is dangerous on large rows; avoid except for test code.
      */
-    public static QueryFilter getIdentityFilter(DecoratedKey key, QueryPath path)
+    public static QueryFilter getIdentityFilter(DecoratedKey key, String cfName)
     {
-        return new QueryFilter(key, path, new IdentityQueryFilter());
+        return new QueryFilter(key, cfName, new IdentityQueryFilter());
     }
 
     /**
      * @return a QueryFilter object that will return columns matching the given names
      * @param key the row to slice
-     * @param path path to the level to slice at (CF or SuperColumn)
+     * @param cfName column family to query
      * @param columns the column names to restrict the results to, sorted in comparator order
      */
-    public static QueryFilter getNamesFilter(DecoratedKey key, QueryPath path, SortedSet<ByteBuffer> columns)
+    public static QueryFilter getNamesFilter(DecoratedKey key, String cfName, SortedSet<ByteBuffer> columns)
     {
-        return new QueryFilter(key, path, new NamesQueryFilter(columns));
+        return new QueryFilter(key, cfName, new NamesQueryFilter(columns));
     }
 
     /**
      * convenience method for creating a name filter matching a single column
      */
-    public static QueryFilter getNamesFilter(DecoratedKey key, QueryPath path, ByteBuffer column)
+    public static QueryFilter getNamesFilter(DecoratedKey key, String cfName, ByteBuffer column)
     {
-        return new QueryFilter(key, path, new NamesQueryFilter(column));
+        return new QueryFilter(key, cfName, new NamesQueryFilter(column));
     }
 
     @Override
-    public String toString() {
-        return getClass().getSimpleName() + "(key=" + key +
-               ", path=" + path +
-               (filter == null ? "" : ", filter=" + filter) +
-               (superFilter == null ? "" : ", superFilter=" + superFilter) +
-               ")";
+    public String toString()
+    {
+        return getClass().getSimpleName() + "(key=" + key + ", cfName=" + cfName + (filter == null ? "" : ", filter=" + filter) + ")";
     }
 }

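Two things to note in this file. First, the reducer now merges same-named cells simply by adding them into a shallow clone of the return CF and letting the CF's own reconciliation pick the winner that getReduced() extracts; the supercolumn special case disappears entirely. Second, callers name the column family directly instead of passing a QueryPath. A sketch of the factory methods ("Users" and the bound values are illustrative, not from the patch):

    QueryFilter slice  = QueryFilter.getSliceFilter(key, "Users", start, finish, false, 100);
    QueryFilter names  = QueryFilter.getNamesFilter(key, "Users", columnNames);
    QueryFilter single = QueryFilter.getNamesFilter(key, "Users", columnName);
    QueryFilter all    = QueryFilter.getIdentityFilter(key, "Users");  // test code only
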
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/filter/QueryPath.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/QueryPath.java b/src/java/org/apache/cassandra/db/filter/QueryPath.java
index 0bae441..022631e 100644
--- a/src/java/org/apache/cassandra/db/filter/QueryPath.java
+++ b/src/java/org/apache/cassandra/db/filter/QueryPath.java
@@ -21,10 +21,12 @@ import java.io.*;
 import java.nio.ByteBuffer;
 
 import org.apache.cassandra.db.TypeSizes;
-import org.apache.cassandra.thrift.ColumnParent;
-import org.apache.cassandra.thrift.ColumnPath;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
+/**
+ * This class is obsolete internally but is kept for wire compatibility with
+ * older nodes; that is, only its serialization code is still used.
+ */
 public class QueryPath
 {
     public final String columnFamilyName;
@@ -38,31 +40,11 @@ public class QueryPath
         this.columnName = columnName;
     }
 
-    public QueryPath(ColumnParent columnParent)
-    {
-        this(columnParent.column_family, columnParent.super_column, null);
-    }
-
     public QueryPath(String columnFamilyName, ByteBuffer superColumnName)
     {
         this(columnFamilyName, superColumnName, null);
     }
 
-    public QueryPath(String columnFamilyName)
-    {
-        this(columnFamilyName, null);
-    }
-
-    public QueryPath(ColumnPath column_path)
-    {
-        this(column_path.column_family, column_path.super_column, column_path.column);
-    }
-
-    public static QueryPath column(ByteBuffer columnName)
-    {
-        return new QueryPath(null, null, columnName);
-    }
-
     @Override
     public String toString()
     {

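QueryPath now exists only so that messages exchanged with older nodes still carry a serialized path. Constructing one for that purpose is a one-liner (the CF name is illustrative; the supercolumn slot stays null for non-super tables):

    QueryPath path = new QueryPath("Users", null, null);
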
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
index 2971151..25e7053 100644
--- a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
@@ -46,7 +46,7 @@ public class SliceQueryFilter implements IDiskAtomFilter
     public final ColumnSlice[] slices;
     public final boolean reversed;
     public volatile int count;
-    private final int compositesToGroup;
+    public final int compositesToGroup;
     // This is a hack to allow rolling upgrades with pre-1.2 nodes
     private final int countMutliplierForCompatibility;
 
@@ -91,6 +91,11 @@ public class SliceQueryFilter implements IDiskAtomFilter
         return new SliceQueryFilter(newSlices, reversed, count, compositesToGroup, countMutliplierForCompatibility);
     }
 
+    public SliceQueryFilter withUpdatedSlice(ByteBuffer start, ByteBuffer finish)
+    {
+        return new SliceQueryFilter(new ColumnSlice[]{ new ColumnSlice(start, finish) }, reversed, count, compositesToGroup, countMutliplierForCompatibility);
+    }
+
     public OnDiskAtomIterator getMemtableColumnIterator(ColumnFamily cf, DecoratedKey key)
     {
         return Memtable.getSliceIterator(key, cf, this);
@@ -106,57 +111,18 @@ public class SliceQueryFilter implements IDiskAtomFilter
         return new SSTableSliceIterator(sstable, file, key, slices, reversed, indexEntry);
     }
 
-    public SuperColumn filterSuperColumn(SuperColumn superColumn, int gcBefore)
-    {
-        // we clone shallow, then add, under the theory that generally we're interested in a relatively small number of subcolumns.
-        // this may be a poor assumption.
-        SuperColumn scFiltered = superColumn.cloneMeShallow();
-        final Iterator<IColumn> subcolumns;
-        if (reversed)
-        {
-            List<IColumn> columnsAsList = new ArrayList<IColumn>(superColumn.getSubColumns());
-            subcolumns = Lists.reverse(columnsAsList).iterator();
-        }
-        else
-        {
-            subcolumns = superColumn.getSubColumns().iterator();
-        }
-        final Comparator<ByteBuffer> comparator = reversed ? superColumn.getComparator().reverseComparator : superColumn.getComparator();
-        Iterator<IColumn> results = new AbstractIterator<IColumn>()
-        {
-            protected IColumn computeNext()
-            {
-                while (subcolumns.hasNext())
-                {
-                    IColumn subcolumn = subcolumns.next();
-                    // iterate until we get to the "real" start column
-                    if (comparator.compare(subcolumn.name(), start()) < 0)
-                        continue;
-                    // exit loop when columns are out of the range.
-                    if (finish().remaining() > 0 && comparator.compare(subcolumn.name(), finish()) > 0)
-                        break;
-                    return subcolumn;
-                }
-                return endOfData();
-            }
-        };
-        // subcolumns is either empty now, or has been redefined in the loop above. either is ok.
-        collectReducedColumns(scFiltered, results, gcBefore);
-        return scFiltered;
-    }
-
-    public Comparator<IColumn> getColumnComparator(AbstractType<?> comparator)
+    public Comparator<Column> getColumnComparator(AbstractType<?> comparator)
     {
         return reversed ? comparator.columnReverseComparator : comparator.columnComparator;
     }
 
-    public void collectReducedColumns(IColumnContainer container, Iterator<IColumn> reducedColumns, int gcBefore)
+    public void collectReducedColumns(ColumnFamily container, Iterator<Column> reducedColumns, int gcBefore)
     {
         columnCounter = getColumnCounter(container);
 
         while (reducedColumns.hasNext())
         {
-            IColumn column = reducedColumns.next();
+            Column column = reducedColumns.next();
             if (logger.isTraceEnabled())
                 logger.trace(String.format("collecting %s of %s: %s",
                                            columnCounter.live(), count, column.getString(container.getComparator())));
@@ -177,12 +143,12 @@ public class SliceQueryFilter implements IDiskAtomFilter
     public int getLiveCount(ColumnFamily cf)
     {
         ColumnCounter counter = getColumnCounter(cf);
-        for (IColumn column : cf)
+        for (Column column : cf)
             counter.count(column, cf);
         return counter.live();
     }
 
-    private ColumnCounter getColumnCounter(IColumnContainer container)
+    private ColumnCounter getColumnCounter(ColumnFamily container)
     {
         AbstractType<?> comparator = container.getComparator();
         if (compositesToGroup < 0)
@@ -200,11 +166,11 @@ public class SliceQueryFilter implements IDiskAtomFilter
         Collection<ByteBuffer> toRemove = null;
         boolean trimRemaining = false;
 
-        Collection<IColumn> columns = reversed
-                                    ? cf.getReverseSortedColumns()
-                                    : cf.getSortedColumns();
+        Collection<Column> columns = reversed
+                                   ? cf.getReverseSortedColumns()
+                                   : cf.getSortedColumns();
 
-        for (IColumn column : columns)
+        for (Column column : columns)
         {
             if (trimRemaining)
             {