You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2015/07/22 18:05:31 UTC

[02/15] cassandra git commit: Simplify some 8099's implementations

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
index deb1480..065479b 100644
--- a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
+++ b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
@@ -146,7 +146,7 @@ public class ColumnFamilyStoreTest
             public void runMayThrow() throws IOException
             {
                 Row toCheck = Util.getOnlyRowUnfiltered(Util.cmd(cfs, "key1").build());
-                Iterator<Cell> iter = toCheck.iterator();
+                Iterator<Cell> iter = toCheck.cells().iterator();
                 assert(Iterators.size(iter) == 0);
             }
         };
@@ -254,7 +254,7 @@ public class ColumnFamilyStoreTest
         ByteBuffer val = ByteBufferUtil.bytes("val1");
 
         // insert
-        ColumnDefinition newCol = new ColumnDefinition(cfs.metadata, ByteBufferUtil.bytes("val2"), AsciiType.instance, 1, ColumnDefinition.Kind.REGULAR);
+        ColumnDefinition newCol = ColumnDefinition.regularDef(cfs.metadata, ByteBufferUtil.bytes("val2"), AsciiType.instance);
         new RowUpdateBuilder(cfs.metadata, 0, "key1").clustering("Column1").add("val", "val1").build().applyUnsafe();
         new RowUpdateBuilder(cfs.metadata, 0, "key2").clustering("Column1").add("val", "val1").build().applyUnsafe();
         assertRangeCount(cfs, col, val, 2);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/test/unit/org/apache/cassandra/db/CommitLogTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/CommitLogTest.java b/test/unit/org/apache/cassandra/db/CommitLogTest.java
index 5838e23..21bdd9b 100644
--- a/test/unit/org/apache/cassandra/db/CommitLogTest.java
+++ b/test/unit/org/apache/cassandra/db/CommitLogTest.java
@@ -60,6 +60,7 @@ import org.apache.cassandra.io.util.FileDataInput;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.vint.VIntCoding;
 
 public class CommitLogTest
 {
@@ -230,7 +231,7 @@ public class CommitLogTest
     private static int getMaxRecordDataSize(String keyspace, ByteBuffer key, String cfName, String colName)
     {
         ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(cfName);
-        // We don't want to allocate a size of 0 as this is optimize under the hood and our computation would
+        // We don't want to allocate a size of 0 as this is optimized under the hood and our computation would
         // break testEqualRecordLimit
         int allocSize = 1;
         Mutation rm = new RowUpdateBuilder(cfs.metadata, 0, key)
@@ -239,7 +240,16 @@ public class CommitLogTest
 
         int max = (DatabaseDescriptor.getCommitLogSegmentSize() / 2);
         max -= CommitLogSegment.ENTRY_OVERHEAD_SIZE; // log entry overhead
-        return max - (int) Mutation.serializer.serializedSize(rm, MessagingService.current_version) + allocSize;
+
+        // Note that the size of the value is vint encoded. So we first compute the overhead of the mutation without the value and its size
+        int mutationOverhead = (int)Mutation.serializer.serializedSize(rm, MessagingService.current_version) - (VIntCoding.computeVIntSize(allocSize) + allocSize);
+        max -= mutationOverhead;
+
+        // Now, max is the max for both the value and its size. But we want to know how much we can allocate, i.e. the size of the value.
+        int sizeOfMax = VIntCoding.computeVIntSize(max);
+        max -= sizeOfMax;
+        assert VIntCoding.computeVIntSize(max) == sizeOfMax; // sanity check that we're still encoded with the size we thought we would
+        return max;
     }
 
     private static int getMaxRecordDataSize()
@@ -351,7 +361,7 @@ public class CommitLogTest
                 .applyUnsafe();
 
             assertTrue(Util.getOnlyRow(Util.cmd(cfs).columns("val").build())
-                            .iterator().next().value().equals(ByteBufferUtil.bytes("abcd")));
+                            .cells().iterator().next().value().equals(ByteBufferUtil.bytes("abcd")));
 
             cfs.truncateBlocking();
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/test/unit/org/apache/cassandra/db/CounterCellTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/CounterCellTest.java b/test/unit/org/apache/cassandra/db/CounterCellTest.java
index fb9f9ac..08e0b25 100644
--- a/test/unit/org/apache/cassandra/db/CounterCellTest.java
+++ b/test/unit/org/apache/cassandra/db/CounterCellTest.java
@@ -29,7 +29,7 @@ import org.junit.Test;
 
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.db.rows.AbstractCell;
+import org.apache.cassandra.db.rows.BufferCell;
 import org.apache.cassandra.db.rows.Cell;
 import org.apache.cassandra.db.rows.CellPath;
 import org.apache.cassandra.db.rows.Cells;
@@ -80,52 +80,13 @@ public class CounterCellTest
         stepLength    = idLength + clockLength + countLength;
     }
 
-    private class TestCounterCell extends AbstractCell
-    {
-        private final ColumnDefinition column;
-        private final ByteBuffer value;
-        private final LivenessInfo info;
-
-        private TestCounterCell(ColumnDefinition column, ByteBuffer value, LivenessInfo info)
-        {
-            this.column = column;
-            this.value = value;
-            this.info = info.takeAlias();
-        }
-
-        public ColumnDefinition column()
-        {
-            return column;
-        }
-
-        public boolean isCounterCell()
-        {
-            return true;
-        }
-
-        public ByteBuffer value()
-        {
-            return value;
-        }
-
-        public LivenessInfo livenessInfo()
-        {
-            return info;
-        }
-
-        public CellPath path()
-        {
-            return null;
-        }
-    }
-
     @Test
     public void testCreate()
     {
         ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(COUNTER1);
         long delta = 3L;
 
-        TestCounterCell cell = createLegacyCounterCell(cfs, ByteBufferUtil.bytes("c1"), delta, 1, 0, 0);
+        Cell cell = createLegacyCounterCell(cfs, ByteBufferUtil.bytes("val"), delta, 1);
 
         assertEquals(delta, CounterContext.instance().total(cell.value()));
         assertEquals(1, cell.value().getShort(0));
@@ -136,131 +97,115 @@ public class CounterCellTest
 
     }
 
-    private TestCounterCell createLegacyCounterCell(ColumnFamilyStore cfs,
-                                                    ByteBuffer colName,
-                                                    long count,
-                                                    long ts,
-                                                    int ttl,
-                                                    int localDeletion)
+    private Cell createLegacyCounterCell(ColumnFamilyStore cfs, ByteBuffer colName, long count, long ts)
     {
         ColumnDefinition cDef = cfs.metadata.getColumnDefinition(colName);
         ByteBuffer val = CounterContext.instance().createLocal(count);
-        LivenessInfo li = new SimpleLivenessInfo(ts, ttl, localDeletion);
-        return new TestCounterCell(cDef, val, li);
+        return BufferCell.live(cfs.metadata, cDef, ts, val);
     }
 
-    private TestCounterCell createCounterCell(ColumnFamilyStore cfs,
-                                              ByteBuffer colName,
-                                              CounterId id,
-                                              long count,
-                                              long ts,
-                                              int ttl,
-                                              int localDeletion)
+    private Cell createCounterCell(ColumnFamilyStore cfs, ByteBuffer colName, CounterId id, long count, long ts)
     {
         ColumnDefinition cDef = cfs.metadata.getColumnDefinition(colName);
         ByteBuffer val = CounterContext.instance().createGlobal(id, ts, count);
-        LivenessInfo li = new SimpleLivenessInfo(ts, ttl, localDeletion);
-        return new TestCounterCell(cDef, val, li);
+        return BufferCell.live(cfs.metadata, cDef, ts, val);
+    }
+
+    private Cell createCounterCellFromContext(ColumnFamilyStore cfs, ByteBuffer colName, ContextState context, long ts)
+    {
+        ColumnDefinition cDef = cfs.metadata.getColumnDefinition(colName);
+        return BufferCell.live(cfs.metadata, cDef, ts, context.context);
     }
 
-    private TestCounterCell createCounterCellFromContext(ColumnFamilyStore cfs,
-                                                         ByteBuffer colName,
-                                                         ContextState context,
-                                                         long ts,
-                                                         int ttl,
-                                                         int localDeletion)
+    private Cell createDeleted(ColumnFamilyStore cfs, ByteBuffer colName, long ts, int localDeletionTime)
     {
         ColumnDefinition cDef = cfs.metadata.getColumnDefinition(colName);
-        LivenessInfo li = new SimpleLivenessInfo(ts, ttl, localDeletion);
-        return new TestCounterCell(cDef, context.context, li);
+        return BufferCell.tombstone(cDef, ts, localDeletionTime);
     }
 
     @Test
     public void testReconcile()
     {
         ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(COUNTER1);
-        ColumnDefinition cDef = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1).metadata.getColumnDefinition(ByteBufferUtil.bytes("val"));
-        ByteBuffer col = ByteBufferUtil.bytes("c1");
+        ByteBuffer col = ByteBufferUtil.bytes("val");
 
-        AbstractCell left;
-        AbstractCell right;
+        Cell left;
+        Cell right;
 
         // both deleted, diff deletion time, same ts
-        left = createLegacyCounterCell(cfs, col, 1, 2, 0, 5);
-        right = createLegacyCounterCell(cfs, col, 1, 2, 0, 10);
+        left = createDeleted(cfs, col, 2, 5);
+        right = createDeleted(cfs, col, 2, 10);
         assert Cells.reconcile(left, right, 10) == right;
 
         // diff ts
-        right = createLegacyCounterCell(cfs, col, 1, 1, 0, 10);
+        right = createLegacyCounterCell(cfs, col, 1, 10);
         assert Cells.reconcile(left, right, 10) == left;
 
         // < tombstone
-        left = new CellTest.TestCell(cDef, ByteBufferUtil.bytes(5L), SimpleLivenessInfo.forDeletion(6, 6));
-        right = createLegacyCounterCell(cfs, col, 1, 5, 0, 5);
+        left = createDeleted(cfs, col, 6, 6);
+        right = createLegacyCounterCell(cfs, col, 1, 5);
         assert Cells.reconcile(left, right, 10) == left;
 
         // > tombstone
-        left = new CellTest.TestCell(cDef, ByteBufferUtil.bytes(5L), SimpleLivenessInfo.forDeletion(1, 1));
-        right = createLegacyCounterCell(cfs, col, 1, 5, 0, Integer.MAX_VALUE);
+        left = createDeleted(cfs, col, 1, 1);
+        right = createLegacyCounterCell(cfs, col, 1, 5);
         assert Cells.reconcile(left, right, 10) == left;
 
         // == tombstone
-        left = new CellTest.TestCell(cDef, ByteBufferUtil.bytes(5L), SimpleLivenessInfo.forDeletion(8, 8));
-        right = createLegacyCounterCell(cfs, col, 1, 8, 0, Integer.MAX_VALUE);
+        left = createDeleted(cfs, col, 8, 8);
+        right = createLegacyCounterCell(cfs, col, 1, 8);
         assert Cells.reconcile(left, right, 10) == left;
 
         // live + live
-        left = createLegacyCounterCell(cfs, col, 1, 2, 0, Integer.MAX_VALUE);
-        right = createLegacyCounterCell(cfs, col, 3, 5, 0, Integer.MAX_VALUE);
+        left = createLegacyCounterCell(cfs, col, 1, 2);
+        right = createLegacyCounterCell(cfs, col, 3, 5);
         Cell reconciled = Cells.reconcile(left, right, 10);
         assertEquals(CounterContext.instance().total(reconciled.value()), 4);
-        assertEquals(reconciled.livenessInfo().timestamp(), 5L);
+        assertEquals(reconciled.timestamp(), 5L);
 
         // Add, don't change TS
-        Cell addTen = createLegacyCounterCell(cfs, col, 10, 4, 0, Integer.MAX_VALUE);
+        Cell addTen = createLegacyCounterCell(cfs, col, 10, 4);
         reconciled = Cells.reconcile(reconciled, addTen, 10);
         assertEquals(CounterContext.instance().total(reconciled.value()), 14);
-        assertEquals(reconciled.livenessInfo().timestamp(), 5L);
+        assertEquals(reconciled.timestamp(), 5L);
 
         // Add w/new TS
-        Cell addThree = createLegacyCounterCell(cfs, col, 3, 7, 0, Integer.MAX_VALUE);
+        Cell addThree = createLegacyCounterCell(cfs, col, 3, 7);
         reconciled = Cells.reconcile(reconciled, addThree, 10);
         assertEquals(CounterContext.instance().total(reconciled.value()), 17);
-        assertEquals(reconciled.livenessInfo().timestamp(), 7L);
+        assertEquals(reconciled.timestamp(), 7L);
 
         // Confirm no deletion time
-        assert reconciled.livenessInfo().localDeletionTime() == Integer.MAX_VALUE;
+        assert reconciled.localDeletionTime() == Integer.MAX_VALUE;
 
-        Cell deleted = createLegacyCounterCell(cfs, col, 2, 8, 0, 8);
+        Cell deleted = createDeleted(cfs, col, 8, 8);
         reconciled = Cells.reconcile(reconciled, deleted, 10);
-        assertEquals(2, CounterContext.instance().total(reconciled.value()));
-        assertEquals(reconciled.livenessInfo().timestamp(), 8L);
-        assert reconciled.livenessInfo().localDeletionTime() == 8;
+        assert reconciled.localDeletionTime() == 8;
     }
 
     @Test
     public void testDiff()
     {
         ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(COUNTER1);
-        ByteBuffer col = ByteBufferUtil.bytes("c1");
+        ByteBuffer col = ByteBufferUtil.bytes("val");
 
-        TestCounterCell leftCell;
-        TestCounterCell rightCell;
+        Cell leftCell;
+        Cell rightCell;
 
         // Equal count
-        leftCell = createLegacyCounterCell(cfs, col, 2, 2, 0, Integer.MAX_VALUE);
-        rightCell = createLegacyCounterCell(cfs, col, 2, 1, 0, Integer.MAX_VALUE);
+        leftCell = createLegacyCounterCell(cfs, col, 2, 2);
+        rightCell = createLegacyCounterCell(cfs, col, 2, 1);
         assertEquals(CounterContext.Relationship.EQUAL, CounterContext.instance().diff(leftCell.value(), rightCell.value()));
 
         // Non-equal count
-        leftCell = createLegacyCounterCell(cfs, col, 1, 2, 0, Integer.MAX_VALUE);
-        rightCell = createLegacyCounterCell(cfs, col, 2, 1, 0, Integer.MAX_VALUE);
+        leftCell = createLegacyCounterCell(cfs, col, 1, 2);
+        rightCell = createLegacyCounterCell(cfs, col, 2, 1);
         assertEquals(CounterContext.Relationship.DISJOINT, CounterContext.instance().diff(leftCell.value(), rightCell.value()));
 
         // timestamp
         CounterId id = CounterId.generate();
-        leftCell = createCounterCell(cfs, col, id, 2, 2, 0, Integer.MAX_VALUE);
-        rightCell = createCounterCell(cfs, col, id, 2, 1, 0, Integer.MAX_VALUE);
+        leftCell = createCounterCell(cfs, col, id, 2, 2);
+        rightCell = createCounterCell(cfs, col, id, 2, 1);
         assertEquals(CounterContext.Relationship.GREATER_THAN, CounterContext.instance().diff(leftCell.value(), rightCell.value()));
 
         ContextState leftContext;
@@ -273,9 +218,9 @@ public class CounterCellTest
         leftContext.writeRemote(CounterId.fromInt(9), 1L, 0L);
         rightContext = ContextState.wrap(ByteBufferUtil.clone(leftContext.context));
 
-        leftCell = createCounterCellFromContext(cfs, col, leftContext, 1, 0, Integer.MAX_VALUE);
-        rightCell = createCounterCellFromContext(cfs, col, rightContext, 1, 0, Integer.MAX_VALUE);
-        assertEquals(CounterContext.Relationship.EQUAL, CounterContext.instance().diff(leftCell.value, rightCell.value));
+        leftCell = createCounterCellFromContext(cfs, col, leftContext, 1);
+        rightCell = createCounterCellFromContext(cfs, col, rightContext, 1);
+        assertEquals(CounterContext.Relationship.EQUAL, CounterContext.instance().diff(leftCell.value(), rightCell.value()));
 
         // greater than: left has superset of nodes (counts equal)
         leftContext = ContextState.allocate(0, 0, 4);
@@ -289,10 +234,10 @@ public class CounterCellTest
         rightContext.writeRemote(CounterId.fromInt(6), 2L, 0L);
         rightContext.writeRemote(CounterId.fromInt(9), 1L, 0L);
 
-        leftCell = createCounterCellFromContext(cfs, col, leftContext, 1, 0, Integer.MAX_VALUE);
-        rightCell = createCounterCellFromContext(cfs, col, rightContext, 1, 0, Integer.MAX_VALUE);
-        assertEquals(CounterContext.Relationship.GREATER_THAN, CounterContext.instance().diff(leftCell.value, rightCell.value));
-        assertEquals(CounterContext.Relationship.LESS_THAN, CounterContext.instance().diff(rightCell.value, leftCell.value));
+        leftCell = createCounterCellFromContext(cfs, col, leftContext, 1);
+        rightCell = createCounterCellFromContext(cfs, col, rightContext, 1);
+        assertEquals(CounterContext.Relationship.GREATER_THAN, CounterContext.instance().diff(leftCell.value(), rightCell.value()));
+        assertEquals(CounterContext.Relationship.LESS_THAN, CounterContext.instance().diff(rightCell.value(), leftCell.value()));
 
         // disjoint: right and left have disjoint node sets
         leftContext = ContextState.allocate(0, 0, 3);
@@ -305,17 +250,17 @@ public class CounterCellTest
         rightContext.writeRemote(CounterId.fromInt(6), 1L, 0L);
         rightContext.writeRemote(CounterId.fromInt(9), 1L, 0L);
 
-        leftCell = createCounterCellFromContext(cfs, col, leftContext, 1, 0, Integer.MAX_VALUE);
-        rightCell = createCounterCellFromContext(cfs, col, rightContext, 1, 0, Integer.MAX_VALUE);
-        assertEquals(CounterContext.Relationship.DISJOINT, CounterContext.instance().diff(leftCell.value, rightCell.value));
-        assertEquals(CounterContext.Relationship.DISJOINT, CounterContext.instance().diff(rightCell.value, leftCell.value));
+        leftCell = createCounterCellFromContext(cfs, col, leftContext, 1);
+        rightCell = createCounterCellFromContext(cfs, col, rightContext, 1);
+        assertEquals(CounterContext.Relationship.DISJOINT, CounterContext.instance().diff(leftCell.value(), rightCell.value()));
+        assertEquals(CounterContext.Relationship.DISJOINT, CounterContext.instance().diff(rightCell.value(), leftCell.value()));
     }
 
     @Test
     public void testUpdateDigest() throws Exception
     {
         ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(COUNTER1);
-        ByteBuffer col = ByteBufferUtil.bytes("c1");
+        ByteBuffer col = ByteBufferUtil.bytes("val");
 
         MessageDigest digest1 = MessageDigest.getInstance("md5");
         MessageDigest digest2 = MessageDigest.getInstance("md5");
@@ -326,14 +271,13 @@ public class CounterCellTest
         state.writeRemote(CounterId.fromInt(3), 4L, 4L);
         state.writeLocal(CounterId.fromInt(4), 4L, 4L);
 
-        TestCounterCell original = createCounterCellFromContext(cfs, col, state, 5, 0, Integer.MAX_VALUE);
+        Cell original = createCounterCellFromContext(cfs, col, state, 5);
 
         ColumnDefinition cDef = cfs.metadata.getColumnDefinition(col);
-        LivenessInfo li = new SimpleLivenessInfo(5, 0, Integer.MAX_VALUE);
-        TestCounterCell cleared = new TestCounterCell(cDef, CounterContext.instance().clearAllLocal(state.context), li);
+        Cell cleared = BufferCell.live(cfs.metadata, cDef, 5, CounterContext.instance().clearAllLocal(state.context));
 
-        CounterContext.instance().updateDigest(digest1, original.value);
-        CounterContext.instance().updateDigest(digest2, cleared.value);
+        CounterContext.instance().updateDigest(digest1, original.value());
+        CounterContext.instance().updateDigest(digest2, cleared.value());
 
         assert Arrays.equals(digest1.digest(), digest2.digest());
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/test/unit/org/apache/cassandra/db/KeyspaceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/KeyspaceTest.java b/test/unit/org/apache/cassandra/db/KeyspaceTest.java
index fdd4f0c..8754209 100644
--- a/test/unit/org/apache/cassandra/db/KeyspaceTest.java
+++ b/test/unit/org/apache/cassandra/db/KeyspaceTest.java
@@ -129,8 +129,8 @@ public class KeyspaceTest extends CQLTester
 
     private static void assertRowsInSlice(ColumnFamilyStore cfs, String key, int sliceStart, int sliceEnd, int limit, boolean reversed, String columnValuePrefix)
     {
-        Clustering startClustering = new SimpleClustering(ByteBufferUtil.bytes(sliceStart));
-        Clustering endClustering = new SimpleClustering(ByteBufferUtil.bytes(sliceEnd));
+        Clustering startClustering = new Clustering(ByteBufferUtil.bytes(sliceStart));
+        Clustering endClustering = new Clustering(ByteBufferUtil.bytes(sliceEnd));
         Slices slices = Slices.with(cfs.getComparator(), Slice.make(startClustering, endClustering));
         ClusteringIndexSliceFilter filter = new ClusteringIndexSliceFilter(slices, reversed);
         SinglePartitionSliceCommand command = singlePartitionSlice(cfs, key, filter, limit);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/test/unit/org/apache/cassandra/db/RangeTombstoneListTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RangeTombstoneListTest.java b/test/unit/org/apache/cassandra/db/RangeTombstoneListTest.java
index 8a697f4..d6d6d07 100644
--- a/test/unit/org/apache/cassandra/db/RangeTombstoneListTest.java
+++ b/test/unit/org/apache/cassandra/db/RangeTombstoneListTest.java
@@ -375,7 +375,7 @@ public class RangeTombstoneListTest
 
     private static Clustering clustering(int i)
     {
-        return new SimpleClustering(bb(i));
+        return new Clustering(bb(i));
     }
 
     private static ByteBuffer bb(int i)
@@ -395,12 +395,12 @@ public class RangeTombstoneListTest
 
     private static RangeTombstone rt(int start, boolean startInclusive, int end, boolean endInclusive, long tstamp)
     {
-        return new RangeTombstone(Slice.make(Slice.Bound.create(cmp, true, startInclusive, start), Slice.Bound.create(cmp, false, endInclusive, end)), new SimpleDeletionTime(tstamp, 0));
+        return new RangeTombstone(Slice.make(Slice.Bound.create(cmp, true, startInclusive, start), Slice.Bound.create(cmp, false, endInclusive, end)), new DeletionTime(tstamp, 0));
     }
 
     private static RangeTombstone rt(int start, int end, long tstamp, int delTime)
     {
-        return new RangeTombstone(Slice.make(Slice.Bound.inclusiveStartOf(bb(start)), Slice.Bound.inclusiveEndOf(bb(end))), new SimpleDeletionTime(tstamp, delTime));
+        return new RangeTombstone(Slice.make(Slice.Bound.inclusiveStartOf(bb(start)), Slice.Bound.inclusiveEndOf(bb(end))), new DeletionTime(tstamp, delTime));
     }
 
     private static RangeTombstone rtei(int start, int end, long tstamp)
@@ -410,7 +410,7 @@ public class RangeTombstoneListTest
 
     private static RangeTombstone rtei(int start, int end, long tstamp, int delTime)
     {
-        return new RangeTombstone(Slice.make(Slice.Bound.exclusiveStartOf(bb(start)), Slice.Bound.inclusiveEndOf(bb(end))), new SimpleDeletionTime(tstamp, delTime));
+        return new RangeTombstone(Slice.make(Slice.Bound.exclusiveStartOf(bb(start)), Slice.Bound.inclusiveEndOf(bb(end))), new DeletionTime(tstamp, delTime));
     }
 
     private static RangeTombstone rtie(int start, int end, long tstamp)
@@ -420,6 +420,6 @@ public class RangeTombstoneListTest
 
     private static RangeTombstone rtie(int start, int end, long tstamp, int delTime)
     {
-        return new RangeTombstone(Slice.make(Slice.Bound.inclusiveStartOf(bb(start)), Slice.Bound.exclusiveEndOf(bb(end))), new SimpleDeletionTime(tstamp, delTime));
+        return new RangeTombstone(Slice.make(Slice.Bound.inclusiveStartOf(bb(start)), Slice.Bound.exclusiveEndOf(bb(end))), new DeletionTime(tstamp, delTime));
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java b/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java
index 8fb0dd0..c83103a 100644
--- a/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java
+++ b/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java
@@ -56,6 +56,7 @@ import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.concurrent.OpOrder;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 public class RangeTombstoneTest
@@ -110,19 +111,20 @@ public class RangeTombstoneTest
             cmdBuilder.includeRow(i);
 
         Partition partition = Util.getOnlyPartitionUnfiltered(cmdBuilder.build());
+        int nowInSec = FBUtilities.nowInSeconds();
 
         for (int i : live)
-            assertTrue("Row " + i + " should be live", partition.getRow(new SimpleClustering(bb(i))).hasLiveData(FBUtilities.nowInSeconds()));
+            assertTrue("Row " + i + " should be live", partition.getRow(new Clustering(bb(i))).hasLiveData(nowInSec));
         for (int i : dead)
-            assertTrue("Row " + i + " shouldn't be live", !partition.getRow(new SimpleClustering(bb(i))).hasLiveData(FBUtilities.nowInSeconds()));
+            assertFalse("Row " + i + " shouldn't be live", partition.getRow(new Clustering(bb(i))).hasLiveData(nowInSec));
 
         // Queries by slices
         partition = Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, key).fromIncl(7).toIncl(30).build());
 
         for (int i : new int[]{ 7, 8, 9, 11, 13, 15, 17, 28, 29, 30 })
-            assertTrue("Row " + i + " should be live", partition.getRow(new SimpleClustering(bb(i))).hasLiveData(FBUtilities.nowInSeconds()));
+            assertTrue("Row " + i + " should be live", partition.getRow(new Clustering(bb(i))).hasLiveData(nowInSec));
         for (int i : new int[]{ 10, 12, 14, 16, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27 })
-            assertTrue("Row " + i + " shouldn't be live", partition.getRow(new SimpleClustering(bb(i))) == null);
+            assertFalse("Row " + i + " shouldn't be live", partition.getRow(new Clustering(bb(i))).hasLiveData(nowInSec));
     }
 
     @Test
@@ -405,24 +407,25 @@ public class RangeTombstoneTest
         cfs.forceBlockingFlush();
 
         Partition partition = Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, key).build());
+        int nowInSec = FBUtilities.nowInSeconds();
 
         for (int i = 0; i < 5; i++)
-            assertTrue("Row " + i + " should be live", partition.getRow(new SimpleClustering(bb(i))).hasLiveData(FBUtilities.nowInSeconds()));
+            assertTrue("Row " + i + " should be live", partition.getRow(new Clustering(bb(i))).hasLiveData(nowInSec));
         for (int i = 16; i < 20; i++)
-            assertTrue("Row " + i + " should be live", partition.getRow(new SimpleClustering(bb(i))).hasLiveData(FBUtilities.nowInSeconds()));
+            assertTrue("Row " + i + " should be live", partition.getRow(new Clustering(bb(i))).hasLiveData(nowInSec));
         for (int i = 5; i <= 15; i++)
-            assertTrue("Row " + i + " shouldn't be live", partition.getRow(new SimpleClustering(bb(i))) == null);
+            assertFalse("Row " + i + " shouldn't be live", partition.getRow(new Clustering(bb(i))).hasLiveData(nowInSec));
 
         // Compact everything and re-test
         CompactionManager.instance.performMaximal(cfs, false);
         partition = Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, key).build());
 
         for (int i = 0; i < 5; i++)
-            assertTrue("Row " + i + " should be live", partition.getRow(new SimpleClustering(bb(i))).hasLiveData(FBUtilities.nowInSeconds()));
+            assertTrue("Row " + i + " should be live", partition.getRow(new Clustering(bb(i))).hasLiveData(FBUtilities.nowInSeconds()));
         for (int i = 16; i < 20; i++)
-            assertTrue("Row " + i + " should be live", partition.getRow(new SimpleClustering(bb(i))).hasLiveData(FBUtilities.nowInSeconds()));
+            assertTrue("Row " + i + " should be live", partition.getRow(new Clustering(bb(i))).hasLiveData(FBUtilities.nowInSeconds()));
         for (int i = 5; i <= 15; i++)
-            assertTrue("Row " + i + " shouldn't be live", partition.getRow(new SimpleClustering(bb(i))) == null);
+            assertFalse("Row " + i + " shouldn't be live", partition.getRow(new Clustering(bb(i))).hasLiveData(nowInSec));
     }
 
     @Test
@@ -576,7 +579,7 @@ public class RangeTombstoneTest
         // compacted down to single sstable
         assertEquals(1, cfs.getSSTables().size());
 
-        assertEquals(10, index.deletes.size());
+        assertEquals(8, index.deletes.size());
     }
 
     private static ByteBuffer bb(int i)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/test/unit/org/apache/cassandra/db/ReadMessageTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ReadMessageTest.java b/test/unit/org/apache/cassandra/db/ReadMessageTest.java
index 2475821..3c53934 100644
--- a/test/unit/org/apache/cassandra/db/ReadMessageTest.java
+++ b/test/unit/org/apache/cassandra/db/ReadMessageTest.java
@@ -219,7 +219,7 @@ public class ReadMessageTest
         {
             for (PartitionUpdate upd : mutation.getPartitionUpdates())
             {
-                Row r = upd.getRow(new SimpleClustering(ByteBufferUtil.bytes("c")));
+                Row r = upd.getRow(new Clustering(ByteBufferUtil.bytes("c")));
                 if (r != null)
                 {
                     if (r.getCell(withCommit) != null)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/test/unit/org/apache/cassandra/db/RecoveryManagerMissingHeaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RecoveryManagerMissingHeaderTest.java b/test/unit/org/apache/cassandra/db/RecoveryManagerMissingHeaderTest.java
index 14c9832..9275dae 100644
--- a/test/unit/org/apache/cassandra/db/RecoveryManagerMissingHeaderTest.java
+++ b/test/unit/org/apache/cassandra/db/RecoveryManagerMissingHeaderTest.java
@@ -82,7 +82,7 @@ public class RecoveryManagerMissingHeaderTest
 
         CommitLog.instance.resetUnsafe(false);
 
-        Assert.assertTrue(AbstractUnfilteredRowIterator.equal(upd1, Util.getOnlyPartitionUnfiltered(Util.cmd(keyspace1.getColumnFamilyStore(CF_STANDARD1), dk).build()).unfilteredIterator()));
-        Assert.assertTrue(AbstractUnfilteredRowIterator.equal(upd2, Util.getOnlyPartitionUnfiltered(Util.cmd(keyspace2.getColumnFamilyStore(CF_STANDARD3), dk).build()).unfilteredIterator()));
+        Assert.assertTrue(Util.equal(upd1, Util.getOnlyPartitionUnfiltered(Util.cmd(keyspace1.getColumnFamilyStore(CF_STANDARD1), dk).build()).unfilteredIterator()));
+        Assert.assertTrue(Util.equal(upd2, Util.getOnlyPartitionUnfiltered(Util.cmd(keyspace2.getColumnFamilyStore(CF_STANDARD3), dk).build()).unfilteredIterator()));
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java b/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
index 9fc37be..baf9466 100644
--- a/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
+++ b/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
@@ -106,8 +106,8 @@ public class RecoveryManagerTest
         CommitLog.instance.resetUnsafe(false);
 
         DecoratedKey dk = Util.dk("keymulti");
-        Assert.assertTrue(AbstractUnfilteredRowIterator.equal(upd1, Util.getOnlyPartitionUnfiltered(Util.cmd(keyspace1.getColumnFamilyStore(CF_STANDARD1), dk).build()).unfilteredIterator()));
-        Assert.assertTrue(AbstractUnfilteredRowIterator.equal(upd2, Util.getOnlyPartitionUnfiltered(Util.cmd(keyspace2.getColumnFamilyStore(CF_STANDARD3), dk).build()).unfilteredIterator()));
+        Assert.assertTrue(Util.equal(upd1, Util.getOnlyPartitionUnfiltered(Util.cmd(keyspace1.getColumnFamilyStore(CF_STANDARD1), dk).build()).unfilteredIterator()));
+        Assert.assertTrue(Util.equal(upd2, Util.getOnlyPartitionUnfiltered(Util.cmd(keyspace2.getColumnFamilyStore(CF_STANDARD3), dk).build()).unfilteredIterator()));
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/test/unit/org/apache/cassandra/db/RowCacheTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RowCacheTest.java b/test/unit/org/apache/cassandra/db/RowCacheTest.java
index 6289c96..883149f 100644
--- a/test/unit/org/apache/cassandra/db/RowCacheTest.java
+++ b/test/unit/org/apache/cassandra/db/RowCacheTest.java
@@ -32,10 +32,7 @@ import org.apache.cassandra.cache.CachingOptions;
 import org.apache.cassandra.cache.RowCacheKey;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.db.rows.Unfiltered;
-import org.apache.cassandra.db.rows.UnfilteredRowIterator;
-import org.apache.cassandra.db.rows.Cell;
-import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.db.marshal.IntegerType;
@@ -113,10 +110,9 @@ public class RowCacheTest
         for (Unfiltered unfiltered : Util.once(cachedCf.unfilteredIterator(ColumnFilter.selection(cachedCf.columns()), Slices.ALL, false)))
         {
             Row r = (Row) unfiltered;
-
-            for (Cell c : r)
+            for (ColumnData c : r)
             {
-                assertEquals(c.value(), ByteBufferUtil.bytes("val" + 0));
+                assertEquals(((Cell)c).value(), ByteBufferUtil.bytes("val" + 0));
             }
         }
         cachedStore.truncateBlocking();
@@ -156,7 +152,7 @@ public class RowCacheTest
                 Row r = (Row)ai.next();
                 assertFalse(ai.hasNext());
 
-                Iterator<Cell> ci = r.iterator();
+                Iterator<Cell> ci = r.cells().iterator();
                 assert(ci.hasNext());
                 Cell cell = ci.next();
 
@@ -183,7 +179,7 @@ public class RowCacheTest
                 Row r = (Row)ai.next();
                 assertFalse(ai.hasNext());
 
-                Iterator<Cell> ci = r.iterator();
+                Iterator<Cell> ci = r.cells().iterator();
                 assert(ci.hasNext());
                 Cell cell = ci.next();
 
@@ -309,9 +305,9 @@ public class RowCacheTest
 
             assertEquals(r.clustering().get(0), ByteBufferUtil.bytes(values[i].substring(3)));
 
-            for (Cell c : r)
+            for (ColumnData c : r)
             {
-                assertEquals(c.value(), ByteBufferUtil.bytes(values[i]));
+                assertEquals(((Cell)c).value(), ByteBufferUtil.bytes(values[i]));
             }
             i++;
         }
@@ -351,7 +347,7 @@ public class RowCacheTest
         for (int i = offset; i < offset + numberOfRows; i++)
         {
             DecoratedKey key = Util.dk("key" + i);
-            Clustering cl = new SimpleClustering(ByteBufferUtil.bytes("col" + i));
+            Clustering cl = new Clustering(ByteBufferUtil.bytes("col" + i));
             Util.getAll(Util.cmd(store, key).build());
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/test/unit/org/apache/cassandra/db/RowTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RowTest.java b/test/unit/org/apache/cassandra/db/RowTest.java
index f7851f0..9a97483 100644
--- a/test/unit/org/apache/cassandra/db/RowTest.java
+++ b/test/unit/org/apache/cassandra/db/RowTest.java
@@ -107,12 +107,12 @@ public class RowTest
             {
                 RangeTombstoneBoundMarker openMarker = (RangeTombstoneBoundMarker)merged.next();
                 Slice.Bound openBound = openMarker.clustering();
-                DeletionTime openDeletion = new SimpleDeletionTime(openMarker.deletionTime().markedForDeleteAt(),
+                DeletionTime openDeletion = new DeletionTime(openMarker.deletionTime().markedForDeleteAt(),
                                                                    openMarker.deletionTime().localDeletionTime());
 
                 RangeTombstoneBoundMarker closeMarker = (RangeTombstoneBoundMarker)merged.next();
                 Slice.Bound closeBound = closeMarker.clustering();
-                DeletionTime closeDeletion = new SimpleDeletionTime(closeMarker.deletionTime().markedForDeleteAt(),
+                DeletionTime closeDeletion = new DeletionTime(closeMarker.deletionTime().markedForDeleteAt(),
                                                                     closeMarker.deletionTime().localDeletionTime());
 
                 assertEquals(openDeletion, closeDeletion);
@@ -127,16 +127,18 @@ public class RowTest
         ColumnDefinition defA = cfm.getColumnDefinition(new ColumnIdentifier("a", true));
         ColumnDefinition defB = cfm.getColumnDefinition(new ColumnIdentifier("b", true));
 
-        PartitionUpdate update = new PartitionUpdate(cfm, dk, cfm.partitionColumns(), 1);
-        Rows.writeClustering(update.metadata().comparator.make("c1"), update.writer());
-        writeSimpleCellValue(update.writer(), cfm, defA, "a1", 0, nowInSeconds);
-        writeSimpleCellValue(update.writer(), cfm, defA, "a2", 1, nowInSeconds);
-        writeSimpleCellValue(update.writer(), cfm, defB, "b1", 1, nowInSeconds);
-        update.writer().endOfRow();
+        Row.Builder builder = ArrayBackedRow.unsortedBuilder(cfm.partitionColumns().regulars, nowInSeconds);
+        builder.newRow(cfm.comparator.make("c1"));
+        writeSimpleCellValue(builder, cfm, defA, "a1", 0);
+        writeSimpleCellValue(builder, cfm, defA, "a2", 1);
+        writeSimpleCellValue(builder, cfm, defB, "b1", 1);
+        Row row = builder.build();
+
+        PartitionUpdate update = PartitionUpdate.singleRowUpdate(cfm, dk, row);
 
         Unfiltered unfiltered = update.unfilteredIterator().next();
         assertTrue(unfiltered.kind() == Unfiltered.Kind.ROW);
-        Row row = (Row) unfiltered;
+        row = (Row) unfiltered;
         assertEquals("a2", defA.cellValueType().getString(row.getCell(defA).value()));
         assertEquals("b1", defB.cellValueType().getString(row.getCell(defB).value()));
         assertEquals(2, row.columns().columnCount());
@@ -147,12 +149,10 @@ public class RowTest
     {
         int ttl = 1;
         ColumnDefinition def = cfm.getColumnDefinition(new ColumnIdentifier("a", true));
-        PartitionUpdate update = new PartitionUpdate(cfm, dk, cfm.partitionColumns(), 1);
-        Rows.writeClustering(update.metadata().comparator.make("c1"), update.writer());
-        update.writer().writeCell(def, false, ((AbstractType) def.cellValueType()).decompose("a1"),
-                                  SimpleLivenessInfo.forUpdate(0, ttl, nowInSeconds, cfm),
-                                  null);
-        update.writer().endOfRow();
+
+        Cell cell = BufferCell.expiring(def, 0, ttl, nowInSeconds, ((AbstractType) def.cellValueType()).decompose("a1"));
+
+        PartitionUpdate update = PartitionUpdate.singleRowUpdate(cfm, dk, ArrayBackedRow.singleCellRow(cfm.comparator.make("c1"), cell));
         new Mutation(update).applyUnsafe();
 
         // when we read with a nowInSeconds before the cell has expired,
@@ -184,21 +184,15 @@ public class RowTest
     public void writeRangeTombstone(PartitionUpdate update, Object start, Object end, long markedForDeleteAt, int localDeletionTime)
     {
         ClusteringComparator comparator = cfs.getComparator();
-        update.addRangeTombstone(Slice.make(comparator.make(start), comparator.make(end)),
-                                 new SimpleDeletionTime(markedForDeleteAt, localDeletionTime));
+        update.add(new RangeTombstone(Slice.make(comparator.make(start), comparator.make(end)), new DeletionTime(markedForDeleteAt, localDeletionTime)));
     }
 
-    private void writeSimpleCellValue(Row.Writer writer,
+    private void writeSimpleCellValue(Row.Builder builder,
                                       CFMetaData cfm,
                                       ColumnDefinition columnDefinition,
                                       String value,
-                                      long timestamp,
-                                      int nowInSeconds)
+                                      long timestamp)
     {
-        writer.writeCell(columnDefinition,
-                         false,
-                         ((AbstractType) columnDefinition.cellValueType()).decompose(value),
-                         SimpleLivenessInfo.forUpdate(timestamp, LivenessInfo.NO_TTL, nowInSeconds, cfm),
-                         null);
+       builder.addCell(BufferCell.live(cfm, columnDefinition, timestamp, ((AbstractType) columnDefinition.cellValueType()).decompose(value)));
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/test/unit/org/apache/cassandra/db/ScrubTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ScrubTest.java b/test/unit/org/apache/cassandra/db/ScrubTest.java
index 6f20ccf..47bfa0c 100644
--- a/test/unit/org/apache/cassandra/db/ScrubTest.java
+++ b/test/unit/org/apache/cassandra/db/ScrubTest.java
@@ -43,6 +43,7 @@ import org.apache.cassandra.exceptions.RequestExecutionException;
 import org.apache.cassandra.io.compress.CompressionMetadata;
 import org.apache.cassandra.io.sstable.format.SSTableFormat;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SSTableWriter;
 import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.commons.lang3.StringUtils;
 import org.junit.BeforeClass;
@@ -333,7 +334,7 @@ public class ScrubTest
             writer.append(update.unfilteredIterator());
         }
         writer.finish(false);
-        */
+         */
 
         String root = System.getProperty("corrupt-sstable-root");
         assert root != null;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTest.java
index 3d13a22..d443b8c 100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTest.java
@@ -24,6 +24,7 @@ package org.apache.cassandra.db.commitlog;
 import java.io.*;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
+import java.util.Iterator;
 import java.util.Properties;
 import java.util.UUID;
 
@@ -131,7 +132,7 @@ public class CommitLogUpgradeTest
             {
                 for (Row row : update)
                 {
-                    for (Cell cell : row)
+                    for (Cell cell : row.cells())
                     {
                         hash = hash(hash, cell.value());
                         ++cells;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/test/unit/org/apache/cassandra/db/rows/RowAndDeletionMergeIteratorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/rows/RowAndDeletionMergeIteratorTest.java b/test/unit/org/apache/cassandra/db/rows/RowAndDeletionMergeIteratorTest.java
new file mode 100644
index 0000000..1e5c23f
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/rows/RowAndDeletionMergeIteratorTest.java
@@ -0,0 +1,407 @@
+package org.apache.cassandra.db.rows;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Iterator;
+
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.db.Slice.Bound;
+import org.apache.cassandra.db.ClusteringPrefix;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.marshal.AsciiType;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.utils.FBUtilities;
+import static org.junit.Assert.*;
+
+public class RowAndDeletionMergeIteratorTest // verifies the order in which RowAndDeletionMergeIterator emits rows and range tombstone markers
+{
+    private static final String KEYSPACE1 = "RowTest";
+    private static final String CF_STANDARD1 = "Standard1";
+
+    private int nowInSeconds; // set in setup(); the tests derive tombstone deletion times from it
+    private DecoratedKey dk; // key "key0", used when building the row fixture
+    private ColumnFamilyStore cfs;
+    private CFMetaData cfm; // metadata of the Standard1 table
+    private ColumnDefinition defA; // the regular column "a"
+
+    @BeforeClass
+    public static void defineSchema() throws ConfigurationException // one-time schema: ascii partition key "key", int clustering "col1", int regular column "a"
+    {
+        CFMetaData cfMetadata = CFMetaData.Builder.create(KEYSPACE1, CF_STANDARD1)
+                                                  .addPartitionKey("key", AsciiType.instance)
+                                                  .addClusteringColumn("col1", Int32Type.instance)
+                                                  .addRegularColumn("a", Int32Type.instance)
+                                                  .build();
+        SchemaLoader.prepareServer();
+        SchemaLoader.createKeyspace(KEYSPACE1,
+                                    KeyspaceParams.simple(1),
+                                    cfMetadata);
+
+    }
+
+    @Before
+    public void setup() // re-initialises the per-test fixtures (time, key, store, metadata, column "a")
+    {
+        nowInSeconds = FBUtilities.nowInSeconds();
+        dk = Util.dk("key0");
+        cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1);
+        cfm = cfs.metadata;
+        defA = cfm.getColumnDefinition(new ColumnIdentifier("a", true));
+    }
+
+    @Test
+    public void testWithNoRangeTombstones() // no tombstones: expects rows 0..4 in order and nothing else
+    {
+        Iterator<Row> rowIterator = createRowIterator();
+        UnfilteredRowIterator iterator = createMergeIterator(rowIterator, Collections.emptyIterator(), false);
+
+        assertTrue(iterator.hasNext());
+        assertRow(iterator.next(), 0);
+
+        assertTrue(iterator.hasNext());
+        assertRow(iterator.next(), 1);
+
+        assertTrue(iterator.hasNext());
+        assertRow(iterator.next(), 2);
+
+        assertTrue(iterator.hasNext());
+        assertRow(iterator.next(), 3);
+
+        assertTrue(iterator.hasNext());
+        assertRow(iterator.next(), 4);
+
+        assertFalse(iterator.hasNext());
+    }
+
+    @Test
+    public void testWithOnlyRangeTombstones() // no rows, tombstones (1, 3) and [4, TOP]: expects only their start/end markers
+    {
+        int delTime = nowInSeconds + 1;
+        long timestamp = delTime * 1000; // NOTE(review): int * int wraps before widening to long; 1000L would avoid it but may change which rows get shadowed -- verify
+
+        Iterator<RangeTombstone> rangeTombstoneIterator = createRangeTombstoneIterator(rt(1, false, 3, false, timestamp, delTime),
+                                                                                       atLeast(4, timestamp, delTime));
+        UnfilteredRowIterator iterator = createMergeIterator(Collections.emptyIterator(), rangeTombstoneIterator, false);
+
+        assertTrue(iterator.hasNext());
+        assertRtMarker(iterator.next(), ClusteringPrefix.Kind.EXCL_START_BOUND, 1);
+
+        assertTrue(iterator.hasNext());
+        assertRtMarker(iterator.next(), ClusteringPrefix.Kind.EXCL_END_BOUND, 3);
+
+        assertTrue(iterator.hasNext());
+        assertRtMarker(iterator.next(), ClusteringPrefix.Kind.INCL_START_BOUND, 4);
+
+        assertTrue(iterator.hasNext());
+        assertRtMarker(iterator.next(), Bound.TOP);
+
+        assertFalse(iterator.hasNext());
+    }
+
+    @Test
+    public void testWithAtMostRangeTombstone() // tombstone [BOTTOM, 0]: expects BOTTOM marker, row 0, inclusive end marker at 0, then rows 1..4
+    {
+        Iterator<Row> rowIterator = createRowIterator();
+
+        int delTime = nowInSeconds + 1;
+        long timestamp = delTime * 1000; // NOTE(review): int * int wraps before widening to long; 1000L would avoid it but may change which rows get shadowed -- verify
+
+        Iterator<RangeTombstone> rangeTombstoneIterator = createRangeTombstoneIterator(atMost(0, timestamp, delTime));
+
+        UnfilteredRowIterator iterator = createMergeIterator(rowIterator, rangeTombstoneIterator, false);
+
+        assertTrue(iterator.hasNext());
+        assertRtMarker(iterator.next(), Bound.BOTTOM);
+
+        assertTrue(iterator.hasNext());
+        assertRow(iterator.next(), 0);
+
+        assertTrue(iterator.hasNext());
+        assertRtMarker(iterator.next(), ClusteringPrefix.Kind.INCL_END_BOUND, 0);
+
+        assertTrue(iterator.hasNext());
+        assertRow(iterator.next(), 1);
+
+        assertTrue(iterator.hasNext());
+        assertRow(iterator.next(), 2);
+
+        assertTrue(iterator.hasNext());
+        assertRow(iterator.next(), 3);
+
+        assertTrue(iterator.hasNext());
+        assertRow(iterator.next(), 4);
+
+        assertFalse(iterator.hasNext());
+    }
+
+    @Test
+    public void testWithGreaterThanRangeTombstone() // tombstone (2, TOP]: expects rows 0..2, exclusive start marker at 2, rows 3..4, then TOP marker
+    {
+        Iterator<Row> rowIterator = createRowIterator();
+
+        int delTime = nowInSeconds + 1;
+        long timestamp = delTime * 1000; // NOTE(review): int * int wraps before widening to long; 1000L would avoid it but may change which rows get shadowed -- verify
+
+        Iterator<RangeTombstone> rangeTombstoneIterator = createRangeTombstoneIterator(greaterThan(2, timestamp, delTime));
+
+        UnfilteredRowIterator iterator = createMergeIterator(rowIterator, rangeTombstoneIterator, false);
+
+        assertTrue(iterator.hasNext());
+        assertRow(iterator.next(), 0);
+
+        assertTrue(iterator.hasNext());
+        assertRow(iterator.next(), 1);
+
+        assertTrue(iterator.hasNext());
+        assertRow(iterator.next(), 2);
+
+        assertTrue(iterator.hasNext());
+        assertRtMarker(iterator.next(), ClusteringPrefix.Kind.EXCL_START_BOUND, 2);
+
+        assertTrue(iterator.hasNext());
+        assertRow(iterator.next(), 3);
+
+        assertTrue(iterator.hasNext());
+        assertRow(iterator.next(), 4);
+
+        assertTrue(iterator.hasNext());
+        assertRtMarker(iterator.next(), Bound.TOP);
+
+        assertFalse(iterator.hasNext());
+    }
+
+    @Test
+    public void testWithAtMostAndGreaterThanRangeTombstone() // tombstones [BOTTOM, 0] and (2, TOP]: expects both marker pairs interleaved with rows 0..4
+    {
+        Iterator<Row> rowIterator = createRowIterator();
+
+        int delTime = nowInSeconds + 1;
+        long timestamp = delTime * 1000; // NOTE(review): int * int wraps before widening to long; 1000L would avoid it but may change which rows get shadowed -- verify
+
+        Iterator<RangeTombstone> rangeTombstoneIterator = createRangeTombstoneIterator(atMost(0, timestamp, delTime),
+                                                                                       greaterThan(2, timestamp, delTime));
+
+        UnfilteredRowIterator iterator = createMergeIterator(rowIterator, rangeTombstoneIterator, false);
+
+        assertTrue(iterator.hasNext());
+        assertRtMarker(iterator.next(), Bound.BOTTOM);
+
+        assertTrue(iterator.hasNext());
+        assertRow(iterator.next(), 0);
+
+        assertTrue(iterator.hasNext());
+        assertRtMarker(iterator.next(), ClusteringPrefix.Kind.INCL_END_BOUND, 0);
+
+        assertTrue(iterator.hasNext());
+        assertRow(iterator.next(), 1);
+
+        assertTrue(iterator.hasNext());
+        assertRow(iterator.next(), 2);
+
+        assertTrue(iterator.hasNext());
+        assertRtMarker(iterator.next(), ClusteringPrefix.Kind.EXCL_START_BOUND, 2);
+
+        assertTrue(iterator.hasNext());
+        assertRow(iterator.next(), 3);
+
+        assertTrue(iterator.hasNext());
+        assertRow(iterator.next(), 4);
+
+        assertTrue(iterator.hasNext());
+        assertRtMarker(iterator.next(), Bound.TOP);
+
+        assertFalse(iterator.hasNext());
+    }
+
+    private void assertRtMarker(Unfiltered unfiltered, ClusteringPrefix.Kind kind, int col1) // asserts a range tombstone marker of the given bound kind whose first clustering value encodes col1
+    {
+        assertEquals(Unfiltered.Kind.RANGE_TOMBSTONE_MARKER, unfiltered.kind());
+        assertEquals(kind, unfiltered.clustering().kind());
+        assertEquals(bb(col1), unfiltered.clustering().get(0));
+    }
+
+    @Test
+    public void testWithIncludingEndExcludingStartMarker() // tombstones [BOTTOM, 2] and (2, TOP] meet at 2: expects one INCL_END_EXCL_START boundary right after row 2
+    {
+        Iterator<Row> rowIterator = createRowIterator();
+
+        int delTime = nowInSeconds + 1;
+        long timestamp = delTime * 1000; // NOTE(review): int * int wraps before widening to long; 1000L would avoid it but may change which rows get shadowed -- verify
+
+        Iterator<RangeTombstone> rangeTombstoneIterator = createRangeTombstoneIterator(atMost(2, timestamp, delTime),
+                                                                                       greaterThan(2, timestamp, delTime));
+
+        UnfilteredRowIterator iterator = createMergeIterator(rowIterator, rangeTombstoneIterator, false);
+
+        assertTrue(iterator.hasNext());
+        assertRtMarker(iterator.next(), Bound.BOTTOM);
+
+        assertTrue(iterator.hasNext());
+        assertRow(iterator.next(), 0);
+
+        assertTrue(iterator.hasNext());
+        assertRow(iterator.next(), 1);
+
+        assertTrue(iterator.hasNext());
+        assertRow(iterator.next(), 2);
+
+        assertTrue(iterator.hasNext());
+        assertRtMarker(iterator.next(), ClusteringPrefix.Kind.INCL_END_EXCL_START_BOUNDARY, 2);
+
+        assertTrue(iterator.hasNext());
+        assertRow(iterator.next(), 3);
+
+        assertTrue(iterator.hasNext());
+        assertRow(iterator.next(), 4);
+
+        assertTrue(iterator.hasNext());
+        assertRtMarker(iterator.next(), Bound.TOP);
+
+        assertFalse(iterator.hasNext());
+    }
+
+    @Test
+    public void testWithExcludingEndIncludingStartMarker() // tombstones [BOTTOM, 2) and [2, TOP] meet at 2: expects one EXCL_END_INCL_START boundary right before row 2
+    {
+        Iterator<Row> rowIterator = createRowIterator();
+
+        int delTime = nowInSeconds + 1;
+        long timestamp = delTime * 1000; // NOTE(review): int * int wraps before widening to long; 1000L would avoid it but may change which rows get shadowed -- verify
+
+        Iterator<RangeTombstone> rangeTombstoneIterator = createRangeTombstoneIterator(lessThan(2, timestamp, delTime),
+                                                                                       atLeast(2, timestamp, delTime));
+
+        UnfilteredRowIterator iterator = createMergeIterator(rowIterator, rangeTombstoneIterator, false);
+
+        assertTrue(iterator.hasNext());
+        assertRtMarker(iterator.next(), Bound.BOTTOM);
+
+        assertTrue(iterator.hasNext());
+        assertRow(iterator.next(), 0);
+
+        assertTrue(iterator.hasNext());
+        assertRow(iterator.next(), 1);
+
+        assertTrue(iterator.hasNext());
+        assertRtMarker(iterator.next(), ClusteringPrefix.Kind.EXCL_END_INCL_START_BOUNDARY, 2);
+
+        assertTrue(iterator.hasNext());
+        assertRow(iterator.next(), 2);
+
+        assertTrue(iterator.hasNext());
+        assertRow(iterator.next(), 3);
+
+        assertTrue(iterator.hasNext());
+        assertRow(iterator.next(), 4);
+
+        assertTrue(iterator.hasNext());
+        assertRtMarker(iterator.next(), Bound.TOP);
+
+        assertFalse(iterator.hasNext());
+    }
+
+    private void assertRtMarker(Unfiltered unfiltered, Bound bound) // asserts a range tombstone marker whose clustering equals the given bound (callers pass Bound.BOTTOM or Bound.TOP)
+    {
+        assertEquals(Unfiltered.Kind.RANGE_TOMBSTONE_MARKER, unfiltered.kind());
+        assertEquals(bound, unfiltered.clustering());
+    }
+
+    private void assertRow(Unfiltered unfiltered, int col1) // asserts a row whose clustering is the one the table comparator builds from col1
+    {
+        assertEquals(Unfiltered.Kind.ROW, unfiltered.kind());
+        assertEquals(cfm.comparator.make(col1), unfiltered.clustering());
+    }
+
+    private Iterator<RangeTombstone> createRangeTombstoneIterator(RangeTombstone... tombstones) // collects the tombstones into a RangeTombstoneList and iterates it over Slice.ALL
+    {
+        RangeTombstoneList list = new RangeTombstoneList(cfm.comparator, 10);
+
+        for (RangeTombstone tombstone : tombstones)
+            list.add(tombstone);
+
+        return list.iterator(Slice.ALL, false);
+    }
+
+    private Iterator<Row> createRowIterator() // row fixture: five rows with clustering i and cell a = i, for i in 0..4
+    {
+        PartitionUpdate update = new PartitionUpdate(cfm, dk, cfm.partitionColumns(), 1);
+        for (int i = 0; i < 5; i++)
+            addRow(update, i, i);
+
+        return update.iterator();
+    }
+
+    private UnfilteredRowIterator createMergeIterator(Iterator<Row> rows, Iterator<RangeTombstone> tombstones, boolean reversed) // builds the iterator under test with no partition deletion, an empty static row and no stats
+    {
+        return new RowAndDeletionMergeIterator(cfm,
+                                               Util.dk("k"), // NOTE(review): differs from the dk field ("key0") used to build the rows -- confirm this is intentional
+                                               DeletionTime.LIVE,
+                                               ColumnFilter.all(cfm),
+                                               Rows.EMPTY_STATIC_ROW,
+                                               reversed,
+                                               RowStats.NO_STATS,
+                                               rows,
+                                               tombstones,
+                                               true); // NOTE(review): the meaning of this trailing flag is not visible here -- confirm
+    }
+
+    private void addRow(PartitionUpdate update, int col1, int a) // adds a single-cell row at clustering col1 holding a in column "a", written at timestamp 0
+    {
+        update.add(ArrayBackedRow.singleCellRow(update.metadata().comparator.make(col1), makeCell(cfm, defA, a, 0)));
+    }
+
+    private Cell makeCell(CFMetaData cfm, ColumnDefinition columnDefinition, int value, long timestamp) // live cell whose value is serialised with the column's own value type (raw AbstractType cast)
+    {
+        return BufferCell.live(cfm, columnDefinition, timestamp, ((AbstractType)columnDefinition.cellValueType()).decompose(value));
+    }
+
+    private static RangeTombstone atLeast(int start, long tstamp, int delTime) // tombstone over [start, TOP]
+    {
+        return new RangeTombstone(Slice.make(Slice.Bound.inclusiveStartOf(bb(start)), Slice.Bound.TOP), new DeletionTime(tstamp, delTime));
+    }
+
+    private static RangeTombstone atMost(int end, long tstamp, int delTime) // tombstone over [BOTTOM, end]
+    {
+        return new RangeTombstone(Slice.make(Slice.Bound.BOTTOM, Slice.Bound.inclusiveEndOf(bb(end))), new DeletionTime(tstamp, delTime));
+    }
+
+    private static RangeTombstone lessThan(int end, long tstamp, int delTime) // tombstone over [BOTTOM, end)
+    {
+        return new RangeTombstone(Slice.make(Slice.Bound.BOTTOM, Slice.Bound.exclusiveEndOf(bb(end))), new DeletionTime(tstamp, delTime));
+    }
+
+    private static RangeTombstone greaterThan(int start, long tstamp, int delTime) // tombstone over (start, TOP]
+    {
+        return new RangeTombstone(Slice.make(Slice.Bound.exclusiveStartOf(bb(start)), Slice.Bound.TOP), new DeletionTime(tstamp, delTime));
+    }
+
+    private static RangeTombstone rt(int start, boolean startInclusive, int end, boolean endInclusive, long tstamp, int delTime) // tombstone from start to end, each end inclusive or exclusive as flagged
+    {
+        Slice.Bound startBound = startInclusive ? Slice.Bound.inclusiveStartOf(bb(start)) : Slice.Bound.exclusiveStartOf(bb(start));
+        Slice.Bound endBound = endInclusive ? Slice.Bound.inclusiveEndOf(bb(end)) : Slice.Bound.exclusiveEndOf(bb(end));
+
+        return new RangeTombstone(Slice.make(startBound, endBound), new DeletionTime(tstamp, delTime));
+    }
+
+    private static ByteBuffer bb(int i) // shorthand for ByteBufferUtil.bytes(i)
+    {
+        return ByteBufferUtil.bytes(i);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/test/unit/org/apache/cassandra/db/rows/RowAndTombstoneMergeIteratorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/rows/RowAndTombstoneMergeIteratorTest.java b/test/unit/org/apache/cassandra/db/rows/RowAndTombstoneMergeIteratorTest.java
deleted file mode 100644
index 88a6f7e..0000000
--- a/test/unit/org/apache/cassandra/db/rows/RowAndTombstoneMergeIteratorTest.java
+++ /dev/null
@@ -1,415 +0,0 @@
-package org.apache.cassandra.db.rows;
-
-import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.Iterator;
-
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import org.apache.cassandra.db.Slice.Bound;
-import org.apache.cassandra.db.ClusteringPrefix;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.db.SimpleDeletionTime;
-import org.apache.cassandra.db.Slice;
-import org.apache.cassandra.db.RangeTombstone;
-import org.apache.cassandra.db.RangeTombstoneList;
-import org.apache.cassandra.db.LivenessInfo;
-import org.apache.cassandra.db.SimpleLivenessInfo;
-import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.cql3.ColumnIdentifier;
-import org.apache.cassandra.db.partitions.PartitionUpdate;
-import org.apache.cassandra.db.marshal.Int32Type;
-import org.apache.cassandra.SchemaLoader;
-import org.apache.cassandra.Util;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.db.marshal.AsciiType;
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.schema.KeyspaceParams;
-import org.apache.cassandra.utils.FBUtilities;
-import static org.junit.Assert.*;
-
-public class RowAndTombstoneMergeIteratorTest
-{
-    private static final String KEYSPACE1 = "RowTest";
-    private static final String CF_STANDARD1 = "Standard1";
-
-    private int nowInSeconds;
-    private DecoratedKey dk;
-    private ColumnFamilyStore cfs;
-    private CFMetaData cfm;
-    private ColumnDefinition defA;
-
-    @BeforeClass
-    public static void defineSchema() throws ConfigurationException
-    {
-        CFMetaData cfMetadata = CFMetaData.Builder.create(KEYSPACE1, CF_STANDARD1)
-                                                  .addPartitionKey("key", AsciiType.instance)
-                                                  .addClusteringColumn("col1", Int32Type.instance)
-                                                  .addRegularColumn("a", Int32Type.instance)
-                                                  .build();
-        SchemaLoader.prepareServer();
-        SchemaLoader.createKeyspace(KEYSPACE1,
-                                    KeyspaceParams.simple(1),
-                                    cfMetadata);
-
-    }
-
-    @Before
-    public void setup()
-    {
-        nowInSeconds = FBUtilities.nowInSeconds();
-        dk = Util.dk("key0");
-        cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1);
-        cfm = cfs.metadata;
-        defA = cfm.getColumnDefinition(new ColumnIdentifier("a", true));
-    }
-
-    @Test
-    public void testWithNoRangeTombstones()
-    {
-        Iterator<Row> rowIterator = createRowIterator();
-
-        RowAndTombstoneMergeIterator iterator = new RowAndTombstoneMergeIterator(cfm.comparator, false)
-                                                          .setTo(rowIterator, Collections.emptyIterator());
-
-        assertTrue(iterator.hasNext());
-        assertRow(iterator.next(), 0);
-
-        assertTrue(iterator.hasNext());
-        assertRow(iterator.next(), 1);
-
-        assertTrue(iterator.hasNext());
-        assertRow(iterator.next(), 2);
-
-        assertTrue(iterator.hasNext());
-        assertRow(iterator.next(), 3);
-
-        assertTrue(iterator.hasNext());
-        assertRow(iterator.next(), 4);
-
-        assertFalse(iterator.hasNext());
-    }
-
-    @Test
-    public void testWithOnlyRangeTombstones()
-    {
-        int delTime = nowInSeconds + 1;
-        long timestamp = delTime * 1000;
-
-        Iterator<RangeTombstone> rangeTombstoneIterator = createRangeTombstoneIterator(rt(1, false, 3, false, timestamp, delTime),
-                                                                                       atLeast(4, timestamp, delTime));
-        RowAndTombstoneMergeIterator iterator = new RowAndTombstoneMergeIterator(cfm.comparator, false)
-                                                          .setTo(Collections.emptyIterator(), rangeTombstoneIterator);
-
-        assertTrue(iterator.hasNext());
-        assertRtMarker(iterator.next(), ClusteringPrefix.Kind.EXCL_START_BOUND, 1);
-
-        assertTrue(iterator.hasNext());
-        assertRtMarker(iterator.next(), ClusteringPrefix.Kind.EXCL_END_BOUND, 3);
-
-        assertTrue(iterator.hasNext());
-        assertRtMarker(iterator.next(), ClusteringPrefix.Kind.INCL_START_BOUND, 4);
-
-        assertTrue(iterator.hasNext());
-        assertRtMarker(iterator.next(), Bound.TOP);
-
-        assertFalse(iterator.hasNext());
-    }
-
-    @Test
-    public void testWithAtMostRangeTombstone()
-    {
-        Iterator<Row> rowIterator = createRowIterator();
-
-        int delTime = nowInSeconds + 1;
-        long timestamp = delTime * 1000;
-
-        Iterator<RangeTombstone> rangeTombstoneIterator = createRangeTombstoneIterator(atMost(0, timestamp, delTime));
-
-        RowAndTombstoneMergeIterator iterator = new RowAndTombstoneMergeIterator(cfm.comparator, false)
-                                                          .setTo(rowIterator, rangeTombstoneIterator);
-
-        assertTrue(iterator.hasNext());
-        assertRtMarker(iterator.next(), Bound.BOTTOM);
-
-        assertTrue(iterator.hasNext());
-        assertRow(iterator.next(), 0);
-
-        assertTrue(iterator.hasNext());
-        assertRtMarker(iterator.next(), ClusteringPrefix.Kind.INCL_END_BOUND, 0);
-
-        assertTrue(iterator.hasNext());
-        assertRow(iterator.next(), 1);
-
-        assertTrue(iterator.hasNext());
-        assertRow(iterator.next(), 2);
-
-        assertTrue(iterator.hasNext());
-        assertRow(iterator.next(), 3);
-
-        assertTrue(iterator.hasNext());
-        assertRow(iterator.next(), 4);
-
-        assertFalse(iterator.hasNext());
-    }
-
-    @Test
-    public void testWithGreaterThanRangeTombstone()
-    {
-        Iterator<Row> rowIterator = createRowIterator();
-
-        int delTime = nowInSeconds + 1;
-        long timestamp = delTime * 1000;
-
-        Iterator<RangeTombstone> rangeTombstoneIterator = createRangeTombstoneIterator(greaterThan(2, timestamp, delTime));
-
-        RowAndTombstoneMergeIterator iterator = new RowAndTombstoneMergeIterator(cfm.comparator, false)
-                                                          .setTo(rowIterator, rangeTombstoneIterator);
-
-        assertTrue(iterator.hasNext());
-        assertRow(iterator.next(), 0);
-
-        assertTrue(iterator.hasNext());
-        assertRow(iterator.next(), 1);
-
-        assertTrue(iterator.hasNext());
-        assertRow(iterator.next(), 2);
-
-        assertTrue(iterator.hasNext());
-        assertRtMarker(iterator.next(), ClusteringPrefix.Kind.EXCL_START_BOUND, 2);
-
-        assertTrue(iterator.hasNext());
-        assertRow(iterator.next(), 3);
-
-        assertTrue(iterator.hasNext());
-        assertRow(iterator.next(), 4);
-
-        assertTrue(iterator.hasNext());
-        assertRtMarker(iterator.next(), Bound.TOP);
-
-        assertFalse(iterator.hasNext());
-    }
-
-    @Test
-    public void testWithAtMostAndGreaterThanRangeTombstone()
-    {
-        Iterator<Row> rowIterator = createRowIterator();
-
-        int delTime = nowInSeconds + 1;
-        long timestamp = delTime * 1000;
-
-        Iterator<RangeTombstone> rangeTombstoneIterator = createRangeTombstoneIterator(atMost(0, timestamp, delTime),
-                                                                                       greaterThan(2, timestamp, delTime));
-
-        RowAndTombstoneMergeIterator iterator = new RowAndTombstoneMergeIterator(cfm.comparator, false)
-                                                          .setTo(rowIterator, rangeTombstoneIterator);
-
-        assertTrue(iterator.hasNext());
-        assertRtMarker(iterator.next(), Bound.BOTTOM);
-
-        assertTrue(iterator.hasNext());
-        assertRow(iterator.next(), 0);
-
-        assertTrue(iterator.hasNext());
-        assertRtMarker(iterator.next(), ClusteringPrefix.Kind.INCL_END_BOUND, 0);
-
-        assertTrue(iterator.hasNext());
-        assertRow(iterator.next(), 1);
-
-        assertTrue(iterator.hasNext());
-        assertRow(iterator.next(), 2);
-
-        assertTrue(iterator.hasNext());
-        assertRtMarker(iterator.next(), ClusteringPrefix.Kind.EXCL_START_BOUND, 2);
-
-        assertTrue(iterator.hasNext());
-        assertRow(iterator.next(), 3);
-
-        assertTrue(iterator.hasNext());
-        assertRow(iterator.next(), 4);
-
-        assertTrue(iterator.hasNext());
-        assertRtMarker(iterator.next(), Bound.TOP);
-
-        assertFalse(iterator.hasNext());
-    }
-
-    private void assertRtMarker(Unfiltered unfiltered, ClusteringPrefix.Kind kind, int col1)    {
-        assertEquals(Unfiltered.Kind.RANGE_TOMBSTONE_MARKER, unfiltered.kind());
-        assertEquals(kind, unfiltered.clustering().kind());
-        assertEquals(bb(col1), unfiltered.clustering().get(0));
-    }
-
-    @Test
-    public void testWithIncludingEndExcludingStartMarker()
-    {
-        Iterator<Row> rowIterator = createRowIterator();
-
-        int delTime = nowInSeconds + 1;
-        long timestamp = delTime * 1000;
-
-        Iterator<RangeTombstone> rangeTombstoneIterator = createRangeTombstoneIterator(atMost(2, timestamp, delTime),
-                                                                                       greaterThan(2, timestamp, delTime));
-
-        RowAndTombstoneMergeIterator iterator = new RowAndTombstoneMergeIterator(cfm.comparator, false)
-                                                          .setTo(rowIterator, rangeTombstoneIterator);
-
-        assertTrue(iterator.hasNext());
-        assertRtMarker(iterator.next(), Bound.BOTTOM);
-
-        assertTrue(iterator.hasNext());
-        assertRow(iterator.next(), 0);
-
-        assertTrue(iterator.hasNext());
-        assertRow(iterator.next(), 1);
-
-        assertTrue(iterator.hasNext());
-        assertRow(iterator.next(), 2);
-
-        assertTrue(iterator.hasNext());
-        assertRtMarker(iterator.next(), ClusteringPrefix.Kind.INCL_END_EXCL_START_BOUNDARY, 2);
-
-        assertTrue(iterator.hasNext());
-        assertRow(iterator.next(), 3);
-
-        assertTrue(iterator.hasNext());
-        assertRow(iterator.next(), 4);
-
-        assertTrue(iterator.hasNext());
-        assertRtMarker(iterator.next(), Bound.TOP);
-
-        assertFalse(iterator.hasNext());
-    }
-
-    @Test
-    public void testWithExcludingEndIncludingStartMarker()
-    {
-        Iterator<Row> rowIterator = createRowIterator();
-
-        int delTime = nowInSeconds + 1;
-        long timestamp = delTime * 1000;
-
-        Iterator<RangeTombstone> rangeTombstoneIterator = createRangeTombstoneIterator(lessThan(2, timestamp, delTime),
-                                                                                       atLeast(2, timestamp, delTime));
-
-        RowAndTombstoneMergeIterator iterator = new RowAndTombstoneMergeIterator(cfm.comparator, false)
-                                                          .setTo(rowIterator, rangeTombstoneIterator);
-
-        assertTrue(iterator.hasNext());
-        assertRtMarker(iterator.next(), Bound.BOTTOM);
-
-        assertTrue(iterator.hasNext());
-        assertRow(iterator.next(), 0);
-
-        assertTrue(iterator.hasNext());
-        assertRow(iterator.next(), 1);
-
-        assertTrue(iterator.hasNext());
-        assertRtMarker(iterator.next(), ClusteringPrefix.Kind.EXCL_END_INCL_START_BOUNDARY, 2);
-
-        assertTrue(iterator.hasNext());
-        assertRow(iterator.next(), 2);
-
-        assertTrue(iterator.hasNext());
-        assertRow(iterator.next(), 3);
-
-        assertTrue(iterator.hasNext());
-        assertRow(iterator.next(), 4);
-
-        assertTrue(iterator.hasNext());
-        assertRtMarker(iterator.next(), Bound.TOP);
-
-        assertFalse(iterator.hasNext());
-    }
-
-    private void assertRtMarker(Unfiltered unfiltered, Bound bound)
-    {
-        assertEquals(Unfiltered.Kind.RANGE_TOMBSTONE_MARKER, unfiltered.kind());
-        assertEquals(bound, unfiltered.clustering());
-    }
-
-    private void assertRow(Unfiltered unfiltered, int col1)
-    {
-        assertEquals(Unfiltered.Kind.ROW, unfiltered.kind());
-        assertEquals(cfm.comparator.make(col1), unfiltered.clustering());
-    }
-
-    private Iterator<RangeTombstone> createRangeTombstoneIterator(RangeTombstone... tombstones)
-    {
-        RangeTombstoneList list = new RangeTombstoneList(cfm.comparator, 10);
-
-        for (RangeTombstone tombstone : tombstones)
-            list.add(tombstone);
-
-        return list.iterator(Slice.ALL, false);
-    }
-
-    private Iterator<Row> createRowIterator()
-    {
-        PartitionUpdate update = new PartitionUpdate(cfm, dk, cfm.partitionColumns(), 1);
-        for (int i = 0; i < 5; i++)
-            addRow(update, i, i);
-
-        return update.iterator();
-    }
-
-    private void addRow(PartitionUpdate update, int col1, int a)
-    {
-        Rows.writeClustering(update.metadata().comparator.make(col1), update.writer());
-        writeSimpleCellValue(update.writer(), cfm, defA, a, 0, nowInSeconds);
-        update.writer().endOfRow();
-    }
-
-    private void writeSimpleCellValue(Row.Writer writer,
-                                      CFMetaData cfm,
-                                      ColumnDefinition columnDefinition,
-                                      int value,
-                                      long timestamp,
-                                      int nowInSeconds)
-    {
-        writer.writeCell(columnDefinition,
-                         false,
-                         ((AbstractType) columnDefinition.cellValueType()).decompose(value),
-                         SimpleLivenessInfo.forUpdate(timestamp, LivenessInfo.NO_TTL, nowInSeconds, cfm),
-                         null);
-    }
-
-    private static RangeTombstone atLeast(int start, long tstamp, int delTime)
-    {
-        return new RangeTombstone(Slice.make(Slice.Bound.inclusiveStartOf(bb(start)), Slice.Bound.TOP), new SimpleDeletionTime(tstamp, delTime));
-    }
-
-    private static RangeTombstone atMost(int end, long tstamp, int delTime)
-    {
-        return new RangeTombstone(Slice.make(Slice.Bound.BOTTOM, Slice.Bound.inclusiveEndOf(bb(end))), new SimpleDeletionTime(tstamp, delTime));
-    }
-
-    private static RangeTombstone lessThan(int end, long tstamp, int delTime)
-    {
-        return new RangeTombstone(Slice.make(Slice.Bound.BOTTOM, Slice.Bound.exclusiveEndOf(bb(end))), new SimpleDeletionTime(tstamp, delTime));
-    }
-
-    private static RangeTombstone greaterThan(int start, long tstamp, int delTime)
-    {
-        return new RangeTombstone(Slice.make(Slice.Bound.exclusiveStartOf(bb(start)), Slice.Bound.TOP), new SimpleDeletionTime(tstamp, delTime));
-    }
-
-    private static RangeTombstone rt(int start, boolean startInclusive, int end, boolean endInclusive, long tstamp, int delTime)
-    {
-        Slice.Bound startBound = startInclusive ? Slice.Bound.inclusiveStartOf(bb(start)) : Slice.Bound.exclusiveStartOf(bb(start));
-        Slice.Bound endBound = endInclusive ? Slice.Bound.inclusiveEndOf(bb(end)) : Slice.Bound.exclusiveEndOf(bb(end));
-
-        return new RangeTombstone(Slice.make(startBound, endBound), new SimpleDeletionTime(tstamp, delTime));
-    }
-
-    private static ByteBuffer bb(int i)
-    {
-        return ByteBufferUtil.bytes(i);
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/test/unit/org/apache/cassandra/db/rows/UnfilteredRowIteratorsMergeTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/rows/UnfilteredRowIteratorsMergeTest.java b/test/unit/org/apache/cassandra/db/rows/UnfilteredRowIteratorsMergeTest.java
index a607dca..e869c72 100644
--- a/test/unit/org/apache/cassandra/db/rows/UnfilteredRowIteratorsMergeTest.java
+++ b/test/unit/org/apache/cassandra/db/rows/UnfilteredRowIteratorsMergeTest.java
@@ -138,7 +138,7 @@ public class UnfilteredRowIteratorsMergeTest
     {
         List<UnfilteredRowIterator> us = sources.stream().map(l -> new Source(l.iterator())).collect(Collectors.toList());
         List<Unfiltered> merged = new ArrayList<>();
-        Iterators.addAll(merged, safeIterator(mergeIterators(us, iterations)));
+        Iterators.addAll(merged, mergeIterators(us, iterations));
         return merged;
     }
 
@@ -197,7 +197,7 @@ public class UnfilteredRowIteratorsMergeTest
                     includesEnd = r.nextBoolean();
                 }
                 int deltime = r.nextInt(DEL_RANGE);
-                DeletionTime dt = new SimpleDeletionTime(deltime, deltime);
+                DeletionTime dt = new DeletionTime(deltime, deltime);
                 content.add(new RangeTombstoneBoundMarker(boundFor(pos, true, includesStart), dt));
                 content.add(new RangeTombstoneBoundMarker(boundFor(pos + span, false, includesEnd), dt));
                 prev = pos + span - (includesEnd ? 0 : 1);
@@ -365,173 +365,16 @@ public class UnfilteredRowIteratorsMergeTest
         return Bound.create(Bound.boundKind(start, inclusive), new ByteBuffer[] {Int32Type.instance.decompose(pos)});
     }
 
-    private static SimpleClustering clusteringFor(int i)
+    private static Clustering clusteringFor(int i)
     {
-        return new SimpleClustering(Int32Type.instance.decompose(i));
+        return new Clustering(Int32Type.instance.decompose(i));
     }
 
     static Row emptyRowAt(int pos, Function<Integer, Integer> timeGenerator)
     {
         final Clustering clustering = clusteringFor(pos);
-        final LivenessInfo live = SimpleLivenessInfo.forUpdate(timeGenerator.apply(pos), 0, nowInSec, metadata);
-        return emptyRowAt(clustering, live, DeletionTime.LIVE);
-    }
-
-    public static class TestCell extends AbstractCell
-    {
-        private final ColumnDefinition column;
-        private final ByteBuffer value;
-        private final LivenessInfo info;
-
-        public TestCell(ColumnDefinition column, ByteBuffer value, LivenessInfo info)
-        {
-            this.column = column;
-            this.value = value;
-            this.info = info.takeAlias();
-        }
-
-        @Override
-        public ColumnDefinition column()
-        {
-            return column;
-        }
-
-        @Override
-        public boolean isCounterCell()
-        {
-            return false;
-        }
-
-        @Override
-        public ByteBuffer value()
-        {
-            return value;
-        }
-
-        @Override
-        public LivenessInfo livenessInfo()
-        {
-            return info;
-        }
-
-        @Override
-        public CellPath path()
-        {
-            return null;
-        }
-    }
-
-    static Row emptyRowAt(final Clustering clustering, final LivenessInfo live, final DeletionTime deletion)
-    {
-        final ColumnDefinition columnDef = metadata.getColumnDefinition(new ColumnIdentifier("data", true));
-        final Cell cell = new TestCell(columnDef, clustering.get(0), live);
-
-        return new AbstractRow()
-        {
-            @Override
-            public Columns columns()
-            {
-                return Columns.of(columnDef);
-            }
-
-            @Override
-            public LivenessInfo primaryKeyLivenessInfo()
-            {
-                return live;
-            }
-
-            @Override
-            public DeletionTime deletion()
-            {
-                return deletion;
-            }
-
-            @Override
-            public boolean isEmpty()
-            {
-                return true;
-            }
-
-            @Override
-            public boolean hasComplexDeletion()
-            {
-                return false;
-            }
-
-            @Override
-            public Clustering clustering()
-            {
-                return clustering;
-            }
-
-            @Override
-            public Cell getCell(ColumnDefinition c)
-            {
-                return c == columnDef ? cell : null;
-            }
-
-            @Override
-            public Cell getCell(ColumnDefinition c, CellPath path)
-            {
-                return null;
-            }
-
-            @Override
-            public Iterator<Cell> getCells(ColumnDefinition c)
-            {
-                return Iterators.singletonIterator(cell);
-            }
-
-            @Override
-            public DeletionTime getDeletion(ColumnDefinition c)
-            {
-                return DeletionTime.LIVE;
-            }
-
-            @Override
-            public Iterator<Cell> iterator()
-            {
-                return Iterators.<Cell>emptyIterator();
-            }
-
-            @Override
-            public SearchIterator<ColumnDefinition, ColumnData> searchIterator()
-            {
-                return new SearchIterator<ColumnDefinition, ColumnData>()
-                {
-                    @Override
-                    public boolean hasNext()
-                    {
-                        return false;
-                    }
-
-                    @Override
-                    public ColumnData next(ColumnDefinition column)
-                    {
-                        return null;
-                    }
-                };
-            }
-
-            @Override
-            public Kind kind()
-            {
-                return Unfiltered.Kind.ROW;
-            }
-
-            @Override
-            public Row takeAlias()
-            {
-                return this;
-            }
-
-            @Override
-            public String toString()
-            {
-                return Int32Type.instance.getString(clustering.get(0));
-            }
-        };
-
+        final LivenessInfo live = LivenessInfo.create(metadata, timeGenerator.apply(pos), nowInSec);
+        return ArrayBackedRow.noCellLiveRow(clustering, live);
     }
 
     private void dumpList(List<Unfiltered> list)
@@ -580,33 +423,6 @@ public class UnfilteredRowIteratorsMergeTest
         }
     }
 
-    static RangeTombstoneMarker safeMarker(RangeTombstoneMarker marker)
-    {
-        RangeTombstoneMarker.Builder writer = new RangeTombstoneMarker.Builder(1);
-        marker.copyTo(writer);
-        return writer.build();
-    }
-
-    private static Row safeRow(Row row)
-    {
-        return emptyRowAt(new SimpleClustering(row.clustering().get(0)), row.primaryKeyLivenessInfo(), row.deletion());
-    }
-    
-    public static UnfilteredRowIterator safeIterator(UnfilteredRowIterator iterator)
-    {
-        return new WrappingUnfilteredRowIterator(iterator)
-        {
-            @Override
-            public Unfiltered next()
-            {
-                Unfiltered next = super.next();
-                return next.kind() == Unfiltered.Kind.ROW
-                     ? safeRow((Row) next)
-                     : safeMarker((RangeTombstoneMarker) next);
-            }
-        };
-    }
-
     public void testForInput(String... inputs)
     {
         List<List<Unfiltered>> sources = new ArrayList<>();
@@ -674,6 +490,6 @@ public class UnfilteredRowIteratorsMergeTest
     {
         return new RangeTombstoneBoundMarker(Bound.create(Bound.boundKind(isStart, inclusive),
                                                           new ByteBuffer[] {clusteringFor(pos).get(0)}),
-                                             new SimpleDeletionTime(delTime, delTime));
+                                             new DeletionTime(delTime, delTime));
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/test/unit/org/apache/cassandra/io/sstable/IndexHelperTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/IndexHelperTest.java b/test/unit/org/apache/cassandra/io/sstable/IndexHelperTest.java
index f4f7de3..c9f268a 100644
--- a/test/unit/org/apache/cassandra/io/sstable/IndexHelperTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/IndexHelperTest.java
@@ -27,7 +27,7 @@ import org.junit.Test;
 import org.apache.cassandra.Util;
 import org.apache.cassandra.db.ClusteringComparator;
 import org.apache.cassandra.db.ClusteringPrefix;
-import org.apache.cassandra.db.SimpleDeletionTime;
+import org.apache.cassandra.db.DeletionTime;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.IntegerType;
 import org.apache.cassandra.db.marshal.LongType;
@@ -48,7 +48,7 @@ public class IndexHelperTest
     @Test
     public void testIndexHelper()
     {
-        SimpleDeletionTime deletionInfo = new SimpleDeletionTime(FBUtilities.timestampMicros(), FBUtilities.nowInSeconds());
+        DeletionTime deletionInfo = new DeletionTime(FBUtilities.timestampMicros(), FBUtilities.nowInSeconds());
 
         List<IndexInfo> indexes = new ArrayList<>();
         indexes.add(new IndexInfo(cn(0L), cn(5L), 0, 0, deletionInfo));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java
index f8644bb..5a7c074 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java
@@ -28,8 +28,7 @@ import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.db.SimpleClustering;
+import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.db.marshal.AsciiType;
 import org.apache.cassandra.dht.Range;
@@ -107,7 +106,7 @@ public class SSTableLoaderTest
 
         assertEquals(1, partitions.size());
         assertEquals("key1", AsciiType.instance.getString(partitions.get(0).partitionKey().getKey()));
-        assertEquals(ByteBufferUtil.bytes("100"), partitions.get(0).getRow(new SimpleClustering(ByteBufferUtil.bytes("col1")))
+        assertEquals(ByteBufferUtil.bytes("100"), partitions.get(0).getRow(new Clustering(ByteBufferUtil.bytes("col1")))
                                                                    .getCell(cfmeta.getColumnDefinition(ByteBufferUtil.bytes("val")))
                                                                    .value());
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
index 4ffdcd7..13a371c 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
@@ -501,7 +501,7 @@ public class SSTableReaderTest
                 public void run()
                 {
                     Row row = Util.getOnlyRowUnfiltered(Util.cmd(store, key).build());
-                    assertEquals(0, ByteBufferUtil.compare(String.format("%3d", index).getBytes(), row.iterator().next().value()));
+                    assertEquals(0, ByteBufferUtil.compare(String.format("%3d", index).getBytes(), row.cells().iterator().next().value()));
                 }
             }));