Posted to commits@cassandra.apache.org by sl...@apache.org on 2013/01/02 13:41:16 UTC

[5/9] Replace supercolumns internally by composites

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java b/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
index 4bb14fc..2f96ef8 100644
--- a/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
+++ b/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
@@ -72,7 +72,7 @@ public abstract class AbstractSimplePerColumnSecondaryIndex extends PerColumnSec
 
     protected abstract void init(ColumnDefinition columnDef);
 
-    protected abstract ByteBuffer makeIndexColumnName(ByteBuffer rowKey, IColumn column);
+    protected abstract ByteBuffer makeIndexColumnName(ByteBuffer rowKey, Column column);
 
     protected abstract AbstractType getExpressionComparator();
 
@@ -86,7 +86,7 @@ public abstract class AbstractSimplePerColumnSecondaryIndex extends PerColumnSec
     }
 
 
-    public void delete(ByteBuffer rowKey, IColumn column)
+    public void delete(ByteBuffer rowKey, Column column)
     {
         if (column.isMarkedForDelete())
             return;
@@ -100,7 +100,7 @@ public abstract class AbstractSimplePerColumnSecondaryIndex extends PerColumnSec
             logger.debug("removed index entry for cleaned-up value {}:{}", valueKey, cfi);
     }
 
-    public void insert(ByteBuffer rowKey, IColumn column)
+    public void insert(ByteBuffer rowKey, Column column)
     {
         DecoratedKey valueKey = getIndexKeyFor(column.value());
         ColumnFamily cfi = ColumnFamily.create(indexCfs.metadata);
@@ -120,7 +120,7 @@ public abstract class AbstractSimplePerColumnSecondaryIndex extends PerColumnSec
         indexCfs.apply(valueKey, cfi, SecondaryIndexManager.nullUpdater);
     }
 
-    public void update(ByteBuffer rowKey, IColumn col)
+    public void update(ByteBuffer rowKey, Column col)
     {
         insert(rowKey, col);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/index/PerColumnSecondaryIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/PerColumnSecondaryIndex.java b/src/java/org/apache/cassandra/db/index/PerColumnSecondaryIndex.java
index d202578..991581d 100644
--- a/src/java/org/apache/cassandra/db/index/PerColumnSecondaryIndex.java
+++ b/src/java/org/apache/cassandra/db/index/PerColumnSecondaryIndex.java
@@ -19,8 +19,7 @@ package org.apache.cassandra.db.index;
 
 import java.nio.ByteBuffer;
 
-import org.apache.cassandra.db.IColumn;
-import org.apache.cassandra.thrift.Column;
+import org.apache.cassandra.db.Column;
 import org.apache.cassandra.utils.FBUtilities;
 
 /**
@@ -35,7 +34,7 @@ public abstract class PerColumnSecondaryIndex extends SecondaryIndex
      * @param rowKey the underlying row key which is indexed
      * @param col all the column info
      */
-    public abstract void delete(ByteBuffer rowKey, IColumn col);
+    public abstract void delete(ByteBuffer rowKey, Column col);
 
     /**
      * insert a column to the index
@@ -43,7 +42,7 @@ public abstract class PerColumnSecondaryIndex extends SecondaryIndex
      * @param rowKey the underlying row key which is indexed
      * @param col all the column info
      */
-    public abstract void insert(ByteBuffer rowKey, IColumn col);
+    public abstract void insert(ByteBuffer rowKey, Column col);
 
     /**
      * update a column from the index
@@ -51,7 +50,7 @@ public abstract class PerColumnSecondaryIndex extends SecondaryIndex
      * @param rowKey the underlying row key which is indexed
      * @param col all the column info
      */
-    public abstract void update(ByteBuffer rowKey, IColumn col);
+    public abstract void update(ByteBuffer rowKey, Column col);
 
     public String getNameForSystemTable(ByteBuffer column)
     {
@@ -61,6 +60,6 @@ public abstract class PerColumnSecondaryIndex extends SecondaryIndex
     @Override
     public boolean validate(Column column)
     {
-        return column.value.remaining() < FBUtilities.MAX_UNSIGNED_SHORT;
+        return column.value().remaining() < FBUtilities.MAX_UNSIGNED_SHORT;
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/index/PerRowSecondaryIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/PerRowSecondaryIndex.java b/src/java/org/apache/cassandra/db/index/PerRowSecondaryIndex.java
index 0200667..1dd2de7 100644
--- a/src/java/org/apache/cassandra/db/index/PerRowSecondaryIndex.java
+++ b/src/java/org/apache/cassandra/db/index/PerRowSecondaryIndex.java
@@ -20,9 +20,9 @@ package org.apache.cassandra.db.index;
 import java.nio.ByteBuffer;
 import java.nio.charset.CharacterCodingException;
 
+import org.apache.cassandra.db.Column;
 import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.thrift.Column;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndex.java b/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
index c7af4f1..f78061c 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
@@ -28,6 +28,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.db.Column;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.SystemTable;
@@ -41,7 +42,6 @@ import org.apache.cassandra.db.marshal.LocalByPartionerType;
 import org.apache.cassandra.dht.*;
 import org.apache.cassandra.io.sstable.ReducingKeyIterator;
 import org.apache.cassandra.io.sstable.SSTableReader;
-import org.apache.cassandra.thrift.Column;
 import org.apache.cassandra.service.StorageService;
 
 /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
index 1be04dd..0de43b3 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
@@ -36,7 +36,6 @@ import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.io.sstable.ReducingKeyIterator;
 import org.apache.cassandra.io.sstable.SSTableReader;
-import org.apache.cassandra.thrift.Column;
 import org.apache.cassandra.thrift.IndexExpression;
 
 /**
@@ -49,11 +48,11 @@ public class SecondaryIndexManager
 
     public static final Updater nullUpdater = new Updater()
     {
-        public void insert(IColumn column) { }
+        public void insert(Column column) { }
 
-        public void update(IColumn oldColumn, IColumn column) { }
+        public void update(Column oldColumn, Column column) { }
 
-        public void remove(IColumn current) { }
+        public void remove(Column current) { }
     };
 
     /**
@@ -172,7 +171,7 @@ public class SecondaryIndexManager
         return null;
     }
 
-    public boolean indexes(IColumn column)
+    public boolean indexes(Column column)
     {
         return indexes(column.name());
     }
@@ -434,7 +433,7 @@ public class SecondaryIndexManager
             }
             else
             {
-                for (IColumn column : cf)
+                for (Column column : cf)
                 {
                     if (index.indexes(column.name()))
                         ((PerColumnSecondaryIndex) index).insert(key, column);
@@ -449,12 +448,12 @@ public class SecondaryIndexManager
      * @param key the row key
      * @param indexedColumnsInRow all column names in row
      */
-    public void deleteFromIndexes(DecoratedKey key, List<IColumn> indexedColumnsInRow)
+    public void deleteFromIndexes(DecoratedKey key, List<Column> indexedColumnsInRow)
     {
         // Update entire row only once per row level index
         Set<Class<? extends SecondaryIndex>> cleanedRowLevelIndexes = null;
 
-        for (IColumn column : indexedColumnsInRow)
+        for (Column column : indexedColumnsInRow)
         {
             SecondaryIndex index = indexesByColumn.get(column.name());
             if (index == null)
@@ -574,17 +573,17 @@ public class SecondaryIndexManager
 
     public boolean validate(Column column)
     {
-        SecondaryIndex index = getIndexForColumn(column.name);
+        SecondaryIndex index = getIndexForColumn(column.name());
         return index != null ? index.validate(column) : true;
     }
 
     public static interface Updater
     {
-        public void insert(IColumn column);
+        public void insert(Column column);
 
-        public void update(IColumn oldColumn, IColumn column);
+        public void update(Column oldColumn, Column column);
 
-        public void remove(IColumn current);
+        public void remove(Column current);
     }
 
     private class PerColumnIndexUpdater implements Updater
@@ -596,7 +595,7 @@ public class SecondaryIndexManager
             this.key = key;
         }
 
-        public void insert(IColumn column)
+        public void insert(Column column)
         {
             if (column.isMarkedForDelete())
                 return;
@@ -608,7 +607,7 @@ public class SecondaryIndexManager
             ((PerColumnSecondaryIndex) index).insert(key.key, column);
         }
 
-        public void update(IColumn oldColumn, IColumn column)
+        public void update(Column oldColumn, Column column)
         {
             if (column.isMarkedForDelete())
                 return;
@@ -621,7 +620,7 @@ public class SecondaryIndexManager
             ((PerColumnSecondaryIndex) index).insert(key.key, column);
         }
 
-        public void remove(IColumn column)
+        public void remove(Column column)
         {
             if (column.isMarkedForDelete())
                 return;
@@ -644,7 +643,7 @@ public class SecondaryIndexManager
             this.key = key;
         }
 
-        public void insert(IColumn column)
+        public void insert(Column column)
         {
             if (column.isMarkedForDelete())
                 return;
@@ -664,7 +663,7 @@ public class SecondaryIndexManager
             }
         }
 
-        public void update(IColumn oldColumn, IColumn column)
+        public void update(Column oldColumn, Column column)
         {
             if (column.isMarkedForDelete())
                 return;
@@ -685,7 +684,7 @@ public class SecondaryIndexManager
             }
         }
 
-        public void remove(IColumn column)
+        public void remove(Column column)
         {
             if (column.isMarkedForDelete())
                 return;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java b/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
index a8c1dde..3085f48 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
@@ -47,7 +47,7 @@ public abstract class SecondaryIndexSearcher
     
     protected boolean isIndexValueStale(ColumnFamily liveData, ByteBuffer indexedColumnName, ByteBuffer indexedValue)
     {
-        IColumn liveColumn = liveData.getColumn(indexedColumnName);
+        Column liveColumn = liveData.getColumn(indexedColumnName);
         if (liveColumn == null || liveColumn.isMarkedForDelete())
             return true;
         

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
index f1aa4aa..3d10ec5 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
@@ -56,7 +56,7 @@ public class CompositesIndex extends AbstractSimplePerColumnSecondaryIndex
         indexComparator = (CompositeType)SecondaryIndex.getIndexComparator(baseCfs.metadata, columnDef);
     }
 
-    protected ByteBuffer makeIndexColumnName(ByteBuffer rowKey, IColumn column)
+    protected ByteBuffer makeIndexColumnName(ByteBuffer rowKey, Column column)
     {
         CompositeType baseComparator = (CompositeType)baseCfs.getComparator();
         ByteBuffer[] components = baseComparator.split(column.name());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java b/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
index 29333b1..1f201db 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
@@ -160,8 +160,7 @@ public class CompositesSearcher extends SecondaryIndexSearcher
         return new ColumnFamilyStore.AbstractScanIterator()
         {
             private ByteBuffer lastSeenPrefix = startPrefix;
-            private Deque<IColumn> indexColumns;
-            private final QueryPath path = new QueryPath(baseCfs.name);
+            private Deque<Column> indexColumns;
             private int columnsRead = Integer.MAX_VALUE;
 
             private final int meanColumns = Math.max(index.getIndexCfs().getMeanColumns(), 1);
@@ -218,7 +217,7 @@ public class CompositesSearcher extends SecondaryIndexSearcher
                                          ((AbstractSimplePerColumnSecondaryIndex)index).expressionString(primary), indexComparator.getString(startPrefix));
 
                         QueryFilter indexFilter = QueryFilter.getSliceFilter(indexKey,
-                                                                             new QueryPath(index.getIndexCfs().name),
+                                                                             index.getIndexCfs().name,
                                                                              lastSeenPrefix,
                                                                              endPrefix,
                                                                              false,
@@ -227,10 +226,10 @@ public class CompositesSearcher extends SecondaryIndexSearcher
                         if (indexRow == null)
                             return makeReturn(currentKey, data);
 
-                        Collection<IColumn> sortedColumns = indexRow.getSortedColumns();
+                        Collection<Column> sortedColumns = indexRow.getSortedColumns();
                         columnsRead = sortedColumns.size();
                         indexColumns = new ArrayDeque(sortedColumns);
-                        IColumn firstColumn = sortedColumns.iterator().next();
+                        Column firstColumn = sortedColumns.iterator().next();
 
                         // Paging is racy, so it is possible the first column of a page is not the last seen one.
                         if (lastSeenPrefix != startPrefix && lastSeenPrefix.equals(firstColumn.name()))
@@ -249,7 +248,7 @@ public class CompositesSearcher extends SecondaryIndexSearcher
 
                     while (!indexColumns.isEmpty() && columnsCount <= limit)
                     {
-                        IColumn column = indexColumns.poll();
+                        Column column = indexColumns.poll();
                         lastSeenPrefix = column.name();
                         if (column.isMarkedForDelete())
                         {
@@ -302,7 +301,7 @@ public class CompositesSearcher extends SecondaryIndexSearcher
                             continue;
 
                         SliceQueryFilter dataFilter = new SliceQueryFilter(start, builder.copy().buildAsEndOfRange(), false, Integer.MAX_VALUE, prefixSize);
-                        ColumnFamily newData = baseCfs.getColumnFamily(new QueryFilter(dk, path, dataFilter));
+                        ColumnFamily newData = baseCfs.getColumnFamily(new QueryFilter(dk, baseCfs.name, dataFilter));
                         if (newData != null)
                         {
                             ByteBuffer baseColumnName = builder.copy().add(primary.column_name).build();
@@ -311,7 +310,7 @@ public class CompositesSearcher extends SecondaryIndexSearcher
                             if (isIndexValueStale(newData, baseColumnName, indexedValue))
                             {
                                 // delete the index entry w/ its own timestamp
-                                IColumn dummyColumn = new Column(baseColumnName, indexedValue, column.timestamp());
+                                Column dummyColumn = new Column(baseColumnName, indexedValue, column.timestamp());
                                 ((PerColumnSecondaryIndex) index).delete(dk.key, dummyColumn);
                                 continue;
                             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java b/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java
index 04c9946..8d065ab 100644
--- a/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java
+++ b/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java
@@ -21,7 +21,7 @@ import java.nio.ByteBuffer;
 import java.util.Set;
 
 import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.db.Column;
 import org.apache.cassandra.db.index.AbstractSimplePerColumnSecondaryIndex;
 import org.apache.cassandra.db.index.SecondaryIndexSearcher;
 import org.apache.cassandra.db.marshal.AbstractType;
@@ -38,7 +38,7 @@ public class KeysIndex extends AbstractSimplePerColumnSecondaryIndex
         // Nothing specific
     }
 
-    protected ByteBuffer makeIndexColumnName(ByteBuffer rowKey, IColumn column)
+    protected ByteBuffer makeIndexColumnName(ByteBuffer rowKey, Column column)
     {
         return rowKey;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java b/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
index cc7773c..62cdb78 100644
--- a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
+++ b/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
@@ -108,8 +108,7 @@ public class KeysSearcher extends SecondaryIndexSearcher
         return new ColumnFamilyStore.AbstractScanIterator()
         {
             private ByteBuffer lastSeenKey = startKey;
-            private Iterator<IColumn> indexColumns;
-            private final QueryPath path = new QueryPath(baseCfs.name);
+            private Iterator<Column> indexColumns;
             private int columnsRead = Integer.MAX_VALUE;
 
             protected Row computeNext()
@@ -132,7 +131,7 @@ public class KeysSearcher extends SecondaryIndexSearcher
                                          ((AbstractSimplePerColumnSecondaryIndex)index).expressionString(primary), index.getBaseCfs().metadata.getKeyValidator().getString(startKey));
 
                         QueryFilter indexFilter = QueryFilter.getSliceFilter(indexKey,
-                                                                             new QueryPath(index.getIndexCfs().name),
+                                                                             index.getIndexCfs().name,
                                                                              lastSeenKey,
                                                                              endKey,
                                                                              false,
@@ -145,10 +144,10 @@ public class KeysSearcher extends SecondaryIndexSearcher
                             return endOfData();
                         }
 
-                        Collection<IColumn> sortedColumns = indexRow.getSortedColumns();
+                        Collection<Column> sortedColumns = indexRow.getSortedColumns();
                         columnsRead = sortedColumns.size();
                         indexColumns = sortedColumns.iterator();
-                        IColumn firstColumn = sortedColumns.iterator().next();
+                        Column firstColumn = sortedColumns.iterator().next();
 
                         // Paging is racy, so it is possible the first column of a page is not the last seen one.
                         if (lastSeenKey != startKey && lastSeenKey.equals(firstColumn.name()))
@@ -167,7 +166,7 @@ public class KeysSearcher extends SecondaryIndexSearcher
 
                     while (indexColumns.hasNext())
                     {
-                        IColumn column = indexColumns.next();
+                        Column column = indexColumns.next();
                         lastSeenKey = column.name();
                         if (column.isMarkedForDelete())
                         {
@@ -188,7 +187,7 @@ public class KeysSearcher extends SecondaryIndexSearcher
                         }
 
                         logger.trace("Returning index hit for {}", dk);
-                        ColumnFamily data = baseCfs.getColumnFamily(new QueryFilter(dk, path, filter.initialFilter()));
+                        ColumnFamily data = baseCfs.getColumnFamily(new QueryFilter(dk, baseCfs.name, filter.initialFilter()));
                         // While the column family we'll get in the end should contains the primary clause column, the initialFilter may not have found it and can thus be null
                         if (data == null)
                             data = ColumnFamily.create(baseCfs.metadata);
@@ -198,7 +197,7 @@ public class KeysSearcher extends SecondaryIndexSearcher
                         IDiskAtomFilter extraFilter = filter.getExtraFilter(data);
                         if (extraFilter != null)
                         {
-                            ColumnFamily cf = baseCfs.getColumnFamily(new QueryFilter(dk, path, extraFilter));
+                            ColumnFamily cf = baseCfs.getColumnFamily(new QueryFilter(dk, baseCfs.name, extraFilter));
                             if (cf != null)
                                 data.addAll(cf, HeapAllocator.instance);
                         }
@@ -206,7 +205,7 @@ public class KeysSearcher extends SecondaryIndexSearcher
                         if (isIndexValueStale(data, primary.column_name, indexKey.key))
                         {
                             // delete the index entry w/ its own timestamp
-                            IColumn dummyColumn = new Column(primary.column_name, indexKey.key, column.timestamp());
+                            Column dummyColumn = new Column(primary.column_name, indexKey.key, column.timestamp());
                             ((PerColumnSecondaryIndex)index).delete(dk.key, dummyColumn);
                             continue;
                         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/marshal/AbstractType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/AbstractType.java b/src/java/org/apache/cassandra/db/marshal/AbstractType.java
index 5393c0c..b3d158d 100644
--- a/src/java/org/apache/cassandra/db/marshal/AbstractType.java
+++ b/src/java/org/apache/cassandra/db/marshal/AbstractType.java
@@ -23,7 +23,7 @@ import java.util.Comparator;
 import java.util.Map;
 
 import org.apache.cassandra.exceptions.SyntaxException;
-import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.db.Column;
 import org.apache.cassandra.db.OnDiskAtom;
 import org.apache.cassandra.db.RangeTombstone;
 import static org.apache.cassandra.io.sstable.IndexHelper.IndexInfo;
@@ -40,8 +40,8 @@ public abstract class AbstractType<T> implements Comparator<ByteBuffer>
 {
     public final Comparator<IndexInfo> indexComparator;
     public final Comparator<IndexInfo> indexReverseComparator;
-    public final Comparator<IColumn> columnComparator;
-    public final Comparator<IColumn> columnReverseComparator;
+    public final Comparator<Column> columnComparator;
+    public final Comparator<Column> columnReverseComparator;
     public final Comparator<OnDiskAtom> onDiskAtomComparator;
     public final Comparator<ByteBuffer> reverseComparator;
 
@@ -61,16 +61,16 @@ public abstract class AbstractType<T> implements Comparator<ByteBuffer>
                 return AbstractType.this.compare(o1.firstName, o2.firstName);
             }
         };
-        columnComparator = new Comparator<IColumn>()
+        columnComparator = new Comparator<Column>()
         {
-            public int compare(IColumn c1, IColumn c2)
+            public int compare(Column c1, Column c2)
             {
                 return AbstractType.this.compare(c1.name(), c2.name());
             }
         };
-        columnReverseComparator = new Comparator<IColumn>()
+        columnReverseComparator = new Comparator<Column>()
         {
-            public int compare(IColumn c1, IColumn c2)
+            public int compare(Column c1, Column c2)
             {
                 return AbstractType.this.compare(c2.name(), c1.name());
             }
@@ -159,10 +159,10 @@ public abstract class AbstractType<T> implements Comparator<ByteBuffer>
     }
 
     /* convenience method */
-    public String getColumnsString(Collection<IColumn> columns)
+    public String getColumnsString(Collection<Column> columns)
     {
         StringBuilder builder = new StringBuilder();
-        for (IColumn column : columns)
+        for (Column column : columns)
         {
             builder.append(column.getString(this)).append(",");
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/marshal/CollectionType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/CollectionType.java b/src/java/org/apache/cassandra/db/marshal/CollectionType.java
index a19912b..621e5c3 100644
--- a/src/java/org/apache/cassandra/db/marshal/CollectionType.java
+++ b/src/java/org/apache/cassandra/db/marshal/CollectionType.java
@@ -20,7 +20,7 @@ package org.apache.cassandra.db.marshal;
 import java.nio.ByteBuffer;
 import java.util.List;
 
-import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.db.Column;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.Pair;
 
@@ -49,7 +49,7 @@ public abstract class CollectionType<T> extends AbstractType<T>
 
     protected abstract void appendToStringBuilder(StringBuilder sb);
 
-    public abstract ByteBuffer serialize(List<Pair<ByteBuffer, IColumn>> columns);
+    public abstract ByteBuffer serialize(List<Pair<ByteBuffer, Column>> columns);
 
     @Override
     public String toString()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/marshal/CompositeType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/CompositeType.java b/src/java/org/apache/cassandra/db/marshal/CompositeType.java
index fb80906..d843f5a 100644
--- a/src/java/org/apache/cassandra/db/marshal/CompositeType.java
+++ b/src/java/org/apache/cassandra/db/marshal/CompositeType.java
@@ -68,6 +68,11 @@ public class CompositeType extends AbstractCompositeType
         return getInstance(parser.getTypeParameters());
     }
 
+    public static CompositeType getInstance(AbstractType... types)
+    {
+        return getInstance(Arrays.<AbstractType<?>>asList(types));
+    }
+
     public static synchronized CompositeType getInstance(List<AbstractType<?>> types)
     {
         assert types != null && !types.isEmpty();
@@ -126,6 +131,23 @@ public class CompositeType extends AbstractCompositeType
         return build(serialized);
     }
 
+    // Extract component idx from bb. Return null if there is not enough component.
+    public static ByteBuffer extractComponent(ByteBuffer bb, int idx)
+    {
+        bb = bb.duplicate();
+        int i = 0;
+        while (bb.remaining() > 0)
+        {
+            ByteBuffer c = getWithShortLength(bb);
+            if (i == idx)
+                return c;
+
+            bb.get(); // skip end-of-component
+            ++i;
+        }
+        return null;
+    }
+
     @Override
     public boolean isCompatibleWith(AbstractType<?> previous)
     {
@@ -190,7 +212,7 @@ public class CompositeType extends AbstractCompositeType
         return new Builder(this);
     }
 
-    public ByteBuffer build(ByteBuffer... buffers)
+    public static ByteBuffer build(ByteBuffer... buffers)
     {
         int totalLength = 0;
         for (ByteBuffer bb : buffers)
@@ -200,7 +222,7 @@ public class CompositeType extends AbstractCompositeType
         for (ByteBuffer bb : buffers)
         {
             putShortLength(out, bb.remaining());
-            out.put(bb);
+            out.put(bb.duplicate());
             out.put((byte) 0);
         }
         out.flip();
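
[Editor's note] The static build(ByteBuffer...) and extractComponent(ByteBuffer, int) added in the hunk above are the core of "replace supercolumns internally by composites": a (supercolumn name, subcolumn name) pair is packed into one cell name, with each component written as a two-byte length (via putShortLength/getWithShortLength), the component bytes, and a single end-of-component byte. The following is a minimal, self-contained sketch of that layout using only java.nio; the names CompositeSketch, encodeComposite and componentAt are illustrative helpers, not Cassandra API, and components are assumed to fit in an unsigned short (as enforced elsewhere via FBUtilities.MAX_UNSIGNED_SHORT).

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class CompositeSketch
{
    // Mirrors the layout written by CompositeType.build(ByteBuffer...) above:
    // <2-byte length><component bytes><end-of-component byte 0> per component.
    static ByteBuffer encodeComposite(ByteBuffer... components)
    {
        int totalLength = 0;
        for (ByteBuffer bb : components)
            totalLength += 2 + bb.remaining() + 1;

        ByteBuffer out = ByteBuffer.allocate(totalLength);
        for (ByteBuffer bb : components)
        {
            out.putShort((short) bb.remaining()); // component length (unsigned short assumed)
            out.put(bb.duplicate());              // duplicate so the caller's buffer is not consumed
            out.put((byte) 0);                    // end-of-component byte
        }
        out.flip();
        return out;
    }

    // Mirrors CompositeType.extractComponent(ByteBuffer, int) above: walk the
    // length-prefixed components and return the idx-th one, or null if missing.
    static ByteBuffer componentAt(ByteBuffer composite, int idx)
    {
        ByteBuffer bb = composite.duplicate();
        int i = 0;
        while (bb.remaining() > 0)
        {
            int length = bb.getShort() & 0xFFFF;
            ByteBuffer c = bb.slice();
            c.limit(length);
            if (i == idx)
                return c;
            bb.position(bb.position() + length);
            bb.get(); // skip the end-of-component byte
            ++i;
        }
        return null;
    }

    public static void main(String[] args)
    {
        ByteBuffer scName = ByteBuffer.wrap("sc1".getBytes(StandardCharsets.UTF_8));
        ByteBuffer colName = ByteBuffer.wrap("col1".getBytes(StandardCharsets.UTF_8));

        // A former supercolumn cell (sc1 -> col1) becomes one flat cell named (sc1, col1).
        ByteBuffer composite = encodeComposite(scName, colName);

        System.out.println(StandardCharsets.UTF_8.decode(componentAt(composite, 0))); // sc1
        System.out.println(StandardCharsets.UTF_8.decode(componentAt(composite, 1))); // col1
    }
}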

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/marshal/ListType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/ListType.java b/src/java/org/apache/cassandra/db/marshal/ListType.java
index 589e29e..76cf748 100644
--- a/src/java/org/apache/cassandra/db/marshal/ListType.java
+++ b/src/java/org/apache/cassandra/db/marshal/ListType.java
@@ -21,7 +21,7 @@ import java.nio.BufferUnderflowException;
 import java.nio.ByteBuffer;
 import java.util.*;
 
-import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.db.Column;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.exceptions.SyntaxException;
 import org.apache.cassandra.utils.Pair;
@@ -118,11 +118,11 @@ public class ListType<T> extends CollectionType<List<T>>
         sb.append(getClass().getName()).append(TypeParser.stringifyTypeParameters(Collections.<AbstractType<?>>singletonList(elements)));
     }
 
-    public ByteBuffer serialize(List<Pair<ByteBuffer, IColumn>> columns)
+    public ByteBuffer serialize(List<Pair<ByteBuffer, Column>> columns)
     {
         List<ByteBuffer> bbs = new ArrayList<ByteBuffer>(columns.size());
         int size = 0;
-        for (Pair<ByteBuffer, IColumn> p : columns)
+        for (Pair<ByteBuffer, Column> p : columns)
         {
             bbs.add(p.right.value());
             size += 2 + p.right.value().remaining();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/marshal/MapType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/MapType.java b/src/java/org/apache/cassandra/db/marshal/MapType.java
index 8364ea0..820abfa 100644
--- a/src/java/org/apache/cassandra/db/marshal/MapType.java
+++ b/src/java/org/apache/cassandra/db/marshal/MapType.java
@@ -21,7 +21,7 @@ import java.nio.BufferUnderflowException;
 import java.nio.ByteBuffer;
 import java.util.*;
 
-import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.db.Column;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.exceptions.SyntaxException;
 import org.apache.cassandra.utils.Pair;
@@ -135,11 +135,11 @@ public class MapType<K, V> extends CollectionType<Map<K, V>>
     /**
      * Creates the same output than decompose, but from the internal representation.
      */
-    public ByteBuffer serialize(List<Pair<ByteBuffer, IColumn>> columns)
+    public ByteBuffer serialize(List<Pair<ByteBuffer, Column>> columns)
     {
         List<ByteBuffer> bbs = new ArrayList<ByteBuffer>(2 * columns.size());
         int size = 0;
-        for (Pair<ByteBuffer, IColumn> p : columns)
+        for (Pair<ByteBuffer, Column> p : columns)
         {
             bbs.add(p.left);
             bbs.add(p.right.value());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/db/marshal/SetType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/SetType.java b/src/java/org/apache/cassandra/db/marshal/SetType.java
index bbfb46f..31afd66 100644
--- a/src/java/org/apache/cassandra/db/marshal/SetType.java
+++ b/src/java/org/apache/cassandra/db/marshal/SetType.java
@@ -21,7 +21,7 @@ import java.nio.BufferUnderflowException;
 import java.nio.ByteBuffer;
 import java.util.*;
 
-import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.db.Column;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.exceptions.SyntaxException;
 import org.apache.cassandra.utils.Pair;
@@ -118,11 +118,11 @@ public class SetType<T> extends CollectionType<Set<T>>
         sb.append(getClass().getName()).append(TypeParser.stringifyTypeParameters(Collections.<AbstractType<?>>singletonList(elements)));
     }
 
-    public ByteBuffer serialize(List<Pair<ByteBuffer, IColumn>> columns)
+    public ByteBuffer serialize(List<Pair<ByteBuffer, Column>> columns)
     {
         List<ByteBuffer> bbs = new ArrayList<ByteBuffer>(columns.size());
         int size = 0;
-        for (Pair<ByteBuffer, IColumn> p : columns)
+        for (Pair<ByteBuffer, Column> p : columns)
         {
             bbs.add(p.left);
             size += 2 + p.left.remaining();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
index 057d46a..a78a4ca 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
@@ -37,7 +37,7 @@ import org.apache.thrift.TApplicationException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.db.Column;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
@@ -65,8 +65,8 @@ import org.apache.thrift.TException;
  *
  * The default split size is 64k rows.
  */
-public class ColumnFamilyInputFormat extends InputFormat<ByteBuffer, SortedMap<ByteBuffer, IColumn>>
-    implements org.apache.hadoop.mapred.InputFormat<ByteBuffer, SortedMap<ByteBuffer, IColumn>>
+public class ColumnFamilyInputFormat extends InputFormat<ByteBuffer, SortedMap<ByteBuffer, Column>>
+    implements org.apache.hadoop.mapred.InputFormat<ByteBuffer, SortedMap<ByteBuffer, Column>>
 {
     private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyInputFormat.class);
 
@@ -313,7 +313,7 @@ public class ColumnFamilyInputFormat extends InputFormat<ByteBuffer, SortedMap<B
         return map;
     }
 
-    public RecordReader<ByteBuffer, SortedMap<ByteBuffer, IColumn>> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException
+    public RecordReader<ByteBuffer, SortedMap<ByteBuffer, Column>> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException
     {
         return new ColumnFamilyRecordReader();
     }
@@ -332,7 +332,7 @@ public class ColumnFamilyInputFormat extends InputFormat<ByteBuffer, SortedMap<B
         return oldInputSplits;
     }
 
-    public org.apache.hadoop.mapred.RecordReader<ByteBuffer, SortedMap<ByteBuffer, IColumn>> getRecordReader(org.apache.hadoop.mapred.InputSplit split, JobConf jobConf, final Reporter reporter) throws IOException
+    public org.apache.hadoop.mapred.RecordReader<ByteBuffer, SortedMap<ByteBuffer, Column>> getRecordReader(org.apache.hadoop.mapred.InputSplit split, JobConf jobConf, final Reporter reporter) throws IOException
     {
         TaskAttemptContext tac = new TaskAttemptContext(jobConf, TaskAttemptID.forName(jobConf.get(MAPRED_TASK_ID)))
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
index 07a5460..ea98cc9 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
@@ -30,11 +30,26 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.auth.IAuthenticator;
 import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.db.Column;
 import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.CompositeType;
 import org.apache.cassandra.db.marshal.TypeParser;
 import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.thrift.*;
+import org.apache.cassandra.thrift.AuthenticationRequest;
+import org.apache.cassandra.thrift.Cassandra;
+import org.apache.cassandra.thrift.CfDef;
+import org.apache.cassandra.thrift.ColumnOrSuperColumn;
+import org.apache.cassandra.thrift.ColumnParent;
+import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.cassandra.thrift.CounterColumn;
+import org.apache.cassandra.thrift.CounterSuperColumn;
+import org.apache.cassandra.thrift.IndexExpression;
+import org.apache.cassandra.thrift.KeyRange;
+import org.apache.cassandra.thrift.KeySlice;
+import org.apache.cassandra.thrift.KsDef;
+import org.apache.cassandra.thrift.SlicePredicate;
+import org.apache.cassandra.thrift.SuperColumn;
+import org.apache.cassandra.thrift.TBinaryProtocol;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
@@ -45,8 +60,8 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.thrift.TException;
 import org.apache.thrift.transport.TSocket;
 
-public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap<ByteBuffer, IColumn>>
-    implements org.apache.hadoop.mapred.RecordReader<ByteBuffer, SortedMap<ByteBuffer, IColumn>>
+public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap<ByteBuffer, Column>>
+    implements org.apache.hadoop.mapred.RecordReader<ByteBuffer, SortedMap<ByteBuffer, Column>>
 {
     private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyRecordReader.class);
 
@@ -54,7 +69,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
 
     private ColumnFamilySplit split;
     private RowIterator iter;
-    private Pair<ByteBuffer, SortedMap<ByteBuffer, IColumn>> currentRow;
+    private Pair<ByteBuffer, SortedMap<ByteBuffer, Column>> currentRow;
     private SlicePredicate predicate;
     private boolean isEmptyPredicate;
     private int totalRowCount; // total number of rows to fetch
@@ -93,7 +108,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
         return currentRow.left;
     }
 
-    public SortedMap<ByteBuffer, IColumn> getCurrentValue()
+    public SortedMap<ByteBuffer, Column> getCurrentValue()
     {
         return currentRow.right;
     }
@@ -219,10 +234,11 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
         return split.getLocations()[0];
     }
 
-    private abstract class RowIterator extends AbstractIterator<Pair<ByteBuffer, SortedMap<ByteBuffer, IColumn>>>
+    private abstract class RowIterator extends AbstractIterator<Pair<ByteBuffer, SortedMap<ByteBuffer, Column>>>
     {
         protected List<KeySlice> rows;
         protected int totalRead = 0;
+        protected final boolean isSuper;
         protected final AbstractType<?> comparator;
         protected final AbstractType<?> subComparator;
         protected final IPartitioner partitioner;
@@ -242,6 +258,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
                 int idx = cfnames.indexOf(cfName);
                 CfDef cf_def = ks_def.cf_defs.get(idx);
 
+                isSuper = cf_def.column_type.equals("Super");
                 comparator = TypeParser.parse(cf_def.comparator_type);
                 subComparator = cf_def.subcomparator_type == null ? null : TypeParser.parse(cf_def.subcomparator_type);
             }
@@ -267,46 +284,50 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
             return totalRead;
         }
 
-        protected IColumn unthriftify(ColumnOrSuperColumn cosc)
+        protected List<Column> unthriftify(ColumnOrSuperColumn cosc)
         {
             if (cosc.counter_column != null)
-                return unthriftifyCounter(cosc.counter_column);
+                return Collections.<Column>singletonList(unthriftifyCounter(cosc.counter_column));
             if (cosc.counter_super_column != null)
                 return unthriftifySuperCounter(cosc.counter_super_column);
             if (cosc.super_column != null)
                 return unthriftifySuper(cosc.super_column);
             assert cosc.column != null;
-            return unthriftifySimple(cosc.column);
+            return Collections.<Column>singletonList(unthriftifySimple(cosc.column));
         }
 
-        private IColumn unthriftifySuper(SuperColumn super_column)
+        private List<Column> unthriftifySuper(SuperColumn super_column)
         {
-            org.apache.cassandra.db.SuperColumn sc = new org.apache.cassandra.db.SuperColumn(super_column.name, subComparator);
-            for (Column column : super_column.columns)
+            List<Column> columns = new ArrayList<Column>(super_column.columns.size());
+            for (org.apache.cassandra.thrift.Column column : super_column.columns)
             {
-                sc.addColumn(unthriftifySimple(column));
+                Column c = unthriftifySimple(column);
+                columns.add(c.withUpdatedName(CompositeType.build(super_column.name, c.name())));
             }
-            return sc;
+            return columns;
         }
 
-        protected IColumn unthriftifySimple(Column column)
+        protected Column unthriftifySimple(org.apache.cassandra.thrift.Column column)
         {
-            return new org.apache.cassandra.db.Column(column.name, column.value, column.timestamp);
+            return new Column(column.name, column.value, column.timestamp);
         }
 
-        private IColumn unthriftifyCounter(CounterColumn column)
+        private Column unthriftifyCounter(CounterColumn column)
         {
             //CounterColumns read the counterID from the System table, so need the StorageService running and access
             //to cassandra.yaml. To avoid a Hadoop needing access to yaml return a regular Column.
-            return new org.apache.cassandra.db.Column(column.name, ByteBufferUtil.bytes(column.value), 0);
+            return new Column(column.name, ByteBufferUtil.bytes(column.value), 0);
         }
 
-        private IColumn unthriftifySuperCounter(CounterSuperColumn superColumn)
+        private List<Column> unthriftifySuperCounter(CounterSuperColumn super_column)
         {
-            org.apache.cassandra.db.SuperColumn sc = new org.apache.cassandra.db.SuperColumn(superColumn.name, subComparator);
-            for (CounterColumn column : superColumn.columns)
-                sc.addColumn(unthriftifyCounter(column));
-            return sc;
+            List<Column> columns = new ArrayList<Column>(super_column.columns.size());
+            for (CounterColumn column : super_column.columns)
+            {
+                Column c = unthriftifyCounter(column);
+                columns.add(c.withUpdatedName(CompositeType.build(super_column.name, c.name())));
+            }
+            return columns;
         }
     }
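
[Editor's note] The unthriftifySuper/unthriftifySuperCounter rewrite above is where a thrift SuperColumn is flattened: each subcolumn becomes an ordinary Column whose name is CompositeType.build(superColumnName, subColumnName). A rough sketch of that mapping follows; ThriftColumn, ThriftSuperColumn and FlatColumn are illustrative stand-ins for the real thrift and db classes (only the fields used here are modelled), and it reuses encodeComposite/componentAt from the CompositeSketch sketch earlier in this message.

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class SuperColumnFlattenSketch
{
    // Stand-ins for org.apache.cassandra.thrift.{SuperColumn,Column} and org.apache.cassandra.db.Column.
    static class ThriftColumn { ByteBuffer name, value; long timestamp; }
    static class ThriftSuperColumn { ByteBuffer name; List<ThriftColumn> columns = new ArrayList<ThriftColumn>(); }
    static class FlatColumn
    {
        final ByteBuffer name, value;
        final long timestamp;
        FlatColumn(ByteBuffer name, ByteBuffer value, long timestamp)
        { this.name = name; this.value = value; this.timestamp = timestamp; }
    }

    // Same shape as unthriftifySuper above: one flat column per subcolumn, renamed to
    // the composite of (supercolumn name, subcolumn name).
    static List<FlatColumn> flatten(ThriftSuperColumn sc)
    {
        List<FlatColumn> out = new ArrayList<FlatColumn>(sc.columns.size());
        for (ThriftColumn c : sc.columns)
            out.add(new FlatColumn(CompositeSketch.encodeComposite(sc.name, c.name), c.value, c.timestamp));
        return out;
    }

    public static void main(String[] args)
    {
        ThriftSuperColumn sc = new ThriftSuperColumn();
        sc.name = ByteBuffer.wrap("address".getBytes(StandardCharsets.UTF_8));
        ThriftColumn city = new ThriftColumn();
        city.name = ByteBuffer.wrap("city".getBytes(StandardCharsets.UTF_8));
        city.value = ByteBuffer.wrap("Paris".getBytes(StandardCharsets.UTF_8));
        city.timestamp = 1L;
        sc.columns.add(city);

        // The first composite component of each flattened name is the old supercolumn name.
        for (FlatColumn c : flatten(sc))
            System.out.println(StandardCharsets.UTF_8.decode(CompositeSketch.componentAt(c.name, 0))); // address
    }
}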
 
@@ -385,7 +406,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
             }
         }
 
-        protected Pair<ByteBuffer, SortedMap<ByteBuffer, IColumn>> computeNext()
+        protected Pair<ByteBuffer, SortedMap<ByteBuffer, Column>> computeNext()
         {
             maybeInit();
             if (rows == null)
@@ -393,11 +414,12 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
 
             totalRead++;
             KeySlice ks = rows.get(i++);
-            SortedMap<ByteBuffer, IColumn> map = new TreeMap<ByteBuffer, IColumn>(comparator);
+            SortedMap<ByteBuffer, Column> map = new TreeMap<ByteBuffer, Column>(comparator);
             for (ColumnOrSuperColumn cosc : ks.columns)
             {
-                IColumn column = unthriftify(cosc);
-                map.put(column.name(), column);
+                List<Column> columns = unthriftify(cosc);
+                for (Column column : columns)
+                    map.put(column.name(), column);
             }
             return Pair.create(ks.key, map);
         }
@@ -405,7 +427,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
 
     private class WideRowIterator extends RowIterator
     {
-        private PeekingIterator<Pair<ByteBuffer, SortedMap<ByteBuffer, IColumn>>> wideColumns;
+        private PeekingIterator<Pair<ByteBuffer, SortedMap<ByteBuffer, Column>>> wideColumns;
         private ByteBuffer lastColumn = ByteBufferUtil.EMPTY_BYTE_BUFFER;
         private ByteBuffer lastCountedKey = ByteBufferUtil.EMPTY_BYTE_BUFFER;
 
@@ -454,13 +476,13 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
             }
         }
 
-        protected Pair<ByteBuffer, SortedMap<ByteBuffer, IColumn>> computeNext()
+        protected Pair<ByteBuffer, SortedMap<ByteBuffer, Column>> computeNext()
         {
             maybeInit();
             if (rows == null)
                 return endOfData();
 
-            Pair<ByteBuffer, SortedMap<ByteBuffer, IColumn>> next = wideColumns.next();
+            Pair<ByteBuffer, SortedMap<ByteBuffer, Column>> next = wideColumns.next();
             lastColumn = next.right.values().iterator().next().name();
 
             maybeIncreaseRowCounter(next);
@@ -472,7 +494,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
          * Increases the row counter only if we really moved to the next row.
          * @param next just fetched row slice
          */
-        private void maybeIncreaseRowCounter(Pair<ByteBuffer, SortedMap<ByteBuffer, IColumn>> next)
+        private void maybeIncreaseRowCounter(Pair<ByteBuffer, SortedMap<ByteBuffer, Column>> next)
         {
             ByteBuffer currentKey = next.left;
             if (!currentKey.equals(lastCountedKey))
@@ -482,7 +504,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
             }
         }
 
-        private class WideColumnIterator extends AbstractIterator<Pair<ByteBuffer, SortedMap<ByteBuffer, IColumn>>>
+        private class WideColumnIterator extends AbstractIterator<Pair<ByteBuffer, SortedMap<ByteBuffer, Column>>>
         {
             private final Iterator<KeySlice> rows;
             private Iterator<ColumnOrSuperColumn> columns;
@@ -503,16 +525,27 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
                 columns = currentRow.columns.iterator();
             }
 
-            protected Pair<ByteBuffer, SortedMap<ByteBuffer, IColumn>> computeNext()
+            protected Pair<ByteBuffer, SortedMap<ByteBuffer, Column>> computeNext()
             {
                 while (true)
                 {
                     if (columns.hasNext())
                     {
                         ColumnOrSuperColumn cosc = columns.next();
-                        IColumn column = unthriftify(cosc);
-                        ImmutableSortedMap<ByteBuffer, IColumn> map = ImmutableSortedMap.of(column.name(), column);
-                        return Pair.<ByteBuffer, SortedMap<ByteBuffer, IColumn>>create(currentRow.key, map);
+                        SortedMap<ByteBuffer, Column> map;
+                        List<Column> columns = unthriftify(cosc);
+                        if (columns.size() == 1)
+                        {
+                            map = ImmutableSortedMap.of(columns.get(0).name(), columns.get(0));
+                        }
+                        else
+                        {
+                            assert isSuper;
+                            map = new TreeMap<ByteBuffer, Column>(CompositeType.getInstance(comparator, subComparator));
+                            for (Column column : columns)
+                                map.put(column.name(), column);
+                        }
+                        return Pair.<ByteBuffer, SortedMap<ByteBuffer, Column>>create(currentRow.key, map);
                     }
 
                     if (!rows.hasNext())
@@ -529,7 +562,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
     // to the old. Thus, expect a small performance hit.
     // And obviously this wouldn't work for wide rows. But since ColumnFamilyInputFormat
     // and ColumnFamilyRecordReader don't support them, it should be fine for now.
-    public boolean next(ByteBuffer key, SortedMap<ByteBuffer, IColumn> value) throws IOException
+    public boolean next(ByteBuffer key, SortedMap<ByteBuffer, Column> value) throws IOException
     {
         if (this.nextKeyValue())
         {
@@ -550,9 +583,9 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
         return ByteBuffer.wrap(new byte[this.keyBufferSize]);
     }
 
-    public SortedMap<ByteBuffer, IColumn> createValue()
+    public SortedMap<ByteBuffer, Column> createValue()
     {
-        return new TreeMap<ByteBuffer, IColumn>();
+        return new TreeMap<ByteBuffer, Column>();
     }
 
     public long getPos() throws IOException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
index 7c459b5..91174f3 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
@@ -29,7 +29,6 @@ import org.apache.commons.logging.LogFactory;
 
 import org.apache.cassandra.auth.IAuthenticator;
 import org.apache.cassandra.db.Column;
-import org.apache.cassandra.db.IColumn;
 import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.db.marshal.AbstractCompositeType.CompositeComponent;
 import org.apache.cassandra.hadoop.*;
@@ -96,7 +95,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
     private String storeSignature;
 
     private Configuration conf;
-    private RecordReader<ByteBuffer, Map<ByteBuffer, IColumn>> reader;
+    private RecordReader<ByteBuffer, Map<ByteBuffer, Column>> reader;
     private RecordWriter<ByteBuffer, List<Mutation>> writer;
     private String inputFormatClass;
     private String outputFormatClass;
@@ -105,7 +104,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
     private boolean usePartitionFilter = false;
     // wide row hacks
     private ByteBuffer lastKey;
-    private Map<ByteBuffer,IColumn> lastRow;
+    private Map<ByteBuffer,Column> lastRow;
     private boolean hasNext = true;
 
     public CassandraStorage()
@@ -147,7 +146,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
                             key = (ByteBuffer)reader.getCurrentKey();
                             tuple.append(new DataByteArray(key.array(), key.position()+key.arrayOffset(), key.limit()+key.arrayOffset()));
                         }
-                        for (Map.Entry<ByteBuffer, IColumn> entry : lastRow.entrySet())
+                        for (Map.Entry<ByteBuffer, Column> entry : lastRow.entrySet())
                         {
                             bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
                         }
@@ -171,7 +170,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
                 {
                     // read too much, hold on to it for next time
                     lastKey = (ByteBuffer)reader.getCurrentKey();
-                    lastRow = (SortedMap<ByteBuffer,IColumn>)reader.getCurrentValue();
+                    lastRow = (SortedMap<ByteBuffer,Column>)reader.getCurrentValue();
                     // but return what we have so far
                     tuple.append(bag);
                     return tuple;
@@ -182,28 +181,28 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
                     if (lastKey != null && !(key.equals(lastKey))) // last key only had one value
                     {
                         tuple.append(new DataByteArray(lastKey.array(), lastKey.position()+lastKey.arrayOffset(), lastKey.limit()+lastKey.arrayOffset()));
-                        for (Map.Entry<ByteBuffer, IColumn> entry : lastRow.entrySet())
+                        for (Map.Entry<ByteBuffer, Column> entry : lastRow.entrySet())
                         {
                             bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
                         }
                         tuple.append(bag);
                         lastKey = key;
-                        lastRow = (SortedMap<ByteBuffer,IColumn>)reader.getCurrentValue();
+                        lastRow = (SortedMap<ByteBuffer,Column>)reader.getCurrentValue();
                         return tuple;
                     }
                     tuple.append(new DataByteArray(key.array(), key.position()+key.arrayOffset(), key.limit()+key.arrayOffset()));
                 }
-                SortedMap<ByteBuffer,IColumn> row = (SortedMap<ByteBuffer,IColumn>)reader.getCurrentValue();
+                SortedMap<ByteBuffer,Column> row = (SortedMap<ByteBuffer,Column>)reader.getCurrentValue();
                 if (lastRow != null) // prepend what was read last time
                 {
-                    for (Map.Entry<ByteBuffer, IColumn> entry : lastRow.entrySet())
+                    for (Map.Entry<ByteBuffer, Column> entry : lastRow.entrySet())
                     {
                         bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
                     }
                     lastKey = null;
                     lastRow = null;
                 }
-                for (Map.Entry<ByteBuffer, IColumn> entry : row.entrySet())
+                for (Map.Entry<ByteBuffer, Column> entry : row.entrySet())
                 {
                     bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
                 }
@@ -228,7 +227,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
 
             CfDef cfDef = getCfDef(loadSignature);
             ByteBuffer key = reader.getCurrentKey();
-            Map<ByteBuffer, IColumn> cf = reader.getCurrentValue();
+            Map<ByteBuffer, Column> cf = reader.getCurrentValue();
             assert key != null && cf != null;
 
             // output tuple, will hold the key, each indexed column in a tuple, then a bag of the rest
@@ -253,7 +252,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
                 added.put(cdef.name, true);
             }
             // now add all the other columns
-            for (Map.Entry<ByteBuffer, IColumn> entry : cf.entrySet())
+            for (Map.Entry<ByteBuffer, Column> entry : cf.entrySet())
             {
                 if (!added.containsKey(entry.getKey()))
                     bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
@@ -307,7 +306,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
         return tuple;
     }
 
-    private Tuple columnToTuple(IColumn col, CfDef cfDef, AbstractType comparator) throws IOException
+    private Tuple columnToTuple(Column col, CfDef cfDef, AbstractType comparator) throws IOException
     {
         Tuple pair = TupleFactory.getInstance().newTuple(2);
 
@@ -319,27 +318,14 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
         {
             setTupleValue(pair, 0, comparator.compose(col.name()));
         }
-        if (col instanceof Column)
-        {
-            // standard
-            List<AbstractType> marshallers = getDefaultMarshallers(cfDef);
-            Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
 
-            if (validators.get(col.name()) == null)
-                setTupleValue(pair, 1, marshallers.get(1).compose(col.value()));
-            else
-                setTupleValue(pair, 1, validators.get(col.name()).compose(col.value()));
-            return pair;
-        }
-        else
-        {
-            // super
-            ArrayList<Tuple> subcols = new ArrayList<Tuple>();
-            for (IColumn subcol : col.getSubColumns())
-                subcols.add(columnToTuple(subcol, cfDef, parseType(cfDef.getSubcomparator_type())));
+        List<AbstractType> marshallers = getDefaultMarshallers(cfDef);
+        Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
 
-            pair.set(1, new DefaultDataBag(subcols));
-        }
+        if (validators.get(col.name()) == null)
+            setTupleValue(pair, 1, marshallers.get(1).compose(col.value()));
+        else
+            setTupleValue(pair, 1, validators.get(col.name()).compose(col.value()));
         return pair;
     }
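
With super columns flattened into composite cell names, columnToTuple above always emits a two-element (name, value) tuple and only has to decide how to decode the value: the per-column validator if one is declared, otherwise the column family's default marshaller. A minimal sketch of that fallback, with the validator map and default validator assumed to come from the CfDef the same way getValidatorMap/getDefaultMarshallers derive them above:

    import java.nio.ByteBuffer;
    import java.util.Map;

    import org.apache.cassandra.db.marshal.AbstractType;

    // Sketch only: mirrors the validator fallback in columnToTuple.
    // "validators" and "defaultValidator" are assumed to be built from the
    // CfDef, as getValidatorMap/getDefaultMarshallers do in the class above.
    public class ValueDecodingSketch
    {
        public static Object decodeValue(ByteBuffer name,
                                         ByteBuffer value,
                                         Map<ByteBuffer, AbstractType> validators,
                                         AbstractType defaultValidator)
        {
            AbstractType validator = validators.get(name);
            return validator == null ? defaultValidator.compose(value)
                                     : validator.compose(value);
        }
    }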
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/io/IColumnSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/IColumnSerializer.java b/src/java/org/apache/cassandra/io/IColumnSerializer.java
deleted file mode 100644
index 5802140..0000000
--- a/src/java/org/apache/cassandra/io/IColumnSerializer.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.io;
-
-import java.io.DataInput;
-import java.io.IOException;
-
-import org.apache.cassandra.db.IColumn;
-
-public interface IColumnSerializer extends ISerializer<IColumn>
-{
-    /**
-     * Flag affecting deserialization behavior.
-     *  - LOCAL: for deserialization of local data (Expired columns are
-     *      converted to tombstones (to gain disk space)).
-     *  - FROM_REMOTE: for deserialization of data received from remote hosts
-     *      (Expired columns are converted to tombstone and counters have
-     *      their delta cleared)
-     *  - PRESERVE_SIZE: used when no transformation must be performed, i.e,
-     *      when we must ensure that deserializing and reserializing the
-     *      result yield the exact same bytes. Streaming uses this.
-     */
-    public static enum Flag
-    {
-        LOCAL, FROM_REMOTE, PRESERVE_SIZE;
-    }
-
-    public IColumn deserialize(DataInput in, Flag flag, int expireBefore) throws IOException;
-}
-
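
The Flag semantics documented in the javadoc above do not disappear with the interface; the rest of this patch uses them from ColumnSerializer.Flag instead (see the SSTableIdentityIterator and SSTableWriter hunks below). A hedged sketch of picking a flag by context, assuming FROM_REMOTE carries over to ColumnSerializer.Flag unchanged; the chooser method itself is illustrative only, not an existing API:

    import org.apache.cassandra.db.ColumnSerializer;

    // Illustrative only: LOCAL turns expired columns into tombstones,
    // FROM_REMOTE additionally clears counter deltas, PRESERVE_SIZE leaves
    // the bytes untouched so re-serialization is byte-identical (streaming).
    public class FlagChoiceSketch
    {
        public static ColumnSerializer.Flag forContext(boolean streaming, boolean fromRemote)
        {
            if (streaming)
                return ColumnSerializer.Flag.PRESERVE_SIZE;
            return fromRemote ? ColumnSerializer.Flag.FROM_REMOTE
                              : ColumnSerializer.Flag.LOCAL;
        }
    }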

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
index f3d097f..63a6071 100644
--- a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
@@ -28,6 +28,7 @@ import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.context.CounterContext;
+import org.apache.cassandra.db.marshal.CompositeType;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.utils.CounterId;
 import org.apache.cassandra.utils.Pair;
@@ -38,7 +39,7 @@ public abstract class AbstractSSTableSimpleWriter
     protected final CFMetaData metadata;
     protected DecoratedKey currentKey;
     protected ColumnFamily columnFamily;
-    protected SuperColumn currentSuperColumn;
+    protected ByteBuffer currentSuperColumn;
     protected final CounterId counterid = CounterId.generate();
 
     public AbstractSSTableSimpleWriter(File directory, CFMetaData metadata, IPartitioner partitioner)
@@ -102,20 +103,22 @@ public abstract class AbstractSSTableSimpleWriter
      */
     public void newSuperColumn(ByteBuffer name)
     {
-        if (!columnFamily.isSuper())
+        if (!columnFamily.metadata().isSuper())
             throw new IllegalStateException("Cannot add a super column to a standard column family");
 
-        currentSuperColumn = new SuperColumn(name, metadata.subcolumnComparator);
-        columnFamily.addColumn(currentSuperColumn);
+        currentSuperColumn = name;
     }
 
-    private void addColumn(IColumn column)
+    private void addColumn(Column column)
     {
-        if (columnFamily.isSuper() && currentSuperColumn == null)
-            throw new IllegalStateException("Trying to add a column to a super column family, but no super column has been started.");
+        if (columnFamily.metadata().isSuper())
+        {
+            if (currentSuperColumn == null)
+                throw new IllegalStateException("Trying to add a column to a super column family, but no super column has been started.");
 
-        IColumnContainer container = columnFamily.isSuper() ? currentSuperColumn : columnFamily;
-        container.addColumn(column);
+            column = column.withUpdatedName(CompositeType.build(currentSuperColumn, column.name()));
+        }
+        columnFamily.addColumn(column);
     }
 
     /**

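The hunk above is the core of the rewrite: a subcolumn added under a super column is stored as one ordinary cell whose name is the composite of (super column name, subcolumn name). A self-contained sketch of that name mangling, using arbitrary example strings and only the CompositeType.build call already shown in the hunk:

    import java.nio.ByteBuffer;

    import org.apache.cassandra.db.marshal.CompositeType;
    import org.apache.cassandra.utils.ByteBufferUtil;

    // Sketch: subcolumn "c1" under super column "sc1" becomes a single
    // regular cell named with the composite (sc1, c1).
    public class SuperColumnNameSketch
    {
        public static ByteBuffer flatten(String superColumnName, String subColumnName)
        {
            ByteBuffer sc = ByteBufferUtil.bytes(superColumnName);
            ByteBuffer sub = ByteBufferUtil.bytes(subColumnName);
            return CompositeType.build(sc, sub);
        }

        public static void main(String[] args)
        {
            System.out.println(ByteBufferUtil.bytesToHex(flatten("sc1", "c1")));
        }
    }
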
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/io/sstable/Descriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/Descriptor.java b/src/java/org/apache/cassandra/io/sstable/Descriptor.java
index cf1907e..c96a336 100644
--- a/src/java/org/apache/cassandra/io/sstable/Descriptor.java
+++ b/src/java/org/apache/cassandra/io/sstable/Descriptor.java
@@ -47,7 +47,7 @@ public class Descriptor
     public static class Version
     {
         // This needs to be at the beginning for initialization's sake
-        private static final String current_version = "ib";
+        private static final String current_version = "ic";
 
         public static final Version LEGACY = new Version("a"); // "pre-history"
         // b (0.7.0): added version to sstable filenames
@@ -66,6 +66,10 @@ public class Descriptor
         //             records estimated histogram of deletion times in tombstones
         //             bloom filter (keys and columns) upgraded to Murmur3
         // ib (1.2.1): tracks min client timestamp in metadata component
+        // ja (1.3.0): super columns are serialized as composites
+        //             (note that there is no real format change, this is mostly a marker to know if we should expect super
+        //             columns or not. We do need a major version bump however, because we should not allow streaming of
+        //             super columns into this new format)
 
         public static final Version CURRENT = new Version(current_version);
 
@@ -85,6 +89,7 @@ public class Descriptor
         public final boolean hasPromotedIndexes;
         public final FilterFactory.Type filterType;
         public final boolean hasAncestors;
+        public final boolean hasSuperColumns;
 
         public Version(String version)
         {
@@ -108,6 +113,7 @@ public class Descriptor
                 filterType = FilterFactory.Type.MURMUR2;
             else
                 filterType = FilterFactory.Type.MURMUR3;
+            hasSuperColumns = version.compareTo("ib") < 0;
         }
 
         /**

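The new hasSuperColumns flag gives readers a per-version answer to "may this file still contain data written in the old super column layout?", which is what the version bump above is for: the bytes themselves are compatible, but the interpretation of cell names is not, so streaming of old-format super columns into new files has to be refused. A minimal sketch of gating on the flag; convertLegacySuperColumns and readComposites are placeholders, not real methods:

    import org.apache.cassandra.io.sstable.Descriptor;

    // Sketch only: branch on the per-version flag introduced above.
    public class VersionGateSketch
    {
        public static void readRow(Descriptor.Version version)
        {
            if (version.hasSuperColumns)
            {
                // old layout: expect SuperColumn-style serialization
                // convertLegacySuperColumns(...);   (placeholder)
            }
            else
            {
                // current layout: every cell already carries a composite name
                // readComposites(...);              (placeholder)
            }
        }
    }
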
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
index d9aabfb..d2839c8 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
@@ -18,6 +18,7 @@
 package org.apache.cassandra.io.sstable;
 
 import java.io.*;
+import java.util.Iterator;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -26,7 +27,6 @@ import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.columniterator.ICountableColumnIterator;
 import org.apache.cassandra.db.marshal.MarshalException;
-import org.apache.cassandra.io.IColumnSerializer;
 import org.apache.cassandra.io.util.RandomAccessReader;
 import org.apache.cassandra.utils.BytesReadTracker;
 
@@ -38,13 +38,13 @@ public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterat
     private final DataInput input;
     private final long dataStart;
     public final long dataSize;
-    public final IColumnSerializer.Flag flag;
+    public final ColumnSerializer.Flag flag;
 
     private final ColumnFamily columnFamily;
     private final int columnCount;
     private final long columnPosition;
 
-    private final OnDiskAtom.Serializer atomSerializer;
+    private final Iterator<OnDiskAtom> atomIterator;
     private final Descriptor.Version dataVersion;
 
     private final BytesReadTracker inputWithTracker; // tracks bytes read
@@ -80,11 +80,11 @@ public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterat
      */
     public SSTableIdentityIterator(SSTableReader sstable, RandomAccessReader file, DecoratedKey key, long dataStart, long dataSize, boolean checkData)
     {
-        this(sstable.metadata, file, file.getPath(), key, dataStart, dataSize, checkData, sstable, IColumnSerializer.Flag.LOCAL);
+        this(sstable.metadata, file, file.getPath(), key, dataStart, dataSize, checkData, sstable, ColumnSerializer.Flag.LOCAL);
     }
 
     // Must only be used against current file format
-    public SSTableIdentityIterator(CFMetaData metadata, DataInput file, String filename, DecoratedKey key, long dataStart, long dataSize, IColumnSerializer.Flag flag)
+    public SSTableIdentityIterator(CFMetaData metadata, DataInput file, String filename, DecoratedKey key, long dataStart, long dataSize, ColumnSerializer.Flag flag)
     {
         this(metadata, file, filename, key, dataStart, dataSize, false, null, flag);
     }
@@ -99,7 +99,7 @@ public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterat
                                     long dataSize,
                                     boolean checkData,
                                     SSTableReader sstable,
-                                    IColumnSerializer.Flag flag)
+                                    ColumnSerializer.Flag flag)
     {
         assert !checkData || (sstable != null);
         this.input = input;
@@ -157,8 +157,9 @@ public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterat
             }
             columnFamily = ColumnFamily.create(metadata);
             columnFamily.delete(DeletionInfo.serializer().deserializeFromSSTable(inputWithTracker, dataVersion));
-            atomSerializer = columnFamily.getOnDiskSerializer();
+
             columnCount = inputWithTracker.readInt();
+            atomIterator = columnFamily.metadata().getOnDiskIterator(inputWithTracker, columnCount, dataVersion);
             columnPosition = dataStart + inputWithTracker.getBytesRead();
         }
         catch (IOException e)
@@ -188,14 +189,17 @@ public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterat
     {
         try
         {
-            OnDiskAtom atom = atomSerializer.deserializeFromSSTable(inputWithTracker, flag, expireBefore, dataVersion);
+            OnDiskAtom atom = atomIterator.next();
             if (validateColumns)
                 atom.validateFields(columnFamily.metadata());
             return atom;
         }
-        catch (IOException e)
+        catch (IOError e)
         {
-            throw new CorruptSSTableException(e, filename);
+            if (e.getCause() instanceof IOException)
+                throw new CorruptSSTableException((IOException)e.getCause(), filename);
+            else
+                throw e;
         }
         catch (MarshalException me)
         {

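The atoms are now pulled from a java.util.Iterator, and Iterator.next() cannot throw a checked IOException; the convention (already used by the ColumnIterator deleted further down) is to wrap the IOException in an IOError inside the iterator and unwrap it at the call site, which is exactly what the new catch block above does before rethrowing as CorruptSSTableException. A generic sketch of that wrap/unwrap pattern, with a hypothetical Source interface standing in for the data input:

    import java.io.IOError;
    import java.io.IOException;
    import java.util.Iterator;
    import java.util.NoSuchElementException;

    // Generic sketch of the IOError convention used above:
    // the iterator wraps, the caller unwraps.
    public class IOErrorPatternSketch
    {
        interface Source { byte[] readNext() throws IOException; }

        static Iterator<byte[]> iterate(final Source source, final int count)
        {
            return new Iterator<byte[]>()
            {
                private int read = 0;

                public boolean hasNext() { return read < count; }

                public byte[] next()
                {
                    if (!hasNext())
                        throw new NoSuchElementException();
                    try
                    {
                        read++;
                        return source.readNext();
                    }
                    catch (IOException e)
                    {
                        // next() cannot declare IOException, so wrap it
                        throw new IOError(e);
                    }
                }

                public void remove() { throw new UnsupportedOperationException(); }
            };
        }
    }
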
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index 771f18f..10af96d 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@ -33,7 +33,6 @@ import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.compaction.*;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.io.FSWriteError;
-import org.apache.cassandra.io.IColumnSerializer;
 import org.apache.cassandra.io.compress.CompressedSequentialWriter;
 import org.apache.cassandra.io.util.*;
 import org.apache.cassandra.service.StorageService;
@@ -241,28 +240,14 @@ public class SSTableWriter extends SSTable
         cf.delete(deletionInfo);
 
         ColumnIndex.Builder columnIndexer = new ColumnIndex.Builder(cf, key.key, columnCount, dataFile.stream);
-        OnDiskAtom.Serializer atomSerializer = cf.getOnDiskSerializer();
+        OnDiskAtom.Serializer atomSerializer = Column.onDiskSerializer();
         for (int i = 0; i < columnCount; i++)
         {
             // deserialize column with PRESERVE_SIZE because we've written the dataSize based on the
             // data size received, so we must reserialize the exact same data
-            OnDiskAtom atom = atomSerializer.deserializeFromSSTable(in, IColumnSerializer.Flag.PRESERVE_SIZE, Integer.MIN_VALUE, Descriptor.Version.CURRENT);
+            OnDiskAtom atom = atomSerializer.deserializeFromSSTable(in, ColumnSerializer.Flag.PRESERVE_SIZE, Integer.MIN_VALUE, Descriptor.Version.CURRENT);
             if (atom instanceof CounterColumn)
-            {
                 atom = ((CounterColumn) atom).markDeltaToBeCleared();
-            }
-            else if (atom instanceof SuperColumn)
-            {
-                SuperColumn sc = (SuperColumn) atom;
-                for (IColumn subColumn : sc.getSubColumns())
-                {
-                    if (subColumn instanceof CounterColumn)
-                    {
-                        IColumn marked = ((CounterColumn) subColumn).markDeltaToBeCleared();
-                        sc.replace(subColumn, marked);
-                    }
-                }
-            }
 
             int deletionTime = atom.getLocalDeletionTime();
             if (deletionTime < Integer.MAX_VALUE)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/io/util/ColumnSortedMap.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/ColumnSortedMap.java b/src/java/org/apache/cassandra/io/util/ColumnSortedMap.java
deleted file mode 100644
index e01fd91..0000000
--- a/src/java/org/apache/cassandra/io/util/ColumnSortedMap.java
+++ /dev/null
@@ -1,295 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.io.util;
-
-import java.io.DataInput;
-import java.io.IOError;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.*;
-import java.util.Map.Entry;
-
-import org.apache.cassandra.db.ColumnSerializer;
-import org.apache.cassandra.db.IColumn;
-import org.apache.cassandra.io.IColumnSerializer;
-
-/**
- * Facade over a DataInput that contains IColumns in sorted order.
- * We use this because passing a SortedMap to the ConcurrentSkipListMap constructor is the only way
- * to invoke its private buildFromSorted method and avoid worst-case behavior of CSLM.put.
- */
-public class ColumnSortedMap implements SortedMap<ByteBuffer, IColumn>
-{
-    private final ColumnSerializer serializer;
-    private final DataInput dis;
-    private final Comparator<ByteBuffer> comparator;
-    private final int length;
-    private final IColumnSerializer.Flag flag;
-    private final int expireBefore;
-
-    public ColumnSortedMap(Comparator<ByteBuffer> comparator, ColumnSerializer serializer, DataInput dis, int length, IColumnSerializer.Flag flag, int expireBefore)
-    {
-        this.comparator = comparator;
-        this.serializer = serializer;
-        this.dis = dis;
-        this.length = length;
-        this.flag = flag;
-        this.expireBefore = expireBefore;
-    }
-
-    public int size()
-    {
-        return length;
-    }
-
-    public boolean isEmpty()
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    public boolean containsKey(Object key)
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    public boolean containsValue(Object value)
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    public IColumn get(Object key)
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    public IColumn put(ByteBuffer key, IColumn value)
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    public IColumn remove(Object key)
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    public void putAll(Map<? extends ByteBuffer, ? extends IColumn> m)
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    public void clear()
-    {
-
-    }
-
-    public Comparator<? super ByteBuffer> comparator()
-    {
-        return comparator;
-    }
-
-    public SortedMap<ByteBuffer, IColumn> subMap(ByteBuffer fromKey, ByteBuffer toKey)
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    public SortedMap<ByteBuffer, IColumn> headMap(ByteBuffer toKey)
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    public SortedMap<ByteBuffer, IColumn> tailMap(ByteBuffer fromKey)
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    public ByteBuffer firstKey()
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    public ByteBuffer lastKey()
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    public Set<ByteBuffer> keySet()
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    public Collection<IColumn> values()
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    public Set<Map.Entry<ByteBuffer, IColumn>> entrySet()
-    {
-        return new ColumnSet(serializer, dis, length, flag, expireBefore);
-    }
-}
-
-class ColumnSet implements Set<Map.Entry<ByteBuffer, IColumn>>
-{
-    private final ColumnSerializer serializer;
-    private final DataInput dis;
-    private final int length;
-    private final IColumnSerializer.Flag flag;
-    private final int expireBefore;
-
-    public ColumnSet(ColumnSerializer serializer, DataInput dis, int length, IColumnSerializer.Flag flag, int expireBefore)
-    {
-        this.serializer = serializer;
-        this.dis = dis;
-        this.length = length;
-        this.flag = flag;
-        this.expireBefore = expireBefore;
-    }
-
-    public int size()
-    {
-        return length;
-    }
-
-    public boolean isEmpty()
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    public boolean contains(Object o)
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    public Iterator<Entry<ByteBuffer, IColumn>> iterator()
-    {
-        return new ColumnIterator(serializer, dis, length, flag, expireBefore);
-    }
-
-    public Object[] toArray()
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    public <T> T[] toArray(T[] a)
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    public boolean add(Entry<ByteBuffer, IColumn> e)
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    public boolean remove(Object o)
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    public boolean containsAll(Collection<?> c)
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    public boolean addAll(Collection<? extends Entry<ByteBuffer, IColumn>> c)
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    public boolean retainAll(Collection<?> c)
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    public boolean removeAll(Collection<?> c)
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    public void clear()
-    {
-    }
-}
-
-class ColumnIterator implements Iterator<Map.Entry<ByteBuffer, IColumn>>
-{
-    private final ColumnSerializer serializer;
-    private final DataInput dis;
-    private final int length;
-    private final IColumnSerializer.Flag flag;
-    private int count = 0;
-    private final int expireBefore;
-
-    public ColumnIterator(ColumnSerializer serializer, DataInput dis, int length, IColumnSerializer.Flag flag, int expireBefore)
-    {
-        this.dis = dis;
-        this.serializer = serializer;
-        this.length = length;
-        this.flag = flag;
-        this.expireBefore = expireBefore;
-    }
-
-    private IColumn deserializeNext()
-    {
-        try
-        {
-            count++;
-            return serializer.deserialize(dis, flag, expireBefore);
-        }
-        catch (IOException e)
-        {
-            throw new IOError(e); // can't throw more detailed error. can't rethrow IOException - Iterator interface next().
-        }
-    }
-
-    public boolean hasNext()
-    {
-        return count < length;
-    }
-
-    public Entry<ByteBuffer, IColumn> next()
-    {
-        if (!hasNext())
-        {
-            throw new IllegalStateException("end of column iterator");
-        }
-
-        final IColumn column = deserializeNext();
-        return new Entry<ByteBuffer, IColumn>()
-        {
-            public IColumn setValue(IColumn value)
-            {
-                throw new UnsupportedOperationException();
-            }
-
-            public IColumn getValue()
-            {
-                return column;
-            }
-
-            public ByteBuffer getKey()
-            {
-                return column.name();
-            }
-        };
-    }
-
-    public void remove()
-    {
-        throw new UnsupportedOperationException();
-    }
-}
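
The class javadoc above records why this facade existed at all: handing a SortedMap to the ConcurrentSkipListMap constructor is the only public way to reach its linear-time buildFromSorted path and avoid paying the full skip-list insertion cost for every column. With columns no longer materialized through this route the facade can go, but the JDK behaviour it leaned on is easy to see in isolation:

    import java.util.TreeMap;
    import java.util.concurrent.ConcurrentSkipListMap;

    // Plain-JDK illustration of the trick the deleted class relied on.
    public class BulkBuildSketch
    {
        public static void main(String[] args)
        {
            TreeMap<Integer, String> sorted = new TreeMap<Integer, String>();
            for (int i = 0; i < 1000; i++)
                sorted.put(i, "value-" + i);

            // SortedMap constructor: one pass over pre-sorted entries
            // (buildFromSorted), no per-entry search on insert.
            ConcurrentSkipListMap<Integer, String> bulk =
                new ConcurrentSkipListMap<Integer, String>(sorted);

            // Contrast: putAll() falls back to one put() per entry.
            ConcurrentSkipListMap<Integer, String> oneByOne =
                new ConcurrentSkipListMap<Integer, String>();
            oneByOne.putAll(sorted);

            System.out.println(bulk.size() + " / " + oneByOne.size());
        }
    }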

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/io/util/IIterableColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/IIterableColumns.java b/src/java/org/apache/cassandra/io/util/IIterableColumns.java
index 030bef3..68c5645 100644
--- a/src/java/org/apache/cassandra/io/util/IIterableColumns.java
+++ b/src/java/org/apache/cassandra/io/util/IIterableColumns.java
@@ -17,10 +17,10 @@
  */
 package org.apache.cassandra.io.util;
 
-import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.db.Column;
 import org.apache.cassandra.db.marshal.AbstractType;
 
-public interface IIterableColumns extends Iterable<IColumn>
+public interface IIterableColumns extends Iterable<Column>
 {
     public int getEstimatedColumnCount();
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 98495be..84438b9 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -73,7 +73,8 @@ public final class MessagingService implements MessagingServiceMBean
     public static final int VERSION_11  = 4;
     public static final int VERSION_117 = 5;
     public static final int VERSION_12  = 6;
-    public static final int current_version = VERSION_12;
+    public static final int VERSION_20  = 7;
+    public static final int current_version = VERSION_20;
 
     /**
      * we preface every message with this number so the recipient can validate the sender is sane

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/service/CacheService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CacheService.java b/src/java/org/apache/cassandra/service/CacheService.java
index 7e3aca8..b0aa693 100644
--- a/src/java/org/apache/cassandra/service/CacheService.java
+++ b/src/java/org/apache/cassandra/service/CacheService.java
@@ -48,7 +48,6 @@ import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.RowIndexEntry;
 import org.apache.cassandra.db.filter.QueryFilter;
-import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.io.sstable.SSTableReader.Operator;
@@ -340,7 +339,7 @@ public class CacheService implements CacheServiceMBean
                 public Pair<RowCacheKey, IRowCacheEntry> call() throws Exception
                 {
                     DecoratedKey key = cfs.partitioner.decorateKey(buffer);
-                    ColumnFamily data = cfs.getTopLevelColumns(QueryFilter.getIdentityFilter(key, new QueryPath(cfs.name)), Integer.MIN_VALUE, true);
+                    ColumnFamily data = cfs.getTopLevelColumns(QueryFilter.getIdentityFilter(key, cfs.name), Integer.MIN_VALUE, true);
                     return Pair.create(new RowCacheKey(cfs.metadata.cfId, key), (IRowCacheEntry) data);
                 }
             });
@@ -351,7 +350,7 @@ public class CacheService implements CacheServiceMBean
             for (ByteBuffer key : buffers)
             {
                 DecoratedKey dk = cfs.partitioner.decorateKey(key);
-                ColumnFamily data = cfs.getTopLevelColumns(QueryFilter.getIdentityFilter(dk, new QueryPath(cfs.name)), Integer.MIN_VALUE, true);
+                ColumnFamily data = cfs.getTopLevelColumns(QueryFilter.getIdentityFilter(dk, cfs.name), Integer.MIN_VALUE, true);
                 rowCache.put(new RowCacheKey(cfs.metadata.cfId, dk), data);
             }
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java b/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java
index 5bd9876..6d03009 100644
--- a/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java
+++ b/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java
@@ -43,7 +43,7 @@ public class IndexScanVerbHandler implements IVerbHandler<IndexScanCommand>
             List<Row> rows = cfs.search(command.index_clause.expressions,
                                         command.range,
                                         command.index_clause.count,
-                                        ThriftValidation.asIFilter(command.predicate, cfs.getComparator()));
+                                        ThriftValidation.asIFilter(command.predicate, cfs.metadata, null));
             RangeSliceReply reply = new RangeSliceReply(rows);
             Tracing.trace("Enqueuing response to {}", message.from);
             MessagingService.instance().sendReply(reply.createMessage(), id, message.from);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a005df3/src/java/org/apache/cassandra/service/MigrationManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/MigrationManager.java b/src/java/org/apache/cassandra/service/MigrationManager.java
index 43fa8f7..943e6ae 100644
--- a/src/java/org/apache/cassandra/service/MigrationManager.java
+++ b/src/java/org/apache/cassandra/service/MigrationManager.java
@@ -41,7 +41,6 @@ import org.apache.cassandra.config.KSMetaData;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.QueryFilter;
-import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.exceptions.AlreadyExistsException;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.gms.*;
@@ -391,7 +390,7 @@ public class MigrationManager implements IEndpointStateChangeSubscriber
         DecoratedKey dkey = StorageService.getPartitioner().decorateKey(LAST_MIGRATION_KEY);
         Table defs = Table.open(Table.SYSTEM_KS);
         ColumnFamilyStore cfStore = defs.getColumnFamilyStore(DefsTable.OLD_SCHEMA_CF);
-        QueryFilter filter = QueryFilter.getNamesFilter(dkey, new QueryPath(DefsTable.OLD_SCHEMA_CF), LAST_MIGRATION_KEY);
+        QueryFilter filter = QueryFilter.getNamesFilter(dkey, DefsTable.OLD_SCHEMA_CF, LAST_MIGRATION_KEY);
         ColumnFamily cf = cfStore.getColumnFamily(filter);
         if (cf == null || cf.getColumnNames().size() == 0)
             return null;