Posted to commits@cassandra.apache.org by al...@apache.org on 2013/06/18 18:16:02 UTC

[1/4] Include a timestamp with all read commands to determine column expiration

Updated Branches:
  refs/heads/trunk 62295f68c -> 1f7628ce7
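
The pattern throughout the patch: every read path now captures the query's start time once and threads it through. ReadCommand constructors (SliceByNamesReadCommand, SliceFromReadCommand, RangeSliceCommand), QueryFilter factories (getNamesFilter, getSliceFilter, getIdentityFilter) and liveness checks (Column.isMarkedForDelete, Column.isLive) all take a long timestamp, so column expiration is judged against the moment the read started rather than a fresh clock read at each call site. A rough sketch of the calling convention, with signatures inferred from the test call sites below rather than copied from the production sources:

    // Capture the read timestamp once per query.
    long now = System.currentTimeMillis();

    // Read commands carry the timestamp...
    ReadCommand cmd = new SliceByNamesReadCommand("Keyspace1",
                                                  key.key,
                                                  "Standard1",
                                                  now,
                                                  new NamesQueryFilter(columns));

    // ...as do query filters built directly against a ColumnFamilyStore...
    ColumnFamily cf = store.getColumnFamily(QueryFilter.getIdentityFilter(key, "Standard1", now));

    // ...and liveness/expiration checks are evaluated against that same instant.
    boolean deleted = cf.getColumn(name).isMarkedForDelete(now);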


http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/test/unit/org/apache/cassandra/db/ReadMessageTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ReadMessageTest.java b/test/unit/org/apache/cassandra/db/ReadMessageTest.java
index 87700fe..cc15fe7 100644
--- a/test/unit/org/apache/cassandra/db/ReadMessageTest.java
+++ b/test/unit/org/apache/cassandra/db/ReadMessageTest.java
@@ -48,24 +48,25 @@ public class ReadMessageTest extends SchemaLoader
 
         ReadCommand rm, rm2;
         DecoratedKey dk = Util.dk("row1");
+        long ts = System.currentTimeMillis();
 
-        rm = new SliceByNamesReadCommand("Keyspace1", dk.key, "Standard1", new NamesQueryFilter(colList));
+        rm = new SliceByNamesReadCommand("Keyspace1", dk.key, "Standard1", ts, new NamesQueryFilter(colList));
         rm2 = serializeAndDeserializeReadMessage(rm);
         assert rm2.toString().equals(rm.toString());
 
-        rm = new SliceFromReadCommand("Keyspace1", dk.key, "Standard1", new SliceQueryFilter(ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER, true, 2));
+        rm = new SliceFromReadCommand("Keyspace1", dk.key, "Standard1", ts, new SliceQueryFilter(ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER, true, 2));
         rm2 = serializeAndDeserializeReadMessage(rm);
         assert rm2.toString().equals(rm.toString());
 
-        rm = new SliceFromReadCommand("Keyspace1", dk.key, "Standard1", new SliceQueryFilter(ByteBufferUtil.bytes("a"), ByteBufferUtil.bytes("z"), true, 5));
+        rm = new SliceFromReadCommand("Keyspace1", dk.key, "Standard1", ts, new SliceQueryFilter(ByteBufferUtil.bytes("a"), ByteBufferUtil.bytes("z"), true, 5));
         rm2 = serializeAndDeserializeReadMessage(rm);
         assertEquals(rm2.toString(), rm.toString());
 
-        rm = new SliceFromReadCommand("Keyspace1", dk.key, "Standard1", new SliceQueryFilter(ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER, true, 2));
+        rm = new SliceFromReadCommand("Keyspace1", dk.key, "Standard1", ts, new SliceQueryFilter(ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER, true, 2));
         rm2 = serializeAndDeserializeReadMessage(rm);
         assert rm2.toString().equals(rm.toString());
 
-        rm = new SliceFromReadCommand("Keyspace1", dk.key, "Standard1", new SliceQueryFilter(ByteBufferUtil.bytes("a"), ByteBufferUtil.bytes("z"), true, 5));
+        rm = new SliceFromReadCommand("Keyspace1", dk.key, "Standard1", ts, new SliceQueryFilter(ByteBufferUtil.bytes("a"), ByteBufferUtil.bytes("z"), true, 5));
         rm2 = serializeAndDeserializeReadMessage(rm);
         assertEquals(rm2.toString(), rm.toString());
     }
@@ -93,7 +94,7 @@ public class ReadMessageTest extends SchemaLoader
         rm.add("Standard1", ByteBufferUtil.bytes("Column1"), ByteBufferUtil.bytes("abcd"), 0);
         rm.apply();
 
-        ReadCommand command = new SliceByNamesReadCommand("Keyspace1", dk.key, "Standard1", new NamesQueryFilter(ByteBufferUtil.bytes("Column1")));
+        ReadCommand command = new SliceByNamesReadCommand("Keyspace1", dk.key, "Standard1", System.currentTimeMillis(), new NamesQueryFilter(ByteBufferUtil.bytes("Column1")));
         Row row = command.getRow(table);
         Column col = row.cf.getColumn(ByteBufferUtil.bytes("Column1"));
         assertEquals(col.value(), ByteBuffer.wrap("abcd".getBytes()));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java b/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java
index 38b3385..5e34e91 100644
--- a/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java
+++ b/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java
@@ -72,8 +72,10 @@ public class RecoveryManagerTruncateTest extends SchemaLoader
 		{
 			return null;
 		}
-		cf = cfStore.getColumnFamily(QueryFilter.getNamesFilter(
-		        Util.dk(keyName), cfName, ByteBufferUtil.bytes(columnName)));
+		cf = cfStore.getColumnFamily(QueryFilter.getNamesFilter(Util.dk(keyName),
+                                                                cfName,
+                                                                ByteBufferUtil.bytes(columnName),
+                                                                System.currentTimeMillis()));
 		if (cf == null)
 		{
 			return null;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/test/unit/org/apache/cassandra/db/RemoveColumnFamilyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RemoveColumnFamilyTest.java b/test/unit/org/apache/cassandra/db/RemoveColumnFamilyTest.java
index fb42361..345f967 100644
--- a/test/unit/org/apache/cassandra/db/RemoveColumnFamilyTest.java
+++ b/test/unit/org/apache/cassandra/db/RemoveColumnFamilyTest.java
@@ -51,7 +51,7 @@ public class RemoveColumnFamilyTest extends SchemaLoader
         rm.delete("Standard1", 1);
         rm.apply();
 
-        ColumnFamily retrieved = store.getColumnFamily(QueryFilter.getIdentityFilter(dk, "Standard1"));
+        ColumnFamily retrieved = store.getColumnFamily(QueryFilter.getIdentityFilter(dk, "Standard1", System.currentTimeMillis()));
         assert retrieved.isMarkedForDelete();
         assertNull(retrieved.getColumn(ByteBufferUtil.bytes("Column1")));
         assertNull(Util.cloneAndRemoveDeleted(retrieved, Integer.MAX_VALUE));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/test/unit/org/apache/cassandra/db/RemoveColumnFamilyWithFlush1Test.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RemoveColumnFamilyWithFlush1Test.java b/test/unit/org/apache/cassandra/db/RemoveColumnFamilyWithFlush1Test.java
index b32b3a9..bd97e00 100644
--- a/test/unit/org/apache/cassandra/db/RemoveColumnFamilyWithFlush1Test.java
+++ b/test/unit/org/apache/cassandra/db/RemoveColumnFamilyWithFlush1Test.java
@@ -53,7 +53,7 @@ public class RemoveColumnFamilyWithFlush1Test extends SchemaLoader
         rm.delete("Standard1", 1);
         rm.apply();
 
-        ColumnFamily retrieved = store.getColumnFamily(QueryFilter.getIdentityFilter(dk, "Standard1"));
+        ColumnFamily retrieved = store.getColumnFamily(QueryFilter.getIdentityFilter(dk, "Standard1", System.currentTimeMillis()));
         assert retrieved.isMarkedForDelete();
         assertNull(retrieved.getColumn(ByteBufferUtil.bytes("Column1")));
         assertNull(Util.cloneAndRemoveDeleted(retrieved, Integer.MAX_VALUE));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/test/unit/org/apache/cassandra/db/RemoveColumnFamilyWithFlush2Test.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RemoveColumnFamilyWithFlush2Test.java b/test/unit/org/apache/cassandra/db/RemoveColumnFamilyWithFlush2Test.java
index d59d09b..037c5dd 100644
--- a/test/unit/org/apache/cassandra/db/RemoveColumnFamilyWithFlush2Test.java
+++ b/test/unit/org/apache/cassandra/db/RemoveColumnFamilyWithFlush2Test.java
@@ -51,7 +51,7 @@ public class RemoveColumnFamilyWithFlush2Test extends SchemaLoader
         rm.apply();
         store.forceBlockingFlush();
 
-        ColumnFamily retrieved = store.getColumnFamily(QueryFilter.getIdentityFilter(dk, "Standard1"));
+        ColumnFamily retrieved = store.getColumnFamily(QueryFilter.getIdentityFilter(dk, "Standard1", System.currentTimeMillis()));
         assert retrieved.isMarkedForDelete();
         assertNull(retrieved.getColumn(ByteBufferUtil.bytes("Column1")));
         assertNull(Util.cloneAndRemoveDeleted(retrieved, Integer.MAX_VALUE));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/test/unit/org/apache/cassandra/db/RemoveColumnTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RemoveColumnTest.java b/test/unit/org/apache/cassandra/db/RemoveColumnTest.java
index 4c3f3aa..eccbfbc 100644
--- a/test/unit/org/apache/cassandra/db/RemoveColumnTest.java
+++ b/test/unit/org/apache/cassandra/db/RemoveColumnTest.java
@@ -54,10 +54,16 @@ public class RemoveColumnTest extends SchemaLoader
         rm.delete("Standard1", ByteBufferUtil.bytes("Column1"), 1);
         rm.apply();
 
-        ColumnFamily retrieved = store.getColumnFamily(QueryFilter.getNamesFilter(dk, "Standard1", ByteBufferUtil.bytes("Column1")));
-        assert retrieved.getColumn(ByteBufferUtil.bytes("Column1")).isMarkedForDelete();
+        ColumnFamily retrieved = store.getColumnFamily(QueryFilter.getNamesFilter(dk,
+                                                                                  "Standard1",
+                                                                                  ByteBufferUtil.bytes("Column1"),
+                                                                                  System.currentTimeMillis()));
+        assert retrieved.getColumn(ByteBufferUtil.bytes("Column1")).isMarkedForDelete(System.currentTimeMillis());
         assertNull(Util.cloneAndRemoveDeleted(retrieved, Integer.MAX_VALUE));
-        assertNull(Util.cloneAndRemoveDeleted(store.getColumnFamily(QueryFilter.getIdentityFilter(dk, "Standard1")), Integer.MAX_VALUE));
+        assertNull(Util.cloneAndRemoveDeleted(store.getColumnFamily(QueryFilter.getIdentityFilter(dk,
+                                                                                                  "Standard1",
+                                                                                                  System.currentTimeMillis())),
+                                              Integer.MAX_VALUE));
     }
 
     @Test
@@ -67,15 +73,15 @@ public class RemoveColumnTest extends SchemaLoader
         long timestamp = System.currentTimeMillis();
         int localDeletionTime = (int) (timestamp / 1000);
         Column c = DeletedColumn.create(localDeletionTime, timestamp, "dc1");
-        assertTrue("DeletedColumn was not marked for delete", c.isMarkedForDelete());
+        assertTrue("DeletedColumn was not marked for delete", c.isMarkedForDelete(timestamp));
 
         // Simulate a node that is 30 seconds behind
         c = DeletedColumn.create(localDeletionTime + 30, timestamp + 30000, "dc2");
-        assertTrue("DeletedColumn was not marked for delete", c.isMarkedForDelete());
+        assertTrue("DeletedColumn was not marked for delete", c.isMarkedForDelete(timestamp));
 
         // Simulate a node that is 30 seconds ahead
         c = DeletedColumn.create(localDeletionTime - 30, timestamp - 30000, "dc3");
-        assertTrue("DeletedColumn was not marked for delete", c.isMarkedForDelete());
+        assertTrue("DeletedColumn was not marked for delete", c.isMarkedForDelete(timestamp));
     }
 
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/test/unit/org/apache/cassandra/db/RemoveSubColumnTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RemoveSubColumnTest.java b/test/unit/org/apache/cassandra/db/RemoveSubColumnTest.java
index 08971a7..d88ee60 100644
--- a/test/unit/org/apache/cassandra/db/RemoveSubColumnTest.java
+++ b/test/unit/org/apache/cassandra/db/RemoveSubColumnTest.java
@@ -58,8 +58,8 @@ public class RemoveSubColumnTest extends SchemaLoader
         rm.delete("Super1", cname, 1);
         rm.apply();
 
-        ColumnFamily retrieved = store.getColumnFamily(QueryFilter.getIdentityFilter(dk, "Super1"));
-        assert retrieved.getColumn(cname).isMarkedForDelete();
+        ColumnFamily retrieved = store.getColumnFamily(QueryFilter.getIdentityFilter(dk, "Super1", System.currentTimeMillis()));
+        assert retrieved.getColumn(cname).isMarkedForDelete(System.currentTimeMillis());
         assertNull(Util.cloneAndRemoveDeleted(retrieved, Integer.MAX_VALUE));
     }
 
@@ -86,7 +86,7 @@ public class RemoveSubColumnTest extends SchemaLoader
 
         // Mark current time and make sure the next insert happens at least
         // one second after the previous one (since gc resolution is the second)
-        int gcbefore = (int)(System.currentTimeMillis() / 1000);
+        QueryFilter filter = QueryFilter.getIdentityFilter(dk, "Super1", System.currentTimeMillis());
         Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
 
         // remove the column itself
@@ -94,8 +94,8 @@ public class RemoveSubColumnTest extends SchemaLoader
         rm.delete("Super1", cname, 2);
         rm.apply();
 
-        ColumnFamily retrieved = store.getColumnFamily(QueryFilter.getIdentityFilter(dk, "Super1"), gcbefore);
-        assert retrieved.getColumn(cname).isMarkedForDelete();
+        ColumnFamily retrieved = store.getColumnFamily(filter);
+        assert retrieved.getColumn(cname).isMarkedForDelete(System.currentTimeMillis());
         assertNull(Util.cloneAndRemoveDeleted(retrieved, Integer.MAX_VALUE));
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/test/unit/org/apache/cassandra/db/RowCacheTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RowCacheTest.java b/test/unit/org/apache/cassandra/db/RowCacheTest.java
index 3dfef4d..11fe86c 100644
--- a/test/unit/org/apache/cassandra/db/RowCacheTest.java
+++ b/test/unit/org/apache/cassandra/db/RowCacheTest.java
@@ -62,12 +62,22 @@ public class RowCacheTest extends SchemaLoader
         {
             DecoratedKey key = Util.dk("key" + i);
 
-            cachedStore.getColumnFamily(key, ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 1);
+            cachedStore.getColumnFamily(key,
+                                        ByteBufferUtil.EMPTY_BYTE_BUFFER,
+                                        ByteBufferUtil.EMPTY_BYTE_BUFFER,
+                                        false,
+                                        1,
+                                        System.currentTimeMillis());
             assert CacheService.instance.rowCache.size() == i + 1;
             assert cachedStore.containsCachedRow(key); // current key should be stored in the cache
 
             // checking if column is read correctly after cache
-            ColumnFamily cf = cachedStore.getColumnFamily(key, ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 1);
+            ColumnFamily cf = cachedStore.getColumnFamily(key,
+                                                          ByteBufferUtil.EMPTY_BYTE_BUFFER,
+                                                          ByteBufferUtil.EMPTY_BYTE_BUFFER,
+                                                          false,
+                                                          1,
+                                                          System.currentTimeMillis());
             Collection<Column> columns = cf.getSortedColumns();
 
             Column column = columns.iterator().next();
@@ -84,11 +94,21 @@ public class RowCacheTest extends SchemaLoader
         {
             DecoratedKey key = Util.dk("key" + i);
 
-            cachedStore.getColumnFamily(key, ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 1);
+            cachedStore.getColumnFamily(key,
+                                        ByteBufferUtil.EMPTY_BYTE_BUFFER,
+                                        ByteBufferUtil.EMPTY_BYTE_BUFFER,
+                                        false,
+                                        1,
+                                        System.currentTimeMillis());
             assert cachedStore.containsCachedRow(key); // cache should be populated with the latest rows read (old ones should be popped)
 
             // checking if column is read correctly after cache
-            ColumnFamily cf = cachedStore.getColumnFamily(key, ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 1);
+            ColumnFamily cf = cachedStore.getColumnFamily(key,
+                                                          ByteBufferUtil.EMPTY_BYTE_BUFFER,
+                                                          ByteBufferUtil.EMPTY_BYTE_BUFFER,
+                                                          false,
+                                                          1,
+                                                          System.currentTimeMillis());
             Collection<Column> columns = cf.getSortedColumns();
 
             Column column = columns.iterator().next();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/test/unit/org/apache/cassandra/db/RowTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RowTest.java b/test/unit/org/apache/cassandra/db/RowTest.java
index 5babea5..e45977a 100644
--- a/test/unit/org/apache/cassandra/db/RowTest.java
+++ b/test/unit/org/apache/cassandra/db/RowTest.java
@@ -67,12 +67,12 @@ public class RowTest extends SchemaLoader
     public void testExpiringColumnExpiration()
     {
         Column c = new ExpiringColumn(ByteBufferUtil.bytes("one"), ByteBufferUtil.bytes("A"), 0, 1);
-        assert !c.isMarkedForDelete();
+        assert !c.isMarkedForDelete(System.currentTimeMillis());
 
         // Because we keep the local deletion time with a precision of a
         // second, we could have to wait 2 seconds in worst case scenario.
         Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
 
-        assert c.isMarkedForDelete() && c.getMarkedForDeleteAt() == 0;
+        assert c.isMarkedForDelete(System.currentTimeMillis()) && c.getMarkedForDeleteAt() == 0;
     }
 }
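
The RowTest change above is the clearest illustration of why the read timestamp exists: an expiring column is only "marked for delete" relative to the moment the query began. A minimal, hypothetical model of that check (the class and field names here are illustrative, not the production ExpiringColumn):

    // Hypothetical sketch: expiration is decided against the timestamp the read
    // supplies, never against a fresh System.currentTimeMillis() call.
    class ExpiringCell
    {
        private final int localExpirationTime;   // seconds since epoch when the TTL runs out

        ExpiringCell(int localExpirationTime)
        {
            this.localExpirationTime = localExpirationTime;
        }

        boolean isMarkedForDelete(long now)      // 'now' = query start time, in milliseconds
        {
            return now >= localExpirationTime * 1000L;
        }
    }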

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/test/unit/org/apache/cassandra/db/SerializationsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/SerializationsTest.java b/test/unit/org/apache/cassandra/db/SerializationsTest.java
index 267353f..728a03c 100644
--- a/test/unit/org/apache/cassandra/db/SerializationsTest.java
+++ b/test/unit/org/apache/cassandra/db/SerializationsTest.java
@@ -68,17 +68,17 @@ public class SerializationsTest extends AbstractSerializationsTester
         IPartitioner part = StorageService.getPartitioner();
         AbstractBounds<RowPosition> bounds = new Range<Token>(part.getRandomToken(), part.getRandomToken()).toRowBounds();
 
-        RangeSliceCommand namesCmd = new RangeSliceCommand(statics.KS, "Standard1", namesPred, bounds, 100);
+        RangeSliceCommand namesCmd = new RangeSliceCommand(statics.KS, "Standard1", statics.readTs, namesPred, bounds, 100);
         MessageOut<RangeSliceCommand> namesCmdMsg = namesCmd.createMessage();
-        RangeSliceCommand emptyRangeCmd = new RangeSliceCommand(statics.KS, "Standard1", emptyRangePred, bounds, 100);
+        RangeSliceCommand emptyRangeCmd = new RangeSliceCommand(statics.KS, "Standard1", statics.readTs, emptyRangePred, bounds, 100);
         MessageOut<RangeSliceCommand> emptyRangeCmdMsg = emptyRangeCmd.createMessage();
-        RangeSliceCommand regRangeCmd = new RangeSliceCommand(statics.KS, "Standard1", nonEmptyRangePred, bounds, 100);
+        RangeSliceCommand regRangeCmd = new RangeSliceCommand(statics.KS, "Standard1", statics.readTs, nonEmptyRangePred, bounds, 100);
         MessageOut<RangeSliceCommand> regRangeCmdMsg = regRangeCmd.createMessage();
-        RangeSliceCommand namesCmdSup = new RangeSliceCommand(statics.KS, "Super1", namesSCPred, bounds, 100);
+        RangeSliceCommand namesCmdSup = new RangeSliceCommand(statics.KS, "Super1", statics.readTs, namesSCPred, bounds, 100);
         MessageOut<RangeSliceCommand> namesCmdSupMsg = namesCmdSup.createMessage();
-        RangeSliceCommand emptyRangeCmdSup = new RangeSliceCommand(statics.KS, "Super1", emptyRangePred, bounds, 100);
+        RangeSliceCommand emptyRangeCmdSup = new RangeSliceCommand(statics.KS, "Super1", statics.readTs, emptyRangePred, bounds, 100);
         MessageOut<RangeSliceCommand> emptyRangeCmdSupMsg = emptyRangeCmdSup.createMessage();
-        RangeSliceCommand regRangeCmdSup = new RangeSliceCommand(statics.KS, "Super1", nonEmptyRangeSCPred, bounds, 100);
+        RangeSliceCommand regRangeCmdSup = new RangeSliceCommand(statics.KS, "Super1", statics.readTs, nonEmptyRangeSCPred, bounds, 100);
         MessageOut<RangeSliceCommand> regRangeCmdSupMsg = regRangeCmdSup.createMessage();
 
         DataOutputStream out = getOutput("db.RangeSliceCommand.bin");
@@ -113,8 +113,8 @@ public class SerializationsTest extends AbstractSerializationsTester
 
     private void testSliceByNamesReadCommandWrite() throws IOException
     {
-        SliceByNamesReadCommand standardCmd = new SliceByNamesReadCommand(statics.KS, statics.Key, statics.StandardCF, namesPred);
-        SliceByNamesReadCommand superCmd = new SliceByNamesReadCommand(statics.KS, statics.Key, statics.SuperCF, namesSCPred);
+        SliceByNamesReadCommand standardCmd = new SliceByNamesReadCommand(statics.KS, statics.Key, statics.StandardCF, statics.readTs, namesPred);
+        SliceByNamesReadCommand superCmd = new SliceByNamesReadCommand(statics.KS, statics.Key, statics.SuperCF, statics.readTs, namesSCPred);
 
         DataOutputStream out = getOutput("db.SliceByNamesReadCommand.bin");
         SliceByNamesReadCommand.serializer.serialize(standardCmd, out, getVersion());
@@ -148,8 +148,8 @@ public class SerializationsTest extends AbstractSerializationsTester
 
     private void testSliceFromReadCommandWrite() throws IOException
     {
-        SliceFromReadCommand standardCmd = new SliceFromReadCommand(statics.KS, statics.Key, statics.StandardCF, nonEmptyRangePred);
-        SliceFromReadCommand superCmd = new SliceFromReadCommand(statics.KS, statics.Key, statics.SuperCF, nonEmptyRangeSCPred);
+        SliceFromReadCommand standardCmd = new SliceFromReadCommand(statics.KS, statics.Key, statics.StandardCF, statics.readTs, nonEmptyRangePred);
+        SliceFromReadCommand superCmd = new SliceFromReadCommand(statics.KS, statics.Key, statics.SuperCF, statics.readTs, nonEmptyRangeSCPred);
         
         DataOutputStream out = getOutput("db.SliceFromReadCommand.bin");
         SliceFromReadCommand.serializer.serialize(standardCmd, out, getVersion());
@@ -357,8 +357,8 @@ public class SerializationsTest extends AbstractSerializationsTester
         }};
         private final String StandardCF = "Standard1";
         private final String SuperCF = "Super1";
-        private final ByteBuffer Start = ByteBufferUtil.bytes("Start");
-        private final ByteBuffer Stop = ByteBufferUtil.bytes("Stop");
+
+        private final long readTs = 1369935512292L;
 
         private final ColumnFamily StandardCf = TreeMapBackedSortedColumns.factory.create(KS, StandardCF);
         private final ColumnFamily SuperCf = TreeMapBackedSortedColumns.factory.create(KS, SuperCF);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/test/unit/org/apache/cassandra/db/TableTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/TableTest.java b/test/unit/org/apache/cassandra/db/TableTest.java
index 8db1769..0c3b44d 100644
--- a/test/unit/org/apache/cassandra/db/TableTest.java
+++ b/test/unit/org/apache/cassandra/db/TableTest.java
@@ -81,13 +81,25 @@ public class TableTest extends SchemaLoader
             {
                 ColumnFamily cf;
 
-                cf = cfStore.getColumnFamily(QueryFilter.getNamesFilter(TEST_KEY, "Standard3", new TreeSet<ByteBuffer>()));
+                cf = cfStore.getColumnFamily(QueryFilter.getNamesFilter(TEST_KEY,
+                                                                        "Standard3",
+                                                                        new TreeSet<ByteBuffer>(),
+                                                                        System.currentTimeMillis()));
                 assertColumns(cf);
 
-                cf = cfStore.getColumnFamily(QueryFilter.getSliceFilter(TEST_KEY, "Standard3", ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 0));
+                cf = cfStore.getColumnFamily(QueryFilter.getSliceFilter(TEST_KEY,
+                                                                        "Standard3",
+                                                                        ByteBufferUtil.EMPTY_BYTE_BUFFER,
+                                                                        ByteBufferUtil.EMPTY_BYTE_BUFFER,
+                                                                        false,
+                                                                        0,
+                                                                        System.currentTimeMillis()));
                 assertColumns(cf);
 
-                cf = cfStore.getColumnFamily(QueryFilter.getNamesFilter(TEST_KEY, "Standard3", ByteBufferUtil.bytes("col99")));
+                cf = cfStore.getColumnFamily(QueryFilter.getNamesFilter(TEST_KEY,
+                                                                        "Standard3",
+                                                                        ByteBufferUtil.bytes("col99"),
+                                                                        System.currentTimeMillis()));
                 assertColumns(cf);
             }
         };
@@ -113,10 +125,16 @@ public class TableTest extends SchemaLoader
             {
                 ColumnFamily cf;
 
-                cf = cfStore.getColumnFamily(QueryFilter.getNamesFilter(TEST_KEY, "Standard1", ByteBufferUtil.bytes("col1")));
+                cf = cfStore.getColumnFamily(QueryFilter.getNamesFilter(TEST_KEY,
+                                                                        "Standard1",
+                                                                        ByteBufferUtil.bytes("col1"),
+                                                                        System.currentTimeMillis()));
                 assertColumns(cf, "col1");
 
-                cf = cfStore.getColumnFamily(QueryFilter.getNamesFilter(TEST_KEY, "Standard1", ByteBufferUtil.bytes("col3")));
+                cf = cfStore.getColumnFamily(QueryFilter.getNamesFilter(TEST_KEY,
+                                                                        "Standard1",
+                                                                        ByteBufferUtil.bytes("col3"),
+                                                                        System.currentTimeMillis()));
                 assertColumns(cf, "col3");
             }
         };
@@ -137,13 +155,13 @@ public class TableTest extends SchemaLoader
         RowMutation rm = new RowMutation("Keyspace1", key.key, cf);
         rm.apply();
 
-        cf = cfStore.getColumnFamily(key, ByteBufferUtil.bytes("b"), ByteBufferUtil.bytes("c"), false, 100);
+        cf = cfStore.getColumnFamily(key, ByteBufferUtil.bytes("b"), ByteBufferUtil.bytes("c"), false, 100, System.currentTimeMillis());
         assertEquals(2, cf.getColumnCount());
 
-        cf = cfStore.getColumnFamily(key, ByteBufferUtil.bytes("b"), ByteBufferUtil.bytes("b"), false, 100);
+        cf = cfStore.getColumnFamily(key, ByteBufferUtil.bytes("b"), ByteBufferUtil.bytes("b"), false, 100, System.currentTimeMillis());
         assertEquals(1, cf.getColumnCount());
 
-        cf = cfStore.getColumnFamily(key, ByteBufferUtil.bytes("b"), ByteBufferUtil.bytes("c"), false, 1);
+        cf = cfStore.getColumnFamily(key, ByteBufferUtil.bytes("b"), ByteBufferUtil.bytes("c"), false, 1, System.currentTimeMillis());
         assertEquals(1, cf.getColumnCount());
     }
 
@@ -193,30 +211,30 @@ public class TableTest extends SchemaLoader
                 assert DatabaseDescriptor.getColumnIndexSize() == 4096 : "Unexpected column index size, block boundaries won't be where tests expect them.";
 
                 // test forward, spanning a segment.
-                cf = cfStore.getColumnFamily(ROW, ByteBufferUtil.bytes("col096"), ByteBufferUtil.bytes("col099"), false, 4);
+                cf = cfStore.getColumnFamily(ROW, ByteBufferUtil.bytes("col096"), ByteBufferUtil.bytes("col099"), false, 4, System.currentTimeMillis());
                 assertColumns(cf, "col096", "col097", "col098", "col099");
 
                 // test reversed, spanning a segment.
-                cf = cfStore.getColumnFamily(ROW, ByteBufferUtil.bytes("col099"), ByteBufferUtil.bytes("col096"), true, 4);
+                cf = cfStore.getColumnFamily(ROW, ByteBufferUtil.bytes("col099"), ByteBufferUtil.bytes("col096"), true, 4, System.currentTimeMillis());
                 assertColumns(cf, "col096", "col097", "col098", "col099");
 
                 // test forward, within a segment.
-                cf = cfStore.getColumnFamily(ROW, ByteBufferUtil.bytes("col100"), ByteBufferUtil.bytes("col103"), false, 4);
+                cf = cfStore.getColumnFamily(ROW, ByteBufferUtil.bytes("col100"), ByteBufferUtil.bytes("col103"), false, 4, System.currentTimeMillis());
                 assertColumns(cf, "col100", "col101", "col102", "col103");
 
                 // test reversed, within a segment.
-                cf = cfStore.getColumnFamily(ROW, ByteBufferUtil.bytes("col103"), ByteBufferUtil.bytes("col100"), true, 4);
+                cf = cfStore.getColumnFamily(ROW, ByteBufferUtil.bytes("col103"), ByteBufferUtil.bytes("col100"), true, 4, System.currentTimeMillis());
                 assertColumns(cf, "col100", "col101", "col102", "col103");
 
                 // test forward from beginning, spanning a segment.
                 String[] strCols = new String[100]; // col000-col099
                 for (int i = 0; i < 100; i++)
                     strCols[i] = "col" + fmt.format(i);
-                cf = cfStore.getColumnFamily(ROW, ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.bytes("col099"), false, 100);
+                cf = cfStore.getColumnFamily(ROW, ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.bytes("col099"), false, 100, System.currentTimeMillis());
                 assertColumns(cf, strCols);
 
                 // test reversed, from end, spanning a segment.
-                cf = cfStore.getColumnFamily(ROW, ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.bytes("col288"), true, 12);
+                cf = cfStore.getColumnFamily(ROW, ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.bytes("col288"), true, 12, System.currentTimeMillis());
                 assertColumns(cf, "col288", "col289", "col290", "col291", "col292", "col293", "col294", "col295", "col296", "col297", "col298", "col299");
             }
         };
@@ -248,7 +266,7 @@ public class TableTest extends SchemaLoader
             RowMutation rm = new RowMutation("Keyspace1", ROW.key, cf);
             rm.apply();
 
-            cf = cfs.getColumnFamily(ROW, ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER, true, 1);
+            cf = cfs.getColumnFamily(ROW, ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER, true, 1, System.currentTimeMillis());
             assertEquals(1, Iterables.size(cf.getColumnNames()));
             assertEquals(i, cf.getColumnNames().iterator().next().getLong());
         }
@@ -260,11 +278,11 @@ public class TableTest extends SchemaLoader
         ColumnFamily cf;
 
         // key before the rows that exists
-        cf = cfStore.getColumnFamily(Util.dk("a"), ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 1);
+        cf = cfStore.getColumnFamily(Util.dk("a"), ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 1, System.currentTimeMillis());
         assertColumns(cf);
 
         // key after the rows that exist
-        cf = cfStore.getColumnFamily(Util.dk("z"), ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 1);
+        cf = cfStore.getColumnFamily(Util.dk("z"), ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 1, System.currentTimeMillis());
         assertColumns(cf);
     }
 
@@ -296,26 +314,26 @@ public class TableTest extends SchemaLoader
             {
                 ColumnFamily cf;
 
-                cf = cfStore.getColumnFamily(ROW, ByteBufferUtil.bytes("col5"), ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 2);
+                cf = cfStore.getColumnFamily(ROW, ByteBufferUtil.bytes("col5"), ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 2, System.currentTimeMillis());
                 assertColumns(cf, "col5", "col7");
 
-                cf = cfStore.getColumnFamily(ROW, ByteBufferUtil.bytes("col4"), ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 2);
+                cf = cfStore.getColumnFamily(ROW, ByteBufferUtil.bytes("col4"), ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 2, System.currentTimeMillis());
                 assertColumns(cf, "col4", "col5", "col7");
                 assertColumns(ColumnFamilyStore.removeDeleted(cf, Integer.MAX_VALUE), "col5", "col7");
 
-                cf = cfStore.getColumnFamily(ROW, ByteBufferUtil.bytes("col5"), ByteBufferUtil.EMPTY_BYTE_BUFFER, true, 2);
+                cf = cfStore.getColumnFamily(ROW, ByteBufferUtil.bytes("col5"), ByteBufferUtil.EMPTY_BYTE_BUFFER, true, 2, System.currentTimeMillis());
                 assertColumns(cf, "col3", "col4", "col5");
 
-                cf = cfStore.getColumnFamily(ROW, ByteBufferUtil.bytes("col6"), ByteBufferUtil.EMPTY_BYTE_BUFFER, true, 2);
+                cf = cfStore.getColumnFamily(ROW, ByteBufferUtil.bytes("col6"), ByteBufferUtil.EMPTY_BYTE_BUFFER, true, 2, System.currentTimeMillis());
                 assertColumns(cf, "col3", "col4", "col5");
 
-                cf = cfStore.getColumnFamily(ROW, ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER, true, 2);
+                cf = cfStore.getColumnFamily(ROW, ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER, true, 2, System.currentTimeMillis());
                 assertColumns(cf, "col7", "col9");
 
-                cf = cfStore.getColumnFamily(ROW, ByteBufferUtil.bytes("col95"), ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 2);
+                cf = cfStore.getColumnFamily(ROW, ByteBufferUtil.bytes("col95"), ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 2, System.currentTimeMillis());
                 assertColumns(cf);
 
-                cf = cfStore.getColumnFamily(ROW, ByteBufferUtil.bytes("col0"), ByteBufferUtil.EMPTY_BYTE_BUFFER, true, 2);
+                cf = cfStore.getColumnFamily(ROW, ByteBufferUtil.bytes("col0"), ByteBufferUtil.EMPTY_BYTE_BUFFER, true, 2, System.currentTimeMillis());
                 assertColumns(cf);
             }
         };
@@ -344,11 +362,11 @@ public class TableTest extends SchemaLoader
             {
                 ColumnFamily cf;
 
-                cf = cfStore.getColumnFamily(ROW, ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 2);
+                cf = cfStore.getColumnFamily(ROW, ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 2, System.currentTimeMillis());
                 assertColumns(cf, "col1", "col2");
                 assertColumns(ColumnFamilyStore.removeDeleted(cf, Integer.MAX_VALUE), "col1");
 
-                cf = cfStore.getColumnFamily(ROW, ByteBufferUtil.bytes("col2"), ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 1);
+                cf = cfStore.getColumnFamily(ROW, ByteBufferUtil.bytes("col2"), ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 1, System.currentTimeMillis());
                 assertColumns(cf, "col2");
                 assertColumns(ColumnFamilyStore.removeDeleted(cf, Integer.MAX_VALUE));
             }
@@ -389,7 +407,7 @@ public class TableTest extends SchemaLoader
             {
                 ColumnFamily cf;
 
-                cf = cfStore.getColumnFamily(ROW, ByteBufferUtil.bytes("col2"), ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 3);
+                cf = cfStore.getColumnFamily(ROW, ByteBufferUtil.bytes("col2"), ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 3, System.currentTimeMillis());
                 assertColumns(cf, "col2", "col3", "col4");
 
                 ByteBuffer col = cf.getColumn(ByteBufferUtil.bytes("col2")).value();
@@ -454,7 +472,7 @@ public class TableTest extends SchemaLoader
             cfStore.forceBlockingFlush();
         }
         cfStore.metric.sstablesPerReadHistogram.clear();
-        ColumnFamily cf = cfStore.getColumnFamily(key, ByteBufferUtil.bytes(""), ByteBufferUtil.bytes("col1499"), false, 1000);
+        ColumnFamily cf = cfStore.getColumnFamily(key, ByteBufferUtil.bytes(""), ByteBufferUtil.bytes("col1499"), false, 1000, System.currentTimeMillis());
         assertEquals(cfStore.metric.sstablesPerReadHistogram.max(), 5, 0.1);
         int i = 0;
         for (Column c : cf.getSortedColumns())
@@ -463,7 +481,7 @@ public class TableTest extends SchemaLoader
         }
         assertEquals(i, 500);
         cfStore.metric.sstablesPerReadHistogram.clear();
-        cf = cfStore.getColumnFamily(key, ByteBufferUtil.bytes("col1500"), ByteBufferUtil.bytes("col2000"), false, 1000);
+        cf = cfStore.getColumnFamily(key, ByteBufferUtil.bytes("col1500"), ByteBufferUtil.bytes("col2000"), false, 1000, System.currentTimeMillis());
         assertEquals(cfStore.metric.sstablesPerReadHistogram.max(), 5, 0.1);
 
         for (Column c : cf.getSortedColumns())
@@ -474,7 +492,7 @@ public class TableTest extends SchemaLoader
 
         // reverse
         cfStore.metric.sstablesPerReadHistogram.clear();
-        cf = cfStore.getColumnFamily(key, ByteBufferUtil.bytes("col2000"), ByteBufferUtil.bytes("col1500"), true, 1000);
+        cf = cfStore.getColumnFamily(key, ByteBufferUtil.bytes("col2000"), ByteBufferUtil.bytes("col1500"), true, 1000, System.currentTimeMillis());
         assertEquals(cfStore.metric.sstablesPerReadHistogram.max(), 5, 0.1);
         i = 500;
         for (Column c : cf.getSortedColumns())
@@ -523,7 +541,7 @@ public class TableTest extends SchemaLoader
         ByteBuffer start = ct.builder().add(ByteBufferUtil.bytes("a5")).add(ByteBufferUtil.bytes(85)).build();
         ByteBuffer finish = ct.builder().add(ByteBufferUtil.bytes("a5")).buildAsEndOfRange();
         cfs.metric.sstablesPerReadHistogram.clear();
-        ColumnFamily cf = cfs.getColumnFamily(key, start, finish, false, 1000);
+        ColumnFamily cf = cfs.getColumnFamily(key, start, finish, false, 1000, System.currentTimeMillis());
         int colCount = 0;
         for (Column c : cf)
             colCount++;
@@ -535,7 +553,7 @@ public class TableTest extends SchemaLoader
     {
         DecoratedKey key = Util.dk("row3");
         ColumnFamily cf;
-        cf = cfStore.getColumnFamily(key, ByteBufferUtil.bytes("col1000"), ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 3);
+        cf = cfStore.getColumnFamily(key, ByteBufferUtil.bytes("col1000"), ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 3, System.currentTimeMillis());
         assertColumns(cf, "col1000", "col1001", "col1002");
 
         ByteBuffer col;
@@ -546,7 +564,7 @@ public class TableTest extends SchemaLoader
         col = cf.getColumn(ByteBufferUtil.bytes("col1002")).value();
         assertEquals(ByteBufferUtil.string(col), "v1002");
 
-        cf = cfStore.getColumnFamily(key, ByteBufferUtil.bytes("col1195"), ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 3);
+        cf = cfStore.getColumnFamily(key, ByteBufferUtil.bytes("col1195"), ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 3, System.currentTimeMillis());
         assertColumns(cf, "col1195", "col1196", "col1197");
 
         col = cf.getColumn(ByteBufferUtil.bytes("col1195")).value();
@@ -557,7 +575,7 @@ public class TableTest extends SchemaLoader
         assertEquals(ByteBufferUtil.string(col), "v1197");
 
 
-        cf = cfStore.getColumnFamily(key, ByteBufferUtil.bytes("col1996"), ByteBufferUtil.EMPTY_BYTE_BUFFER, true, 1000);
+        cf = cfStore.getColumnFamily(key, ByteBufferUtil.bytes("col1996"), ByteBufferUtil.EMPTY_BYTE_BUFFER, true, 1000, System.currentTimeMillis());
         Column[] columns = cf.getSortedColumns().toArray(new Column[0]);
         for (int i = 1000; i < 1996; i++)
         {
@@ -567,7 +585,7 @@ public class TableTest extends SchemaLoader
             assertEquals(ByteBufferUtil.string(column.value()), ("v" + i));
         }
 
-        cf = cfStore.getColumnFamily(key, ByteBufferUtil.bytes("col1990"), ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 3);
+        cf = cfStore.getColumnFamily(key, ByteBufferUtil.bytes("col1990"), ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 3, System.currentTimeMillis());
         assertColumns(cf, "col1990", "col1991", "col1992");
         col = cf.getColumn(ByteBufferUtil.bytes("col1990")).value();
         assertEquals(ByteBufferUtil.string(col), "v1990");
@@ -576,7 +594,7 @@ public class TableTest extends SchemaLoader
         col = cf.getColumn(ByteBufferUtil.bytes("col1992")).value();
         assertEquals(ByteBufferUtil.string(col), "v1992");
 
-        cf = cfStore.getColumnFamily(key, ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER, true, 3);
+        cf = cfStore.getColumnFamily(key, ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER, true, 3, System.currentTimeMillis());
         assertColumns(cf, "col1997", "col1998", "col1999");
         col = cf.getColumn(ByteBufferUtil.bytes("col1997")).value();
         assertEquals(ByteBufferUtil.string(col), "v1997");
@@ -585,10 +603,10 @@ public class TableTest extends SchemaLoader
         col = cf.getColumn(ByteBufferUtil.bytes("col1999")).value();
         assertEquals(ByteBufferUtil.string(col), "v1999");
 
-        cf = cfStore.getColumnFamily(key, ByteBufferUtil.bytes("col9000"), ByteBufferUtil.EMPTY_BYTE_BUFFER, true, 3);
+        cf = cfStore.getColumnFamily(key, ByteBufferUtil.bytes("col9000"), ByteBufferUtil.EMPTY_BYTE_BUFFER, true, 3, System.currentTimeMillis());
         assertColumns(cf, "col1997", "col1998", "col1999");
 
-        cf = cfStore.getColumnFamily(key, ByteBufferUtil.bytes("col9000"), ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 3);
+        cf = cfStore.getColumnFamily(key, ByteBufferUtil.bytes("col9000"), ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 3, System.currentTimeMillis());
         assertColumns(cf);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/test/unit/org/apache/cassandra/db/TimeSortTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/TimeSortTest.java b/test/unit/org/apache/cassandra/db/TimeSortTest.java
index 87777ca..3ca996b 100644
--- a/test/unit/org/apache/cassandra/db/TimeSortTest.java
+++ b/test/unit/org/apache/cassandra/db/TimeSortTest.java
@@ -54,7 +54,7 @@ public class TimeSortTest extends SchemaLoader
         rm.add("StandardLong1", getBytes(0), ByteBufferUtil.bytes("b"), 0);
         rm.apply();
 
-        ColumnFamily cf = cfStore.getColumnFamily(key, getBytes(10), ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 1000);
+        ColumnFamily cf = cfStore.getColumnFamily(key, getBytes(10), ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 1000, System.currentTimeMillis());
         Collection<Column> columns = cf.getSortedColumns();
         assert columns.size() == 1;
     }
@@ -95,7 +95,7 @@ public class TimeSortTest extends SchemaLoader
         rm.apply();
 
         // verify
-        ColumnFamily cf = cfStore.getColumnFamily(key, getBytes(0), ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 1000);
+        ColumnFamily cf = cfStore.getColumnFamily(key, getBytes(0), ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 1000, System.currentTimeMillis());
         Collection<Column> columns = cf.getSortedColumns();
         assertEquals(12, columns.size());
         Iterator<Column> iter = columns.iterator();
@@ -108,7 +108,7 @@ public class TimeSortTest extends SchemaLoader
         TreeSet<ByteBuffer> columnNames = new TreeSet<ByteBuffer>(LongType.instance);
         columnNames.add(getBytes(10));
         columnNames.add(getBytes(0));
-        cf = cfStore.getColumnFamily(QueryFilter.getNamesFilter(Util.dk("900"), "StandardLong1", columnNames));
+        cf = cfStore.getColumnFamily(QueryFilter.getNamesFilter(Util.dk("900"), "StandardLong1", columnNames, System.currentTimeMillis()));
         assert "c".equals(ByteBufferUtil.string(cf.getColumn(getBytes(0)).value()));
         assert "c".equals(ByteBufferUtil.string(cf.getColumn(getBytes(10)).value()));
     }
@@ -120,7 +120,12 @@ public class TimeSortTest extends SchemaLoader
             DecoratedKey key = Util.dk(Integer.toString(i));
             for (int j = 0; j < 8; j += 3)
             {
-                ColumnFamily cf = table.getColumnFamilyStore("StandardLong1").getColumnFamily(key, getBytes(j * 2), ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 1000);
+                ColumnFamily cf = table.getColumnFamilyStore("StandardLong1").getColumnFamily(key,
+                                                                                              getBytes(j * 2),
+                                                                                              ByteBufferUtil.EMPTY_BYTE_BUFFER,
+                                                                                              false,
+                                                                                              1000,
+                                                                                              System.currentTimeMillis());
                 Collection<Column> columns = cf.getSortedColumns();
                 assert columns.size() == 8 - j;
                 int k = j;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
index a351fd2..3ac6418 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
@@ -86,7 +86,7 @@ public class CompactionsPurgeTest extends SchemaLoader
         // major compact and test that all columns but the resurrected one is completely gone
         CompactionManager.instance.submitMaximal(cfs, Integer.MAX_VALUE).get();
         cfs.invalidateCachedRow(key);
-        ColumnFamily cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(key, cfName));
+        ColumnFamily cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(key, cfName, System.currentTimeMillis()));
         assertColumns(cf, "5");
         assert cf.getColumn(ByteBufferUtil.bytes(String.valueOf(5))) != null;
     }
@@ -138,12 +138,12 @@ public class CompactionsPurgeTest extends SchemaLoader
 
         // verify that minor compaction does GC when key is provably not
         // present in a non-compacted sstable
-        ColumnFamily cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(key2, cfName));
+        ColumnFamily cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(key2, cfName, System.currentTimeMillis()));
         assert cf == null;
 
         // verify that minor compaction still GC when key is present
         // in a non-compacted sstable but the timestamp ensures we won't miss anything
-        cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(key1, cfName));
+        cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(key1, cfName, System.currentTimeMillis()));
         Assert.assertEquals(1, cf.getColumnCount());
     }
 
@@ -180,8 +180,8 @@ public class CompactionsPurgeTest extends SchemaLoader
 
         // we should have both the c1 and c2 tombstones still, since the c2 timestamp is older than the c1 tombstone
         // so it would be invalid to assume we can throw out the c1 entry.
-        ColumnFamily cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(key3, cfName));
-        Assert.assertFalse(cf.getColumn(ByteBufferUtil.bytes("c2")).isLive());
+        ColumnFamily cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(key3, cfName, System.currentTimeMillis()));
+        Assert.assertFalse(cf.getColumn(ByteBufferUtil.bytes("c2")).isLive(System.currentTimeMillis()));
         Assert.assertEquals(2, cf.getColumnCount());
     }
 
@@ -218,7 +218,7 @@ public class CompactionsPurgeTest extends SchemaLoader
         // compact and test that the row is completely gone
         Util.compactAll(cfs).get();
         assert cfs.getSSTables().isEmpty();
-        ColumnFamily cf = table.getColumnFamilyStore(cfName).getColumnFamily(QueryFilter.getIdentityFilter(key, cfName));
+        ColumnFamily cf = table.getColumnFamilyStore(cfName).getColumnFamily(QueryFilter.getIdentityFilter(key, cfName, System.currentTimeMillis()));
         assert cf == null : cf;
     }
 
@@ -244,7 +244,7 @@ public class CompactionsPurgeTest extends SchemaLoader
         rm.apply();
 
         // move the key up in row cache
-        cfs.getColumnFamily(QueryFilter.getIdentityFilter(key, cfName));
+        cfs.getColumnFamily(QueryFilter.getIdentityFilter(key, cfName, System.currentTimeMillis()));
 
         // deletes row
         rm = new RowMutation(tableName, key.key);
@@ -264,10 +264,10 @@ public class CompactionsPurgeTest extends SchemaLoader
         rm.apply();
 
         // Check that the second insert did went in
-        ColumnFamily cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(key, cfName));
+        ColumnFamily cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(key, cfName, System.currentTimeMillis()));
         assertEquals(10, cf.getColumnCount());
         for (Column c : cf)
-            assert !c.isMarkedForDelete();
+            assert !c.isMarkedForDelete(System.currentTimeMillis());
     }
 
     @Test
@@ -309,9 +309,9 @@ public class CompactionsPurgeTest extends SchemaLoader
         rm.apply();
 
         // Check that the second insert did went in
-        ColumnFamily cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(key, cfName));
+        ColumnFamily cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(key, cfName, System.currentTimeMillis()));
         assertEquals(10, cf.getColumnCount());
         for (Column c : cf)
-            assert !c.isMarkedForDelete();
+            assert !c.isMarkedForDelete(System.currentTimeMillis());
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
index 9df5d25..1fe7a46 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
@@ -147,7 +147,7 @@ public class CompactionsTest extends SchemaLoader
 
         // check that the shadowed column is gone
         SSTableReader sstable = cfs.getSSTables().iterator().next();
-        SSTableScanner scanner = sstable.getScanner(new QueryFilter(null, "Super1", new IdentityQueryFilter()), key);
+        SSTableScanner scanner = sstable.getScanner(new QueryFilter(null, "Super1", new IdentityQueryFilter(), System.currentTimeMillis()), key);
         OnDiskAtomIterator iter = scanner.next();
         assertEquals(key, iter.getKey());
         assert iter.next() instanceof RangeTombstone;
@@ -314,7 +314,7 @@ public class CompactionsTest extends SchemaLoader
 
         Collection<SSTableReader> sstablesBefore = cfs.getSSTables();
 
-        QueryFilter filter = QueryFilter.getIdentityFilter(key, cfname);
+        QueryFilter filter = QueryFilter.getIdentityFilter(key, cfname, System.currentTimeMillis());
         assert !(cfs.getColumnFamily(filter).getColumnCount() == 0);
 
         // Remove key

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
index d332ec3..3fb828b 100644
--- a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
@@ -89,7 +89,7 @@ public class LeveledCompactionStrategyTest extends SchemaLoader
 
         ActiveRepairService.CFPair p = new ActiveRepairService.CFPair(ksname, cfname);
         Range<Token> range = new Range<Token>(Util.token(""), Util.token(""));
-        int gcBefore = (int)(System.currentTimeMillis()/1000) - table.getColumnFamilyStore(cfname).metadata.getGcGraceSeconds();
+        int gcBefore = table.getColumnFamilyStore(cfname).gcBefore(System.currentTimeMillis());
         ActiveRepairService.TreeRequest req = new ActiveRepairService.TreeRequest("1", FBUtilities.getLocalAddress(), range, gcBefore, p);
         ActiveRepairService.Validator validator = new ActiveRepairService.Validator(req);
         CompactionManager.instance.submitValidation(cfs, validator).get();
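
The LeveledCompactionStrategyTest change replaces the hand-computed GC horizon with ColumnFamilyStore.gcBefore(long now). Judging from the removed expression, the helper is presumably equivalent to the following sketch (the body is inferred from the old line, not copied from the production class):

    // Assumed to match the removed expression:
    //   (int)(System.currentTimeMillis()/1000) - metadata.getGcGraceSeconds()
    int gcBefore(long now)
    {
        return (int) (now / 1000) - metadata.getGcGraceSeconds();
    }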

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/test/unit/org/apache/cassandra/db/compaction/TTLExpiryTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/TTLExpiryTest.java b/test/unit/org/apache/cassandra/db/compaction/TTLExpiryTest.java
index da83ffd..c14e860 100644
--- a/test/unit/org/apache/cassandra/db/compaction/TTLExpiryTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/TTLExpiryTest.java
@@ -140,7 +140,7 @@ public class TTLExpiryTest extends SchemaLoader
         cfs.enableAutoCompaction(true);
         assertEquals(1, cfs.getSSTables().size());
         SSTableReader sstable = cfs.getSSTables().iterator().next();
-        SSTableScanner scanner = sstable.getScanner(new QueryFilter(null, "Standard1", new IdentityQueryFilter()));
+        SSTableScanner scanner = sstable.getScanner(new QueryFilter(null, "Standard1", new IdentityQueryFilter(), System.currentTimeMillis()));
         assertTrue(scanner.hasNext());
         while(scanner.hasNext())
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/test/unit/org/apache/cassandra/db/index/PerRowSecondaryIndexTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/index/PerRowSecondaryIndexTest.java b/test/unit/org/apache/cassandra/db/index/PerRowSecondaryIndexTest.java
index 44bf6a1..6ea1554 100644
--- a/test/unit/org/apache/cassandra/db/index/PerRowSecondaryIndexTest.java
+++ b/test/unit/org/apache/cassandra/db/index/PerRowSecondaryIndexTest.java
@@ -90,7 +90,7 @@ public class PerRowSecondaryIndexTest extends SchemaLoader
 
         for (Column column : indexedRow.getSortedColumns())
         {
-            assertTrue(column.isMarkedForDelete());
+            assertTrue(column.isMarkedForDelete(System.currentTimeMillis()));
         }
         assertTrue(Arrays.equals("k2".getBytes(), TestIndex.LAST_INDEXED_KEY.array()));
     }
@@ -108,7 +108,7 @@ public class PerRowSecondaryIndexTest extends SchemaLoader
         assertNotNull(indexedRow);
         for (Column column : indexedRow.getSortedColumns())
         {
-            assertTrue(column.isMarkedForDelete());
+            assertTrue(column.isMarkedForDelete(System.currentTimeMillis()));
         }
         assertTrue(Arrays.equals("k3".getBytes(), TestIndex.LAST_INDEXED_KEY.array()));
     }
@@ -133,7 +133,8 @@ public class PerRowSecondaryIndexTest extends SchemaLoader
         public void index(ByteBuffer rowKey)
         {
             QueryFilter filter = QueryFilter.getIdentityFilter(DatabaseDescriptor.getPartitioner().decorateKey(rowKey),
-                                                               baseCfs.getColumnFamilyName());
+                                                               baseCfs.getColumnFamilyName(),
+                                                               System.currentTimeMillis());
             LAST_INDEXED_ROW = baseCfs.getColumnFamily(filter);
             LAST_INDEXED_KEY = rowKey;
         }

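Because the deletion check now takes an explicit timestamp, callers like these tests pass System.currentTimeMillis() at the call site, and a test that wants determinism can pass a fixed value instead. A hedged sketch of that pattern (simplified semantics, not the real Column class):

    public final class FixedClockSketch
    {
        // Simplified tombstone semantics: dead once the query time (in seconds)
        // reaches the column's local deletion time.
        static boolean isMarkedForDelete(int localDeletionTimeSeconds, long nowMillis)
        {
            return (int) (nowMillis / 1000) >= localDeletionTimeSeconds;
        }

        public static void main(String[] args)
        {
            int deletionTime = 1000000;         // local deletion time, in seconds
            long justBefore  = 999999000L;      // 999,999 s expressed in milliseconds
            long justAfter   = 1000001000L;     // 1,000,001 s expressed in milliseconds

            System.out.println(isMarkedForDelete(deletionTime, justBefore)); // false
            System.out.println(isMarkedForDelete(deletionTime, justAfter));  // true
        }
    }
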
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/test/unit/org/apache/cassandra/db/marshal/CompositeTypeTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/marshal/CompositeTypeTest.java b/test/unit/org/apache/cassandra/db/marshal/CompositeTypeTest.java
index 3614980..8314af8 100644
--- a/test/unit/org/apache/cassandra/db/marshal/CompositeTypeTest.java
+++ b/test/unit/org/apache/cassandra/db/marshal/CompositeTypeTest.java
@@ -183,7 +183,7 @@ public class CompositeTypeTest extends SchemaLoader
         addColumn(rm, cname3);
         rm.apply();
 
-        ColumnFamily cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(Util.dk("k"), cfName));
+        ColumnFamily cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(Util.dk("k"), cfName, System.currentTimeMillis()));
 
         Iterator<Column> iter = cf.getSortedColumns().iterator();
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/test/unit/org/apache/cassandra/db/marshal/DynamicCompositeTypeTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/marshal/DynamicCompositeTypeTest.java b/test/unit/org/apache/cassandra/db/marshal/DynamicCompositeTypeTest.java
index f681403..14ec219 100644
--- a/test/unit/org/apache/cassandra/db/marshal/DynamicCompositeTypeTest.java
+++ b/test/unit/org/apache/cassandra/db/marshal/DynamicCompositeTypeTest.java
@@ -179,7 +179,7 @@ public class DynamicCompositeTypeTest extends SchemaLoader
         addColumn(rm, cname3);
         rm.apply();
 
-        ColumnFamily cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(Util.dk("k"), cfName));
+        ColumnFamily cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(Util.dk("k"), cfName, System.currentTimeMillis()));
 
         Iterator<Column> iter = cf.getSortedColumns().iterator();
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
index 3f2241e..eeec8d9 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
@@ -318,7 +318,7 @@ public class SSTableReaderTest extends SchemaLoader
         List<IndexExpression> clause = Arrays.asList(expr);
         IPartitioner p = StorageService.getPartitioner();
         Range<RowPosition> range = Util.range("", "");
-        List<Row> rows = indexedCFS.search(clause, range, 100, new IdentityQueryFilter());
+        List<Row> rows = indexedCFS.search(range, clause, new IdentityQueryFilter(), 100);
         assert rows.size() == 1;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java b/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
index b120a3f..79aed42 100644
--- a/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
+++ b/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
@@ -107,7 +107,7 @@ public abstract class AntiEntropyServiceTestAbstract extends SchemaLoader
         local_range = StorageService.instance.getPrimaryRangesForEndpoint(tablename, LOCAL).iterator().next();
 
         // (we use REMOTE instead of LOCAL so that the reponses for the validator.complete() get lost)
-        int gcBefore = (int)(System.currentTimeMillis()/1000) - store.metadata.getGcGraceSeconds();
+        int gcBefore = store.gcBefore(System.currentTimeMillis());
         request = new TreeRequest(UUID.randomUUID().toString(), REMOTE, local_range, gcBefore, new CFPair(tablename, cfname));
         // Set a fake session corresponding to this fake request
         ActiveRepairService.instance.submitArtificialRepairSession(request, tablename, cfname);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/test/unit/org/apache/cassandra/service/RowResolverTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/RowResolverTest.java b/test/unit/org/apache/cassandra/service/RowResolverTest.java
index c2e0c1d..d19677c 100644
--- a/test/unit/org/apache/cassandra/service/RowResolverTest.java
+++ b/test/unit/org/apache/cassandra/service/RowResolverTest.java
@@ -45,7 +45,7 @@ public class RowResolverTest extends SchemaLoader
         ColumnFamily cf2 = TreeMapBackedSortedColumns.factory.create("Keyspace1", "Standard1");
         cf2.addColumn(column("c1", "v2", 1));
 
-        ColumnFamily resolved = RowDataResolver.resolveSuperset(Arrays.asList(cf1, cf2));
+        ColumnFamily resolved = RowDataResolver.resolveSuperset(Arrays.asList(cf1, cf2), System.currentTimeMillis());
         assertColumns(resolved, "c1");
         assertColumns(ColumnFamily.diff(cf1, resolved), "c1");
         assertNull(ColumnFamily.diff(cf2, resolved));
@@ -60,7 +60,7 @@ public class RowResolverTest extends SchemaLoader
         ColumnFamily cf2 = TreeMapBackedSortedColumns.factory.create("Keyspace1", "Standard1");
         cf2.addColumn(column("c2", "v2", 1));
 
-        ColumnFamily resolved = RowDataResolver.resolveSuperset(Arrays.asList(cf1, cf2));
+        ColumnFamily resolved = RowDataResolver.resolveSuperset(Arrays.asList(cf1, cf2), System.currentTimeMillis());
         assertColumns(resolved, "c1", "c2");
         assertColumns(ColumnFamily.diff(cf1, resolved), "c2");
         assertColumns(ColumnFamily.diff(cf2, resolved), "c1");
@@ -72,7 +72,7 @@ public class RowResolverTest extends SchemaLoader
         ColumnFamily cf2 = TreeMapBackedSortedColumns.factory.create("Keyspace1", "Standard1");
         cf2.addColumn(column("c2", "v2", 1));
 
-        ColumnFamily resolved = RowDataResolver.resolveSuperset(Arrays.asList(null, cf2));
+        ColumnFamily resolved = RowDataResolver.resolveSuperset(Arrays.asList(null, cf2), System.currentTimeMillis());
         assertColumns(resolved, "c2");
         assertColumns(ColumnFamily.diff(null, resolved), "c2");
         assertNull(ColumnFamily.diff(cf2, resolved));
@@ -84,7 +84,7 @@ public class RowResolverTest extends SchemaLoader
         ColumnFamily cf1 = TreeMapBackedSortedColumns.factory.create("Keyspace1", "Standard1");
         cf1.addColumn(column("c1", "v1", 0));
 
-        ColumnFamily resolved = RowDataResolver.resolveSuperset(Arrays.asList(cf1, null));
+        ColumnFamily resolved = RowDataResolver.resolveSuperset(Arrays.asList(cf1, null), System.currentTimeMillis());
         assertColumns(resolved, "c1");
         assertNull(ColumnFamily.diff(cf1, resolved));
         assertColumns(ColumnFamily.diff(null, resolved), "c1");
@@ -93,7 +93,7 @@ public class RowResolverTest extends SchemaLoader
     @Test
     public void testResolveSupersetNullBoth()
     {
-        assertNull(RowDataResolver.resolveSuperset(Arrays.<ColumnFamily>asList(null, null)));
+        assertNull(RowDataResolver.resolveSuperset(Arrays.<ColumnFamily>asList(null, null), System.currentTimeMillis()));
     }
 
     @Test
@@ -106,7 +106,7 @@ public class RowResolverTest extends SchemaLoader
         ColumnFamily cf2 = TreeMapBackedSortedColumns.factory.create("Keyspace1", "Standard1");
         cf2.delete(new DeletionInfo(1L, (int) (System.currentTimeMillis() / 1000)));
 
-        ColumnFamily resolved = RowDataResolver.resolveSuperset(Arrays.asList(cf1, cf2));
+        ColumnFamily resolved = RowDataResolver.resolveSuperset(Arrays.asList(cf1, cf2), System.currentTimeMillis());
         // no columns in the cf
         assertColumns(resolved);
         assertTrue(resolved.isMarkedForDelete());
@@ -133,7 +133,7 @@ public class RowResolverTest extends SchemaLoader
         ColumnFamily cf4 = TreeMapBackedSortedColumns.factory.create("Keyspace1", "Standard1");
         cf4.delete(new DeletionInfo(2L, (int) (System.currentTimeMillis() / 1000)));
 
-        ColumnFamily resolved = RowDataResolver.resolveSuperset(Arrays.asList(cf1, cf2, cf3, cf4));
+        ColumnFamily resolved = RowDataResolver.resolveSuperset(Arrays.asList(cf1, cf2, cf3, cf4), System.currentTimeMillis());
         // will have deleted marker and one column
         assertColumns(resolved, "two");
         assertColumn(resolved, "two", "B", 3);

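resolveSuperset now takes the timestamp of the read it is resolving, so every replica's version of a column is judged against the same instant when the superset is built. A simplified, self-contained reconcile loop under that assumption (not the RowDataResolver implementation):

    import java.util.Arrays;
    import java.util.List;

    public final class ResolveSketch
    {
        static final class Cell
        {
            final String value;
            final long writeTimestamp;
            final Integer localDeletionTime; // null when the cell never expires

            Cell(String value, long writeTimestamp, Integer localDeletionTime)
            {
                this.value = value;
                this.writeTimestamp = writeTimestamp;
                this.localDeletionTime = localDeletionTime;
            }
        }

        static boolean isLive(Cell c, long now)
        {
            return c.localDeletionTime == null || (int) (now / 1000) < c.localDeletionTime;
        }

        // Pick the version with the highest write timestamp, then decide liveness
        // once, against the shared 'now' of the read.
        static Cell resolve(List<Cell> versions, long now)
        {
            Cell winner = null;
            for (Cell c : versions)
                if (c != null && (winner == null || c.writeTimestamp > winner.writeTimestamp))
                    winner = c;
            return (winner == null || isLive(winner, now)) ? winner : null;
        }

        public static void main(String[] args)
        {
            long now = System.currentTimeMillis();
            Cell v1 = new Cell("v1", 0, null);
            Cell v2 = new Cell("v2", 1, null);
            System.out.println(resolve(Arrays.asList(v1, v2), now).value); // v2
        }
    }
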
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
index e3d9b8a..a86330c 100644
--- a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
@@ -100,7 +100,7 @@ public class StreamingTransferTest extends SchemaLoader
         {
             String key = "key" + offs[i];
             String col = "col" + offs[i];
-            assert cfs.getColumnFamily(QueryFilter.getIdentityFilter(Util.dk(key), cfs.name)) != null;
+            assert cfs.getColumnFamily(QueryFilter.getIdentityFilter(Util.dk(key), cfs.name, System.currentTimeMillis())) != null;
             assert rows.get(i).key.key.equals(ByteBufferUtil.bytes(key));
             assert rows.get(i).cf.getColumn(ByteBufferUtil.bytes(col)) != null;
         }
@@ -197,7 +197,7 @@ public class StreamingTransferTest extends SchemaLoader
             List<IndexExpression> clause = Arrays.asList(expr);
             IDiskAtomFilter filter = new IdentityQueryFilter();
             Range<RowPosition> range = Util.range("", "");
-            List<Row> rows = cfs.search(clause, range, 100, filter);
+            List<Row> rows = cfs.search(range, clause, filter, 100);
             assertEquals(1, rows.size());
             assert rows.get(0).key.key.equals(ByteBufferUtil.bytes(key));
         }
@@ -299,10 +299,10 @@ public class StreamingTransferTest extends SchemaLoader
         assert rows.get(1).cf.getColumnCount() == 1;
 
         // these keys fall outside of the ranges and should not be transferred
-        assert cfstore.getColumnFamily(QueryFilter.getIdentityFilter(Util.dk("transfer1"), "Standard1")) == null;
-        assert cfstore.getColumnFamily(QueryFilter.getIdentityFilter(Util.dk("transfer2"), "Standard1")) == null;
-        assert cfstore.getColumnFamily(QueryFilter.getIdentityFilter(Util.dk("test2"), "Standard1")) == null;
-        assert cfstore.getColumnFamily(QueryFilter.getIdentityFilter(Util.dk("test3"), "Standard1")) == null;
+        assert cfstore.getColumnFamily(QueryFilter.getIdentityFilter(Util.dk("transfer1"), "Standard1", System.currentTimeMillis())) == null;
+        assert cfstore.getColumnFamily(QueryFilter.getIdentityFilter(Util.dk("transfer2"), "Standard1", System.currentTimeMillis())) == null;
+        assert cfstore.getColumnFamily(QueryFilter.getIdentityFilter(Util.dk("test2"), "Standard1", System.currentTimeMillis())) == null;
+        assert cfstore.getColumnFamily(QueryFilter.getIdentityFilter(Util.dk("test3"), "Standard1", System.currentTimeMillis())) == null;
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/test/unit/org/apache/cassandra/tools/SSTableExportTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/tools/SSTableExportTest.java b/test/unit/org/apache/cassandra/tools/SSTableExportTest.java
index 6fd1764..5528d4c 100644
--- a/test/unit/org/apache/cassandra/tools/SSTableExportTest.java
+++ b/test/unit/org/apache/cassandra/tools/SSTableExportTest.java
@@ -202,13 +202,13 @@ public class SSTableExportTest extends SchemaLoader
         new SSTableImport().importJson(tempJson.getPath(), "Keyspace1", "Standard1", tempSS2.getPath());
 
         reader = SSTableReader.open(Descriptor.fromFilename(tempSS2.getPath()));
-        QueryFilter qf = QueryFilter.getNamesFilter(Util.dk("rowA"), "Standard1", ByteBufferUtil.bytes("name"));
+        QueryFilter qf = QueryFilter.getNamesFilter(Util.dk("rowA"), "Standard1", ByteBufferUtil.bytes("name"), System.currentTimeMillis());
         ColumnFamily cf = qf.getSSTableColumnIterator(reader).getColumnFamily();
         qf.collateOnDiskAtom(cf, qf.getSSTableColumnIterator(reader), Integer.MIN_VALUE);
         assertTrue(cf != null);
         assertTrue(cf.getColumn(ByteBufferUtil.bytes("name")).value().equals(hexToBytes("76616c")));
 
-        qf = QueryFilter.getNamesFilter(Util.dk("rowExclude"), "Standard1", ByteBufferUtil.bytes("name"));
+        qf = QueryFilter.getNamesFilter(Util.dk("rowExclude"), "Standard1", ByteBufferUtil.bytes("name"), System.currentTimeMillis());
         cf = qf.getSSTableColumnIterator(reader).getColumnFamily();
         assert cf == null;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/test/unit/org/apache/cassandra/tools/SSTableImportTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/tools/SSTableImportTest.java b/test/unit/org/apache/cassandra/tools/SSTableImportTest.java
index a1a7673..b0f22be 100644
--- a/test/unit/org/apache/cassandra/tools/SSTableImportTest.java
+++ b/test/unit/org/apache/cassandra/tools/SSTableImportTest.java
@@ -53,7 +53,7 @@ public class SSTableImportTest extends SchemaLoader
 
         // Verify results
         SSTableReader reader = SSTableReader.open(Descriptor.fromFilename(tempSS.getPath()));
-        QueryFilter qf = QueryFilter.getIdentityFilter(Util.dk("rowA"), "Standard1");
+        QueryFilter qf = QueryFilter.getIdentityFilter(Util.dk("rowA"), "Standard1", System.currentTimeMillis());
         OnDiskAtomIterator iter = qf.getSSTableColumnIterator(reader);
         ColumnFamily cf = cloneForAdditions(iter);
         while (iter.hasNext()) cf.addAtom(iter.next());
@@ -87,7 +87,7 @@ public class SSTableImportTest extends SchemaLoader
 
         // Verify results
         SSTableReader reader = SSTableReader.open(Descriptor.fromFilename(tempSS.getPath()));
-        QueryFilter qf = QueryFilter.getIdentityFilter(Util.dk("rowA"), "Standard1");
+        QueryFilter qf = QueryFilter.getIdentityFilter(Util.dk("rowA"), "Standard1", System.currentTimeMillis());
         OnDiskAtomIterator iter = qf.getSSTableColumnIterator(reader);
         ColumnFamily cf = cloneForAdditions(iter);
         while (iter.hasNext()) cf.addAtom(iter.next());
@@ -108,7 +108,7 @@ public class SSTableImportTest extends SchemaLoader
 
         // Verify results
         SSTableReader reader = SSTableReader.open(Descriptor.fromFilename(tempSS.getPath()));
-        QueryFilter qf = QueryFilter.getIdentityFilter(Util.dk("rowA"), "Super4");
+        QueryFilter qf = QueryFilter.getIdentityFilter(Util.dk("rowA"), "Super4", System.currentTimeMillis());
         ColumnFamily cf = cloneForAdditions(qf.getSSTableColumnIterator(reader));
         qf.collateOnDiskAtom(cf, qf.getSSTableColumnIterator(reader), Integer.MIN_VALUE);
 
@@ -137,7 +137,7 @@ public class SSTableImportTest extends SchemaLoader
         new SSTableImport().importJson(jsonUrl, "Keyspace1", "Standard1", tempSS.getPath());
 
         SSTableReader reader = SSTableReader.open(Descriptor.fromFilename(tempSS.getPath()));
-        QueryFilter qf = QueryFilter.getIdentityFilter(Util.dk("rowA"), "Standard1");
+        QueryFilter qf = QueryFilter.getIdentityFilter(Util.dk("rowA"), "Standard1", System.currentTimeMillis());
         OnDiskAtomIterator iter = qf.getSSTableColumnIterator(reader);
         ColumnFamily cf = cloneForAdditions(iter);
         while (iter.hasNext())
@@ -160,7 +160,7 @@ public class SSTableImportTest extends SchemaLoader
 
         // Verify results
         SSTableReader reader = SSTableReader.open(Descriptor.fromFilename(tempSS.getPath()));
-        QueryFilter qf = QueryFilter.getIdentityFilter(Util.dk("rowA"), "Standard1");
+        QueryFilter qf = QueryFilter.getIdentityFilter(Util.dk("rowA"), "Standard1", System.currentTimeMillis());
         OnDiskAtomIterator iter = qf.getSSTableColumnIterator(reader);
         ColumnFamily cf = cloneForAdditions(iter);
         assertEquals(cf.deletionInfo(), new DeletionInfo(0, 0));
@@ -184,7 +184,7 @@ public class SSTableImportTest extends SchemaLoader
 
         // Verify results
         SSTableReader reader = SSTableReader.open(Descriptor.fromFilename(tempSS.getPath()));
-        QueryFilter qf = QueryFilter.getIdentityFilter(Util.dk("rowA"), "Counter1");
+        QueryFilter qf = QueryFilter.getIdentityFilter(Util.dk("rowA"), "Counter1", System.currentTimeMillis());
         OnDiskAtomIterator iter = qf.getSSTableColumnIterator(reader);
         ColumnFamily cf = cloneForAdditions(iter);
         while (iter.hasNext()) cf.addAtom(iter.next());


[4/4] git commit: Include a timestamp with all read commands to determine column expiration

Posted by al...@apache.org.
Include a timestamp with all read commands to determine column expiration

patch by Aleksey Yeschenko; reviewed by Sylvain Lebresne for
CASSANDRA-5149


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1f7628ce
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1f7628ce
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1f7628ce

Branch: refs/heads/trunk
Commit: 1f7628ce7c1b3f820717eaa44df9b182158eb49e
Parents: 62295f6
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Tue Jun 18 19:15:02 2013 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Tue Jun 18 19:15:02 2013 +0300

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../org/apache/cassandra/config/CFMetaData.java |   2 +-
 .../cassandra/config/ColumnDefinition.java      |   3 +-
 .../apache/cassandra/cql/QueryProcessor.java    |  17 +-
 .../cql3/statements/ColumnGroupMap.java         |   6 +-
 .../cql3/statements/ModificationStatement.java  |   4 +-
 .../cql3/statements/SelectStatement.java        |  49 ++--
 .../cassandra/cql3/statements/Selection.java    |  12 +-
 .../cassandra/db/CollationController.java       |   6 +-
 src/java/org/apache/cassandra/db/Column.java    |  25 +-
 .../org/apache/cassandra/db/ColumnFamily.java   |   9 +-
 .../cassandra/db/ColumnFamilySerializer.java    |  22 +-
 .../apache/cassandra/db/ColumnFamilyStore.java  | 107 ++++----
 .../apache/cassandra/db/ColumnSerializer.java   |   2 +-
 .../org/apache/cassandra/db/CounterColumn.java  |   8 +-
 .../apache/cassandra/db/CounterMutation.java    |   7 +-
 .../cassandra/db/CounterUpdateColumn.java       |   2 +-
 src/java/org/apache/cassandra/db/DefsTable.java |   4 +-
 .../org/apache/cassandra/db/DeletedColumn.java  |   4 +-
 .../org/apache/cassandra/db/DeletionTime.java   |   4 +-
 .../org/apache/cassandra/db/ExpiringColumn.java |  15 +-
 .../cassandra/db/HintedHandOffManager.java      |  23 +-
 .../org/apache/cassandra/db/OnDiskAtom.java     |   2 +-
 .../apache/cassandra/db/RangeSliceCommand.java  |  83 +++---
 .../org/apache/cassandra/db/RangeTombstone.java |   4 +-
 .../org/apache/cassandra/db/ReadCommand.java    |  18 +-
 .../db/RetriedSliceFromReadCommand.java         |   9 +-
 src/java/org/apache/cassandra/db/Row.java       |   4 +-
 .../apache/cassandra/db/RowIteratorFactory.java |  16 +-
 .../cassandra/db/SliceByNamesReadCommand.java   |  28 +-
 .../cassandra/db/SliceFromReadCommand.java      |  30 ++-
 .../apache/cassandra/db/SliceQueryPager.java    |   7 +-
 .../org/apache/cassandra/db/SuperColumns.java   |   4 +-
 .../org/apache/cassandra/db/SystemTable.java    |  20 +-
 .../db/compaction/CompactionManager.java        |   6 +-
 .../db/compaction/LazilyCompactedRow.java       |   4 +-
 .../db/compaction/PrecompactedRow.java          |   4 +-
 .../cassandra/db/filter/ColumnCounter.java      |  17 +-
 .../cassandra/db/filter/ExtendedFilter.java     |  49 +++-
 .../cassandra/db/filter/IDiskAtomFilter.java    |   4 +-
 .../cassandra/db/filter/NamesQueryFilter.java   |   8 +-
 .../apache/cassandra/db/filter/QueryFilter.java |  31 ++-
 .../cassandra/db/filter/SliceQueryFilter.java   |  20 +-
 .../AbstractSimplePerColumnSecondaryIndex.java  |   2 +-
 .../db/index/SecondaryIndexManager.java         |  20 +-
 .../db/index/SecondaryIndexSearcher.java        |  17 +-
 .../db/index/composites/CompositesIndex.java    |   2 +-
 .../CompositesIndexOnClusteringKey.java         |   4 +-
 .../CompositesIndexOnPartitionKey.java          |   4 +-
 .../composites/CompositesIndexOnRegular.java    |   4 +-
 .../db/index/composites/CompositesSearcher.java |  31 ++-
 .../cassandra/db/index/keys/KeysIndex.java      |   4 +-
 .../cassandra/db/index/keys/KeysSearcher.java   |  27 +-
 .../io/sstable/SSTableIdentityIterator.java     |   5 +-
 .../cassandra/io/sstable/SSTableReader.java     |  10 +-
 .../cassandra/service/ActiveRepairService.java  |   2 +-
 .../apache/cassandra/service/CacheService.java  |   4 +-
 .../service/RangeSliceResponseResolver.java     |   6 +-
 .../service/RangeSliceVerbHandler.java          |  15 +-
 .../apache/cassandra/service/ReadCallback.java  |   2 +-
 .../cassandra/service/RowDataResolver.java      |  12 +-
 .../apache/cassandra/service/StorageProxy.java  |  35 ++-
 .../cassandra/thrift/CassandraServer.java       | 211 ++++++++-------
 .../serialization/2.0/db.RangeSliceCommand.bin  | Bin 753 -> 801 bytes
 .../2.0/db.SliceByNamesReadCommand.bin          | Bin 437 -> 485 bytes
 .../2.0/db.SliceFromReadCommand.bin             | Bin 437 -> 485 bytes
 .../org/apache/cassandra/db/LongTableTest.java  |   5 +-
 .../unit/org/apache/cassandra/SchemaLoader.java |   2 +-
 test/unit/org/apache/cassandra/Util.java        |   9 +-
 .../org/apache/cassandra/config/DefsTest.java   |   6 +-
 .../org/apache/cassandra/db/CleanupTest.java    |   4 +-
 .../cassandra/db/CollationControllerTest.java   |   4 +-
 .../cassandra/db/ColumnFamilyStoreTest.java     | 259 +++++++++++++------
 .../apache/cassandra/db/ColumnFamilyTest.java   |   4 +-
 .../org/apache/cassandra/db/KeyCacheTest.java   |  12 +-
 .../apache/cassandra/db/KeyCollisionTest.java   |   2 +-
 .../apache/cassandra/db/RangeTombstoneTest.java |  10 +-
 .../apache/cassandra/db/ReadMessageTest.java    |  13 +-
 .../db/RecoveryManagerTruncateTest.java         |   6 +-
 .../cassandra/db/RemoveColumnFamilyTest.java    |   2 +-
 .../db/RemoveColumnFamilyWithFlush1Test.java    |   2 +-
 .../db/RemoveColumnFamilyWithFlush2Test.java    |   2 +-
 .../apache/cassandra/db/RemoveColumnTest.java   |  18 +-
 .../cassandra/db/RemoveSubColumnTest.java       |  10 +-
 .../org/apache/cassandra/db/RowCacheTest.java   |  28 +-
 test/unit/org/apache/cassandra/db/RowTest.java  |   4 +-
 .../apache/cassandra/db/SerializationsTest.java |  24 +-
 .../unit/org/apache/cassandra/db/TableTest.java |  94 ++++---
 .../org/apache/cassandra/db/TimeSortTest.java   |  13 +-
 .../db/compaction/CompactionsPurgeTest.java     |  22 +-
 .../db/compaction/CompactionsTest.java          |   4 +-
 .../LeveledCompactionStrategyTest.java          |   2 +-
 .../cassandra/db/compaction/TTLExpiryTest.java  |   2 +-
 .../db/index/PerRowSecondaryIndexTest.java      |   7 +-
 .../cassandra/db/marshal/CompositeTypeTest.java |   2 +-
 .../db/marshal/DynamicCompositeTypeTest.java    |   2 +-
 .../cassandra/io/sstable/SSTableReaderTest.java |   2 +-
 .../service/AntiEntropyServiceTestAbstract.java |   2 +-
 .../cassandra/service/RowResolverTest.java      |  14 +-
 .../streaming/StreamingTransferTest.java        |  12 +-
 .../cassandra/tools/SSTableExportTest.java      |   4 +-
 .../cassandra/tools/SSTableImportTest.java      |  12 +-
 102 files changed, 1008 insertions(+), 730 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a620d31..46b67e3 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -58,6 +58,8 @@
  * Use SASL authentication in binary protocol v2 (CASSANDRA-5545)
  * Replace Thrift HsHa with LMAX Disruptor based implementation (CASSANDRA-5582)
  * cqlsh: Add row count to SELECT output (CASSANDRA-5636)
+ * Include a timestamp with all read commands to determine column expiration
+   (CASSANDRA-5149)
 
 1.2.6
  * Reduce SSTableLoader memory usage (CASSANDRA-5555)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java
index b5ece0c..15713bb 100644
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@ -1251,7 +1251,7 @@ public final class CFMetaData
 
     public Iterator<OnDiskAtom> getOnDiskIterator(DataInput in, int count, Descriptor.Version version)
     {
-        return getOnDiskIterator(in, count, ColumnSerializer.Flag.LOCAL, (int) (System.currentTimeMillis() / 1000), version);
+        return getOnDiskIterator(in, count, ColumnSerializer.Flag.LOCAL, Integer.MIN_VALUE, version);
     }
 
     public Iterator<OnDiskAtom> getOnDiskIterator(DataInput in, int count, ColumnSerializer.Flag flag, int expireBefore, Descriptor.Version version)

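Switching the expireBefore argument to Integer.MIN_VALUE here means that, while the on-disk iterator deserializes cells, nothing can already count as expired; the expiration decision moves entirely to the read path, which now carries the query timestamp. A tiny sketch of why the sentinel has that effect, assuming expireBefore means "treat cells that expired before this second as tombstones":

    public final class ExpireBeforeSketch
    {
        // Assumed semantics: a cell whose expiration second is earlier than
        // expireBefore would be materialized as a tombstone during deserialization.
        static boolean materializeAsTombstone(int localExpirationTimeSeconds, int expireBefore)
        {
            return localExpirationTimeSeconds < expireBefore;
        }

        public static void main(String[] args)
        {
            int expiredAtEpoch = 0; // expired long ago
            // No int is smaller than Integer.MIN_VALUE, so nothing is converted here.
            System.out.println(materializeAsTombstone(expiredAtEpoch, Integer.MIN_VALUE)); // false
        }
    }
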
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/config/ColumnDefinition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/ColumnDefinition.java b/src/java/org/apache/cassandra/config/ColumnDefinition.java
index 9d21435..470e7d7 100644
--- a/src/java/org/apache/cassandra/config/ColumnDefinition.java
+++ b/src/java/org/apache/cassandra/config/ColumnDefinition.java
@@ -306,7 +306,8 @@ public class ColumnDefinition
                                                        DefsTable.searchComposite(cfName, true),
                                                        DefsTable.searchComposite(cfName, false),
                                                        false,
-                                                       Integer.MAX_VALUE);
+                                                       Integer.MAX_VALUE,
+                                                       System.currentTimeMillis());
         return new Row(key, cf);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/cql/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql/QueryProcessor.java b/src/java/org/apache/cassandra/cql/QueryProcessor.java
index 57fec4a..a40bfea 100644
--- a/src/java/org/apache/cassandra/cql/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql/QueryProcessor.java
@@ -71,7 +71,7 @@ public class QueryProcessor
 
     public static final String DEFAULT_KEY_NAME = bufferToString(CFMetaData.DEFAULT_KEY_NAME);
 
-    private static List<org.apache.cassandra.db.Row> getSlice(CFMetaData metadata, SelectStatement select, List<ByteBuffer> variables)
+    private static List<org.apache.cassandra.db.Row> getSlice(CFMetaData metadata, SelectStatement select, List<ByteBuffer> variables, long now)
     throws InvalidRequestException, ReadTimeoutException, UnavailableException, IsBootstrappingException, WriteTimeoutException
     {
         List<ReadCommand> commands = new ArrayList<ReadCommand>();
@@ -87,7 +87,7 @@ public class QueryProcessor
                 ByteBuffer key = rawKey.getByteBuffer(metadata.getKeyValidator(),variables);
 
                 validateKey(key);
-                commands.add(new SliceByNamesReadCommand(metadata.ksName, key, select.getColumnFamily(), new NamesQueryFilter(columnNames)));
+                commands.add(new SliceByNamesReadCommand(metadata.ksName, key, select.getColumnFamily(), now, new NamesQueryFilter(columnNames)));
             }
         }
         // ...a range (slice) of column names
@@ -106,6 +106,7 @@ public class QueryProcessor
                 commands.add(new SliceFromReadCommand(metadata.ksName,
                                                       key,
                                                       select.getColumnFamily(),
+                                                      now,
                                                       new SliceQueryFilter(start, finish, select.isColumnsReversed(), select.getColumnsLimit())));
             }
         }
@@ -128,7 +129,7 @@ public class QueryProcessor
         return columnNames;
     }
 
-    private static List<org.apache.cassandra.db.Row> multiRangeSlice(CFMetaData metadata, SelectStatement select, List<ByteBuffer> variables)
+    private static List<org.apache.cassandra.db.Row> multiRangeSlice(CFMetaData metadata, SelectStatement select, List<ByteBuffer> variables, long now)
     throws ReadTimeoutException, UnavailableException, InvalidRequestException
     {
         IPartitioner<?> p = StorageService.getPartitioner();
@@ -175,6 +176,7 @@ public class QueryProcessor
 
         List<org.apache.cassandra.db.Row> rows = StorageProxy.getRangeSlice(new RangeSliceCommand(metadata.ksName,
                                                                                                   select.getColumnFamily(),
+                                                                                                  now,
                                                                                                   columnFilter,
                                                                                                   bounds,
                                                                                                   expressions,
@@ -379,14 +381,15 @@ public class QueryProcessor
 
                 List<org.apache.cassandra.db.Row> rows;
 
+                long now = System.currentTimeMillis();
                 // By-key
                 if (!select.isKeyRange() && (select.getKeys().size() > 0))
                 {
-                    rows = getSlice(metadata, select, variables);
+                    rows = getSlice(metadata, select, variables, now);
                 }
                 else
                 {
-                    rows = multiRangeSlice(metadata, select, variables);
+                    rows = multiRangeSlice(metadata, select, variables, now);
                 }
 
                 // count resultset is a single column named "count"
@@ -429,7 +432,7 @@ public class QueryProcessor
                         {
                             for (org.apache.cassandra.db.Column c : row.cf.getSortedColumns())
                             {
-                                if (c.isMarkedForDelete())
+                                if (c.isMarkedForDelete(now))
                                     continue;
 
                                 ColumnDefinition cd = metadata.getColumnDefinitionFromColumnName(c.name());
@@ -474,7 +477,7 @@ public class QueryProcessor
                             if (cd != null)
                                 result.schema.value_types.put(name, TypeParser.getShortName(cd.getValidator()));
                             org.apache.cassandra.db.Column c = row.cf.getColumn(name);
-                            if (c == null || c.isMarkedForDelete())
+                            if (c == null || c.isMarkedForDelete(now))
                                 thriftColumns.add(new Column().setName(name));
                             else
                                 thriftColumns.add(thriftify(c));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/cql3/statements/ColumnGroupMap.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ColumnGroupMap.java b/src/java/org/apache/cassandra/cql3/statements/ColumnGroupMap.java
index 20fa3bd..8974523 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ColumnGroupMap.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ColumnGroupMap.java
@@ -104,20 +104,22 @@ public class ColumnGroupMap
     {
         private final CompositeType composite;
         private final int idx;
+        private final long now;
         private ByteBuffer[] previous;
 
         private final List<ColumnGroupMap> groups = new ArrayList<ColumnGroupMap>();
         private ColumnGroupMap currentGroup;
 
-        public Builder(CompositeType composite, boolean hasCollections)
+        public Builder(CompositeType composite, boolean hasCollections, long now)
         {
             this.composite = composite;
             this.idx = composite.types.size() - (hasCollections ? 2 : 1);
+            this.now = now;
         }
 
         public void add(Column c)
         {
-            if (c.isMarkedForDelete())
+            if (c.isMarkedForDelete(now))
                 return;
 
             ByteBuffer[] current = composite.split(c.name());

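The builder captures the query's timestamp once in a field, so every column added during result grouping is filtered against the same instant. The shape of that pattern, stripped of Cassandra specifics:

    import java.util.ArrayList;
    import java.util.List;

    public final class GroupBuilderSketch
    {
        static final class Builder
        {
            private final long now;                             // captured once per query
            private final List<String> liveValues = new ArrayList<String>();

            Builder(long now)
            {
                this.now = now;
            }

            void add(String value, Integer localDeletionTime)
            {
                // skip columns that are dead as of the query's timestamp
                if (localDeletionTime != null && (int) (now / 1000) >= localDeletionTime)
                    return;
                liveValues.add(value);
            }

            List<String> build()
            {
                return liveValues;
            }
        }

        public static void main(String[] args)
        {
            Builder b = new Builder(System.currentTimeMillis());
            b.add("live", null);
            b.add("expired", 0);
            System.out.println(b.build()); // [live]
        }
    }
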
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index f6b7140..62f7fbd 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -297,10 +297,12 @@ public abstract class ModificationStatement implements CQLStatement
         }
 
         List<ReadCommand> commands = new ArrayList<ReadCommand>(partitionKeys.size());
+        long now = System.currentTimeMillis();
         for (ByteBuffer key : partitionKeys)
             commands.add(new SliceFromReadCommand(keyspace(),
                                                   key,
                                                   columnFamily(),
+                                                  now,
                                                   new SliceQueryFilter(slices, false, Integer.MAX_VALUE)));
 
         List<Row> rows = local
@@ -313,7 +315,7 @@ public abstract class ModificationStatement implements CQLStatement
             if (row.cf == null || row.cf.getColumnCount() == 0)
                 continue;
 
-            ColumnGroupMap.Builder groupBuilder = new ColumnGroupMap.Builder(composite, true);
+            ColumnGroupMap.Builder groupBuilder = new ColumnGroupMap.Builder(composite, true, now);
             for (Column column : row.cf)
                 groupBuilder.add(column);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index a96eb1d..fd47ba8 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -127,17 +127,18 @@ public class SelectStatement implements CQLStatement
         cl.validateForRead(keyspace());
 
         int limit = getLimit(variables);
+        long now = System.currentTimeMillis();
         List<Row> rows = isKeyRange || usesSecondaryIndexing
-                       ? StorageProxy.getRangeSlice(getRangeCommand(variables, limit), cl)
-                       : StorageProxy.read(getSliceCommands(variables, limit), cl);
+                       ? StorageProxy.getRangeSlice(getRangeCommand(variables, limit, now), cl)
+                       : StorageProxy.read(getSliceCommands(variables, limit, now), cl);
 
-        return processResults(rows, variables, limit);
+        return processResults(rows, variables, limit, now);
     }
 
-    private ResultMessage.Rows processResults(List<Row> rows, List<ByteBuffer> variables, int limit) throws RequestValidationException
+    private ResultMessage.Rows processResults(List<Row> rows, List<ByteBuffer> variables, int limit, long now) throws RequestValidationException
     {
         // Even for count, we need to process the result as it'll group some column together in sparse column families
-        ResultSet rset = process(rows, variables, limit);
+        ResultSet rset = process(rows, variables, limit, now);
         rset = parameters.isCount ? rset.makeCountResult(parameters.countAlias) : rset;
         return new ResultMessage.Rows(rset);
     }
@@ -153,19 +154,20 @@ public class SelectStatement implements CQLStatement
 
     public ResultMessage.Rows executeInternal(QueryState state) throws RequestExecutionException, RequestValidationException
     {
-        List<ByteBuffer> variables = Collections.<ByteBuffer>emptyList();
+        List<ByteBuffer> variables = Collections.emptyList();
         int limit = getLimit(variables);
+        long now = System.currentTimeMillis();
         List<Row> rows = isKeyRange || usesSecondaryIndexing
-                       ? RangeSliceVerbHandler.executeLocally(getRangeCommand(variables, limit))
-                       : readLocally(keyspace(), getSliceCommands(variables, limit));
+                       ? RangeSliceVerbHandler.executeLocally(getRangeCommand(variables, limit, now))
+                       : readLocally(keyspace(), getSliceCommands(variables, limit, now));
 
-        return processResults(rows, variables, limit);
+        return processResults(rows, variables, limit, now);
     }
 
     public ResultSet process(List<Row> rows) throws InvalidRequestException
     {
         assert !parameters.isCount; // not yet needed
-        return process(rows, Collections.<ByteBuffer>emptyList(), getLimit(Collections.<ByteBuffer>emptyList()));
+        return process(rows, Collections.<ByteBuffer>emptyList(), getLimit(Collections.<ByteBuffer>emptyList()), System.currentTimeMillis());
     }
 
     public String keyspace()
@@ -178,7 +180,7 @@ public class SelectStatement implements CQLStatement
         return cfDef.cfm.cfName;
     }
 
-    private List<ReadCommand> getSliceCommands(List<ByteBuffer> variables, int limit) throws RequestValidationException
+    private List<ReadCommand> getSliceCommands(List<ByteBuffer> variables, int limit, long now) throws RequestValidationException
     {
         Collection<ByteBuffer> keys = getKeys(variables);
         List<ReadCommand> commands = new ArrayList<ReadCommand>(keys.size());
@@ -192,25 +194,18 @@ public class SelectStatement implements CQLStatement
             // We should not share the slice filter amongst the commands (hence the cloneShallow), due to
             // SliceQueryFilter not being immutable due to its columnCounter used by the lastCounted() method
             // (this is fairly ugly and we should change that but that's probably not a tiny refactor to do that cleanly)
-            commands.add(ReadCommand.create(keyspace(), key, columnFamily(), filter.cloneShallow()));
+            commands.add(ReadCommand.create(keyspace(), key, columnFamily(), now, filter.cloneShallow()));
         }
         return commands;
     }
 
-    private RangeSliceCommand getRangeCommand(List<ByteBuffer> variables, int limit) throws RequestValidationException
+    private RangeSliceCommand getRangeCommand(List<ByteBuffer> variables, int limit, long now) throws RequestValidationException
     {
         IDiskAtomFilter filter = makeFilter(variables, limit);
         List<IndexExpression> expressions = getIndexExpressions(variables);
         // The LIMIT provided by the user is the number of CQL row he wants returned.
         // We want to have getRangeSlice to count the number of columns, not the number of keys.
-        return new RangeSliceCommand(keyspace(),
-                                     columnFamily(),
-                                     filter,
-                                     getKeyBounds(variables),
-                                     expressions,
-                                     limit,
-                                     true,
-                                     false);
+        return new RangeSliceCommand(keyspace(), columnFamily(), now,  filter, getKeyBounds(variables), expressions, limit, true, false);
     }
 
     private AbstractBounds<RowPosition> getKeyBounds(List<ByteBuffer> variables) throws InvalidRequestException
@@ -661,9 +656,9 @@ public class SelectStatement implements CQLStatement
         };
     }
 
-    private ResultSet process(List<Row> rows, List<ByteBuffer> variables, int limit) throws InvalidRequestException
+    private ResultSet process(List<Row> rows, List<ByteBuffer> variables, int limit, long now) throws InvalidRequestException
     {
-        Selection.ResultSetBuilder result = selection.resultSetBuilder();
+        Selection.ResultSetBuilder result = selection.resultSetBuilder(now);
         for (org.apache.cassandra.db.Row row : rows)
         {
             // Not columns match the query, skip
@@ -679,7 +674,7 @@ public class SelectStatement implements CQLStatement
                 // One cqlRow per column
                 for (Column c : columnsInOrder(row.cf, variables))
                 {
-                    if (c.isMarkedForDelete())
+                    if (c.isMarkedForDelete(now))
                         continue;
 
                     ByteBuffer[] components = null;
@@ -728,11 +723,11 @@ public class SelectStatement implements CQLStatement
                 // Sparse case: group column in cqlRow when composite prefix is equal
                 CompositeType composite = (CompositeType)cfDef.cfm.comparator;
 
-                ColumnGroupMap.Builder builder = new ColumnGroupMap.Builder(composite, cfDef.hasCollections);
+                ColumnGroupMap.Builder builder = new ColumnGroupMap.Builder(composite, cfDef.hasCollections, now);
 
                 for (Column c : row.cf)
                 {
-                    if (c.isMarkedForDelete())
+                    if (c.isMarkedForDelete(now))
                         continue;
 
                     builder.add(c);
@@ -743,7 +738,7 @@ public class SelectStatement implements CQLStatement
             }
             else
             {
-                if (row.cf.hasOnlyTombstones())
+                if (row.cf.hasOnlyTombstones(now))
                     continue;
 
                 // Static case: One cqlRow for all columns

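The point of passing the same now into both the read commands and processResults is that a column fetched as live by the replicas cannot flip to expired while the coordinator is still building the result set. In outline, with purely illustrative names:

    import java.util.Arrays;
    import java.util.List;

    public final class SelectTimestampSketch
    {
        static List<Integer> read(long now)
        {
            // pretend these are the local deletion times of the returned columns
            return Arrays.<Integer>asList((int) (now / 1000) + 1, null);
        }

        static int process(List<Integer> deletionTimes, long now)
        {
            int live = 0;
            for (Integer dt : deletionTimes)
                if (dt == null || (int) (now / 1000) < dt)   // same 'now' as the read
                    live++;
            return live;
        }

        public static void main(String[] args)
        {
            long now = System.currentTimeMillis(); // captured once per statement
            System.out.println(process(read(now), now)); // 2
        }
    }
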
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/cql3/statements/Selection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/Selection.java b/src/java/org/apache/cassandra/cql3/statements/Selection.java
index d3018e5..cf2b62e 100644
--- a/src/java/org/apache/cassandra/cql3/statements/Selection.java
+++ b/src/java/org/apache/cassandra/cql3/statements/Selection.java
@@ -213,9 +213,9 @@ public abstract class Selection
         return columnsList;
     }
 
-    public ResultSetBuilder resultSetBuilder()
+    public ResultSetBuilder resultSetBuilder(long now)
     {
-        return new ResultSetBuilder();
+        return new ResultSetBuilder(now);
     }
 
     private static ByteBuffer value(Column c)
@@ -240,12 +240,14 @@ public abstract class Selection
         List<ByteBuffer> current;
         final long[] timestamps;
         final int[] ttls;
+        final long now;
 
-        private ResultSetBuilder()
+        private ResultSetBuilder(long now)
         {
             this.resultSet = new ResultSet(metadata);
             this.timestamps = collectTimestamps ? new long[columnsList.size()] : null;
             this.ttls = collectTTLs ? new int[columnsList.size()] : null;
+            this.now = now;
         }
 
         public void add(ByteBuffer v)
@@ -264,14 +266,14 @@ public abstract class Selection
             {
                 int ttl = -1;
                 if (!isDead(c) && c instanceof ExpiringColumn)
-                    ttl = c.getLocalDeletionTime() - (int) (System.currentTimeMillis() / 1000);
+                    ttl = c.getLocalDeletionTime() - (int) (now / 1000);
                 ttls[current.size() - 1] = ttl;
             }
         }
 
         private boolean isDead(Column c)
         {
-            return c == null || c.isMarkedForDelete();
+            return c == null || c.isMarkedForDelete(now);
         }
 
         public void newRow() throws InvalidRequestException

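The TTL reported for a column is now the remaining lifetime relative to the query's timestamp: the stored local deletion time (in seconds) minus now / 1000. A short worked example of that arithmetic:

    public final class RemainingTtlSketch
    {
        public static void main(String[] args)
        {
            long now = System.currentTimeMillis();
            int localDeletionTime = (int) (now / 1000) + 120; // column expires in 2 minutes

            int remainingTtl = localDeletionTime - (int) (now / 1000);
            System.out.println(remainingTtl); // 120
        }
    }
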
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/CollationController.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CollationController.java b/src/java/org/apache/cassandra/db/CollationController.java
index f1fccef..d0d22c5 100644
--- a/src/java/org/apache/cassandra/db/CollationController.java
+++ b/src/java/org/apache/cassandra/db/CollationController.java
@@ -21,8 +21,6 @@ import java.nio.ByteBuffer;
 import java.util.*;
 
 import com.google.common.collect.Iterables;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
 import org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy;
@@ -37,8 +35,6 @@ import org.apache.cassandra.utils.HeapAllocator;
 
 public class CollationController
 {
-    private static final Logger logger = LoggerFactory.getLogger(CollationController.class);
-
     private final ColumnFamilyStore cfs;
     private final QueryFilter filter;
     private final int gcBefore;
@@ -104,7 +100,7 @@ public class CollationController
             // (reduceNameFilter removes columns that are known to be irrelevant)
             NamesQueryFilter namesFilter = (NamesQueryFilter) filter.filter;
             TreeSet<ByteBuffer> filterColumns = new TreeSet<ByteBuffer>(namesFilter.columns);
-            QueryFilter reducedFilter = new QueryFilter(filter.key, filter.cfName, namesFilter.withUpdatedColumns(filterColumns));
+            QueryFilter reducedFilter = new QueryFilter(filter.key, filter.cfName, namesFilter.withUpdatedColumns(filterColumns), filter.timestamp);
 
             /* add the SSTables on disk */
             Collections.sort(view.sstables, SSTable.maxTimestampComparator);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/Column.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Column.java b/src/java/org/apache/cassandra/db/Column.java
index b42097c..b210d22 100644
--- a/src/java/org/apache/cassandra/db/Column.java
+++ b/src/java/org/apache/cassandra/db/Column.java
@@ -143,14 +143,20 @@ public class Column implements OnDiskAtom
         return timestamp;
     }
 
-    public boolean isMarkedForDelete()
+    public boolean isMarkedForDelete(long now)
     {
-        return (int) (System.currentTimeMillis() / 1000) >= getLocalDeletionTime();
+        return false;
     }
 
+    public boolean isLive(long now)
+    {
+        return !isMarkedForDelete(now);
+    }
+
+    // Don't call unless the column is actually marked for delete.
     public long getMarkedForDeleteAt()
     {
-        throw new IllegalStateException("column is not marked for delete");
+        return Long.MAX_VALUE;
     }
 
     public int dataSize()
@@ -186,9 +192,7 @@ public class Column implements OnDiskAtom
     public Column diff(Column column)
     {
         if (timestamp() < column.timestamp())
-        {
             return column;
-        }
         return null;
     }
 
@@ -223,9 +227,9 @@ public class Column implements OnDiskAtom
     public Column reconcile(Column column, Allocator allocator)
     {
         // tombstones take precedence.  (if both are tombstones, then it doesn't matter which one we use.)
-        if (isMarkedForDelete())
+        if (isMarkedForDelete(System.currentTimeMillis()))
             return timestamp() < column.timestamp() ? column : this;
-        if (column.isMarkedForDelete())
+        if (column.isMarkedForDelete(System.currentTimeMillis()))
             return timestamp() > column.timestamp() ? this : column;
         // break ties by comparing values.
         if (timestamp() == column.timestamp())
@@ -276,7 +280,7 @@ public class Column implements OnDiskAtom
         StringBuilder sb = new StringBuilder();
         sb.append(comparator.getString(name));
         sb.append(":");
-        sb.append(isMarkedForDelete());
+        sb.append(isMarkedForDelete(System.currentTimeMillis()));
         sb.append(":");
         sb.append(value.remaining());
         sb.append("@");
@@ -284,11 +288,6 @@ public class Column implements OnDiskAtom
         return sb.toString();
     }
 
-    public boolean isLive()
-    {
-        return !isMarkedForDelete();
-    }
-
     protected void validateName(CFMetaData metadata) throws MarshalException
     {
         metadata.comparator.validate(name());

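The base Column is never a tombstone, so its isMarkedForDelete(now) can simply return false and getMarkedForDeleteAt() a "never" sentinel; subclasses such as expiring or deleted columns are the ones that actually consult now. A compact standalone sketch of that shape (a toy hierarchy, not the Cassandra classes):

    public class ColumnSketch
    {
        static class Cell
        {
            boolean isMarkedForDelete(long now) { return false; } // regular cells are always live
            boolean isLive(long now)            { return !isMarkedForDelete(now); }
            long getMarkedForDeleteAt()         { return Long.MAX_VALUE; } // "never"
        }

        static class ExpiringCell extends Cell
        {
            final int localExpirationTime; // absolute second of death

            ExpiringCell(int localExpirationTime) { this.localExpirationTime = localExpirationTime; }

            @Override
            boolean isMarkedForDelete(long now) { return (int) (now / 1000) >= localExpirationTime; }
        }

        public static void main(String[] args)
        {
            long now = System.currentTimeMillis();
            Cell plain = new Cell();
            Cell expired = new ExpiringCell(0); // expired at the epoch

            System.out.println(plain.isLive(now));   // true
            System.out.println(expired.isLive(now)); // false
        }
    }
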
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/ColumnFamily.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamily.java b/src/java/org/apache/cassandra/db/ColumnFamily.java
index cda2d9b..36396f8 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamily.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamily.java
@@ -31,7 +31,6 @@ import java.util.UUID;
 import com.google.common.base.Function;
 import com.google.common.base.Functions;
 import com.google.common.collect.ImmutableMap;
-
 import org.apache.commons.lang.builder.HashCodeBuilder;
 
 import org.apache.cassandra.cache.IRowCacheEntry;
@@ -194,7 +193,7 @@ public abstract class ColumnFamily implements Iterable<Column>, IRowCacheEntry
      *   </code>
      *  but is potentially faster.
      */
-     public abstract void addAll(ColumnFamily cm, Allocator allocator, Function<Column, Column> transformation);
+    public abstract void addAll(ColumnFamily cm, Allocator allocator, Function<Column, Column> transformation);
 
     /**
      * Replace oldColumn if present by newColumn.
@@ -426,13 +425,11 @@ public abstract class ColumnFamily implements Iterable<Column>, IRowCacheEntry
         return metadata.comparator;
     }
 
-    public boolean hasOnlyTombstones()
+    public boolean hasOnlyTombstones(long now)
     {
         for (Column column : this)
-        {
-            if (column.isLive())
+            if (column.isLive(now))
                 return false;
-        }
         return true;
     }
 

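hasOnlyTombstones now answers "does this row contain anything live as of the query's timestamp", which is just a loop over the columns with the shared now. A trivial standalone version of that check:

    import java.util.Arrays;
    import java.util.List;

    public final class OnlyTombstonesSketch
    {
        // localDeletionTime == null means the column never expires
        static boolean hasOnlyTombstones(List<Integer> localDeletionTimes, long now)
        {
            for (Integer dt : localDeletionTimes)
                if (dt == null || (int) (now / 1000) < dt)
                    return false; // found a live column
            return true;
        }

        public static void main(String[] args)
        {
            long now = System.currentTimeMillis();
            System.out.println(hasOnlyTombstones(Arrays.<Integer>asList(0, 1), now));    // true
            System.out.println(hasOnlyTombstones(Arrays.<Integer>asList(0, null), now)); // false
        }
    }
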
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java b/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
index f5cf3d4..411b040 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
@@ -102,11 +102,10 @@ public class ColumnFamilySerializer implements IVersionedSerializer<ColumnFamily
             return null;
 
         ColumnFamily cf = factory.create(Schema.instance.getCFMetaData(deserializeCfId(in, version)));
-        int expireBefore = (int) (System.currentTimeMillis() / 1000);
 
         if (cf.metadata().isSuper() && version < MessagingService.VERSION_20)
         {
-            SuperColumns.deserializerSuperColumnFamily(in, cf, flag, expireBefore, version);
+            SuperColumns.deserializerSuperColumnFamily(in, cf, flag, version);
         }
         else
         {
@@ -115,9 +114,7 @@ public class ColumnFamilySerializer implements IVersionedSerializer<ColumnFamily
             ColumnSerializer columnSerializer = Column.serializer;
             int size = in.readInt();
             for (int i = 0; i < size; ++i)
-            {
-                cf.addColumn(columnSerializer.deserialize(in, flag, expireBefore));
-            }
+                cf.addColumn(columnSerializer.deserialize(in, flag));
         }
         return cf;
     }
@@ -170,21 +167,6 @@ public class ColumnFamilySerializer implements IVersionedSerializer<ColumnFamily
         throw new UnsupportedOperationException();
     }
 
-    public void deserializeColumnsFromSSTable(DataInput in, ColumnFamily cf, int size, ColumnSerializer.Flag flag, int expireBefore, Descriptor.Version version)
-    {
-        Iterator<OnDiskAtom> iter = cf.metadata().getOnDiskIterator(in, size, flag, expireBefore, version);
-        while (iter.hasNext())
-            cf.addAtom(iter.next());
-    }
-
-    public void deserializeFromSSTable(DataInput in, ColumnFamily cf, ColumnSerializer.Flag flag, Descriptor.Version version) throws IOException
-    {
-        cf.delete(DeletionInfo.serializer().deserializeFromSSTable(in, version));
-        int size = in.readInt();
-        int expireBefore = (int) (System.currentTimeMillis() / 1000);
-        deserializeColumnsFromSSTable(in, cf, size, flag, expireBefore, version);
-    }
-
     public void serializeCfId(UUID cfId, DataOutput out, int version) throws IOException
     {
         UUIDSerializer.serializer.serialize(cfId, out, version);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 4287df6..648c25a 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -33,10 +33,6 @@ import com.google.common.base.Function;
 import com.google.common.collect.*;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.Uninterruptibles;
-
-import org.apache.cassandra.db.compaction.*;
-
-import org.cliffc.high_scale_lib.NonBlockingHashMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -52,6 +48,7 @@ import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
 import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.db.commitlog.ReplayPosition;
+import org.apache.cassandra.db.compaction.*;
 import org.apache.cassandra.db.filter.ExtendedFilter;
 import org.apache.cassandra.db.filter.IDiskAtomFilter;
 import org.apache.cassandra.db.filter.QueryFilter;
@@ -73,6 +70,7 @@ import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.thrift.IndexExpression;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.*;
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
 
 import static org.apache.cassandra.config.CFMetaData.Caching;
 
@@ -1180,24 +1178,14 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         return metric.writeLatency.recentLatencyHistogram.getBuckets(true);
     }
 
-    public ColumnFamily getColumnFamily(DecoratedKey key, ByteBuffer start, ByteBuffer finish, boolean reversed, int limit)
+    public ColumnFamily getColumnFamily(DecoratedKey key,
+                                        ByteBuffer start,
+                                        ByteBuffer finish,
+                                        boolean reversed,
+                                        int limit,
+                                        long timestamp)
     {
-        return getColumnFamily(QueryFilter.getSliceFilter(key, name, start, finish, reversed, limit));
-    }
-
-    /**
-     * get a list of columns starting from a given column, in a specified order.
-     * only the latest version of a column is returned.
-     * @return null if there is no data and no tombstones; otherwise a ColumnFamily
-     */
-    public ColumnFamily getColumnFamily(QueryFilter filter)
-    {
-        return getColumnFamily(filter, gcBefore());
-    }
-
-    public int gcBefore()
-    {
-        return (int) (System.currentTimeMillis() / 1000) - metadata.getGcGraceSeconds();
+        return getColumnFamily(QueryFilter.getSliceFilter(key, name, start, finish, reversed, limit, timestamp));
     }
 
     /**
@@ -1236,7 +1224,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
         try
         {
-            ColumnFamily data = getTopLevelColumns(QueryFilter.getIdentityFilter(filter.key, name), Integer.MIN_VALUE);
+            ColumnFamily data = getTopLevelColumns(QueryFilter.getIdentityFilter(filter.key, name, filter.timestamp),
+                                                   Integer.MIN_VALUE);
             if (sentinelSuccess && data != null)
                 CacheService.instance.rowCache.replace(key, sentinel, data);
 
@@ -1249,7 +1238,17 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         }
     }
 
-    ColumnFamily getColumnFamily(QueryFilter filter, int gcBefore)
+    public int gcBefore(long now)
+    {
+        return (int) (now / 1000) - metadata.getGcGraceSeconds();
+    }
+
+    /**
+     * get a list of columns starting from a given column, in a specified order.
+     * only the latest version of a column is returned.
+     * @return null if there is no data and no tombstones; otherwise a ColumnFamily
+     */
+    public ColumnFamily getColumnFamily(QueryFilter filter)
     {
         assert name.equals(filter.getColumnFamilyName()) : filter.getColumnFamilyName();
 
@@ -1258,6 +1257,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         long start = System.nanoTime();
         try
         {
+            int gcBefore = gcBefore(filter.timestamp);
             if (isRowCacheEnabled())
             {
                 UUID cfId = Schema.instance.getId(table.getName(), name);
@@ -1274,7 +1274,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
                     return null;
                 }
 
-                result = filterColumnFamily(cached, filter, gcBefore);
+                result = filterColumnFamily(cached, filter);
             }
             else
             {
@@ -1301,12 +1301,12 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
      *  tombstones that are no longer relevant.
      *  The returned column family won't be thread safe.
      */
-    ColumnFamily filterColumnFamily(ColumnFamily cached, QueryFilter filter, int gcBefore)
+    ColumnFamily filterColumnFamily(ColumnFamily cached, QueryFilter filter)
     {
         ColumnFamily cf = cached.cloneMeShallow(ArrayBackedSortedColumns.factory, filter.filter.isReversed());
         OnDiskAtomIterator ci = filter.getMemtableColumnIterator(cached, null);
-        filter.collateOnDiskAtom(cf, ci, gcBefore);
-        return removeDeletedCF(cf, gcBefore);
+        filter.collateOnDiskAtom(cf, ci, gcBefore(filter.timestamp));
+        return removeDeletedCF(cf, gcBefore(filter.timestamp));
     }
 
     /**
@@ -1440,7 +1440,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             }
             return files;
         }
-        finally {
+        finally
+        {
             SSTableReader.releaseReferences(view.sstables);
         }
     }
@@ -1468,14 +1469,16 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
       * @param range Either a Bounds, which includes start key, or a Range, which does not.
       * @param columnFilter description of the columns we're interested in for each row
      */
-    public AbstractScanIterator getSequentialIterator(final AbstractBounds<RowPosition> range, IDiskAtomFilter columnFilter)
+    private AbstractScanIterator getSequentialIterator(final AbstractBounds<RowPosition> range,
+                                                       IDiskAtomFilter columnFilter,
+                                                       long timestamp)
     {
         assert !(range instanceof Range) || !((Range)range).isWrapAround() || range.right.isMinimum() : range;
 
         final RowPosition startWith = range.left;
         final RowPosition stopAt = range.right;
 
-        QueryFilter filter = new QueryFilter(null, name, columnFilter);
+        QueryFilter filter = new QueryFilter(null, name, columnFilter, timestamp);
 
         final ViewFragment view = markReferenced(range);
         Tracing.trace("Executing seq scan across {} sstables for {}", view.sstables.size(), range.getString(metadata.getKeyValidator()));
@@ -1524,25 +1527,43 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         }
     }
 
-    public List<Row> getRangeSlice(final AbstractBounds<RowPosition> range, int maxResults, IDiskAtomFilter columnFilter, List<IndexExpression> rowFilter)
+    public List<Row> getRangeSlice(final AbstractBounds<RowPosition> range,
+                                   List<IndexExpression> rowFilter,
+                                   IDiskAtomFilter columnFilter,
+                                   int maxResults)
     {
-        return getRangeSlice(range, maxResults, columnFilter, rowFilter, false, false);
+        return getRangeSlice(range, rowFilter, columnFilter, maxResults, System.currentTimeMillis(), false, false);
     }
 
-    public List<Row> getRangeSlice(final AbstractBounds<RowPosition> range, int maxResults, IDiskAtomFilter columnFilter, List<IndexExpression> rowFilter, boolean countCQL3Rows, boolean isPaging)
+    public List<Row> getRangeSlice(final AbstractBounds<RowPosition> range,
+                                   List<IndexExpression> rowFilter,
+                                   IDiskAtomFilter columnFilter,
+                                   int maxResults,
+                                   long now,
+                                   boolean countCQL3Rows,
+                                   boolean isPaging)
     {
-        return filter(getSequentialIterator(range, columnFilter), ExtendedFilter.create(this, columnFilter, rowFilter, maxResults, countCQL3Rows, isPaging));
+        return filter(getSequentialIterator(range, columnFilter, now),
+                      ExtendedFilter.create(this, rowFilter, columnFilter, maxResults, now, countCQL3Rows, isPaging));
     }
 
-    public List<Row> search(List<IndexExpression> clause, AbstractBounds<RowPosition> range, int maxResults, IDiskAtomFilter dataFilter)
+    public List<Row> search(AbstractBounds<RowPosition> range,
+                            List<IndexExpression> clause,
+                            IDiskAtomFilter columnFilter,
+                            int maxResults)
     {
-        return search(clause, range, maxResults, dataFilter, false);
+        return search(range, clause, columnFilter, maxResults, System.currentTimeMillis(), false);
     }
 
-    public List<Row> search(List<IndexExpression> clause, AbstractBounds<RowPosition> range, int maxResults, IDiskAtomFilter dataFilter, boolean countCQL3Rows)
+    public List<Row> search(AbstractBounds<RowPosition> range,
+                            List<IndexExpression> clause,
+                            IDiskAtomFilter columnFilter,
+                            int maxResults,
+                            long now,
+                            boolean countCQL3Rows)
     {
         Tracing.trace("Executing indexed scan for {}", range.getString(metadata.getKeyValidator()));
-        return indexManager.search(clause, range, maxResults, dataFilter, countCQL3Rows);
+        return indexManager.search(range, clause, columnFilter, maxResults, now, countCQL3Rows);
     }
 
     public List<Row> filter(AbstractScanIterator rowIterator, ExtendedFilter filter)
@@ -1566,7 +1587,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
                     IDiskAtomFilter extraFilter = filter.getExtraFilter(data);
                     if (extraFilter != null)
                     {
-                        ColumnFamily cf = filter.cfs.getColumnFamily(new QueryFilter(rawRow.key, name, extraFilter));
+                        ColumnFamily cf = filter.cfs.getColumnFamily(new QueryFilter(rawRow.key, name, extraFilter, filter.timestamp));
                         if (cf != null)
                             data.addAll(cf, HeapAllocator.instance);
                     }
@@ -1751,14 +1772,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         return Iterables.concat(stores);
     }
 
-    public static List<ColumnFamilyStore> allUserDefined()
-    {
-        List<ColumnFamilyStore> cfses = new ArrayList<ColumnFamilyStore>();
-        for (Table table : Sets.difference(ImmutableSet.copyOf(Table.all()), Schema.systemKeyspaceNames))
-            cfses.addAll(table.getColumnFamilyStores());
-        return cfses;
-    }
-
     public Iterable<DecoratedKey> keySamples(Range<Token> range)
     {
         Collection<SSTableReader> sstables = getSSTables();

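For reference, the new gcBefore(long now) overload above replaces the wall-clock computation with one driven by the timestamp carried by the query. A minimal standalone sketch of that arithmetic follows; the class name and the sample gc_grace_seconds value are illustrative, not part of the patch.

public class GcBeforeSketch
{
    // Mirrors ColumnFamilyStore.gcBefore(long now): the tombstone GC horizon is derived
    // from the timestamp carried by the read, not from System.currentTimeMillis().
    static int gcBefore(long nowMillis, int gcGraceSeconds)
    {
        return (int) (nowMillis / 1000) - gcGraceSeconds;
    }

    public static void main(String[] args)
    {
        long queryTimestamp = 1371571200000L; // hypothetical read timestamp, in milliseconds
        int gcGraceSeconds = 864000;          // hypothetical gc_grace_seconds (10 days)
        // Tombstones whose local deletion time is below this value may be purged.
        System.out.println(gcBefore(queryTimestamp, gcGraceSeconds));
    }
}

Because filterColumnFamily now recomputes gcBefore(filter.timestamp) itself, every call made for the same QueryFilter agrees on the same purge horizon.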
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/ColumnSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnSerializer.java b/src/java/org/apache/cassandra/db/ColumnSerializer.java
index 353dda9..fb38b5f 100644
--- a/src/java/org/apache/cassandra/db/ColumnSerializer.java
+++ b/src/java/org/apache/cassandra/db/ColumnSerializer.java
@@ -88,7 +88,7 @@ public class ColumnSerializer implements ISerializer<Column>
      */
     public Column deserialize(DataInput in, ColumnSerializer.Flag flag) throws IOException
     {
-        return deserialize(in, flag, (int) (System.currentTimeMillis() / 1000));
+        return deserialize(in, flag, Integer.MIN_VALUE);
     }
 
     public Column deserialize(DataInput in, ColumnSerializer.Flag flag, int expireBefore) throws IOException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/CounterColumn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterColumn.java b/src/java/org/apache/cassandra/db/CounterColumn.java
index 15da1df..207ded6 100644
--- a/src/java/org/apache/cassandra/db/CounterColumn.java
+++ b/src/java/org/apache/cassandra/db/CounterColumn.java
@@ -169,9 +169,11 @@ public class CounterColumn extends Column
     {
         assert (column instanceof CounterColumn) || (column instanceof DeletedColumn) : "Wrong class type: " + column.getClass();
 
-        if (column.isMarkedForDelete()) // live + tombstone: track last tombstone
+        // live + tombstone: track last tombstone
+        if (column.isMarkedForDelete(Long.MIN_VALUE)) // cannot be an expired column, so the current time is irrelevant
         {
-            if (timestamp() < column.timestamp()) // live < tombstone
+            // live < tombstone
+            if (timestamp() < column.timestamp())
             {
                 return column;
             }
@@ -230,7 +232,7 @@ public class CounterColumn extends Column
         StringBuilder sb = new StringBuilder();
         sb.append(comparator.getString(name));
         sb.append(":");
-        sb.append(isMarkedForDelete());
+        sb.append(false);
         sb.append(":");
         sb.append(contextManager.toString(value));
         sb.append("@");

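The Long.MIN_VALUE argument in the reconcile path above (and in CounterUpdateColumn below) deserves a note: counter cells do not carry a TTL, so the only "deleted" case that matters there is a genuine tombstone, and a tombstone reports itself as deleted regardless of the time passed in. A rough illustration under that assumption; the classes below are simplified stand-ins, not the Cassandra types.

public class CounterReconcileSketch
{
    interface Cell
    {
        boolean isMarkedForDelete(long now);
        long timestamp();
    }

    // A live counter cell: never considered deleted, whatever "now" is.
    static final class LiveCounter implements Cell
    {
        final long ts;
        LiveCounter(long ts) { this.ts = ts; }
        public boolean isMarkedForDelete(long now) { return false; }
        public long timestamp() { return ts; }
    }

    // A tombstone: deleted regardless of the supplied time.
    static final class Tombstone implements Cell
    {
        final long ts;
        Tombstone(long ts) { this.ts = ts; }
        public boolean isMarkedForDelete(long now) { return true; }
        public long timestamp() { return ts; }
    }

    // Simplified "live + tombstone" arm of reconcile: Long.MIN_VALUE can never make an
    // expiration-based check fire, but a real tombstone is still detected.
    static Cell reconcile(Cell live, Cell other)
    {
        if (other.isMarkedForDelete(Long.MIN_VALUE))
            return live.timestamp() < other.timestamp() ? other : live;
        return live; // real counter reconciliation merges contexts here instead
    }

    public static void main(String[] args)
    {
        System.out.println(reconcile(new LiveCounter(10), new Tombstone(20)).getClass().getSimpleName()); // Tombstone
    }
}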
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/CounterMutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterMutation.java b/src/java/org/apache/cassandra/db/CounterMutation.java
index 6f60a26..9ace314 100644
--- a/src/java/org/apache/cassandra/db/CounterMutation.java
+++ b/src/java/org/apache/cassandra/db/CounterMutation.java
@@ -84,11 +84,12 @@ public class CounterMutation implements IMutation
     public RowMutation makeReplicationMutation()
     {
         List<ReadCommand> readCommands = new LinkedList<ReadCommand>();
+        long timestamp = System.currentTimeMillis();
         for (ColumnFamily columnFamily : rowMutation.getColumnFamilies())
         {
             if (!columnFamily.metadata().getReplicateOnWrite())
                 continue;
-            addReadCommandFromColumnFamily(rowMutation.getTable(), rowMutation.key(), columnFamily, readCommands);
+            addReadCommandFromColumnFamily(rowMutation.getTable(), rowMutation.key(), columnFamily, timestamp, readCommands);
         }
 
         // create a replication RowMutation
@@ -106,11 +107,11 @@ public class CounterMutation implements IMutation
         return replicationMutation;
     }
 
-    private void addReadCommandFromColumnFamily(String table, ByteBuffer key, ColumnFamily columnFamily, List<ReadCommand> commands)
+    private void addReadCommandFromColumnFamily(String table, ByteBuffer key, ColumnFamily columnFamily, long timestamp, List<ReadCommand> commands)
     {
         SortedSet<ByteBuffer> s = new TreeSet<ByteBuffer>(columnFamily.metadata().comparator);
         Iterables.addAll(s, columnFamily.getColumnNames());
-        commands.add(new SliceByNamesReadCommand(table, key, columnFamily.metadata().cfName, new NamesQueryFilter(s)));
+        commands.add(new SliceByNamesReadCommand(table, key, columnFamily.metadata().cfName, timestamp, new NamesQueryFilter(s)));
     }
 
     public MessageOut<CounterMutation> makeMutationMessage()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/CounterUpdateColumn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterUpdateColumn.java b/src/java/org/apache/cassandra/db/CounterUpdateColumn.java
index 9d9530e..1ae7dd7 100644
--- a/src/java/org/apache/cassandra/db/CounterUpdateColumn.java
+++ b/src/java/org/apache/cassandra/db/CounterUpdateColumn.java
@@ -64,7 +64,7 @@ public class CounterUpdateColumn extends Column
         assert (column instanceof CounterUpdateColumn) || (column instanceof DeletedColumn) : "Wrong class type.";
 
         // tombstones take precedence
-        if (column.isMarkedForDelete())
+        if (column.isMarkedForDelete(Long.MIN_VALUE)) // can't be an expired column, so the current time is irrelevant
             return timestamp() > column.timestamp() ? this : column;
 
         // neither is tombstoned

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/DefsTable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DefsTable.java b/src/java/org/apache/cassandra/db/DefsTable.java
index df551e0..4b6c1c2 100644
--- a/src/java/org/apache/cassandra/db/DefsTable.java
+++ b/src/java/org/apache/cassandra/db/DefsTable.java
@@ -152,7 +152,9 @@ public class DefsTable
     private static Row serializedColumnFamilies(DecoratedKey ksNameKey)
     {
         ColumnFamilyStore cfsStore = SystemTable.schemaCFS(SystemTable.SCHEMA_COLUMNFAMILIES_CF);
-        return new Row(ksNameKey, cfsStore.getColumnFamily(QueryFilter.getIdentityFilter(ksNameKey, SystemTable.SCHEMA_COLUMNFAMILIES_CF)));
+        return new Row(ksNameKey, cfsStore.getColumnFamily(QueryFilter.getIdentityFilter(ksNameKey,
+                                                                                         SystemTable.SCHEMA_COLUMNFAMILIES_CF,
+                                                                                         System.currentTimeMillis())));
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/DeletedColumn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DeletedColumn.java b/src/java/org/apache/cassandra/db/DeletedColumn.java
index f9bda78..57c9bf9 100644
--- a/src/java/org/apache/cassandra/db/DeletedColumn.java
+++ b/src/java/org/apache/cassandra/db/DeletedColumn.java
@@ -53,10 +53,8 @@ public class DeletedColumn extends Column
     }
 
     @Override
-    public boolean isMarkedForDelete()
+    public boolean isMarkedForDelete(long now)
     {
-        // We don't rely on the column implementation because it could mistakenly return false if
-        // some node are not exactly synchronized, which is problematic (see #4307)
         return true;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/DeletionTime.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DeletionTime.java b/src/java/org/apache/cassandra/db/DeletionTime.java
index d1ce0eb..5296529 100644
--- a/src/java/org/apache/cassandra/db/DeletionTime.java
+++ b/src/java/org/apache/cassandra/db/DeletionTime.java
@@ -83,9 +83,9 @@ public class DeletionTime implements Comparable<DeletionTime>
         return localDeletionTime < gcBefore;
     }
 
-    public boolean isDeleted(Column column)
+    public boolean isDeleted(Column column, long now)
     {
-        return column.isMarkedForDelete() && column.getMarkedForDeleteAt() <= markedForDeleteAt;
+        return column.isMarkedForDelete(now) && column.getMarkedForDeleteAt() <= markedForDeleteAt;
     }
 
     public long memorySize()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/ExpiringColumn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ExpiringColumn.java b/src/java/org/apache/cassandra/db/ExpiringColumn.java
index 7660bad..f342310 100644
--- a/src/java/org/apache/cassandra/db/ExpiringColumn.java
+++ b/src/java/org/apache/cassandra/db/ExpiringColumn.java
@@ -158,16 +158,15 @@ public class ExpiringColumn extends Column
     }
 
     @Override
+    public boolean isMarkedForDelete(long now)
+    {
+        return (int) (now / 1000) >= getLocalDeletionTime();
+    }
+
+    @Override
     public long getMarkedForDeleteAt()
     {
-        if (isMarkedForDelete())
-        {
-            return timestamp;
-        }
-        else
-        {
-            throw new IllegalStateException("column is not marked for delete");
-        }
+        return timestamp;
     }
 
     @Override

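The reworked ExpiringColumn above is the heart of the change: liveness is judged against the timestamp carried by the read command rather than whatever the local clock happens to say during deserialization or filtering. A small self-contained sketch of that check; the write time, TTL and class name are made-up values for illustration.

public class ExpirationSketch
{
    // Mirrors ExpiringColumn.isMarkedForDelete(long now): a TTL'd cell is dead for a given
    // read once the read's timestamp (in seconds) reaches the cell's local deletion time.
    static boolean isMarkedForDelete(long nowMillis, int localDeletionTime)
    {
        return (int) (nowMillis / 1000) >= localDeletionTime;
    }

    public static void main(String[] args)
    {
        int writtenAt = 1371571200;              // hypothetical write time, in seconds
        int ttl = 300;                           // hypothetical TTL of five minutes
        int localDeletionTime = writtenAt + ttl; // when the cell stops being live

        long readBeforeExpiry = (writtenAt + 200) * 1000L;
        long readAfterExpiry  = (writtenAt + 400) * 1000L;

        System.out.println(isMarkedForDelete(readBeforeExpiry, localDeletionTime)); // false: still live
        System.out.println(isMarkedForDelete(readAfterExpiry,  localDeletionTime)); // true: expired
    }
}

Note also that getMarkedForDeleteAt no longer throws when the column is still live, which is what allows callers to evaluate it against an arbitrary "now".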
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/HintedHandOffManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/HintedHandOffManager.java b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
index c10bed6..e89c769 100644
--- a/src/java/org/apache/cassandra/db/HintedHandOffManager.java
+++ b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
@@ -326,15 +326,16 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
         delivery:
         while (true)
         {
+            long now = System.currentTimeMillis();
             QueryFilter filter = QueryFilter.getSliceFilter(epkey,
                                                             SystemTable.HINTS_CF,
                                                             startColumn,
                                                             ByteBufferUtil.EMPTY_BYTE_BUFFER,
                                                             false,
-                                                            pageSize);
+                                                            pageSize,
+                                                            now);
 
-            ColumnFamily hintsPage = ColumnFamilyStore.removeDeleted(hintStore.getColumnFamily(filter),
-                                                                     (int) (System.currentTimeMillis() / 1000));
+            ColumnFamily hintsPage = ColumnFamilyStore.removeDeleted(hintStore.getColumnFamily(filter), (int) (now / 1000));
 
             if (pagingFinished(hintsPage, startColumn))
                 break;
@@ -362,7 +363,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
                 // in which the local deletion timestamp was generated on the last column in the old page, in which
                 // case the hint will have no columns (since it's deleted) but will still be included in the resultset
                 // since (even with gcgs=0) it's still a "relevant" tombstone.
-                if (!hint.isLive())
+                if (!hint.isLive(System.currentTimeMillis()))
                     continue;
 
                 startColumn = hint.name();
@@ -479,7 +480,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
         RowPosition minPos = p.getMinimumToken().minKeyBound();
         Range<RowPosition> range = new Range<RowPosition>(minPos, minPos, p);
         IDiskAtomFilter filter = new NamesQueryFilter(ImmutableSortedSet.<ByteBuffer>of());
-        List<Row> rows = hintStore.getRangeSlice(range, Integer.MAX_VALUE, filter, null);
+        List<Row> rows = hintStore.getRangeSlice(range, null, filter, Integer.MAX_VALUE);
         for (Row row : rows)
         {
             UUID hostId = UUIDGen.getUUID(row.key.key);
@@ -576,18 +577,22 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
         RowPosition minPos = partitioner.getMinimumToken().minKeyBound();
         Range<RowPosition> range = new Range<RowPosition>(minPos, minPos);
 
-        // Get a bunch of rows!
-        List<Row> rows;
         try
         {
-            rows = StorageProxy.getRangeSlice(new RangeSliceCommand(Table.SYSTEM_KS, SystemTable.HINTS_CF, predicate, range, null, LARGE_NUMBER), ConsistencyLevel.ONE);
+            RangeSliceCommand cmd = new RangeSliceCommand(Table.SYSTEM_KS,
+                                                          SystemTable.HINTS_CF,
+                                                          System.currentTimeMillis(),
+                                                          predicate,
+                                                          range,
+                                                          null,
+                                                          LARGE_NUMBER);
+            return StorageProxy.getRangeSlice(cmd, ConsistencyLevel.ONE);
         }
         catch (Exception e)
         {
             logger.info("HintsCF getEPPendingHints timed out.");
             throw new RuntimeException(e);
         }
-        return rows;
     }
 
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/OnDiskAtom.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/OnDiskAtom.java b/src/java/org/apache/cassandra/db/OnDiskAtom.java
index 2fdb7ad..14a21c8 100644
--- a/src/java/org/apache/cassandra/db/OnDiskAtom.java
+++ b/src/java/org/apache/cassandra/db/OnDiskAtom.java
@@ -66,7 +66,7 @@ public interface OnDiskAtom
 
         public OnDiskAtom deserializeFromSSTable(DataInput in, Descriptor.Version version) throws IOException
         {
-            return deserializeFromSSTable(in, ColumnSerializer.Flag.LOCAL, (int)(System.currentTimeMillis() / 1000), version);
+            return deserializeFromSSTable(in, ColumnSerializer.Flag.LOCAL, Integer.MIN_VALUE, version);
         }
 
         public OnDiskAtom deserializeFromSSTable(DataInput in, ColumnSerializer.Flag flag, int expireBefore, Descriptor.Version version) throws IOException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/RangeSliceCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RangeSliceCommand.java b/src/java/org/apache/cassandra/db/RangeSliceCommand.java
index 4020a15..c7e71f3 100644
--- a/src/java/org/apache/cassandra/db/RangeSliceCommand.java
+++ b/src/java/org/apache/cassandra/db/RangeSliceCommand.java
@@ -15,24 +15,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
 package org.apache.cassandra.db;
 
 import java.io.DataInput;
@@ -46,8 +28,6 @@ import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.filter.IDiskAtomFilter;
-import org.apache.cassandra.db.filter.NamesQueryFilter;
-import org.apache.cassandra.db.filter.SliceQueryFilter;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.CompositeType;
 import org.apache.cassandra.dht.AbstractBounds;
@@ -57,8 +37,6 @@ import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.IReadCommand;
 import org.apache.cassandra.thrift.IndexExpression;
 import org.apache.cassandra.thrift.IndexOperator;
-import org.apache.cassandra.thrift.SlicePredicate;
-import org.apache.cassandra.thrift.SliceRange;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 public class RangeSliceCommand implements IReadCommand
@@ -69,6 +47,8 @@ public class RangeSliceCommand implements IReadCommand
 
     public final String column_family;
 
+    public final long timestamp;
+
     public final IDiskAtomFilter predicate;
     public final List<IndexExpression> row_filter;
 
@@ -77,20 +57,40 @@ public class RangeSliceCommand implements IReadCommand
     public final boolean countCQL3Rows;
     public final boolean isPaging;
 
-    public RangeSliceCommand(String keyspace, String column_family, IDiskAtomFilter predicate, AbstractBounds<RowPosition> range, int maxResults)
+    public RangeSliceCommand(String keyspace,
+                             String column_family,
+                             long timestamp,
+                             IDiskAtomFilter predicate,
+                             AbstractBounds<RowPosition> range,
+                             int maxResults)
     {
-        this(keyspace, column_family, predicate, range, null, maxResults, false, false);
+        this(keyspace, column_family, timestamp, predicate, range, null, maxResults, false, false);
     }
 
-    public RangeSliceCommand(String keyspace, String column_family, IDiskAtomFilter predicate, AbstractBounds<RowPosition> range, List<IndexExpression> row_filter, int maxResults)
+    public RangeSliceCommand(String keyspace,
+                             String column_family,
+                             long timestamp,
+                             IDiskAtomFilter predicate,
+                             AbstractBounds<RowPosition> range,
+                             List<IndexExpression> row_filter,
+                             int maxResults)
     {
-        this(keyspace, column_family, predicate, range, row_filter, maxResults, false, false);
+        this(keyspace, column_family, timestamp, predicate, range, row_filter, maxResults, false, false);
     }
 
-    public RangeSliceCommand(String keyspace, String column_family, IDiskAtomFilter predicate, AbstractBounds<RowPosition> range, List<IndexExpression> row_filter, int maxResults, boolean countCQL3Rows, boolean isPaging)
+    public RangeSliceCommand(String keyspace,
+                             String column_family,
+                             long timestamp,
+                             IDiskAtomFilter predicate,
+                             AbstractBounds<RowPosition> range,
+                             List<IndexExpression> row_filter,
+                             int maxResults,
+                             boolean countCQL3Rows,
+                             boolean isPaging)
     {
         this.keyspace = keyspace;
         this.column_family = column_family;
+        this.timestamp = timestamp;
         this.predicate = predicate;
         this.range = range;
         this.row_filter = row_filter;
@@ -110,6 +110,7 @@ public class RangeSliceCommand implements IReadCommand
         return "RangeSliceCommand{" +
                "keyspace='" + keyspace + '\'' +
                ", column_family='" + column_family + '\'' +
+               ", timestamp='" + timestamp + '\'' +
                ", predicate=" + predicate +
                ", range=" + range +
                ", row_filter =" + row_filter +
@@ -131,27 +132,14 @@ public class RangeSliceCommand implements IReadCommand
 
 class RangeSliceCommandSerializer implements IVersionedSerializer<RangeSliceCommand>
 {
-    // For compatibility with pre-1.2 sake. We should remove at some point.
-    public static SlicePredicate asSlicePredicate(IDiskAtomFilter predicate)
-    {
-        SlicePredicate sp = new SlicePredicate();
-        if (predicate instanceof NamesQueryFilter)
-        {
-            sp.setColumn_names(new ArrayList<ByteBuffer>(((NamesQueryFilter)predicate).columns));
-        }
-        else
-        {
-            SliceQueryFilter sqf = (SliceQueryFilter)predicate;
-            sp.setSlice_range(new SliceRange(sqf.start(), sqf.finish(), sqf.reversed, sqf.count));
-        }
-        return sp;
-    }
-
     public void serialize(RangeSliceCommand sliceCommand, DataOutput out, int version) throws IOException
     {
         out.writeUTF(sliceCommand.keyspace);
         out.writeUTF(sliceCommand.column_family);
 
+        if (version >= MessagingService.VERSION_20)
+            out.writeLong(sliceCommand.timestamp);
+
         IDiskAtomFilter filter = sliceCommand.predicate;
         if (version < MessagingService.VERSION_20)
         {
@@ -199,6 +187,8 @@ class RangeSliceCommandSerializer implements IVersionedSerializer<RangeSliceComm
         String keyspace = in.readUTF();
         String columnFamily = in.readUTF();
 
+        long timestamp = version < MessagingService.VERSION_20 ? System.currentTimeMillis() : in.readLong();
+
         CFMetaData metadata = Schema.instance.getCFMetaData(keyspace, columnFamily);
 
         IDiskAtomFilter predicate;
@@ -234,7 +224,7 @@ class RangeSliceCommandSerializer implements IVersionedSerializer<RangeSliceComm
             predicate = IDiskAtomFilter.Serializer.instance.deserialize(in, version, metadata.comparator);
         }
 
-        List<IndexExpression> rowFilter = null;
+        List<IndexExpression> rowFilter;
         int filterCount = in.readInt();
         rowFilter = new ArrayList<IndexExpression>(filterCount);
         for (int i = 0; i < filterCount; i++)
@@ -250,7 +240,7 @@ class RangeSliceCommandSerializer implements IVersionedSerializer<RangeSliceComm
         int maxResults = in.readInt();
         boolean countCQL3Rows = in.readBoolean();
         boolean isPaging = in.readBoolean();
-        return new RangeSliceCommand(keyspace, columnFamily, predicate, range, rowFilter, maxResults, countCQL3Rows, isPaging);
+        return new RangeSliceCommand(keyspace, columnFamily, timestamp, predicate, range, rowFilter, maxResults, countCQL3Rows, isPaging);
     }
 
     public long serializedSize(RangeSliceCommand rsc, int version)
@@ -258,6 +248,9 @@ class RangeSliceCommandSerializer implements IVersionedSerializer<RangeSliceComm
         long size = TypeSizes.NATIVE.sizeof(rsc.keyspace);
         size += TypeSizes.NATIVE.sizeof(rsc.column_family);
 
+        if (version >= MessagingService.VERSION_20)
+            size += TypeSizes.NATIVE.sizeof(rsc.timestamp);
+
         IDiskAtomFilter filter = rsc.predicate;
         if (version < MessagingService.VERSION_20)
         {

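The serializer changes above follow one pattern: the timestamp is written only when talking to a 2.0+ peer, and when reading from an older peer the receiver falls back to its own clock. A standalone sketch of that round trip; the VERSION_20 constant below is a placeholder standing in for MessagingService.VERSION_20, and its value is arbitrary.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class VersionedTimestampSketch
{
    static final int VERSION_20 = 7; // placeholder messaging version constant

    static void serializeTimestamp(long timestamp, DataOutputStream out, int version) throws IOException
    {
        // Older peers do not know about the field, so it is written conditionally.
        if (version >= VERSION_20)
            out.writeLong(timestamp);
    }

    static long deserializeTimestamp(DataInputStream in, int version) throws IOException
    {
        // Pre-2.0 senders never wrote it, so the receiver uses its own clock instead.
        return version < VERSION_20 ? System.currentTimeMillis() : in.readLong();
    }

    public static void main(String[] args) throws IOException
    {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        serializeTimestamp(1371571200000L, new DataOutputStream(buffer), VERSION_20);

        DataInputStream in = new DataInputStream(new ByteArrayInputStream(buffer.toByteArray()));
        System.out.println(deserializeTimestamp(in, VERSION_20)); // 1371571200000
    }
}

serializedSize accounts for the same conditional field, keeping serialize, deserialize and size computation in sync.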
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/RangeTombstone.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RangeTombstone.java b/src/java/org/apache/cassandra/db/RangeTombstone.java
index 9342570..e30cd5b 100644
--- a/src/java/org/apache/cassandra/db/RangeTombstone.java
+++ b/src/java/org/apache/cassandra/db/RangeTombstone.java
@@ -237,11 +237,11 @@ public class RangeTombstone extends Interval<ByteBuffer, DeletionTime> implement
             }
         }
 
-        public boolean isDeleted(Column column)
+        public boolean isDeleted(Column column, long now)
         {
             for (RangeTombstone tombstone : ranges)
             {
-                if (comparator.compare(column.name(), tombstone.max) <= 0 && tombstone.data.isDeleted(column))
+                if (comparator.compare(column.name(), tombstone.max) <= 0 && tombstone.data.isDeleted(column, now))
                     return true;
             }
             return false;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java
index 3cff8b6..61a2478 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -35,10 +35,10 @@ import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.IReadCommand;
 import org.apache.cassandra.service.RowDataResolver;
 
-
 public abstract class ReadCommand implements IReadCommand
 {
-    public enum Type {
+    public enum Type
+    {
         GET_BY_NAMES((byte)1),
         GET_SLICES((byte)2);
 
@@ -65,23 +65,25 @@ public abstract class ReadCommand implements IReadCommand
     public final String table;
     public final String cfName;
     public final ByteBuffer key;
+    public final long timestamp;
     private boolean isDigestQuery = false;
     protected final Type commandType;
 
-    protected ReadCommand(String table, ByteBuffer key, String cfName, Type cmdType)
+    protected ReadCommand(String table, ByteBuffer key, String cfName, long timestamp, Type cmdType)
     {
         this.table = table;
         this.key = key;
         this.cfName = cfName;
+        this.timestamp = timestamp;
         this.commandType = cmdType;
     }
 
-    public static ReadCommand create(String table, ByteBuffer key, String cfName, IDiskAtomFilter filter)
+    public static ReadCommand create(String table, ByteBuffer key, String cfName, long timestamp, IDiskAtomFilter filter)
     {
         if (filter instanceof SliceQueryFilter)
-            return new SliceFromReadCommand(table, key, cfName, (SliceQueryFilter)filter);
+            return new SliceFromReadCommand(table, key, cfName, timestamp, (SliceQueryFilter)filter);
         else
-            return new SliceByNamesReadCommand(table, key, cfName, (NamesQueryFilter)filter);
+            return new SliceByNamesReadCommand(table, key, cfName, timestamp, (NamesQueryFilter)filter);
     }
 
     public boolean isDigestQuery()
@@ -143,7 +145,7 @@ class ReadCommandSerializer implements IVersionedSerializer<ReadCommand>
             if (metadata.cfType == ColumnFamilyType.Super)
             {
                 SuperColumns.SCFilter scFilter = SuperColumns.filterToSC((CompositeType)metadata.comparator, command.filter());
-                newCommand = ReadCommand.create(command.table, command.key, command.cfName, scFilter.updatedFilter);
+                newCommand = ReadCommand.create(command.table, command.key, command.cfName, command.timestamp, scFilter.updatedFilter);
                 newCommand.setDigestQuery(command.isDigestQuery());
                 superColumn = scFilter.scName;
             }
@@ -187,7 +189,7 @@ class ReadCommandSerializer implements IVersionedSerializer<ReadCommand>
             if (metadata.cfType == ColumnFamilyType.Super)
             {
                 SuperColumns.SCFilter scFilter = SuperColumns.filterToSC((CompositeType)metadata.comparator, command.filter());
-                newCommand = ReadCommand.create(command.table, command.key, command.cfName, scFilter.updatedFilter);
+                newCommand = ReadCommand.create(command.table, command.key, command.cfName, command.timestamp, scFilter.updatedFilter);
                 newCommand.setDigestQuery(command.isDigestQuery());
                 superColumn = scFilter.scName;
             }

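With the timestamp now a field of ReadCommand itself, a coordinator stamps a read once and that single value travels with every copy and retry of the command, so all consumers presumably evaluate TTL expiration against the same instant rather than their own clocks. A toy illustration of the shape of that pattern; these are not the Cassandra classes.

public class ReadTimestampSketch
{
    // A read captures one timestamp at creation and every consumer of the command reuses it.
    static final class Read
    {
        final long timestamp;

        Read(long timestamp) { this.timestamp = timestamp; }

        // Same check everywhere: a TTL'd cell is live iff its local deletion time
        // is still in the future relative to the command's timestamp.
        boolean isLive(int localDeletionTime)
        {
            return (int) (timestamp / 1000) < localDeletionTime;
        }
    }

    public static void main(String[] args)
    {
        Read read = new Read(System.currentTimeMillis());
        int expiresInOneMinute = (int) (System.currentTimeMillis() / 1000) + 60; // hypothetical cell

        // Every consumer applying this command reaches the same answer, regardless of clock skew.
        System.out.println(read.isLive(expiresInOneMinute)); // true
    }
}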
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/RetriedSliceFromReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RetriedSliceFromReadCommand.java b/src/java/org/apache/cassandra/db/RetriedSliceFromReadCommand.java
index 52bad89..7ca57a8 100644
--- a/src/java/org/apache/cassandra/db/RetriedSliceFromReadCommand.java
+++ b/src/java/org/apache/cassandra/db/RetriedSliceFromReadCommand.java
@@ -19,25 +19,26 @@ package org.apache.cassandra.db;
 
 import java.nio.ByteBuffer;
 
-import org.apache.cassandra.db.filter.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.db.filter.SliceQueryFilter;
+
 public class RetriedSliceFromReadCommand extends SliceFromReadCommand
 {
     static final Logger logger = LoggerFactory.getLogger(RetriedSliceFromReadCommand.class);
     public final int originalCount;
 
-    public RetriedSliceFromReadCommand(String table, ByteBuffer key, String cfName, SliceQueryFilter filter, int originalCount)
+    public RetriedSliceFromReadCommand(String table, ByteBuffer key, String cfName, long timestamp, SliceQueryFilter filter, int originalCount)
     {
-        super(table, key, cfName, filter);
+        super(table, key, cfName, timestamp, filter);
         this.originalCount = originalCount;
     }
 
     @Override
     public ReadCommand copy()
     {
-        ReadCommand readCommand = new RetriedSliceFromReadCommand(table, key, cfName, filter, originalCount);
+        ReadCommand readCommand = new RetriedSliceFromReadCommand(table, key, cfName, timestamp, filter, originalCount);
         readCommand.setDigestQuery(isDigestQuery());
         return readCommand;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/Row.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Row.java b/src/java/org/apache/cassandra/db/Row.java
index 49aa426..13e6f67 100644
--- a/src/java/org/apache/cassandra/db/Row.java
+++ b/src/java/org/apache/cassandra/db/Row.java
@@ -54,9 +54,9 @@ public class Row
                ')';
     }
 
-    public int getLiveCount(IDiskAtomFilter filter)
+    public int getLiveCount(IDiskAtomFilter filter, long now)
     {
-        return cf == null ? 0 : filter.getLiveCount(cf);
+        return cf == null ? 0 : filter.getLiveCount(cf, now);
     }
 
     public static class RowSerializer implements IVersionedSerializer<Row>

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/RowIteratorFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RowIteratorFactory.java b/src/java/org/apache/cassandra/db/RowIteratorFactory.java
index 4588146..c9f715c 100644
--- a/src/java/org/apache/cassandra/db/RowIteratorFactory.java
+++ b/src/java/org/apache/cassandra/db/RowIteratorFactory.java
@@ -52,11 +52,11 @@ public class RowIteratorFactory
      * @return A row iterator following all the given restrictions
      */
     public static CloseableIterator<Row> getIterator(final Iterable<Memtable> memtables,
-                                          final Collection<SSTableReader> sstables,
-                                          final RowPosition startWith,
-                                          final RowPosition stopAt,
-                                          final QueryFilter filter,
-                                          final ColumnFamilyStore cfs)
+                                                     final Collection<SSTableReader> sstables,
+                                                     final RowPosition startWith,
+                                                     final RowPosition stopAt,
+                                                     final QueryFilter filter,
+                                                     final ColumnFamilyStore cfs)
     {
         // fetch data from current memtable, historical memtables, and SSTables in the correct order.
         final List<CloseableIterator<OnDiskAtomIterator>> iterators = new ArrayList<CloseableIterator<OnDiskAtomIterator>>();
@@ -76,7 +76,7 @@ public class RowIteratorFactory
         // reduce rows from all sources into a single row
         return MergeIterator.get(iterators, COMPARE_BY_KEY, new MergeIterator.Reducer<OnDiskAtomIterator, Row>()
         {
-            private final int gcBefore = (int) (System.currentTimeMillis() / 1000) - cfs.metadata.getGcGraceSeconds();
+            private final int gcBefore = cfs.gcBefore(filter.timestamp);
             private final List<OnDiskAtomIterator> colIters = new ArrayList<OnDiskAtomIterator>();
             private DecoratedKey key;
             private ColumnFamily returnCF;
@@ -106,8 +106,8 @@ public class RowIteratorFactory
                 }
                 else
                 {
-                    QueryFilter keyFilter = new QueryFilter(key, filter.cfName, filter.filter);
-                    returnCF = cfs.filterColumnFamily(cached, keyFilter, gcBefore);
+                    QueryFilter keyFilter = new QueryFilter(key, filter.cfName, filter.filter, filter.timestamp);
+                    returnCF = cfs.filterColumnFamily(cached, keyFilter);
                 }
 
                 Row rv = new Row(key, returnCF);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java b/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java
index 909ba76..2942249 100644
--- a/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java
@@ -36,15 +36,15 @@ public class SliceByNamesReadCommand extends ReadCommand
 
     public final NamesQueryFilter filter;
 
-    public SliceByNamesReadCommand(String table, ByteBuffer key, String cfName, NamesQueryFilter filter)
+    public SliceByNamesReadCommand(String table, ByteBuffer key, String cfName, long timestamp, NamesQueryFilter filter)
     {
-        super(table, key, cfName, Type.GET_BY_NAMES);
+        super(table, key, cfName, timestamp, Type.GET_BY_NAMES);
         this.filter = filter;
     }
 
     public ReadCommand copy()
     {
-        ReadCommand readCommand= new SliceByNamesReadCommand(table, key, cfName, filter);
+        ReadCommand readCommand= new SliceByNamesReadCommand(table, key, cfName, timestamp, filter);
         readCommand.setDigestQuery(isDigestQuery());
         return readCommand;
     }
@@ -52,7 +52,7 @@ public class SliceByNamesReadCommand extends ReadCommand
     public Row getRow(Table table)
     {
         DecoratedKey dk = StorageService.getPartitioner().decorateKey(key);
-        return table.getRow(new QueryFilter(dk, cfName, filter));
+        return table.getRow(new QueryFilter(dk, cfName, filter, timestamp));
     }
 
     @Override
@@ -62,6 +62,7 @@ public class SliceByNamesReadCommand extends ReadCommand
                "table='" + table + '\'' +
                ", key=" + ByteBufferUtil.bytesToHex(key) +
                ", cfName='" + cfName + '\'' +
+               ", timestamp='" + timestamp + '\'' +
                ", filter=" + filter +
                ')';
     }
@@ -91,6 +92,9 @@ class SliceByNamesReadCommandSerializer implements IVersionedSerializer<ReadComm
         else
             out.writeUTF(command.cfName);
 
+        if (version >= MessagingService.VERSION_20)
+            out.writeLong(cmd.timestamp);
+
         NamesQueryFilter.serializer.serialize(command.filter, out, version);
     }
 
@@ -113,6 +117,8 @@ class SliceByNamesReadCommandSerializer implements IVersionedSerializer<ReadComm
             cfName = in.readUTF();
         }
 
+        long timestamp = version < MessagingService.VERSION_20 ? System.currentTimeMillis() : in.readLong();
+
         CFMetaData metadata = Schema.instance.getCFMetaData(table, cfName);
         ReadCommand command;
         if (version < MessagingService.VERSION_20)
@@ -135,14 +141,14 @@ class SliceByNamesReadCommandSerializer implements IVersionedSerializer<ReadComm
 
             // Due to SC compat, it's possible we get back a slice filter at this point
             if (filter instanceof NamesQueryFilter)
-                command = new SliceByNamesReadCommand(table, key, cfName, (NamesQueryFilter)filter);
+                command = new SliceByNamesReadCommand(table, key, cfName, timestamp, (NamesQueryFilter)filter);
             else
-                command = new SliceFromReadCommand(table, key, cfName, (SliceQueryFilter)filter);
+                command = new SliceFromReadCommand(table, key, cfName, timestamp, (SliceQueryFilter)filter);
         }
         else
         {
             NamesQueryFilter filter = NamesQueryFilter.serializer.deserialize(in, version, metadata.comparator);
-            command = new SliceByNamesReadCommand(table, key, cfName, filter);
+            command = new SliceByNamesReadCommand(table, key, cfName, timestamp, filter);
         }
 
         command.setDigestQuery(isDigest);
@@ -165,15 +171,15 @@ class SliceByNamesReadCommandSerializer implements IVersionedSerializer<ReadComm
         size += sizes.sizeof((short)keySize) + keySize;
 
         if (version < MessagingService.VERSION_20)
-        {
             size += new QueryPath(command.cfName, superColumn).serializedSize(sizes);
-        }
         else
-        {
             size += sizes.sizeof(command.cfName);
-        }
+
+        if (version >= MessagingService.VERSION_20)
+            size += sizes.sizeof(cmd.timestamp);
 
         size += NamesQueryFilter.serializer.serializedSize(command.filter, version);
+
         return size;
     }
 }


[3/4] Include a timestamp with all read commands to determine column expiration

Posted by al...@apache.org.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SliceFromReadCommand.java b/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
index be64ae1..5c42de5 100644
--- a/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
@@ -46,15 +46,15 @@ public class SliceFromReadCommand extends ReadCommand
 
     public final SliceQueryFilter filter;
 
-    public SliceFromReadCommand(String table, ByteBuffer key, String cfName, SliceQueryFilter filter)
+    public SliceFromReadCommand(String table, ByteBuffer key, String cfName, long timestamp, SliceQueryFilter filter)
     {
-        super(table, key, cfName, Type.GET_SLICES);
+        super(table, key, cfName, timestamp, Type.GET_SLICES);
         this.filter = filter;
     }
 
     public ReadCommand copy()
     {
-        ReadCommand readCommand = new SliceFromReadCommand(table, key, cfName, filter);
+        ReadCommand readCommand = new SliceFromReadCommand(table, key, cfName, timestamp, filter);
         readCommand.setDigestQuery(isDigestQuery());
         return readCommand;
     }
@@ -62,7 +62,7 @@ public class SliceFromReadCommand extends ReadCommand
     public Row getRow(Table table)
     {
         DecoratedKey dk = StorageService.getPartitioner().decorateKey(key);
-        return table.getRow(new QueryFilter(dk, cfName, filter));
+        return table.getRow(new QueryFilter(dk, cfName, filter, timestamp));
     }
 
     @Override
@@ -77,7 +77,7 @@ public class SliceFromReadCommand extends ReadCommand
         if (maxLiveColumns < count)
             return null;
 
-        int liveCountInRow = row == null || row.cf == null ? 0 : filter.getLiveCount(row.cf);
+        int liveCountInRow = row == null || row.cf == null ? 0 : filter.getLiveCount(row.cf, timestamp);
         if (liveCountInRow < getOriginalRequestedCount())
         {
             // We asked t (= count) live columns and got l (=liveCountInRow) ones.
@@ -86,7 +86,7 @@ public class SliceFromReadCommand extends ReadCommand
             // round we want to ask x column so that x * (l/t) == t, i.e. x = t^2/l.
             int retryCount = liveCountInRow == 0 ? count + 1 : ((count * count) / liveCountInRow) + 1;
             SliceQueryFilter newFilter = filter.withUpdatedCount(retryCount);
-            return new RetriedSliceFromReadCommand(table, key, cfName, newFilter, getOriginalRequestedCount());
+            return new RetriedSliceFromReadCommand(table, key, cfName, timestamp, newFilter, getOriginalRequestedCount());
         }
 
         return null;
@@ -98,7 +98,7 @@ public class SliceFromReadCommand extends ReadCommand
         if ((row == null) || (row.cf == null))
             return;
 
-        filter.trim(row.cf, getOriginalRequestedCount());
+        filter.trim(row.cf, getOriginalRequestedCount(), timestamp);
     }
 
     public IDiskAtomFilter filter()
@@ -123,6 +123,7 @@ public class SliceFromReadCommand extends ReadCommand
                "table='" + table + '\'' +
                ", key='" + ByteBufferUtil.bytesToHex(key) + '\'' +
                ", cfName='" + cfName + '\'' +
+               ", timestamp='" + timestamp + '\'' +
                ", filter='" + filter + '\'' +
                ')';
     }
@@ -147,6 +148,9 @@ class SliceFromReadCommandSerializer implements IVersionedSerializer<ReadCommand
         else
             out.writeUTF(realRM.cfName);
 
+        if (version >= MessagingService.VERSION_20)
+            out.writeLong(realRM.timestamp);
+
         SliceQueryFilter.serializer.serialize(realRM.filter, out, version);
     }
 
@@ -169,6 +173,8 @@ class SliceFromReadCommandSerializer implements IVersionedSerializer<ReadCommand
             cfName = in.readUTF();
         }
 
+        long timestamp = version < MessagingService.VERSION_20 ? System.currentTimeMillis() : in.readLong();
+
         CFMetaData metadata = Schema.instance.getCFMetaData(table, cfName);
         SliceQueryFilter filter;
         if (version < MessagingService.VERSION_20)
@@ -183,7 +189,7 @@ class SliceFromReadCommandSerializer implements IVersionedSerializer<ReadCommand
             filter = SliceQueryFilter.serializer.deserialize(in, version);
         }
 
-        ReadCommand command = new SliceFromReadCommand(table, key, cfName, filter);
+        ReadCommand command = new SliceFromReadCommand(table, key, cfName, timestamp, filter);
         command.setDigestQuery(isDigest);
         return command;
     }
@@ -204,15 +210,15 @@ class SliceFromReadCommandSerializer implements IVersionedSerializer<ReadCommand
         size += sizes.sizeof((short) keySize) + keySize;
 
         if (version < MessagingService.VERSION_20)
-        {
             size += new QueryPath(command.cfName, superColumn).serializedSize(sizes);
-        }
         else
-        {
             size += sizes.sizeof(command.cfName);
-        }
+
+        if (version >= MessagingService.VERSION_20)
+            size += sizes.sizeof(cmd.timestamp);
 
         size += SliceQueryFilter.serializer.serializedSize(command.filter, version);
+
         return size;
     }
 }

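The retry path in SliceFromReadCommand above is worth spelling out: if the command asked for t live columns but only l of the returned ones were live, the next attempt asks for roughly t^2/l columns, so that the same live ratio would yield the t originally requested. A small sketch of that arithmetic; the class name and sample numbers are illustrative only.

public class SliceRetrySketch
{
    // Mirrors the retry count computation shown in the SliceFromReadCommand diff above.
    static int retryCount(int requested, int liveCountInRow)
    {
        return liveCountInRow == 0
             ? requested + 1
             : ((requested * requested) / liveCountInRow) + 1;
    }

    public static void main(String[] args)
    {
        System.out.println(retryCount(100, 40)); // asked for 100, got 40 live -> retry asking for 251
        System.out.println(retryCount(100, 0));  // nothing live -> just ask for one more
    }
}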
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/SliceQueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SliceQueryPager.java b/src/java/org/apache/cassandra/db/SliceQueryPager.java
index 7b8e4d4..c1933ad 100644
--- a/src/java/org/apache/cassandra/db/SliceQueryPager.java
+++ b/src/java/org/apache/cassandra/db/SliceQueryPager.java
@@ -51,10 +51,11 @@ public class SliceQueryPager implements Iterator<ColumnFamily>
         if (exhausted)
             return null;
 
+        long now = System.currentTimeMillis();
         SliceQueryFilter sliceFilter = new SliceQueryFilter(slices, false, DEFAULT_PAGE_SIZE);
-        QueryFilter filter = new QueryFilter(key, cfs.name, sliceFilter);
+        QueryFilter filter = new QueryFilter(key, cfs.name, sliceFilter, now);
         ColumnFamily cf = cfs.getColumnFamily(filter);
-        if (cf == null || sliceFilter.getLiveCount(cf) < DEFAULT_PAGE_SIZE)
+        if (cf == null || sliceFilter.getLiveCount(cf, now) < DEFAULT_PAGE_SIZE)
         {
             exhausted = true;
         }
@@ -62,7 +63,7 @@ public class SliceQueryPager implements Iterator<ColumnFamily>
         {
             Iterator<Column> iter = cf.getReverseSortedColumns().iterator();
             Column lastColumn = iter.next();
-            while (lastColumn.isMarkedForDelete())
+            while (lastColumn.isMarkedForDelete(now))
                 lastColumn = iter.next();
 
             int i = 0;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/SuperColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SuperColumns.java b/src/java/org/apache/cassandra/db/SuperColumns.java
index 753cd91..54f59d6 100644
--- a/src/java/org/apache/cassandra/db/SuperColumns.java
+++ b/src/java/org/apache/cassandra/db/SuperColumns.java
@@ -102,13 +102,13 @@ public class SuperColumns
         return scMap;
     }
 
-    public static void deserializerSuperColumnFamily(DataInput in, ColumnFamily cf, ColumnSerializer.Flag flag, int expireBefore, int version) throws IOException
+    public static void deserializerSuperColumnFamily(DataInput in, ColumnFamily cf, ColumnSerializer.Flag flag, int version) throws IOException
     {
         // Note that there was no way to insert a range tombstone in a SCF in 1.2
         cf.delete(DeletionInfo.serializer().deserialize(in, version, cf.getComparator()));
         assert !cf.deletionInfo().rangeIterator().hasNext();
 
-        Iterator<OnDiskAtom> iter = onDiskIterator(in, in.readInt(), flag, expireBefore);
+        Iterator<OnDiskAtom> iter = onDiskIterator(in, in.readInt(), flag, Integer.MIN_VALUE);
         while (iter.hasNext())
             cf.addAtom(iter.next());
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/SystemTable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemTable.java b/src/java/org/apache/cassandra/db/SystemTable.java
index e62b258..90bfd8d 100644
--- a/src/java/org/apache/cassandra/db/SystemTable.java
+++ b/src/java/org/apache/cassandra/db/SystemTable.java
@@ -535,7 +535,8 @@ public class SystemTable
         ColumnFamilyStore cfs = Table.open(Table.SYSTEM_KS).getColumnFamilyStore(INDEX_CF);
         QueryFilter filter = QueryFilter.getNamesFilter(decorate(ByteBufferUtil.bytes(table)),
                                                         INDEX_CF,
-                                                        ByteBufferUtil.bytes(indexName));
+                                                        ByteBufferUtil.bytes(indexName),
+                                                        System.currentTimeMillis());
         return ColumnFamilyStore.removeDeleted(cfs.getColumnFamily(filter), Integer.MAX_VALUE) != null;
     }
 
@@ -596,7 +597,8 @@ public class SystemTable
                                                         ByteBufferUtil.EMPTY_BYTE_BUFFER,
                                                         ByteBufferUtil.EMPTY_BYTE_BUFFER,
                                                         true,
-                                                        1);
+                                                        1,
+                                                        System.currentTimeMillis());
         ColumnFamily cf = table.getColumnFamilyStore(COUNTER_ID_CF).getColumnFamily(filter);
         if (cf != null && cf.getColumnCount() != 0)
             return CounterId.wrap(cf.iterator().next().name());
@@ -629,7 +631,7 @@ public class SystemTable
         List<CounterId.CounterIdRecord> l = new ArrayList<CounterId.CounterIdRecord>();
 
         Table table = Table.open(Table.SYSTEM_KS);
-        QueryFilter filter = QueryFilter.getIdentityFilter(decorate(ALL_LOCAL_NODE_ID_KEY), COUNTER_ID_CF);
+        QueryFilter filter = QueryFilter.getIdentityFilter(decorate(ALL_LOCAL_NODE_ID_KEY), COUNTER_ID_CF, System.currentTimeMillis());
         ColumnFamily cf = table.getColumnFamilyStore(COUNTER_ID_CF).getColumnFamily(filter);
 
         CounterId previous = null;
@@ -673,11 +675,10 @@ public class SystemTable
     {
         Token minToken = StorageService.getPartitioner().getMinimumToken();
 
-        return schemaCFS(schemaCfName).getRangeSlice(new Range<RowPosition>(minToken.minKeyBound(),
-                                                                            minToken.maxKeyBound()),
-                                                     Integer.MAX_VALUE,
+        return schemaCFS(schemaCfName).getRangeSlice(new Range<RowPosition>(minToken.minKeyBound(), minToken.maxKeyBound()),
+                                                     null,
                                                      new IdentityQueryFilter(),
-                                                     null);
+                                                     Integer.MAX_VALUE);
     }
 
     public static Collection<RowMutation> serializeSchema()
@@ -729,7 +730,7 @@ public class SystemTable
         DecoratedKey key = StorageService.getPartitioner().decorateKey(getSchemaKSKey(ksName));
 
         ColumnFamilyStore schemaCFS = SystemTable.schemaCFS(SCHEMA_KEYSPACES_CF);
-        ColumnFamily result = schemaCFS.getColumnFamily(QueryFilter.getIdentityFilter(key, SCHEMA_KEYSPACES_CF));
+        ColumnFamily result = schemaCFS.getColumnFamily(QueryFilter.getIdentityFilter(key, SCHEMA_KEYSPACES_CF, System.currentTimeMillis()));
 
         return new Row(key, result);
     }
@@ -743,7 +744,8 @@ public class SystemTable
                                                         DefsTable.searchComposite(cfName, true),
                                                         DefsTable.searchComposite(cfName, false),
                                                         false,
-                                                        Integer.MAX_VALUE);
+                                                        Integer.MAX_VALUE,
+                                                        System.currentTimeMillis());
 
         return new Row(key, result);
     }

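Note the reordered getRangeSlice arguments in the hunk above: the (absent) row filter now precedes the column filter, and the result limit moves to the end. A hedged sketch of the new call shape, with the argument roles inferred only from the call sites in this patch:

    // sketch; roles inferred from the call sites above and in RangeSliceVerbHandler below
    List<Row> schemaRows = schemaCFS(schemaCfName).getRangeSlice(
            new Range<RowPosition>(minToken.minKeyBound(), minToken.maxKeyBound()),
            null,                        // row filter (index expressions): none for a full schema scan
            new IdentityQueryFilter(),   // column filter: every column
            Integer.MAX_VALUE);          // max results
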
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 90f6dfa..6c9f50d 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -628,7 +628,7 @@ public class CompactionManager implements CompactionManagerMBean
             // this at a different time (that's the whole purpose of repair with snapshot). So instead we take the creation
             // time of the snapshot, which should give us roughly the same time on each replica (roughly being in that case
             // 'as good as in the non-snapshot' case)
-            gcBefore = (int) (cfs.getSnapshotCreationTime(validator.request.sessionid) / 1000) - cfs.metadata.getGcGraceSeconds();
+            gcBefore = cfs.gcBefore(cfs.getSnapshotCreationTime(validator.request.sessionid));
         }
         else
         {
@@ -731,9 +731,7 @@ public class CompactionManager implements CompactionManagerMBean
     {
         // 2ndary indexes have ExpiringColumns too, so we need to purge tombstones deleted before now. We do not need to
         // add any GcGrace however since 2ndary indexes are local to a node.
-        return cfs.isIndex()
-               ? (int) (System.currentTimeMillis() / 1000)
-               : (int) (System.currentTimeMillis() / 1000) - cfs.metadata.getGcGraceSeconds();
+        return cfs.isIndex() ? (int) (System.currentTimeMillis() / 1000) : cfs.gcBefore(System.currentTimeMillis());
     }
 
     private static class ValidationCompactionIterable extends CompactionIterable

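The repeated (millis / 1000) - gc_grace arithmetic is folded into a ColumnFamilyStore.gcBefore(long) helper that takes the reference time explicitly. Its body is not part of this diff; a minimal sketch of what it presumably computes, matching the expression it replaces here and in ActiveRepairService below:

    // sketch, not the actual ColumnFamilyStore source: purge horizon derived from a caller-supplied time
    public int gcBefore(long now)
    {
        return (int) (now / 1000) - metadata.getGcGraceSeconds();
    }
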
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
index e9cd2f1..32361b2 100644
--- a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
@@ -209,7 +209,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
                 Column column = (Column) current;
                 container.addColumn(column);
                 if (indexer != SecondaryIndexManager.nullUpdater
-                    && !column.isMarkedForDelete()
+                    && !column.isMarkedForDelete(System.currentTimeMillis())
                     && !container.getColumn(column.name()).equals(column))
                 {
                     indexer.remove(column);
@@ -247,7 +247,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
                 // PrecompactedRow.removeDeletedAndOldShards have only checked the top-level CF deletion times,
                 // not the range tombstone. For that we use the columnIndexer tombstone tracker.
                 // Note that this doesn't work for super columns.
-                if (indexBuilder.tombstoneTracker().isDeleted(reduced))
+                if (indexBuilder.tombstoneTracker().isDeleted(reduced, System.currentTimeMillis()))
                     return null;
 
                 columns++;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java b/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
index f69a955..85f2a23 100644
--- a/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
@@ -133,7 +133,7 @@ public class PrecompactedRow extends AbstractCompactedRow
             {
                 container.addColumn(column);
                 if (indexer != SecondaryIndexManager.nullUpdater
-                    && !column.isMarkedForDelete()
+                    && !column.isMarkedForDelete(System.currentTimeMillis())
                     && !container.getColumn(column.name()).equals(column))
                 {
                     indexer.remove(column);
@@ -149,7 +149,7 @@ public class PrecompactedRow extends AbstractCompactedRow
         };
 
         Iterator<Column> reduced = MergeIterator.get(data, fcomp, reducer);
-        filter.collectReducedColumns(returnCF, reduced, CompactionManager.NO_GC);
+        filter.collectReducedColumns(returnCF, reduced, CompactionManager.NO_GC, System.currentTimeMillis());
     }
 
     public RowIndexEntry write(long currentPosition, DataOutput out) throws IOException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/filter/ColumnCounter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ColumnCounter.java b/src/java/org/apache/cassandra/db/filter/ColumnCounter.java
index 697ad02..a5d54ce 100644
--- a/src/java/org/apache/cassandra/db/filter/ColumnCounter.java
+++ b/src/java/org/apache/cassandra/db/filter/ColumnCounter.java
@@ -31,18 +31,24 @@ public class ColumnCounter
 {
     protected int live;
     protected int ignored;
+    protected final long timestamp;
+
+    public ColumnCounter(long timestamp)
+    {
+        this.timestamp = timestamp;
+    }
 
     public void count(Column column, ColumnFamily container)
     {
-        if (!isLive(column, container))
+        if (!isLive(column, container, timestamp))
             ignored++;
         else
             live++;
     }
 
-    protected static boolean isLive(Column column, ColumnFamily container)
+    protected static boolean isLive(Column column, ColumnFamily container, long timestamp)
     {
-        return column.isLive() && (!container.deletionInfo().isDeleted(column));
+        return column.isLive(timestamp) && (!container.deletionInfo().isDeleted(column));
     }
 
     public int live()
@@ -71,8 +77,9 @@ public class ColumnCounter
          *                column. If 0, all columns are grouped, otherwise we group
          *                those for which the {@code toGroup} first component are equals.
          */
-        public GroupByPrefix(CompositeType type, int toGroup)
+        public GroupByPrefix(long timestamp, CompositeType type, int toGroup)
         {
+            super(timestamp);
             this.type = type;
             this.toGroup = toGroup;
 
@@ -81,7 +88,7 @@ public class ColumnCounter
 
         public void count(Column column, ColumnFamily container)
         {
-            if (!isLive(column, container))
+            if (!isLive(column, container, timestamp))
             {
                 ignored++;
                 return;

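ColumnCounter (and its GroupByPrefix variant) is now constructed with the query timestamp instead of consulting the clock per column. A usage sketch, assuming cf is an already-fetched ColumnFamily that iterates its Columns:

    long now = System.currentTimeMillis();
    ColumnCounter counter = new ColumnCounter(now);
    for (Column column : cf)
        counter.count(column, cf);      // live vs. ignored is decided against the same instant for every column
    int live = counter.live();
    int deadOrExpired = counter.ignored();
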
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java b/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
index 064cc6e..d061ac2 100644
--- a/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
@@ -44,34 +44,42 @@ public abstract class ExtendedFilter
     private static final Logger logger = LoggerFactory.getLogger(ExtendedFilter.class);
 
     public final ColumnFamilyStore cfs;
+    public final long timestamp;
     protected final IDiskAtomFilter originalFilter;
     private final int maxResults;
     private final boolean countCQL3Rows;
     private final boolean isPaging;
 
-    public static ExtendedFilter create(ColumnFamilyStore cfs, IDiskAtomFilter filter, List<IndexExpression> clause, int maxResults, boolean countCQL3Rows, boolean isPaging)
+    public static ExtendedFilter create(ColumnFamilyStore cfs,
+                                        List<IndexExpression> clause,
+                                        IDiskAtomFilter filter,
+                                        int maxResults,
+                                        long timestamp,
+                                        boolean countCQL3Rows,
+                                        boolean isPaging)
     {
         if (clause == null || clause.isEmpty())
         {
-            return new EmptyClauseFilter(cfs, filter, maxResults, countCQL3Rows, isPaging);
+            return new EmptyClauseFilter(cfs, filter, maxResults, timestamp, countCQL3Rows, isPaging);
         }
         else
         {
             if (isPaging)
                 throw new IllegalArgumentException("Cross-row paging is not supported along with index clauses");
             return cfs.getComparator() instanceof CompositeType
-                 ? new FilterWithCompositeClauses(cfs, filter, clause, maxResults, countCQL3Rows)
-                 : new FilterWithClauses(cfs, filter, clause, maxResults, countCQL3Rows);
+                 ? new FilterWithCompositeClauses(cfs, clause, filter, maxResults, timestamp, countCQL3Rows)
+                 : new FilterWithClauses(cfs, clause, filter, maxResults, timestamp, countCQL3Rows);
         }
     }
 
-    protected ExtendedFilter(ColumnFamilyStore cfs, IDiskAtomFilter filter, int maxResults, boolean countCQL3Rows, boolean isPaging)
+    protected ExtendedFilter(ColumnFamilyStore cfs, IDiskAtomFilter filter, int maxResults, long timestamp, boolean countCQL3Rows, boolean isPaging)
     {
         assert cfs != null;
         assert filter != null;
         this.cfs = cfs;
         this.originalFilter = filter;
         this.maxResults = maxResults;
+        this.timestamp = timestamp;
         this.countCQL3Rows = countCQL3Rows;
         this.isPaging = isPaging;
         if (countCQL3Rows)
@@ -112,7 +120,7 @@ public abstract class ExtendedFilter
         if (initialFilter() instanceof SliceQueryFilter)
             return ((SliceQueryFilter)initialFilter()).lastCounted();
         else
-            return initialFilter().getLiveCount(data);
+            return initialFilter().getLiveCount(data, timestamp);
     }
 
     /** The initial filter we'll do our first slice with (either the original or a superset of it) */
@@ -167,9 +175,14 @@ public abstract class ExtendedFilter
         protected final List<IndexExpression> clause;
         protected final IDiskAtomFilter initialFilter;
 
-        public FilterWithClauses(ColumnFamilyStore cfs, IDiskAtomFilter filter, List<IndexExpression> clause, int maxResults, boolean countCQL3Rows)
+        public FilterWithClauses(ColumnFamilyStore cfs,
+                                 List<IndexExpression> clause,
+                                 IDiskAtomFilter filter,
+                                 int maxResults,
+                                 long timestamp,
+                                 boolean countCQL3Rows)
         {
-            super(cfs, filter, maxResults, countCQL3Rows, false);
+            super(cfs, filter, maxResults, timestamp, countCQL3Rows, false);
             assert clause != null;
             this.clause = clause;
             this.initialFilter = computeInitialFilter();
@@ -271,7 +284,7 @@ public abstract class ExtendedFilter
                 return data;
             ColumnFamily pruned = data.cloneMeShallow();
             OnDiskAtomIterator iter = originalFilter.getMemtableColumnIterator(data, null);
-            originalFilter.collectReducedColumns(pruned, QueryFilter.gatherTombstones(pruned, iter), cfs.gcBefore());
+            originalFilter.collectReducedColumns(pruned, QueryFilter.gatherTombstones(pruned, iter), cfs.gcBefore(timestamp), timestamp);
             return pruned;
         }
 
@@ -335,9 +348,14 @@ public abstract class ExtendedFilter
 
     private static class FilterWithCompositeClauses extends FilterWithClauses
     {
-        public FilterWithCompositeClauses(ColumnFamilyStore cfs, IDiskAtomFilter filter, List<IndexExpression> clause, int maxResults, boolean countCQL3Rows)
+        public FilterWithCompositeClauses(ColumnFamilyStore cfs,
+                                          List<IndexExpression> clause,
+                                          IDiskAtomFilter filter,
+                                          int maxResults,
+                                          long timestamp,
+                                          boolean countCQL3Rows)
         {
-            super(cfs, filter, clause, maxResults, countCQL3Rows);
+            super(cfs, clause, filter, maxResults, timestamp, countCQL3Rows);
         }
 
         /*
@@ -359,9 +377,14 @@ public abstract class ExtendedFilter
 
     private static class EmptyClauseFilter extends ExtendedFilter
     {
-        public EmptyClauseFilter(ColumnFamilyStore cfs, IDiskAtomFilter filter, int maxResults, boolean countCQL3Rows, boolean isPaging)
+        public EmptyClauseFilter(ColumnFamilyStore cfs,
+                                 IDiskAtomFilter filter,
+                                 int maxResults,
+                                 long timestamp,
+                                 boolean countCQL3Rows,
+                                 boolean isPaging)
         {
-            super(cfs, filter, maxResults, countCQL3Rows, isPaging);
+            super(cfs, filter, maxResults, timestamp, countCQL3Rows, isPaging);
         }
 
         public IDiskAtomFilter initialFilter()

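ExtendedFilter.create now takes the index clause before the column filter and carries the query timestamp, which the searchers below pass straight through. A sketch of the new call shape (names borrowed from the searcher hunks later in this patch):

    long now = System.currentTimeMillis();
    ExtendedFilter filter = ExtendedFilter.create(baseCfs,
                                                  clause,          // List<IndexExpression>
                                                  dataFilter,      // IDiskAtomFilter
                                                  maxResults,
                                                  now,
                                                  countCQL3Rows,
                                                  false);          // isPaging
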
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/filter/IDiskAtomFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/IDiskAtomFilter.java b/src/java/org/apache/cassandra/db/filter/IDiskAtomFilter.java
index e032f1d..35f71e5 100644
--- a/src/java/org/apache/cassandra/db/filter/IDiskAtomFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/IDiskAtomFilter.java
@@ -66,14 +66,14 @@ public interface IDiskAtomFilter
      * by the filter code, which should have some limit on the number of columns
      * to avoid running out of memory on large rows.
      */
-    public void collectReducedColumns(ColumnFamily container, Iterator<Column> reducedColumns, int gcBefore);
+    public void collectReducedColumns(ColumnFamily container, Iterator<Column> reducedColumns, int gcBefore, long now);
 
     public Comparator<Column> getColumnComparator(AbstractType<?> comparator);
 
     public boolean isReversed();
     public void updateColumnsLimit(int newLimit);
 
-    public int getLiveCount(ColumnFamily cf);
+    public int getLiveCount(ColumnFamily cf, long now);
 
     public IDiskAtomFilter cloneShallow();
     public boolean maySelectPrefix(Comparator<ByteBuffer> cmp, ByteBuffer prefix);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java b/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
index 570eb29..297f227 100644
--- a/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
@@ -90,7 +90,7 @@ public class NamesQueryFilter implements IDiskAtomFilter
         return new SSTableNamesIterator(sstable, file, key, columns, indexEntry);
     }
 
-    public void collectReducedColumns(ColumnFamily container, Iterator<Column> reducedColumns, int gcBefore)
+    public void collectReducedColumns(ColumnFamily container, Iterator<Column> reducedColumns, int gcBefore, long now)
     {
         while (reducedColumns.hasNext())
             container.addIfRelevant(reducedColumns.next(), gcBefore);
@@ -118,15 +118,15 @@ public class NamesQueryFilter implements IDiskAtomFilter
     {
     }
 
-    public int getLiveCount(ColumnFamily cf)
+    public int getLiveCount(ColumnFamily cf, long now)
     {
         if (countCQL3Rows)
-            return cf.hasOnlyTombstones() ? 0 : 1;
+            return cf.hasOnlyTombstones(now) ? 0 : 1;
 
         int count = 0;
         for (Column column : cf)
         {
-            if (column.isLive())
+            if (column.isLive(now))
                 count++;
         }
         return count;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/filter/QueryFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/QueryFilter.java b/src/java/org/apache/cassandra/db/filter/QueryFilter.java
index 126b240..ac0c632 100644
--- a/src/java/org/apache/cassandra/db/filter/QueryFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/QueryFilter.java
@@ -33,12 +33,14 @@ public class QueryFilter
     public final DecoratedKey key;
     public final String cfName;
     public final IDiskAtomFilter filter;
+    public final long timestamp;
 
-    public QueryFilter(DecoratedKey key, String cfName, IDiskAtomFilter filter)
+    public QueryFilter(DecoratedKey key, String cfName, IDiskAtomFilter filter, long timestamp)
     {
         this.key = key;
         this.cfName = cfName;
         this.filter = filter;
+        this.timestamp = timestamp;
     }
 
     public OnDiskAtomIterator getMemtableColumnIterator(Memtable memtable)
@@ -79,7 +81,7 @@ public class QueryFilter
     public void collateOnDiskAtom(ColumnFamily returnCF, Iterator<? extends OnDiskAtom> toCollate, int gcBefore)
     {
         Iterator<Column> columns = gatherTombstones(returnCF, toCollate);
-        filter.collectReducedColumns(returnCF, columns, gcBefore);
+        filter.collectReducedColumns(returnCF, columns, gcBefore, timestamp);
     }
 
     public void collateColumns(final ColumnFamily returnCF, List<? extends Iterator<Column>> toCollate, final int gcBefore)
@@ -107,7 +109,7 @@ public class QueryFilter
         };
         Iterator<Column> reduced = MergeIterator.get(toCollate, fcomp, reducer);
 
-        filter.collectReducedColumns(returnCF, reduced, gcBefore);
+        filter.collectReducedColumns(returnCF, reduced, gcBefore, timestamp);
     }
 
     /**
@@ -178,19 +180,26 @@ public class QueryFilter
      * @param finish column to stop slice at, inclusive; empty for "the last column"
      * @param reversed true to start with the largest column (as determined by configured sort order) instead of smallest
      * @param limit maximum number of non-deleted columns to return
+     * @param timestamp time to use for determining expiring columns' state
      */
-    public static QueryFilter getSliceFilter(DecoratedKey key, String cfName, ByteBuffer start, ByteBuffer finish, boolean reversed, int limit)
+    public static QueryFilter getSliceFilter(DecoratedKey key,
+                                             String cfName,
+                                             ByteBuffer start,
+                                             ByteBuffer finish,
+                                             boolean reversed,
+                                             int limit,
+                                             long timestamp)
     {
-        return new QueryFilter(key, cfName, new SliceQueryFilter(start, finish, reversed, limit));
+        return new QueryFilter(key, cfName, new SliceQueryFilter(start, finish, reversed, limit), timestamp);
     }
 
     /**
      * return a QueryFilter object that includes every column in the row.
      * This is dangerous on large rows; avoid except for test code.
      */
-    public static QueryFilter getIdentityFilter(DecoratedKey key, String cfName)
+    public static QueryFilter getIdentityFilter(DecoratedKey key, String cfName, long timestamp)
     {
-        return new QueryFilter(key, cfName, new IdentityQueryFilter());
+        return new QueryFilter(key, cfName, new IdentityQueryFilter(), timestamp);
     }
 
     /**
@@ -199,17 +208,17 @@ public class QueryFilter
      * @param cfName column family to query
      * @param columns the column names to restrict the results to, sorted in comparator order
      */
-    public static QueryFilter getNamesFilter(DecoratedKey key, String cfName, SortedSet<ByteBuffer> columns)
+    public static QueryFilter getNamesFilter(DecoratedKey key, String cfName, SortedSet<ByteBuffer> columns, long timestamp)
     {
-        return new QueryFilter(key, cfName, new NamesQueryFilter(columns));
+        return new QueryFilter(key, cfName, new NamesQueryFilter(columns), timestamp);
     }
 
     /**
      * convenience method for creating a name filter matching a single column
      */
-    public static QueryFilter getNamesFilter(DecoratedKey key, String cfName, ByteBuffer column)
+    public static QueryFilter getNamesFilter(DecoratedKey key, String cfName, ByteBuffer column, long timestamp)
     {
-        return new QueryFilter(key, cfName, new NamesQueryFilter(column));
+        return new QueryFilter(key, cfName, new NamesQueryFilter(column), timestamp);
     }
 
     @Override

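Every QueryFilter factory method gains a trailing timestamp. For illustration, building a name filter and a slice filter the new way (the key, column family name and column name below are placeholders, not values from this patch):

    long now = System.currentTimeMillis();
    DecoratedKey dk = StorageService.getPartitioner().decorateKey(ByteBufferUtil.bytes("somekey")); // placeholder key
    QueryFilter byName = QueryFilter.getNamesFilter(dk, "MyCF", ByteBufferUtil.bytes("col"), now);
    QueryFilter bySlice = QueryFilter.getSliceFilter(dk, "MyCF",
                                                     ByteBufferUtil.EMPTY_BYTE_BUFFER,
                                                     ByteBufferUtil.EMPTY_BYTE_BUFFER,
                                                     false,        // not reversed
                                                     100,          // limit
                                                     now);
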
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
index fda7ac5..b76ce04 100644
--- a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
@@ -120,9 +120,9 @@ public class SliceQueryFilter implements IDiskAtomFilter
         return reversed ? comparator.columnReverseComparator : comparator.columnComparator;
     }
 
-    public void collectReducedColumns(ColumnFamily container, Iterator<Column> reducedColumns, int gcBefore)
+    public void collectReducedColumns(ColumnFamily container, Iterator<Column> reducedColumns, int gcBefore, long now)
     {
-        columnCounter = getColumnCounter(container);
+        columnCounter = getColumnCounter(container, now);
 
         while (reducedColumns.hasNext())
         {
@@ -142,28 +142,28 @@ public class SliceQueryFilter implements IDiskAtomFilter
         Tracing.trace("Read {} live and {} tombstoned cells", columnCounter.live(), columnCounter.ignored());
     }
 
-    public int getLiveCount(ColumnFamily cf)
+    public int getLiveCount(ColumnFamily cf, long now)
     {
-        ColumnCounter counter = getColumnCounter(cf);
+        ColumnCounter counter = getColumnCounter(cf, now);
         for (Column column : cf)
             counter.count(column, cf);
         return counter.live();
     }
 
-    private ColumnCounter getColumnCounter(ColumnFamily container)
+    private ColumnCounter getColumnCounter(ColumnFamily container, long now)
     {
         AbstractType<?> comparator = container.getComparator();
         if (compositesToGroup < 0)
-            return new ColumnCounter();
+            return new ColumnCounter(now);
         else if (compositesToGroup == 0)
-            return new ColumnCounter.GroupByPrefix(null, 0);
+            return new ColumnCounter.GroupByPrefix(now, null, 0);
         else
-            return new ColumnCounter.GroupByPrefix((CompositeType)comparator, compositesToGroup);
+            return new ColumnCounter.GroupByPrefix(now, (CompositeType)comparator, compositesToGroup);
     }
 
-    public void trim(ColumnFamily cf, int trimTo)
+    public void trim(ColumnFamily cf, int trimTo, long now)
     {
-        ColumnCounter counter = getColumnCounter(cf);
+        ColumnCounter counter = getColumnCounter(cf, now);
 
         Collection<Column> columns = reversed
                                    ? cf.getReverseSortedColumns()

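getLiveCount and trim likewise evaluate liveness against the caller's timestamp rather than the wall clock at call time. A small usage sketch (limit is a placeholder):

    long now = System.currentTimeMillis();
    int live = sliceFilter.getLiveCount(cf, now);   // how many columns count as live at `now`
    sliceFilter.trim(cf, limit, now);               // keep at most `limit` live columns, judged at the same instant
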
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java b/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
index f12acdc..4d0914a 100644
--- a/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
+++ b/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
@@ -91,7 +91,7 @@ public abstract class AbstractSimplePerColumnSecondaryIndex extends PerColumnSec
 
     public void delete(ByteBuffer rowKey, Column column)
     {
-        if (column.isMarkedForDelete())
+        if (column.isMarkedForDelete(System.currentTimeMillis()))
             return;
 
         DecoratedKey valueKey = getIndexKeyFor(getIndexedValue(rowKey, column));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
index af370d5..17ac81f 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
@@ -509,12 +509,17 @@ public class SecondaryIndexManager
      * Performs a search across a number of column indexes
      * TODO: add support for querying across index types
      *
-     * @param clause the index query clause
      * @param range the row range to restrict to
-     * @param dataFilter the column range to restrict to
+     * @param clause the index query clause
+     * @param columnFilter the column range to restrict to
      * @return found indexed rows
      */
-    public List<Row> search(List<IndexExpression> clause, AbstractBounds<RowPosition> range, int maxResults, IDiskAtomFilter dataFilter, boolean countCQL3Rows)
+    public List<Row> search(AbstractBounds<RowPosition> range,
+                            List<IndexExpression> clause,
+                            IDiskAtomFilter columnFilter,
+                            int maxResults,
+                            long now,
+                            boolean countCQL3Rows)
     {
         List<SecondaryIndexSearcher> indexSearchers = getIndexSearchersForQuery(clause);
 
@@ -525,8 +530,7 @@ public class SecondaryIndexManager
         if (indexSearchers.size() > 1)
             throw new RuntimeException("Unable to search across multiple secondary index types");
 
-
-        return indexSearchers.get(0).search(clause, range, maxResults, dataFilter, countCQL3Rows);
+        return indexSearchers.get(0).search(range, clause, columnFilter, maxResults, now, countCQL3Rows);
     }
 
     public Collection<SecondaryIndex> getIndexesByNames(Set<String> idxNames)
@@ -584,7 +588,7 @@ public class SecondaryIndexManager
 
         public void insert(Column column)
         {
-            if (column.isMarkedForDelete())
+            if (column.isMarkedForDelete(System.currentTimeMillis()))
                 return;
 
             for (SecondaryIndex index : indexFor(column.name()))
@@ -605,7 +609,7 @@ public class SecondaryIndexManager
                 {
                     // insert the new value before removing the old one, so we never have a period
                     // where the row is invisible to both queries (the opposite seems preferable); see CASSANDRA-5540
-                    if (!column.isMarkedForDelete())
+                    if (!column.isMarkedForDelete(System.currentTimeMillis()))
                         ((PerColumnSecondaryIndex) index).insert(key.key, column);
                     ((PerColumnSecondaryIndex) index).delete(key.key, oldColumn);
                 }
@@ -614,7 +618,7 @@ public class SecondaryIndexManager
 
         public void remove(Column column)
         {
-            if (column.isMarkedForDelete())
+            if (column.isMarkedForDelete(System.currentTimeMillis()))
                 return;
 
             for (SecondaryIndex index : indexFor(column.name()))

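SecondaryIndexManager.search is reordered to match and threads the timestamp through to the chosen searcher. A sketch of a caller, mirroring RangeSliceVerbHandler further down in this patch:

    List<Row> rows = cfs.indexManager.search(range,           // AbstractBounds<RowPosition>
                                             clause,          // List<IndexExpression>
                                             columnFilter,    // IDiskAtomFilter
                                             maxResults,
                                             System.currentTimeMillis(),
                                             countCQL3Rows);
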
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java b/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
index 16ac091..ddd79dd 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
@@ -39,7 +39,12 @@ public abstract class SecondaryIndexSearcher
         this.baseCfs = indexManager.baseCfs;
     }
 
-    public abstract List<Row> search(List<IndexExpression> clause, AbstractBounds<RowPosition> range, int maxResults, IDiskAtomFilter dataFilter, boolean countCQL3Rows);
+    public abstract List<Row> search(AbstractBounds<RowPosition> range,
+                                     List<IndexExpression> clause,
+                                     IDiskAtomFilter dataFilter,
+                                     int maxResults,
+                                     long now,
+                                     boolean countCQL3Rows);
 
     /**
      * @return true this index is able to handle given clauses.
@@ -49,16 +54,6 @@ public abstract class SecondaryIndexSearcher
         return highestSelectivityPredicate(clause) != null;
     }
 
-    protected boolean isIndexValueStale(ColumnFamily liveData, ByteBuffer indexedColumnName, ByteBuffer indexedValue)
-    {
-        Column liveColumn = liveData.getColumn(indexedColumnName);
-        if (liveColumn == null || liveColumn.isMarkedForDelete())
-            return true;
-        
-        ByteBuffer liveValue = liveColumn.value();
-        return 0 != liveData.metadata().getValueValidator(indexedColumnName).compare(indexedValue, liveValue);
-    }
-
     protected IndexExpression highestSelectivityPredicate(List<IndexExpression> clause)
     {
         IndexExpression best = null;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
index 131b8c6..0720e83 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
@@ -93,7 +93,7 @@ public abstract class CompositesIndex extends AbstractSimplePerColumnSecondaryIn
 
     public abstract IndexedEntry decodeEntry(DecoratedKey indexedValue, Column indexEntry);
 
-    public abstract boolean isStale(IndexedEntry entry, ColumnFamily data);
+    public abstract boolean isStale(IndexedEntry entry, ColumnFamily data, long now);
 
     public void delete(IndexedEntry entry)
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java
index 8ad990e..b767d6c 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java
@@ -107,9 +107,9 @@ public class CompositesIndexOnClusteringKey extends CompositesIndex
         return true;
     }
 
-    public boolean isStale(IndexedEntry entry, ColumnFamily data)
+    public boolean isStale(IndexedEntry entry, ColumnFamily data, long now)
     {
-        return data == null || data.hasOnlyTombstones();
+        return data == null || data.hasOnlyTombstones(now);
     }
 }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnPartitionKey.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnPartitionKey.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnPartitionKey.java
index 2034c71..7a00de8 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnPartitionKey.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnPartitionKey.java
@@ -95,9 +95,9 @@ public class CompositesIndexOnPartitionKey extends CompositesIndex
         return true;
     }
 
-    public boolean isStale(IndexedEntry entry, ColumnFamily data)
+    public boolean isStale(IndexedEntry entry, ColumnFamily data, long now)
     {
-        return data == null || data.hasOnlyTombstones();
+        return data == null || data.hasOnlyTombstones(now);
     }
 }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnRegular.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnRegular.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnRegular.java
index 40a2ee1..7159c23 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnRegular.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnRegular.java
@@ -91,11 +91,11 @@ public class CompositesIndexOnRegular extends CompositesIndex
             && comp.compare(components[columnDef.componentIndex], columnDef.name) == 0;
     }
 
-    public boolean isStale(IndexedEntry entry, ColumnFamily data)
+    public boolean isStale(IndexedEntry entry, ColumnFamily data, long now)
     {
         ByteBuffer bb = entry.indexedEntryNameBuilder.copy().add(columnDef.name).build();
         Column liveColumn = data.getColumn(bb);
-        if (liveColumn == null || liveColumn.isMarkedForDelete())
+        if (liveColumn == null || liveColumn.isMarkedForDelete(now))
             return true;
 
         ByteBuffer liveValue = liveColumn.value();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java b/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
index e16d94d..e8c0a09 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
@@ -21,18 +21,18 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.*;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.cassandra.cql3.ColumnNameBuilder;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.*;
-import org.apache.cassandra.db.index.AbstractSimplePerColumnSecondaryIndex;
 import org.apache.cassandra.db.index.SecondaryIndexManager;
 import org.apache.cassandra.db.index.SecondaryIndexSearcher;
 import org.apache.cassandra.db.marshal.CompositeType;
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.thrift.IndexExpression;
 import org.apache.cassandra.utils.ByteBufferUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class CompositesSearcher extends SecondaryIndexSearcher
 {
@@ -44,10 +44,15 @@ public class CompositesSearcher extends SecondaryIndexSearcher
     }
 
     @Override
-    public List<Row> search(List<IndexExpression> clause, AbstractBounds<RowPosition> range, int maxResults, IDiskAtomFilter dataFilter, boolean countCQL3Rows)
+    public List<Row> search(AbstractBounds<RowPosition> range,
+                            List<IndexExpression> clause,
+                            IDiskAtomFilter dataFilter,
+                            int maxResults,
+                            long now,
+                            boolean countCQL3Rows)
     {
         assert clause != null && !clause.isEmpty();
-        ExtendedFilter filter = ExtendedFilter.create(baseCfs, dataFilter, clause, maxResults, countCQL3Rows, false);
+        ExtendedFilter filter = ExtendedFilter.create(baseCfs, clause, dataFilter, maxResults, now, countCQL3Rows, false);
         return baseCfs.filter(getIndexedIterator(range, filter), filter);
     }
 
@@ -69,7 +74,7 @@ public class CompositesSearcher extends SecondaryIndexSearcher
         return isStart ? builder.build() : builder.buildAsEndOfRange();
     }
 
-    public ColumnFamilyStore.AbstractScanIterator getIndexedIterator(final AbstractBounds<RowPosition> range, final ExtendedFilter filter)
+    private ColumnFamilyStore.AbstractScanIterator getIndexedIterator(final AbstractBounds<RowPosition> range, final ExtendedFilter filter)
     {
         // Start with the most-restrictive indexed clause, then apply remaining clauses
         // to each row matching that clause.
@@ -80,8 +85,7 @@ public class CompositesSearcher extends SecondaryIndexSearcher
         final DecoratedKey indexKey = index.getIndexKeyFor(primary.value);
 
         if (logger.isDebugEnabled())
-            logger.debug("Most-selective indexed predicate is {}",
-                         ((AbstractSimplePerColumnSecondaryIndex) index).expressionString(primary));
+            logger.debug("Most-selective indexed predicate is {}", index.expressionString(primary));
 
         /*
          * XXX: If the range requested is a token range, we'll have to start at the beginning (and stop at the end) of
@@ -155,14 +159,15 @@ public class CompositesSearcher extends SecondaryIndexSearcher
 
                         if (logger.isTraceEnabled())
                             logger.trace("Scanning index {} starting with {}",
-                                         ((AbstractSimplePerColumnSecondaryIndex)index).expressionString(primary), indexComparator.getString(startPrefix));
+                                         index.expressionString(primary), indexComparator.getString(startPrefix));
 
                         QueryFilter indexFilter = QueryFilter.getSliceFilter(indexKey,
                                                                              index.getIndexCfs().name,
                                                                              lastSeenPrefix,
                                                                              endPrefix,
                                                                              false,
-                                                                             rowsPerQuery);
+                                                                             rowsPerQuery,
+                                                                             filter.timestamp);
                         ColumnFamily indexRow = index.getIndexCfs().getColumnFamily(indexFilter);
                         if (indexRow == null || indexRow.getColumnCount() == 0)
                             return makeReturn(currentKey, data);
@@ -185,7 +190,7 @@ public class CompositesSearcher extends SecondaryIndexSearcher
                     {
                         Column column = indexColumns.poll();
                         lastSeenPrefix = column.name();
-                        if (column.isMarkedForDelete())
+                        if (column.isMarkedForDelete(filter.timestamp))
                         {
                             logger.trace("skipping {}", column.name());
                             continue;
@@ -242,8 +247,8 @@ public class CompositesSearcher extends SecondaryIndexSearcher
                                                                            false,
                                                                            Integer.MAX_VALUE,
                                                                            baseCfs.metadata.clusteringKeyColumns().size());
-                        ColumnFamily newData = baseCfs.getColumnFamily(new QueryFilter(dk, baseCfs.name, dataFilter));
-                        if (index.isStale(entry, newData))
+                        ColumnFamily newData = baseCfs.getColumnFamily(new QueryFilter(dk, baseCfs.name, dataFilter, filter.timestamp));
+                        if (index.isStale(entry, newData, filter.timestamp))
                         {
                             index.delete(entry);
                             continue;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java b/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java
index cd89e92..f47b1e1 100644
--- a/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java
+++ b/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java
@@ -48,10 +48,10 @@ public class KeysIndex extends AbstractSimplePerColumnSecondaryIndex
         return new KeysSearcher(baseCfs.indexManager, columns);
     }
 
-    public boolean isIndexEntryStale(ByteBuffer indexedValue, ColumnFamily data)
+    public boolean isIndexEntryStale(ByteBuffer indexedValue, ColumnFamily data, long now)
     {
         Column liveColumn = data.getColumn(columnDef.name);
-        if (liveColumn == null || liveColumn.isMarkedForDelete())
+        if (liveColumn == null || liveColumn.isMarkedForDelete(now))
             return true;
 
         ByteBuffer liveValue = liveColumn.value();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java b/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
index 8901bf5..e919d8a 100644
--- a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
+++ b/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
@@ -21,6 +21,9 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.*;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.*;
 import org.apache.cassandra.db.index.AbstractSimplePerColumnSecondaryIndex;
@@ -33,8 +36,6 @@ import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.thrift.IndexExpression;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.HeapAllocator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class KeysSearcher extends SecondaryIndexSearcher
 {
@@ -46,14 +47,19 @@ public class KeysSearcher extends SecondaryIndexSearcher
     }
 
     @Override
-    public List<Row> search(List<IndexExpression> clause, AbstractBounds<RowPosition> range, int maxResults, IDiskAtomFilter dataFilter, boolean countCQL3Rows)
+    public List<Row> search(AbstractBounds<RowPosition> range,
+                            List<IndexExpression> clause,
+                            IDiskAtomFilter dataFilter,
+                            int maxResults,
+                            long now,
+                            boolean countCQL3Rows)
     {
         assert clause != null && !clause.isEmpty();
-        ExtendedFilter filter = ExtendedFilter.create(baseCfs, dataFilter, clause, maxResults, countCQL3Rows, false);
+        ExtendedFilter filter = ExtendedFilter.create(baseCfs, clause, dataFilter, maxResults, now, countCQL3Rows, false);
         return baseCfs.filter(getIndexedIterator(range, filter), filter);
     }
 
-    public ColumnFamilyStore.AbstractScanIterator getIndexedIterator(final AbstractBounds<RowPosition> range, final ExtendedFilter filter)
+    private ColumnFamilyStore.AbstractScanIterator getIndexedIterator(final AbstractBounds<RowPosition> range, final ExtendedFilter filter)
     {
         // Start with the most-restrictive indexed clause, then apply remaining clauses
         // to each row matching that clause.
@@ -106,7 +112,8 @@ public class KeysSearcher extends SecondaryIndexSearcher
                                                                              lastSeenKey,
                                                                              endKey,
                                                                              false,
-                                                                             rowsPerQuery);
+                                                                             rowsPerQuery,
+                                                                             filter.timestamp);
                         ColumnFamily indexRow = index.getIndexCfs().getColumnFamily(indexFilter);
                         logger.trace("fetched {}", indexRow);
                         if (indexRow == null)
@@ -139,7 +146,7 @@ public class KeysSearcher extends SecondaryIndexSearcher
                     {
                         Column column = indexColumns.next();
                         lastSeenKey = column.name();
-                        if (column.isMarkedForDelete())
+                        if (column.isMarkedForDelete(filter.timestamp))
                         {
                             logger.trace("skipping {}", column.name());
                             continue;
@@ -158,7 +165,7 @@ public class KeysSearcher extends SecondaryIndexSearcher
                         }
 
                         logger.trace("Returning index hit for {}", dk);
-                        ColumnFamily data = baseCfs.getColumnFamily(new QueryFilter(dk, baseCfs.name, filter.initialFilter()));
+                        ColumnFamily data = baseCfs.getColumnFamily(new QueryFilter(dk, baseCfs.name, filter.initialFilter(), filter.timestamp));
                         // While the column family we'll get in the end should contain the primary clause column, the initialFilter may not have found it and can thus be null
                         if (data == null)
                             data = TreeMapBackedSortedColumns.factory.create(baseCfs.metadata);
@@ -168,12 +175,12 @@ public class KeysSearcher extends SecondaryIndexSearcher
                         IDiskAtomFilter extraFilter = filter.getExtraFilter(data);
                         if (extraFilter != null)
                         {
-                            ColumnFamily cf = baseCfs.getColumnFamily(new QueryFilter(dk, baseCfs.name, extraFilter));
+                            ColumnFamily cf = baseCfs.getColumnFamily(new QueryFilter(dk, baseCfs.name, extraFilter, filter.timestamp));
                             if (cf != null)
                                 data.addAll(cf, HeapAllocator.instance);
                         }
 
-                        if (((KeysIndex)index).isIndexEntryStale(indexKey.key, data))
+                        if (((KeysIndex)index).isIndexEntryStale(indexKey.key, data, filter.timestamp))
                         {
                             // delete the index entry w/ its own timestamp
                             Column dummyColumn = new Column(primary.column_name, indexKey.key, column.timestamp());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
index d35e14d..65164ef 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
@@ -181,7 +181,10 @@ public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterat
     {
         ColumnFamily cf = columnFamily.cloneMeShallow(containerFactory, false);
         // since we already read column count, just pass that value and continue deserialization
-        columnFamily.serializer.deserializeColumnsFromSSTable(in, cf, columnCount, flag, expireBefore, dataVersion);
+        Iterator<OnDiskAtom> iter = cf.metadata().getOnDiskIterator(in, columnCount, flag, expireBefore, dataVersion);
+        while (iter.hasNext())
+            cf.addAtom(iter.next());
+
         if (validateColumns)
         {
             try

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index 8119388..f63ea6a 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -1076,12 +1076,12 @@ public class SSTableReader extends SSTable
         return getScanner((RateLimiter) null);
     }
 
-   public SSTableScanner getScanner(RateLimiter limiter)
-   {
-       return new SSTableScanner(this, null, limiter);
-   }
+    public SSTableScanner getScanner(RateLimiter limiter)
+    {
+        return new SSTableScanner(this, null, limiter);
+    }
 
-   /**
+    /**
     * Direct I/O SSTableScanner over a defined range of tokens.
     *
     * @param range the range of keys to cover

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index b692ab0..82ecbe9 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -836,7 +836,7 @@ public class ActiveRepairService
                 if (isSequential)
                     makeSnapshots(endpoints);
 
-                int gcBefore = (int)(System.currentTimeMillis()/1000) - Table.open(tablename).getColumnFamilyStore(cfname).metadata.getGcGraceSeconds();
+                int gcBefore = Table.open(tablename).getColumnFamilyStore(cfname).gcBefore(System.currentTimeMillis());
 
                 for (InetAddress endpoint : allEndpoints)
                     treeRequests.add(new TreeRequest(getName(), endpoint, range, gcBefore, new CFPair(tablename, cfname)));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/service/CacheService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CacheService.java b/src/java/org/apache/cassandra/service/CacheService.java
index d301507..b3cef49 100644
--- a/src/java/org/apache/cassandra/service/CacheService.java
+++ b/src/java/org/apache/cassandra/service/CacheService.java
@@ -300,7 +300,7 @@ public class CacheService implements CacheServiceMBean
                 public Pair<RowCacheKey, IRowCacheEntry> call() throws Exception
                 {
                     DecoratedKey key = cfs.partitioner.decorateKey(buffer);
-                    ColumnFamily data = cfs.getTopLevelColumns(QueryFilter.getIdentityFilter(key, cfs.name), Integer.MIN_VALUE);
+                    ColumnFamily data = cfs.getTopLevelColumns(QueryFilter.getIdentityFilter(key, cfs.name, Long.MIN_VALUE), Integer.MIN_VALUE);
                     return Pair.create(new RowCacheKey(cfs.metadata.cfId, key), (IRowCacheEntry) data);
                 }
             });
@@ -311,7 +311,7 @@ public class CacheService implements CacheServiceMBean
             for (ByteBuffer key : buffers)
             {
                 DecoratedKey dk = cfs.partitioner.decorateKey(key);
-                ColumnFamily data = cfs.getTopLevelColumns(QueryFilter.getIdentityFilter(dk, cfs.name), Integer.MIN_VALUE);
+                ColumnFamily data = cfs.getTopLevelColumns(QueryFilter.getIdentityFilter(dk, cfs.name, Long.MIN_VALUE), Integer.MIN_VALUE);
                 if (data != null)
                     rowCache.put(new RowCacheKey(cfs.metadata.cfId, dk), data);
             }

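When warming the row cache, the patch passes Long.MIN_VALUE as the query time (alongside Integer.MIN_VALUE as gcBefore), presumably so that nothing is treated as expired or purgeable while loading; expiry is then re-evaluated against the real query time on each later read. The intent, annotated (interpretation, not part of the patch):

    ColumnFamily data = cfs.getTopLevelColumns(
            QueryFilter.getIdentityFilter(key, cfs.name, Long.MIN_VALUE),  // no column looks expired at Long.MIN_VALUE
            Integer.MIN_VALUE);                                            // and no tombstone is GC-eligible
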
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java b/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
index 5ffcdbe..4aab87d 100644
--- a/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
+++ b/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
@@ -48,13 +48,15 @@ public class RangeSliceResponseResolver implements IResponseResolver<RangeSliceR
     };
 
     private final String table;
+    private final long timestamp;
     private List<InetAddress> sources;
     protected final Collection<MessageIn<RangeSliceReply>> responses = new ConcurrentLinkedQueue<MessageIn<RangeSliceReply>>();
     public final List<AsyncOneResponse> repairResults = new ArrayList<AsyncOneResponse>();
 
-    public RangeSliceResponseResolver(String table)
+    public RangeSliceResponseResolver(String table, long timestamp)
     {
         this.table = table;
+        this.timestamp = timestamp;
     }
 
     public void setSources(List<InetAddress> endpoints)
@@ -142,7 +144,7 @@ public class RangeSliceResponseResolver implements IResponseResolver<RangeSliceR
         protected Row getReduced()
         {
             ColumnFamily resolved = versions.size() > 1
-                                  ? RowDataResolver.resolveSuperset(versions)
+                                  ? RowDataResolver.resolveSuperset(versions, timestamp)
                                   : versions.get(0);
             if (versions.size() < sources.size())
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java b/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
index b6f257e..f63fcb1 100644
--- a/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
+++ b/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
@@ -35,9 +35,20 @@ public class RangeSliceVerbHandler implements IVerbHandler<RangeSliceCommand>
     {
         ColumnFamilyStore cfs = Table.open(command.keyspace).getColumnFamilyStore(command.column_family);
         if (cfs.indexManager.hasIndexFor(command.row_filter))
-            return cfs.search(command.row_filter, command.range, command.maxResults, command.predicate, command.countCQL3Rows);
+            return cfs.search(command.range,
+                              command.row_filter,
+                              command.predicate,
+                              command.maxResults,
+                              command.timestamp,
+                              command.countCQL3Rows);
         else
-            return cfs.getRangeSlice(command.range, command.maxResults, command.predicate, command.row_filter, command.countCQL3Rows, command.isPaging);
+            return cfs.getRangeSlice(command.range,
+                                     command.row_filter,
+                                     command.predicate,
+                                     command.maxResults,
+                                     command.timestamp,
+                                     command.countCQL3Rows,
+                                     command.isPaging);
     }
 
     public void doVerb(MessageIn<RangeSliceCommand> message, int id)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/service/ReadCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ReadCallback.java b/src/java/org/apache/cassandra/service/ReadCallback.java
index fe7f4d7..7cb5a23 100644
--- a/src/java/org/apache/cassandra/service/ReadCallback.java
+++ b/src/java/org/apache/cassandra/service/ReadCallback.java
@@ -186,7 +186,7 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallback<TMessag
                 ReadRepairMetrics.repairedBackground.mark();
                 
                 ReadCommand readCommand = (ReadCommand) command;
-                final RowDataResolver repairResolver = new RowDataResolver(readCommand.table, readCommand.key, readCommand.filter());
+                final RowDataResolver repairResolver = new RowDataResolver(readCommand.table, readCommand.key, readCommand.filter(), readCommand.timestamp);
                 AsyncRepairCallback repairHandler = new AsyncRepairCallback(repairResolver, endpoints.size());
 
                 MessageOut<ReadCommand> message = ((ReadCommand) command).createMessage();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/service/RowDataResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/RowDataResolver.java b/src/java/org/apache/cassandra/service/RowDataResolver.java
index 8533f4f..69cd381 100644
--- a/src/java/org/apache/cassandra/service/RowDataResolver.java
+++ b/src/java/org/apache/cassandra/service/RowDataResolver.java
@@ -39,11 +39,13 @@ public class RowDataResolver extends AbstractRowResolver
     private int maxLiveCount = 0;
     public List<AsyncOneResponse> repairResults = Collections.emptyList();
     private final IDiskAtomFilter filter;
+    private final long timestamp;
 
-    public RowDataResolver(String table, ByteBuffer key, IDiskAtomFilter qFilter)
+    public RowDataResolver(String table, ByteBuffer key, IDiskAtomFilter qFilter, long timestamp)
     {
         super(key, table);
         this.filter = qFilter;
+        this.timestamp = timestamp;
     }
 
     /*
@@ -74,12 +76,12 @@ public class RowDataResolver extends AbstractRowResolver
                 endpoints.add(message.from);
 
                 // compute maxLiveCount to prevent short reads -- see https://issues.apache.org/jira/browse/CASSANDRA-2643
-                int liveCount = cf == null ? 0 : filter.getLiveCount(cf);
+                int liveCount = cf == null ? 0 : filter.getLiveCount(cf, timestamp);
                 if (liveCount > maxLiveCount)
                     maxLiveCount = liveCount;
             }
 
-            resolved = resolveSuperset(versions);
+            resolved = resolveSuperset(versions, timestamp);
             if (logger.isDebugEnabled())
                 logger.debug("versions merged");
 
@@ -125,7 +127,7 @@ public class RowDataResolver extends AbstractRowResolver
         return results;
     }
 
-    static ColumnFamily resolveSuperset(Iterable<ColumnFamily> versions)
+    static ColumnFamily resolveSuperset(Iterable<ColumnFamily> versions, long now)
     {
         assert Iterables.size(versions) > 0;
 
@@ -146,7 +148,7 @@ public class RowDataResolver extends AbstractRowResolver
         // mimic the collectCollatedColumn + removeDeleted path that getColumnFamily takes.
         // this will handle removing columns and subcolumns that are suppressed by a row or
         // supercolumn tombstone.
-        QueryFilter filter = new QueryFilter(null, resolved.metadata().cfName, new IdentityQueryFilter());
+        QueryFilter filter = new QueryFilter(null, resolved.metadata().cfName, new IdentityQueryFilter(), now);
         List<CloseableIterator<Column>> iters = new ArrayList<CloseableIterator<Column>>();
         for (ColumnFamily version : versions)
         {
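
The resolver now threads the command's timestamp through getLiveCount, resolveSuperset and the QueryFilter it builds, rather than letting each liveness check consult the wall clock on its own. A hypothetical, much-simplified sketch of the distinction (the real Column/ExpiringColumn classes are more involved):

    // Hypothetical, simplified expiring-column model, not the real
    // org.apache.cassandra.db.ExpiringColumn; it only illustrates why one
    // shared query timestamp gives consistent answers during a single read.
    public class ExpiringColumnSketch
    {
        private final long expirationTimeMillis;

        public ExpiringColumnSketch(long expirationTimeMillis)
        {
            this.expirationTimeMillis = expirationTimeMillis;
        }

        // Old style: each call samples the clock, so two checks made while
        // resolving the same read can disagree near the expiration boundary.
        public boolean isLive()
        {
            return System.currentTimeMillis() < expirationTimeMillis;
        }

        // New style: the ReadCommand's timestamp is passed in, so every replica
        // response and every merge step uses the same cutoff.
        public boolean isLive(long now)
        {
            return now < expirationTimeMillis;
        }
    }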

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 1383be7..9d2f6c1 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -212,9 +212,13 @@ public class StorageProxy implements StorageProxyMBean
 
             // read the current value and compare with expected
             Tracing.trace("Reading existing values for CAS precondition");
-            ReadCommand readCommand = expected == null
-                                    ? new SliceFromReadCommand(table, key, cfName, new SliceQueryFilter(ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 1))
-                                    : new SliceByNamesReadCommand(table, key, cfName, new NamesQueryFilter(ImmutableSortedSet.copyOf(expected.getColumnNames())));
+            long timestamp = System.currentTimeMillis();
+            IDiskAtomFilter filter = expected == null
+                                   ? new SliceQueryFilter(ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 1)
+                                   : new NamesQueryFilter(ImmutableSortedSet.copyOf(expected.getColumnNames()));
+            ReadCommand readCommand = filter instanceof SliceQueryFilter
+                                    ? new SliceFromReadCommand(table, key, cfName, timestamp, (SliceQueryFilter) filter)
+                                    : new SliceByNamesReadCommand(table, key, cfName, timestamp, (NamesQueryFilter) filter);
             List<Row> rows = read(Arrays.asList(readCommand), ConsistencyLevel.QUORUM);
             ColumnFamily current = rows.get(0).cf;
             if (!casApplies(expected, current))
@@ -245,16 +249,18 @@ public class StorageProxy implements StorageProxyMBean
         throw new WriteTimeoutException(WriteType.CAS, ConsistencyLevel.SERIAL, -1, -1);
     }
 
-    private static boolean hasLiveColumns(ColumnFamily cf)
+    private static boolean hasLiveColumns(ColumnFamily cf, long now)
     {
-        return cf != null && !cf.hasOnlyTombstones();
+        return cf != null && !cf.hasOnlyTombstones(now);
     }
 
     private static boolean casApplies(ColumnFamily expected, ColumnFamily current)
     {
-        if (!hasLiveColumns(expected))
-            return !hasLiveColumns(current);
-        else if (!hasLiveColumns(current))
+        long now = System.currentTimeMillis();
+
+        if (!hasLiveColumns(expected, now))
+            return !hasLiveColumns(current, now);
+        else if (!hasLiveColumns(current, now))
             return false;
 
         // current has been built from expected, so we know that it can't have columns
@@ -264,14 +270,14 @@ public class StorageProxy implements StorageProxyMBean
         for (Column e : expected)
         {
             Column c = current.getColumn(e.name());
-            if (e.isLive())
+            if (e.isLive(now))
             {
-                if (!(c != null && c.isLive() && c.value().equals(e.value())))
+                if (!(c != null && c.isLive(now) && c.value().equals(e.value())))
                     return false;
             }
             else
             {
-                if (c != null && c.isLive())
+                if (c != null && c.isLive(now))
                     return false;
             }
         }
@@ -1179,7 +1185,7 @@ public class StorageProxy implements StorageProxyMBean
                     ReadRepairMetrics.repairedBlocking.mark();
 
                     // Do a full data read to resolve the correct response (and repair node that need be)
-                    RowDataResolver resolver = new RowDataResolver(exec.command.table, exec.command.key, exec.command.filter());
+                    RowDataResolver resolver = new RowDataResolver(exec.command.table, exec.command.key, exec.command.filter(), exec.command.timestamp);
                     ReadCallback<ReadResponse, Row> repairHandler = exec.handler.withNewResolver(resolver);
 
                     if (repairCommands == null)
@@ -1397,6 +1403,7 @@ public class StorageProxy implements StorageProxyMBean
 
                 RangeSliceCommand nodeCmd = new RangeSliceCommand(command.keyspace,
                                                                   command.column_family,
+                                                                  command.timestamp,
                                                                   commandPredicate,
                                                                   range,
                                                                   command.row_filter,
@@ -1405,7 +1412,7 @@ public class StorageProxy implements StorageProxyMBean
                                                                   command.isPaging);
 
                 // collect replies and resolve according to consistency level
-                RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(nodeCmd.keyspace);
+                RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(nodeCmd.keyspace, command.timestamp);
                 ReadCallback<RangeSliceReply, Iterable<Row>> handler = new ReadCallback(resolver, consistency_level, nodeCmd, filteredEndpoints);
                 handler.assureSufficientLiveNodes();
                 resolver.setSources(filteredEndpoints);
@@ -1431,7 +1438,7 @@ public class StorageProxy implements StorageProxyMBean
                     {
                         rows.add(row);
                         if (nodeCmd.countCQL3Rows)
-                            cql3RowCount += row.getLiveCount(commandPredicate);
+                            cql3RowCount += row.getLiveCount(commandPredicate, command.timestamp);
                     }
                     FBUtilities.waitOnFutures(resolver.repairResults, DatabaseDescriptor.getWriteRpcTimeout());
                 }
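
In the CAS path above, the detail worth noting is that System.currentTimeMillis() is sampled once per casApplies call and reused for both the expected and the current side, so a column cannot flip from live to expired between the two checks. A hedged sketch of that pattern with hypothetical stand-in types:

    import java.util.List;

    // Illustrative only; ColumnLike stands in for org.apache.cassandra.db.Column.
    public class CasLivenessSketch
    {
        interface ColumnLike
        {
            boolean isLive(long now);
        }

        static boolean hasLiveColumns(List<? extends ColumnLike> cf, long now)
        {
            if (cf == null)
                return false;
            for (ColumnLike c : cf)
                if (c.isLive(now))
                    return true;
            return false;
        }

        static boolean sameLiveness(List<? extends ColumnLike> expected, List<? extends ColumnLike> current)
        {
            long now = System.currentTimeMillis(); // sampled exactly once per comparison
            return hasLiveColumns(expected, now) == hasLiveColumns(current, now);
        }
    }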


[2/4] Include a timestamp with all read commands to determine column expiration

Posted by al...@apache.org.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java
index 2daa910..8f1fd21 100644
--- a/src/java/org/apache/cassandra/thrift/CassandraServer.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java
@@ -32,10 +32,10 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import org.apache.cassandra.auth.AuthenticatedUser;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.auth.AuthenticatedUser;
 import org.apache.cassandra.auth.Permission;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
@@ -121,15 +121,15 @@ public class CassandraServer implements Cassandra.Iface
         return columnFamilyKeyMap;
     }
 
-    public List<ColumnOrSuperColumn> thriftifyColumns(Collection<org.apache.cassandra.db.Column> columns, boolean reverseOrder)
+    public List<ColumnOrSuperColumn> thriftifyColumns(Collection<org.apache.cassandra.db.Column> columns, boolean reverseOrder, long now)
     {
         ArrayList<ColumnOrSuperColumn> thriftColumns = new ArrayList<ColumnOrSuperColumn>(columns.size());
         for (org.apache.cassandra.db.Column column : columns)
         {
-            if (column.isMarkedForDelete())
+            if (column.isMarkedForDelete(now))
                 continue;
 
-            thriftColumns.add(thriftifyColumn(column));
+            thriftColumns.add(thriftifyColumnWithName(column, column.name()));
         }
 
         // we have to do the reversing here, since internally we pass results around in ColumnFamily
@@ -142,22 +142,15 @@ public class CassandraServer implements Cassandra.Iface
 
     private ColumnOrSuperColumn thriftifyColumnWithName(org.apache.cassandra.db.Column column, ByteBuffer newName)
     {
-        assert !column.isMarkedForDelete();
-
         if (column instanceof org.apache.cassandra.db.CounterColumn)
             return new ColumnOrSuperColumn().setCounter_column(thriftifySubCounter(column).setName(newName));
         else
             return new ColumnOrSuperColumn().setColumn(thriftifySubColumn(column).setName(newName));
     }
 
-    private ColumnOrSuperColumn thriftifyColumn(org.apache.cassandra.db.Column column)
-    {
-        return thriftifyColumnWithName(column, column.name());
-    }
-
     private Column thriftifySubColumn(org.apache.cassandra.db.Column column)
     {
-        assert !column.isMarkedForDelete() && !(column instanceof org.apache.cassandra.db.CounterColumn);
+        assert !(column instanceof org.apache.cassandra.db.CounterColumn);
 
         Column thrift_column = new Column(column.name()).setValue(column.value()).setTimestamp(column.timestamp());
         if (column instanceof ExpiringColumn)
@@ -169,18 +162,22 @@ public class CassandraServer implements Cassandra.Iface
 
     private CounterColumn thriftifySubCounter(org.apache.cassandra.db.Column column)
     {
-        assert !column.isMarkedForDelete() && (column instanceof org.apache.cassandra.db.CounterColumn);
+        assert column instanceof org.apache.cassandra.db.CounterColumn;
         return new CounterColumn(column.name(), CounterContext.instance().total(column.value()));
     }
 
-    private List<ColumnOrSuperColumn> thriftifySuperColumns(Collection<org.apache.cassandra.db.Column> columns, boolean reverseOrder, boolean subcolumnsOnly, boolean isCounterCF)
+    private List<ColumnOrSuperColumn> thriftifySuperColumns(Collection<org.apache.cassandra.db.Column> columns,
+                                                            boolean reverseOrder,
+                                                            long now,
+                                                            boolean subcolumnsOnly,
+                                                            boolean isCounterCF)
     {
         if (subcolumnsOnly)
         {
             ArrayList<ColumnOrSuperColumn> thriftSuperColumns = new ArrayList<ColumnOrSuperColumn>(columns.size());
             for (org.apache.cassandra.db.Column column : columns)
             {
-                if (column.isMarkedForDelete())
+                if (column.isMarkedForDelete(now))
                     continue;
 
                 thriftSuperColumns.add(thriftifyColumnWithName(column, SuperColumns.subName(column.name())));
@@ -192,19 +189,19 @@ public class CassandraServer implements Cassandra.Iface
         else
         {
             if (isCounterCF)
-                return thriftifyCounterSuperColumns(columns, reverseOrder);
+                return thriftifyCounterSuperColumns(columns, reverseOrder, now);
             else
-                return thriftifySuperColumns(columns, reverseOrder);
+                return thriftifySuperColumns(columns, reverseOrder, now);
         }
     }
 
-    private List<ColumnOrSuperColumn> thriftifySuperColumns(Collection<org.apache.cassandra.db.Column> columns, boolean reverseOrder)
+    private List<ColumnOrSuperColumn> thriftifySuperColumns(Collection<org.apache.cassandra.db.Column> columns, boolean reverseOrder, long now)
     {
         ArrayList<ColumnOrSuperColumn> thriftSuperColumns = new ArrayList<ColumnOrSuperColumn>(columns.size());
         SuperColumn current = null;
         for (org.apache.cassandra.db.Column column : columns)
         {
-            if (column.isMarkedForDelete())
+            if (column.isMarkedForDelete(now))
                 continue;
 
             ByteBuffer scName = SuperColumns.scName(column.name());
@@ -222,13 +219,13 @@ public class CassandraServer implements Cassandra.Iface
         return thriftSuperColumns;
     }
 
-    private List<ColumnOrSuperColumn> thriftifyCounterSuperColumns(Collection<org.apache.cassandra.db.Column> columns, boolean reverseOrder)
+    private List<ColumnOrSuperColumn> thriftifyCounterSuperColumns(Collection<org.apache.cassandra.db.Column> columns, boolean reverseOrder, long now)
     {
         ArrayList<ColumnOrSuperColumn> thriftSuperColumns = new ArrayList<ColumnOrSuperColumn>(columns.size());
         CounterSuperColumn current = null;
         for (org.apache.cassandra.db.Column column : columns)
         {
-            if (column.isMarkedForDelete())
+            if (column.isMarkedForDelete(now))
                 continue;
 
             ByteBuffer scName = SuperColumns.scName(column.name());
@@ -255,14 +252,14 @@ public class CassandraServer implements Cassandra.Iface
         {
             ColumnFamily cf = columnFamilies.get(StorageService.getPartitioner().decorateKey(command.key));
             boolean reverseOrder = command instanceof SliceFromReadCommand && ((SliceFromReadCommand)command).filter.reversed;
-            List<ColumnOrSuperColumn> thriftifiedColumns = thriftifyColumnFamily(cf, subColumnsOnly, reverseOrder);
+            List<ColumnOrSuperColumn> thriftifiedColumns = thriftifyColumnFamily(cf, subColumnsOnly, reverseOrder, command.timestamp);
             columnFamiliesMap.put(command.key, thriftifiedColumns);
         }
 
         return columnFamiliesMap;
     }
 
-    private List<ColumnOrSuperColumn> thriftifyColumnFamily(ColumnFamily cf, boolean subcolumnsOnly, boolean reverseOrder)
+    private List<ColumnOrSuperColumn> thriftifyColumnFamily(ColumnFamily cf, boolean subcolumnsOnly, boolean reverseOrder, long now)
     {
         if (cf == null || cf.getColumnCount() == 0)
             return EMPTY_COLUMNS;
@@ -270,11 +267,11 @@ public class CassandraServer implements Cassandra.Iface
         if (cf.metadata().isSuper())
         {
             boolean isCounterCF = cf.metadata().getDefaultValidator().isCommutative();
-            return thriftifySuperColumns(cf.getSortedColumns(), reverseOrder, subcolumnsOnly, isCounterCF);
+            return thriftifySuperColumns(cf.getSortedColumns(), reverseOrder, now, subcolumnsOnly, isCounterCF);
         }
         else
         {
-            return thriftifyColumns(cf.getSortedColumns(), reverseOrder);
+            return thriftifyColumns(cf.getSortedColumns(), reverseOrder, now);
         }
     }
 
@@ -299,7 +296,7 @@ public class CassandraServer implements Cassandra.Iface
             ClientState cState = state();
             String keyspace = cState.getKeyspace();
             state().hasColumnFamilyAccess(keyspace, column_parent.column_family, Permission.SELECT);
-            return multigetSliceInternal(keyspace, Collections.singletonList(key), column_parent, predicate, consistency_level).get(key);
+            return getSliceInternal(keyspace, key, column_parent, System.currentTimeMillis(), predicate, consistency_level);
         }
         catch (RequestValidationException e)
         {
@@ -311,6 +308,17 @@ public class CassandraServer implements Cassandra.Iface
         }
     }
 
+    private List<ColumnOrSuperColumn> getSliceInternal(String keyspace,
+                                                       ByteBuffer key,
+                                                       ColumnParent column_parent,
+                                                       long timestamp,
+                                                       SlicePredicate predicate,
+                                                       ConsistencyLevel consistency_level)
+    throws org.apache.cassandra.exceptions.InvalidRequestException, UnavailableException, TimedOutException
+    {
+        return multigetSliceInternal(keyspace, Collections.singletonList(key), column_parent, timestamp, predicate, consistency_level).get(key);
+    }
+
     public Map<ByteBuffer, List<ColumnOrSuperColumn>> multiget_slice(List<ByteBuffer> keys, ColumnParent column_parent, SlicePredicate predicate, ConsistencyLevel consistency_level)
     throws InvalidRequestException, UnavailableException, TimedOutException
     {
@@ -335,7 +343,7 @@ public class CassandraServer implements Cassandra.Iface
             ClientState cState = state();
             String keyspace = cState.getKeyspace();
             cState.hasColumnFamilyAccess(keyspace, column_parent.column_family, Permission.SELECT);
-            return multigetSliceInternal(keyspace, keys, column_parent, predicate, consistency_level);
+            return multigetSliceInternal(keyspace, keys, column_parent, System.currentTimeMillis(), predicate, consistency_level);
         }
         catch (RequestValidationException e)
         {
@@ -347,7 +355,12 @@ public class CassandraServer implements Cassandra.Iface
         }
     }
 
-    private Map<ByteBuffer, List<ColumnOrSuperColumn>> multigetSliceInternal(String keyspace, List<ByteBuffer> keys, ColumnParent column_parent, SlicePredicate predicate, ConsistencyLevel consistency_level)
+    private Map<ByteBuffer, List<ColumnOrSuperColumn>> multigetSliceInternal(String keyspace,
+                                                                             List<ByteBuffer> keys,
+                                                                             ColumnParent column_parent,
+                                                                             long timestamp,
+                                                                             SlicePredicate predicate,
+                                                                             ConsistencyLevel consistency_level)
     throws org.apache.cassandra.exceptions.InvalidRequestException, UnavailableException, TimedOutException
     {
         CFMetaData metadata = ThriftValidation.validateColumnFamily(keyspace, column_parent.column_family);
@@ -388,56 +401,12 @@ public class CassandraServer implements Cassandra.Iface
             ThriftValidation.validateKey(metadata, key);
             // Note that we should not share a slice filter amongst the commands, due to SliceQueryFilter not being immutable
             // due to its columnCounter used by the lastCounted() method (also see SelectStatement.getSliceCommands)
-            commands.add(ReadCommand.create(keyspace, key, column_parent.getColumn_family(), filter.cloneShallow()));
+            commands.add(ReadCommand.create(keyspace, key, column_parent.getColumn_family(), timestamp, filter.cloneShallow()));
         }
 
         return getSlice(commands, column_parent.isSetSuper_column(), consistencyLevel);
     }
 
-    private ColumnOrSuperColumn internal_get(ByteBuffer key, ColumnPath column_path, ConsistencyLevel consistency_level)
-    throws RequestValidationException, NotFoundException, UnavailableException, TimedOutException
-    {
-        ThriftClientState cState = state();
-        String keyspace = cState.getKeyspace();
-        cState.hasColumnFamilyAccess(keyspace, column_path.column_family, Permission.SELECT);
-
-        CFMetaData metadata = ThriftValidation.validateColumnFamily(keyspace, column_path.column_family);
-        ThriftValidation.validateColumnPath(metadata, column_path);
-        org.apache.cassandra.db.ConsistencyLevel consistencyLevel = ThriftConversion.fromThrift(consistency_level);
-        consistencyLevel.validateForRead(keyspace);
-
-        ThriftValidation.validateKey(metadata, key);
-
-        IDiskAtomFilter filter;
-        if (metadata.isSuper())
-        {
-            CompositeType type = (CompositeType)metadata.comparator;
-            SortedSet names = new TreeSet<ByteBuffer>(column_path.column == null ? type.types.get(0) : type.types.get(1));
-            names.add(column_path.column == null ? column_path.super_column : column_path.column);
-            filter = SuperColumns.fromSCNamesFilter(type, column_path.column == null ? null : column_path.bufferForSuper_column(), new NamesQueryFilter(names));
-        }
-        else
-        {
-            SortedSet<ByteBuffer> names = new TreeSet<ByteBuffer>(metadata.comparator);
-            names.add(column_path.column);
-            filter = new NamesQueryFilter(names);
-        }
-
-        ReadCommand command = ReadCommand.create(keyspace, key, column_path.column_family, filter);
-
-        Map<DecoratedKey, ColumnFamily> cfamilies = readColumnFamily(Arrays.asList(command), consistencyLevel);
-
-        ColumnFamily cf = cfamilies.get(StorageService.getPartitioner().decorateKey(command.key));
-
-        if (cf == null)
-            throw new NotFoundException();
-        List<ColumnOrSuperColumn> tcolumns = thriftifyColumnFamily(cf, metadata.isSuper() && column_path.column != null, false);
-        if (tcolumns.isEmpty())
-            throw new NotFoundException();
-        assert tcolumns.size() == 1;
-        return tcolumns.get(0);
-    }
-
     public ColumnOrSuperColumn get(ByteBuffer key, ColumnPath column_path, ConsistencyLevel consistency_level)
     throws InvalidRequestException, NotFoundException, UnavailableException, TimedOutException
     {
@@ -455,7 +424,46 @@ public class CassandraServer implements Cassandra.Iface
 
         try
         {
-            return internal_get(key, column_path, consistency_level);
+            ThriftClientState cState = state();
+            String keyspace = cState.getKeyspace();
+            cState.hasColumnFamilyAccess(keyspace, column_path.column_family, Permission.SELECT);
+
+            CFMetaData metadata = ThriftValidation.validateColumnFamily(keyspace, column_path.column_family);
+            ThriftValidation.validateColumnPath(metadata, column_path);
+            org.apache.cassandra.db.ConsistencyLevel consistencyLevel = ThriftConversion.fromThrift(consistency_level);
+            consistencyLevel.validateForRead(keyspace);
+
+            ThriftValidation.validateKey(metadata, key);
+
+            IDiskAtomFilter filter;
+            if (metadata.isSuper())
+            {
+                CompositeType type = (CompositeType)metadata.comparator;
+                SortedSet names = new TreeSet<ByteBuffer>(column_path.column == null ? type.types.get(0) : type.types.get(1));
+                names.add(column_path.column == null ? column_path.super_column : column_path.column);
+                filter = SuperColumns.fromSCNamesFilter(type, column_path.column == null ? null : column_path.bufferForSuper_column(), new NamesQueryFilter(names));
+            }
+            else
+            {
+                SortedSet<ByteBuffer> names = new TreeSet<ByteBuffer>(metadata.comparator);
+                names.add(column_path.column);
+                filter = new NamesQueryFilter(names);
+            }
+
+            long now = System.currentTimeMillis();
+            ReadCommand command = ReadCommand.create(keyspace, key, column_path.column_family, now, filter);
+
+            Map<DecoratedKey, ColumnFamily> cfamilies = readColumnFamily(Arrays.asList(command), consistencyLevel);
+
+            ColumnFamily cf = cfamilies.get(StorageService.getPartitioner().decorateKey(command.key));
+
+            if (cf == null)
+                throw new NotFoundException();
+            List<ColumnOrSuperColumn> tcolumns = thriftifyColumnFamily(cf, metadata.isSuper() && column_path.column != null, false, now);
+            if (tcolumns.isEmpty())
+                throw new NotFoundException();
+            assert tcolumns.size() == 1;
+            return tcolumns.get(0);
         }
         catch (RequestValidationException e)
         {
@@ -490,9 +498,10 @@ public class CassandraServer implements Cassandra.Iface
             cState.hasColumnFamilyAccess(keyspace, column_parent.column_family, Permission.SELECT);
             Table table = Table.open(keyspace);
             ColumnFamilyStore cfs = table.getColumnFamilyStore(column_parent.column_family);
+            long timestamp = System.currentTimeMillis();
 
             if (predicate.column_names != null)
-                return get_slice(key, column_parent, predicate, consistency_level).size();
+                return getSliceInternal(keyspace, key, column_parent, timestamp, predicate, consistency_level).size();
 
             int pageSize;
             // request by page if this is a large row
@@ -525,7 +534,7 @@ public class CassandraServer implements Cassandra.Iface
             while (true)
             {
                 predicate.slice_range.count = Math.min(pageSize, Math.max(2, remaining)); // fetch at least two columns
-                columns = get_slice(key, column_parent, predicate, consistency_level);
+                columns = getSliceInternal(keyspace, key, column_parent, timestamp, predicate, consistency_level);
                 if (columns.isEmpty())
                     break;
 
@@ -594,7 +603,12 @@ public class CassandraServer implements Cassandra.Iface
             cState.hasColumnFamilyAccess(keyspace, column_parent.column_family, Permission.SELECT);
 
             Map<ByteBuffer, Integer> counts = new HashMap<ByteBuffer, Integer>();
-            Map<ByteBuffer, List<ColumnOrSuperColumn>> columnFamiliesMap = multigetSliceInternal(keyspace, keys, column_parent, predicate, consistency_level);
+            Map<ByteBuffer, List<ColumnOrSuperColumn>> columnFamiliesMap = multigetSliceInternal(keyspace,
+                                                                                                 keys,
+                                                                                                 column_parent,
+                                                                                                 System.currentTimeMillis(),
+                                                                                                 predicate,
+                                                                                                 consistency_level);
 
             for (Map.Entry<ByteBuffer, List<ColumnOrSuperColumn>> cf : columnFamiliesMap.entrySet())
                 counts.put(cf.getKey(), cf.getValue().size());
@@ -1068,14 +1082,11 @@ public class CassandraServer implements Cassandra.Iface
 
         try
         {
-            String keyspace = null;
-            CFMetaData metadata = null;
-
             ThriftClientState cState = state();
-            keyspace = cState.getKeyspace();
+            String keyspace = cState.getKeyspace();
             cState.hasColumnFamilyAccess(keyspace, column_parent.column_family, Permission.SELECT);
 
-            metadata = ThriftValidation.validateColumnFamily(keyspace, column_parent.column_family);
+            CFMetaData metadata = ThriftValidation.validateColumnFamily(keyspace, column_parent.column_family);
             ThriftValidation.validateColumnParent(metadata, column_parent);
             ThriftValidation.validatePredicate(metadata, column_parent, predicate);
             ThriftValidation.validateKeyRange(metadata, column_parent.super_column, range);
@@ -1101,12 +1112,19 @@ public class CassandraServer implements Cassandra.Iface
                                 : RowPosition.forKey(range.end_key, p);
                 bounds = new Bounds<RowPosition>(RowPosition.forKey(range.start_key, p), end);
             }
+            long now = System.currentTimeMillis();
             schedule(DatabaseDescriptor.getRangeRpcTimeout());
             try
             {
                 IDiskAtomFilter filter = ThriftValidation.asIFilter(predicate, metadata, column_parent.super_column);
-                rows = StorageProxy.getRangeSlice(new RangeSliceCommand(keyspace, column_parent.column_family, filter, bounds,
-                                                                        range.row_filter, range.count), consistencyLevel);
+                rows = StorageProxy.getRangeSlice(new RangeSliceCommand(keyspace,
+                                                                        column_parent.column_family,
+                                                                        now,
+                                                                        filter,
+                                                                        bounds,
+                                                                        range.row_filter,
+                                                                        range.count),
+                                                  consistencyLevel);
             }
             finally
             {
@@ -1114,7 +1132,7 @@ public class CassandraServer implements Cassandra.Iface
             }
             assert rows != null;
 
-            return thriftifyKeySlices(rows, column_parent, predicate);
+            return thriftifyKeySlices(rows, column_parent, predicate, now);
         }
         catch (RequestValidationException e)
         {
@@ -1185,12 +1203,21 @@ public class CassandraServer implements Cassandra.Iface
             }
 
             List<Row> rows;
+            long now = System.currentTimeMillis();
             schedule(DatabaseDescriptor.getRangeRpcTimeout());
             try
             {
                 IDiskAtomFilter filter = ThriftValidation.asIFilter(predicate, metadata, null);
-                rows = StorageProxy.getRangeSlice(new RangeSliceCommand(keyspace, column_family, filter,
-                                                                        bounds, range.row_filter, range.count, true, true), consistencyLevel);
+                rows = StorageProxy.getRangeSlice(new RangeSliceCommand(keyspace,
+                                                                        column_family,
+                                                                        now,
+                                                                        filter,
+                                                                        bounds,
+                                                                        range.row_filter,
+                                                                        range.count,
+                                                                        true,
+                                                                        true),
+                                                  consistencyLevel);
             }
             finally
             {
@@ -1198,7 +1225,7 @@ public class CassandraServer implements Cassandra.Iface
             }
             assert rows != null;
 
-            return thriftifyKeySlices(rows, new ColumnParent(column_family), predicate);
+            return thriftifyKeySlices(rows, new ColumnParent(column_family), predicate, now);
         }
         catch (RequestValidationException e)
         {
@@ -1219,13 +1246,13 @@ public class CassandraServer implements Cassandra.Iface
         }
     }
 
-    private List<KeySlice> thriftifyKeySlices(List<Row> rows, ColumnParent column_parent, SlicePredicate predicate)
+    private List<KeySlice> thriftifyKeySlices(List<Row> rows, ColumnParent column_parent, SlicePredicate predicate, long now)
     {
         List<KeySlice> keySlices = new ArrayList<KeySlice>(rows.size());
         boolean reversed = predicate.slice_range != null && predicate.slice_range.reversed;
         for (Row row : rows)
         {
-            List<ColumnOrSuperColumn> thriftifiedColumns = thriftifyColumnFamily(row.cf, column_parent.super_column != null, reversed);
+            List<ColumnOrSuperColumn> thriftifiedColumns = thriftifyColumnFamily(row.cf, column_parent.super_column != null, reversed, now);
             keySlices.add(new KeySlice(row.key.key, thriftifiedColumns));
         }
 
@@ -1265,15 +1292,17 @@ public class CassandraServer implements Cassandra.Iface
                                                                          p.getMinimumToken().minKeyBound());
 
             IDiskAtomFilter filter = ThriftValidation.asIFilter(column_predicate, metadata, column_parent.super_column);
+            long now = System.currentTimeMillis();
             RangeSliceCommand command = new RangeSliceCommand(keyspace,
                                                               column_parent.column_family,
+                                                              now,
                                                               filter,
                                                               bounds,
                                                               index_clause.expressions,
                                                               index_clause.count);
 
             List<Row> rows = StorageProxy.getRangeSlice(command, consistencyLevel);
-            return thriftifyKeySlices(rows, column_parent, column_predicate);
+            return thriftifyKeySlices(rows, column_parent, column_predicate, now);
         }
         catch (RequestValidationException e)
         {
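
The Thrift server changes above follow the same discipline: System.currentTimeMillis() is captured once per request and handed down to every thriftify* helper, which filters against that single timestamp instead of re-reading the clock per column. A minimal hypothetical sketch of the filtering shape (stand-in types, not the real Thrift classes):

    import java.util.ArrayList;
    import java.util.List;

    public class ThriftifySketch
    {
        interface ColumnLike
        {
            boolean isMarkedForDelete(long now);
        }

        static <C extends ColumnLike> List<C> liveColumns(Iterable<C> columns, long now)
        {
            List<C> live = new ArrayList<C>();
            for (C column : columns)
            {
                if (column.isMarkedForDelete(now)) // same cutoff for every column in this response
                    continue;
                live.add(column);
            }
            return live;
        }
    }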

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/test/data/serialization/2.0/db.RangeSliceCommand.bin
----------------------------------------------------------------------
diff --git a/test/data/serialization/2.0/db.RangeSliceCommand.bin b/test/data/serialization/2.0/db.RangeSliceCommand.bin
index e96746d..81a0c02 100644
Binary files a/test/data/serialization/2.0/db.RangeSliceCommand.bin and b/test/data/serialization/2.0/db.RangeSliceCommand.bin differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/test/data/serialization/2.0/db.SliceByNamesReadCommand.bin
----------------------------------------------------------------------
diff --git a/test/data/serialization/2.0/db.SliceByNamesReadCommand.bin b/test/data/serialization/2.0/db.SliceByNamesReadCommand.bin
index 0e143a6..e9c33a2 100644
Binary files a/test/data/serialization/2.0/db.SliceByNamesReadCommand.bin and b/test/data/serialization/2.0/db.SliceByNamesReadCommand.bin differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/test/data/serialization/2.0/db.SliceFromReadCommand.bin
----------------------------------------------------------------------
diff --git a/test/data/serialization/2.0/db.SliceFromReadCommand.bin b/test/data/serialization/2.0/db.SliceFromReadCommand.bin
index 7b357aa..1beede3 100644
Binary files a/test/data/serialization/2.0/db.SliceFromReadCommand.bin and b/test/data/serialization/2.0/db.SliceFromReadCommand.bin differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/test/long/org/apache/cassandra/db/LongTableTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/db/LongTableTest.java b/test/long/org/apache/cassandra/db/LongTableTest.java
index e209b03..70b40d2 100644
--- a/test/long/org/apache/cassandra/db/LongTableTest.java
+++ b/test/long/org/apache/cassandra/db/LongTableTest.java
@@ -56,7 +56,10 @@ public class LongTableTest extends SchemaLoader
                 {
                     for (int j = 0; j < i; j++)
                     {
-                        cf = cfStore.getColumnFamily(QueryFilter.getNamesFilter(Util.dk("key" + i), "Standard1", ByteBufferUtil.bytes("c" + j)));
+                        cf = cfStore.getColumnFamily(QueryFilter.getNamesFilter(Util.dk("key" + i),
+                                                                                "Standard1",
+                                                                                ByteBufferUtil.bytes("c" + j),
+                                                                                System.currentTimeMillis()));
                         TableTest.assertColumns(cf, "c" + j);
                     }
                 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/test/unit/org/apache/cassandra/SchemaLoader.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/SchemaLoader.java b/test/unit/org/apache/cassandra/SchemaLoader.java
index 1ba45e6..a893576 100644
--- a/test/unit/org/apache/cassandra/SchemaLoader.java
+++ b/test/unit/org/apache/cassandra/SchemaLoader.java
@@ -425,7 +425,7 @@ public class SchemaLoader
         for (int i = offset; i < offset + numberOfRows; i++)
         {
             DecoratedKey key = Util.dk("key" + i);
-            store.getColumnFamily(QueryFilter.getNamesFilter(key, columnFamily, ByteBufferUtil.bytes("col" + i)));
+            store.getColumnFamily(QueryFilter.getNamesFilter(key, columnFamily, ByteBufferUtil.bytes("col" + i), System.currentTimeMillis()));
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/test/unit/org/apache/cassandra/Util.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/Util.java b/test/unit/org/apache/cassandra/Util.java
index a14fd99..053fa7d 100644
--- a/test/unit/org/apache/cassandra/Util.java
+++ b/test/unit/org/apache/cassandra/Util.java
@@ -150,10 +150,7 @@ public class Util
                                : new SliceQueryFilter(SuperColumns.startOf(superColumn), SuperColumns.endOf(superColumn), false, Integer.MAX_VALUE);
 
         Token min = StorageService.getPartitioner().getMinimumToken();
-        return cfs.getRangeSlice(new Bounds<Token>(min, min).toRowBounds(),
-                                 10000,
-                                 filter,
-                                 null);
+        return cfs.getRangeSlice(new Bounds<Token>(min, min).toRowBounds(), null, filter, 10000);
     }
 
     /**
@@ -180,7 +177,7 @@ public class Util
     {
         ColumnFamilyStore cfStore = table.getColumnFamilyStore(cfName);
         assert cfStore != null : "Column family " + cfName + " has not been defined";
-        return cfStore.getColumnFamily(QueryFilter.getIdentityFilter(key, cfName));
+        return cfStore.getColumnFamily(QueryFilter.getIdentityFilter(key, cfName, System.currentTimeMillis()));
     }
 
     public static byte[] concatByteArrays(byte[] first, byte[]... remaining)
@@ -258,7 +255,7 @@ public class Util
 
     public static void compact(ColumnFamilyStore cfs, Collection<SSTableReader> sstables)
     {
-        int gcBefore = (int) (System.currentTimeMillis() / 1000) - cfs.metadata.getGcGraceSeconds();
+        int gcBefore = cfs.gcBefore(System.currentTimeMillis());
         AbstractCompactionTask task = cfs.getCompactionStrategy().getUserDefinedTask(sstables, gcBefore);
         task.execute(null);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/test/unit/org/apache/cassandra/config/DefsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/config/DefsTest.java b/test/unit/org/apache/cassandra/config/DefsTest.java
index 04fd5f8..52c4d36 100644
--- a/test/unit/org/apache/cassandra/config/DefsTest.java
+++ b/test/unit/org/apache/cassandra/config/DefsTest.java
@@ -179,7 +179,7 @@ public class DefsTest extends SchemaLoader
         assert store != null;
         store.forceBlockingFlush();
 
-        ColumnFamily cfam = store.getColumnFamily(QueryFilter.getNamesFilter(dk, cf, ByteBufferUtil.bytes("col0")));
+        ColumnFamily cfam = store.getColumnFamily(QueryFilter.getNamesFilter(dk, cf, ByteBufferUtil.bytes("col0"), System.currentTimeMillis()));
         assert cfam.getColumn(ByteBufferUtil.bytes("col0")) != null;
         Column col = cfam.getColumn(ByteBufferUtil.bytes("col0"));
         assert ByteBufferUtil.bytes("value0").equals(col.value());
@@ -252,7 +252,7 @@ public class DefsTest extends SchemaLoader
         assert store != null;
         store.forceBlockingFlush();
 
-        ColumnFamily cfam = store.getColumnFamily(QueryFilter.getNamesFilter(dk, newCf.cfName, ByteBufferUtil.bytes("col0")));
+        ColumnFamily cfam = store.getColumnFamily(QueryFilter.getNamesFilter(dk, newCf.cfName, ByteBufferUtil.bytes("col0"), System.currentTimeMillis()));
         assert cfam.getColumn(ByteBufferUtil.bytes("col0")) != null;
         Column col = cfam.getColumn(ByteBufferUtil.bytes("col0"));
         assert ByteBufferUtil.bytes("value0").equals(col.value());
@@ -360,7 +360,7 @@ public class DefsTest extends SchemaLoader
         assert store != null;
         store.forceBlockingFlush();
 
-        ColumnFamily cfam = store.getColumnFamily(QueryFilter.getNamesFilter(dk, newCf.cfName, ByteBufferUtil.bytes("col0")));
+        ColumnFamily cfam = store.getColumnFamily(QueryFilter.getNamesFilter(dk, newCf.cfName, ByteBufferUtil.bytes("col0"), System.currentTimeMillis()));
         assert cfam.getColumn(ByteBufferUtil.bytes("col0")) != null;
         Column col = cfam.getColumn(ByteBufferUtil.bytes("col0"));
         assert ByteBufferUtil.bytes("value0").equals(col.value());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/test/unit/org/apache/cassandra/db/CleanupTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/CleanupTest.java b/test/unit/org/apache/cassandra/db/CleanupTest.java
index 513db98..4c02a75 100644
--- a/test/unit/org/apache/cassandra/db/CleanupTest.java
+++ b/test/unit/org/apache/cassandra/db/CleanupTest.java
@@ -115,7 +115,7 @@ public class CleanupTest extends SchemaLoader
         IDiskAtomFilter filter = new IdentityQueryFilter();
         IPartitioner p = StorageService.getPartitioner();
         Range<RowPosition> range = Util.range("", "");
-        rows = table.getColumnFamilyStore(CF1).search(clause, range, Integer.MAX_VALUE, filter);
+        rows = table.getColumnFamilyStore(CF1).search(range, clause, filter, Integer.MAX_VALUE);
         assertEquals(LOOPS, rows.size());
 
         // we don't allow cleanup when the local host has no range, to avoid wiping out all data when a node has not joined the ring.
@@ -137,7 +137,7 @@ public class CleanupTest extends SchemaLoader
         assert cfs.getSSTables().isEmpty();
 
         // 2ary indexes should result in no results, too (although tombstones won't be gone until compacted)
-        rows = cfs.search(clause, range, Integer.MAX_VALUE, filter);
+        rows = cfs.search(range, clause, filter, Integer.MAX_VALUE);
         assertEquals(0, rows.size());
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/test/unit/org/apache/cassandra/db/CollationControllerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/CollationControllerTest.java b/test/unit/org/apache/cassandra/db/CollationControllerTest.java
index f835ad3..ca0ec6e 100644
--- a/test/unit/org/apache/cassandra/db/CollationControllerTest.java
+++ b/test/unit/org/apache/cassandra/db/CollationControllerTest.java
@@ -69,7 +69,7 @@ public class CollationControllerTest extends SchemaLoader
 
         // A NamesQueryFilter goes down one code path (through collectTimeOrderedData())
         // It should only iterate the last flushed sstable, since it probably contains the most recent value for Column1
-        QueryFilter filter = QueryFilter.getNamesFilter(dk, "Standard1", ByteBufferUtil.bytes("Column1"));
+        QueryFilter filter = QueryFilter.getNamesFilter(dk, "Standard1", ByteBufferUtil.bytes("Column1"), System.currentTimeMillis());
         CollationController controller = new CollationController(store, filter, Integer.MIN_VALUE);
         controller.getTopLevelColumns();
         assertEquals(1, controller.getSstablesIterated());
@@ -77,7 +77,7 @@ public class CollationControllerTest extends SchemaLoader
         // SliceQueryFilter goes down another path (through collectAllData())
         // We will read "only" the last sstable in that case, but because the 2nd sstable has a tombstone that is more
         // recent than the maxTimestamp of the very first sstable we flushed, we should only read the first 2 sstables.
-        filter = QueryFilter.getIdentityFilter(dk, "Standard1");
+        filter = QueryFilter.getIdentityFilter(dk, "Standard1", System.currentTimeMillis());
         controller = new CollationController(store, filter, Integer.MIN_VALUE);
         controller.getTopLevelColumns();
         assertEquals(2, controller.getSstablesIterated());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
index a0f64de..39802cc 100644
--- a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
+++ b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
@@ -94,7 +94,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
         cfs.forceBlockingFlush();
 
         cfs.getRecentSSTablesPerReadHistogram(); // resets counts
-        cfs.getColumnFamily(QueryFilter.getNamesFilter(Util.dk("key1"), "Standard1", ByteBufferUtil.bytes("Column1")));
+        cfs.getColumnFamily(QueryFilter.getNamesFilter(Util.dk("key1"), "Standard1", ByteBufferUtil.bytes("Column1"), System.currentTimeMillis()));
         assertEquals(1, cfs.getRecentSSTablesPerReadHistogram()[0]);
     }
 
@@ -116,7 +116,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
         List<SSTableReader> ssTables = table.getAllSSTables();
         assertEquals(1, ssTables.size());
         ssTables.get(0).forceFilterFailures();
-        ColumnFamily cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(Util.dk("key2"), "Standard1"));
+        ColumnFamily cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(Util.dk("key2"), "Standard1", System.currentTimeMillis()));
         assertNull(cf);
     }
 
@@ -135,12 +135,21 @@ public class ColumnFamilyStoreTest extends SchemaLoader
         {
             public void runMayThrow() throws IOException
             {
-                QueryFilter sliceFilter = QueryFilter.getSliceFilter(Util.dk("key1"), "Standard2", ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 1);
+                QueryFilter sliceFilter = QueryFilter.getSliceFilter(Util.dk("key1"),
+                                                                     "Standard2",
+                                                                     ByteBufferUtil.EMPTY_BYTE_BUFFER,
+                                                                     ByteBufferUtil.EMPTY_BYTE_BUFFER,
+                                                                     false,
+                                                                     1,
+                                                                     System.currentTimeMillis());
                 ColumnFamily cf = store.getColumnFamily(sliceFilter);
                 assert cf.isMarkedForDelete();
                 assert cf.getColumnCount() == 0;
 
-                QueryFilter namesFilter = QueryFilter.getNamesFilter(Util.dk("key1"), "Standard2", ByteBufferUtil.bytes("a"));
+                QueryFilter namesFilter = QueryFilter.getNamesFilter(Util.dk("key1"),
+                                                                     "Standard2",
+                                                                     ByteBufferUtil.bytes("a"),
+                                                                     System.currentTimeMillis());
                 cf = store.getColumnFamily(namesFilter);
                 assert cf.isMarkedForDelete();
                 assert cf.getColumnCount() == 0;
@@ -157,9 +166,9 @@ public class ColumnFamilyStoreTest extends SchemaLoader
 
         IPartitioner p = StorageService.getPartitioner();
         List<Row> result = cfs.getRangeSlice(Util.range(p, "key1", "key2"),
-                                             10,
+                                             null,
                                              new NamesQueryFilter(ByteBufferUtil.bytes("asdf")),
-                                             null);
+                                             10);
         assertEquals(1, result.size());
         assert result.get(0).key.key.equals(ByteBufferUtil.bytes("key2"));
     }
@@ -195,7 +204,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
         IDiskAtomFilter filter = new IdentityQueryFilter();
         IPartitioner p = StorageService.getPartitioner();
         Range<RowPosition> range = Util.range("", "");
-        List<Row> rows = Table.open("Keyspace1").getColumnFamilyStore("Indexed1").search(clause, range, 100, filter);
+        List<Row> rows = Table.open("Keyspace1").getColumnFamilyStore("Indexed1").search(range, clause, filter, 100);
 
         assert rows != null;
         assert rows.size() == 2 : StringUtils.join(rows, ",");
@@ -212,14 +221,14 @@ public class ColumnFamilyStoreTest extends SchemaLoader
         // add a second expression
         IndexExpression expr2 = new IndexExpression(ByteBufferUtil.bytes("notbirthdate"), IndexOperator.GTE, ByteBufferUtil.bytes(2L));
         clause = Arrays.asList(expr, expr2);
-        rows = Table.open("Keyspace1").getColumnFamilyStore("Indexed1").search(clause, range, 100, filter);
+        rows = Table.open("Keyspace1").getColumnFamilyStore("Indexed1").search(range, clause, filter, 100);
 
         assert rows.size() == 1 : StringUtils.join(rows, ",");
         key = new String(rows.get(0).key.key.array(),rows.get(0).key.key.position(),rows.get(0).key.key.remaining());
         assert "k3".equals( key );
 
         // same query again, but with resultset not including the subordinate expression
-        rows = Table.open("Keyspace1").getColumnFamilyStore("Indexed1").search(clause, range, 100, new NamesQueryFilter(ByteBufferUtil.bytes("birthdate")));
+        rows = Table.open("Keyspace1").getColumnFamilyStore("Indexed1").search(range, clause, new NamesQueryFilter(ByteBufferUtil.bytes("birthdate")), 100);
 
         assert rows.size() == 1 : StringUtils.join(rows, ",");
         key = new String(rows.get(0).key.key.array(),rows.get(0).key.key.position(),rows.get(0).key.key.remaining());
@@ -229,7 +238,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
 
         // once more, this time with a slice rowset that needs to be expanded
         SliceQueryFilter emptyFilter = new SliceQueryFilter(ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 0);
-        rows = Table.open("Keyspace1").getColumnFamilyStore("Indexed1").search(clause, range, 100, emptyFilter);
+        rows = Table.open("Keyspace1").getColumnFamilyStore("Indexed1").search(range, clause, emptyFilter, 100);
 
         assert rows.size() == 1 : StringUtils.join(rows, ",");
         key = new String(rows.get(0).key.key.array(),rows.get(0).key.key.position(),rows.get(0).key.key.remaining());
@@ -241,7 +250,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
         // doesn't tell the scan loop that it's done
         IndexExpression expr3 = new IndexExpression(ByteBufferUtil.bytes("notbirthdate"), IndexOperator.EQ, ByteBufferUtil.bytes(-1L));
         clause = Arrays.asList(expr, expr3);
-        rows = Table.open("Keyspace1").getColumnFamilyStore("Indexed1").search(clause, range, 100, filter);
+        rows = Table.open("Keyspace1").getColumnFamilyStore("Indexed1").search(range, clause, filter, 100);
 
         assert rows.isEmpty();
     }
@@ -264,7 +273,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
         IDiskAtomFilter filter = new IdentityQueryFilter();
         IPartitioner p = StorageService.getPartitioner();
         Range<RowPosition> range = Util.range("", "");
-        List<Row> rows = Table.open("Keyspace1").getColumnFamilyStore("Indexed1").search(clause, range, 100, filter);
+        List<Row> rows = Table.open("Keyspace1").getColumnFamilyStore("Indexed1").search(range, clause, filter, 100);
 
         assert rows != null;
         assert rows.size() == 50 : rows.size();
@@ -290,7 +299,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
         IDiskAtomFilter filter = new IdentityQueryFilter();
         IPartitioner p = StorageService.getPartitioner();
         Range<RowPosition> range = Util.range("", "");
-        List<Row> rows = cfs.search(clause, range, 100, filter);
+        List<Row> rows = cfs.search(range, clause, filter, 100);
         assert rows.size() == 1 : StringUtils.join(rows, ",");
         String key = ByteBufferUtil.string(rows.get(0).key.key);
         assert "k1".equals( key );
@@ -299,7 +308,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
         rm = new RowMutation("Keyspace3", ByteBufferUtil.bytes("k1"));
         rm.delete("Indexed1", ByteBufferUtil.bytes("birthdate"), 1);
         rm.apply();
-        rows = cfs.search(clause, range, 100, filter);
+        rows = cfs.search(range, clause, filter, 100);
         assert rows.isEmpty();
 
         // verify that it's not being indexed under the deletion column value either
@@ -307,14 +316,14 @@ public class ColumnFamilyStoreTest extends SchemaLoader
         ByteBuffer deletionLong = ByteBufferUtil.bytes((long) ByteBufferUtil.toInt(deletion.value()));
         IndexExpression expr0 = new IndexExpression(ByteBufferUtil.bytes("birthdate"), IndexOperator.EQ, deletionLong);
         List<IndexExpression> clause0 = Arrays.asList(expr0);
-        rows = cfs.search(clause0, range, 100, filter);
+        rows = cfs.search(range, clause0, filter, 100);
         assert rows.isEmpty();
 
         // resurrect w/ a newer timestamp
         rm = new RowMutation("Keyspace3", ByteBufferUtil.bytes("k1"));
         rm.add("Indexed1", ByteBufferUtil.bytes("birthdate"), ByteBufferUtil.bytes(1L), 2);
         rm.apply();
-        rows = cfs.search(clause, range, 100, filter);
+        rows = cfs.search(range, clause, filter, 100);
         assert rows.size() == 1 : StringUtils.join(rows, ",");
         key = ByteBufferUtil.string(rows.get(0).key.key);
         assert "k1".equals( key );
@@ -323,7 +332,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
         rm = new RowMutation("Keyspace3", ByteBufferUtil.bytes("k1"));
         rm.delete("Indexed1", 1);
         rm.apply();
-        rows = cfs.search(clause, range, 100, filter);
+        rows = cfs.search(range, clause, filter, 100);
         assert rows.size() == 1 : StringUtils.join(rows, ",");
         key = ByteBufferUtil.string(rows.get(0).key.key);
         assert "k1".equals( key );
@@ -332,7 +341,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
         rm = new RowMutation("Keyspace3", ByteBufferUtil.bytes("k1"));
         rm.delete("Indexed1", ByteBufferUtil.bytes("birthdate"), 1);
         rm.apply();
-        rows = cfs.search(clause, range, 100, filter);
+        rows = cfs.search(range, clause, filter, 100);
         assert rows.size() == 1 : StringUtils.join(rows, ",");
         key = ByteBufferUtil.string(rows.get(0).key.key);
         assert "k1".equals( key );
@@ -341,14 +350,14 @@ public class ColumnFamilyStoreTest extends SchemaLoader
         rm = new RowMutation("Keyspace3", ByteBufferUtil.bytes("k1"));
         rm.delete("Indexed1", 3);
         rm.apply();
-        rows = cfs.search(clause, range, 100, filter);
+        rows = cfs.search(range, clause, filter, 100);
         assert rows.isEmpty() : StringUtils.join(rows, ",");
 
         // make sure obsolete mutations don't generate an index entry
         rm = new RowMutation("Keyspace3", ByteBufferUtil.bytes("k1"));
         rm.add("Indexed1", ByteBufferUtil.bytes("birthdate"), ByteBufferUtil.bytes(1L), 3);
         rm.apply();
-        rows = cfs.search(clause, range, 100, filter);
+        rows = cfs.search(range, clause, filter, 100);
         assert rows.isEmpty() : StringUtils.join(rows, ",");
 
         // try insert followed by row delete in the same mutation
@@ -356,7 +365,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
         rm.add("Indexed1", ByteBufferUtil.bytes("birthdate"), ByteBufferUtil.bytes(1L), 1);
         rm.delete("Indexed1", 2);
         rm.apply();
-        rows = cfs.search(clause, range, 100, filter);
+        rows = cfs.search(range, clause, filter, 100);
         assert rows.isEmpty() : StringUtils.join(rows, ",");
 
         // try row delete followed by insert in the same mutation
@@ -364,7 +373,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
         rm.delete("Indexed1", 3);
         rm.add("Indexed1", ByteBufferUtil.bytes("birthdate"), ByteBufferUtil.bytes(1L), 4);
         rm.apply();
-        rows = cfs.search(clause, range, 100, filter);
+        rows = cfs.search(range, clause, filter, 100);
         assert rows.size() == 1 : StringUtils.join(rows, ",");
         key = ByteBufferUtil.string(rows.get(0).key.key);
         assert "k1".equals( key );
@@ -389,12 +398,12 @@ public class ColumnFamilyStoreTest extends SchemaLoader
         IDiskAtomFilter filter = new IdentityQueryFilter();
         IPartitioner p = StorageService.getPartitioner();
         Range<RowPosition> range = Util.range("", "");
-        List<Row> rows = table.getColumnFamilyStore("Indexed1").search(clause, range, 100, filter);
+        List<Row> rows = table.getColumnFamilyStore("Indexed1").search(range, clause, filter, 100);
         assert rows.size() == 0;
 
         expr = new IndexExpression(ByteBufferUtil.bytes("birthdate"), IndexOperator.EQ, ByteBufferUtil.bytes(2L));
         clause = Arrays.asList(expr);
-        rows = table.getColumnFamilyStore("Indexed1").search(clause, range, 100, filter);
+        rows = table.getColumnFamilyStore("Indexed1").search(range, clause, filter, 100);
         String key = ByteBufferUtil.string(rows.get(0).key.key);
         assert "k1".equals( key );
 
@@ -403,7 +412,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
         rm.add("Indexed1", ByteBufferUtil.bytes("birthdate"), ByteBufferUtil.bytes(3L), 0);
         rm.apply();
 
-        rows = table.getColumnFamilyStore("Indexed1").search(clause, range, 100, filter);
+        rows = table.getColumnFamilyStore("Indexed1").search(range, clause, filter, 100);
         key = ByteBufferUtil.string(rows.get(0).key.key);
         assert "k1".equals( key );
 
@@ -433,7 +442,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
         List<IndexExpression> clause = Arrays.asList(expr);
         IDiskAtomFilter filter = new IdentityQueryFilter();
         Range<RowPosition> range = Util.range("", "");
-        List<Row> rows = table.getColumnFamilyStore(cfName).search(clause, range, 100, filter);
+        List<Row> rows = table.getColumnFamilyStore(cfName).search(range, clause, filter, 100);
         assertEquals(1, rows.size());
 
         // force a flush, so our index isn't being read from a memtable
@@ -448,14 +457,14 @@ public class ColumnFamilyStoreTest extends SchemaLoader
         // because the new value was not indexed and the old value should be ignored
         // (and in fact purged from the index cf).
         // first check for the old value
-        rows = table.getColumnFamilyStore(cfName).search(clause, range, 100, filter);
+        rows = table.getColumnFamilyStore(cfName).search(range, clause, filter, 100);
         assertEquals(0, rows.size());
         // now check for the updated value
         expr = new IndexExpression(colName, IndexOperator.EQ, val2);
         clause = Arrays.asList(expr);
         filter = new IdentityQueryFilter();
         range = Util.range("", "");
-        rows = table.getColumnFamilyStore(cfName).search(clause, range, 100, filter);
+        rows = table.getColumnFamilyStore(cfName).search(range, clause, filter, 100);
         assertEquals(0, rows.size());
 
         // now, reset back to the original value, still skipping the index update, to
@@ -468,7 +477,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
         clause = Arrays.asList(expr);
         filter = new IdentityQueryFilter();
         range = Util.range("", "");
-        rows = table.getColumnFamilyStore(cfName).search(clause, range, 100, filter);
+        rows = table.getColumnFamilyStore(cfName).search(range, clause, filter, 100);
         assertEquals(0, rows.size());
     }
 
@@ -505,12 +514,12 @@ public class ColumnFamilyStoreTest extends SchemaLoader
         List<IndexExpression> clause = Arrays.asList(expr);
         IDiskAtomFilter filter = new IdentityQueryFilter();
         Range<RowPosition> range = Util.range("", "");
-        List<Row> rows = table.getColumnFamilyStore(cfName).search(clause, range, 100, filter);
+        List<Row> rows = table.getColumnFamilyStore(cfName).search(range, clause, filter, 100);
         assertEquals(1, rows.size());
 
         // force a flush and retry the query, so our index isn't being read from a memtable
         table.getColumnFamilyStore(cfName).forceBlockingFlush();
-        rows = table.getColumnFamilyStore(cfName).search(clause, range, 100, filter);
+        rows = table.getColumnFamilyStore(cfName).search(range, clause, filter, 100);
         assertEquals(1, rows.size());
 
         // now apply another update, but force the index update to be skipped
@@ -522,14 +531,14 @@ public class ColumnFamilyStoreTest extends SchemaLoader
         // because the new value was not indexed and the old value should be ignored
         // (and in fact purged from the index cf).
         // first check for the old value
-        rows = table.getColumnFamilyStore(cfName).search(clause, range, 100, filter);
+        rows = table.getColumnFamilyStore(cfName).search(range, clause, filter, 100);
         assertEquals(0, rows.size());
         // now check for the updated value
         expr = new IndexExpression(colName, IndexOperator.EQ, val2);
         clause = Arrays.asList(expr);
         filter = new IdentityQueryFilter();
         range = Util.range("", "");
-        rows = table.getColumnFamilyStore(cfName).search(clause, range, 100, filter);
+        rows = table.getColumnFamilyStore(cfName).search(range, clause, filter, 100);
         assertEquals(0, rows.size());
 
         // now, reset back to the original value, still skipping the index update, to
@@ -542,7 +551,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
         clause = Arrays.asList(expr);
         filter = new IdentityQueryFilter();
         range = Util.range("", "");
-        rows = table.getColumnFamilyStore(cfName).search(clause, range, 100, filter);
+        rows = table.getColumnFamilyStore(cfName).search(range, clause, filter, 100);
         assertEquals(0, rows.size());
     }
 
@@ -579,7 +588,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
         IDiskAtomFilter filter = new IdentityQueryFilter();
         IPartitioner p = StorageService.getPartitioner();
         Range<RowPosition> range = Util.range("", "");
-        List<Row> rows = Table.open("Keyspace1").getColumnFamilyStore("Indexed1").search(clause, range, 1, filter);
+        List<Row> rows = Table.open("Keyspace1").getColumnFamilyStore("Indexed1").search(range, clause, filter, 1);
 
         assert rows != null;
         assert rows.size() == 1 : StringUtils.join(rows, ",");
@@ -623,7 +632,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
         List<IndexExpression> clause = Arrays.asList(expr);
         IDiskAtomFilter filter = new IdentityQueryFilter();
         IPartitioner p = StorageService.getPartitioner();
-        List<Row> rows = table.getColumnFamilyStore("Indexed2").search(clause, Util.range("", ""), 100, filter);
+        List<Row> rows = table.getColumnFamilyStore("Indexed2").search(Util.range("", ""), clause, filter, 100);
         assert rows.size() == 1 : StringUtils.join(rows, ",");
         assertEquals("k1", ByteBufferUtil.string(rows.get(0).key.key));
     }
@@ -635,9 +644,9 @@ public class ColumnFamilyStoreTest extends SchemaLoader
 
         IPartitioner p = StorageService.getPartitioner();
         List<Row> result = cfs.getRangeSlice(Util.bounds("key1", "key2"),
-                                             10,
+                                             null,
                                              new NamesQueryFilter(ByteBufferUtil.bytes("asdf")),
-                                             null);
+                                             10);
         assertEquals(2, result.size());
         assert result.get(0).key.key.equals(ByteBufferUtil.bytes("key1"));
     }
@@ -672,7 +681,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
         sp.getSlice_range().setStart(ArrayUtils.EMPTY_BYTE_ARRAY);
         sp.getSlice_range().setFinish(ArrayUtils.EMPTY_BYTE_ARRAY);
 
-        assertRowAndColCount(1, 6, scfName, false, cfs.getRangeSlice(Util.range("f", "g"), 100, ThriftValidation.asIFilter(sp, cfs.metadata, scfName), null));
+        assertRowAndColCount(1, 6, scfName, false, cfs.getRangeSlice(Util.range("f", "g"), null, ThriftValidation.asIFilter(sp, cfs.metadata, scfName), 100));
 
         // delete
         RowMutation rm = new RowMutation(table.getName(), key.key);
@@ -680,13 +689,13 @@ public class ColumnFamilyStoreTest extends SchemaLoader
         rm.apply();
 
         // verify delete.
-        assertRowAndColCount(1, 0, scfName, false, cfs.getRangeSlice(Util.range("f", "g"), 100, ThriftValidation.asIFilter(sp, cfs.metadata, scfName), null));
+        assertRowAndColCount(1, 0, scfName, false, cfs.getRangeSlice(Util.range("f", "g"), null, ThriftValidation.asIFilter(sp, cfs.metadata, scfName), 100));
 
         // flush
         cfs.forceBlockingFlush();
 
         // re-verify delete.
-        assertRowAndColCount(1, 0, scfName, false, cfs.getRangeSlice(Util.range("f", "g"), 100, ThriftValidation.asIFilter(sp, cfs.metadata, scfName), null));
+        assertRowAndColCount(1, 0, scfName, false, cfs.getRangeSlice(Util.range("f", "g"), null, ThriftValidation.asIFilter(sp, cfs.metadata, scfName), 100));
 
         // late insert.
         putColsSuper(cfs, key, scfName,
@@ -694,14 +703,14 @@ public class ColumnFamilyStoreTest extends SchemaLoader
                 new Column(getBytes(7L), ByteBufferUtil.bytes("val7"), 1L));
 
         // re-verify delete.
-        assertRowAndColCount(1, 0, scfName, false, cfs.getRangeSlice(Util.range("f", "g"), 100, ThriftValidation.asIFilter(sp, cfs.metadata, scfName), null));
+        assertRowAndColCount(1, 0, scfName, false, cfs.getRangeSlice(Util.range("f", "g"), null, ThriftValidation.asIFilter(sp, cfs.metadata, scfName), 100));
 
         // make sure new writes are recognized.
         putColsSuper(cfs, key, scfName,
                 new Column(getBytes(3L), ByteBufferUtil.bytes("val3"), 3),
                 new Column(getBytes(8L), ByteBufferUtil.bytes("val8"), 3),
                 new Column(getBytes(9L), ByteBufferUtil.bytes("val9"), 3));
-        assertRowAndColCount(1, 3, scfName, false, cfs.getRangeSlice(Util.range("f", "g"), 100, ThriftValidation.asIFilter(sp, cfs.metadata, scfName), null));
+        assertRowAndColCount(1, 3, scfName, false, cfs.getRangeSlice(Util.range("f", "g"), null, ThriftValidation.asIFilter(sp, cfs.metadata, scfName), 100));
     }
 
     private static void assertRowAndColCount(int rowCount, int colCount, ByteBuffer sc, boolean isDeleted, Collection<Row> rows) throws CharacterCodingException
@@ -760,14 +769,14 @@ public class ColumnFamilyStoreTest extends SchemaLoader
 
         // insert
         putColsStandard(cfs, key, column("col1", "val1", 1), column("col2", "val2", 1));
-        assertRowAndColCount(1, 2, null, false, cfs.getRangeSlice(Util.range("f", "g"), 100, ThriftValidation.asIFilter(sp, cfs.metadata, null), null));
+        assertRowAndColCount(1, 2, null, false, cfs.getRangeSlice(Util.range("f", "g"), null, ThriftValidation.asIFilter(sp, cfs.metadata, null), 100));
 
         // flush.
         cfs.forceBlockingFlush();
 
         // insert, don't flush
         putColsStandard(cfs, key, column("col3", "val3", 1), column("col4", "val4", 1));
-        assertRowAndColCount(1, 4, null, false, cfs.getRangeSlice(Util.range("f", "g"), 100, ThriftValidation.asIFilter(sp, cfs.metadata, null), null));
+        assertRowAndColCount(1, 4, null, false, cfs.getRangeSlice(Util.range("f", "g"), null, ThriftValidation.asIFilter(sp, cfs.metadata, null), 100));
 
         // delete (from sstable and memtable)
         RowMutation rm = new RowMutation(table.getName(), key.key);
@@ -775,27 +784,27 @@ public class ColumnFamilyStoreTest extends SchemaLoader
         rm.apply();
 
         // verify delete
-        assertRowAndColCount(1, 0, null, true, cfs.getRangeSlice(Util.range("f", "g"), 100, ThriftValidation.asIFilter(sp, cfs.metadata, null), null));
+        assertRowAndColCount(1, 0, null, true, cfs.getRangeSlice(Util.range("f", "g"), null, ThriftValidation.asIFilter(sp, cfs.metadata, null), 100));
 
         // flush
         cfs.forceBlockingFlush();
 
         // re-verify delete. // first breakage is right here because of CASSANDRA-1837.
-        assertRowAndColCount(1, 0, null, true, cfs.getRangeSlice(Util.range("f", "g"), 100, ThriftValidation.asIFilter(sp, cfs.metadata, null), null));
+        assertRowAndColCount(1, 0, null, true, cfs.getRangeSlice(Util.range("f", "g"), null, ThriftValidation.asIFilter(sp, cfs.metadata, null), 100));
 
         // simulate a 'late' insertion that gets put in after the deletion. should get inserted, but fail on read.
         putColsStandard(cfs, key, column("col5", "val5", 1), column("col2", "val2", 1));
 
         // should still be nothing there because we deleted this row. 2nd breakage, but was undetected because of 1837.
-        assertRowAndColCount(1, 0, null, true, cfs.getRangeSlice(Util.range("f", "g"), 100, ThriftValidation.asIFilter(sp, cfs.metadata, null), null));
+        assertRowAndColCount(1, 0, null, true, cfs.getRangeSlice(Util.range("f", "g"), null, ThriftValidation.asIFilter(sp, cfs.metadata, null), 100));
 
         // make sure that new writes are recognized.
         putColsStandard(cfs, key, column("col6", "val6", 3), column("col7", "val7", 3));
-        assertRowAndColCount(1, 2, null, true, cfs.getRangeSlice(Util.range("f", "g"), 100, ThriftValidation.asIFilter(sp, cfs.metadata, null), null));
+        assertRowAndColCount(1, 2, null, true, cfs.getRangeSlice(Util.range("f", "g"), null, ThriftValidation.asIFilter(sp, cfs.metadata, null), 100));
 
         // and it remains so after flush. (this wasn't failing before, but it's good to check.)
         cfs.forceBlockingFlush();
-        assertRowAndColCount(1, 2, null, true, cfs.getRangeSlice(Util.range("f", "g"), 100, ThriftValidation.asIFilter(sp, cfs.metadata, null), null));
+        assertRowAndColCount(1, 2, null, true, cfs.getRangeSlice(Util.range("f", "g"), null, ThriftValidation.asIFilter(sp, cfs.metadata, null), 100));
     }
 
 
@@ -844,7 +853,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
                                              new Column(ByteBufferUtil.bytes("b"), ByteBufferUtil.bytes("B"), 1));
 
         // Get the entire supercolumn like normal
-        ColumnFamily cfGet = cfs.getColumnFamily(QueryFilter.getIdentityFilter(key, cfName));
+        ColumnFamily cfGet = cfs.getColumnFamily(QueryFilter.getIdentityFilter(key, cfName, System.currentTimeMillis()));
         assertEquals(ByteBufferUtil.bytes("A"), cfGet.getColumn(CompositeType.build(superColName, ByteBufferUtil.bytes("a"))).value());
         assertEquals(ByteBufferUtil.bytes("B"), cfGet.getColumn(CompositeType.build(superColName, ByteBufferUtil.bytes("b"))).value());
 
@@ -852,7 +861,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
         SortedSet<ByteBuffer> sliceColNames = new TreeSet<ByteBuffer>(cfs.metadata.comparator);
         sliceColNames.add(CompositeType.build(superColName, ByteBufferUtil.bytes("a")));
         sliceColNames.add(CompositeType.build(superColName, ByteBufferUtil.bytes("b")));
-        SliceByNamesReadCommand cmd = new SliceByNamesReadCommand(tableName, key.key, cfName, new NamesQueryFilter(sliceColNames));
+        SliceByNamesReadCommand cmd = new SliceByNamesReadCommand(tableName, key.key, cfName, System.currentTimeMillis(), new NamesQueryFilter(sliceColNames));
         ColumnFamily cfSliced = cmd.getRow(table).cf;
 
         // Make sure the slice returns the same as the straight get
@@ -888,7 +897,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
         putColsStandard(cfs, key, new Column(cname, ByteBufferUtil.bytes("b"), 1));
 
         // Test fetching the column by name returns the first column
-        SliceByNamesReadCommand cmd = new SliceByNamesReadCommand(tableName, key.key, cfName, new NamesQueryFilter(cname));
+        SliceByNamesReadCommand cmd = new SliceByNamesReadCommand(tableName, key.key, cfName, System.currentTimeMillis(), new NamesQueryFilter(cname));
         ColumnFamily cf = cmd.getRow(table).cf;
         Column column = (Column) cf.getColumn(cname);
         assert column.value().equals(ByteBufferUtil.bytes("a")) : "expecting a, got " + ByteBufferUtil.string(column.value());
@@ -899,7 +908,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
         int columns = 0;
         for (Row row : rows)
         {
-            columns += row.getLiveCount(new SliceQueryFilter(ColumnSlice.ALL_COLUMNS_ARRAY, false, expectedCount));
+            columns += row.getLiveCount(new SliceQueryFilter(ColumnSlice.ALL_COLUMNS_ARRAY, false, expectedCount), System.currentTimeMillis());
         }
         assert columns == expectedCount : "Expected " + expectedCount + " live columns but got " + columns + ": " + rows;
     }
@@ -929,11 +938,46 @@ public class ColumnFamilyStoreTest extends SchemaLoader
         sp.getSlice_range().setStart(ArrayUtils.EMPTY_BYTE_ARRAY);
         sp.getSlice_range().setFinish(ArrayUtils.EMPTY_BYTE_ARRAY);
 
-        assertTotalColCount(cfs.getRangeSlice(Util.range("", ""), 3, ThriftValidation.asIFilter(sp, cfs.metadata, null), null, true, false), 3);
-        assertTotalColCount(cfs.getRangeSlice(Util.range("", ""), 5, ThriftValidation.asIFilter(sp, cfs.metadata, null), null, true, false), 5);
-        assertTotalColCount(cfs.getRangeSlice(Util.range("", ""), 8, ThriftValidation.asIFilter(sp, cfs.metadata, null), null, true, false), 8);
-        assertTotalColCount(cfs.getRangeSlice(Util.range("", ""), 10, ThriftValidation.asIFilter(sp, cfs.metadata, null), null, true, false), 10);
-        assertTotalColCount(cfs.getRangeSlice(Util.range("", ""), 100, ThriftValidation.asIFilter(sp, cfs.metadata, null), null, true, false), 11);
+        assertTotalColCount(cfs.getRangeSlice(Util.range("", ""),
+                                              null,
+                                              ThriftValidation.asIFilter(sp, cfs.metadata, null),
+                                              3,
+                                              System.currentTimeMillis(),
+                                              true,
+                                              false),
+                            3);
+        assertTotalColCount(cfs.getRangeSlice(Util.range("", ""),
+                                              null,
+                                              ThriftValidation.asIFilter(sp, cfs.metadata, null),
+                                              5,
+                                              System.currentTimeMillis(),
+                                              true,
+                                              false),
+                            5);
+        assertTotalColCount(cfs.getRangeSlice(Util.range("", ""),
+                                              null,
+                                              ThriftValidation.asIFilter(sp, cfs.metadata, null),
+                                              8,
+                                              System.currentTimeMillis(),
+                                              true,
+                                              false),
+                            8);
+        assertTotalColCount(cfs.getRangeSlice(Util.range("", ""),
+                                              null,
+                                              ThriftValidation.asIFilter(sp, cfs.metadata, null),
+                                              10,
+                                              System.currentTimeMillis(),
+                                              true,
+                                              false),
+                            10);
+        assertTotalColCount(cfs.getRangeSlice(Util.range("", ""),
+                                              null,
+                                              ThriftValidation.asIFilter(sp, cfs.metadata, null),
+                                              100,
+                                              System.currentTimeMillis(),
+                                              true,
+                                              false),
+                            11);
 
         // Check that when querying by name, we always include all names for a
        // given row even if it means returning more columns than requested (this is necessary for CQL)
@@ -944,11 +988,46 @@ public class ColumnFamilyStoreTest extends SchemaLoader
             ByteBufferUtil.bytes("c2")
         ));
 
-        assertTotalColCount(cfs.getRangeSlice(Util.range("", ""), 1, ThriftValidation.asIFilter(sp, cfs.metadata, null), null, true, false), 3);
-        assertTotalColCount(cfs.getRangeSlice(Util.range("", ""), 4, ThriftValidation.asIFilter(sp, cfs.metadata, null), null, true, false), 5);
-        assertTotalColCount(cfs.getRangeSlice(Util.range("", ""), 5, ThriftValidation.asIFilter(sp, cfs.metadata, null), null, true, false), 5);
-        assertTotalColCount(cfs.getRangeSlice(Util.range("", ""), 6, ThriftValidation.asIFilter(sp, cfs.metadata, null), null, true, false), 8);
-        assertTotalColCount(cfs.getRangeSlice(Util.range("", ""), 100, ThriftValidation.asIFilter(sp, cfs.metadata, null), null, true, false), 8);
+        assertTotalColCount(cfs.getRangeSlice(Util.range("", ""),
+                                              null,
+                                              ThriftValidation.asIFilter(sp, cfs.metadata, null),
+                                              1,
+                                              System.currentTimeMillis(),
+                                              true,
+                                              false),
+                            3);
+        assertTotalColCount(cfs.getRangeSlice(Util.range("", ""),
+                                              null,
+                                              ThriftValidation.asIFilter(sp, cfs.metadata, null),
+                                              4,
+                                              System.currentTimeMillis(),
+                                              true,
+                                              false),
+                            5);
+        assertTotalColCount(cfs.getRangeSlice(Util.range("", ""),
+                                              null,
+                                              ThriftValidation.asIFilter(sp, cfs.metadata, null),
+                                              5,
+                                              System.currentTimeMillis(),
+                                              true,
+                                              false),
+                            5);
+        assertTotalColCount(cfs.getRangeSlice(Util.range("", ""),
+                                              null,
+                                              ThriftValidation.asIFilter(sp, cfs.metadata, null),
+                                              6,
+                                              System.currentTimeMillis(),
+                                              true,
+                                              false),
+                            8);
+        assertTotalColCount(cfs.getRangeSlice(Util.range("", ""),
+                                              null,
+                                              ThriftValidation.asIFilter(sp, cfs.metadata, null),
+                                              100,
+                                              System.currentTimeMillis(),
+                                              true,
+                                              false),
+                            8);
     }
 
     @Test
@@ -975,13 +1054,25 @@ public class ColumnFamilyStoreTest extends SchemaLoader
         sp.getSlice_range().setStart(ArrayUtils.EMPTY_BYTE_ARRAY);
         sp.getSlice_range().setFinish(ArrayUtils.EMPTY_BYTE_ARRAY);
 
-        Collection<Row> rows = cfs.getRangeSlice(Util.range("", ""), 3, ThriftValidation.asIFilter(sp, cfs.metadata, null), null, true, true);
+        Collection<Row> rows = cfs.getRangeSlice(Util.range("", ""),
+                                                 null,
+                                                 ThriftValidation.asIFilter(sp, cfs.metadata, null),
+                                                 3,
+                                                 System.currentTimeMillis(),
+                                                 true,
+                                                 true);
         assert rows.size() == 1 : "Expected 1 row, got " + rows;
         Row row = rows.iterator().next();
         assertColumnNames(row, "c0", "c1", "c2");
 
         sp.getSlice_range().setStart(ByteBufferUtil.getArray(ByteBufferUtil.bytes("c2")));
-        rows = cfs.getRangeSlice(Util.range("", ""), 3, ThriftValidation.asIFilter(sp, cfs.metadata, null), null, true, true);
+        rows = cfs.getRangeSlice(Util.range("", ""),
+                                 null,
+                                 ThriftValidation.asIFilter(sp, cfs.metadata, null),
+                                 3,
+                                 System.currentTimeMillis(),
+                                 true,
+                                 true);
         assert rows.size() == 2 : "Expected 2 rows, got " + rows;
         Iterator<Row> iter = rows.iterator();
         Row row1 = iter.next();
@@ -990,13 +1081,25 @@ public class ColumnFamilyStoreTest extends SchemaLoader
         assertColumnNames(row2, "c0");
 
         sp.getSlice_range().setStart(ByteBufferUtil.getArray(ByteBufferUtil.bytes("c0")));
-        rows = cfs.getRangeSlice(new Bounds<RowPosition>(row2.key, Util.rp("")), 3, ThriftValidation.asIFilter(sp, cfs.metadata, null), null, true, true);
+        rows = cfs.getRangeSlice(new Bounds<RowPosition>(row2.key, Util.rp("")),
+                                 null,
+                                 ThriftValidation.asIFilter(sp, cfs.metadata, null),
+                                 3,
+                                 System.currentTimeMillis(),
+                                 true,
+                                 true);
         assert rows.size() == 1 : "Expected 1 row, got " + rows;
         row = rows.iterator().next();
         assertColumnNames(row, "c0", "c1", "c2");
 
         sp.getSlice_range().setStart(ByteBufferUtil.getArray(ByteBufferUtil.bytes("c2")));
-        rows = cfs.getRangeSlice(new Bounds<RowPosition>(row.key, Util.rp("")), 3, ThriftValidation.asIFilter(sp, cfs.metadata, null), null, true, true);
+        rows = cfs.getRangeSlice(new Bounds<RowPosition>(row.key, Util.rp("")),
+                                 null,
+                                 ThriftValidation.asIFilter(sp, cfs.metadata, null),
+                                 3,
+                                 System.currentTimeMillis(),
+                                 true,
+                                 true);
         assert rows.size() == 2 : "Expected 2 rows, got " + rows;
         iter = rows.iterator();
         row1 = iter.next();
@@ -1057,25 +1160,25 @@ public class ColumnFamilyStoreTest extends SchemaLoader
         List<Row> rows;
 
         // Start and end inclusive
-        rows = cfs.getRangeSlice(new Bounds<RowPosition>(rp("2"), rp("7")), 100, qf, null);
+        rows = cfs.getRangeSlice(new Bounds<RowPosition>(rp("2"), rp("7")), null, qf, 100);
         assert rows.size() == 6;
         assert rows.get(0).key.equals(idk(2));
         assert rows.get(rows.size() - 1).key.equals(idk(7));
 
         // Start and end excluded
-        rows = cfs.getRangeSlice(new ExcludingBounds<RowPosition>(rp("2"), rp("7")), 100, qf, null);
+        rows = cfs.getRangeSlice(new ExcludingBounds<RowPosition>(rp("2"), rp("7")), null, qf, 100);
         assert rows.size() == 4;
         assert rows.get(0).key.equals(idk(3));
         assert rows.get(rows.size() - 1).key.equals(idk(6));
 
         // Start excluded, end included
-        rows = cfs.getRangeSlice(new Range<RowPosition>(rp("2"), rp("7")), 100, qf, null);
+        rows = cfs.getRangeSlice(new Range<RowPosition>(rp("2"), rp("7")), null, qf, 100);
         assert rows.size() == 5;
         assert rows.get(0).key.equals(idk(3));
         assert rows.get(rows.size() - 1).key.equals(idk(7));
 
         // Start included, end excluded
-        rows = cfs.getRangeSlice(new IncludingExcludingBounds<RowPosition>(rp("2"), rp("7")), 100, qf, null);
+        rows = cfs.getRangeSlice(new IncludingExcludingBounds<RowPosition>(rp("2"), rp("7")), null, qf, 100);
         assert rows.size() == 5;
         assert rows.get(0).key.equals(idk(2));
         assert rows.get(rows.size() - 1).key.equals(idk(6));
@@ -1102,7 +1205,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
 
         IndexExpression expr = new IndexExpression(ByteBufferUtil.bytes("birthdate"), IndexOperator.EQ, LongType.instance.decompose(1L));
        // explicitly tell the KeysSearcher to use column limiting for rowsPerQuery to trigger bogus columnsRead--; (CASSANDRA-3996)
-        List<Row> rows = store.search(Arrays.asList(expr), Util.range("", ""), 10, new IdentityQueryFilter(), true);
+        List<Row> rows = store.search(Util.range("", ""), Arrays.asList(expr), new IdentityQueryFilter(), 10, System.currentTimeMillis(), true);
 
         assert rows.size() == 10;
     }
@@ -1485,8 +1588,12 @@ public class ColumnFamilyStoreTest extends SchemaLoader
             String... colNames)
     {
         List<Row> rows = cfs.getRangeSlice(new Bounds<RowPosition>(rp(rowKey), rp(rowKey)),
-                Integer.MAX_VALUE,
-                filter, null, false, false);
+                                           null,
+                                           filter,
+                                           Integer.MAX_VALUE,
+                                           System.currentTimeMillis(),
+                                           false,
+                                           false);
         assertSame("unexpected number of rows ", 1, rows.size());
         Row row = rows.get(0);
         Collection<Column> cols = !filter.isReversed() ? row.cf.getSortedColumns() : row.cf.getReverseSortedColumns();
@@ -1513,7 +1620,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
     {
         DecoratedKey ROW = Util.dk(rowKey);
         System.err.println("Original:");
-        ColumnFamily cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(ROW, "Standard1"));
+        ColumnFamily cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(ROW, "Standard1", System.currentTimeMillis()));
         System.err.println("Row key: " + rowKey + " Cols: "
                 + Iterables.transform(cf.getSortedColumns(), new Function<Column, String>()
                 {

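(For orientation only; the snippet below is not part of the patch.) The hunks above converge on the same shape: ColumnFamilyStore.search() and getRangeSlice() now take the key range first and the result limit last, and every QueryFilter factory and read command carries an explicit query timestamp against which expiring columns are judged. A minimal sketch, reusing the test's own classes but with an otherwise hypothetical flow:

    long now = System.currentTimeMillis();   // query timestamp; expiration is resolved against this
    ColumnFamilyStore cfs = Table.open("Keyspace1").getColumnFamilyStore("Indexed1");
    List<IndexExpression> clause = Arrays.asList(
            new IndexExpression(ByteBufferUtil.bytes("birthdate"), IndexOperator.EQ, ByteBufferUtil.bytes(1L)));
    // new argument order: range, clause, column filter, limit
    List<Row> rows = cfs.search(Util.range("", ""), clause, new IdentityQueryFilter(), 100);
    // point reads pass the timestamp on the filter itself
    ColumnFamily cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(Util.dk("k1"), "Indexed1", now));
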
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/test/unit/org/apache/cassandra/db/ColumnFamilyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ColumnFamilyTest.java b/test/unit/org/apache/cassandra/db/ColumnFamilyTest.java
index 95caac6..a01c25c 100644
--- a/test/unit/org/apache/cassandra/db/ColumnFamilyTest.java
+++ b/test/unit/org/apache/cassandra/db/ColumnFamilyTest.java
@@ -142,9 +142,9 @@ public class ColumnFamilyTest extends SchemaLoader
 
         // check that tombstone wins timestamp ties
         cf_result.addTombstone(ByteBufferUtil.bytes("col1"), 0, 3);
-        assert cf_result.getColumn(ByteBufferUtil.bytes("col1")).isMarkedForDelete();
+        assert cf_result.getColumn(ByteBufferUtil.bytes("col1")).isMarkedForDelete(System.currentTimeMillis());
         cf_result.addColumn(ByteBufferUtil.bytes("col1"), val2, 3);
-        assert cf_result.getColumn(ByteBufferUtil.bytes("col1")).isMarkedForDelete();
+        assert cf_result.getColumn(ByteBufferUtil.bytes("col1")).isMarkedForDelete(System.currentTimeMillis());
 
         // check that column value wins timestamp ties in absence of tombstone
         cf_result.addColumn(ByteBufferUtil.bytes("col3"), val, 2);

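The ColumnFamilyTest change above is the other side of the same coin: a column's liveness is no longer judged against an implicit "now" but against a timestamp supplied by the caller. A hedged sketch of the pattern (variable names illustrative):

    long queryTime = System.currentTimeMillis();
    // deleted or expired state is evaluated relative to the supplied query time
    boolean gone = cf_result.getColumn(ByteBufferUtil.bytes("col1")).isMarkedForDelete(queryTime);
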
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/test/unit/org/apache/cassandra/db/KeyCacheTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/KeyCacheTest.java b/test/unit/org/apache/cassandra/db/KeyCacheTest.java
index 7f57aae..922e255 100644
--- a/test/unit/org/apache/cassandra/db/KeyCacheTest.java
+++ b/test/unit/org/apache/cassandra/db/KeyCacheTest.java
@@ -119,14 +119,16 @@ public class KeyCacheTest extends SchemaLoader
                                                        ByteBufferUtil.EMPTY_BYTE_BUFFER,
                                                        ByteBufferUtil.EMPTY_BYTE_BUFFER,
                                                        false,
-                                                       10));
+                                                       10,
+                                                       System.currentTimeMillis()));
 
         cfs.getColumnFamily(QueryFilter.getSliceFilter(key2,
                                                        COLUMN_FAMILY1,
                                                        ByteBufferUtil.EMPTY_BYTE_BUFFER,
                                                        ByteBufferUtil.EMPTY_BYTE_BUFFER,
                                                        false,
-                                                       10));
+                                                       10,
+                                                       System.currentTimeMillis()));
 
         assertEquals(2, CacheService.instance.keyCache.size());
 
@@ -141,14 +143,16 @@ public class KeyCacheTest extends SchemaLoader
                                                        ByteBufferUtil.EMPTY_BYTE_BUFFER,
                                                        ByteBufferUtil.EMPTY_BYTE_BUFFER,
                                                        false,
-                                                       10));
+                                                       10,
+                                                       System.currentTimeMillis()));
 
         cfs.getColumnFamily(QueryFilter.getSliceFilter(key2,
                                                        COLUMN_FAMILY1,
                                                        ByteBufferUtil.EMPTY_BYTE_BUFFER,
                                                        ByteBufferUtil.EMPTY_BYTE_BUFFER,
                                                        false,
-                                                       10));
+                                                       10,
+                                                       System.currentTimeMillis()));
 
         assert CacheService.instance.keyCache.size() == 4;
     }

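In the KeyCacheTest hunks the same timestamp rides along as the trailing argument of the slice filter; spelled out with the positional arguments labelled (an illustrative call, not new test code):

    cfs.getColumnFamily(QueryFilter.getSliceFilter(key2,
                                                   COLUMN_FAMILY1,
                                                   ByteBufferUtil.EMPTY_BYTE_BUFFER,  // slice start
                                                   ByteBufferUtil.EMPTY_BYTE_BUFFER,  // slice finish
                                                   false,                             // reversed
                                                   10,                                // column count
                                                   System.currentTimeMillis()));      // query timestamp (new)
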
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/test/unit/org/apache/cassandra/db/KeyCollisionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/KeyCollisionTest.java b/test/unit/org/apache/cassandra/db/KeyCollisionTest.java
index 48838b9..a6fbecf 100644
--- a/test/unit/org/apache/cassandra/db/KeyCollisionTest.java
+++ b/test/unit/org/apache/cassandra/db/KeyCollisionTest.java
@@ -71,7 +71,7 @@ public class KeyCollisionTest extends SchemaLoader
         insert("key1", "key2", "key3"); // token = 4
         insert("longKey1", "longKey2"); // token = 8
 
-        List<Row> rows = cfs.getRangeSlice(new Bounds<RowPosition>(dk("k2"), dk("key2")), 10000, new IdentityQueryFilter(), null);
+        List<Row> rows = cfs.getRangeSlice(new Bounds<RowPosition>(dk("k2"), dk("key2")), null, new IdentityQueryFilter(), 10000);
         assert rows.size() == 4 : "Expecting 4 keys, got " + rows.size();
         assert rows.get(0).key.key.equals(ByteBufferUtil.bytes("k2"));
         assert rows.get(1).key.key.equals(ByteBufferUtil.bytes("k3"));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java b/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java
index e5f0acb..0ed0c4a 100644
--- a/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java
+++ b/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java
@@ -78,7 +78,7 @@ public class RangeTombstoneTest extends SchemaLoader
             columns.add(b(i));
         for (int i : dead)
             columns.add(b(i));
-        cf = cfs.getColumnFamily(QueryFilter.getNamesFilter(dk(key), CFNAME, columns));
+        cf = cfs.getColumnFamily(QueryFilter.getNamesFilter(dk(key), CFNAME, columns, System.currentTimeMillis()));
 
         for (int i : live)
             assert isLive(cf, cf.getColumn(b(i))) : "Column " + i + " should be live";
@@ -86,7 +86,7 @@ public class RangeTombstoneTest extends SchemaLoader
             assert !isLive(cf, cf.getColumn(b(i))) : "Column " + i + " shouldn't be live";
 
         // Queries by slices
-        cf = cfs.getColumnFamily(QueryFilter.getSliceFilter(dk(key), CFNAME, b(7), b(30), false, Integer.MAX_VALUE));
+        cf = cfs.getColumnFamily(QueryFilter.getSliceFilter(dk(key), CFNAME, b(7), b(30), false, Integer.MAX_VALUE, System.currentTimeMillis()));
 
         for (int i : new int[]{ 7, 8, 9, 11, 13, 15, 17, 28, 29, 30 })
             assert isLive(cf, cf.getColumn(b(i))) : "Column " + i + " should be live";
@@ -130,7 +130,7 @@ public class RangeTombstoneTest extends SchemaLoader
         rm.apply();
         cfs.forceBlockingFlush();
 
-        cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(dk(key), CFNAME));
+        cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(dk(key), CFNAME, System.currentTimeMillis()));
 
         for (int i = 0; i < 5; i++)
             assert isLive(cf, cf.getColumn(b(i))) : "Column " + i + " should be live";
@@ -141,7 +141,7 @@ public class RangeTombstoneTest extends SchemaLoader
 
         // Compact everything and re-test
         CompactionManager.instance.performMaximal(cfs);
-        cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(dk(key), CFNAME));
+        cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(dk(key), CFNAME, System.currentTimeMillis()));
 
         for (int i = 0; i < 5; i++)
             assert isLive(cf, cf.getColumn(b(i))) : "Column " + i + " should be live";
@@ -153,7 +153,7 @@ public class RangeTombstoneTest extends SchemaLoader
 
     private static boolean isLive(ColumnFamily cf, Column c)
     {
-        return c != null && !c.isMarkedForDelete() && !cf.deletionInfo().isDeleted(c);
+        return c != null && !c.isMarkedForDelete(System.currentTimeMillis()) && !cf.deletionInfo().isDeleted(c);
     }
 
     private static ByteBuffer b(int i)