You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by bl...@apache.org on 2016/11/18 10:56:16 UTC

[06/13] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0

Merge branch 'cassandra-2.2' into cassandra-3.0


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/eb41380c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/eb41380c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/eb41380c

Branch: refs/heads/cassandra-3.X
Commit: eb41380cc27277e34edf2c74f535588fd1382a9a
Parents: 14f36fc 7d2fdfe
Author: Branimir Lambov <br...@datastax.com>
Authored: Fri Nov 18 12:35:32 2016 +0200
Committer: Branimir Lambov <br...@datastax.com>
Committed: Fri Nov 18 12:36:26 2016 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../org/apache/cassandra/db/ReadCommand.java    |   5 +-
 .../db/compaction/CompactionController.java     |  50 ++++---
 .../db/compaction/CompactionIterator.java       |  22 +--
 .../db/compaction/CompactionManager.java        |   5 +-
 .../db/compaction/SSTableSplitter.java          |   5 +-
 .../cassandra/db/compaction/Upgrader.java       |   5 +-
 .../cassandra/db/compaction/Verifier.java       |   5 +-
 .../cassandra/db/partitions/PurgeFunction.java  |   6 +-
 .../db/compaction/CompactionControllerTest.java |  21 ++-
 .../db/compaction/CompactionsPurgeTest.java     | 138 ++++++++++++++++++-
 11 files changed, 213 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb41380c/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index efc681d,54dc4b5..8a3ac65
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,42 -1,5 +1,43 @@@
 -2.2.9
 +3.0.11
 + * Prevent reloading of logback.xml from UDF sandbox (CASSANDRA-12535)
 +
 +3.0.10
 + * Disallow offheap_buffers memtable allocation (CASSANDRA-11039)
 + * Fix CommitLogSegmentManagerTest (CASSANDRA-12283)
 + * Pass root cause to CorruptBlockException when uncompression failed (CASSANDRA-12889)
 + * Fix partition count log during compaction (CASSANDRA-12184)
 + * Batch with multiple conditional updates for the same partition causes AssertionError (CASSANDRA-12867)
 + * Make AbstractReplicationStrategy extendable from outside its package (CASSANDRA-12788)
 + * Fix CommitLogTest.testDeleteIfNotDirty (CASSANDRA-12854)
 + * Don't tell users to turn off consistent rangemovements during rebuild. (CASSANDRA-12296)
 + * Avoid deadlock due to materialized view lock contention (CASSANDRA-12689)
 + * Fix for KeyCacheCqlTest flakiness (CASSANDRA-12801)
 + * Include SSTable filename in compacting large row message (CASSANDRA-12384)
 + * Fix potential socket leak (CASSANDRA-12329, CASSANDRA-12330)
 + * Fix ViewTest.testCompaction (CASSANDRA-12789)
 + * Improve avg aggregate functions (CASSANDRA-12417)
 + * Preserve quoted reserved keyword column names in MV creation (CASSANDRA-11803)
 + * nodetool stopdaemon errors out (CASSANDRA-12646)
 + * Split materialized view mutations on build to prevent OOM (CASSANDRA-12268)
 + * mx4j does not work in 3.0.8 (CASSANDRA-12274)
 + * Abort cqlsh copy-from in case of no answer after prolonged period of time (CASSANDRA-12740)
 + * Avoid sstable corrupt exception due to dropped static column (CASSANDRA-12582)
 + * Make stress use client mode to avoid checking commit log size on startup (CASSANDRA-12478)
 + * Fix exceptions with new vnode allocation (CASSANDRA-12715)
 + * Unify drain and shutdown processes (CASSANDRA-12509)
 + * Fix NPE in ComponentOfSlice.isEQ() (CASSANDRA-12706)
 + * Fix failure in LogTransactionTest (CASSANDRA-12632)
 + * Fix potentially incomplete non-frozen UDT values when querying with the
 +   full primary key specified (CASSANDRA-12605)
 + * Skip writing MV mutations to commitlog on mutation.applyUnsafe() (CASSANDRA-11670)
 + * Establish consistent distinction between non-existing partition and NULL value for LWTs on static columns (CASSANDRA-12060)
 + * Extend ColumnIdentifier.internedInstances key to include the type that generated the byte buffer (CASSANDRA-12516)
 + * Backport CASSANDRA-10756 (race condition in NativeTransportService shutdown) (CASSANDRA-12472)
 + * If CF has no clustering columns, any row cache is full partition cache (CASSANDRA-12499)
 + * Correct log message for statistics of offheap memtable flush (CASSANDRA-12776)
 + * Explicitly set locale for string validation (CASSANDRA-12541,CASSANDRA-12542,CASSANDRA-12543,CASSANDRA-12545)
 +Merged from 2.2:
+  * Fix purgeability of tombstones with max timestamp (CASSANDRA-12792)
   * Fail repair if participant dies during sync or anticompaction (CASSANDRA-12901)
   * cqlsh COPY: unprotected pk values before converting them if not using prepared statements (CASSANDRA-12863)
   * Fix Util.spinAssertEquals (CASSANDRA-12283)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb41380c/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ReadCommand.java
index 70c770d,cd86336..64da428
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@@ -17,124 -17,39 +17,125 @@@
   */
  package org.apache.cassandra.db;
  
 -import java.io.DataInput;
  import java.io.IOException;
  import java.nio.ByteBuffer;
 +import java.util.*;
++import java.util.function.Predicate;
  
 -import org.apache.cassandra.config.DatabaseDescriptor;
 -import org.apache.cassandra.db.filter.IDiskAtomFilter;
 -import org.apache.cassandra.db.filter.NamesQueryFilter;
 -import org.apache.cassandra.db.filter.SliceQueryFilter;
 +import com.google.common.collect.Lists;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.config.*;
 +import org.apache.cassandra.cql3.Operator;
 +import org.apache.cassandra.db.filter.*;
 +import org.apache.cassandra.db.partitions.*;
 +import org.apache.cassandra.db.rows.*;
 +import org.apache.cassandra.db.transform.Transformation;
 +import org.apache.cassandra.dht.AbstractBounds;
 +import org.apache.cassandra.index.Index;
 +import org.apache.cassandra.index.IndexNotAvailableException;
 +import org.apache.cassandra.io.ForwardingVersionedSerializer;
  import org.apache.cassandra.io.IVersionedSerializer;
 +import org.apache.cassandra.io.util.DataInputPlus;
  import org.apache.cassandra.io.util.DataOutputPlus;
 +import org.apache.cassandra.metrics.TableMetrics;
  import org.apache.cassandra.net.MessageOut;
  import org.apache.cassandra.net.MessagingService;
 -import org.apache.cassandra.service.IReadCommand;
 -import org.apache.cassandra.service.RowDataResolver;
 -import org.apache.cassandra.service.pager.Pageable;
 +import org.apache.cassandra.schema.IndexMetadata;
 +import org.apache.cassandra.schema.UnknownIndexException;
 +import org.apache.cassandra.service.ClientWarn;
 +import org.apache.cassandra.tracing.Tracing;
 +import org.apache.cassandra.utils.ByteBufferUtil;
 +import org.apache.cassandra.utils.Pair;
  
 -public abstract class ReadCommand implements IReadCommand, Pageable
 +/**
 + * General interface for storage-engine read commands (common to both range and
 + * single partition commands).
 + * <p>
 + * This contains all the informations needed to do a local read.
 + */
 +public abstract class ReadCommand implements ReadQuery
  {
 -    public enum Type
 +    protected static final Logger logger = LoggerFactory.getLogger(ReadCommand.class);
 +
 +    public static final IVersionedSerializer<ReadCommand> serializer = new Serializer();
 +
 +    // For READ verb: will either dispatch on 'serializer' for 3.0 or 'legacyReadCommandSerializer' for earlier version.
 +    // Can be removed (and replaced by 'serializer') once we drop pre-3.0 backward compatibility.
 +    public static final IVersionedSerializer<ReadCommand> readSerializer = new ForwardingVersionedSerializer<ReadCommand>()
      {
 -        GET_BY_NAMES((byte)1),
 -        GET_SLICES((byte)2);
 +        protected IVersionedSerializer<ReadCommand> delegate(int version)
 +        {
 +            return version < MessagingService.VERSION_30
 +                    ? legacyReadCommandSerializer : serializer;
 +        }
 +    };
  
 -        public final byte serializedValue;
 +    // For RANGE_SLICE verb: will either dispatch on 'serializer' for 3.0 or 'legacyRangeSliceCommandSerializer' for earlier version.
 +    // Can be removed (and replaced by 'serializer') once we drop pre-3.0 backward compatibility.
 +    public static final IVersionedSerializer<ReadCommand> rangeSliceSerializer = new ForwardingVersionedSerializer<ReadCommand>()
 +    {
 +        protected IVersionedSerializer<ReadCommand> delegate(int version)
 +        {
 +            return version < MessagingService.VERSION_30
 +                    ? legacyRangeSliceCommandSerializer : serializer;
 +        }
 +    };
  
 -        private Type(byte b)
 +    // For PAGED_RANGE verb: will either dispatch on 'serializer' for 3.0 or 'legacyPagedRangeCommandSerializer' for earlier version.
 +    // Can be removed (and replaced by 'serializer') once we drop pre-3.0 backward compatibility.
 +    public static final IVersionedSerializer<ReadCommand> pagedRangeSerializer = new ForwardingVersionedSerializer<ReadCommand>()
 +    {
 +        protected IVersionedSerializer<ReadCommand> delegate(int version)
          {
 -            this.serializedValue = b;
 +            return version < MessagingService.VERSION_30
 +                    ? legacyPagedRangeCommandSerializer : serializer;
          }
 +    };
 +
 +    public static final IVersionedSerializer<ReadCommand> legacyRangeSliceCommandSerializer = new LegacyRangeSliceCommandSerializer();
 +    public static final IVersionedSerializer<ReadCommand> legacyPagedRangeCommandSerializer = new LegacyPagedRangeCommandSerializer();
 +    public static final IVersionedSerializer<ReadCommand> legacyReadCommandSerializer = new LegacyReadCommandSerializer();
 +
 +    private final Kind kind;
 +    private final CFMetaData metadata;
 +    private final int nowInSec;
 +
 +    private final ColumnFilter columnFilter;
 +    private final RowFilter rowFilter;
 +    private final DataLimits limits;
 +
 +    // SecondaryIndexManager will attempt to provide the most selective of any available indexes
 +    // during execution. Here we also store an the results of that lookup to repeating it over
 +    // the lifetime of the command.
 +    protected Optional<IndexMetadata> index = Optional.empty();
  
 -        public static Type fromSerializedValue(byte b)
 +    // Flag to indicate whether the index manager has been queried to select an index for this
 +    // command. This is necessary as the result of that lookup may be null, in which case we
 +    // still don't want to repeat it.
 +    private boolean indexManagerQueried = false;
 +
 +    private boolean isDigestQuery;
 +    // if a digest query, the version for which the digest is expected. Ignored if not a digest.
 +    private int digestVersion;
 +    private final boolean isForThrift;
 +
 +    protected static abstract class SelectionDeserializer
 +    {
 +        public abstract ReadCommand deserialize(DataInputPlus in, int version, boolean isDigest, int digestVersion, boolean isForThrift, CFMetaData metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits, Optional<IndexMetadata> index) throws IOException;
 +    }
 +
 +    protected enum Kind
 +    {
 +        SINGLE_PARTITION (SinglePartitionReadCommand.selectionDeserializer),
 +        PARTITION_RANGE  (PartitionRangeReadCommand.selectionDeserializer);
 +
 +        private final SelectionDeserializer selectionDeserializer;
 +
 +        Kind(SelectionDeserializer selectionDeserializer)
          {
 -            return b == 1 ? GET_BY_NAMES : GET_SLICES;
 +            this.selectionDeserializer = selectionDeserializer;
          }
      }
  
@@@ -263,699 -95,55 +264,699 @@@
          return this;
      }
  
 -    public String getColumnFamilyName()
 +    /**
 +     * Sets the digest version, for when digest for that command is requested.
 +     * <p>
 +     * Note that we allow setting this independently of setting the command as a digest query as
 +     * this allows us to use the command as a carrier of the digest version even if we only call
 +     * setIsDigestQuery on some copy of it.
 +     *
 +     * @param digestVersion the version for the digest is this command is used for digest query..
 +     * @return this read command.
 +     */
 +    public ReadCommand setDigestVersion(int digestVersion)
      {
 -        return cfName;
 +        this.digestVersion = digestVersion;
 +        return this;
      }
  
 +    /**
 +     * Whether this query is for thrift or not.
 +     *
 +     * @return whether this query is for thrift.
 +     */
 +    public boolean isForThrift()
 +    {
 +        return isForThrift;
 +    }
 +
 +    /**
 +     * The clustering index filter this command to use for the provided key.
 +     * <p>
 +     * Note that that method should only be called on a key actually queried by this command
 +     * and in practice, this will almost always return the same filter, but for the sake of
 +     * paging, the filter on the first key of a range command might be slightly different.
 +     *
 +     * @param key a partition key queried by this command.
 +     *
 +     * @return the {@code ClusteringIndexFilter} to use for the partition of key {@code key}.
 +     */
 +    public abstract ClusteringIndexFilter clusteringIndexFilter(DecoratedKey key);
 +
 +    /**
 +     * Returns a copy of this command.
 +     *
 +     * @return a copy of this command.
 +     */
      public abstract ReadCommand copy();
  
 -    public abstract Row getRow(Keyspace keyspace);
 +    protected abstract UnfilteredPartitionIterator queryStorage(ColumnFamilyStore cfs, ReadOrderGroup orderGroup);
 +
 +    protected abstract int oldestUnrepairedTombstone();
  
 -    public abstract IDiskAtomFilter filter();
 +    public ReadResponse createResponse(UnfilteredPartitionIterator iterator)
 +    {
 +        return isDigestQuery()
 +             ? ReadResponse.createDigestResponse(iterator, this)
 +             : ReadResponse.createDataResponse(iterator, this);
 +    }
  
 -    public String getKeyspace()
 +    public long indexSerializedSize(int version)
      {
 -        return ksName;
 +        if (index.isPresent())
 +            return IndexMetadata.serializer.serializedSize(index.get(), version);
 +        else
 +            return 0;
      }
  
 -    // maybeGenerateRetryCommand is used to generate a retry for short reads
 -    public ReadCommand maybeGenerateRetryCommand(RowDataResolver resolver, Row row)
 +    public Index getIndex(ColumnFamilyStore cfs)
      {
 -        return null;
 +        // if we've already consulted the index manager, and it returned a valid index
 +        // the result should be cached here.
 +        if(index.isPresent())
 +            return cfs.indexManager.getIndex(index.get());
 +
 +        // if no cached index is present, but we've already consulted the index manager
 +        // then no registered index is suitable for this command, so just return null.
 +        if (indexManagerQueried)
 +            return null;
 +
 +        // do the lookup, set the flag to indicate so and cache the result if not null
 +        Index selected = cfs.indexManager.getBestIndexFor(this);
 +        indexManagerQueried = true;
 +
 +        if (selected == null)
 +            return null;
 +
 +        index = Optional.of(selected.getIndexMetadata());
 +        return selected;
      }
  
 -    // maybeTrim removes columns from a response that is too long
 -    public Row maybeTrim(Row row)
 +    /**
 +     * If the index manager for the CFS determines that there's an applicable
 +     * 2i that can be used to execute this command, call its (optional)
 +     * validation method to check that nothing in this command's parameters
 +     * violates the implementation specific validation rules.
 +     */
 +    public void maybeValidateIndex()
      {
 -        return row;
 +        Index index = getIndex(Keyspace.openAndGetStore(metadata));
 +        if (null != index)
 +            index.validate(this);
      }
  
 -    public long getTimeout()
 +    /**
 +     * Executes this command on the local host.
 +     *
 +     * @param orderGroup the operation group spanning this command
 +     *
 +     * @return an iterator over the result of executing this command locally.
 +     */
 +    @SuppressWarnings("resource") // The result iterator is closed upon exceptions (we know it's fine to potentially not close the intermediary
 +                                  // iterators created inside the try as long as we do close the original resultIterator), or by closing the result.
 +    public UnfilteredPartitionIterator executeLocally(ReadOrderGroup orderGroup)
      {
 -        return DatabaseDescriptor.getReadRpcTimeout();
 +        long startTimeNanos = System.nanoTime();
 +
 +        ColumnFamilyStore cfs = Keyspace.openAndGetStore(metadata());
 +        Index index = getIndex(cfs);
 +
 +        Index.Searcher searcher = null;
 +        if (index != null)
 +        {
 +            if (!cfs.indexManager.isIndexQueryable(index))
 +                throw new IndexNotAvailableException(index);
 +
 +            searcher = index.searcherFor(this);
 +            Tracing.trace("Executing read on {}.{} using index {}", cfs.metadata.ksName, cfs.metadata.cfName, index.getIndexMetadata().name);
 +        }
 +
 +        UnfilteredPartitionIterator resultIterator = searcher == null
 +                                         ? queryStorage(cfs, orderGroup)
 +                                         : searcher.search(orderGroup);
 +
 +        try
 +        {
 +            resultIterator = withMetricsRecording(withoutPurgeableTombstones(resultIterator, cfs), cfs.metric, startTimeNanos);
 +
 +            // If we've used a 2ndary index, we know the result already satisfy the primary expression used, so
 +            // no point in checking it again.
 +            RowFilter updatedFilter = searcher == null
 +                                    ? rowFilter()
 +                                    : index.getPostIndexQueryFilter(rowFilter());
 +
 +            // TODO: We'll currently do filtering by the rowFilter here because it's convenient. However,
 +            // we'll probably want to optimize by pushing it down the layer (like for dropped columns) as it
 +            // would be more efficient (the sooner we discard stuff we know we don't care, the less useless
 +            // processing we do on it).
 +            return limits().filter(updatedFilter.filter(resultIterator, nowInSec()), nowInSec());
 +        }
 +        catch (RuntimeException | Error e)
 +        {
 +            resultIterator.close();
 +            throw e;
 +        }
      }
 -}
  
 -class ReadCommandSerializer implements IVersionedSerializer<ReadCommand>
 -{
 -    public void serialize(ReadCommand command, DataOutputPlus out, int version) throws IOException
 +    protected abstract void recordLatency(TableMetrics metric, long latencyNanos);
 +
 +    public PartitionIterator executeInternal(ReadOrderGroup orderGroup)
 +    {
 +        return UnfilteredPartitionIterators.filter(executeLocally(orderGroup), nowInSec());
 +    }
 +
 +    public ReadOrderGroup startOrderGroup()
 +    {
 +        return ReadOrderGroup.forCommand(this);
 +    }
 +
 +    /**
 +     * Wraps the provided iterator so that metrics on what is scanned by the command are recorded.
 +     * This also log warning/trow TombstoneOverwhelmingException if appropriate.
 +     */
 +    private UnfilteredPartitionIterator withMetricsRecording(UnfilteredPartitionIterator iter, final TableMetrics metric, final long startTimeNanos)
 +    {
 +        class MetricRecording extends Transformation<UnfilteredRowIterator>
 +        {
 +            private final int failureThreshold = DatabaseDescriptor.getTombstoneFailureThreshold();
 +            private final int warningThreshold = DatabaseDescriptor.getTombstoneWarnThreshold();
 +
 +            private final boolean respectTombstoneThresholds = !Schema.isSystemKeyspace(ReadCommand.this.metadata().ksName);
 +
 +            private int liveRows = 0;
 +            private int tombstones = 0;
 +
 +            private DecoratedKey currentKey;
 +
 +            @Override
 +            public UnfilteredRowIterator applyToPartition(UnfilteredRowIterator iter)
 +            {
 +                currentKey = iter.partitionKey();
 +                return Transformation.apply(iter, this);
 +            }
 +
 +            @Override
 +            public Row applyToStatic(Row row)
 +            {
 +                return applyToRow(row);
 +            }
 +
 +            @Override
 +            public Row applyToRow(Row row)
 +            {
 +                if (row.hasLiveData(ReadCommand.this.nowInSec()))
 +                    ++liveRows;
 +
 +                for (Cell cell : row.cells())
 +                {
 +                    if (!cell.isLive(ReadCommand.this.nowInSec()))
 +                        countTombstone(row.clustering());
 +                }
 +                return row;
 +            }
 +
 +            @Override
 +            public RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker)
 +            {
 +                countTombstone(marker.clustering());
 +                return marker;
 +            }
 +
 +            private void countTombstone(ClusteringPrefix clustering)
 +            {
 +                ++tombstones;
 +                if (tombstones > failureThreshold && respectTombstoneThresholds)
 +                {
 +                    String query = ReadCommand.this.toCQLString();
 +                    Tracing.trace("Scanned over {} tombstones for query {}; query aborted (see tombstone_failure_threshold)", failureThreshold, query);
 +                    throw new TombstoneOverwhelmingException(tombstones, query, ReadCommand.this.metadata(), currentKey, clustering);
 +                }
 +            }
 +
 +            @Override
 +            public void onClose()
 +            {
 +                recordLatency(metric, System.nanoTime() - startTimeNanos);
 +
 +                metric.tombstoneScannedHistogram.update(tombstones);
 +                metric.liveScannedHistogram.update(liveRows);
 +
 +                boolean warnTombstones = tombstones > warningThreshold && respectTombstoneThresholds;
 +                if (warnTombstones)
 +                {
 +                    String msg = String.format("Read %d live rows and %d tombstone cells for query %1.512s (see tombstone_warn_threshold)", liveRows, tombstones, ReadCommand.this.toCQLString());
 +                    ClientWarn.instance.warn(msg);
 +                    logger.warn(msg);
 +                }
 +
 +                Tracing.trace("Read {} live and {} tombstone cells{}", liveRows, tombstones, (warnTombstones ? " (see tombstone_warn_threshold)" : ""));
 +            }
 +        };
 +
 +        return Transformation.apply(iter, new MetricRecording());
 +    }
 +
 +    /**
 +     * Creates a message for this command.
 +     */
 +    public abstract MessageOut<ReadCommand> createMessage(int version);
 +
 +    protected abstract void appendCQLWhereClause(StringBuilder sb);
 +
 +    // Skip purgeable tombstones. We do this because it's safe to do (post-merge of the memtable and sstable at least), it
 +    // can save us some bandwith, and avoid making us throw a TombstoneOverwhelmingException for purgeable tombstones (which
 +    // are to some extend an artefact of compaction lagging behind and hence counting them is somewhat unintuitive).
 +    protected UnfilteredPartitionIterator withoutPurgeableTombstones(UnfilteredPartitionIterator iterator, ColumnFamilyStore cfs)
 +    {
 +        final boolean isForThrift = iterator.isForThrift();
 +        class WithoutPurgeableTombstones extends PurgeFunction
 +        {
 +            public WithoutPurgeableTombstones()
 +            {
 +                super(isForThrift, nowInSec(), cfs.gcBefore(nowInSec()), oldestUnrepairedTombstone(), cfs.getCompactionStrategyManager().onlyPurgeRepairedTombstones());
 +            }
 +
-             protected long getMaxPurgeableTimestamp()
++            protected Predicate<Long> getPurgeEvaluator()
 +            {
-                 return Long.MAX_VALUE;
++                return time -> true;
 +            }
 +        }
 +        return Transformation.apply(iterator, new WithoutPurgeableTombstones());
 +    }
 +
 +    /**
 +     * Recreate the CQL string corresponding to this query.
 +     * <p>
 +     * Note that in general the returned string will not be exactly the original user string, first
 +     * because there isn't always a single syntax for a given query,  but also because we don't have
 +     * all the information needed (we know the non-PK columns queried but not the PK ones as internally
 +     * we query them all). So this shouldn't be relied too strongly, but this should be good enough for
 +     * debugging purpose which is what this is for.
 +     */
 +    public String toCQLString()
 +    {
 +        StringBuilder sb = new StringBuilder();
 +        sb.append("SELECT ").append(columnFilter());
 +        sb.append(" FROM ").append(metadata().ksName).append('.').append(metadata.cfName);
 +        appendCQLWhereClause(sb);
 +
 +        if (limits() != DataLimits.NONE)
 +            sb.append(' ').append(limits());
 +        return sb.toString();
 +    }
 +
 +    private static class Serializer implements IVersionedSerializer<ReadCommand>
 +    {
 +        private static int digestFlag(boolean isDigest)
 +        {
 +            return isDigest ? 0x01 : 0;
 +        }
 +
 +        private static boolean isDigest(int flags)
 +        {
 +            return (flags & 0x01) != 0;
 +        }
 +
 +        private static int thriftFlag(boolean isForThrift)
 +        {
 +            return isForThrift ? 0x02 : 0;
 +        }
 +
 +        private static boolean isForThrift(int flags)
 +        {
 +            return (flags & 0x02) != 0;
 +        }
 +
 +        private static int indexFlag(boolean hasIndex)
 +        {
 +            return hasIndex ? 0x04 : 0;
 +        }
 +
 +        private static boolean hasIndex(int flags)
 +        {
 +            return (flags & 0x04) != 0;
 +        }
 +
 +        public void serialize(ReadCommand command, DataOutputPlus out, int version) throws IOException
 +        {
 +            assert version >= MessagingService.VERSION_30;
 +
 +            out.writeByte(command.kind.ordinal());
 +            out.writeByte(digestFlag(command.isDigestQuery()) | thriftFlag(command.isForThrift()) | indexFlag(command.index.isPresent()));
 +            if (command.isDigestQuery())
 +                out.writeUnsignedVInt(command.digestVersion());
 +            CFMetaData.serializer.serialize(command.metadata(), out, version);
 +            out.writeInt(command.nowInSec());
 +            ColumnFilter.serializer.serialize(command.columnFilter(), out, version);
 +            RowFilter.serializer.serialize(command.rowFilter(), out, version);
 +            DataLimits.serializer.serialize(command.limits(), out, version);
 +            if (command.index.isPresent())
 +                IndexMetadata.serializer.serialize(command.index.get(), out, version);
 +
 +            command.serializeSelection(out, version);
 +        }
 +
 +        public ReadCommand deserialize(DataInputPlus in, int version) throws IOException
 +        {
 +            assert version >= MessagingService.VERSION_30;
 +
 +            Kind kind = Kind.values()[in.readByte()];
 +            int flags = in.readByte();
 +            boolean isDigest = isDigest(flags);
 +            boolean isForThrift = isForThrift(flags);
 +            boolean hasIndex = hasIndex(flags);
 +            int digestVersion = isDigest ? (int)in.readUnsignedVInt() : 0;
 +            CFMetaData metadata = CFMetaData.serializer.deserialize(in, version);
 +            int nowInSec = in.readInt();
 +            ColumnFilter columnFilter = ColumnFilter.serializer.deserialize(in, version, metadata);
 +            RowFilter rowFilter = RowFilter.serializer.deserialize(in, version, metadata);
 +            DataLimits limits = DataLimits.serializer.deserialize(in, version);
 +            Optional<IndexMetadata> index = hasIndex
 +                                            ? deserializeIndexMetadata(in, version, metadata)
 +                                            : Optional.empty();
 +
 +            return kind.selectionDeserializer.deserialize(in, version, isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, index);
 +        }
 +
 +        private Optional<IndexMetadata> deserializeIndexMetadata(DataInputPlus in, int version, CFMetaData cfm) throws IOException
 +        {
 +            try
 +            {
 +                return Optional.of(IndexMetadata.serializer.deserialize(in, version, cfm));
 +            }
 +            catch (UnknownIndexException e)
 +            {
 +                String message = String.format("Couldn't find a defined index on %s.%s with the id %s. " +
 +                                               "If an index was just created, this is likely due to the schema not " +
 +                                               "being fully propagated. Local read will proceed without using the " +
 +                                               "index. Please wait for schema agreement after index creation.",
 +                                               cfm.ksName, cfm.cfName, e.indexId.toString());
 +                logger.info(message);
 +                return Optional.empty();
 +            }
 +        }
 +
 +        public long serializedSize(ReadCommand command, int version)
 +        {
 +            assert version >= MessagingService.VERSION_30;
 +
 +            return 2 // kind + flags
 +                 + (command.isDigestQuery() ? TypeSizes.sizeofUnsignedVInt(command.digestVersion()) : 0)
 +                 + CFMetaData.serializer.serializedSize(command.metadata(), version)
 +                 + TypeSizes.sizeof(command.nowInSec())
 +                 + ColumnFilter.serializer.serializedSize(command.columnFilter(), version)
 +                 + RowFilter.serializer.serializedSize(command.rowFilter(), version)
 +                 + DataLimits.serializer.serializedSize(command.limits(), version)
 +                 + command.selectionSerializedSize(version)
 +                 + command.indexSerializedSize(version);
 +        }
 +    }
 +
 +    private enum LegacyType
 +    {
 +        GET_BY_NAMES((byte)1),
 +        GET_SLICES((byte)2);
 +
 +        public final byte serializedValue;
 +
 +        LegacyType(byte b)
 +        {
 +            this.serializedValue = b;
 +        }
 +
 +        public static LegacyType fromPartitionFilterKind(ClusteringIndexFilter.Kind kind)
 +        {
 +            return kind == ClusteringIndexFilter.Kind.SLICE
 +                   ? GET_SLICES
 +                   : GET_BY_NAMES;
 +        }
 +
 +        public static LegacyType fromSerializedValue(byte b)
 +        {
 +            return b == 1 ? GET_BY_NAMES : GET_SLICES;
 +        }
 +    }
 +
 +    /**
 +     * Serializer for pre-3.0 RangeSliceCommands.
 +     */
 +    private static class LegacyRangeSliceCommandSerializer implements IVersionedSerializer<ReadCommand>
      {
 -        out.writeByte(command.commandType.serializedValue);
 -        switch (command.commandType)
 +        public void serialize(ReadCommand command, DataOutputPlus out, int version) throws IOException
 +        {
 +            assert version < MessagingService.VERSION_30;
 +
 +            PartitionRangeReadCommand rangeCommand = (PartitionRangeReadCommand) command;
 +            assert !rangeCommand.dataRange().isPaging();
 +
 +            // convert pre-3.0 incompatible names filters to slice filters
 +            rangeCommand = maybeConvertNamesToSlice(rangeCommand);
 +
 +            CFMetaData metadata = rangeCommand.metadata();
 +
 +            out.writeUTF(metadata.ksName);
 +            out.writeUTF(metadata.cfName);
 +            out.writeLong(rangeCommand.nowInSec() * 1000L);  // convert from seconds to millis
 +
 +            // begin DiskAtomFilterSerializer.serialize()
 +            if (rangeCommand.isNamesQuery())
 +            {
 +                out.writeByte(1);  // 0 for slices, 1 for names
 +                ClusteringIndexNamesFilter filter = (ClusteringIndexNamesFilter) rangeCommand.dataRange().clusteringIndexFilter;
 +                LegacyReadCommandSerializer.serializeNamesFilter(rangeCommand, filter, out);
 +            }
 +            else
 +            {
 +                out.writeByte(0);  // 0 for slices, 1 for names
 +
 +                // slice filter serialization
 +                ClusteringIndexSliceFilter filter = (ClusteringIndexSliceFilter) rangeCommand.dataRange().clusteringIndexFilter;
 +
 +                boolean makeStaticSlice = !rangeCommand.columnFilter().fetchedColumns().statics.isEmpty() && !filter.requestedSlices().selects(Clustering.STATIC_CLUSTERING);
 +                LegacyReadCommandSerializer.serializeSlices(out, filter.requestedSlices(), filter.isReversed(), makeStaticSlice, metadata);
 +
 +                out.writeBoolean(filter.isReversed());
 +
 +                // limit
 +                DataLimits limits = rangeCommand.limits();
 +                if (limits.isDistinct())
 +                    out.writeInt(1);
 +                else
 +                    out.writeInt(LegacyReadCommandSerializer.updateLimitForQuery(rangeCommand.limits().count(), filter.requestedSlices()));
 +
 +                int compositesToGroup;
 +                boolean selectsStatics = !rangeCommand.columnFilter().fetchedColumns().statics.isEmpty() && filter.requestedSlices().selects(Clustering.STATIC_CLUSTERING);
 +                if (limits.kind() == DataLimits.Kind.THRIFT_LIMIT)
 +                    compositesToGroup = -1;
 +                else if (limits.isDistinct() && !selectsStatics)
 +                    compositesToGroup = -2;  // for DISTINCT queries (CASSANDRA-8490)
 +                else
 +                    compositesToGroup = metadata.isDense() ? -1 : metadata.clusteringColumns().size();
 +
 +                out.writeInt(compositesToGroup);
 +            }
 +
 +            serializeRowFilter(out, rangeCommand.rowFilter());
 +            AbstractBounds.rowPositionSerializer.serialize(rangeCommand.dataRange().keyRange(), out, version);
 +
 +            // maxResults
 +            out.writeInt(rangeCommand.limits().count());
 +
 +            // countCQL3Rows
 +            if (rangeCommand.isForThrift() || rangeCommand.limits().perPartitionCount() == 1)  // if for Thrift or DISTINCT
 +                out.writeBoolean(false);
 +            else
 +                out.writeBoolean(true);
 +
 +            // isPaging
 +            out.writeBoolean(false);
 +        }
 +
 +        public ReadCommand deserialize(DataInputPlus in, int version) throws IOException
 +        {
 +            assert version < MessagingService.VERSION_30;
 +
 +            String keyspace = in.readUTF();
 +            String columnFamily = in.readUTF();
 +
 +            CFMetaData metadata = Schema.instance.getCFMetaData(keyspace, columnFamily);
 +            if (metadata == null)
 +            {
 +                String message = String.format("Got legacy range command for nonexistent table %s.%s.", keyspace, columnFamily);
 +                throw new UnknownColumnFamilyException(message, null);
 +            }
 +
 +            int nowInSec = (int) (in.readLong() / 1000);  // convert from millis to seconds
 +
 +            ClusteringIndexFilter filter;
 +            ColumnFilter selection;
 +            int compositesToGroup = 0;
 +            int perPartitionLimit = -1;
 +            byte readType = in.readByte();  // 0 for slices, 1 for names
 +            if (readType == 1)
 +            {
 +                Pair<ColumnFilter, ClusteringIndexNamesFilter> selectionAndFilter = LegacyReadCommandSerializer.deserializeNamesSelectionAndFilter(in, metadata);
 +                selection = selectionAndFilter.left;
 +                filter = selectionAndFilter.right;
 +            }
 +            else
 +            {
 +                Pair<ClusteringIndexSliceFilter, Boolean> p = LegacyReadCommandSerializer.deserializeSlicePartitionFilter(in, metadata);
 +                filter = p.left;
 +                perPartitionLimit = in.readInt();
 +                compositesToGroup = in.readInt();
 +                selection = getColumnSelectionForSlice(p.right, compositesToGroup, metadata);
 +            }
 +
 +            RowFilter rowFilter = deserializeRowFilter(in, metadata);
 +
 +            AbstractBounds<PartitionPosition> keyRange = AbstractBounds.rowPositionSerializer.deserialize(in, metadata.partitioner, version);
 +            int maxResults = in.readInt();
 +
 +            boolean countCQL3Rows = in.readBoolean();  // countCQL3Rows (not needed)
 +            in.readBoolean();  // isPaging (not needed)
 +
 +            boolean selectsStatics = (!selection.fetchedColumns().statics.isEmpty() || filter.selects(Clustering.STATIC_CLUSTERING));
 +            // We have 2 types of DISTINCT queries: ones on only the partition key, and ones on the partition key and static columns. For the former,
 +            // we can easily detect the case because compositeToGroup is -2 and that's the only case it can be that. The latter one is slightly less
 +            // direct, but we know that on 2.1/2.2 queries, DISTINCT queries are the only CQL queries that have countCQL3Rows to false so we use
 +            // that fact.
 +            boolean isDistinct = compositesToGroup == -2 || (compositesToGroup != -1 && !countCQL3Rows);
 +            DataLimits limits;
 +            if (isDistinct)
 +                limits = DataLimits.distinctLimits(maxResults);
 +            else if (compositesToGroup == -1)
 +                limits = DataLimits.thriftLimits(maxResults, perPartitionLimit);
 +            else
 +                limits = DataLimits.cqlLimits(maxResults);
 +
 +            return new PartitionRangeReadCommand(false, 0, true, metadata, nowInSec, selection, rowFilter, limits, new DataRange(keyRange, filter), Optional.empty());
 +        }
 +
 +        static void serializeRowFilter(DataOutputPlus out, RowFilter rowFilter) throws IOException
 +        {
 +            ArrayList<RowFilter.Expression> indexExpressions = Lists.newArrayList(rowFilter.iterator());
 +            out.writeInt(indexExpressions.size());
 +            for (RowFilter.Expression expression : indexExpressions)
 +            {
 +                ByteBufferUtil.writeWithShortLength(expression.column().name.bytes, out);
 +                expression.operator().writeTo(out);
 +                ByteBufferUtil.writeWithShortLength(expression.getIndexValue(), out);
 +            }
 +        }
 +
 +        static RowFilter deserializeRowFilter(DataInputPlus in, CFMetaData metadata) throws IOException
 +        {
 +            int numRowFilters = in.readInt();
 +            if (numRowFilters == 0)
 +                return RowFilter.NONE;
 +
 +            RowFilter rowFilter = RowFilter.create(numRowFilters);
 +            for (int i = 0; i < numRowFilters; i++)
 +            {
 +                ByteBuffer columnName = ByteBufferUtil.readWithShortLength(in);
 +                ColumnDefinition column = metadata.getColumnDefinition(columnName);
 +                Operator op = Operator.readFrom(in);
 +                ByteBuffer indexValue = ByteBufferUtil.readWithShortLength(in);
 +                rowFilter.add(column, op, indexValue);
 +            }
 +            return rowFilter;
 +        }
 +
 +        static long serializedRowFilterSize(RowFilter rowFilter)
 +        {
 +            long size = TypeSizes.sizeof(0);  // rowFilterCount
 +            for (RowFilter.Expression expression : rowFilter)
 +            {
 +                size += ByteBufferUtil.serializedSizeWithShortLength(expression.column().name.bytes);
 +                size += TypeSizes.sizeof(0);  // operator int value
 +                size += ByteBufferUtil.serializedSizeWithShortLength(expression.getIndexValue());
 +            }
 +            return size;
 +        }
 +
 +        public long serializedSize(ReadCommand command, int version)
 +        {
 +            assert version < MessagingService.VERSION_30;
 +            assert command.kind == Kind.PARTITION_RANGE;
 +
 +            PartitionRangeReadCommand rangeCommand = (PartitionRangeReadCommand) command;
 +            rangeCommand = maybeConvertNamesToSlice(rangeCommand);
 +            CFMetaData metadata = rangeCommand.metadata();
 +
 +            long size = TypeSizes.sizeof(metadata.ksName);
 +            size += TypeSizes.sizeof(metadata.cfName);
 +            size += TypeSizes.sizeof((long) rangeCommand.nowInSec());
 +
 +            size += 1;  // single byte flag: 0 for slices, 1 for names
 +            if (rangeCommand.isNamesQuery())
 +            {
 +                PartitionColumns columns = rangeCommand.columnFilter().fetchedColumns();
 +                ClusteringIndexNamesFilter filter = (ClusteringIndexNamesFilter) rangeCommand.dataRange().clusteringIndexFilter;
 +                size += LegacyReadCommandSerializer.serializedNamesFilterSize(filter, metadata, columns);
 +            }
 +            else
 +            {
 +                ClusteringIndexSliceFilter filter = (ClusteringIndexSliceFilter) rangeCommand.dataRange().clusteringIndexFilter;
 +                boolean makeStaticSlice = !rangeCommand.columnFilter().fetchedColumns().statics.isEmpty() && !filter.requestedSlices().selects(Clustering.STATIC_CLUSTERING);
 +                size += LegacyReadCommandSerializer.serializedSlicesSize(filter.requestedSlices(), makeStaticSlice, metadata);
 +                size += TypeSizes.sizeof(filter.isReversed());
 +                size += TypeSizes.sizeof(rangeCommand.limits().perPartitionCount());
 +                size += TypeSizes.sizeof(0); // compositesToGroup
 +            }
 +
 +            if (rangeCommand.rowFilter().equals(RowFilter.NONE))
 +            {
 +                size += TypeSizes.sizeof(0);
 +            }
 +            else
 +            {
 +                ArrayList<RowFilter.Expression> indexExpressions = Lists.newArrayList(rangeCommand.rowFilter().iterator());
 +                size += TypeSizes.sizeof(indexExpressions.size());
 +                for (RowFilter.Expression expression : indexExpressions)
 +                {
 +                    size += ByteBufferUtil.serializedSizeWithShortLength(expression.column().name.bytes);
 +                    size += TypeSizes.sizeof(expression.operator().ordinal());
 +                    size += ByteBufferUtil.serializedSizeWithShortLength(expression.getIndexValue());
 +                }
 +            }
 +
 +            size += AbstractBounds.rowPositionSerializer.serializedSize(rangeCommand.dataRange().keyRange(), version);
 +            size += TypeSizes.sizeof(rangeCommand.limits().count());
 +            size += TypeSizes.sizeof(!rangeCommand.isForThrift());
 +            return size + TypeSizes.sizeof(rangeCommand.dataRange().isPaging());
 +        }
 +
 +        static PartitionRangeReadCommand maybeConvertNamesToSlice(PartitionRangeReadCommand command)
          {
 -            case GET_BY_NAMES:
 -                SliceByNamesReadCommand.serializer.serialize(command, out, version);
 -                break;
 -            case GET_SLICES:
 -                SliceFromReadCommand.serializer.serialize(command, out, version);
 -                break;
 -            default:
 -                throw new AssertionError();
 +            if (!command.dataRange().isNamesQuery())
 +                return command;
 +
 +            CFMetaData metadata = command.metadata();
 +            if (!LegacyReadCommandSerializer.shouldConvertNamesToSlice(metadata, command.columnFilter().fetchedColumns()))
 +                return command;
 +
 +            ClusteringIndexNamesFilter filter = (ClusteringIndexNamesFilter) command.dataRange().clusteringIndexFilter;
 +            ClusteringIndexSliceFilter sliceFilter = LegacyReadCommandSerializer.convertNamesFilterToSliceFilter(filter, metadata);
 +            DataRange newRange = new DataRange(command.dataRange().keyRange(), sliceFilter);
 +            return new PartitionRangeReadCommand(
 +                    command.isDigestQuery(), command.digestVersion(), command.isForThrift(), metadata, command.nowInSec(),
 +                    command.columnFilter(), command.rowFilter(), command.limits(), newRange, Optional.empty());
 +        }
 +
 +        static ColumnFilter getColumnSelectionForSlice(boolean selectsStatics, int compositesToGroup, CFMetaData metadata)
 +        {
 +            // A value of -2 indicates this is a DISTINCT query that doesn't select static columns, only partition keys.
 +            // In that case, we'll basically be querying the first row of the partition, but we must make sure we include
 +            // all columns so we get at least one cell if there is a live row as it would confuse pre-3.0 nodes otherwise.
 +            if (compositesToGroup == -2)
 +                return ColumnFilter.all(metadata);
 +
 +            // if a slice query from a pre-3.0 node doesn't cover statics, we shouldn't select them at all
 +            PartitionColumns columns = selectsStatics
 +                                     ? metadata.partitionColumns()
 +                                     : metadata.partitionColumns().withoutStatics();
 +            return ColumnFilter.selectionBuilder().addAll(columns).build();
          }
      }
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb41380c/src/java/org/apache/cassandra/db/compaction/CompactionController.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionController.java
index fbf29e3,e895573..34d093e
--- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
@@@ -18,12 -18,10 +18,12 @@@
  package org.apache.cassandra.db.compaction;
  
  import java.util.*;
++import java.util.function.Predicate;
  
 -import com.google.common.base.Predicate;
 -import com.google.common.base.Predicates;
 +import org.apache.cassandra.db.Memtable;
- import org.apache.cassandra.db.lifecycle.SSTableSet;
 +import com.google.common.collect.Iterables;
  
 +import org.apache.cassandra.db.partitions.Partition;
  import org.apache.cassandra.io.sstable.format.SSTableReader;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
@@@ -194,36 -189,59 +194,52 @@@ public class CompactionController imple
      }
  
      /**
-      * @return the largest timestamp before which it's okay to drop tombstones for the given partition;
-      * i.e., after the maxPurgeableTimestamp there may exist newer data that still needs to be suppressed
-      * in other sstables.  This returns the minimum timestamp for any SSTable that contains this partition and is not
-      * participating in this compaction, or memtable that contains this partition,
-      * or LONG.MAX_VALUE if no SSTable or memtable exist.
+      * @param key
+      * @return a predicate for whether tombstones marked for deletion at the given time for the given partition are
+      * purgeable; we calculate this by checking whether the deletion time is less than the min timestamp of all SSTables
+      * containing his partition and not participating in the compaction. This means there isn't any data in those
+      * sstables that might still need to be suppressed by a tombstone at this timestamp.
       */
-     public long maxPurgeableTimestamp(DecoratedKey key)
+     public Predicate<Long> getPurgeEvaluator(DecoratedKey key)
      {
 -        if (NEVER_PURGE_TOMBSTONES)
 -            return Predicates.alwaysFalse();
 +        if (!compactingRepaired() || NEVER_PURGE_TOMBSTONES)
-             return Long.MIN_VALUE;
++            return time -> false;
  
-         long min = Long.MAX_VALUE;
          overlapIterator.update(key);
-         for (SSTableReader sstable : overlapIterator.overlaps())
+         Set<SSTableReader> filteredSSTables = overlapIterator.overlaps();
+         Iterable<Memtable> memtables = cfs.getTracker().getView().getAllMemtables();
+         long minTimestampSeen = Long.MAX_VALUE;
+         boolean hasTimestamp = false;
+ 
 -        for (SSTableReader sstable: filteredSSTables)
++        for (SSTableReader sstable : filteredSSTables)
          {
              // if we don't have bloom filter(bf_fp_chance=1.0 or filter file is missing),
              // we check index file instead.
-             if (sstable.getBloomFilter() instanceof AlwaysPresentFilter && sstable.getPosition(key, SSTableReader.Operator.EQ, false) != null)
-                 min = Math.min(min, sstable.getMinTimestamp());
-             else if (sstable.getBloomFilter().isPresent(key))
-                 min = Math.min(min, sstable.getMinTimestamp());
+             if (sstable.getBloomFilter() instanceof AlwaysPresentFilter && sstable.getPosition(key, SSTableReader.Operator.EQ, false) != null
+                 || sstable.getBloomFilter().isPresent(key))
+             {
+                 minTimestampSeen = Math.min(minTimestampSeen, sstable.getMinTimestamp());
+                 hasTimestamp = true;
+             }
 -
          }
  
-         for (Memtable memtable : cfs.getTracker().getView().getAllMemtables())
+         for (Memtable memtable : memtables)
          {
 -            ColumnFamily cf = memtable.getColumnFamily(key);
 -            if (cf != null)
 +            Partition partition = memtable.getPartition(key);
 +            if (partition != null)
-                 min = Math.min(min, partition.stats().minTimestamp);
+             {
 -                minTimestampSeen = Math.min(minTimestampSeen, memtable.getMinTimestamp());
++                minTimestampSeen = Math.min(minTimestampSeen, partition.stats().minTimestamp);
+                 hasTimestamp = true;
+             }
+         }
+ 
+         if (!hasTimestamp)
 -            return Predicates.alwaysTrue();
++            return time -> true;
+         else
+         {
+             final long finalTimestamp = minTimestampSeen;
 -            return new Predicate<Long>()
 -            {
 -                public boolean apply(Long time)
 -                {
 -                    return time < finalTimestamp;
 -                }
 -            };
++            return time -> time < finalTimestamp;
          }
-         return min;
      }
  
      public void close()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb41380c/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
index f8f620c,0000000..9f0984f
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
@@@ -1,309 -1,0 +1,309 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.db.compaction;
 +
 +import java.util.List;
 +import java.util.UUID;
++import java.util.function.Predicate;
 +
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.db.*;
 +import org.apache.cassandra.db.partitions.PurgeFunction;
 +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
 +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
 +import org.apache.cassandra.db.rows.*;
 +import org.apache.cassandra.db.transform.Transformation;
 +import org.apache.cassandra.index.transactions.CompactionTransaction;
 +import org.apache.cassandra.io.sstable.ISSTableScanner;
 +import org.apache.cassandra.metrics.CompactionMetrics;
 +
 +/**
 + * Merge multiple iterators over the content of sstable into a "compacted" iterator.
 + * <p>
 + * On top of the actual merging the source iterators, this class:
 + * <ul>
 + *   <li>purge gc-able tombstones if possible (see PurgeIterator below).</li>
 + *   <li>update 2ndary indexes if necessary (as we don't read-before-write on index updates, index entries are
 + *       not deleted on deletion of the base table data, which is ok because we'll fix index inconsistency
 + *       on reads. This however mean that potentially obsolete index entries could be kept a long time for
 + *       data that is not read often, so compaction "pro-actively" fix such index entries. This is mainly
 + *       an optimization).</li>
 + *   <li>invalidate cached partitions that are empty post-compaction. This avoids keeping partitions with
 + *       only purgable tombstones in the row cache.</li>
 + *   <li>keep tracks of the compaction progress.</li>
 + * </ul>
 + */
 +public class CompactionIterator extends CompactionInfo.Holder implements UnfilteredPartitionIterator
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(CompactionIterator.class);
 +    private static final long UNFILTERED_TO_UPDATE_PROGRESS = 100;
 +
 +    private final OperationType type;
 +    private final CompactionController controller;
 +    private final List<ISSTableScanner> scanners;
 +    private final int nowInSec;
 +    private final UUID compactionId;
 +
 +    private final long totalBytes;
 +    private long bytesRead;
 +
 +    /*
 +     * counters for merged rows.
 +     * array index represents (number of merged rows - 1), so index 0 is counter for no merge (1 row),
 +     * index 1 is counter for 2 rows merged, and so on.
 +     */
 +    private final long[] mergeCounters;
 +
 +    private final UnfilteredPartitionIterator compacted;
 +    private final CompactionMetrics metrics;
 +
 +    public CompactionIterator(OperationType type, List<ISSTableScanner> scanners, CompactionController controller, int nowInSec, UUID compactionId)
 +    {
 +        this(type, scanners, controller, nowInSec, compactionId, null);
 +    }
 +
 +    @SuppressWarnings("resource") // We make sure to close mergedIterator in close() and CompactionIterator is itself an AutoCloseable
 +    public CompactionIterator(OperationType type, List<ISSTableScanner> scanners, CompactionController controller, int nowInSec, UUID compactionId, CompactionMetrics metrics)
 +    {
 +        this.controller = controller;
 +        this.type = type;
 +        this.scanners = scanners;
 +        this.nowInSec = nowInSec;
 +        this.compactionId = compactionId;
 +        this.bytesRead = 0;
 +
 +        long bytes = 0;
 +        for (ISSTableScanner scanner : scanners)
 +            bytes += scanner.getLengthInBytes();
 +        this.totalBytes = bytes;
 +        this.mergeCounters = new long[scanners.size()];
 +        this.metrics = metrics;
 +
 +        if (metrics != null)
 +            metrics.beginCompaction(this);
 +
 +        UnfilteredPartitionIterator merged = scanners.isEmpty()
 +                                             ? EmptyIterators.unfilteredPartition(controller.cfs.metadata, false)
 +                                             : UnfilteredPartitionIterators.merge(scanners, nowInSec, listener());
 +        boolean isForThrift = merged.isForThrift(); // to stop capture of iterator in Purger, which is confusing for debug
 +        this.compacted = Transformation.apply(merged, new Purger(isForThrift, controller, nowInSec));
 +    }
 +
 +    public boolean isForThrift()
 +    {
 +        return false;
 +    }
 +
 +    public CFMetaData metadata()
 +    {
 +        return controller.cfs.metadata;
 +    }
 +
 +    public CompactionInfo getCompactionInfo()
 +    {
 +        return new CompactionInfo(controller.cfs.metadata,
 +                                  type,
 +                                  bytesRead,
 +                                  totalBytes,
 +                                  compactionId);
 +    }
 +
 +    private void updateCounterFor(int rows)
 +    {
 +        assert rows > 0 && rows - 1 < mergeCounters.length;
 +        mergeCounters[rows - 1] += 1;
 +    }
 +
 +    public long[] getMergedRowCounts()
 +    {
 +        return mergeCounters;
 +    }
 +
 +    private UnfilteredPartitionIterators.MergeListener listener()
 +    {
 +        return new UnfilteredPartitionIterators.MergeListener()
 +        {
 +            public UnfilteredRowIterators.MergeListener getRowMergeListener(DecoratedKey partitionKey, List<UnfilteredRowIterator> versions)
 +            {
 +                int merged = 0;
 +                for (UnfilteredRowIterator iter : versions)
 +                {
 +                    if (iter != null)
 +                        merged++;
 +                }
 +
 +                assert merged > 0;
 +
 +                CompactionIterator.this.updateCounterFor(merged);
 +
 +                if (type != OperationType.COMPACTION || !controller.cfs.indexManager.hasIndexes())
 +                    return null;
 +
 +                Columns statics = Columns.NONE;
 +                Columns regulars = Columns.NONE;
 +                for (UnfilteredRowIterator iter : versions)
 +                {
 +                    if (iter != null)
 +                    {
 +                        statics = statics.mergeTo(iter.columns().statics);
 +                        regulars = regulars.mergeTo(iter.columns().regulars);
 +                    }
 +                }
 +                final PartitionColumns partitionColumns = new PartitionColumns(statics, regulars);
 +
 +                // If we have a 2ndary index, we must update it with deleted/shadowed cells.
 +                // we can reuse a single CleanupTransaction for the duration of a partition.
 +                // Currently, it doesn't do any batching of row updates, so every merge event
 +                // for a single partition results in a fresh cycle of:
 +                // * Get new Indexer instances
 +                // * Indexer::start
 +                // * Indexer::onRowMerge (for every row being merged by the compaction)
 +                // * Indexer::commit
 +                // A new OpOrder.Group is opened in an ARM block wrapping the commits
 +                // TODO: this should probably be done asynchronously and batched.
 +                final CompactionTransaction indexTransaction =
 +                    controller.cfs.indexManager.newCompactionTransaction(partitionKey,
 +                                                                         partitionColumns,
 +                                                                         versions.size(),
 +                                                                         nowInSec);
 +
 +                return new UnfilteredRowIterators.MergeListener()
 +                {
 +                    public void onMergedPartitionLevelDeletion(DeletionTime mergedDeletion, DeletionTime[] versions)
 +                    {
 +                    }
 +
 +                    public void onMergedRows(Row merged, Row[] versions)
 +                    {
 +                        indexTransaction.start();
 +                        indexTransaction.onRowMerge(merged, versions);
 +                        indexTransaction.commit();
 +                    }
 +
 +                    public void onMergedRangeTombstoneMarkers(RangeTombstoneMarker mergedMarker, RangeTombstoneMarker[] versions)
 +                    {
 +                    }
 +
 +                    public void close()
 +                    {
 +                    }
 +                };
 +            }
 +
 +            public void close()
 +            {
 +            }
 +        };
 +    }
 +
 +    private void updateBytesRead()
 +    {
 +        long n = 0;
 +        for (ISSTableScanner scanner : scanners)
 +            n += scanner.getCurrentPosition();
 +        bytesRead = n;
 +    }
 +
 +    public boolean hasNext()
 +    {
 +        return compacted.hasNext();
 +    }
 +
 +    public UnfilteredRowIterator next()
 +    {
 +        return compacted.next();
 +    }
 +
 +    public void remove()
 +    {
 +        throw new UnsupportedOperationException();
 +    }
 +
 +    public void close()
 +    {
 +        try
 +        {
 +            compacted.close();
 +        }
 +        finally
 +        {
 +            if (metrics != null)
 +                metrics.finishCompaction(this);
 +        }
 +    }
 +
 +    public String toString()
 +    {
 +        return this.getCompactionInfo().toString();
 +    }
 +
 +    private class Purger extends PurgeFunction
 +    {
 +        private final CompactionController controller;
 +
 +        private DecoratedKey currentKey;
-         private long maxPurgeableTimestamp;
-         private boolean hasCalculatedMaxPurgeableTimestamp;
++        private Predicate<Long> purgeEvaluator;
 +
 +        private long compactedUnfiltered;
 +
 +        private Purger(boolean isForThrift, CompactionController controller, int nowInSec)
 +        {
 +            super(isForThrift, nowInSec, controller.gcBefore, controller.compactingRepaired() ? Integer.MAX_VALUE : Integer.MIN_VALUE, controller.cfs.getCompactionStrategyManager().onlyPurgeRepairedTombstones());
 +            this.controller = controller;
 +        }
 +
 +        @Override
 +        protected void onEmptyPartitionPostPurge(DecoratedKey key)
 +        {
 +            if (type == OperationType.COMPACTION)
 +                controller.cfs.invalidateCachedPartition(key);
 +        }
 +
 +        @Override
 +        protected void onNewPartition(DecoratedKey key)
 +        {
 +            currentKey = key;
-             hasCalculatedMaxPurgeableTimestamp = false;
++            purgeEvaluator = null;
 +        }
 +
 +        @Override
 +        protected void updateProgress()
 +        {
 +            if ((++compactedUnfiltered) % UNFILTERED_TO_UPDATE_PROGRESS == 0)
 +                updateBytesRead();
 +        }
 +
 +        /*
-          * Tombstones with a localDeletionTime before this can be purged. This is the minimum timestamp for any sstable
-          * containing `currentKey` outside of the set of sstables involved in this compaction. This is computed lazily
-          * on demand as we only need this if there is tombstones and this a bit expensive (see #8914).
++         * Evaluates whether a tombstone with the given deletion timestamp can be purged. This is the minimum
++         * timestamp for any sstable containing `currentKey` outside of the set of sstables involved in this compaction.
++         * This is computed lazily on demand as we only need this if there is tombstones and this a bit expensive
++         * (see #8914).
 +         */
-         protected long getMaxPurgeableTimestamp()
++        protected Predicate<Long> getPurgeEvaluator()
 +        {
-             if (!hasCalculatedMaxPurgeableTimestamp)
++            if (purgeEvaluator == null)
 +            {
-                 hasCalculatedMaxPurgeableTimestamp = true;
-                 maxPurgeableTimestamp = controller.maxPurgeableTimestamp(currentKey);
++                purgeEvaluator = controller.getPurgeEvaluator(currentKey);
 +            }
-             return maxPurgeableTimestamp;
++            return purgeEvaluator;
 +        }
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb41380c/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 478b896,8a3c11e..a77cefb
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@@ -22,6 -22,6 +22,7 @@@ import java.io.IOException
  import java.lang.management.ManagementFactory;
  import java.util.*;
  import java.util.concurrent.*;
++import java.util.function.Predicate;
  import javax.management.MBeanServer;
  import javax.management.ObjectName;
  import javax.management.openmbean.OpenDataException;
@@@ -1431,7 -1434,7 +1432,7 @@@ public class CompactionManager implemen
               * a tombstone that could shadow a column in another sstable, but this is doubly not a concern
               * since validation compaction is read-only.
               */
-             return Long.MAX_VALUE;
 -            return Predicates.alwaysTrue();
++            return time -> true;
          }
      }
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb41380c/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
index 3655a37,6b302d2..fce8c2e
--- a/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
+++ b/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
@@@ -18,6 -18,9 +18,7 @@@
  package org.apache.cassandra.db.compaction;
  
  import java.util.*;
 -
 -import com.google.common.base.Predicate;
 -import com.google.common.base.Predicates;
++import java.util.function.Predicate;
  
  import org.apache.cassandra.db.*;
  import org.apache.cassandra.db.compaction.writers.CompactionAwareWriter;
@@@ -97,9 -97,9 +98,9 @@@ public class SSTableSplitter 
          }
  
          @Override
-         public long maxPurgeableTimestamp(DecoratedKey key)
+         public Predicate<Long> getPurgeEvaluator(DecoratedKey key)
          {
-             return Long.MIN_VALUE;
 -            return Predicates.alwaysFalse();
++            return time -> false;
          }
      }
  }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb41380c/src/java/org/apache/cassandra/db/compaction/Upgrader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/Upgrader.java
index 822bb85,d6ef60e..77831a7
--- a/src/java/org/apache/cassandra/db/compaction/Upgrader.java
+++ b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
@@@ -19,10 -19,12 +19,11 @@@ package org.apache.cassandra.db.compact
  
  import java.io.File;
  import java.util.*;
++import java.util.function.Predicate;
  
 -import com.google.common.base.Predicate;
 -import com.google.common.base.Predicates;
  import com.google.common.base.Throwables;
 +import com.google.common.collect.Sets;
  
 -import org.apache.cassandra.config.DatabaseDescriptor;
  import org.apache.cassandra.db.ColumnFamilyStore;
  import org.apache.cassandra.db.DecoratedKey;
  import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
@@@ -111,9 -120,9 +112,9 @@@ public class Upgrade
          }
  
          @Override
-         public long maxPurgeableTimestamp(DecoratedKey key)
+         public Predicate<Long> getPurgeEvaluator(DecoratedKey key)
          {
-             return Long.MIN_VALUE;
 -            return Predicates.alwaysFalse();
++            return time -> false;
          }
      }
  }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb41380c/src/java/org/apache/cassandra/db/compaction/Verifier.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/Verifier.java
index ce04ad3,42302fe..88bc3a7
--- a/src/java/org/apache/cassandra/db/compaction/Verifier.java
+++ b/src/java/org/apache/cassandra/db/compaction/Verifier.java
@@@ -42,6 -43,6 +42,7 @@@ import java.io.IOError
  import java.io.IOException;
  import java.nio.ByteBuffer;
  import java.util.*;
++import java.util.function.Predicate;
  
  public class Verifier implements Closeable
  {
@@@ -281,9 -278,9 +282,9 @@@
          }
  
          @Override
-         public long maxPurgeableTimestamp(DecoratedKey key)
+         public Predicate<Long> getPurgeEvaluator(DecoratedKey key)
          {
-             return Long.MIN_VALUE;
 -            return Predicates.alwaysFalse();
++            return time -> false;
          }
      }
  }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb41380c/src/java/org/apache/cassandra/db/partitions/PurgeFunction.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/partitions/PurgeFunction.java
index d3255d3,0000000..6679bdf
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/partitions/PurgeFunction.java
+++ b/src/java/org/apache/cassandra/db/partitions/PurgeFunction.java
@@@ -1,125 -1,0 +1,127 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.db.partitions;
 +
++import java.util.function.Predicate;
++
 +import org.apache.cassandra.db.*;
 +import org.apache.cassandra.db.rows.*;
 +import org.apache.cassandra.db.transform.Transformation;
 +
 +public abstract class PurgeFunction extends Transformation<UnfilteredRowIterator>
 +{
 +    private final boolean isForThrift;
 +    private final DeletionPurger purger;
 +    private final int nowInSec;
 +    private boolean isReverseOrder;
 +
 +    public PurgeFunction(boolean isForThrift, int nowInSec, int gcBefore, int oldestUnrepairedTombstone, boolean onlyPurgeRepairedTombstones)
 +    {
 +        this.isForThrift = isForThrift;
 +        this.nowInSec = nowInSec;
 +        this.purger = (timestamp, localDeletionTime) ->
 +                      !(onlyPurgeRepairedTombstones && localDeletionTime >= oldestUnrepairedTombstone)
 +                      && localDeletionTime < gcBefore
-                       && timestamp < getMaxPurgeableTimestamp();
++                      && getPurgeEvaluator().test(timestamp);
 +    }
 +
-     protected abstract long getMaxPurgeableTimestamp();
++    protected abstract Predicate<Long> getPurgeEvaluator();
 +
 +    // Called at the beginning of each new partition
 +    protected void onNewPartition(DecoratedKey partitionKey)
 +    {
 +    }
 +
 +    // Called for each partition that had only purged infos and are empty post-purge.
 +    protected void onEmptyPartitionPostPurge(DecoratedKey partitionKey)
 +    {
 +    }
 +
 +    // Called for every unfiltered. Meant for CompactionIterator to update progress
 +    protected void updateProgress()
 +    {
 +    }
 +
 +    @Override
 +    protected UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition)
 +    {
 +        onNewPartition(partition.partitionKey());
 +
 +        isReverseOrder = partition.isReverseOrder();
 +        UnfilteredRowIterator purged = Transformation.apply(partition, this);
 +        if (!isForThrift && purged.isEmpty())
 +        {
 +            onEmptyPartitionPostPurge(purged.partitionKey());
 +            purged.close();
 +            return null;
 +        }
 +
 +        return purged;
 +    }
 +
 +    @Override
 +    protected DeletionTime applyToDeletion(DeletionTime deletionTime)
 +    {
 +        return purger.shouldPurge(deletionTime) ? DeletionTime.LIVE : deletionTime;
 +    }
 +
 +    @Override
 +    protected Row applyToStatic(Row row)
 +    {
 +        updateProgress();
 +        return row.purge(purger, nowInSec);
 +    }
 +
 +    @Override
 +    protected Row applyToRow(Row row)
 +    {
 +        updateProgress();
 +        return row.purge(purger, nowInSec);
 +    }
 +
 +    @Override
 +    protected RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker)
 +    {
 +        updateProgress();
 +        boolean reversed = isReverseOrder;
 +        if (marker.isBoundary())
 +        {
 +            // We can only skip the whole marker if both deletion time are purgeable.
 +            // If only one of them is, filterTombstoneMarker will deal with it.
 +            RangeTombstoneBoundaryMarker boundary = (RangeTombstoneBoundaryMarker)marker;
 +            boolean shouldPurgeClose = purger.shouldPurge(boundary.closeDeletionTime(reversed));
 +            boolean shouldPurgeOpen = purger.shouldPurge(boundary.openDeletionTime(reversed));
 +
 +            if (shouldPurgeClose)
 +            {
 +                if (shouldPurgeOpen)
 +                    return null;
 +
 +                return boundary.createCorrespondingOpenMarker(reversed);
 +            }
 +
 +            return shouldPurgeOpen
 +                   ? boundary.createCorrespondingCloseMarker(reversed)
 +                   : marker;
 +        }
 +        else
 +        {
 +            return purger.shouldPurge(((RangeTombstoneBoundMarker)marker).deletionTime()) ? null : marker;
 +        }
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb41380c/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java
index e781716,3184159..1b400e8
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java
@@@ -20,7 -20,8 +20,8 @@@ package org.apache.cassandra.db.compact
  
  import java.nio.ByteBuffer;
  import java.util.Set;
++import java.util.function.Predicate;
  
 -import com.google.common.base.Predicate;
  import com.google.common.collect.Sets;
  import org.junit.BeforeClass;
  import org.junit.Test;
@@@ -41,7 -40,10 +42,9 @@@ import org.apache.cassandra.schema.Keys
  import org.apache.cassandra.utils.ByteBufferUtil;
  import org.apache.cassandra.utils.FBUtilities;
  
 -import static org.apache.cassandra.Util.cellname;
  import static org.junit.Assert.assertEquals;
+ import static org.junit.Assert.assertFalse;
+ import static org.junit.Assert.assertTrue;
  import static org.junit.Assert.assertNotNull;
  
  public class CompactionControllerTest extends SchemaLoader
@@@ -87,26 -83,26 +90,26 @@@
          // check max purgeable timestamp without any sstables
          try(CompactionController controller = new CompactionController(cfs, null, 0))
          {
-             assertEquals(timestamp1, controller.maxPurgeableTimestamp(key)); //memtable only
+             assertPurgeBoundary(controller.getPurgeEvaluator(key), timestamp1); //memtable only
  
              cfs.forceBlockingFlush();
-             assertEquals(Long.MAX_VALUE, controller.maxPurgeableTimestamp(key)); //no memtables and no sstables
 -            assertTrue(controller.getPurgeEvaluator(key).apply(Long.MAX_VALUE)); //no memtables and no sstables
++            assertTrue(controller.getPurgeEvaluator(key).test(Long.MAX_VALUE)); //no memtables and no sstables
          }
  
 -        Set<SSTableReader> compacting = Sets.newHashSet(cfs.getSSTables()); // first sstable is compacting
 +        Set<SSTableReader> compacting = Sets.newHashSet(cfs.getLiveSSTables()); // first sstable is compacting
  
          // create another sstable
 -        applyMutation(CF1, rowKey, timestamp2);
 +        applyMutation(cfs.metadata, key, timestamp2);
          cfs.forceBlockingFlush();
  
          // check max purgeable timestamp when compacting the first sstable with and without a memtable
          try (CompactionController controller = new CompactionController(cfs, compacting, 0))
          {
-             assertEquals(timestamp2, controller.maxPurgeableTimestamp(key)); //second sstable only
+             assertPurgeBoundary(controller.getPurgeEvaluator(key), timestamp2);
  
 -            applyMutation(CF1, rowKey, timestamp3);
 +            applyMutation(cfs.metadata, key, timestamp3);
  
-             assertEquals(timestamp3, controller.maxPurgeableTimestamp(key)); //second sstable and second memtable
+             assertPurgeBoundary(controller.getPurgeEvaluator(key), timestamp3); //second sstable and second memtable
          }
  
          // check max purgeable timestamp again without any sstables but with different insertion orders on the memtable
@@@ -115,11 -111,11 +118,11 @@@
          //newest to oldest
          try (CompactionController controller = new CompactionController(cfs, null, 0))
          {
 -            applyMutation(CF1, rowKey, timestamp1);
 -            applyMutation(CF1, rowKey, timestamp2);
 -            applyMutation(CF1, rowKey, timestamp3);
 +            applyMutation(cfs.metadata, key, timestamp1);
 +            applyMutation(cfs.metadata, key, timestamp2);
 +            applyMutation(cfs.metadata, key, timestamp3);
  
-             assertEquals(timestamp3, controller.maxPurgeableTimestamp(key)); //memtable only
+             assertPurgeBoundary(controller.getPurgeEvaluator(key), timestamp3); //memtable only
          }
  
          cfs.forceBlockingFlush();
@@@ -127,11 -123,11 +130,11 @@@
          //oldest to newest
          try (CompactionController controller = new CompactionController(cfs, null, 0))
          {
 -            applyMutation(CF1, rowKey, timestamp3);
 -            applyMutation(CF1, rowKey, timestamp2);
 -            applyMutation(CF1, rowKey, timestamp1);
 +            applyMutation(cfs.metadata, key, timestamp3);
 +            applyMutation(cfs.metadata, key, timestamp2);
 +            applyMutation(cfs.metadata, key, timestamp1);
  
-             assertEquals(timestamp3, controller.maxPurgeableTimestamp(key)); //memtable only
+             assertPurgeBoundary(controller.getPurgeEvaluator(key), timestamp3);
          }
      }
  
@@@ -176,20 -172,26 +179,26 @@@
          assertEquals(0, expired.size());
      }
  
 -    private void applyMutation(String cf, ByteBuffer rowKey, long timestamp)
 +    private void applyMutation(CFMetaData cfm, DecoratedKey key, long timestamp)
      {
 -        CellName colName = cellname("birthdate");
          ByteBuffer val = ByteBufferUtil.bytes(1L);
  
 -        Mutation rm = new Mutation(KEYSPACE, rowKey);
 -        rm.add(cf, colName, val, timestamp);
 -        rm.applyUnsafe();
 +        new RowUpdateBuilder(cfm, timestamp, key)
 +        .clustering("ck")
 +        .add("val", val)
 +        .build()
 +        .applyUnsafe();
      }
  
 -    private void applyDeleteMutation(String cf, ByteBuffer rowKey, long timestamp)
 +    private void applyDeleteMutation(CFMetaData cfm, DecoratedKey key, long timestamp)
      {
 -        Mutation rm = new Mutation(KEYSPACE, rowKey);
 -        rm.delete(cf, timestamp);
 -        rm.applyUnsafe();
 +        new Mutation(PartitionUpdate.fullPartitionDelete(cfm, key, timestamp, FBUtilities.nowInSeconds()))
 +        .applyUnsafe();
      }
+ 
+     private void assertPurgeBoundary(Predicate<Long> evaluator, long boundary)
+     {
 -        assertFalse(evaluator.apply(boundary));
 -        assertTrue(evaluator.apply(boundary - 1));
++        assertFalse(evaluator.test(boundary));
++        assertTrue(evaluator.test(boundary - 1));
+     }
  }