Posted to commits@cassandra.apache.org by sl...@apache.org on 2015/07/22 18:05:32 UTC

[03/15] cassandra git commit: Simplify some 8099's implementations

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java
index 4399a80..1aadd78 100644
--- a/src/java/org/apache/cassandra/thrift/CassandraServer.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java
@@ -108,7 +108,7 @@ public class CassandraServer implements Cassandra.Iface
 
     public List<ColumnOrSuperColumn> thriftifyColumns(CFMetaData metadata, Iterator<LegacyLayout.LegacyCell> cells)
     {
-        ArrayList<ColumnOrSuperColumn> thriftColumns = new ArrayList<ColumnOrSuperColumn>();
+        ArrayList<ColumnOrSuperColumn> thriftColumns = new ArrayList<>();
         while (cells.hasNext())
         {
             LegacyLayout.LegacyCell cell = cells.next();
@@ -142,7 +142,7 @@ public class CassandraServer implements Cassandra.Iface
 
     private List<Column> thriftifyColumnsAsColumns(CFMetaData metadata, Iterator<LegacyLayout.LegacyCell> cells)
     {
-        List<Column> thriftColumns = new ArrayList<Column>();
+        List<Column> thriftColumns = new ArrayList<>();
         while (cells.hasNext())
             thriftColumns.add(thriftifySubColumn(metadata, cells.next()));
         return thriftColumns;
@@ -162,7 +162,7 @@ public class CassandraServer implements Cassandra.Iface
     {
         if (subcolumnsOnly)
         {
-            ArrayList<ColumnOrSuperColumn> thriftSuperColumns = new ArrayList<ColumnOrSuperColumn>();
+            ArrayList<ColumnOrSuperColumn> thriftSuperColumns = new ArrayList<>();
             while (cells.hasNext())
             {
                 LegacyLayout.LegacyCell cell = cells.next();
@@ -185,7 +185,7 @@ public class CassandraServer implements Cassandra.Iface
 
     private List<ColumnOrSuperColumn> thriftifySuperColumns(Iterator<LegacyLayout.LegacyCell> cells, boolean reversed)
     {
-        ArrayList<ColumnOrSuperColumn> thriftSuperColumns = new ArrayList<ColumnOrSuperColumn>();
+        ArrayList<ColumnOrSuperColumn> thriftSuperColumns = new ArrayList<>();
         SuperColumn current = null;
         while (cells.hasNext())
         {
@@ -198,7 +198,7 @@ public class CassandraServer implements Cassandra.Iface
                 if (current != null && reversed)
                     Collections.reverse(current.columns);
 
-                current = new SuperColumn(scName, new ArrayList<Column>());
+                current = new SuperColumn(scName, new ArrayList<>());
                 thriftSuperColumns.add(new ColumnOrSuperColumn().setSuper_column(current));
             }
             current.getColumns().add(thriftifySubColumn(cell, cell.name.superColumnSubName()));
@@ -212,7 +212,7 @@ public class CassandraServer implements Cassandra.Iface
 
     private List<ColumnOrSuperColumn> thriftifyCounterSuperColumns(CFMetaData metadata, Iterator<LegacyLayout.LegacyCell> cells, boolean reversed)
     {
-        ArrayList<ColumnOrSuperColumn> thriftSuperColumns = new ArrayList<ColumnOrSuperColumn>();
+        ArrayList<ColumnOrSuperColumn> thriftSuperColumns = new ArrayList<>();
         CounterSuperColumn current = null;
         while (cells.hasNext())
         {
@@ -225,7 +225,7 @@ public class CassandraServer implements Cassandra.Iface
                 if (current != null && reversed)
                     Collections.reverse(current.columns);
 
-                current = new CounterSuperColumn(scName, new ArrayList<CounterColumn>());
+                current = new CounterSuperColumn(scName, new ArrayList<>());
                 thriftSuperColumns.add(new ColumnOrSuperColumn().setCounter_super_column(current));
             }
             current.getColumns().add(thriftifySubCounter(metadata, cell).setName(cell.name.superColumnSubName()));
@@ -262,7 +262,7 @@ public class CassandraServer implements Cassandra.Iface
     {
         try (PartitionIterator results = read(commands, consistency_level, cState))
         {
-            Map<ByteBuffer, List<ColumnOrSuperColumn>> columnFamiliesMap = new HashMap<ByteBuffer, List<ColumnOrSuperColumn>>();
+            Map<ByteBuffer, List<ColumnOrSuperColumn>> columnFamiliesMap = new HashMap<>();
             while (results.hasNext())
             {
                 try (RowIterator iter = results.next())
@@ -360,7 +360,7 @@ public class CassandraServer implements Cassandra.Iface
     private ClusteringIndexFilter toInternalFilter(CFMetaData metadata, ColumnParent parent, SliceRange range)
     {
         if (metadata.isSuper() && parent.isSetSuper_column())
-            return new ClusteringIndexNamesFilter(FBUtilities.<Clustering>singleton(new SimpleClustering(parent.bufferForSuper_column()), metadata.comparator), range.reversed);
+            return new ClusteringIndexNamesFilter(FBUtilities.singleton(new Clustering(parent.bufferForSuper_column()), metadata.comparator), range.reversed);
         else
             return new ClusteringIndexSliceFilter(makeSlices(metadata, range), range.reversed);
     }
@@ -384,13 +384,13 @@ public class CassandraServer implements Cassandra.Iface
                 {
                     if (parent.isSetSuper_column())
                     {
-                        return new ClusteringIndexNamesFilter(FBUtilities.<Clustering>singleton(new SimpleClustering(parent.bufferForSuper_column()), metadata.comparator), false);
+                        return new ClusteringIndexNamesFilter(FBUtilities.singleton(new Clustering(parent.bufferForSuper_column()), metadata.comparator), false);
                     }
                     else
                     {
                         NavigableSet<Clustering> clusterings = new TreeSet<>(metadata.comparator);
                         for (ByteBuffer bb : predicate.column_names)
-                            clusterings.add(new SimpleClustering(bb));
+                            clusterings.add(new Clustering(bb));
                         return new ClusteringIndexNamesFilter(clusterings, false);
                     }
                 }
@@ -455,7 +455,7 @@ public class CassandraServer implements Cassandra.Iface
             // We only want to include the static columns that are selected by the slices
             for (ColumnDefinition def : columns.statics)
             {
-                if (slices.selects(new SimpleClustering(def.name.bytes)))
+                if (slices.selects(new Clustering(def.name.bytes)))
                     builder.add(def);
             }
             columns = builder.build();
@@ -608,21 +608,20 @@ public class CassandraServer implements Cassandra.Iface
                     builder.select(dynamicDef, CellPath.create(column_path.column));
                     columns = builder.build();
                 }
-                filter = new ClusteringIndexNamesFilter(FBUtilities.<Clustering>singleton(new SimpleClustering(column_path.super_column), metadata.comparator),
+                filter = new ClusteringIndexNamesFilter(FBUtilities.singleton(new Clustering(column_path.super_column), metadata.comparator),
                                                   false);
             }
             else
             {
                 LegacyLayout.LegacyCellName cellname = LegacyLayout.decodeCellName(metadata, column_path.super_column, column_path.column);
                 columns = ColumnFilter.selection(PartitionColumns.of(cellname.column));
-                filter = new ClusteringIndexNamesFilter(FBUtilities.<Clustering>singleton(cellname.clustering, metadata.comparator), false);
+                filter = new ClusteringIndexNamesFilter(FBUtilities.singleton(cellname.clustering, metadata.comparator), false);
             }
 
-            long now = System.currentTimeMillis();
             DecoratedKey dk = StorageService.getPartitioner().decorateKey(key);
             SinglePartitionReadCommand<?> command = SinglePartitionReadCommand.create(true, metadata, FBUtilities.nowInSeconds(), columns, RowFilter.NONE, DataLimits.NONE, dk, filter);
 
-            try (RowIterator result = PartitionIterators.getOnlyElement(read(Arrays.<SinglePartitionReadCommand<?>>asList(command), consistencyLevel, cState), command))
+            try (RowIterator result = PartitionIterators.getOnlyElement(read(Arrays.asList(command), consistencyLevel, cState), command))
             {
                 if (!result.hasNext())
                     throw new NotFoundException();
@@ -768,7 +767,7 @@ public class CassandraServer implements Cassandra.Iface
             String keyspace = cState.getKeyspace();
             cState.hasColumnFamilyAccess(keyspace, column_parent.column_family, Permission.SELECT);
 
-            Map<ByteBuffer, Integer> counts = new HashMap<ByteBuffer, Integer>();
+            Map<ByteBuffer, Integer> counts = new HashMap<>();
             Map<ByteBuffer, List<ColumnOrSuperColumn>> columnFamiliesMap = multigetSliceInternal(keyspace,
                                                                                                  keys,
                                                                                                  column_parent,
@@ -791,6 +790,14 @@ public class CassandraServer implements Cassandra.Iface
         }
     }
 
+    private Cell cellFromColumn(CFMetaData metadata, LegacyLayout.LegacyCellName name, Column column)
+    {
+        CellPath path = name.collectionElement == null ? null : CellPath.create(name.collectionElement);
+        return column.ttl == 0
+             ? BufferCell.live(metadata, name.column, column.timestamp, column.value, path)
+             : BufferCell.expiring(name.column, column.timestamp, column.ttl, FBUtilities.nowInSeconds(), column.value, path);
+    }
+
     private void internal_insert(ByteBuffer key, ColumnParent column_parent, Column column, ConsistencyLevel consistency_level)
     throws RequestValidationException, UnavailableException, TimedOutException
     {
@@ -806,22 +813,17 @@ public class CassandraServer implements Cassandra.Iface
         {
             throw new org.apache.cassandra.exceptions.InvalidRequestException("missing mandatory super column name for super CF " + column_parent.column_family);
         }
-        ThriftValidation.validateColumnNames(metadata, column_parent, Arrays.asList(column.name));
+        ThriftValidation.validateColumnNames(metadata, column_parent, Collections.singletonList(column.name));
         ThriftValidation.validateColumnData(metadata, column_parent.super_column, column);
 
         org.apache.cassandra.db.Mutation mutation;
         try
         {
-            LegacyLayout.LegacyCellName name = LegacyLayout.decodeCellName(metadata, column_parent.super_column, column.name);
-
             DecoratedKey dk = StorageService.getPartitioner().decorateKey(key);
-            PartitionUpdate update = new PartitionUpdate(metadata, dk, PartitionColumns.of(name.column), 1);
 
-            Row.Writer writer = name.column.isStatic() ? update.staticWriter() : update.writer();
-            name.clustering.writeTo(writer);
-            CellPath path = name.collectionElement == null ? null : CellPath.create(name.collectionElement);
-            writer.writeCell(name.column, false, column.value, SimpleLivenessInfo.forUpdate(column.timestamp, column.ttl, FBUtilities.nowInSeconds(), metadata), path);
-            writer.endOfRow();
+            LegacyLayout.LegacyCellName name = LegacyLayout.decodeCellName(metadata, column_parent.super_column, column.name);
+            Cell cell = cellFromColumn(metadata, name, column);
+            PartitionUpdate update = PartitionUpdate.singleRowUpdate(metadata, dk, ArrayBackedRow.singleCellRow(name.clustering, cell));
 
             mutation = new org.apache.cassandra.db.Mutation(update);
         }
@@ -829,7 +831,7 @@ public class CassandraServer implements Cassandra.Iface
         {
             throw new org.apache.cassandra.exceptions.InvalidRequestException(e.getMessage());
         }
-        doInsert(consistency_level, Arrays.asList(mutation));
+        doInsert(consistency_level, Collections.singletonList(mutation));
     }
 
     public void insert(ByteBuffer key, ColumnParent column_parent, Column column, ConsistencyLevel consistency_level)
@@ -896,23 +898,15 @@ public class CassandraServer implements Cassandra.Iface
             if (metadata.isSuper())
                 throw new org.apache.cassandra.exceptions.InvalidRequestException("CAS does not support supercolumns");
 
-            Iterable<ByteBuffer> names = Iterables.transform(updates, new Function<Column, ByteBuffer>()
-            {
-                public ByteBuffer apply(Column column)
-                {
-                    return column.name;
-                }
-            });
+            Iterable<ByteBuffer> names = Iterables.transform(updates, column -> column.name);
             ThriftValidation.validateColumnNames(metadata, new ColumnParent(column_family), names);
             for (Column column : updates)
                 ThriftValidation.validateColumnData(metadata, null, column);
 
-            CFMetaData cfm = Schema.instance.getCFMetaData(cState.getKeyspace(), column_family);
-
             DecoratedKey dk = StorageService.getPartitioner().decorateKey(key);
             int nowInSec = FBUtilities.nowInSeconds();
 
-            PartitionUpdate partitionUpdates = RowIterators.toUpdate(LegacyLayout.toRowIterator(metadata, dk, toLegacyCells(metadata, updates, nowInSec).iterator(), nowInSec));
+            PartitionUpdate partitionUpdates = PartitionUpdate.fromIterator(LegacyLayout.toRowIterator(metadata, dk, toLegacyCells(metadata, updates, nowInSec).iterator(), nowInSec));
 
             FilteredPartition partitionExpected = null;
             if (!expected.isEmpty())
@@ -1096,7 +1090,7 @@ public class CassandraServer implements Cassandra.Iface
                 if (metadata.isCounter())
                     ThriftConversion.fromThrift(consistency_level).validateCounterForWrite(metadata);
 
-                DeletionInfo delInfo = DeletionInfo.live();
+                MutableDeletionInfo delInfo = MutableDeletionInfo.live();
                 List<LegacyLayout.LegacyCell> cells = new ArrayList<>();
                 for (Mutation m : muts)
                 {
@@ -1113,7 +1107,7 @@ public class CassandraServer implements Cassandra.Iface
                 }
 
                 sortAndMerge(metadata, cells, nowInSec);
-                PartitionUpdate update = UnfilteredRowIterators.toUpdate(LegacyLayout.toUnfilteredRowIterator(metadata, dk, delInfo, cells.iterator()));
+                PartitionUpdate update = PartitionUpdate.fromIterator(LegacyLayout.toUnfilteredRowIterator(metadata, dk, delInfo, cells.iterator()));
 
                 org.apache.cassandra.db.Mutation mutation;
                 if (metadata.isCounter())
@@ -1173,12 +1167,12 @@ public class CassandraServer implements Cassandra.Iface
         }
     }
 
-    private void addRange(CFMetaData cfm, DeletionInfo delInfo, Slice.Bound start, Slice.Bound end, long timestamp, int nowInSec)
+    private void addRange(CFMetaData cfm, MutableDeletionInfo delInfo, Slice.Bound start, Slice.Bound end, long timestamp, int nowInSec)
     {
-        delInfo.add(new RangeTombstone(Slice.make(start, end), new SimpleDeletionTime(timestamp, nowInSec)), cfm.comparator);
+        delInfo.add(new RangeTombstone(Slice.make(start, end), new DeletionTime(timestamp, nowInSec)), cfm.comparator);
     }
 
-    private void deleteColumnOrSuperColumn(DeletionInfo delInfo, List<LegacyLayout.LegacyCell> cells, CFMetaData cfm, Deletion del, int nowInSec)
+    private void deleteColumnOrSuperColumn(MutableDeletionInfo delInfo, List<LegacyLayout.LegacyCell> cells, CFMetaData cfm, Deletion del, int nowInSec)
     throws InvalidRequestException
     {
         if (del.predicate != null && del.predicate.column_names != null)
@@ -1227,7 +1221,7 @@ public class CassandraServer implements Cassandra.Iface
             if (del.super_column != null)
                 addRange(cfm, delInfo, Slice.Bound.inclusiveStartOf(del.super_column), Slice.Bound.inclusiveEndOf(del.super_column), del.timestamp, nowInSec);
             else
-                delInfo.add(new SimpleDeletionTime(del.timestamp, nowInSec));
+                delInfo.add(new DeletionTime(del.timestamp, nowInSec));
         }
     }
 
@@ -1320,23 +1314,17 @@ public class CassandraServer implements Cassandra.Iface
         }
         else if (column_path.super_column != null && column_path.column == null)
         {
-            update = new PartitionUpdate(metadata, dk, PartitionColumns.NONE, 1);
-            Row.Writer writer = update.writer();
-            writer.writeClusteringValue(column_path.super_column);
-            writer.writeRowDeletion(new SimpleDeletionTime(timestamp, nowInSec));
-            writer.endOfRow();
+            Row row = ArrayBackedRow.emptyDeletedRow(new Clustering(column_path.super_column), new DeletionTime(timestamp, nowInSec));
+            update = PartitionUpdate.singleRowUpdate(metadata, dk, row);
         }
         else
         {
             try
             {
                 LegacyLayout.LegacyCellName name = LegacyLayout.decodeCellName(metadata, column_path.super_column, column_path.column);
-                update = new PartitionUpdate(metadata, dk, PartitionColumns.of(name.column), 1);
-                Row.Writer writer = name.column.isStatic() ? update.staticWriter() : update.writer();
-                name.clustering.writeTo(writer);
                 CellPath path = name.collectionElement == null ? null : CellPath.create(name.collectionElement);
-                writer.writeCell(name.column, false, ByteBufferUtil.EMPTY_BYTE_BUFFER, SimpleLivenessInfo.forDeletion(timestamp, nowInSec), path);
-                writer.endOfRow();
+                Cell cell = BufferCell.tombstone(name.column, timestamp, nowInSec, path);
+                update = PartitionUpdate.singleRowUpdate(metadata, dk, ArrayBackedRow.singleCellRow(name.clustering, cell));
             }
             catch (UnknownColumnException e)
             {
@@ -1347,9 +1335,9 @@ public class CassandraServer implements Cassandra.Iface
         org.apache.cassandra.db.Mutation mutation = new org.apache.cassandra.db.Mutation(update);
 
         if (isCommutativeOp)
-            doInsert(consistency_level, Arrays.asList(new CounterMutation(mutation, ThriftConversion.fromThrift(consistency_level))));
+            doInsert(consistency_level, Collections.singletonList(new CounterMutation(mutation, ThriftConversion.fromThrift(consistency_level))));
         else
-            doInsert(consistency_level, Arrays.asList(mutation));
+            doInsert(consistency_level, Collections.singletonList(mutation));
     }
 
     public void remove(ByteBuffer key, ColumnPath column_path, long timestamp, ConsistencyLevel consistency_level)
@@ -1483,7 +1471,7 @@ public class CassandraServer implements Cassandra.Iface
                 PartitionPosition end = range.end_key == null
                                 ? p.getTokenFactory().fromString(range.end_token).maxKeyBound()
                                 : PartitionPosition.ForKey.get(range.end_key, p);
-                bounds = new Bounds<PartitionPosition>(PartitionPosition.ForKey.get(range.start_key, p), end);
+                bounds = new Bounds<>(PartitionPosition.ForKey.get(range.start_key, p), end);
             }
             int nowInSec = FBUtilities.nowInSeconds();
             schedule(DatabaseDescriptor.getRangeRpcTimeout());
@@ -1569,7 +1557,7 @@ public class CassandraServer implements Cassandra.Iface
                 PartitionPosition end = range.end_key == null
                                 ? p.getTokenFactory().fromString(range.end_token).maxKeyBound()
                                 : PartitionPosition.ForKey.get(range.end_key, p);
-                bounds = new Bounds<PartitionPosition>(PartitionPosition.ForKey.get(range.start_key, p), end);
+                bounds = new Bounds<>(PartitionPosition.ForKey.get(range.start_key, p), end);
             }
 
             if (range.row_filter != null && !range.row_filter.isEmpty())
@@ -1582,7 +1570,7 @@ public class CassandraServer implements Cassandra.Iface
                 ClusteringIndexFilter filter = new ClusteringIndexSliceFilter(Slices.ALL, false);
                 DataLimits limits = getLimits(range.count, true, Integer.MAX_VALUE);
                 Clustering pageFrom = metadata.isSuper()
-                                    ? new SimpleClustering(start_column)
+                                    ? new Clustering(start_column)
                                     : LegacyLayout.decodeCellName(metadata, start_column).clustering;
                 PartitionRangeReadCommand cmd = new PartitionRangeReadCommand(false,
                                                                               true,
@@ -1624,7 +1612,7 @@ public class CassandraServer implements Cassandra.Iface
     {
         try (PartitionIterator iter = results)
         {
-            List<KeySlice> keySlices = new ArrayList<KeySlice>();
+            List<KeySlice> keySlices = new ArrayList<>();
             while (iter.hasNext())
             {
                 try (RowIterator partition = iter.next())
@@ -1667,8 +1655,8 @@ public class CassandraServer implements Cassandra.Iface
             consistencyLevel.validateForRead(keyspace);
 
             IPartitioner p = StorageService.getPartitioner();
-            AbstractBounds<PartitionPosition> bounds = new Bounds<PartitionPosition>(PartitionPosition.ForKey.get(index_clause.start_key, p),
-                                                                         p.getMinimumToken().minKeyBound());
+            AbstractBounds<PartitionPosition> bounds = new Bounds<>(PartitionPosition.ForKey.get(index_clause.start_key, p),
+                                                                    p.getMinimumToken().minKeyBound());
 
             int nowInSec = FBUtilities.nowInSeconds();
             ColumnFilter columns = makeColumnFilter(metadata, column_parent, column_predicate);
@@ -1706,7 +1694,7 @@ public class CassandraServer implements Cassandra.Iface
         validateLogin();
 
         Set<String> keyspaces = Schema.instance.getKeyspaces();
-        List<KsDef> ksset = new ArrayList<KsDef>(keyspaces.size());
+        List<KsDef> ksset = new ArrayList<>(keyspaces.size());
         for (String ks : keyspaces)
         {
             try
@@ -1778,7 +1766,7 @@ public class CassandraServer implements Cassandra.Iface
     throws TException, InvalidRequestException
     {
         List<CfSplit> splits = describe_splits_ex(cfName, start_token, end_token, keys_per_split);
-        List<String> result = new ArrayList<String>(splits.size() + 1);
+        List<String> result = new ArrayList<>(splits.size() + 1);
 
         result.add(splits.get(0).getStart_token());
         for (CfSplit cfSplit : splits)
@@ -1793,10 +1781,10 @@ public class CassandraServer implements Cassandra.Iface
         try
         {
             Token.TokenFactory tf = StorageService.getPartitioner().getTokenFactory();
-            Range<Token> tr = new Range<Token>(tf.fromString(start_token), tf.fromString(end_token));
+            Range<Token> tr = new Range<>(tf.fromString(start_token), tf.fromString(end_token));
             List<Pair<Range<Token>, Long>> splits =
                     StorageService.instance.getSplits(state().getKeyspace(), cfName, tr, keys_per_split);
-            List<CfSplit> result = new ArrayList<CfSplit>(splits.size());
+            List<CfSplit> result = new ArrayList<>(splits.size());
             for (Pair<Range<Token>, Long> split : splits)
                 result.add(new CfSplit(split.left.left.toString(), split.left.right.toString(), split.right));
             return result;
@@ -2115,17 +2103,13 @@ public class CassandraServer implements Cassandra.Iface
             {
                 LegacyLayout.LegacyCellName name = LegacyLayout.decodeCellName(metadata, column_parent.super_column, column.name);
                 DecoratedKey dk = StorageService.getPartitioner().decorateKey(key);
-                PartitionUpdate update = new PartitionUpdate(metadata, dk, PartitionColumns.of(name.column), 1);
-
-                Row.Writer writer = name.column.isStatic() ? update.staticWriter() : update.writer();
-                name.clustering.writeTo(writer);
-                CellPath path = name.collectionElement == null ? null : CellPath.create(name.collectionElement);
 
                 // See UpdateParameters.addCounter() for more details on this
                 ByteBuffer value = CounterContext.instance().createLocal(column.value);
+                CellPath path = name.collectionElement == null ? null : CellPath.create(name.collectionElement);
+                Cell cell = BufferCell.live(metadata, name.column, FBUtilities.timestampMicros(), value, path);
 
-                writer.writeCell(name.column, true, value, SimpleLivenessInfo.forUpdate(FBUtilities.timestampMicros(), LivenessInfo.NO_TTL, FBUtilities.nowInSeconds(), metadata), path);
-                writer.endOfRow();
+                PartitionUpdate update = PartitionUpdate.singleRowUpdate(metadata, dk, ArrayBackedRow.singleCellRow(name.clustering, cell));
 
                 org.apache.cassandra.db.Mutation mutation = new org.apache.cassandra.db.Mutation(update);
                 doInsert(consistency_level, Arrays.asList(new CounterMutation(mutation, ThriftConversion.fromThrift(consistency_level))));
@@ -2489,7 +2473,7 @@ public class CassandraServer implements Cassandra.Iface
             // Gather the clustering for the expected values and query those.
             BTreeSet.Builder<Clustering> clusterings = BTreeSet.builder(metadata.comparator);
             for (Row row : expected)
-                clusterings.add(row.clustering().takeAlias());
+                clusterings.add(row.clustering());
             PartitionColumns columns = expected.staticRow().isEmpty()
                                      ? metadata.partitionColumns().withoutStatics()
                                      : metadata.partitionColumns();
@@ -2511,11 +2495,29 @@ public class CassandraServer implements Cassandra.Iface
                 if (c == null)
                     return false;
 
-                for (Cell ce : e)
+                SearchIterator<ColumnDefinition, ColumnData> searchIter = c.searchIterator();
+                for (ColumnData expectedData : e)
                 {
-                    Cell cc = c.getCell(ce.column());
-                    if (cc == null || !cc.value().equals(ce.value()))
+                    ColumnDefinition column = expectedData.column();
+                    ColumnData currentData = searchIter.next(column);
+                    if (currentData == null)
                         return false;
+
+                    if (column.isSimple())
+                    {
+                        if (!((Cell)currentData).value().equals(((Cell)expectedData).value()))
+                            return false;
+                    }
+                    else
+                    {
+                        ComplexColumnData currentComplexData = (ComplexColumnData)currentData;
+                        for (Cell expectedCell : (ComplexColumnData)expectedData)
+                        {
+                            Cell currentCell = currentComplexData.getCell(expectedCell.path());
+                            if (currentCell == null || !currentCell.value().equals(expectedCell.value()))
+                                return false;
+                        }
+                    }
                 }
             }
             return true;
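
The hunks above repeatedly replace the old Row.Writer protocol with a one-shot "build a cell, wrap it in a single-row update" pattern. A minimal sketch of that pattern follows, using only names visible in this diff; the import paths and exact signatures elsewhere in the branch are assumptions, so treat it as illustration rather than part of the patch.

// Sketch only, not part of the patch: the single-cell write pattern used above.
// Class and method names come from the diff; import paths are assumed.
import java.nio.ByteBuffer;

import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.LegacyLayout;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.db.rows.ArrayBackedRow;
import org.apache.cassandra.db.rows.BufferCell;
import org.apache.cassandra.db.rows.Cell;
import org.apache.cassandra.db.rows.CellPath;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;

final class SingleCellUpdateSketch
{
    static PartitionUpdate updateFor(CFMetaData metadata, ByteBuffer key,
                                     LegacyLayout.LegacyCellName name,
                                     ByteBuffer value, long timestamp, int ttl)
    {
        DecoratedKey dk = StorageService.getPartitioner().decorateKey(key);
        // Collection elements are addressed through a CellPath; otherwise the path is null.
        CellPath path = name.collectionElement == null ? null : CellPath.create(name.collectionElement);
        Cell cell = ttl == 0
                  ? BufferCell.live(metadata, name.column, timestamp, value, path)
                  : BufferCell.expiring(name.column, timestamp, ttl, FBUtilities.nowInSeconds(), value, path);
        // The old writer dance (writeTo/writeCell/endOfRow) collapses into one factory call.
        return PartitionUpdate.singleRowUpdate(metadata, dk, ArrayBackedRow.singleCellRow(name.clustering, cell));
    }
}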

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/thrift/ThriftConversion.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/ThriftConversion.java b/src/java/org/apache/cassandra/thrift/ThriftConversion.java
index d99217d..c9e5062 100644
--- a/src/java/org/apache/cassandra/thrift/ThriftConversion.java
+++ b/src/java/org/apache/cassandra/thrift/ThriftConversion.java
@@ -350,7 +350,7 @@ public class ThriftConversion
         {
             // SuperColumn tables: we use a special map to hold dynamic values within a given super column
             defs.add(ColumnDefinition.clusteringKeyDef(ks, cf, names.defaultClusteringName(), comparator, 0));
-            defs.add(ColumnDefinition.regularDef(ks, cf, CompactTables.SUPER_COLUMN_MAP_COLUMN_STR, MapType.getInstance(subComparator, defaultValidator, true), null));
+            defs.add(ColumnDefinition.regularDef(ks, cf, CompactTables.SUPER_COLUMN_MAP_COLUMN_STR, MapType.getInstance(subComparator, defaultValidator, true)));
         }
         else
         {
@@ -361,7 +361,7 @@ public class ThriftConversion
             for (int i = 0; i < subTypes.size(); i++)
                 defs.add(ColumnDefinition.clusteringKeyDef(ks, cf, names.defaultClusteringName(), subTypes.get(i), i));
 
-            defs.add(ColumnDefinition.regularDef(ks, cf, names.defaultCompactValueName(), defaultValidator, null));
+            defs.add(ColumnDefinition.regularDef(ks, cf, names.defaultCompactValueName(), defaultValidator));
         }
     }
 
@@ -454,7 +454,6 @@ public class ThriftConversion
     {
         boolean isSuper = thriftSubcomparator != null;
         // For super columns, the componentIndex is 1 because the ColumnDefinition applies to the column component.
-        Integer componentIndex = isSuper ? 1 : null;
         AbstractType<?> comparator = thriftSubcomparator == null ? thriftComparator : thriftSubcomparator;
         try
         {
@@ -475,7 +474,7 @@ public class ThriftConversion
                                     thriftColumnDef.index_type == null ? null : org.apache.cassandra.config.IndexType.valueOf(thriftColumnDef.index_type.name()),
                                     thriftColumnDef.index_options,
                                     thriftColumnDef.index_name,
-                                    componentIndex,
+                                    null,
                                     kind);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/thrift/ThriftResultsMerger.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/ThriftResultsMerger.java b/src/java/org/apache/cassandra/thrift/ThriftResultsMerger.java
index ccb6e74..9c5a99f 100644
--- a/src/java/org/apache/cassandra/thrift/ThriftResultsMerger.java
+++ b/src/java/org/apache/cassandra/thrift/ThriftResultsMerger.java
@@ -101,10 +101,10 @@ public class ThriftResultsMerger extends WrappingUnfilteredPartitionIterator
         // We initialize lazily to avoid having this iterator fetch the wrapped iterator before it's actually asked for it.
         private boolean isInit;
 
-        private Row staticRow;
-        private int i; // the index of the next column of static row to return
+        private Iterator<Cell> staticCells;
 
-        private ReusableRow nextToMerge;
+        private final Row.Builder builder;
+        private Row nextToMerge;
         private Unfiltered nextFromWrapped;
 
         private PartitionMerger(UnfilteredRowIterator results, int nowInSec)
@@ -112,15 +112,16 @@ public class ThriftResultsMerger extends WrappingUnfilteredPartitionIterator
             super(results);
             assert results.metadata().isStaticCompactTable();
             this.nowInSec = nowInSec;
+            this.builder = ArrayBackedRow.sortedBuilder(results.columns().regulars);
         }
 
         private void init()
         {
             assert !isInit;
-            this.staticRow = super.staticRow();
+            Row staticRow = super.staticRow();
             assert staticRow.columns().complexColumnCount() == 0;
 
-            this.nextToMerge = createReusableRow();
+            staticCells = staticRow.cells().iterator();
             updateNextToMerge();
             isInit = true;
         }
@@ -131,11 +132,6 @@ public class ThriftResultsMerger extends WrappingUnfilteredPartitionIterator
             return Rows.EMPTY_STATIC_ROW;
         }
 
-        private ReusableRow createReusableRow()
-        {
-            return new ReusableRow(metadata().clusteringColumns().size(), metadata().partitionColumns().regulars, true, metadata().isCounter());
-        }
-
         @Override
         public boolean hasNext()
         {
@@ -171,11 +167,9 @@ public class ThriftResultsMerger extends WrappingUnfilteredPartitionIterator
             if (cmp > 0)
                 return consumeNextWrapped();
 
-            // Same row, but we know the row has only a single column so just pick the more recent
+            // Same row, so merge them
             assert nextFromWrapped instanceof Row;
-            ReusableRow row = createReusableRow();
-            Rows.merge((Row)consumeNextWrapped(), consumeNextToMerge(), columns().regulars, row.writer(), nowInSec);
-            return row;
+            return Rows.merge((Row)consumeNextWrapped(), consumeNextToMerge(), nowInSec);
         }
 
         private Unfiltered consumeNextWrapped()
@@ -194,29 +188,26 @@ public class ThriftResultsMerger extends WrappingUnfilteredPartitionIterator
 
         private void updateNextToMerge()
         {
-            while (i < staticRow.columns().simpleColumnCount())
+            if (!staticCells.hasNext())
             {
-                Cell cell = staticRow.getCell(staticRow.columns().getSimple(i++));
-                if (cell != null)
-                {
-                    // Given a static cell, the equivalent row uses the column name as clustering and the
-                    // value as unique cell value.
-                    Row.Writer writer = nextToMerge.writer();
-                    writer.writeClusteringValue(cell.column().name.bytes);
-                    writer.writeCell(metadata().compactValueColumn(), cell.isCounterCell(), cell.value(), cell.livenessInfo(), cell.path());
-                    writer.endOfRow();
-                    return;
-                }
+                // Nothing more to merge.
+                nextToMerge = null;
+                return;
             }
-            // Nothing more to merge.
-            nextToMerge = null;
+
+            Cell cell = staticCells.next();
+
+            // Given a static cell, the equivalent row uses the column name as clustering and the value as unique cell value.
+            builder.newRow(new Clustering(cell.column().name.bytes));
+            builder.addCell(new BufferCell(metadata().compactValueColumn(), cell.timestamp(), cell.ttl(), cell.localDeletionTime(), cell.value(), cell.path()));
+            nextToMerge = builder.build();
         }
     }
 
-    private static class SuperColumnsPartitionMerger extends WrappingUnfilteredRowIterator
+    private static class SuperColumnsPartitionMerger extends AlteringUnfilteredRowIterator
     {
         private final int nowInSec;
-        private final ReusableRow reusableRow;
+        private final Row.Builder builder;
         private final ColumnDefinition superColumnMapColumn;
         private final AbstractType<?> columnComparator;
 
@@ -229,30 +220,23 @@ public class ThriftResultsMerger extends WrappingUnfilteredPartitionIterator
             this.superColumnMapColumn = results.metadata().compactValueColumn();
             assert superColumnMapColumn != null && superColumnMapColumn.type instanceof MapType;
 
-            this.reusableRow = new ReusableRow(results.metadata().clusteringColumns().size(),
-                                               Columns.of(superColumnMapColumn),
-                                               true,
-                                               results.metadata().isCounter());
+            this.builder = ArrayBackedRow.sortedBuilder(Columns.of(superColumnMapColumn));
             this.columnComparator = ((MapType)superColumnMapColumn.type).nameComparator();
         }
 
         @Override
-        public Unfiltered next()
+        protected Row computeNext(Row row)
         {
-            Unfiltered next = super.next();
-            if (next.kind() != Unfiltered.Kind.ROW)
-                return next;
-
-            Row row = (Row)next;
-            Row.Writer writer = reusableRow.writer();
-            row.clustering().writeTo(writer);
-
-            PeekingIterator<Cell> staticCells = Iterators.peekingIterator(makeStaticCellIterator(row));
+            PeekingIterator<Cell> staticCells = Iterators.peekingIterator(simpleCellsIterator(row));
             if (!staticCells.hasNext())
                 return row;
 
-            Iterator<Cell> cells = row.getCells(superColumnMapColumn);
-            PeekingIterator<Cell> dynamicCells = Iterators.peekingIterator(cells.hasNext() ? cells : Collections.<Cell>emptyIterator());
+            builder.newRow(row.clustering());
+
+            ComplexColumnData complexData = row.getComplexColumnData(superColumnMapColumn);
+            PeekingIterator<Cell> dynamicCells = Iterators.peekingIterator(complexData == null ? Collections.<Cell>emptyIterator() : complexData.iterator());
+
+            builder.addComplexDeletion(superColumnMapColumn, complexData.complexDeletion());
 
             while (staticCells.hasNext() && dynamicCells.hasNext())
             {
@@ -260,52 +244,37 @@ public class ThriftResultsMerger extends WrappingUnfilteredPartitionIterator
                 Cell dynamicCell = dynamicCells.peek();
                 int cmp = columnComparator.compare(staticCell.column().name.bytes, dynamicCell.path().get(0));
                 if (cmp < 0)
-                {
-                    staticCell = staticCells.next();
-                    writer.writeCell(superColumnMapColumn, staticCell.isCounterCell(), staticCell.value(), staticCell.livenessInfo(), CellPath.create(staticCell.column().name.bytes));
-                }
+                    builder.addCell(makeDynamicCell(staticCells.next()));
                 else if (cmp > 0)
-                {
-                    dynamicCells.next().writeTo(writer);
-                }
+                    builder.addCell(dynamicCells.next());
                 else
-                {
-                    staticCell = staticCells.next();
-                    Cell toMerge = Cells.create(superColumnMapColumn,
-                                                 staticCell.isCounterCell(),
-                                                 staticCell.value(),
-                                                 staticCell.livenessInfo(),
-                                                 CellPath.create(staticCell.column().name.bytes));
-                    Cells.reconcile(toMerge, dynamicCells.next(), nowInSec).writeTo(writer);
-                }
+                    builder.addCell(Cells.reconcile(makeDynamicCell(staticCells.next()), dynamicCells.next(), nowInSec));
             }
 
             while (staticCells.hasNext())
-            {
-                Cell staticCell = staticCells.next();
-                writer.writeCell(superColumnMapColumn, staticCell.isCounterCell(), staticCell.value(), staticCell.livenessInfo(), CellPath.create(staticCell.column().name.bytes));
-            }
+                builder.addCell(makeDynamicCell(staticCells.next()));
             while (dynamicCells.hasNext())
-            {
-                dynamicCells.next().writeTo(writer);
-            }
+                builder.addCell(dynamicCells.next());
+
+            return builder.build();
+        }
 
-            writer.endOfRow();
-            return reusableRow;
+        private Cell makeDynamicCell(Cell staticCell)
+        {
+            return new BufferCell(superColumnMapColumn, staticCell.timestamp(), staticCell.ttl(), staticCell.localDeletionTime(), staticCell.value(), CellPath.create(staticCell.column().name.bytes));
         }
 
-        private static Iterator<Cell> makeStaticCellIterator(final Row row)
+        private Iterator<Cell> simpleCellsIterator(Row row)
         {
+            final Iterator<Cell> cells = row.cells().iterator();
             return new AbstractIterator<Cell>()
             {
-                private int i;
-
                 protected Cell computeNext()
                 {
-                    while (i < row.columns().simpleColumnCount())
+                    if (cells.hasNext())
                     {
-                        Cell cell = row.getCell(row.columns().getSimple(i++));
-                        if (cell != null)
+                        Cell cell = cells.next();
+                        if (cell.column().isSimple())
                             return cell;
                     }
                     return endOfData();
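
Both mergers above now go through the same Row.Builder cycle instead of writing into a ReusableRow: newRow(clustering), addCell(...) per column, then build(). A hedged sketch of one such cycle is below; the builder and cell calls are the ones shown in the hunks, while the import paths are assumed.

// Sketch only: one newRow()/build() cycle of the sorted row builder used above.
import java.nio.ByteBuffer;

import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.db.Clustering;
import org.apache.cassandra.db.Columns;
import org.apache.cassandra.db.rows.ArrayBackedRow;
import org.apache.cassandra.db.rows.BufferCell;
import org.apache.cassandra.db.rows.Row;

final class RowBuilderSketch
{
    static Row singleCellRow(CFMetaData metadata, ColumnDefinition column,
                             ByteBuffer clusteringValue, ByteBuffer value, long timestamp)
    {
        Row.Builder builder = ArrayBackedRow.sortedBuilder(Columns.of(column));
        builder.newRow(new Clustering(clusteringValue));
        // Cells must be added in column (and cell path) order for the sorted builder.
        builder.addCell(BufferCell.live(metadata, column, timestamp, value, null));
        return builder.build();
    }
}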

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/ByteBufferUtil.java b/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
index 65ed23c..abc2a37 100644
--- a/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
+++ b/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
@@ -34,6 +34,7 @@ import java.util.UUID;
 
 import net.nicoulaj.compilecommand.annotations.Inline;
 import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.io.util.FileDataInput;
 import org.apache.cassandra.io.util.FileUtils;
@@ -290,6 +291,12 @@ public class ByteBufferUtil
         out.write(bytes);
     }
 
+    public static void writeWithVIntLength(ByteBuffer bytes, DataOutputPlus out) throws IOException
+    {
+        out.writeVInt(bytes.remaining());
+        out.write(bytes);
+    }
+
     public static void writeWithLength(byte[] bytes, DataOutput out) throws IOException
     {
         out.writeInt(bytes.length);
@@ -323,12 +330,36 @@ public class ByteBufferUtil
         return ByteBufferUtil.read(in, length);
     }
 
+    public static ByteBuffer readWithVIntLength(DataInputPlus in) throws IOException
+    {
+        int length = (int)in.readVInt();
+        if (length < 0)
+            throw new IOException("Corrupt (negative) value length encountered");
+
+        return ByteBufferUtil.read(in, length);
+    }
+
     public static int serializedSizeWithLength(ByteBuffer buffer)
     {
         int size = buffer.remaining();
         return TypeSizes.sizeof(size) + size;
     }
 
+    public static int serializedSizeWithVIntLength(ByteBuffer buffer)
+    {
+        int size = buffer.remaining();
+        return TypeSizes.sizeofVInt(size) + size;
+    }
+
+    public static void skipWithVIntLength(DataInputPlus in) throws IOException
+    {
+        int length = (int)in.readVInt();
+        if (length < 0)
+            throw new IOException("Corrupt (negative) value length encountered");
+
+        FileUtils.skipBytesFully(in, length);
+    }
+
     /* @return An unsigned short in an integer. */
     public static int readShortLength(DataInput in) throws IOException
     {
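
The new helpers frame a value as a vint-encoded length followed by the raw bytes, so short values pay a single length byte instead of a fixed 4-byte int. A round-trip sketch follows; DataOutputBuffer and DataInputBuffer are assumed to be the usual in-memory DataOutputPlus/DataInputPlus implementations and are not part of this diff, so their constructors may differ at this point in the branch.

// Sketch only: writing and reading back a value with the new vint-length helpers.
// The in-memory buffer classes are assumptions, not shown in this patch.
import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.cassandra.io.util.DataInputBuffer;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.utils.ByteBufferUtil;

final class VIntLengthSketch
{
    static ByteBuffer roundTrip(ByteBuffer value) throws IOException
    {
        try (DataOutputBuffer out = new DataOutputBuffer())
        {
            ByteBufferUtil.writeWithVIntLength(value, out);
            // The serialized size helper accounts for the vint length prefix plus the bytes.
            assert out.getLength() == ByteBufferUtil.serializedSizeWithVIntLength(value);
            try (DataInputBuffer in = new DataInputBuffer(out.buffer(), false))
            {
                return ByteBufferUtil.readWithVIntLength(in);
            }
        }
    }
}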

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/utils/ObjectSizes.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/ObjectSizes.java b/src/java/org/apache/cassandra/utils/ObjectSizes.java
index e05dcba..e7469c1 100644
--- a/src/java/org/apache/cassandra/utils/ObjectSizes.java
+++ b/src/java/org/apache/cassandra/utils/ObjectSizes.java
@@ -23,6 +23,8 @@ package org.apache.cassandra.utils;
 
 import java.nio.ByteBuffer;
 
+import java.util.ArrayList;
+
 import org.github.jamm.MemoryLayoutSpecification;
 import org.github.jamm.MemoryMeter;
 
@@ -111,6 +113,7 @@ public class ObjectSizes
     {
         return BUFFER_EMPTY_SIZE * array.length + sizeOfArray(array);
     }
+
     /**
      * Memory a byte buffer consumes
      * @param buffer ByteBuffer to calculate in memory size

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/utils/Sorting.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/Sorting.java b/src/java/org/apache/cassandra/utils/Sorting.java
deleted file mode 100644
index b1c0b46..0000000
--- a/src/java/org/apache/cassandra/utils/Sorting.java
+++ /dev/null
@@ -1,254 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.utils;
-
-public abstract class Sorting
-{
-    private Sorting() {}
-
-    /**
-     * Interface that allows to sort elements addressable by index, but without actually requiring those
-     * to elements to be part of a list/array.
-     */
-    public interface Sortable
-    {
-        /**
-         * The number of elements to sort.
-         */
-        public int size();
-
-        /**
-         * Compares the element with index i should sort before the element with index j.
-         */
-        public int compare(int i, int j);
-
-        /**
-         * Swaps element i and j.
-         */
-        public void swap(int i, int j);
-    }
-
-    /**
-     * Sort a sortable.
-     *
-     * The actual algorithm is a direct adaptation of the standard sorting in golang
-     * at http://golang.org/src/pkg/sort/sort.go (comments included).
-     *
-     * It makes one call to data.Len to determine n, and O(n*log(n)) calls to
-     * data.Less and data.Swap. The sort is not guaranteed to be stable.
-     */
-    public static void sort(Sortable data)
-    {
-        // Switch to heapsort if depth of 2*ceil(lg(n+1)) is reached.
-        int n = data.size();
-        int maxDepth = 0;
-        for (int i = n; i > 0; i >>= 1)
-            maxDepth++;
-        maxDepth *= 2;
-        quickSort(data, 0, n, maxDepth);
-    }
-
-    private static void insertionSort(Sortable data, int a, int b)
-    {
-        for (int i = a + 1; i < b; i++)
-            for(int j = i; j > a && data.compare(j, j-1) < 0; j--)
-                data.swap(j, j-1);
-    }
-
-    // siftDown implements the heap property on data[lo, hi).
-    // first is an offset into the array where the root of the heap lies.
-    private static void siftDown(Sortable data, int lo, int hi, int first)
-    {
-        int root = lo;
-        while (true)
-        {
-            int child = 2*root + 1;
-            if (child >= hi)
-                return;
-
-            if (child + 1 < hi && data.compare(first+child, first+child+1) < 0)
-                child++;
-
-            if (data.compare(first+root, first+child) >= 0)
-                return;
-
-            data.swap(first+root, first+child);
-            root = child;
-        }
-    }
-
-    private static void heapSort(Sortable data, int a, int b)
-    {
-        int first = a;
-        int lo = 0;
-        int hi = b - a;
-
-        // Build heap with greatest element at top.
-        for (int i = (hi - 1) / 2; i >= 0; i--)
-            siftDown(data, i, hi, first);
-
-        // Pop elements, largest first, into end of data.
-        for (int i = hi - 1; i >= 0; i--) {
-            data.swap(first, first+i);
-            siftDown(data, lo, i, first);
-        }
-    }
-
-    // Quicksort, following Bentley and McIlroy,
-    // ``Engineering a Sort Function,'' SP&E November 1993.
-
-    // medianOfThree moves the median of the three values data[a], data[b], data[c] into data[a].
-    private static void medianOfThree(Sortable data, int a, int b, int c)
-    {
-        int m0 = b;
-        int m1 = a;
-        int m2 = c;
-        // bubble sort on 3 elements
-        if (data.compare(m1, m0) < 0)
-            data.swap(m1, m0);
-        if (data.compare(m2, m1) < 0)
-            data.swap(m2, m1);
-        if (data.compare(m1, m0) < 0)
-            data.swap(m1, m0);
-        // now data[m0] <= data[m1] <= data[m2]
-    }
-
-    private static void swapRange(Sortable data, int a, int b, int n)
-    {
-        for (int i = 0; i < n; i++)
-            data.swap(a+i, b+i);
-    }
-
-    private static void doPivot(Sortable data, int lo, int hi, int[] result)
-    {
-        int m = lo + (hi-lo)/2; // Written like this to avoid integer overflow.
-        if (hi-lo > 40) {
-            // Tukey's ``Ninther,'' median of three medians of three.
-            int s = (hi - lo) / 8;
-            medianOfThree(data, lo, lo+s, lo+2*s);
-            medianOfThree(data, m, m-s, m+s);
-            medianOfThree(data, hi-1, hi-1-s, hi-1-2*s);
-        }
-        medianOfThree(data, lo, m, hi-1);
-
-        // Invariants are:
-        //    data[lo] = pivot (set up by ChoosePivot)
-        //    data[lo <= i < a] = pivot
-        //    data[a <= i < b] < pivot
-        //    data[b <= i < c] is unexamined
-        //    data[c <= i < d] > pivot
-        //    data[d <= i < hi] = pivot
-        //
-        // Once b meets c, can swap the "= pivot" sections
-        // into the middle of the slice.
-        int pivot = lo;
-        int a = lo+1, b = lo+1, c = hi, d =hi;
-        while (true)
-        {
-            while (b < c)
-            {
-                int cmp = data.compare(b, pivot);
-                if (cmp < 0)  // data[b] < pivot
-                {
-                    b++;
-                }
-                else if (cmp == 0) // data[b] = pivot
-                {
-                    data.swap(a, b);
-                    a++;
-                    b++;
-                }
-                else
-                {
-                    break;
-                }
-            }
-
-            while (b < c)
-            {
-                int cmp = data.compare(pivot, c-1);
-                if (cmp < 0) // data[c-1] > pivot
-                {
-                    c--;
-                }
-                else if (cmp == 0) // data[c-1] = pivot
-                {
-                    data.swap(c-1, d-1);
-                    c--;
-                    d--;
-                }
-                else
-                {
-                    break;
-                }
-            }
-
-            if (b >= c)
-                break;
-
-            // data[b] > pivot; data[c-1] < pivot
-            data.swap(b, c-1);
-            b++;
-            c--;
-        }
-
-        int n = Math.min(b-a, a-lo);
-        swapRange(data, lo, b-n, n);
-
-        n = Math.min(hi-d, d-c);
-        swapRange(data, c, hi-n, n);
-
-        result[0] = lo + b - a;
-        result[1] = hi - (d - c);
-    }
-
-    private static void quickSort(Sortable data, int a, int b, int maxDepth)
-    {
-        int[] buffer = new int[2];
-
-        while (b-a > 7)
-        {
-            if (maxDepth == 0)
-            {
-                heapSort(data, a, b);
-                return;
-            }
-
-            maxDepth--;
-
-            doPivot(data, a, b, buffer);
-            int mlo = buffer[0];
-            int mhi = buffer[1];
-            // Avoiding recursion on the larger subproblem guarantees
-            // a stack depth of at most lg(b-a).
-            if (mlo-a < b-mhi)
-            {
-                quickSort(data, a, mlo, maxDepth);
-                a = mhi; // i.e., quickSort(data, mhi, b)
-            }
-            else
-            {
-                quickSort(data, mhi, b, maxDepth);
-                b = mlo; // i.e., quickSort(data, a, mlo)
-            }
-        }
-
-        if (b-a > 1)
-            insertionSort(data, a, b);
-    }
-}
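
For context on what is being removed: Sorting.sort() let callers sort index-addressable data, for example two parallel arrays in tandem, without materialising a list of pairs. A hypothetical call site (it only compiles against a tree that still contains the class, since this commit deletes it) would look like:

// Hypothetical usage of the removed utility, shown for context only.
import org.apache.cassandra.utils.Sorting;

final class ParallelArraySortSketch
{
    static void sortByKey(final long[] keys, final Object[] values)
    {
        assert keys.length == values.length;
        Sorting.sort(new Sorting.Sortable()
        {
            public int size() { return keys.length; }

            public int compare(int i, int j) { return Long.compare(keys[i], keys[j]); }

            public void swap(int i, int j)
            {
                // Keep the two arrays aligned by swapping both on every move.
                long k = keys[i]; keys[i] = keys[j]; keys[j] = k;
                Object v = values[i]; values[i] = values[j]; values[j] = v;
            }
        });
    }
}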

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/utils/memory/AbstractAllocator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/memory/AbstractAllocator.java b/src/java/org/apache/cassandra/utils/memory/AbstractAllocator.java
index 0735d6e..a470527 100644
--- a/src/java/org/apache/cassandra/utils/memory/AbstractAllocator.java
+++ b/src/java/org/apache/cassandra/utils/memory/AbstractAllocator.java
@@ -19,6 +19,11 @@ package org.apache.cassandra.utils.memory;
 
 import java.nio.ByteBuffer;
 
+import org.apache.cassandra.db.Clustering;
+import org.apache.cassandra.db.Columns;
+import org.apache.cassandra.db.rows.ArrayBackedRow;
+import org.apache.cassandra.db.rows.Cell;
+import org.apache.cassandra.db.rows.Row;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 public abstract class AbstractAllocator
@@ -40,4 +45,32 @@ public abstract class AbstractAllocator
     }
 
     public abstract ByteBuffer allocate(int size);
+
+    public Row.Builder cloningArrayBackedRowBuilder(Columns columns)
+    {
+        return new CloningArrayBackedRowBuilder(columns, this);
+    }
+
+    private static class CloningArrayBackedRowBuilder extends ArrayBackedRow.SortedBuilder
+    {
+        private final AbstractAllocator allocator;
+
+        private CloningArrayBackedRowBuilder(Columns columns, AbstractAllocator allocator)
+        {
+            super(columns);
+            this.allocator = allocator;
+        }
+
+        @Override
+        public void newRow(Clustering clustering)
+        {
+            super.newRow(clustering.copy(allocator));
+        }
+
+        @Override
+        public void addCell(Cell cell)
+        {
+            super.addCell(cell.copy(allocator));
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/utils/memory/MemtableAllocator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/memory/MemtableAllocator.java b/src/java/org/apache/cassandra/utils/memory/MemtableAllocator.java
index 443aca2..15499ae 100644
--- a/src/java/org/apache/cassandra/utils/memory/MemtableAllocator.java
+++ b/src/java/org/apache/cassandra/utils/memory/MemtableAllocator.java
@@ -59,8 +59,7 @@ public abstract class MemtableAllocator
         this.offHeap = offHeap;
     }
 
-    public abstract MemtableRowData.ReusableRow newReusableRow();
-    public abstract RowAllocator newRowAllocator(CFMetaData cfm, OpOrder.Group writeOp);
+    public abstract Row.Builder rowBuilder(CFMetaData metadata, OpOrder.Group opGroup, boolean isStatic);
     public abstract DecoratedKey clone(DecoratedKey key, OpOrder.Group opGroup);
     public abstract DataReclaimer reclaimer();
 
@@ -103,16 +102,10 @@ public abstract class MemtableAllocator
         return state == LifeCycle.LIVE;
     }
 
-    public static interface RowAllocator extends Row.Writer
-    {
-        public void allocateNewRow(int clusteringSize, Columns columns, boolean isStatic);
-        public MemtableRowData allocatedRowData();
-    }
-
     public static interface DataReclaimer
     {
-        public DataReclaimer reclaim(MemtableRowData row);
-        public DataReclaimer reclaimImmediately(MemtableRowData row);
+        public DataReclaimer reclaim(Row row);
+        public DataReclaimer reclaimImmediately(Row row);
         public DataReclaimer reclaimImmediately(DecoratedKey key);
         public void cancel();
         public void commit();
@@ -120,12 +113,12 @@ public abstract class MemtableAllocator
 
     public static final DataReclaimer NO_OP = new DataReclaimer()
     {
-        public DataReclaimer reclaim(MemtableRowData update)
+        public DataReclaimer reclaim(Row update)
         {
             return this;
         }
 
-        public DataReclaimer reclaimImmediately(MemtableRowData update)
+        public DataReclaimer reclaimImmediately(Row update)
         {
             return this;
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/utils/memory/MemtableBufferAllocator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/memory/MemtableBufferAllocator.java b/src/java/org/apache/cassandra/utils/memory/MemtableBufferAllocator.java
index 144f439..31df444 100644
--- a/src/java/org/apache/cassandra/utils/memory/MemtableBufferAllocator.java
+++ b/src/java/org/apache/cassandra/utils/memory/MemtableBufferAllocator.java
@@ -27,20 +27,15 @@ import org.apache.cassandra.utils.concurrent.OpOrder;
 
 public abstract class MemtableBufferAllocator extends MemtableAllocator
 {
-
     protected MemtableBufferAllocator(SubAllocator onHeap, SubAllocator offHeap)
     {
         super(onHeap, offHeap);
     }
 
-    public MemtableRowData.ReusableRow newReusableRow()
+    public Row.Builder rowBuilder(CFMetaData metadata, OpOrder.Group writeOp, boolean isStatic)
     {
-        return MemtableRowData.BufferRowData.createReusableRow();
-    }
-
-    public RowAllocator newRowAllocator(CFMetaData cfm, OpOrder.Group writeOp)
-    {
-        return new RowBufferAllocator(allocator(writeOp), cfm.isCounter());
+        Columns columns = isStatic ? metadata.partitionColumns().statics : metadata.partitionColumns().regulars;
+        return allocator(writeOp).cloningArrayBackedRowBuilder(columns);
     }
 
     public DecoratedKey clone(DecoratedKey key, OpOrder.Group writeOp)
@@ -54,71 +49,4 @@ public abstract class MemtableBufferAllocator extends MemtableAllocator
     {
         return new ContextAllocator(writeOp, this);
     }
-
-    private static class RowBufferAllocator extends RowDataBlock.Writer implements RowAllocator
-    {
-        private final AbstractAllocator allocator;
-        private final boolean isCounter;
-
-        private MemtableRowData.BufferClustering clustering;
-        private int clusteringIdx;
-        private LivenessInfo info;
-        private DeletionTime deletion;
-        private RowDataBlock data;
-
-        private RowBufferAllocator(AbstractAllocator allocator, boolean isCounter)
-        {
-            super(true);
-            this.allocator = allocator;
-            this.isCounter = isCounter;
-        }
-
-        public void allocateNewRow(int clusteringSize, Columns columns, boolean isStatic)
-        {
-            data = new RowDataBlock(columns, 1, false, isCounter);
-            clustering = isStatic ? null : new MemtableRowData.BufferClustering(clusteringSize);
-            clusteringIdx = 0;
-            updateWriter(data);
-        }
-
-        public MemtableRowData allocatedRowData()
-        {
-            MemtableRowData row = new MemtableRowData.BufferRowData(clustering == null ? Clustering.STATIC_CLUSTERING : clustering,
-                                                                    info,
-                                                                    deletion,
-                                                                    data);
-
-            clustering = null;
-            info = LivenessInfo.NONE;
-            deletion = DeletionTime.LIVE;
-            data = null;
-
-            return row;
-        }
-
-        public void writeClusteringValue(ByteBuffer value)
-        {
-            clustering.setClusteringValue(clusteringIdx++, value == null ? null : allocator.clone(value));
-        }
-
-        public void writePartitionKeyLivenessInfo(LivenessInfo info)
-        {
-            this.info = info;
-        }
-
-        public void writeRowDeletion(DeletionTime deletion)
-        {
-            this.deletion = deletion;
-        }
-
-        @Override
-        public void writeCell(ColumnDefinition column, boolean isCounter, ByteBuffer value, LivenessInfo info, CellPath path)
-        {
-            ByteBuffer v = allocator.clone(value);
-            if (column.isComplex())
-                complexWriter.addCell(column, v, info, MemtableRowData.BufferCellPath.clone(path, allocator));
-            else
-                simpleWriter.addCell(column, v, info);
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/utils/memory/NativeAllocator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/memory/NativeAllocator.java b/src/java/org/apache/cassandra/utils/memory/NativeAllocator.java
index 7ca859d..7b95430 100644
--- a/src/java/org/apache/cassandra/utils/memory/NativeAllocator.java
+++ b/src/java/org/apache/cassandra/utils/memory/NativeAllocator.java
@@ -27,7 +27,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.NativeDecoratedKey;
-import org.apache.cassandra.db.rows.MemtableRowData;
+import org.apache.cassandra.db.rows.Row;
 import org.apache.cassandra.utils.concurrent.OpOrder;
 
 public class NativeAllocator extends MemtableAllocator
@@ -53,13 +53,7 @@ public class NativeAllocator extends MemtableAllocator
         super(pool.onHeap.newAllocator(), pool.offHeap.newAllocator());
     }
 
-    public MemtableRowData.ReusableRow newReusableRow()
-    {
-        // TODO
-        throw new UnsupportedOperationException();
-    }
-
-    public RowAllocator newRowAllocator(CFMetaData cfm, OpOrder.Group writeOp)
+    public Row.Builder rowBuilder(CFMetaData metadata, OpOrder.Group opGroup, boolean isStatic)
     {
         // TODO
         throw new UnsupportedOperationException();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/test/data/corrupt-sstables/la-1-big-CRC.db
----------------------------------------------------------------------
diff --git a/test/data/corrupt-sstables/la-1-big-CRC.db b/test/data/corrupt-sstables/la-1-big-CRC.db
index f13b9c7..1a0c525 100644
Binary files a/test/data/corrupt-sstables/la-1-big-CRC.db and b/test/data/corrupt-sstables/la-1-big-CRC.db differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/test/data/corrupt-sstables/la-1-big-Data.db
----------------------------------------------------------------------
diff --git a/test/data/corrupt-sstables/la-1-big-Data.db b/test/data/corrupt-sstables/la-1-big-Data.db
index dc516d8..e6c5eb9 100644
Binary files a/test/data/corrupt-sstables/la-1-big-Data.db and b/test/data/corrupt-sstables/la-1-big-Data.db differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/test/data/corrupt-sstables/la-1-big-Digest.adler32
----------------------------------------------------------------------
diff --git a/test/data/corrupt-sstables/la-1-big-Digest.adler32 b/test/data/corrupt-sstables/la-1-big-Digest.adler32
index e447277..93deb45 100644
--- a/test/data/corrupt-sstables/la-1-big-Digest.adler32
+++ b/test/data/corrupt-sstables/la-1-big-Digest.adler32
@@ -1 +1 @@
-2370519993
\ No newline at end of file
+3942663153
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/test/data/corrupt-sstables/la-1-big-Index.db
----------------------------------------------------------------------
diff --git a/test/data/corrupt-sstables/la-1-big-Index.db b/test/data/corrupt-sstables/la-1-big-Index.db
index 178221e..6e5e352 100644
Binary files a/test/data/corrupt-sstables/la-1-big-Index.db and b/test/data/corrupt-sstables/la-1-big-Index.db differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/test/data/corrupt-sstables/la-1-big-Statistics.db
----------------------------------------------------------------------
diff --git a/test/data/corrupt-sstables/la-1-big-Statistics.db b/test/data/corrupt-sstables/la-1-big-Statistics.db
index 23b76ac..15220e0 100644
Binary files a/test/data/corrupt-sstables/la-1-big-Statistics.db and b/test/data/corrupt-sstables/la-1-big-Statistics.db differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/test/data/corrupt-sstables/la-1-big-TOC.txt
----------------------------------------------------------------------
diff --git a/test/data/corrupt-sstables/la-1-big-TOC.txt b/test/data/corrupt-sstables/la-1-big-TOC.txt
index 9cbcd44..9ad71ef 100644
--- a/test/data/corrupt-sstables/la-1-big-TOC.txt
+++ b/test/data/corrupt-sstables/la-1-big-TOC.txt
@@ -1,8 +1,8 @@
 Statistics.db
-Filter.db
-Data.db
-Summary.db
-Digest.adler32
 CRC.db
 TOC.txt
+Data.db
 Index.db
+Summary.db
+Digest.adler32
+Filter.db

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
index e1dd953..8a63a27 100644
--- a/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
+++ b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
@@ -487,7 +487,7 @@ public class CommitLogStressTest
                     if (!(UTF8Type.instance.compose(row.clustering().get(0)).startsWith("name")))
                         continue;
 
-                    for (Cell cell : row)
+                    for (Cell cell : row.cells())
                     {
                         hash = hash(hash, cell.value());
                         ++cells;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/test/unit/org/apache/cassandra/SchemaLoader.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/SchemaLoader.java b/test/unit/org/apache/cassandra/SchemaLoader.java
index ce6ac22..12305ef 100644
--- a/test/unit/org/apache/cassandra/SchemaLoader.java
+++ b/test/unit/org/apache/cassandra/SchemaLoader.java
@@ -319,7 +319,7 @@ public class SchemaLoader
                 .addPartitionKey("key", AsciiType.instance)
                 .build();
 
-        return cfm.addOrReplaceColumnDefinition(ColumnDefinition.regularDef(ksName, cfName, "indexed", AsciiType.instance, null)
+        return cfm.addOrReplaceColumnDefinition(ColumnDefinition.regularDef(ksName, cfName, "indexed", AsciiType.instance)
                                                                 .setIndex("indexe1", IndexType.CUSTOM, indexOptions));
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/test/unit/org/apache/cassandra/Util.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/Util.java b/test/unit/org/apache/cassandra/Util.java
index 423b3c0..654b8c6 100644
--- a/test/unit/org/apache/cassandra/Util.java
+++ b/test/unit/org/apache/cassandra/Util.java
@@ -340,7 +340,7 @@ public class Util
                 assert !iterator.hasNext() : "Expecting a single partition but got more";
 
                 assert partition.hasNext() : "Expecting one row in one partition but got an empty partition";
-                Row row = ((Row)partition.next()).takeAlias();
+                Row row = ((Row)partition.next());
                 assert !partition.hasNext() : "Expecting a single row but got more";
                 return row;
             }
@@ -356,7 +356,7 @@ public class Util
             {
                 assert !iterator.hasNext() : "Expecting a single partition but got more";
                 assert partition.hasNext() : "Expecting one row in one partition but got an empty partition";
-                Row row = partition.next().takeAlias();
+                Row row = partition.next();
                 assert !partition.hasNext() : "Expecting a single row but got more";
                 return row;
             }
@@ -444,10 +444,22 @@ public class Util
         return CBuilder.create(new ClusteringComparator(types));
     }
 
+    public static boolean equal(UnfilteredRowIterator a, UnfilteredRowIterator b)
+    {
+        return Objects.equals(a.columns(), b.columns())
+            && Objects.equals(a.metadata(), b.metadata())
+            && Objects.equals(a.isReverseOrder(), b.isReverseOrder())
+            && Objects.equals(a.partitionKey(), b.partitionKey())
+            && Objects.equals(a.partitionLevelDeletion(), b.partitionLevelDeletion())
+            && Objects.equals(a.staticRow(), b.staticRow())
+            && Objects.equals(a.stats(), b.stats())
+            && Iterators.elementsEqual(a, b);
+    }
+
     // moved & refactored from KeyspaceTest in < 3.0
     public static void assertColumns(Row row, String... expectedColumnNames)
     {
-        Iterator<Cell> cells = row == null ? Iterators.<Cell>emptyIterator() : row.iterator();
+        Iterator<Cell> cells = row == null ? Iterators.<Cell>emptyIterator() : row.cells().iterator();
         String[] actual = Iterators.toArray(Iterators.transform(cells, new Function<Cell, String>()
         {
             public String apply(Cell cell)
@@ -472,7 +484,7 @@ public class Util
     {
         assertNotNull(cell);
         assertEquals(0, ByteBufferUtil.compareUnsigned(cell.value(), ByteBufferUtil.bytes(value)));
-        assertEquals(timestamp, cell.livenessInfo().timestamp());
+        assertEquals(timestamp, cell.timestamp());
     }
 
     public static void assertClustering(CFMetaData cfm, Row row, Object... clusteringValue)
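The new Util.equal helper gives tests a single call to check that two UnfilteredRowIterators agree on metadata, columns, partition key, deletion info, static row and contents. A hedged usage sketch (the wrapper name is made up for illustration); note that Iterators.elementsEqual consumes both iterators, so the comparison is one-shot.

    static void assertSamePartition(UnfilteredRowIterator expected, UnfilteredRowIterator actual)
    {
        // both iterators are fully consumed by the comparison
        assert Util.equal(expected, actual) : "partition iterators differ";
    }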

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/test/unit/org/apache/cassandra/cache/AutoSavingCacheTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cache/AutoSavingCacheTest.java b/test/unit/org/apache/cassandra/cache/AutoSavingCacheTest.java
index fb5d84f..71b87f9 100644
--- a/test/unit/org/apache/cassandra/cache/AutoSavingCacheTest.java
+++ b/test/unit/org/apache/cassandra/cache/AutoSavingCacheTest.java
@@ -56,7 +56,7 @@ public class AutoSavingCacheTest
         ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1);
         for (int i = 0; i < 2; i++)
         {
-            ColumnDefinition colDef = new ColumnDefinition(cfs.metadata, ByteBufferUtil.bytes("col1"), AsciiType.instance, 0, ColumnDefinition.Kind.REGULAR);
+            ColumnDefinition colDef = ColumnDefinition.regularDef(cfs.metadata, ByteBufferUtil.bytes("col1"), AsciiType.instance);
             RowUpdateBuilder rowBuilder = new RowUpdateBuilder(cfs.metadata, System.currentTimeMillis(), "key1");
             rowBuilder.add(colDef, "val1");
             rowBuilder.build().apply();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/test/unit/org/apache/cassandra/config/ColumnDefinitionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/config/ColumnDefinitionTest.java b/test/unit/org/apache/cassandra/config/ColumnDefinitionTest.java
index 0e5e192..c875165 100644
--- a/test/unit/org/apache/cassandra/config/ColumnDefinitionTest.java
+++ b/test/unit/org/apache/cassandra/config/ColumnDefinitionTest.java
@@ -38,10 +38,10 @@ public class ColumnDefinitionTest
                          .addRegularColumn("val", AsciiType.instance)
                          .build();
 
-        ColumnDefinition cd0 = ColumnDefinition.staticDef(cfm, ByteBufferUtil.bytes("TestColumnDefinitionName0"), BytesType.instance, null)
+        ColumnDefinition cd0 = ColumnDefinition.staticDef(cfm, ByteBufferUtil.bytes("TestColumnDefinitionName0"), BytesType.instance)
                                                .setIndex("random index name 0", IndexType.KEYS, null);
 
-        ColumnDefinition cd1 = ColumnDefinition.staticDef(cfm, ByteBufferUtil.bytes("TestColumnDefinition1"), LongType.instance, null);
+        ColumnDefinition cd1 = ColumnDefinition.staticDef(cfm, ByteBufferUtil.bytes("TestColumnDefinition1"), LongType.instance);
 
         testSerializeDeserialize(cfm, cd0);
         testSerializeDeserialize(cfm, cd1);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/test/unit/org/apache/cassandra/cql3/ColumnConditionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/ColumnConditionTest.java b/test/unit/org/apache/cassandra/cql3/ColumnConditionTest.java
index 9a768de..71524c5 100644
--- a/test/unit/org/apache/cassandra/cql3/ColumnConditionTest.java
+++ b/test/unit/org/apache/cassandra/cql3/ColumnConditionTest.java
@@ -25,19 +25,20 @@ import org.junit.Test;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.LivenessInfo;
-import org.apache.cassandra.db.rows.AbstractCell;
+import org.apache.cassandra.db.rows.BufferCell;
 import org.apache.cassandra.db.rows.Cell;
 import org.apache.cassandra.db.rows.CellPath;
 import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.serializers.Int32Serializer;
 import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.UUIDGen;
 
 import static org.junit.Assert.*;
 
 public class ColumnConditionTest
 {
-    public static ByteBuffer UNSET_BYTE_BUFFER = ByteBuffer.wrap(new byte[]{});
+    private static final CellPath LIST_PATH = CellPath.create(ByteBuffer.wrap(UUIDGen.getTimeUUIDBytes()));
 
     public static final ByteBuffer ZERO = Int32Type.instance.fromString("0");
     public static final ByteBuffer ONE = Int32Type.instance.fromString("1");
@@ -51,12 +52,17 @@ public class ColumnConditionTest
         Cell cell = null;
         if (columnValue != null)
         {
-            ColumnDefinition definition = ColumnDefinition.regularDef("ks", "cf", "c", ListType.getInstance(Int32Type.instance, true), null);
-            cell = new TestCell(definition, null, columnValue, LivenessInfo.NONE);
+            ColumnDefinition definition = ColumnDefinition.regularDef("ks", "cf", "c", ListType.getInstance(Int32Type.instance, true));
+            cell = testCell(definition, columnValue, LIST_PATH);
         }
         return bound.isSatisfiedByValue(conditionValue, cell, Int32Type.instance, bound.operator);
     }
 
+    private static Cell testCell(ColumnDefinition column, ByteBuffer value, CellPath path)
+    {
+        return new BufferCell(column, 0L, Cell.NO_TTL, Cell.NO_DELETION_TIME, value, path);
+    }
+
     private static void assertThrowsIRE(ColumnCondition.Bound bound, ByteBuffer conditionValue, ByteBuffer columnValue)
     {
         try
@@ -69,7 +75,7 @@ public class ColumnConditionTest
     @Test
     public void testSimpleBoundIsSatisfiedByValue() throws InvalidRequestException
     {
-        ColumnDefinition definition = ColumnDefinition.regularDef("ks", "cf", "c", ListType.getInstance(Int32Type.instance, true), null);
+        ColumnDefinition definition = ColumnDefinition.regularDef("ks", "cf", "c", ListType.getInstance(Int32Type.instance, true));
 
         // EQ
         ColumnCondition condition = ColumnCondition.condition(definition, new Constants.Value(ONE), Operator.EQ);
@@ -157,7 +163,7 @@ public class ColumnConditionTest
         Map<ByteBuffer, CollectionType> typeMap = new HashMap<>();
         typeMap.put(ByteBufferUtil.bytes("c"), ListType.getInstance(Int32Type.instance, true));
 
-        ColumnDefinition definition = new ColumnDefinition(cfm, ByteBufferUtil.bytes("c"), ListType.getInstance(Int32Type.instance, true), 0, ColumnDefinition.Kind.REGULAR);
+        ColumnDefinition definition = ColumnDefinition.regularDef(cfm, ByteBufferUtil.bytes("c"), ListType.getInstance(Int32Type.instance, true));
 
         List<Cell> cells = new ArrayList<>(columnValues.size());
         if (columnValues != null)
@@ -166,7 +172,7 @@ public class ColumnConditionTest
             {
                 ByteBuffer key = Int32Serializer.instance.serialize(i);
                 ByteBuffer value = columnValues.get(i);
-                cells.add(new TestCell(definition, CellPath.create(key), value, LivenessInfo.NONE));
+                cells.add(testCell(definition, value, CellPath.create(key)));
             };
         }
 
@@ -177,7 +183,7 @@ public class ColumnConditionTest
     // sets use the same check as lists
     public void testListCollectionBoundAppliesTo() throws InvalidRequestException
     {
-        ColumnDefinition definition = ColumnDefinition.regularDef("ks", "cf", "c", ListType.getInstance(Int32Type.instance, true), null);
+        ColumnDefinition definition = ColumnDefinition.regularDef("ks", "cf", "c", ListType.getInstance(Int32Type.instance, true));
 
         // EQ
         ColumnCondition condition = ColumnCondition.condition(definition, null, new Lists.Value(Arrays.asList(ONE)), Operator.EQ);
@@ -288,7 +294,7 @@ public class ColumnConditionTest
         CFMetaData cfm = CFMetaData.compile("create table foo(a int PRIMARY KEY, b int, c set<int>)", "ks");
         Map<ByteBuffer, CollectionType> typeMap = new HashMap<>();
         typeMap.put(ByteBufferUtil.bytes("c"), SetType.getInstance(Int32Type.instance, true));
-        ColumnDefinition definition = new ColumnDefinition(cfm, ByteBufferUtil.bytes("c"), SetType.getInstance(Int32Type.instance, true), 0, ColumnDefinition.Kind.REGULAR);
+        ColumnDefinition definition = ColumnDefinition.regularDef(cfm, ByteBufferUtil.bytes("c"), SetType.getInstance(Int32Type.instance, true));
 
         List<Cell> cells = new ArrayList<>(columnValues.size());
         if (columnValues != null)
@@ -296,7 +302,7 @@ public class ColumnConditionTest
             for (int i = 0; i < columnValues.size(); i++)
             {
                 ByteBuffer key = columnValues.get(i);
-                cells.add(new TestCell(definition, CellPath.create(key), ByteBufferUtil.EMPTY_BYTE_BUFFER, LivenessInfo.NONE));
+                cells.add(testCell(definition, ByteBufferUtil.EMPTY_BYTE_BUFFER, CellPath.create(key)));
             };
         }
 
@@ -306,7 +312,7 @@ public class ColumnConditionTest
     @Test
     public void testSetCollectionBoundAppliesTo() throws InvalidRequestException
     {
-        ColumnDefinition definition = ColumnDefinition.regularDef("ks", "cf", "c", ListType.getInstance(Int32Type.instance, true), null);
+        ColumnDefinition definition = ColumnDefinition.regularDef("ks", "cf", "c", ListType.getInstance(Int32Type.instance, true));
 
         // EQ
         ColumnCondition condition = ColumnCondition.condition(definition, null, new Sets.Value(set(ONE)), Operator.EQ);
@@ -420,13 +426,13 @@ public class ColumnConditionTest
         CFMetaData cfm = CFMetaData.compile("create table foo(a int PRIMARY KEY, b map<int, int>)", "ks");
         Map<ByteBuffer, CollectionType> typeMap = new HashMap<>();
         typeMap.put(ByteBufferUtil.bytes("b"), MapType.getInstance(Int32Type.instance, Int32Type.instance, true));
-        ColumnDefinition definition = new ColumnDefinition(cfm, ByteBufferUtil.bytes("b"), MapType.getInstance(Int32Type.instance, Int32Type.instance, true), 0, ColumnDefinition.Kind.REGULAR);
+        ColumnDefinition definition = ColumnDefinition.regularDef(cfm, ByteBufferUtil.bytes("b"), MapType.getInstance(Int32Type.instance, Int32Type.instance, true));
 
         List<Cell> cells = new ArrayList<>(columnValues.size());
         if (columnValues != null)
         {
             for (Map.Entry<ByteBuffer, ByteBuffer> entry : columnValues.entrySet())
-                cells.add(new TestCell(definition, CellPath.create(entry.getKey()), entry.getValue(), LivenessInfo.NONE));
+                cells.add(testCell(definition, entry.getValue(), CellPath.create(entry.getKey())));
         }
 
         return bound.mapAppliesTo(MapType.getInstance(Int32Type.instance, Int32Type.instance, true), cells.iterator(), conditionValues, bound.operator);
@@ -435,7 +441,7 @@ public class ColumnConditionTest
     @Test
     public void testMapCollectionBoundIsSatisfiedByValue() throws InvalidRequestException
     {
-        ColumnDefinition definition = ColumnDefinition.regularDef("ks", "cf", "c", ListType.getInstance(Int32Type.instance, true), null);
+        ColumnDefinition definition = ColumnDefinition.regularDef("ks", "cf", "c", ListType.getInstance(Int32Type.instance, true));
 
         Map<ByteBuffer, ByteBuffer> placeholderMap = new TreeMap<>();
         placeholderMap.put(ONE, ONE);
@@ -573,45 +579,4 @@ public class ColumnConditionTest
         assertTrue(mapAppliesTo(bound, map(ByteBufferUtil.EMPTY_BYTE_BUFFER, ONE), map(ByteBufferUtil.EMPTY_BYTE_BUFFER, ONE)));
         assertTrue(mapAppliesTo(bound, map(ONE, ByteBufferUtil.EMPTY_BYTE_BUFFER), map(ONE, ByteBufferUtil.EMPTY_BYTE_BUFFER)));
     }
-
-    static class TestCell extends AbstractCell
-    {
-        private final ColumnDefinition column;
-        private final CellPath path;
-        private final ByteBuffer value;
-        private final LivenessInfo info;
-
-        public TestCell(ColumnDefinition column, CellPath path, ByteBuffer value, LivenessInfo info)
-        {
-            this.column = column;
-            this.path = path;
-            this.value = value;
-            this.info = info.takeAlias();
-        }
-
-        public ColumnDefinition column()
-        {
-            return column;
-        }
-
-        public boolean isCounterCell()
-        {
-            return false;
-        }
-
-        public ByteBuffer value()
-        {
-            return value;
-        }
-
-        public LivenessInfo livenessInfo()
-        {
-            return info;
-        }
-
-        public CellPath path()
-        {
-            return path;
-        }
-    }
 }
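The hand-rolled TestCell is replaced by constructing BufferCell directly. For illustration, building a live list element the same way the new testCell() helper does (the value 42 and the local variable names are arbitrary):

    ColumnDefinition definition =
        ColumnDefinition.regularDef("ks", "cf", "c", ListType.getInstance(Int32Type.instance, true));
    CellPath path = CellPath.create(ByteBuffer.wrap(UUIDGen.getTimeUUIDBytes()));
    Cell cell = new BufferCell(definition, 0L, Cell.NO_TTL, Cell.NO_DELETION_TIME,
                               ByteBufferUtil.bytes(42), path);  // live cell: no TTL, no deletion time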

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/test/unit/org/apache/cassandra/cql3/SimpleQueryTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/SimpleQueryTest.java b/test/unit/org/apache/cassandra/cql3/SimpleQueryTest.java
index ad0dd7b..052b53d 100644
--- a/test/unit/org/apache/cassandra/cql3/SimpleQueryTest.java
+++ b/test/unit/org/apache/cassandra/cql3/SimpleQueryTest.java
@@ -307,7 +307,7 @@ public class SimpleQueryTest extends CQLTester
         createTable("CREATE TABLE %s (k text, t int, v1 text, v2 int, PRIMARY KEY (k, t));");
 
         for (int t = 0; t < N; t++)
-                execute("INSERT INTO %s (k, t, v1, v2) values (?, ?, ?, ?)", "key", t, "v" + t, t + 10);
+            execute("INSERT INTO %s (k, t, v1, v2) values (?, ?, ?, ?)", "key", t, "v" + t, t + 10);
 
         flush();
 
@@ -434,7 +434,7 @@ public class SimpleQueryTest extends CQLTester
     @Test
     public void collectionDeletionTest() throws Throwable
     {
-        createTable("CREATE TABLE %s (k text PRIMARY KEY, s set<int>);");
+        createTable("CREATE TABLE %s (k int PRIMARY KEY, s set<int>);");
 
         execute("INSERT INTO %s (k, s) VALUES (?, ?)", 1, set(1));
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/test/unit/org/apache/cassandra/db/CellTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/CellTest.java b/test/unit/org/apache/cassandra/db/CellTest.java
index 9560804..e8cb1cb 100644
--- a/test/unit/org/apache/cassandra/db/CellTest.java
+++ b/test/unit/org/apache/cassandra/db/CellTest.java
@@ -60,12 +60,12 @@ public class CellTest
                 // don't test equality for both sides native, as this is based on CellName resolution
                 if (lhs && rhs)
                     continue;
-                Cell a = buildCell(cfm, "a", "a", 1, 1);
-                Cell b = buildCell(cfm, "a", "a", 1, 0);
+                Cell a = expiring(cfm, "val", "a", 1, 1);
+                Cell b = regular(cfm, "val", "a", 1);
                 Assert.assertNotSame(a, b);
                 Assert.assertNotSame(b, a);
 
-                a = deleted(cfm, "a", "a", 1, 1);
+                a = deleted(cfm, "val", 1, 1);
                 Assert.assertNotSame(a, b);
                 Assert.assertNotSame(b, a);
             }
@@ -76,18 +76,18 @@ public class CellTest
     public void testExpiringCellReconile()
     {
         // equal
-        Assert.assertEquals(0, testExpiring("a", "a", 1, 1, null, null, null, null));
+        Assert.assertEquals(0, testExpiring("val", "a", 1, 1, null, null, null, null));
 
         // newer timestamp
-        Assert.assertEquals(-1, testExpiring("a", "a", 2, 1, null, null, 1L, null));
-        Assert.assertEquals(-1, testExpiring("a", "a", 2, 1, null, "b", 1L, 2));
+        Assert.assertEquals(-1, testExpiring("val", "a", 2, 1, null, null, 1L, null));
+        Assert.assertEquals(-1, testExpiring("val", "a", 2, 1, null, "val", 1L, 2));
 
-        Assert.assertEquals(-1, testExpiring("a", "a", 1, 2, null, null, null, 1));
-        Assert.assertEquals(1, testExpiring("a", "a", 1, 2, null, "b", null, 1));
+        Assert.assertEquals(-1, testExpiring("val", "a", 1, 2, null, null, null, 1));
+        Assert.assertEquals(1, testExpiring("val", "a", 1, 2, null, "val", null, 1));
 
         // newer value
-        Assert.assertEquals(-1, testExpiring("a", "b", 2, 1, null, "a", null, null));
-        Assert.assertEquals(-1, testExpiring("a", "b", 2, 1, null, "a", null, 2));
+        Assert.assertEquals(-1, testExpiring("val", "b", 2, 1, null, "a", null, null));
+        Assert.assertEquals(-1, testExpiring("val", "b", 2, 1, null, "a", null, 2));
     }
 
     private int testExpiring(String n1, String v1, long t1, int et1, String n2, String v2, Long t2, Integer et2)
@@ -100,8 +100,8 @@ public class CellTest
             t2 = t1;
         if (et2 == null)
             et2 = et1;
-        Cell c1 = buildCell(cfm, n1, v1, t1, et1);
-        Cell c2 = buildCell(cfm, n2, v2, t2, et2);
+        Cell c1 = expiring(cfm, n1, v1, t1, et1);
+        Cell c2 = expiring(cfm, n2, v2, t2, et2);
 
         int now = FBUtilities.nowInSeconds();
         if (Cells.reconcile(c1, c2, now) == c1)
@@ -109,56 +109,21 @@ public class CellTest
         return Cells.reconcile(c2, c1, now) == c2 ? 1 : 0;
     }
 
-    private Cell buildCell(CFMetaData cfm, String columnName, String value, long timestamp, int ttl)
+    private Cell regular(CFMetaData cfm, String columnName, String value, long timestamp)
     {
         ColumnDefinition cdef = cfm.getColumnDefinition(ByteBufferUtil.bytes(columnName));
-        LivenessInfo info = SimpleLivenessInfo.forUpdate(timestamp, ttl, FBUtilities.nowInSeconds(), cfm);
-        return new TestCell(cdef, ByteBufferUtil.bytes(value), info);
+        return BufferCell.live(cfm, cdef, timestamp, ByteBufferUtil.bytes(value));
     }
 
-    private Cell deleted(CFMetaData cfm, String columnName, String value, int localDeletionTime, long timestamp)
+    private Cell expiring(CFMetaData cfm, String columnName, String value, long timestamp, int localExpirationTime)
     {
         ColumnDefinition cdef = cfm.getColumnDefinition(ByteBufferUtil.bytes(columnName));
-        LivenessInfo info = SimpleLivenessInfo.forDeletion(timestamp, localDeletionTime);
-        return new TestCell(cdef, ByteBufferUtil.bytes(value), info);
+        return new BufferCell(cdef, timestamp, 1, localExpirationTime, ByteBufferUtil.bytes(value), null);
     }
 
-    public static class TestCell extends AbstractCell
+    private Cell deleted(CFMetaData cfm, String columnName, int localDeletionTime, long timestamp)
     {
-        private final ColumnDefinition column;
-        private final ByteBuffer value;
-        private final LivenessInfo info;
-
-        public TestCell(ColumnDefinition column, ByteBuffer value, LivenessInfo info)
-        {
-            this.column = column;
-            this.value = value;
-            this.info = info.takeAlias();
-        }
-
-        public ColumnDefinition column()
-        {
-            return column;
-        }
-
-        public boolean isCounterCell()
-        {
-            return false;
-        }
-
-        public ByteBuffer value()
-        {
-            return value;
-        }
-
-        public LivenessInfo livenessInfo()
-        {
-            return info;
-        }
-
-        public CellPath path()
-        {
-            return null;
-        }
+        ColumnDefinition cdef = cfm.getColumnDefinition(ByteBufferUtil.bytes(columnName));
+        return BufferCell.tombstone(cdef, timestamp, localDeletionTime);
     }
 }
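With TestCell removed, cells for tests come straight from the BufferCell factories used above. A small reconciliation sketch under those signatures (cfm and cdef are assumed to come from the test's table metadata):

    int nowInSec   = FBUtilities.nowInSeconds();
    Cell live      = BufferCell.live(cfm, cdef, 2L, ByteBufferUtil.bytes("b"));
    Cell tombstone = BufferCell.tombstone(cdef, 1L, nowInSec);
    Cell winner    = Cells.reconcile(live, tombstone, nowInSec);  // the newer timestamp (the live cell) wins here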