You are viewing a plain text version of this content. The canonical link for it was a hyperlink ("here") in the original page and is not preserved in this plain-text version.
Posted to commits@cassandra.apache.org by al...@apache.org on 2013/06/18 18:16:05 UTC

[4/4] git commit: Include a timestamp with all read commands to determine column expiration

Include a timestamp with all read commands to determine column expiration

patch by Aleksey Yeschenko; reviewed by Sylvain Lebresne for
CASSANDRA-5149


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1f7628ce
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1f7628ce
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1f7628ce

Branch: refs/heads/trunk
Commit: 1f7628ce7c1b3f820717eaa44df9b182158eb49e
Parents: 62295f6
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Tue Jun 18 19:15:02 2013 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Tue Jun 18 19:15:02 2013 +0300

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../org/apache/cassandra/config/CFMetaData.java |   2 +-
 .../cassandra/config/ColumnDefinition.java      |   3 +-
 .../apache/cassandra/cql/QueryProcessor.java    |  17 +-
 .../cql3/statements/ColumnGroupMap.java         |   6 +-
 .../cql3/statements/ModificationStatement.java  |   4 +-
 .../cql3/statements/SelectStatement.java        |  49 ++--
 .../cassandra/cql3/statements/Selection.java    |  12 +-
 .../cassandra/db/CollationController.java       |   6 +-
 src/java/org/apache/cassandra/db/Column.java    |  25 +-
 .../org/apache/cassandra/db/ColumnFamily.java   |   9 +-
 .../cassandra/db/ColumnFamilySerializer.java    |  22 +-
 .../apache/cassandra/db/ColumnFamilyStore.java  | 107 ++++----
 .../apache/cassandra/db/ColumnSerializer.java   |   2 +-
 .../org/apache/cassandra/db/CounterColumn.java  |   8 +-
 .../apache/cassandra/db/CounterMutation.java    |   7 +-
 .../cassandra/db/CounterUpdateColumn.java       |   2 +-
 src/java/org/apache/cassandra/db/DefsTable.java |   4 +-
 .../org/apache/cassandra/db/DeletedColumn.java  |   4 +-
 .../org/apache/cassandra/db/DeletionTime.java   |   4 +-
 .../org/apache/cassandra/db/ExpiringColumn.java |  15 +-
 .../cassandra/db/HintedHandOffManager.java      |  23 +-
 .../org/apache/cassandra/db/OnDiskAtom.java     |   2 +-
 .../apache/cassandra/db/RangeSliceCommand.java  |  83 +++---
 .../org/apache/cassandra/db/RangeTombstone.java |   4 +-
 .../org/apache/cassandra/db/ReadCommand.java    |  18 +-
 .../db/RetriedSliceFromReadCommand.java         |   9 +-
 src/java/org/apache/cassandra/db/Row.java       |   4 +-
 .../apache/cassandra/db/RowIteratorFactory.java |  16 +-
 .../cassandra/db/SliceByNamesReadCommand.java   |  28 +-
 .../cassandra/db/SliceFromReadCommand.java      |  30 ++-
 .../apache/cassandra/db/SliceQueryPager.java    |   7 +-
 .../org/apache/cassandra/db/SuperColumns.java   |   4 +-
 .../org/apache/cassandra/db/SystemTable.java    |  20 +-
 .../db/compaction/CompactionManager.java        |   6 +-
 .../db/compaction/LazilyCompactedRow.java       |   4 +-
 .../db/compaction/PrecompactedRow.java          |   4 +-
 .../cassandra/db/filter/ColumnCounter.java      |  17 +-
 .../cassandra/db/filter/ExtendedFilter.java     |  49 +++-
 .../cassandra/db/filter/IDiskAtomFilter.java    |   4 +-
 .../cassandra/db/filter/NamesQueryFilter.java   |   8 +-
 .../apache/cassandra/db/filter/QueryFilter.java |  31 ++-
 .../cassandra/db/filter/SliceQueryFilter.java   |  20 +-
 .../AbstractSimplePerColumnSecondaryIndex.java  |   2 +-
 .../db/index/SecondaryIndexManager.java         |  20 +-
 .../db/index/SecondaryIndexSearcher.java        |  17 +-
 .../db/index/composites/CompositesIndex.java    |   2 +-
 .../CompositesIndexOnClusteringKey.java         |   4 +-
 .../CompositesIndexOnPartitionKey.java          |   4 +-
 .../composites/CompositesIndexOnRegular.java    |   4 +-
 .../db/index/composites/CompositesSearcher.java |  31 ++-
 .../cassandra/db/index/keys/KeysIndex.java      |   4 +-
 .../cassandra/db/index/keys/KeysSearcher.java   |  27 +-
 .../io/sstable/SSTableIdentityIterator.java     |   5 +-
 .../cassandra/io/sstable/SSTableReader.java     |  10 +-
 .../cassandra/service/ActiveRepairService.java  |   2 +-
 .../apache/cassandra/service/CacheService.java  |   4 +-
 .../service/RangeSliceResponseResolver.java     |   6 +-
 .../service/RangeSliceVerbHandler.java          |  15 +-
 .../apache/cassandra/service/ReadCallback.java  |   2 +-
 .../cassandra/service/RowDataResolver.java      |  12 +-
 .../apache/cassandra/service/StorageProxy.java  |  35 ++-
 .../cassandra/thrift/CassandraServer.java       | 211 ++++++++-------
 .../serialization/2.0/db.RangeSliceCommand.bin  | Bin 753 -> 801 bytes
 .../2.0/db.SliceByNamesReadCommand.bin          | Bin 437 -> 485 bytes
 .../2.0/db.SliceFromReadCommand.bin             | Bin 437 -> 485 bytes
 .../org/apache/cassandra/db/LongTableTest.java  |   5 +-
 .../unit/org/apache/cassandra/SchemaLoader.java |   2 +-
 test/unit/org/apache/cassandra/Util.java        |   9 +-
 .../org/apache/cassandra/config/DefsTest.java   |   6 +-
 .../org/apache/cassandra/db/CleanupTest.java    |   4 +-
 .../cassandra/db/CollationControllerTest.java   |   4 +-
 .../cassandra/db/ColumnFamilyStoreTest.java     | 259 +++++++++++++------
 .../apache/cassandra/db/ColumnFamilyTest.java   |   4 +-
 .../org/apache/cassandra/db/KeyCacheTest.java   |  12 +-
 .../apache/cassandra/db/KeyCollisionTest.java   |   2 +-
 .../apache/cassandra/db/RangeTombstoneTest.java |  10 +-
 .../apache/cassandra/db/ReadMessageTest.java    |  13 +-
 .../db/RecoveryManagerTruncateTest.java         |   6 +-
 .../cassandra/db/RemoveColumnFamilyTest.java    |   2 +-
 .../db/RemoveColumnFamilyWithFlush1Test.java    |   2 +-
 .../db/RemoveColumnFamilyWithFlush2Test.java    |   2 +-
 .../apache/cassandra/db/RemoveColumnTest.java   |  18 +-
 .../cassandra/db/RemoveSubColumnTest.java       |  10 +-
 .../org/apache/cassandra/db/RowCacheTest.java   |  28 +-
 test/unit/org/apache/cassandra/db/RowTest.java  |   4 +-
 .../apache/cassandra/db/SerializationsTest.java |  24 +-
 .../unit/org/apache/cassandra/db/TableTest.java |  94 ++++---
 .../org/apache/cassandra/db/TimeSortTest.java   |  13 +-
 .../db/compaction/CompactionsPurgeTest.java     |  22 +-
 .../db/compaction/CompactionsTest.java          |   4 +-
 .../LeveledCompactionStrategyTest.java          |   2 +-
 .../cassandra/db/compaction/TTLExpiryTest.java  |   2 +-
 .../db/index/PerRowSecondaryIndexTest.java      |   7 +-
 .../cassandra/db/marshal/CompositeTypeTest.java |   2 +-
 .../db/marshal/DynamicCompositeTypeTest.java    |   2 +-
 .../cassandra/io/sstable/SSTableReaderTest.java |   2 +-
 .../service/AntiEntropyServiceTestAbstract.java |   2 +-
 .../cassandra/service/RowResolverTest.java      |  14 +-
 .../streaming/StreamingTransferTest.java        |  12 +-
 .../cassandra/tools/SSTableExportTest.java      |   4 +-
 .../cassandra/tools/SSTableImportTest.java      |  12 +-
 102 files changed, 1008 insertions(+), 730 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a620d31..46b67e3 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -58,6 +58,8 @@
  * Use SASL authentication in binary protocol v2 (CASSANDRA-5545)
  * Replace Thrift HsHa with LMAX Disruptor based implementation (CASSANDRA-5582)
  * cqlsh: Add row count to SELECT output (CASSANDRA-5636)
+ * Include a timestamp with all read commands to determine column expiration
+   (CASSANDRA-5149)
 
 1.2.6
  * Reduce SSTableLoader memory usage (CASSANDRA-5555)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java
index b5ece0c..15713bb 100644
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@ -1251,7 +1251,7 @@ public final class CFMetaData
 
     public Iterator<OnDiskAtom> getOnDiskIterator(DataInput in, int count, Descriptor.Version version)
     {
-        return getOnDiskIterator(in, count, ColumnSerializer.Flag.LOCAL, (int) (System.currentTimeMillis() / 1000), version);
+        return getOnDiskIterator(in, count, ColumnSerializer.Flag.LOCAL, Integer.MIN_VALUE, version);
     }
 
     public Iterator<OnDiskAtom> getOnDiskIterator(DataInput in, int count, ColumnSerializer.Flag flag, int expireBefore, Descriptor.Version version)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/config/ColumnDefinition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/ColumnDefinition.java b/src/java/org/apache/cassandra/config/ColumnDefinition.java
index 9d21435..470e7d7 100644
--- a/src/java/org/apache/cassandra/config/ColumnDefinition.java
+++ b/src/java/org/apache/cassandra/config/ColumnDefinition.java
@@ -306,7 +306,8 @@ public class ColumnDefinition
                                                        DefsTable.searchComposite(cfName, true),
                                                        DefsTable.searchComposite(cfName, false),
                                                        false,
-                                                       Integer.MAX_VALUE);
+                                                       Integer.MAX_VALUE,
+                                                       System.currentTimeMillis());
         return new Row(key, cf);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/cql/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql/QueryProcessor.java b/src/java/org/apache/cassandra/cql/QueryProcessor.java
index 57fec4a..a40bfea 100644
--- a/src/java/org/apache/cassandra/cql/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql/QueryProcessor.java
@@ -71,7 +71,7 @@ public class QueryProcessor
 
     public static final String DEFAULT_KEY_NAME = bufferToString(CFMetaData.DEFAULT_KEY_NAME);
 
-    private static List<org.apache.cassandra.db.Row> getSlice(CFMetaData metadata, SelectStatement select, List<ByteBuffer> variables)
+    private static List<org.apache.cassandra.db.Row> getSlice(CFMetaData metadata, SelectStatement select, List<ByteBuffer> variables, long now)
     throws InvalidRequestException, ReadTimeoutException, UnavailableException, IsBootstrappingException, WriteTimeoutException
     {
         List<ReadCommand> commands = new ArrayList<ReadCommand>();
@@ -87,7 +87,7 @@ public class QueryProcessor
                 ByteBuffer key = rawKey.getByteBuffer(metadata.getKeyValidator(),variables);
 
                 validateKey(key);
-                commands.add(new SliceByNamesReadCommand(metadata.ksName, key, select.getColumnFamily(), new NamesQueryFilter(columnNames)));
+                commands.add(new SliceByNamesReadCommand(metadata.ksName, key, select.getColumnFamily(), now, new NamesQueryFilter(columnNames)));
             }
         }
         // ...a range (slice) of column names
@@ -106,6 +106,7 @@ public class QueryProcessor
                 commands.add(new SliceFromReadCommand(metadata.ksName,
                                                       key,
                                                       select.getColumnFamily(),
+                                                      now,
                                                       new SliceQueryFilter(start, finish, select.isColumnsReversed(), select.getColumnsLimit())));
             }
         }
@@ -128,7 +129,7 @@ public class QueryProcessor
         return columnNames;
     }
 
-    private static List<org.apache.cassandra.db.Row> multiRangeSlice(CFMetaData metadata, SelectStatement select, List<ByteBuffer> variables)
+    private static List<org.apache.cassandra.db.Row> multiRangeSlice(CFMetaData metadata, SelectStatement select, List<ByteBuffer> variables, long now)
     throws ReadTimeoutException, UnavailableException, InvalidRequestException
     {
         IPartitioner<?> p = StorageService.getPartitioner();
@@ -175,6 +176,7 @@ public class QueryProcessor
 
         List<org.apache.cassandra.db.Row> rows = StorageProxy.getRangeSlice(new RangeSliceCommand(metadata.ksName,
                                                                                                   select.getColumnFamily(),
+                                                                                                  now,
                                                                                                   columnFilter,
                                                                                                   bounds,
                                                                                                   expressions,
@@ -379,14 +381,15 @@ public class QueryProcessor
 
                 List<org.apache.cassandra.db.Row> rows;
 
+                long now = System.currentTimeMillis();
                 // By-key
                 if (!select.isKeyRange() && (select.getKeys().size() > 0))
                 {
-                    rows = getSlice(metadata, select, variables);
+                    rows = getSlice(metadata, select, variables, now);
                 }
                 else
                 {
-                    rows = multiRangeSlice(metadata, select, variables);
+                    rows = multiRangeSlice(metadata, select, variables, now);
                 }
 
                 // count resultset is a single column named "count"
@@ -429,7 +432,7 @@ public class QueryProcessor
                         {
                             for (org.apache.cassandra.db.Column c : row.cf.getSortedColumns())
                             {
-                                if (c.isMarkedForDelete())
+                                if (c.isMarkedForDelete(now))
                                     continue;
 
                                 ColumnDefinition cd = metadata.getColumnDefinitionFromColumnName(c.name());
@@ -474,7 +477,7 @@ public class QueryProcessor
                             if (cd != null)
                                 result.schema.value_types.put(name, TypeParser.getShortName(cd.getValidator()));
                             org.apache.cassandra.db.Column c = row.cf.getColumn(name);
-                            if (c == null || c.isMarkedForDelete())
+                            if (c == null || c.isMarkedForDelete(now))
                                 thriftColumns.add(new Column().setName(name));
                             else
                                 thriftColumns.add(thriftify(c));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/cql3/statements/ColumnGroupMap.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ColumnGroupMap.java b/src/java/org/apache/cassandra/cql3/statements/ColumnGroupMap.java
index 20fa3bd..8974523 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ColumnGroupMap.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ColumnGroupMap.java
@@ -104,20 +104,22 @@ public class ColumnGroupMap
     {
         private final CompositeType composite;
         private final int idx;
+        private final long now;
         private ByteBuffer[] previous;
 
         private final List<ColumnGroupMap> groups = new ArrayList<ColumnGroupMap>();
         private ColumnGroupMap currentGroup;
 
-        public Builder(CompositeType composite, boolean hasCollections)
+        public Builder(CompositeType composite, boolean hasCollections, long now)
         {
             this.composite = composite;
             this.idx = composite.types.size() - (hasCollections ? 2 : 1);
+            this.now = now;
         }
 
         public void add(Column c)
         {
-            if (c.isMarkedForDelete())
+            if (c.isMarkedForDelete(now))
                 return;
 
             ByteBuffer[] current = composite.split(c.name());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index f6b7140..62f7fbd 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -297,10 +297,12 @@ public abstract class ModificationStatement implements CQLStatement
         }
 
         List<ReadCommand> commands = new ArrayList<ReadCommand>(partitionKeys.size());
+        long now = System.currentTimeMillis();
         for (ByteBuffer key : partitionKeys)
             commands.add(new SliceFromReadCommand(keyspace(),
                                                   key,
                                                   columnFamily(),
+                                                  now,
                                                   new SliceQueryFilter(slices, false, Integer.MAX_VALUE)));
 
         List<Row> rows = local
@@ -313,7 +315,7 @@ public abstract class ModificationStatement implements CQLStatement
             if (row.cf == null || row.cf.getColumnCount() == 0)
                 continue;
 
-            ColumnGroupMap.Builder groupBuilder = new ColumnGroupMap.Builder(composite, true);
+            ColumnGroupMap.Builder groupBuilder = new ColumnGroupMap.Builder(composite, true, now);
             for (Column column : row.cf)
                 groupBuilder.add(column);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index a96eb1d..fd47ba8 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -127,17 +127,18 @@ public class SelectStatement implements CQLStatement
         cl.validateForRead(keyspace());
 
         int limit = getLimit(variables);
+        long now = System.currentTimeMillis();
         List<Row> rows = isKeyRange || usesSecondaryIndexing
-                       ? StorageProxy.getRangeSlice(getRangeCommand(variables, limit), cl)
-                       : StorageProxy.read(getSliceCommands(variables, limit), cl);
+                       ? StorageProxy.getRangeSlice(getRangeCommand(variables, limit, now), cl)
+                       : StorageProxy.read(getSliceCommands(variables, limit, now), cl);
 
-        return processResults(rows, variables, limit);
+        return processResults(rows, variables, limit, now);
     }
 
-    private ResultMessage.Rows processResults(List<Row> rows, List<ByteBuffer> variables, int limit) throws RequestValidationException
+    private ResultMessage.Rows processResults(List<Row> rows, List<ByteBuffer> variables, int limit, long now) throws RequestValidationException
     {
         // Even for count, we need to process the result as it'll group some column together in sparse column families
-        ResultSet rset = process(rows, variables, limit);
+        ResultSet rset = process(rows, variables, limit, now);
         rset = parameters.isCount ? rset.makeCountResult(parameters.countAlias) : rset;
         return new ResultMessage.Rows(rset);
     }
@@ -153,19 +154,20 @@ public class SelectStatement implements CQLStatement
 
     public ResultMessage.Rows executeInternal(QueryState state) throws RequestExecutionException, RequestValidationException
     {
-        List<ByteBuffer> variables = Collections.<ByteBuffer>emptyList();
+        List<ByteBuffer> variables = Collections.emptyList();
         int limit = getLimit(variables);
+        long now = System.currentTimeMillis();
         List<Row> rows = isKeyRange || usesSecondaryIndexing
-                       ? RangeSliceVerbHandler.executeLocally(getRangeCommand(variables, limit))
-                       : readLocally(keyspace(), getSliceCommands(variables, limit));
+                       ? RangeSliceVerbHandler.executeLocally(getRangeCommand(variables, limit, now))
+                       : readLocally(keyspace(), getSliceCommands(variables, limit, now));
 
-        return processResults(rows, variables, limit);
+        return processResults(rows, variables, limit, now);
     }
 
     public ResultSet process(List<Row> rows) throws InvalidRequestException
     {
         assert !parameters.isCount; // not yet needed
-        return process(rows, Collections.<ByteBuffer>emptyList(), getLimit(Collections.<ByteBuffer>emptyList()));
+        return process(rows, Collections.<ByteBuffer>emptyList(), getLimit(Collections.<ByteBuffer>emptyList()), System.currentTimeMillis());
     }
 
     public String keyspace()
@@ -178,7 +180,7 @@ public class SelectStatement implements CQLStatement
         return cfDef.cfm.cfName;
     }
 
-    private List<ReadCommand> getSliceCommands(List<ByteBuffer> variables, int limit) throws RequestValidationException
+    private List<ReadCommand> getSliceCommands(List<ByteBuffer> variables, int limit, long now) throws RequestValidationException
     {
         Collection<ByteBuffer> keys = getKeys(variables);
         List<ReadCommand> commands = new ArrayList<ReadCommand>(keys.size());
@@ -192,25 +194,18 @@ public class SelectStatement implements CQLStatement
             // We should not share the slice filter amongst the commands (hence the cloneShallow), due to
             // SliceQueryFilter not being immutable due to its columnCounter used by the lastCounted() method
             // (this is fairly ugly and we should change that but that's probably not a tiny refactor to do that cleanly)
-            commands.add(ReadCommand.create(keyspace(), key, columnFamily(), filter.cloneShallow()));
+            commands.add(ReadCommand.create(keyspace(), key, columnFamily(), now, filter.cloneShallow()));
         }
         return commands;
     }
 
-    private RangeSliceCommand getRangeCommand(List<ByteBuffer> variables, int limit) throws RequestValidationException
+    private RangeSliceCommand getRangeCommand(List<ByteBuffer> variables, int limit, long now) throws RequestValidationException
     {
         IDiskAtomFilter filter = makeFilter(variables, limit);
         List<IndexExpression> expressions = getIndexExpressions(variables);
         // The LIMIT provided by the user is the number of CQL row he wants returned.
         // We want to have getRangeSlice to count the number of columns, not the number of keys.
-        return new RangeSliceCommand(keyspace(),
-                                     columnFamily(),
-                                     filter,
-                                     getKeyBounds(variables),
-                                     expressions,
-                                     limit,
-                                     true,
-                                     false);
+        return new RangeSliceCommand(keyspace(), columnFamily(), now,  filter, getKeyBounds(variables), expressions, limit, true, false);
     }
 
     private AbstractBounds<RowPosition> getKeyBounds(List<ByteBuffer> variables) throws InvalidRequestException
@@ -661,9 +656,9 @@ public class SelectStatement implements CQLStatement
         };
     }
 
-    private ResultSet process(List<Row> rows, List<ByteBuffer> variables, int limit) throws InvalidRequestException
+    private ResultSet process(List<Row> rows, List<ByteBuffer> variables, int limit, long now) throws InvalidRequestException
     {
-        Selection.ResultSetBuilder result = selection.resultSetBuilder();
+        Selection.ResultSetBuilder result = selection.resultSetBuilder(now);
         for (org.apache.cassandra.db.Row row : rows)
         {
             // Not columns match the query, skip
@@ -679,7 +674,7 @@ public class SelectStatement implements CQLStatement
                 // One cqlRow per column
                 for (Column c : columnsInOrder(row.cf, variables))
                 {
-                    if (c.isMarkedForDelete())
+                    if (c.isMarkedForDelete(now))
                         continue;
 
                     ByteBuffer[] components = null;
@@ -728,11 +723,11 @@ public class SelectStatement implements CQLStatement
                 // Sparse case: group column in cqlRow when composite prefix is equal
                 CompositeType composite = (CompositeType)cfDef.cfm.comparator;
 
-                ColumnGroupMap.Builder builder = new ColumnGroupMap.Builder(composite, cfDef.hasCollections);
+                ColumnGroupMap.Builder builder = new ColumnGroupMap.Builder(composite, cfDef.hasCollections, now);
 
                 for (Column c : row.cf)
                 {
-                    if (c.isMarkedForDelete())
+                    if (c.isMarkedForDelete(now))
                         continue;
 
                     builder.add(c);
@@ -743,7 +738,7 @@ public class SelectStatement implements CQLStatement
             }
             else
             {
-                if (row.cf.hasOnlyTombstones())
+                if (row.cf.hasOnlyTombstones(now))
                     continue;
 
                 // Static case: One cqlRow for all columns

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/cql3/statements/Selection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/Selection.java b/src/java/org/apache/cassandra/cql3/statements/Selection.java
index d3018e5..cf2b62e 100644
--- a/src/java/org/apache/cassandra/cql3/statements/Selection.java
+++ b/src/java/org/apache/cassandra/cql3/statements/Selection.java
@@ -213,9 +213,9 @@ public abstract class Selection
         return columnsList;
     }
 
-    public ResultSetBuilder resultSetBuilder()
+    public ResultSetBuilder resultSetBuilder(long now)
     {
-        return new ResultSetBuilder();
+        return new ResultSetBuilder(now);
     }
 
     private static ByteBuffer value(Column c)
@@ -240,12 +240,14 @@ public abstract class Selection
         List<ByteBuffer> current;
         final long[] timestamps;
         final int[] ttls;
+        final long now;
 
-        private ResultSetBuilder()
+        private ResultSetBuilder(long now)
         {
             this.resultSet = new ResultSet(metadata);
             this.timestamps = collectTimestamps ? new long[columnsList.size()] : null;
             this.ttls = collectTTLs ? new int[columnsList.size()] : null;
+            this.now = now;
         }
 
         public void add(ByteBuffer v)
@@ -264,14 +266,14 @@ public abstract class Selection
             {
                 int ttl = -1;
                 if (!isDead(c) && c instanceof ExpiringColumn)
-                    ttl = c.getLocalDeletionTime() - (int) (System.currentTimeMillis() / 1000);
+                    ttl = c.getLocalDeletionTime() - (int) (now / 1000);
                 ttls[current.size() - 1] = ttl;
             }
         }
 
         private boolean isDead(Column c)
         {
-            return c == null || c.isMarkedForDelete();
+            return c == null || c.isMarkedForDelete(now);
         }
 
         public void newRow() throws InvalidRequestException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/CollationController.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CollationController.java b/src/java/org/apache/cassandra/db/CollationController.java
index f1fccef..d0d22c5 100644
--- a/src/java/org/apache/cassandra/db/CollationController.java
+++ b/src/java/org/apache/cassandra/db/CollationController.java
@@ -21,8 +21,6 @@ import java.nio.ByteBuffer;
 import java.util.*;
 
 import com.google.common.collect.Iterables;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
 import org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy;
@@ -37,8 +35,6 @@ import org.apache.cassandra.utils.HeapAllocator;
 
 public class CollationController
 {
-    private static final Logger logger = LoggerFactory.getLogger(CollationController.class);
-
     private final ColumnFamilyStore cfs;
     private final QueryFilter filter;
     private final int gcBefore;
@@ -104,7 +100,7 @@ public class CollationController
             // (reduceNameFilter removes columns that are known to be irrelevant)
             NamesQueryFilter namesFilter = (NamesQueryFilter) filter.filter;
             TreeSet<ByteBuffer> filterColumns = new TreeSet<ByteBuffer>(namesFilter.columns);
-            QueryFilter reducedFilter = new QueryFilter(filter.key, filter.cfName, namesFilter.withUpdatedColumns(filterColumns));
+            QueryFilter reducedFilter = new QueryFilter(filter.key, filter.cfName, namesFilter.withUpdatedColumns(filterColumns), filter.timestamp);
 
             /* add the SSTables on disk */
             Collections.sort(view.sstables, SSTable.maxTimestampComparator);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/Column.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Column.java b/src/java/org/apache/cassandra/db/Column.java
index b42097c..b210d22 100644
--- a/src/java/org/apache/cassandra/db/Column.java
+++ b/src/java/org/apache/cassandra/db/Column.java
@@ -143,14 +143,20 @@ public class Column implements OnDiskAtom
         return timestamp;
     }
 
-    public boolean isMarkedForDelete()
+    public boolean isMarkedForDelete(long now)
     {
-        return (int) (System.currentTimeMillis() / 1000) >= getLocalDeletionTime();
+        return false;
     }
 
+    public boolean isLive(long now)
+    {
+        return !isMarkedForDelete(now);
+    }
+
+    // Don't call unless the column is actually marked for delete.
     public long getMarkedForDeleteAt()
     {
-        throw new IllegalStateException("column is not marked for delete");
+        return Long.MAX_VALUE;
     }
 
     public int dataSize()
@@ -186,9 +192,7 @@ public class Column implements OnDiskAtom
     public Column diff(Column column)
     {
         if (timestamp() < column.timestamp())
-        {
             return column;
-        }
         return null;
     }
 
@@ -223,9 +227,9 @@ public class Column implements OnDiskAtom
     public Column reconcile(Column column, Allocator allocator)
     {
         // tombstones take precedence.  (if both are tombstones, then it doesn't matter which one we use.)
-        if (isMarkedForDelete())
+        if (isMarkedForDelete(System.currentTimeMillis()))
             return timestamp() < column.timestamp() ? column : this;
-        if (column.isMarkedForDelete())
+        if (column.isMarkedForDelete(System.currentTimeMillis()))
             return timestamp() > column.timestamp() ? this : column;
         // break ties by comparing values.
         if (timestamp() == column.timestamp())
@@ -276,7 +280,7 @@ public class Column implements OnDiskAtom
         StringBuilder sb = new StringBuilder();
         sb.append(comparator.getString(name));
         sb.append(":");
-        sb.append(isMarkedForDelete());
+        sb.append(isMarkedForDelete(System.currentTimeMillis()));
         sb.append(":");
         sb.append(value.remaining());
         sb.append("@");
@@ -284,11 +288,6 @@ public class Column implements OnDiskAtom
         return sb.toString();
     }
 
-    public boolean isLive()
-    {
-        return !isMarkedForDelete();
-    }
-
     protected void validateName(CFMetaData metadata) throws MarshalException
     {
         metadata.comparator.validate(name());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/ColumnFamily.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamily.java b/src/java/org/apache/cassandra/db/ColumnFamily.java
index cda2d9b..36396f8 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamily.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamily.java
@@ -31,7 +31,6 @@ import java.util.UUID;
 import com.google.common.base.Function;
 import com.google.common.base.Functions;
 import com.google.common.collect.ImmutableMap;
-
 import org.apache.commons.lang.builder.HashCodeBuilder;
 
 import org.apache.cassandra.cache.IRowCacheEntry;
@@ -194,7 +193,7 @@ public abstract class ColumnFamily implements Iterable<Column>, IRowCacheEntry
      *   </code>
      *  but is potentially faster.
      */
-     public abstract void addAll(ColumnFamily cm, Allocator allocator, Function<Column, Column> transformation);
+    public abstract void addAll(ColumnFamily cm, Allocator allocator, Function<Column, Column> transformation);
 
     /**
      * Replace oldColumn if present by newColumn.
@@ -426,13 +425,11 @@ public abstract class ColumnFamily implements Iterable<Column>, IRowCacheEntry
         return metadata.comparator;
     }
 
-    public boolean hasOnlyTombstones()
+    public boolean hasOnlyTombstones(long now)
     {
         for (Column column : this)
-        {
-            if (column.isLive())
+            if (column.isLive(now))
                 return false;
-        }
         return true;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java b/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
index f5cf3d4..411b040 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
@@ -102,11 +102,10 @@ public class ColumnFamilySerializer implements IVersionedSerializer<ColumnFamily
             return null;
 
         ColumnFamily cf = factory.create(Schema.instance.getCFMetaData(deserializeCfId(in, version)));
-        int expireBefore = (int) (System.currentTimeMillis() / 1000);
 
         if (cf.metadata().isSuper() && version < MessagingService.VERSION_20)
         {
-            SuperColumns.deserializerSuperColumnFamily(in, cf, flag, expireBefore, version);
+            SuperColumns.deserializerSuperColumnFamily(in, cf, flag, version);
         }
         else
         {
@@ -115,9 +114,7 @@ public class ColumnFamilySerializer implements IVersionedSerializer<ColumnFamily
             ColumnSerializer columnSerializer = Column.serializer;
             int size = in.readInt();
             for (int i = 0; i < size; ++i)
-            {
-                cf.addColumn(columnSerializer.deserialize(in, flag, expireBefore));
-            }
+                cf.addColumn(columnSerializer.deserialize(in, flag));
         }
         return cf;
     }
@@ -170,21 +167,6 @@ public class ColumnFamilySerializer implements IVersionedSerializer<ColumnFamily
         throw new UnsupportedOperationException();
     }
 
-    public void deserializeColumnsFromSSTable(DataInput in, ColumnFamily cf, int size, ColumnSerializer.Flag flag, int expireBefore, Descriptor.Version version)
-    {
-        Iterator<OnDiskAtom> iter = cf.metadata().getOnDiskIterator(in, size, flag, expireBefore, version);
-        while (iter.hasNext())
-            cf.addAtom(iter.next());
-    }
-
-    public void deserializeFromSSTable(DataInput in, ColumnFamily cf, ColumnSerializer.Flag flag, Descriptor.Version version) throws IOException
-    {
-        cf.delete(DeletionInfo.serializer().deserializeFromSSTable(in, version));
-        int size = in.readInt();
-        int expireBefore = (int) (System.currentTimeMillis() / 1000);
-        deserializeColumnsFromSSTable(in, cf, size, flag, expireBefore, version);
-    }
-
     public void serializeCfId(UUID cfId, DataOutput out, int version) throws IOException
     {
         UUIDSerializer.serializer.serialize(cfId, out, version);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 4287df6..648c25a 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -33,10 +33,6 @@ import com.google.common.base.Function;
 import com.google.common.collect.*;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.Uninterruptibles;
-
-import org.apache.cassandra.db.compaction.*;
-
-import org.cliffc.high_scale_lib.NonBlockingHashMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -52,6 +48,7 @@ import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
 import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.db.commitlog.ReplayPosition;
+import org.apache.cassandra.db.compaction.*;
 import org.apache.cassandra.db.filter.ExtendedFilter;
 import org.apache.cassandra.db.filter.IDiskAtomFilter;
 import org.apache.cassandra.db.filter.QueryFilter;
@@ -73,6 +70,7 @@ import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.thrift.IndexExpression;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.*;
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
 
 import static org.apache.cassandra.config.CFMetaData.Caching;
 
@@ -1180,24 +1178,14 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         return metric.writeLatency.recentLatencyHistogram.getBuckets(true);
     }
 
-    public ColumnFamily getColumnFamily(DecoratedKey key, ByteBuffer start, ByteBuffer finish, boolean reversed, int limit)
+    public ColumnFamily getColumnFamily(DecoratedKey key,
+                                        ByteBuffer start,
+                                        ByteBuffer finish,
+                                        boolean reversed,
+                                        int limit,
+                                        long timestamp)
     {
-        return getColumnFamily(QueryFilter.getSliceFilter(key, name, start, finish, reversed, limit));
-    }
-
-    /**
-     * get a list of columns starting from a given column, in a specified order.
-     * only the latest version of a column is returned.
-     * @return null if there is no data and no tombstones; otherwise a ColumnFamily
-     */
-    public ColumnFamily getColumnFamily(QueryFilter filter)
-    {
-        return getColumnFamily(filter, gcBefore());
-    }
-
-    public int gcBefore()
-    {
-        return (int) (System.currentTimeMillis() / 1000) - metadata.getGcGraceSeconds();
+        return getColumnFamily(QueryFilter.getSliceFilter(key, name, start, finish, reversed, limit, timestamp));
     }
 
     /**
@@ -1236,7 +1224,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
         try
         {
-            ColumnFamily data = getTopLevelColumns(QueryFilter.getIdentityFilter(filter.key, name), Integer.MIN_VALUE);
+            ColumnFamily data = getTopLevelColumns(QueryFilter.getIdentityFilter(filter.key, name, filter.timestamp),
+                                                   Integer.MIN_VALUE);
             if (sentinelSuccess && data != null)
                 CacheService.instance.rowCache.replace(key, sentinel, data);
 
@@ -1249,7 +1238,17 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         }
     }
 
-    ColumnFamily getColumnFamily(QueryFilter filter, int gcBefore)
+    public int gcBefore(long now)
+    {
+        return (int) (now / 1000) - metadata.getGcGraceSeconds();
+    }
+
+    /**
+     * get a list of columns starting from a given column, in a specified order.
+     * only the latest version of a column is returned.
+     * @return null if there is no data and no tombstones; otherwise a ColumnFamily
+     */
+    public ColumnFamily getColumnFamily(QueryFilter filter)
     {
         assert name.equals(filter.getColumnFamilyName()) : filter.getColumnFamilyName();
 
@@ -1258,6 +1257,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         long start = System.nanoTime();
         try
         {
+            int gcBefore = gcBefore(filter.timestamp);
             if (isRowCacheEnabled())
             {
                 UUID cfId = Schema.instance.getId(table.getName(), name);
@@ -1274,7 +1274,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
                     return null;
                 }
 
-                result = filterColumnFamily(cached, filter, gcBefore);
+                result = filterColumnFamily(cached, filter);
             }
             else
             {
@@ -1301,12 +1301,12 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
      *  tombstones that are no longer relevant.
      *  The returned column family won't be thread safe.
      */
-    ColumnFamily filterColumnFamily(ColumnFamily cached, QueryFilter filter, int gcBefore)
+    ColumnFamily filterColumnFamily(ColumnFamily cached, QueryFilter filter)
     {
         ColumnFamily cf = cached.cloneMeShallow(ArrayBackedSortedColumns.factory, filter.filter.isReversed());
         OnDiskAtomIterator ci = filter.getMemtableColumnIterator(cached, null);
-        filter.collateOnDiskAtom(cf, ci, gcBefore);
-        return removeDeletedCF(cf, gcBefore);
+        filter.collateOnDiskAtom(cf, ci, gcBefore(filter.timestamp));
+        return removeDeletedCF(cf, gcBefore(filter.timestamp));
     }
 
     /**
@@ -1440,7 +1440,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             }
             return files;
         }
-        finally {
+        finally
+        {
             SSTableReader.releaseReferences(view.sstables);
         }
     }
@@ -1468,14 +1469,16 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
       * @param range Either a Bounds, which includes start key, or a Range, which does not.
       * @param columnFilter description of the columns we're interested in for each row
      */
-    public AbstractScanIterator getSequentialIterator(final AbstractBounds<RowPosition> range, IDiskAtomFilter columnFilter)
+    private AbstractScanIterator getSequentialIterator(final AbstractBounds<RowPosition> range,
+                                                       IDiskAtomFilter columnFilter,
+                                                       long timestamp)
     {
         assert !(range instanceof Range) || !((Range)range).isWrapAround() || range.right.isMinimum() : range;
 
         final RowPosition startWith = range.left;
         final RowPosition stopAt = range.right;
 
-        QueryFilter filter = new QueryFilter(null, name, columnFilter);
+        QueryFilter filter = new QueryFilter(null, name, columnFilter, timestamp);
 
         final ViewFragment view = markReferenced(range);
         Tracing.trace("Executing seq scan across {} sstables for {}", view.sstables.size(), range.getString(metadata.getKeyValidator()));
@@ -1524,25 +1527,43 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         }
     }
 
-    public List<Row> getRangeSlice(final AbstractBounds<RowPosition> range, int maxResults, IDiskAtomFilter columnFilter, List<IndexExpression> rowFilter)
+    public List<Row> getRangeSlice(final AbstractBounds<RowPosition> range,
+                                   List<IndexExpression> rowFilter,
+                                   IDiskAtomFilter columnFilter,
+                                   int maxResults)
     {
-        return getRangeSlice(range, maxResults, columnFilter, rowFilter, false, false);
+        return getRangeSlice(range, rowFilter, columnFilter, maxResults, System.currentTimeMillis(), false, false);
     }
 
-    public List<Row> getRangeSlice(final AbstractBounds<RowPosition> range, int maxResults, IDiskAtomFilter columnFilter, List<IndexExpression> rowFilter, boolean countCQL3Rows, boolean isPaging)
+    public List<Row> getRangeSlice(final AbstractBounds<RowPosition> range,
+                                   List<IndexExpression> rowFilter,
+                                   IDiskAtomFilter columnFilter,
+                                   int maxResults,
+                                   long now,
+                                   boolean countCQL3Rows,
+                                   boolean isPaging)
     {
-        return filter(getSequentialIterator(range, columnFilter), ExtendedFilter.create(this, columnFilter, rowFilter, maxResults, countCQL3Rows, isPaging));
+        return filter(getSequentialIterator(range, columnFilter, now),
+                      ExtendedFilter.create(this, rowFilter, columnFilter, maxResults, now, countCQL3Rows, isPaging));
     }
 
-    public List<Row> search(List<IndexExpression> clause, AbstractBounds<RowPosition> range, int maxResults, IDiskAtomFilter dataFilter)
+    public List<Row> search(AbstractBounds<RowPosition> range,
+                            List<IndexExpression> clause,
+                            IDiskAtomFilter columnFilter,
+                            int maxResults)
     {
-        return search(clause, range, maxResults, dataFilter, false);
+        return search(range, clause, columnFilter, maxResults, System.currentTimeMillis(), false);
     }
 
-    public List<Row> search(List<IndexExpression> clause, AbstractBounds<RowPosition> range, int maxResults, IDiskAtomFilter dataFilter, boolean countCQL3Rows)
+    public List<Row> search(AbstractBounds<RowPosition> range,
+                            List<IndexExpression> clause,
+                            IDiskAtomFilter columnFilter,
+                            int maxResults,
+                            long now,
+                            boolean countCQL3Rows)
     {
         Tracing.trace("Executing indexed scan for {}", range.getString(metadata.getKeyValidator()));
-        return indexManager.search(clause, range, maxResults, dataFilter, countCQL3Rows);
+        return indexManager.search(range, clause, columnFilter, maxResults, now, countCQL3Rows);
     }
 
     public List<Row> filter(AbstractScanIterator rowIterator, ExtendedFilter filter)
@@ -1566,7 +1587,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
                     IDiskAtomFilter extraFilter = filter.getExtraFilter(data);
                     if (extraFilter != null)
                     {
-                        ColumnFamily cf = filter.cfs.getColumnFamily(new QueryFilter(rawRow.key, name, extraFilter));
+                        ColumnFamily cf = filter.cfs.getColumnFamily(new QueryFilter(rawRow.key, name, extraFilter, filter.timestamp));
                         if (cf != null)
                             data.addAll(cf, HeapAllocator.instance);
                     }
@@ -1751,14 +1772,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         return Iterables.concat(stores);
     }
 
-    public static List<ColumnFamilyStore> allUserDefined()
-    {
-        List<ColumnFamilyStore> cfses = new ArrayList<ColumnFamilyStore>();
-        for (Table table : Sets.difference(ImmutableSet.copyOf(Table.all()), Schema.systemKeyspaceNames))
-            cfses.addAll(table.getColumnFamilyStores());
-        return cfses;
-    }
-
     public Iterable<DecoratedKey> keySamples(Range<Token> range)
     {
         Collection<SSTableReader> sstables = getSSTables();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/ColumnSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnSerializer.java b/src/java/org/apache/cassandra/db/ColumnSerializer.java
index 353dda9..fb38b5f 100644
--- a/src/java/org/apache/cassandra/db/ColumnSerializer.java
+++ b/src/java/org/apache/cassandra/db/ColumnSerializer.java
@@ -88,7 +88,7 @@ public class ColumnSerializer implements ISerializer<Column>
      */
     public Column deserialize(DataInput in, ColumnSerializer.Flag flag) throws IOException
     {
-        return deserialize(in, flag, (int) (System.currentTimeMillis() / 1000));
+        return deserialize(in, flag, Integer.MIN_VALUE);
     }
 
     public Column deserialize(DataInput in, ColumnSerializer.Flag flag, int expireBefore) throws IOException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/CounterColumn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterColumn.java b/src/java/org/apache/cassandra/db/CounterColumn.java
index 15da1df..207ded6 100644
--- a/src/java/org/apache/cassandra/db/CounterColumn.java
+++ b/src/java/org/apache/cassandra/db/CounterColumn.java
@@ -169,9 +169,11 @@ public class CounterColumn extends Column
     {
         assert (column instanceof CounterColumn) || (column instanceof DeletedColumn) : "Wrong class type: " + column.getClass();
 
-        if (column.isMarkedForDelete()) // live + tombstone: track last tombstone
+        // live + tombstone: track last tombstone
+        if (column.isMarkedForDelete(Long.MIN_VALUE)) // cannot be an expired column, so the current time is irrelevant
         {
-            if (timestamp() < column.timestamp()) // live < tombstone
+            // live < tombstone
+            if (timestamp() < column.timestamp())
             {
                 return column;
             }
@@ -230,7 +232,7 @@ public class CounterColumn extends Column
         StringBuilder sb = new StringBuilder();
         sb.append(comparator.getString(name));
         sb.append(":");
-        sb.append(isMarkedForDelete());
+        sb.append(false);
         sb.append(":");
         sb.append(contextManager.toString(value));
         sb.append("@");

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/CounterMutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterMutation.java b/src/java/org/apache/cassandra/db/CounterMutation.java
index 6f60a26..9ace314 100644
--- a/src/java/org/apache/cassandra/db/CounterMutation.java
+++ b/src/java/org/apache/cassandra/db/CounterMutation.java
@@ -84,11 +84,12 @@ public class CounterMutation implements IMutation
     public RowMutation makeReplicationMutation()
     {
         List<ReadCommand> readCommands = new LinkedList<ReadCommand>();
+        long timestamp = System.currentTimeMillis();
         for (ColumnFamily columnFamily : rowMutation.getColumnFamilies())
         {
             if (!columnFamily.metadata().getReplicateOnWrite())
                 continue;
-            addReadCommandFromColumnFamily(rowMutation.getTable(), rowMutation.key(), columnFamily, readCommands);
+            addReadCommandFromColumnFamily(rowMutation.getTable(), rowMutation.key(), columnFamily, timestamp, readCommands);
         }
 
         // create a replication RowMutation
@@ -106,11 +107,11 @@ public class CounterMutation implements IMutation
         return replicationMutation;
     }
 
-    private void addReadCommandFromColumnFamily(String table, ByteBuffer key, ColumnFamily columnFamily, List<ReadCommand> commands)
+    private void addReadCommandFromColumnFamily(String table, ByteBuffer key, ColumnFamily columnFamily, long timestamp, List<ReadCommand> commands)
     {
         SortedSet<ByteBuffer> s = new TreeSet<ByteBuffer>(columnFamily.metadata().comparator);
         Iterables.addAll(s, columnFamily.getColumnNames());
-        commands.add(new SliceByNamesReadCommand(table, key, columnFamily.metadata().cfName, new NamesQueryFilter(s)));
+        commands.add(new SliceByNamesReadCommand(table, key, columnFamily.metadata().cfName, timestamp, new NamesQueryFilter(s)));
     }
 
     public MessageOut<CounterMutation> makeMutationMessage()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/CounterUpdateColumn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterUpdateColumn.java b/src/java/org/apache/cassandra/db/CounterUpdateColumn.java
index 9d9530e..1ae7dd7 100644
--- a/src/java/org/apache/cassandra/db/CounterUpdateColumn.java
+++ b/src/java/org/apache/cassandra/db/CounterUpdateColumn.java
@@ -64,7 +64,7 @@ public class CounterUpdateColumn extends Column
         assert (column instanceof CounterUpdateColumn) || (column instanceof DeletedColumn) : "Wrong class type.";
 
         // tombstones take precedence
-        if (column.isMarkedForDelete())
+        if (column.isMarkedForDelete(Long.MIN_VALUE)) // can't be an expired column, so the current time is irrelevant
             return timestamp() > column.timestamp() ? this : column;
 
         // neither is tombstoned

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/DefsTable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DefsTable.java b/src/java/org/apache/cassandra/db/DefsTable.java
index df551e0..4b6c1c2 100644
--- a/src/java/org/apache/cassandra/db/DefsTable.java
+++ b/src/java/org/apache/cassandra/db/DefsTable.java
@@ -152,7 +152,9 @@ public class DefsTable
     private static Row serializedColumnFamilies(DecoratedKey ksNameKey)
     {
         ColumnFamilyStore cfsStore = SystemTable.schemaCFS(SystemTable.SCHEMA_COLUMNFAMILIES_CF);
-        return new Row(ksNameKey, cfsStore.getColumnFamily(QueryFilter.getIdentityFilter(ksNameKey, SystemTable.SCHEMA_COLUMNFAMILIES_CF)));
+        return new Row(ksNameKey, cfsStore.getColumnFamily(QueryFilter.getIdentityFilter(ksNameKey,
+                                                                                         SystemTable.SCHEMA_COLUMNFAMILIES_CF,
+                                                                                         System.currentTimeMillis())));
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/DeletedColumn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DeletedColumn.java b/src/java/org/apache/cassandra/db/DeletedColumn.java
index f9bda78..57c9bf9 100644
--- a/src/java/org/apache/cassandra/db/DeletedColumn.java
+++ b/src/java/org/apache/cassandra/db/DeletedColumn.java
@@ -53,10 +53,8 @@ public class DeletedColumn extends Column
     }
 
     @Override
-    public boolean isMarkedForDelete()
+    public boolean isMarkedForDelete(long now)
     {
-        // We don't rely on the column implementation because it could mistakenly return false if
-        // some node are not exactly synchronized, which is problematic (see #4307)
         return true;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/DeletionTime.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DeletionTime.java b/src/java/org/apache/cassandra/db/DeletionTime.java
index d1ce0eb..5296529 100644
--- a/src/java/org/apache/cassandra/db/DeletionTime.java
+++ b/src/java/org/apache/cassandra/db/DeletionTime.java
@@ -83,9 +83,9 @@ public class DeletionTime implements Comparable<DeletionTime>
         return localDeletionTime < gcBefore;
     }
 
-    public boolean isDeleted(Column column)
+    public boolean isDeleted(Column column, long now)
     {
-        return column.isMarkedForDelete() && column.getMarkedForDeleteAt() <= markedForDeleteAt;
+        return column.isMarkedForDelete(now) && column.getMarkedForDeleteAt() <= markedForDeleteAt;
     }
 
     public long memorySize()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/ExpiringColumn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ExpiringColumn.java b/src/java/org/apache/cassandra/db/ExpiringColumn.java
index 7660bad..f342310 100644
--- a/src/java/org/apache/cassandra/db/ExpiringColumn.java
+++ b/src/java/org/apache/cassandra/db/ExpiringColumn.java
@@ -158,16 +158,15 @@ public class ExpiringColumn extends Column
     }
 
     @Override
+    public boolean isMarkedForDelete(long now)
+    {
+        return (int) (now / 1000) >= getLocalDeletionTime();
+    }
+
+    @Override
     public long getMarkedForDeleteAt()
     {
-        if (isMarkedForDelete())
-        {
-            return timestamp;
-        }
-        else
-        {
-            throw new IllegalStateException("column is not marked for delete");
-        }
+        return timestamp;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/HintedHandOffManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/HintedHandOffManager.java b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
index c10bed6..e89c769 100644
--- a/src/java/org/apache/cassandra/db/HintedHandOffManager.java
+++ b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
@@ -326,15 +326,16 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
         delivery:
         while (true)
         {
+            long now = System.currentTimeMillis();
             QueryFilter filter = QueryFilter.getSliceFilter(epkey,
                                                             SystemTable.HINTS_CF,
                                                             startColumn,
                                                             ByteBufferUtil.EMPTY_BYTE_BUFFER,
                                                             false,
-                                                            pageSize);
+                                                            pageSize,
+                                                            now);
 
-            ColumnFamily hintsPage = ColumnFamilyStore.removeDeleted(hintStore.getColumnFamily(filter),
-                                                                     (int) (System.currentTimeMillis() / 1000));
+            ColumnFamily hintsPage = ColumnFamilyStore.removeDeleted(hintStore.getColumnFamily(filter), (int) (now / 1000));
 
             if (pagingFinished(hintsPage, startColumn))
                 break;
@@ -362,7 +363,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
                 // in which the local deletion timestamp was generated on the last column in the old page, in which
                 // case the hint will have no columns (since it's deleted) but will still be included in the resultset
                 // since (even with gcgs=0) it's still a "relevant" tombstone.
-                if (!hint.isLive())
+                if (!hint.isLive(System.currentTimeMillis()))
                     continue;
 
                 startColumn = hint.name();
@@ -479,7 +480,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
         RowPosition minPos = p.getMinimumToken().minKeyBound();
         Range<RowPosition> range = new Range<RowPosition>(minPos, minPos, p);
         IDiskAtomFilter filter = new NamesQueryFilter(ImmutableSortedSet.<ByteBuffer>of());
-        List<Row> rows = hintStore.getRangeSlice(range, Integer.MAX_VALUE, filter, null);
+        List<Row> rows = hintStore.getRangeSlice(range, null, filter, Integer.MAX_VALUE);
         for (Row row : rows)
         {
             UUID hostId = UUIDGen.getUUID(row.key.key);
@@ -576,18 +577,22 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
         RowPosition minPos = partitioner.getMinimumToken().minKeyBound();
         Range<RowPosition> range = new Range<RowPosition>(minPos, minPos);
 
-        // Get a bunch of rows!
-        List<Row> rows;
         try
         {
-            rows = StorageProxy.getRangeSlice(new RangeSliceCommand(Table.SYSTEM_KS, SystemTable.HINTS_CF, predicate, range, null, LARGE_NUMBER), ConsistencyLevel.ONE);
+            RangeSliceCommand cmd = new RangeSliceCommand(Table.SYSTEM_KS,
+                                                          SystemTable.HINTS_CF,
+                                                          System.currentTimeMillis(),
+                                                          predicate,
+                                                          range,
+                                                          null,
+                                                          LARGE_NUMBER);
+            return StorageProxy.getRangeSlice(cmd, ConsistencyLevel.ONE);
         }
         catch (Exception e)
         {
             logger.info("HintsCF getEPPendingHints timed out.");
             throw new RuntimeException(e);
         }
-        return rows;
     }
 
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/OnDiskAtom.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/OnDiskAtom.java b/src/java/org/apache/cassandra/db/OnDiskAtom.java
index 2fdb7ad..14a21c8 100644
--- a/src/java/org/apache/cassandra/db/OnDiskAtom.java
+++ b/src/java/org/apache/cassandra/db/OnDiskAtom.java
@@ -66,7 +66,7 @@ public interface OnDiskAtom
 
         public OnDiskAtom deserializeFromSSTable(DataInput in, Descriptor.Version version) throws IOException
         {
-            return deserializeFromSSTable(in, ColumnSerializer.Flag.LOCAL, (int)(System.currentTimeMillis() / 1000), version);
+            return deserializeFromSSTable(in, ColumnSerializer.Flag.LOCAL, Integer.MIN_VALUE, version);
         }
 
         public OnDiskAtom deserializeFromSSTable(DataInput in, ColumnSerializer.Flag flag, int expireBefore, Descriptor.Version version) throws IOException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/RangeSliceCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RangeSliceCommand.java b/src/java/org/apache/cassandra/db/RangeSliceCommand.java
index 4020a15..c7e71f3 100644
--- a/src/java/org/apache/cassandra/db/RangeSliceCommand.java
+++ b/src/java/org/apache/cassandra/db/RangeSliceCommand.java
@@ -15,24 +15,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
 package org.apache.cassandra.db;
 
 import java.io.DataInput;
@@ -46,8 +28,6 @@ import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.filter.IDiskAtomFilter;
-import org.apache.cassandra.db.filter.NamesQueryFilter;
-import org.apache.cassandra.db.filter.SliceQueryFilter;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.CompositeType;
 import org.apache.cassandra.dht.AbstractBounds;
@@ -57,8 +37,6 @@ import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.IReadCommand;
 import org.apache.cassandra.thrift.IndexExpression;
 import org.apache.cassandra.thrift.IndexOperator;
-import org.apache.cassandra.thrift.SlicePredicate;
-import org.apache.cassandra.thrift.SliceRange;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 public class RangeSliceCommand implements IReadCommand
@@ -69,6 +47,8 @@ public class RangeSliceCommand implements IReadCommand
 
     public final String column_family;
 
+    public final long timestamp;
+
     public final IDiskAtomFilter predicate;
     public final List<IndexExpression> row_filter;
 
@@ -77,20 +57,40 @@ public class RangeSliceCommand implements IReadCommand
     public final boolean countCQL3Rows;
     public final boolean isPaging;
 
-    public RangeSliceCommand(String keyspace, String column_family, IDiskAtomFilter predicate, AbstractBounds<RowPosition> range, int maxResults)
+    public RangeSliceCommand(String keyspace,
+                             String column_family,
+                             long timestamp,
+                             IDiskAtomFilter predicate,
+                             AbstractBounds<RowPosition> range,
+                             int maxResults)
     {
-        this(keyspace, column_family, predicate, range, null, maxResults, false, false);
+        this(keyspace, column_family, timestamp, predicate, range, null, maxResults, false, false);
     }
 
-    public RangeSliceCommand(String keyspace, String column_family, IDiskAtomFilter predicate, AbstractBounds<RowPosition> range, List<IndexExpression> row_filter, int maxResults)
+    public RangeSliceCommand(String keyspace,
+                             String column_family,
+                             long timestamp,
+                             IDiskAtomFilter predicate,
+                             AbstractBounds<RowPosition> range,
+                             List<IndexExpression> row_filter,
+                             int maxResults)
     {
-        this(keyspace, column_family, predicate, range, row_filter, maxResults, false, false);
+        this(keyspace, column_family, timestamp, predicate, range, row_filter, maxResults, false, false);
     }
 
-    public RangeSliceCommand(String keyspace, String column_family, IDiskAtomFilter predicate, AbstractBounds<RowPosition> range, List<IndexExpression> row_filter, int maxResults, boolean countCQL3Rows, boolean isPaging)
+    public RangeSliceCommand(String keyspace,
+                             String column_family,
+                             long timestamp,
+                             IDiskAtomFilter predicate,
+                             AbstractBounds<RowPosition> range,
+                             List<IndexExpression> row_filter,
+                             int maxResults,
+                             boolean countCQL3Rows,
+                             boolean isPaging)
     {
         this.keyspace = keyspace;
         this.column_family = column_family;
+        this.timestamp = timestamp;
         this.predicate = predicate;
         this.range = range;
         this.row_filter = row_filter;
@@ -110,6 +110,7 @@ public class RangeSliceCommand implements IReadCommand
         return "RangeSliceCommand{" +
                "keyspace='" + keyspace + '\'' +
                ", column_family='" + column_family + '\'' +
+               ", timestamp='" + timestamp + '\'' +
                ", predicate=" + predicate +
                ", range=" + range +
                ", row_filter =" + row_filter +
@@ -131,27 +132,14 @@ public class RangeSliceCommand implements IReadCommand
 
 class RangeSliceCommandSerializer implements IVersionedSerializer<RangeSliceCommand>
 {
-    // For compatibility with pre-1.2 sake. We should remove at some point.
-    public static SlicePredicate asSlicePredicate(IDiskAtomFilter predicate)
-    {
-        SlicePredicate sp = new SlicePredicate();
-        if (predicate instanceof NamesQueryFilter)
-        {
-            sp.setColumn_names(new ArrayList<ByteBuffer>(((NamesQueryFilter)predicate).columns));
-        }
-        else
-        {
-            SliceQueryFilter sqf = (SliceQueryFilter)predicate;
-            sp.setSlice_range(new SliceRange(sqf.start(), sqf.finish(), sqf.reversed, sqf.count));
-        }
-        return sp;
-    }
-
     public void serialize(RangeSliceCommand sliceCommand, DataOutput out, int version) throws IOException
     {
         out.writeUTF(sliceCommand.keyspace);
         out.writeUTF(sliceCommand.column_family);
 
+        if (version >= MessagingService.VERSION_20)
+            out.writeLong(sliceCommand.timestamp);
+
         IDiskAtomFilter filter = sliceCommand.predicate;
         if (version < MessagingService.VERSION_20)
         {
@@ -199,6 +187,8 @@ class RangeSliceCommandSerializer implements IVersionedSerializer<RangeSliceComm
         String keyspace = in.readUTF();
         String columnFamily = in.readUTF();
 
+        long timestamp = version < MessagingService.VERSION_20 ? System.currentTimeMillis() : in.readLong();
+
         CFMetaData metadata = Schema.instance.getCFMetaData(keyspace, columnFamily);
 
         IDiskAtomFilter predicate;
@@ -234,7 +224,7 @@ class RangeSliceCommandSerializer implements IVersionedSerializer<RangeSliceComm
             predicate = IDiskAtomFilter.Serializer.instance.deserialize(in, version, metadata.comparator);
         }
 
-        List<IndexExpression> rowFilter = null;
+        List<IndexExpression> rowFilter;
         int filterCount = in.readInt();
         rowFilter = new ArrayList<IndexExpression>(filterCount);
         for (int i = 0; i < filterCount; i++)
@@ -250,7 +240,7 @@ class RangeSliceCommandSerializer implements IVersionedSerializer<RangeSliceComm
         int maxResults = in.readInt();
         boolean countCQL3Rows = in.readBoolean();
         boolean isPaging = in.readBoolean();
-        return new RangeSliceCommand(keyspace, columnFamily, predicate, range, rowFilter, maxResults, countCQL3Rows, isPaging);
+        return new RangeSliceCommand(keyspace, columnFamily, timestamp, predicate, range, rowFilter, maxResults, countCQL3Rows, isPaging);
     }
 
     public long serializedSize(RangeSliceCommand rsc, int version)
@@ -258,6 +248,9 @@ class RangeSliceCommandSerializer implements IVersionedSerializer<RangeSliceComm
         long size = TypeSizes.NATIVE.sizeof(rsc.keyspace);
         size += TypeSizes.NATIVE.sizeof(rsc.column_family);
 
+        if (version >= MessagingService.VERSION_20)
+            size += TypeSizes.NATIVE.sizeof(rsc.timestamp);
+
         IDiskAtomFilter filter = rsc.predicate;
         if (version < MessagingService.VERSION_20)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/RangeTombstone.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RangeTombstone.java b/src/java/org/apache/cassandra/db/RangeTombstone.java
index 9342570..e30cd5b 100644
--- a/src/java/org/apache/cassandra/db/RangeTombstone.java
+++ b/src/java/org/apache/cassandra/db/RangeTombstone.java
@@ -237,11 +237,11 @@ public class RangeTombstone extends Interval<ByteBuffer, DeletionTime> implement
             }
         }
 
-        public boolean isDeleted(Column column)
+        public boolean isDeleted(Column column, long now)
         {
             for (RangeTombstone tombstone : ranges)
             {
-                if (comparator.compare(column.name(), tombstone.max) <= 0 && tombstone.data.isDeleted(column))
+                if (comparator.compare(column.name(), tombstone.max) <= 0 && tombstone.data.isDeleted(column, now))
                     return true;
             }
             return false;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java
index 3cff8b6..61a2478 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -35,10 +35,10 @@ import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.IReadCommand;
 import org.apache.cassandra.service.RowDataResolver;
 
-
 public abstract class ReadCommand implements IReadCommand
 {
-    public enum Type {
+    public enum Type
+    {
         GET_BY_NAMES((byte)1),
         GET_SLICES((byte)2);
 
@@ -65,23 +65,25 @@ public abstract class ReadCommand implements IReadCommand
     public final String table;
     public final String cfName;
     public final ByteBuffer key;
+    public final long timestamp;
     private boolean isDigestQuery = false;
     protected final Type commandType;
 
-    protected ReadCommand(String table, ByteBuffer key, String cfName, Type cmdType)
+    protected ReadCommand(String table, ByteBuffer key, String cfName, long timestamp, Type cmdType)
     {
         this.table = table;
         this.key = key;
         this.cfName = cfName;
+        this.timestamp = timestamp;
         this.commandType = cmdType;
     }
 
-    public static ReadCommand create(String table, ByteBuffer key, String cfName, IDiskAtomFilter filter)
+    public static ReadCommand create(String table, ByteBuffer key, String cfName, long timestamp, IDiskAtomFilter filter)
     {
         if (filter instanceof SliceQueryFilter)
-            return new SliceFromReadCommand(table, key, cfName, (SliceQueryFilter)filter);
+            return new SliceFromReadCommand(table, key, cfName, timestamp, (SliceQueryFilter)filter);
         else
-            return new SliceByNamesReadCommand(table, key, cfName, (NamesQueryFilter)filter);
+            return new SliceByNamesReadCommand(table, key, cfName, timestamp, (NamesQueryFilter)filter);
     }
 
     public boolean isDigestQuery()
@@ -143,7 +145,7 @@ class ReadCommandSerializer implements IVersionedSerializer<ReadCommand>
             if (metadata.cfType == ColumnFamilyType.Super)
             {
                 SuperColumns.SCFilter scFilter = SuperColumns.filterToSC((CompositeType)metadata.comparator, command.filter());
-                newCommand = ReadCommand.create(command.table, command.key, command.cfName, scFilter.updatedFilter);
+                newCommand = ReadCommand.create(command.table, command.key, command.cfName, command.timestamp, scFilter.updatedFilter);
                 newCommand.setDigestQuery(command.isDigestQuery());
                 superColumn = scFilter.scName;
             }
@@ -187,7 +189,7 @@ class ReadCommandSerializer implements IVersionedSerializer<ReadCommand>
             if (metadata.cfType == ColumnFamilyType.Super)
             {
                 SuperColumns.SCFilter scFilter = SuperColumns.filterToSC((CompositeType)metadata.comparator, command.filter());
-                newCommand = ReadCommand.create(command.table, command.key, command.cfName, scFilter.updatedFilter);
+                newCommand = ReadCommand.create(command.table, command.key, command.cfName, command.timestamp, scFilter.updatedFilter);
                 newCommand.setDigestQuery(command.isDigestQuery());
                 superColumn = scFilter.scName;
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/RetriedSliceFromReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RetriedSliceFromReadCommand.java b/src/java/org/apache/cassandra/db/RetriedSliceFromReadCommand.java
index 52bad89..7ca57a8 100644
--- a/src/java/org/apache/cassandra/db/RetriedSliceFromReadCommand.java
+++ b/src/java/org/apache/cassandra/db/RetriedSliceFromReadCommand.java
@@ -19,25 +19,26 @@ package org.apache.cassandra.db;
 
 import java.nio.ByteBuffer;
 
-import org.apache.cassandra.db.filter.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.db.filter.SliceQueryFilter;
+
 public class RetriedSliceFromReadCommand extends SliceFromReadCommand
 {
     static final Logger logger = LoggerFactory.getLogger(RetriedSliceFromReadCommand.class);
     public final int originalCount;
 
-    public RetriedSliceFromReadCommand(String table, ByteBuffer key, String cfName, SliceQueryFilter filter, int originalCount)
+    public RetriedSliceFromReadCommand(String table, ByteBuffer key, String cfName, long timestamp, SliceQueryFilter filter, int originalCount)
     {
-        super(table, key, cfName, filter);
+        super(table, key, cfName, timestamp, filter);
         this.originalCount = originalCount;
     }
 
     @Override
     public ReadCommand copy()
     {
-        ReadCommand readCommand = new RetriedSliceFromReadCommand(table, key, cfName, filter, originalCount);
+        ReadCommand readCommand = new RetriedSliceFromReadCommand(table, key, cfName, timestamp, filter, originalCount);
         readCommand.setDigestQuery(isDigestQuery());
         return readCommand;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/Row.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Row.java b/src/java/org/apache/cassandra/db/Row.java
index 49aa426..13e6f67 100644
--- a/src/java/org/apache/cassandra/db/Row.java
+++ b/src/java/org/apache/cassandra/db/Row.java
@@ -54,9 +54,9 @@ public class Row
                ')';
     }
 
-    public int getLiveCount(IDiskAtomFilter filter)
+    public int getLiveCount(IDiskAtomFilter filter, long now)
     {
-        return cf == null ? 0 : filter.getLiveCount(cf);
+        return cf == null ? 0 : filter.getLiveCount(cf, now);
     }
 
     public static class RowSerializer implements IVersionedSerializer<Row>

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/RowIteratorFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RowIteratorFactory.java b/src/java/org/apache/cassandra/db/RowIteratorFactory.java
index 4588146..c9f715c 100644
--- a/src/java/org/apache/cassandra/db/RowIteratorFactory.java
+++ b/src/java/org/apache/cassandra/db/RowIteratorFactory.java
@@ -52,11 +52,11 @@ public class RowIteratorFactory
      * @return A row iterator following all the given restrictions
      */
     public static CloseableIterator<Row> getIterator(final Iterable<Memtable> memtables,
-                                          final Collection<SSTableReader> sstables,
-                                          final RowPosition startWith,
-                                          final RowPosition stopAt,
-                                          final QueryFilter filter,
-                                          final ColumnFamilyStore cfs)
+                                                     final Collection<SSTableReader> sstables,
+                                                     final RowPosition startWith,
+                                                     final RowPosition stopAt,
+                                                     final QueryFilter filter,
+                                                     final ColumnFamilyStore cfs)
     {
         // fetch data from current memtable, historical memtables, and SSTables in the correct order.
         final List<CloseableIterator<OnDiskAtomIterator>> iterators = new ArrayList<CloseableIterator<OnDiskAtomIterator>>();
@@ -76,7 +76,7 @@ public class RowIteratorFactory
         // reduce rows from all sources into a single row
         return MergeIterator.get(iterators, COMPARE_BY_KEY, new MergeIterator.Reducer<OnDiskAtomIterator, Row>()
         {
-            private final int gcBefore = (int) (System.currentTimeMillis() / 1000) - cfs.metadata.getGcGraceSeconds();
+            private final int gcBefore = cfs.gcBefore(filter.timestamp);
             private final List<OnDiskAtomIterator> colIters = new ArrayList<OnDiskAtomIterator>();
             private DecoratedKey key;
             private ColumnFamily returnCF;
@@ -106,8 +106,8 @@ public class RowIteratorFactory
                 }
                 else
                 {
-                    QueryFilter keyFilter = new QueryFilter(key, filter.cfName, filter.filter);
-                    returnCF = cfs.filterColumnFamily(cached, keyFilter, gcBefore);
+                    QueryFilter keyFilter = new QueryFilter(key, filter.cfName, filter.filter, filter.timestamp);
+                    returnCF = cfs.filterColumnFamily(cached, keyFilter);
                 }
 
                 Row rv = new Row(key, returnCF);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java b/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java
index 909ba76..2942249 100644
--- a/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java
@@ -36,15 +36,15 @@ public class SliceByNamesReadCommand extends ReadCommand
 
     public final NamesQueryFilter filter;
 
-    public SliceByNamesReadCommand(String table, ByteBuffer key, String cfName, NamesQueryFilter filter)
+    public SliceByNamesReadCommand(String table, ByteBuffer key, String cfName, long timestamp, NamesQueryFilter filter)
     {
-        super(table, key, cfName, Type.GET_BY_NAMES);
+        super(table, key, cfName, timestamp, Type.GET_BY_NAMES);
         this.filter = filter;
     }
 
     public ReadCommand copy()
     {
-        ReadCommand readCommand= new SliceByNamesReadCommand(table, key, cfName, filter);
+        ReadCommand readCommand= new SliceByNamesReadCommand(table, key, cfName, timestamp, filter);
         readCommand.setDigestQuery(isDigestQuery());
         return readCommand;
     }
@@ -52,7 +52,7 @@ public class SliceByNamesReadCommand extends ReadCommand
     public Row getRow(Table table)
     {
         DecoratedKey dk = StorageService.getPartitioner().decorateKey(key);
-        return table.getRow(new QueryFilter(dk, cfName, filter));
+        return table.getRow(new QueryFilter(dk, cfName, filter, timestamp));
     }
 
     @Override
@@ -62,6 +62,7 @@ public class SliceByNamesReadCommand extends ReadCommand
                "table='" + table + '\'' +
                ", key=" + ByteBufferUtil.bytesToHex(key) +
                ", cfName='" + cfName + '\'' +
+               ", timestamp='" + timestamp + '\'' +
                ", filter=" + filter +
                ')';
     }
@@ -91,6 +92,9 @@ class SliceByNamesReadCommandSerializer implements IVersionedSerializer<ReadComm
         else
             out.writeUTF(command.cfName);
 
+        if (version >= MessagingService.VERSION_20)
+            out.writeLong(cmd.timestamp);
+
         NamesQueryFilter.serializer.serialize(command.filter, out, version);
     }
 
@@ -113,6 +117,8 @@ class SliceByNamesReadCommandSerializer implements IVersionedSerializer<ReadComm
             cfName = in.readUTF();
         }
 
+        long timestamp = version < MessagingService.VERSION_20 ? System.currentTimeMillis() : in.readLong();
+
         CFMetaData metadata = Schema.instance.getCFMetaData(table, cfName);
         ReadCommand command;
         if (version < MessagingService.VERSION_20)
@@ -135,14 +141,14 @@ class SliceByNamesReadCommandSerializer implements IVersionedSerializer<ReadComm
 
             // Due to SC compat, it's possible we get back a slice filter at this point
             if (filter instanceof NamesQueryFilter)
-                command = new SliceByNamesReadCommand(table, key, cfName, (NamesQueryFilter)filter);
+                command = new SliceByNamesReadCommand(table, key, cfName, timestamp, (NamesQueryFilter)filter);
             else
-                command = new SliceFromReadCommand(table, key, cfName, (SliceQueryFilter)filter);
+                command = new SliceFromReadCommand(table, key, cfName, timestamp, (SliceQueryFilter)filter);
         }
         else
         {
             NamesQueryFilter filter = NamesQueryFilter.serializer.deserialize(in, version, metadata.comparator);
-            command = new SliceByNamesReadCommand(table, key, cfName, filter);
+            command = new SliceByNamesReadCommand(table, key, cfName, timestamp, filter);
         }
 
         command.setDigestQuery(isDigest);
@@ -165,15 +171,15 @@ class SliceByNamesReadCommandSerializer implements IVersionedSerializer<ReadComm
         size += sizes.sizeof((short)keySize) + keySize;
 
         if (version < MessagingService.VERSION_20)
-        {
             size += new QueryPath(command.cfName, superColumn).serializedSize(sizes);
-        }
         else
-        {
             size += sizes.sizeof(command.cfName);
-        }
+
+        if (version >= MessagingService.VERSION_20)
+            size += sizes.sizeof(cmd.timestamp);
 
         size += NamesQueryFilter.serializer.serializedSize(command.filter, version);
+
         return size;
     }
 }