You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2013/06/18 18:16:04 UTC

[3/4] Include a timestamp with all read commands to determine column expiration

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SliceFromReadCommand.java b/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
index be64ae1..5c42de5 100644
--- a/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
@@ -46,15 +46,15 @@ public class SliceFromReadCommand extends ReadCommand
 
     public final SliceQueryFilter filter;
 
-    public SliceFromReadCommand(String table, ByteBuffer key, String cfName, SliceQueryFilter filter)
+    public SliceFromReadCommand(String table, ByteBuffer key, String cfName, long timestamp, SliceQueryFilter filter)
     {
-        super(table, key, cfName, Type.GET_SLICES);
+        super(table, key, cfName, timestamp, Type.GET_SLICES);
         this.filter = filter;
     }
 
     public ReadCommand copy()
     {
-        ReadCommand readCommand = new SliceFromReadCommand(table, key, cfName, filter);
+        ReadCommand readCommand = new SliceFromReadCommand(table, key, cfName, timestamp, filter);
         readCommand.setDigestQuery(isDigestQuery());
         return readCommand;
     }
@@ -62,7 +62,7 @@ public class SliceFromReadCommand extends ReadCommand
     public Row getRow(Table table)
     {
         DecoratedKey dk = StorageService.getPartitioner().decorateKey(key);
-        return table.getRow(new QueryFilter(dk, cfName, filter));
+        return table.getRow(new QueryFilter(dk, cfName, filter, timestamp));
     }
 
     @Override
@@ -77,7 +77,7 @@ public class SliceFromReadCommand extends ReadCommand
         if (maxLiveColumns < count)
             return null;
 
-        int liveCountInRow = row == null || row.cf == null ? 0 : filter.getLiveCount(row.cf);
+        int liveCountInRow = row == null || row.cf == null ? 0 : filter.getLiveCount(row.cf, timestamp);
         if (liveCountInRow < getOriginalRequestedCount())
         {
             // We asked t (= count) live columns and got l (=liveCountInRow) ones.
@@ -86,7 +86,7 @@ public class SliceFromReadCommand extends ReadCommand
             // round we want to ask x column so that x * (l/t) == t, i.e. x = t^2/l.
             int retryCount = liveCountInRow == 0 ? count + 1 : ((count * count) / liveCountInRow) + 1;
             SliceQueryFilter newFilter = filter.withUpdatedCount(retryCount);
-            return new RetriedSliceFromReadCommand(table, key, cfName, newFilter, getOriginalRequestedCount());
+            return new RetriedSliceFromReadCommand(table, key, cfName, timestamp, newFilter, getOriginalRequestedCount());
         }
 
         return null;
@@ -98,7 +98,7 @@ public class SliceFromReadCommand extends ReadCommand
         if ((row == null) || (row.cf == null))
             return;
 
-        filter.trim(row.cf, getOriginalRequestedCount());
+        filter.trim(row.cf, getOriginalRequestedCount(), timestamp);
     }
 
     public IDiskAtomFilter filter()
@@ -123,6 +123,7 @@ public class SliceFromReadCommand extends ReadCommand
                "table='" + table + '\'' +
                ", key='" + ByteBufferUtil.bytesToHex(key) + '\'' +
                ", cfName='" + cfName + '\'' +
+               ", timestamp='" + timestamp + '\'' +
                ", filter='" + filter + '\'' +
                ')';
     }
@@ -147,6 +148,9 @@ class SliceFromReadCommandSerializer implements IVersionedSerializer<ReadCommand
         else
             out.writeUTF(realRM.cfName);
 
+        if (version >= MessagingService.VERSION_20)
+            out.writeLong(realRM.timestamp);
+
         SliceQueryFilter.serializer.serialize(realRM.filter, out, version);
     }
 
@@ -169,6 +173,8 @@ class SliceFromReadCommandSerializer implements IVersionedSerializer<ReadCommand
             cfName = in.readUTF();
         }
 
+        long timestamp = version < MessagingService.VERSION_20 ? System.currentTimeMillis() : in.readLong();
+
         CFMetaData metadata = Schema.instance.getCFMetaData(table, cfName);
         SliceQueryFilter filter;
         if (version < MessagingService.VERSION_20)
@@ -183,7 +189,7 @@ class SliceFromReadCommandSerializer implements IVersionedSerializer<ReadCommand
             filter = SliceQueryFilter.serializer.deserialize(in, version);
         }
 
-        ReadCommand command = new SliceFromReadCommand(table, key, cfName, filter);
+        ReadCommand command = new SliceFromReadCommand(table, key, cfName, timestamp, filter);
         command.setDigestQuery(isDigest);
         return command;
     }
@@ -204,15 +210,15 @@ class SliceFromReadCommandSerializer implements IVersionedSerializer<ReadCommand
         size += sizes.sizeof((short) keySize) + keySize;
 
         if (version < MessagingService.VERSION_20)
-        {
             size += new QueryPath(command.cfName, superColumn).serializedSize(sizes);
-        }
         else
-        {
             size += sizes.sizeof(command.cfName);
-        }
+
+        if (version >= MessagingService.VERSION_20)
+            size += sizes.sizeof(cmd.timestamp);
 
         size += SliceQueryFilter.serializer.serializedSize(command.filter, version);
+
         return size;
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/SliceQueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SliceQueryPager.java b/src/java/org/apache/cassandra/db/SliceQueryPager.java
index 7b8e4d4..c1933ad 100644
--- a/src/java/org/apache/cassandra/db/SliceQueryPager.java
+++ b/src/java/org/apache/cassandra/db/SliceQueryPager.java
@@ -51,10 +51,11 @@ public class SliceQueryPager implements Iterator<ColumnFamily>
         if (exhausted)
             return null;
 
+        long now = System.currentTimeMillis();
         SliceQueryFilter sliceFilter = new SliceQueryFilter(slices, false, DEFAULT_PAGE_SIZE);
-        QueryFilter filter = new QueryFilter(key, cfs.name, sliceFilter);
+        QueryFilter filter = new QueryFilter(key, cfs.name, sliceFilter, now);
         ColumnFamily cf = cfs.getColumnFamily(filter);
-        if (cf == null || sliceFilter.getLiveCount(cf) < DEFAULT_PAGE_SIZE)
+        if (cf == null || sliceFilter.getLiveCount(cf, now) < DEFAULT_PAGE_SIZE)
         {
             exhausted = true;
         }
@@ -62,7 +63,7 @@ public class SliceQueryPager implements Iterator<ColumnFamily>
         {
             Iterator<Column> iter = cf.getReverseSortedColumns().iterator();
             Column lastColumn = iter.next();
-            while (lastColumn.isMarkedForDelete())
+            while (lastColumn.isMarkedForDelete(now))
                 lastColumn = iter.next();
 
             int i = 0;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/SuperColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SuperColumns.java b/src/java/org/apache/cassandra/db/SuperColumns.java
index 753cd91..54f59d6 100644
--- a/src/java/org/apache/cassandra/db/SuperColumns.java
+++ b/src/java/org/apache/cassandra/db/SuperColumns.java
@@ -102,13 +102,13 @@ public class SuperColumns
         return scMap;
     }
 
-    public static void deserializerSuperColumnFamily(DataInput in, ColumnFamily cf, ColumnSerializer.Flag flag, int expireBefore, int version) throws IOException
+    public static void deserializerSuperColumnFamily(DataInput in, ColumnFamily cf, ColumnSerializer.Flag flag, int version) throws IOException
     {
         // Note that there was no way to insert a range tombstone in a SCF in 1.2
         cf.delete(DeletionInfo.serializer().deserialize(in, version, cf.getComparator()));
         assert !cf.deletionInfo().rangeIterator().hasNext();
 
-        Iterator<OnDiskAtom> iter = onDiskIterator(in, in.readInt(), flag, expireBefore);
+        Iterator<OnDiskAtom> iter = onDiskIterator(in, in.readInt(), flag, Integer.MIN_VALUE);
         while (iter.hasNext())
             cf.addAtom(iter.next());
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/SystemTable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemTable.java b/src/java/org/apache/cassandra/db/SystemTable.java
index e62b258..90bfd8d 100644
--- a/src/java/org/apache/cassandra/db/SystemTable.java
+++ b/src/java/org/apache/cassandra/db/SystemTable.java
@@ -535,7 +535,8 @@ public class SystemTable
         ColumnFamilyStore cfs = Table.open(Table.SYSTEM_KS).getColumnFamilyStore(INDEX_CF);
         QueryFilter filter = QueryFilter.getNamesFilter(decorate(ByteBufferUtil.bytes(table)),
                                                         INDEX_CF,
-                                                        ByteBufferUtil.bytes(indexName));
+                                                        ByteBufferUtil.bytes(indexName),
+                                                        System.currentTimeMillis());
         return ColumnFamilyStore.removeDeleted(cfs.getColumnFamily(filter), Integer.MAX_VALUE) != null;
     }
 
@@ -596,7 +597,8 @@ public class SystemTable
                                                         ByteBufferUtil.EMPTY_BYTE_BUFFER,
                                                         ByteBufferUtil.EMPTY_BYTE_BUFFER,
                                                         true,
-                                                        1);
+                                                        1,
+                                                        System.currentTimeMillis());
         ColumnFamily cf = table.getColumnFamilyStore(COUNTER_ID_CF).getColumnFamily(filter);
         if (cf != null && cf.getColumnCount() != 0)
             return CounterId.wrap(cf.iterator().next().name());
@@ -629,7 +631,7 @@ public class SystemTable
         List<CounterId.CounterIdRecord> l = new ArrayList<CounterId.CounterIdRecord>();
 
         Table table = Table.open(Table.SYSTEM_KS);
-        QueryFilter filter = QueryFilter.getIdentityFilter(decorate(ALL_LOCAL_NODE_ID_KEY), COUNTER_ID_CF);
+        QueryFilter filter = QueryFilter.getIdentityFilter(decorate(ALL_LOCAL_NODE_ID_KEY), COUNTER_ID_CF, System.currentTimeMillis());
         ColumnFamily cf = table.getColumnFamilyStore(COUNTER_ID_CF).getColumnFamily(filter);
 
         CounterId previous = null;
@@ -673,11 +675,10 @@ public class SystemTable
     {
         Token minToken = StorageService.getPartitioner().getMinimumToken();
 
-        return schemaCFS(schemaCfName).getRangeSlice(new Range<RowPosition>(minToken.minKeyBound(),
-                                                                            minToken.maxKeyBound()),
-                                                     Integer.MAX_VALUE,
+        return schemaCFS(schemaCfName).getRangeSlice(new Range<RowPosition>(minToken.minKeyBound(), minToken.maxKeyBound()),
+                                                     null,
                                                      new IdentityQueryFilter(),
-                                                     null);
+                                                     Integer.MAX_VALUE);
     }
 
     public static Collection<RowMutation> serializeSchema()
@@ -729,7 +730,7 @@ public class SystemTable
         DecoratedKey key = StorageService.getPartitioner().decorateKey(getSchemaKSKey(ksName));
 
         ColumnFamilyStore schemaCFS = SystemTable.schemaCFS(SCHEMA_KEYSPACES_CF);
-        ColumnFamily result = schemaCFS.getColumnFamily(QueryFilter.getIdentityFilter(key, SCHEMA_KEYSPACES_CF));
+        ColumnFamily result = schemaCFS.getColumnFamily(QueryFilter.getIdentityFilter(key, SCHEMA_KEYSPACES_CF, System.currentTimeMillis()));
 
         return new Row(key, result);
     }
@@ -743,7 +744,8 @@ public class SystemTable
                                                         DefsTable.searchComposite(cfName, true),
                                                         DefsTable.searchComposite(cfName, false),
                                                         false,
-                                                        Integer.MAX_VALUE);
+                                                        Integer.MAX_VALUE,
+                                                        System.currentTimeMillis());
 
         return new Row(key, result);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 90f6dfa..6c9f50d 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -628,7 +628,7 @@ public class CompactionManager implements CompactionManagerMBean
             // this at a different time (that's the whole purpose of repair with snaphsot). So instead we take the creation
             // time of the snapshot, which should give us roughtly the same time on each replica (roughtly being in that case
             // 'as good as in the non-snapshot' case)
-            gcBefore = (int) (cfs.getSnapshotCreationTime(validator.request.sessionid) / 1000) - cfs.metadata.getGcGraceSeconds();
+            gcBefore = cfs.gcBefore(cfs.getSnapshotCreationTime(validator.request.sessionid));
         }
         else
         {
@@ -731,9 +731,7 @@ public class CompactionManager implements CompactionManagerMBean
     {
         // 2ndary indexes have ExpiringColumns too, so we need to purge tombstones deleted before now. We do not need to
         // add any GcGrace however since 2ndary indexes are local to a node.
-        return cfs.isIndex()
-               ? (int) (System.currentTimeMillis() / 1000)
-               : (int) (System.currentTimeMillis() / 1000) - cfs.metadata.getGcGraceSeconds();
+        return cfs.isIndex() ? (int) (System.currentTimeMillis() / 1000) : cfs.gcBefore(System.currentTimeMillis());
     }
 
     private static class ValidationCompactionIterable extends CompactionIterable

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
index e9cd2f1..32361b2 100644
--- a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
@@ -209,7 +209,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
                 Column column = (Column) current;
                 container.addColumn(column);
                 if (indexer != SecondaryIndexManager.nullUpdater
-                    && !column.isMarkedForDelete()
+                    && !column.isMarkedForDelete(System.currentTimeMillis())
                     && !container.getColumn(column.name()).equals(column))
                 {
                     indexer.remove(column);
@@ -247,7 +247,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
                 // PrecompactedRow.removeDeletedAndOldShards have only checked the top-level CF deletion times,
                 // not the range tombstone. For that we use the columnIndexer tombstone tracker.
                 // Note that this doesn't work for super columns.
-                if (indexBuilder.tombstoneTracker().isDeleted(reduced))
+                if (indexBuilder.tombstoneTracker().isDeleted(reduced, System.currentTimeMillis()))
                     return null;
 
                 columns++;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java b/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
index f69a955..85f2a23 100644
--- a/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
@@ -133,7 +133,7 @@ public class PrecompactedRow extends AbstractCompactedRow
             {
                 container.addColumn(column);
                 if (indexer != SecondaryIndexManager.nullUpdater
-                    && !column.isMarkedForDelete()
+                    && !column.isMarkedForDelete(System.currentTimeMillis())
                     && !container.getColumn(column.name()).equals(column))
                 {
                     indexer.remove(column);
@@ -149,7 +149,7 @@ public class PrecompactedRow extends AbstractCompactedRow
         };
 
         Iterator<Column> reduced = MergeIterator.get(data, fcomp, reducer);
-        filter.collectReducedColumns(returnCF, reduced, CompactionManager.NO_GC);
+        filter.collectReducedColumns(returnCF, reduced, CompactionManager.NO_GC, System.currentTimeMillis());
     }
 
     public RowIndexEntry write(long currentPosition, DataOutput out) throws IOException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/filter/ColumnCounter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ColumnCounter.java b/src/java/org/apache/cassandra/db/filter/ColumnCounter.java
index 697ad02..a5d54ce 100644
--- a/src/java/org/apache/cassandra/db/filter/ColumnCounter.java
+++ b/src/java/org/apache/cassandra/db/filter/ColumnCounter.java
@@ -31,18 +31,24 @@ public class ColumnCounter
 {
     protected int live;
     protected int ignored;
+    protected final long timestamp;
+
+    public ColumnCounter(long timestamp)
+    {
+        this.timestamp = timestamp;
+    }
 
     public void count(Column column, ColumnFamily container)
     {
-        if (!isLive(column, container))
+        if (!isLive(column, container, timestamp))
             ignored++;
         else
             live++;
     }
 
-    protected static boolean isLive(Column column, ColumnFamily container)
+    protected static boolean isLive(Column column, ColumnFamily container, long timestamp)
     {
-        return column.isLive() && (!container.deletionInfo().isDeleted(column));
+        return column.isLive(timestamp) && (!container.deletionInfo().isDeleted(column));
     }
 
     public int live()
@@ -71,8 +77,9 @@ public class ColumnCounter
          *                column. If 0, all columns are grouped, otherwise we group
          *                those for which the {@code toGroup} first component are equals.
          */
-        public GroupByPrefix(CompositeType type, int toGroup)
+        public GroupByPrefix(long timestamp, CompositeType type, int toGroup)
         {
+            super(timestamp);
             this.type = type;
             this.toGroup = toGroup;
 
@@ -81,7 +88,7 @@ public class ColumnCounter
 
         public void count(Column column, ColumnFamily container)
         {
-            if (!isLive(column, container))
+            if (!isLive(column, container, timestamp))
             {
                 ignored++;
                 return;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java b/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
index 064cc6e..d061ac2 100644
--- a/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
@@ -44,34 +44,42 @@ public abstract class ExtendedFilter
     private static final Logger logger = LoggerFactory.getLogger(ExtendedFilter.class);
 
     public final ColumnFamilyStore cfs;
+    public final long timestamp;
     protected final IDiskAtomFilter originalFilter;
     private final int maxResults;
     private final boolean countCQL3Rows;
     private final boolean isPaging;
 
-    public static ExtendedFilter create(ColumnFamilyStore cfs, IDiskAtomFilter filter, List<IndexExpression> clause, int maxResults, boolean countCQL3Rows, boolean isPaging)
+    public static ExtendedFilter create(ColumnFamilyStore cfs,
+                                        List<IndexExpression> clause,
+                                        IDiskAtomFilter filter,
+                                        int maxResults,
+                                        long timestamp,
+                                        boolean countCQL3Rows,
+                                        boolean isPaging)
     {
         if (clause == null || clause.isEmpty())
         {
-            return new EmptyClauseFilter(cfs, filter, maxResults, countCQL3Rows, isPaging);
+            return new EmptyClauseFilter(cfs, filter, maxResults, timestamp, countCQL3Rows, isPaging);
         }
         else
         {
             if (isPaging)
                 throw new IllegalArgumentException("Cross-row paging is not supported along with index clauses");
             return cfs.getComparator() instanceof CompositeType
-                 ? new FilterWithCompositeClauses(cfs, filter, clause, maxResults, countCQL3Rows)
-                 : new FilterWithClauses(cfs, filter, clause, maxResults, countCQL3Rows);
+                 ? new FilterWithCompositeClauses(cfs, clause, filter, maxResults, timestamp, countCQL3Rows)
+                 : new FilterWithClauses(cfs, clause, filter, maxResults, timestamp, countCQL3Rows);
         }
     }
 
-    protected ExtendedFilter(ColumnFamilyStore cfs, IDiskAtomFilter filter, int maxResults, boolean countCQL3Rows, boolean isPaging)
+    protected ExtendedFilter(ColumnFamilyStore cfs, IDiskAtomFilter filter, int maxResults, long timestamp, boolean countCQL3Rows, boolean isPaging)
     {
         assert cfs != null;
         assert filter != null;
         this.cfs = cfs;
         this.originalFilter = filter;
         this.maxResults = maxResults;
+        this.timestamp = timestamp;
         this.countCQL3Rows = countCQL3Rows;
         this.isPaging = isPaging;
         if (countCQL3Rows)
@@ -112,7 +120,7 @@ public abstract class ExtendedFilter
         if (initialFilter() instanceof SliceQueryFilter)
             return ((SliceQueryFilter)initialFilter()).lastCounted();
         else
-            return initialFilter().getLiveCount(data);
+            return initialFilter().getLiveCount(data, timestamp);
     }
 
     /** The initial filter we'll do our first slice with (either the original or a superset of it) */
@@ -167,9 +175,14 @@ public abstract class ExtendedFilter
         protected final List<IndexExpression> clause;
         protected final IDiskAtomFilter initialFilter;
 
-        public FilterWithClauses(ColumnFamilyStore cfs, IDiskAtomFilter filter, List<IndexExpression> clause, int maxResults, boolean countCQL3Rows)
+        public FilterWithClauses(ColumnFamilyStore cfs,
+                                 List<IndexExpression> clause,
+                                 IDiskAtomFilter filter,
+                                 int maxResults,
+                                 long timestamp,
+                                 boolean countCQL3Rows)
         {
-            super(cfs, filter, maxResults, countCQL3Rows, false);
+            super(cfs, filter, maxResults, timestamp, countCQL3Rows, false);
             assert clause != null;
             this.clause = clause;
             this.initialFilter = computeInitialFilter();
@@ -271,7 +284,7 @@ public abstract class ExtendedFilter
                 return data;
             ColumnFamily pruned = data.cloneMeShallow();
             OnDiskAtomIterator iter = originalFilter.getMemtableColumnIterator(data, null);
-            originalFilter.collectReducedColumns(pruned, QueryFilter.gatherTombstones(pruned, iter), cfs.gcBefore());
+            originalFilter.collectReducedColumns(pruned, QueryFilter.gatherTombstones(pruned, iter), cfs.gcBefore(timestamp), timestamp);
             return pruned;
         }
 
@@ -335,9 +348,14 @@ public abstract class ExtendedFilter
 
     private static class FilterWithCompositeClauses extends FilterWithClauses
     {
-        public FilterWithCompositeClauses(ColumnFamilyStore cfs, IDiskAtomFilter filter, List<IndexExpression> clause, int maxResults, boolean countCQL3Rows)
+        public FilterWithCompositeClauses(ColumnFamilyStore cfs,
+                                          List<IndexExpression> clause,
+                                          IDiskAtomFilter filter,
+                                          int maxResults,
+                                          long timestamp,
+                                          boolean countCQL3Rows)
         {
-            super(cfs, filter, clause, maxResults, countCQL3Rows);
+            super(cfs, clause, filter, maxResults, timestamp, countCQL3Rows);
         }
 
         /*
@@ -359,9 +377,14 @@ public abstract class ExtendedFilter
 
     private static class EmptyClauseFilter extends ExtendedFilter
     {
-        public EmptyClauseFilter(ColumnFamilyStore cfs, IDiskAtomFilter filter, int maxResults, boolean countCQL3Rows, boolean isPaging)
+        public EmptyClauseFilter(ColumnFamilyStore cfs,
+                                 IDiskAtomFilter filter,
+                                 int maxResults,
+                                 long timestamp,
+                                 boolean countCQL3Rows,
+                                 boolean isPaging)
         {
-            super(cfs, filter, maxResults, countCQL3Rows, isPaging);
+            super(cfs, filter, maxResults, timestamp, countCQL3Rows, isPaging);
         }
 
         public IDiskAtomFilter initialFilter()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/filter/IDiskAtomFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/IDiskAtomFilter.java b/src/java/org/apache/cassandra/db/filter/IDiskAtomFilter.java
index e032f1d..35f71e5 100644
--- a/src/java/org/apache/cassandra/db/filter/IDiskAtomFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/IDiskAtomFilter.java
@@ -66,14 +66,14 @@ public interface IDiskAtomFilter
      * by the filter code, which should have some limit on the number of columns
      * to avoid running out of memory on large rows.
      */
-    public void collectReducedColumns(ColumnFamily container, Iterator<Column> reducedColumns, int gcBefore);
+    public void collectReducedColumns(ColumnFamily container, Iterator<Column> reducedColumns, int gcBefore, long now);
 
     public Comparator<Column> getColumnComparator(AbstractType<?> comparator);
 
     public boolean isReversed();
     public void updateColumnsLimit(int newLimit);
 
-    public int getLiveCount(ColumnFamily cf);
+    public int getLiveCount(ColumnFamily cf, long now);
 
     public IDiskAtomFilter cloneShallow();
     public boolean maySelectPrefix(Comparator<ByteBuffer> cmp, ByteBuffer prefix);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java b/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
index 570eb29..297f227 100644
--- a/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
@@ -90,7 +90,7 @@ public class NamesQueryFilter implements IDiskAtomFilter
         return new SSTableNamesIterator(sstable, file, key, columns, indexEntry);
     }
 
-    public void collectReducedColumns(ColumnFamily container, Iterator<Column> reducedColumns, int gcBefore)
+    public void collectReducedColumns(ColumnFamily container, Iterator<Column> reducedColumns, int gcBefore, long now)
     {
         while (reducedColumns.hasNext())
             container.addIfRelevant(reducedColumns.next(), gcBefore);
@@ -118,15 +118,15 @@ public class NamesQueryFilter implements IDiskAtomFilter
     {
     }
 
-    public int getLiveCount(ColumnFamily cf)
+    public int getLiveCount(ColumnFamily cf, long now)
     {
         if (countCQL3Rows)
-            return cf.hasOnlyTombstones() ? 0 : 1;
+            return cf.hasOnlyTombstones(now) ? 0 : 1;
 
         int count = 0;
         for (Column column : cf)
         {
-            if (column.isLive())
+            if (column.isLive(now))
                 count++;
         }
         return count;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/filter/QueryFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/QueryFilter.java b/src/java/org/apache/cassandra/db/filter/QueryFilter.java
index 126b240..ac0c632 100644
--- a/src/java/org/apache/cassandra/db/filter/QueryFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/QueryFilter.java
@@ -33,12 +33,14 @@ public class QueryFilter
     public final DecoratedKey key;
     public final String cfName;
     public final IDiskAtomFilter filter;
+    public final long timestamp;
 
-    public QueryFilter(DecoratedKey key, String cfName, IDiskAtomFilter filter)
+    public QueryFilter(DecoratedKey key, String cfName, IDiskAtomFilter filter, long timestamp)
     {
         this.key = key;
         this.cfName = cfName;
         this.filter = filter;
+        this.timestamp = timestamp;
     }
 
     public OnDiskAtomIterator getMemtableColumnIterator(Memtable memtable)
@@ -79,7 +81,7 @@ public class QueryFilter
     public void collateOnDiskAtom(ColumnFamily returnCF, Iterator<? extends OnDiskAtom> toCollate, int gcBefore)
     {
         Iterator<Column> columns = gatherTombstones(returnCF, toCollate);
-        filter.collectReducedColumns(returnCF, columns, gcBefore);
+        filter.collectReducedColumns(returnCF, columns, gcBefore, timestamp);
     }
 
     public void collateColumns(final ColumnFamily returnCF, List<? extends Iterator<Column>> toCollate, final int gcBefore)
@@ -107,7 +109,7 @@ public class QueryFilter
         };
         Iterator<Column> reduced = MergeIterator.get(toCollate, fcomp, reducer);
 
-        filter.collectReducedColumns(returnCF, reduced, gcBefore);
+        filter.collectReducedColumns(returnCF, reduced, gcBefore, timestamp);
     }
 
     /**
@@ -178,19 +180,26 @@ public class QueryFilter
      * @param finish column to stop slice at, inclusive; empty for "the last column"
      * @param reversed true to start with the largest column (as determined by configured sort order) instead of smallest
      * @param limit maximum number of non-deleted columns to return
+     * @param timestamp time to use for determining expiring columns' state
      */
-    public static QueryFilter getSliceFilter(DecoratedKey key, String cfName, ByteBuffer start, ByteBuffer finish, boolean reversed, int limit)
+    public static QueryFilter getSliceFilter(DecoratedKey key,
+                                             String cfName,
+                                             ByteBuffer start,
+                                             ByteBuffer finish,
+                                             boolean reversed,
+                                             int limit,
+                                             long timestamp)
     {
-        return new QueryFilter(key, cfName, new SliceQueryFilter(start, finish, reversed, limit));
+        return new QueryFilter(key, cfName, new SliceQueryFilter(start, finish, reversed, limit), timestamp);
     }
 
     /**
      * return a QueryFilter object that includes every column in the row.
      * This is dangerous on large rows; avoid except for test code.
      */
-    public static QueryFilter getIdentityFilter(DecoratedKey key, String cfName)
+    public static QueryFilter getIdentityFilter(DecoratedKey key, String cfName, long timestamp)
     {
-        return new QueryFilter(key, cfName, new IdentityQueryFilter());
+        return new QueryFilter(key, cfName, new IdentityQueryFilter(), timestamp);
     }
 
     /**
@@ -199,17 +208,17 @@ public class QueryFilter
      * @param cfName column family to query
      * @param columns the column names to restrict the results to, sorted in comparator order
      */
-    public static QueryFilter getNamesFilter(DecoratedKey key, String cfName, SortedSet<ByteBuffer> columns)
+    public static QueryFilter getNamesFilter(DecoratedKey key, String cfName, SortedSet<ByteBuffer> columns, long timestamp)
     {
-        return new QueryFilter(key, cfName, new NamesQueryFilter(columns));
+        return new QueryFilter(key, cfName, new NamesQueryFilter(columns), timestamp);
     }
 
     /**
      * convenience method for creating a name filter matching a single column
      */
-    public static QueryFilter getNamesFilter(DecoratedKey key, String cfName, ByteBuffer column)
+    public static QueryFilter getNamesFilter(DecoratedKey key, String cfName, ByteBuffer column, long timestamp)
     {
-        return new QueryFilter(key, cfName, new NamesQueryFilter(column));
+        return new QueryFilter(key, cfName, new NamesQueryFilter(column), timestamp);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
index fda7ac5..b76ce04 100644
--- a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
@@ -120,9 +120,9 @@ public class SliceQueryFilter implements IDiskAtomFilter
         return reversed ? comparator.columnReverseComparator : comparator.columnComparator;
     }
 
-    public void collectReducedColumns(ColumnFamily container, Iterator<Column> reducedColumns, int gcBefore)
+    public void collectReducedColumns(ColumnFamily container, Iterator<Column> reducedColumns, int gcBefore, long now)
     {
-        columnCounter = getColumnCounter(container);
+        columnCounter = getColumnCounter(container, now);
 
         while (reducedColumns.hasNext())
         {
@@ -142,28 +142,28 @@ public class SliceQueryFilter implements IDiskAtomFilter
         Tracing.trace("Read {} live and {} tombstoned cells", columnCounter.live(), columnCounter.ignored());
     }
 
-    public int getLiveCount(ColumnFamily cf)
+    public int getLiveCount(ColumnFamily cf, long now)
     {
-        ColumnCounter counter = getColumnCounter(cf);
+        ColumnCounter counter = getColumnCounter(cf, now);
         for (Column column : cf)
             counter.count(column, cf);
         return counter.live();
     }
 
-    private ColumnCounter getColumnCounter(ColumnFamily container)
+    private ColumnCounter getColumnCounter(ColumnFamily container, long now)
     {
         AbstractType<?> comparator = container.getComparator();
         if (compositesToGroup < 0)
-            return new ColumnCounter();
+            return new ColumnCounter(now);
         else if (compositesToGroup == 0)
-            return new ColumnCounter.GroupByPrefix(null, 0);
+            return new ColumnCounter.GroupByPrefix(now, null, 0);
         else
-            return new ColumnCounter.GroupByPrefix((CompositeType)comparator, compositesToGroup);
+            return new ColumnCounter.GroupByPrefix(now, (CompositeType)comparator, compositesToGroup);
     }
 
-    public void trim(ColumnFamily cf, int trimTo)
+    public void trim(ColumnFamily cf, int trimTo, long now)
     {
-        ColumnCounter counter = getColumnCounter(cf);
+        ColumnCounter counter = getColumnCounter(cf, now);
 
         Collection<Column> columns = reversed
                                    ? cf.getReverseSortedColumns()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java b/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
index f12acdc..4d0914a 100644
--- a/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
+++ b/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
@@ -91,7 +91,7 @@ public abstract class AbstractSimplePerColumnSecondaryIndex extends PerColumnSec
 
     public void delete(ByteBuffer rowKey, Column column)
     {
-        if (column.isMarkedForDelete())
+        if (column.isMarkedForDelete(System.currentTimeMillis()))
             return;
 
         DecoratedKey valueKey = getIndexKeyFor(getIndexedValue(rowKey, column));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
index af370d5..17ac81f 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
@@ -509,12 +509,17 @@ public class SecondaryIndexManager
      * Performs a search across a number of column indexes
      * TODO: add support for querying across index types
      *
-     * @param clause the index query clause
      * @param range the row range to restrict to
-     * @param dataFilter the column range to restrict to
+     * @param clause the index query clause
+     * @param columnFilter the column range to restrict to
      * @return found indexed rows
      */
-    public List<Row> search(List<IndexExpression> clause, AbstractBounds<RowPosition> range, int maxResults, IDiskAtomFilter dataFilter, boolean countCQL3Rows)
+    public List<Row> search(AbstractBounds<RowPosition> range,
+                            List<IndexExpression> clause,
+                            IDiskAtomFilter columnFilter,
+                            int maxResults,
+                            long now,
+                            boolean countCQL3Rows)
     {
         List<SecondaryIndexSearcher> indexSearchers = getIndexSearchersForQuery(clause);
 
@@ -525,8 +530,7 @@ public class SecondaryIndexManager
         if (indexSearchers.size() > 1)
             throw new RuntimeException("Unable to search across multiple secondary index types");
 
-
-        return indexSearchers.get(0).search(clause, range, maxResults, dataFilter, countCQL3Rows);
+        return indexSearchers.get(0).search(range, clause, columnFilter, maxResults, now, countCQL3Rows);
     }
 
     public Collection<SecondaryIndex> getIndexesByNames(Set<String> idxNames)
@@ -584,7 +588,7 @@ public class SecondaryIndexManager
 
         public void insert(Column column)
         {
-            if (column.isMarkedForDelete())
+            if (column.isMarkedForDelete(System.currentTimeMillis()))
                 return;
 
             for (SecondaryIndex index : indexFor(column.name()))
@@ -605,7 +609,7 @@ public class SecondaryIndexManager
                 {
                     // insert the new value before removing the old one, so we never have a period
                     // where the row is invisible to both queries (the opposite seems preferable); see CASSANDRA-5540
-                    if (!column.isMarkedForDelete())
+                    if (!column.isMarkedForDelete(System.currentTimeMillis()))
                         ((PerColumnSecondaryIndex) index).insert(key.key, column);
                     ((PerColumnSecondaryIndex) index).delete(key.key, oldColumn);
                 }
@@ -614,7 +618,7 @@ public class SecondaryIndexManager
 
         public void remove(Column column)
         {
-            if (column.isMarkedForDelete())
+            if (column.isMarkedForDelete(System.currentTimeMillis()))
                 return;
 
             for (SecondaryIndex index : indexFor(column.name()))

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java b/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
index 16ac091..ddd79dd 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
@@ -39,7 +39,12 @@ public abstract class SecondaryIndexSearcher
         this.baseCfs = indexManager.baseCfs;
     }
 
-    public abstract List<Row> search(List<IndexExpression> clause, AbstractBounds<RowPosition> range, int maxResults, IDiskAtomFilter dataFilter, boolean countCQL3Rows);
+    public abstract List<Row> search(AbstractBounds<RowPosition> range,
+                                     List<IndexExpression> clause,
+                                     IDiskAtomFilter dataFilter,
+                                     int maxResults,
+                                     long now,
+                                     boolean countCQL3Rows);
 
     /**
      * @return true this index is able to handle given clauses.
@@ -49,16 +54,6 @@ public abstract class SecondaryIndexSearcher
         return highestSelectivityPredicate(clause) != null;
     }
 
-    protected boolean isIndexValueStale(ColumnFamily liveData, ByteBuffer indexedColumnName, ByteBuffer indexedValue)
-    {
-        Column liveColumn = liveData.getColumn(indexedColumnName);
-        if (liveColumn == null || liveColumn.isMarkedForDelete())
-            return true;
-        
-        ByteBuffer liveValue = liveColumn.value();
-        return 0 != liveData.metadata().getValueValidator(indexedColumnName).compare(indexedValue, liveValue);
-    }
-
     protected IndexExpression highestSelectivityPredicate(List<IndexExpression> clause)
     {
         IndexExpression best = null;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
index 131b8c6..0720e83 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
@@ -93,7 +93,7 @@ public abstract class CompositesIndex extends AbstractSimplePerColumnSecondaryIn
 
     public abstract IndexedEntry decodeEntry(DecoratedKey indexedValue, Column indexEntry);
 
-    public abstract boolean isStale(IndexedEntry entry, ColumnFamily data);
+    public abstract boolean isStale(IndexedEntry entry, ColumnFamily data, long now);
 
     public void delete(IndexedEntry entry)
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java
index 8ad990e..b767d6c 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java
@@ -107,9 +107,9 @@ public class CompositesIndexOnClusteringKey extends CompositesIndex
         return true;
     }
 
-    public boolean isStale(IndexedEntry entry, ColumnFamily data)
+    public boolean isStale(IndexedEntry entry, ColumnFamily data, long now)
     {
-        return data == null || data.hasOnlyTombstones();
+        return data == null || data.hasOnlyTombstones(now);
     }
 }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnPartitionKey.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnPartitionKey.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnPartitionKey.java
index 2034c71..7a00de8 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnPartitionKey.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnPartitionKey.java
@@ -95,9 +95,9 @@ public class CompositesIndexOnPartitionKey extends CompositesIndex
         return true;
     }
 
-    public boolean isStale(IndexedEntry entry, ColumnFamily data)
+    public boolean isStale(IndexedEntry entry, ColumnFamily data, long now)
     {
-        return data == null || data.hasOnlyTombstones();
+        return data == null || data.hasOnlyTombstones(now);
     }
 }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnRegular.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnRegular.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnRegular.java
index 40a2ee1..7159c23 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnRegular.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnRegular.java
@@ -91,11 +91,11 @@ public class CompositesIndexOnRegular extends CompositesIndex
             && comp.compare(components[columnDef.componentIndex], columnDef.name) == 0;
     }
 
-    public boolean isStale(IndexedEntry entry, ColumnFamily data)
+    public boolean isStale(IndexedEntry entry, ColumnFamily data, long now)
     {
         ByteBuffer bb = entry.indexedEntryNameBuilder.copy().add(columnDef.name).build();
         Column liveColumn = data.getColumn(bb);
-        if (liveColumn == null || liveColumn.isMarkedForDelete())
+        if (liveColumn == null || liveColumn.isMarkedForDelete(now))
             return true;
 
         ByteBuffer liveValue = liveColumn.value();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java b/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
index e16d94d..e8c0a09 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
@@ -21,18 +21,18 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.*;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.cassandra.cql3.ColumnNameBuilder;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.*;
-import org.apache.cassandra.db.index.AbstractSimplePerColumnSecondaryIndex;
 import org.apache.cassandra.db.index.SecondaryIndexManager;
 import org.apache.cassandra.db.index.SecondaryIndexSearcher;
 import org.apache.cassandra.db.marshal.CompositeType;
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.thrift.IndexExpression;
 import org.apache.cassandra.utils.ByteBufferUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class CompositesSearcher extends SecondaryIndexSearcher
 {
@@ -44,10 +44,15 @@ public class CompositesSearcher extends SecondaryIndexSearcher
     }
 
     @Override
-    public List<Row> search(List<IndexExpression> clause, AbstractBounds<RowPosition> range, int maxResults, IDiskAtomFilter dataFilter, boolean countCQL3Rows)
+    public List<Row> search(AbstractBounds<RowPosition> range,
+                            List<IndexExpression> clause,
+                            IDiskAtomFilter dataFilter,
+                            int maxResults,
+                            long now,
+                            boolean countCQL3Rows)
     {
         assert clause != null && !clause.isEmpty();
-        ExtendedFilter filter = ExtendedFilter.create(baseCfs, dataFilter, clause, maxResults, countCQL3Rows, false);
+        ExtendedFilter filter = ExtendedFilter.create(baseCfs, clause, dataFilter, maxResults, now, countCQL3Rows, false);
         return baseCfs.filter(getIndexedIterator(range, filter), filter);
     }
 
@@ -69,7 +74,7 @@ public class CompositesSearcher extends SecondaryIndexSearcher
         return isStart ? builder.build() : builder.buildAsEndOfRange();
     }
 
-    public ColumnFamilyStore.AbstractScanIterator getIndexedIterator(final AbstractBounds<RowPosition> range, final ExtendedFilter filter)
+    private ColumnFamilyStore.AbstractScanIterator getIndexedIterator(final AbstractBounds<RowPosition> range, final ExtendedFilter filter)
     {
         // Start with the most-restrictive indexed clause, then apply remaining clauses
         // to each row matching that clause.
@@ -80,8 +85,7 @@ public class CompositesSearcher extends SecondaryIndexSearcher
         final DecoratedKey indexKey = index.getIndexKeyFor(primary.value);
 
         if (logger.isDebugEnabled())
-            logger.debug("Most-selective indexed predicate is {}",
-                         ((AbstractSimplePerColumnSecondaryIndex) index).expressionString(primary));
+            logger.debug("Most-selective indexed predicate is {}", index.expressionString(primary));
 
         /*
          * XXX: If the range requested is a token range, we'll have to start at the beginning (and stop at the end) of
@@ -155,14 +159,15 @@ public class CompositesSearcher extends SecondaryIndexSearcher
 
                         if (logger.isTraceEnabled())
                             logger.trace("Scanning index {} starting with {}",
-                                         ((AbstractSimplePerColumnSecondaryIndex)index).expressionString(primary), indexComparator.getString(startPrefix));
+                                         index.expressionString(primary), indexComparator.getString(startPrefix));
 
                         QueryFilter indexFilter = QueryFilter.getSliceFilter(indexKey,
                                                                              index.getIndexCfs().name,
                                                                              lastSeenPrefix,
                                                                              endPrefix,
                                                                              false,
-                                                                             rowsPerQuery);
+                                                                             rowsPerQuery,
+                                                                             filter.timestamp);
                         ColumnFamily indexRow = index.getIndexCfs().getColumnFamily(indexFilter);
                         if (indexRow == null || indexRow.getColumnCount() == 0)
                             return makeReturn(currentKey, data);
@@ -185,7 +190,7 @@ public class CompositesSearcher extends SecondaryIndexSearcher
                     {
                         Column column = indexColumns.poll();
                         lastSeenPrefix = column.name();
-                        if (column.isMarkedForDelete())
+                        if (column.isMarkedForDelete(filter.timestamp))
                         {
                             logger.trace("skipping {}", column.name());
                             continue;
@@ -242,8 +247,8 @@ public class CompositesSearcher extends SecondaryIndexSearcher
                                                                            false,
                                                                            Integer.MAX_VALUE,
                                                                            baseCfs.metadata.clusteringKeyColumns().size());
-                        ColumnFamily newData = baseCfs.getColumnFamily(new QueryFilter(dk, baseCfs.name, dataFilter));
-                        if (index.isStale(entry, newData))
+                        ColumnFamily newData = baseCfs.getColumnFamily(new QueryFilter(dk, baseCfs.name, dataFilter, filter.timestamp));
+                        if (index.isStale(entry, newData, filter.timestamp))
                         {
                             index.delete(entry);
                             continue;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java b/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java
index cd89e92..f47b1e1 100644
--- a/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java
+++ b/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java
@@ -48,10 +48,10 @@ public class KeysIndex extends AbstractSimplePerColumnSecondaryIndex
         return new KeysSearcher(baseCfs.indexManager, columns);
     }
 
-    public boolean isIndexEntryStale(ByteBuffer indexedValue, ColumnFamily data)
+    public boolean isIndexEntryStale(ByteBuffer indexedValue, ColumnFamily data, long now)
     {
         Column liveColumn = data.getColumn(columnDef.name);
-        if (liveColumn == null || liveColumn.isMarkedForDelete())
+        if (liveColumn == null || liveColumn.isMarkedForDelete(now))
             return true;
 
         ByteBuffer liveValue = liveColumn.value();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java b/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
index 8901bf5..e919d8a 100644
--- a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
+++ b/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
@@ -21,6 +21,9 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.*;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.*;
 import org.apache.cassandra.db.index.AbstractSimplePerColumnSecondaryIndex;
@@ -33,8 +36,6 @@ import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.thrift.IndexExpression;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.HeapAllocator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class KeysSearcher extends SecondaryIndexSearcher
 {
@@ -46,14 +47,19 @@ public class KeysSearcher extends SecondaryIndexSearcher
     }
 
     @Override
-    public List<Row> search(List<IndexExpression> clause, AbstractBounds<RowPosition> range, int maxResults, IDiskAtomFilter dataFilter, boolean countCQL3Rows)
+    public List<Row> search(AbstractBounds<RowPosition> range,
+                            List<IndexExpression> clause,
+                            IDiskAtomFilter dataFilter,
+                            int maxResults,
+                            long now,
+                            boolean countCQL3Rows)
     {
         assert clause != null && !clause.isEmpty();
-        ExtendedFilter filter = ExtendedFilter.create(baseCfs, dataFilter, clause, maxResults, countCQL3Rows, false);
+        ExtendedFilter filter = ExtendedFilter.create(baseCfs, clause, dataFilter, maxResults, now, countCQL3Rows, false);
         return baseCfs.filter(getIndexedIterator(range, filter), filter);
     }
 
-    public ColumnFamilyStore.AbstractScanIterator getIndexedIterator(final AbstractBounds<RowPosition> range, final ExtendedFilter filter)
+    private ColumnFamilyStore.AbstractScanIterator getIndexedIterator(final AbstractBounds<RowPosition> range, final ExtendedFilter filter)
     {
         // Start with the most-restrictive indexed clause, then apply remaining clauses
         // to each row matching that clause.
@@ -106,7 +112,8 @@ public class KeysSearcher extends SecondaryIndexSearcher
                                                                              lastSeenKey,
                                                                              endKey,
                                                                              false,
-                                                                             rowsPerQuery);
+                                                                             rowsPerQuery,
+                                                                             filter.timestamp);
                         ColumnFamily indexRow = index.getIndexCfs().getColumnFamily(indexFilter);
                         logger.trace("fetched {}", indexRow);
                         if (indexRow == null)
@@ -139,7 +146,7 @@ public class KeysSearcher extends SecondaryIndexSearcher
                     {
                         Column column = indexColumns.next();
                         lastSeenKey = column.name();
-                        if (column.isMarkedForDelete())
+                        if (column.isMarkedForDelete(filter.timestamp))
                         {
                             logger.trace("skipping {}", column.name());
                             continue;
@@ -158,7 +165,7 @@ public class KeysSearcher extends SecondaryIndexSearcher
                         }
 
                         logger.trace("Returning index hit for {}", dk);
-                        ColumnFamily data = baseCfs.getColumnFamily(new QueryFilter(dk, baseCfs.name, filter.initialFilter()));
+                        ColumnFamily data = baseCfs.getColumnFamily(new QueryFilter(dk, baseCfs.name, filter.initialFilter(), filter.timestamp));
                         // While the column family we'll get in the end should contains the primary clause column, the initialFilter may not have found it and can thus be null
                         if (data == null)
                             data = TreeMapBackedSortedColumns.factory.create(baseCfs.metadata);
@@ -168,12 +175,12 @@ public class KeysSearcher extends SecondaryIndexSearcher
                         IDiskAtomFilter extraFilter = filter.getExtraFilter(data);
                         if (extraFilter != null)
                         {
-                            ColumnFamily cf = baseCfs.getColumnFamily(new QueryFilter(dk, baseCfs.name, extraFilter));
+                            ColumnFamily cf = baseCfs.getColumnFamily(new QueryFilter(dk, baseCfs.name, extraFilter, filter.timestamp));
                             if (cf != null)
                                 data.addAll(cf, HeapAllocator.instance);
                         }
 
-                        if (((KeysIndex)index).isIndexEntryStale(indexKey.key, data))
+                        if (((KeysIndex)index).isIndexEntryStale(indexKey.key, data, filter.timestamp))
                         {
                             // delete the index entry w/ its own timestamp
                             Column dummyColumn = new Column(primary.column_name, indexKey.key, column.timestamp());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
index d35e14d..65164ef 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
@@ -181,7 +181,10 @@ public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterat
     {
         ColumnFamily cf = columnFamily.cloneMeShallow(containerFactory, false);
         // since we already read column count, just pass that value and continue deserialization
-        columnFamily.serializer.deserializeColumnsFromSSTable(in, cf, columnCount, flag, expireBefore, dataVersion);
+        Iterator<OnDiskAtom> iter = cf.metadata().getOnDiskIterator(in, columnCount, flag, expireBefore, dataVersion);
+        while (iter.hasNext())
+            cf.addAtom(iter.next());
+
         if (validateColumns)
         {
             try

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index 8119388..f63ea6a 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -1076,12 +1076,12 @@ public class SSTableReader extends SSTable
         return getScanner((RateLimiter) null);
     }
 
-   public SSTableScanner getScanner(RateLimiter limiter)
-   {
-       return new SSTableScanner(this, null, limiter);
-   }
+    public SSTableScanner getScanner(RateLimiter limiter)
+    {
+        return new SSTableScanner(this, null, limiter);
+    }
 
-   /**
+    /**
     * Direct I/O SSTableScanner over a defined range of tokens.
     *
     * @param range the range of keys to cover

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index b692ab0..82ecbe9 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -836,7 +836,7 @@ public class ActiveRepairService
                 if (isSequential)
                     makeSnapshots(endpoints);
 
-                int gcBefore = (int)(System.currentTimeMillis()/1000) - Table.open(tablename).getColumnFamilyStore(cfname).metadata.getGcGraceSeconds();
+                int gcBefore = Table.open(tablename).getColumnFamilyStore(cfname).gcBefore(System.currentTimeMillis());
 
                 for (InetAddress endpoint : allEndpoints)
                     treeRequests.add(new TreeRequest(getName(), endpoint, range, gcBefore, new CFPair(tablename, cfname)));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/service/CacheService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CacheService.java b/src/java/org/apache/cassandra/service/CacheService.java
index d301507..b3cef49 100644
--- a/src/java/org/apache/cassandra/service/CacheService.java
+++ b/src/java/org/apache/cassandra/service/CacheService.java
@@ -300,7 +300,7 @@ public class CacheService implements CacheServiceMBean
                 public Pair<RowCacheKey, IRowCacheEntry> call() throws Exception
                 {
                     DecoratedKey key = cfs.partitioner.decorateKey(buffer);
-                    ColumnFamily data = cfs.getTopLevelColumns(QueryFilter.getIdentityFilter(key, cfs.name), Integer.MIN_VALUE);
+                    ColumnFamily data = cfs.getTopLevelColumns(QueryFilter.getIdentityFilter(key, cfs.name, Long.MIN_VALUE), Integer.MIN_VALUE);
                     return Pair.create(new RowCacheKey(cfs.metadata.cfId, key), (IRowCacheEntry) data);
                 }
             });
@@ -311,7 +311,7 @@ public class CacheService implements CacheServiceMBean
             for (ByteBuffer key : buffers)
             {
                 DecoratedKey dk = cfs.partitioner.decorateKey(key);
-                ColumnFamily data = cfs.getTopLevelColumns(QueryFilter.getIdentityFilter(dk, cfs.name), Integer.MIN_VALUE);
+                ColumnFamily data = cfs.getTopLevelColumns(QueryFilter.getIdentityFilter(dk, cfs.name, Long.MIN_VALUE), Integer.MIN_VALUE);
                 if (data != null)
                     rowCache.put(new RowCacheKey(cfs.metadata.cfId, dk), data);
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java b/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
index 5ffcdbe..4aab87d 100644
--- a/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
+++ b/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
@@ -48,13 +48,15 @@ public class RangeSliceResponseResolver implements IResponseResolver<RangeSliceR
     };
 
     private final String table;
+    private final long timestamp;
     private List<InetAddress> sources;
     protected final Collection<MessageIn<RangeSliceReply>> responses = new ConcurrentLinkedQueue<MessageIn<RangeSliceReply>>();
     public final List<AsyncOneResponse> repairResults = new ArrayList<AsyncOneResponse>();
 
-    public RangeSliceResponseResolver(String table)
+    public RangeSliceResponseResolver(String table, long timestamp)
     {
         this.table = table;
+        this.timestamp = timestamp;
     }
 
     public void setSources(List<InetAddress> endpoints)
@@ -142,7 +144,7 @@ public class RangeSliceResponseResolver implements IResponseResolver<RangeSliceR
         protected Row getReduced()
         {
             ColumnFamily resolved = versions.size() > 1
-                                  ? RowDataResolver.resolveSuperset(versions)
+                                  ? RowDataResolver.resolveSuperset(versions, timestamp)
                                   : versions.get(0);
             if (versions.size() < sources.size())
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java b/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
index b6f257e..f63fcb1 100644
--- a/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
+++ b/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
@@ -35,9 +35,20 @@ public class RangeSliceVerbHandler implements IVerbHandler<RangeSliceCommand>
     {
         ColumnFamilyStore cfs = Table.open(command.keyspace).getColumnFamilyStore(command.column_family);
         if (cfs.indexManager.hasIndexFor(command.row_filter))
-            return cfs.search(command.row_filter, command.range, command.maxResults, command.predicate, command.countCQL3Rows);
+            return cfs.search(command.range,
+                              command.row_filter,
+                              command.predicate,
+                              command.maxResults,
+                              command.timestamp,
+                              command.countCQL3Rows);
         else
-            return cfs.getRangeSlice(command.range, command.maxResults, command.predicate, command.row_filter, command.countCQL3Rows, command.isPaging);
+            return cfs.getRangeSlice(command.range,
+                                     command.row_filter,
+                                     command.predicate,
+                                     command.maxResults,
+                                     command.timestamp,
+                                     command.countCQL3Rows,
+                                     command.isPaging);
     }
 
     public void doVerb(MessageIn<RangeSliceCommand> message, int id)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/service/ReadCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ReadCallback.java b/src/java/org/apache/cassandra/service/ReadCallback.java
index fe7f4d7..7cb5a23 100644
--- a/src/java/org/apache/cassandra/service/ReadCallback.java
+++ b/src/java/org/apache/cassandra/service/ReadCallback.java
@@ -186,7 +186,7 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallback<TMessag
                 ReadRepairMetrics.repairedBackground.mark();
                 
                 ReadCommand readCommand = (ReadCommand) command;
-                final RowDataResolver repairResolver = new RowDataResolver(readCommand.table, readCommand.key, readCommand.filter());
+                final RowDataResolver repairResolver = new RowDataResolver(readCommand.table, readCommand.key, readCommand.filter(), readCommand.timestamp);
                 AsyncRepairCallback repairHandler = new AsyncRepairCallback(repairResolver, endpoints.size());
 
                 MessageOut<ReadCommand> message = ((ReadCommand) command).createMessage();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/service/RowDataResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/RowDataResolver.java b/src/java/org/apache/cassandra/service/RowDataResolver.java
index 8533f4f..69cd381 100644
--- a/src/java/org/apache/cassandra/service/RowDataResolver.java
+++ b/src/java/org/apache/cassandra/service/RowDataResolver.java
@@ -39,11 +39,13 @@ public class RowDataResolver extends AbstractRowResolver
     private int maxLiveCount = 0;
     public List<AsyncOneResponse> repairResults = Collections.emptyList();
     private final IDiskAtomFilter filter;
+    private final long timestamp;
 
-    public RowDataResolver(String table, ByteBuffer key, IDiskAtomFilter qFilter)
+    public RowDataResolver(String table, ByteBuffer key, IDiskAtomFilter qFilter, long timestamp)
     {
         super(key, table);
         this.filter = qFilter;
+        this.timestamp = timestamp;
     }
 
     /*
@@ -74,12 +76,12 @@ public class RowDataResolver extends AbstractRowResolver
                 endpoints.add(message.from);
 
                 // compute maxLiveCount to prevent short reads -- see https://issues.apache.org/jira/browse/CASSANDRA-2643
-                int liveCount = cf == null ? 0 : filter.getLiveCount(cf);
+                int liveCount = cf == null ? 0 : filter.getLiveCount(cf, timestamp);
                 if (liveCount > maxLiveCount)
                     maxLiveCount = liveCount;
             }
 
-            resolved = resolveSuperset(versions);
+            resolved = resolveSuperset(versions, timestamp);
             if (logger.isDebugEnabled())
                 logger.debug("versions merged");
 
@@ -125,7 +127,7 @@ public class RowDataResolver extends AbstractRowResolver
         return results;
     }
 
-    static ColumnFamily resolveSuperset(Iterable<ColumnFamily> versions)
+    static ColumnFamily resolveSuperset(Iterable<ColumnFamily> versions, long now)
     {
         assert Iterables.size(versions) > 0;
 
@@ -146,7 +148,7 @@ public class RowDataResolver extends AbstractRowResolver
         // mimic the collectCollatedColumn + removeDeleted path that getColumnFamily takes.
         // this will handle removing columns and subcolumns that are supressed by a row or
         // supercolumn tombstone.
-        QueryFilter filter = new QueryFilter(null, resolved.metadata().cfName, new IdentityQueryFilter());
+        QueryFilter filter = new QueryFilter(null, resolved.metadata().cfName, new IdentityQueryFilter(), now);
         List<CloseableIterator<Column>> iters = new ArrayList<CloseableIterator<Column>>();
         for (ColumnFamily version : versions)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 1383be7..9d2f6c1 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -212,9 +212,13 @@ public class StorageProxy implements StorageProxyMBean
 
             // read the current value and compare with expected
             Tracing.trace("Reading existing values for CAS precondition");
-            ReadCommand readCommand = expected == null
-                                    ? new SliceFromReadCommand(table, key, cfName, new SliceQueryFilter(ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 1))
-                                    : new SliceByNamesReadCommand(table, key, cfName, new NamesQueryFilter(ImmutableSortedSet.copyOf(expected.getColumnNames())));
+            long timestamp = System.currentTimeMillis();
+            IDiskAtomFilter filter = expected == null
+                                   ? new SliceQueryFilter(ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 1)
+                                   : new NamesQueryFilter(ImmutableSortedSet.copyOf(expected.getColumnNames()));
+            ReadCommand readCommand = filter instanceof SliceQueryFilter
+                                    ? new SliceFromReadCommand(table, key, cfName, timestamp, (SliceQueryFilter) filter)
+                                    : new SliceByNamesReadCommand(table, key, cfName, timestamp, (NamesQueryFilter) filter);
             List<Row> rows = read(Arrays.asList(readCommand), ConsistencyLevel.QUORUM);
             ColumnFamily current = rows.get(0).cf;
             if (!casApplies(expected, current))
@@ -245,16 +249,18 @@ public class StorageProxy implements StorageProxyMBean
         throw new WriteTimeoutException(WriteType.CAS, ConsistencyLevel.SERIAL, -1, -1);
     }
 
-    private static boolean hasLiveColumns(ColumnFamily cf)
+    private static boolean hasLiveColumns(ColumnFamily cf, long now)
     {
-        return cf != null && !cf.hasOnlyTombstones();
+        return cf != null && !cf.hasOnlyTombstones(now);
     }
 
     private static boolean casApplies(ColumnFamily expected, ColumnFamily current)
     {
-        if (!hasLiveColumns(expected))
-            return !hasLiveColumns(current);
-        else if (!hasLiveColumns(current))
+        long now = System.currentTimeMillis();
+
+        if (!hasLiveColumns(expected, now))
+            return !hasLiveColumns(current, now);
+        else if (!hasLiveColumns(current, now))
             return false;
 
         // current has been built from expected, so we know that it can't have columns
@@ -264,14 +270,14 @@ public class StorageProxy implements StorageProxyMBean
         for (Column e : expected)
         {
             Column c = current.getColumn(e.name());
-            if (e.isLive())
+            if (e.isLive(now))
             {
-                if (!(c != null && c.isLive() && c.value().equals(e.value())))
+                if (!(c != null && c.isLive(now) && c.value().equals(e.value())))
                     return false;
             }
             else
             {
-                if (c != null && c.isLive())
+                if (c != null && c.isLive(now))
                     return false;
             }
         }
@@ -1179,7 +1185,7 @@ public class StorageProxy implements StorageProxyMBean
                     ReadRepairMetrics.repairedBlocking.mark();
 
                     // Do a full data read to resolve the correct response (and repair node that need be)
-                    RowDataResolver resolver = new RowDataResolver(exec.command.table, exec.command.key, exec.command.filter());
+                    RowDataResolver resolver = new RowDataResolver(exec.command.table, exec.command.key, exec.command.filter(), exec.command.timestamp);
                     ReadCallback<ReadResponse, Row> repairHandler = exec.handler.withNewResolver(resolver);
 
                     if (repairCommands == null)
@@ -1397,6 +1403,7 @@ public class StorageProxy implements StorageProxyMBean
 
                 RangeSliceCommand nodeCmd = new RangeSliceCommand(command.keyspace,
                                                                   command.column_family,
+                                                                  command.timestamp,
                                                                   commandPredicate,
                                                                   range,
                                                                   command.row_filter,
@@ -1405,7 +1412,7 @@ public class StorageProxy implements StorageProxyMBean
                                                                   command.isPaging);
 
                 // collect replies and resolve according to consistency level
-                RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(nodeCmd.keyspace);
+                RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(nodeCmd.keyspace, command.timestamp);
                 ReadCallback<RangeSliceReply, Iterable<Row>> handler = new ReadCallback(resolver, consistency_level, nodeCmd, filteredEndpoints);
                 handler.assureSufficientLiveNodes();
                 resolver.setSources(filteredEndpoints);
@@ -1431,7 +1438,7 @@ public class StorageProxy implements StorageProxyMBean
                     {
                         rows.add(row);
                         if (nodeCmd.countCQL3Rows)
-                            cql3RowCount += row.getLiveCount(commandPredicate);
+                            cql3RowCount += row.getLiveCount(commandPredicate, command.timestamp);
                     }
                     FBUtilities.waitOnFutures(resolver.repairResults, DatabaseDescriptor.getWriteRpcTimeout());
                 }